Skip to main content

agy_bridge/streaming/
handle.rs

1//! The receiving/reading side of the streaming channel pair.
2
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7
8use super::types::{
9    ChatResponseSharedState, ChatResult, ERROR_DRAIN_TIMEOUT, ResponseEvent, StreamChunk,
10    StreamError, StreamReceivers, ToolCallEvent,
11};
12use crate::types::{Step, UsageMetadata};
13
14/// Handle to a streaming chat response.
15///
16/// Created by [`AgentHandle::chat()`](crate::agent::AgentHandle::chat). Provides
17/// independent channels for text tokens, thinking tokens, and tool-call events.
18///
19/// Each stream accessor can only be called once — subsequent calls return `None`
20/// because the underlying receiver has already been taken.
21#[derive(Debug)]
22pub struct ChatResponseHandle {
23    /// All per-stream receivers, grouped for clarity.
24    pub(super) rx: StreamReceivers,
25    /// Token usage metadata, populated after the stream completes.
26    pub(super) usage: Option<UsageMetadata>,
27    /// Structured output from a `response_schema`-configured agent.
28    pub(super) structured_output_value: Option<serde_json::Value>,
29    /// Shared state to receive metadata updates from the python bridge thread.
30    pub(crate) shared_state: Arc<Mutex<ChatResponseSharedState>>,
31}
32
33impl ChatResponseHandle {
34    /// Take the text token receiver for token-by-token streaming.
35    ///
36    /// Returns `None` if the receiver was already taken.
37    pub const fn take_text_stream(&mut self) -> Option<mpsc::Receiver<String>> {
38        self.rx.text.take()
39    }
40
41    /// Take the thinking token receiver.
42    ///
43    /// Returns `None` if the receiver was already taken.
44    pub const fn take_thought_stream(&mut self) -> Option<mpsc::Receiver<String>> {
45        self.rx.thought.take()
46    }
47
48    /// Take the tool call event receiver.
49    ///
50    /// Returns `None` if the receiver was already taken.
51    pub const fn take_tool_call_stream(&mut self) -> Option<mpsc::Receiver<ToolCallEvent>> {
52        self.rx.tool_call.take()
53    }
54
55    /// Take the raw step receiver.
56    ///
57    /// Returns `None` if the receiver was already taken.
58    /// Prefer [`receive_steps()`](Self::receive_steps) for `StreamExt`-compatible usage.
59    pub const fn take_step_stream(&mut self) -> Option<mpsc::Receiver<Step>> {
60        self.rx.step.take()
61    }
62
63    /// Take the step stream for consuming with `StreamExt::next()`.
64    ///
65    /// Returns `None` if the stream was already taken.
66    ///
67    /// # Example
68    ///
69    /// ```
70    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
71    /// use agy_bridge::streaming;
72    /// use tokio_stream::StreamExt;
73    ///
74    /// let (_writer, mut handle) = streaming::channel();
75    /// drop(_writer); // close the channel so the stream ends
76    /// let mut steps = handle.receive_steps().unwrap();
77    /// while let Some(step) = steps.next().await {
78    ///     println!("step: {:?}", step.step_type);
79    /// }
80    /// # });
81    pub fn receive_steps(&mut self) -> Option<impl tokio_stream::Stream<Item = Step>> {
82        self.rx.step.take().map(ReceiverStream::new)
83    }
84
85    /// Take the unified chunk stream for consuming with `StreamExt::next()`.
86    ///
87    /// Returns `None` if the stream was already taken.
88    ///
89    /// # Example
90    ///
91    /// ```
92    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
93    /// use agy_bridge::streaming::{self, StreamChunk};
94    /// use tokio_stream::StreamExt;
95    ///
96    /// let (_writer, mut handle) = streaming::channel();
97    /// drop(_writer); // close the channel so the stream ends
98    /// let mut chunks = handle.receive_chunks().unwrap();
99    /// while let Some(chunk) = chunks.next().await {
100    ///     match chunk {
101    ///         StreamChunk::Text(t) => print!("{t}"),
102    ///         StreamChunk::Thought(t) => eprintln!("thought: {t}"),
103    ///         StreamChunk::ToolCall(tc) => eprintln!("tool: {}", tc.name),
104    ///         _ => {}
105    ///     }
106    /// }
107    /// # });
108    pub fn receive_chunks(&mut self) -> Option<impl tokio_stream::Stream<Item = StreamChunk>> {
109        self.rx.chunk.take().map(ReceiverStream::new)
110    }
111
112    /// Drain the text stream and return the complete response text.
113    ///
114    /// Consumes the handle — use the `take_*` methods instead if you need
115    /// to keep streaming individual channels.
116    ///
117    /// # Errors
118    ///
119    /// Returns a [`StreamError`] if the Python side reported an error.
120    pub async fn text(mut self) -> Result<ChatResult, StreamError> {
121        let mut buf = String::new();
122
123        if let Some(mut rx) = self.rx.text.take() {
124            while let Some(token) = rx.recv().await {
125                buf.push_str(&token);
126            }
127        }
128
129        // Check for errors. Use a brief timeout rather than try_recv() to
130        // catch errors that are sent just after the text channel closes.
131        if let Some(mut err_rx) = self.rx.error.take()
132            && let Ok(Some(err)) = tokio::time::timeout(ERROR_DRAIN_TIMEOUT, err_rx.recv()).await
133        {
134            return Err(err);
135        }
136
137        self.finalize();
138
139        Ok(ChatResult {
140            text: buf,
141            usage: self.usage,
142            structured_output: self.structured_output_value,
143        })
144    }
145
146    /// Finalize the response handle by pulling usage and structured output
147    /// from the shared state. Called after the stream has been fully drained.
148    pub fn finalize(&mut self) {
149        if let Ok(state) = self.shared_state.lock() {
150            self.usage = state.usage.clone();
151            self.structured_output_value = state.structured_output.clone();
152        } else {
153            tracing::error!(
154                "ChatResponseHandle shared_state mutex poisoned during finalize — \
155                 usage and structured_output will be unavailable"
156            );
157        }
158    }
159
160    /// Return the structured output, if available.
161    ///
162    /// Only populated when the agent was configured with a `response_schema`
163    /// and the model returned a valid JSON payload.
164    #[must_use]
165    pub const fn structured_output(&self) -> Option<&serde_json::Value> {
166        self.structured_output_value.as_ref()
167    }
168
169    /// Return the token usage metadata, if available.
170    ///
171    /// Populated after [`finalize()`](Self::finalize) or [`text()`](Self::text).
172    #[must_use]
173    pub const fn usage_metadata(&self) -> Option<&UsageMetadata> {
174        self.usage.as_ref()
175    }
176
177    /// Return a reference-counted handle to the shared state.
178    ///
179    /// This allows callers to clone the `Arc` **before** consuming the handle
180    /// via [`text()`](Self::text) or [`resolve()`](Self::resolve), and then
181    /// read usage metadata / structured output from the shared state
182    /// afterwards.
183    #[doc(hidden)]
184    #[must_use]
185    pub fn shared_state(&self) -> Arc<Mutex<ChatResponseSharedState>> {
186        Arc::clone(&self.shared_state)
187    }
188
189    /// Drain all events and return them as an ordered timeline.
190    ///
191    /// Consumes the handle — use the `take_*` methods instead if you need
192    /// to keep streaming individual channels.
193    pub async fn resolve(mut self) -> Vec<ResponseEvent> {
194        let mut events = Vec::new();
195        if let Some(mut rx) = self.rx.event.take() {
196            while let Some(event) = rx.recv().await {
197                events.push(event);
198            }
199        }
200        self.finalize();
201        events
202    }
203}