use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use crate::source::{AgentEvent, Transport};
use crate::state::{ActivityState, AgentSlot, SceneState};
use crate::AgentId;
pub const HOOK_WINS_WINDOW: Duration = Duration::from_millis(500);
pub const EXIT_GRACE_WINDOW: Duration = Duration::from_millis(4500);
pub const ACTIVE_GRACE_WINDOW: Duration = Duration::from_millis(1500);
pub const STALE_ACTIVE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
pub const STALE_IDLE_TIMEOUT: Duration = Duration::from_secs(30 * 60);
pub const STALE_WAITING_TIMEOUT: Duration = Duration::from_secs(60 * 60);
pub const STALE_UNKNOWN_CWD_TIMEOUT: Duration = Duration::from_secs(3 * 60);
#[derive(Debug, Default)]
pub struct Reducer {
recent_hook_tool_uses: HashMap<(AgentId, String), SystemTime>,
active_tasks: HashMap<AgentId, HashSet<String>>,
next_label_n: u32,
}
impl Reducer {
pub fn new() -> Self {
Self::default()
}
pub fn tick(&mut self, scene: &mut SceneState, now: SystemTime) {
self.gc(now);
self.sweep_exited(scene, now);
self.expire_pending_idles(scene, now);
self.sweep_stale(scene, now);
}
pub fn apply(
&mut self,
scene: &mut SceneState,
event: AgentEvent,
now: SystemTime,
from: Transport,
) {
self.gc(now);
self.sweep_exited(scene, now);
self.expire_pending_idles(scene, now);
let id = event.agent_id();
if from == Transport::Hook {
let in_task = self.active_tasks.get(&id).is_some_and(|s| !s.is_empty());
let suppress = match &event {
AgentEvent::ActivityStart { .. } => in_task,
AgentEvent::ActivityEnd { tool_use_id, .. } => {
let is_task_self_end = tool_use_id
.as_ref()
.is_some_and(|t| self.active_tasks.get(&id).is_some_and(|s| s.contains(t)));
in_task && !is_task_self_end
}
_ => false,
};
if suppress {
return;
}
}
if from == Transport::Jsonl {
if let Some(tuid) = event_tool_use_id(&event) {
if self
.recent_hook_tool_uses
.contains_key(&(id, tuid.to_string()))
{
return;
}
}
}
if from == Transport::Hook {
if let Some(tuid) = event_tool_use_id(&event) {
self.recent_hook_tool_uses
.insert((id, tuid.to_string()), now);
}
}
let mut handled_by_task_tracking = false;
match &event {
AgentEvent::ActivityStart {
agent_id,
tool_use_id: Some(tuid),
detail: Some(d),
..
} if d.is_task() => {
self.active_tasks
.entry(*agent_id)
.or_default()
.insert(tuid.clone());
if let Some(slot) = scene.agents.get_mut(agent_id) {
slot.state = ActivityState::Active {
activity: crate::source::Activity::Typing,
tool_use_id: Some(Arc::<str>::from(tuid.as_str())),
detail: Some(Arc::<str>::from("Delegating")),
};
slot.state_started_at = now;
slot.pending_idle_at = None;
}
}
AgentEvent::ActivityEnd {
agent_id,
tool_use_id: Some(tuid),
} => {
if let Some(set) = self.active_tasks.get_mut(agent_id) {
if set.remove(tuid) {
handled_by_task_tracking = true;
if let Some(slot) = scene.agents.get_mut(agent_id) {
slot.last_event_at = now;
if set.is_empty() {
slot.pending_idle_at = Some(now);
}
}
}
}
}
_ => {}
}
match event {
AgentEvent::SessionStart {
agent_id,
source,
session_id,
cwd,
parent_id,
} => {
if scene.agents.contains_key(&agent_id) {
return;
}
let Some(desk_index) = scene.next_free_desk() else {
tracing::warn!(
?agent_id,
cwd = %cwd.display(),
session_id = %session_id,
max_desks = scene.max_desks,
"dropped SessionStart — all desks occupied; bump --max-desks"
);
return;
};
self.next_label_n += 1;
let has_cwd = cwd
.file_name()
.and_then(|n| n.to_str())
.filter(|s| !s.is_empty())
.is_some();
let label: Arc<str> = if has_cwd {
Arc::<str>::from(cwd.file_name().and_then(|n| n.to_str()).unwrap_or(&source))
} else {
let prefix: String = source.chars().take(2).collect();
Arc::<str>::from(format!("{prefix}#{}", self.next_label_n).as_str())
};
scene.agents.insert(
agent_id,
AgentSlot {
agent_id,
source: Arc::<str>::from(source.as_str()),
session_id: Arc::<str>::from(session_id.as_str()),
cwd: Arc::<std::path::Path>::from(cwd.as_path()),
label,
state: ActivityState::Idle,
state_started_at: now,
last_event_at: now,
created_at: now,
exiting_at: None,
pending_idle_at: None,
desk_index,
tool_call_count: 0,
active_ms: 0,
unknown_cwd: !has_cwd,
parent_id,
},
);
}
AgentEvent::ActivityStart {
agent_id,
activity,
tool_use_id,
detail,
} => {
if let Some(slot) = scene.agents.get_mut(&agent_id) {
if !detail.as_ref().is_some_and(|d| d.is_task()) {
slot.tool_call_count += 1;
}
if matches!(slot.state, ActivityState::Active { .. }) {
let elapsed = now
.duration_since(slot.state_started_at)
.unwrap_or_default()
.as_millis() as u64;
slot.active_ms += elapsed;
}
slot.state = ActivityState::Active {
activity,
tool_use_id: tool_use_id.map(|s| Arc::<str>::from(s.as_str())),
detail: detail.map(|d| Arc::<str>::from(d.display())),
};
slot.state_started_at = now;
slot.last_event_at = now;
slot.pending_idle_at = None;
}
}
AgentEvent::ActivityEnd { agent_id, .. } => {
if !handled_by_task_tracking {
if let Some(slot) = scene.agents.get_mut(&agent_id) {
if matches!(slot.state, ActivityState::Active { .. }) {
slot.pending_idle_at = Some(now);
}
slot.last_event_at = now;
}
}
}
AgentEvent::Waiting { agent_id, reason } => {
if let Some(slot) = scene.agents.get_mut(&agent_id) {
slot.state = ActivityState::Waiting {
reason: Arc::<str>::from(reason.as_str()),
};
slot.state_started_at = now;
slot.last_event_at = now;
slot.pending_idle_at = None;
}
}
AgentEvent::Rename { agent_id, label } => {
if let Some(slot) = scene.agents.get_mut(&agent_id) {
if &*slot.label != label.as_str() {
slot.label = Arc::<str>::from(label.as_str());
}
slot.last_event_at = now;
}
}
AgentEvent::SessionEnd { agent_id } => {
if let Some(slot) = scene.agents.get_mut(&agent_id) {
if slot.exiting_at.is_none() {
slot.exiting_at = Some(now);
}
}
let mut visited = HashSet::new();
visited.insert(agent_id);
let mut frontier = vec![agent_id];
while let Some(parent) = frontier.pop() {
let children: Vec<AgentId> = scene
.agents
.values()
.filter(|s| s.parent_id == Some(parent) && s.exiting_at.is_none())
.map(|s| s.agent_id)
.collect();
for cid in children {
if visited.insert(cid) {
if let Some(slot) = scene.agents.get_mut(&cid) {
slot.exiting_at = Some(now);
}
frontier.push(cid);
}
}
}
}
}
}
fn gc(&mut self, now: SystemTime) {
self.recent_hook_tool_uses
.retain(|_, ts| now.duration_since(*ts).is_ok_and(|d| d < HOOK_WINS_WINDOW));
}
fn expire_pending_idles(&mut self, scene: &mut SceneState, now: SystemTime) {
for slot in scene.agents.values_mut() {
let Some(pending) = slot.pending_idle_at else {
continue;
};
if now
.duration_since(pending)
.is_ok_and(|d| d >= ACTIVE_GRACE_WINDOW)
{
if matches!(slot.state, ActivityState::Active { .. }) {
let elapsed = pending
.duration_since(slot.state_started_at)
.unwrap_or_default()
.as_millis() as u64;
slot.active_ms += elapsed;
slot.state = ActivityState::Idle;
slot.state_started_at = now;
}
slot.pending_idle_at = None;
}
}
}
fn sweep_stale(&mut self, scene: &mut SceneState, now: SystemTime) {
for slot in scene.agents.values_mut() {
if slot.exiting_at.is_some() {
continue;
}
let age = now
.duration_since(slot.last_event_at)
.unwrap_or(Duration::ZERO);
let unknown_cwd = slot.unknown_cwd;
let threshold = if unknown_cwd {
STALE_UNKNOWN_CWD_TIMEOUT
} else {
match &slot.state {
ActivityState::Active { .. } => STALE_ACTIVE_TIMEOUT,
ActivityState::Idle => STALE_IDLE_TIMEOUT,
ActivityState::Waiting { .. } => STALE_WAITING_TIMEOUT,
}
};
if age > threshold {
tracing::info!(
agent_id = ?slot.agent_id,
label = %slot.label,
age_secs = age.as_secs(),
threshold_secs = threshold.as_secs(),
"stale agent — marking exiting"
);
slot.exiting_at = Some(now);
}
}
}
fn sweep_exited(&mut self, scene: &mut SceneState, now: SystemTime) {
let expired: Vec<AgentId> = scene
.agents
.iter()
.filter_map(|(id, slot)| {
slot.exiting_at
.filter(|t| now.duration_since(*t).is_ok_and(|d| d > EXIT_GRACE_WINDOW))
.map(|_| *id)
})
.collect();
for id in expired {
scene.agents.remove(&id);
self.active_tasks.remove(&id);
}
}
}
fn event_tool_use_id(ev: &AgentEvent) -> Option<&str> {
match ev {
AgentEvent::ActivityStart { tool_use_id, .. }
| AgentEvent::ActivityEnd { tool_use_id, .. } => tool_use_id.as_deref(),
_ => None,
}
}