Skip to main content

a3s_code_core/
subagent_task_tracker.rs

1//! In-memory tracker for delegated subagent tasks.
2//!
3//! Materializes a queryable view of subagent task lifecycle from the
4//! `AgentEvent` stream. The event stream remains the authoritative record;
5//! this module exists so callers can ask "what is task X doing right now?"
6//! without scanning `run_events()`.
7
8use crate::agent::AgentEvent;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use tokio::sync::RwLock;
12use tokio_util::sync::CancellationToken;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16#[non_exhaustive]
17pub enum SubagentStatus {
18    Running,
19    Completed,
20    Failed,
21    Cancelled,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct SubagentProgressEntry {
26    pub timestamp_ms: u64,
27    pub status: String,
28    pub metadata: serde_json::Value,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SubagentTaskSnapshot {
33    pub task_id: String,
34    pub parent_session_id: String,
35    pub child_session_id: String,
36    pub agent: String,
37    pub description: String,
38    pub status: SubagentStatus,
39    pub started_ms: u64,
40    pub updated_ms: u64,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub finished_ms: Option<u64>,
43    #[serde(skip_serializing_if = "Option::is_none")]
44    pub output: Option<String>,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub success: Option<bool>,
47    pub progress: Vec<SubagentProgressEntry>,
48}
49
50#[derive(Debug, Default)]
51pub struct InMemorySubagentTaskTracker {
52    tasks: RwLock<HashMap<String, SubagentTaskSnapshot>>,
53    cancellers: RwLock<HashMap<String, CancellationToken>>,
54}
55
56impl InMemorySubagentTaskTracker {
57    pub fn new() -> Self {
58        Self::default()
59    }
60
61    /// Register a `CancellationToken` for a running task so callers can
62    /// trigger cancellation through `cancel(task_id)`. The task executor
63    /// is expected to remove the entry on exit via `clear_canceller`.
64    pub async fn register_canceller(&self, task_id: &str, token: CancellationToken) {
65        self.cancellers
66            .write()
67            .await
68            .insert(task_id.to_string(), token);
69    }
70
71    pub async fn clear_canceller(&self, task_id: &str) {
72        self.cancellers.write().await.remove(task_id);
73    }
74
75    /// Fire the registered token and mark the snapshot as `Cancelled`.
76    /// Returns `true` if a token was found (caller can interpret as
77    /// "cancellation initiated"), `false` if the task id was unknown or
78    /// the task already finished. The eventual `SubagentEnd` event won't
79    /// overwrite the Cancelled status — see `record_event`.
80    pub async fn cancel(&self, task_id: &str) -> bool {
81        let token = self.cancellers.write().await.remove(task_id);
82        match token {
83            Some(token) => {
84                token.cancel();
85                let now = now_ms();
86                let mut tasks = self.tasks.write().await;
87                if let Some(entry) = tasks.get_mut(task_id) {
88                    if entry.status == SubagentStatus::Running {
89                        entry.status = SubagentStatus::Cancelled;
90                        entry.updated_ms = now;
91                    }
92                }
93                true
94            }
95            None => false,
96        }
97    }
98
99    /// Apply a single agent event to the tracker. Non-subagent events are ignored.
100    pub async fn record_event(&self, event: &AgentEvent) {
101        match event {
102            AgentEvent::SubagentStart {
103                task_id,
104                session_id,
105                parent_session_id,
106                agent,
107                description,
108            } => {
109                let now = now_ms();
110                let mut tasks = self.tasks.write().await;
111                tasks
112                    .entry(task_id.clone())
113                    .and_modify(|task| {
114                        // Late start (e.g. background path) — keep the first-seen
115                        // started_ms but refresh fields we now know.
116                        task.parent_session_id = parent_session_id.clone();
117                        task.child_session_id = session_id.clone();
118                        task.agent = agent.clone();
119                        task.description = description.clone();
120                        task.updated_ms = now;
121                    })
122                    .or_insert_with(|| SubagentTaskSnapshot {
123                        task_id: task_id.clone(),
124                        parent_session_id: parent_session_id.clone(),
125                        child_session_id: session_id.clone(),
126                        agent: agent.clone(),
127                        description: description.clone(),
128                        status: SubagentStatus::Running,
129                        started_ms: now,
130                        updated_ms: now,
131                        finished_ms: None,
132                        output: None,
133                        success: None,
134                        progress: Vec::new(),
135                    });
136            }
137            AgentEvent::SubagentProgress {
138                task_id,
139                session_id,
140                status,
141                metadata,
142            } => {
143                let now = now_ms();
144                let mut tasks = self.tasks.write().await;
145                let entry = tasks
146                    .entry(task_id.clone())
147                    .or_insert_with(|| SubagentTaskSnapshot {
148                        task_id: task_id.clone(),
149                        parent_session_id: String::new(),
150                        child_session_id: session_id.clone(),
151                        agent: String::new(),
152                        description: String::new(),
153                        status: SubagentStatus::Running,
154                        started_ms: now,
155                        updated_ms: now,
156                        finished_ms: None,
157                        output: None,
158                        success: None,
159                        progress: Vec::new(),
160                    });
161                entry.updated_ms = now;
162                entry.progress.push(SubagentProgressEntry {
163                    timestamp_ms: now,
164                    status: status.clone(),
165                    metadata: metadata.clone(),
166                });
167            }
168            AgentEvent::SubagentEnd {
169                task_id,
170                session_id,
171                agent,
172                output,
173                success,
174            } => {
175                let now = now_ms();
176                let mut tasks = self.tasks.write().await;
177                let entry = tasks
178                    .entry(task_id.clone())
179                    .or_insert_with(|| SubagentTaskSnapshot {
180                        task_id: task_id.clone(),
181                        parent_session_id: String::new(),
182                        child_session_id: session_id.clone(),
183                        agent: agent.clone(),
184                        description: String::new(),
185                        status: SubagentStatus::Running,
186                        started_ms: now,
187                        updated_ms: now,
188                        finished_ms: None,
189                        output: None,
190                        success: None,
191                        progress: Vec::new(),
192                    });
193                // Preserve a pre-set Cancelled status (set by `cancel()`)
194                // — a late SubagentEnd from the cancelled child loop is
195                // expected and must not downgrade the terminal state.
196                if entry.status != SubagentStatus::Cancelled {
197                    entry.status = if *success {
198                        SubagentStatus::Completed
199                    } else {
200                        SubagentStatus::Failed
201                    };
202                }
203                entry.updated_ms = now;
204                entry.finished_ms = Some(now);
205                entry.output = Some(output.clone());
206                entry.success = Some(*success);
207            }
208            _ => {}
209        }
210    }
211
212    pub async fn get(&self, task_id: &str) -> Option<SubagentTaskSnapshot> {
213        self.tasks.read().await.get(task_id).cloned()
214    }
215
216    pub async fn list(&self) -> Vec<SubagentTaskSnapshot> {
217        let mut tasks = self
218            .tasks
219            .read()
220            .await
221            .values()
222            .cloned()
223            .collect::<Vec<_>>();
224        tasks.sort_by_key(|task| task.started_ms);
225        tasks
226    }
227
228    pub async fn list_pending(&self) -> Vec<SubagentTaskSnapshot> {
229        self.list()
230            .await
231            .into_iter()
232            .filter(|task| task.status == SubagentStatus::Running)
233            .collect()
234    }
235
236    pub async fn list_for_parent(&self, parent_session_id: &str) -> Vec<SubagentTaskSnapshot> {
237        self.list()
238            .await
239            .into_iter()
240            .filter(|task| task.parent_session_id == parent_session_id)
241            .collect()
242    }
243}
244
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// `SubagentStart` event for `task_id` with a fixed agent and description.
    fn start_event(task_id: &str, parent: &str, child: &str) -> AgentEvent {
        AgentEvent::SubagentStart {
            task_id: String::from(task_id),
            session_id: String::from(child),
            parent_session_id: String::from(parent),
            agent: String::from("explore"),
            description: String::from("find things"),
        }
    }

    /// `SubagentProgress` event for `task_id` with empty JSON metadata.
    fn progress_event(task_id: &str, child: &str, status: &str) -> AgentEvent {
        AgentEvent::SubagentProgress {
            task_id: String::from(task_id),
            session_id: String::from(child),
            status: String::from(status),
            metadata: serde_json::json!({}),
        }
    }

    /// `SubagentEnd` event for `task_id` with a fixed agent and output.
    fn end_event(task_id: &str, child: &str, success: bool) -> AgentEvent {
        AgentEvent::SubagentEnd {
            task_id: String::from(task_id),
            session_id: String::from(child),
            agent: String::from("explore"),
            output: String::from("done"),
            success,
        }
    }

    /// Fresh tracker that has already recorded a start event for `task_id`.
    async fn started_tracker(
        task_id: &str,
        parent: &str,
        child: &str,
    ) -> InMemorySubagentTaskTracker {
        let tracker = InMemorySubagentTaskTracker::new();
        tracker
            .record_event(&start_event(task_id, parent, child))
            .await;
        tracker
    }

    /// Fetch the snapshot for `task_id`, panicking if it does not exist.
    async fn snapshot(
        tracker: &InMemorySubagentTaskTracker,
        task_id: &str,
    ) -> SubagentTaskSnapshot {
        tracker.get(task_id).await.unwrap()
    }

    #[tokio::test]
    async fn lifecycle_start_progress_end_transitions_status() {
        let tracker = started_tracker("task-1", "parent", "child").await;

        let after_start = snapshot(&tracker, "task-1").await;
        assert_eq!(after_start.status, SubagentStatus::Running);
        assert_eq!(after_start.parent_session_id, "parent");
        assert_eq!(after_start.child_session_id, "child");
        assert!(after_start.finished_ms.is_none());

        let progress = progress_event("task-1", "child", "tool_completed: bash");
        tracker.record_event(&progress).await;
        let after_progress = snapshot(&tracker, "task-1").await;
        assert_eq!(after_progress.status, SubagentStatus::Running);
        assert_eq!(after_progress.progress.len(), 1);

        let end = end_event("task-1", "child", true);
        tracker.record_event(&end).await;
        let after_end = snapshot(&tracker, "task-1").await;
        assert_eq!(after_end.status, SubagentStatus::Completed);
        assert_eq!(after_end.success, Some(true));
        assert_eq!(after_end.output.as_deref(), Some("done"));
        assert!(after_end.finished_ms.is_some());
    }

    #[tokio::test]
    async fn failed_end_event_marks_status_failed() {
        let tracker = started_tracker("task-2", "parent", "child").await;
        let end = end_event("task-2", "child", false);
        tracker.record_event(&end).await;

        let finished = snapshot(&tracker, "task-2").await;
        assert_eq!(finished.status, SubagentStatus::Failed);
        assert_eq!(finished.success, Some(false));
    }

    #[tokio::test]
    async fn pending_list_excludes_completed_tasks() {
        let tracker = started_tracker("task-a", "parent", "child-a").await;
        let second_start = start_event("task-b", "parent", "child-b");
        tracker.record_event(&second_start).await;
        let first_end = end_event("task-a", "child-a", true);
        tracker.record_event(&first_end).await;

        let pending = tracker.list_pending().await;
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].task_id, "task-b");
    }

    #[tokio::test]
    async fn list_for_parent_filters_by_session() {
        let tracker = started_tracker("task-a", "session-1", "child-a").await;
        let other_start = start_event("task-b", "session-2", "child-b");
        tracker.record_event(&other_start).await;

        let mine = tracker.list_for_parent("session-1").await;
        assert_eq!(mine.len(), 1);
        assert_eq!(mine[0].task_id, "task-a");
    }

    #[tokio::test]
    async fn end_before_start_still_records_terminal_state() {
        let tracker = InMemorySubagentTaskTracker::new();
        let end = end_event("task-late", "child", true);
        tracker.record_event(&end).await;

        let finished = snapshot(&tracker, "task-late").await;
        assert_eq!(finished.status, SubagentStatus::Completed);
    }

    #[tokio::test]
    async fn non_subagent_events_are_ignored() {
        let tracker = InMemorySubagentTaskTracker::new();
        let unrelated = AgentEvent::TextDelta {
            text: String::from("ignore me"),
        };
        tracker.record_event(&unrelated).await;

        assert!(tracker.list().await.is_empty());
    }

    #[tokio::test]
    async fn cancel_fires_token_and_marks_snapshot_cancelled() {
        let tracker = started_tracker("task-c", "parent", "child").await;

        let token = CancellationToken::new();
        tracker.register_canceller("task-c", token.clone()).await;
        assert!(!token.is_cancelled());

        let fired = tracker.cancel("task-c").await;
        assert!(fired, "cancel should report success");
        assert!(token.is_cancelled(), "registered token should be triggered");

        let cancelled = snapshot(&tracker, "task-c").await;
        assert_eq!(cancelled.status, SubagentStatus::Cancelled);
    }

    #[tokio::test]
    async fn cancel_returns_false_for_unknown_task() {
        let tracker = InMemorySubagentTaskTracker::new();
        let fired = tracker.cancel("task-does-not-exist").await;
        assert!(!fired);
    }

    #[tokio::test]
    async fn late_subagent_end_does_not_downgrade_cancelled_status() {
        let tracker = started_tracker("task-d", "parent", "child").await;
        tracker
            .register_canceller("task-d", CancellationToken::new())
            .await;
        assert!(tracker.cancel("task-d").await);

        // The cancelled child loop will still emit a (likely failed)
        // SubagentEnd. The terminal status should remain Cancelled.
        let late_end = end_event("task-d", "child", false);
        tracker.record_event(&late_end).await;

        let finished = snapshot(&tracker, "task-d").await;
        assert_eq!(finished.status, SubagentStatus::Cancelled);
        assert!(finished.finished_ms.is_some());
        assert_eq!(finished.success, Some(false));
    }

    #[tokio::test]
    async fn clear_canceller_disarms_future_cancel_calls() {
        let tracker = started_tracker("task-e", "parent", "child").await;
        let token = CancellationToken::new();
        tracker.register_canceller("task-e", token.clone()).await;
        tracker.clear_canceller("task-e").await;

        let fired = tracker.cancel("task-e").await;
        assert!(!fired);
        assert!(!token.is_cancelled());
    }
}