Skip to main content

codex_runtime/runtime/
events.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::sync::Arc;
4
5use crate::runtime::api::CommandExecOutputDeltaNotification;
6use crate::runtime::rpc_contract::methods;
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
9#[serde(untagged)]
10pub enum JsonRpcId {
11    Number(u64),
12    Text(String),
13}
14
15#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "camelCase")]
17pub enum Direction {
18    Inbound,
19    Outbound,
20}
21
22#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
23#[serde(rename_all = "camelCase")]
24pub enum MsgKind {
25    Response,
26    ServerRequest,
27    Notification,
28    Unknown,
29}
30
31#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
32#[serde(rename_all = "camelCase")]
33pub struct Envelope {
34    pub seq: u64,
35    pub ts_millis: i64,
36    pub direction: Direction,
37    pub kind: MsgKind,
38    pub rpc_id: Option<JsonRpcId>,
39    pub method: Option<Arc<str>>,
40    pub thread_id: Option<Arc<str>>,
41    pub turn_id: Option<Arc<str>>,
42    pub item_id: Option<Arc<str>>,
43    pub json: Arc<Value>,
44}
45
46#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "camelCase")]
48pub struct SkillsChangedNotification {}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub struct AgentMessageDeltaNotification {
52    pub thread_id: String,
53    pub turn_id: String,
54    pub item_id: Option<String>,
55    pub delta: String,
56}
57
58#[derive(Clone, Debug, PartialEq, Eq)]
59pub struct TurnCompletedNotification {
60    pub thread_id: String,
61    pub turn_id: String,
62    pub text: Option<String>,
63}
64
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct TurnFailedNotification {
67    pub thread_id: String,
68    pub turn_id: String,
69    pub code: Option<i64>,
70    pub message: Option<String>,
71}
72
73#[derive(Clone, Debug, PartialEq, Eq)]
74pub struct TurnInterruptedNotification {
75    pub thread_id: String,
76    pub turn_id: String,
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
80pub struct TurnCancelledNotification {
81    pub thread_id: String,
82    pub turn_id: String,
83}
84
85/// Detect the zero-payload `skills/changed` invalidation notification.
86/// Allocation: none. Complexity: O(1).
87pub fn extract_skills_changed_notification(
88    envelope: &Envelope,
89) -> Option<SkillsChangedNotification> {
90    if envelope.kind == MsgKind::Notification
91        && envelope.method.as_deref() == Some(crate::runtime::rpc_contract::methods::SKILLS_CHANGED)
92    {
93        Some(SkillsChangedNotification {})
94    } else {
95        None
96    }
97}
98
99/// Return true iff the envelope is a notification for the given method.
100fn is_notification(envelope: &Envelope, method: &str) -> bool {
101    envelope.kind == MsgKind::Notification && envelope.method.as_deref() == Some(method)
102}
103
104/// Extract (thread_id, turn_id) from an envelope, returning None if either is absent.
105fn thread_turn_ids(envelope: &Envelope) -> Option<(String, String)> {
106    Some((
107        envelope.thread_id.as_deref()?.to_owned(),
108        envelope.turn_id.as_deref()?.to_owned(),
109    ))
110}
111
112/// Parse one `command/exec/outputDelta` notification into a typed payload.
113/// Allocation: one params clone for serde deserialization. Complexity: O(n), n = delta payload size.
114pub fn extract_command_exec_output_delta(
115    envelope: &Envelope,
116) -> Option<CommandExecOutputDeltaNotification> {
117    if !is_notification(
118        envelope,
119        crate::runtime::rpc_contract::methods::COMMAND_EXEC_OUTPUT_DELTA,
120    ) {
121        return None;
122    }
123    let params = envelope.json.get("params")?.clone();
124    serde_json::from_value(params).ok()
125}
126
127/// Parse one `item/agentMessage/delta` notification into a typed payload.
128/// Allocation: clones thread/turn/item ids and delta String. Complexity: O(n), n = delta size.
129pub fn extract_agent_message_delta(envelope: &Envelope) -> Option<AgentMessageDeltaNotification> {
130    if !is_notification(envelope, methods::ITEM_AGENT_MESSAGE_DELTA) {
131        return None;
132    }
133    let (thread_id, turn_id) = thread_turn_ids(envelope)?;
134    Some(AgentMessageDeltaNotification {
135        thread_id,
136        turn_id,
137        item_id: envelope.item_id.as_deref().map(ToOwned::to_owned),
138        delta: envelope
139            .json
140            .get("params")?
141            .get("delta")?
142            .as_str()?
143            .to_owned(),
144    })
145}
146
147/// Parse one `turn/completed` notification into a typed payload.
148/// Allocation: clones ids and optional text. Complexity: O(n), n = text size.
149pub fn extract_turn_completed(envelope: &Envelope) -> Option<TurnCompletedNotification> {
150    if !is_notification(envelope, methods::TURN_COMPLETED) {
151        return None;
152    }
153    let (thread_id, turn_id) = thread_turn_ids(envelope)?;
154    let params = envelope.json.get("params")?;
155    Some(TurnCompletedNotification {
156        thread_id,
157        turn_id,
158        text: extract_text_from_params(params),
159    })
160}
161
162/// Parse one `turn/failed` notification into a typed payload.
163/// Allocation: clones ids and optional error message. Complexity: O(n), n = message size.
164pub fn extract_turn_failed(envelope: &Envelope) -> Option<TurnFailedNotification> {
165    if !is_notification(envelope, methods::TURN_FAILED) {
166        return None;
167    }
168    let (thread_id, turn_id) = thread_turn_ids(envelope)?;
169    let params = envelope.json.get("params")?;
170    let (code, message) = extract_error_message(params);
171    Some(TurnFailedNotification {
172        thread_id,
173        turn_id,
174        code,
175        message,
176    })
177}
178
179/// Parse one `turn/interrupted` notification into a typed payload.
180/// Allocation: clones ids. Complexity: O(1).
181pub fn extract_turn_interrupted(envelope: &Envelope) -> Option<TurnInterruptedNotification> {
182    if !is_notification(envelope, methods::TURN_INTERRUPTED) {
183        return None;
184    }
185    let (thread_id, turn_id) = thread_turn_ids(envelope)?;
186    Some(TurnInterruptedNotification { thread_id, turn_id })
187}
188
189/// Parse one `turn/cancelled` notification into a typed payload.
190/// Allocation: clones ids. Complexity: O(1).
191pub fn extract_turn_cancelled(envelope: &Envelope) -> Option<TurnCancelledNotification> {
192    if !is_notification(envelope, methods::TURN_CANCELLED) {
193        return None;
194    }
195    let (thread_id, turn_id) = thread_turn_ids(envelope)?;
196    Some(TurnCancelledNotification { thread_id, turn_id })
197}
198
199pub(crate) fn extract_text_from_params(params: &Value) -> Option<String> {
200    for ptr in ["/item/text", "/text", "/outputText", "/output/text"] {
201        if let Some(text) = params.pointer(ptr).and_then(Value::as_str) {
202            return Some(text.to_owned());
203        }
204    }
205
206    let content = params
207        .get("item")
208        .and_then(|item| item.get("content"))
209        .and_then(Value::as_array)?;
210    let mut joined = String::new();
211    for part in content {
212        if let Some(text) = part.get("text").and_then(Value::as_str) {
213            joined.push_str(text);
214        }
215    }
216    if joined.is_empty() {
217        None
218    } else {
219        Some(joined)
220    }
221}
222
223fn extract_error_message(root: &Value) -> (Option<i64>, Option<String>) {
224    let message = root
225        .get("message")
226        .and_then(Value::as_str)
227        .or_else(|| root.get("detail").and_then(Value::as_str))
228        .or_else(|| root.get("reason").and_then(Value::as_str))
229        .or_else(|| root.get("text").and_then(Value::as_str))
230        .or_else(|| {
231            root.get("error")
232                .and_then(|value| value.get("message"))
233                .and_then(Value::as_str)
234        })
235        .map(ToOwned::to_owned);
236    let code = root.get("code").and_then(Value::as_i64).or_else(|| {
237        root.get("error")
238            .and_then(|value| value.get("code"))
239            .and_then(Value::as_i64)
240    });
241    (code, message)
242}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247    use serde_json::json;
248
249    #[test]
250    fn detects_skills_changed_notification() {
251        let envelope = Envelope {
252            seq: 1,
253            ts_millis: 0,
254            direction: Direction::Inbound,
255            kind: MsgKind::Notification,
256            rpc_id: None,
257            method: Some(Arc::from("skills/changed")),
258            thread_id: None,
259            turn_id: None,
260            item_id: None,
261            json: Arc::new(json!({"method":"skills/changed","params":{}})),
262        };
263
264        assert_eq!(
265            extract_skills_changed_notification(&envelope),
266            Some(SkillsChangedNotification {})
267        );
268    }
269
270    #[test]
271    fn rejects_non_skills_changed_notification() {
272        let envelope = Envelope {
273            seq: 1,
274            ts_millis: 0,
275            direction: Direction::Inbound,
276            kind: MsgKind::ServerRequest,
277            rpc_id: Some(JsonRpcId::Number(1)),
278            method: Some(Arc::from("skills/changed")),
279            thread_id: None,
280            turn_id: None,
281            item_id: None,
282            json: Arc::new(json!({"id":1,"method":"skills/changed","params":{}})),
283        };
284
285        assert_eq!(extract_skills_changed_notification(&envelope), None);
286    }
287
288    #[test]
289    fn detects_command_exec_output_delta_notification() {
290        let envelope = Envelope {
291            seq: 1,
292            ts_millis: 0,
293            direction: Direction::Inbound,
294            kind: MsgKind::Notification,
295            rpc_id: None,
296            method: Some(Arc::from("command/exec/outputDelta")),
297            thread_id: None,
298            turn_id: None,
299            item_id: None,
300            json: Arc::new(json!({
301                "method":"command/exec/outputDelta",
302                "params":{
303                    "processId":"proc-1",
304                    "stream":"stdout",
305                    "deltaBase64":"aGVsbG8=",
306                    "capReached":false
307                }
308            })),
309        };
310
311        let notification =
312            extract_command_exec_output_delta(&envelope).expect("typed output delta notification");
313        assert_eq!(notification.process_id, "proc-1");
314        assert_eq!(notification.delta_base64, "aGVsbG8=");
315    }
316
317    #[test]
318    fn detects_agent_message_delta_notification() {
319        let envelope = Envelope {
320            seq: 1,
321            ts_millis: 0,
322            direction: Direction::Inbound,
323            kind: MsgKind::Notification,
324            rpc_id: None,
325            method: Some(Arc::from("item/agentMessage/delta")),
326            thread_id: Some(Arc::from("thr_1")),
327            turn_id: Some(Arc::from("turn_1")),
328            item_id: Some(Arc::from("item_1")),
329            json: Arc::new(json!({
330                "method":"item/agentMessage/delta",
331                "params":{"threadId":"thr_1","turnId":"turn_1","itemId":"item_1","delta":"hello"}
332            })),
333        };
334
335        let notification = extract_agent_message_delta(&envelope).expect("agent delta");
336        assert_eq!(notification.thread_id, "thr_1");
337        assert_eq!(notification.turn_id, "turn_1");
338        assert_eq!(notification.item_id.as_deref(), Some("item_1"));
339        assert_eq!(notification.delta, "hello");
340    }
341
342    #[test]
343    fn detects_turn_completed_notification() {
344        let envelope = Envelope {
345            seq: 1,
346            ts_millis: 0,
347            direction: Direction::Inbound,
348            kind: MsgKind::Notification,
349            rpc_id: None,
350            method: Some(Arc::from("turn/completed")),
351            thread_id: Some(Arc::from("thr_1")),
352            turn_id: Some(Arc::from("turn_1")),
353            item_id: None,
354            json: Arc::new(json!({
355                "method":"turn/completed",
356                "params":{"threadId":"thr_1","turnId":"turn_1","text":"done"}
357            })),
358        };
359
360        let notification = extract_turn_completed(&envelope).expect("turn completed");
361        assert_eq!(notification.thread_id, "thr_1");
362        assert_eq!(notification.turn_id, "turn_1");
363        assert_eq!(notification.text.as_deref(), Some("done"));
364    }
365
366    #[test]
367    fn detects_turn_failed_notification() {
368        let envelope = Envelope {
369            seq: 1,
370            ts_millis: 0,
371            direction: Direction::Inbound,
372            kind: MsgKind::Notification,
373            rpc_id: None,
374            method: Some(Arc::from("turn/failed")),
375            thread_id: Some(Arc::from("thr_1")),
376            turn_id: Some(Arc::from("turn_1")),
377            item_id: None,
378            json: Arc::new(json!({
379                "method":"turn/failed",
380                "params":{"threadId":"thr_1","turnId":"turn_1","error":{"code":429,"message":"rate limited"}}
381            })),
382        };
383
384        let notification = extract_turn_failed(&envelope).expect("turn failed");
385        assert_eq!(notification.thread_id, "thr_1");
386        assert_eq!(notification.turn_id, "turn_1");
387        assert_eq!(notification.code, Some(429));
388        assert_eq!(notification.message.as_deref(), Some("rate limited"));
389    }
390
391    #[test]
392    fn detects_turn_interrupted_notification() {
393        let envelope = Envelope {
394            seq: 1,
395            ts_millis: 0,
396            direction: Direction::Inbound,
397            kind: MsgKind::Notification,
398            rpc_id: None,
399            method: Some(Arc::from("turn/interrupted")),
400            thread_id: Some(Arc::from("thr_1")),
401            turn_id: Some(Arc::from("turn_1")),
402            item_id: None,
403            json: Arc::new(json!({
404                "method":"turn/interrupted",
405                "params":{"threadId":"thr_1","turnId":"turn_1"}
406            })),
407        };
408
409        let notification = extract_turn_interrupted(&envelope).expect("turn interrupted");
410        assert_eq!(notification.thread_id, "thr_1");
411        assert_eq!(notification.turn_id, "turn_1");
412    }
413
414    #[test]
415    fn detects_turn_cancelled_notification() {
416        let envelope = Envelope {
417            seq: 1,
418            ts_millis: 0,
419            direction: Direction::Inbound,
420            kind: MsgKind::Notification,
421            rpc_id: None,
422            method: Some(Arc::from("turn/cancelled")),
423            thread_id: Some(Arc::from("thr_1")),
424            turn_id: Some(Arc::from("turn_1")),
425            item_id: None,
426            json: Arc::new(json!({
427                "method":"turn/cancelled",
428                "params":{"threadId":"thr_1","turnId":"turn_1"}
429            })),
430        };
431
432        let notification = extract_turn_cancelled(&envelope).expect("turn cancelled");
433        assert_eq!(notification.thread_id, "thr_1");
434        assert_eq!(notification.turn_id, "turn_1");
435    }
436}