pub(crate) mod filters;
pub(crate) mod seen_state;
pub(crate) mod state;
pub(crate) mod wait;
use std::collections::HashMap;
use std::path::PathBuf;
use serde::Serialize;
use serde_json::Value;
use crate::config;
use crate::error::AtmError;
use crate::home;
use crate::identity;
use crate::mailbox;
use crate::mailbox::source::{
SourceFile, SourcedMessage, discover_source_paths, load_source_files,
rediscover_and_validate_source_paths, resolve_target,
};
use crate::mailbox::surface::dedupe_legacy_message_id_surface;
use crate::observability::{CommandEvent, ObservabilityPort};
use crate::schema::MessageEnvelope;
use crate::types::{
AckActivationMode, AgentName, DisplayBucket, IsoTimestamp, MessageClass, ReadSelection,
SourceIndex, TeamName,
};
#[derive(Debug, Clone)]
pub struct ReadQuery {
pub home_dir: PathBuf,
pub current_dir: PathBuf,
pub actor_override: Option<AgentName>,
pub target_address: Option<String>,
pub team_override: Option<TeamName>,
pub selection_mode: ReadSelection,
pub seen_state_filter: bool,
pub seen_state_update: bool,
pub ack_activation_mode: AckActivationMode,
pub limit: Option<usize>,
pub sender_filter: Option<String>,
pub timestamp_filter: Option<IsoTimestamp>,
pub timeout_secs: Option<u64>,
}
#[derive(Debug, Clone, Serialize)]
pub struct BucketCounts {
pub unread: usize,
pub pending_ack: usize,
pub history: usize,
}
#[derive(Debug, Clone, Serialize)]
pub struct ClassifiedMessage {
#[serde(skip)]
source_index: SourceIndex,
#[serde(skip)]
source_path: PathBuf,
pub bucket: DisplayBucket,
pub class: MessageClass,
#[serde(flatten)]
pub envelope: MessageEnvelope,
}
#[derive(Debug, Clone, Serialize)]
pub struct ReadOutcome {
pub action: &'static str,
pub team: TeamName,
pub agent: AgentName,
pub selection_mode: ReadSelection,
pub history_collapsed: bool,
pub mutation_applied: bool,
pub count: usize,
pub messages: Vec<ClassifiedMessage>,
pub bucket_counts: BucketCounts,
}
pub fn read_mail(
query: ReadQuery,
observability: &dyn ObservabilityPort,
) -> Result<ReadOutcome, AtmError> {
let config = config::load_config(&query.current_dir)?;
let actor = identity::resolve_actor_identity(query.actor_override.as_deref(), config.as_ref())?;
let actor_team = config::resolve_team(query.team_override.as_deref(), config.as_ref());
let target = resolve_target(
query.target_address.as_deref(),
&actor,
query.team_override.as_deref(),
config.as_ref(),
)?;
let team_dir = home::team_dir_from_home(&query.home_dir, &target.team)?;
if !team_dir.exists() {
return Err(AtmError::team_not_found(&target.team).with_recovery(
"Create the team config for the requested team or target a different team before retrying `atm read`.",
));
}
let team_config = config::load_team_config(&team_dir)?;
if target.explicit
&& !team_config
.members
.iter()
.any(|member| member.name == target.agent)
{
return Err(
AtmError::agent_not_found(&target.agent, &target.team).with_recovery(
"Update the team membership in config.json or read a different mailbox target.",
),
);
}
let own_inbox = actor == target.agent && actor_team.as_deref() == Some(target.team.as_str());
let seen_watermark = if query.seen_state_filter && query.selection_mode != ReadSelection::All {
seen_state::load_seen_watermark(&query.home_dir, &target.team, &target.agent)?
} else {
None
};
let mut source_paths = discover_source_paths(&query.home_dir, &target.team, &target.agent)?;
let load_locked_snapshot = |paths: &mut Vec<PathBuf>| -> Result<
(Vec<SourceFile>, BucketCounts, Vec<ClassifiedMessage>),
AtmError,
> {
let _locks = mailbox::lock::acquire_many_sorted(
paths.clone(),
mailbox::lock::default_lock_timeout(),
)?;
*paths = rediscover_and_validate_source_paths(
paths,
&query.home_dir,
&target.team,
&target.agent,
)?;
load_selection_state(paths, &query, seen_watermark)
};
let (mut source_files, mut bucket_counts, mut selected) =
load_locked_snapshot(&mut source_paths)?;
let mut timed_out = false;
if selected.is_empty()
&& let Some(timeout_secs) = query.timeout_secs
{
let wait_satisfied = wait::wait_for_eligible_message(
timeout_secs,
|| {
let mut poll_paths =
discover_source_paths(&query.home_dir, &target.team, &target.agent)?;
let _poll_locks = mailbox::lock::acquire_many_sorted(
poll_paths.clone(),
mailbox::lock::default_lock_timeout(),
)?;
poll_paths = rediscover_and_validate_source_paths(
&poll_paths,
&query.home_dir,
&target.team,
&target.agent,
)?;
Ok(apply_idle_notification_dedup(
dedupe_legacy_message_id_surface(
merged_surface(&load_source_files(&poll_paths)?),
|message: &SourcedMessage| message.envelope.message_id,
|message: &SourcedMessage| message.envelope.timestamp,
),
))
},
|messages| !selected_after_filters(messages, &query, seen_watermark).is_empty(),
)?;
if wait_satisfied {
source_paths = discover_source_paths(&query.home_dir, &target.team, &target.agent)?;
(source_files, bucket_counts, selected) = load_locked_snapshot(&mut source_paths)?;
} else {
timed_out = true;
}
}
selected.sort_by(|left, right| {
right
.envelope
.timestamp
.cmp(&left.envelope.timestamp)
.then_with(|| right.envelope.message_id.cmp(&left.envelope.message_id))
.then_with(|| right.source_index.cmp(&left.source_index))
});
if let Some(limit) = query.limit {
selected.truncate(limit);
}
let mutation_applied = if timed_out || selected.is_empty() {
false
} else {
let _locks = mailbox::lock::acquire_many_sorted(
source_paths.clone(),
mailbox::lock::default_lock_timeout(),
)?;
source_paths = rediscover_and_validate_source_paths(
&source_paths,
&query.home_dir,
&target.team,
&target.agent,
)?;
(source_files, bucket_counts, selected) =
load_selection_state(&source_paths, &query, seen_watermark)?;
selected.sort_by(|left, right| {
right
.envelope
.timestamp
.cmp(&left.envelope.timestamp)
.then_with(|| right.envelope.message_id.cmp(&left.envelope.message_id))
.then_with(|| right.source_index.cmp(&left.source_index))
});
if let Some(limit) = query.limit {
selected.truncate(limit);
}
let mutation_applied = apply_display_mutations(
&mut source_files,
&selected,
query.ack_activation_mode,
own_inbox,
);
if mutation_applied {
persist_source_files(&source_files)?;
}
mutation_applied
};
if query.seen_state_update
&& !selected.is_empty()
&& let Some(latest_timestamp) = selected
.iter()
.map(|message| message.envelope.timestamp)
.max()
{
seen_state::save_seen_watermark(
&query.home_dir,
&target.team,
&target.agent,
latest_timestamp,
)?;
}
let output_messages = selected
.into_iter()
.map(|selected_message| ClassifiedMessage {
source_index: selected_message.source_index,
source_path: selected_message.source_path.clone(),
bucket: selected_message.bucket,
class: selected_message.class,
envelope: source_files
.iter()
.find(|source| source.path == selected_message.source_path)
.and_then(|source| source.messages.get(selected_message.source_index.get()))
.cloned()
.unwrap_or(selected_message.envelope),
})
.collect::<Vec<_>>();
let history_collapsed = query.selection_mode != ReadSelection::All
&& query.selection_mode != ReadSelection::ActionableWithHistory
&& bucket_counts.history > 0;
let outcome = ReadOutcome {
action: "read",
team: target.team.clone().into(),
agent: target.agent.clone().into(),
selection_mode: query.selection_mode,
history_collapsed,
mutation_applied,
count: output_messages.len(),
messages: output_messages,
bucket_counts,
};
let _ = observability.emit(CommandEvent {
command: "read",
action: "read",
outcome: if timed_out { "timeout" } else { "ok" },
team: outcome.team.to_string(),
agent: outcome.agent.to_string(),
sender: actor,
message_id: None,
requires_ack: false,
dry_run: false,
task_id: None,
error_code: None,
error_message: None,
});
Ok(outcome)
}
fn load_selection_state(
source_paths: &[PathBuf],
query: &ReadQuery,
seen_watermark: Option<IsoTimestamp>,
) -> Result<(Vec<SourceFile>, BucketCounts, Vec<ClassifiedMessage>), AtmError> {
let source_files = load_source_files(source_paths)?;
let classified_all = classify_all(apply_idle_notification_dedup(
dedupe_legacy_message_id_surface(
merged_surface(&source_files),
|message: &SourcedMessage| message.envelope.message_id,
|message: &SourcedMessage| message.envelope.timestamp,
),
));
let bucket_counts = bucket_counts_for(&classified_all);
let filtered = apply_filters(
classified_all.clone(),
query.sender_filter.as_deref(),
query.timestamp_filter,
);
let selected = select_messages(&filtered, query.selection_mode, seen_watermark);
Ok((source_files, bucket_counts, selected))
}
fn merged_surface(source_files: &[SourceFile]) -> Vec<SourcedMessage> {
source_files
.iter()
.flat_map(|source| {
source
.messages
.iter()
.cloned()
.enumerate()
.map(|(source_index, envelope)| SourcedMessage {
envelope,
source_path: source.path.clone(),
source_index: source_index.into(),
})
})
.collect()
}
fn apply_idle_notification_dedup(deduped: Vec<SourcedMessage>) -> Vec<SourcedMessage> {
let latest_idle_for_sender = messages_from_idle_sender(&deduped);
deduped
.into_iter()
.enumerate()
.filter_map(|(index, message)| {
dedupe_idle_notifications(index, &message, &latest_idle_for_sender).then_some(message)
})
.collect()
}
fn dedupe_idle_notifications(
index: usize,
message: &SourcedMessage,
latest_idle_for_sender: &HashMap<String, usize>,
) -> bool {
if !is_unread_idle_notification(&message.envelope) {
return true;
}
idle_sender(&message.envelope)
.and_then(|sender| latest_idle_for_sender.get(&sender))
.map(|keep_index| *keep_index == index)
.unwrap_or(true)
}
fn messages_from_idle_sender(messages: &[SourcedMessage]) -> HashMap<String, usize> {
let mut latest_idle_for_sender = HashMap::new();
for (index, message) in messages.iter().enumerate() {
if !is_unread_idle_notification(&message.envelope) {
continue;
}
if let Some(sender) = idle_sender(&message.envelope) {
latest_idle_for_sender
.entry(sender)
.and_modify(|keep_index| *keep_index = index)
.or_insert(index);
}
}
latest_idle_for_sender
}
fn is_unread_idle_notification(message: &MessageEnvelope) -> bool {
!message.read && idle_notification_sender(message).is_some()
}
fn idle_sender(message: &MessageEnvelope) -> Option<String> {
idle_notification_sender(message)
}
fn idle_notification_sender(message: &MessageEnvelope) -> Option<String> {
serde_json::from_str::<Value>(&message.text)
.ok()
.and_then(|value| {
(value.get("type").and_then(Value::as_str) == Some("idle_notification"))
.then(|| {
value
.get("from")
.and_then(Value::as_str)
.map(str::to_string)
})
.flatten()
})
}
fn classify_all(messages: Vec<SourcedMessage>) -> Vec<ClassifiedMessage> {
messages
.into_iter()
.map(|message| {
let class = state::classify_message(&message.envelope);
let bucket = state::display_bucket_for_class(class);
ClassifiedMessage {
source_index: message.source_index,
source_path: message.source_path,
bucket,
class,
envelope: message.envelope,
}
})
.collect()
}
fn apply_filters(
messages: Vec<ClassifiedMessage>,
sender_filter: Option<&str>,
timestamp_filter: Option<IsoTimestamp>,
) -> Vec<ClassifiedMessage> {
filters::apply_timestamp_filter(
filters::apply_sender_filter(messages, sender_filter),
timestamp_filter,
)
}
fn bucket_counts_for(messages: &[ClassifiedMessage]) -> BucketCounts {
messages.iter().fold(
BucketCounts {
unread: 0,
pending_ack: 0,
history: 0,
},
|mut counts, message| {
match message.bucket {
DisplayBucket::Unread => counts.unread += 1,
DisplayBucket::PendingAck => counts.pending_ack += 1,
DisplayBucket::History => counts.history += 1,
}
counts
},
)
}
fn select_messages(
messages: &[ClassifiedMessage],
selection_mode: ReadSelection,
seen_watermark: Option<IsoTimestamp>,
) -> Vec<ClassifiedMessage> {
let watermark = if selection_mode == ReadSelection::All {
None
} else {
seen_watermark
};
filters::apply_selection_mode(messages.to_vec(), selection_mode, watermark)
}
fn selected_after_filters(
messages: &[SourcedMessage],
query: &ReadQuery,
seen_watermark: Option<IsoTimestamp>,
) -> Vec<ClassifiedMessage> {
let classified = classify_all(messages.to_vec());
let filtered = apply_filters(
classified,
query.sender_filter.as_deref(),
query.timestamp_filter,
);
select_messages(&filtered, query.selection_mode, seen_watermark)
}
fn apply_display_mutations(
source_files: &mut [SourceFile],
displayed_messages: &[ClassifiedMessage],
ack_activation_mode: AckActivationMode,
own_inbox: bool,
) -> bool {
let mut mutation_applied = false;
let promote_unread =
own_inbox && ack_activation_mode == AckActivationMode::PromoteDisplayedUnread;
let now = IsoTimestamp::now();
for message in displayed_messages {
let transitioned = transition_displayed_message(message, promote_unread, now);
let updated = transitioned.into_envelope();
if updated != message.envelope
&& let Some(source_file) = source_files
.iter_mut()
.find(|source| source.path == message.source_path)
&& let Some(stored) = source_file.messages.get_mut(message.source_index.get())
{
*stored = updated;
mutation_applied = true;
}
}
mutation_applied
}
fn transition_displayed_message(
message: &ClassifiedMessage,
promote_unread: bool,
now: IsoTimestamp,
) -> state::TransitionedMessage {
let read_state = state::derive_read_state(&message.envelope);
let ack_state = state::derive_ack_state(&message.envelope);
match (read_state, ack_state) {
(crate::types::ReadState::Unread, crate::types::AckState::NoAckRequired) if promote_unread => {
state::TransitionedMessage::ReadPendingAck(
state::StoredMessage::<crate::types::UnreadReadState, crate::types::NoAckState>::unread_no_ack(
message.envelope.clone(),
)
.display_and_require_ack(now),
)
}
(crate::types::ReadState::Unread, crate::types::AckState::NoAckRequired) => {
state::TransitionedMessage::ReadNoAck(
state::StoredMessage::<crate::types::UnreadReadState, crate::types::NoAckState>::unread_no_ack(
message.envelope.clone(),
)
.display_without_ack(),
)
}
(crate::types::ReadState::Unread, crate::types::AckState::PendingAck) => {
state::TransitionedMessage::ReadPendingAck(
state::StoredMessage::<
crate::types::UnreadReadState,
crate::types::PendingAckState,
>::unread_pending_ack(message.envelope.clone())
.mark_read_pending_ack(),
)
}
(crate::types::ReadState::Unread, crate::types::AckState::Acknowledged)
| (crate::types::ReadState::Read, crate::types::AckState::NoAckRequired)
| (crate::types::ReadState::Read, crate::types::AckState::PendingAck)
| (crate::types::ReadState::Read, crate::types::AckState::Acknowledged) => {
let mut unchanged = message.envelope.clone();
if !unchanged.read {
unchanged.read = true;
}
state::TransitionedMessage::Unchanged(unchanged)
}
}
}
fn persist_source_files(source_files: &[SourceFile]) -> Result<(), AtmError> {
for source in source_files {
mailbox::atomic::write_messages(&source.path, &source.messages)?;
}
Ok(())
}