Skip to main content

enact_core/streaming/
event_stream.rs

1//! Streaming Event Protocol
2//!
3//! Standardized event protocol for real-time streaming using SSE (Server-Sent Events).
4//! All events use `data-*` prefix for AI SDK UI compatibility.
5//!
6//! ## Protocol Categories
7//!
8//! ### Standard Text Events (AI SDK compatible)
9//! - `data-text-start` - Start of text generation
10//! - `data-text-delta` - Incremental text chunk
11//! - `data-text-end` - End of text generation
12//!
13//! ### Standard Step Events (AI SDK compatible)
14//! - `data-start-step` - Step boundary start
15//! - `data-finish-step` - Step boundary end
16//!
17//! ### Standard Tool Events (AI SDK compatible)
18//! - `data-tool-input-start` - Tool call started
19//! - `data-tool-input-delta` - Tool input streaming
20//! - `data-tool-input-available` - Tool input complete
21//! - `data-tool-output-available` - Tool result available
22//!
23//! ### Standard Lifecycle Events (AI SDK compatible)
24//! - `data-start` - Message/stream start
25//! - `data-finish` - Message/stream finish
26//! - `data-error` - Error occurred
27//!
28//! ### Custom Execution Events (Enact-specific)
29//! - `data-execution-start` - Execution started
30//! - `data-execution-end` - Execution completed
31//! - `data-execution-failed` - Execution failed
32//! - `data-execution-paused` - Execution paused
33//! - `data-execution-resumed` - Execution resumed
34//! - `data-execution-cancelled` - Execution cancelled
35//!
36//! ### Custom Step Events (Enact-specific)
37//! - `data-step-start` - Step started (with metadata)
38//! - `data-step-end` - Step completed
39//! - `data-step-failed` - Step failed
40//!
41//! ### Custom Artifact Events (Enact-specific)
42//! - `data-artifact-created` - Artifact produced
43//!
44//! @see https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol
45
46use super::event_logger::EventLog;
47use crate::kernel::{ArtifactId, ExecutionError, ExecutionId, StepId, StepSourceType, StepType};
48use crate::kernel::{ExecutionContext, ExecutionEvent, ExecutionEventType};
49use futures::Stream;
50use serde::{Deserialize, Serialize};
51use std::pin::Pin;
52use std::sync::Arc;
53
54/// Event stream type - async stream of events
55pub type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;
56
57// =============================================================================
58// Stream Mode
59// =============================================================================
60
61/// StreamMode - Controls what events are included in the stream
62#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
63#[serde(rename_all = "snake_case")]
64pub enum StreamMode {
65    /// Full event stream - all events including token-level deltas
66    #[default]
67    Full,
68    /// Summary mode - only major events (step start/end, completion)
69    Summary,
70    /// Control only - only control signals (pause/resume/cancel)
71    ControlOnly,
72    /// Silent - no streaming (for batch processing)
73    Silent,
74}
75
76// =============================================================================
77// Stream Event - Standardized Protocol with data-* prefix
78// =============================================================================
79
80/// StreamEvent - All streaming events with `data-*` prefix
81///
82/// Events are tagged for direct SSE serialization.
83/// UI adapters can consume these directly or transform as needed.
84#[derive(Debug, Clone, Serialize, Deserialize)]
85#[serde(tag = "type")]
86pub enum StreamEvent {
87    // =========================================================================
88    // Standard AI SDK Events - Text Streaming
89    // =========================================================================
90    /// Text generation started
91    #[serde(rename = "data-text-start")]
92    TextStart {
93        id: String,
94        #[serde(skip_serializing_if = "Option::is_none")]
95        execution_id: Option<String>,
96    },
97
98    /// Incremental text chunk
99    #[serde(rename = "data-text-delta")]
100    TextDelta { id: String, delta: String },
101
102    /// Text generation ended
103    #[serde(rename = "data-text-end")]
104    TextEnd { id: String },
105
106    // =========================================================================
107    // Standard AI SDK Events - Step Control
108    // =========================================================================
109    /// Step boundary start (AI SDK step concept)
110    #[serde(rename = "data-start-step")]
111    StartStep {
112        #[serde(skip_serializing_if = "Option::is_none")]
113        step_id: Option<String>,
114    },
115
116    /// Step boundary end (AI SDK step concept)
117    #[serde(rename = "data-finish-step")]
118    FinishStep {
119        #[serde(skip_serializing_if = "Option::is_none")]
120        step_id: Option<String>,
121    },
122
123    // =========================================================================
124    // Standard AI SDK Events - Tool Calls
125    // =========================================================================
126    /// Tool input started
127    #[serde(rename = "data-tool-input-start")]
128    ToolInputStart {
129        tool_call_id: String,
130        tool_name: String,
131    },
132
133    /// Tool input delta (streaming input)
134    #[serde(rename = "data-tool-input-delta")]
135    ToolInputDelta {
136        tool_call_id: String,
137        input_text_delta: String,
138    },
139
140    /// Tool input available (complete input)
141    #[serde(rename = "data-tool-input-available")]
142    ToolInputAvailable {
143        tool_call_id: String,
144        tool_name: String,
145        input: serde_json::Value,
146    },
147
148    /// Tool output available
149    #[serde(rename = "data-tool-output-available")]
150    ToolOutputAvailable {
151        tool_call_id: String,
152        output: serde_json::Value,
153    },
154
155    /// Human-in-the-loop permission request before tool execution
156    #[serde(rename = "data-permission-request")]
157    PermissionRequest {
158        execution_id: String,
159        tool_name: String,
160        arguments: serde_json::Value,
161        policy: String,
162        timestamp: i64,
163    },
164
165    // =========================================================================
166    // Standard AI SDK Events - Lifecycle
167    // =========================================================================
168    /// Stream/message start
169    #[serde(rename = "data-start")]
170    Start {
171        message_id: String,
172        #[serde(skip_serializing_if = "Option::is_none")]
173        execution_id: Option<String>,
174    },
175
176    /// Stream/message finish
177    #[serde(rename = "data-finish")]
178    Finish {
179        message_id: String,
180        #[serde(skip_serializing_if = "Option::is_none")]
181        final_output: Option<String>,
182    },
183
184    /// Error occurred
185    #[serde(rename = "data-error")]
186    Error { error: ExecutionError },
187
188    // =========================================================================
189    // Custom Enact Events - Execution Lifecycle
190    // =========================================================================
191    /// Execution started
192    #[serde(rename = "data-execution-start")]
193    ExecutionStart {
194        execution_id: String,
195        #[serde(skip_serializing_if = "Option::is_none")]
196        parent_id: Option<String>,
197        #[serde(skip_serializing_if = "Option::is_none")]
198        parent_type: Option<String>,
199        timestamp: i64,
200    },
201
202    /// Execution completed successfully
203    #[serde(rename = "data-execution-end")]
204    ExecutionEnd {
205        execution_id: String,
206        #[serde(skip_serializing_if = "Option::is_none")]
207        final_output: Option<String>,
208        duration_ms: u64,
209        timestamp: i64,
210    },
211
212    /// Execution failed
213    #[serde(rename = "data-execution-failed")]
214    ExecutionFailed {
215        execution_id: String,
216        error: ExecutionError,
217        timestamp: i64,
218    },
219
220    /// Execution paused
221    #[serde(rename = "data-execution-paused")]
222    ExecutionPaused {
223        execution_id: String,
224        reason: String,
225        timestamp: i64,
226    },
227
228    /// Execution resumed
229    #[serde(rename = "data-execution-resumed")]
230    ExecutionResumed {
231        execution_id: String,
232        timestamp: i64,
233    },
234
235    /// Execution cancelled
236    #[serde(rename = "data-execution-cancelled")]
237    ExecutionCancelled {
238        execution_id: String,
239        reason: String,
240        timestamp: i64,
241    },
242
243    // =========================================================================
244    // Custom Enact Events - Step Lifecycle
245    // =========================================================================
246    /// Step started (with full metadata)
247    #[serde(rename = "data-step-start")]
248    StepStart {
249        execution_id: String,
250        step_id: String,
251        step_type: String,
252        name: String,
253        timestamp: i64,
254    },
255
256    /// Step completed
257    #[serde(rename = "data-step-end")]
258    StepEnd {
259        execution_id: String,
260        step_id: String,
261        #[serde(skip_serializing_if = "Option::is_none")]
262        output: Option<String>,
263        duration_ms: u64,
264        timestamp: i64,
265    },
266
267    /// Step failed
268    #[serde(rename = "data-step-failed")]
269    StepFailed {
270        execution_id: String,
271        step_id: String,
272        error: ExecutionError,
273        timestamp: i64,
274    },
275
276    /// Step discovered during execution (dynamic discovery for agentic loops)
277    #[serde(rename = "data-step-discovered")]
278    StepDiscovered {
279        execution_id: String,
280        step_id: String,
281        #[serde(skip_serializing_if = "Option::is_none")]
282        discovered_by: Option<String>,
283        source_type: String,
284        reason: String,
285        depth: u32,
286        timestamp: i64,
287    },
288
289    // =========================================================================
290    // Custom Enact Events - Artifacts
291    // =========================================================================
292    /// Artifact created
293    #[serde(rename = "data-artifact-created")]
294    ArtifactCreated {
295        execution_id: String,
296        step_id: String,
297        artifact_id: String,
298        artifact_type: String,
299        timestamp: i64,
300    },
301
302    // =========================================================================
303    // Custom Enact Events - State
304    // =========================================================================
305    /// State snapshot captured
306    #[serde(rename = "data-state-snapshot")]
307    StateSnapshot {
308        execution_id: String,
309        #[serde(skip_serializing_if = "Option::is_none")]
310        step_id: Option<String>,
311        state: serde_json::Value,
312        timestamp: i64,
313    },
314
315    /// Checkpoint saved (Goal-driven persistence)
316    #[serde(rename = "data-checkpoint-saved")]
317    CheckpointSaved {
318        execution_id: String,
319        #[serde(skip_serializing_if = "Option::is_none")]
320        step_id: Option<String>,
321        checkpoint_id: String,
322        state_hash: String,
323        timestamp: i64,
324    },
325
326    /// Goal evaluated
327    #[serde(rename = "data-goal-evaluated")]
328    GoalEvaluated {
329        execution_id: String,
330        #[serde(skip_serializing_if = "Option::is_none")]
331        step_id: Option<String>,
332        goal_id: String,
333        status: String, // "met", "not_met", "progressing"
334        #[serde(skip_serializing_if = "Option::is_none")]
335        score: Option<f64>,
336        #[serde(skip_serializing_if = "Option::is_none")]
337        reason: Option<String>,
338        timestamp: i64,
339    },
340
341    // =========================================================================
342    // Custom Enact Events - Inbox (Mid-Execution Guidance)
343    // =========================================================================
344    /// Inbox message received (INV-INBOX-003: audit trail)
345    #[serde(rename = "data-inbox-message")]
346    InboxMessage {
347        execution_id: String,
348        message_id: String,
349        message_type: String,
350        timestamp: i64,
351    },
352
353    // =========================================================================
354    // Custom Enact Events - Decisions (Audit Trail)
355    // =========================================================================
356    /// Policy decision made (audit trail for tool policy evaluation)
357    #[serde(rename = "data-policy-decision")]
358    PolicyDecision {
359        execution_id: String,
360        #[serde(skip_serializing_if = "Option::is_none")]
361        step_id: Option<String>,
362        tool_name: String,
363        decision: String, // "allow", "deny", "warn"
364        #[serde(skip_serializing_if = "Option::is_none")]
365        reason: Option<String>,
366        timestamp: i64,
367    },
368
369    // =========================================================================
370    // Custom Enact Events - LLM Call Observability
371    // =========================================================================
372    /// LLM call started
373    #[serde(rename = "data-llm-call-start")]
374    LlmCallStart {
375        execution_id: String,
376        #[serde(skip_serializing_if = "Option::is_none")]
377        step_id: Option<String>,
378        callable_name: String,
379        #[serde(skip_serializing_if = "Option::is_none")]
380        model: Option<String>,
381        history_length: usize,
382        timestamp: i64,
383    },
384
385    /// LLM call completed
386    #[serde(rename = "data-llm-call-end")]
387    LlmCallEnd {
388        execution_id: String,
389        #[serde(skip_serializing_if = "Option::is_none")]
390        step_id: Option<String>,
391        duration_ms: u64,
392        #[serde(skip_serializing_if = "Option::is_none")]
393        prompt_tokens: Option<u32>,
394        #[serde(skip_serializing_if = "Option::is_none")]
395        completion_tokens: Option<u32>,
396        #[serde(skip_serializing_if = "Option::is_none")]
397        total_tokens: Option<u32>,
398        timestamp: i64,
399    },
400
401    /// LLM call failed
402    #[serde(rename = "data-llm-call-failed")]
403    LlmCallFailed {
404        execution_id: String,
405        #[serde(skip_serializing_if = "Option::is_none")]
406        step_id: Option<String>,
407        error: String,
408        #[serde(skip_serializing_if = "Option::is_none")]
409        duration_ms: Option<u64>,
410        timestamp: i64,
411    },
412
413    // =========================================================================
414    // Custom Enact Events - Token & Cost Accounting
415    // =========================================================================
416    /// Token usage recorded (cumulative)
417    #[serde(rename = "data-token-usage")]
418    TokenUsageRecorded {
419        execution_id: String,
420        #[serde(skip_serializing_if = "Option::is_none")]
421        step_id: Option<String>,
422        prompt_tokens: u32,
423        completion_tokens: u32,
424        total_tokens: u32,
425        cumulative_tokens: u64,
426        #[serde(skip_serializing_if = "Option::is_none")]
427        cost_usd: Option<f64>,
428        #[serde(skip_serializing_if = "Option::is_none")]
429        cumulative_cost_usd: Option<f64>,
430        timestamp: i64,
431    },
432
433    // =========================================================================
434    // Custom Enact Events - Memory Access
435    // =========================================================================
436    /// Memory recalled (retrieved from memory store)
437    #[serde(rename = "data-memory-recalled")]
438    MemoryRecalled {
439        execution_id: String,
440        #[serde(skip_serializing_if = "Option::is_none")]
441        step_id: Option<String>,
442        query: String,
443        memories_count: usize,
444        duration_ms: u64,
445        #[serde(skip_serializing_if = "Option::is_none")]
446        session_id: Option<String>,
447        timestamp: i64,
448    },
449
450    /// Memory stored
451    #[serde(rename = "data-memory-stored")]
452    MemoryStored {
453        execution_id: String,
454        #[serde(skip_serializing_if = "Option::is_none")]
455        step_id: Option<String>,
456        memory_type: String, // "episodic", "semantic", "working"
457        #[serde(skip_serializing_if = "Option::is_none")]
458        session_id: Option<String>,
459        timestamp: i64,
460    },
461
462    // =========================================================================
463    // Custom Enact Events - Guardrail/Safety
464    // =========================================================================
465    /// Guardrail evaluated
466    #[serde(rename = "data-guardrail-evaluated")]
467    GuardrailEvaluated {
468        execution_id: String,
469        #[serde(skip_serializing_if = "Option::is_none")]
470        step_id: Option<String>,
471        guardrail_name: String,
472        decision: String, // "pass", "block", "warn"
473        #[serde(skip_serializing_if = "Option::is_none")]
474        reason: Option<String>,
475        #[serde(skip_serializing_if = "Option::is_none")]
476        score: Option<f64>,
477        timestamp: i64,
478    },
479
480    // =========================================================================
481    // Custom Enact Events - Reasoning & Context
482    // =========================================================================
483    /// Reasoning trace captured
484    #[serde(rename = "data-reasoning-trace")]
485    ReasoningTrace {
486        execution_id: String,
487        #[serde(skip_serializing_if = "Option::is_none")]
488        step_id: Option<String>,
489        reasoning_type: String, // "chain_of_thought", "plan", "reflection"
490        content: String,
491        #[serde(skip_serializing_if = "Option::is_none")]
492        truncated: Option<bool>,
493        timestamp: i64,
494    },
495
496    /// Context window snapshot
497    #[serde(rename = "data-context-snapshot")]
498    ContextSnapshot {
499        execution_id: String,
500        #[serde(skip_serializing_if = "Option::is_none")]
501        step_id: Option<String>,
502        message_count: usize,
503        estimated_tokens: u32,
504        #[serde(skip_serializing_if = "Option::is_none")]
505        max_tokens: Option<u32>,
506        utilization_pct: f64,
507        timestamp: i64,
508    },
509
510    // =========================================================================
511    // Custom Enact Events - Feedback
512    // =========================================================================
513    /// Feedback received
514    #[serde(rename = "data-feedback-received")]
515    FeedbackReceived {
516        execution_id: String,
517        #[serde(skip_serializing_if = "Option::is_none")]
518        step_id: Option<String>,
519        feedback_type: String, // "thumbs_up", "thumbs_down", "rating", "correction"
520        #[serde(skip_serializing_if = "Option::is_none")]
521        score: Option<f64>,
522        #[serde(skip_serializing_if = "Option::is_none")]
523        comment: Option<String>,
524        #[serde(skip_serializing_if = "Option::is_none")]
525        user_id: Option<String>,
526        timestamp: i64,
527    },
528}
529
530impl StreamEvent {
531    /// Get current timestamp in milliseconds
532    fn now() -> i64 {
533        std::time::SystemTime::now()
534            .duration_since(std::time::UNIX_EPOCH)
535            .unwrap_or_default()
536            .as_millis() as i64
537    }
538
539    /// Generate a unique ID for text/step tracking
540    fn generate_id() -> String {
541        use std::time::{SystemTime, UNIX_EPOCH};
542        let nanos = SystemTime::now()
543            .duration_since(UNIX_EPOCH)
544            .unwrap()
545            .as_nanos();
546        format!("id_{:x}", nanos)
547    }
548
549    // =========================================================================
550    // Standard AI SDK Event Factories
551    // =========================================================================
552
553    /// Create a text-start event
554    pub fn text_start(execution_id: Option<&ExecutionId>) -> Self {
555        Self::TextStart {
556            id: Self::generate_id(),
557            execution_id: execution_id.map(|e| e.as_str().to_string()),
558        }
559    }
560
561    /// Create a text-delta event
562    pub fn text_delta(id: impl Into<String>, delta: impl Into<String>) -> Self {
563        Self::TextDelta {
564            id: id.into(),
565            delta: delta.into(),
566        }
567    }
568
569    /// Create a text-end event
570    pub fn text_end(id: impl Into<String>) -> Self {
571        Self::TextEnd { id: id.into() }
572    }
573
574    /// Create a start event
575    pub fn start(message_id: impl Into<String>, execution_id: Option<&ExecutionId>) -> Self {
576        Self::Start {
577            message_id: message_id.into(),
578            execution_id: execution_id.map(|e| e.as_str().to_string()),
579        }
580    }
581
582    /// Create a finish event
583    pub fn finish(message_id: impl Into<String>, final_output: Option<String>) -> Self {
584        Self::Finish {
585            message_id: message_id.into(),
586            final_output,
587        }
588    }
589
590    /// Create an error event
591    pub fn error(error: ExecutionError) -> Self {
592        Self::Error { error }
593    }
594
595    /// Create a start-step event (AI SDK)
596    pub fn start_step(step_id: Option<&StepId>) -> Self {
597        Self::StartStep {
598            step_id: step_id.map(|s| s.as_str().to_string()),
599        }
600    }
601
602    /// Create a finish-step event (AI SDK)
603    pub fn finish_step(step_id: Option<&StepId>) -> Self {
604        Self::FinishStep {
605            step_id: step_id.map(|s| s.as_str().to_string()),
606        }
607    }
608
609    // =========================================================================
610    // Tool Event Factories
611    // =========================================================================
612
613    /// Create a tool-input-start event
614    pub fn tool_input_start(tool_call_id: impl Into<String>, tool_name: impl Into<String>) -> Self {
615        Self::ToolInputStart {
616            tool_call_id: tool_call_id.into(),
617            tool_name: tool_name.into(),
618        }
619    }
620
621    /// Create a tool-input-available event
622    pub fn tool_input_available(
623        tool_call_id: impl Into<String>,
624        tool_name: impl Into<String>,
625        input: serde_json::Value,
626    ) -> Self {
627        Self::ToolInputAvailable {
628            tool_call_id: tool_call_id.into(),
629            tool_name: tool_name.into(),
630            input,
631        }
632    }
633
634    /// Create a tool-output-available event
635    pub fn tool_output_available(
636        tool_call_id: impl Into<String>,
637        output: serde_json::Value,
638    ) -> Self {
639        Self::ToolOutputAvailable {
640            tool_call_id: tool_call_id.into(),
641            output,
642        }
643    }
644
645    /// Create a permission-request event
646    pub fn permission_request(
647        execution_id: &ExecutionId,
648        tool_name: impl Into<String>,
649        arguments: serde_json::Value,
650        policy: impl Into<String>,
651    ) -> Self {
652        Self::PermissionRequest {
653            execution_id: execution_id.as_str().to_string(),
654            tool_name: tool_name.into(),
655            arguments,
656            policy: policy.into(),
657            timestamp: Self::now(),
658        }
659    }
660
661    // =========================================================================
662    // Execution Event Factories
663    // =========================================================================
664
665    /// Create an execution-start event
666    pub fn execution_start(execution_id: &ExecutionId) -> Self {
667        Self::ExecutionStart {
668            execution_id: execution_id.as_str().to_string(),
669            parent_id: None,
670            parent_type: None,
671            timestamp: Self::now(),
672        }
673    }
674
675    /// Create an execution-start event with parent
676    pub fn execution_start_with_parent(
677        execution_id: &ExecutionId,
678        parent_id: impl Into<String>,
679        parent_type: impl Into<String>,
680    ) -> Self {
681        Self::ExecutionStart {
682            execution_id: execution_id.as_str().to_string(),
683            parent_id: Some(parent_id.into()),
684            parent_type: Some(parent_type.into()),
685            timestamp: Self::now(),
686        }
687    }
688
689    /// Create an execution-end event
690    pub fn execution_end(
691        execution_id: &ExecutionId,
692        final_output: Option<String>,
693        duration_ms: u64,
694    ) -> Self {
695        Self::ExecutionEnd {
696            execution_id: execution_id.as_str().to_string(),
697            final_output,
698            duration_ms,
699            timestamp: Self::now(),
700        }
701    }
702
703    /// Create an execution-failed event
704    pub fn execution_failed(execution_id: &ExecutionId, error: ExecutionError) -> Self {
705        Self::ExecutionFailed {
706            execution_id: execution_id.as_str().to_string(),
707            error,
708            timestamp: Self::now(),
709        }
710    }
711
712    /// Create an execution-paused event
713    pub fn execution_paused(execution_id: &ExecutionId, reason: impl Into<String>) -> Self {
714        Self::ExecutionPaused {
715            execution_id: execution_id.as_str().to_string(),
716            reason: reason.into(),
717            timestamp: Self::now(),
718        }
719    }
720
721    /// Create an execution-resumed event
722    pub fn execution_resumed(execution_id: &ExecutionId) -> Self {
723        Self::ExecutionResumed {
724            execution_id: execution_id.as_str().to_string(),
725            timestamp: Self::now(),
726        }
727    }
728
729    /// Create an execution-cancelled event
730    pub fn execution_cancelled(execution_id: &ExecutionId, reason: impl Into<String>) -> Self {
731        Self::ExecutionCancelled {
732            execution_id: execution_id.as_str().to_string(),
733            reason: reason.into(),
734            timestamp: Self::now(),
735        }
736    }
737
738    // =========================================================================
739    // Step Event Factories
740    // =========================================================================
741
742    /// Create a step-start event
743    pub fn step_start(
744        execution_id: &ExecutionId,
745        step_id: &StepId,
746        step_type: StepType,
747        name: impl Into<String>,
748    ) -> Self {
749        Self::StepStart {
750            execution_id: execution_id.as_str().to_string(),
751            step_id: step_id.as_str().to_string(),
752            step_type: step_type.to_string(),
753            name: name.into(),
754            timestamp: Self::now(),
755        }
756    }
757
758    /// Create a step-end event
759    pub fn step_end(
760        execution_id: &ExecutionId,
761        step_id: &StepId,
762        output: Option<String>,
763        duration_ms: u64,
764    ) -> Self {
765        Self::StepEnd {
766            execution_id: execution_id.as_str().to_string(),
767            step_id: step_id.as_str().to_string(),
768            output,
769            duration_ms,
770            timestamp: Self::now(),
771        }
772    }
773
774    /// Create a step-failed event
775    pub fn step_failed(
776        execution_id: &ExecutionId,
777        step_id: &StepId,
778        error: ExecutionError,
779    ) -> Self {
780        Self::StepFailed {
781            execution_id: execution_id.as_str().to_string(),
782            step_id: step_id.as_str().to_string(),
783            error,
784            timestamp: Self::now(),
785        }
786    }
787
788    /// Create a step-discovered event
789    ///
790    /// Emitted when a new step is dynamically discovered during agentic execution.
791    /// This enables full audit trails for discovered steps (doc-30 Resource Discovery).
792    pub fn step_discovered(
793        execution_id: &ExecutionId,
794        step_id: &StepId,
795        discovered_by: Option<&StepId>,
796        source_type: StepSourceType,
797        reason: impl Into<String>,
798        depth: u32,
799    ) -> Self {
800        Self::StepDiscovered {
801            execution_id: execution_id.as_str().to_string(),
802            step_id: step_id.as_str().to_string(),
803            discovered_by: discovered_by.map(|s| s.as_str().to_string()),
804            source_type: format!("{:?}", source_type).to_lowercase(),
805            reason: reason.into(),
806            depth,
807            timestamp: Self::now(),
808        }
809    }
810
811    // =========================================================================
812    // Artifact Event Factories
813    // =========================================================================
814
815    /// Create an artifact-created event
816    pub fn artifact_created(
817        execution_id: &ExecutionId,
818        step_id: &StepId,
819        artifact_id: &ArtifactId,
820        artifact_type: impl Into<String>,
821    ) -> Self {
822        Self::ArtifactCreated {
823            execution_id: execution_id.as_str().to_string(),
824            step_id: step_id.as_str().to_string(),
825            artifact_id: artifact_id.as_str().to_string(),
826            artifact_type: artifact_type.into(),
827            timestamp: Self::now(),
828        }
829    }
830
831    // =========================================================================
832    // Inbox Event Factories (INV-INBOX-003: Audit Trail)
833    // =========================================================================
834
835    /// Create an inbox-message event for audit trail
836    ///
837    /// ## Invariant INV-INBOX-003
838    /// All inbox messages MUST emit events for audit trail.
839    pub fn inbox_message(
840        execution_id: &ExecutionId,
841        message_id: &str,
842        message_type: crate::inbox::InboxMessageType,
843    ) -> Self {
844        Self::InboxMessage {
845            execution_id: execution_id.as_str().to_string(),
846            message_id: message_id.to_string(),
847            message_type: format!("{:?}", message_type).to_lowercase(),
848            timestamp: Self::now(),
849        }
850    }
851
852    // =========================================================================
853    // Policy Decision Event Factories (Audit Trail)
854    // =========================================================================
855
856    /// Create a policy-decision event for audit trail
857    ///
858    /// Records tool policy evaluation decisions for compliance and audit.
859    pub fn policy_decision_allow(
860        execution_id: &ExecutionId,
861        step_id: Option<&StepId>,
862        tool_name: impl Into<String>,
863    ) -> Self {
864        Self::PolicyDecision {
865            execution_id: execution_id.as_str().to_string(),
866            step_id: step_id.map(|s| s.as_str().to_string()),
867            tool_name: tool_name.into(),
868            decision: "allow".to_string(),
869            reason: None,
870            timestamp: Self::now(),
871        }
872    }
873
874    /// Create a policy-decision deny event
875    pub fn policy_decision_deny(
876        execution_id: &ExecutionId,
877        step_id: Option<&StepId>,
878        tool_name: impl Into<String>,
879        reason: impl Into<String>,
880    ) -> Self {
881        Self::PolicyDecision {
882            execution_id: execution_id.as_str().to_string(),
883            step_id: step_id.map(|s| s.as_str().to_string()),
884            tool_name: tool_name.into(),
885            decision: "deny".to_string(),
886            reason: Some(reason.into()),
887            timestamp: Self::now(),
888        }
889    }
890
891    /// Create a policy-decision warn event
892    pub fn policy_decision_warn(
893        execution_id: &ExecutionId,
894        step_id: Option<&StepId>,
895        tool_name: impl Into<String>,
896        message: impl Into<String>,
897    ) -> Self {
898        Self::PolicyDecision {
899            execution_id: execution_id.as_str().to_string(),
900            step_id: step_id.map(|s| s.as_str().to_string()),
901            tool_name: tool_name.into(),
902            decision: "warn".to_string(),
903            reason: Some(message.into()),
904            timestamp: Self::now(),
905        }
906    }
907
908    // =========================================================================
909    // LLM Call Observability Event Factories
910    // =========================================================================
911
912    /// Create an llm-call-start event
913    pub fn llm_call_start(
914        execution_id: &ExecutionId,
915        step_id: Option<&StepId>,
916        callable_name: impl Into<String>,
917        model: Option<String>,
918        history_length: usize,
919    ) -> Self {
920        Self::LlmCallStart {
921            execution_id: execution_id.as_str().to_string(),
922            step_id: step_id.map(|s| s.as_str().to_string()),
923            callable_name: callable_name.into(),
924            model,
925            history_length,
926            timestamp: Self::now(),
927        }
928    }
929
930    /// Create an llm-call-end event
931    pub fn llm_call_end(
932        execution_id: &ExecutionId,
933        step_id: Option<&StepId>,
934        duration_ms: u64,
935        prompt_tokens: Option<u32>,
936        completion_tokens: Option<u32>,
937        total_tokens: Option<u32>,
938    ) -> Self {
939        Self::LlmCallEnd {
940            execution_id: execution_id.as_str().to_string(),
941            step_id: step_id.map(|s| s.as_str().to_string()),
942            duration_ms,
943            prompt_tokens,
944            completion_tokens,
945            total_tokens,
946            timestamp: Self::now(),
947        }
948    }
949
950    /// Create an llm-call-failed event
951    pub fn llm_call_failed(
952        execution_id: &ExecutionId,
953        step_id: Option<&StepId>,
954        error: impl Into<String>,
955        duration_ms: Option<u64>,
956    ) -> Self {
957        Self::LlmCallFailed {
958            execution_id: execution_id.as_str().to_string(),
959            step_id: step_id.map(|s| s.as_str().to_string()),
960            error: error.into(),
961            duration_ms,
962            timestamp: Self::now(),
963        }
964    }
965
966    // =========================================================================
967    // Token & Cost Accounting Event Factories
968    // =========================================================================
969
970    /// Create a token-usage-recorded event
971    pub fn token_usage_recorded(
972        execution_id: &ExecutionId,
973        step_id: Option<&StepId>,
974        prompt_tokens: u32,
975        completion_tokens: u32,
976        cumulative_tokens: u64,
977        cost_usd: Option<f64>,
978        cumulative_cost_usd: Option<f64>,
979    ) -> Self {
980        Self::TokenUsageRecorded {
981            execution_id: execution_id.as_str().to_string(),
982            step_id: step_id.map(|s| s.as_str().to_string()),
983            prompt_tokens,
984            completion_tokens,
985            total_tokens: prompt_tokens + completion_tokens,
986            cumulative_tokens,
987            cost_usd,
988            cumulative_cost_usd,
989            timestamp: Self::now(),
990        }
991    }
992
993    // =========================================================================
994    // Memory Access Event Factories
995    // =========================================================================
996
997    /// Create a memory-recalled event
998    pub fn memory_recalled(
999        execution_id: &ExecutionId,
1000        step_id: Option<&StepId>,
1001        query: impl Into<String>,
1002        memories_count: usize,
1003        duration_ms: u64,
1004        session_id: Option<String>,
1005    ) -> Self {
1006        Self::MemoryRecalled {
1007            execution_id: execution_id.as_str().to_string(),
1008            step_id: step_id.map(|s| s.as_str().to_string()),
1009            query: query.into(),
1010            memories_count,
1011            duration_ms,
1012            session_id,
1013            timestamp: Self::now(),
1014        }
1015    }
1016
1017    /// Create a memory-stored event
1018    pub fn memory_stored(
1019        execution_id: &ExecutionId,
1020        step_id: Option<&StepId>,
1021        memory_type: impl Into<String>,
1022        session_id: Option<String>,
1023    ) -> Self {
1024        Self::MemoryStored {
1025            execution_id: execution_id.as_str().to_string(),
1026            step_id: step_id.map(|s| s.as_str().to_string()),
1027            memory_type: memory_type.into(),
1028            session_id,
1029            timestamp: Self::now(),
1030        }
1031    }
1032
1033    // =========================================================================
1034    // Guardrail/Safety Event Factories
1035    // =========================================================================
1036
1037    /// Create a guardrail-evaluated event
1038    pub fn guardrail_evaluated(
1039        execution_id: &ExecutionId,
1040        step_id: Option<&StepId>,
1041        guardrail_name: impl Into<String>,
1042        decision: impl Into<String>,
1043        reason: Option<String>,
1044        score: Option<f64>,
1045    ) -> Self {
1046        Self::GuardrailEvaluated {
1047            execution_id: execution_id.as_str().to_string(),
1048            step_id: step_id.map(|s| s.as_str().to_string()),
1049            guardrail_name: guardrail_name.into(),
1050            decision: decision.into(),
1051            reason,
1052            score,
1053            timestamp: Self::now(),
1054        }
1055    }
1056
1057    // =========================================================================
1058    // Reasoning & Context Event Factories
1059    // =========================================================================
1060
1061    /// Create a reasoning-trace event
1062    pub fn reasoning_trace(
1063        execution_id: &ExecutionId,
1064        step_id: Option<&StepId>,
1065        reasoning_type: impl Into<String>,
1066        content: impl Into<String>,
1067        truncated: Option<bool>,
1068    ) -> Self {
1069        Self::ReasoningTrace {
1070            execution_id: execution_id.as_str().to_string(),
1071            step_id: step_id.map(|s| s.as_str().to_string()),
1072            reasoning_type: reasoning_type.into(),
1073            content: content.into(),
1074            truncated,
1075            timestamp: Self::now(),
1076        }
1077    }
1078
1079    /// Create a context-window-snapshot event
1080    pub fn context_window_snapshot(
1081        execution_id: &ExecutionId,
1082        step_id: Option<&StepId>,
1083        message_count: usize,
1084        estimated_tokens: u32,
1085        max_tokens: Option<u32>,
1086        utilization_pct: f64,
1087    ) -> Self {
1088        Self::ContextSnapshot {
1089            execution_id: execution_id.as_str().to_string(),
1090            step_id: step_id.map(|s| s.as_str().to_string()),
1091            message_count,
1092            estimated_tokens,
1093            max_tokens,
1094            utilization_pct,
1095            timestamp: Self::now(),
1096        }
1097    }
1098
1099    // =========================================================================
1100    // Feedback Event Factories
1101    // =========================================================================
1102
1103    /// Create a feedback-received event
1104    pub fn feedback_received(
1105        execution_id: &ExecutionId,
1106        step_id: Option<&StepId>,
1107        feedback_type: impl Into<String>,
1108        score: Option<f64>,
1109        comment: Option<String>,
1110        user_id: Option<String>,
1111    ) -> Self {
1112        Self::FeedbackReceived {
1113            execution_id: execution_id.as_str().to_string(),
1114            step_id: step_id.map(|s| s.as_str().to_string()),
1115            feedback_type: feedback_type.into(),
1116            score,
1117            comment,
1118            user_id,
1119            timestamp: Self::now(),
1120        }
1121    }
1122
1123    // =========================================================================
1124    // Utility Methods
1125    // =========================================================================
1126
1127    /// Check if this is a control event (pause/resume/cancel)
1128    pub fn is_control_event(&self) -> bool {
1129        matches!(
1130            self,
1131            Self::ExecutionPaused { .. }
1132                | Self::ExecutionResumed { .. }
1133                | Self::ExecutionCancelled { .. }
1134        )
1135    }
1136
1137    /// Check if this is a delta/streaming event
1138    pub fn is_delta_event(&self) -> bool {
1139        matches!(self, Self::TextDelta { .. } | Self::ToolInputDelta { .. })
1140    }
1141
1142    /// Check if this is a summary-level event (not a delta)
1143    pub fn is_summary_event(&self) -> bool {
1144        !self.is_delta_event()
1145    }
1146
1147    /// Serialize to SSE format: `data: {...}\n\n`
1148    pub fn to_sse(&self) -> String {
1149        format!(
1150            "data: {}\n\n",
1151            serde_json::to_string(self).unwrap_or_default()
1152        )
1153    }
1154
1155    /// Create a [DONE] termination signal
1156    pub fn done() -> String {
1157        "data: [DONE]\n\n".to_string()
1158    }
1159}
1160
1161// =============================================================================
1162// Event Emitter
1163// =============================================================================
1164
1165/// Event emitter - collects and filters events during execution
1166///
1167/// Supports optional persistence to EventLog (INV-PERSIST-002: Event Immutability).
1168/// When an EventLog is configured, events are persisted before being added to
1169/// the in-memory buffer.
1170#[derive(Clone)]
1171pub struct EventEmitter {
1172    events: std::sync::Arc<std::sync::Mutex<Vec<StreamEvent>>>,
1173    mode: StreamMode,
1174    /// Optional event log for persistence
1175    event_log: Option<Arc<EventLog>>,
1176    /// Execution context for persisted events
1177    execution_context: Option<ExecutionContext>,
1178}
1179
1180impl std::fmt::Debug for EventEmitter {
1181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1182        f.debug_struct("EventEmitter")
1183            .field(
1184                "events_count",
1185                &self.events.lock().map(|e| e.len()).unwrap_or(0),
1186            )
1187            .field("mode", &self.mode)
1188            .field("has_event_log", &self.event_log.is_some())
1189            .finish()
1190    }
1191}
1192
1193impl Default for EventEmitter {
1194    fn default() -> Self {
1195        Self::new()
1196    }
1197}
1198
1199impl EventEmitter {
1200    /// Create a new EventEmitter
1201    pub fn new() -> Self {
1202        Self {
1203            events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
1204            mode: StreamMode::Full,
1205            event_log: None,
1206            execution_context: None,
1207        }
1208    }
1209
1210    /// Create a new EventEmitter with a specific mode
1211    pub fn with_mode(mode: StreamMode) -> Self {
1212        Self {
1213            events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
1214            mode,
1215            event_log: None,
1216            execution_context: None,
1217        }
1218    }
1219
1220    /// Create an EventEmitter with persistence support
1221    ///
1222    /// Events will be persisted to the EventLog before being added to the buffer.
1223    /// This ensures durability (INV-PERSIST-002).
1224    pub fn with_persistence(event_log: Arc<EventLog>, execution_id: ExecutionId) -> Self {
1225        Self {
1226            events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
1227            mode: StreamMode::Full,
1228            event_log: Some(event_log),
1229            execution_context: Some(ExecutionContext::new(execution_id)),
1230        }
1231    }
1232
1233    /// Set the stream mode
1234    pub fn set_mode(&mut self, mode: StreamMode) {
1235        self.mode = mode;
1236    }
1237
1238    /// Enable persistence with an event log
1239    pub fn set_event_log(&mut self, event_log: Arc<EventLog>, execution_id: ExecutionId) {
1240        self.event_log = Some(event_log);
1241        self.execution_context = Some(ExecutionContext::new(execution_id));
1242    }
1243
1244    /// Emit an event (filtered by mode)
1245    ///
1246    /// If persistence is configured, the event is persisted to the EventLog
1247    /// BEFORE being added to the in-memory buffer. This ensures durability.
1248    pub fn emit(&self, event: StreamEvent) {
1249        let should_emit = match self.mode {
1250            StreamMode::Full => true,
1251            StreamMode::Summary => event.is_summary_event(),
1252            StreamMode::ControlOnly => event.is_control_event(),
1253            StreamMode::Silent => false,
1254        };
1255
1256        if should_emit {
1257            // Persist to EventLog if configured (async, best-effort for now)
1258            // In production, this should be awaited before proceeding
1259            if let (Some(event_log), Some(ctx)) = (&self.event_log, &self.execution_context) {
1260                if let Some(exec_event) = self.to_execution_event(&event, ctx) {
1261                    let log = Arc::clone(event_log);
1262                    let evt = exec_event;
1263                    // Spawn persistence task (best-effort for non-async emit)
1264                    tokio::spawn(async move {
1265                        if let Err(e) = log.append(evt).await {
1266                            tracing::warn!("Failed to persist event: {}", e);
1267                        }
1268                    });
1269                }
1270            }
1271
1272            if let Ok(mut events) = self.events.lock() {
1273                events.push(event);
1274            }
1275        }
1276    }
1277
1278    /// Convert StreamEvent to ExecutionEvent for persistence
1279    ///
1280    /// Maps StreamEvent variants to ExecutionEventType.
1281    /// Some events are skipped as they don't need persistence (deltas, intermediates).
1282    fn to_execution_event(
1283        &self,
1284        stream_event: &StreamEvent,
1285        ctx: &ExecutionContext,
1286    ) -> Option<ExecutionEvent> {
1287        let event_type = match stream_event {
1288            // Execution lifecycle events
1289            StreamEvent::ExecutionStart { .. } => ExecutionEventType::ExecutionStart,
1290            StreamEvent::ExecutionEnd { .. } => ExecutionEventType::ExecutionEnd,
1291            StreamEvent::ExecutionFailed { .. } => ExecutionEventType::ExecutionFailed,
1292            StreamEvent::ExecutionPaused { .. } => ExecutionEventType::ControlPause,
1293            StreamEvent::ExecutionResumed { .. } => ExecutionEventType::ControlResume,
1294            StreamEvent::ExecutionCancelled { .. } => ExecutionEventType::ExecutionCancelled,
1295
1296            // Step lifecycle events
1297            StreamEvent::StepStart { .. } => ExecutionEventType::StepStart,
1298            StreamEvent::StepEnd { .. } => ExecutionEventType::StepEnd,
1299            StreamEvent::StepFailed { .. } => ExecutionEventType::StepFailed,
1300            StreamEvent::StepDiscovered { .. } => ExecutionEventType::StepDiscovered,
1301
1302            // Artifact events
1303            StreamEvent::ArtifactCreated { .. } => ExecutionEventType::ArtifactCreated,
1304
1305            // State snapshots
1306            StreamEvent::StateSnapshot { .. } => ExecutionEventType::StateSnapshot,
1307
1308            // Inbox messages (INV-INBOX-003: audit trail)
1309            StreamEvent::InboxMessage { .. } => ExecutionEventType::InboxMessage,
1310
1311            // Policy decisions (audit trail for tool policy evaluation)
1312            StreamEvent::PolicyDecision { .. } => ExecutionEventType::DecisionMade,
1313
1314            StreamEvent::CheckpointSaved { .. } => ExecutionEventType::CheckpointSaved,
1315            StreamEvent::GoalEvaluated { .. } => ExecutionEventType::GoalEvaluated,
1316
1317            // Tool execution (persistence now enabled)
1318            StreamEvent::ToolInputAvailable { .. } => ExecutionEventType::ToolCallStart,
1319            StreamEvent::ToolOutputAvailable { .. } => ExecutionEventType::ToolCallEnd,
1320            StreamEvent::PermissionRequest { .. } => ExecutionEventType::DecisionMade,
1321
1322            // LLM Call Observability
1323            StreamEvent::LlmCallStart { .. } => ExecutionEventType::LlmCallStart,
1324            StreamEvent::LlmCallEnd { .. } => ExecutionEventType::LlmCallEnd,
1325            StreamEvent::LlmCallFailed { .. } => ExecutionEventType::LlmCallFailed,
1326
1327            // Token & Cost Accounting
1328            StreamEvent::TokenUsageRecorded { .. } => ExecutionEventType::TokenUsageRecorded,
1329
1330            // Memory Access
1331            StreamEvent::MemoryRecalled { .. } => ExecutionEventType::MemoryRecalled,
1332            StreamEvent::MemoryStored { .. } => ExecutionEventType::MemoryStored,
1333
1334            // Guardrail/Safety
1335            StreamEvent::GuardrailEvaluated { .. } => ExecutionEventType::GuardrailEvaluated,
1336
1337            // Reasoning & Context
1338            StreamEvent::ReasoningTrace { .. } => ExecutionEventType::ReasoningTrace,
1339            StreamEvent::ContextSnapshot { .. } => ExecutionEventType::ContextSnapshot,
1340
1341            // Feedback
1342            StreamEvent::FeedbackReceived { .. } => ExecutionEventType::FeedbackReceived,
1343
1344            // Skip events that don't need persistence:
1345            // - Text deltas (streaming intermediates)
1346            // - Tool input deltas (streaming intermediates)
1347            // - Generic start/finish (covered by execution lifecycle)
1348            // - Step boundary markers (covered by step lifecycle)
1349            StreamEvent::Start { .. }
1350            | StreamEvent::Finish { .. }
1351            | StreamEvent::Error { .. }
1352            | StreamEvent::StartStep { .. }
1353            | StreamEvent::FinishStep { .. }
1354            | StreamEvent::TextStart { .. }
1355            | StreamEvent::TextDelta { .. }
1356            | StreamEvent::TextEnd { .. }
1357            | StreamEvent::ToolInputStart { .. }
1358            | StreamEvent::ToolInputDelta { .. } => return None,
1359        };
1360
1361        let mut context = ctx.clone();
1362        let mut payload: Option<serde_json::Value> = None;
1363        let mut duration_ms: Option<u64> = None;
1364
1365        match stream_event {
1366            StreamEvent::ExecutionEnd {
1367                final_output,
1368                duration_ms: dur,
1369                ..
1370            } => {
1371                duration_ms = Some(*dur);
1372                if let Some(output) = final_output {
1373                    payload = Some(serde_json::json!({ "output": output }));
1374                }
1375            }
1376            StreamEvent::ExecutionFailed { error, .. } => {
1377                payload = serde_json::to_value(error)
1378                    .ok()
1379                    .map(|err| serde_json::json!({ "error": err }));
1380            }
1381            StreamEvent::ExecutionCancelled { reason, .. }
1382            | StreamEvent::ExecutionPaused { reason, .. } => {
1383                payload = Some(serde_json::json!({ "reason": reason }));
1384            }
1385            StreamEvent::StepStart {
1386                step_id,
1387                step_type,
1388                name,
1389                ..
1390            } => {
1391                context = context.with_step(StepId::from_string(step_id));
1392                payload = Some(serde_json::json!({ "step_type": step_type, "name": name }));
1393            }
1394            StreamEvent::StepEnd {
1395                step_id,
1396                output,
1397                duration_ms: dur,
1398                ..
1399            } => {
1400                context = context.with_step(StepId::from_string(step_id));
1401                duration_ms = Some(*dur);
1402                if let Some(out) = output {
1403                    payload = Some(serde_json::json!({ "output": out }));
1404                }
1405            }
1406            StreamEvent::StepFailed { step_id, error, .. } => {
1407                context = context.with_step(StepId::from_string(step_id));
1408                payload = serde_json::to_value(error)
1409                    .ok()
1410                    .map(|err| serde_json::json!({ "error": err }));
1411            }
1412            StreamEvent::StepDiscovered {
1413                step_id,
1414                discovered_by,
1415                source_type,
1416                reason,
1417                depth,
1418                ..
1419            } => {
1420                context = context.with_step(StepId::from_string(step_id));
1421                payload = Some(serde_json::json!({
1422                    "discovered_by": discovered_by,
1423                    "source_type": source_type,
1424                    "reason": reason,
1425                    "depth": depth,
1426                }));
1427            }
1428            StreamEvent::ArtifactCreated {
1429                step_id,
1430                artifact_id,
1431                artifact_type,
1432                ..
1433            } => {
1434                context = context
1435                    .with_step(StepId::from_string(step_id))
1436                    .with_artifact(ArtifactId::from_string(artifact_id));
1437                payload = Some(serde_json::json!({ "artifact_type": artifact_type }));
1438            }
1439            StreamEvent::StateSnapshot { step_id, state, .. } => {
1440                if let Some(step) = step_id {
1441                    context = context.with_step(StepId::from_string(step));
1442                }
1443                payload = Some(state.clone());
1444            }
1445            StreamEvent::InboxMessage {
1446                message_id,
1447                message_type,
1448                ..
1449            } => {
1450                payload = Some(serde_json::json!({
1451                    "message_id": message_id,
1452                    "message_type": message_type
1453                }));
1454            }
1455            StreamEvent::PolicyDecision {
1456                step_id,
1457                tool_name,
1458                decision,
1459                reason,
1460                ..
1461            } => {
1462                if let Some(step) = step_id {
1463                    context = context.with_step(StepId::from_string(step));
1464                }
1465                payload = Some(serde_json::json!({
1466                    "tool_name": tool_name,
1467                    "decision": decision,
1468                    "reason": reason,
1469                }));
1470            }
1471            StreamEvent::ToolInputAvailable {
1472                tool_call_id,
1473                tool_name,
1474                input,
1475                ..
1476            } => {
1477                payload = Some(serde_json::json!({
1478                    "tool_call_id": tool_call_id,
1479                    "tool_name": tool_name,
1480                    "input": input,
1481                }));
1482            }
1483            StreamEvent::ToolOutputAvailable {
1484                tool_call_id,
1485                output,
1486                ..
1487            } => {
1488                payload = Some(serde_json::json!({
1489                    "tool_call_id": tool_call_id,
1490                    "output": output,
1491                }));
1492            }
1493            StreamEvent::PermissionRequest {
1494                tool_name,
1495                arguments,
1496                policy,
1497                ..
1498            } => {
1499                payload = Some(serde_json::json!({
1500                    "tool_name": tool_name,
1501                    "arguments": arguments,
1502                    "policy": policy,
1503                }));
1504            }
1505            StreamEvent::CheckpointSaved {
1506                checkpoint_id,
1507                step_id,
1508                state_hash,
1509                ..
1510            } => {
1511                if let Some(step) = step_id {
1512                    context = context.with_step(StepId::from_string(step));
1513                }
1514                payload = Some(serde_json::json!({
1515                    "checkpoint_id": checkpoint_id,
1516                    "state_hash": state_hash,
1517                }));
1518            }
1519            StreamEvent::GoalEvaluated {
1520                goal_id,
1521                step_id,
1522                status,
1523                score,
1524                reason,
1525                ..
1526            } => {
1527                if let Some(step) = step_id {
1528                    context = context.with_step(StepId::from_string(step));
1529                }
1530                payload = Some(serde_json::json!({
1531                    "goal_id": goal_id,
1532                    "status": status,
1533                    "score": score,
1534                    "reason": reason,
1535                }));
1536            }
1537            StreamEvent::ExecutionStart { .. } => {}
1538
1539            // LLM Call Observability payloads
1540            StreamEvent::LlmCallStart {
1541                step_id,
1542                callable_name,
1543                model,
1544                history_length,
1545                ..
1546            } => {
1547                if let Some(step) = step_id {
1548                    context = context.with_step(StepId::from_string(step));
1549                }
1550                payload = Some(serde_json::json!({
1551                    "callable_name": callable_name,
1552                    "model": model,
1553                    "history_length": history_length,
1554                }));
1555            }
1556            StreamEvent::LlmCallEnd {
1557                step_id,
1558                duration_ms: dur,
1559                prompt_tokens,
1560                completion_tokens,
1561                total_tokens,
1562                ..
1563            } => {
1564                if let Some(step) = step_id {
1565                    context = context.with_step(StepId::from_string(step));
1566                }
1567                duration_ms = Some(*dur);
1568                payload = Some(serde_json::json!({
1569                    "prompt_tokens": prompt_tokens,
1570                    "completion_tokens": completion_tokens,
1571                    "total_tokens": total_tokens,
1572                }));
1573            }
1574            StreamEvent::LlmCallFailed {
1575                step_id,
1576                error,
1577                duration_ms: dur,
1578                ..
1579            } => {
1580                if let Some(step) = step_id {
1581                    context = context.with_step(StepId::from_string(step));
1582                }
1583                duration_ms = *dur;
1584                payload = Some(serde_json::json!({
1585                    "error": error,
1586                }));
1587            }
1588
1589            // Token & Cost Accounting payloads
1590            StreamEvent::TokenUsageRecorded {
1591                step_id,
1592                prompt_tokens,
1593                completion_tokens,
1594                total_tokens,
1595                cumulative_tokens,
1596                cost_usd,
1597                cumulative_cost_usd,
1598                ..
1599            } => {
1600                if let Some(step) = step_id {
1601                    context = context.with_step(StepId::from_string(step));
1602                }
1603                payload = Some(serde_json::json!({
1604                    "prompt_tokens": prompt_tokens,
1605                    "completion_tokens": completion_tokens,
1606                    "total_tokens": total_tokens,
1607                    "cumulative_tokens": cumulative_tokens,
1608                    "cost_usd": cost_usd,
1609                    "cumulative_cost_usd": cumulative_cost_usd,
1610                }));
1611            }
1612
1613            // Memory Access payloads
1614            StreamEvent::MemoryRecalled {
1615                step_id,
1616                query,
1617                memories_count,
1618                duration_ms: dur,
1619                session_id,
1620                ..
1621            } => {
1622                if let Some(step) = step_id {
1623                    context = context.with_step(StepId::from_string(step));
1624                }
1625                duration_ms = Some(*dur);
1626                payload = Some(serde_json::json!({
1627                    "query": query,
1628                    "memories_count": memories_count,
1629                    "session_id": session_id,
1630                }));
1631            }
1632            StreamEvent::MemoryStored {
1633                step_id,
1634                memory_type,
1635                session_id,
1636                ..
1637            } => {
1638                if let Some(step) = step_id {
1639                    context = context.with_step(StepId::from_string(step));
1640                }
1641                payload = Some(serde_json::json!({
1642                    "memory_type": memory_type,
1643                    "session_id": session_id,
1644                }));
1645            }
1646
1647            // Guardrail/Safety payloads
1648            StreamEvent::GuardrailEvaluated {
1649                step_id,
1650                guardrail_name,
1651                decision,
1652                reason,
1653                score,
1654                ..
1655            } => {
1656                if let Some(step) = step_id {
1657                    context = context.with_step(StepId::from_string(step));
1658                }
1659                payload = Some(serde_json::json!({
1660                    "guardrail_name": guardrail_name,
1661                    "decision": decision,
1662                    "reason": reason,
1663                    "score": score,
1664                }));
1665            }
1666
1667            // Reasoning & Context payloads
1668            StreamEvent::ReasoningTrace {
1669                step_id,
1670                reasoning_type,
1671                content,
1672                truncated,
1673                ..
1674            } => {
1675                if let Some(step) = step_id {
1676                    context = context.with_step(StepId::from_string(step));
1677                }
1678                payload = Some(serde_json::json!({
1679                    "reasoning_type": reasoning_type,
1680                    "content": content,
1681                    "truncated": truncated,
1682                }));
1683            }
1684            StreamEvent::ContextSnapshot {
1685                step_id,
1686                message_count,
1687                estimated_tokens,
1688                max_tokens,
1689                utilization_pct,
1690                ..
1691            } => {
1692                if let Some(step) = step_id {
1693                    context = context.with_step(StepId::from_string(step));
1694                }
1695                payload = Some(serde_json::json!({
1696                    "message_count": message_count,
1697                    "estimated_tokens": estimated_tokens,
1698                    "max_tokens": max_tokens,
1699                    "utilization_pct": utilization_pct,
1700                }));
1701            }
1702
1703            // Feedback payloads
1704            StreamEvent::FeedbackReceived {
1705                step_id,
1706                feedback_type,
1707                score,
1708                comment,
1709                user_id,
1710                ..
1711            } => {
1712                if let Some(step) = step_id {
1713                    context = context.with_step(StepId::from_string(step));
1714                }
1715                payload = Some(serde_json::json!({
1716                    "feedback_type": feedback_type,
1717                    "score": score,
1718                    "comment": comment,
1719                    "user_id": user_id,
1720                }));
1721            }
1722
1723            _ => {}
1724        }
1725
1726        let mut event = ExecutionEvent::new(event_type, context);
1727        if let Some(ms) = duration_ms {
1728            event.duration_ms = Some(ms);
1729        }
1730        if let Some(data) = payload {
1731            event = event.with_payload(data);
1732        }
1733
1734        Some(event)
1735    }
1736
1737    /// Emit an event unconditionally (ignores mode)
1738    pub fn emit_force(&self, event: StreamEvent) {
1739        if let Ok(mut events) = self.events.lock() {
1740            events.push(event);
1741        }
1742    }
1743
1744    /// Get all collected events
1745    pub fn drain(&self) -> Vec<StreamEvent> {
1746        if let Ok(mut events) = self.events.lock() {
1747            std::mem::take(&mut *events)
1748        } else {
1749            vec![]
1750        }
1751    }
1752
1753    /// Get the current stream mode
1754    pub fn mode(&self) -> StreamMode {
1755        self.mode
1756    }
1757}
1758
1759#[cfg(test)]
1760mod tests {
1761    use super::*;
1762    use crate::kernel::{ExecutionId, StepId, StepType};
1763
1764    // ============ StreamMode Tests ============
1765
1766    #[test]
1767    fn test_stream_mode_default() {
1768        assert_eq!(StreamMode::default(), StreamMode::Full);
1769    }
1770
1771    #[test]
1772    fn test_stream_mode_variants() {
1773        let modes = [
1774            StreamMode::Full,
1775            StreamMode::Summary,
1776            StreamMode::ControlOnly,
1777            StreamMode::Silent,
1778        ];
1779        assert_eq!(modes.len(), 4);
1780    }
1781
1782    // ============ StreamEvent Utility Tests ============
1783
1784    #[test]
1785    fn test_stream_event_is_control_event() {
1786        let exec_id = ExecutionId::new();
1787
1788        // Control events
1789        assert!(StreamEvent::execution_paused(&exec_id, "paused").is_control_event());
1790        assert!(StreamEvent::execution_resumed(&exec_id).is_control_event());
1791        assert!(StreamEvent::execution_cancelled(&exec_id, "cancelled").is_control_event());
1792
1793        // Non-control events
1794        assert!(!StreamEvent::execution_start(&exec_id).is_control_event());
1795        assert!(!StreamEvent::text_start(None).is_control_event());
1796    }
1797
1798    #[test]
1799    fn test_stream_event_is_delta_event() {
1800        // Delta events
1801        let delta = StreamEvent::text_delta("id1", "chunk");
1802        assert!(delta.is_delta_event());
1803
1804        // Non-delta events
1805        let start = StreamEvent::text_start(None);
1806        assert!(!start.is_delta_event());
1807    }
1808
1809    #[test]
1810    fn test_stream_event_is_summary_event() {
1811        // Summary events (non-delta)
1812        let start = StreamEvent::text_start(None);
1813        assert!(start.is_summary_event());
1814
1815        // Delta events are not summary
1816        let delta = StreamEvent::text_delta("id1", "chunk");
1817        assert!(!delta.is_summary_event());
1818    }
1819
1820    #[test]
1821    fn test_stream_event_to_sse() {
1822        let event = StreamEvent::text_end("test-id");
1823        let sse = event.to_sse();
1824
1825        assert!(sse.starts_with("data: "));
1826        assert!(sse.ends_with("\n\n"));
1827        assert!(sse.contains("test-id"));
1828    }
1829
1830    #[test]
1831    fn test_stream_event_done() {
1832        let done = StreamEvent::done();
1833        assert_eq!(done, "data: [DONE]\n\n");
1834    }
1835
1836    // ============ StreamEvent Factory Tests ============
1837
1838    #[test]
1839    fn test_stream_event_text_factories() {
1840        let exec_id = ExecutionId::new();
1841
1842        let start = StreamEvent::text_start(Some(&exec_id));
1843        assert!(matches!(start, StreamEvent::TextStart { .. }));
1844
1845        let delta = StreamEvent::text_delta("id1", "hello");
1846        assert!(matches!(delta, StreamEvent::TextDelta { delta, .. } if delta == "hello"));
1847
1848        let end = StreamEvent::text_end("id1");
1849        assert!(matches!(end, StreamEvent::TextEnd { .. }));
1850    }
1851
1852    #[test]
1853    fn test_stream_event_execution_factories() {
1854        let exec_id = ExecutionId::new();
1855
1856        let start = StreamEvent::execution_start(&exec_id);
1857        assert!(matches!(start, StreamEvent::ExecutionStart { .. }));
1858
1859        let end = StreamEvent::execution_end(&exec_id, Some("output".to_string()), 100);
1860        assert!(matches!(
1861            end,
1862            StreamEvent::ExecutionEnd {
1863                duration_ms: 100,
1864                ..
1865            }
1866        ));
1867
1868        use crate::kernel::ExecutionError;
1869        let error = ExecutionError::kernel_internal("error message");
1870        let failed = StreamEvent::execution_failed(&exec_id, error);
1871        assert!(matches!(failed, StreamEvent::ExecutionFailed { .. }));
1872
1873        let paused = StreamEvent::execution_paused(&exec_id, "reason");
1874        assert!(matches!(paused, StreamEvent::ExecutionPaused { .. }));
1875
1876        let resumed = StreamEvent::execution_resumed(&exec_id);
1877        assert!(matches!(resumed, StreamEvent::ExecutionResumed { .. }));
1878
1879        let cancelled = StreamEvent::execution_cancelled(&exec_id, "cancel reason");
1880        assert!(matches!(cancelled, StreamEvent::ExecutionCancelled { .. }));
1881    }
1882
1883    #[test]
1884    fn test_stream_event_step_factories() {
1885        let exec_id = ExecutionId::new();
1886        let step_id = StepId::new();
1887
1888        let start =
1889            StreamEvent::step_start(&exec_id, &step_id, StepType::FunctionNode, "test_step");
1890        assert!(matches!(start, StreamEvent::StepStart { .. }));
1891
1892        let end = StreamEvent::step_end(&exec_id, &step_id, Some("output".to_string()), 50);
1893        assert!(matches!(
1894            end,
1895            StreamEvent::StepEnd {
1896                duration_ms: 50,
1897                ..
1898            }
1899        ));
1900
1901        use crate::kernel::ExecutionError;
1902        let error = ExecutionError::kernel_internal("step error");
1903        let failed = StreamEvent::step_failed(&exec_id, &step_id, error);
1904        assert!(matches!(failed, StreamEvent::StepFailed { .. }));
1905    }
1906
1907    #[test]
1908    fn test_stream_event_tool_factories() {
1909        let input_start = StreamEvent::tool_input_start("call-123", "web_search");
1910        assert!(
1911            matches!(input_start, StreamEvent::ToolInputStart { tool_name, .. } if tool_name == "web_search")
1912        );
1913
1914        let input_avail = StreamEvent::tool_input_available(
1915            "call-123",
1916            "web_search",
1917            serde_json::json!({"q": "test"}),
1918        );
1919        assert!(matches!(
1920            input_avail,
1921            StreamEvent::ToolInputAvailable { .. }
1922        ));
1923
1924        let output_avail =
1925            StreamEvent::tool_output_available("call-123", serde_json::json!({"result": "ok"}));
1926        assert!(matches!(
1927            output_avail,
1928            StreamEvent::ToolOutputAvailable { .. }
1929        ));
1930    }
1931
1932    // ============ EventEmitter Tests ============
1933
1934    #[test]
1935    fn test_event_emitter_new() {
1936        let emitter = EventEmitter::new();
1937        assert_eq!(emitter.mode(), StreamMode::Full);
1938    }
1939
1940    #[test]
1941    fn test_event_emitter_with_mode() {
1942        let emitter = EventEmitter::with_mode(StreamMode::Summary);
1943        assert_eq!(emitter.mode(), StreamMode::Summary);
1944    }
1945
1946    #[test]
1947    fn test_event_emitter_set_mode() {
1948        let mut emitter = EventEmitter::new();
1949        emitter.set_mode(StreamMode::Silent);
1950        assert_eq!(emitter.mode(), StreamMode::Silent);
1951    }
1952
1953    #[test]
1954    fn test_event_emitter_emit_and_drain() {
1955        let emitter = EventEmitter::new();
1956        let exec_id = ExecutionId::new();
1957
1958        emitter.emit(StreamEvent::execution_start(&exec_id));
1959        emitter.emit(StreamEvent::execution_end(&exec_id, None, 100));
1960
1961        let events = emitter.drain();
1962        assert_eq!(events.len(), 2);
1963
1964        // Drain should clear events
1965        let events_after = emitter.drain();
1966        assert!(events_after.is_empty());
1967    }
1968
1969    #[test]
1970    fn test_event_emitter_mode_full() {
1971        let emitter = EventEmitter::with_mode(StreamMode::Full);
1972
1973        // All events should be emitted
1974        emitter.emit(StreamEvent::text_delta("id", "chunk"));
1975        emitter.emit(StreamEvent::text_end("id"));
1976
1977        let events = emitter.drain();
1978        assert_eq!(events.len(), 2);
1979    }
1980
1981    #[test]
1982    fn test_event_emitter_mode_summary() {
1983        let emitter = EventEmitter::with_mode(StreamMode::Summary);
1984
1985        // Delta events should be filtered out
1986        emitter.emit(StreamEvent::text_delta("id", "chunk"));
1987        emitter.emit(StreamEvent::text_end("id"));
1988
1989        let events = emitter.drain();
1990        assert_eq!(events.len(), 1); // Only text_end (summary event)
1991    }
1992
1993    #[test]
1994    fn test_event_emitter_mode_control_only() {
1995        let emitter = EventEmitter::with_mode(StreamMode::ControlOnly);
1996        let exec_id = ExecutionId::new();
1997
1998        // Non-control events should be filtered out
1999        emitter.emit(StreamEvent::execution_start(&exec_id));
2000        emitter.emit(StreamEvent::execution_paused(&exec_id, "test"));
2001
2002        let events = emitter.drain();
2003        assert_eq!(events.len(), 1); // Only paused (control event)
2004    }
2005
2006    #[test]
2007    fn test_event_emitter_mode_silent() {
2008        let emitter = EventEmitter::with_mode(StreamMode::Silent);
2009        let exec_id = ExecutionId::new();
2010
2011        // All events should be filtered out
2012        emitter.emit(StreamEvent::execution_start(&exec_id));
2013        emitter.emit(StreamEvent::execution_paused(&exec_id, "test"));
2014
2015        let events = emitter.drain();
2016        assert!(events.is_empty());
2017    }
2018
2019    #[test]
2020    fn test_event_emitter_emit_force() {
2021        let emitter = EventEmitter::with_mode(StreamMode::Silent);
2022        let exec_id = ExecutionId::new();
2023
2024        // Force emit should bypass mode filter
2025        emitter.emit_force(StreamEvent::execution_start(&exec_id));
2026
2027        let events = emitter.drain();
2028        assert_eq!(events.len(), 1);
2029    }
2030
2031    #[test]
2032    fn test_event_emitter_serialization() {
2033        use crate::kernel::ExecutionError;
2034        let error = ExecutionError::kernel_internal("Test error").with_code("ERR_CODE".to_string());
2035        let event = StreamEvent::error(error);
2036        let json = serde_json::to_string(&event).unwrap();
2037
2038        assert!(json.contains("data-error"));
2039        assert!(json.contains("Test error"));
2040        assert!(json.contains("ERR_CODE"));
2041    }
2042}