agent_client_protocol/
stream_broadcast.rs

1//! JSON-RPC Stream broadcasting for debugging and monitoring communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6
7use std::sync::Arc;
8
9use anyhow::Result;
10use serde::Serialize;
11use serde_json::value::RawValue;
12
13use crate::{
14    Error,
15    rpc::{OutgoingMessage, ResponseResult, Side},
16};
17
18/// A message that flows through the RPC stream.
19///
20/// This represents any JSON-RPC message (request, response, or notification)
21/// along with its direction (incoming or outgoing).
22///
23/// Stream messages are used for observing and debugging the protocol communication
24/// without interfering with the actual message handling.
25#[derive(Debug, Clone)]
26pub struct StreamMessage {
27    /// The direction of the message relative to this side of the connection.
28    pub direction: StreamMessageDirection,
29    /// The actual content of the message.
30    pub message: StreamMessageContent,
31}
32
33/// The direction of a message in the RPC stream.
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
35pub enum StreamMessageDirection {
36    /// A message received from the other side of the connection.
37    Incoming,
38    /// A message sent to the other side of the connection.
39    Outgoing,
40}
41
42/// The content of a stream message.
43///
44/// This enum represents the three types of JSON-RPC messages:
45/// - Requests: Method calls that expect a response
46/// - Responses: Replies to previous requests
47/// - Notifications: One-way messages that don't expect a response
48#[derive(Debug, Clone)]
49pub enum StreamMessageContent {
50    /// A JSON-RPC request message.
51    Request {
52        /// The unique identifier for this request.
53        id: i32,
54        /// The name of the method being called.
55        method: Arc<str>,
56        /// Optional parameters for the method.
57        params: Option<serde_json::Value>,
58    },
59    /// A JSON-RPC response message.
60    Response {
61        /// The ID of the request this response is for.
62        id: i32,
63        /// The result of the request (success or error).
64        result: Result<Option<serde_json::Value>, Error>,
65    },
66    /// A JSON-RPC notification message.
67    Notification {
68        /// The name of the notification method.
69        method: Arc<str>,
70        /// Optional parameters for the notification.
71        params: Option<serde_json::Value>,
72    },
73}
74
75/// A receiver for observing the message stream.
76///
77/// This allows you to receive copies of all messages flowing through the connection,
78/// useful for debugging, logging, or building development tools.
79///
80/// # Example
81///
82/// ```no_run
83/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
84///
85/// async fn monitor_messages(mut receiver: StreamReceiver) {
86///     while let Ok(message) = receiver.recv().await {
87///         match message.direction {
88///             StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
89///             StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
90///         }
91///     }
92/// }
93/// ```
94pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
95
96impl StreamReceiver {
97    /// Receives the next message from the stream.
98    ///
99    /// This method will wait until a message is available or the sender is dropped.
100    ///
101    /// # Returns
102    ///
103    /// - `Ok(StreamMessage)` when a message is received
104    /// - `Err` when the sender is dropped or the receiver is lagged
105    pub async fn recv(&mut self) -> Result<StreamMessage> {
106        Ok(self.0.recv().await?)
107    }
108}
109
110/// Internal sender for broadcasting stream messages.
111///
112/// This is used internally by the RPC system to broadcast messages to all receivers.
113/// You typically won't interact with this directly.
114pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
115
116impl StreamSender {
117    /// Broadcasts an outgoing message to all receivers.
118    pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
119        if self.0.receiver_count() == 0 {
120            return;
121        }
122
123        let message = StreamMessage {
124            direction: StreamMessageDirection::Outgoing,
125            message: match message {
126                OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
127                    id: *id,
128                    method: method.clone(),
129                    params: serde_json::to_value(params).ok(),
130                },
131                OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
132                    id: *id,
133                    result: match result {
134                        ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
135                        ResponseResult::Error(error) => Err(error.clone()),
136                    },
137                },
138                OutgoingMessage::Notification { method, params } => {
139                    StreamMessageContent::Notification {
140                        method: method.clone(),
141                        params: serde_json::to_value(params).ok(),
142                    }
143                }
144            },
145        };
146
147        self.0.try_broadcast(message).ok();
148    }
149
150    /// Broadcasts an incoming request to all receivers.
151    pub(crate) fn incoming_request(
152        &self,
153        id: i32,
154        method: impl Into<Arc<str>>,
155        params: &impl Serialize,
156    ) {
157        if self.0.receiver_count() == 0 {
158            return;
159        }
160
161        let message = StreamMessage {
162            direction: StreamMessageDirection::Incoming,
163            message: StreamMessageContent::Request {
164                id,
165                method: method.into(),
166                params: serde_json::to_value(params).ok(),
167            },
168        };
169
170        self.0.try_broadcast(message).ok();
171    }
172
173    /// Broadcasts an incoming response to all receivers.
174    pub(crate) fn incoming_response(&self, id: i32, result: Result<Option<&RawValue>, &Error>) {
175        if self.0.receiver_count() == 0 {
176            return;
177        }
178
179        let result = match result {
180            Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
181            Ok(None) => Ok(None),
182            Err(err) => Err(err.clone()),
183        };
184
185        let message = StreamMessage {
186            direction: StreamMessageDirection::Incoming,
187            message: StreamMessageContent::Response { id, result },
188        };
189
190        self.0.try_broadcast(message).ok();
191    }
192
193    /// Broadcasts an incoming notification to all receivers.
194    pub(crate) fn incoming_notification(
195        &self,
196        method: impl Into<Arc<str>>,
197        params: &impl Serialize,
198    ) {
199        if self.0.receiver_count() == 0 {
200            return;
201        }
202
203        let message = StreamMessage {
204            direction: StreamMessageDirection::Incoming,
205            message: StreamMessageContent::Notification {
206                method: method.into(),
207                params: serde_json::to_value(params).ok(),
208            },
209        };
210
211        self.0.try_broadcast(message).ok();
212    }
213}
214
215/// A broadcast for observing RPC message streams.
216///
217/// This is used internally by the RPC connection to allow multiple receivers
218/// to observe the message stream.
219pub(crate) struct StreamBroadcast {
220    receiver: async_broadcast::InactiveReceiver<StreamMessage>,
221}
222
223impl StreamBroadcast {
224    /// Creates a new broadcast.
225    ///
226    /// Returns a sender for broadcasting messages and the broadcast instance
227    /// for creating receivers.
228    pub(crate) fn new() -> (StreamSender, Self) {
229        let (sender, receiver) = async_broadcast::broadcast(1);
230        (
231            StreamSender(sender),
232            Self {
233                receiver: receiver.deactivate(),
234            },
235        )
236    }
237
238    /// Creates a new receiver for observing the message stream.
239    ///
240    /// Each receiver will get its own copy of every message.
241    pub(crate) fn receiver(&self) -> StreamReceiver {
242        let was_empty = self.receiver.receiver_count() == 0;
243        let mut new_receiver = self.receiver.activate_cloned();
244        if was_empty {
245            // Grow capacity once we actually have a receiver
246            new_receiver.set_capacity(64);
247        }
248        StreamReceiver(new_receiver)
249    }
250}
251
252impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
253    fn from(message: OutgoingMessage<Local, Remote>) -> Self {
254        Self {
255            direction: StreamMessageDirection::Outgoing,
256            message: match message {
257                OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
258                    id,
259                    method,
260                    params: serde_json::to_value(params).ok(),
261                },
262                OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
263                    id,
264                    result: match result {
265                        ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
266                        ResponseResult::Error(error) => Err(error),
267                    },
268                },
269                OutgoingMessage::Notification { method, params } => {
270                    StreamMessageContent::Notification {
271                        method,
272                        params: serde_json::to_value(params).ok(),
273                    }
274                }
275            },
276        }
277    }
278}