agent_office/services/mail/
mod.rs1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11 #[error("Mailbox not found: {0}")]
12 MailboxNotFound(MailboxId),
13
14 #[error("Agent not found: {0}")]
15 AgentNotFound(AgentId),
16
17 #[error("Mail not found: {0}")]
18 MailNotFound(uuid::Uuid),
19
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22
23 #[error("Invalid operation: {0}")]
24 InvalidOperation(String),
25
26 #[error("Invalid agent name: {0}")]
27 InvalidAgentName(String),
28}
29
30pub type Result<T> = std::result::Result<T, MailError>;
31
32#[async_trait]
33pub trait MailService: Send + Sync {
34 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
36 async fn delete_agent(&self, agent_id: AgentId) -> Result<()>;
37 async fn get_agent(&self, id: AgentId) -> Result<Agent>;
38 async fn list_agents(&self) -> Result<Vec<Agent>>;
39 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
40
41 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
43
44 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
46
47 async fn send_agent_to_agent(
49 &self,
50 from_agent_id: AgentId,
51 to_agent_id: AgentId,
52 subject: impl Into<String> + Send,
53 body: impl Into<String> + Send,
54 ) -> Result<Mail>;
55
56 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
58
59 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
61
62 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
64
65 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
67
68 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
70
71 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
73}
74
75pub struct MailServiceImpl<S: GraphStorage> {
76 storage: S,
77}
78
79impl<S: GraphStorage> MailServiceImpl<S> {
80 pub fn new(storage: S) -> Self {
81 Self { storage }
82 }
83
84 async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
86 let node = self.storage.get_node(mail_id).await
87 .map_err(|e| match e {
88 StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
89 _ => MailError::Storage(e),
90 })?;
91 Mail::from_node(&node)
92 .ok_or(MailError::MailNotFound(mail_id))
93 }
94}
95
96#[async_trait]
97impl<S: GraphStorage> MailService for MailServiceImpl<S> {
98 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
99 let name = name.into();
100
101 if name.chars().any(|c| c.is_uppercase() || c.is_whitespace()) {
103 return Err(MailError::InvalidAgentName(
104 format!("Agent name '{}' is invalid. Names must be lowercase with no spaces (e.g., 'my_agent', 'agent123').", name)
105 ));
106 }
107
108 let agent = Agent::new(name);
109 let node = agent.to_node();
110 self.storage.create_node(&node).await?;
111
112 Ok(agent)
113 }
114
115 async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
116 let agent = self.get_agent(agent_id.clone()).await?;
118 let agent_node_id = string_to_node_id(&agent.id);
119
120 let inbox = self.get_mailbox_inbox(agent_node_id).await?;
122 let outbox = self.get_mailbox_outbox(agent_node_id).await?;
123
124 for mail in inbox.iter().chain(outbox.iter()) {
126 let _ = self.storage.delete_node(mail.id).await;
127 }
128
129 self.storage.delete_node(agent_node_id).await?;
131
132 Ok(())
133 }
134
135 async fn get_agent(&self, id: AgentId) -> Result<Agent> {
136 let node_id = string_to_node_id(&id);
137 let id_clone = id.clone();
138 let node = self.storage.get_node(node_id).await
139 .map_err(|e| match e {
140 StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
141 _ => MailError::Storage(e),
142 })?;
143 Agent::from_node(&node)
144 .ok_or(MailError::AgentNotFound(id))
145 }
146
147 async fn list_agents(&self) -> Result<Vec<Agent>> {
148 let query = GraphQuery::new().with_node_type("agent");
149 let nodes = self.storage.query_nodes(&query).await?;
150 let agents: Vec<Agent> = nodes.iter()
151 .filter_map(Agent::from_node)
152 .collect();
153 Ok(agents)
154 }
155
156 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
157 let mut agent = self.get_agent(agent_id).await?;
158 agent.status = status.into();
159 let node = agent.to_node();
160 self.storage.update_node(&node).await?;
161 Ok(agent)
162 }
163
164 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
165 let node = self.storage.get_node(mailbox_id).await
167 .map_err(|e| match e {
168 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
169 _ => MailError::Storage(e),
170 })?;
171
172 Agent::from_node(&node)
173 .ok_or_else(|| MailError::InvalidOperation(
174 "Node exists but is not an agent".to_string()
175 ))
176 }
177
178 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
179 let agent = self.get_agent(agent_id.clone()).await?;
181 let node_id = string_to_node_id(&agent.id);
182
183 let mailbox = Mailbox {
185 id: node_id,
186 owner_id: agent.id,
187 name: "Mailbox".to_string(),
188 created_at: agent.created_at,
189 };
190
191 Ok(mailbox)
192 }
193
194 async fn send_agent_to_agent(
195 &self,
196 from_agent_id: AgentId,
197 to_agent_id: AgentId,
198 subject: impl Into<String> + Send,
199 body: impl Into<String> + Send,
200 ) -> Result<Mail> {
201 let from_agent = self.get_agent(from_agent_id).await?;
203 let to_agent = self.get_agent(to_agent_id).await?;
204
205 let from_mailbox_id = string_to_node_id(&from_agent.id);
207 let to_mailbox_id = string_to_node_id(&to_agent.id);
208
209 let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
211 let node = mail.to_node();
212
213 self.storage.create_node(&node).await?;
215
216 let from_edge = Edge::new(
218 "sent_from",
219 from_mailbox_id,
220 mail.id,
221 Properties::new(),
222 );
223 self.storage.create_edge(&from_edge).await?;
224
225 let to_edge = Edge::new(
226 "sent_to",
227 mail.id,
228 to_mailbox_id,
229 Properties::new(),
230 );
231 self.storage.create_edge(&to_edge).await?;
232
233 Ok(mail)
234 }
235
236 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
237 let _agent = self.storage.get_node(mailbox_id).await
239 .map_err(|e| match e {
240 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
241 _ => MailError::Storage(e),
242 })?;
243
244 let incoming_edges = self.storage
246 .get_edges_to(mailbox_id, Some("sent_to"))
247 .await?;
248
249 let mut mails = Vec::new();
250 for edge in incoming_edges {
251 if let Ok(mail) = self.get_mail(edge.from_node_id).await {
252 mails.push(mail);
253 }
254 }
255
256 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
258
259 Ok(mails)
260 }
261
262 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
263 let _agent = self.storage.get_node(mailbox_id).await
265 .map_err(|e| match e {
266 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
267 _ => MailError::Storage(e),
268 })?;
269
270 let outgoing_edges = self.storage
272 .get_edges_from(mailbox_id, Some("sent_from"))
273 .await?;
274
275 let mut mails = Vec::new();
276 for edge in outgoing_edges {
277 if let Ok(mail) = self.get_mail(edge.to_node_id).await {
278 mails.push(mail);
279 }
280 }
281
282 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
284
285 Ok(mails)
286 }
287
288 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
289 let since = chrono::Utc::now() - chrono::Duration::hours(hours);
290
291 let inbox = self.get_mailbox_inbox(mailbox_id).await?;
293
294 let recent: Vec<Mail> = inbox.into_iter()
296 .filter(|mail| mail.created_at >= since)
297 .take(limit)
298 .collect();
299
300 Ok(recent)
301 }
302
303 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
304 let mut mail = self.get_mail(mail_id).await?;
305 mail.mark_as_read();
306
307 let node = mail.to_node();
308 self.storage.update_node(&node).await?;
309
310 Ok(mail)
311 }
312
313 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
314 let query = GraphQuery::new().with_node_type("mail");
316 let nodes = self.storage.query_nodes(&query).await?;
317
318 let short_id_lower = short_id.to_lowercase();
320 let matching: Vec<_> = nodes.iter()
321 .filter_map(Mail::from_node)
322 .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
323 .collect();
324
325 match matching.len() {
326 0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
327 1 => {
328 let mail_id = matching[0].id;
329 self.mark_mail_as_read(mail_id).await
330 }
331 _ => Err(MailError::InvalidOperation(
332 format!("Multiple mails match short ID '{}', please use full ID", short_id)
333 )),
334 }
335 }
336
337 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
338 let mailbox = self.get_agent_mailbox(agent_id).await?;
340
341 let inbox = self.get_mailbox_inbox(mailbox.id).await?;
343 let unread: Vec<Mail> = inbox.into_iter()
344 .filter(|mail| !mail.read)
345 .collect();
346
347 let has_unread = !unread.is_empty();
348
349 Ok((has_unread, unread))
350 }
351}
352
353#[cfg(test)]
354mod tests {
355 use super::*;
356 use crate::storage::memory::InMemoryStorage;
357
358 #[tokio::test]
359 async fn test_create_agent() {
360 let storage = InMemoryStorage::new();
361 let service = MailServiceImpl::new(storage);
362
363 let agent = service.create_agent("test_agent").await.unwrap();
364 assert_eq!(agent.name, "test_agent");
365 }
366
367 #[tokio::test]
368 async fn test_create_agent_uppercase_fails() {
369 let storage = InMemoryStorage::new();
370 let service = MailServiceImpl::new(storage);
371
372 let result = service.create_agent("TestAgent").await;
373 assert!(matches!(result, Err(MailError::InvalidAgentName(_))));
374 }
375
376 #[tokio::test]
377 async fn test_create_agent_spaces_fails() {
378 let storage = InMemoryStorage::new();
379 let service = MailServiceImpl::new(storage);
380
381 let result = service.create_agent("test agent").await;
382 assert!(matches!(result, Err(MailError::InvalidAgentName(_))));
383 }
384
385 #[tokio::test]
386 async fn test_create_agent_auto_creates_mailbox() {
387 let storage = InMemoryStorage::new();
388 let service = MailServiceImpl::new(storage);
389
390 let agent = service.create_agent("agent").await.unwrap();
391
392 let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
394 assert_eq!(mailbox.owner_id, agent.id);
395
396 let expected_mailbox_id = string_to_node_id(&agent.id);
398 assert_eq!(mailbox.id, expected_mailbox_id);
399 }
400
401 #[tokio::test]
402 async fn test_send_and_receive_mail() {
403 let storage = InMemoryStorage::new();
404 let service = MailServiceImpl::new(storage);
405
406 let agent1 = service.create_agent("sender").await.unwrap();
408 let agent2 = service.create_agent("receiver").await.unwrap();
409
410 let mail = service
412 .send_agent_to_agent(
413 agent1.id.clone(),
414 agent2.id.clone(),
415 "Hello",
416 "This is a test message",
417 )
418 .await
419 .unwrap();
420
421 assert_eq!(mail.subject, "Hello");
422 assert_eq!(mail.body, "This is a test message");
423 assert!(!mail.read);
424
425 let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
427 assert_eq!(inbox.len(), 1);
428 assert_eq!(inbox[0].subject, "Hello");
429
430 let outbox = service.get_mailbox_outbox(string_to_node_id(&agent1.id)).await.unwrap();
432 assert_eq!(outbox.len(), 1);
433 assert_eq!(outbox[0].subject, "Hello");
434 }
435
436 #[tokio::test]
437 async fn test_mark_mail_as_read() {
438 let storage = InMemoryStorage::new();
439 let service = MailServiceImpl::new(storage);
440
441 let agent1 = service.create_agent("sender").await.unwrap();
442 let agent2 = service.create_agent("receiver").await.unwrap();
443
444 let mail = service
445 .send_agent_to_agent(agent1.id, agent2.id, "Test", "Body")
446 .await
447 .unwrap();
448
449 assert!(!mail.read);
450
451 let updated = service.mark_mail_as_read(mail.id).await.unwrap();
452 assert!(updated.read);
453 }
454
455 #[tokio::test]
456 async fn test_check_unread_mail() {
457 let storage = InMemoryStorage::new();
458 let service = MailServiceImpl::new(storage);
459
460 let agent1 = service.create_agent("sender").await.unwrap();
461 let agent2 = service.create_agent("receiver").await.unwrap();
462
463 let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
465 assert!(!has_unread);
466 assert!(unread.is_empty());
467
468 service.send_agent_to_agent(agent1.id, agent2.id.clone(), "Test", "Body").await.unwrap();
470
471 let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
473 assert!(has_unread);
474 assert_eq!(unread.len(), 1);
475
476 let mail_id = unread[0].id;
478 service.mark_mail_as_read(mail_id).await.unwrap();
479
480 let (has_unread, unread) = service.check_unread_mail(agent2.id).await.unwrap();
482 assert!(!has_unread);
483 assert!(unread.is_empty());
484 }
485
486 #[tokio::test]
487 async fn test_get_recent_mail() {
488 let storage = InMemoryStorage::new();
489 let service = MailServiceImpl::new(storage);
490
491 let agent1 = service.create_agent("sender").await.unwrap();
492 let agent2 = service.create_agent("receiver").await.unwrap();
493
494 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Recent", "Body").await.unwrap();
496
497 let recent = service.get_recent_mail(
499 string_to_node_id(&agent2.id),
500 24,
501 10
502 ).await.unwrap();
503
504 assert_eq!(recent.len(), 1);
505 assert_eq!(recent[0].subject, "Recent");
506 }
507
508 #[tokio::test]
509 async fn test_get_nonexistent_agent() {
510 let storage = InMemoryStorage::new();
511 let service = MailServiceImpl::new(storage);
512
513 let fake_id = "nonexistent_agent".to_string();
514 let result = service.get_agent(fake_id).await;
515
516 assert!(matches!(result, Err(MailError::AgentNotFound(_))));
517 }
518
519 #[tokio::test]
520 async fn test_get_agent_by_mailbox() {
521 let storage = InMemoryStorage::new();
522 let service = MailServiceImpl::new(storage);
523
524 let agent = service.create_agent("test_agent").await.unwrap();
525 let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
526
527 let found_agent = service.get_agent_by_mailbox(mailbox.id).await.unwrap();
529 assert_eq!(found_agent.id, agent.id);
530 assert_eq!(found_agent.name, agent.name);
531 }
532
533 #[tokio::test]
534 async fn test_list_agents() {
535 let storage = InMemoryStorage::new();
536 let service = MailServiceImpl::new(storage);
537
538 service.create_agent("agent_1").await.unwrap();
540 service.create_agent("agent_2").await.unwrap();
541 service.create_agent("agent_3").await.unwrap();
542
543 let agents = service.list_agents().await.unwrap();
544 assert_eq!(agents.len(), 3);
545 }
546
547 #[tokio::test]
548 async fn test_set_agent_status() {
549 let storage = InMemoryStorage::new();
550 let service = MailServiceImpl::new(storage);
551
552 let agent = service.create_agent("test_agent").await.unwrap();
553 assert_eq!(agent.status, "offline");
554
555 let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
556 assert_eq!(updated.status, "online");
557
558 let retrieved = service.get_agent(agent.id).await.unwrap();
560 assert_eq!(retrieved.status, "online");
561 }
562
563 #[tokio::test]
564 async fn test_multiple_mails_sorting() {
565 let storage = InMemoryStorage::new();
566 let service = MailServiceImpl::new(storage);
567
568 let agent1 = service.create_agent("sender").await.unwrap();
569 let agent2 = service.create_agent("receiver").await.unwrap();
570
571 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "First", "Body1").await.unwrap();
573 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
574 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Second", "Body2").await.unwrap();
575 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
576 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Third", "Body3").await.unwrap();
577
578 let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
579 assert_eq!(inbox.len(), 3);
580 assert_eq!(inbox[0].subject, "Third");
582 assert_eq!(inbox[1].subject, "Second");
583 assert_eq!(inbox[2].subject, "First");
584 }
585}