agent_office/services/mail/
mod.rs1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11 #[error("Mailbox not found: {0}")]
12 MailboxNotFound(MailboxId),
13
14 #[error("Agent not found: {0}")]
15 AgentNotFound(AgentId),
16
17 #[error("Mail not found: {0}")]
18 MailNotFound(uuid::Uuid),
19
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22
23 #[error("Invalid operation: {0}")]
24 InvalidOperation(String),
25}
26
27pub type Result<T> = std::result::Result<T, MailError>;
28
29#[async_trait]
30pub trait MailService: Send + Sync {
31 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
33 async fn delete_agent(&self, agent_id: AgentId) -> Result<()>;
34 async fn get_agent(&self, id: AgentId) -> Result<Agent>;
35 async fn list_agents(&self) -> Result<Vec<Agent>>;
36 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
37
38 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
40
41 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
43
44 async fn send_agent_to_agent(
46 &self,
47 from_agent_id: AgentId,
48 to_agent_id: AgentId,
49 subject: impl Into<String> + Send,
50 body: impl Into<String> + Send,
51 ) -> Result<Mail>;
52
53 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
55
56 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
58
59 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
61
62 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
64
65 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
67
68 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
70}
71
72pub struct MailServiceImpl<S: GraphStorage> {
73 storage: S,
74}
75
76impl<S: GraphStorage> MailServiceImpl<S> {
77 pub fn new(storage: S) -> Self {
78 Self { storage }
79 }
80
81 async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
83 let node = self.storage.get_node(mail_id).await
84 .map_err(|e| match e {
85 StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
86 _ => MailError::Storage(e),
87 })?;
88 Mail::from_node(&node)
89 .ok_or(MailError::MailNotFound(mail_id))
90 }
91}
92
93#[async_trait]
94impl<S: GraphStorage> MailService for MailServiceImpl<S> {
95 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
96 let agent = Agent::new(name);
97 let node = agent.to_node();
98 self.storage.create_node(&node).await?;
99
100 Ok(agent)
101 }
102
103 async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
104 let agent = self.get_agent(agent_id.clone()).await?;
106 let agent_node_id = string_to_node_id(&agent.id);
107
108 let inbox = self.get_mailbox_inbox(agent_node_id).await?;
110 let outbox = self.get_mailbox_outbox(agent_node_id).await?;
111
112 for mail in inbox.iter().chain(outbox.iter()) {
114 let _ = self.storage.delete_node(mail.id).await;
115 }
116
117 self.storage.delete_node(agent_node_id).await?;
119
120 Ok(())
121 }
122
123 async fn get_agent(&self, id: AgentId) -> Result<Agent> {
124 let node_id = string_to_node_id(&id);
125 let id_clone = id.clone();
126 let node = self.storage.get_node(node_id).await
127 .map_err(|e| match e {
128 StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
129 _ => MailError::Storage(e),
130 })?;
131 Agent::from_node(&node)
132 .ok_or(MailError::AgentNotFound(id))
133 }
134
135 async fn list_agents(&self) -> Result<Vec<Agent>> {
136 let query = GraphQuery::new().with_node_type("agent");
137 let nodes = self.storage.query_nodes(&query).await?;
138 let agents: Vec<Agent> = nodes.iter()
139 .filter_map(Agent::from_node)
140 .collect();
141 Ok(agents)
142 }
143
144 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
145 let mut agent = self.get_agent(agent_id).await?;
146 agent.status = status.into();
147 let node = agent.to_node();
148 self.storage.update_node(&node).await?;
149 Ok(agent)
150 }
151
152 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
153 let node = self.storage.get_node(mailbox_id).await
155 .map_err(|e| match e {
156 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
157 _ => MailError::Storage(e),
158 })?;
159
160 Agent::from_node(&node)
161 .ok_or_else(|| MailError::InvalidOperation(
162 "Node exists but is not an agent".to_string()
163 ))
164 }
165
166 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
167 let agent = self.get_agent(agent_id.clone()).await?;
169 let node_id = string_to_node_id(&agent.id);
170
171 let mailbox = Mailbox {
173 id: node_id,
174 owner_id: agent.id,
175 name: "Mailbox".to_string(),
176 created_at: agent.created_at,
177 };
178
179 Ok(mailbox)
180 }
181
182 async fn send_agent_to_agent(
183 &self,
184 from_agent_id: AgentId,
185 to_agent_id: AgentId,
186 subject: impl Into<String> + Send,
187 body: impl Into<String> + Send,
188 ) -> Result<Mail> {
189 let from_agent = self.get_agent(from_agent_id).await?;
191 let to_agent = self.get_agent(to_agent_id).await?;
192
193 let from_mailbox_id = string_to_node_id(&from_agent.id);
195 let to_mailbox_id = string_to_node_id(&to_agent.id);
196
197 let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
199 let node = mail.to_node();
200
201 self.storage.create_node(&node).await?;
203
204 let from_edge = Edge::new(
206 "sent_from",
207 from_mailbox_id,
208 mail.id,
209 Properties::new(),
210 );
211 self.storage.create_edge(&from_edge).await?;
212
213 let to_edge = Edge::new(
214 "sent_to",
215 mail.id,
216 to_mailbox_id,
217 Properties::new(),
218 );
219 self.storage.create_edge(&to_edge).await?;
220
221 Ok(mail)
222 }
223
224 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
225 let _agent = self.storage.get_node(mailbox_id).await
227 .map_err(|e| match e {
228 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
229 _ => MailError::Storage(e),
230 })?;
231
232 let incoming_edges = self.storage
234 .get_edges_to(mailbox_id, Some("sent_to"))
235 .await?;
236
237 let mut mails = Vec::new();
238 for edge in incoming_edges {
239 if let Ok(mail) = self.get_mail(edge.from_node_id).await {
240 mails.push(mail);
241 }
242 }
243
244 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
246
247 Ok(mails)
248 }
249
250 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
251 let _agent = self.storage.get_node(mailbox_id).await
253 .map_err(|e| match e {
254 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
255 _ => MailError::Storage(e),
256 })?;
257
258 let outgoing_edges = self.storage
260 .get_edges_from(mailbox_id, Some("sent_from"))
261 .await?;
262
263 let mut mails = Vec::new();
264 for edge in outgoing_edges {
265 if let Ok(mail) = self.get_mail(edge.to_node_id).await {
266 mails.push(mail);
267 }
268 }
269
270 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
272
273 Ok(mails)
274 }
275
276 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
277 let since = chrono::Utc::now() - chrono::Duration::hours(hours);
278
279 let inbox = self.get_mailbox_inbox(mailbox_id).await?;
281
282 let recent: Vec<Mail> = inbox.into_iter()
284 .filter(|mail| mail.created_at >= since)
285 .take(limit)
286 .collect();
287
288 Ok(recent)
289 }
290
291 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
292 let mut mail = self.get_mail(mail_id).await?;
293 mail.mark_as_read();
294
295 let node = mail.to_node();
296 self.storage.update_node(&node).await?;
297
298 Ok(mail)
299 }
300
301 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
302 let query = GraphQuery::new().with_node_type("mail");
304 let nodes = self.storage.query_nodes(&query).await?;
305
306 let short_id_lower = short_id.to_lowercase();
308 let matching: Vec<_> = nodes.iter()
309 .filter_map(Mail::from_node)
310 .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
311 .collect();
312
313 match matching.len() {
314 0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
315 1 => {
316 let mail_id = matching[0].id;
317 self.mark_mail_as_read(mail_id).await
318 }
319 _ => Err(MailError::InvalidOperation(
320 format!("Multiple mails match short ID '{}', please use full ID", short_id)
321 )),
322 }
323 }
324
325 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
326 let mailbox = self.get_agent_mailbox(agent_id).await?;
328
329 let inbox = self.get_mailbox_inbox(mailbox.id).await?;
331 let unread: Vec<Mail> = inbox.into_iter()
332 .filter(|mail| !mail.read)
333 .collect();
334
335 let has_unread = !unread.is_empty();
336
337 Ok((has_unread, unread))
338 }
339}
340
341#[cfg(test)]
342mod tests {
343 use super::*;
344 use crate::storage::memory::InMemoryStorage;
345
346 #[tokio::test]
347 async fn test_create_agent() {
348 let storage = InMemoryStorage::new();
349 let service = MailServiceImpl::new(storage);
350
351 let agent = service.create_agent("Test Agent").await.unwrap();
352 assert_eq!(agent.name, "Test Agent");
353 }
354
355 #[tokio::test]
356 async fn test_create_agent_auto_creates_mailbox() {
357 let storage = InMemoryStorage::new();
358 let service = MailServiceImpl::new(storage);
359
360 let agent = service.create_agent("Agent").await.unwrap();
361
362 let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
364 assert_eq!(mailbox.owner_id, agent.id);
365
366 let expected_mailbox_id = string_to_node_id(&agent.id);
368 assert_eq!(mailbox.id, expected_mailbox_id);
369 }
370
371 #[tokio::test]
372 async fn test_send_and_receive_mail() {
373 let storage = InMemoryStorage::new();
374 let service = MailServiceImpl::new(storage);
375
376 let agent1 = service.create_agent("Sender").await.unwrap();
378 let agent2 = service.create_agent("Receiver").await.unwrap();
379
380 let mail = service
382 .send_agent_to_agent(
383 agent1.id.clone(),
384 agent2.id.clone(),
385 "Hello",
386 "This is a test message",
387 )
388 .await
389 .unwrap();
390
391 assert_eq!(mail.subject, "Hello");
392 assert_eq!(mail.body, "This is a test message");
393 assert!(!mail.read);
394
395 let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
397 assert_eq!(inbox.len(), 1);
398 assert_eq!(inbox[0].subject, "Hello");
399
400 let outbox = service.get_mailbox_outbox(string_to_node_id(&agent1.id)).await.unwrap();
402 assert_eq!(outbox.len(), 1);
403 assert_eq!(outbox[0].subject, "Hello");
404 }
405
406 #[tokio::test]
407 async fn test_mark_mail_as_read() {
408 let storage = InMemoryStorage::new();
409 let service = MailServiceImpl::new(storage);
410
411 let agent1 = service.create_agent("Sender").await.unwrap();
412 let agent2 = service.create_agent("Receiver").await.unwrap();
413
414 let mail = service
415 .send_agent_to_agent(agent1.id, agent2.id, "Test", "Body")
416 .await
417 .unwrap();
418
419 assert!(!mail.read);
420
421 let updated = service.mark_mail_as_read(mail.id).await.unwrap();
422 assert!(updated.read);
423 }
424
425 #[tokio::test]
426 async fn test_check_unread_mail() {
427 let storage = InMemoryStorage::new();
428 let service = MailServiceImpl::new(storage);
429
430 let agent1 = service.create_agent("Sender").await.unwrap();
431 let agent2 = service.create_agent("Receiver").await.unwrap();
432
433 let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
435 assert!(!has_unread);
436 assert!(unread.is_empty());
437
438 service.send_agent_to_agent(agent1.id, agent2.id.clone(), "Test", "Body").await.unwrap();
440
441 let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
443 assert!(has_unread);
444 assert_eq!(unread.len(), 1);
445
446 let mail_id = unread[0].id;
448 service.mark_mail_as_read(mail_id).await.unwrap();
449
450 let (has_unread, unread) = service.check_unread_mail(agent2.id).await.unwrap();
452 assert!(!has_unread);
453 assert!(unread.is_empty());
454 }
455
456 #[tokio::test]
457 async fn test_get_recent_mail() {
458 let storage = InMemoryStorage::new();
459 let service = MailServiceImpl::new(storage);
460
461 let agent1 = service.create_agent("Sender").await.unwrap();
462 let agent2 = service.create_agent("Receiver").await.unwrap();
463
464 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Recent", "Body").await.unwrap();
466
467 let recent = service.get_recent_mail(
469 string_to_node_id(&agent2.id),
470 24,
471 10
472 ).await.unwrap();
473
474 assert_eq!(recent.len(), 1);
475 assert_eq!(recent[0].subject, "Recent");
476 }
477
478 #[tokio::test]
479 async fn test_get_nonexistent_agent() {
480 let storage = InMemoryStorage::new();
481 let service = MailServiceImpl::new(storage);
482
483 let fake_id = "nonexistent_agent".to_string();
484 let result = service.get_agent(fake_id).await;
485
486 assert!(matches!(result, Err(MailError::AgentNotFound(_))));
487 }
488
489 #[tokio::test]
490 async fn test_get_agent_by_mailbox() {
491 let storage = InMemoryStorage::new();
492 let service = MailServiceImpl::new(storage);
493
494 let agent = service.create_agent("Test Agent").await.unwrap();
495 let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
496
497 let found_agent = service.get_agent_by_mailbox(mailbox.id).await.unwrap();
499 assert_eq!(found_agent.id, agent.id);
500 assert_eq!(found_agent.name, agent.name);
501 }
502
503 #[tokio::test]
504 async fn test_list_agents() {
505 let storage = InMemoryStorage::new();
506 let service = MailServiceImpl::new(storage);
507
508 service.create_agent("Agent 1").await.unwrap();
510 service.create_agent("Agent 2").await.unwrap();
511 service.create_agent("Agent 3").await.unwrap();
512
513 let agents = service.list_agents().await.unwrap();
514 assert_eq!(agents.len(), 3);
515 }
516
517 #[tokio::test]
518 async fn test_set_agent_status() {
519 let storage = InMemoryStorage::new();
520 let service = MailServiceImpl::new(storage);
521
522 let agent = service.create_agent("Test Agent").await.unwrap();
523 assert_eq!(agent.status, "offline");
524
525 let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
526 assert_eq!(updated.status, "online");
527
528 let retrieved = service.get_agent(agent.id).await.unwrap();
530 assert_eq!(retrieved.status, "online");
531 }
532
533 #[tokio::test]
534 async fn test_multiple_mails_sorting() {
535 let storage = InMemoryStorage::new();
536 let service = MailServiceImpl::new(storage);
537
538 let agent1 = service.create_agent("Sender").await.unwrap();
539 let agent2 = service.create_agent("Receiver").await.unwrap();
540
541 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "First", "Body1").await.unwrap();
543 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
544 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Second", "Body2").await.unwrap();
545 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
546 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Third", "Body3").await.unwrap();
547
548 let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
549 assert_eq!(inbox.len(), 3);
550 assert_eq!(inbox[0].subject, "Third");
552 assert_eq!(inbox[1].subject, "Second");
553 assert_eq!(inbox[2].subject, "First");
554 }
555}