agent_office/services/mail/
mod.rs1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11 #[error("Mailbox not found: {0}")]
12 MailboxNotFound(MailboxId),
13
14 #[error("Agent not found: {0}")]
15 AgentNotFound(AgentId),
16
17 #[error("Mail not found: {0}")]
18 MailNotFound(uuid::Uuid),
19
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22
23 #[error("Invalid operation: {0}")]
24 InvalidOperation(String),
25
26 #[error("Invalid agent name: {0}")]
27 InvalidAgentName(String),
28}
29
30pub type Result<T> = std::result::Result<T, MailError>;
31
32#[async_trait]
33pub trait MailService: Send + Sync {
34 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
36 async fn delete_agent(&self, agent_id: AgentId) -> Result<()>;
37 async fn get_agent(&self, id: AgentId) -> Result<Agent>;
38 async fn list_agents(&self) -> Result<Vec<Agent>>;
39 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
40 async fn set_agent_session(&self, agent_id: AgentId, session_id: Option<String>) -> Result<Agent>;
41
42 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
44
45 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
47
48 async fn send_agent_to_agent(
50 &self,
51 from_agent_id: AgentId,
52 to_agent_id: AgentId,
53 subject: impl Into<String> + Send,
54 body: impl Into<String> + Send,
55 ) -> Result<Mail>;
56
57 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
59
60 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
62
63 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
65
66 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
68
69 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail>;
71
72 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
74}
75
76pub struct MailServiceImpl<S: GraphStorage> {
77 storage: S,
78}
79
80impl<S: GraphStorage> MailServiceImpl<S> {
81 pub fn new(storage: S) -> Self {
82 Self { storage }
83 }
84
85 async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
87 let node = self.storage.get_node(mail_id).await
88 .map_err(|e| match e {
89 StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
90 _ => MailError::Storage(e),
91 })?;
92 Mail::from_node(&node)
93 .ok_or(MailError::MailNotFound(mail_id))
94 }
95}
96
97#[async_trait]
98impl<S: GraphStorage> MailService for MailServiceImpl<S> {
99 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
100 let name = name.into();
101
102 if name.chars().any(|c| c.is_uppercase() || c.is_whitespace()) {
104 return Err(MailError::InvalidAgentName(
105 format!("Agent name '{}' is invalid. Names must be lowercase with no spaces (e.g., 'my_agent', 'agent123').", name)
106 ));
107 }
108
109 let agent = Agent::new(name);
110 let node = agent.to_node();
111 self.storage.create_node(&node).await?;
112
113 Ok(agent)
114 }
115
116 async fn delete_agent(&self, agent_id: AgentId) -> Result<()> {
117 let agent = self.get_agent(agent_id.clone()).await?;
119 let agent_node_id = string_to_node_id(&agent.id);
120
121 let inbox = self.get_mailbox_inbox(agent_node_id).await?;
123 let outbox = self.get_mailbox_outbox(agent_node_id).await?;
124
125 for mail in inbox.iter().chain(outbox.iter()) {
127 let _ = self.storage.delete_node(mail.id).await;
128 }
129
130 self.storage.delete_node(agent_node_id).await?;
132
133 Ok(())
134 }
135
136 async fn get_agent(&self, id: AgentId) -> Result<Agent> {
137 let node_id = string_to_node_id(&id);
138 let id_clone = id.clone();
139 let node = self.storage.get_node(node_id).await
140 .map_err(|e| match e {
141 StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
142 _ => MailError::Storage(e),
143 })?;
144 Agent::from_node(&node)
145 .ok_or(MailError::AgentNotFound(id))
146 }
147
148 async fn list_agents(&self) -> Result<Vec<Agent>> {
149 let query = GraphQuery::new().with_node_type("agent");
150 let nodes = self.storage.query_nodes(&query).await?;
151 let agents: Vec<Agent> = nodes.iter()
152 .filter_map(Agent::from_node)
153 .collect();
154 Ok(agents)
155 }
156
157 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
158 let mut agent = self.get_agent(agent_id).await?;
159 agent.status = status.into();
160 let node = agent.to_node();
161 self.storage.update_node(&node).await?;
162 Ok(agent)
163 }
164
165 async fn set_agent_session(&self, agent_id: AgentId, session_id: Option<String>) -> Result<Agent> {
166 let mut agent = self.get_agent(agent_id).await?;
167 agent.session_id = session_id;
168 let node = agent.to_node();
169 self.storage.update_node(&node).await?;
170 Ok(agent)
171 }
172
173 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
174 let node = self.storage.get_node(mailbox_id).await
176 .map_err(|e| match e {
177 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
178 _ => MailError::Storage(e),
179 })?;
180
181 Agent::from_node(&node)
182 .ok_or_else(|| MailError::InvalidOperation(
183 "Node exists but is not an agent".to_string()
184 ))
185 }
186
187 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
188 let agent = self.get_agent(agent_id.clone()).await?;
190 let node_id = string_to_node_id(&agent.id);
191
192 let mailbox = Mailbox {
194 id: node_id,
195 owner_id: agent.id,
196 name: "Mailbox".to_string(),
197 created_at: agent.created_at,
198 };
199
200 Ok(mailbox)
201 }
202
203 async fn send_agent_to_agent(
204 &self,
205 from_agent_id: AgentId,
206 to_agent_id: AgentId,
207 subject: impl Into<String> + Send,
208 body: impl Into<String> + Send,
209 ) -> Result<Mail> {
210 let from_agent = self.get_agent(from_agent_id).await?;
212 let to_agent = self.get_agent(to_agent_id).await?;
213
214 let from_mailbox_id = string_to_node_id(&from_agent.id);
216 let to_mailbox_id = string_to_node_id(&to_agent.id);
217
218 let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
220 let node = mail.to_node();
221
222 self.storage.create_node(&node).await?;
224
225 let from_edge = Edge::new(
227 "sent_from",
228 from_mailbox_id,
229 mail.id,
230 Properties::new(),
231 );
232 self.storage.create_edge(&from_edge).await?;
233
234 let to_edge = Edge::new(
235 "sent_to",
236 mail.id,
237 to_mailbox_id,
238 Properties::new(),
239 );
240 self.storage.create_edge(&to_edge).await?;
241
242 Ok(mail)
243 }
244
245 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
246 let _agent = self.storage.get_node(mailbox_id).await
248 .map_err(|e| match e {
249 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
250 _ => MailError::Storage(e),
251 })?;
252
253 let incoming_edges = self.storage
255 .get_edges_to(mailbox_id, Some("sent_to"))
256 .await?;
257
258 let mut mails = Vec::new();
259 for edge in incoming_edges {
260 if let Ok(mail) = self.get_mail(edge.from_node_id).await {
261 mails.push(mail);
262 }
263 }
264
265 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
267
268 Ok(mails)
269 }
270
271 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
272 let _agent = self.storage.get_node(mailbox_id).await
274 .map_err(|e| match e {
275 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
276 _ => MailError::Storage(e),
277 })?;
278
279 let outgoing_edges = self.storage
281 .get_edges_from(mailbox_id, Some("sent_from"))
282 .await?;
283
284 let mut mails = Vec::new();
285 for edge in outgoing_edges {
286 if let Ok(mail) = self.get_mail(edge.to_node_id).await {
287 mails.push(mail);
288 }
289 }
290
291 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
293
294 Ok(mails)
295 }
296
297 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
298 let since = chrono::Utc::now() - chrono::Duration::hours(hours);
299
300 let inbox = self.get_mailbox_inbox(mailbox_id).await?;
302
303 let recent: Vec<Mail> = inbox.into_iter()
305 .filter(|mail| mail.created_at >= since)
306 .take(limit)
307 .collect();
308
309 Ok(recent)
310 }
311
312 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
313 let mut mail = self.get_mail(mail_id).await?;
314 mail.mark_as_read();
315
316 let node = mail.to_node();
317 self.storage.update_node(&node).await?;
318
319 Ok(mail)
320 }
321
322 async fn mark_mail_as_read_by_short_id(&self, short_id: &str) -> Result<Mail> {
323 let query = GraphQuery::new().with_node_type("mail");
325 let nodes = self.storage.query_nodes(&query).await?;
326
327 let short_id_lower = short_id.to_lowercase();
329 let matching: Vec<_> = nodes.iter()
330 .filter_map(Mail::from_node)
331 .filter(|m| m.id.to_string().to_lowercase().starts_with(&short_id_lower))
332 .collect();
333
334 match matching.len() {
335 0 => Err(MailError::MailNotFound(uuid::Uuid::nil())),
336 1 => {
337 let mail_id = matching[0].id;
338 self.mark_mail_as_read(mail_id).await
339 }
340 _ => Err(MailError::InvalidOperation(
341 format!("Multiple mails match short ID '{}', please use full ID", short_id)
342 )),
343 }
344 }
345
346 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
347 let mailbox = self.get_agent_mailbox(agent_id).await?;
349
350 let inbox = self.get_mailbox_inbox(mailbox.id).await?;
352 let unread: Vec<Mail> = inbox.into_iter()
353 .filter(|mail| !mail.read)
354 .collect();
355
356 let has_unread = !unread.is_empty();
357
358 Ok((has_unread, unread))
359 }
360}
361
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::InMemoryStorage;

    /// Builds a mail service backed by a fresh, empty in-memory store.
    fn new_service() -> MailServiceImpl<InMemoryStorage> {
        MailServiceImpl::new(InMemoryStorage::new())
    }

    /// A valid lowercase name yields an agent carrying that name.
    #[tokio::test]
    async fn test_create_agent() {
        let service = new_service();

        let created = service.create_agent("test_agent").await.unwrap();

        assert_eq!(created.name, "test_agent");
    }

    /// Names with uppercase characters are rejected.
    #[tokio::test]
    async fn test_create_agent_uppercase_fails() {
        let service = new_service();

        let outcome = service.create_agent("TestAgent").await;

        assert!(matches!(outcome, Err(MailError::InvalidAgentName(_))));
    }

    /// Names with whitespace are rejected.
    #[tokio::test]
    async fn test_create_agent_spaces_fails() {
        let service = new_service();

        let outcome = service.create_agent("test agent").await;

        assert!(matches!(outcome, Err(MailError::InvalidAgentName(_))));
    }

    /// A new agent has a mailbox whose id is derived from the agent id.
    #[tokio::test]
    async fn test_create_agent_auto_creates_mailbox() {
        let service = new_service();
        let agent = service.create_agent("agent").await.unwrap();

        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        assert_eq!(mailbox.owner_id, agent.id);
        assert_eq!(mailbox.id, string_to_node_id(&agent.id));
    }

    /// A sent mail shows up in the recipient's inbox and the sender's outbox.
    #[tokio::test]
    async fn test_send_and_receive_mail() {
        let service = new_service();
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();

        let sent = service
            .send_agent_to_agent(
                sender.id.clone(),
                receiver.id.clone(),
                "Hello",
                "This is a test message",
            )
            .await
            .unwrap();

        assert_eq!(sent.subject, "Hello");
        assert_eq!(sent.body, "This is a test message");
        assert!(!sent.read);

        let receiver_mailbox = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_mailbox).await.unwrap();
        assert_eq!(inbox.len(), 1);
        assert_eq!(inbox[0].subject, "Hello");

        let sender_mailbox = string_to_node_id(&sender.id);
        let outbox = service.get_mailbox_outbox(sender_mailbox).await.unwrap();
        assert_eq!(outbox.len(), 1);
        assert_eq!(outbox[0].subject, "Hello");
    }

    /// Marking a mail as read flips its `read` flag.
    #[tokio::test]
    async fn test_mark_mail_as_read() {
        let service = new_service();
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();

        let sent = service
            .send_agent_to_agent(sender.id, receiver.id, "Test", "Body")
            .await
            .unwrap();
        assert!(!sent.read);

        let after = service.mark_mail_as_read(sent.id).await.unwrap();
        assert!(after.read);
    }

    /// Unread state goes empty -> one unread -> empty again once it is read.
    #[tokio::test]
    async fn test_check_unread_mail() {
        let service = new_service();
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();

        // Nothing has been sent yet.
        let (has_unread, unread) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());

        service
            .send_agent_to_agent(sender.id, receiver.id.clone(), "Test", "Body")
            .await
            .unwrap();

        // One mail is waiting.
        let (has_unread, unread) = service
            .check_unread_mail(receiver.id.clone())
            .await
            .unwrap();
        assert!(has_unread);
        assert_eq!(unread.len(), 1);

        let mail_id = unread[0].id;
        service.mark_mail_as_read(mail_id).await.unwrap();

        // Once read, it no longer counts as unread.
        let (has_unread, unread) = service.check_unread_mail(receiver.id).await.unwrap();
        assert!(!has_unread);
        assert!(unread.is_empty());
    }

    /// A mail sent just now falls inside a 24 hour window.
    #[tokio::test]
    async fn test_get_recent_mail() {
        let service = new_service();
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Recent", "Body")
            .await
            .unwrap();

        let receiver_mailbox = string_to_node_id(&receiver.id);
        let recent = service
            .get_recent_mail(receiver_mailbox, 24, 10)
            .await
            .unwrap();

        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].subject, "Recent");
    }

    /// Looking up an unknown agent id reports `AgentNotFound`.
    #[tokio::test]
    async fn test_get_nonexistent_agent() {
        let service = new_service();

        let missing_id = "nonexistent_agent".to_string();
        let outcome = service.get_agent(missing_id).await;

        assert!(matches!(outcome, Err(MailError::AgentNotFound(_))));
    }

    /// A mailbox id resolves back to the agent that owns it.
    #[tokio::test]
    async fn test_get_agent_by_mailbox() {
        let service = new_service();
        let agent = service.create_agent("test_agent").await.unwrap();
        let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();

        let owner = service.get_agent_by_mailbox(mailbox.id).await.unwrap();

        assert_eq!(owner.id, agent.id);
        assert_eq!(owner.name, agent.name);
    }

    /// Every created agent is listed.
    #[tokio::test]
    async fn test_list_agents() {
        let service = new_service();

        service.create_agent("agent_1").await.unwrap();
        service.create_agent("agent_2").await.unwrap();
        service.create_agent("agent_3").await.unwrap();

        let listed = service.list_agents().await.unwrap();
        assert_eq!(listed.len(), 3);
    }

    /// A status change is returned and is also visible on a later lookup.
    #[tokio::test]
    async fn test_set_agent_status() {
        let service = new_service();

        let agent = service.create_agent("test_agent").await.unwrap();
        assert_eq!(agent.status, "offline");

        let changed = service
            .set_agent_status(agent.id.clone(), "online")
            .await
            .unwrap();
        assert_eq!(changed.status, "online");

        let reloaded = service.get_agent(agent.id).await.unwrap();
        assert_eq!(reloaded.status, "online");
    }

    /// The inbox lists mails newest first.
    #[tokio::test]
    async fn test_multiple_mails_sorting() {
        let service = new_service();
        let sender = service.create_agent("sender").await.unwrap();
        let receiver = service.create_agent("receiver").await.unwrap();

        // Short pauses between sends keep the creation timestamps distinct.
        let pause = tokio::time::Duration::from_millis(10);

        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "First", "Body1")
            .await
            .unwrap();
        tokio::time::sleep(pause).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Second", "Body2")
            .await
            .unwrap();
        tokio::time::sleep(pause).await;
        service
            .send_agent_to_agent(sender.id.clone(), receiver.id.clone(), "Third", "Body3")
            .await
            .unwrap();

        let receiver_mailbox = string_to_node_id(&receiver.id);
        let inbox = service.get_mailbox_inbox(receiver_mailbox).await.unwrap();

        assert_eq!(inbox.len(), 3);
        assert_eq!(inbox[0].subject, "Third");
        assert_eq!(inbox[1].subject, "Second");
        assert_eq!(inbox[2].subject, "First");
    }
}