Skip to main content

agent_office/services/mail/
mod.rs

1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11    #[error("Mailbox not found: {0}")]
12    MailboxNotFound(MailboxId),
13    
14    #[error("Agent not found: {0}")]
15    AgentNotFound(AgentId),
16    
17    #[error("Mail not found: {0}")]
18    MailNotFound(uuid::Uuid),
19    
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    
23    #[error("Invalid operation: {0}")]
24    InvalidOperation(String),
25    
26    #[error("Invalid agent name: {0}")]
27    InvalidAgentName(String),
28}
29
30pub type Result<T> = std::result::Result<T, MailError>;
31
32#[async_trait]
33pub trait MailService: Send + Sync {
34    // Agent operations
35    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
36    async fn delete_agent(&self, agent_id: AgentId) -> Result<()>;
37    async fn get_agent(&self, id: AgentId) -> Result<Agent>;
38    async fn list_agents(&self) -> Result<Vec<Agent>>;
39    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
40    
41    // Get agent by their mailbox ID (each agent has exactly one mailbox)
42    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
43    
44    // Get the single mailbox for an agent (auto-creates if doesn't exist)
45    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
46    
47    // Send mail from one agent to another
48    async fn send_agent_to_agent(
49        &self,
50        from_agent_id: AgentId,
51        to_agent_id: AgentId,
52        subject: impl Into<String> + Send,
53        body: impl Into<String> + Send,
54    ) -> Result<Mail>;
55    
56    // Get mail received by an agent's mailbox
57    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
58    
59    // Get mail sent by an agent's mailbox
60    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
61    
62    // Get recent mail for an agent (received in last N hours)
63    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
64    
65    // Mark mail as read
66    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
67    
68    // Mark mail as read by short ID (8-char prefix) - searches all mail system-wide
69    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
70    
71    // Check if agent has unread mail
72    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
73}
74
75pub struct MailServiceImpl<S: GraphStorage> {
76    storage: S,
77}
78
79impl<S: GraphStorage> MailServiceImpl<S> {
80    pub fn new(storage: S) -> Self {
81        Self { storage }
82    }
83
84    /// Helper to get mail by ID
85    async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
86        let node = self.storage.get_node(mail_id).await
87            .map_err(|e| match e {
88                StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
89                _ => MailError::Storage(e),
90            })?;
91        Mail::from_node(&node)
92            .ok_or(MailError::MailNotFound(mail_id))
93    }
94}
95
96#[async_trait]
97impl<S: GraphStorage> MailService for MailServiceImpl<S> {
98    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
99        let name = name.into();
100        
101        // Validate agent name: must be lowercase with no spaces
102        if name.chars().any(|c| c.is_uppercase() || c.is_whitespace()) {
103            return Err(MailError::InvalidAgentName(
104                format!("Agent name '{}' is invalid. Names must be lowercase with no spaces (e.g., 'my_agent', 'agent123').", name)
105            ));
106        }
107        
108        let agent = Agent::new(name);
109        let node = agent.to_node();
110        self.storage.create_node(&node).await?;
111        
112        Ok(agent)
113    }
114
115    async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
116        // First, get the agent to ensure they exist
117        let agent = self.get_agent(agent_id.clone()).await?;
118        let agent_node_id = string_to_node_id(&agent.id);
119        
120        // Get all mail in inbox and outbox to clear them
121        let inbox = self.get_mailbox_inbox(agent_node_id).await?;
122        let outbox = self.get_mailbox_outbox(agent_node_id).await?;
123        
124        // Delete all mail nodes
125        for mail in inbox.iter().chain(outbox.iter()) {
126            let _ = self.storage.delete_node(mail.id).await;
127        }
128        
129        // Finally, delete the agent node
130        self.storage.delete_node(agent_node_id).await?;
131        
132        Ok(())
133    }
134
135    async fn get_agent(&self, id: AgentId) -> Result<Agent> {
136        let node_id = string_to_node_id(&id);
137        let id_clone = id.clone();
138        let node = self.storage.get_node(node_id).await
139            .map_err(|e| match e {
140                StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
141                _ => MailError::Storage(e),
142            })?;
143        Agent::from_node(&node)
144            .ok_or(MailError::AgentNotFound(id))
145    }
146
147    async fn list_agents(&self) -> Result<Vec<Agent>> {
148        let query = GraphQuery::new().with_node_type("agent");
149        let nodes = self.storage.query_nodes(&query).await?;
150        let agents: Vec<Agent> = nodes.iter()
151            .filter_map(Agent::from_node)
152            .collect();
153        Ok(agents)
154    }
155
156    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
157        let mut agent = self.get_agent(agent_id).await?;
158        agent.status = status.into();
159        let node = agent.to_node();
160        self.storage.update_node(&node).await?;
161        Ok(agent)
162    }
163
164    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
165        // The mailbox ID is the agent's node ID, so get the agent directly
166        let node = self.storage.get_node(mailbox_id).await
167            .map_err(|e| match e {
168                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
169                _ => MailError::Storage(e),
170            })?;
171        
172        Agent::from_node(&node)
173            .ok_or_else(|| MailError::InvalidOperation(
174                "Node exists but is not an agent".to_string()
175            ))
176    }
177
178    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
179        // Verify agent exists - the agent's node IS the mailbox
180        let agent = self.get_agent(agent_id.clone()).await?;
181        let node_id = string_to_node_id(&agent.id);
182        
183        // Create a mailbox representation from the agent
184        let mailbox = Mailbox {
185            id: node_id,
186            owner_id: agent.id,
187            name: "Mailbox".to_string(),
188            created_at: agent.created_at,
189        };
190        
191        Ok(mailbox)
192    }
193
194    async fn send_agent_to_agent(
195        &self,
196        from_agent_id: AgentId,
197        to_agent_id: AgentId,
198        subject: impl Into<String> + Send,
199        body: impl Into<String> + Send,
200    ) -> Result<Mail> {
201        // Verify both agents exist
202        let from_agent = self.get_agent(from_agent_id).await?;
203        let to_agent = self.get_agent(to_agent_id).await?;
204        
205        // Use agent node IDs as mailbox IDs
206        let from_mailbox_id = string_to_node_id(&from_agent.id);
207        let to_mailbox_id = string_to_node_id(&to_agent.id);
208        
209        // Create mail
210        let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
211        let node = mail.to_node();
212        
213        // Create mail node
214        self.storage.create_node(&node).await?;
215        
216        // Create edges for sender and receiver
217        let from_edge = Edge::new(
218            "sent_from",
219            from_mailbox_id,
220            mail.id,
221            Properties::new(),
222        );
223        self.storage.create_edge(&from_edge).await?;
224        
225        let to_edge = Edge::new(
226            "sent_to",
227            mail.id,
228            to_mailbox_id,
229            Properties::new(),
230        );
231        self.storage.create_edge(&to_edge).await?;
232        
233        Ok(mail)
234    }
235
236    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
237        // Verify mailbox (agent) exists
238        let _agent = self.storage.get_node(mailbox_id).await
239            .map_err(|e| match e {
240                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
241                _ => MailError::Storage(e),
242            })?;
243        
244        // Get all mail where there's an edge from mail -> mailbox (sent_to)
245        let incoming_edges = self.storage
246            .get_edges_to(mailbox_id, Some("sent_to"))
247            .await?;
248        
249        let mut mails = Vec::new();
250        for edge in incoming_edges {
251            if let Ok(mail) = self.get_mail(edge.from_node_id).await {
252                mails.push(mail);
253            }
254        }
255        
256        // Sort by creation date, newest first
257        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
258        
259        Ok(mails)
260    }
261
262    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
263        // Verify mailbox (agent) exists
264        let _agent = self.storage.get_node(mailbox_id).await
265            .map_err(|e| match e {
266                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
267                _ => MailError::Storage(e),
268            })?;
269        
270        // Get all mail where there's an edge from mailbox -> mail (sent_from)
271        let outgoing_edges = self.storage
272            .get_edges_from(mailbox_id, Some("sent_from"))
273            .await?;
274        
275        let mut mails = Vec::new();
276        for edge in outgoing_edges {
277            if let Ok(mail) = self.get_mail(edge.to_node_id).await {
278                mails.push(mail);
279            }
280        }
281        
282        // Sort by creation date, newest first
283        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
284        
285        Ok(mails)
286    }
287
288    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
289        let since = chrono::Utc::now() - chrono::Duration::hours(hours);
290        
291        // Get all mail in the inbox
292        let inbox = self.get_mailbox_inbox(mailbox_id).await?;
293        
294        // Filter to recent mail only
295        let recent: Vec<Mail> = inbox.into_iter()
296            .filter(|mail| mail.created_at >= since)
297            .take(limit)
298            .collect();
299        
300        Ok(recent)
301    }
302
303    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
304        let mut mail = self.get_mail(mail_id).await?;
305        mail.mark_as_read();
306        
307        let node = mail.to_node();
308        self.storage.update_node(&node).await?;
309        
310        Ok(mail)
311    }
312
313    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
314        // Query all mail nodes in the system
315        let query = GraphQuery::new().with_node_type("mail");
316        let nodes = self.storage.query_nodes(&query).await?;
317        
318        // Convert nodes to Mail and find matching short ID
319        let short_id_lower = short_id.to_lowercase();
320        let matching: Vec<_> = nodes.iter()
321            .filter_map(Mail::from_node)
322            .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
323            .collect();
324        
325        match matching.len() {
326            0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
327            1 => {
328                let mail_id = matching[0].id;
329                self.mark_mail_as_read(mail_id).await
330            }
331            _ => Err(MailError::InvalidOperation(
332                format!("Multiple mails match short ID '{}', please use full ID", short_id)
333            )),
334        }
335    }
336
337    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
338        // Get the agent's mailbox ID
339        let mailbox = self.get_agent_mailbox(agent_id).await?;
340        
341        // Get inbox and filter for unread
342        let inbox = self.get_mailbox_inbox(mailbox.id).await?;
343        let unread: Vec<Mail> = inbox.into_iter()
344            .filter(|mail| !mail.read)
345            .collect();
346        
347        let has_unread = !unread.is_empty();
348        
349        Ok((has_unread, unread))
350    }
351}
352
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::InMemoryStorage;

    /// Service type exercised by every test: the real implementation on an
    /// in-memory backend.
    type TestService = MailServiceImpl<InMemoryStorage>;

    /// Builds a service backed by a fresh, empty in-memory store.
    fn new_service() -> TestService {
        MailServiceImpl::new(InMemoryStorage::new())
    }

    /// Creates the "sender" and "receiver" agents, in that order.
    async fn sender_and_receiver(service: &TestService) -> (Agent, Agent) {
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();
        (sender, receiver)
    }

    #[tokio::test]
    async fn test_create_agent() {
        let service = new_service();

        let created = service.create_agent("test_agent").await.unwrap();

        assert_eq!(created.name, "test_agent");
    }

    #[tokio::test]
    async fn test_create_agent_uppercase_fails() {
        let service = new_service();

        let outcome = service.create_agent("TestAgent").await;

        assert!(matches!(outcome, Err(MailError::InvalidAgentName(_))));
    }

    #[tokio::test]
    async fn test_create_agent_spaces_fails() {
        let service = new_service();

        let outcome = service.create_agent("test agent").await;

        assert!(matches!(outcome, Err(MailError::InvalidAgentName(_))));
    }

    #[tokio::test]
    async fn test_create_agent_auto_creates_mailbox() {
        let service = new_service();
        let agent = service.create_agent("agent").await.unwrap();

        // A mailbox must be available for the freshly created agent.
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
        assert_eq!(mailbox.owner_id, agent.id);

        // The mailbox shares its ID with the agent's node.
        assert_eq!(mailbox.id, string_to_node_id(&agent.id));
    }

    #[tokio::test]
    async fn test_send_and_receive_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let sent = service
            .send_agent_to_agent(
                sender.id.clone(),
                receiver.id.clone(),
                "Hello",
                "This is a test message",
            )
            .await
            .unwrap();

        assert_eq!(sent.subject, "Hello");
        assert_eq!(sent.body, "This is a test message");
        assert!(!sent.read);

        // The receiver sees the mail in its inbox.
        let receiver_inbox = service
            .get_mailbox_inbox(string_to_node_id(&receiver.id))
            .await
            .unwrap();
        assert_eq!(receiver_inbox.len(), 1);
        assert_eq!(receiver_inbox[0].subject, "Hello");

        // The sender sees the mail in its outbox.
        let sender_outbox = service
            .get_mailbox_outbox(string_to_node_id(&sender.id))
            .await
            .unwrap();
        assert_eq!(sender_outbox.len(), 1);
        assert_eq!(sender_outbox[0].subject, "Hello");
    }

    #[tokio::test]
    async fn test_mark_mail_as_read() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let sent = service
            .send_agent_to_agent(sender.id, receiver.id, "Test", "Body")
            .await
            .unwrap();
        assert!(!sent.read);

        let after = service.mark_mail_as_read(sent.id).await.unwrap();
        assert!(after.read);
    }

    #[tokio::test]
    async fn test_check_unread_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        // Nothing has been sent yet.
        let (flag, pending) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(!flag);
        assert!(pending.is_empty());

        service
            .send_agent_to_agent(sender.id, receiver.id.clone(), "Test", "Body")
            .await
            .unwrap();

        // The new mail shows up as unread.
        let (flag, pending) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(flag);
        assert_eq!(pending.len(), 1);

        service.mark_mail_as_read(pending[0].id).await.unwrap();

        // Once read, it no longer counts.
        let (flag, pending) = service.check_unread_mail(receiver.id).await.unwrap();
        assert!(!flag);
        assert!(pending.is_empty());
    }

    #[tokio::test]
    async fn test_get_recent_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Recent", "Body")
            .await
            .unwrap();

        // Look back 24 hours, at most 10 mails.
        let mailbox_id = string_to_node_id(&receiver.id);
        let recent = service.get_recent_mail(mailbox_id, 24, 10).await.unwrap();

        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].subject, "Recent");
    }

    #[tokio::test]
    async fn test_get_nonexistent_agent() {
        let service = new_service();

        let outcome = service.get_agent("nonexistent_agent".to_string()).await;

        assert!(matches!(outcome, Err(MailError::AgentNotFound(_))));
    }

    #[tokio::test]
    async fn test_get_agent_by_mailbox() {
        let service = new_service();
        let agent = service.create_agent("test_agent").await.unwrap();
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        let owner = service.get_agent_by_mailbox(mailbox.id).await.unwrap();

        assert_eq!(owner.id, agent.id);
        assert_eq!(owner.name, agent.name);
    }

    #[tokio::test]
    async fn test_list_agents() {
        let service = new_service();
        for name in ["agent_1", "agent_2", "agent_3"] {
            service.create_agent(name).await.unwrap();
        }

        let listed = service.list_agents().await.unwrap();

        assert_eq!(listed.len(), 3);
    }

    #[tokio::test]
    async fn test_set_agent_status() {
        let service = new_service();
        let agent = service.create_agent("test_agent").await.unwrap();
        assert_eq!(agent.status, "offline");

        let changed = service
            .set_agent_status(agent.id.clone(), "online")
            .await
            .unwrap();
        assert_eq!(changed.status, "online");

        // The change must survive a fresh read from storage.
        let reloaded = service.get_agent(agent.id).await.unwrap();
        assert_eq!(reloaded.status, "online");
    }

    #[tokio::test]
    async fn test_multiple_mails_sorting() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let messages = [("First", "Body1"), ("Second", "Body2"), ("Third", "Body3")];
        for (position, &(subject, body)) in messages.iter().enumerate() {
            // Pause between sends so each mail gets a distinct creation time.
            if position > 0 {
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
            service
                .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), subject, body)
                .await
                .unwrap();
        }

        let inbox = service
            .get_mailbox_inbox(string_to_node_id(&receiver.id))
            .await
            .unwrap();
        assert_eq!(inbox.len(), 3);

        // Newest first.
        let subjects: Vec<&str> = inbox.iter().map(|mail| mail.subject.as_str()).collect();
        assert_eq!(subjects, ["Third", "Second", "First"]);
    }
}