1use crate::agent::AgentEvent;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use tokio::sync::RwLock;
12use tokio_util::sync::CancellationToken;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16#[non_exhaustive]
17pub enum SubagentStatus {
18 Running,
19 Completed,
20 Failed,
21 Cancelled,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct SubagentProgressEntry {
26 pub timestamp_ms: u64,
27 pub status: String,
28 pub metadata: serde_json::Value,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct SubagentTaskSnapshot {
33 pub task_id: String,
34 pub parent_session_id: String,
35 pub child_session_id: String,
36 pub agent: String,
37 pub description: String,
38 pub status: SubagentStatus,
39 pub started_ms: u64,
40 pub updated_ms: u64,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub finished_ms: Option<u64>,
43 #[serde(skip_serializing_if = "Option::is_none")]
44 pub output: Option<String>,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub success: Option<bool>,
47 pub progress: Vec<SubagentProgressEntry>,
48}
49
50#[derive(Debug, Default)]
51pub struct InMemorySubagentTaskTracker {
52 tasks: RwLock<HashMap<String, SubagentTaskSnapshot>>,
53 cancellers: RwLock<HashMap<String, CancellationToken>>,
54}
55
56impl InMemorySubagentTaskTracker {
57 pub fn new() -> Self {
58 Self::default()
59 }
60
61 pub async fn register_canceller(&self, task_id: &str, token: CancellationToken) {
65 self.cancellers
66 .write()
67 .await
68 .insert(task_id.to_string(), token);
69 }
70
71 pub async fn clear_canceller(&self, task_id: &str) {
72 self.cancellers.write().await.remove(task_id);
73 }
74
75 pub async fn cancel(&self, task_id: &str) -> bool {
81 let token = self.cancellers.write().await.remove(task_id);
82 match token {
83 Some(token) => {
84 token.cancel();
85 let now = now_ms();
86 let mut tasks = self.tasks.write().await;
87 if let Some(entry) = tasks.get_mut(task_id) {
88 if entry.status == SubagentStatus::Running {
89 entry.status = SubagentStatus::Cancelled;
90 entry.updated_ms = now;
91 }
92 }
93 true
94 }
95 None => false,
96 }
97 }
98
99 pub async fn record_event(&self, event: &AgentEvent) {
101 match event {
102 AgentEvent::SubagentStart {
103 task_id,
104 session_id,
105 parent_session_id,
106 agent,
107 description,
108 } => {
109 let now = now_ms();
110 let mut tasks = self.tasks.write().await;
111 tasks
112 .entry(task_id.clone())
113 .and_modify(|task| {
114 task.parent_session_id = parent_session_id.clone();
117 task.child_session_id = session_id.clone();
118 task.agent = agent.clone();
119 task.description = description.clone();
120 task.updated_ms = now;
121 })
122 .or_insert_with(|| SubagentTaskSnapshot {
123 task_id: task_id.clone(),
124 parent_session_id: parent_session_id.clone(),
125 child_session_id: session_id.clone(),
126 agent: agent.clone(),
127 description: description.clone(),
128 status: SubagentStatus::Running,
129 started_ms: now,
130 updated_ms: now,
131 finished_ms: None,
132 output: None,
133 success: None,
134 progress: Vec::new(),
135 });
136 }
137 AgentEvent::SubagentProgress {
138 task_id,
139 session_id,
140 status,
141 metadata,
142 } => {
143 let now = now_ms();
144 let mut tasks = self.tasks.write().await;
145 let entry = tasks
146 .entry(task_id.clone())
147 .or_insert_with(|| SubagentTaskSnapshot {
148 task_id: task_id.clone(),
149 parent_session_id: String::new(),
150 child_session_id: session_id.clone(),
151 agent: String::new(),
152 description: String::new(),
153 status: SubagentStatus::Running,
154 started_ms: now,
155 updated_ms: now,
156 finished_ms: None,
157 output: None,
158 success: None,
159 progress: Vec::new(),
160 });
161 entry.updated_ms = now;
162 entry.progress.push(SubagentProgressEntry {
163 timestamp_ms: now,
164 status: status.clone(),
165 metadata: metadata.clone(),
166 });
167 }
168 AgentEvent::SubagentEnd {
169 task_id,
170 session_id,
171 agent,
172 output,
173 success,
174 } => {
175 let now = now_ms();
176 let mut tasks = self.tasks.write().await;
177 let entry = tasks
178 .entry(task_id.clone())
179 .or_insert_with(|| SubagentTaskSnapshot {
180 task_id: task_id.clone(),
181 parent_session_id: String::new(),
182 child_session_id: session_id.clone(),
183 agent: agent.clone(),
184 description: String::new(),
185 status: SubagentStatus::Running,
186 started_ms: now,
187 updated_ms: now,
188 finished_ms: None,
189 output: None,
190 success: None,
191 progress: Vec::new(),
192 });
193 if entry.status != SubagentStatus::Cancelled {
197 entry.status = if *success {
198 SubagentStatus::Completed
199 } else {
200 SubagentStatus::Failed
201 };
202 }
203 entry.updated_ms = now;
204 entry.finished_ms = Some(now);
205 entry.output = Some(output.clone());
206 entry.success = Some(*success);
207 }
208 _ => {}
209 }
210 }
211
212 pub async fn get(&self, task_id: &str) -> Option<SubagentTaskSnapshot> {
213 self.tasks.read().await.get(task_id).cloned()
214 }
215
216 pub async fn list(&self) -> Vec<SubagentTaskSnapshot> {
217 let mut tasks = self
218 .tasks
219 .read()
220 .await
221 .values()
222 .cloned()
223 .collect::<Vec<_>>();
224 tasks.sort_by_key(|task| task.started_ms);
225 tasks
226 }
227
228 pub async fn list_pending(&self) -> Vec<SubagentTaskSnapshot> {
229 self.list()
230 .await
231 .into_iter()
232 .filter(|task| task.status == SubagentStatus::Running)
233 .collect()
234 }
235
236 pub async fn list_for_parent(&self, parent_session_id: &str) -> Vec<SubagentTaskSnapshot> {
237 self.list()
238 .await
239 .into_iter()
240 .filter(|task| task.parent_session_id == parent_session_id)
241 .collect()
242 }
243}
244
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch, and saturates at
/// `u64::MAX` instead of silently truncating if the millisecond count does not fit in 64 bits.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_millis` returns `u128`; convert with a check rather than a lossy `as` cast.
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a start event with a fixed agent name and description.
    fn start_event(task_id: &str, parent: &str, child: &str) -> AgentEvent {
        AgentEvent::SubagentStart {
            task_id: task_id.to_owned(),
            session_id: child.to_owned(),
            parent_session_id: parent.to_owned(),
            agent: String::from("explore"),
            description: String::from("find things"),
        }
    }

    /// Builds a progress event carrying `status` and an empty JSON object as metadata.
    fn progress_event(task_id: &str, child: &str, status: &str) -> AgentEvent {
        AgentEvent::SubagentProgress {
            task_id: task_id.to_owned(),
            session_id: child.to_owned(),
            status: status.to_owned(),
            metadata: serde_json::json!({}),
        }
    }

    /// Builds an end event with a fixed agent name and output.
    fn end_event(task_id: &str, child: &str, success: bool) -> AgentEvent {
        AgentEvent::SubagentEnd {
            task_id: task_id.to_owned(),
            session_id: child.to_owned(),
            agent: String::from("explore"),
            output: String::from("done"),
            success,
        }
    }

    /// Feeds `events` to `tracker` one after another, in order.
    async fn record_all(tracker: &InMemorySubagentTaskTracker, events: &[AgentEvent]) {
        for event in events {
            tracker.record_event(event).await;
        }
    }

    #[tokio::test]
    async fn lifecycle_start_progress_end_transitions_status() {
        let tracker = InMemorySubagentTaskTracker::new();

        // Start: the task shows up as running with both session ids set.
        tracker
            .record_event(&start_event("task-1", "parent", "child"))
            .await;
        let started = tracker.get("task-1").await.unwrap();
        assert_eq!(started.status, SubagentStatus::Running);
        assert_eq!(started.parent_session_id, "parent");
        assert_eq!(started.child_session_id, "child");
        assert!(started.finished_ms.is_none());

        // Progress: one entry is appended and the task keeps running.
        tracker
            .record_event(&progress_event("task-1", "child", "tool_completed: bash"))
            .await;
        let in_flight = tracker.get("task-1").await.unwrap();
        assert_eq!(in_flight.status, SubagentStatus::Running);
        assert_eq!(in_flight.progress.len(), 1);

        // End: the terminal fields are filled in.
        tracker
            .record_event(&end_event("task-1", "child", true))
            .await;
        let finished = tracker.get("task-1").await.unwrap();
        assert_eq!(finished.status, SubagentStatus::Completed);
        assert_eq!(finished.success, Some(true));
        assert_eq!(finished.output.as_deref(), Some("done"));
        assert!(finished.finished_ms.is_some());
    }

    #[tokio::test]
    async fn failed_end_event_marks_status_failed() {
        let tracker = InMemorySubagentTaskTracker::new();
        record_all(
            &tracker,
            &[
                start_event("task-2", "parent", "child"),
                end_event("task-2", "child", false),
            ],
        )
        .await;

        let failed = tracker.get("task-2").await.unwrap();
        assert_eq!(failed.status, SubagentStatus::Failed);
        assert_eq!(failed.success, Some(false));
    }

    #[tokio::test]
    async fn pending_list_excludes_completed_tasks() {
        let tracker = InMemorySubagentTaskTracker::new();
        record_all(
            &tracker,
            &[
                start_event("task-a", "parent", "child-a"),
                start_event("task-b", "parent", "child-b"),
                end_event("task-a", "child-a", true),
            ],
        )
        .await;

        let still_running = tracker.list_pending().await;
        assert_eq!(still_running.len(), 1);
        assert_eq!(still_running[0].task_id, "task-b");
    }

    #[tokio::test]
    async fn list_for_parent_filters_by_session() {
        let tracker = InMemorySubagentTaskTracker::new();
        record_all(
            &tracker,
            &[
                start_event("task-a", "session-1", "child-a"),
                start_event("task-b", "session-2", "child-b"),
            ],
        )
        .await;

        let owned = tracker.list_for_parent("session-1").await;
        assert_eq!(owned.len(), 1);
        assert_eq!(owned[0].task_id, "task-a");
    }

    #[tokio::test]
    async fn end_before_start_still_records_terminal_state() {
        let tracker = InMemorySubagentTaskTracker::new();
        tracker
            .record_event(&end_event("task-late", "child", true))
            .await;

        let late = tracker.get("task-late").await.unwrap();
        assert_eq!(late.status, SubagentStatus::Completed);
    }

    #[tokio::test]
    async fn non_subagent_events_are_ignored() {
        let tracker = InMemorySubagentTaskTracker::new();
        let unrelated = AgentEvent::TextDelta {
            text: "ignore me".to_string(),
        };
        tracker.record_event(&unrelated).await;

        assert!(tracker.list().await.is_empty());
    }

    #[tokio::test]
    async fn cancel_fires_token_and_marks_snapshot_cancelled() {
        let tracker = InMemorySubagentTaskTracker::new();
        tracker
            .record_event(&start_event("task-c", "parent", "child"))
            .await;

        let token = CancellationToken::new();
        tracker.register_canceller("task-c", token.clone()).await;
        assert!(!token.is_cancelled());

        let fired = tracker.cancel("task-c").await;
        assert!(fired, "cancel should report success");
        assert!(token.is_cancelled(), "registered token should be triggered");

        let cancelled = tracker.get("task-c").await.unwrap();
        assert_eq!(cancelled.status, SubagentStatus::Cancelled);
    }

    #[tokio::test]
    async fn cancel_returns_false_for_unknown_task() {
        let tracker = InMemorySubagentTaskTracker::new();
        let fired = tracker.cancel("task-does-not-exist").await;
        assert!(!fired);
    }

    #[tokio::test]
    async fn late_subagent_end_does_not_downgrade_cancelled_status() {
        let tracker = InMemorySubagentTaskTracker::new();
        tracker
            .record_event(&start_event("task-d", "parent", "child"))
            .await;
        tracker
            .register_canceller("task-d", CancellationToken::new())
            .await;
        assert!(tracker.cancel("task-d").await);

        // The end event arrives only after the cancellation has been recorded.
        tracker
            .record_event(&end_event("task-d", "child", false))
            .await;

        let after_end = tracker.get("task-d").await.unwrap();
        assert_eq!(after_end.status, SubagentStatus::Cancelled);
        assert!(after_end.finished_ms.is_some());
        assert_eq!(after_end.success, Some(false));
    }

    #[tokio::test]
    async fn clear_canceller_disarms_future_cancel_calls() {
        let tracker = InMemorySubagentTaskTracker::new();
        tracker
            .record_event(&start_event("task-e", "parent", "child"))
            .await;

        let token = CancellationToken::new();
        tracker.register_canceller("task-e", token.clone()).await;
        tracker.clear_canceller("task-e").await;

        let fired = tracker.cancel("task-e").await;
        assert!(!fired);
        assert!(!token.is_cancelled());
    }
}