1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::sync::Arc;
4
5use crate::runtime::api::CommandExecOutputDeltaNotification;
6use crate::runtime::rpc_contract::methods;
7
8#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
9#[serde(untagged)]
10pub enum JsonRpcId {
11 Number(u64),
12 Text(String),
13}
14
15#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "camelCase")]
17pub enum Direction {
18 Inbound,
19 Outbound,
20}
21
22#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq)]
23#[serde(rename_all = "camelCase")]
24pub enum MsgKind {
25 Response,
26 ServerRequest,
27 Notification,
28 Unknown,
29}
30
31#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
32#[serde(rename_all = "camelCase")]
33pub struct Envelope {
34 pub seq: u64,
35 pub ts_millis: i64,
36 pub direction: Direction,
37 pub kind: MsgKind,
38 pub rpc_id: Option<JsonRpcId>,
39 pub method: Option<Arc<str>>,
40 pub thread_id: Option<Arc<str>>,
41 pub turn_id: Option<Arc<str>>,
42 pub item_id: Option<Arc<str>>,
43 pub json: Arc<Value>,
44}
45
46#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "camelCase")]
48pub struct SkillsChangedNotification {}
49
50#[derive(Clone, Debug, PartialEq, Eq)]
51pub struct AgentMessageDeltaNotification {
52 pub thread_id: String,
53 pub turn_id: String,
54 pub item_id: Option<String>,
55 pub delta: String,
56}
57
58#[derive(Clone, Debug, PartialEq, Eq)]
59pub struct TurnCompletedNotification {
60 pub thread_id: String,
61 pub turn_id: String,
62 pub text: Option<String>,
63}
64
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct TurnFailedNotification {
67 pub thread_id: String,
68 pub turn_id: String,
69 pub code: Option<i64>,
70 pub message: Option<String>,
71}
72
73#[derive(Clone, Debug, PartialEq, Eq)]
74pub struct TurnInterruptedNotification {
75 pub thread_id: String,
76 pub turn_id: String,
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
80pub struct TurnCancelledNotification {
81 pub thread_id: String,
82 pub turn_id: String,
83}
84
85pub fn extract_skills_changed_notification(
88 envelope: &Envelope,
89) -> Option<SkillsChangedNotification> {
90 if envelope.kind == MsgKind::Notification
91 && envelope.method.as_deref() == Some(crate::runtime::rpc_contract::methods::SKILLS_CHANGED)
92 {
93 Some(SkillsChangedNotification {})
94 } else {
95 None
96 }
97}
98
99fn is_notification(envelope: &Envelope, method: &str) -> bool {
101 envelope.kind == MsgKind::Notification && envelope.method.as_deref() == Some(method)
102}
103
104fn thread_turn_ids(envelope: &Envelope) -> Option<(String, String)> {
106 Some((
107 envelope.thread_id.as_deref()?.to_owned(),
108 envelope.turn_id.as_deref()?.to_owned(),
109 ))
110}
111
112pub fn extract_command_exec_output_delta(
115 envelope: &Envelope,
116) -> Option<CommandExecOutputDeltaNotification> {
117 if !is_notification(
118 envelope,
119 crate::runtime::rpc_contract::methods::COMMAND_EXEC_OUTPUT_DELTA,
120 ) {
121 return None;
122 }
123 let params = envelope.json.get("params")?.clone();
124 serde_json::from_value(params).ok()
125}
126
127pub fn extract_agent_message_delta(envelope: &Envelope) -> Option<AgentMessageDeltaNotification> {
130 if !is_notification(envelope, methods::ITEM_AGENT_MESSAGE_DELTA) {
131 return None;
132 }
133 let (thread_id, turn_id) = thread_turn_ids(envelope)?;
134 Some(AgentMessageDeltaNotification {
135 thread_id,
136 turn_id,
137 item_id: envelope.item_id.as_deref().map(ToOwned::to_owned),
138 delta: envelope
139 .json
140 .get("params")?
141 .get("delta")?
142 .as_str()?
143 .to_owned(),
144 })
145}
146
147pub fn extract_turn_completed(envelope: &Envelope) -> Option<TurnCompletedNotification> {
150 if !is_notification(envelope, methods::TURN_COMPLETED) {
151 return None;
152 }
153 let (thread_id, turn_id) = thread_turn_ids(envelope)?;
154 let params = envelope.json.get("params")?;
155 Some(TurnCompletedNotification {
156 thread_id,
157 turn_id,
158 text: extract_text_from_params(params),
159 })
160}
161
162pub fn extract_turn_failed(envelope: &Envelope) -> Option<TurnFailedNotification> {
165 if !is_notification(envelope, methods::TURN_FAILED) {
166 return None;
167 }
168 let (thread_id, turn_id) = thread_turn_ids(envelope)?;
169 let params = envelope.json.get("params")?;
170 let (code, message) = extract_error_message(params);
171 Some(TurnFailedNotification {
172 thread_id,
173 turn_id,
174 code,
175 message,
176 })
177}
178
179pub fn extract_turn_interrupted(envelope: &Envelope) -> Option<TurnInterruptedNotification> {
182 if !is_notification(envelope, methods::TURN_INTERRUPTED) {
183 return None;
184 }
185 let (thread_id, turn_id) = thread_turn_ids(envelope)?;
186 Some(TurnInterruptedNotification { thread_id, turn_id })
187}
188
189pub fn extract_turn_cancelled(envelope: &Envelope) -> Option<TurnCancelledNotification> {
192 if !is_notification(envelope, methods::TURN_CANCELLED) {
193 return None;
194 }
195 let (thread_id, turn_id) = thread_turn_ids(envelope)?;
196 Some(TurnCancelledNotification { thread_id, turn_id })
197}
198
199pub(crate) fn extract_text_from_params(params: &Value) -> Option<String> {
200 for ptr in ["/item/text", "/text", "/outputText", "/output/text"] {
201 if let Some(text) = params.pointer(ptr).and_then(Value::as_str) {
202 return Some(text.to_owned());
203 }
204 }
205
206 let content = params
207 .get("item")
208 .and_then(|item| item.get("content"))
209 .and_then(Value::as_array)?;
210 let mut joined = String::new();
211 for part in content {
212 if let Some(text) = part.get("text").and_then(Value::as_str) {
213 joined.push_str(text);
214 }
215 }
216 if joined.is_empty() {
217 None
218 } else {
219 Some(joined)
220 }
221}
222
223fn extract_error_message(root: &Value) -> (Option<i64>, Option<String>) {
224 let message = root
225 .get("message")
226 .and_then(Value::as_str)
227 .or_else(|| root.get("detail").and_then(Value::as_str))
228 .or_else(|| root.get("reason").and_then(Value::as_str))
229 .or_else(|| root.get("text").and_then(Value::as_str))
230 .or_else(|| {
231 root.get("error")
232 .and_then(|value| value.get("message"))
233 .and_then(Value::as_str)
234 })
235 .map(ToOwned::to_owned);
236 let code = root.get("code").and_then(Value::as_i64).or_else(|| {
237 root.get("error")
238 .and_then(|value| value.get("code"))
239 .and_then(Value::as_i64)
240 });
241 (code, message)
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247 use serde_json::json;
248
249 #[test]
250 fn detects_skills_changed_notification() {
251 let envelope = Envelope {
252 seq: 1,
253 ts_millis: 0,
254 direction: Direction::Inbound,
255 kind: MsgKind::Notification,
256 rpc_id: None,
257 method: Some(Arc::from("skills/changed")),
258 thread_id: None,
259 turn_id: None,
260 item_id: None,
261 json: Arc::new(json!({"method":"skills/changed","params":{}})),
262 };
263
264 assert_eq!(
265 extract_skills_changed_notification(&envelope),
266 Some(SkillsChangedNotification {})
267 );
268 }
269
270 #[test]
271 fn rejects_non_skills_changed_notification() {
272 let envelope = Envelope {
273 seq: 1,
274 ts_millis: 0,
275 direction: Direction::Inbound,
276 kind: MsgKind::ServerRequest,
277 rpc_id: Some(JsonRpcId::Number(1)),
278 method: Some(Arc::from("skills/changed")),
279 thread_id: None,
280 turn_id: None,
281 item_id: None,
282 json: Arc::new(json!({"id":1,"method":"skills/changed","params":{}})),
283 };
284
285 assert_eq!(extract_skills_changed_notification(&envelope), None);
286 }
287
288 #[test]
289 fn detects_command_exec_output_delta_notification() {
290 let envelope = Envelope {
291 seq: 1,
292 ts_millis: 0,
293 direction: Direction::Inbound,
294 kind: MsgKind::Notification,
295 rpc_id: None,
296 method: Some(Arc::from("command/exec/outputDelta")),
297 thread_id: None,
298 turn_id: None,
299 item_id: None,
300 json: Arc::new(json!({
301 "method":"command/exec/outputDelta",
302 "params":{
303 "processId":"proc-1",
304 "stream":"stdout",
305 "deltaBase64":"aGVsbG8=",
306 "capReached":false
307 }
308 })),
309 };
310
311 let notification =
312 extract_command_exec_output_delta(&envelope).expect("typed output delta notification");
313 assert_eq!(notification.process_id, "proc-1");
314 assert_eq!(notification.delta_base64, "aGVsbG8=");
315 }
316
317 #[test]
318 fn detects_agent_message_delta_notification() {
319 let envelope = Envelope {
320 seq: 1,
321 ts_millis: 0,
322 direction: Direction::Inbound,
323 kind: MsgKind::Notification,
324 rpc_id: None,
325 method: Some(Arc::from("item/agentMessage/delta")),
326 thread_id: Some(Arc::from("thr_1")),
327 turn_id: Some(Arc::from("turn_1")),
328 item_id: Some(Arc::from("item_1")),
329 json: Arc::new(json!({
330 "method":"item/agentMessage/delta",
331 "params":{"threadId":"thr_1","turnId":"turn_1","itemId":"item_1","delta":"hello"}
332 })),
333 };
334
335 let notification = extract_agent_message_delta(&envelope).expect("agent delta");
336 assert_eq!(notification.thread_id, "thr_1");
337 assert_eq!(notification.turn_id, "turn_1");
338 assert_eq!(notification.item_id.as_deref(), Some("item_1"));
339 assert_eq!(notification.delta, "hello");
340 }
341
342 #[test]
343 fn detects_turn_completed_notification() {
344 let envelope = Envelope {
345 seq: 1,
346 ts_millis: 0,
347 direction: Direction::Inbound,
348 kind: MsgKind::Notification,
349 rpc_id: None,
350 method: Some(Arc::from("turn/completed")),
351 thread_id: Some(Arc::from("thr_1")),
352 turn_id: Some(Arc::from("turn_1")),
353 item_id: None,
354 json: Arc::new(json!({
355 "method":"turn/completed",
356 "params":{"threadId":"thr_1","turnId":"turn_1","text":"done"}
357 })),
358 };
359
360 let notification = extract_turn_completed(&envelope).expect("turn completed");
361 assert_eq!(notification.thread_id, "thr_1");
362 assert_eq!(notification.turn_id, "turn_1");
363 assert_eq!(notification.text.as_deref(), Some("done"));
364 }
365
366 #[test]
367 fn detects_turn_failed_notification() {
368 let envelope = Envelope {
369 seq: 1,
370 ts_millis: 0,
371 direction: Direction::Inbound,
372 kind: MsgKind::Notification,
373 rpc_id: None,
374 method: Some(Arc::from("turn/failed")),
375 thread_id: Some(Arc::from("thr_1")),
376 turn_id: Some(Arc::from("turn_1")),
377 item_id: None,
378 json: Arc::new(json!({
379 "method":"turn/failed",
380 "params":{"threadId":"thr_1","turnId":"turn_1","error":{"code":429,"message":"rate limited"}}
381 })),
382 };
383
384 let notification = extract_turn_failed(&envelope).expect("turn failed");
385 assert_eq!(notification.thread_id, "thr_1");
386 assert_eq!(notification.turn_id, "turn_1");
387 assert_eq!(notification.code, Some(429));
388 assert_eq!(notification.message.as_deref(), Some("rate limited"));
389 }
390
391 #[test]
392 fn detects_turn_interrupted_notification() {
393 let envelope = Envelope {
394 seq: 1,
395 ts_millis: 0,
396 direction: Direction::Inbound,
397 kind: MsgKind::Notification,
398 rpc_id: None,
399 method: Some(Arc::from("turn/interrupted")),
400 thread_id: Some(Arc::from("thr_1")),
401 turn_id: Some(Arc::from("turn_1")),
402 item_id: None,
403 json: Arc::new(json!({
404 "method":"turn/interrupted",
405 "params":{"threadId":"thr_1","turnId":"turn_1"}
406 })),
407 };
408
409 let notification = extract_turn_interrupted(&envelope).expect("turn interrupted");
410 assert_eq!(notification.thread_id, "thr_1");
411 assert_eq!(notification.turn_id, "turn_1");
412 }
413
414 #[test]
415 fn detects_turn_cancelled_notification() {
416 let envelope = Envelope {
417 seq: 1,
418 ts_millis: 0,
419 direction: Direction::Inbound,
420 kind: MsgKind::Notification,
421 rpc_id: None,
422 method: Some(Arc::from("turn/cancelled")),
423 thread_id: Some(Arc::from("thr_1")),
424 turn_id: Some(Arc::from("turn_1")),
425 item_id: None,
426 json: Arc::new(json!({
427 "method":"turn/cancelled",
428 "params":{"threadId":"thr_1","turnId":"turn_1"}
429 })),
430 };
431
432 let notification = extract_turn_cancelled(&envelope).expect("turn cancelled");
433 assert_eq!(notification.thread_id, "thr_1");
434 assert_eq!(notification.turn_id, "turn_1");
435 }
436}