agent_office/services/mail/
mod.rs1use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
2use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
3use crate::storage::{GraphStorage, StorageError};
4use async_trait::async_trait;
5use thiserror::Error;
6
7pub mod domain;
8
9#[derive(Error, Debug)]
10pub enum MailError {
11 #[error("Mailbox not found: {0}")]
12 MailboxNotFound(MailboxId),
13
14 #[error("Agent not found: {0}")]
15 AgentNotFound(AgentId),
16
17 #[error("Mail not found: {0}")]
18 MailNotFound(uuid::Uuid),
19
20 #[error("Storage error: {0}")]
21 Storage(#[from] StorageError),
22
23 #[error("Invalid operation: {0}")]
24 InvalidOperation(String),
25}
26
27pub type Result<T> = std::result::Result<T, MailError>;
28
29#[async_trait]
30pub trait MailService: Send + Sync {
31 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
33 async fn get_agent(&self, id: AgentId) -> Result<Agent>;
34 async fn list_agents(&self) -> Result<Vec<Agent>>;
35 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
36
37 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
39
40 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
42
43 async fn send_agent_to_agent(
45 &self,
46 from_agent_id: AgentId,
47 to_agent_id: AgentId,
48 subject: impl Into<String> + Send,
49 body: impl Into<String> + Send,
50 ) -> Result<Mail>;
51
52 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
54
55 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
57
58 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
60
61 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
63
64 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
66}
67
68pub struct MailServiceImpl<S: GraphStorage> {
69 storage: S,
70}
71
72impl<S: GraphStorage> MailServiceImpl<S> {
73 pub fn new(storage: S) -> Self {
74 Self { storage }
75 }
76
77 async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
79 let node = self.storage.get_node(mail_id).await
80 .map_err(|e| match e {
81 StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
82 _ => MailError::Storage(e),
83 })?;
84 Mail::from_node(&node)
85 .ok_or(MailError::MailNotFound(mail_id))
86 }
87}
88
89#[async_trait]
90impl<S: GraphStorage> MailService for MailServiceImpl<S> {
91 async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
92 let agent = Agent::new(name);
93 let node = agent.to_node();
94 self.storage.create_node(&node).await?;
95
96 Ok(agent)
97 }
98
99 async fn get_agent(&self, id: AgentId) -> Result<Agent> {
100 let node_id = string_to_node_id(&id);
101 let id_clone = id.clone();
102 let node = self.storage.get_node(node_id).await
103 .map_err(|e| match e {
104 StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
105 _ => MailError::Storage(e),
106 })?;
107 Agent::from_node(&node)
108 .ok_or(MailError::AgentNotFound(id))
109 }
110
111 async fn list_agents(&self) -> Result<Vec<Agent>> {
112 let query = GraphQuery::new().with_node_type("agent");
113 let nodes = self.storage.query_nodes(&query).await?;
114 let agents: Vec<Agent> = nodes.iter()
115 .filter_map(Agent::from_node)
116 .collect();
117 Ok(agents)
118 }
119
120 async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
121 let mut agent = self.get_agent(agent_id).await?;
122 agent.status = status.into();
123 let node = agent.to_node();
124 self.storage.update_node(&node).await?;
125 Ok(agent)
126 }
127
128 async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
129 let node = self.storage.get_node(mailbox_id).await
131 .map_err(|e| match e {
132 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
133 _ => MailError::Storage(e),
134 })?;
135
136 Agent::from_node(&node)
137 .ok_or_else(|| MailError::InvalidOperation(
138 "Node exists but is not an agent".to_string()
139 ))
140 }
141
142 async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
143 let agent = self.get_agent(agent_id.clone()).await?;
145 let node_id = string_to_node_id(&agent.id);
146
147 let mailbox = Mailbox {
149 id: node_id,
150 owner_id: agent.id,
151 name: "Mailbox".to_string(),
152 created_at: agent.created_at,
153 };
154
155 Ok(mailbox)
156 }
157
158 async fn send_agent_to_agent(
159 &self,
160 from_agent_id: AgentId,
161 to_agent_id: AgentId,
162 subject: impl Into<String> + Send,
163 body: impl Into<String> + Send,
164 ) -> Result<Mail> {
165 let from_agent = self.get_agent(from_agent_id).await?;
167 let to_agent = self.get_agent(to_agent_id).await?;
168
169 let from_mailbox_id = string_to_node_id(&from_agent.id);
171 let to_mailbox_id = string_to_node_id(&to_agent.id);
172
173 let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
175 let node = mail.to_node();
176
177 self.storage.create_node(&node).await?;
179
180 let from_edge = Edge::new(
182 "sent_from",
183 from_mailbox_id,
184 mail.id,
185 Properties::new(),
186 );
187 self.storage.create_edge(&from_edge).await?;
188
189 let to_edge = Edge::new(
190 "sent_to",
191 mail.id,
192 to_mailbox_id,
193 Properties::new(),
194 );
195 self.storage.create_edge(&to_edge).await?;
196
197 Ok(mail)
198 }
199
200 async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
201 let _agent = self.storage.get_node(mailbox_id).await
203 .map_err(|e| match e {
204 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
205 _ => MailError::Storage(e),
206 })?;
207
208 let incoming_edges = self.storage
210 .get_edges_to(mailbox_id, Some("sent_to"))
211 .await?;
212
213 let mut mails = Vec::new();
214 for edge in incoming_edges {
215 if let Ok(mail) = self.get_mail(edge.from_node_id).await {
216 mails.push(mail);
217 }
218 }
219
220 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
222
223 Ok(mails)
224 }
225
226 async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
227 let _agent = self.storage.get_node(mailbox_id).await
229 .map_err(|e| match e {
230 StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
231 _ => MailError::Storage(e),
232 })?;
233
234 let outgoing_edges = self.storage
236 .get_edges_from(mailbox_id, Some("sent_from"))
237 .await?;
238
239 let mut mails = Vec::new();
240 for edge in outgoing_edges {
241 if let Ok(mail) = self.get_mail(edge.to_node_id).await {
242 mails.push(mail);
243 }
244 }
245
246 mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
248
249 Ok(mails)
250 }
251
252 async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
253 let since = chrono::Utc::now() - chrono::Duration::hours(hours);
254
255 let inbox = self.get_mailbox_inbox(mailbox_id).await?;
257
258 let recent: Vec<Mail> = inbox.into_iter()
260 .filter(|mail| mail.created_at >= since)
261 .take(limit)
262 .collect();
263
264 Ok(recent)
265 }
266
267 async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
268 let mut mail = self.get_mail(mail_id).await?;
269 mail.mark_as_read();
270
271 let node = mail.to_node();
272 self.storage.update_node(&node).await?;
273
274 Ok(mail)
275 }
276
277 async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
278 let mailbox = self.get_agent_mailbox(agent_id).await?;
280
281 let inbox = self.get_mailbox_inbox(mailbox.id).await?;
283 let unread: Vec<Mail> = inbox.into_iter()
284 .filter(|mail| !mail.read)
285 .collect();
286
287 let has_unread = !unread.is_empty();
288
289 Ok((has_unread, unread))
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use crate::storage::memory::InMemoryStorage;
297
298 #[tokio::test]
299 async fn test_create_agent() {
300 let storage = InMemoryStorage::new();
301 let service = MailServiceImpl::new(storage);
302
303 let agent = service.create_agent("Test Agent").await.unwrap();
304 assert_eq!(agent.name, "Test Agent");
305 }
306
307 #[tokio::test]
308 async fn test_create_agent_auto_creates_mailbox() {
309 let storage = InMemoryStorage::new();
310 let service = MailServiceImpl::new(storage);
311
312 let agent = service.create_agent("Agent").await.unwrap();
313
314 let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
316 assert_eq!(mailbox.owner_id, agent.id);
317
318 let expected_mailbox_id = string_to_node_id(&agent.id);
320 assert_eq!(mailbox.id, expected_mailbox_id);
321 }
322
323 #[tokio::test]
324 async fn test_send_and_receive_mail() {
325 let storage = InMemoryStorage::new();
326 let service = MailServiceImpl::new(storage);
327
328 let agent1 = service.create_agent("Sender").await.unwrap();
330 let agent2 = service.create_agent("Receiver").await.unwrap();
331
332 let mail = service
334 .send_agent_to_agent(
335 agent1.id.clone(),
336 agent2.id.clone(),
337 "Hello",
338 "This is a test message",
339 )
340 .await
341 .unwrap();
342
343 assert_eq!(mail.subject, "Hello");
344 assert_eq!(mail.body, "This is a test message");
345 assert!(!mail.read);
346
347 let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
349 assert_eq!(inbox.len(), 1);
350 assert_eq!(inbox[0].subject, "Hello");
351
352 let outbox = service.get_mailbox_outbox(string_to_node_id(&agent1.id)).await.unwrap();
354 assert_eq!(outbox.len(), 1);
355 assert_eq!(outbox[0].subject, "Hello");
356 }
357
358 #[tokio::test]
359 async fn test_mark_mail_as_read() {
360 let storage = InMemoryStorage::new();
361 let service = MailServiceImpl::new(storage);
362
363 let agent1 = service.create_agent("Sender").await.unwrap();
364 let agent2 = service.create_agent("Receiver").await.unwrap();
365
366 let mail = service
367 .send_agent_to_agent(agent1.id, agent2.id, "Test", "Body")
368 .await
369 .unwrap();
370
371 assert!(!mail.read);
372
373 let updated = service.mark_mail_as_read(mail.id).await.unwrap();
374 assert!(updated.read);
375 }
376
377 #[tokio::test]
378 async fn test_check_unread_mail() {
379 let storage = InMemoryStorage::new();
380 let service = MailServiceImpl::new(storage);
381
382 let agent1 = service.create_agent("Sender").await.unwrap();
383 let agent2 = service.create_agent("Receiver").await.unwrap();
384
385 let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
387 assert!(!has_unread);
388 assert!(unread.is_empty());
389
390 service.send_agent_to_agent(agent1.id, agent2.id.clone(), "Test", "Body").await.unwrap();
392
393 let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
395 assert!(has_unread);
396 assert_eq!(unread.len(), 1);
397
398 let mail_id = unread[0].id;
400 service.mark_mail_as_read(mail_id).await.unwrap();
401
402 let (has_unread, unread) = service.check_unread_mail(agent2.id).await.unwrap();
404 assert!(!has_unread);
405 assert!(unread.is_empty());
406 }
407
408 #[tokio::test]
409 async fn test_get_recent_mail() {
410 let storage = InMemoryStorage::new();
411 let service = MailServiceImpl::new(storage);
412
413 let agent1 = service.create_agent("Sender").await.unwrap();
414 let agent2 = service.create_agent("Receiver").await.unwrap();
415
416 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Recent", "Body").await.unwrap();
418
419 let recent = service.get_recent_mail(
421 string_to_node_id(&agent2.id),
422 24,
423 10
424 ).await.unwrap();
425
426 assert_eq!(recent.len(), 1);
427 assert_eq!(recent[0].subject, "Recent");
428 }
429
430 #[tokio::test]
431 async fn test_get_nonexistent_agent() {
432 let storage = InMemoryStorage::new();
433 let service = MailServiceImpl::new(storage);
434
435 let fake_id = "nonexistent_agent".to_string();
436 let result = service.get_agent(fake_id).await;
437
438 assert!(matches!(result, Err(MailError::AgentNotFound(_))));
439 }
440
441 #[tokio::test]
442 async fn test_get_agent_by_mailbox() {
443 let storage = InMemoryStorage::new();
444 let service = MailServiceImpl::new(storage);
445
446 let agent = service.create_agent("Test Agent").await.unwrap();
447 let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
448
449 let found_agent = service.get_agent_by_mailbox(mailbox.id).await.unwrap();
451 assert_eq!(found_agent.id, agent.id);
452 assert_eq!(found_agent.name, agent.name);
453 }
454
455 #[tokio::test]
456 async fn test_list_agents() {
457 let storage = InMemoryStorage::new();
458 let service = MailServiceImpl::new(storage);
459
460 service.create_agent("Agent 1").await.unwrap();
462 service.create_agent("Agent 2").await.unwrap();
463 service.create_agent("Agent 3").await.unwrap();
464
465 let agents = service.list_agents().await.unwrap();
466 assert_eq!(agents.len(), 3);
467 }
468
469 #[tokio::test]
470 async fn test_set_agent_status() {
471 let storage = InMemoryStorage::new();
472 let service = MailServiceImpl::new(storage);
473
474 let agent = service.create_agent("Test Agent").await.unwrap();
475 assert_eq!(agent.status, "offline");
476
477 let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
478 assert_eq!(updated.status, "online");
479
480 let retrieved = service.get_agent(agent.id).await.unwrap();
482 assert_eq!(retrieved.status, "online");
483 }
484
485 #[tokio::test]
486 async fn test_multiple_mails_sorting() {
487 let storage = InMemoryStorage::new();
488 let service = MailServiceImpl::new(storage);
489
490 let agent1 = service.create_agent("Sender").await.unwrap();
491 let agent2 = service.create_agent("Receiver").await.unwrap();
492
493 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "First", "Body1").await.unwrap();
495 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
496 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Second", "Body2").await.unwrap();
497 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
498 service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Third", "Body3").await.unwrap();
499
500 let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
501 assert_eq!(inbox.len(), 3);
502 assert_eq!(inbox[0].subject, "Third");
504 assert_eq!(inbox[1].subject, "Second");
505 assert_eq!(inbox[2].subject, "First");
506 }
507}