Skip to main content

codex_runtime/runtime/
events.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::sync::Arc;
4
5use crate::runtime::api::CommandExecOutputDeltaNotification;
6
7#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
8#[serde(untagged)]
9pub enum JsonRpcId {
10    Number(u64),
11    Text(String),
12}
13
14#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
15#[serde(rename_all = "camelCase")]
16pub enum Direction {
17    Inbound,
18    Outbound,
19}
20
21#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
22#[serde(rename_all = "camelCase")]
23pub enum MsgKind {
24    Response,
25    ServerRequest,
26    Notification,
27    Unknown,
28}
29
30#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
31#[serde(rename_all = "camelCase")]
32pub struct Envelope {
33    pub seq: u64,
34    pub ts_millis: i64,
35    pub direction: Direction,
36    pub kind: MsgKind,
37    pub rpc_id: Option<JsonRpcId>,
38    pub method: Option<Arc<str>>,
39    pub thread_id: Option<Arc<str>>,
40    pub turn_id: Option<Arc<str>>,
41    pub item_id: Option<Arc<str>>,
42    pub json: Arc<Value>,
43}
44
45#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
46#[serde(rename_all = "camelCase")]
47pub struct SkillsChangedNotification {}
48
49/// Detect the zero-payload `skills/changed` invalidation notification.
50/// Allocation: none. Complexity: O(1).
51pub fn extract_skills_changed_notification(
52    envelope: &Envelope,
53) -> Option<SkillsChangedNotification> {
54    if envelope.kind == MsgKind::Notification
55        && envelope.method.as_deref() == Some(crate::runtime::rpc_contract::methods::SKILLS_CHANGED)
56    {
57        Some(SkillsChangedNotification {})
58    } else {
59        None
60    }
61}
62
63/// Parse one `command/exec/outputDelta` notification into a typed payload.
64/// Allocation: one params clone for serde deserialization. Complexity: O(n), n = delta payload size.
65pub fn extract_command_exec_output_delta(
66    envelope: &Envelope,
67) -> Option<CommandExecOutputDeltaNotification> {
68    if envelope.kind != MsgKind::Notification
69        || envelope.method.as_deref()
70            != Some(crate::runtime::rpc_contract::methods::COMMAND_EXEC_OUTPUT_DELTA)
71    {
72        return None;
73    }
74
75    let params = envelope.json.get("params")?.clone();
76    serde_json::from_value(params).ok()
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build an inbound envelope with `seq` 1, `ts_millis` 0 and no thread/turn/item ids,
    /// so each test only spells out the fields it actually varies.
    fn inbound(kind: MsgKind, rpc_id: Option<JsonRpcId>, method: &str, body: Value) -> Envelope {
        Envelope {
            seq: 1,
            ts_millis: 0,
            direction: Direction::Inbound,
            kind,
            rpc_id,
            method: Some(Arc::from(method)),
            thread_id: None,
            turn_id: None,
            item_id: None,
            json: Arc::new(body),
        }
    }

    /// A notification with the `skills/changed` method is recognized.
    #[test]
    fn detects_skills_changed_notification() {
        let envelope = inbound(
            MsgKind::Notification,
            None,
            "skills/changed",
            json!({"method":"skills/changed","params":{}}),
        );

        let detected = extract_skills_changed_notification(&envelope);
        assert_eq!(detected, Some(SkillsChangedNotification {}));
    }

    /// The right method on a non-notification message kind must not match.
    #[test]
    fn rejects_non_skills_changed_notification() {
        let envelope = inbound(
            MsgKind::ServerRequest,
            Some(JsonRpcId::Number(1)),
            "skills/changed",
            json!({"id":1,"method":"skills/changed","params":{}}),
        );

        let detected = extract_skills_changed_notification(&envelope);
        assert_eq!(detected, None);
    }

    /// A well-formed `command/exec/outputDelta` notification parses into its typed payload.
    #[test]
    fn detects_command_exec_output_delta_notification() {
        let body = json!({
            "method":"command/exec/outputDelta",
            "params":{
                "processId":"proc-1",
                "stream":"stdout",
                "deltaBase64":"aGVsbG8=",
                "capReached":false
            }
        });
        let envelope = inbound(MsgKind::Notification, None, "command/exec/outputDelta", body);

        let notification =
            extract_command_exec_output_delta(&envelope).expect("typed output delta notification");
        assert_eq!(notification.process_id, "proc-1");
        assert_eq!(notification.delta_base64, "aGVsbG8=");
    }
}