Skip to main content

agent_core/core/
rpc_dispatch.rs

//! Pure helper functions for the `synaps rpc` command dispatcher.
//!
//! These are extracted from `cmd::rpc` so they can be unit-tested via
//! `cargo test --lib` without pulling in the binary crate's TUI dependencies.
//!
//! See `docs/rpc-protocol.md` and `synaps-bridge.SPEC.md §4` for the wire
//! protocol specification these functions implement.
8
9use crate::core::rpc_protocol::{
10    AssistantEvent, RpcAttachment, RpcCommand, RpcEvent, TurnUsage,
11};
12use crate::core::stream_types::{AgentEvent, LlmEvent, SessionEvent, StreamEvent};
13
14// ─── Frame parsing ────────────────────────────────────────────────────────────
15
/// Upper bound, in bytes, on a single inbound frame: 1 MiB (2^20 bytes).
pub const MAX_FRAME_BYTES: usize = 1 << 20;
18
19/// Parse a raw UTF-8 line into an [`RpcCommand`], enforcing the 1 MiB frame limit.
20///
21/// # Returns
22/// - `Ok(RpcCommand)` on success.
23/// - `Err(RpcEvent::Error { id: None, … })` on oversize or malformed input —
24///   the caller should emit the error event and **continue** (do not exit).
25pub fn parse_frame(line: &str, max_bytes: usize) -> Result<RpcCommand, RpcEvent> {
26    if line.len() > max_bytes {
27        return Err(RpcEvent::Error {
28            id: None,
29            message: "frame exceeds 1 MiB limit".to_string(),
30        });
31    }
32    serde_json::from_str::<RpcCommand>(line).map_err(|e| RpcEvent::Error {
33        id: None,
34        message: e.to_string(),
35    })
36}
37
38// ─── StreamEvent → RpcEvent mapping ──────────────────────────────────────────
39
40/// Map a single [`StreamEvent`] to an optional [`RpcEvent`].
41///
42/// Returns `None` for events that are intentionally dropped on the wire:
43/// - `LlmEvent::ToolResultDelta` — wire format has no streaming-result variant;
44///   the final `ToolResult` carries the complete text.
45/// - `AgentEvent::SteeringDelivered` — internal hook signal, not exposed.
46///
47/// `Session(*)` variants also return `None` — they carry session bookkeeping
48/// data (message history, usage counters, completion/error signals) that the
49/// streaming loop in `cmd::rpc` must handle directly with mutable access to
50/// [`RpcState`].
51pub fn map_stream_event(ev: &StreamEvent) -> Option<RpcEvent> {
52    match ev {
53        StreamEvent::Llm(LlmEvent::Thinking(s)) => Some(RpcEvent::MessageUpdate {
54            event: AssistantEvent::ThinkingDelta { delta: s.clone() },
55        }),
56        StreamEvent::Llm(LlmEvent::Text(s)) => Some(RpcEvent::MessageUpdate {
57            event: AssistantEvent::TextDelta { delta: s.clone() },
58        }),
59        StreamEvent::Llm(LlmEvent::ToolUseStart { tool_name, tool_id }) => {
60            Some(RpcEvent::MessageUpdate {
61                event: AssistantEvent::ToolcallStart {
62                    tool_id: tool_id.clone(),
63                    tool_name: tool_name.clone(),
64                },
65            })
66        }
67        StreamEvent::Llm(LlmEvent::ToolUseDelta { tool_id, delta }) => {
68            Some(RpcEvent::MessageUpdate {
69                event: AssistantEvent::ToolcallInputDelta {
70                    tool_id: tool_id.clone(),
71                    delta: delta.clone(),
72                },
73            })
74        }
75        // tool_name is intentionally dropped — already sent in ToolcallStart
76        StreamEvent::Llm(LlmEvent::ToolUse { tool_id, input, .. }) => {
77            Some(RpcEvent::MessageUpdate {
78                event: AssistantEvent::ToolcallInput {
79                    tool_id: tool_id.clone(),
80                    input: input.clone(),
81                },
82            })
83        }
84        StreamEvent::Llm(LlmEvent::ToolResult { tool_id, result }) => {
85            Some(RpcEvent::MessageUpdate {
86                event: AssistantEvent::ToolcallResult {
87                    tool_id: tool_id.clone(),
88                    result: result.clone(),
89                },
90            })
91        }
92        // Drop — wire format has no streaming-result variant; final ToolResult carries full text
93        StreamEvent::Llm(LlmEvent::ToolResultDelta { .. }) => None,
94
95        StreamEvent::Agent(AgentEvent::SubagentStart {
96            subagent_id,
97            agent_name,
98            task_preview,
99        }) => Some(RpcEvent::SubagentStart {
100            subagent_id: *subagent_id,
101            agent_name: agent_name.clone(),
102            task_preview: task_preview.clone(),
103        }),
104        StreamEvent::Agent(AgentEvent::SubagentUpdate {
105            subagent_id,
106            agent_name,
107            status,
108        }) => Some(RpcEvent::SubagentUpdate {
109            subagent_id: *subagent_id,
110            agent_name: agent_name.clone(),
111            status: status.clone(),
112        }),
113        StreamEvent::Agent(AgentEvent::SubagentDone {
114            subagent_id,
115            agent_name,
116            result_preview,
117            duration_secs,
118        }) => Some(RpcEvent::SubagentDone {
119            subagent_id: *subagent_id,
120            agent_name: agent_name.clone(),
121            result_preview: result_preview.clone(),
122            duration_secs: *duration_secs,
123        }),
124        // Drop — internal hook signal, not part of wire format
125        StreamEvent::Agent(AgentEvent::SteeringDelivered { .. }) => None,
126
127        // Session bookkeeping events are handled by the streaming loop in cmd::rpc
128        // with direct mutable access to RpcState; they are never forwarded as-is.
129        StreamEvent::Session(_) => None,
130    }
131}
132
133// ─── Usage accumulator ────────────────────────────────────────────────────────
134
/// Fold one optional cache-TTL split value into an optional running total.
///
/// The total stays `None` until the first `Some` value arrives and is summed
/// from then on; a `None` input never changes it. `None` therefore means
/// "split never reported", which is distinct from an explicit zero. Shared by
/// every Usage accumulator that tracks the 5m/1h breakdown (RPC dispatch,
/// subagent loops).
pub fn merge_split(acc: &mut Option<u64>, v: Option<u64>) {
    match v {
        // Nothing reported this time: leave the accumulator untouched.
        None => {}
        Some(increment) => {
            let previous = acc.map_or(0, |total| total);
            *acc = Some(previous + increment);
        }
    }
}
144
145/// Accumulate a [`SessionEvent::Usage`] payload into a [`TurnUsage`] counter.
146///
147/// Non-Usage session events are silently ignored so callers can pass any
148/// [`SessionEvent`] without pre-filtering.  The `model` field is set from the
149/// first Usage event seen and never overwritten.
150pub fn accumulate_usage(acc: &mut TurnUsage, event: &SessionEvent) {
151    if let SessionEvent::Usage {
152        input_tokens,
153        output_tokens,
154        cache_read_input_tokens,
155        cache_creation_input_tokens,
156        cache_creation_5m,
157        cache_creation_1h,
158        model,
159    } = event
160    {
161        acc.input_tokens += input_tokens;
162        acc.output_tokens += output_tokens;
163        acc.cache_read_input_tokens += cache_read_input_tokens;
164        acc.cache_creation_input_tokens += cache_creation_input_tokens;
165        // Option-accumulate: stay None until a split arrives, then sum.
166        merge_split(&mut acc.cache_creation_5m, *cache_creation_5m);
167        merge_split(&mut acc.cache_creation_1h, *cache_creation_1h);
168        if acc.model.is_none() {
169            acc.model = model.clone();
170        }
171    }
172}
173
174// ─── User-content builder ─────────────────────────────────────────────────────
175
176/// Build the user message string to push into `api_messages`.
177///
178/// When attachments are present (v0) a human-readable note listing the file
179/// paths is prepended.  File bytes are **not** read — Task 10 handles that.
180fn quote_path(p: &str) -> String {
181    let escaped = p.replace('\\', "\\\\").replace('"', "\\\"");
182    format!("\"{escaped}\"")
183}
184
185pub fn build_user_content(message: &str, attachments: &[RpcAttachment]) -> String {
186    if attachments.is_empty() {
187        return message.to_string();
188    }
189    let parts: Vec<String> = attachments.iter().map(|a| quote_path(&a.path)).collect();
190    format!("[user attached files: {}]\n{}", parts.join(", "), message)
191}
192
193// ─── tools_list helper ───────────────────────────────────────────────────────
194
195/// Build the `tools_list` response body from a `ToolRegistry` schema snapshot.
196///
197/// The schema entries produced by [`ToolRegistry::tools_schema`] already have
198/// the shape `{name, description, input_schema}` that the bridge Phase 8
199/// `SynapsRpcSessionRouter.listTools()` expects. This function wraps them in
200/// the top-level `{ ok: true, tools: [...] }` envelope that the bridge
201/// validates (router.js line 112).
202pub fn build_tools_list_body(tools_schema: &[serde_json::Value]) -> serde_json::Value {
203    serde_json::json!({
204        "ok": true,
205        "tools": tools_schema,
206    })
207}
208
209// ─── Tests ────────────────────────────────────────────────────────────────────
210
/// Unit tests for the pure RPC dispatch helpers in this module.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::rpc_protocol::{AssistantEvent, RpcCommand, RpcEvent, RpcAttachment, TurnUsage};
    use crate::core::stream_types::{AgentEvent, LlmEvent, SessionEvent, StreamEvent};
    use serde_json::json;

    // ── parse_frame ──────────────────────────────────────────────────────────

    #[test]
    fn parse_frame_valid_prompt() {
        let line = r#"{"type":"prompt","id":"abc","message":"hello"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_ok(), "should parse valid prompt frame");
        match result.unwrap() {
            RpcCommand::Prompt { id, message, attachments } => {
                assert_eq!(id, "abc");
                assert_eq!(message, "hello");
                assert!(attachments.is_empty());
            }
            other => panic!("unexpected variant: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_shutdown() {
        let line = r#"{"type":"shutdown"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_ok());
        assert!(matches!(result.unwrap(), RpcCommand::Shutdown));
    }

    #[test]
    fn parse_frame_valid_follow_up() {
        let line = r#"{"type":"follow_up","id":"f1","message":"and then?"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        match result.unwrap() {
            RpcCommand::FollowUp { id, message } => {
                assert_eq!(id, "f1");
                assert_eq!(message, "and then?");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_abort() {
        let line = r#"{"type":"abort","id":"x"}"#;
        assert!(matches!(parse_frame(line, MAX_FRAME_BYTES).unwrap(), RpcCommand::Abort { .. }));
    }

    #[test]
    fn parse_frame_malformed_json() {
        let line = "not json at all";
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, message } => {
                assert!(id.is_none(), "malformed-JSON error must have id=None");
                assert!(!message.is_empty(), "error message must be non-empty");
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_valid_json_unknown_type() {
        // Unknown `type` tags should be a deserialisation error (serde enum).
        let line = r#"{"type":"does_not_exist","id":"1"}"#;
        let result = parse_frame(line, MAX_FRAME_BYTES);
        assert!(result.is_err(), "unknown type should fail to deserialise");
    }

    #[test]
    fn parse_frame_oversize() {
        let oversize = "x".repeat(MAX_FRAME_BYTES + 1);
        let result = parse_frame(&oversize, MAX_FRAME_BYTES);
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, message } => {
                assert!(id.is_none());
                assert!(
                    message.contains("1 MiB"),
                    "expected '1 MiB' in message, got: {message}"
                );
            }
            other => panic!("unexpected event: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_exactly_at_limit_valid_json() {
        // A well-formed frame at exactly the limit must not trigger the size error.
        // The limit is set to the frame's own length so the boundary is really hit.
        let line = r#"{"type":"get_state","id":"x"}"#;
        let result = parse_frame(line, line.len());
        assert!(result.is_ok(), "frame of exactly max_bytes must be accepted");
    }

    #[test]
    fn parse_frame_one_byte_over_limit_is_rejected() {
        // Same well-formed frame, but the limit is one byte short of its length.
        let line = r#"{"type":"get_state","id":"x"}"#;
        let result = parse_frame(line, line.len() - 1);
        match result.unwrap_err() {
            RpcEvent::Error { id, .. } => assert!(id.is_none()),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn parse_frame_custom_small_limit() {
        // Oversize relative to a custom limit.
        let line = r#"{"type":"shutdown"}"#; // 19 bytes
        let result = parse_frame(line, 5);   // limit = 5
        assert!(result.is_err());
        match result.unwrap_err() {
            RpcEvent::Error { id, .. } => assert!(id.is_none()),
            other => panic!("unexpected: {:?}", other),
        }
    }

    // ── map_stream_event ─────────────────────────────────────────────────────

    #[test]
    fn map_llm_thinking() {
        let ev = StreamEvent::Llm(LlmEvent::Thinking("hmm".to_string()));
        let rpc = map_stream_event(&ev).expect("Thinking must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ThinkingDelta { delta },
            } => assert_eq!(delta, "hmm"),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_text() {
        let ev = StreamEvent::Llm(LlmEvent::Text("hi".to_string()));
        let rpc = map_stream_event(&ev).expect("Text must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::TextDelta { delta },
            } => assert_eq!(delta, "hi"),
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_start() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUseStart {
            tool_name: "bash".to_string(),
            tool_id: "tid1".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolUseStart must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallStart { tool_id, tool_name },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(tool_name, "bash");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_delta() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUseDelta {
            tool_id: "tid1".to_string(),
            delta: r#"{"cmd":"#.to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolUseDelta must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallInputDelta { tool_id, delta },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(delta, r#"{"cmd":"#);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_use_final_drops_tool_name() {
        let ev = StreamEvent::Llm(LlmEvent::ToolUse {
            tool_name: "bash".to_string(), // must be dropped per spec
            tool_id: "tid1".to_string(),
            input: json!({"cmd": "ls"}),
        });
        let rpc = map_stream_event(&ev).expect("ToolUse must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallInput { tool_id, input },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(input, json!({"cmd": "ls"}));
                // tool_name intentionally absent from ToolcallInput
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_result() {
        let ev = StreamEvent::Llm(LlmEvent::ToolResult {
            tool_id: "tid1".to_string(),
            result: "output here".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("ToolResult must produce an event");
        match rpc {
            RpcEvent::MessageUpdate {
                event: AssistantEvent::ToolcallResult { tool_id, result },
            } => {
                assert_eq!(tool_id, "tid1");
                assert_eq!(result, "output here");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_llm_tool_result_delta_is_dropped() {
        let ev = StreamEvent::Llm(LlmEvent::ToolResultDelta {
            tool_id: "tid1".to_string(),
            delta: "partial".to_string(),
        });
        assert!(
            map_stream_event(&ev).is_none(),
            "ToolResultDelta must be dropped — wire format has no streaming-result variant"
        );
    }

    #[test]
    fn map_agent_subagent_start() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentStart {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            task_preview: "do thing".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("SubagentStart must produce an event");
        match rpc {
            RpcEvent::SubagentStart { subagent_id, agent_name, task_preview } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(task_preview, "do thing");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_subagent_update() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentUpdate {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            status: "running".to_string(),
        });
        let rpc = map_stream_event(&ev).expect("SubagentUpdate must produce an event");
        match rpc {
            RpcEvent::SubagentUpdate { subagent_id, agent_name, status } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(status, "running");
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_subagent_done() {
        let ev = StreamEvent::Agent(AgentEvent::SubagentDone {
            subagent_id: 7,
            agent_name: "worker".to_string(),
            result_preview: "done!".to_string(),
            duration_secs: 1.5,
        });
        let rpc = map_stream_event(&ev).expect("SubagentDone must produce an event");
        match rpc {
            RpcEvent::SubagentDone {
                subagent_id,
                agent_name,
                result_preview,
                duration_secs,
            } => {
                assert_eq!(subagent_id, 7);
                assert_eq!(agent_name, "worker");
                assert_eq!(result_preview, "done!");
                assert!((duration_secs - 1.5).abs() < f64::EPSILON);
            }
            other => panic!("unexpected: {:?}", other),
        }
    }

    #[test]
    fn map_agent_steering_delivered_is_dropped() {
        let ev = StreamEvent::Agent(AgentEvent::SteeringDelivered {
            message: "steer".to_string(),
        });
        assert!(
            map_stream_event(&ev).is_none(),
            "SteeringDelivered must be dropped — internal hook signal"
        );
    }

    #[test]
    fn map_session_events_all_return_none() {
        // All Session variants return None; the streaming loop handles them
        // directly with mutable access to RpcState.
        let events: &[StreamEvent] = &[
            StreamEvent::Session(SessionEvent::Done),
            StreamEvent::Session(SessionEvent::Error("oops".to_string())),
            StreamEvent::Session(SessionEvent::MessageHistory(vec![])),
            StreamEvent::Session(SessionEvent::Usage {
                input_tokens: 1,
                output_tokens: 2,
                cache_read_input_tokens: 0,
                cache_creation_input_tokens: 0,
                cache_creation_5m: None,
                cache_creation_1h: None,
                model: None,
            }),
        ];
        for ev in events {
            assert!(
                map_stream_event(ev).is_none(),
                "Session event {:?} should return None",
                ev
            );
        }
    }

    // ── merge_split ──────────────────────────────────────────────────────────

    #[test]
    fn merge_split_stays_none_until_a_value_arrives() {
        let mut acc: Option<u64> = None;
        merge_split(&mut acc, None);
        assert_eq!(acc, None, "a None input must not materialise a zero");
        merge_split(&mut acc, Some(0));
        assert_eq!(acc, Some(0), "an explicit zero is distinct from never-reported");
    }

    #[test]
    fn merge_split_sums_and_ignores_later_none() {
        let mut acc: Option<u64> = Some(4);
        merge_split(&mut acc, Some(3));
        assert_eq!(acc, Some(7));
        merge_split(&mut acc, None);
        assert_eq!(acc, Some(7), "a None input must leave the running total alone");
    }

    // ── accumulate_usage ─────────────────────────────────────────────────────

    /// A `TurnUsage` with every counter at zero and nothing reported yet.
    fn zero_usage() -> TurnUsage {
        TurnUsage {
            input_tokens: 0,
            output_tokens: 0,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_creation_5m: None,
            cache_creation_1h: None,
            model: None,
        }
    }

    #[test]
    fn accumulate_usage_basic() {
        let mut acc = zero_usage();
        let ev = SessionEvent::Usage {
            input_tokens: 100,
            output_tokens: 50,
            cache_read_input_tokens: 10,
            cache_creation_input_tokens: 5,
            cache_creation_5m: Some(3),
            cache_creation_1h: Some(2),
            model: Some("claude-3-5".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.input_tokens, 100);
        assert_eq!(acc.output_tokens, 50);
        assert_eq!(acc.cache_read_input_tokens, 10);
        assert_eq!(acc.cache_creation_input_tokens, 5);
        assert_eq!(acc.cache_creation_5m, Some(3));
        assert_eq!(acc.cache_creation_1h, Some(2));
        assert_eq!(acc.model.as_deref(), Some("claude-3-5"));
    }

    #[test]
    fn accumulate_usage_additive_across_calls() {
        let mut acc = TurnUsage {
            input_tokens: 10,
            output_tokens: 5,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_creation_5m: Some(4),
            cache_creation_1h: None,
            model: Some("first-model".to_string()),
        };
        let ev = SessionEvent::Usage {
            input_tokens: 20,
            output_tokens: 8,
            cache_read_input_tokens: 2,
            cache_creation_input_tokens: 1,
            cache_creation_5m: Some(1),
            cache_creation_1h: Some(7),
            model: Some("second-model".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.input_tokens, 30);
        assert_eq!(acc.output_tokens, 13);
        assert_eq!(acc.cache_read_input_tokens, 2);
        assert_eq!(acc.cache_creation_input_tokens, 1);
        // Split: Option-accumulate — Some once any split arrives.
        assert_eq!(acc.cache_creation_5m, Some(5));
        assert_eq!(acc.cache_creation_1h, Some(7));
        // Model must NOT be overwritten once set (first-wins semantics)
        assert_eq!(acc.model.as_deref(), Some("first-model"));
    }

    #[test]
    fn accumulate_usage_sets_model_when_none() {
        let mut acc = zero_usage();
        let ev = SessionEvent::Usage {
            input_tokens: 1,
            output_tokens: 1,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
            cache_creation_5m: None,
            cache_creation_1h: None,
            model: Some("my-model".to_string()),
        };
        accumulate_usage(&mut acc, &ev);
        assert_eq!(acc.model.as_deref(), Some("my-model"));
    }

    #[test]
    fn accumulate_usage_ignores_done() {
        let mut acc = zero_usage();
        acc.input_tokens = 5;
        accumulate_usage(&mut acc, &SessionEvent::Done);
        assert_eq!(acc.input_tokens, 5, "Done must not mutate the accumulator");
    }

    #[test]
    fn accumulate_usage_ignores_error() {
        let mut acc = zero_usage();
        acc.output_tokens = 3;
        accumulate_usage(&mut acc, &SessionEvent::Error("boom".to_string()));
        assert_eq!(acc.output_tokens, 3, "Error must not mutate the accumulator");
    }

    #[test]
    fn accumulate_usage_ignores_message_history() {
        let mut acc = zero_usage();
        acc.input_tokens = 7;
        accumulate_usage(&mut acc, &SessionEvent::MessageHistory(vec![]));
        assert_eq!(acc.input_tokens, 7, "MessageHistory must not mutate the accumulator");
    }

    // ── build_user_content ───────────────────────────────────────────────────

    #[test]
    fn build_user_content_no_attachments() {
        assert_eq!(build_user_content("hello", &[]), "hello");
    }

    #[test]
    fn build_user_content_single_attachment() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/a.txt".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("check this", &attachments);
        assert!(msg.starts_with("[user attached files: \"/tmp/a.txt\"]"));
        assert!(msg.contains("check this"));
    }

    #[test]
    fn build_user_content_multiple_attachments() {
        let attachments = vec![
            RpcAttachment { path: "/tmp/a.txt".to_string(), name: None, mime: None },
            RpcAttachment { path: "/tmp/b.pdf".to_string(), name: None, mime: None },
        ];
        let msg = build_user_content("check these", &attachments);
        assert!(
            msg.contains("[user attached files: \"/tmp/a.txt\", \"/tmp/b.pdf\"]"),
            "paths must be quoted and comma-separated: {msg}"
        );
        assert!(msg.contains("check these"));
    }

    #[test]
    fn build_user_content_preserves_original_message() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/x".to_string(),
            name: Some("x".to_string()),
            mime: Some("text/plain".to_string()),
        }];
        let original = "multi\nline\nmessage";
        let msg = build_user_content(original, &attachments);
        assert!(msg.ends_with(original), "original message must appear verbatim at the end");
    }

    // ── build_user_content: quoting edge cases ───────────────────────────────

    #[test]
    fn build_user_content_path_with_comma_is_quoted() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/a,b.pdf".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("look", &attachments);
        assert!(
            msg.contains("\"/tmp/a,b.pdf\""),
            "comma path must be wrapped in quotes: {msg}"
        );
        // Must NOT appear as bare unquoted path
        assert!(
            !msg.contains("[user attached files: /tmp/a,b.pdf]"),
            "bare unquoted comma path must not appear: {msg}"
        );
    }

    #[test]
    fn build_user_content_multiple_paths_each_quoted() {
        let attachments = vec![
            RpcAttachment { path: "/p1".to_string(), name: None, mime: None },
            RpcAttachment { path: "/p2".to_string(), name: None, mime: None },
        ];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/p1\", \"/p2\""),
            "each path must be individually quoted: {msg}"
        );
    }

    #[test]
    fn build_user_content_path_with_embedded_quote_is_escaped() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/he\"llo".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/tmp/he\\\"llo\""),
            "embedded double-quote must be backslash-escaped: {msg}"
        );
    }

    #[test]
    fn build_user_content_path_with_backslash_is_escaped() {
        let attachments = vec![RpcAttachment {
            path: "/tmp/a\\b".to_string(),
            name: None,
            mime: None,
        }];
        let msg = build_user_content("x", &attachments);
        assert!(
            msg.contains("\"/tmp/a\\\\b\""),
            "backslash in path must be doubled: {msg}"
        );
    }

    // ── build_tools_list_body ────────────────────────────────────────────────

    #[test]
    fn build_tools_list_body_empty() {
        let body = build_tools_list_body(&[]);
        assert_eq!(body["ok"], true);
        assert!(body["tools"].is_array());
        assert_eq!(body["tools"].as_array().unwrap().len(), 0);
    }

    #[test]
    fn build_tools_list_body_with_entries() {
        let schema = vec![
            json!({"name": "bash", "description": "Run bash", "input_schema": {"type": "object"}}),
            json!({"name": "read", "description": "Read file", "input_schema": {"type": "object"}}),
        ];
        let body = build_tools_list_body(&schema);
        assert_eq!(body["ok"], true);
        let tools = body["tools"].as_array().unwrap();
        assert_eq!(tools.len(), 2);
        assert_eq!(tools[0]["name"], "bash");
        assert_eq!(tools[1]["name"], "read");
    }

    /// The bridge checks `response.ok === true && Array.isArray(response.tools)`.
    /// Verify the body round-trips through serde and satisfies both conditions.
    #[test]
    fn build_tools_list_body_roundtrip_satisfies_bridge_contract() {
        let schema = vec![
            json!({"name": "bash", "description": "desc", "input_schema": {}}),
        ];
        let body = build_tools_list_body(&schema);
        // Simulate serialise → deserialise (what the parent process and bridge each do).
        let serialised = serde_json::to_string(&body).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&serialised).unwrap();
        assert_eq!(parsed["ok"], true, "bridge check: ok===true");
        assert!(parsed["tools"].is_array(), "bridge check: Array.isArray(tools)");
    }

    // ── handle_compact lock-release invariant ────────────────────────────────

    /// Demonstrates the locking pattern `handle_compact` is meant to follow:
    /// release the state lock before the long-running `compact_conversation`
    /// await.
    ///
    /// The fix in `cmd::rpc::handle_compact` snapshots `(msgs, runtime)` inside
    /// a block that ends *before* the await, so the `MutexGuard` is dropped at
    /// the closing `}`.  This test does not call `handle_compact` itself; it
    /// reproduces the same pattern on a `tokio::sync::Mutex` and shows that a
    /// second task can acquire the lock while the "slow operation" is running,
    /// i.e. contention is bounded to the snapshot phase only.
    #[tokio::test]
    async fn handle_compact_releases_lock_before_slow_await() {
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let shared: Arc<Mutex<u32>> = Arc::new(Mutex::new(0));

        // Simulate the fixed handle_compact pattern:
        //   1. brief lock to snapshot data
        //   2. long operation with NO lock
        //   3. brief lock to write result
        let shared2 = shared.clone();
        let task = tokio::spawn(async move {
            // Phase 1: snapshot under lock.
            let snapshot = {
                let mut g = shared2.lock().await;
                *g += 1; // mark "lock acquired for snapshot"
                *g       // return snapshot value
            };
            // Lock is now RELEASED.

            // Phase 2: slow operation — no lock held.
            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;

            // Phase 3: write result back under lock.
            let mut g = shared2.lock().await;
            *g = snapshot + 100;
        });

        // While the "slow" phase is running, this second task must be able to
        // acquire the lock without blocking for the full 20 ms.
        tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
        let acquired = tokio::time::timeout(
            tokio::time::Duration::from_millis(5),
            shared.lock(),
        )
        .await;
        assert!(
            acquired.is_ok(),
            "second task must acquire the lock during the slow phase — \
             handle_compact must NOT hold the lock across compact_conversation"
        );
        // Release the guard so phase 3 of the spawned task can proceed.
        drop(acquired);

        task.await.unwrap();
        assert_eq!(*shared.lock().await, 101);
    }
}