agy_bridge/streaming/writer.rs
1//! The sending/writing side of the streaming channel pair.
2
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::mpsc;
6
7use super::types::{
8 ChatResponseSharedState, ResponseEvent, StreamChunk, StreamError, ToolCallEvent,
9};
10use crate::types::Step;
11
/// Error returned when sending to a [`ChatResponseWriter`] channel fails.
///
/// This wraps the underlying channel error to avoid leaking the
/// `tokio::sync::mpsc::error::SendError<T>` generic into the public API.
///
/// The type derives [`Clone`], [`PartialEq`] and [`Eq`] in addition to
/// [`Debug`], so an error can be duplicated and compared directly (for
/// example with `assert_eq!`).
///
/// # Example
///
/// ```
/// use agy_bridge::streaming::WriterError;
///
/// let err = WriterError::new("receiver dropped");
/// assert_eq!(err.to_string(), "receiver dropped");
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriterError {
    /// Human-readable description of the failure.
    pub message: String,
}
30
31impl WriterError {
32 /// Create a new writer error.
33 pub fn new(message: impl Into<String>) -> Self {
34 Self {
35 message: message.into(),
36 }
37 }
38}
39
40impl std::fmt::Display for WriterError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.message)
43 }
44}
45
// Marker impl: the body is empty, so `WriterError` uses the trait's default
// method implementations and exposes no underlying source error.
impl std::error::Error for WriterError {}
47
48impl<T> From<mpsc::error::SendError<T>> for WriterError {
49 fn from(err: mpsc::error::SendError<T>) -> Self {
50 Self {
51 message: format!("channel send failed: {err}"),
52 }
53 }
54}
55
/// The sending side of a [`ChatResponseHandle`](super::handle::ChatResponseHandle),
/// held by the Python bridge thread that drives the SDK's async iterator.
///
/// It bundles one `mpsc::Sender` per kind of streamed item together with a
/// handle to the shared state used for metadata.
pub struct ChatResponseWriter {
    /// Sends text tokens; written by [`send_text`](Self::send_text).
    pub(crate) text_tx: mpsc::Sender<String>,
    /// Sends thinking tokens; written by [`send_thought`](Self::send_thought).
    pub(crate) thought_tx: mpsc::Sender<String>,
    /// Sends tool call events; written by [`send_tool_call`](Self::send_tool_call).
    pub(crate) tool_call_tx: mpsc::Sender<ToolCallEvent>,
    /// Sends a stream error (at most one); written by [`send_error`](Self::send_error).
    pub(crate) error_tx: mpsc::Sender<StreamError>,
    /// Sends ordered [`ResponseEvent`]s for the resolve timeline; written by
    /// [`send_event`](Self::send_event).
    pub(crate) event_tx: mpsc::Sender<ResponseEvent>,
    /// Sends [`Step`] objects as they are produced; [`send_step`](Self::send_step)
    /// writes to it.
    ///
    /// The sender must be held to keep the channel alive for
    /// [`ChatResponseHandle::take_step_stream()`](super::handle::ChatResponseHandle::take_step_stream).
    /// It will be actively written once step-level streaming is wired through
    /// the command loop.
    pub(crate) step_tx: mpsc::Sender<Step>,
    /// Sends unified [`StreamChunk`]s; written by [`send_chunk`](Self::send_chunk).
    pub(crate) chunk_tx: mpsc::Sender<StreamChunk>,
    /// Shared state to send metadata updates back to the handle.
    ///
    /// [`set_usage`](Self::set_usage) and
    /// [`set_structured_output`](Self::set_structured_output) lock this mutex
    /// to store their values.
    pub(crate) shared_state: Arc<Mutex<ChatResponseSharedState>>,
}
81
82impl ChatResponseWriter {
83 /// Send a text token.
84 ///
85 /// # Errors
86 ///
87 /// Returns [`WriterError`] if the receiver has been dropped.
88 pub async fn send_text(&self, text: String) -> Result<(), WriterError> {
89 self.text_tx.send(text).await.map_err(WriterError::from)
90 }
91
92 /// Send a thinking token.
93 ///
94 /// # Errors
95 ///
96 /// Returns [`WriterError`] if the receiver has been dropped.
97 pub async fn send_thought(&self, thought: String) -> Result<(), WriterError> {
98 self.thought_tx
99 .send(thought)
100 .await
101 .map_err(WriterError::from)
102 }
103
104 /// Send a tool call event.
105 ///
106 /// # Errors
107 ///
108 /// Returns [`WriterError`] if the receiver has been dropped.
109 pub async fn send_tool_call(&self, event: ToolCallEvent) -> Result<(), WriterError> {
110 self.tool_call_tx
111 .send(event)
112 .await
113 .map_err(WriterError::from)
114 }
115
116 /// Send an error.
117 ///
118 /// # Errors
119 ///
120 /// Returns [`WriterError`] if the receiver has been dropped.
121 pub async fn send_error(&self, error: StreamError) -> Result<(), WriterError> {
122 self.error_tx.send(error).await.map_err(WriterError::from)
123 }
124
125 /// Send a response event.
126 ///
127 /// # Errors
128 ///
129 /// Returns [`WriterError`] if the receiver has been dropped.
130 pub async fn send_event(&self, event: ResponseEvent) -> Result<(), WriterError> {
131 self.event_tx.send(event).await.map_err(WriterError::from)
132 }
133
134 /// Send a step.
135 ///
136 /// # Errors
137 ///
138 /// Returns [`WriterError`] if the receiver has been dropped.
139 pub async fn send_step(&self, step: crate::types::Step) -> Result<(), WriterError> {
140 self.step_tx.send(step).await.map_err(WriterError::from)
141 }
142
143 /// Send a unified stream chunk.
144 ///
145 /// # Errors
146 ///
147 /// Returns [`WriterError`] if the receiver has been dropped.
148 pub async fn send_chunk(&self, chunk: StreamChunk) -> Result<(), WriterError> {
149 self.chunk_tx.send(chunk).await.map_err(WriterError::from)
150 }
151
152 /// Store usage metadata in the shared state so the handle can read it
153 /// after the stream completes.
154 pub fn set_usage(&self, usage: crate::types::UsageMetadata) {
155 match self.shared_state.lock() {
156 Ok(mut state) => {
157 state.usage = Some(usage);
158 }
159 Err(e) => {
160 tracing::error!(
161 error = %e,
162 "ChatResponseWriter shared_state mutex poisoned in set_usage"
163 );
164 }
165 }
166 }
167
168 /// Store structured output in the shared state so the handle can read it
169 /// after the stream completes.
170 pub fn set_structured_output(&self, value: serde_json::Value) {
171 match self.shared_state.lock() {
172 Ok(mut state) => {
173 state.structured_output = Some(value);
174 }
175 Err(e) => {
176 tracing::error!(
177 error = %e,
178 "ChatResponseWriter shared_state mutex poisoned in set_structured_output"
179 );
180 }
181 }
182 }
183}