use std::path::{Path, PathBuf};
use serde::Serialize;
use serde_json::Map;
use tracing::trace;
use crate::address::AgentAddress;
use crate::config;
use crate::error::AtmError;
use crate::home;
use crate::identity;
use crate::mailbox;
use crate::mailbox::source::{
SourceFile, SourcedMessage, discover_source_paths, load_source_files,
rediscover_and_validate_source_paths,
};
use crate::mailbox::surface::dedupe_legacy_message_id_surface;
use crate::observability::{CommandEvent, ObservabilityPort};
use crate::read::state;
use crate::schema::{LegacyMessageId, MessageEnvelope};
use crate::send::{input, summary};
use crate::types::{AgentName, IsoTimestamp, TeamName};
/// Input for [`ack_mail`]: who acknowledges which message, and the reply to send.
#[derive(Debug, Clone)]
pub struct AckRequest {
/// Home directory used to locate team directories and inbox files.
pub home_dir: PathBuf,
/// Directory handed to `config::load_config` to load the configuration.
pub current_dir: PathBuf,
/// Optional actor name passed to `identity::resolve_actor_identity`.
/// NOTE(review): presumably takes precedence over the configured identity — confirm.
pub actor_override: Option<AgentName>,
/// Optional team name passed to `config::resolve_team`.
/// NOTE(review): presumably takes precedence over the configured team — confirm.
pub team_override: Option<TeamName>,
/// Id of the message to acknowledge.
pub message_id: LegacyMessageId,
/// Text of the reply; it is checked by `input::validate_message_text` before use.
pub reply_body: String,
}
/// Serializable result of a successful [`ack_mail`] call.
#[derive(Debug, Clone, Serialize)]
pub struct AckOutcome {
/// Name of the performed action; `ack_mail` always sets this to `"ack"`.
pub action: &'static str,
/// Team the acknowledging agent was resolved to.
pub team: TeamName,
/// Agent that acknowledged the message.
pub agent: AgentName,
/// Id of the message that was acknowledged.
pub message_id: LegacyMessageId,
/// Task id copied from the acknowledged message; left out of the serialized
/// output when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub task_id: Option<String>,
/// Recipient of the reply, formatted as `agent@team`.
pub reply_target: String,
/// Id generated for the reply message.
pub reply_message_id: LegacyMessageId,
/// Reply text as returned by `input::validate_message_text`.
pub reply_text: String,
}
/// Acknowledges a read, pending-ack message and sends a reply to its sender.
///
/// Steps, in order:
/// 1. Load the configuration from `request.current_dir`, resolve the actor and
///    team (passing the overrides along), and check that the team directory
///    exists and that the team config lists the actor as a member.
/// 2. Discover the actor's source inbox paths, call
///    `mailbox::lock::acquire_many_sorted` on them, re-validate the paths, load
///    the files and check that the target message is in the
///    (read, pending-ack) state.
/// 3. Resolve the reply recipient from the source message and check that the
///    recipient's team directory exists and lists the recipient as a member.
/// 4. Validate the reply body and build the reply envelope.
/// 5. Drop the source locks, acquire locks for the source paths plus the reply
///    inbox, reload the files, re-check the message state, mark the message
///    acknowledged, append the reply and write every loaded file back.
/// 6. Emit an `ack` command event through `observability`; the result of the
///    emit call is discarded.
///
/// # Errors
///
/// Returns an [`AtmError`] when no team can be resolved, when a team directory
/// or member is missing, when the message is not found or not in the
/// (read, pending-ack) state, when the reply body fails validation, or when any
/// of the config, lock, read or write helpers fails.
pub fn ack_mail(
request: AckRequest,
observability: &dyn ObservabilityPort,
) -> Result<AckOutcome, AtmError> {
// Resolve who is acknowledging and on which team.
let config = config::load_config(&request.current_dir)?;
let actor =
identity::resolve_actor_identity(request.actor_override.as_deref(), config.as_ref())?;
let team = config::resolve_team(request.team_override.as_deref(), config.as_ref())
.ok_or_else(AtmError::team_unavailable)?;
// The team must exist on disk and list the actor as a member.
let team_dir = home::team_dir_from_home(&request.home_dir, &team)?;
if !team_dir.exists() {
return Err(AtmError::team_not_found(&team));
}
let team_config = config::load_team_config(&team_dir)?;
if !team_config
.members
.iter()
.any(|member| member.name == actor)
{
return Err(AtmError::agent_not_found(&actor, &team));
}
// Discover the actor's source inboxes, lock them, then re-validate the
// discovered paths before loading.
// NOTE(review): presumably this guards against inbox files changing between
// discovery and lock acquisition — confirm.
let mut actor_source_paths = discover_source_paths(&request.home_dir, &team, &actor)?;
let source_locks = mailbox::lock::acquire_many_sorted(
actor_source_paths.clone(),
mailbox::lock::default_lock_timeout(),
)?;
actor_source_paths = rediscover_and_validate_source_paths(
&actor_source_paths,
&request.home_dir,
&team,
&actor,
)?;
let source_files = load_source_files(&actor_source_paths)?;
let source_message = find_source_message(&source_files, request.message_id, &actor, &team)?;
// Only a message that is read and still pending acknowledgement can be
// acked; an already acknowledged message gets its own error text.
match (
state::derive_read_state(&source_message.envelope),
state::derive_ack_state(&source_message.envelope),
) {
(crate::types::ReadState::Read, crate::types::AckState::PendingAck) => {}
(_, crate::types::AckState::Acknowledged) => {
return Err(AtmError::validation(format!(
"message {} is already acknowledged",
request.message_id
))
.with_recovery(
"Refresh the mailbox with `atm read` and choose a message that is still pending acknowledgement.",
));
}
_ => {
return Err(AtmError::validation(format!(
"message {} is not in the (read, pending_ack) state",
request.message_id
))
.with_recovery(
"Refresh the mailbox with `atm read` and choose a message that is still pending acknowledgement.",
));
}
}
// Work out where the reply goes and verify the recipient the same way the
// actor was verified above.
let (reply_agent, reply_team) = resolve_reply_target(&source_message.envelope, &team)?;
let reply_team_dir = home::team_dir_from_home(&request.home_dir, &reply_team)?;
if !reply_team_dir.exists() {
return Err(AtmError::team_not_found(&reply_team));
}
let reply_team_config = config::load_team_config(&reply_team_dir)?;
if !reply_team_config
.members
.iter()
.any(|member| member.name == reply_agent.as_str())
{
return Err(AtmError::agent_not_found(&reply_agent, &reply_team));
}
// One timestamp is shared by the reply envelope and the acknowledgement
// applied to the source message further down.
let ack_timestamp = IsoTimestamp::now();
let reply_text = input::validate_message_text(request.reply_body)?;
let reply_message_id = LegacyMessageId::new();
let source_task_id = source_message.envelope.task_id.clone();
// The reply envelope carries no task id of its own; the source task id is
// only reported through the outcome and the command event.
let reply_message = MessageEnvelope {
from: actor.clone(),
text: reply_text.clone(),
timestamp: ack_timestamp,
read: false,
source_team: Some(team.clone()),
summary: Some(summary::build_summary(&reply_text, None)),
message_id: Some(reply_message_id),
pending_ack_at: None,
acknowledged_at: None,
acknowledges_message_id: Some(request.message_id),
task_id: None,
extra: Map::new(),
};
let reply_inbox_path =
home::inbox_path_from_home(&request.home_dir, &reply_team, &reply_agent)?;
// The final lock set is the source paths plus the reply inbox, unless the
// reply inbox already is one of the source paths.
let final_write_paths = if actor_source_paths
.iter()
.any(|path| path == &reply_inbox_path)
{
actor_source_paths.clone()
} else {
let mut final_paths = actor_source_paths.clone();
final_paths.push(reply_inbox_path.clone());
final_paths
};
// Release the source locks before taking the combined set.
// NOTE(review): presumably so that all paths are locked through a single
// sorted acquisition — confirm.
drop(source_locks);
let _final_locks = mailbox::lock::acquire_many_sorted(
final_write_paths,
mailbox::lock::default_lock_timeout(),
)?;
// The locks were released in between, so rediscover, reload and re-check
// the message state before writing anything.
actor_source_paths = rediscover_and_validate_source_paths(
&actor_source_paths,
&request.home_dir,
&team,
&actor,
)?;
let mut source_files = load_source_files(&actor_source_paths)?;
let source_message = find_source_message(&source_files, request.message_id, &actor, &team)?;
match (
state::derive_read_state(&source_message.envelope),
state::derive_ack_state(&source_message.envelope),
) {
(crate::types::ReadState::Read, crate::types::AckState::PendingAck) => {}
_ => {
return Err(AtmError::validation(format!(
"message {} is not in the (read, pending_ack) state",
request.message_id
))
.with_recovery(
"Refresh the mailbox with `atm read` and retry the acknowledgement if the message is still pending acknowledgement.",
));
}
}
// Apply both changes in memory, then write the loaded files back.
update_source_message(&mut source_files, &source_message, ack_timestamp)?;
append_reply_message(&mut source_files, &reply_inbox_path, reply_message)?;
persist_source_files(&source_files)?;
let outcome = AckOutcome {
action: "ack",
team: team.clone().into(),
agent: actor.clone().into(),
message_id: request.message_id,
task_id: source_task_id.clone(),
reply_target: format!("{reply_agent}@{reply_team}"),
reply_message_id,
reply_text: reply_text.clone(),
};
// The result of `emit` is discarded, so it cannot turn a completed
// acknowledgement into an error.
let _ = observability.emit(CommandEvent {
command: "ack",
action: "ack",
outcome: "ok",
team,
agent: actor.clone(),
sender: actor,
message_id: Some(request.message_id),
requires_ack: false,
dry_run: false,
task_id: source_task_id,
error_code: None,
error_message: None,
});
Ok(outcome)
}
/// Determines the `(agent, team)` pair that should receive the reply to `message`.
///
/// The address is taken from the first source that applies:
/// 1. the canonical sender identity stored in the message metadata,
/// 2. the `from` field when it contains an `@`,
/// 3. the bare `from` field combined with the message's `source_team`, or
///    with `current_team` when no source team is recorded.
///
/// # Errors
///
/// Fails when the chosen address string cannot be parsed, or when the parsed
/// address carries no team.
fn resolve_reply_target(
    message: &MessageEnvelope,
    current_team: &str,
) -> Result<(AgentName, TeamName), AtmError> {
    let address: AgentAddress = match canonical_sender_identity(message) {
        Some(identity) => identity.parse()?,
        None if message.from.contains('@') => message.from.parse()?,
        None => AgentAddress {
            agent: message.from.clone(),
            // An unqualified sender is assumed to live in the source team,
            // falling back to the team of the agent handling the message.
            team: Some(
                message
                    .source_team
                    .clone()
                    .unwrap_or_else(|| current_team.to_string()),
            ),
        },
    };

    let reply_team = address.team.ok_or_else(AtmError::team_unavailable)?;
    Ok((address.agent.into(), reply_team.into()))
}
fn canonical_sender_identity(message: &MessageEnvelope) -> Option<String> {
message
.extra
.get("metadata")
.and_then(serde_json::Value::as_object)
.and_then(|metadata| metadata.get("atm"))
.and_then(serde_json::Value::as_object)
.and_then(|atm| atm.get("fromIdentity"))
.and_then(serde_json::Value::as_str)
.map(ToOwned::to_owned)
}
/// Flattens every message of every source file into a single list.
///
/// Each entry is a clone of the stored envelope tagged with the path of the
/// file it came from and its position inside that file. Files are visited in
/// slice order and messages in stored order.
fn merged_surface(source_files: &[SourceFile]) -> Vec<SourcedMessage> {
    let mut surface = Vec::new();
    for file in source_files {
        for (position, envelope) in file.messages.iter().enumerate() {
            surface.push(SourcedMessage {
                envelope: envelope.clone(),
                source_path: file.path.clone(),
                source_index: position.into(),
            });
        }
    }
    surface
}
/// Finds the message with `message_id` in the merged, de-duplicated surface of `source_files`.
///
/// Messages without a `message_id` cannot match; each one visited before the
/// match is found is reported through a `trace!` event and skipped.
///
/// # Errors
///
/// Returns a validation error naming `actor` and `team` when no message with
/// the requested id is present.
fn find_source_message(
    source_files: &[SourceFile],
    message_id: LegacyMessageId,
    actor: &str,
    team: &str,
) -> Result<SourcedMessage, AtmError> {
    let surface = dedupe_legacy_message_id_surface(
        merged_surface(source_files),
        |candidate: &SourcedMessage| candidate.envelope.message_id,
        |candidate: &SourcedMessage| candidate.envelope.timestamp,
    );

    for candidate in surface {
        let candidate_id = candidate.envelope.message_id;
        match candidate_id {
            Some(found) if found == message_id => return Ok(candidate),
            Some(_) => {}
            None => {
                trace!(
                    source_path = %candidate.source_path.display(),
                    source_index = usize::from(candidate.source_index),
                    "skipping source message without message_id during ack lookup"
                );
            }
        }
    }

    Err(AtmError::validation(format!(
        "message {} was not found in {}@{}",
        message_id, actor, team
    ))
    .with_recovery(
        "Refresh the mailbox with `atm read` and choose a message that is still present in the pending-ack surface.",
    ))
}
/// Marks the stored copy of `source_message` as acknowledged at `acknowledged_at`.
///
/// The stored envelope is located by the source path and index recorded on
/// `source_message`, moved through the read/pending-ack to acknowledged
/// transition, and written back in place. Only the in-memory `source_files`
/// are changed.
///
/// # Errors
///
/// Returns a mailbox-write error when no loaded file has the recorded path, or
/// when that file has no message at the recorded index.
fn update_source_message(
    source_files: &mut [SourceFile],
    source_message: &SourcedMessage,
    acknowledged_at: IsoTimestamp,
) -> Result<(), AtmError> {
    let missing_file = || {
        AtmError::mailbox_write(format!(
            "source inbox disappeared during acknowledgement: {}",
            source_message.source_path.display()
        ))
    };
    let missing_message = || {
        AtmError::mailbox_write(format!(
            "source message index {} disappeared during acknowledgement",
            usize::from(source_message.source_index)
        ))
    };

    let slot = source_files
        .iter_mut()
        .find(|candidate| candidate.path == source_message.source_path)
        .ok_or_else(missing_file)?
        .messages
        .get_mut(source_message.source_index.get())
        .ok_or_else(missing_message)?;

    // The typed wrapper only offers `acknowledge` for a message that is read
    // and pending acknowledgement.
    let pending = state::StoredMessage::<
        crate::types::ReadReadState,
        crate::types::PendingAckState,
    >::read_pending_ack(slot.clone());
    *slot = pending.acknowledge(acknowledged_at).envelope;
    Ok(())
}
/// Appends `reply_message` to the in-memory copy of the inbox at `reply_inbox_path`.
///
/// When that inbox is already among `source_files`, the reply is pushed onto
/// it. Otherwise the inbox is read with `mailbox::read_messages`, the reply is
/// appended, the file is added to `source_files`, and the list is re-sorted by
/// path.
///
/// # Errors
///
/// Propagates the error from reading an inbox that was not already loaded.
fn append_reply_message(
    source_files: &mut Vec<SourceFile>,
    reply_inbox_path: &Path,
    reply_message: MessageEnvelope,
) -> Result<(), AtmError> {
    let already_loaded = source_files
        .iter()
        .position(|source| source.path == reply_inbox_path);

    match already_loaded {
        Some(index) => source_files[index].messages.push(reply_message),
        None => {
            let mut messages = mailbox::read_messages(reply_inbox_path)?;
            messages.push(reply_message);
            source_files.push(SourceFile {
                path: reply_inbox_path.to_path_buf(),
                messages,
            });
            source_files.sort_by(|first, second| first.path.cmp(&second.path));
        }
    }
    Ok(())
}
/// Writes every file in `source_files` back to its path, in slice order.
///
/// # Errors
///
/// Stops at the first failing write and returns its error; files earlier in
/// the slice have already been written by then.
fn persist_source_files(source_files: &[SourceFile]) -> Result<(), AtmError> {
    source_files
        .iter()
        .try_for_each(|source| -> Result<(), AtmError> {
            mailbox::atomic::write_messages(&source.path, &source.messages)?;
            Ok(())
        })
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{canonical_sender_identity, resolve_reply_target};
    use crate::schema::MessageEnvelope;
    use crate::types::IsoTimestamp;

    /// Canonical identity stored in the metadata of the fixtures below.
    const CANONICAL_IDENTITY: &str = "team-lead@src-gen";

    /// Builds an unread message from `from` whose source team is `atm-dev`.
    fn message_with_from(from: &str) -> MessageEnvelope {
        MessageEnvelope {
            from: from.to_owned(),
            text: String::from("hello"),
            timestamp: IsoTimestamp::now(),
            read: false,
            source_team: Some(String::from("atm-dev")),
            summary: None,
            message_id: None,
            pending_ack_at: None,
            acknowledged_at: None,
            acknowledges_message_id: None,
            task_id: None,
            extra: serde_json::Map::new(),
        }
    }

    /// Same as `message_with_from`, with `metadata.atm.fromIdentity` set to
    /// `CANONICAL_IDENTITY`.
    fn message_with_canonical_identity(from: &str) -> MessageEnvelope {
        let mut message = message_with_from(from);
        message.extra.insert(
            "metadata".to_string(),
            json!({"atm": {"fromIdentity": CANONICAL_IDENTITY}}),
        );
        message
    }

    #[test]
    fn canonical_sender_identity_reads_metadata_override() {
        let message = message_with_canonical_identity("lead");

        let identity = canonical_sender_identity(&message);

        assert_eq!(identity.as_deref(), Some("team-lead@src-gen"));
    }

    #[test]
    fn resolve_reply_target_prefers_canonical_sender_identity_metadata() {
        let mut message = message_with_canonical_identity("lead");
        // The metadata identity must win over both `from` and this source team.
        message.source_team = Some("atm-dev".to_string());

        let target = resolve_reply_target(&message, "atm-dev").expect("reply target");

        assert_eq!(target, ("team-lead".into(), "src-gen".into()));
    }
}