use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::error::AtmError;
use crate::mailbox::atomic;
use crate::mailbox::lock;
use crate::mailbox::source::{
SourceFile, discover_source_paths, load_source_files, rediscover_and_validate_source_paths,
};
use crate::schema::MessageEnvelope;
pub(crate) fn commit_mailbox_state(
path: &Path,
messages: &[MessageEnvelope],
) -> Result<(), AtmError> {
atomic::write_messages(path, messages)
}
pub(crate) fn commit_source_files(source_files: &[SourceFile]) -> Result<(), AtmError> {
for source in source_files {
commit_mailbox_state(&source.path, &source.messages)?;
}
Ok(())
}
pub(crate) fn observe_source_files(
home_dir: &Path,
team: &str,
agent: &str,
) -> Result<Vec<SourceFile>, AtmError> {
let source_paths = discover_source_paths(home_dir, team, agent)?;
load_source_files(&source_paths)
}
pub(crate) fn with_locked_source_files<T, I, F>(
home_dir: &Path,
team: &str,
agent: &str,
extra_write_paths: I,
timeout: Duration,
body: F,
) -> Result<T, AtmError>
where
I: IntoIterator<Item = PathBuf>,
F: FnOnce(&[PathBuf], &mut Vec<SourceFile>) -> Result<T, AtmError>,
{
let source_paths = discover_source_paths(home_dir, team, agent)?;
let mut write_paths = source_paths.clone();
write_paths.extend(extra_write_paths);
let _locks = lock::acquire_many_sorted(write_paths, timeout)?;
let source_paths = rediscover_and_validate_source_paths(&source_paths, home_dir, team, agent)?;
maybe_remove_locked_source_file_for_test(&source_paths)?;
let mut source_files = load_source_files(&source_paths)?;
body(&source_paths, &mut source_files)
}
fn maybe_remove_locked_source_file_for_test(source_paths: &[PathBuf]) -> Result<(), AtmError> {
if std::env::var_os("ATM_TEST_REMOVE_LOCKED_INBOX_BEFORE_LOAD").is_none() {
return Ok(());
}
let Some(path) = source_paths.first() else {
return Ok(());
};
std::fs::remove_file(path).map_err(|error| {
AtmError::mailbox_write(format!(
"failed to remove locked inbox {} during test injection: {error}",
path.display()
))
.with_recovery(
"Clear ATM_TEST_REMOVE_LOCKED_INBOX_BEFORE_LOAD or restore the missing inbox file before retrying the injected test path.",
)
.with_source(error)
})
}
#[cfg(test)]
mod tests {
use tempfile::tempdir;
use uuid::Uuid;
use super::{commit_mailbox_state, commit_source_files};
use crate::mailbox::read_messages;
use crate::mailbox::source::SourceFile;
use crate::schema::{LegacyMessageId, MessageEnvelope};
use crate::types::IsoTimestamp;
#[test]
fn commit_mailbox_state_rewrites_mailbox_jsonl_with_only_new_messages() {
let tempdir = tempdir().expect("tempdir");
let path = tempdir.path().join("arch-ctm.json");
std::fs::write(&path, "{\"stale\":true}\n").expect("seed mailbox");
let messages = vec![
sample_message("team-lead", "first replacement"),
sample_message("qa", "second replacement"),
];
commit_mailbox_state(&path, &messages).expect("commit mailbox");
let raw = std::fs::read_to_string(&path).expect("mailbox contents");
assert!(!raw.contains("stale"));
assert_eq!(raw.lines().count(), 2);
assert!(raw.ends_with('\n'));
assert_eq!(read_messages(&path).expect("read mailbox"), messages);
}
#[test]
fn commit_source_files_commits_each_source_path() {
let tempdir = tempdir().expect("tempdir");
let left_path = tempdir.path().join("arch-ctm.json");
let right_path = tempdir.path().join("qa.json");
let left_messages = vec![sample_message("team-lead", "left message")];
let right_messages = vec![
sample_message("arch-ctm", "right first"),
sample_message("team-lead", "right second"),
];
commit_source_files(&[
SourceFile {
path: left_path.clone(),
messages: left_messages.clone(),
},
SourceFile {
path: right_path.clone(),
messages: right_messages.clone(),
},
])
.expect("commit source files");
assert_eq!(
read_messages(&left_path).expect("left inbox"),
left_messages
);
assert_eq!(
read_messages(&right_path).expect("right inbox"),
right_messages
);
}
#[test]
fn commit_source_files_stops_after_first_write_error() {
let tempdir = tempdir().expect("tempdir");
let first_path = tempdir.path().join("first.json");
let invalid_path = tempdir.path().to_path_buf();
let later_path = tempdir.path().join("later.json");
let error = commit_source_files(&[
SourceFile {
path: first_path.clone(),
messages: vec![sample_message("team-lead", "first")],
},
SourceFile {
path: invalid_path,
messages: vec![sample_message("qa", "broken")],
},
SourceFile {
path: later_path.clone(),
messages: vec![sample_message("arch-ctm", "later")],
},
])
.expect_err("write failure");
assert!(error.is_mailbox_write());
assert_eq!(read_messages(&first_path).expect("first inbox").len(), 1);
assert!(!later_path.exists());
}
fn sample_message(from: &str, text: &str) -> MessageEnvelope {
let message_id = LegacyMessageId::from(Uuid::new_v4());
let mut extra = serde_json::Map::new();
let mut metadata = serde_json::Map::new();
let mut atm = serde_json::Map::new();
atm.insert(
"messageId".to_string(),
serde_json::Value::String(message_id.into_atm_message_id().to_string()),
);
atm.insert(
"sourceTeam".to_string(),
serde_json::Value::String("atm-dev".to_string()),
);
metadata.insert("atm".to_string(), serde_json::Value::Object(atm));
extra.insert("metadata".to_string(), serde_json::Value::Object(metadata));
MessageEnvelope {
from: from.to_string(),
text: text.to_string(),
timestamp: IsoTimestamp::now(),
read: false,
source_team: Some("atm-dev".to_string()),
summary: None,
message_id: Some(message_id),
pending_ack_at: None,
acknowledged_at: None,
acknowledges_message_id: None,
task_id: None,
extra,
}
}
}