agy_bridge/streaming/handle.rs
1//! The receiving/reading side of the streaming channel pair.
2
3use std::sync::{Arc, Mutex};
4
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7
8use super::types::{
9 ChatResponseSharedState, ChatResult, ERROR_DRAIN_TIMEOUT, ResponseEvent, StreamChunk,
10 StreamError, StreamReceivers, ToolCallEvent,
11};
12use crate::types::{Step, UsageMetadata};
13
14/// Handle to a streaming chat response.
15///
16/// Created by [`AgentHandle::chat()`](crate::agent::AgentHandle::chat). Provides
17/// independent channels for text tokens, thinking tokens, and tool-call events.
18///
19/// Each stream accessor can only be called once — subsequent calls return `None`
20/// because the underlying receiver has already been taken.
21#[derive(Debug)]
22pub struct ChatResponseHandle {
23 /// All per-stream receivers, grouped for clarity.
24 pub(super) rx: StreamReceivers,
25 /// Token usage metadata, populated after the stream completes.
26 pub(super) usage: Option<UsageMetadata>,
27 /// Structured output from a `response_schema`-configured agent.
28 pub(super) structured_output_value: Option<serde_json::Value>,
29 /// Shared state to receive metadata updates from the python bridge thread.
30 pub(crate) shared_state: Arc<Mutex<ChatResponseSharedState>>,
31}
32
33impl ChatResponseHandle {
34 /// Take the text token receiver for token-by-token streaming.
35 ///
36 /// Returns `None` if the receiver was already taken.
37 pub const fn take_text_stream(&mut self) -> Option<mpsc::Receiver<String>> {
38 self.rx.text.take()
39 }
40
41 /// Take the thinking token receiver.
42 ///
43 /// Returns `None` if the receiver was already taken.
44 pub const fn take_thought_stream(&mut self) -> Option<mpsc::Receiver<String>> {
45 self.rx.thought.take()
46 }
47
48 /// Take the tool call event receiver.
49 ///
50 /// Returns `None` if the receiver was already taken.
51 pub const fn take_tool_call_stream(&mut self) -> Option<mpsc::Receiver<ToolCallEvent>> {
52 self.rx.tool_call.take()
53 }
54
55 /// Take the raw step receiver.
56 ///
57 /// Returns `None` if the receiver was already taken.
58 /// Prefer [`receive_steps()`](Self::receive_steps) for `StreamExt`-compatible usage.
59 pub const fn take_step_stream(&mut self) -> Option<mpsc::Receiver<Step>> {
60 self.rx.step.take()
61 }
62
63 /// Take the step stream for consuming with `StreamExt::next()`.
64 ///
65 /// Returns `None` if the stream was already taken.
66 ///
67 /// # Example
68 ///
69 /// ```
70 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
71 /// use agy_bridge::streaming;
72 /// use tokio_stream::StreamExt;
73 ///
74 /// let (_writer, mut handle) = streaming::channel();
75 /// drop(_writer); // close the channel so the stream ends
76 /// let mut steps = handle.receive_steps().unwrap();
77 /// while let Some(step) = steps.next().await {
78 /// println!("step: {:?}", step.step_type);
79 /// }
80 /// # });
81 pub fn receive_steps(&mut self) -> Option<impl tokio_stream::Stream<Item = Step>> {
82 self.rx.step.take().map(ReceiverStream::new)
83 }
84
85 /// Take the unified chunk stream for consuming with `StreamExt::next()`.
86 ///
87 /// Returns `None` if the stream was already taken.
88 ///
89 /// # Example
90 ///
91 /// ```
92 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
93 /// use agy_bridge::streaming::{self, StreamChunk};
94 /// use tokio_stream::StreamExt;
95 ///
96 /// let (_writer, mut handle) = streaming::channel();
97 /// drop(_writer); // close the channel so the stream ends
98 /// let mut chunks = handle.receive_chunks().unwrap();
99 /// while let Some(chunk) = chunks.next().await {
100 /// match chunk {
101 /// StreamChunk::Text(t) => print!("{t}"),
102 /// StreamChunk::Thought(t) => eprintln!("thought: {t}"),
103 /// StreamChunk::ToolCall(tc) => eprintln!("tool: {}", tc.name),
104 /// _ => {}
105 /// }
106 /// }
107 /// # });
108 pub fn receive_chunks(&mut self) -> Option<impl tokio_stream::Stream<Item = StreamChunk>> {
109 self.rx.chunk.take().map(ReceiverStream::new)
110 }
111
112 /// Drain the text stream and return the complete response text.
113 ///
114 /// Consumes the handle — use the `take_*` methods instead if you need
115 /// to keep streaming individual channels.
116 ///
117 /// # Errors
118 ///
119 /// Returns a [`StreamError`] if the Python side reported an error.
120 pub async fn text(mut self) -> Result<ChatResult, StreamError> {
121 let mut buf = String::new();
122
123 if let Some(mut rx) = self.rx.text.take() {
124 while let Some(token) = rx.recv().await {
125 buf.push_str(&token);
126 }
127 }
128
129 // Check for errors. Use a brief timeout rather than try_recv() to
130 // catch errors that are sent just after the text channel closes.
131 if let Some(mut err_rx) = self.rx.error.take()
132 && let Ok(Some(err)) = tokio::time::timeout(ERROR_DRAIN_TIMEOUT, err_rx.recv()).await
133 {
134 return Err(err);
135 }
136
137 self.finalize();
138
139 Ok(ChatResult {
140 text: buf,
141 usage: self.usage,
142 structured_output: self.structured_output_value,
143 })
144 }
145
146 /// Finalize the response handle by pulling usage and structured output
147 /// from the shared state. Called after the stream has been fully drained.
148 pub fn finalize(&mut self) {
149 if let Ok(state) = self.shared_state.lock() {
150 self.usage = state.usage.clone();
151 self.structured_output_value = state.structured_output.clone();
152 } else {
153 tracing::error!(
154 "ChatResponseHandle shared_state mutex poisoned during finalize — \
155 usage and structured_output will be unavailable"
156 );
157 }
158 }
159
160 /// Return the structured output, if available.
161 ///
162 /// Only populated when the agent was configured with a `response_schema`
163 /// and the model returned a valid JSON payload.
164 #[must_use]
165 pub const fn structured_output(&self) -> Option<&serde_json::Value> {
166 self.structured_output_value.as_ref()
167 }
168
169 /// Return the token usage metadata, if available.
170 ///
171 /// Populated after [`finalize()`](Self::finalize) or [`text()`](Self::text).
172 #[must_use]
173 pub const fn usage_metadata(&self) -> Option<&UsageMetadata> {
174 self.usage.as_ref()
175 }
176
177 /// Return a reference-counted handle to the shared state.
178 ///
179 /// This allows callers to clone the `Arc` **before** consuming the handle
180 /// via [`text()`](Self::text) or [`resolve()`](Self::resolve), and then
181 /// read usage metadata / structured output from the shared state
182 /// afterwards.
183 #[doc(hidden)]
184 #[must_use]
185 pub fn shared_state(&self) -> Arc<Mutex<ChatResponseSharedState>> {
186 Arc::clone(&self.shared_state)
187 }
188
189 /// Drain all events and return them as an ordered timeline.
190 ///
191 /// Consumes the handle — use the `take_*` methods instead if you need
192 /// to keep streaming individual channels.
193 pub async fn resolve(mut self) -> Vec<ResponseEvent> {
194 let mut events = Vec::new();
195 if let Some(mut rx) = self.rx.event.take() {
196 while let Some(event) = rx.recv().await {
197 events.push(event);
198 }
199 }
200 self.finalize();
201 events
202 }
203}