Skip to main content

agent_office/services/mail/
mod.rs

1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11    #[error("Mailbox not found: {0}")]
12    MailboxNotFound(MailboxId),
13    
14    #[error("Agent not found: {0}")]
15    AgentNotFound(AgentId),
16    
17    #[error("Mail not found: {0}")]
18    MailNotFound(uuid::Uuid),
19    
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    
23    #[error("Invalid operation: {0}")]
24    InvalidOperation(String),
25    
26    #[error("Invalid agent name: {0}")]
27    InvalidAgentName(String),
28}
29
30pub type Result<T> = std::result::Result<T, MailError>;
31
32#[async_trait]
33pub trait MailService: Send + Sync {
34    // Agent operations
35    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
36    async fn delete_agent(&self, agent_id: AgentId) -> Result<()>;
37    async fn get_agent(&self, id: AgentId) -> Result<Agent>;
38    async fn list_agents(&self) -> Result<Vec<Agent>>;
39    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
40    async fn set_agent_session(&self, agent_id: AgentId, session_id: Option<String>) -> Result<Agent>;
41    
42    // Get agent by their mailbox ID (each agent has exactly one mailbox)
43    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
44    
45    // Get the single mailbox for an agent (auto-creates if doesn't exist)
46    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
47    
48    // Send mail from one agent to another
49    async fn send_agent_to_agent(
50        &self,
51        from_agent_id: AgentId,
52        to_agent_id: AgentId,
53        subject: impl Into<String> + Send,
54        body: impl Into<String> + Send,
55    ) -> Result<Mail>;
56    
57    // Get mail received by an agent's mailbox
58    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
59    
60    // Get mail sent by an agent's mailbox
61    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
62    
63    // Get recent mail for an agent (received in last N hours)
64    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
65    
66    // Mark mail as read
67    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
68    
69    // Mark mail as read by short ID (8-char prefix) - searches all mail system-wide
70    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
71    
72    // Check if agent has unread mail
73    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
74}
75
76pub struct MailServiceImpl<S: GraphStorage> {
77    storage: S,
78}
79
80impl<S: GraphStorage> MailServiceImpl<S> {
81    pub fn new(storage: S) -> Self {
82        Self { storage }
83    }
84
85    /// Helper to get mail by ID
86    async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
87        let node = self.storage.get_node(mail_id).await
88            .map_err(|e| match e {
89                StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
90                _ => MailError::Storage(e),
91            })?;
92        Mail::from_node(&node)
93            .ok_or(MailError::MailNotFound(mail_id))
94    }
95}
96
97#[async_trait]
98impl<S: GraphStorage> MailService for MailServiceImpl<S> {
99    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
100        let name = name.into();
101        
102        // Validate agent name: must be lowercase with no spaces
103        if name.chars().any(|c| c.is_uppercase() || c.is_whitespace()) {
104            return Err(MailError::InvalidAgentName(
105                format!("Agent name '{}' is invalid. Names must be lowercase with no spaces (e.g., 'my_agent', 'agent123').", name)
106            ));
107        }
108        
109        let agent = Agent::new(name);
110        let node = agent.to_node();
111        self.storage.create_node(&node).await?;
112        
113        Ok(agent)
114    }
115
116    async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
117        // First, get the agent to ensure they exist
118        let agent = self.get_agent(agent_id.clone()).await?;
119        let agent_node_id = string_to_node_id(&agent.id);
120        
121        // Get all mail in inbox and outbox to clear them
122        let inbox = self.get_mailbox_inbox(agent_node_id).await?;
123        let outbox = self.get_mailbox_outbox(agent_node_id).await?;
124        
125        // Delete all mail nodes
126        for mail in inbox.iter().chain(outbox.iter()) {
127            let _ = self.storage.delete_node(mail.id).await;
128        }
129        
130        // Finally, delete the agent node
131        self.storage.delete_node(agent_node_id).await?;
132        
133        Ok(())
134    }
135
136    async fn get_agent(&self, id: AgentId) -> Result<Agent> {
137        let node_id = string_to_node_id(&id);
138        let id_clone = id.clone();
139        let node = self.storage.get_node(node_id).await
140            .map_err(|e| match e {
141                StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
142                _ => MailError::Storage(e),
143            })?;
144        Agent::from_node(&node)
145            .ok_or(MailError::AgentNotFound(id))
146    }
147
148    async fn list_agents(&self) -> Result<Vec<Agent>> {
149        let query = GraphQuery::new().with_node_type("agent");
150        let nodes = self.storage.query_nodes(&query).await?;
151        let agents: Vec<Agent> = nodes.iter()
152            .filter_map(Agent::from_node)
153            .collect();
154        Ok(agents)
155    }
156
157    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
158        let mut agent = self.get_agent(agent_id).await?;
159        agent.status = status.into();
160        let node = agent.to_node();
161        self.storage.update_node(&node).await?;
162        Ok(agent)
163    }
164
165    async fn set_agent_session(&self, agent_id: AgentId, session_id: Option<String>) -> Result<Agent> {
166        let mut agent = self.get_agent(agent_id).await?;
167        agent.session_id = session_id;
168        let node = agent.to_node();
169        self.storage.update_node(&node).await?;
170        Ok(agent)
171    }
172
173    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
174        // The mailbox ID is the agent's node ID, so get the agent directly
175        let node = self.storage.get_node(mailbox_id).await
176            .map_err(|e| match e {
177                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
178                _ => MailError::Storage(e),
179            })?;
180        
181        Agent::from_node(&node)
182            .ok_or_else(|| MailError::InvalidOperation(
183                "Node exists but is not an agent".to_string()
184            ))
185    }
186
187    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
188        // Verify agent exists - the agent's node IS the mailbox
189        let agent = self.get_agent(agent_id.clone()).await?;
190        let node_id = string_to_node_id(&agent.id);
191        
192        // Create a mailbox representation from the agent
193        let mailbox = Mailbox {
194            id: node_id,
195            owner_id: agent.id,
196            name: "Mailbox".to_string(),
197            created_at: agent.created_at,
198        };
199        
200        Ok(mailbox)
201    }
202
203    async fn send_agent_to_agent(
204        &self,
205        from_agent_id: AgentId,
206        to_agent_id: AgentId,
207        subject: impl Into<String> + Send,
208        body: impl Into<String> + Send,
209    ) -> Result<Mail> {
210        // Verify both agents exist
211        let from_agent = self.get_agent(from_agent_id).await?;
212        let to_agent = self.get_agent(to_agent_id).await?;
213        
214        // Use agent node IDs as mailbox IDs
215        let from_mailbox_id = string_to_node_id(&from_agent.id);
216        let to_mailbox_id = string_to_node_id(&to_agent.id);
217        
218        // Create mail
219        let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
220        let node = mail.to_node();
221        
222        // Create mail node
223        self.storage.create_node(&node).await?;
224        
225        // Create edges for sender and receiver
226        let from_edge = Edge::new(
227            "sent_from",
228            from_mailbox_id,
229            mail.id,
230            Properties::new(),
231        );
232        self.storage.create_edge(&from_edge).await?;
233        
234        let to_edge = Edge::new(
235            "sent_to",
236            mail.id,
237            to_mailbox_id,
238            Properties::new(),
239        );
240        self.storage.create_edge(&to_edge).await?;
241        
242        Ok(mail)
243    }
244
245    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
246        // Verify mailbox (agent) exists
247        let _agent = self.storage.get_node(mailbox_id).await
248            .map_err(|e| match e {
249                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
250                _ => MailError::Storage(e),
251            })?;
252        
253        // Get all mail where there's an edge from mail -> mailbox (sent_to)
254        let incoming_edges = self.storage
255            .get_edges_to(mailbox_id, Some("sent_to"))
256            .await?;
257        
258        let mut mails = Vec::new();
259        for edge in incoming_edges {
260            if let Ok(mail) = self.get_mail(edge.from_node_id).await {
261                mails.push(mail);
262            }
263        }
264        
265        // Sort by creation date, newest first
266        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
267        
268        Ok(mails)
269    }
270
271    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
272        // Verify mailbox (agent) exists
273        let _agent = self.storage.get_node(mailbox_id).await
274            .map_err(|e| match e {
275                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
276                _ => MailError::Storage(e),
277            })?;
278        
279        // Get all mail where there's an edge from mailbox -> mail (sent_from)
280        let outgoing_edges = self.storage
281            .get_edges_from(mailbox_id, Some("sent_from"))
282            .await?;
283        
284        let mut mails = Vec::new();
285        for edge in outgoing_edges {
286            if let Ok(mail) = self.get_mail(edge.to_node_id).await {
287                mails.push(mail);
288            }
289        }
290        
291        // Sort by creation date, newest first
292        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
293        
294        Ok(mails)
295    }
296
297    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
298        let since = chrono::Utc::now() - chrono::Duration::hours(hours);
299        
300        // Get all mail in the inbox
301        let inbox = self.get_mailbox_inbox(mailbox_id).await?;
302        
303        // Filter to recent mail only
304        let recent: Vec<Mail> = inbox.into_iter()
305            .filter(|mail| mail.created_at >= since)
306            .take(limit)
307            .collect();
308        
309        Ok(recent)
310    }
311
312    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
313        let mut mail = self.get_mail(mail_id).await?;
314        mail.mark_as_read();
315        
316        let node = mail.to_node();
317        self.storage.update_node(&node).await?;
318        
319        Ok(mail)
320    }
321
322    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
323        // Query all mail nodes in the system
324        let query = GraphQuery::new().with_node_type("mail");
325        let nodes = self.storage.query_nodes(&query).await?;
326        
327        // Convert nodes to Mail and find matching short ID
328        let short_id_lower = short_id.to_lowercase();
329        let matching: Vec<_> = nodes.iter()
330            .filter_map(Mail::from_node)
331            .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
332            .collect();
333        
334        match matching.len() {
335            0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
336            1 => {
337                let mail_id = matching[0].id;
338                self.mark_mail_as_read(mail_id).await
339            }
340            _ => Err(MailError::InvalidOperation(
341                format!("Multiple mails match short ID '{}', please use full ID", short_id)
342            )),
343        }
344    }
345
346    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
347        // Get the agent's mailbox ID
348        let mailbox = self.get_agent_mailbox(agent_id).await?;
349        
350        // Get inbox and filter for unread
351        let inbox = self.get_mailbox_inbox(mailbox.id).await?;
352        let unread: Vec<Mail> = inbox.into_iter()
353            .filter(|mail| !mail.read)
354            .collect();
355        
356        let has_unread = !unread.is_empty();
357        
358        Ok((has_unread, unread))
359    }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::InMemoryStorage;

    /// Mail service backed by a fresh, empty in-memory store.
    fn new_service() -> MailServiceImpl<InMemoryStorage> {
        MailServiceImpl::new(InMemoryStorage::new())
    }

    /// Creates the "sender" and "receiver" agents (in that order) used by the mail tests.
    async fn sender_and_receiver(service: &MailServiceImpl<InMemoryStorage>) -> (Agent, Agent) {
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();
        (sender, receiver)
    }

    #[tokio::test]
    async fn test_create_agent() {
        let service = new_service();

        let created = service.create_agent("test_agent").await.unwrap();

        assert_eq!(created.name, "test_agent");
    }

    #[tokio::test]
    async fn test_create_agent_uppercase_fails() {
        let service = new_service();

        let outcome = service.create_agent("TestAgent").await;

        assert!(matches!(outcome, Err(MailError::InvalidAgentName(_))));
    }

    #[tokio::test]
    async fn test_create_agent_spaces_fails() {
        let service = new_service();

        let outcome = service.create_agent("test agent").await;

        assert!(matches!(outcome, Err(MailError::InvalidAgentName(_))));
    }

    #[tokio::test]
    async fn test_create_agent_auto_creates_mailbox() {
        let service = new_service();
        let agent = service.create_agent("agent").await.unwrap();

        // A mailbox is available for the agent without any extra setup.
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
        assert_eq!(mailbox.owner_id, agent.id);

        // The mailbox shares the agent's node ID.
        assert_eq!(mailbox.id, string_to_node_id(&agent.id));
    }

    #[tokio::test]
    async fn test_send_and_receive_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let sent = service
            .send_agent_to_agent(
                sender.id.clone(),
                receiver.id.clone(),
                "Hello",
                "This is a test message",
            )
            .await
            .unwrap();

        assert_eq!(sent.subject, "Hello");
        assert_eq!(sent.body, "This is a test message");
        assert!(!sent.read);

        // The mail shows up in the receiver's inbox...
        let receiver_box = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_box).await.unwrap();
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].subject, "Hello");

        // ...and in the sender's outbox.
        let sender_box = string_to_node_id(&sender.id);
        let outbox = service.get_mailbox_outbox(sender_box).await.unwrap();
        assert_eq!(outbox.len(), 1);
        assert_eq!(outbox[0].subject, "Hello");
    }

    #[tokio::test]
    async fn test_mark_mail_as_read() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let sent = service
            .send_agent_to_agent(sender.id, receiver.id, "Test", "Body")
            .await
            .unwrap();
        assert!(!sent.read);

        let after = service.mark_mail_as_read(sent.id).await.unwrap();
        assert!(after.read);
    }

    #[tokio::test]
    async fn test_check_unread_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        // Nothing has been sent yet.
        let (has_unread, unread) = service.check_unread_mail(receiver.id.clone()).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());

        service
            .send_agent_to_agent(sender.id, receiver.id.clone(), "Test", "Body")
            .await
            .unwrap();

        // One unread mail after sending.
        let (has_unread, unread) = service.check_unread_mail(receiver.id.clone()).await.unwrap();
        assert!(has_unread);
        assert_eq!(unread.len(), 1);

        service.mark_mail_as_read(unread[0].id).await.unwrap();

        // Reading it clears the unread state.
        let (has_unread, unread) = service.check_unread_mail(receiver.id).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());
    }

    #[tokio::test]
    async fn test_get_recent_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Recent", "Body")
            .await
            .unwrap();

        // Last 24 hours, at most 10 mails.
        let recent = service
            .get_recent_mail(string_to_node_id(&receiver.id), 24, 10)
            .await
            .unwrap();

        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].subject, "Recent");
    }

    #[tokio::test]
    async fn test_get_nonexistent_agent() {
        let service = new_service();

        let outcome = service.get_agent("nonexistent_agent".to_string()).await;

        assert!(matches!(outcome, Err(MailError::AgentNotFound(_))));
    }

    #[tokio::test]
    async fn test_get_agent_by_mailbox() {
        let service = new_service();
        let agent = service.create_agent("test_agent").await.unwrap();
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        let owner = service.get_agent_by_mailbox(mailbox.id).await.unwrap();

        assert_eq!(owner.id, agent.id);
        assert_eq!(owner.name, agent.name);
    }

    #[tokio::test]
    async fn test_list_agents() {
        let service = new_service();

        for name in ["agent_1", "agent_2", "agent_3"].iter() {
            service.create_agent(*name).await.unwrap();
        }

        let agents = service.list_agents().await.unwrap();
        assert_eq!(agents.len(), 3);
    }

    #[tokio::test]
    async fn test_set_agent_status() {
        let service = new_service();

        let agent = service.create_agent("test_agent").await.unwrap();
        assert_eq!(agent.status, "offline");

        let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
        assert_eq!(updated.status, "online");

        // The change must survive a fresh read from storage.
        let reloaded = service.get_agent(agent.id).await.unwrap();
        assert_eq!(reloaded.status, "online");
    }

    #[tokio::test]
    async fn test_multiple_mails_sorting() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let messages = [("First", "Body1"), ("Second", "Body2"), ("Third", "Body3")];
        for (index, (subject, body)) in messages.iter().enumerate() {
            // Pause between sends so the creation timestamps differ.
            if index > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
            service
                .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), *subject, *body)
                .await
                .unwrap();
        }

        let inbox = service
            .get_mailbox_inbox(string_to_node_id(&receiver.id))
            .await
            .unwrap();
        assert_eq!(inbox.len(), 3);

        // Newest first.
        assert_eq!(inbox[0].subject, "Third");
        assert_eq!(inbox[1].subject, "Second");
        assert_eq!(inbox[2].subject, "First");
    }
}