use super::event_logger::EventLog;
use crate::kernel::{ArtifactId, ExecutionError, ExecutionId, StepId, StepSourceType, StepType};
use crate::kernel::{ExecutionContext, ExecutionEvent, ExecutionEventType};
use futures::Stream;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use std::sync::Arc;
/// Pinned, boxed, `Send` stream whose items are [`StreamEvent`]s.
pub type EventStream = Pin<Box<dyn Stream<Item = StreamEvent> + Send>>;
/// Filter applied by `EventEmitter::emit` to decide which events are kept.
///
/// Serialized in `snake_case` (for example `control_only`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamMode {
/// Keep every event. This is the default mode.
#[default]
Full,
/// Keep every event except delta events (`StreamEvent::is_summary_event`).
Summary,
/// Keep only pause/resume/cancel events (`StreamEvent::is_control_event`).
ControlOnly,
/// Keep nothing.
Silent,
}
/// Event delivered to stream consumers.
///
/// Serialized by serde as an internally tagged object: the `type` field holds
/// the variant's `data-*` name from its `#[serde(rename = ...)]` attribute.
/// `Option` fields annotated with `skip_serializing_if` are left out of the
/// output when they are `None`.
///
/// The factory functions in `impl StreamEvent` fill `timestamp` fields with
/// the current wall-clock time in milliseconds since the Unix epoch.
///
/// `EventEmitter` persists most variants to the event log when one is
/// configured. `Start`, `Finish`, `Error`, `StartStep`, `FinishStep`,
/// `TextStart`, `TextDelta`, `TextEnd`, `ToolInputStart` and `ToolInputDelta`
/// are never persisted.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum StreamEvent {
/// Start of a text segment; `id` identifies the segment.
#[serde(rename = "data-text-start")]
TextStart {
id: String,
#[serde(skip_serializing_if = "Option::is_none")]
execution_id: Option<String>,
},
/// Incremental text chunk (`delta`) for the segment `id`. A delta event.
#[serde(rename = "data-text-delta")]
TextDelta { id: String, delta: String },
/// End of the text segment `id`.
#[serde(rename = "data-text-end")]
TextEnd { id: String },
/// Step-start marker with an optional step id.
#[serde(rename = "data-start-step")]
StartStep {
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
},
/// Step-finish marker with an optional step id.
#[serde(rename = "data-finish-step")]
FinishStep {
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
},
/// Start of the input for the tool call `tool_call_id`.
#[serde(rename = "data-tool-input-start")]
ToolInputStart {
tool_call_id: String,
tool_name: String,
},
/// Incremental chunk of tool input text. A delta event.
#[serde(rename = "data-tool-input-delta")]
ToolInputDelta {
tool_call_id: String,
input_text_delta: String,
},
/// Complete tool input as JSON; persisted as `ToolCallStart`.
#[serde(rename = "data-tool-input-available")]
ToolInputAvailable {
tool_call_id: String,
tool_name: String,
input: serde_json::Value,
},
/// Tool output as JSON; persisted as `ToolCallEnd`.
#[serde(rename = "data-tool-output-available")]
ToolOutputAvailable {
tool_call_id: String,
output: serde_json::Value,
},
/// Permission request for a tool invocation; persisted as `DecisionMade`.
#[serde(rename = "data-permission-request")]
PermissionRequest {
execution_id: String,
tool_name: String,
arguments: serde_json::Value,
policy: String,
timestamp: i64,
},
/// Start marker for the message `message_id`.
#[serde(rename = "data-start")]
Start {
message_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
execution_id: Option<String>,
},
/// Finish marker for the message `message_id`, with optional final output.
#[serde(rename = "data-finish")]
Finish {
message_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
final_output: Option<String>,
},
/// Error report that is not tied to a specific execution id.
#[serde(rename = "data-error")]
Error { error: ExecutionError },
/// An execution started, optionally under a parent.
#[serde(rename = "data-execution-start")]
ExecutionStart {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
parent_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
parent_type: Option<String>,
timestamp: i64,
},
/// An execution ended, with optional final output and its duration.
#[serde(rename = "data-execution-end")]
ExecutionEnd {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
final_output: Option<String>,
duration_ms: u64,
timestamp: i64,
},
/// An execution failed with `error`.
#[serde(rename = "data-execution-failed")]
ExecutionFailed {
execution_id: String,
error: ExecutionError,
timestamp: i64,
},
/// An execution was paused. Control event; persisted as `ControlPause`.
#[serde(rename = "data-execution-paused")]
ExecutionPaused {
execution_id: String,
reason: String,
timestamp: i64,
},
/// An execution was resumed. Control event; persisted as `ControlResume`.
#[serde(rename = "data-execution-resumed")]
ExecutionResumed {
execution_id: String,
timestamp: i64,
},
/// An execution was cancelled. Control event.
#[serde(rename = "data-execution-cancelled")]
ExecutionCancelled {
execution_id: String,
reason: String,
timestamp: i64,
},
/// A step started.
#[serde(rename = "data-step-start")]
StepStart {
execution_id: String,
step_id: String,
step_type: String,
name: String,
timestamp: i64,
},
/// A step ended, with optional output and its duration.
#[serde(rename = "data-step-end")]
StepEnd {
execution_id: String,
step_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
output: Option<String>,
duration_ms: u64,
timestamp: i64,
},
/// A step failed with `error`.
#[serde(rename = "data-step-failed")]
StepFailed {
execution_id: String,
step_id: String,
error: ExecutionError,
timestamp: i64,
},
/// A step was discovered, optionally by another step.
#[serde(rename = "data-step-discovered")]
StepDiscovered {
execution_id: String,
step_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
discovered_by: Option<String>,
source_type: String,
reason: String,
depth: u32,
timestamp: i64,
},
/// An artifact was created by a step.
#[serde(rename = "data-artifact-created")]
ArtifactCreated {
execution_id: String,
step_id: String,
artifact_id: String,
artifact_type: String,
timestamp: i64,
},
/// Snapshot of state as arbitrary JSON.
#[serde(rename = "data-state-snapshot")]
StateSnapshot {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
state: serde_json::Value,
timestamp: i64,
},
/// A checkpoint was saved.
#[serde(rename = "data-checkpoint-saved")]
CheckpointSaved {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
checkpoint_id: String,
state_hash: String,
timestamp: i64,
},
/// A goal was evaluated.
#[serde(rename = "data-goal-evaluated")]
GoalEvaluated {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
goal_id: String,
status: String, #[serde(skip_serializing_if = "Option::is_none")]
score: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
timestamp: i64,
},
/// An inbox message was observed.
#[serde(rename = "data-inbox-message")]
InboxMessage {
execution_id: String,
message_id: String,
message_type: String,
timestamp: i64,
},
/// Policy decision for a tool. The factories set `decision` to `allow`,
/// `deny` or `warn`; persisted as `DecisionMade`.
#[serde(rename = "data-policy-decision")]
PolicyDecision {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
tool_name: String,
decision: String, #[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
timestamp: i64,
},
/// An LLM call started.
#[serde(rename = "data-llm-call-start")]
LlmCallStart {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
callable_name: String,
#[serde(skip_serializing_if = "Option::is_none")]
model: Option<String>,
history_length: usize,
timestamp: i64,
},
/// An LLM call ended, with optional token counts.
#[serde(rename = "data-llm-call-end")]
LlmCallEnd {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
prompt_tokens: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
completion_tokens: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
total_tokens: Option<u32>,
timestamp: i64,
},
/// An LLM call failed; `error` is a plain message string.
#[serde(rename = "data-llm-call-failed")]
LlmCallFailed {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
error: String,
#[serde(skip_serializing_if = "Option::is_none")]
duration_ms: Option<u64>,
timestamp: i64,
},
/// Token usage record with per-call and cumulative figures.
#[serde(rename = "data-token-usage")]
TokenUsageRecorded {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
prompt_tokens: u32,
completion_tokens: u32,
total_tokens: u32,
cumulative_tokens: u64,
#[serde(skip_serializing_if = "Option::is_none")]
cost_usd: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
cumulative_cost_usd: Option<f64>,
timestamp: i64,
},
/// Memories were recalled for `query`.
#[serde(rename = "data-memory-recalled")]
MemoryRecalled {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
query: String,
memories_count: usize,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
timestamp: i64,
},
/// A memory was stored.
#[serde(rename = "data-memory-stored")]
MemoryStored {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
memory_type: String, #[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
timestamp: i64,
},
/// A guardrail was evaluated.
#[serde(rename = "data-guardrail-evaluated")]
GuardrailEvaluated {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
guardrail_name: String,
decision: String, #[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
score: Option<f64>,
timestamp: i64,
},
/// Reasoning trace content, with an optional `truncated` flag.
#[serde(rename = "data-reasoning-trace")]
ReasoningTrace {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
reasoning_type: String, content: String,
#[serde(skip_serializing_if = "Option::is_none")]
truncated: Option<bool>,
timestamp: i64,
},
/// Snapshot of context-window usage.
#[serde(rename = "data-context-snapshot")]
ContextSnapshot {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
message_count: usize,
estimated_tokens: u32,
#[serde(skip_serializing_if = "Option::is_none")]
max_tokens: Option<u32>,
utilization_pct: f64,
timestamp: i64,
},
/// Feedback was received.
#[serde(rename = "data-feedback-received")]
FeedbackReceived {
execution_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
step_id: Option<String>,
feedback_type: String, #[serde(skip_serializing_if = "Option::is_none")]
score: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
comment: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
user_id: Option<String>,
timestamp: i64,
},
}
impl StreamEvent {
    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Yields 0 when the system clock reads earlier than the epoch, and clamps
    /// to `i64::MAX` rather than silently truncating the 128-bit count.
    fn now() -> i64 {
        let millis = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        millis.min(i64::MAX as u128) as i64
    }

    /// Builds an identifier of the form `id_<hex nanoseconds since the epoch>`.
    ///
    /// The value comes only from the clock reading, so two calls that observe
    /// the same reading produce the same identifier. A clock set before the
    /// epoch yields `id_0` instead of panicking, matching [`Self::now`].
    fn generate_id() -> String {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        format!("id_{:x}", nanos)
    }

    /// Creates a `TextStart` event with a freshly generated `id`.
    pub fn text_start(execution_id: Option<&ExecutionId>) -> Self {
        Self::TextStart {
            id: Self::generate_id(),
            execution_id: execution_id.map(|e| e.as_str().to_string()),
        }
    }

    /// Creates a `TextDelta` event carrying `delta` for the segment `id`.
    pub fn text_delta(id: impl Into<String>, delta: impl Into<String>) -> Self {
        Self::TextDelta {
            id: id.into(),
            delta: delta.into(),
        }
    }

    /// Creates a `TextEnd` event for the segment `id`.
    pub fn text_end(id: impl Into<String>) -> Self {
        Self::TextEnd { id: id.into() }
    }

    /// Creates a `Start` event for `message_id`.
    pub fn start(message_id: impl Into<String>, execution_id: Option<&ExecutionId>) -> Self {
        Self::Start {
            message_id: message_id.into(),
            execution_id: execution_id.map(|e| e.as_str().to_string()),
        }
    }

    /// Creates a `Finish` event for `message_id` with an optional final output.
    pub fn finish(message_id: impl Into<String>, final_output: Option<String>) -> Self {
        Self::Finish {
            message_id: message_id.into(),
            final_output,
        }
    }

    /// Creates an `Error` event wrapping `error`.
    pub fn error(error: ExecutionError) -> Self {
        Self::Error { error }
    }

    /// Creates a `StartStep` event.
    pub fn start_step(step_id: Option<&StepId>) -> Self {
        Self::StartStep {
            step_id: step_id.map(|s| s.as_str().to_string()),
        }
    }

    /// Creates a `FinishStep` event.
    pub fn finish_step(step_id: Option<&StepId>) -> Self {
        Self::FinishStep {
            step_id: step_id.map(|s| s.as_str().to_string()),
        }
    }

    /// Creates a `ToolInputStart` event.
    pub fn tool_input_start(tool_call_id: impl Into<String>, tool_name: impl Into<String>) -> Self {
        Self::ToolInputStart {
            tool_call_id: tool_call_id.into(),
            tool_name: tool_name.into(),
        }
    }

    /// Creates a `ToolInputAvailable` event with the complete tool input.
    pub fn tool_input_available(
        tool_call_id: impl Into<String>,
        tool_name: impl Into<String>,
        input: serde_json::Value,
    ) -> Self {
        Self::ToolInputAvailable {
            tool_call_id: tool_call_id.into(),
            tool_name: tool_name.into(),
            input,
        }
    }

    /// Creates a `ToolOutputAvailable` event with the tool's output.
    pub fn tool_output_available(
        tool_call_id: impl Into<String>,
        output: serde_json::Value,
    ) -> Self {
        Self::ToolOutputAvailable {
            tool_call_id: tool_call_id.into(),
            output,
        }
    }

    /// Creates a `PermissionRequest` event stamped with the current time.
    pub fn permission_request(
        execution_id: &ExecutionId,
        tool_name: impl Into<String>,
        arguments: serde_json::Value,
        policy: impl Into<String>,
    ) -> Self {
        Self::PermissionRequest {
            execution_id: execution_id.as_str().to_string(),
            tool_name: tool_name.into(),
            arguments,
            policy: policy.into(),
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionStart` event without parent information.
    pub fn execution_start(execution_id: &ExecutionId) -> Self {
        Self::ExecutionStart {
            execution_id: execution_id.as_str().to_string(),
            parent_id: None,
            parent_type: None,
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionStart` event that records its parent's id and type.
    pub fn execution_start_with_parent(
        execution_id: &ExecutionId,
        parent_id: impl Into<String>,
        parent_type: impl Into<String>,
    ) -> Self {
        Self::ExecutionStart {
            execution_id: execution_id.as_str().to_string(),
            parent_id: Some(parent_id.into()),
            parent_type: Some(parent_type.into()),
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionEnd` event.
    pub fn execution_end(
        execution_id: &ExecutionId,
        final_output: Option<String>,
        duration_ms: u64,
    ) -> Self {
        Self::ExecutionEnd {
            execution_id: execution_id.as_str().to_string(),
            final_output,
            duration_ms,
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionFailed` event.
    pub fn execution_failed(execution_id: &ExecutionId, error: ExecutionError) -> Self {
        Self::ExecutionFailed {
            execution_id: execution_id.as_str().to_string(),
            error,
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionPaused` event.
    pub fn execution_paused(execution_id: &ExecutionId, reason: impl Into<String>) -> Self {
        Self::ExecutionPaused {
            execution_id: execution_id.as_str().to_string(),
            reason: reason.into(),
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionResumed` event.
    pub fn execution_resumed(execution_id: &ExecutionId) -> Self {
        Self::ExecutionResumed {
            execution_id: execution_id.as_str().to_string(),
            timestamp: Self::now(),
        }
    }

    /// Creates an `ExecutionCancelled` event.
    pub fn execution_cancelled(execution_id: &ExecutionId, reason: impl Into<String>) -> Self {
        Self::ExecutionCancelled {
            execution_id: execution_id.as_str().to_string(),
            reason: reason.into(),
            timestamp: Self::now(),
        }
    }

    /// Creates a `StepStart` event; `step_type` is stored via its `to_string()`.
    pub fn step_start(
        execution_id: &ExecutionId,
        step_id: &StepId,
        step_type: StepType,
        name: impl Into<String>,
    ) -> Self {
        Self::StepStart {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.as_str().to_string(),
            step_type: step_type.to_string(),
            name: name.into(),
            timestamp: Self::now(),
        }
    }

    /// Creates a `StepEnd` event.
    pub fn step_end(
        execution_id: &ExecutionId,
        step_id: &StepId,
        output: Option<String>,
        duration_ms: u64,
    ) -> Self {
        Self::StepEnd {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.as_str().to_string(),
            output,
            duration_ms,
            timestamp: Self::now(),
        }
    }

    /// Creates a `StepFailed` event.
    pub fn step_failed(
        execution_id: &ExecutionId,
        step_id: &StepId,
        error: ExecutionError,
    ) -> Self {
        Self::StepFailed {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.as_str().to_string(),
            error,
            timestamp: Self::now(),
        }
    }

    /// Creates a `StepDiscovered` event.
    ///
    /// `source_type` is stored as the lower-cased `Debug` rendering of the
    /// enum value.
    pub fn step_discovered(
        execution_id: &ExecutionId,
        step_id: &StepId,
        discovered_by: Option<&StepId>,
        source_type: StepSourceType,
        reason: impl Into<String>,
        depth: u32,
    ) -> Self {
        Self::StepDiscovered {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.as_str().to_string(),
            discovered_by: discovered_by.map(|s| s.as_str().to_string()),
            source_type: format!("{:?}", source_type).to_lowercase(),
            reason: reason.into(),
            depth,
            timestamp: Self::now(),
        }
    }

    /// Creates an `ArtifactCreated` event.
    pub fn artifact_created(
        execution_id: &ExecutionId,
        step_id: &StepId,
        artifact_id: &ArtifactId,
        artifact_type: impl Into<String>,
    ) -> Self {
        Self::ArtifactCreated {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.as_str().to_string(),
            artifact_id: artifact_id.as_str().to_string(),
            artifact_type: artifact_type.into(),
            timestamp: Self::now(),
        }
    }

    /// Creates an `InboxMessage` event.
    ///
    /// `message_type` is stored as the lower-cased `Debug` rendering of the
    /// enum value.
    pub fn inbox_message(
        execution_id: &ExecutionId,
        message_id: &str,
        message_type: crate::inbox::InboxMessageType,
    ) -> Self {
        Self::InboxMessage {
            execution_id: execution_id.as_str().to_string(),
            message_id: message_id.to_string(),
            message_type: format!("{:?}", message_type).to_lowercase(),
            timestamp: Self::now(),
        }
    }

    /// Creates a `PolicyDecision` event with decision `allow` and no reason.
    pub fn policy_decision_allow(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        tool_name: impl Into<String>,
    ) -> Self {
        Self::PolicyDecision {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            tool_name: tool_name.into(),
            decision: "allow".to_string(),
            reason: None,
            timestamp: Self::now(),
        }
    }

    /// Creates a `PolicyDecision` event with decision `deny` and the given reason.
    pub fn policy_decision_deny(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        tool_name: impl Into<String>,
        reason: impl Into<String>,
    ) -> Self {
        Self::PolicyDecision {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            tool_name: tool_name.into(),
            decision: "deny".to_string(),
            reason: Some(reason.into()),
            timestamp: Self::now(),
        }
    }

    /// Creates a `PolicyDecision` event with decision `warn`; `message` is
    /// stored in the `reason` field.
    pub fn policy_decision_warn(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        tool_name: impl Into<String>,
        message: impl Into<String>,
    ) -> Self {
        Self::PolicyDecision {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            tool_name: tool_name.into(),
            decision: "warn".to_string(),
            reason: Some(message.into()),
            timestamp: Self::now(),
        }
    }

    /// Creates an `LlmCallStart` event.
    pub fn llm_call_start(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        callable_name: impl Into<String>,
        model: Option<String>,
        history_length: usize,
    ) -> Self {
        Self::LlmCallStart {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            callable_name: callable_name.into(),
            model,
            history_length,
            timestamp: Self::now(),
        }
    }

    /// Creates an `LlmCallEnd` event.
    pub fn llm_call_end(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        duration_ms: u64,
        prompt_tokens: Option<u32>,
        completion_tokens: Option<u32>,
        total_tokens: Option<u32>,
    ) -> Self {
        Self::LlmCallEnd {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            duration_ms,
            prompt_tokens,
            completion_tokens,
            total_tokens,
            timestamp: Self::now(),
        }
    }

    /// Creates an `LlmCallFailed` event.
    pub fn llm_call_failed(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        error: impl Into<String>,
        duration_ms: Option<u64>,
    ) -> Self {
        Self::LlmCallFailed {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            error: error.into(),
            duration_ms,
            timestamp: Self::now(),
        }
    }

    /// Creates a `TokenUsageRecorded` event.
    ///
    /// `total_tokens` is `prompt_tokens + completion_tokens`, saturating at
    /// `u32::MAX` so an oversized pair can neither panic (debug builds) nor
    /// wrap around (release builds).
    pub fn token_usage_recorded(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        prompt_tokens: u32,
        completion_tokens: u32,
        cumulative_tokens: u64,
        cost_usd: Option<f64>,
        cumulative_cost_usd: Option<f64>,
    ) -> Self {
        Self::TokenUsageRecorded {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            prompt_tokens,
            completion_tokens,
            total_tokens: prompt_tokens.saturating_add(completion_tokens),
            cumulative_tokens,
            cost_usd,
            cumulative_cost_usd,
            timestamp: Self::now(),
        }
    }

    /// Creates a `MemoryRecalled` event.
    pub fn memory_recalled(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        query: impl Into<String>,
        memories_count: usize,
        duration_ms: u64,
        session_id: Option<String>,
    ) -> Self {
        Self::MemoryRecalled {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            query: query.into(),
            memories_count,
            duration_ms,
            session_id,
            timestamp: Self::now(),
        }
    }

    /// Creates a `MemoryStored` event.
    pub fn memory_stored(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        memory_type: impl Into<String>,
        session_id: Option<String>,
    ) -> Self {
        Self::MemoryStored {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            memory_type: memory_type.into(),
            session_id,
            timestamp: Self::now(),
        }
    }

    /// Creates a `GuardrailEvaluated` event.
    pub fn guardrail_evaluated(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        guardrail_name: impl Into<String>,
        decision: impl Into<String>,
        reason: Option<String>,
        score: Option<f64>,
    ) -> Self {
        Self::GuardrailEvaluated {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            guardrail_name: guardrail_name.into(),
            decision: decision.into(),
            reason,
            score,
            timestamp: Self::now(),
        }
    }

    /// Creates a `ReasoningTrace` event.
    pub fn reasoning_trace(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        reasoning_type: impl Into<String>,
        content: impl Into<String>,
        truncated: Option<bool>,
    ) -> Self {
        Self::ReasoningTrace {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            reasoning_type: reasoning_type.into(),
            content: content.into(),
            truncated,
            timestamp: Self::now(),
        }
    }

    /// Creates a `ContextSnapshot` event.
    pub fn context_window_snapshot(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        message_count: usize,
        estimated_tokens: u32,
        max_tokens: Option<u32>,
        utilization_pct: f64,
    ) -> Self {
        Self::ContextSnapshot {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            message_count,
            estimated_tokens,
            max_tokens,
            utilization_pct,
            timestamp: Self::now(),
        }
    }

    /// Creates a `FeedbackReceived` event.
    pub fn feedback_received(
        execution_id: &ExecutionId,
        step_id: Option<&StepId>,
        feedback_type: impl Into<String>,
        score: Option<f64>,
        comment: Option<String>,
        user_id: Option<String>,
    ) -> Self {
        Self::FeedbackReceived {
            execution_id: execution_id.as_str().to_string(),
            step_id: step_id.map(|s| s.as_str().to_string()),
            feedback_type: feedback_type.into(),
            score,
            comment,
            user_id,
            timestamp: Self::now(),
        }
    }

    /// Returns `true` for pause, resume and cancel events.
    pub fn is_control_event(&self) -> bool {
        matches!(
            self,
            Self::ExecutionPaused { .. }
                | Self::ExecutionResumed { .. }
                | Self::ExecutionCancelled { .. }
        )
    }

    /// Returns `true` for `TextDelta` and `ToolInputDelta` events.
    pub fn is_delta_event(&self) -> bool {
        matches!(self, Self::TextDelta { .. } | Self::ToolInputDelta { .. })
    }

    /// Returns `true` for every event that is not a delta event.
    pub fn is_summary_event(&self) -> bool {
        !self.is_delta_event()
    }

    /// Renders the event as one Server-Sent Events frame: `data: <json>\n\n`.
    ///
    /// If JSON serialization fails, the payload is left empty.
    pub fn to_sse(&self) -> String {
        format!(
            "data: {}\n\n",
            serde_json::to_string(self).unwrap_or_default()
        )
    }

    /// Returns the literal `data: [DONE]\n\n` frame.
    pub fn done() -> String {
        "data: [DONE]\n\n".to_string()
    }
}
/// Buffers [`StreamEvent`]s in memory and, when an event log is configured,
/// also forwards them for persistence.
///
/// `Clone` copies the `Arc` handles, so clones push to and drain from the same
/// underlying buffer.
#[derive(Clone)]
pub struct EventEmitter {
// In-memory buffer of emitted events, shared between clones.
events: std::sync::Arc<std::sync::Mutex<Vec<StreamEvent>>>,
// Filter applied by `emit` to decide which events are kept.
mode: StreamMode,
// Destination for persisted events; `None` disables persistence.
event_log: Option<Arc<EventLog>>,
// Context attached to persisted events; set together with `event_log`.
execution_context: Option<ExecutionContext>,
}
impl std::fmt::Debug for EventEmitter {
    /// Prints a summary (buffered event count, mode, whether an event log is
    /// attached) instead of dumping the buffered events themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A poisoned lock is reported as an empty buffer.
        let buffered = match self.events.lock() {
            Ok(guard) => guard.len(),
            Err(_) => 0,
        };
        let persists = self.event_log.is_some();

        let mut summary = f.debug_struct("EventEmitter");
        summary.field("events_count", &buffered);
        summary.field("mode", &self.mode);
        summary.field("has_event_log", &persists);
        summary.finish()
    }
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
impl EventEmitter {
pub fn new() -> Self {
Self {
events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
mode: StreamMode::Full,
event_log: None,
execution_context: None,
}
}
pub fn with_mode(mode: StreamMode) -> Self {
Self {
events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
mode,
event_log: None,
execution_context: None,
}
}
pub fn with_persistence(event_log: Arc<EventLog>, execution_id: ExecutionId) -> Self {
Self {
events: std::sync::Arc::new(std::sync::Mutex::new(Vec::new())),
mode: StreamMode::Full,
event_log: Some(event_log),
execution_context: Some(ExecutionContext::new(execution_id)),
}
}
pub fn set_mode(&mut self, mode: StreamMode) {
self.mode = mode;
}
pub fn set_event_log(&mut self, event_log: Arc<EventLog>, execution_id: ExecutionId) {
self.event_log = Some(event_log);
self.execution_context = Some(ExecutionContext::new(execution_id));
}
pub fn emit(&self, event: StreamEvent) {
let should_emit = match self.mode {
StreamMode::Full => true,
StreamMode::Summary => event.is_summary_event(),
StreamMode::ControlOnly => event.is_control_event(),
StreamMode::Silent => false,
};
if should_emit {
if let (Some(event_log), Some(ctx)) = (&self.event_log, &self.execution_context) {
if let Some(exec_event) = self.to_execution_event(&event, ctx) {
let log = Arc::clone(event_log);
let evt = exec_event;
tokio::spawn(async move {
if let Err(e) = log.append(evt).await {
tracing::warn!("Failed to persist event: {}", e);
}
});
}
}
if let Ok(mut events) = self.events.lock() {
events.push(event);
}
}
}
fn to_execution_event(
&self,
stream_event: &StreamEvent,
ctx: &ExecutionContext,
) -> Option<ExecutionEvent> {
let event_type = match stream_event {
StreamEvent::ExecutionStart { .. } => ExecutionEventType::ExecutionStart,
StreamEvent::ExecutionEnd { .. } => ExecutionEventType::ExecutionEnd,
StreamEvent::ExecutionFailed { .. } => ExecutionEventType::ExecutionFailed,
StreamEvent::ExecutionPaused { .. } => ExecutionEventType::ControlPause,
StreamEvent::ExecutionResumed { .. } => ExecutionEventType::ControlResume,
StreamEvent::ExecutionCancelled { .. } => ExecutionEventType::ExecutionCancelled,
StreamEvent::StepStart { .. } => ExecutionEventType::StepStart,
StreamEvent::StepEnd { .. } => ExecutionEventType::StepEnd,
StreamEvent::StepFailed { .. } => ExecutionEventType::StepFailed,
StreamEvent::StepDiscovered { .. } => ExecutionEventType::StepDiscovered,
StreamEvent::ArtifactCreated { .. } => ExecutionEventType::ArtifactCreated,
StreamEvent::StateSnapshot { .. } => ExecutionEventType::StateSnapshot,
StreamEvent::InboxMessage { .. } => ExecutionEventType::InboxMessage,
StreamEvent::PolicyDecision { .. } => ExecutionEventType::DecisionMade,
StreamEvent::CheckpointSaved { .. } => ExecutionEventType::CheckpointSaved,
StreamEvent::GoalEvaluated { .. } => ExecutionEventType::GoalEvaluated,
StreamEvent::ToolInputAvailable { .. } => ExecutionEventType::ToolCallStart,
StreamEvent::ToolOutputAvailable { .. } => ExecutionEventType::ToolCallEnd,
StreamEvent::PermissionRequest { .. } => ExecutionEventType::DecisionMade,
StreamEvent::LlmCallStart { .. } => ExecutionEventType::LlmCallStart,
StreamEvent::LlmCallEnd { .. } => ExecutionEventType::LlmCallEnd,
StreamEvent::LlmCallFailed { .. } => ExecutionEventType::LlmCallFailed,
StreamEvent::TokenUsageRecorded { .. } => ExecutionEventType::TokenUsageRecorded,
StreamEvent::MemoryRecalled { .. } => ExecutionEventType::MemoryRecalled,
StreamEvent::MemoryStored { .. } => ExecutionEventType::MemoryStored,
StreamEvent::GuardrailEvaluated { .. } => ExecutionEventType::GuardrailEvaluated,
StreamEvent::ReasoningTrace { .. } => ExecutionEventType::ReasoningTrace,
StreamEvent::ContextSnapshot { .. } => ExecutionEventType::ContextSnapshot,
StreamEvent::FeedbackReceived { .. } => ExecutionEventType::FeedbackReceived,
StreamEvent::Start { .. }
| StreamEvent::Finish { .. }
| StreamEvent::Error { .. }
| StreamEvent::StartStep { .. }
| StreamEvent::FinishStep { .. }
| StreamEvent::TextStart { .. }
| StreamEvent::TextDelta { .. }
| StreamEvent::TextEnd { .. }
| StreamEvent::ToolInputStart { .. }
| StreamEvent::ToolInputDelta { .. } => return None,
};
let mut context = ctx.clone();
let mut payload: Option<serde_json::Value> = None;
let mut duration_ms: Option<u64> = None;
match stream_event {
StreamEvent::ExecutionEnd {
final_output,
duration_ms: dur,
..
} => {
duration_ms = Some(*dur);
if let Some(output) = final_output {
payload = Some(serde_json::json!({ "output": output }));
}
}
StreamEvent::ExecutionFailed { error, .. } => {
payload = serde_json::to_value(error)
.ok()
.map(|err| serde_json::json!({ "error": err }));
}
StreamEvent::ExecutionCancelled { reason, .. }
| StreamEvent::ExecutionPaused { reason, .. } => {
payload = Some(serde_json::json!({ "reason": reason }));
}
StreamEvent::StepStart {
step_id,
step_type,
name,
..
} => {
context = context.with_step(StepId::from_string(step_id));
payload = Some(serde_json::json!({ "step_type": step_type, "name": name }));
}
StreamEvent::StepEnd {
step_id,
output,
duration_ms: dur,
..
} => {
context = context.with_step(StepId::from_string(step_id));
duration_ms = Some(*dur);
if let Some(out) = output {
payload = Some(serde_json::json!({ "output": out }));
}
}
StreamEvent::StepFailed { step_id, error, .. } => {
context = context.with_step(StepId::from_string(step_id));
payload = serde_json::to_value(error)
.ok()
.map(|err| serde_json::json!({ "error": err }));
}
StreamEvent::StepDiscovered {
step_id,
discovered_by,
source_type,
reason,
depth,
..
} => {
context = context.with_step(StepId::from_string(step_id));
payload = Some(serde_json::json!({
"discovered_by": discovered_by,
"source_type": source_type,
"reason": reason,
"depth": depth,
}));
}
StreamEvent::ArtifactCreated {
step_id,
artifact_id,
artifact_type,
..
} => {
context = context
.with_step(StepId::from_string(step_id))
.with_artifact(ArtifactId::from_string(artifact_id));
payload = Some(serde_json::json!({ "artifact_type": artifact_type }));
}
StreamEvent::StateSnapshot { step_id, state, .. } => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(state.clone());
}
StreamEvent::InboxMessage {
message_id,
message_type,
..
} => {
payload = Some(serde_json::json!({
"message_id": message_id,
"message_type": message_type
}));
}
StreamEvent::PolicyDecision {
step_id,
tool_name,
decision,
reason,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"tool_name": tool_name,
"decision": decision,
"reason": reason,
}));
}
StreamEvent::ToolInputAvailable {
tool_call_id,
tool_name,
input,
..
} => {
payload = Some(serde_json::json!({
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"input": input,
}));
}
StreamEvent::ToolOutputAvailable {
tool_call_id,
output,
..
} => {
payload = Some(serde_json::json!({
"tool_call_id": tool_call_id,
"output": output,
}));
}
StreamEvent::PermissionRequest {
tool_name,
arguments,
policy,
..
} => {
payload = Some(serde_json::json!({
"tool_name": tool_name,
"arguments": arguments,
"policy": policy,
}));
}
StreamEvent::CheckpointSaved {
checkpoint_id,
step_id,
state_hash,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"checkpoint_id": checkpoint_id,
"state_hash": state_hash,
}));
}
StreamEvent::GoalEvaluated {
goal_id,
step_id,
status,
score,
reason,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"goal_id": goal_id,
"status": status,
"score": score,
"reason": reason,
}));
}
StreamEvent::ExecutionStart { .. } => {}
StreamEvent::LlmCallStart {
step_id,
callable_name,
model,
history_length,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"callable_name": callable_name,
"model": model,
"history_length": history_length,
}));
}
StreamEvent::LlmCallEnd {
step_id,
duration_ms: dur,
prompt_tokens,
completion_tokens,
total_tokens,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
duration_ms = Some(*dur);
payload = Some(serde_json::json!({
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
}));
}
StreamEvent::LlmCallFailed {
step_id,
error,
duration_ms: dur,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
duration_ms = *dur;
payload = Some(serde_json::json!({
"error": error,
}));
}
StreamEvent::TokenUsageRecorded {
step_id,
prompt_tokens,
completion_tokens,
total_tokens,
cumulative_tokens,
cost_usd,
cumulative_cost_usd,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"cumulative_tokens": cumulative_tokens,
"cost_usd": cost_usd,
"cumulative_cost_usd": cumulative_cost_usd,
}));
}
StreamEvent::MemoryRecalled {
step_id,
query,
memories_count,
duration_ms: dur,
session_id,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
duration_ms = Some(*dur);
payload = Some(serde_json::json!({
"query": query,
"memories_count": memories_count,
"session_id": session_id,
}));
}
StreamEvent::MemoryStored {
step_id,
memory_type,
session_id,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"memory_type": memory_type,
"session_id": session_id,
}));
}
StreamEvent::GuardrailEvaluated {
step_id,
guardrail_name,
decision,
reason,
score,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"guardrail_name": guardrail_name,
"decision": decision,
"reason": reason,
"score": score,
}));
}
StreamEvent::ReasoningTrace {
step_id,
reasoning_type,
content,
truncated,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"reasoning_type": reasoning_type,
"content": content,
"truncated": truncated,
}));
}
StreamEvent::ContextSnapshot {
step_id,
message_count,
estimated_tokens,
max_tokens,
utilization_pct,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"message_count": message_count,
"estimated_tokens": estimated_tokens,
"max_tokens": max_tokens,
"utilization_pct": utilization_pct,
}));
}
StreamEvent::FeedbackReceived {
step_id,
feedback_type,
score,
comment,
user_id,
..
} => {
if let Some(step) = step_id {
context = context.with_step(StepId::from_string(step));
}
payload = Some(serde_json::json!({
"feedback_type": feedback_type,
"score": score,
"comment": comment,
"user_id": user_id,
}));
}
_ => {}
}
let mut event = ExecutionEvent::new(event_type, context);
if let Some(ms) = duration_ms {
event.duration_ms = Some(ms);
}
if let Some(data) = payload {
event = event.with_payload(data);
}
Some(event)
}
pub fn emit_force(&self, event: StreamEvent) {
if let Ok(mut events) = self.events.lock() {
events.push(event);
}
}
pub fn drain(&self) -> Vec<StreamEvent> {
if let Ok(mut events) = self.events.lock() {
std::mem::take(&mut *events)
} else {
vec![]
}
}
pub fn mode(&self) -> StreamMode {
self.mode
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::kernel::{ExecutionId, StepId, StepType};
#[test]
fn test_stream_mode_default() {
    // `Full` carries the `#[default]` attribute, so `Default` must yield it.
    let mode = StreamMode::default();
    assert!(matches!(mode, StreamMode::Full));
}
#[test]
fn test_stream_mode_variants() {
    // Each variant paired with the JSON produced under
    // `#[serde(rename_all = "snake_case")]`.
    let expected = [
        (StreamMode::Full, "\"full\""),
        (StreamMode::Summary, "\"summary\""),
        (StreamMode::ControlOnly, "\"control_only\""),
        (StreamMode::Silent, "\"silent\""),
    ];
    for (mode, wire) in expected {
        // Serializing must give the documented wire name ...
        let json = serde_json::to_string(&mode).unwrap();
        assert_eq!(json, wire);
        // ... and parsing that name must give the same variant back.
        let parsed: StreamMode = serde_json::from_str(wire).unwrap();
        assert_eq!(parsed, mode);
    }
}
#[test]
fn test_stream_event_is_control_event() {
    let execution = ExecutionId::new();

    // Pause, resume and cancel must all be classified as control events.
    let control = [
        StreamEvent::execution_paused(&execution, "paused"),
        StreamEvent::execution_resumed(&execution),
        StreamEvent::execution_cancelled(&execution, "cancelled"),
    ];
    for event in control {
        assert!(event.is_control_event());
    }

    // Execution start and text start must not be.
    let regular = [
        StreamEvent::execution_start(&execution),
        StreamEvent::text_start(None),
    ];
    for event in regular {
        assert!(!event.is_control_event());
    }
}
#[test]
fn test_stream_event_is_delta_event() {
    // A text delta is classified as a delta event ...
    assert!(StreamEvent::text_delta("id1", "chunk").is_delta_event());
    // ... whereas a text start is not.
    assert!(!StreamEvent::text_start(None).is_delta_event());
}
#[test]
fn test_stream_event_is_summary_event() {
    // A text start is classified as a summary event ...
    assert!(StreamEvent::text_start(None).is_summary_event());
    // ... whereas a text delta is not.
    assert!(!StreamEvent::text_delta("id1", "chunk").is_summary_event());
}
#[test]
fn test_stream_event_to_sse() {
    let frame = StreamEvent::text_end("test-id").to_sse();

    // The frame must open with the `data: ` prefix and end with a blank line.
    assert!(frame.starts_with("data: "));
    assert!(frame.ends_with("\n\n"));
    // The event id must appear somewhere in the frame.
    assert!(frame.contains("test-id"));
}
#[test]
fn test_stream_event_done() {
    // The terminator frame is a fixed literal.
    assert_eq!(StreamEvent::done(), "data: [DONE]\n\n");
}
#[test]
fn test_stream_event_text_factories() {
    let execution = ExecutionId::new();

    // Each factory must build its matching variant.
    assert!(matches!(
        StreamEvent::text_start(Some(&execution)),
        StreamEvent::TextStart { .. }
    ));
    // The delta factory must also carry the chunk text through unchanged.
    assert!(matches!(
        StreamEvent::text_delta("id1", "hello"),
        StreamEvent::TextDelta { delta, .. } if delta == "hello"
    ));
    assert!(matches!(
        StreamEvent::text_end("id1"),
        StreamEvent::TextEnd { .. }
    ));
}
#[test]
fn test_stream_event_execution_factories() {
    use crate::kernel::ExecutionError;

    let execution = ExecutionId::new();

    // Start.
    assert!(matches!(
        StreamEvent::execution_start(&execution),
        StreamEvent::ExecutionStart { .. }
    ));

    // End: the duration passed in must be carried on the event.
    let finished = StreamEvent::execution_end(&execution, Some(String::from("output")), 100);
    assert!(matches!(
        finished,
        StreamEvent::ExecutionEnd {
            duration_ms: 100,
            ..
        }
    ));

    // Failure.
    let cause = ExecutionError::kernel_internal("error message");
    assert!(matches!(
        StreamEvent::execution_failed(&execution, cause),
        StreamEvent::ExecutionFailed { .. }
    ));

    // Pause, resume and cancel.
    assert!(matches!(
        StreamEvent::execution_paused(&execution, "reason"),
        StreamEvent::ExecutionPaused { .. }
    ));
    assert!(matches!(
        StreamEvent::execution_resumed(&execution),
        StreamEvent::ExecutionResumed { .. }
    ));
    assert!(matches!(
        StreamEvent::execution_cancelled(&execution, "cancel reason"),
        StreamEvent::ExecutionCancelled { .. }
    ));
}
#[test]
fn test_stream_event_step_factories() {
    use crate::kernel::ExecutionError;

    let execution = ExecutionId::new();
    let step = StepId::new();

    // Start.
    assert!(matches!(
        StreamEvent::step_start(&execution, &step, StepType::FunctionNode, "test_step"),
        StreamEvent::StepStart { .. }
    ));

    // End: the duration passed in must be carried on the event.
    let finished = StreamEvent::step_end(&execution, &step, Some(String::from("output")), 50);
    assert!(matches!(
        finished,
        StreamEvent::StepEnd {
            duration_ms: 50,
            ..
        }
    ));

    // Failure.
    let cause = ExecutionError::kernel_internal("step error");
    assert!(matches!(
        StreamEvent::step_failed(&execution, &step, cause),
        StreamEvent::StepFailed { .. }
    ));
}
#[test]
fn test_stream_event_tool_factories() {
    let call_id = "call-123";
    let tool = "web_search";

    // Input start must carry the tool name through unchanged.
    let started = StreamEvent::tool_input_start(call_id, tool);
    assert!(matches!(
        started,
        StreamEvent::ToolInputStart { tool_name, .. } if tool_name == tool
    ));

    // Input available.
    let arguments = serde_json::json!({"q": "test"});
    assert!(matches!(
        StreamEvent::tool_input_available(call_id, tool, arguments),
        StreamEvent::ToolInputAvailable { .. }
    ));

    // Output available.
    let result = serde_json::json!({"result": "ok"});
    assert!(matches!(
        StreamEvent::tool_output_available(call_id, result),
        StreamEvent::ToolOutputAvailable { .. }
    ));
}
#[test]
fn test_event_emitter_new() {
    // A freshly constructed emitter reports `Full` mode.
    let initial = EventEmitter::new().mode();
    assert_eq!(initial, StreamMode::Full);
}
#[test]
fn test_event_emitter_with_mode() {
    // The mode passed to the constructor is the mode reported back.
    let requested = StreamMode::Summary;
    assert_eq!(EventEmitter::with_mode(requested).mode(), requested);
}
#[test]
fn test_event_emitter_set_mode() {
    let mut emitter = EventEmitter::new();

    // Changing the mode after construction must be visible through `mode()`.
    emitter.set_mode(StreamMode::Silent);
    let observed = emitter.mode();
    assert!(matches!(observed, StreamMode::Silent));
}
#[test]
fn test_event_emitter_emit_and_drain() {
    let emitter = EventEmitter::new();
    let execution = ExecutionId::new();

    let emitted = [
        StreamEvent::execution_start(&execution),
        StreamEvent::execution_end(&execution, None, 100),
    ];
    for event in emitted {
        emitter.emit(event);
    }

    // The first drain hands out both events ...
    let first = emitter.drain();
    assert_eq!(first.len(), 2);
    // ... and leaves nothing behind for the second one.
    let second = emitter.drain();
    assert_eq!(second.len(), 0);
}
#[test]
fn test_event_emitter_mode_full() {
    let emitter = EventEmitter::with_mode(StreamMode::Full);

    let emitted = [
        StreamEvent::text_delta("id", "chunk"),
        StreamEvent::text_end("id"),
    ];
    for event in emitted {
        emitter.emit(event);
    }

    // In `Full` mode both the delta and the end marker are buffered.
    assert_eq!(emitter.drain().len(), 2);
}
#[test]
fn test_event_emitter_mode_summary() {
    let emitter = EventEmitter::with_mode(StreamMode::Summary);
    emitter.emit(StreamEvent::text_delta("id", "chunk"));
    emitter.emit(StreamEvent::text_end("id"));

    let events = emitter.drain();
    // Exactly one of the two emitted events may be buffered ...
    assert_eq!(events.len(), 1);
    // ... and it has to be the end marker; a count-only check would also
    // accept the delta being kept and the end marker being dropped.
    assert!(matches!(events.as_slice(), [StreamEvent::TextEnd { .. }]));
}
#[test]
fn test_event_emitter_mode_control_only() {
    let emitter = EventEmitter::with_mode(StreamMode::ControlOnly);
    let exec_id = ExecutionId::new();
    emitter.emit(StreamEvent::execution_start(&exec_id));
    emitter.emit(StreamEvent::execution_paused(&exec_id, "test"));

    let events = emitter.drain();
    // Exactly one of the two emitted events may be buffered ...
    assert_eq!(events.len(), 1);
    // ... and it has to be the pause event; a count-only check would also
    // accept the start event being kept and the pause being dropped.
    assert!(matches!(
        events.as_slice(),
        [StreamEvent::ExecutionPaused { .. }]
    ));
}
#[test]
fn test_event_emitter_mode_silent() {
    let execution = ExecutionId::new();
    let emitter = EventEmitter::with_mode(StreamMode::Silent);

    let emitted = [
        StreamEvent::execution_start(&execution),
        StreamEvent::execution_paused(&execution, "test"),
    ];
    for event in emitted {
        emitter.emit(event);
    }

    // In `Silent` mode nothing sent through `emit` is buffered.
    assert_eq!(emitter.drain().len(), 0);
}
#[test]
fn test_event_emitter_emit_force() {
    let execution = ExecutionId::new();
    let emitter = EventEmitter::with_mode(StreamMode::Silent);

    // A forced event must be buffered even though the emitter is in `Silent` mode.
    emitter.emit_force(StreamEvent::execution_start(&execution));

    let buffered = emitter.drain();
    assert_eq!(buffered.len(), 1);
}
#[test]
fn test_event_emitter_serialization() {
    use crate::kernel::ExecutionError;

    let failure = ExecutionError::kernel_internal("Test error").with_code(String::from("ERR_CODE"));
    let json = serde_json::to_string(&StreamEvent::error(failure)).unwrap();

    // The type tag, the message and the error code must all appear in the JSON.
    for needle in ["data-error", "Test error", "ERR_CODE"] {
        assert!(json.contains(needle));
    }
}
}