Skip to main content

agent_client_protocol_conductor/
trace.rs

1//! Trace event types for the sequence diagram viewer.
2//!
3//! Events are serialized as newline-delimited JSON (`.jsons` files).
4//! The viewer loads these files to render interactive sequence diagrams.
5
6use std::collections::HashMap;
7use std::fs::OpenOptions;
8use std::io::{BufWriter, Write};
9use std::path::Path;
10use std::time::Instant;
11
12use agent_client_protocol::schema::{McpOverAcpMessage, SuccessorMessage};
13use agent_client_protocol::{DynConnectTo, JsonRpcMessage, Role, UntypedMessage, jsonrpcmsg};
14use rustc_hash::FxHashMap;
15use serde::{Deserialize, Serialize};
16
17use crate::ComponentIndex;
18use crate::snoop::SnooperComponent;
19
20/// A trace event representing message flow between components.
21#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "type", rename_all = "snake_case")]
23#[non_exhaustive]
24pub enum TraceEvent {
25    /// A JSON-RPC request from one component to another.
26    Request(RequestEvent),
27
28    /// A JSON-RPC response to a prior request.
29    Response(ResponseEvent),
30
31    /// A JSON-RPC notification (no response expected).
32    Notification(NotificationEvent),
33}
34
35/// Protocol type for messages.
36#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38#[non_exhaustive]
39pub enum Protocol {
40    /// Standard ACP protocol messages.
41    Acp,
42    /// MCP-over-ACP messages (agent calling proxy's MCP server).
43    Mcp,
44}
45
46/// A JSON-RPC request from one component to another.
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[non_exhaustive]
49pub struct RequestEvent {
50    /// Monotonic timestamp (seconds since trace start).
51    pub ts: f64,
52
53    /// Protocol: ACP or MCP.
54    pub protocol: Protocol,
55
56    /// Source component (e.g., "client", "proxy:0", "proxy:1", "agent").
57    pub from: String,
58
59    /// Destination component.
60    pub to: String,
61
62    /// JSON-RPC request ID (for correlating with response).
63    pub id: serde_json::Value,
64
65    /// JSON-RPC method name.
66    pub method: String,
67
68    /// ACP session ID, if known (null before session/new completes).
69    #[serde(skip_serializing_if = "Option::is_none")]
70    pub session: Option<String>,
71
72    /// Full request params.
73    pub params: serde_json::Value,
74}
75
76/// A JSON-RPC response to a prior request.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78#[non_exhaustive]
79pub struct ResponseEvent {
80    /// Monotonic timestamp (seconds since trace start).
81    pub ts: f64,
82
83    /// Source component (who sent the response).
84    pub from: String,
85
86    /// Destination component (who receives the response).
87    pub to: String,
88
89    /// JSON-RPC request ID this responds to.
90    pub id: serde_json::Value,
91
92    /// True if this is an error response.
93    pub is_error: bool,
94
95    /// Response result or error object.
96    pub payload: serde_json::Value,
97}
98
99/// A JSON-RPC notification (no response expected).
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[non_exhaustive]
102pub struct NotificationEvent {
103    /// Monotonic timestamp (seconds since trace start).
104    pub ts: f64,
105
106    /// Protocol: ACP or MCP.
107    pub protocol: Protocol,
108
109    /// Source component.
110    pub from: String,
111
112    /// Destination component.
113    pub to: String,
114
115    /// JSON-RPC method name.
116    pub method: String,
117
118    /// ACP session ID, if known.
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub session: Option<String>,
121
122    /// Full notification params.
123    pub params: serde_json::Value,
124}
125
126/// Trait for destinations that can receive trace events.
127pub trait WriteEvent: Send + 'static {
128    /// Write a trace event to the destination.
129    fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()>;
130}
131
/// Adapter that emits trace events as newline-delimited JSON onto any
/// [`Write`] implementation.
pub(crate) struct EventWriter<W> {
    /// Underlying byte sink that receives the JSON lines.
    writer: W,
}

impl<W: Write> EventWriter<W> {
    /// Wrap `writer` so that trace events can be written to it.
    pub fn new(writer: W) -> Self {
        EventWriter { writer }
    }
}
142
143impl<W: Write + Send + 'static> WriteEvent for EventWriter<W> {
144    fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()> {
145        serde_json::to_writer(&mut self.writer, event).map_err(std::io::Error::other)?;
146        self.writer.write_all(b"\n")?;
147        self.writer.flush()
148    }
149}
150
151/// Impl for UnboundedSender - sends events to a channel (useful for testing).
152impl WriteEvent for futures::channel::mpsc::UnboundedSender<TraceEvent> {
153    fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()> {
154        self.unbounded_send(event.clone())
155            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
156    }
157}
158
159/// Writer for trace events.
160pub struct TraceWriter {
161    dest: Box<dyn WriteEvent>,
162    start_time: Instant,
163
164    /// When we trace a request, we store its id along with the
165    /// details here. When we see responses, we try to match them up.
166    request_details: FxHashMap<serde_json::Value, RequestDetails>,
167}
168
169impl std::fmt::Debug for TraceWriter {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("TraceWriter")
172            .field("start_time", &self.start_time)
173            .finish_non_exhaustive()
174    }
175}
176
177struct RequestDetails {
178    #[expect(dead_code)]
179    protocol: Protocol,
180
181    #[expect(dead_code)]
182    method: String,
183
184    request_from: ComponentIndex,
185    request_to: ComponentIndex,
186}
187
188impl TraceWriter {
189    /// Create a new trace writer from any WriteEvent destination.
190    pub fn new<D: WriteEvent>(dest: D) -> Self {
191        Self {
192            dest: Box::new(dest),
193            start_time: Instant::now(),
194            request_details: HashMap::default(),
195        }
196    }
197
198    /// Create a new trace writer that writes to a file path.
199    pub fn from_path(path: impl AsRef<Path>) -> std::io::Result<Self> {
200        let file = OpenOptions::new()
201            .create(true)
202            .write(true)
203            .truncate(true)
204            .open(path.as_ref())?;
205        Ok(Self::new(EventWriter::new(BufWriter::new(file))))
206    }
207
208    /// Get the elapsed time since trace start, in seconds.
209    fn elapsed(&self) -> f64 {
210        self.start_time.elapsed().as_secs_f64()
211    }
212
213    /// Write a trace event.
214    fn write_event(&mut self, event: &TraceEvent) {
215        // Ignore errors - tracing should not break the conductor
216        drop(self.dest.write_event(event));
217    }
218
219    /// Write a request event.
220    #[expect(clippy::too_many_arguments)]
221    fn request(
222        &mut self,
223        protocol: Protocol,
224        from: ComponentIndex,
225        to: ComponentIndex,
226        id: serde_json::Value,
227        method: String,
228        session: Option<String>,
229        params: serde_json::Value,
230    ) {
231        self.request_details.insert(
232            id.clone(),
233            RequestDetails {
234                protocol,
235                method: method.clone(),
236                request_from: from,
237                request_to: to,
238            },
239        );
240        self.write_event(&TraceEvent::Request(RequestEvent {
241            ts: self.elapsed(),
242            protocol,
243            from: format!("{from:?}"),
244            to: format!("{to:?}"),
245            id,
246            method,
247            session,
248            params,
249        }));
250    }
251
252    /// Write a response event.
253    fn response(
254        &mut self,
255        from: ComponentIndex,
256        to: ComponentIndex,
257        id: serde_json::Value,
258        is_error: bool,
259        payload: serde_json::Value,
260    ) {
261        self.write_event(&TraceEvent::Response(ResponseEvent {
262            ts: self.elapsed(),
263            from: format!("{from:?}"),
264            to: format!("{to:?}"),
265            id,
266            is_error,
267            payload,
268        }));
269    }
270
271    /// Write a notification event.
272    fn notification(
273        &mut self,
274        protocol: Protocol,
275        from: ComponentIndex,
276        to: ComponentIndex,
277        method: impl Into<String>,
278        session: Option<String>,
279        params: serde_json::Value,
280    ) {
281        self.write_event(&TraceEvent::Notification(NotificationEvent {
282            ts: self.elapsed(),
283            protocol,
284            from: format!("{from:?}"),
285            to: format!("{to:?}"),
286            method: method.into(),
287            session,
288            params,
289        }));
290    }
291
292    /// Trace a raw JSON-RPC message being sent from one component to another.
293    fn trace_message(&mut self, traced_message: TracedMessage) {
294        let TracedMessage {
295            component_index,
296            successor_index,
297            incoming,
298            message,
299        } = traced_message;
300
301        // We get every message going into or out of a proxy. This includes
302        // a fair number of duplicates: for example, if proxy P0 sends to P1,
303        // we'll get it as an *outgoing* message from P0 and an *incoming* message to P1.
304        // So we want to keep just one copy.
305        //
306        // We retain:
307        //
308        // * Incoming requests/notifications targeting a PROXY.
309        // * Incoming requests/notifications targeting the AGENT.
310
311        match message {
312            jsonrpcmsg::Message::Request(req) => {
313                let MessageInfo {
314                    successor,
315                    id,
316                    protocol,
317                    method,
318                    params,
319                } = MessageInfo::from_req(req);
320
321                let (from, to) = match (successor, incoming, component_index, successor_index) {
322                    // An incoming request/notification to a proxy from its predecessor.
323                    (Successor(false), Incoming(true), ComponentIndex::Proxy(proxy_index), _) => (
324                        ComponentIndex::predecessor_of(proxy_index),
325                        ComponentIndex::Proxy(proxy_index),
326                    ),
327
328                    // An incoming request/notification to any component from its successor.
329                    //
330                    // This includes incoming messages to the client in the case where we have no proxies.
331                    (Successor(true), Incoming(true), component_index, successor_index) => {
332                        (successor_index, component_index)
333                    }
334
335                    // An outgoing request/notification from a component to its successor
336                    // *and* its successor is not a proxy.
337                    //
338                    // (If its successor is a proxy, we ignore it, because we'll also see the
339                    // message in "incoming" form).
340                    (Successor(true), Incoming(false), component_index, ComponentIndex::Agent) => {
341                        (component_index, ComponentIndex::Agent)
342                    }
343
344                    _ => return,
345                };
346
347                match id {
348                    Some(id) => {
349                        self.request(protocol, from, to, id_to_json(&id), method, None, params);
350                    }
351                    None => {
352                        self.notification(protocol, from, to, method, None, params);
353                    }
354                }
355            }
356            jsonrpcmsg::Message::Response(resp) => {
357                // Lookup the response by its id.
358                // All of the messages we are intercepting go to our proxies,
359                // and we always assign them globally unique
360                if let Some(id) = resp.id {
361                    let id = id_to_json(&id);
362                    if let Some(RequestDetails {
363                        protocol: _,
364                        method: _,
365                        request_from,
366                        request_to,
367                    }) = self.request_details.remove(&id)
368                    {
369                        let (is_error, payload) = match (&resp.result, &resp.error) {
370                            (Some(result), _) => (false, result.clone()),
371                            (_, Some(error)) => {
372                                (true, serde_json::to_value(error).unwrap_or_default())
373                            }
374                            (None, None) => (false, serde_json::Value::Null),
375                        };
376                        self.response(request_to, request_from, id, is_error, payload);
377                    }
378                }
379            }
380        }
381    }
382
383    /// Spawn a trace writer task.
384    ///
385    /// Returns a `TraceHandle` that can be cloned and used from multiple tasks,
386    /// and a future that should be spawned (e.g., via `with_spawned`).
387    pub(crate) fn spawn(
388        mut self: TraceWriter,
389    ) -> (
390        TraceHandle,
391        impl std::future::Future<Output = Result<(), agent_client_protocol::Error>>,
392    ) {
393        use futures::StreamExt;
394
395        let (tx, mut rx) = futures::channel::mpsc::unbounded();
396
397        let future = async move {
398            while let Some(event) = rx.next().await {
399                self.trace_message(event);
400            }
401            Ok(())
402        };
403
404        (TraceHandle { tx }, future)
405    }
406}
407
408/// A cloneable handle for sending trace events to the trace writer task.
409///
410/// Create with [`spawn_trace_writer`], then clone and pass to bridges.
411#[derive(Clone, Debug)]
412pub(crate) struct TraceHandle {
413    tx: futures::channel::mpsc::UnboundedSender<TracedMessage>,
414}
415
416impl TraceHandle {
417    /// Trace a raw JSON-RPC message being sent from one component to another.
418    fn trace_message(
419        &self,
420        component_index: ComponentIndex,
421        successor_index: ComponentIndex,
422        incoming: Incoming,
423        message: &jsonrpcmsg::Message,
424    ) -> Result<(), agent_client_protocol::Error> {
425        self.tx
426            .unbounded_send(TracedMessage {
427                component_index,
428                successor_index,
429                incoming,
430                message: message.clone(),
431            })
432            .map_err(agent_client_protocol::util::internal_error)
433    }
434
435    /// Create a tracing bridge that wraps a proxy component.
436    ///
437    /// Spawns a bridge task that forwards messages between the channel and the component
438    /// while tracing them. Returns the wrapped component.
439    ///
440    /// Tracing strategy:
441    /// - **Left→Right (incoming)**: Trace requests/notifications, skip responses
442    /// - **Right→Left (outgoing)**: Trace responses, and if `trace_outgoing_requests` is true,
443    ///   also trace requests/notifications (needed for edge bridges at conductor boundaries)
444    ///
445    /// - `cx`: Connection context for spawning the bridge task
446    /// - `left_name`: Logical name of the component on the "left" side (e.g., "client", "proxy:0")
447    /// - `right_name`: Logical name of the component on the "right" side (e.g., "proxy:0", "agent")
448    /// - `component`: The component to wrap
449    pub fn bridge_component<R: Role>(
450        &self,
451        proxy_index: ComponentIndex,
452        successor_index: ComponentIndex,
453        proxy: impl agent_client_protocol::ConnectTo<R>,
454    ) -> DynConnectTo<R> {
455        DynConnectTo::new(SnooperComponent::new(
456            proxy,
457            {
458                let trace_handle = self.clone();
459                move |msg| {
460                    trace_handle.trace_message(proxy_index, successor_index, Incoming(true), msg)
461                }
462            },
463            {
464                let trace_handle = self.clone();
465                move |msg| {
466                    trace_handle.trace_message(proxy_index, successor_index, Incoming(false), msg)
467                }
468            },
469        ))
470    }
471}
472
473/// Convert a jsonrpcmsg::Id to serde_json::Value.
474fn id_to_json(id: &jsonrpcmsg::Id) -> serde_json::Value {
475    match id {
476        jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
477        jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
478        jsonrpcmsg::Id::Null => serde_json::Value::Null,
479    }
480}
481
482/// A message observed going over a channel connected to `left` and `right`.
483/// This could be a successor message, a mcp-over-acp message, etc.
484#[derive(Debug)]
485struct TracedMessage {
486    component_index: ComponentIndex,
487    successor_index: ComponentIndex,
488    incoming: Incoming,
489    message: jsonrpcmsg::Message,
490}
491
492/// Fully interpreted message info.
493#[derive(Debug)]
494struct MessageInfo {
495    successor: Successor,
496    id: Option<jsonrpcmsg::Id>,
497    protocol: Protocol,
498    method: String,
499    params: serde_json::Value,
500}
501
/// Flag: `Successor(true)` means the message was wrapped in a successor
/// message; `Successor(false)` means it was not.
#[derive(Clone, Copy, Debug)]
struct Successor(bool);
504
/// Flag: `Incoming(true)` marks a message observed on its way into a
/// component; `Incoming(false)` marks one observed on its way out.
#[derive(Clone, Copy, Debug)]
struct Incoming(bool);
507
508impl MessageInfo {
509    /// Extract logical message info from method and params.
510    ///
511    /// This unwraps protocol wrappers to get the "real" message:
512    /// - `_proxy/successor` messages are unwrapped to get the inner message
513    /// - `_proxy/initialize` messages are unwrapped to get `initialize`
514    /// - `_mcp/message` messages are detected and marked as MCP protocol
515    ///
516    /// Returns (protocol, method, params).
517    fn from_req(req: jsonrpcmsg::Request) -> Self {
518        let untyped = UntypedMessage::parse_message(&req.method, &req.params)
519            .expect("untyped message is infallible");
520        Self::from_untyped(Successor(false), req.id, Protocol::Acp, untyped)
521    }
522
523    fn from_untyped(
524        successor: Successor,
525        id: Option<jsonrpcmsg::Id>,
526        protocol: Protocol,
527        untyped: UntypedMessage,
528    ) -> Self {
529        if let Ok(m) = SuccessorMessage::parse_message(&untyped.method, &untyped.params) {
530            return Self::from_untyped(Successor(true), id, protocol, m.message);
531        }
532
533        if let Ok(m) = McpOverAcpMessage::parse_message(&untyped.method, &untyped.params) {
534            return Self::from_untyped(successor, id, Protocol::Mcp, m.message);
535        }
536
537        Self {
538            successor,
539            id,
540            protocol,
541            method: untyped.method,
542            params: untyped.params,
543        }
544    }
545}