Skip to main content

codex_codes/
messages.rs

1//! Typed dispatch for app-server notifications and server-to-client requests.
2//!
3//! The Codex app-server speaks JSON-RPC where every message carries a
4//! `method` discriminant alongside a free-form `params` blob. This module
5//! lifts that loose envelope into closed enums — [`Notification`] for
6//! server-initiated notifications and [`ServerRequest`] for server-initiated
7//! requests (the approval flow). Each variant wraps a typed param struct
8//! from [`crate::protocol`].
9//!
//! The pattern mirrors the `ContentBlock` dispatch in the sibling
11//! `claude-codes` crate: hand-written [`Serialize`]/[`Deserialize`] impls
12//! inspect the discriminant, route known cases through `serde_json::from_value`
13//! into the typed struct, and route unknown methods into an `Unknown`
14//! variant — preserving the raw payload for forward compatibility with
15//! future codex versions.
16//!
17//! ## Typing contract
18//!
19//! - Unknown methods route to [`Notification::Unknown`] / [`ServerRequest::Unknown`]
20//!   without error. Encountering one in production typically means the
21//!   installed Codex CLI is newer than the bindings.
22//! - Known methods whose payload fails to deserialize **do** cause an error.
23//!   If you see one, the typed binding in [`crate::protocol`] is out of
24//!   sync with the wire format and needs to be updated.
25
26use crate::jsonrpc::RequestId;
27use crate::protocol::{
28    methods, AccountRateLimitsUpdatedNotification, AgentMessageDeltaNotification,
29    CmdOutputDeltaNotification, CommandExecutionApprovalParams, ErrorNotification,
30    FileChangeApprovalParams, FileChangeOutputDeltaNotification, ItemCompletedNotification,
31    ItemStartedNotification, McpServerStartupStatusUpdatedNotification, ReasoningDeltaNotification,
32    RemoteControlStatusChangedNotification, ThreadStartedNotification,
33    ThreadStatusChangedNotification, ThreadTokenUsageUpdatedNotification,
34    TurnCompletedNotification, TurnStartedNotification,
35};
36use serde::{Deserialize, Deserializer, Serialize, Serializer};
37use serde_json::Value;
38
/// A server-to-client notification.
///
/// Each variant maps to a single `method` string on the wire and wraps the
/// typed params struct for that method. The `Unknown` variant captures
/// methods this crate version doesn't model yet, preserving the raw payload
/// for inspection.
#[derive(Debug, Clone)]
pub enum Notification {
    /// `thread/started`
    ThreadStarted(ThreadStartedNotification),
    /// `thread/status/changed`
    ThreadStatusChanged(ThreadStatusChangedNotification),
    /// `thread/tokenUsage/updated`
    ThreadTokenUsageUpdated(ThreadTokenUsageUpdatedNotification),
    /// `turn/started`
    TurnStarted(TurnStartedNotification),
    /// `turn/completed`
    TurnCompleted(TurnCompletedNotification),
    /// `item/started`
    ItemStarted(ItemStartedNotification),
    /// `item/completed`
    ItemCompleted(ItemCompletedNotification),
    /// `item/agentMessage/delta`
    AgentMessageDelta(AgentMessageDeltaNotification),
    /// `item/commandExecution/outputDelta`
    CmdOutputDelta(CmdOutputDeltaNotification),
    /// `item/fileChange/outputDelta`
    FileChangeOutputDelta(FileChangeOutputDeltaNotification),
    /// `item/reasoning/summaryTextDelta`
    ReasoningDelta(ReasoningDeltaNotification),
    /// `error`
    Error(ErrorNotification),
    /// `account/rateLimits/updated`
    AccountRateLimitsUpdated(AccountRateLimitsUpdatedNotification),
    /// `mcpServer/startupStatus/updated`
    McpServerStartupStatusUpdated(McpServerStartupStatusUpdatedNotification),
    /// `remoteControl/status/changed`
    RemoteControlStatusChanged(RemoteControlStatusChangedNotification),
    /// A method this crate version does not yet model. The raw params are
    /// preserved for caller inspection. Encountering this typically means
    /// the installed codex CLI is newer than the bindings.
    Unknown {
        /// The `method` string the notification was constructed with.
        method: String,
        /// The untouched `params` value, or `None` when the envelope
        /// carried no `params` member.
        params: Option<Value>,
    },
}
84
85impl Notification {
86    /// Return the wire `method` string for this notification.
87    pub fn method(&self) -> &str {
88        match self {
89            Self::ThreadStarted(_) => methods::THREAD_STARTED,
90            Self::ThreadStatusChanged(_) => methods::THREAD_STATUS_CHANGED,
91            Self::ThreadTokenUsageUpdated(_) => methods::THREAD_TOKEN_USAGE_UPDATED,
92            Self::TurnStarted(_) => methods::TURN_STARTED,
93            Self::TurnCompleted(_) => methods::TURN_COMPLETED,
94            Self::ItemStarted(_) => methods::ITEM_STARTED,
95            Self::ItemCompleted(_) => methods::ITEM_COMPLETED,
96            Self::AgentMessageDelta(_) => methods::AGENT_MESSAGE_DELTA,
97            Self::CmdOutputDelta(_) => methods::CMD_OUTPUT_DELTA,
98            Self::FileChangeOutputDelta(_) => methods::FILE_CHANGE_OUTPUT_DELTA,
99            Self::ReasoningDelta(_) => methods::REASONING_DELTA,
100            Self::Error(_) => methods::ERROR,
101            Self::AccountRateLimitsUpdated(_) => methods::ACCOUNT_RATE_LIMITS_UPDATED,
102            Self::McpServerStartupStatusUpdated(_) => methods::MCP_SERVER_STARTUP_STATUS_UPDATED,
103            Self::RemoteControlStatusChanged(_) => methods::REMOTE_CONTROL_STATUS_CHANGED,
104            Self::Unknown { method, .. } => method,
105        }
106    }
107
108    /// `true` if this notification's method isn't modeled by the crate.
109    pub fn is_unknown(&self) -> bool {
110        matches!(self, Self::Unknown { .. })
111    }
112
113    /// Construct a [`Notification`] from a `method` + `params` envelope.
114    ///
115    /// Returns an error if `method` is recognized but `params` doesn't
116    /// deserialize into the typed struct. Unknown methods route to
117    /// [`Notification::Unknown`] without error.
118    pub fn from_envelope(method: &str, params: Option<Value>) -> Result<Self, serde_json::Error> {
119        let params_value = params.clone().unwrap_or(Value::Null);
120        match method {
121            methods::THREAD_STARTED => {
122                serde_json::from_value(params_value).map(Self::ThreadStarted)
123            }
124            methods::THREAD_STATUS_CHANGED => {
125                serde_json::from_value(params_value).map(Self::ThreadStatusChanged)
126            }
127            methods::THREAD_TOKEN_USAGE_UPDATED => {
128                serde_json::from_value(params_value).map(Self::ThreadTokenUsageUpdated)
129            }
130            methods::TURN_STARTED => serde_json::from_value(params_value).map(Self::TurnStarted),
131            methods::TURN_COMPLETED => {
132                serde_json::from_value(params_value).map(Self::TurnCompleted)
133            }
134            methods::ITEM_STARTED => serde_json::from_value(params_value).map(Self::ItemStarted),
135            methods::ITEM_COMPLETED => {
136                serde_json::from_value(params_value).map(Self::ItemCompleted)
137            }
138            methods::AGENT_MESSAGE_DELTA => {
139                serde_json::from_value(params_value).map(Self::AgentMessageDelta)
140            }
141            methods::CMD_OUTPUT_DELTA => {
142                serde_json::from_value(params_value).map(Self::CmdOutputDelta)
143            }
144            methods::FILE_CHANGE_OUTPUT_DELTA => {
145                serde_json::from_value(params_value).map(Self::FileChangeOutputDelta)
146            }
147            methods::REASONING_DELTA => {
148                serde_json::from_value(params_value).map(Self::ReasoningDelta)
149            }
150            methods::ERROR => serde_json::from_value(params_value).map(Self::Error),
151            methods::ACCOUNT_RATE_LIMITS_UPDATED => {
152                serde_json::from_value(params_value).map(Self::AccountRateLimitsUpdated)
153            }
154            methods::MCP_SERVER_STARTUP_STATUS_UPDATED => {
155                serde_json::from_value(params_value).map(Self::McpServerStartupStatusUpdated)
156            }
157            methods::REMOTE_CONTROL_STATUS_CHANGED => {
158                serde_json::from_value(params_value).map(Self::RemoteControlStatusChanged)
159            }
160            _ => Ok(Self::Unknown {
161                method: method.to_string(),
162                params,
163            }),
164        }
165    }
166
167    /// Decompose this notification back into a `(method, params)` pair.
168    pub fn into_envelope(self) -> Result<(String, Option<Value>), serde_json::Error> {
169        fn pack<T: Serialize>(
170            method: &str,
171            v: &T,
172        ) -> Result<(String, Option<Value>), serde_json::Error> {
173            Ok((method.to_string(), Some(serde_json::to_value(v)?)))
174        }
175        match &self {
176            Self::ThreadStarted(v) => pack(methods::THREAD_STARTED, v),
177            Self::ThreadStatusChanged(v) => pack(methods::THREAD_STATUS_CHANGED, v),
178            Self::ThreadTokenUsageUpdated(v) => pack(methods::THREAD_TOKEN_USAGE_UPDATED, v),
179            Self::TurnStarted(v) => pack(methods::TURN_STARTED, v),
180            Self::TurnCompleted(v) => pack(methods::TURN_COMPLETED, v),
181            Self::ItemStarted(v) => pack(methods::ITEM_STARTED, v),
182            Self::ItemCompleted(v) => pack(methods::ITEM_COMPLETED, v),
183            Self::AgentMessageDelta(v) => pack(methods::AGENT_MESSAGE_DELTA, v),
184            Self::CmdOutputDelta(v) => pack(methods::CMD_OUTPUT_DELTA, v),
185            Self::FileChangeOutputDelta(v) => pack(methods::FILE_CHANGE_OUTPUT_DELTA, v),
186            Self::ReasoningDelta(v) => pack(methods::REASONING_DELTA, v),
187            Self::Error(v) => pack(methods::ERROR, v),
188            Self::AccountRateLimitsUpdated(v) => pack(methods::ACCOUNT_RATE_LIMITS_UPDATED, v),
189            Self::McpServerStartupStatusUpdated(v) => {
190                pack(methods::MCP_SERVER_STARTUP_STATUS_UPDATED, v)
191            }
192            Self::RemoteControlStatusChanged(v) => pack(methods::REMOTE_CONTROL_STATUS_CHANGED, v),
193            Self::Unknown { method, params } => Ok((method.clone(), params.clone())),
194        }
195    }
196}
197
198impl Serialize for Notification {
199    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
200        let (method, params) = self
201            .clone()
202            .into_envelope()
203            .map_err(serde::ser::Error::custom)?;
204        let mut env = serde_json::Map::new();
205        env.insert("method".to_string(), Value::String(method));
206        if let Some(p) = params {
207            env.insert("params".to_string(), p);
208        }
209        Value::Object(env).serialize(serializer)
210    }
211}
212
213impl<'de> Deserialize<'de> for Notification {
214    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
215        let value = Value::deserialize(deserializer)?;
216        let method = value
217            .get("method")
218            .and_then(|v| v.as_str())
219            .ok_or_else(|| serde::de::Error::missing_field("method"))?
220            .to_string();
221        let params = value.get("params").cloned();
222        Self::from_envelope(&method, params).map_err(serde::de::Error::custom)
223    }
224}
225
/// A server-to-client request that requires a response (approval flow).
///
/// The wire envelope carries an `id` for response correlation; that `id` is
/// held alongside this enum in [`ServerMessage::Request`] rather than embedded
/// inside the variant, since responding doesn't depend on which approval-type
/// was requested.
#[derive(Debug, Clone)]
pub enum ServerRequest {
    /// `item/commandExecution/requestApproval`
    CmdExecApproval(CommandExecutionApprovalParams),
    /// `item/fileChange/requestApproval`
    FileChangeApproval(FileChangeApprovalParams),
    /// A request method this crate version does not yet model.
    Unknown {
        /// The `method` string the request was constructed with.
        method: String,
        /// The untouched `params` value, or `None` when none was supplied.
        params: Option<Value>,
    },
}
244
245impl ServerRequest {
246    /// Return the wire `method` string for this request.
247    pub fn method(&self) -> &str {
248        match self {
249            Self::CmdExecApproval(_) => methods::CMD_EXEC_APPROVAL,
250            Self::FileChangeApproval(_) => methods::FILE_CHANGE_APPROVAL,
251            Self::Unknown { method, .. } => method,
252        }
253    }
254
255    /// `true` if this request's method isn't modeled by the crate.
256    pub fn is_unknown(&self) -> bool {
257        matches!(self, Self::Unknown { .. })
258    }
259
260    /// Construct a [`ServerRequest`] from a `method` + `params` envelope.
261    pub fn from_envelope(method: &str, params: Option<Value>) -> Result<Self, serde_json::Error> {
262        let params_value = params.clone().unwrap_or(Value::Null);
263        match method {
264            methods::CMD_EXEC_APPROVAL => {
265                serde_json::from_value(params_value).map(Self::CmdExecApproval)
266            }
267            methods::FILE_CHANGE_APPROVAL => {
268                serde_json::from_value(params_value).map(Self::FileChangeApproval)
269            }
270            _ => Ok(Self::Unknown {
271                method: method.to_string(),
272                params,
273            }),
274        }
275    }
276}
277
/// A message coming from the app-server.
///
/// Replaces the previous loose `{ method, params }` shape with typed enums.
/// Match on the outer variant first to distinguish notifications (no response)
/// from requests (need [`crate::AsyncClient::respond`] /
/// [`crate::SyncClient::respond`]).
#[derive(Debug, Clone)]
pub enum ServerMessage {
    /// A notification — no response required.
    Notification(Notification),
    /// A request — call `respond(id, ...)` on the client with the matching id.
    Request {
        /// Correlation id to pass back when responding.
        id: RequestId,
        /// The typed request payload.
        request: ServerRequest,
    },
}
294
295impl ServerMessage {
296    /// `true` if this message is an unmodeled method (notification or request).
297    pub fn is_unknown(&self) -> bool {
298        match self {
299            Self::Notification(n) => n.is_unknown(),
300            Self::Request { request, .. } => request.is_unknown(),
301        }
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_notification_unknown_method_routes_to_unknown_variant() {
        let n = Notification::from_envelope("foo/bar", Some(serde_json::json!({"x": 1})))
            .expect("unknown methods do not error");
        match n {
            Notification::Unknown { method, params } => {
                assert_eq!(method, "foo/bar");
                assert_eq!(params, Some(serde_json::json!({"x": 1})));
            }
            other => panic!("expected Unknown, got {:?}", other),
        }
    }

    #[test]
    fn test_notification_known_method_with_bad_params_errors() {
        // thread/started expects a `thread` field — wrong shape should error.
        let result = Notification::from_envelope("thread/started", Some(serde_json::json!({})));
        assert!(result.is_err());
    }

    #[test]
    fn test_notification_known_method_with_missing_params_errors() {
        // An absent payload is decoded as JSON null, which cannot satisfy a
        // typed params struct.
        let result = Notification::from_envelope("thread/started", None);
        assert!(result.is_err());
    }

    #[test]
    fn test_notification_round_trip_envelope() {
        let wire = serde_json::json!({
            "method": "item/agentMessage/delta",
            "params": {"threadId": "t1", "itemId": "i1", "delta": "hi"},
        });
        let n: Notification = serde_json::from_value(wire.clone()).unwrap();
        assert!(matches!(n, Notification::AgentMessageDelta(_)));
        let back = serde_json::to_value(&n).unwrap();
        assert_eq!(back, wire);
    }

    #[test]
    fn test_notification_unknown_without_params_round_trips() {
        // No `params` member in, no `params` member out.
        let wire = serde_json::json!({"method": "future/thing"});
        let n: Notification = serde_json::from_value(wire.clone()).unwrap();
        assert!(n.is_unknown());
        assert_eq!(n.method(), "future/thing");
        let back = serde_json::to_value(&n).unwrap();
        assert_eq!(back, wire);
    }

    #[test]
    fn test_notification_missing_method_errors() {
        let result: Result<Notification, _> =
            serde_json::from_value(serde_json::json!({"params": {}}));
        assert!(result.is_err());
    }

    #[test]
    fn test_server_request_unknown_method_routes_to_unknown_variant() {
        let r = ServerRequest::from_envelope("foo/approve", Some(serde_json::json!({"y": 2})))
            .expect("unknown methods do not error");
        assert!(r.is_unknown());
        assert_eq!(r.method(), "foo/approve");
        match r {
            ServerRequest::Unknown { method, params } => {
                assert_eq!(method, "foo/approve");
                assert_eq!(params, Some(serde_json::json!({"y": 2})));
            }
            other => panic!("expected Unknown, got {:?}", other),
        }
    }

    #[test]
    fn test_server_message_is_unknown_delegates_to_notification() {
        let msg = ServerMessage::Notification(Notification::Unknown {
            method: "foo/bar".to_string(),
            params: None,
        });
        assert!(msg.is_unknown());
    }
}