use crate::domain::{Edge, GraphQuery, Properties, string_to_node_id};
use crate::services::mail::domain::{Agent, AgentId, Mail, Mailbox, MailboxId};
use crate::storage::{GraphStorage, StorageError};
use async_trait::async_trait;
use thiserror::Error;
pub mod domain;
#[derive(Error, Debug)]
pub enum MailError {
#[error("Mailbox not found: {0}")]
MailboxNotFound(MailboxId),
#[error("Agent not found: {0}")]
AgentNotFound(AgentId),
#[error("Mail not found: {0}")]
MailNotFound(uuid::Uuid),
#[error("Storage error: {0}")]
Storage(#[from] StorageError),
#[error("Invalid operation: {0}")]
InvalidOperation(String),
}
pub type Result<T> = std::result::Result<T, MailError>;
#[async_trait]
pub trait MailService: Send + Sync {
async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent>;
async fn get_agent(&self, id: AgentId) -> Result<Agent>;
async fn list_agents(&self) -> Result<Vec<Agent>>;
async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent>;
async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent>;
async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox>;
async fn send_agent_to_agent(
&self,
from_agent_id: AgentId,
to_agent_id: AgentId,
subject: impl Into<String> + Send,
body: impl Into<String> + Send,
) -> Result<Mail>;
async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>>;
async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>>;
async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail>;
async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)>;
}
pub struct MailServiceImpl<S: GraphStorage> {
storage: S,
}
impl<S: GraphStorage> MailServiceImpl<S> {
pub fn new(storage: S) -> Self {
Self { storage }
}
async fn get_mail(&self, mail_id: uuid::Uuid) -> Result<Mail> {
let node = self.storage.get_node(mail_id).await
.map_err(|e| match e {
StorageError::NodeNotFound(_) => MailError::MailNotFound(mail_id),
_ => MailError::Storage(e),
})?;
Mail::from_node(&node)
.ok_or(MailError::MailNotFound(mail_id))
}
}
#[async_trait]
impl<S: GraphStorage> MailService for MailServiceImpl<S> {
async fn create_agent(&self, name: impl Into<String> + Send) -> Result<Agent> {
let agent = Agent::new(name);
let node = agent.to_node();
self.storage.create_node(&node).await?;
Ok(agent)
}
async fn get_agent(&self, id: AgentId) -> Result<Agent> {
let node_id = string_to_node_id(&id);
let id_clone = id.clone();
let node = self.storage.get_node(node_id).await
.map_err(|e| match e {
StorageError::NodeNotFound(_) => MailError::AgentNotFound(id_clone),
_ => MailError::Storage(e),
})?;
Agent::from_node(&node)
.ok_or(MailError::AgentNotFound(id))
}
async fn list_agents(&self) -> Result<Vec<Agent>> {
let query = GraphQuery::new().with_node_type("agent");
let nodes = self.storage.query_nodes(&query).await?;
let agents: Vec<Agent> = nodes.iter()
.filter_map(Agent::from_node)
.collect();
Ok(agents)
}
async fn set_agent_status(&self, agent_id: AgentId, status: impl Into<String> + Send) -> Result<Agent> {
let mut agent = self.get_agent(agent_id).await?;
agent.status = status.into();
let node = agent.to_node();
self.storage.update_node(&node).await?;
Ok(agent)
}
async fn get_agent_by_mailbox(&self, mailbox_id: MailboxId) -> Result<Agent> {
let node = self.storage.get_node(mailbox_id).await
.map_err(|e| match e {
StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
_ => MailError::Storage(e),
})?;
Agent::from_node(&node)
.ok_or_else(|| MailError::InvalidOperation(
"Node exists but is not an agent".to_string()
))
}
async fn get_agent_mailbox(&self, agent_id: AgentId) -> Result<Mailbox> {
let agent = self.get_agent(agent_id.clone()).await?;
let node_id = string_to_node_id(&agent.id);
let mailbox = Mailbox {
id: node_id,
owner_id: agent.id,
name: "Mailbox".to_string(),
created_at: agent.created_at,
};
Ok(mailbox)
}
async fn send_agent_to_agent(
&self,
from_agent_id: AgentId,
to_agent_id: AgentId,
subject: impl Into<String> + Send,
body: impl Into<String> + Send,
) -> Result<Mail> {
let from_agent = self.get_agent(from_agent_id).await?;
let to_agent = self.get_agent(to_agent_id).await?;
let from_mailbox_id = string_to_node_id(&from_agent.id);
let to_mailbox_id = string_to_node_id(&to_agent.id);
let mail = Mail::new(from_mailbox_id, to_mailbox_id, subject, body);
let node = mail.to_node();
self.storage.create_node(&node).await?;
let from_edge = Edge::new(
"sent_from",
from_mailbox_id,
mail.id,
Properties::new(),
);
self.storage.create_edge(&from_edge).await?;
let to_edge = Edge::new(
"sent_to",
mail.id,
to_mailbox_id,
Properties::new(),
);
self.storage.create_edge(&to_edge).await?;
Ok(mail)
}
async fn get_mailbox_inbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
let _agent = self.storage.get_node(mailbox_id).await
.map_err(|e| match e {
StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
_ => MailError::Storage(e),
})?;
let incoming_edges = self.storage
.get_edges_to(mailbox_id, Some("sent_to"))
.await?;
let mut mails = Vec::new();
for edge in incoming_edges {
if let Ok(mail) = self.get_mail(edge.from_node_id).await {
mails.push(mail);
}
}
mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(mails)
}
async fn get_mailbox_outbox(&self, mailbox_id: MailboxId) -> Result<Vec<Mail>> {
let _agent = self.storage.get_node(mailbox_id).await
.map_err(|e| match e {
StorageError::NodeNotFound(_) => MailError::MailboxNotFound(mailbox_id),
_ => MailError::Storage(e),
})?;
let outgoing_edges = self.storage
.get_edges_from(mailbox_id, Some("sent_from"))
.await?;
let mut mails = Vec::new();
for edge in outgoing_edges {
if let Ok(mail) = self.get_mail(edge.to_node_id).await {
mails.push(mail);
}
}
mails.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(mails)
}
async fn get_recent_mail(&self, mailbox_id: MailboxId, hours: i64, limit: usize) -> Result<Vec<Mail>> {
let since = chrono::Utc::now() - chrono::Duration::hours(hours);
let inbox = self.get_mailbox_inbox(mailbox_id).await?;
let recent: Vec<Mail> = inbox.into_iter()
.filter(|mail| mail.created_at >= since)
.take(limit)
.collect();
Ok(recent)
}
async fn mark_mail_as_read(&self, mail_id: uuid::Uuid) -> Result<Mail> {
let mut mail = self.get_mail(mail_id).await?;
mail.mark_as_read();
let node = mail.to_node();
self.storage.update_node(&node).await?;
Ok(mail)
}
async fn check_unread_mail(&self, agent_id: AgentId) -> Result<(bool, Vec<Mail>)> {
let mailbox = self.get_agent_mailbox(agent_id).await?;
let inbox = self.get_mailbox_inbox(mailbox.id).await?;
let unread: Vec<Mail> = inbox.into_iter()
.filter(|mail| !mail.read)
.collect();
let has_unread = !unread.is_empty();
Ok((has_unread, unread))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::memory::InMemoryStorage;
#[tokio::test]
async fn test_create_agent() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent = service.create_agent("Test Agent").await.unwrap();
assert_eq!(agent.name, "Test Agent");
}
#[tokio::test]
async fn test_create_agent_auto_creates_mailbox() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent = service.create_agent("Agent").await.unwrap();
let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
assert_eq!(mailbox.owner_id, agent.id);
let expected_mailbox_id = string_to_node_id(&agent.id);
assert_eq!(mailbox.id, expected_mailbox_id);
}
#[tokio::test]
async fn test_send_and_receive_mail() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent1 = service.create_agent("Sender").await.unwrap();
let agent2 = service.create_agent("Receiver").await.unwrap();
let mail = service
.send_agent_to_agent(
agent1.id.clone(),
agent2.id.clone(),
"Hello",
"This is a test message",
)
.await
.unwrap();
assert_eq!(mail.subject, "Hello");
assert_eq!(mail.body, "This is a test message");
assert!(!mail.read);
let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
assert_eq!(inbox.len(), 1);
assert_eq!(inbox[0].subject, "Hello");
let outbox = service.get_mailbox_outbox(string_to_node_id(&agent1.id)).await.unwrap();
assert_eq!(outbox.len(), 1);
assert_eq!(outbox[0].subject, "Hello");
}
#[tokio::test]
async fn test_mark_mail_as_read() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent1 = service.create_agent("Sender").await.unwrap();
let agent2 = service.create_agent("Receiver").await.unwrap();
let mail = service
.send_agent_to_agent(agent1.id, agent2.id, "Test", "Body")
.await
.unwrap();
assert!(!mail.read);
let updated = service.mark_mail_as_read(mail.id).await.unwrap();
assert!(updated.read);
}
#[tokio::test]
async fn test_check_unread_mail() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent1 = service.create_agent("Sender").await.unwrap();
let agent2 = service.create_agent("Receiver").await.unwrap();
let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
assert!(!has_unread);
assert!(unread.is_empty());
service.send_agent_to_agent(agent1.id, agent2.id.clone(), "Test", "Body").await.unwrap();
let (has_unread, unread) = service.check_unread_mail(agent2.id.clone()).await.unwrap();
assert!(has_unread);
assert_eq!(unread.len(), 1);
let mail_id = unread[0].id;
service.mark_mail_as_read(mail_id).await.unwrap();
let (has_unread, unread) = service.check_unread_mail(agent2.id).await.unwrap();
assert!(!has_unread);
assert!(unread.is_empty());
}
#[tokio::test]
async fn test_get_recent_mail() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent1 = service.create_agent("Sender").await.unwrap();
let agent2 = service.create_agent("Receiver").await.unwrap();
service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Recent", "Body").await.unwrap();
let recent = service.get_recent_mail(
string_to_node_id(&agent2.id),
24,
10
).await.unwrap();
assert_eq!(recent.len(), 1);
assert_eq!(recent[0].subject, "Recent");
}
#[tokio::test]
async fn test_get_nonexistent_agent() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let fake_id = "nonexistent_agent".to_string();
let result = service.get_agent(fake_id).await;
assert!(matches!(result, Err(MailError::AgentNotFound(_))));
}
#[tokio::test]
async fn test_get_agent_by_mailbox() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent = service.create_agent("Test Agent").await.unwrap();
let mailbox = service.get_agent_mailbox(agent.id.clone()).await.unwrap();
let found_agent = service.get_agent_by_mailbox(mailbox.id).await.unwrap();
assert_eq!(found_agent.id, agent.id);
assert_eq!(found_agent.name, agent.name);
}
#[tokio::test]
async fn test_list_agents() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
service.create_agent("Agent 1").await.unwrap();
service.create_agent("Agent 2").await.unwrap();
service.create_agent("Agent 3").await.unwrap();
let agents = service.list_agents().await.unwrap();
assert_eq!(agents.len(), 3);
}
#[tokio::test]
async fn test_set_agent_status() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent = service.create_agent("Test Agent").await.unwrap();
assert_eq!(agent.status, "offline");
let updated = service.set_agent_status(agent.id.clone(), "online").await.unwrap();
assert_eq!(updated.status, "online");
let retrieved = service.get_agent(agent.id).await.unwrap();
assert_eq!(retrieved.status, "online");
}
#[tokio::test]
async fn test_multiple_mails_sorting() {
let storage = InMemoryStorage::new();
let service = MailServiceImpl::new(storage);
let agent1 = service.create_agent("Sender").await.unwrap();
let agent2 = service.create_agent("Receiver").await.unwrap();
service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "First", "Body1").await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Second", "Body2").await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
service.send_agent_to_agent(agent1.id.clone(), agent2.id.clone(), "Third", "Body3").await.unwrap();
let inbox = service.get_mailbox_inbox(string_to_node_id(&agent2.id)).await.unwrap();
assert_eq!(inbox.len(), 3);
assert_eq!(inbox[0].subject, "Third");
assert_eq!(inbox[1].subject, "Second");
assert_eq!(inbox[2].subject, "First");
}
}