Skip to main content

ai_agent/utils/
sdk_event_queue.rs

//! SDK event queue utilities.
//! Translated from ~/claudecode/openclaudecode/src/utils/sdkEventQueue.ts
//!
//! SDK events are emitted during headless/streaming sessions and drained
//! by the CLI output layer. In TUI mode they are silently dropped to avoid
//! queue buildup.
7
8use std::collections::VecDeque;
9use std::sync::{Arc, Mutex};
10
11use serde::{Deserialize, Serialize};
12use uuid::Uuid;
13
14use crate::bootstrap::state::{get_is_non_interactive_session, get_session_id};
15
/// Upper bound on the number of SDK events held at once; when the queue is
/// full, the oldest event is discarded to make room for a new one.
const MAX_QUEUE_SIZE: usize = 1_000;
18
19/// A typed SDK event subtype.
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum SdkEventType {
23    /// A background task has started.
24    TaskStarted,
25    /// A task is making progress (called periodically).
26    TaskProgress,
27    /// A task has reached a terminal state.
28    TaskNotification,
29    /// The session has changed state (idle/running/requires_action).
30    SessionStateChanged,
31}
32
33/// A single SDK event with all possible fields (matching TS union type).
34#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
35pub struct SdkEvent {
36    /// Always "system" for internal SDK events.
37    #[serde(rename = "type")]
38    pub event_type: String,
39    /// The event subtype discriminator.
40    pub subtype: SdkEventType,
41    /// The task ID this event relates to (optional).
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub task_id: Option<String>,
44    /// The tool use ID that triggered the task (optional).
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub tool_use_id: Option<String>,
47    /// Human-readable description of the task (optional).
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub description: Option<String>,
50    /// The type of task (e.g. "local_agent", "local_bash") (optional).
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub task_type: Option<String>,
53    /// The workflow name for workflow-based tasks (optional).
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub workflow_name: Option<String>,
56    /// The prompt for dream tasks (optional).
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub prompt: Option<String>,
59    /// Usage statistics for task progress (optional).
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub usage: Option<SdkEventUsage>,
62    /// Last tool name used during task progress (optional).
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub last_tool_name: Option<String>,
65    /// Summary of progress (optional).
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub summary: Option<String>,
68    /// Workflow progress data (optional).
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub workflow_progress: Option<Vec<serde_json::Value>>,
71    /// For task_notification: terminal status of the task.
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub status: Option<String>,
74    /// For task_notification: path to the task output file.
75    #[serde(skip_serializing_if = "Option::is_none")]
76    pub output_file: Option<String>,
77    /// For session_state_changed: the new session state.
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub state: Option<String>,
80}
81
82/// Usage statistics for task_progress events.
83#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
84pub struct SdkEventUsage {
85    pub total_tokens: u64,
86    pub tool_uses: u64,
87    pub duration_ms: u64,
88}
89
90/// A drained SDK event with UUID and session ID appended.
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct DrainedSdkEvent {
93    pub uuid: String,
94    pub session_id: String,
95    #[serde(flatten)]
96    pub event: SdkEvent,
97}
98
99impl SdkEvent {
100    /// Create a new task_started event.
101    pub fn task_started(
102        task_id: String,
103        tool_use_id: Option<String>,
104        description: String,
105        task_type: Option<String>,
106        workflow_name: Option<String>,
107        prompt: Option<String>,
108    ) -> Self {
109        Self {
110            event_type: "system".to_string(),
111            subtype: SdkEventType::TaskStarted,
112            task_id: Some(task_id),
113            tool_use_id,
114            description: Some(description),
115            task_type,
116            workflow_name,
117            prompt,
118            usage: None,
119            last_tool_name: None,
120            summary: None,
121            workflow_progress: None,
122            status: None,
123            output_file: None,
124            state: None,
125        }
126    }
127
128    /// Create a new task_progress event.
129    pub fn task_progress(
130        task_id: String,
131        tool_use_id: Option<String>,
132        description: String,
133        usage: SdkEventUsage,
134        last_tool_name: Option<String>,
135        summary: Option<String>,
136        workflow_progress: Option<Vec<serde_json::Value>>,
137    ) -> Self {
138        Self {
139            event_type: "system".to_string(),
140            subtype: SdkEventType::TaskProgress,
141            task_id: Some(task_id),
142            tool_use_id,
143            description: Some(description),
144            task_type: None,
145            workflow_name: None,
146            prompt: None,
147            usage: Some(usage),
148            last_tool_name,
149            summary,
150            workflow_progress,
151            status: None,
152            output_file: None,
153            state: None,
154        }
155    }
156
157    /// Create a new task_notification event for a terminal task.
158    pub fn task_notification(
159        task_id: String,
160        tool_use_id: Option<String>,
161        status: String,
162        output_file: String,
163        summary: String,
164        usage: Option<SdkEventUsage>,
165    ) -> Self {
166        Self {
167            event_type: "system".to_string(),
168            subtype: SdkEventType::TaskNotification,
169            task_id: Some(task_id),
170            tool_use_id,
171            description: None,
172            task_type: None,
173            workflow_name: None,
174            prompt: None,
175            usage,
176            last_tool_name: None,
177            summary: if summary.is_empty() {
178                None
179            } else {
180                Some(summary)
181            },
182            workflow_progress: None,
183            status: Some(status),
184            output_file: if output_file.is_empty() {
185                None
186            } else {
187                Some(output_file)
188            },
189            state: None,
190        }
191    }
192
193    /// Create a new session_state_changed event.
194    pub fn session_state_changed(state: String) -> Self {
195        Self {
196            event_type: "system".to_string(),
197            subtype: SdkEventType::SessionStateChanged,
198            task_id: None,
199            tool_use_id: None,
200            description: None,
201            task_type: None,
202            workflow_name: None,
203            prompt: None,
204            usage: None,
205            last_tool_name: None,
206            summary: None,
207            workflow_progress: None,
208            status: None,
209            output_file: None,
210            state: Some(state),
211        }
212    }
213}
214
215/// Thread-safe event queue for SDK communication.
216pub struct SdkEventQueue {
217    events: Arc<Mutex<VecDeque<SdkEvent>>>,
218}
219
220impl SdkEventQueue {
221    pub fn new() -> Self {
222        Self {
223            events: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_QUEUE_SIZE))),
224        }
225    }
226
227    /// Push an event into the queue. Drops oldest if over capacity.
228    pub fn push(&self, event: SdkEvent) {
229        let mut queue = self.events.lock().unwrap();
230        if queue.len() >= MAX_QUEUE_SIZE {
231            queue.pop_front();
232        }
233        queue.push_back(event);
234    }
235
236    /// Pop the next event from the queue.
237    pub fn pop(&self) -> Option<SdkEvent> {
238        self.events.lock().unwrap().pop_front()
239    }
240
241    /// Drain all events from the queue (returns all and clears).
242    pub fn drain(&self) -> Vec<SdkEvent> {
243        self.events.lock().unwrap().drain(..).collect()
244    }
245
246    pub fn len(&self) -> usize {
247        self.events.lock().unwrap().len()
248    }
249
250    pub fn is_empty(&self) -> bool {
251        self.len() == 0
252    }
253
254    pub fn clear(&self) {
255        self.events.lock().unwrap().clear();
256    }
257}
258
259impl Default for SdkEventQueue {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265/// The global SDK event queue singleton.
266/// Only consumed (drained) in headless/streaming mode.
267static GLOBAL_QUEUE: std::sync::LazyLock<SdkEventQueue> =
268    std::sync::LazyLock::new(SdkEventQueue::new);
269
270/// Enqueue an SDK event.
271/// In TUI mode (interactive session), events are dropped to avoid queue buildup.
272pub fn enqueue_sdk_event(event: SdkEvent) {
273    if !get_is_non_interactive_session() {
274        return;
275    }
276    GLOBAL_QUEUE.push(event);
277}
278
279/// Drain all pending SDK events and return them with uuid and session_id.
280/// Call this from the output layer (print.ts equivalent) to deliver events
281/// to SDK consumers.
282pub fn drain_sdk_events() -> Vec<DrainedSdkEvent> {
283    let events = GLOBAL_QUEUE.drain();
284    if events.is_empty() {
285        return Vec::new();
286    }
287    let session_id = get_session_id();
288    events
289        .into_iter()
290        .map(|event| DrainedSdkEvent {
291            uuid: Uuid::new_v4().to_string(),
292            session_id: session_id.clone(),
293            event,
294        })
295        .collect()
296}
297
298/// Emit a `task_started` SDK event.
299/// Called by `register_task()` when a new task is registered.
300pub fn emit_task_started(
301    task_id: &str,
302    tool_use_id: Option<String>,
303    description: &str,
304    task_type: Option<String>,
305    workflow_name: Option<String>,
306    prompt: Option<String>,
307) {
308    enqueue_sdk_event(SdkEvent::task_started(
309        task_id.to_string(),
310        tool_use_id,
311        description.to_string(),
312        task_type,
313        workflow_name,
314        prompt,
315    ));
316}
317
318/// Emit a `task_progress` SDK event.
319/// Shared by background agents (per tool_use) and workflows (per flush batch).
320pub fn emit_task_progress(params: TaskProgressParams) {
321    enqueue_sdk_event(SdkEvent::task_progress(
322        params.task_id,
323        params.tool_use_id,
324        params.description,
325        params.usage,
326        params.last_tool_name,
327        params.summary,
328        params.workflow_progress,
329    ));
330}
331
332/// Parameters for task_progress event emission.
333pub struct TaskProgressParams {
334    pub task_id: String,
335    pub tool_use_id: Option<String>,
336    pub description: String,
337    pub usage: SdkEventUsage,
338    pub last_tool_name: Option<String>,
339    pub summary: Option<String>,
340    pub workflow_progress: Option<Vec<serde_json::Value>>,
341}
342
343/// Emit a `task_notification` SDK event for a task reaching a terminal state.
344///
345/// This is the closing bookend to `emit_task_started` (always called via
346/// `register_task`). Call this from any exit path that sets a task terminal
347/// WITHOUT going through enqueuePendingNotification (print.rs parses the XML
348/// into the same SDK event, so paths that do both would double-emit).
349/// Paths that suppress the XML notification (notified: true pre-set, kill
350/// paths, abort branches) must call this directly so SDK consumers
351/// (Scuttle's bg-task dot, VS Code subagent panel) see the task close.
352pub fn emit_task_terminated_sdk(
353    task_id: &str,
354    tool_use_id: Option<String>,
355    status: &str,
356    summary: Option<String>,
357    output_file: Option<String>,
358    usage: Option<SdkEventUsage>,
359) {
360    enqueue_sdk_event(SdkEvent::task_notification(
361        task_id.to_string(),
362        tool_use_id,
363        status.to_string(),
364        output_file.unwrap_or_default(),
365        summary.unwrap_or_default(),
366        usage,
367    ));
368}
369
370/// Emit a `session_state_changed` SDK event.
371/// Mirrors `notifySessionStateChanged` — the 'idle' transition fires after
372/// heldBackResult flushes and the bg-agent do-while loop exits, serving as
373/// the authoritative "turn is over" signal for SDK consumers.
374pub fn emit_session_state_changed(state: &str) {
375    enqueue_sdk_event(SdkEvent::session_state_changed(state.to_string()));
376}
377
/// Unit tests for the event constructors and the bounded queue.
#[cfg(test)]
mod tests {
    use super::*;

    /// `task_started` sets the subtype, the task ID and the "system" type tag.
    #[test]
    fn test_sdk_event_task_started() {
        let event = SdkEvent::task_started(
            "task-1".to_string(),
            Some("tool-1".to_string()),
            "Test task".to_string(),
            Some("local_agent".to_string()),
            None,
            None,
        );
        assert_eq!(event.subtype, SdkEventType::TaskStarted);
        assert_eq!(event.task_id, Some("task-1".to_string()));
        assert_eq!(event.event_type, "system");
    }

    /// `task_notification` sets the subtype and records the terminal status.
    #[test]
    fn test_sdk_event_task_notification() {
        let event = SdkEvent::task_notification(
            "task-1".to_string(),
            None,
            "completed".to_string(),
            "/tmp/task_output.txt".to_string(),
            "Task completed successfully".to_string(),
            None,
        );
        assert_eq!(event.subtype, SdkEventType::TaskNotification);
        assert_eq!(event.status, Some("completed".to_string()));
    }

    /// `session_state_changed` sets the subtype and carries the new state.
    #[test]
    fn test_sdk_event_session_state_changed() {
        let event = SdkEvent::session_state_changed("idle".to_string());
        assert_eq!(event.subtype, SdkEventType::SessionStateChanged);
        assert_eq!(event.state, Some("idle".to_string()));
    }

    /// A pushed event comes back unchanged from `pop`, leaving the queue empty.
    #[test]
    fn test_sdk_event_queue() {
        let queue = SdkEventQueue::new();
        let event = SdkEvent::task_started(
            "task-1".to_string(),
            None,
            "Test".to_string(),
            None,
            None,
            None,
        );
        queue.push(event.clone());
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop(), Some(event));
        assert_eq!(queue.len(), 0);
    }

    /// Pushing past `MAX_QUEUE_SIZE` keeps the length capped and evicts the
    /// oldest events first.
    #[test]
    fn test_sdk_event_queue_capacity() {
        let overflow = 10;
        let queue = SdkEventQueue::new();
        // Push more than MAX_QUEUE_SIZE so the drop-oldest path actually runs.
        for i in 0..MAX_QUEUE_SIZE + overflow {
            queue.push(SdkEvent::task_started(
                format!("task-{i}"),
                None,
                "Test".to_string(),
                None,
                None,
                None,
            ));
        }
        assert_eq!(queue.len(), MAX_QUEUE_SIZE);

        // The first `overflow` events were evicted, so the oldest survivor is
        // the one pushed at index `overflow`.
        let oldest = queue.pop().expect("queue should still hold events");
        assert_eq!(oldest.task_id, Some(format!("task-{overflow}")));
    }

    /// `SdkEventUsage` stores the counters it is constructed with.
    #[test]
    fn test_sdk_event_usage() {
        let usage = SdkEventUsage {
            total_tokens: 1500,
            tool_uses: 5,
            duration_ms: 3000,
        };
        assert_eq!(usage.total_tokens, 1500);
        assert_eq!(usage.tool_uses, 5);
        assert_eq!(usage.duration_ms, 3000);
    }
}