Skip to main content

agent_client_protocol_conductor/
trace.rs

1//! Trace event types for the sequence diagram viewer.
2//!
3//! Events are serialized as newline-delimited JSON (`.jsons` files).
4//! The viewer loads these files to render interactive sequence diagrams.
5
6use std::collections::HashMap;
7use std::fs::OpenOptions;
8use std::io::{BufWriter, Write};
9use std::path::Path;
10use std::time::Instant;
11
12use agent_client_protocol::schema::v1::{
13    Notification as RpcNotification, Request as RpcRequest, RequestId, Response as RpcResponse,
14};
15use agent_client_protocol::schema::{McpOverAcpMessage, SuccessorMessage};
16use agent_client_protocol::{
17    DynConnectTo, JsonRpcMessage, RawJsonRpcMessage, RawJsonRpcParams, Role, UntypedMessage,
18};
19use rustc_hash::FxHashMap;
20use serde::{Deserialize, Serialize};
21
22use crate::ComponentIndex;
23use crate::snoop::SnooperComponent;
24
25/// A trace event representing message flow between components.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(tag = "type", rename_all = "snake_case")]
28#[non_exhaustive]
29pub enum TraceEvent {
30    /// A JSON-RPC request from one component to another.
31    Request(RequestEvent),
32
33    /// A JSON-RPC response to a prior request.
34    Response(ResponseEvent),
35
36    /// A JSON-RPC notification (no response expected).
37    Notification(NotificationEvent),
38}
39
40/// Protocol type for messages.
41#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
42#[serde(rename_all = "snake_case")]
43#[non_exhaustive]
44pub enum Protocol {
45    /// Standard ACP protocol messages.
46    Acp,
47    /// MCP-over-ACP messages (agent calling proxy's MCP server).
48    Mcp,
49}
50
51/// A JSON-RPC request from one component to another.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53#[non_exhaustive]
54pub struct RequestEvent {
55    /// Monotonic timestamp (seconds since trace start).
56    pub ts: f64,
57
58    /// Protocol: ACP or MCP.
59    pub protocol: Protocol,
60
61    /// Source component (e.g., "client", "proxy:0", "proxy:1", "agent").
62    pub from: String,
63
64    /// Destination component.
65    pub to: String,
66
67    /// JSON-RPC request ID (for correlating with response).
68    pub id: serde_json::Value,
69
70    /// JSON-RPC method name.
71    pub method: String,
72
73    /// ACP session ID, if known (null before session/new completes).
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub session: Option<String>,
76
77    /// Full request params.
78    pub params: serde_json::Value,
79}
80
81/// A JSON-RPC response to a prior request.
82#[derive(Debug, Clone, Serialize, Deserialize)]
83#[non_exhaustive]
84pub struct ResponseEvent {
85    /// Monotonic timestamp (seconds since trace start).
86    pub ts: f64,
87
88    /// Source component (who sent the response).
89    pub from: String,
90
91    /// Destination component (who receives the response).
92    pub to: String,
93
94    /// JSON-RPC request ID this responds to.
95    pub id: serde_json::Value,
96
97    /// True if this is an error response.
98    pub is_error: bool,
99
100    /// Response result or error object.
101    pub payload: serde_json::Value,
102}
103
104/// A JSON-RPC notification (no response expected).
105#[derive(Debug, Clone, Serialize, Deserialize)]
106#[non_exhaustive]
107pub struct NotificationEvent {
108    /// Monotonic timestamp (seconds since trace start).
109    pub ts: f64,
110
111    /// Protocol: ACP or MCP.
112    pub protocol: Protocol,
113
114    /// Source component.
115    pub from: String,
116
117    /// Destination component.
118    pub to: String,
119
120    /// JSON-RPC method name.
121    pub method: String,
122
123    /// ACP session ID, if known.
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub session: Option<String>,
126
127    /// Full notification params.
128    pub params: serde_json::Value,
129}
130
131/// Trait for destinations that can receive trace events.
132pub trait WriteEvent: Send + 'static {
133    /// Write a trace event to the destination.
134    fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()>;
135}
136
/// Adapter that emits trace events as newline-delimited JSON on any `Write`.
pub(crate) struct EventWriter<W> {
    /// The underlying byte sink.
    writer: W,
}

impl<W> EventWriter<W>
where
    W: Write,
{
    /// Wrap `writer` so trace events can be written to it.
    pub fn new(writer: W) -> Self {
        EventWriter { writer }
    }
}
147
148impl<W: Write + Send + 'static> WriteEvent for EventWriter<W> {
149    fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()> {
150        serde_json::to_writer(&mut self.writer, event).map_err(std::io::Error::other)?;
151        self.writer.write_all(b"\n")?;
152        self.writer.flush()
153    }
154}
155
156/// Impl for UnboundedSender - sends events to a channel (useful for testing).
157impl WriteEvent for futures::channel::mpsc::UnboundedSender<TraceEvent> {
158    fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()> {
159        self.unbounded_send(event.clone())
160            .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
161    }
162}
163
164/// Writer for trace events.
165pub struct TraceWriter {
166    dest: Box<dyn WriteEvent>,
167    start_time: Instant,
168
169    /// When we trace a request, we store its id along with the
170    /// details here. When we see responses, we try to match them up.
171    request_details: FxHashMap<serde_json::Value, RequestDetails>,
172}
173
174impl std::fmt::Debug for TraceWriter {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("TraceWriter")
177            .field("start_time", &self.start_time)
178            .finish_non_exhaustive()
179    }
180}
181
182struct RequestDetails {
183    #[expect(dead_code)]
184    protocol: Protocol,
185
186    #[expect(dead_code)]
187    method: String,
188
189    request_from: ComponentIndex,
190    request_to: ComponentIndex,
191}
192
193impl TraceWriter {
194    /// Create a new trace writer from any WriteEvent destination.
195    pub fn new<D: WriteEvent>(dest: D) -> Self {
196        Self {
197            dest: Box::new(dest),
198            start_time: Instant::now(),
199            request_details: HashMap::default(),
200        }
201    }
202
203    /// Create a new trace writer that writes to a file path.
204    pub fn from_path(path: impl AsRef<Path>) -> std::io::Result<Self> {
205        let file = OpenOptions::new()
206            .create(true)
207            .write(true)
208            .truncate(true)
209            .open(path.as_ref())?;
210        Ok(Self::new(EventWriter::new(BufWriter::new(file))))
211    }
212
213    /// Get the elapsed time since trace start, in seconds.
214    fn elapsed(&self) -> f64 {
215        self.start_time.elapsed().as_secs_f64()
216    }
217
218    /// Write a trace event.
219    fn write_event(&mut self, event: &TraceEvent) {
220        // Ignore errors - tracing should not break the conductor
221        drop(self.dest.write_event(event));
222    }
223
224    /// Write a request event.
225    #[expect(clippy::too_many_arguments)]
226    fn request(
227        &mut self,
228        protocol: Protocol,
229        from: ComponentIndex,
230        to: ComponentIndex,
231        id: serde_json::Value,
232        method: String,
233        session: Option<String>,
234        params: serde_json::Value,
235    ) {
236        self.request_details.insert(
237            id.clone(),
238            RequestDetails {
239                protocol,
240                method: method.clone(),
241                request_from: from,
242                request_to: to,
243            },
244        );
245        self.write_event(&TraceEvent::Request(RequestEvent {
246            ts: self.elapsed(),
247            protocol,
248            from: format!("{from:?}"),
249            to: format!("{to:?}"),
250            id,
251            method,
252            session,
253            params,
254        }));
255    }
256
257    /// Write a response event.
258    fn response(
259        &mut self,
260        from: ComponentIndex,
261        to: ComponentIndex,
262        id: serde_json::Value,
263        is_error: bool,
264        payload: serde_json::Value,
265    ) {
266        self.write_event(&TraceEvent::Response(ResponseEvent {
267            ts: self.elapsed(),
268            from: format!("{from:?}"),
269            to: format!("{to:?}"),
270            id,
271            is_error,
272            payload,
273        }));
274    }
275
276    /// Write a notification event.
277    fn notification(
278        &mut self,
279        protocol: Protocol,
280        from: ComponentIndex,
281        to: ComponentIndex,
282        method: impl Into<String>,
283        session: Option<String>,
284        params: serde_json::Value,
285    ) {
286        self.write_event(&TraceEvent::Notification(NotificationEvent {
287            ts: self.elapsed(),
288            protocol,
289            from: format!("{from:?}"),
290            to: format!("{to:?}"),
291            method: method.into(),
292            session,
293            params,
294        }));
295    }
296
297    /// Trace a raw JSON-RPC message being sent from one component to another.
298    fn trace_message(&mut self, traced_message: TracedMessage) {
299        let TracedMessage {
300            component_index,
301            successor_index,
302            incoming,
303            message,
304        } = traced_message;
305
306        // We get every message going into or out of a proxy. This includes
307        // a fair number of duplicates: for example, if proxy P0 sends to P1,
308        // we'll get it as an *outgoing* message from P0 and an *incoming* message to P1.
309        // So we want to keep just one copy.
310        //
311        // We retain:
312        //
313        // * Incoming requests/notifications targeting a PROXY.
314        // * Incoming requests/notifications targeting the AGENT.
315
316        match message {
317            RawJsonRpcMessage::Request(req) => {
318                let MessageInfo {
319                    successor,
320                    id,
321                    protocol,
322                    method,
323                    params,
324                } = MessageInfo::from_request(req);
325
326                self.trace_request_or_notification(
327                    incoming,
328                    component_index,
329                    successor_index,
330                    successor,
331                    id,
332                    protocol,
333                    method,
334                    params,
335                );
336            }
337            RawJsonRpcMessage::Notification(notification) => {
338                let MessageInfo {
339                    successor,
340                    id,
341                    protocol,
342                    method,
343                    params,
344                } = MessageInfo::from_notification(notification);
345
346                self.trace_request_or_notification(
347                    incoming,
348                    component_index,
349                    successor_index,
350                    successor,
351                    id,
352                    protocol,
353                    method,
354                    params,
355                );
356            }
357            RawJsonRpcMessage::Response(resp) => {
358                // Lookup the response by its id.
359                // All of the messages we are intercepting go to our proxies,
360                // and we always assign them globally unique ids.
361                let (id, is_error, payload) = match resp {
362                    RpcResponse::Result { id, result } => (id, false, result),
363                    RpcResponse::Error { id, error } => {
364                        (id, true, serde_json::to_value(error).unwrap_or_default())
365                    }
366                };
367                let id = id_to_json(&id);
368                if let Some(RequestDetails {
369                    protocol: _,
370                    method: _,
371                    request_from,
372                    request_to,
373                }) = self.request_details.remove(&id)
374                {
375                    self.response(request_to, request_from, id, is_error, payload);
376                }
377            }
378        }
379    }
380
381    #[expect(clippy::too_many_arguments)]
382    fn trace_request_or_notification(
383        &mut self,
384        incoming: Incoming,
385        component_index: ComponentIndex,
386        successor_index: ComponentIndex,
387        successor: Successor,
388        id: Option<RequestId>,
389        protocol: Protocol,
390        method: String,
391        params: serde_json::Value,
392    ) {
393        let (from, to) = match (successor, incoming, component_index, successor_index) {
394            // An incoming request/notification to a proxy from its predecessor.
395            (Successor(false), Incoming(true), ComponentIndex::Proxy(proxy_index), _) => (
396                ComponentIndex::predecessor_of(proxy_index),
397                ComponentIndex::Proxy(proxy_index),
398            ),
399
400            // An incoming request/notification to any component from its successor.
401            //
402            // This includes incoming messages to the client in the case where we have no proxies.
403            (Successor(true), Incoming(true), component_index, successor_index) => {
404                (successor_index, component_index)
405            }
406
407            // An outgoing request/notification from a component to its successor
408            // *and* its successor is not a proxy.
409            //
410            // (If its successor is a proxy, we ignore it, because we'll also see the
411            // message in "incoming" form).
412            (Successor(true), Incoming(false), component_index, ComponentIndex::Agent) => {
413                (component_index, ComponentIndex::Agent)
414            }
415
416            _ => return,
417        };
418
419        match id {
420            Some(id) => {
421                self.request(protocol, from, to, id_to_json(&id), method, None, params);
422            }
423            None => {
424                self.notification(protocol, from, to, method, None, params);
425            }
426        }
427    }
428
429    /// Spawn a trace writer task.
430    ///
431    /// Returns a `TraceHandle` that can be cloned and used from multiple tasks,
432    /// and a future that should be spawned (e.g., via `with_spawned`).
433    pub(crate) fn spawn(
434        mut self: TraceWriter,
435    ) -> (
436        TraceHandle,
437        impl std::future::Future<Output = Result<(), agent_client_protocol::Error>>,
438    ) {
439        use futures::StreamExt;
440
441        let (tx, mut rx) = futures::channel::mpsc::unbounded();
442
443        let future = async move {
444            while let Some(event) = rx.next().await {
445                self.trace_message(event);
446            }
447            Ok(())
448        };
449
450        (TraceHandle { tx }, future)
451    }
452}
453
454/// A cloneable handle for sending trace events to the trace writer task.
455///
456/// Create with [`spawn_trace_writer`], then clone and pass to bridges.
457#[derive(Clone, Debug)]
458pub(crate) struct TraceHandle {
459    tx: futures::channel::mpsc::UnboundedSender<TracedMessage>,
460}
461
462impl TraceHandle {
463    /// Trace a raw JSON-RPC message being sent from one component to another.
464    fn trace_message(
465        &self,
466        component_index: ComponentIndex,
467        successor_index: ComponentIndex,
468        incoming: Incoming,
469        message: &RawJsonRpcMessage,
470    ) -> Result<(), agent_client_protocol::Error> {
471        self.tx
472            .unbounded_send(TracedMessage {
473                component_index,
474                successor_index,
475                incoming,
476                message: message.clone(),
477            })
478            .map_err(agent_client_protocol::util::internal_error)
479    }
480
481    /// Create a tracing bridge that wraps a proxy component.
482    ///
483    /// Spawns a bridge task that forwards messages between the channel and the component
484    /// while tracing them. Returns the wrapped component.
485    ///
486    /// Tracing strategy:
487    /// - **Left→Right (incoming)**: Trace requests/notifications, skip responses
488    /// - **Right→Left (outgoing)**: Trace responses, and if `trace_outgoing_requests` is true,
489    ///   also trace requests/notifications (needed for edge bridges at conductor boundaries)
490    ///
491    /// - `cx`: Connection context for spawning the bridge task
492    /// - `left_name`: Logical name of the component on the "left" side (e.g., "client", "proxy:0")
493    /// - `right_name`: Logical name of the component on the "right" side (e.g., "proxy:0", "agent")
494    /// - `component`: The component to wrap
495    pub fn bridge_component<R: Role>(
496        &self,
497        proxy_index: ComponentIndex,
498        successor_index: ComponentIndex,
499        proxy: impl agent_client_protocol::ConnectTo<R>,
500    ) -> DynConnectTo<R> {
501        DynConnectTo::new(SnooperComponent::new(
502            proxy,
503            {
504                let trace_handle = self.clone();
505                move |msg| {
506                    trace_handle.trace_message(proxy_index, successor_index, Incoming(true), msg)
507                }
508            },
509            {
510                let trace_handle = self.clone();
511                move |msg| {
512                    trace_handle.trace_message(proxy_index, successor_index, Incoming(false), msg)
513                }
514            },
515        ))
516    }
517}
518
519/// Convert a JSON-RPC id to serde_json::Value.
520fn id_to_json(id: &RequestId) -> serde_json::Value {
521    serde_json::to_value(id).expect("RequestId serializes infallibly")
522}
523
524fn params_from_transport(params: Option<RawJsonRpcParams>) -> serde_json::Value {
525    params.map_or(serde_json::Value::Null, RawJsonRpcParams::into_value)
526}
527
528/// A message observed going over a channel connected to `left` and `right`.
529/// This could be a successor message, a mcp-over-acp message, etc.
530#[derive(Debug)]
531struct TracedMessage {
532    component_index: ComponentIndex,
533    successor_index: ComponentIndex,
534    incoming: Incoming,
535    message: RawJsonRpcMessage,
536}
537
538/// Fully interpreted message info.
539#[derive(Debug)]
540struct MessageInfo {
541    successor: Successor,
542    id: Option<RequestId>,
543    protocol: Protocol,
544    method: String,
545    params: serde_json::Value,
546}
547
/// Flag: `true` when a message was wrapped in a successor message.
#[derive(Clone, Copy, Debug)]
struct Successor(bool);

/// Flag distinguishing the two bridge callbacks: `true` for the one tagged
/// as incoming, `false` for the other.
#[derive(Clone, Copy, Debug)]
struct Incoming(bool);
553
554impl MessageInfo {
555    /// Extract logical message info from method and params.
556    ///
557    /// This unwraps protocol wrappers to get the "real" message:
558    /// - `_proxy/successor` messages are unwrapped to get the inner message
559    /// - `_proxy/initialize` messages are unwrapped to get `initialize`
560    /// - `_mcp/message` messages are detected and marked as MCP protocol
561    ///
562    /// Returns (protocol, method, params).
563    fn from_request(req: RpcRequest<RawJsonRpcParams>) -> Self {
564        let untyped =
565            UntypedMessage::parse_message(&req.method, &params_from_transport(req.params))
566                .expect("untyped message is infallible");
567        Self::from_untyped(Successor(false), Some(req.id), Protocol::Acp, untyped)
568    }
569
570    fn from_notification(notification: RpcNotification<RawJsonRpcParams>) -> Self {
571        let untyped = UntypedMessage::parse_message(
572            &notification.method,
573            &params_from_transport(notification.params),
574        )
575        .expect("untyped message is infallible");
576        Self::from_untyped(Successor(false), None, Protocol::Acp, untyped)
577    }
578
579    fn from_untyped(
580        successor: Successor,
581        id: Option<RequestId>,
582        protocol: Protocol,
583        untyped: UntypedMessage,
584    ) -> Self {
585        if let Ok(m) = SuccessorMessage::parse_message(&untyped.method, &untyped.params) {
586            return Self::from_untyped(Successor(true), id, protocol, m.message);
587        }
588
589        if let Ok(m) = McpOverAcpMessage::parse_message(&untyped.method, &untyped.params) {
590            return Self::from_untyped(successor, id, Protocol::Mcp, m.message);
591        }
592
593        Self {
594            successor,
595            id,
596            protocol,
597            method: untyped.method,
598            params: untyped.params,
599        }
600    }
601}