Skip to main content

agent_office/services/mail/
mod.rs

1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11    #[error("Mailbox not found: {0}")]
12    MailboxNotFound(MailboxId),
13    
14    #[error("Agent not found: {0}")]
15    AgentNotFound(AgentId),
16    
17    #[error("Mail not found: {0}")]
18    MailNotFound(uuid::Uuid),
19    
20    #[error("Storage error: {0}")]
21    Storage(#[from] StorageError),
22    
23    #[error("Invalid operation: {0}")]
24    InvalidOperation(String),
25}
26
27pub type Result<T> = std::result::Result<T, MailError>;
28
29#[async_trait]
30pub trait MailService: Send + Sync {
31    // Agent operations
32    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
33    async fn get_agent(&self, id: AgentId) -> Result<Agent>;
34    async fn list_agents(&self) -> Result<Vec<Agent>>;
35    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
36    
37    // Get agent by their mailbox ID (each agent has exactly one mailbox)
38    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
39    
40    // Get the single mailbox for an agent (auto-creates if doesn't exist)
41    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
42    
43    // Send mail from one agent to another
44    async fn send_agent_to_agent(
45        &self,
46        from_agent_id: AgentId,
47        to_agent_id: AgentId,
48        subject: impl Into<String> + Send,
49        body: impl Into<String> + Send,
50    ) -> Result<Mail>;
51    
52    // Get mail received by an agent's mailbox
53    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
54    
55    // Get mail sent by an agent's mailbox
56    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
57    
58    // Get recent mail for an agent (received in last N hours)
59    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
60    
61    // Mark mail as read
62    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
63    
64    // Mark mail as read by short ID (8-char prefix) - searches all mail system-wide
65    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
66    
67    // Check if agent has unread mail
68    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
69}
70
71pub struct MailServiceImpl<S: GraphStorage> {
72    storage: S,
73}
74
75impl<S: GraphStorage> MailServiceImpl<S> {
76    pub fn new(storage: S) -> Self {
77        Self { storage }
78    }
79
80    /// Helper to get mail by ID
81    async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
82        let node = self.storage.get_node(mail_id).await
83            .map_err(|e| match e {
84                StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
85                _ => MailError::Storage(e),
86            })?;
87        Mail::from_node(&node)
88            .ok_or(MailError::MailNotFound(mail_id))
89    }
90}
91
92#[async_trait]
93impl<S: GraphStorage> MailService for MailServiceImpl<S> {
94    async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
95        let agent = Agent::new(name);
96        let node = agent.to_node();
97        self.storage.create_node(&node).await?;
98        
99        Ok(agent)
100    }
101
102    async fn get_agent(&self, id: AgentId) -> Result<Agent> {
103        let node_id = string_to_node_id(&id);
104        let id_clone = id.clone();
105        let node = self.storage.get_node(node_id).await
106            .map_err(|e| match e {
107                StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
108                _ => MailError::Storage(e),
109            })?;
110        Agent::from_node(&node)
111            .ok_or(MailError::AgentNotFound(id))
112    }
113
114    async fn list_agents(&self) -> Result<Vec<Agent>> {
115        let query = GraphQuery::new().with_node_type("agent");
116        let nodes = self.storage.query_nodes(&query).await?;
117        let agents: Vec<Agent> = nodes.iter()
118            .filter_map(Agent::from_node)
119            .collect();
120        Ok(agents)
121    }
122
123    async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
124        let mut agent = self.get_agent(agent_id).await?;
125        agent.status = status.into();
126        let node = agent.to_node();
127        self.storage.update_node(&node).await?;
128        Ok(agent)
129    }
130
131    async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
132        // The mailbox ID is the agent's node ID, so get the agent directly
133        let node = self.storage.get_node(mailbox_id).await
134            .map_err(|e| match e {
135                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
136                _ => MailError::Storage(e),
137            })?;
138        
139        Agent::from_node(&node)
140            .ok_or_else(|| MailError::InvalidOperation(
141                "Node exists but is not an agent".to_string()
142            ))
143    }
144
145    async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
146        // Verify agent exists - the agent's node IS the mailbox
147        let agent = self.get_agent(agent_id.clone()).await?;
148        let node_id = string_to_node_id(&agent.id);
149        
150        // Create a mailbox representation from the agent
151        let mailbox = Mailbox {
152            id: node_id,
153            owner_id: agent.id,
154            name: "Mailbox".to_string(),
155            created_at: agent.created_at,
156        };
157        
158        Ok(mailbox)
159    }
160
161    async fn send_agent_to_agent(
162        &self,
163        from_agent_id: AgentId,
164        to_agent_id: AgentId,
165        subject: impl Into<String> + Send,
166        body: impl Into<String> + Send,
167    ) -> Result<Mail> {
168        // Verify both agents exist
169        let from_agent = self.get_agent(from_agent_id).await?;
170        let to_agent = self.get_agent(to_agent_id).await?;
171        
172        // Use agent node IDs as mailbox IDs
173        let from_mailbox_id = string_to_node_id(&from_agent.id);
174        let to_mailbox_id = string_to_node_id(&to_agent.id);
175        
176        // Create mail
177        let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
178        let node = mail.to_node();
179        
180        // Create mail node
181        self.storage.create_node(&node).await?;
182        
183        // Create edges for sender and receiver
184        let from_edge = Edge::new(
185            "sent_from",
186            from_mailbox_id,
187            mail.id,
188            Properties::new(),
189        );
190        self.storage.create_edge(&from_edge).await?;
191        
192        let to_edge = Edge::new(
193            "sent_to",
194            mail.id,
195            to_mailbox_id,
196            Properties::new(),
197        );
198        self.storage.create_edge(&to_edge).await?;
199        
200        Ok(mail)
201    }
202
203    async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
204        // Verify mailbox (agent) exists
205        let _agent = self.storage.get_node(mailbox_id).await
206            .map_err(|e| match e {
207                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
208                _ => MailError::Storage(e),
209            })?;
210        
211        // Get all mail where there's an edge from mail -> mailbox (sent_to)
212        let incoming_edges = self.storage
213            .get_edges_to(mailbox_id, Some("sent_to"))
214            .await?;
215        
216        let mut mails = Vec::new();
217        for edge in incoming_edges {
218            if let Ok(mail) = self.get_mail(edge.from_node_id).await {
219                mails.push(mail);
220            }
221        }
222        
223        // Sort by creation date, newest first
224        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
225        
226        Ok(mails)
227    }
228
229    async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
230        // Verify mailbox (agent) exists
231        let _agent = self.storage.get_node(mailbox_id).await
232            .map_err(|e| match e {
233                StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
234                _ => MailError::Storage(e),
235            })?;
236        
237        // Get all mail where there's an edge from mailbox -> mail (sent_from)
238        let outgoing_edges = self.storage
239            .get_edges_from(mailbox_id, Some("sent_from"))
240            .await?;
241        
242        let mut mails = Vec::new();
243        for edge in outgoing_edges {
244            if let Ok(mail) = self.get_mail(edge.to_node_id).await {
245                mails.push(mail);
246            }
247        }
248        
249        // Sort by creation date, newest first
250        mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
251        
252        Ok(mails)
253    }
254
255    async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
256        let since = chrono::Utc::now() - chrono::Duration::hours(hours);
257        
258        // Get all mail in the inbox
259        let inbox = self.get_mailbox_inbox(mailbox_id).await?;
260        
261        // Filter to recent mail only
262        let recent: Vec<Mail> = inbox.into_iter()
263            .filter(|mail| mail.created_at >= since)
264            .take(limit)
265            .collect();
266        
267        Ok(recent)
268    }
269
270    async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
271        let mut mail = self.get_mail(mail_id).await?;
272        mail.mark_as_read();
273        
274        let node = mail.to_node();
275        self.storage.update_node(&node).await?;
276        
277        Ok(mail)
278    }
279
280    async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
281        // Query all mail nodes in the system
282        let query = GraphQuery::new().with_node_type("mail");
283        let nodes = self.storage.query_nodes(&query).await?;
284        
285        // Convert nodes to Mail and find matching short ID
286        let short_id_lower = short_id.to_lowercase();
287        let matching: Vec<_> = nodes.iter()
288            .filter_map(Mail::from_node)
289            .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
290            .collect();
291        
292        match matching.len() {
293            0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
294            1 => {
295                let mail_id = matching[0].id;
296                self.mark_mail_as_read(mail_id).await
297            }
298            _ => Err(MailError::InvalidOperation(
299                format!("Multiple mails match short ID '{}', please use full ID", short_id)
300            )),
301        }
302    }
303
304    async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
305        // Get the agent's mailbox ID
306        let mailbox = self.get_agent_mailbox(agent_id).await?;
307        
308        // Get inbox and filter for unread
309        let inbox = self.get_mailbox_inbox(mailbox.id).await?;
310        let unread: Vec<Mail> = inbox.into_iter()
311            .filter(|mail| !mail.read)
312            .collect();
313        
314        let has_unread = !unread.is_empty();
315        
316        Ok((has_unread, unread))
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::InMemoryStorage;

    /// Builds a mail service backed by a fresh, empty in-memory store.
    fn new_service() -> MailServiceImpl<InMemoryStorage> {
        MailServiceImpl::new(InMemoryStorage::new())
    }

    /// A created agent keeps the name it was given.
    #[tokio::test]
    async fn test_create_agent() {
        let service = new_service();

        let created = service.create_agent("Test Agent").await.unwrap();

        assert_eq!(created.name, "Test Agent");
    }

    /// A freshly created agent already has a mailbox whose id is the agent's
    /// node id.
    #[tokio::test]
    async fn test_create_agent_auto_creates_mailbox() {
        let service = new_service();
        let agent = service.create_agent("Agent").await.unwrap();

        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        assert_eq!(mailbox.owner_id, agent.id);
        assert_eq!(mailbox.id, string_to_node_id(&agent.id));
    }

    /// Sent mail shows up in the recipient's inbox and the sender's outbox.
    #[tokio::test]
    async fn test_send_and_receive_mail() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        let sent = service
            .send_agent_to_agent(
                sender.id.clone(),
                receiver.id.clone(),
                "Hello",
                "This is a test message",
            )
            .await
            .unwrap();

        assert_eq!(sent.subject, "Hello");
        assert_eq!(sent.body, "This is a test message");
        assert!(!sent.read);

        let receiver_mailbox = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_mailbox).await.unwrap();
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].subject, "Hello");

        let sender_mailbox = string_to_node_id(&sender.id);
        let outbox = service.get_mailbox_outbox(sender_mailbox).await.unwrap();
        assert_eq!(outbox.len(), 1);
        assert_eq!(outbox[0].subject, "Hello");
    }

    /// Marking a mail as read flips its `read` flag.
    #[tokio::test]
    async fn test_mark_mail_as_read() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        let sent = service
            .send_agent_to_agent(sender.id, receiver.id, "Test", "Body")
            .await
            .unwrap();
        assert!(!sent.read);

        let after = service.mark_mail_as_read(sent.id).await.unwrap();
        assert!(after.read);
    }

    /// Unread mail appears after a send and disappears once it is marked read.
    #[tokio::test]
    async fn test_check_unread_mail() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        // Nothing has been sent yet.
        let (has_unread, unread) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());

        service
            .send_agent_to_agent(sender.id, receiver.id.clone(), "Test", "Body")
            .await
            .unwrap();

        // The new mail is reported as unread.
        let (has_unread, unread) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(has_unread);
        assert_eq!(unread.len(), 1);

        let unread_id = unread[0].id;
        service.mark_mail_as_read(unread_id).await.unwrap();

        // Once read, it no longer counts.
        let (has_unread, unread) = service.check_unread_mail(receiver.id).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());
    }

    /// Mail sent just now falls inside a 24-hour window.
    #[tokio::test]
    async fn test_get_recent_mail() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Recent", "Body")
            .await
            .unwrap();

        let receiver_mailbox = string_to_node_id(&receiver.id);
        let recent = service
            .get_recent_mail(receiver_mailbox, 24, 10)
            .await
            .unwrap();

        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].subject, "Recent");
    }

    /// Looking up an unknown agent reports `AgentNotFound`.
    #[tokio::test]
    async fn test_get_nonexistent_agent() {
        let service = new_service();

        let missing_id = "nonexistent_agent".to_string();
        let outcome = service.get_agent(missing_id).await;

        assert!(matches!(outcome, Err(MailError::AgentNotFound(_))));
    }

    /// An agent can be found again through its mailbox id.
    #[tokio::test]
    async fn test_get_agent_by_mailbox() {
        let service = new_service();
        let agent = service.create_agent("Test Agent").await.unwrap();
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        let owner = service.get_agent_by_mailbox(mailbox.id).await.unwrap();

        assert_eq!(owner.id, agent.id);
        assert_eq!(owner.name, agent.name);
    }

    /// Listing returns every agent that was created.
    #[tokio::test]
    async fn test_list_agents() {
        let service = new_service();

        service.create_agent("Agent 1").await.unwrap();
        service.create_agent("Agent 2").await.unwrap();
        service.create_agent("Agent 3").await.unwrap();

        let all_agents = service.list_agents().await.unwrap();
        assert_eq!(all_agents.len(), 3);
    }

    /// A status change is returned to the caller and persisted.
    #[tokio::test]
    async fn test_set_agent_status() {
        let service = new_service();

        let agent = service.create_agent("Test Agent").await.unwrap();
        assert_eq!(agent.status, "offline");

        let changed = service
            .set_agent_status(agent.id.clone(), "online")
            .await
            .unwrap();
        assert_eq!(changed.status, "online");

        // Read the agent back to confirm the change was stored.
        let reloaded = service.get_agent(agent.id).await.unwrap();
        assert_eq!(reloaded.status, "online");
    }

    /// The inbox lists mail newest first.
    #[tokio::test]
    async fn test_multiple_mails_sorting() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        // Short pauses keep the creation timestamps distinct.
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "First", "Body1")
            .await
            .unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Second", "Body2")
            .await
            .unwrap();
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Third", "Body3")
            .await
            .unwrap();

        let receiver_mailbox = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_mailbox).await.unwrap();

        assert_eq!(inbox.len(), 3);
        assert_eq!(inbox[0].subject, "Third");
        assert_eq!(inbox[1].subject, "Second");
        assert_eq!(inbox[2].subject, "First");
    }
}