1use std::collections::HashMap;
7use std::fs::OpenOptions;
8use std::io::{BufWriter, Write};
9use std::path::Path;
10use std::time::Instant;
11
12use agent_client_protocol::schema::{McpOverAcpMessage, SuccessorMessage};
13use agent_client_protocol::{DynConnectTo, JsonRpcMessage, Role, UntypedMessage, jsonrpcmsg};
14use rustc_hash::FxHashMap;
15use serde::{Deserialize, Serialize};
16
17use crate::ComponentIndex;
18use crate::snoop::SnooperComponent;
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
22#[serde(tag = "type", rename_all = "snake_case")]
23#[non_exhaustive]
24pub enum TraceEvent {
25 Request(RequestEvent),
27
28 Response(ResponseEvent),
30
31 Notification(NotificationEvent),
33}
34
35#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
37#[serde(rename_all = "snake_case")]
38#[non_exhaustive]
39pub enum Protocol {
40 Acp,
42 Mcp,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48#[non_exhaustive]
49pub struct RequestEvent {
50 pub ts: f64,
52
53 pub protocol: Protocol,
55
56 pub from: String,
58
59 pub to: String,
61
62 pub id: serde_json::Value,
64
65 pub method: String,
67
68 #[serde(skip_serializing_if = "Option::is_none")]
70 pub session: Option<String>,
71
72 pub params: serde_json::Value,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78#[non_exhaustive]
79pub struct ResponseEvent {
80 pub ts: f64,
82
83 pub from: String,
85
86 pub to: String,
88
89 pub id: serde_json::Value,
91
92 pub is_error: bool,
94
95 pub payload: serde_json::Value,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101#[non_exhaustive]
102pub struct NotificationEvent {
103 pub ts: f64,
105
106 pub protocol: Protocol,
108
109 pub from: String,
111
112 pub to: String,
114
115 pub method: String,
117
118 #[serde(skip_serializing_if = "Option::is_none")]
120 pub session: Option<String>,
121
122 pub params: serde_json::Value,
124}
125
126pub trait WriteEvent: Send + 'static {
128 fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()>;
130}
131
/// Adapter that lets any [`Write`] implementor act as a trace destination.
pub(crate) struct EventWriter<W> {
    /// The underlying byte sink.
    writer: W,
}

impl<W: Write> EventWriter<W> {
    /// Wraps `writer` without writing anything to it.
    pub fn new(writer: W) -> Self {
        EventWriter { writer }
    }
}
142
143impl<W: Write + Send + 'static> WriteEvent for EventWriter<W> {
144 fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()> {
145 serde_json::to_writer(&mut self.writer, event).map_err(std::io::Error::other)?;
146 self.writer.write_all(b"\n")?;
147 self.writer.flush()
148 }
149}
150
151impl WriteEvent for futures::channel::mpsc::UnboundedSender<TraceEvent> {
153 fn write_event(&mut self, event: &TraceEvent) -> std::io::Result<()> {
154 self.unbounded_send(event.clone())
155 .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
156 }
157}
158
159pub struct TraceWriter {
161 dest: Box<dyn WriteEvent>,
162 start_time: Instant,
163
164 request_details: FxHashMap<serde_json::Value, RequestDetails>,
167}
168
169impl std::fmt::Debug for TraceWriter {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("TraceWriter")
172 .field("start_time", &self.start_time)
173 .finish_non_exhaustive()
174 }
175}
176
177struct RequestDetails {
178 #[expect(dead_code)]
179 protocol: Protocol,
180
181 #[expect(dead_code)]
182 method: String,
183
184 request_from: ComponentIndex,
185 request_to: ComponentIndex,
186}
187
188impl TraceWriter {
189 pub fn new<D: WriteEvent>(dest: D) -> Self {
191 Self {
192 dest: Box::new(dest),
193 start_time: Instant::now(),
194 request_details: HashMap::default(),
195 }
196 }
197
198 pub fn from_path(path: impl AsRef<Path>) -> std::io::Result<Self> {
200 let file = OpenOptions::new()
201 .create(true)
202 .write(true)
203 .truncate(true)
204 .open(path.as_ref())?;
205 Ok(Self::new(EventWriter::new(BufWriter::new(file))))
206 }
207
208 fn elapsed(&self) -> f64 {
210 self.start_time.elapsed().as_secs_f64()
211 }
212
213 fn write_event(&mut self, event: &TraceEvent) {
215 drop(self.dest.write_event(event));
217 }
218
219 #[expect(clippy::too_many_arguments)]
221 fn request(
222 &mut self,
223 protocol: Protocol,
224 from: ComponentIndex,
225 to: ComponentIndex,
226 id: serde_json::Value,
227 method: String,
228 session: Option<String>,
229 params: serde_json::Value,
230 ) {
231 self.request_details.insert(
232 id.clone(),
233 RequestDetails {
234 protocol,
235 method: method.clone(),
236 request_from: from,
237 request_to: to,
238 },
239 );
240 self.write_event(&TraceEvent::Request(RequestEvent {
241 ts: self.elapsed(),
242 protocol,
243 from: format!("{from:?}"),
244 to: format!("{to:?}"),
245 id,
246 method,
247 session,
248 params,
249 }));
250 }
251
252 fn response(
254 &mut self,
255 from: ComponentIndex,
256 to: ComponentIndex,
257 id: serde_json::Value,
258 is_error: bool,
259 payload: serde_json::Value,
260 ) {
261 self.write_event(&TraceEvent::Response(ResponseEvent {
262 ts: self.elapsed(),
263 from: format!("{from:?}"),
264 to: format!("{to:?}"),
265 id,
266 is_error,
267 payload,
268 }));
269 }
270
271 fn notification(
273 &mut self,
274 protocol: Protocol,
275 from: ComponentIndex,
276 to: ComponentIndex,
277 method: impl Into<String>,
278 session: Option<String>,
279 params: serde_json::Value,
280 ) {
281 self.write_event(&TraceEvent::Notification(NotificationEvent {
282 ts: self.elapsed(),
283 protocol,
284 from: format!("{from:?}"),
285 to: format!("{to:?}"),
286 method: method.into(),
287 session,
288 params,
289 }));
290 }
291
292 fn trace_message(&mut self, traced_message: TracedMessage) {
294 let TracedMessage {
295 component_index,
296 successor_index,
297 incoming,
298 message,
299 } = traced_message;
300
301 match message {
312 jsonrpcmsg::Message::Request(req) => {
313 let MessageInfo {
314 successor,
315 id,
316 protocol,
317 method,
318 params,
319 } = MessageInfo::from_req(req);
320
321 let (from, to) = match (successor, incoming, component_index, successor_index) {
322 (Successor(false), Incoming(true), ComponentIndex::Proxy(proxy_index), _) => (
324 ComponentIndex::predecessor_of(proxy_index),
325 ComponentIndex::Proxy(proxy_index),
326 ),
327
328 (Successor(true), Incoming(true), component_index, successor_index) => {
332 (successor_index, component_index)
333 }
334
335 (Successor(true), Incoming(false), component_index, ComponentIndex::Agent) => {
341 (component_index, ComponentIndex::Agent)
342 }
343
344 _ => return,
345 };
346
347 match id {
348 Some(id) => {
349 self.request(protocol, from, to, id_to_json(&id), method, None, params);
350 }
351 None => {
352 self.notification(protocol, from, to, method, None, params);
353 }
354 }
355 }
356 jsonrpcmsg::Message::Response(resp) => {
357 if let Some(id) = resp.id {
361 let id = id_to_json(&id);
362 if let Some(RequestDetails {
363 protocol: _,
364 method: _,
365 request_from,
366 request_to,
367 }) = self.request_details.remove(&id)
368 {
369 let (is_error, payload) = match (&resp.result, &resp.error) {
370 (Some(result), _) => (false, result.clone()),
371 (_, Some(error)) => {
372 (true, serde_json::to_value(error).unwrap_or_default())
373 }
374 (None, None) => (false, serde_json::Value::Null),
375 };
376 self.response(request_to, request_from, id, is_error, payload);
377 }
378 }
379 }
380 }
381 }
382
383 pub(crate) fn spawn(
388 mut self: TraceWriter,
389 ) -> (
390 TraceHandle,
391 impl std::future::Future<Output = Result<(), agent_client_protocol::Error>>,
392 ) {
393 use futures::StreamExt;
394
395 let (tx, mut rx) = futures::channel::mpsc::unbounded();
396
397 let future = async move {
398 while let Some(event) = rx.next().await {
399 self.trace_message(event);
400 }
401 Ok(())
402 };
403
404 (TraceHandle { tx }, future)
405 }
406}
407
408#[derive(Clone, Debug)]
412pub(crate) struct TraceHandle {
413 tx: futures::channel::mpsc::UnboundedSender<TracedMessage>,
414}
415
416impl TraceHandle {
417 fn trace_message(
419 &self,
420 component_index: ComponentIndex,
421 successor_index: ComponentIndex,
422 incoming: Incoming,
423 message: &jsonrpcmsg::Message,
424 ) -> Result<(), agent_client_protocol::Error> {
425 self.tx
426 .unbounded_send(TracedMessage {
427 component_index,
428 successor_index,
429 incoming,
430 message: message.clone(),
431 })
432 .map_err(agent_client_protocol::util::internal_error)
433 }
434
435 pub fn bridge_component<R: Role>(
450 &self,
451 proxy_index: ComponentIndex,
452 successor_index: ComponentIndex,
453 proxy: impl agent_client_protocol::ConnectTo<R>,
454 ) -> DynConnectTo<R> {
455 DynConnectTo::new(SnooperComponent::new(
456 proxy,
457 {
458 let trace_handle = self.clone();
459 move |msg| {
460 trace_handle.trace_message(proxy_index, successor_index, Incoming(true), msg)
461 }
462 },
463 {
464 let trace_handle = self.clone();
465 move |msg| {
466 trace_handle.trace_message(proxy_index, successor_index, Incoming(false), msg)
467 }
468 },
469 ))
470 }
471}
472
473fn id_to_json(id: &jsonrpcmsg::Id) -> serde_json::Value {
475 match id {
476 jsonrpcmsg::Id::String(s) => serde_json::Value::String(s.clone()),
477 jsonrpcmsg::Id::Number(n) => serde_json::Value::Number((*n).into()),
478 jsonrpcmsg::Id::Null => serde_json::Value::Null,
479 }
480}
481
482#[derive(Debug)]
485struct TracedMessage {
486 component_index: ComponentIndex,
487 successor_index: ComponentIndex,
488 incoming: Incoming,
489 message: jsonrpcmsg::Message,
490}
491
492#[derive(Debug)]
494struct MessageInfo {
495 successor: Successor,
496 id: Option<jsonrpcmsg::Id>,
497 protocol: Protocol,
498 method: String,
499 params: serde_json::Value,
500}
501
/// Flag newtype: `true` when the message was wrapped in a successor envelope.
#[derive(Clone, Copy, Debug)]
struct Successor(bool);
504
/// Flag newtype for the direction in which a message was observed.
#[derive(Clone, Copy, Debug)]
struct Incoming(bool);
507
508impl MessageInfo {
509 fn from_req(req: jsonrpcmsg::Request) -> Self {
518 let untyped = UntypedMessage::parse_message(&req.method, &req.params)
519 .expect("untyped message is infallible");
520 Self::from_untyped(Successor(false), req.id, Protocol::Acp, untyped)
521 }
522
523 fn from_untyped(
524 successor: Successor,
525 id: Option<jsonrpcmsg::Id>,
526 protocol: Protocol,
527 untyped: UntypedMessage,
528 ) -> Self {
529 if let Ok(m) = SuccessorMessage::parse_message(&untyped.method, &untyped.params) {
530 return Self::from_untyped(Successor(true), id, protocol, m.message);
531 }
532
533 if let Ok(m) = McpOverAcpMessage::parse_message(&untyped.method, &untyped.params) {
534 return Self::from_untyped(successor, id, Protocol::Mcp, m.message);
535 }
536
537 Self {
538 successor,
539 id,
540 protocol,
541 method: untyped.method,
542 params: untyped.params,
543 }
544 }
545}