1use super::event_logger::EventLog;
47use crate::kernel::{ArtifactId, ExecutionError, ExecutionId, StepId, StepSourceType, StepType};
48use crate::kernel::{ExecutionContext, ExecutionEvent, ExecutionEventType};
49use futures::Stream;
50use serde::{Deserialize, Serialize};
51use std::pin::Pin;
52use std::sync::Arc;
53
/// Boxed, pinned, `Send` stream whose items are [`StreamEvent`]s.
pub type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;
56
/// Filter level applied by `EventEmitter::emit` before an event is recorded.
///
/// Serialized in `snake_case` (`full`, `summary`, `control_only`, `silent`).
/// The default is [`StreamMode::Full`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamMode {
    /// Every event is emitted.
    #[default]
    Full,
    /// Only events for which `StreamEvent::is_summary_event` is true are emitted
    /// (everything except `TextDelta` and `ToolInputDelta`).
    Summary,
    /// Only events for which `StreamEvent::is_control_event` is true are emitted
    /// (execution paused / resumed / cancelled).
    ControlOnly,
    /// No event is emitted.
    Silent,
}
75
/// A single event on the execution stream.
///
/// The enum is internally tagged: each serialized object carries a `type`
/// field holding the variant's `data-*` name given in its `serde(rename)`.
/// Fields marked `skip_serializing_if = "Option::is_none"` are omitted from
/// the output when they are `None`.
///
/// `timestamp` fields are filled by the constructors in `impl StreamEvent`
/// with milliseconds since the Unix epoch.
///
/// `EventEmitter` only persists a subset of variants to the event log:
/// `Start`, `Finish`, `Error`, `StartStep`, `FinishStep`, `TextStart`,
/// `TextDelta`, `TextEnd`, `ToolInputStart` and `ToolInputDelta` are never
/// converted to an `ExecutionEvent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StreamEvent {
    // ---- Text streaming -------------------------------------------------

    /// Start of a text segment; `text_start` generates the `id`.
    #[serde(rename = "data-text-start")]
    TextStart {
        id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        execution_id: Option<String>,
    },

    /// Incremental text chunk for the segment named by `id`.
    #[serde(rename = "data-text-delta")]
    TextDelta { id: String, delta: String },

    /// End of the text segment named by `id`.
    #[serde(rename = "data-text-end")]
    TextEnd { id: String },

    // ---- Step markers (no execution id / timestamp) ---------------------

    /// Step-start marker carrying only an optional step id.
    #[serde(rename = "data-start-step")]
    StartStep {
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
    },

    /// Step-finish marker carrying only an optional step id.
    #[serde(rename = "data-finish-step")]
    FinishStep {
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
    },

    // ---- Tool calls -------------------------------------------------------

    /// Tool input is starting to stream for `tool_call_id`.
    #[serde(rename = "data-tool-input-start")]
    ToolInputStart {
        tool_call_id: String,
        tool_name: String,
    },

    /// Incremental chunk of tool input text.
    #[serde(rename = "data-tool-input-delta")]
    ToolInputDelta {
        tool_call_id: String,
        input_text_delta: String,
    },

    /// Complete tool input as JSON; persisted as `ToolCallStart`.
    #[serde(rename = "data-tool-input-available")]
    ToolInputAvailable {
        tool_call_id: String,
        tool_name: String,
        input: serde_json::Value,
    },

    /// Tool output as JSON; persisted as `ToolCallEnd`.
    #[serde(rename = "data-tool-output-available")]
    ToolOutputAvailable {
        tool_call_id: String,
        output: serde_json::Value,
    },

    /// Permission request for a tool invocation; persisted as `DecisionMade`.
    #[serde(rename = "data-permission-request")]
    PermissionRequest {
        execution_id: String,
        tool_name: String,
        arguments: serde_json::Value,
        policy: String,
        timestamp: i64,
    },

    // ---- Message envelope -------------------------------------------------

    /// Start of a message identified by `message_id`.
    #[serde(rename = "data-start")]
    Start {
        message_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        execution_id: Option<String>,
    },

    /// End of a message, optionally carrying its final output.
    #[serde(rename = "data-finish")]
    Finish {
        message_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        final_output: Option<String>,
    },

    /// Error report that is not tied to an execution id.
    #[serde(rename = "data-error")]
    Error { error: ExecutionError },

    // ---- Execution lifecycle ----------------------------------------------

    /// Execution started; `parent_id` / `parent_type` are set only by
    /// `execution_start_with_parent`.
    #[serde(rename = "data-execution-start")]
    ExecutionStart {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        parent_id: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        parent_type: Option<String>,
        timestamp: i64,
    },

    /// Execution finished, with its duration and optional final output.
    #[serde(rename = "data-execution-end")]
    ExecutionEnd {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        final_output: Option<String>,
        duration_ms: u64,
        timestamp: i64,
    },

    /// Execution failed with `error`.
    #[serde(rename = "data-execution-failed")]
    ExecutionFailed {
        execution_id: String,
        error: ExecutionError,
        timestamp: i64,
    },

    /// Execution paused (control event); persisted as `ControlPause`.
    #[serde(rename = "data-execution-paused")]
    ExecutionPaused {
        execution_id: String,
        reason: String,
        timestamp: i64,
    },

    /// Execution resumed (control event); persisted as `ControlResume`.
    #[serde(rename = "data-execution-resumed")]
    ExecutionResumed {
        execution_id: String,
        timestamp: i64,
    },

    /// Execution cancelled (control event).
    #[serde(rename = "data-execution-cancelled")]
    ExecutionCancelled {
        execution_id: String,
        reason: String,
        timestamp: i64,
    },

    // ---- Step lifecycle ---------------------------------------------------

    /// Step started; `step_type` is the `to_string()` of a `StepType`.
    #[serde(rename = "data-step-start")]
    StepStart {
        execution_id: String,
        step_id: String,
        step_type: String,
        name: String,
        timestamp: i64,
    },

    /// Step finished, with its duration and optional output.
    #[serde(rename = "data-step-end")]
    StepEnd {
        execution_id: String,
        step_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        output: Option<String>,
        duration_ms: u64,
        timestamp: i64,
    },

    /// Step failed with `error`.
    #[serde(rename = "data-step-failed")]
    StepFailed {
        execution_id: String,
        step_id: String,
        error: ExecutionError,
        timestamp: i64,
    },

    /// A step was discovered; `source_type` is the lower-cased `Debug`
    /// rendering of a `StepSourceType` (see `step_discovered`).
    #[serde(rename = "data-step-discovered")]
    StepDiscovered {
        execution_id: String,
        step_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        discovered_by: Option<String>,
        source_type: String,
        reason: String,
        depth: u32,
        timestamp: i64,
    },

    // ---- Artifacts and state ----------------------------------------------

    /// An artifact was created by a step.
    #[serde(rename = "data-artifact-created")]
    ArtifactCreated {
        execution_id: String,
        step_id: String,
        artifact_id: String,
        artifact_type: String,
        timestamp: i64,
    },

    /// State snapshot; `state` is persisted verbatim as the event payload.
    #[serde(rename = "data-state-snapshot")]
    StateSnapshot {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        state: serde_json::Value,
        timestamp: i64,
    },

    /// A checkpoint was saved.
    #[serde(rename = "data-checkpoint-saved")]
    CheckpointSaved {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        checkpoint_id: String,
        state_hash: String,
        timestamp: i64,
    },

    /// Result of evaluating a goal.
    #[serde(rename = "data-goal-evaluated")]
    GoalEvaluated {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        goal_id: String,
        // Free-form status string; the accepted values are not defined here.
        status: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        score: Option<f64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        reason: Option<String>,
        timestamp: i64,
    },

    // ---- Inbox and policy -------------------------------------------------

    /// Inbox message notification; `message_type` is the lower-cased `Debug`
    /// rendering of an `InboxMessageType` (see `inbox_message`).
    #[serde(rename = "data-inbox-message")]
    InboxMessage {
        execution_id: String,
        message_id: String,
        message_type: String,
        timestamp: i64,
    },

    /// Policy decision for a tool; persisted as `DecisionMade`.
    #[serde(rename = "data-policy-decision")]
    PolicyDecision {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        tool_name: String,
        // The constructors in this file produce "allow", "deny" or "warn".
        decision: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        reason: Option<String>,
        timestamp: i64,
    },

    // ---- LLM calls and token accounting -----------------------------------

    /// An LLM call is starting.
    #[serde(rename = "data-llm-call-start")]
    LlmCallStart {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        callable_name: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        model: Option<String>,
        history_length: usize,
        timestamp: i64,
    },

    /// An LLM call completed; token counts are optional.
    #[serde(rename = "data-llm-call-end")]
    LlmCallEnd {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        duration_ms: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        prompt_tokens: Option<u32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        completion_tokens: Option<u32>,
        #[serde(skip_serializing_if = "Option::is_none")]
        total_tokens: Option<u32>,
        timestamp: i64,
    },

    /// An LLM call failed; `error` is a plain message string.
    #[serde(rename = "data-llm-call-failed")]
    LlmCallFailed {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        error: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        duration_ms: Option<u64>,
        timestamp: i64,
    },

    /// Token usage record; `token_usage_recorded` derives `total_tokens`
    /// from the prompt and completion counts.
    #[serde(rename = "data-token-usage")]
    TokenUsageRecorded {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        prompt_tokens: u32,
        completion_tokens: u32,
        total_tokens: u32,
        cumulative_tokens: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        cost_usd: Option<f64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        cumulative_cost_usd: Option<f64>,
        timestamp: i64,
    },

    // ---- Memory -----------------------------------------------------------

    /// Memories were recalled for `query`.
    #[serde(rename = "data-memory-recalled")]
    MemoryRecalled {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        query: String,
        memories_count: usize,
        duration_ms: u64,
        #[serde(skip_serializing_if = "Option::is_none")]
        session_id: Option<String>,
        timestamp: i64,
    },

    /// A memory was stored.
    #[serde(rename = "data-memory-stored")]
    MemoryStored {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        // Free-form type label supplied by the caller.
        memory_type: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        session_id: Option<String>,
        timestamp: i64,
    },

    // ---- Guardrails, reasoning, context, feedback -------------------------

    /// Result of evaluating a guardrail.
    #[serde(rename = "data-guardrail-evaluated")]
    GuardrailEvaluated {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        guardrail_name: String,
        // Free-form decision label supplied by the caller.
        decision: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        reason: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        score: Option<f64>,
        timestamp: i64,
    },

    /// A reasoning trace entry.
    #[serde(rename = "data-reasoning-trace")]
    ReasoningTrace {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        // Free-form type label supplied by the caller.
        reasoning_type: String,
        content: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        truncated: Option<bool>,
        timestamp: i64,
    },

    /// Snapshot of context-window usage.
    #[serde(rename = "data-context-snapshot")]
    ContextSnapshot {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        message_count: usize,
        estimated_tokens: u32,
        #[serde(skip_serializing_if = "Option::is_none")]
        max_tokens: Option<u32>,
        utilization_pct: f64,
        timestamp: i64,
    },

    /// Feedback was received.
    #[serde(rename = "data-feedback-received")]
    FeedbackReceived {
        execution_id: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        step_id: Option<String>,
        // Free-form type label supplied by the caller.
        feedback_type: String,
        #[serde(skip_serializing_if = "Option::is_none")]
        score: Option<f64>,
        #[serde(skip_serializing_if = "Option::is_none")]
        comment: Option<String>,
        #[serde(skip_serializing_if = "Option::is_none")]
        user_id: Option<String>,
        timestamp: i64,
    },
}
529
530impl StreamEvent {
531 fn now() -> i64 {
533 std::time::SystemTime::now()
534 .duration_since(std::time::UNIX_EPOCH)
535 .unwrap_or_default()
536 .as_millis() as i64
537 }
538
539 fn generate_id() -> String {
541 use std::time::{SystemTime, UNIX_EPOCH};
542 let nanos = SystemTime::now()
543 .duration_since(UNIX_EPOCH)
544 .unwrap()
545 .as_nanos();
546 format!("id_{:x}", nanos)
547 }
548
549 pub fn text_start(execution_id: Option<&ExecutionId>) -> Self {
555 Self::TextStart {
556 id: Self::generate_id(),
557 execution_id: execution_id.map(|e| e.as_str().to_string()),
558 }
559 }
560
561 pub fn text_delta(id: impl Into<String>, delta: impl Into<String>) -> Self {
563 Self::TextDelta {
564 id: id.into(),
565 delta: delta.into(),
566 }
567 }
568
569 pub fn text_end(id: impl Into<String>) -> Self {
571 Self::TextEnd { id: id.into() }
572 }
573
574 pub fn start(message_id: impl Into<String>, execution_id: Option<&ExecutionId>) -> Self {
576 Self::Start {
577 message_id: message_id.into(),
578 execution_id: execution_id.map(|e| e.as_str().to_string()),
579 }
580 }
581
582 pub fn finish(message_id: impl Into<String>, final_output: Option<String>) -> Self {
584 Self::Finish {
585 message_id: message_id.into(),
586 final_output,
587 }
588 }
589
590 pub fn error(error: ExecutionError) -> Self {
592 Self::Error { error }
593 }
594
595 pub fn start_step(step_id: Option<&StepId>) -> Self {
597 Self::StartStep {
598 step_id: step_id.map(|s| s.as_str().to_string()),
599 }
600 }
601
602 pub fn finish_step(step_id: Option<&StepId>) -> Self {
604 Self::FinishStep {
605 step_id: step_id.map(|s| s.as_str().to_string()),
606 }
607 }
608
609 pub fn tool_input_start(tool_call_id: impl Into<String>, tool_name: impl Into<String>) -> Self {
615 Self::ToolInputStart {
616 tool_call_id: tool_call_id.into(),
617 tool_name: tool_name.into(),
618 }
619 }
620
621 pub fn tool_input_available(
623 tool_call_id: impl Into<String>,
624 tool_name: impl Into<String>,
625 input: serde_json::Value,
626 ) -> Self {
627 Self::ToolInputAvailable {
628 tool_call_id: tool_call_id.into(),
629 tool_name: tool_name.into(),
630 input,
631 }
632 }
633
634 pub fn tool_output_available(
636 tool_call_id: impl Into<String>,
637 output: serde_json::Value,
638 ) -> Self {
639 Self::ToolOutputAvailable {
640 tool_call_id: tool_call_id.into(),
641 output,
642 }
643 }
644
645 pub fn permission_request(
647 execution_id: &ExecutionId,
648 tool_name: impl Into<String>,
649 arguments: serde_json::Value,
650 policy: impl Into<String>,
651 ) -> Self {
652 Self::PermissionRequest {
653 execution_id: execution_id.as_str().to_string(),
654 tool_name: tool_name.into(),
655 arguments,
656 policy: policy.into(),
657 timestamp: Self::now(),
658 }
659 }
660
661 pub fn execution_start(execution_id: &ExecutionId) -> Self {
667 Self::ExecutionStart {
668 execution_id: execution_id.as_str().to_string(),
669 parent_id: None,
670 parent_type: None,
671 timestamp: Self::now(),
672 }
673 }
674
675 pub fn execution_start_with_parent(
677 execution_id: &ExecutionId,
678 parent_id: impl Into<String>,
679 parent_type: impl Into<String>,
680 ) -> Self {
681 Self::ExecutionStart {
682 execution_id: execution_id.as_str().to_string(),
683 parent_id: Some(parent_id.into()),
684 parent_type: Some(parent_type.into()),
685 timestamp: Self::now(),
686 }
687 }
688
689 pub fn execution_end(
691 execution_id: &ExecutionId,
692 final_output: Option<String>,
693 duration_ms: u64,
694 ) -> Self {
695 Self::ExecutionEnd {
696 execution_id: execution_id.as_str().to_string(),
697 final_output,
698 duration_ms,
699 timestamp: Self::now(),
700 }
701 }
702
703 pub fn execution_failed(execution_id: &ExecutionId, error: ExecutionError) -> Self {
705 Self::ExecutionFailed {
706 execution_id: execution_id.as_str().to_string(),
707 error,
708 timestamp: Self::now(),
709 }
710 }
711
712 pub fn execution_paused(execution_id: &ExecutionId, reason: impl Into<String>) -> Self {
714 Self::ExecutionPaused {
715 execution_id: execution_id.as_str().to_string(),
716 reason: reason.into(),
717 timestamp: Self::now(),
718 }
719 }
720
721 pub fn execution_resumed(execution_id: &ExecutionId) -> Self {
723 Self::ExecutionResumed {
724 execution_id: execution_id.as_str().to_string(),
725 timestamp: Self::now(),
726 }
727 }
728
729 pub fn execution_cancelled(execution_id: &ExecutionId, reason: impl Into<String>) -> Self {
731 Self::ExecutionCancelled {
732 execution_id: execution_id.as_str().to_string(),
733 reason: reason.into(),
734 timestamp: Self::now(),
735 }
736 }
737
738 pub fn step_start(
744 execution_id: &ExecutionId,
745 step_id: &StepId,
746 step_type: StepType,
747 name: impl Into<String>,
748 ) -> Self {
749 Self::StepStart {
750 execution_id: execution_id.as_str().to_string(),
751 step_id: step_id.as_str().to_string(),
752 step_type: step_type.to_string(),
753 name: name.into(),
754 timestamp: Self::now(),
755 }
756 }
757
758 pub fn step_end(
760 execution_id: &ExecutionId,
761 step_id: &StepId,
762 output: Option<String>,
763 duration_ms: u64,
764 ) -> Self {
765 Self::StepEnd {
766 execution_id: execution_id.as_str().to_string(),
767 step_id: step_id.as_str().to_string(),
768 output,
769 duration_ms,
770 timestamp: Self::now(),
771 }
772 }
773
774 pub fn step_failed(
776 execution_id: &ExecutionId,
777 step_id: &StepId,
778 error: ExecutionError,
779 ) -> Self {
780 Self::StepFailed {
781 execution_id: execution_id.as_str().to_string(),
782 step_id: step_id.as_str().to_string(),
783 error,
784 timestamp: Self::now(),
785 }
786 }
787
788 pub fn step_discovered(
793 execution_id: &ExecutionId,
794 step_id: &StepId,
795 discovered_by: Option<&StepId>,
796 source_type: StepSourceType,
797 reason: impl Into<String>,
798 depth: u32,
799 ) -> Self {
800 Self::StepDiscovered {
801 execution_id: execution_id.as_str().to_string(),
802 step_id: step_id.as_str().to_string(),
803 discovered_by: discovered_by.map(|s| s.as_str().to_string()),
804 source_type: format!("{:?}", source_type).to_lowercase(),
805 reason: reason.into(),
806 depth,
807 timestamp: Self::now(),
808 }
809 }
810
811 pub fn artifact_created(
817 execution_id: &ExecutionId,
818 step_id: &StepId,
819 artifact_id: &ArtifactId,
820 artifact_type: impl Into<String>,
821 ) -> Self {
822 Self::ArtifactCreated {
823 execution_id: execution_id.as_str().to_string(),
824 step_id: step_id.as_str().to_string(),
825 artifact_id: artifact_id.as_str().to_string(),
826 artifact_type: artifact_type.into(),
827 timestamp: Self::now(),
828 }
829 }
830
831 pub fn inbox_message(
840 execution_id: &ExecutionId,
841 message_id: &str,
842 message_type: crate::inbox::InboxMessageType,
843 ) -> Self {
844 Self::InboxMessage {
845 execution_id: execution_id.as_str().to_string(),
846 message_id: message_id.to_string(),
847 message_type: format!("{:?}", message_type).to_lowercase(),
848 timestamp: Self::now(),
849 }
850 }
851
852 pub fn policy_decision_allow(
860 execution_id: &ExecutionId,
861 step_id: Option<&StepId>,
862 tool_name: impl Into<String>,
863 ) -> Self {
864 Self::PolicyDecision {
865 execution_id: execution_id.as_str().to_string(),
866 step_id: step_id.map(|s| s.as_str().to_string()),
867 tool_name: tool_name.into(),
868 decision: "allow".to_string(),
869 reason: None,
870 timestamp: Self::now(),
871 }
872 }
873
874 pub fn policy_decision_deny(
876 execution_id: &ExecutionId,
877 step_id: Option<&StepId>,
878 tool_name: impl Into<String>,
879 reason: impl Into<String>,
880 ) -> Self {
881 Self::PolicyDecision {
882 execution_id: execution_id.as_str().to_string(),
883 step_id: step_id.map(|s| s.as_str().to_string()),
884 tool_name: tool_name.into(),
885 decision: "deny".to_string(),
886 reason: Some(reason.into()),
887 timestamp: Self::now(),
888 }
889 }
890
891 pub fn policy_decision_warn(
893 execution_id: &ExecutionId,
894 step_id: Option<&StepId>,
895 tool_name: impl Into<String>,
896 message: impl Into<String>,
897 ) -> Self {
898 Self::PolicyDecision {
899 execution_id: execution_id.as_str().to_string(),
900 step_id: step_id.map(|s| s.as_str().to_string()),
901 tool_name: tool_name.into(),
902 decision: "warn".to_string(),
903 reason: Some(message.into()),
904 timestamp: Self::now(),
905 }
906 }
907
908 pub fn llm_call_start(
914 execution_id: &ExecutionId,
915 step_id: Option<&StepId>,
916 callable_name: impl Into<String>,
917 model: Option<String>,
918 history_length: usize,
919 ) -> Self {
920 Self::LlmCallStart {
921 execution_id: execution_id.as_str().to_string(),
922 step_id: step_id.map(|s| s.as_str().to_string()),
923 callable_name: callable_name.into(),
924 model,
925 history_length,
926 timestamp: Self::now(),
927 }
928 }
929
930 pub fn llm_call_end(
932 execution_id: &ExecutionId,
933 step_id: Option<&StepId>,
934 duration_ms: u64,
935 prompt_tokens: Option<u32>,
936 completion_tokens: Option<u32>,
937 total_tokens: Option<u32>,
938 ) -> Self {
939 Self::LlmCallEnd {
940 execution_id: execution_id.as_str().to_string(),
941 step_id: step_id.map(|s| s.as_str().to_string()),
942 duration_ms,
943 prompt_tokens,
944 completion_tokens,
945 total_tokens,
946 timestamp: Self::now(),
947 }
948 }
949
950 pub fn llm_call_failed(
952 execution_id: &ExecutionId,
953 step_id: Option<&StepId>,
954 error: impl Into<String>,
955 duration_ms: Option<u64>,
956 ) -> Self {
957 Self::LlmCallFailed {
958 execution_id: execution_id.as_str().to_string(),
959 step_id: step_id.map(|s| s.as_str().to_string()),
960 error: error.into(),
961 duration_ms,
962 timestamp: Self::now(),
963 }
964 }
965
966 pub fn token_usage_recorded(
972 execution_id: &ExecutionId,
973 step_id: Option<&StepId>,
974 prompt_tokens: u32,
975 completion_tokens: u32,
976 cumulative_tokens: u64,
977 cost_usd: Option<f64>,
978 cumulative_cost_usd: Option<f64>,
979 ) -> Self {
980 Self::TokenUsageRecorded {
981 execution_id: execution_id.as_str().to_string(),
982 step_id: step_id.map(|s| s.as_str().to_string()),
983 prompt_tokens,
984 completion_tokens,
985 total_tokens: prompt_tokens + completion_tokens,
986 cumulative_tokens,
987 cost_usd,
988 cumulative_cost_usd,
989 timestamp: Self::now(),
990 }
991 }
992
993 pub fn memory_recalled(
999 execution_id: &ExecutionId,
1000 step_id: Option<&StepId>,
1001 query: impl Into<String>,
1002 memories_count: usize,
1003 duration_ms: u64,
1004 session_id: Option<String>,
1005 ) -> Self {
1006 Self::MemoryRecalled {
1007 execution_id: execution_id.as_str().to_string(),
1008 step_id: step_id.map(|s| s.as_str().to_string()),
1009 query: query.into(),
1010 memories_count,
1011 duration_ms,
1012 session_id,
1013 timestamp: Self::now(),
1014 }
1015 }
1016
1017 pub fn memory_stored(
1019 execution_id: &ExecutionId,
1020 step_id: Option<&StepId>,
1021 memory_type: impl Into<String>,
1022 session_id: Option<String>,
1023 ) -> Self {
1024 Self::MemoryStored {
1025 execution_id: execution_id.as_str().to_string(),
1026 step_id: step_id.map(|s| s.as_str().to_string()),
1027 memory_type: memory_type.into(),
1028 session_id,
1029 timestamp: Self::now(),
1030 }
1031 }
1032
1033 pub fn guardrail_evaluated(
1039 execution_id: &ExecutionId,
1040 step_id: Option<&StepId>,
1041 guardrail_name: impl Into<String>,
1042 decision: impl Into<String>,
1043 reason: Option<String>,
1044 score: Option<f64>,
1045 ) -> Self {
1046 Self::GuardrailEvaluated {
1047 execution_id: execution_id.as_str().to_string(),
1048 step_id: step_id.map(|s| s.as_str().to_string()),
1049 guardrail_name: guardrail_name.into(),
1050 decision: decision.into(),
1051 reason,
1052 score,
1053 timestamp: Self::now(),
1054 }
1055 }
1056
1057 pub fn reasoning_trace(
1063 execution_id: &ExecutionId,
1064 step_id: Option<&StepId>,
1065 reasoning_type: impl Into<String>,
1066 content: impl Into<String>,
1067 truncated: Option<bool>,
1068 ) -> Self {
1069 Self::ReasoningTrace {
1070 execution_id: execution_id.as_str().to_string(),
1071 step_id: step_id.map(|s| s.as_str().to_string()),
1072 reasoning_type: reasoning_type.into(),
1073 content: content.into(),
1074 truncated,
1075 timestamp: Self::now(),
1076 }
1077 }
1078
1079 pub fn context_window_snapshot(
1081 execution_id: &ExecutionId,
1082 step_id: Option<&StepId>,
1083 message_count: usize,
1084 estimated_tokens: u32,
1085 max_tokens: Option<u32>,
1086 utilization_pct: f64,
1087 ) -> Self {
1088 Self::ContextSnapshot {
1089 execution_id: execution_id.as_str().to_string(),
1090 step_id: step_id.map(|s| s.as_str().to_string()),
1091 message_count,
1092 estimated_tokens,
1093 max_tokens,
1094 utilization_pct,
1095 timestamp: Self::now(),
1096 }
1097 }
1098
1099 pub fn feedback_received(
1105 execution_id: &ExecutionId,
1106 step_id: Option<&StepId>,
1107 feedback_type: impl Into<String>,
1108 score: Option<f64>,
1109 comment: Option<String>,
1110 user_id: Option<String>,
1111 ) -> Self {
1112 Self::FeedbackReceived {
1113 execution_id: execution_id.as_str().to_string(),
1114 step_id: step_id.map(|s| s.as_str().to_string()),
1115 feedback_type: feedback_type.into(),
1116 score,
1117 comment,
1118 user_id,
1119 timestamp: Self::now(),
1120 }
1121 }
1122
1123 pub fn is_control_event(&self) -> bool {
1129 matches!(
1130 self,
1131 Self::ExecutionPaused { .. }
1132 | Self::ExecutionResumed { .. }
1133 | Self::ExecutionCancelled { .. }
1134 )
1135 }
1136
1137 pub fn is_delta_event(&self) -> bool {
1139 matches!(self, Self::TextDelta { .. } | Self::ToolInputDelta { .. })
1140 }
1141
1142 pub fn is_summary_event(&self) -> bool {
1144 !self.is_delta_event()
1145 }
1146
1147 pub fn to_sse(&self) -> String {
1149 format!(
1150 "data: {}\n\n",
1151 serde_json::to_string(self).unwrap_or_default()
1152 )
1153 }
1154
1155 pub fn done() -> String {
1157 "data: [DONE]\n\n".to_string()
1158 }
1159}
1160
/// Collects [`StreamEvent`]s in memory and optionally forwards them to an
/// [`EventLog`].
///
/// The buffer lives behind an `Arc`, so clones produced by the derived
/// `Clone` share the same event buffer.
#[derive(Clone)]
pub struct EventEmitter {
    /// Shared in-memory buffer of emitted events, in emission order.
    events: std::sync::Arc<std::sync::Mutex<Vec<StreamEvent>>>,
    /// Filter applied by `emit` to decide whether an event is recorded.
    mode: StreamMode,
    /// Optional persistent log; used by `emit` only when
    /// `execution_context` is also set.
    event_log: Option<Arc<EventLog>>,
    /// Base context used when converting stream events for the event log.
    execution_context: Option<ExecutionContext>,
}
1179
1180impl std::fmt::Debug for EventEmitter {
1181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1182 f.debug_struct("EventEmitter")
1183 .field(
1184 "events_count",
1185 &self.events.lock().map(|e| e.len()).unwrap_or(0),
1186 )
1187 .field("mode", &self.mode)
1188 .field("has_event_log", &self.event_log.is_some())
1189 .finish()
1190 }
1191}
1192
1193impl Default for EventEmitter {
1194 fn default() -> Self {
1195 Self::new()
1196 }
1197}
1198
1199impl EventEmitter {
1200 pub fn new() -> Self {
1202 Self {
1203 events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
1204 mode: StreamMode::Full,
1205 event_log: None,
1206 execution_context: None,
1207 }
1208 }
1209
1210 pub fn with_mode(mode: StreamMode) -> Self {
1212 Self {
1213 events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
1214 mode,
1215 event_log: None,
1216 execution_context: None,
1217 }
1218 }
1219
1220 pub fn with_persistence(event_log: Arc<EventLog>, execution_id: ExecutionId) -> Self {
1225 Self {
1226 events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
1227 mode: StreamMode::Full,
1228 event_log: Some(event_log),
1229 execution_context: Some(ExecutionContext::new(execution_id)),
1230 }
1231 }
1232
1233 pub fn set_mode(&mut self, mode: StreamMode) {
1235 self.mode = mode;
1236 }
1237
1238 pub fn set_event_log(&mut self, event_log: Arc<EventLog>, execution_id: ExecutionId) {
1240 self.event_log = Some(event_log);
1241 self.execution_context = Some(ExecutionContext::new(execution_id));
1242 }
1243
1244 pub fn emit(&self, event: StreamEvent) {
1249 let should_emit = match self.mode {
1250 StreamMode::Full => true,
1251 StreamMode::Summary => event.is_summary_event(),
1252 StreamMode::ControlOnly => event.is_control_event(),
1253 StreamMode::Silent => false,
1254 };
1255
1256 if should_emit {
1257 if let (Some(event_log), Some(ctx)) = (&self.event_log, &self.execution_context) {
1260 if let Some(exec_event) = self.to_execution_event(&event, ctx) {
1261 let log = Arc::clone(event_log);
1262 let evt = exec_event;
1263 tokio::spawn(async move {
1265 if let Err(e) = log.append(evt).await {
1266 tracing::warn!("Failed to persist event: {}", e);
1267 }
1268 });
1269 }
1270 }
1271
1272 if let Ok(mut events) = self.events.lock() {
1273 events.push(event);
1274 }
1275 }
1276 }
1277
1278 fn to_execution_event(
1283 &self,
1284 stream_event: &StreamEvent,
1285 ctx: &ExecutionContext,
1286 ) -> Option<ExecutionEvent> {
1287 let event_type = match stream_event {
1288 StreamEvent::ExecutionStart { .. } => ExecutionEventType::ExecutionStart,
1290 StreamEvent::ExecutionEnd { .. } => ExecutionEventType::ExecutionEnd,
1291 StreamEvent::ExecutionFailed { .. } => ExecutionEventType::ExecutionFailed,
1292 StreamEvent::ExecutionPaused { .. } => ExecutionEventType::ControlPause,
1293 StreamEvent::ExecutionResumed { .. } => ExecutionEventType::ControlResume,
1294 StreamEvent::ExecutionCancelled { .. } => ExecutionEventType::ExecutionCancelled,
1295
1296 StreamEvent::StepStart { .. } => ExecutionEventType::StepStart,
1298 StreamEvent::StepEnd { .. } => ExecutionEventType::StepEnd,
1299 StreamEvent::StepFailed { .. } => ExecutionEventType::StepFailed,
1300 StreamEvent::StepDiscovered { .. } => ExecutionEventType::StepDiscovered,
1301
1302 StreamEvent::ArtifactCreated { .. } => ExecutionEventType::ArtifactCreated,
1304
1305 StreamEvent::StateSnapshot { .. } => ExecutionEventType::StateSnapshot,
1307
1308 StreamEvent::InboxMessage { .. } => ExecutionEventType::InboxMessage,
1310
1311 StreamEvent::PolicyDecision { .. } => ExecutionEventType::DecisionMade,
1313
1314 StreamEvent::CheckpointSaved { .. } => ExecutionEventType::CheckpointSaved,
1315 StreamEvent::GoalEvaluated { .. } => ExecutionEventType::GoalEvaluated,
1316
1317 StreamEvent::ToolInputAvailable { .. } => ExecutionEventType::ToolCallStart,
1319 StreamEvent::ToolOutputAvailable { .. } => ExecutionEventType::ToolCallEnd,
1320 StreamEvent::PermissionRequest { .. } => ExecutionEventType::DecisionMade,
1321
1322 StreamEvent::LlmCallStart { .. } => ExecutionEventType::LlmCallStart,
1324 StreamEvent::LlmCallEnd { .. } => ExecutionEventType::LlmCallEnd,
1325 StreamEvent::LlmCallFailed { .. } => ExecutionEventType::LlmCallFailed,
1326
1327 StreamEvent::TokenUsageRecorded { .. } => ExecutionEventType::TokenUsageRecorded,
1329
1330 StreamEvent::MemoryRecalled { .. } => ExecutionEventType::MemoryRecalled,
1332 StreamEvent::MemoryStored { .. } => ExecutionEventType::MemoryStored,
1333
1334 StreamEvent::GuardrailEvaluated { .. } => ExecutionEventType::GuardrailEvaluated,
1336
1337 StreamEvent::ReasoningTrace { .. } => ExecutionEventType::ReasoningTrace,
1339 StreamEvent::ContextSnapshot { .. } => ExecutionEventType::ContextSnapshot,
1340
1341 StreamEvent::FeedbackReceived { .. } => ExecutionEventType::FeedbackReceived,
1343
1344 StreamEvent::Start { .. }
1350 | StreamEvent::Finish { .. }
1351 | StreamEvent::Error { .. }
1352 | StreamEvent::StartStep { .. }
1353 | StreamEvent::FinishStep { .. }
1354 | StreamEvent::TextStart { .. }
1355 | StreamEvent::TextDelta { .. }
1356 | StreamEvent::TextEnd { .. }
1357 | StreamEvent::ToolInputStart { .. }
1358 | StreamEvent::ToolInputDelta { .. } => return None,
1359 };
1360
1361 let mut context = ctx.clone();
1362 let mut payload: Option<serde_json::Value> = None;
1363 let mut duration_ms: Option<u64> = None;
1364
1365 match stream_event {
1366 StreamEvent::ExecutionEnd {
1367 final_output,
1368 duration_ms: dur,
1369 ..
1370 } => {
1371 duration_ms = Some(*dur);
1372 if let Some(output) = final_output {
1373 payload = Some(serde_json::json!({ "output": output }));
1374 }
1375 }
1376 StreamEvent::ExecutionFailed { error, .. } => {
1377 payload = serde_json::to_value(error)
1378 .ok()
1379 .map(|err| serde_json::json!({ "error": err }));
1380 }
1381 StreamEvent::ExecutionCancelled { reason, .. }
1382 | StreamEvent::ExecutionPaused { reason, .. } => {
1383 payload = Some(serde_json::json!({ "reason": reason }));
1384 }
1385 StreamEvent::StepStart {
1386 step_id,
1387 step_type,
1388 name,
1389 ..
1390 } => {
1391 context = context.with_step(StepId::from_string(step_id));
1392 payload = Some(serde_json::json!({ "step_type": step_type, "name": name }));
1393 }
1394 StreamEvent::StepEnd {
1395 step_id,
1396 output,
1397 duration_ms: dur,
1398 ..
1399 } => {
1400 context = context.with_step(StepId::from_string(step_id));
1401 duration_ms = Some(*dur);
1402 if let Some(out) = output {
1403 payload = Some(serde_json::json!({ "output": out }));
1404 }
1405 }
1406 StreamEvent::StepFailed { step_id, error, .. } => {
1407 context = context.with_step(StepId::from_string(step_id));
1408 payload = serde_json::to_value(error)
1409 .ok()
1410 .map(|err| serde_json::json!({ "error": err }));
1411 }
1412 StreamEvent::StepDiscovered {
1413 step_id,
1414 discovered_by,
1415 source_type,
1416 reason,
1417 depth,
1418 ..
1419 } => {
1420 context = context.with_step(StepId::from_string(step_id));
1421 payload = Some(serde_json::json!({
1422 "discovered_by": discovered_by,
1423 "source_type": source_type,
1424 "reason": reason,
1425 "depth": depth,
1426 }));
1427 }
1428 StreamEvent::ArtifactCreated {
1429 step_id,
1430 artifact_id,
1431 artifact_type,
1432 ..
1433 } => {
1434 context = context
1435 .with_step(StepId::from_string(step_id))
1436 .with_artifact(ArtifactId::from_string(artifact_id));
1437 payload = Some(serde_json::json!({ "artifact_type": artifact_type }));
1438 }
1439 StreamEvent::StateSnapshot { step_id, state, .. } => {
1440 if let Some(step) = step_id {
1441 context = context.with_step(StepId::from_string(step));
1442 }
1443 payload = Some(state.clone());
1444 }
1445 StreamEvent::InboxMessage {
1446 message_id,
1447 message_type,
1448 ..
1449 } => {
1450 payload = Some(serde_json::json!({
1451 "message_id": message_id,
1452 "message_type": message_type
1453 }));
1454 }
1455 StreamEvent::PolicyDecision {
1456 step_id,
1457 tool_name,
1458 decision,
1459 reason,
1460 ..
1461 } => {
1462 if let Some(step) = step_id {
1463 context = context.with_step(StepId::from_string(step));
1464 }
1465 payload = Some(serde_json::json!({
1466 "tool_name": tool_name,
1467 "decision": decision,
1468 "reason": reason,
1469 }));
1470 }
1471 StreamEvent::ToolInputAvailable {
1472 tool_call_id,
1473 tool_name,
1474 input,
1475 ..
1476 } => {
1477 payload = Some(serde_json::json!({
1478 "tool_call_id": tool_call_id,
1479 "tool_name": tool_name,
1480 "input": input,
1481 }));
1482 }
1483 StreamEvent::ToolOutputAvailable {
1484 tool_call_id,
1485 output,
1486 ..
1487 } => {
1488 payload = Some(serde_json::json!({
1489 "tool_call_id": tool_call_id,
1490 "output": output,
1491 }));
1492 }
1493 StreamEvent::PermissionRequest {
1494 tool_name,
1495 arguments,
1496 policy,
1497 ..
1498 } => {
1499 payload = Some(serde_json::json!({
1500 "tool_name": tool_name,
1501 "arguments": arguments,
1502 "policy": policy,
1503 }));
1504 }
1505 StreamEvent::CheckpointSaved {
1506 checkpoint_id,
1507 step_id,
1508 state_hash,
1509 ..
1510 } => {
1511 if let Some(step) = step_id {
1512 context = context.with_step(StepId::from_string(step));
1513 }
1514 payload = Some(serde_json::json!({
1515 "checkpoint_id": checkpoint_id,
1516 "state_hash": state_hash,
1517 }));
1518 }
1519 StreamEvent::GoalEvaluated {
1520 goal_id,
1521 step_id,
1522 status,
1523 score,
1524 reason,
1525 ..
1526 } => {
1527 if let Some(step) = step_id {
1528 context = context.with_step(StepId::from_string(step));
1529 }
1530 payload = Some(serde_json::json!({
1531 "goal_id": goal_id,
1532 "status": status,
1533 "score": score,
1534 "reason": reason,
1535 }));
1536 }
1537 StreamEvent::ExecutionStart { .. } => {}
1538
1539 StreamEvent::LlmCallStart {
1541 step_id,
1542 callable_name,
1543 model,
1544 history_length,
1545 ..
1546 } => {
1547 if let Some(step) = step_id {
1548 context = context.with_step(StepId::from_string(step));
1549 }
1550 payload = Some(serde_json::json!({
1551 "callable_name": callable_name,
1552 "model": model,
1553 "history_length": history_length,
1554 }));
1555 }
1556 StreamEvent::LlmCallEnd {
1557 step_id,
1558 duration_ms: dur,
1559 prompt_tokens,
1560 completion_tokens,
1561 total_tokens,
1562 ..
1563 } => {
1564 if let Some(step) = step_id {
1565 context = context.with_step(StepId::from_string(step));
1566 }
1567 duration_ms = Some(*dur);
1568 payload = Some(serde_json::json!({
1569 "prompt_tokens": prompt_tokens,
1570 "completion_tokens": completion_tokens,
1571 "total_tokens": total_tokens,
1572 }));
1573 }
1574 StreamEvent::LlmCallFailed {
1575 step_id,
1576 error,
1577 duration_ms: dur,
1578 ..
1579 } => {
1580 if let Some(step) = step_id {
1581 context = context.with_step(StepId::from_string(step));
1582 }
1583 duration_ms = *dur;
1584 payload = Some(serde_json::json!({
1585 "error": error,
1586 }));
1587 }
1588
1589 StreamEvent::TokenUsageRecorded {
1591 step_id,
1592 prompt_tokens,
1593 completion_tokens,
1594 total_tokens,
1595 cumulative_tokens,
1596 cost_usd,
1597 cumulative_cost_usd,
1598 ..
1599 } => {
1600 if let Some(step) = step_id {
1601 context = context.with_step(StepId::from_string(step));
1602 }
1603 payload = Some(serde_json::json!({
1604 "prompt_tokens": prompt_tokens,
1605 "completion_tokens": completion_tokens,
1606 "total_tokens": total_tokens,
1607 "cumulative_tokens": cumulative_tokens,
1608 "cost_usd": cost_usd,
1609 "cumulative_cost_usd": cumulative_cost_usd,
1610 }));
1611 }
1612
1613 StreamEvent::MemoryRecalled {
1615 step_id,
1616 query,
1617 memories_count,
1618 duration_ms: dur,
1619 session_id,
1620 ..
1621 } => {
1622 if let Some(step) = step_id {
1623 context = context.with_step(StepId::from_string(step));
1624 }
1625 duration_ms = Some(*dur);
1626 payload = Some(serde_json::json!({
1627 "query": query,
1628 "memories_count": memories_count,
1629 "session_id": session_id,
1630 }));
1631 }
1632 StreamEvent::MemoryStored {
1633 step_id,
1634 memory_type,
1635 session_id,
1636 ..
1637 } => {
1638 if let Some(step) = step_id {
1639 context = context.with_step(StepId::from_string(step));
1640 }
1641 payload = Some(serde_json::json!({
1642 "memory_type": memory_type,
1643 "session_id": session_id,
1644 }));
1645 }
1646
1647 StreamEvent::GuardrailEvaluated {
1649 step_id,
1650 guardrail_name,
1651 decision,
1652 reason,
1653 score,
1654 ..
1655 } => {
1656 if let Some(step) = step_id {
1657 context = context.with_step(StepId::from_string(step));
1658 }
1659 payload = Some(serde_json::json!({
1660 "guardrail_name": guardrail_name,
1661 "decision": decision,
1662 "reason": reason,
1663 "score": score,
1664 }));
1665 }
1666
1667 StreamEvent::ReasoningTrace {
1669 step_id,
1670 reasoning_type,
1671 content,
1672 truncated,
1673 ..
1674 } => {
1675 if let Some(step) = step_id {
1676 context = context.with_step(StepId::from_string(step));
1677 }
1678 payload = Some(serde_json::json!({
1679 "reasoning_type": reasoning_type,
1680 "content": content,
1681 "truncated": truncated,
1682 }));
1683 }
1684 StreamEvent::ContextSnapshot {
1685 step_id,
1686 message_count,
1687 estimated_tokens,
1688 max_tokens,
1689 utilization_pct,
1690 ..
1691 } => {
1692 if let Some(step) = step_id {
1693 context = context.with_step(StepId::from_string(step));
1694 }
1695 payload = Some(serde_json::json!({
1696 "message_count": message_count,
1697 "estimated_tokens": estimated_tokens,
1698 "max_tokens": max_tokens,
1699 "utilization_pct": utilization_pct,
1700 }));
1701 }
1702
1703 StreamEvent::FeedbackReceived {
1705 step_id,
1706 feedback_type,
1707 score,
1708 comment,
1709 user_id,
1710 ..
1711 } => {
1712 if let Some(step) = step_id {
1713 context = context.with_step(StepId::from_string(step));
1714 }
1715 payload = Some(serde_json::json!({
1716 "feedback_type": feedback_type,
1717 "score": score,
1718 "comment": comment,
1719 "user_id": user_id,
1720 }));
1721 }
1722
1723 _ => {}
1724 }
1725
1726 let mut event = ExecutionEvent::new(event_type, context);
1727 if let Some(ms) = duration_ms {
1728 event.duration_ms = Some(ms);
1729 }
1730 if let Some(data) = payload {
1731 event = event.with_payload(data);
1732 }
1733
1734 Some(event)
1735 }
1736
1737 pub fn emit_force(&self, event: StreamEvent) {
1739 if let Ok(mut events) = self.events.lock() {
1740 events.push(event);
1741 }
1742 }
1743
1744 pub fn drain(&self) -> Vec<StreamEvent> {
1746 if let Ok(mut events) = self.events.lock() {
1747 std::mem::take(&mut *events)
1748 } else {
1749 vec![]
1750 }
1751 }
1752
1753 pub fn mode(&self) -> StreamMode {
1755 self.mode
1756 }
1757}
1758
1759#[cfg(test)]
1760mod tests {
1761 use super::*;
1762 use crate::kernel::{ExecutionId, StepId, StepType};
1763
/// `StreamMode::default()` must yield `StreamMode::Full`.
#[test]
fn test_stream_mode_default() {
    let default_mode = StreamMode::default();
    assert_eq!(default_mode, StreamMode::Full);
}
1770
/// All four `StreamMode` variants can be named and collected.
#[test]
fn test_stream_mode_variants() {
    let variant_count = [
        StreamMode::Full,
        StreamMode::Summary,
        StreamMode::ControlOnly,
        StreamMode::Silent,
    ]
    .len();
    assert_eq!(variant_count, 4);
}
1781
/// Pause, resume and cancel events report themselves as control events;
/// execution-start and text-start events do not.
#[test]
fn test_stream_event_is_control_event() {
    let run_id = ExecutionId::new();

    // Events expected to be classified as control events.
    let paused = StreamEvent::execution_paused(&run_id, "paused");
    let resumed = StreamEvent::execution_resumed(&run_id);
    let cancelled = StreamEvent::execution_cancelled(&run_id, "cancelled");
    assert!(paused.is_control_event());
    assert!(resumed.is_control_event());
    assert!(cancelled.is_control_event());

    // Events expected NOT to be classified as control events.
    let started = StreamEvent::execution_start(&run_id);
    let text_started = StreamEvent::text_start(None);
    assert!(!started.is_control_event());
    assert!(!text_started.is_control_event());
}
1797
/// A text delta is a delta event; a text start is not.
#[test]
fn test_stream_event_is_delta_event() {
    assert!(StreamEvent::text_delta("id1", "chunk").is_delta_event());
    assert!(!StreamEvent::text_start(None).is_delta_event());
}
1808
/// A text start is a summary event; a text delta is not.
#[test]
fn test_stream_event_is_summary_event() {
    assert!(StreamEvent::text_start(None).is_summary_event());
    assert!(!StreamEvent::text_delta("id1", "chunk").is_summary_event());
}
1819
/// The SSE rendering starts with `data: `, ends with a blank line, and
/// carries the event's id.
#[test]
fn test_stream_event_to_sse() {
    let frame = StreamEvent::text_end("test-id").to_sse();

    // Framing checks first, then the content check.
    assert!(frame.starts_with("data: "));
    assert!(frame.ends_with("\n\n"));
    assert!(frame.contains("test-id"));
}
1829
/// The stream terminator is the literal `[DONE]` SSE frame.
#[test]
fn test_stream_event_done() {
    assert_eq!(StreamEvent::done(), "data: [DONE]\n\n");
}
1835
/// Each text factory builds the matching `StreamEvent` variant, and the
/// delta factory keeps the supplied chunk.
#[test]
fn test_stream_event_text_factories() {
    let run_id = ExecutionId::new();

    assert!(matches!(
        StreamEvent::text_start(Some(&run_id)),
        StreamEvent::TextStart { .. }
    ));

    assert!(matches!(
        StreamEvent::text_delta("id1", "hello"),
        StreamEvent::TextDelta { delta, .. } if delta == "hello"
    ));

    assert!(matches!(
        StreamEvent::text_end("id1"),
        StreamEvent::TextEnd { .. }
    ));
}
1851
/// Each execution-lifecycle factory builds the matching `StreamEvent`
/// variant; the end event keeps the supplied duration.
#[test]
fn test_stream_event_execution_factories() {
    use crate::kernel::ExecutionError;

    let run_id = ExecutionId::new();

    assert!(matches!(
        StreamEvent::execution_start(&run_id),
        StreamEvent::ExecutionStart { .. }
    ));

    let finished = StreamEvent::execution_end(&run_id, Some(String::from("output")), 100);
    assert!(matches!(
        finished,
        StreamEvent::ExecutionEnd {
            duration_ms: 100,
            ..
        }
    ));

    let failure = StreamEvent::execution_failed(
        &run_id,
        ExecutionError::kernel_internal("error message"),
    );
    assert!(matches!(failure, StreamEvent::ExecutionFailed { .. }));

    assert!(matches!(
        StreamEvent::execution_paused(&run_id, "reason"),
        StreamEvent::ExecutionPaused { .. }
    ));

    assert!(matches!(
        StreamEvent::execution_resumed(&run_id),
        StreamEvent::ExecutionResumed { .. }
    ));

    assert!(matches!(
        StreamEvent::execution_cancelled(&run_id, "cancel reason"),
        StreamEvent::ExecutionCancelled { .. }
    ));
}
1882
/// Each step-lifecycle factory builds the matching `StreamEvent` variant;
/// the end event keeps the supplied duration.
#[test]
fn test_stream_event_step_factories() {
    use crate::kernel::ExecutionError;

    let run_id = ExecutionId::new();
    let step = StepId::new();

    assert!(matches!(
        StreamEvent::step_start(&run_id, &step, StepType::FunctionNode, "test_step"),
        StreamEvent::StepStart { .. }
    ));

    let finished = StreamEvent::step_end(&run_id, &step, Some(String::from("output")), 50);
    assert!(matches!(
        finished,
        StreamEvent::StepEnd {
            duration_ms: 50,
            ..
        }
    ));

    let failure = StreamEvent::step_failed(
        &run_id,
        &step,
        ExecutionError::kernel_internal("step error"),
    );
    assert!(matches!(failure, StreamEvent::StepFailed { .. }));
}
1906
/// Each tool factory builds the matching `StreamEvent` variant, and the
/// input-start factory keeps the supplied tool name.
#[test]
fn test_stream_event_tool_factories() {
    assert!(matches!(
        StreamEvent::tool_input_start("call-123", "web_search"),
        StreamEvent::ToolInputStart { tool_name, .. } if tool_name == "web_search"
    ));

    let query = serde_json::json!({"q": "test"});
    assert!(matches!(
        StreamEvent::tool_input_available("call-123", "web_search", query),
        StreamEvent::ToolInputAvailable { .. }
    ));

    let result = serde_json::json!({"result": "ok"});
    assert!(matches!(
        StreamEvent::tool_output_available("call-123", result),
        StreamEvent::ToolOutputAvailable { .. }
    ));
}
1931
/// A freshly constructed emitter reports `StreamMode::Full`.
#[test]
fn test_event_emitter_new() {
    assert_eq!(EventEmitter::new().mode(), StreamMode::Full);
}
1939
/// `with_mode` builds an emitter that reports the requested mode.
#[test]
fn test_event_emitter_with_mode() {
    let requested = StreamMode::Summary;
    assert_eq!(EventEmitter::with_mode(requested).mode(), StreamMode::Summary);
}
1945
/// `set_mode` changes the mode subsequently reported by `mode()`.
#[test]
fn test_event_emitter_set_mode() {
    let mut sink = EventEmitter::new();
    sink.set_mode(StreamMode::Silent);

    let observed = sink.mode();
    assert_eq!(observed, StreamMode::Silent);
}
1952
/// Emitted events are returned by the first `drain`, and a second `drain`
/// yields nothing.
#[test]
fn test_event_emitter_emit_and_drain() {
    let sink = EventEmitter::new();
    let run_id = ExecutionId::new();

    sink.emit(StreamEvent::execution_start(&run_id));
    sink.emit(StreamEvent::execution_end(&run_id, None, 100));

    // First drain hands back both events.
    assert_eq!(sink.drain().len(), 2);

    // The buffer was emptied by the previous drain.
    assert!(sink.drain().is_empty());
}
1968
/// In `Full` mode both the text delta and the text end are buffered.
#[test]
fn test_event_emitter_mode_full() {
    let sink = EventEmitter::with_mode(StreamMode::Full);

    sink.emit(StreamEvent::text_delta("id", "chunk"));
    sink.emit(StreamEvent::text_end("id"));

    let buffered = sink.drain();
    assert_eq!(buffered.len(), 2);
}
1980
/// In `Summary` mode only one of the two emitted text events is buffered.
#[test]
fn test_event_emitter_mode_summary() {
    let sink = EventEmitter::with_mode(StreamMode::Summary);

    sink.emit(StreamEvent::text_delta("id", "chunk"));
    sink.emit(StreamEvent::text_end("id"));

    let buffered = sink.drain();
    assert_eq!(buffered.len(), 1);
}
1992
/// In `ControlOnly` mode only one of the two emitted execution events is
/// buffered.
#[test]
fn test_event_emitter_mode_control_only() {
    let sink = EventEmitter::with_mode(StreamMode::ControlOnly);
    let run_id = ExecutionId::new();

    sink.emit(StreamEvent::execution_start(&run_id));
    sink.emit(StreamEvent::execution_paused(&run_id, "test"));

    let buffered = sink.drain();
    assert_eq!(buffered.len(), 1);
}
2005
/// In `Silent` mode nothing passed to `emit` is buffered.
#[test]
fn test_event_emitter_mode_silent() {
    let sink = EventEmitter::with_mode(StreamMode::Silent);
    let run_id = ExecutionId::new();

    sink.emit(StreamEvent::execution_start(&run_id));
    sink.emit(StreamEvent::execution_paused(&run_id, "test"));

    assert!(sink.drain().is_empty());
}
2018
/// `emit_force` buffers an event even when the emitter is in `Silent` mode.
#[test]
fn test_event_emitter_emit_force() {
    let sink = EventEmitter::with_mode(StreamMode::Silent);
    let run_id = ExecutionId::new();

    sink.emit_force(StreamEvent::execution_start(&run_id));

    let buffered = sink.drain();
    assert_eq!(buffered.len(), 1);
}
2030
/// A serialized error event carries its type tag, the error message and the
/// error code.
#[test]
fn test_event_emitter_serialization() {
    use crate::kernel::ExecutionError;

    let failure =
        ExecutionError::kernel_internal("Test error").with_code(String::from("ERR_CODE"));
    let json = serde_json::to_string(&StreamEvent::error(failure)).unwrap();

    assert!(json.contains("data-error"));
    assert!(json.contains("Test error"));
    assert!(json.contains("ERR_CODE"));
}
2042}