Skip to main content

agent_office/services/mail/
mod.rs

1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11    #[error("Mailbox not found: {0}")]
12    MailboxNotFound(MailboxId),
13    
14    #[error("Agent not found: {0}")]
15    AgentNotFound(AgentId),
16    
17    #[error("Mail not found: {0}")]
18    MailNotFound(uuid::Uuid),
19    
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    
23    #[error("Invalid operation: {0}")]
24    InvalidOperation(String),
25}
26
27pub type Result<T> = std::result::Result<T, MailError>;
28
29#[async_trait]
30pub trait MailService: Send + Sync {
31    // Agent operations
32    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
33    async fn get_agent(&self, id: AgentId) -> Result<Agent>;
34    async fn list_agents(&self) -> Result<Vec<Agent>>;
35    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
36    
37    // Get agent by their mailbox ID (each agent has exactly one mailbox)
38    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
39    
40    // Get the single mailbox for an agent (auto-creates if doesn't exist)
41    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
42    
43    // Send mail from one agent to another
44    async fn send_agent_to_agent(
45        &self,
46        from_agent_id: AgentId,
47        to_agent_id: AgentId,
48        subject: impl Into<String> + Send,
49        body: impl Into<String> + Send,
50    ) -> Result<Mail>;
51    
52    // Get mail received by an agent's mailbox
53    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
54    
55    // Get mail sent by an agent's mailbox
56    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
57    
58    // Get recent mail for an agent (received in last N hours)
59    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
60    
61    // Mark mail as read
62    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
63    
64    // Check if agent has unread mail
65    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
66}
67
68pub struct MailServiceImpl<S: GraphStorage> {
69    storage: S,
70}
71
72impl<S: GraphStorage> MailServiceImpl<S> {
73    pub fn new(storage: S) -> Self {
74        Self { storage }
75    }
76
77    /// Helper to get mail by ID
78    async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
79        let node = self.storage.get_node(mail_id).await
80            .map_err(|e| match e {
81                StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
82                _ => MailError::Storage(e),
83            })?;
84        Mail::from_node(&node)
85            .ok_or(MailError::MailNotFound(mail_id))
86    }
87}
88
89#[async_trait]
90impl<S: GraphStorage> MailService for MailServiceImpl<S> {
91    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
92        let agent = Agent::new(name);
93        let node = agent.to_node();
94        self.storage.create_node(&node).await?;
95        
96        Ok(agent)
97    }
98
99    async fn get_agent(&self, id: AgentId) -> Result<Agent> {
100        let node_id = string_to_node_id(&id);
101        let id_clone = id.clone();
102        let node = self.storage.get_node(node_id).await
103            .map_err(|e| match e {
104                StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
105                _ => MailError::Storage(e),
106            })?;
107        Agent::from_node(&node)
108            .ok_or(MailError::AgentNotFound(id))
109    }
110
111    async fn list_agents(&self) -> Result<Vec<Agent>> {
112        let query = GraphQuery::new().with_node_type("agent");
113        let nodes = self.storage.query_nodes(&query).await?;
114        let agents: Vec<Agent> = nodes.iter()
115            .filter_map(Agent::from_node)
116            .collect();
117        Ok(agents)
118    }
119
120    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
121        let mut agent = self.get_agent(agent_id).await?;
122        agent.status = status.into();
123        let node = agent.to_node();
124        self.storage.update_node(&node).await?;
125        Ok(agent)
126    }
127
128    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
129        // The mailbox ID is the agent's node ID, so get the agent directly
130        let node = self.storage.get_node(mailbox_id).await
131            .map_err(|e| match e {
132                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
133                _ => MailError::Storage(e),
134            })?;
135        
136        Agent::from_node(&node)
137            .ok_or_else(|| MailError::InvalidOperation(
138                "Node exists but is not an agent".to_string()
139            ))
140    }
141
142    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
143        // Verify agent exists - the agent's node IS the mailbox
144        let agent = self.get_agent(agent_id.clone()).await?;
145        let node_id = string_to_node_id(&agent.id);
146        
147        // Create a mailbox representation from the agent
148        let mailbox = Mailbox {
149            id: node_id,
150            owner_id: agent.id,
151            name: "Mailbox".to_string(),
152            created_at: agent.created_at,
153        };
154        
155        Ok(mailbox)
156    }
157
158    async fn send_agent_to_agent(
159        &self,
160        from_agent_id: AgentId,
161        to_agent_id: AgentId,
162        subject: impl Into<String> + Send,
163        body: impl Into<String> + Send,
164    ) -> Result<Mail> {
165        // Verify both agents exist
166        let from_agent = self.get_agent(from_agent_id).await?;
167        let to_agent = self.get_agent(to_agent_id).await?;
168        
169        // Use agent node IDs as mailbox IDs
170        let from_mailbox_id = string_to_node_id(&from_agent.id);
171        let to_mailbox_id = string_to_node_id(&to_agent.id);
172        
173        // Create mail
174        let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
175        let node = mail.to_node();
176        
177        // Create mail node
178        self.storage.create_node(&node).await?;
179        
180        // Create edges for sender and receiver
181        let from_edge = Edge::new(
182            "sent_from",
183            from_mailbox_id,
184            mail.id,
185            Properties::new(),
186        );
187        self.storage.create_edge(&from_edge).await?;
188        
189        let to_edge = Edge::new(
190            "sent_to",
191            mail.id,
192            to_mailbox_id,
193            Properties::new(),
194        );
195        self.storage.create_edge(&to_edge).await?;
196        
197        Ok(mail)
198    }
199
200    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
201        // Verify mailbox (agent) exists
202        let _agent = self.storage.get_node(mailbox_id).await
203            .map_err(|e| match e {
204                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
205                _ => MailError::Storage(e),
206            })?;
207        
208        // Get all mail where there's an edge from mail -> mailbox (sent_to)
209        let incoming_edges = self.storage
210            .get_edges_to(mailbox_id, Some("sent_to"))
211            .await?;
212        
213        let mut mails = Vec::new();
214        for edge in incoming_edges {
215            if let Ok(mail) = self.get_mail(edge.from_node_id).await {
216                mails.push(mail);
217            }
218        }
219        
220        // Sort by creation date, newest first
221        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
222        
223        Ok(mails)
224    }
225
226    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
227        // Verify mailbox (agent) exists
228        let _agent = self.storage.get_node(mailbox_id).await
229            .map_err(|e| match e {
230                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
231                _ => MailError::Storage(e),
232            })?;
233        
234        // Get all mail where there's an edge from mailbox -> mail (sent_from)
235        let outgoing_edges = self.storage
236            .get_edges_from(mailbox_id, Some("sent_from"))
237            .await?;
238        
239        let mut mails = Vec::new();
240        for edge in outgoing_edges {
241            if let Ok(mail) = self.get_mail(edge.to_node_id).await {
242                mails.push(mail);
243            }
244        }
245        
246        // Sort by creation date, newest first
247        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
248        
249        Ok(mails)
250    }
251
252    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
253        let since = chrono::Utc::now() - chrono::Duration::hours(hours);
254        
255        // Get all mail in the inbox
256        let inbox = self.get_mailbox_inbox(mailbox_id).await?;
257        
258        // Filter to recent mail only
259        let recent: Vec<Mail> = inbox.into_iter()
260            .filter(|mail| mail.created_at >= since)
261            .take(limit)
262            .collect();
263        
264        Ok(recent)
265    }
266
267    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
268        let mut mail = self.get_mail(mail_id).await?;
269        mail.mark_as_read();
270        
271        let node = mail.to_node();
272        self.storage.update_node(&node).await?;
273        
274        Ok(mail)
275    }
276
277    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
278        // Get the agent's mailbox ID
279        let mailbox = self.get_agent_mailbox(agent_id).await?;
280        
281        // Get inbox and filter for unread
282        let inbox = self.get_mailbox_inbox(mailbox.id).await?;
283        let unread: Vec<Mail> = inbox.into_iter()
284            .filter(|mail| !mail.read)
285            .collect();
286        
287        let has_unread = !unread.is_empty();
288        
289        Ok((has_unread, unread))
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::InMemoryStorage;

    /// Service type exercised by every test in this module.
    type TestService = MailServiceImpl<InMemoryStorage>;

    /// Builds a mail service on top of a fresh, empty in-memory store.
    fn new_service() -> TestService {
        MailServiceImpl::new(InMemoryStorage::new())
    }

    /// Creates the "Sender" and "Receiver" agents, in that order.
    async fn sender_and_receiver(service: &TestService) -> (Agent, Agent) {
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();
        (sender, receiver)
    }

    #[tokio::test]
    async fn test_create_agent() {
        let service = new_service();

        let created = service.create_agent("Test Agent").await.unwrap();

        assert_eq!(created.name, "Test Agent");
    }

    #[tokio::test]
    async fn test_create_agent_auto_creates_mailbox() {
        let service = new_service();
        let agent = service.create_agent("Agent").await.unwrap();

        // A mailbox must be available for the agent without any extra setup.
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
        assert_eq!(mailbox.owner_id, agent.id);

        // The mailbox ID is the agent's node ID.
        assert_eq!(mailbox.id, string_to_node_id(&agent.id));
    }

    #[tokio::test]
    async fn test_send_and_receive_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let mail = service
            .send_agent_to_agent(
                sender.id.clone(),
                receiver.id.clone(),
                "Hello",
                "This is a test message",
            )
            .await
            .unwrap();

        assert_eq!(mail.subject, "Hello");
        assert_eq!(mail.body, "This is a test message");
        assert!(!mail.read);

        // The mail shows up in the receiver's inbox...
        let receiver_box = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_box).await.unwrap();
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].subject, "Hello");

        // ...and in the sender's outbox.
        let sender_box = string_to_node_id(&sender.id);
        let outbox = service.get_mailbox_outbox(sender_box).await.unwrap();
        assert_eq!(outbox.len(), 1);
        assert_eq!(outbox[0].subject, "Hello");
    }

    #[tokio::test]
    async fn test_mark_mail_as_read() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        let mail = service
            .send_agent_to_agent(sender.id, receiver.id, "Test", "Body")
            .await
            .unwrap();
        assert!(!mail.read);

        let updated = service.mark_mail_as_read(mail.id).await.unwrap();
        assert!(updated.read);
    }

    #[tokio::test]
    async fn test_check_unread_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        // Nothing has been sent yet.
        let (has_unread, unread) = service.check_unread_mail(receiver.id.clone()).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());

        service
            .send_agent_to_agent(sender.id, receiver.id.clone(), "Test", "Body")
            .await
            .unwrap();

        // The new mail is reported as unread.
        let (has_unread, unread) = service.check_unread_mail(receiver.id.clone()).await.unwrap();
        assert!(has_unread);
        assert_eq!(unread.len(), 1);

        service.mark_mail_as_read(unread[0].id).await.unwrap();

        // Once read, it is no longer reported.
        let (has_unread, unread) = service.check_unread_mail(receiver.id).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());
    }

    #[tokio::test]
    async fn test_get_recent_mail() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Recent", "Body")
            .await
            .unwrap();

        // Last 24 hours, at most 10 mails.
        let receiver_box = string_to_node_id(&receiver.id);
        let recent = service.get_recent_mail(receiver_box, 24, 10).await.unwrap();

        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].subject, "Recent");
    }

    #[tokio::test]
    async fn test_get_nonexistent_agent() {
        let service = new_service();

        let lookup = service.get_agent("nonexistent_agent".to_string()).await;

        assert!(matches!(lookup, Err(MailError::AgentNotFound(_))));
    }

    #[tokio::test]
    async fn test_get_agent_by_mailbox() {
        let service = new_service();
        let agent = service.create_agent("Test Agent").await.unwrap();
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        let owner = service.get_agent_by_mailbox(mailbox.id).await.unwrap();

        assert_eq!(owner.id, agent.id);
        assert_eq!(owner.name, agent.name);
    }

    #[tokio::test]
    async fn test_list_agents() {
        let service = new_service();

        for name in ["Agent 1", "Agent 2", "Agent 3"] {
            service.create_agent(name).await.unwrap();
        }

        let agents = service.list_agents().await.unwrap();
        assert_eq!(agents.len(), 3);
    }

    #[tokio::test]
    async fn test_set_agent_status() {
        let service = new_service();

        let agent = service.create_agent("Test Agent").await.unwrap();
        assert_eq!(agent.status, "offline");

        let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
        assert_eq!(updated.status, "online");

        // The change must be visible on a fresh read as well.
        let reloaded = service.get_agent(agent.id).await.unwrap();
        assert_eq!(reloaded.status, "online");
    }

    #[tokio::test]
    async fn test_multiple_mails_sorting() {
        let service = new_service();
        let (sender, receiver) = sender_and_receiver(&service).await;

        // Short pauses keep the creation timestamps distinct.
        let pause = tokio::time::Duration::from_millis(10);

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "First", "Body1")
            .await
            .unwrap();
        tokio::time::sleep(pause).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Second", "Body2")
            .await
            .unwrap();
        tokio::time::sleep(pause).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Third", "Body3")
            .await
            .unwrap();

        let inbox = service
            .get_mailbox_inbox(string_to_node_id(&receiver.id))
            .await
            .unwrap();

        // Newest first.
        assert_eq!(inbox.len(), 3);
        assert_eq!(inbox[0].subject, "Third");
        assert_eq!(inbox[1].subject, "Second");
        assert_eq!(inbox[2].subject, "First");
    }
}