Skip to main content

agent_office/services/mail/
mod.rs

1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11    #[error("Mailbox not found: {0}")]
12    MailboxNotFound(MailboxId),
13    
14    #[error("Agent not found: {0}")]
15    AgentNotFound(AgentId),
16    
17    #[error("Mail not found: {0}")]
18    MailNotFound(uuid::Uuid),
19    
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    
23    #[error("Invalid operation: {0}")]
24    InvalidOperation(String),
25}
26
27pub type Result<T> = std::result::Result<T, MailError>;
28
29#[async_trait]
30pub trait MailService: Send + Sync {
31    // Agent operations
32    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
33    async fn delete_agent(&self, agent_id: AgentId) -> Result<()>;
34    async fn get_agent(&self, id: AgentId) -> Result<Agent>;
35    async fn list_agents(&self) -> Result<Vec<Agent>>;
36    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
37    
38    // Get agent by their mailbox ID (each agent has exactly one mailbox)
39    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
40    
41    // Get the single mailbox for an agent (auto-creates if doesn't exist)
42    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
43    
44    // Send mail from one agent to another
45    async fn send_agent_to_agent(
46        &self,
47        from_agent_id: AgentId,
48        to_agent_id: AgentId,
49        subject: impl Into<String> + Send,
50        body: impl Into<String> + Send,
51    ) -> Result<Mail>;
52    
53    // Get mail received by an agent's mailbox
54    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
55    
56    // Get mail sent by an agent's mailbox
57    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
58    
59    // Get recent mail for an agent (received in last N hours)
60    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
61    
62    // Mark mail as read
63    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
64    
65    // Mark mail as read by short ID (8-char prefix) - searches all mail system-wide
66    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
67    
68    // Check if agent has unread mail
69    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
70}
71
72pub struct MailServiceImpl<S: GraphStorage> {
73    storage: S,
74}
75
76impl<S: GraphStorage> MailServiceImpl<S> {
77    pub fn new(storage: S) -> Self {
78        Self { storage }
79    }
80
81    /// Helper to get mail by ID
82    async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
83        let node = self.storage.get_node(mail_id).await
84            .map_err(|e| match e {
85                StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
86                _ => MailError::Storage(e),
87            })?;
88        Mail::from_node(&node)
89            .ok_or(MailError::MailNotFound(mail_id))
90    }
91}
92
93#[async_trait]
94impl<S: GraphStorage> MailService for MailServiceImpl<S> {
95    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
96        let agent = Agent::new(name);
97        let node = agent.to_node();
98        self.storage.create_node(&node).await?;
99        
100        Ok(agent)
101    }
102
103    async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
104        // First, get the agent to ensure they exist
105        let agent = self.get_agent(agent_id.clone()).await?;
106        let agent_node_id = string_to_node_id(&agent.id);
107        
108        // Get all mail in inbox and outbox to clear them
109        let inbox = self.get_mailbox_inbox(agent_node_id).await?;
110        let outbox = self.get_mailbox_outbox(agent_node_id).await?;
111        
112        // Delete all mail nodes
113        for mail in inbox.iter().chain(outbox.iter()) {
114            let _ = self.storage.delete_node(mail.id).await;
115        }
116        
117        // Finally, delete the agent node
118        self.storage.delete_node(agent_node_id).await?;
119        
120        Ok(())
121    }
122
123    async fn get_agent(&self, id: AgentId) -> Result<Agent> {
124        let node_id = string_to_node_id(&id);
125        let id_clone = id.clone();
126        let node = self.storage.get_node(node_id).await
127            .map_err(|e| match e {
128                StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
129                _ => MailError::Storage(e),
130            })?;
131        Agent::from_node(&node)
132            .ok_or(MailError::AgentNotFound(id))
133    }
134
135    async fn list_agents(&self) -> Result<Vec<Agent>> {
136        let query = GraphQuery::new().with_node_type("agent");
137        let nodes = self.storage.query_nodes(&query).await?;
138        let agents: Vec<Agent> = nodes.iter()
139            .filter_map(Agent::from_node)
140            .collect();
141        Ok(agents)
142    }
143
144    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
145        let mut agent = self.get_agent(agent_id).await?;
146        agent.status = status.into();
147        let node = agent.to_node();
148        self.storage.update_node(&node).await?;
149        Ok(agent)
150    }
151
152    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
153        // The mailbox ID is the agent's node ID, so get the agent directly
154        let node = self.storage.get_node(mailbox_id).await
155            .map_err(|e| match e {
156                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
157                _ => MailError::Storage(e),
158            })?;
159        
160        Agent::from_node(&node)
161            .ok_or_else(|| MailError::InvalidOperation(
162                "Node exists but is not an agent".to_string()
163            ))
164    }
165
166    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
167        // Verify agent exists - the agent's node IS the mailbox
168        let agent = self.get_agent(agent_id.clone()).await?;
169        let node_id = string_to_node_id(&agent.id);
170        
171        // Create a mailbox representation from the agent
172        let mailbox = Mailbox {
173            id: node_id,
174            owner_id: agent.id,
175            name: "Mailbox".to_string(),
176            created_at: agent.created_at,
177        };
178        
179        Ok(mailbox)
180    }
181
182    async fn send_agent_to_agent(
183        &self,
184        from_agent_id: AgentId,
185        to_agent_id: AgentId,
186        subject: impl Into<String> + Send,
187        body: impl Into<String> + Send,
188    ) -> Result<Mail> {
189        // Verify both agents exist
190        let from_agent = self.get_agent(from_agent_id).await?;
191        let to_agent = self.get_agent(to_agent_id).await?;
192        
193        // Use agent node IDs as mailbox IDs
194        let from_mailbox_id = string_to_node_id(&from_agent.id);
195        let to_mailbox_id = string_to_node_id(&to_agent.id);
196        
197        // Create mail
198        let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
199        let node = mail.to_node();
200        
201        // Create mail node
202        self.storage.create_node(&node).await?;
203        
204        // Create edges for sender and receiver
205        let from_edge = Edge::new(
206            "sent_from",
207            from_mailbox_id,
208            mail.id,
209            Properties::new(),
210        );
211        self.storage.create_edge(&from_edge).await?;
212        
213        let to_edge = Edge::new(
214            "sent_to",
215            mail.id,
216            to_mailbox_id,
217            Properties::new(),
218        );
219        self.storage.create_edge(&to_edge).await?;
220        
221        Ok(mail)
222    }
223
224    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
225        // Verify mailbox (agent) exists
226        let _agent = self.storage.get_node(mailbox_id).await
227            .map_err(|e| match e {
228                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
229                _ => MailError::Storage(e),
230            })?;
231        
232        // Get all mail where there's an edge from mail -> mailbox (sent_to)
233        let incoming_edges = self.storage
234            .get_edges_to(mailbox_id, Some("sent_to"))
235            .await?;
236        
237        let mut mails = Vec::new();
238        for edge in incoming_edges {
239            if let Ok(mail) = self.get_mail(edge.from_node_id).await {
240                mails.push(mail);
241            }
242        }
243        
244        // Sort by creation date, newest first
245        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
246        
247        Ok(mails)
248    }
249
250    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
251        // Verify mailbox (agent) exists
252        let _agent = self.storage.get_node(mailbox_id).await
253            .map_err(|e| match e {
254                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
255                _ => MailError::Storage(e),
256            })?;
257        
258        // Get all mail where there's an edge from mailbox -> mail (sent_from)
259        let outgoing_edges = self.storage
260            .get_edges_from(mailbox_id, Some("sent_from"))
261            .await?;
262        
263        let mut mails = Vec::new();
264        for edge in outgoing_edges {
265            if let Ok(mail) = self.get_mail(edge.to_node_id).await {
266                mails.push(mail);
267            }
268        }
269        
270        // Sort by creation date, newest first
271        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
272        
273        Ok(mails)
274    }
275
276    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
277        let since = chrono::Utc::now() - chrono::Duration::hours(hours);
278        
279        // Get all mail in the inbox
280        let inbox = self.get_mailbox_inbox(mailbox_id).await?;
281        
282        // Filter to recent mail only
283        let recent: Vec<Mail> = inbox.into_iter()
284            .filter(|mail| mail.created_at >= since)
285            .take(limit)
286            .collect();
287        
288        Ok(recent)
289    }
290
291    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
292        let mut mail = self.get_mail(mail_id).await?;
293        mail.mark_as_read();
294        
295        let node = mail.to_node();
296        self.storage.update_node(&node).await?;
297        
298        Ok(mail)
299    }
300
301    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
302        // Query all mail nodes in the system
303        let query = GraphQuery::new().with_node_type("mail");
304        let nodes = self.storage.query_nodes(&query).await?;
305        
306        // Convert nodes to Mail and find matching short ID
307        let short_id_lower = short_id.to_lowercase();
308        let matching: Vec<_> = nodes.iter()
309            .filter_map(Mail::from_node)
310            .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
311            .collect();
312        
313        match matching.len() {
314            0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
315            1 => {
316                let mail_id = matching[0].id;
317                self.mark_mail_as_read(mail_id).await
318            }
319            _ => Err(MailError::InvalidOperation(
320                format!("Multiple mails match short ID '{}', please use full ID", short_id)
321            )),
322        }
323    }
324
325    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
326        // Get the agent's mailbox ID
327        let mailbox = self.get_agent_mailbox(agent_id).await?;
328        
329        // Get inbox and filter for unread
330        let inbox = self.get_mailbox_inbox(mailbox.id).await?;
331        let unread: Vec<Mail> = inbox.into_iter()
332            .filter(|mail| !mail.read)
333            .collect();
334        
335        let has_unread = !unread.is_empty();
336        
337        Ok((has_unread, unread))
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344    use crate::storage::memory::InMemoryStorage;
345
346    #[tokio::test]
347    async fn test_create_agent() {
348        let storage = InMemoryStorage::new();
349        let service = MailServiceImpl::new(storage);
350        
351        let agent = service.create_agent("Test Agent").await.unwrap();
352        assert_eq!(agent.name, "Test Agent");
353    }
354
355    #[tokio::test]
356    async fn test_create_agent_auto_creates_mailbox() {
357        let storage = InMemoryStorage::new();
358        let service = MailServiceImpl::new(storage);
359        
360        let agent = service.create_agent("Agent").await.unwrap();
361        
362        // Should be able to get the mailbox for the agent
363        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
364        assert_eq!(mailbox.owner_id, agent.id);
365        
366        // The mailbox ID should be the same as the agent's node ID
367        let expected_mailbox_id = string_to_node_id(&agent.id);
368        assert_eq!(mailbox.id, expected_mailbox_id);
369    }
370
371    #[tokio::test]
372    async fn test_send_and_receive_mail() {
373        let storage = InMemoryStorage::new();
374        let service = MailServiceImpl::new(storage);
375        
376        // Create two agents - mailboxes are auto-created
377        let agent1 = service.create_agent("Sender").await.unwrap();
378        let agent2 = service.create_agent("Receiver").await.unwrap();
379        
380        // Send mail directly between agents
381        let mail = service
382            .send_agent_to_agent(
383                agent1.id.clone(),
384                agent2.id.clone(),
385                "Hello",
386                "This is a test message",
387            )
388            .await
389            .unwrap();
390        
391        assert_eq!(mail.subject, "Hello");
392        assert_eq!(mail.body, "This is a test message");
393        assert!(!mail.read);
394        
395        // Check receiver's inbox
396        let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
397        assert_eq!(inbox.len(), 1);
398        assert_eq!(inbox[0].subject, "Hello");
399        
400        // Check sender's outbox
401        let outbox = service.get_mailbox_outbox(string_to_node_id(&agent1.id)).await.unwrap();
402        assert_eq!(outbox.len(), 1);
403        assert_eq!(outbox[0].subject, "Hello");
404    }
405
406    #[tokio::test]
407    async fn test_mark_mail_as_read() {
408        let storage = InMemoryStorage::new();
409        let service = MailServiceImpl::new(storage);
410        
411        let agent1 = service.create_agent("Sender").await.unwrap();
412        let agent2 = service.create_agent("Receiver").await.unwrap();
413        
414        let mail = service
415            .send_agent_to_agent(agent1.id, agent2.id, "Test", "Body")
416            .await
417            .unwrap();
418        
419        assert!(!mail.read);
420        
421        let updated = service.mark_mail_as_read(mail.id).await.unwrap();
422        assert!(updated.read);
423    }
424
425    #[tokio::test]
426    async fn test_check_unread_mail() {
427        let storage = InMemoryStorage::new();
428        let service = MailServiceImpl::new(storage);
429        
430        let agent1 = service.create_agent("Sender").await.unwrap();
431        let agent2 = service.create_agent("Receiver").await.unwrap();
432        
433        // Initially no unread mail
434        let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
435        assert!(!has_unread);
436        assert!(unread.is_empty());
437        
438        // Send mail
439        service.send_agent_to_agent(agent1.id, agent2.id.clone(), "Test", "Body").await.unwrap();
440        
441        // Now there is unread mail
442        let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
443        assert!(has_unread);
444        assert_eq!(unread.len(), 1);
445        
446        // Mark as read
447        let mail_id = unread[0].id;
448        service.mark_mail_as_read(mail_id).await.unwrap();
449        
450        // No more unread
451        let (has_unread, unread) = service.check_unread_mail(agent2.id).await.unwrap();
452        assert!(!has_unread);
453        assert!(unread.is_empty());
454    }
455
456    #[tokio::test]
457    async fn test_get_recent_mail() {
458        let storage = InMemoryStorage::new();
459        let service = MailServiceImpl::new(storage);
460        
461        let agent1 = service.create_agent("Sender").await.unwrap();
462        let agent2 = service.create_agent("Receiver").await.unwrap();
463        
464        // Send some mail
465        service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Recent", "Body").await.unwrap();
466        
467        // Get recent mail (last 24 hours)
468        let recent = service.get_recent_mail(
469            string_to_node_id(&agent2.id),
470            24,
471            10
472        ).await.unwrap();
473        
474        assert_eq!(recent.len(), 1);
475        assert_eq!(recent[0].subject, "Recent");
476    }
477
478    #[tokio::test]
479    async fn test_get_nonexistent_agent() {
480        let storage = InMemoryStorage::new();
481        let service = MailServiceImpl::new(storage);
482        
483        let fake_id = "nonexistent_agent".to_string();
484        let result = service.get_agent(fake_id).await;
485        
486        assert!(matches!(result, Err(MailError::AgentNotFound(_))));
487    }
488
489    #[tokio::test]
490    async fn test_get_agent_by_mailbox() {
491        let storage = InMemoryStorage::new();
492        let service = MailServiceImpl::new(storage);
493        
494        let agent = service.create_agent("Test Agent").await.unwrap();
495        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
496        
497        // Get agent by mailbox
498        let found_agent = service.get_agent_by_mailbox(mailbox.id).await.unwrap();
499        assert_eq!(found_agent.id, agent.id);
500        assert_eq!(found_agent.name, agent.name);
501    }
502
503    #[tokio::test]
504    async fn test_list_agents() {
505        let storage = InMemoryStorage::new();
506        let service = MailServiceImpl::new(storage);
507        
508        // Create multiple agents
509        service.create_agent("Agent 1").await.unwrap();
510        service.create_agent("Agent 2").await.unwrap();
511        service.create_agent("Agent 3").await.unwrap();
512        
513        let agents = service.list_agents().await.unwrap();
514        assert_eq!(agents.len(), 3);
515    }
516
517    #[tokio::test]
518    async fn test_set_agent_status() {
519        let storage = InMemoryStorage::new();
520        let service = MailServiceImpl::new(storage);
521        
522        let agent = service.create_agent("Test Agent").await.unwrap();
523        assert_eq!(agent.status, "offline");
524        
525        let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
526        assert_eq!(updated.status, "online");
527        
528        // Verify persisted
529        let retrieved = service.get_agent(agent.id).await.unwrap();
530        assert_eq!(retrieved.status, "online");
531    }
532
533    #[tokio::test]
534    async fn test_multiple_mails_sorting() {
535        let storage = InMemoryStorage::new();
536        let service = MailServiceImpl::new(storage);
537        
538        let agent1 = service.create_agent("Sender").await.unwrap();
539        let agent2 = service.create_agent("Receiver").await.unwrap();
540        
541        // Send multiple mails
542        service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "First", "Body1").await.unwrap();
543        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
544        service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Second", "Body2").await.unwrap();
545        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
546        service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Third", "Body3").await.unwrap();
547        
548        let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
549        assert_eq!(inbox.len(), 3);
550        // Should be sorted newest first
551        assert_eq!(inbox[0].subject, "Third");
552        assert_eq!(inbox[1].subject, "Second");
553        assert_eq!(inbox[2].subject, "First");
554    }
555}