agent_client_protocol/
stream_broadcast.rs

1//! Message streaming and broadcasting for debugging and monitoring RPC communication.
2//!
3//! This module provides functionality to observe the JSON-RPC message stream between
4//! clients and agents. It's primarily used for debugging, logging, and building
5//! development tools that need to monitor the protocol communication.
6//!
7//! The broadcasting system allows multiple receivers to observe the message stream
8//! without interfering with the actual communication.
9
10use std::sync::Arc;
11
12use anyhow::Result;
13use serde::Serialize;
14use serde_json::value::RawValue;
15
16use crate::{
17    Error,
18    rpc::{OutgoingMessage, ResponseResult, Side},
19};
20
21/// A message that flows through the RPC stream.
22///
23/// This represents any JSON-RPC message (request, response, or notification)
24/// along with its direction (incoming or outgoing).
25///
26/// Stream messages are used for observing and debugging the protocol communication
27/// without interfering with the actual message handling.
28#[derive(Debug, Clone)]
29pub struct StreamMessage {
30    /// The direction of the message relative to this side of the connection.
31    pub direction: StreamMessageDirection,
32    /// The actual content of the message.
33    pub message: StreamMessageContent,
34}
35
/// Which way a message travelled through the RPC stream.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamMessageDirection {
    /// The message arrived from the other side of the connection.
    Incoming,
    /// The message was sent to the other side of the connection.
    Outgoing,
}
44
45/// The content of a stream message.
46///
47/// This enum represents the three types of JSON-RPC messages:
48/// - Requests: Method calls that expect a response
49/// - Responses: Replies to previous requests
50/// - Notifications: One-way messages that don't expect a response
51#[derive(Debug, Clone)]
52pub enum StreamMessageContent {
53    /// A JSON-RPC request message.
54    Request {
55        /// The unique identifier for this request.
56        id: i32,
57        /// The name of the method being called.
58        method: Arc<str>,
59        /// Optional parameters for the method.
60        params: Option<serde_json::Value>,
61    },
62    /// A JSON-RPC response message.
63    Response {
64        /// The ID of the request this response is for.
65        id: i32,
66        /// The result of the request (success or error).
67        result: Result<Option<serde_json::Value>, Error>,
68    },
69    /// A JSON-RPC notification message.
70    Notification {
71        /// The name of the notification method.
72        method: Arc<str>,
73        /// Optional parameters for the notification.
74        params: Option<serde_json::Value>,
75    },
76}
77
78/// A receiver for observing the message stream.
79///
80/// This allows you to receive copies of all messages flowing through the connection,
81/// useful for debugging, logging, or building development tools.
82///
83/// # Example
84///
85/// ```no_run
86/// use agent_client_protocol::{StreamReceiver, StreamMessageDirection};
87///
88/// async fn monitor_messages(mut receiver: StreamReceiver) {
89///     while let Ok(message) = receiver.recv().await {
90///         match message.direction {
91///             StreamMessageDirection::Incoming => println!("← Received: {:?}", message.message),
92///             StreamMessageDirection::Outgoing => println!("→ Sent: {:?}", message.message),
93///         }
94///     }
95/// }
96/// ```
97pub struct StreamReceiver(async_broadcast::Receiver<StreamMessage>);
98
99impl StreamReceiver {
100    /// Receives the next message from the stream.
101    ///
102    /// This method will wait until a message is available or the sender is dropped.
103    ///
104    /// # Returns
105    ///
106    /// - `Ok(StreamMessage)` when a message is received
107    /// - `Err` when the sender is dropped or the receiver is lagged
108    pub async fn recv(&mut self) -> Result<StreamMessage> {
109        Ok(self.0.recv().await?)
110    }
111}
112
113/// Internal sender for broadcasting stream messages.
114///
115/// This is used internally by the RPC system to broadcast messages to all receivers.
116/// You typically won't interact with this directly.
117pub(crate) struct StreamSender(async_broadcast::Sender<StreamMessage>);
118
119impl StreamSender {
120    /// Broadcasts an outgoing message to all receivers.
121    pub(crate) fn outgoing<L: Side, R: Side>(&self, message: &OutgoingMessage<L, R>) {
122        if self.0.receiver_count() == 0 {
123            return;
124        }
125
126        let message = StreamMessage {
127            direction: StreamMessageDirection::Outgoing,
128            message: match message {
129                OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
130                    id: *id,
131                    method: (*method).into(),
132                    params: serde_json::to_value(params).ok(),
133                },
134                OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
135                    id: *id,
136                    result: match result {
137                        ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
138                        ResponseResult::Error(error) => Err(error.clone()),
139                    },
140                },
141                OutgoingMessage::Notification { method, params } => {
142                    StreamMessageContent::Notification {
143                        method: (*method).into(),
144                        params: serde_json::to_value(params).ok(),
145                    }
146                }
147            },
148        };
149
150        self.0.try_broadcast(message).ok();
151    }
152
153    /// Broadcasts an incoming request to all receivers.
154    pub(crate) fn incoming_request(
155        &self,
156        id: i32,
157        method: impl Into<Arc<str>>,
158        params: &impl Serialize,
159    ) {
160        if self.0.receiver_count() == 0 {
161            return;
162        }
163
164        let message = StreamMessage {
165            direction: StreamMessageDirection::Incoming,
166            message: StreamMessageContent::Request {
167                id,
168                method: method.into(),
169                params: serde_json::to_value(params).ok(),
170            },
171        };
172
173        self.0.try_broadcast(message).ok();
174    }
175
176    /// Broadcasts an incoming response to all receivers.
177    pub(crate) fn incoming_response(&self, id: i32, result: Result<Option<&RawValue>, &Error>) {
178        if self.0.receiver_count() == 0 {
179            return;
180        }
181
182        let result = match result {
183            Ok(Some(value)) => Ok(serde_json::from_str(value.get()).ok()),
184            Ok(None) => Ok(None),
185            Err(err) => Err(err.clone()),
186        };
187
188        let message = StreamMessage {
189            direction: StreamMessageDirection::Incoming,
190            message: StreamMessageContent::Response { id, result },
191        };
192
193        self.0.try_broadcast(message).ok();
194    }
195
196    /// Broadcasts an incoming notification to all receivers.
197    pub(crate) fn incoming_notification(
198        &self,
199        method: impl Into<Arc<str>>,
200        params: &impl Serialize,
201    ) {
202        if self.0.receiver_count() == 0 {
203            return;
204        }
205
206        let message = StreamMessage {
207            direction: StreamMessageDirection::Incoming,
208            message: StreamMessageContent::Notification {
209                method: method.into(),
210                params: serde_json::to_value(params).ok(),
211            },
212        };
213
214        self.0.try_broadcast(message).ok();
215    }
216}
217
218/// A broadcast system for observing RPC message streams.
219///
220/// This is used internally by the RPC connection to allow multiple receivers
221/// to observe the message stream.
222pub(crate) struct StreamBroadcast {
223    receiver: async_broadcast::InactiveReceiver<StreamMessage>,
224}
225
226impl StreamBroadcast {
227    /// Creates a new broadcast system.
228    ///
229    /// Returns a sender for broadcasting messages and the broadcast instance
230    /// for creating receivers.
231    pub(crate) fn new() -> (StreamSender, Self) {
232        let (sender, receiver) = async_broadcast::broadcast(1);
233        (
234            StreamSender(sender),
235            Self {
236                receiver: receiver.deactivate(),
237            },
238        )
239    }
240
241    /// Creates a new receiver for observing the message stream.
242    ///
243    /// Each receiver will get its own copy of every message.
244    pub(crate) fn receiver(&self) -> StreamReceiver {
245        let was_empty = self.receiver.receiver_count() == 0;
246        let mut new_receiver = self.receiver.activate_cloned();
247        if was_empty {
248            // Grow capacity once we actually have a receiver
249            new_receiver.set_capacity(64);
250        }
251        StreamReceiver(new_receiver)
252    }
253}
254
255impl<Local: Side, Remote: Side> From<OutgoingMessage<Local, Remote>> for StreamMessage {
256    fn from(message: OutgoingMessage<Local, Remote>) -> Self {
257        Self {
258            direction: StreamMessageDirection::Outgoing,
259            message: match message {
260                OutgoingMessage::Request { id, method, params } => StreamMessageContent::Request {
261                    id,
262                    method: method.into(),
263                    params: serde_json::to_value(params).ok(),
264                },
265                OutgoingMessage::Response { id, result } => StreamMessageContent::Response {
266                    id,
267                    result: match result {
268                        ResponseResult::Result(value) => Ok(serde_json::to_value(value).ok()),
269                        ResponseResult::Error(error) => Err(error),
270                    },
271                },
272                OutgoingMessage::Notification { method, params } => {
273                    StreamMessageContent::Notification {
274                        method: method.into(),
275                        params: serde_json::to_value(params).ok(),
276                    }
277                }
278            },
279        }
280    }
281}