agent_office/services/mail/
mod.rs1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11 #[error("Mailbox not found: {0}")]
12 MailboxNotFound(MailboxId),
13
14 #[error("Agent not found: {0}")]
15 AgentNotFound(AgentId),
16
17 #[error("Mail not found: {0}")]
18 MailNotFound(uuid::Uuid),
19
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22
23 #[error("Invalid operation: {0}")]
24 InvalidOperation(String),
25}
26
27pub type Result<T> = std::result::Result<T, MailError>;
28
29#[async_trait]
30pub trait MailService: Send + Sync {
31 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
33 async fn get_agent(&self, id: AgentId) -> Result<Agent>;
34 async fn list_agents(&self) -> Result<Vec<Agent>>;
35 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
36
37 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
39
40 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
42
43 async fn send_agent_to_agent(
45 &self,
46 from_agent_id: AgentId,
47 to_agent_id: AgentId,
48 subject: impl Into<String> + Send,
49 body: impl Into<String> + Send,
50 ) -> Result<Mail>;
51
52 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
54
55 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
57
58 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
60
61 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
63
64 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
66
67 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
69}
70
71pub struct MailServiceImpl<S: GraphStorage> {
72 storage: S,
73}
74
75impl<S: GraphStorage> MailServiceImpl<S> {
76 pub fn new(storage: S) -> Self {
77 Self { storage }
78 }
79
80 async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
82 let node = self.storage.get_node(mail_id).await
83 .map_err(|e| match e {
84 StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
85 _ => MailError::Storage(e),
86 })?;
87 Mail::from_node(&node)
88 .ok_or(MailError::MailNotFound(mail_id))
89 }
90}
91
92#[async_trait]
93impl<S: GraphStorage> MailService for MailServiceImpl<S> {
94 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
95 let agent = Agent::new(name);
96 let node = agent.to_node();
97 self.storage.create_node(&node).await?;
98
99 Ok(agent)
100 }
101
102 async fn get_agent(&self, id: AgentId) -> Result<Agent> {
103 let node_id = string_to_node_id(&id);
104 let id_clone = id.clone();
105 let node = self.storage.get_node(node_id).await
106 .map_err(|e| match e {
107 StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
108 _ => MailError::Storage(e),
109 })?;
110 Agent::from_node(&node)
111 .ok_or(MailError::AgentNotFound(id))
112 }
113
114 async fn list_agents(&self) -> Result<Vec<Agent>> {
115 let query = GraphQuery::new().with_node_type("agent");
116 let nodes = self.storage.query_nodes(&query).await?;
117 let agents: Vec<Agent> = nodes.iter()
118 .filter_map(Agent::from_node)
119 .collect();
120 Ok(agents)
121 }
122
123 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
124 let mut agent = self.get_agent(agent_id).await?;
125 agent.status = status.into();
126 let node = agent.to_node();
127 self.storage.update_node(&node).await?;
128 Ok(agent)
129 }
130
131 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
132 let node = self.storage.get_node(mailbox_id).await
134 .map_err(|e| match e {
135 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
136 _ => MailError::Storage(e),
137 })?;
138
139 Agent::from_node(&node)
140 .ok_or_else(|| MailError::InvalidOperation(
141 "Node exists but is not an agent".to_string()
142 ))
143 }
144
145 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
146 let agent = self.get_agent(agent_id.clone()).await?;
148 let node_id = string_to_node_id(&agent.id);
149
150 let mailbox = Mailbox {
152 id: node_id,
153 owner_id: agent.id,
154 name: "Mailbox".to_string(),
155 created_at: agent.created_at,
156 };
157
158 Ok(mailbox)
159 }
160
161 async fn send_agent_to_agent(
162 &self,
163 from_agent_id: AgentId,
164 to_agent_id: AgentId,
165 subject: impl Into<String> + Send,
166 body: impl Into<String> + Send,
167 ) -> Result<Mail> {
168 let from_agent = self.get_agent(from_agent_id).await?;
170 let to_agent = self.get_agent(to_agent_id).await?;
171
172 let from_mailbox_id = string_to_node_id(&from_agent.id);
174 let to_mailbox_id = string_to_node_id(&to_agent.id);
175
176 let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
178 let node = mail.to_node();
179
180 self.storage.create_node(&node).await?;
182
183 let from_edge = Edge::new(
185 "sent_from",
186 from_mailbox_id,
187 mail.id,
188 Properties::new(),
189 );
190 self.storage.create_edge(&from_edge).await?;
191
192 let to_edge = Edge::new(
193 "sent_to",
194 mail.id,
195 to_mailbox_id,
196 Properties::new(),
197 );
198 self.storage.create_edge(&to_edge).await?;
199
200 Ok(mail)
201 }
202
203 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
204 let _agent = self.storage.get_node(mailbox_id).await
206 .map_err(|e| match e {
207 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
208 _ => MailError::Storage(e),
209 })?;
210
211 let incoming_edges = self.storage
213 .get_edges_to(mailbox_id, Some("sent_to"))
214 .await?;
215
216 let mut mails = Vec::new();
217 for edge in incoming_edges {
218 if let Ok(mail) = self.get_mail(edge.from_node_id).await {
219 mails.push(mail);
220 }
221 }
222
223 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
225
226 Ok(mails)
227 }
228
229 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
230 let _agent = self.storage.get_node(mailbox_id).await
232 .map_err(|e| match e {
233 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
234 _ => MailError::Storage(e),
235 })?;
236
237 let outgoing_edges = self.storage
239 .get_edges_from(mailbox_id, Some("sent_from"))
240 .await?;
241
242 let mut mails = Vec::new();
243 for edge in outgoing_edges {
244 if let Ok(mail) = self.get_mail(edge.to_node_id).await {
245 mails.push(mail);
246 }
247 }
248
249 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
251
252 Ok(mails)
253 }
254
255 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
256 let since = chrono::Utc::now() - chrono::Duration::hours(hours);
257
258 let inbox = self.get_mailbox_inbox(mailbox_id).await?;
260
261 let recent: Vec<Mail> = inbox.into_iter()
263 .filter(|mail| mail.created_at >= since)
264 .take(limit)
265 .collect();
266
267 Ok(recent)
268 }
269
270 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
271 let mut mail = self.get_mail(mail_id).await?;
272 mail.mark_as_read();
273
274 let node = mail.to_node();
275 self.storage.update_node(&node).await?;
276
277 Ok(mail)
278 }
279
280 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
281 let query = GraphQuery::new().with_node_type("mail");
283 let nodes = self.storage.query_nodes(&query).await?;
284
285 let short_id_lower = short_id.to_lowercase();
287 let matching: Vec<_> = nodes.iter()
288 .filter_map(Mail::from_node)
289 .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
290 .collect();
291
292 match matching.len() {
293 0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
294 1 => {
295 let mail_id = matching[0].id;
296 self.mark_mail_as_read(mail_id).await
297 }
298 _ => Err(MailError::InvalidOperation(
299 format!("Multiple mails match short ID '{}', please use full ID", short_id)
300 )),
301 }
302 }
303
304 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
305 let mailbox = self.get_agent_mailbox(agent_id).await?;
307
308 let inbox = self.get_mailbox_inbox(mailbox.id).await?;
310 let unread: Vec<Mail> = inbox.into_iter()
311 .filter(|mail| !mail.read)
312 .collect();
313
314 let has_unread = !unread.is_empty();
315
316 Ok((has_unread, unread))
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::InMemoryStorage;

    /// Fresh service backed by an empty in-memory store.
    fn new_service() -> MailServiceImpl<InMemoryStorage> {
        MailServiceImpl::new(InMemoryStorage::new())
    }

    #[tokio::test]
    async fn test_create_agent() {
        let service = new_service();

        let created = service.create_agent("Test Agent").await.unwrap();

        assert_eq!(created.name, "Test Agent");
    }

    #[tokio::test]
    async fn test_create_agent_auto_creates_mailbox() {
        let service = new_service();
        let agent = service.create_agent("Agent").await.unwrap();

        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        // The mailbox is owned by the agent and shares the agent's node id.
        assert_eq!(mailbox.owner_id, agent.id);
        let expected_mailbox_id = string_to_node_id(&agent.id);
        assert_eq!(mailbox.id, expected_mailbox_id);
    }

    #[tokio::test]
    async fn test_send_and_receive_mail() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        let mail = service
            .send_agent_to_agent(
                sender.id.clone(),
                receiver.id.clone(),
                "Hello",
                "This is a test message",
            )
            .await
            .unwrap();

        assert_eq!(mail.subject, "Hello");
        assert_eq!(mail.body, "This is a test message");
        assert!(!mail.read);

        // The mail shows up in the receiver's inbox ...
        let receiver_mailbox = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_mailbox).await.unwrap();
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].subject, "Hello");

        // ... and in the sender's outbox.
        let sender_mailbox = string_to_node_id(&sender.id);
        let outbox = service.get_mailbox_outbox(sender_mailbox).await.unwrap();
        assert_eq!(outbox.len(), 1);
        assert_eq!(outbox[0].subject, "Hello");
    }

    #[tokio::test]
    async fn test_mark_mail_as_read() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        let mail = service
            .send_agent_to_agent(sender.id, receiver.id, "Test", "Body")
            .await
            .unwrap();
        assert!(!mail.read);

        let updated = service.mark_mail_as_read(mail.id).await.unwrap();
        assert!(updated.read);
    }

    #[tokio::test]
    async fn test_check_unread_mail() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        // Nothing has been sent yet.
        let (has_unread, unread) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());

        // One delivered mail is reported as unread.
        service
            .send_agent_to_agent(sender.id, receiver.id.clone(), "Test", "Body")
            .await
            .unwrap();
        let (has_unread, unread) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(has_unread);
        assert_eq!(unread.len(), 1);

        // Reading it clears the unread state again.
        let mail_id = unread[0].id;
        service.mark_mail_as_read(mail_id).await.unwrap();
        let (has_unread, unread) = service.check_unread_mail(receiver.id).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());
    }

    #[tokio::test]
    async fn test_get_recent_mail() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Recent", "Body")
            .await
            .unwrap();

        let recent = service
            .get_recent_mail(string_to_node_id(&receiver.id), 24, 10)
            .await
            .unwrap();

        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].subject, "Recent");
    }

    #[tokio::test]
    async fn test_get_nonexistent_agent() {
        let service = new_service();

        let missing_id = "nonexistent_agent".to_string();
        let outcome = service.get_agent(missing_id).await;

        assert!(matches!(outcome, Err(MailError::AgentNotFound(_))));
    }

    #[tokio::test]
    async fn test_get_agent_by_mailbox() {
        let service = new_service();
        let agent = service.create_agent("Test Agent").await.unwrap();
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        let found_agent = service.get_agent_by_mailbox(mailbox.id).await.unwrap();

        assert_eq!(found_agent.id, agent.id);
        assert_eq!(found_agent.name, agent.name);
    }

    #[tokio::test]
    async fn test_list_agents() {
        let service = new_service();
        for name in ["Agent 1", "Agent 2", "Agent 3"] {
            service.create_agent(name).await.unwrap();
        }

        let agents = service.list_agents().await.unwrap();

        assert_eq!(agents.len(), 3);
    }

    #[tokio::test]
    async fn test_set_agent_status() {
        let service = new_service();

        // Newly created agents start out offline.
        let agent = service.create_agent("Test Agent").await.unwrap();
        assert_eq!(agent.status, "offline");

        let updated = service
            .set_agent_status(agent.id.clone(), "online")
            .await
            .unwrap();
        assert_eq!(updated.status, "online");

        // The change must be persisted, not just reflected in the return value.
        let retrieved = service.get_agent(agent.id).await.unwrap();
        assert_eq!(retrieved.status, "online");
    }

    #[tokio::test]
    async fn test_multiple_mails_sorting() {
        let service = new_service();
        let sender = service.create_agent("Sender").await.unwrap();
        let receiver = service.create_agent("Receiver").await.unwrap();

        // Short pauses give the three mails distinct creation timestamps.
        let pause = tokio::time::Duration::from_millis(10);
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "First", "Body1")
            .await
            .unwrap();
        tokio::time::sleep(pause).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Second", "Body2")
            .await
            .unwrap();
        tokio::time::sleep(pause).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Third", "Body3")
            .await
            .unwrap();

        let inbox = service
            .get_mailbox_inbox(string_to_node_id(&receiver.id))
            .await
            .unwrap();

        // Newest first.
        assert_eq!(inbox.len(), 3);
        assert_eq!(inbox[0].subject, "Third");
        assert_eq!(inbox[1].subject, "Second");
        assert_eq!(inbox[2].subject, "First");
    }
}