steer_core/events/
mod.rs

1use crate::api::Model;
2use crate::app::{Message, Operation, OperationOutcome};
3use crate::session::SessionInfo;
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use steer_tools::ToolCall;
8use steer_tools::ToolResult;
9
10/// Token usage information
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct Usage {
13    pub input_tokens: u32,
14    pub output_tokens: u32,
15}
16
17/// Unified event type for external consumers
18#[derive(Debug, Clone, Serialize, Deserialize)]
19#[serde(tag = "type", rename_all = "snake_case")]
20pub enum StreamEvent {
21    // Message events
22    MessagePart {
23        content: String,
24        message_id: String,
25    },
26    MessageComplete {
27        message: Message,
28        #[serde(skip_serializing_if = "Option::is_none")]
29        usage: Option<Usage>,
30        #[serde(default)]
31        metadata: HashMap<String, String>,
32        model: Model,
33    },
34
35    // Tool events
36    ToolCallStarted {
37        tool_call: ToolCall,
38        #[serde(default)]
39        metadata: HashMap<String, String>,
40        model: Model,
41    },
42    ToolCallCompleted {
43        tool_call_id: String,
44        result: ToolResult,
45        #[serde(default)]
46        metadata: HashMap<String, String>,
47        model: Model,
48    },
49    ToolCallFailed {
50        tool_call_id: String,
51        error: String,
52        #[serde(default)]
53        metadata: HashMap<String, String>,
54        model: Model,
55    },
56    ToolApprovalRequired {
57        tool_call: ToolCall,
58        timeout_ms: Option<u64>,
59        #[serde(default)]
60        metadata: HashMap<String, String>,
61    },
62
63    // Session events
64    SessionCreated {
65        session_id: String,
66        metadata: SessionMetadata,
67    },
68    SessionResumed {
69        session_id: String,
70        event_offset: u64,
71    },
72    SessionSaved {
73        session_id: String,
74    },
75
76    // Operation events
77    OperationStarted {
78        operation_id: uuid::Uuid,
79        operation: Operation,
80    },
81    OperationCompleted {
82        operation_id: uuid::Uuid,
83        outcome: OperationOutcome,
84    },
85    OperationCancelled {
86        operation_id: uuid::Uuid,
87        reason: String,
88    },
89
90    // System events
91    Error {
92        message: String,
93        error_type: ErrorType,
94    },
95
96    // Workspace events
97    WorkspaceChanged,
98    WorkspaceFiles {
99        files: Vec<String>,
100    },
101}
102
103/// Event with metadata for persistence and replay
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct StreamEventWithMetadata {
106    pub sequence_num: u64,
107    pub timestamp: DateTime<Utc>,
108    pub session_id: String,
109    pub event: StreamEvent,
110}
111
112impl StreamEventWithMetadata {
113    pub fn new(sequence_num: u64, session_id: String, event: StreamEvent) -> Self {
114        Self {
115            sequence_num,
116            timestamp: Utc::now(),
117            session_id,
118            event,
119        }
120    }
121}
122
123/// Session metadata for events
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct SessionMetadata {
126    pub model: Model,
127    pub created_at: DateTime<Utc>,
128    pub metadata: HashMap<String, String>,
129}
130
131impl From<&SessionInfo> for SessionMetadata {
132    fn from(session_info: &SessionInfo) -> Self {
133        Self {
134            model: session_info
135                .last_model
136                .unwrap_or(crate::api::Model::ClaudeSonnet4_20250514),
137            created_at: session_info.created_at,
138            metadata: session_info.metadata.clone(),
139        }
140    }
141}
142
143/// Error types for system events
144#[derive(Debug, Clone, Serialize, Deserialize)]
145#[serde(rename_all = "snake_case")]
146pub enum ErrorType {
147    /// API error (OpenAI, Anthropic, etc.)
148    Api,
149    /// Tool execution error
150    Tool,
151    /// Session management error
152    Session,
153    /// Persistence/storage error
154    Storage,
155    /// Authentication/authorization error
156    Auth,
157    /// Network/transport error
158    Network,
159    /// Internal server error
160    Internal,
161    /// Validation error
162    Validation,
163    /// Resource limit exceeded
164    ResourceLimit,
165    /// Operation timeout
166    Timeout,
167}
168
169impl StreamEvent {
170    /// Check if this event indicates an error condition
171    pub fn is_error(&self) -> bool {
172        matches!(
173            self,
174            StreamEvent::Error { .. } | StreamEvent::ToolCallFailed { .. }
175        )
176    }
177
178    /// Get the operation ID if this event relates to an operation
179    pub fn operation_id(&self) -> Option<&uuid::Uuid> {
180        match self {
181            StreamEvent::OperationStarted { operation_id, .. }
182            | StreamEvent::OperationCompleted { operation_id, .. }
183            | StreamEvent::OperationCancelled { operation_id, .. } => Some(operation_id),
184            _ => None,
185        }
186    }
187
188    /// Get the session ID if this event relates to a session
189    pub fn session_id(&self) -> Option<&str> {
190        match self {
191            StreamEvent::SessionCreated { session_id, .. }
192            | StreamEvent::SessionResumed { session_id, .. }
193            | StreamEvent::SessionSaved { session_id } => Some(session_id),
194            _ => None,
195        }
196    }
197
198    /// Get the tool call ID if this event relates to a tool call
199    pub fn tool_call_id(&self) -> Option<&str> {
200        match self {
201            StreamEvent::ToolCallStarted { tool_call, .. } => Some(&tool_call.id),
202            StreamEvent::ToolCallCompleted { tool_call_id, .. } => Some(tool_call_id),
203            StreamEvent::ToolCallFailed { tool_call_id, .. } => Some(tool_call_id),
204            StreamEvent::ToolApprovalRequired { tool_call, .. } => Some(&tool_call.id),
205            _ => None,
206        }
207    }
208
209    /// Get the message ID if this event relates to a message
210    pub fn message_id(&self) -> Option<&str> {
211        match self {
212            StreamEvent::MessagePart { message_id, .. } => Some(message_id),
213            StreamEvent::MessageComplete { message, .. } => Some(message.id()),
214            _ => None,
215        }
216    }
217}
218
219/// Event filter for client subscriptions
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct EventFilter {
222    /// Only events matching these types
223    pub event_types: Option<Vec<String>>,
224    /// Only events after this sequence number
225    pub after_sequence: Option<u64>,
226    /// Only events for these sessions
227    pub session_ids: Option<Vec<String>>,
228    /// Only events for these operations
229    pub operation_ids: Option<Vec<String>>,
230    /// Only events for these tool calls
231    pub tool_call_ids: Option<Vec<String>>,
232}
233
234impl EventFilter {
235    /// Create an empty filter (matches all events)
236    pub fn all() -> Self {
237        Self {
238            event_types: None,
239            after_sequence: None,
240            session_ids: None,
241            operation_ids: None,
242            tool_call_ids: None,
243        }
244    }
245
246    /// Create a filter for specific event types
247    pub fn for_types(types: Vec<String>) -> Self {
248        Self {
249            event_types: Some(types),
250            after_sequence: None,
251            session_ids: None,
252            operation_ids: None,
253            tool_call_ids: None,
254        }
255    }
256
257    /// Create a filter for events after a sequence number
258    pub fn after_sequence(sequence: u64) -> Self {
259        Self {
260            event_types: None,
261            after_sequence: Some(sequence),
262            session_ids: None,
263            operation_ids: None,
264            tool_call_ids: None,
265        }
266    }
267
268    /// Create a filter for specific sessions
269    pub fn for_sessions(session_ids: Vec<String>) -> Self {
270        Self {
271            event_types: None,
272            after_sequence: None,
273            session_ids: Some(session_ids),
274            operation_ids: None,
275            tool_call_ids: None,
276        }
277    }
278
279    /// Check if an event matches this filter
280    pub fn matches(&self, event_with_metadata: &StreamEventWithMetadata) -> bool {
281        // Check sequence number
282        if let Some(after_seq) = self.after_sequence {
283            if event_with_metadata.sequence_num <= after_seq {
284                return false;
285            }
286        }
287
288        // Check session ID
289        if let Some(ref session_ids) = self.session_ids {
290            if !session_ids.contains(&event_with_metadata.session_id) {
291                return false;
292            }
293        }
294
295        // Check event type
296        if let Some(ref event_types) = self.event_types {
297            let event_type = match &event_with_metadata.event {
298                StreamEvent::MessagePart { .. } => "message_part",
299                StreamEvent::MessageComplete { .. } => "message_complete",
300                StreamEvent::ToolCallStarted { .. } => "tool_call_started",
301                StreamEvent::ToolCallCompleted { .. } => "tool_call_completed",
302                StreamEvent::ToolCallFailed { .. } => "tool_call_failed",
303                StreamEvent::ToolApprovalRequired { .. } => "tool_approval_required",
304                StreamEvent::SessionCreated { .. } => "session_created",
305                StreamEvent::SessionResumed { .. } => "session_resumed",
306                StreamEvent::SessionSaved { .. } => "session_saved",
307                StreamEvent::OperationStarted { .. } => "operation_started",
308                StreamEvent::OperationCompleted { .. } => "operation_completed",
309                StreamEvent::OperationCancelled { .. } => "operation_cancelled",
310                StreamEvent::Error { .. } => "error",
311                StreamEvent::WorkspaceChanged => "workspace_changed",
312                StreamEvent::WorkspaceFiles { .. } => "workspace_files",
313            };
314            if !event_types.contains(&event_type.to_string()) {
315                return false;
316            }
317        }
318
319        // Check operation ID
320        if let Some(ref operation_ids) = self.operation_ids {
321            if let Some(op_id) = event_with_metadata.event.operation_id() {
322                if !operation_ids.contains(&op_id.to_string()) {
323                    return false;
324                }
325            } else {
326                return false;
327            }
328        }
329
330        // Check tool call ID
331        if let Some(ref tool_call_ids) = self.tool_call_ids {
332            if let Some(tool_id) = event_with_metadata.event.tool_call_id() {
333                if !tool_call_ids.contains(&tool_id.to_string()) {
334                    return false;
335                }
336            } else {
337                return false;
338            }
339        }
340
341        true
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app::Message;
    use crate::app::conversation::{AssistantContent, MessageData};

    /// Model used by every fixture in this module.
    fn test_model() -> crate::api::Model {
        crate::api::Model::ClaudeSonnet4_20250514
    }

    /// A `ToolCallFailed` event for tool call `tool_123` with the given error
    /// text and empty metadata.
    fn failed_tool_call(error: &str) -> StreamEvent {
        StreamEvent::ToolCallFailed {
            tool_call_id: "tool_123".to_string(),
            error: error.to_string(),
            metadata: HashMap::new(),
            model: test_model(),
        }
    }

    /// A `MessagePart` event for message `msg_123`.
    fn message_part() -> StreamEvent {
        StreamEvent::MessagePart {
            message_id: "msg_123".to_string(),
            content: "Hello".to_string(),
        }
    }

    #[test]
    fn test_stream_event_serialization() {
        let event = failed_tool_call("Failed to execute");

        let serialized = serde_json::to_string(&event).unwrap();

        // The wire tag must be the snake_case variant name, because
        // `EventFilter::matches` compares against exactly these strings.
        let as_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert_eq!(as_value["type"], "tool_call_failed");

        let deserialized: StreamEvent = serde_json::from_str(&serialized).unwrap();
        match deserialized {
            StreamEvent::ToolCallFailed {
                tool_call_id,
                error,
                ..
            } => {
                assert_eq!(tool_call_id, "tool_123");
                assert_eq!(error, "Failed to execute");
            }
            other => panic!("expected ToolCallFailed, got {:?}", other),
        }
    }

    #[test]
    fn test_event_with_metadata() {
        let wrapped = StreamEventWithMetadata::new(1, "session_123".to_string(), message_part());

        assert_eq!(wrapped.sequence_num, 1);
        assert_eq!(wrapped.session_id, "session_123");
        assert!(wrapped.timestamp <= Utc::now());
    }

    #[test]
    fn test_event_type_checks() {
        let error_event = StreamEvent::Error {
            message: "Test error".to_string(),
            error_type: ErrorType::Api,
        };
        assert!(error_event.is_error());

        assert!(failed_tool_call("Command failed").is_error());

        let tool_approval = StreamEvent::ToolApprovalRequired {
            tool_call: ToolCall {
                id: "tool_123".to_string(),
                name: "edit_file".to_string(),
                parameters: serde_json::json!({}),
            },
            timeout_ms: None,
            metadata: HashMap::new(),
        };
        assert!(!tool_approval.is_error());
    }

    #[test]
    fn test_event_id_extraction() {
        let tool_event = failed_tool_call("Failed");
        assert_eq!(tool_event.tool_call_id(), Some("tool_123"));
        // Accessors for unrelated ID kinds report nothing.
        assert_eq!(tool_event.message_id(), None);
        assert_eq!(tool_event.session_id(), None);
        assert_eq!(tool_event.operation_id(), None);

        let message_event = message_part();
        assert_eq!(message_event.message_id(), Some("msg_123"));
        assert_eq!(message_event.tool_call_id(), None);

        let op_id = uuid::Uuid::new_v4();
        let operation_event = StreamEvent::OperationStarted {
            operation_id: op_id,
            operation: crate::app::Operation::Bash {
                cmd: "echo hello".to_string(),
            },
        };
        assert_eq!(operation_event.operation_id(), Some(&op_id));

        let session_event = StreamEvent::SessionCreated {
            session_id: "session_123".to_string(),
            metadata: SessionMetadata {
                model: test_model(),
                created_at: Utc::now(),
                metadata: HashMap::new(),
            },
        };
        assert_eq!(session_event.session_id(), Some("session_123"));
    }

    #[test]
    fn test_event_filter() {
        let wrapped =
            StreamEventWithMetadata::new(5, "session_123".to_string(), failed_tool_call("Failed"));

        // An empty filter accepts everything.
        assert!(EventFilter::all().matches(&wrapped));

        // Sequence filter: the bound is exclusive.
        assert!(EventFilter::after_sequence(3).matches(&wrapped));
        assert!(!EventFilter::after_sequence(5).matches(&wrapped));

        // Session filter.
        assert!(EventFilter::for_sessions(vec!["session_123".to_string()]).matches(&wrapped));
        assert!(!EventFilter::for_sessions(vec!["session_456".to_string()]).matches(&wrapped));

        // Type filter.
        assert!(EventFilter::for_types(vec!["tool_call_failed".to_string()]).matches(&wrapped));
        assert!(!EventFilter::for_types(vec!["message_part".to_string()]).matches(&wrapped));
    }

    #[test]
    fn test_event_filter_by_tool_call_id() {
        let tool_event =
            StreamEventWithMetadata::new(1, "session_123".to_string(), failed_tool_call("Failed"));
        let message_event =
            StreamEventWithMetadata::new(2, "session_123".to_string(), message_part());

        let matching = EventFilter {
            tool_call_ids: Some(vec!["tool_123".to_string()]),
            ..EventFilter::all()
        };
        assert!(matching.matches(&tool_event));
        // Events without a tool call ID are rejected once the criterion is set.
        assert!(!matching.matches(&message_event));

        let other = EventFilter {
            tool_call_ids: Some(vec!["tool_456".to_string()]),
            ..EventFilter::all()
        };
        assert!(!other.matches(&tool_event));
    }

    #[test]
    fn test_event_filter_by_operation_id() {
        let op_id = uuid::Uuid::new_v4();
        let operation_event = StreamEventWithMetadata::new(
            1,
            "session_123".to_string(),
            StreamEvent::OperationStarted {
                operation_id: op_id,
                operation: crate::app::Operation::Bash {
                    cmd: "echo hello".to_string(),
                },
            },
        );
        let tool_event =
            StreamEventWithMetadata::new(2, "session_123".to_string(), failed_tool_call("Failed"));

        let matching = EventFilter {
            operation_ids: Some(vec![op_id.to_string()]),
            ..EventFilter::all()
        };
        assert!(matching.matches(&operation_event));
        // Events without an operation ID are rejected once the criterion is set.
        assert!(!matching.matches(&tool_event));

        let other = EventFilter {
            operation_ids: Some(vec![uuid::Uuid::nil().to_string()]),
            ..EventFilter::all()
        };
        assert!(!other.matches(&operation_event));
    }

    #[test]
    fn test_event_filter_combines_criteria() {
        let wrapped =
            StreamEventWithMetadata::new(5, "session_123".to_string(), failed_tool_call("Failed"));

        // Right type, but the sequence bound excludes the event: every set
        // criterion has to hold.
        let filter = EventFilter {
            after_sequence: Some(10),
            ..EventFilter::for_types(vec!["tool_call_failed".to_string()])
        };
        assert!(!filter.matches(&wrapped));
    }

    #[test]
    fn test_message_complete_event() {
        let message = Message {
            data: MessageData::Assistant {
                content: vec![AssistantContent::Text {
                    text: "Hello world".to_string(),
                }],
            },
            timestamp: 0,
            id: "msg_123".to_string(),
            parent_message_id: None,
        };

        let event = StreamEvent::MessageComplete {
            message,
            usage: None,
            metadata: HashMap::new(),
            model: test_model(),
        };
        assert_eq!(event.message_id(), Some("msg_123"));
    }
}