Skip to main content

agy_bridge/streaming/
writer.rs

1//! The sending/writing side of the streaming channel pair.
2
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::mpsc;
6
7use super::types::{
8    ChatResponseSharedState, ResponseEvent, StreamChunk, StreamError, ToolCallEvent,
9};
10use crate::types::Step;
11
/// Failure reported when a [`ChatResponseWriter`] cannot deliver a value over
/// one of its channels.
///
/// The error carries only a message string, so the generic
/// `tokio::sync::mpsc::error::SendError<T>` produced by the underlying channel
/// does not show up in the writer's method signatures.
///
/// # Example
///
/// ```
/// use agy_bridge::streaming::WriterError;
///
/// let err = WriterError::new("receiver dropped");
/// assert_eq!(err.to_string(), "receiver dropped");
/// ```
#[derive(Debug)]
pub struct WriterError {
    /// Text describing what went wrong, intended for humans.
    pub message: String,
}

impl WriterError {
    /// Build a writer error from anything convertible into a `String`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        WriterError { message }
    }
}

impl std::fmt::Display for WriterError {
    /// Writes the stored message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

// No `source()` override: the error keeps only its message, not a cause.
impl std::error::Error for WriterError {}
47
48impl<T> From<mpsc::error::SendError<T>> for WriterError {
49    fn from(err: mpsc::error::SendError<T>) -> Self {
50        Self {
51            message: format!("channel send failed: {err}"),
52        }
53    }
54}
55
56/// The sending side of a [`ChatResponseHandle`](super::handle::ChatResponseHandle),
57/// held by the Python bridge thread that drives the SDK's async iterator.
58pub struct ChatResponseWriter {
59    /// Sends text tokens.
60    pub(crate) text_tx: mpsc::Sender<String>,
61    /// Sends thinking tokens.
62    pub(crate) thought_tx: mpsc::Sender<String>,
63    /// Sends tool call events.
64    pub(crate) tool_call_tx: mpsc::Sender<ToolCallEvent>,
65    /// Sends a stream error (at most one).
66    pub(crate) error_tx: mpsc::Sender<StreamError>,
67    /// Sends ordered [`ResponseEvent`]s for the resolve timeline.
68    pub(crate) event_tx: mpsc::Sender<ResponseEvent>,
69    /// Sends [`Step`] objects as they are produced.
70    ///
71    /// The sender must be held to keep the channel alive for
72    /// [`ChatResponseHandle::take_step_stream()`](super::handle::ChatResponseHandle::take_step_stream).
73    /// It will be actively written once step-level streaming is wired through
74    /// the command loop.
75    pub(crate) step_tx: mpsc::Sender<Step>,
76    /// Sends unified [`StreamChunk`]s.
77    pub(crate) chunk_tx: mpsc::Sender<StreamChunk>,
78    /// Shared state to send metadata updates back to the handle.
79    pub(crate) shared_state: Arc<Mutex<ChatResponseSharedState>>,
80}
81
82impl ChatResponseWriter {
83    /// Send a text token.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`WriterError`] if the receiver has been dropped.
88    pub async fn send_text(&self, text: String) -> Result<(), WriterError> {
89        self.text_tx.send(text).await.map_err(WriterError::from)
90    }
91
92    /// Send a thinking token.
93    ///
94    /// # Errors
95    ///
96    /// Returns [`WriterError`] if the receiver has been dropped.
97    pub async fn send_thought(&self, thought: String) -> Result<(), WriterError> {
98        self.thought_tx
99            .send(thought)
100            .await
101            .map_err(WriterError::from)
102    }
103
104    /// Send a tool call event.
105    ///
106    /// # Errors
107    ///
108    /// Returns [`WriterError`] if the receiver has been dropped.
109    pub async fn send_tool_call(&self, event: ToolCallEvent) -> Result<(), WriterError> {
110        self.tool_call_tx
111            .send(event)
112            .await
113            .map_err(WriterError::from)
114    }
115
116    /// Send an error.
117    ///
118    /// # Errors
119    ///
120    /// Returns [`WriterError`] if the receiver has been dropped.
121    pub async fn send_error(&self, error: StreamError) -> Result<(), WriterError> {
122        self.error_tx.send(error).await.map_err(WriterError::from)
123    }
124
125    /// Send a response event.
126    ///
127    /// # Errors
128    ///
129    /// Returns [`WriterError`] if the receiver has been dropped.
130    pub async fn send_event(&self, event: ResponseEvent) -> Result<(), WriterError> {
131        self.event_tx.send(event).await.map_err(WriterError::from)
132    }
133
134    /// Send a step.
135    ///
136    /// # Errors
137    ///
138    /// Returns [`WriterError`] if the receiver has been dropped.
139    pub async fn send_step(&self, step: crate::types::Step) -> Result<(), WriterError> {
140        self.step_tx.send(step).await.map_err(WriterError::from)
141    }
142
143    /// Send a unified stream chunk.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`WriterError`] if the receiver has been dropped.
148    pub async fn send_chunk(&self, chunk: StreamChunk) -> Result<(), WriterError> {
149        self.chunk_tx.send(chunk).await.map_err(WriterError::from)
150    }
151
152    /// Store usage metadata in the shared state so the handle can read it
153    /// after the stream completes.
154    pub fn set_usage(&self, usage: crate::types::UsageMetadata) {
155        match self.shared_state.lock() {
156            Ok(mut state) => {
157                state.usage = Some(usage);
158            }
159            Err(e) => {
160                tracing::error!(
161                    error = %e,
162                    "ChatResponseWriter shared_state mutex poisoned in set_usage"
163                );
164            }
165        }
166    }
167
168    /// Store structured output in the shared state so the handle can read it
169    /// after the stream completes.
170    pub fn set_structured_output(&self, value: serde_json::Value) {
171        match self.shared_state.lock() {
172            Ok(mut state) => {
173                state.structured_output = Some(value);
174            }
175            Err(e) => {
176                tracing::error!(
177                    error = %e,
178                    "ChatResponseWriter shared_state mutex poisoned in set_structured_output"
179                );
180            }
181        }
182    }
183}