use crate::io::{
error::InboxError,
inbox::{WriteOutcome, inbox_append},
};
use crate::schema::InboxMessage;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::{Path, PathBuf};
use tracing::warn;
/// A message that could not be delivered right away and is persisted as JSON
/// in the spool directory so that delivery can be retried later.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpooledMessage {
/// Team whose directory (under the inbox base) holds the target inbox.
pub target_team: String,
/// Agent whose inbox file (`<agent>.json`) the message is destined for.
pub target_agent: String,
/// The message payload to append to the target inbox.
pub message: InboxMessage,
/// Number of drain attempts so far that did not result in delivery.
pub retry_count: u32,
/// Once `retry_count` reaches this value the entry is moved to the `failed` spool directory.
pub max_retries: u32,
/// RFC 3339 timestamp taken when the message was first spooled.
pub created_at: String,
/// RFC 3339 timestamp of the most recent unsuccessful attempt (initially the spool time).
pub last_attempt: String,
}
/// Summary returned by a spool drain.
#[derive(Debug, Clone, PartialEq)]
pub struct SpoolStatus {
/// Number of spooled messages delivered during this drain.
pub delivered: usize,
/// Number of `.json` files left in the `pending` spool directory after the drain.
pub pending: usize,
/// Number of `.json` files present in the `failed` spool directory after the drain
/// (a directory total, not only entries moved there by this drain).
pub failed: usize,
}
pub fn spool_message(
team: &str,
agent: &str,
message: &InboxMessage,
) -> Result<PathBuf, InboxError> {
spool_message_with_base(team, agent, message, None)
}
/// Write `message` into the `pending` spool directory for later delivery.
///
/// `base_dir` overrides the spool root (used by tests); `None` selects the
/// default location under the user's home directory.
///
/// The file is normally named `<unix-seconds>-<agent>@<team>.json`. Because the
/// timestamp only has one-second resolution, two messages for the same target
/// spooled within the same second would map to the same name; the file is
/// therefore created with `create_new` and, on a name collision, a counter is
/// inserted (`<unix-seconds>-<n>-<agent>@<team>.json`) instead of overwriting
/// the earlier, still undelivered message.
///
/// # Errors
///
/// Returns an [`InboxError`] if the spool directory cannot be resolved or
/// created, if serialization fails, if the file cannot be created or written,
/// or if no free filename is found after a bounded number of attempts.
///
/// Returns the path of the spool file that was created.
fn spool_message_with_base(
    team: &str,
    agent: &str,
    message: &InboxMessage,
    base_dir: Option<&Path>,
) -> Result<PathBuf, InboxError> {
    // Upper bound on candidate filenames tried for a single timestamp.
    const MAX_NAME_ATTEMPTS: u32 = 1024;

    let spool_dir = get_spool_dir_with_base("pending", base_dir)?;
    fs::create_dir_all(&spool_dir).map_err(|e| InboxError::Io {
        path: spool_dir.clone(),
        source: e,
    })?;

    let now = chrono::Utc::now();
    let timestamp = now.timestamp();
    let spooled = SpooledMessage {
        target_team: team.to_string(),
        target_agent: agent.to_string(),
        message: message.clone(),
        retry_count: 0,
        max_retries: 10,
        created_at: now.to_rfc3339(),
        last_attempt: now.to_rfc3339(),
    };

    let primary_name = format!("{timestamp}-{agent}@{team}.json");
    let content = serde_json::to_vec_pretty(&spooled).map_err(|e| InboxError::Json {
        path: spool_dir.join(&primary_name),
        source: e,
    })?;

    for attempt in 0..MAX_NAME_ATTEMPTS {
        let filename = if attempt == 0 {
            primary_name.clone()
        } else {
            format!("{timestamp}-{attempt}-{agent}@{team}.json")
        };
        let spool_path = spool_dir.join(&filename);

        // `create_new` fails with `AlreadyExists` rather than truncating an
        // existing spool entry, so an earlier message is never clobbered.
        let mut file = match fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&spool_path)
        {
            Ok(file) => file,
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
            Err(e) => {
                return Err(InboxError::Io {
                    path: spool_path,
                    source: e,
                });
            }
        };

        if let Err(e) = std::io::Write::write_all(&mut file, &content) {
            // Do not leave a truncated entry behind for the drain to trip over.
            drop(file);
            let _ = fs::remove_file(&spool_path);
            return Err(InboxError::Io {
                path: spool_path,
                source: e,
            });
        }
        return Ok(spool_path);
    }

    Err(InboxError::SpoolError {
        message: format!(
            "Could not find a free spool filename for {agent}@{team} in {spool_dir:?}"
        ),
    })
}
/// Attempt delivery of every pending spooled message into the inboxes rooted
/// at `inbox_base`, using the default spool location.
///
/// # Errors
///
/// Returns an [`InboxError`] if the spool directories cannot be resolved,
/// created, or listed.
///
/// Returns the delivered / pending / failed counts observed by the drain.
pub fn spool_drain(inbox_base: &Path) -> Result<SpoolStatus, InboxError> {
    // No override: the spool root is derived from the user's home directory.
    let default_base: Option<&Path> = None;
    spool_drain_with_base(inbox_base, default_base)
}
/// Attempt delivery of every `.json` entry in the `pending` spool directory.
///
/// `base_dir` overrides the spool root (used by tests); `None` selects the
/// default location. Both the `pending` and `failed` directories are created
/// if missing.
///
/// Each entry is handed to `process_spooled_message`. Delivered entries are
/// removed from `pending` and counted; entries that could not be delivered
/// stay where `process_spooled_message` left them; entries that could not be
/// processed at all are logged and left in place.
///
/// # Errors
///
/// Returns an [`InboxError`] if a spool directory cannot be resolved, created,
/// or listed. Per-entry processing errors are logged, not returned.
fn spool_drain_with_base(
    inbox_base: &Path,
    base_dir: Option<&Path>,
) -> Result<SpoolStatus, InboxError> {
    let pending_dir = get_spool_dir_with_base("pending", base_dir)?;
    let failed_dir = get_spool_dir_with_base("failed", base_dir)?;
    fs::create_dir_all(&pending_dir).map_err(|e| InboxError::Io {
        path: pending_dir.clone(),
        source: e,
    })?;
    fs::create_dir_all(&failed_dir).map_err(|e| InboxError::Io {
        path: failed_dir.clone(),
        source: e,
    })?;

    let mut delivered = 0;
    if pending_dir.exists() {
        let entries = fs::read_dir(&pending_dir).map_err(|e| InboxError::Io {
            path: pending_dir.clone(),
            source: e,
        })?;
        for entry in entries {
            let entry = entry.map_err(|e| InboxError::Io {
                path: pending_dir.clone(),
                source: e,
            })?;
            let path = entry.path();
            let is_json = path.extension().and_then(|s| s.to_str()) == Some("json");
            if !path.is_file() || !is_json {
                continue;
            }
            match process_spooled_message(&path, inbox_base, &failed_dir) {
                Ok(true) => {
                    delivered += 1;
                    // The message is already in the inbox; if the spool file
                    // cannot be removed it will be picked up again by the next
                    // drain, so make that visible instead of ignoring it.
                    if let Err(e) = fs::remove_file(&path) {
                        warn!("Delivered {path:?} but could not remove spool file: {e}");
                    }
                }
                // Not delivered: the entry was already rewritten in place or
                // moved to the failed directory by `process_spooled_message`.
                Ok(false) => {}
                Err(e) => {
                    warn!("Failed to process {path:?}: {e}");
                }
            }
        }
    }

    let pending = count_files(&pending_dir)?;
    let failed = count_files(&failed_dir)?;
    Ok(SpoolStatus {
        delivered,
        pending,
        failed,
    })
}
fn process_spooled_message(
spool_path: &Path,
inbox_base: &Path,
failed_dir: &Path,
) -> Result<bool, InboxError> {
let content = fs::read(spool_path).map_err(|e| InboxError::Io {
path: spool_path.to_path_buf(),
source: e,
})?;
let mut spooled: SpooledMessage =
serde_json::from_slice(&content).map_err(|e| InboxError::Json {
path: spool_path.to_path_buf(),
source: e,
})?;
let inbox_path = inbox_base
.join(&spooled.target_team)
.join("inboxes")
.join(format!("{}.json", spooled.target_agent));
let delivery_result = (|| -> Result<WriteOutcome, InboxError> {
if let Some(parent) = inbox_path.parent() {
fs::create_dir_all(parent).map_err(|e| InboxError::Io {
path: parent.to_path_buf(),
source: e,
})?;
}
inbox_append(
&inbox_path,
&spooled.message,
&spooled.target_team,
&spooled.target_agent,
)
})();
match delivery_result {
Ok(WriteOutcome::Success | WriteOutcome::ConflictResolved { .. }) => {
return Ok(true);
}
Ok(WriteOutcome::Queued {
spool_path: new_spool_path,
}) => {
let _ = fs::remove_file(&new_spool_path);
}
Err(_) => {
}
}
spooled.retry_count += 1;
spooled.last_attempt = chrono::Utc::now().to_rfc3339();
if spooled.retry_count >= spooled.max_retries {
let failed_path =
failed_dir.join(
spool_path
.file_name()
.ok_or_else(|| InboxError::SpoolError {
message: format!("Invalid spool path: {spool_path:?}"),
})?,
);
let failed_content = serde_json::to_vec_pretty(&spooled).map_err(|e| InboxError::Json {
path: failed_path.clone(),
source: e,
})?;
fs::write(&failed_path, failed_content).map_err(|e| InboxError::Io {
path: failed_path.clone(),
source: e,
})?;
let _ = fs::remove_file(spool_path);
} else {
let updated_content =
serde_json::to_vec_pretty(&spooled).map_err(|e| InboxError::Json {
path: spool_path.to_path_buf(),
source: e,
})?;
fs::write(spool_path, updated_content).map_err(|e| InboxError::Io {
path: spool_path.to_path_buf(),
source: e,
})?;
}
Ok(false)
}
/// Resolve the path of the spool subdirectory `subdir` (e.g. `pending`).
///
/// With `Some(base)` the result is `<base>/spool/<subdir>`; with `None` it is
/// `<home>/.config/atm/spool/<subdir>`. The directory is not created here.
///
/// # Errors
///
/// Returns [`InboxError::SpoolError`] if the home directory cannot be
/// determined (only relevant when `base_dir` is `None`).
fn get_spool_dir_with_base(subdir: &str, base_dir: Option<&Path>) -> Result<PathBuf, InboxError> {
    let spool_root = match base_dir {
        Some(base) => base.join("spool"),
        None => crate::home::get_home_dir()
            .map_err(|e| InboxError::SpoolError {
                message: format!("Could not determine home directory: {e}"),
            })?
            .join(".config/atm/spool"),
    };
    Ok(spool_root.join(subdir))
}
/// Count the regular files with a `.json` extension directly inside `dir`.
///
/// A missing directory counts as zero. Directory entries that cannot be read
/// are skipped rather than reported.
///
/// # Errors
///
/// Returns [`InboxError::Io`] if `dir` exists but cannot be listed.
fn count_files(dir: &Path) -> Result<usize, InboxError> {
    if !dir.exists() {
        return Ok(0);
    }

    let listing = match fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(source) => {
            return Err(InboxError::Io {
                path: dir.to_path_buf(),
                source,
            });
        }
    };

    let mut json_files = 0usize;
    // `flatten` drops entries whose metadata could not be read.
    for entry in listing.flatten() {
        let candidate = entry.path();
        let has_json_ext = candidate.extension().and_then(|ext| ext.to_str()) == Some("json");
        if candidate.is_file() && has_json_ext {
            json_files += 1;
        }
    }
    Ok(json_files)
}
// Unit tests for spooling and draining. Every test passes a temporary
// directory as the spool base so the real home-directory spool is not used
// by the functions called directly here.
#[cfg(test)]
mod tests {
use super::*;
use crate::io::WriteOutcome;
use std::collections::HashMap;
use tempfile::TempDir;
// Build an unread message with the current time as its timestamp.
fn create_test_message(from: &str, text: &str, message_id: Option<String>) -> InboxMessage {
InboxMessage {
from: from.to_string(),
text: text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
read: false,
summary: None,
message_id,
unknown_fields: HashMap::new(),
}
}
// Spooling writes a file named `...<agent>@<team>.json` whose JSON content
// round-trips into a `SpooledMessage` with the initial retry settings.
#[test]
fn test_spool_message_format() {
let temp_dir = TempDir::new().unwrap();
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let spool_path =
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
assert!(spool_path.exists());
assert!(
spool_path
.file_name()
.unwrap()
.to_str()
.unwrap()
.contains("test-agent@test-team.json")
);
let content = fs::read_to_string(&spool_path).unwrap();
let spooled: SpooledMessage = serde_json::from_str(&content).unwrap();
assert_eq!(spooled.target_team, "test-team");
assert_eq!(spooled.target_agent, "test-agent");
assert_eq!(spooled.message.text, "Test message");
assert_eq!(spooled.retry_count, 0);
assert_eq!(spooled.max_retries, 10);
assert!(!spooled.created_at.is_empty());
assert!(!spooled.last_attempt.is_empty());
}
// A drain delivers a pending message into the target inbox file and removes
// the spool entry.
#[test]
fn test_spool_drain_delivers_messages() {
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
fs::create_dir_all(&inbox_base).unwrap();
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let spool_path =
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
assert!(spool_path.exists());
let status = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
assert_eq!(status.delivered, 1);
assert_eq!(status.pending, 0);
assert_eq!(status.failed, 0);
let inbox_path = inbox_base
.join("test-team")
.join("inboxes")
.join("test-agent.json");
assert!(inbox_path.exists());
let inbox_content = fs::read_to_string(&inbox_path).unwrap();
let messages: Vec<InboxMessage> = serde_json::from_str(&inbox_content).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].text, "Test message");
assert!(!spool_path.exists());
}
// NOTE(review): despite the name, this test does not observe an increment.
// It rewrites the entry with `retry_count = 1` and then asserts that the
// drain delivers it; the increment itself is asserted in
// `test_spool_drain_keeps_pending_on_queued_outcome`.
#[test]
fn test_spool_drain_increments_retry_count() {
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let spool_path =
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
let content = fs::read_to_string(&spool_path).unwrap();
let mut spooled: SpooledMessage = serde_json::from_str(&content).unwrap();
assert_eq!(spooled.retry_count, 0);
assert_eq!(spooled.max_retries, 10);
spooled.retry_count = 1;
spooled.last_attempt = chrono::Utc::now().to_rfc3339();
fs::write(&spool_path, serde_json::to_string_pretty(&spooled).unwrap()).unwrap();
let status = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
assert_eq!(status.delivered, 1);
assert_eq!(status.pending, 0);
assert!(!spool_path.exists());
}
// An entry already at `max_retries` whose delivery fails again is moved from
// `pending` to `failed`.
#[test]
fn test_spool_drain_moves_to_failed() {
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let spool_path =
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
let content = fs::read_to_string(&spool_path).unwrap();
let mut spooled: SpooledMessage = serde_json::from_str(&content).unwrap();
spooled.retry_count = 10; fs::write(&spool_path, serde_json::to_string_pretty(&spooled).unwrap()).unwrap();
// Force delivery to fail: `inboxes` is created as a regular file, so the
// inbox's parent directory cannot be created.
let inboxes_dir = inbox_base.join("test-team").join("inboxes");
fs::create_dir_all(inbox_base.join("test-team")).unwrap();
fs::write(&inboxes_dir, "not a directory").unwrap();
let status = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
assert_eq!(status.delivered, 0);
assert_eq!(status.pending, 0);
assert_eq!(status.failed, 1);
assert!(!spool_path.exists());
let failed_dir = get_spool_dir_with_base("failed", Some(temp_dir.path())).unwrap();
let failed_files: Vec<_> = fs::read_dir(&failed_dir)
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("json"))
.collect();
assert_eq!(failed_files.len(), 1);
}
// Three messages for three different agents are all delivered in one drain.
#[test]
fn test_spool_status_counts() {
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
fs::create_dir_all(&inbox_base).unwrap();
for i in 0..3 {
let message = create_test_message(
"team-lead",
&format!("Message {i}"),
Some(format!("msg-{i:03}")),
);
spool_message_with_base(
"test-team",
&format!("agent-{i}"),
&message,
Some(temp_dir.path()),
)
.unwrap();
}
let status = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
assert_eq!(status.delivered, 3);
assert_eq!(status.pending, 0);
assert_eq!(status.failed, 0);
}
// Spooling creates the `pending` directory and draining creates `failed`.
#[test]
fn test_spool_directories_auto_created() {
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let spool_path =
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
assert!(spool_path.exists());
assert!(spool_path.parent().unwrap().exists());
let _ = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
let failed_dir = get_spool_dir_with_base("failed", Some(temp_dir.path())).unwrap();
assert!(failed_dir.exists());
}
// While the inbox lock is held the drain must not deliver: the entry stays in
// `pending` with `retry_count` incremented and `last_attempt` refreshed.
#[test]
fn test_spool_drain_keeps_pending_on_queued_outcome() {
use crate::io::lock::acquire_lock;
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
fs::create_dir_all(&inbox_base).unwrap();
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let spool_path =
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
assert!(spool_path.exists());
let content = fs::read_to_string(&spool_path).unwrap();
let initial: SpooledMessage = serde_json::from_str(&content).unwrap();
assert_eq!(initial.retry_count, 0);
let inbox_path = inbox_base
.join("test-team")
.join("inboxes")
.join("test-agent.json");
fs::create_dir_all(inbox_path.parent().unwrap()).unwrap();
fs::write(&inbox_path, "[]").unwrap();
// Hold the lock for the whole drain; presumably this makes `inbox_append`
// report a queued outcome instead of writing — confirm against `inbox_append`.
let lock_path = inbox_path.with_extension("lock");
let _held_lock = acquire_lock(&lock_path, 0).unwrap();
let status = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
assert_eq!(status.delivered, 0, "Should not deliver when lock is held");
assert_eq!(status.pending, 1, "Spool file should remain in pending");
assert!(spool_path.exists(), "Spool file should not be deleted");
let content = fs::read_to_string(&spool_path).unwrap();
let updated: SpooledMessage = serde_json::from_str(&content).unwrap();
assert_eq!(updated.retry_count, 1, "retry_count should be incremented");
assert_ne!(
updated.last_attempt, initial.last_attempt,
"last_attempt should be updated"
);
}
// A message already present in the inbox is spooled again; the drain reports
// it as delivered and the inbox is expected to still hold a single copy.
#[test]
fn test_duplicate_detection_in_spool_drain() {
let temp_dir = TempDir::new().unwrap();
let inbox_base = temp_dir.path().join("teams");
fs::create_dir_all(&inbox_base).unwrap();
let message = create_test_message("team-lead", "Test message", Some("msg-001".to_string()));
let inbox_path = inbox_base
.join("test-team")
.join("inboxes")
.join("test-agent.json");
fs::create_dir_all(inbox_path.parent().unwrap()).unwrap();
let result = inbox_append(&inbox_path, &message, "test-team", "test-agent").unwrap();
assert_eq!(result, WriteOutcome::Success);
spool_message_with_base("test-team", "test-agent", &message, Some(temp_dir.path()))
.unwrap();
let status = spool_drain_with_base(&inbox_base, Some(temp_dir.path())).unwrap();
assert_eq!(status.delivered, 1); assert_eq!(status.pending, 0);
assert_eq!(status.failed, 0);
let inbox_content = fs::read_to_string(&inbox_path).unwrap();
let messages: Vec<InboxMessage> = serde_json::from_str(&inbox_content).unwrap();
assert_eq!(messages.len(), 1);
}
}