agy_bridge/streaming/types.rs
1//! Streaming types: chunks, events, errors, and shared state.
2
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7
8use crate::types::{Step, UsageMetadata};
9
10/// The result of draining a chat response via [`ChatResponseHandle::text()`].
11///
12/// Carries the full response text alongside optional metadata (token usage,
13/// structured output). Dereferences to `&str` for ergonomic use:
14///
15/// ```rust
16/// # #[tokio::main]
17/// # async fn main() -> Result<(), agy_bridge::error::Error> {
18/// # agy_bridge::load_dotenv();
19/// # let bridge = agy_bridge::AgyBridge::builder().build()?;
20/// # let agent = bridge.agent(
21/// # agy_bridge::config::AgentConfig::builder()
22/// # .system_instructions("Reply with 'Hello!' and nothing else. Never use tools.")
23/// # .capabilities(agy_bridge::config::CapabilitiesConfig::custom_tools_only())
24/// # .build()
25/// # ).await?;
26/// let result = agent
27/// .chat("Reply with 'Hello!' and nothing else.")
28/// .await?
29/// .text()
30/// .await?;
31/// println!("{result}"); // prints text
32/// if let Some(usage) = result.usage() { /* access metadata */ }
33/// # agent.shutdown().await?;
34/// # Ok(())
35/// # }
36/// ```
37#[derive(Debug, Clone)]
38pub struct ChatResult {
39 pub(super) text: String,
40 pub(super) usage: Option<UsageMetadata>,
41 pub(super) structured_output: Option<serde_json::Value>,
42}
43
44impl ChatResult {
45 /// The full response text.
46 #[must_use]
47 pub fn text(&self) -> &str {
48 &self.text
49 }
50
51 /// Consume the result and return the inner `String`.
52 #[must_use]
53 pub fn into_string(self) -> String {
54 self.text
55 }
56
57 /// Token usage metadata, if available.
58 #[must_use]
59 pub fn usage(&self) -> Option<&UsageMetadata> {
60 self.usage.as_ref()
61 }
62
63 /// Structured output (JSON), if the agent was configured with a
64 /// `response_schema` and the model returned valid JSON.
65 #[must_use]
66 pub fn structured_output(&self) -> Option<&serde_json::Value> {
67 self.structured_output.as_ref()
68 }
69}
70
71impl std::ops::Deref for ChatResult {
72 type Target = str;
73 fn deref(&self) -> &str {
74 &self.text
75 }
76}
77
78impl PartialEq<&str> for ChatResult {
79 fn eq(&self, other: &&str) -> bool {
80 self.text == *other
81 }
82}
83
84impl PartialEq<String> for ChatResult {
85 fn eq(&self, other: &String) -> bool {
86 self.text == *other
87 }
88}
89
90impl std::fmt::Display for ChatResult {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.write_str(&self.text)
93 }
94}
95
96impl From<ChatResult> for String {
97 fn from(result: ChatResult) -> Self {
98 result.text
99 }
100}
101
/// Short grace period (50 ms) applied when draining the error channel once
/// the text stream has closed. Also used by [`crate::interactive`].
pub(crate) const ERROR_DRAIN_TIMEOUT: Duration = Duration::from_millis(50);
105
106/// A tool call event received during streaming.
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ToolCallEvent {
109 /// Tool name (e.g. `"view_file"` or a custom tool name).
110 pub name: String,
111 /// Arguments as a JSON object.
112 pub args: serde_json::Value,
113 /// Optional call identifier assigned by the backend.
114 pub id: Option<String>,
115 /// Optional canonical path for file tools.
116 #[serde(default)]
117 pub canonical_path: Option<String>,
118}
119
120/// Error sent over the error channel when the Python stream fails.
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct StreamError {
123 /// Error message from the Python side.
124 pub message: String,
125}
126
127impl std::fmt::Display for StreamError {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 write!(f, "stream error: {}", self.message)
130 }
131}
132
133impl std::error::Error for StreamError {}
134
135/// An ordered event from a response timeline, produced by [`ChatResponseHandle::resolve`].
136///
137/// Mirrors the Python SDK's `ChatResponse.resolve()` which returns
138/// `list[StreamChunk | ToolCall | ToolResult]`.
139#[non_exhaustive]
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub enum ResponseEvent {
142 /// A text chunk from the model.
143 TextChunk(String),
144 /// A thinking/reasoning chunk from the model.
145 ThoughtChunk(String),
146 /// A tool call request from the model.
147 ToolCall(ToolCallEvent),
148 /// A tool execution result.
149 ToolResult(crate::types::ToolResult),
150}
151
152/// A chunk from the streaming response, combining text, thought, and tool call events.
153///
154/// This provides a unified stream of all chunk types, unlike the separate
155/// `take_text_stream()` / `take_thought_stream()` / `take_tool_call_stream()`
156/// accessors which split events by kind.
157#[non_exhaustive]
158#[derive(Debug, Clone, Serialize, Deserialize)]
159pub enum StreamChunk {
160 /// A text token from the model.
161 Text(String),
162 /// A thinking/reasoning token.
163 Thought(String),
164 /// A tool call event.
165 ToolCall(ToolCallEvent),
166}
167
168/// Shared mutable state between the writer and handle.
169///
170/// Uses `std::sync::Mutex` rather than `tokio::sync::Mutex` because the lock
171/// is held only for brief field reads/clones (never across `.await`). This is
172/// safe from deadlocks and cheaper than an async mutex.
173#[doc(hidden)]
174#[derive(Debug, Default)]
175pub struct ChatResponseSharedState {
176 /// Token usage metadata, populated by the writer after the stream completes.
177 pub usage: Option<UsageMetadata>,
178 /// Structured output, populated by the writer after the stream completes.
179 pub structured_output: Option<serde_json::Value>,
180}
181
182/// Grouped receivers for each independent stream channel.
183///
184/// Extracted from [`ChatResponseHandle`] so the seven channel receivers
185/// are logically grouped, keeping the handle's field list manageable.
186#[derive(Debug)]
187pub(crate) struct StreamReceivers {
188 /// Receives text tokens as they arrive from the model.
189 pub(super) text: Option<mpsc::Receiver<String>>,
190 /// Receives thinking/reasoning tokens.
191 pub(super) thought: Option<mpsc::Receiver<String>>,
192 /// Receives tool call events.
193 pub(super) tool_call: Option<mpsc::Receiver<ToolCallEvent>>,
194 /// Receives at most one error if the stream fails.
195 pub(super) error: Option<mpsc::Receiver<StreamError>>,
196 /// Receives ordered [`ResponseEvent`]s for [`resolve()`](super::handle::ChatResponseHandle::resolve).
197 pub(super) event: Option<mpsc::Receiver<ResponseEvent>>,
198 /// Receives [`Step`] objects as they are produced.
199 pub(super) step: Option<mpsc::Receiver<Step>>,
200 /// Receives unified [`StreamChunk`]s (text, thought, and tool call events).
201 pub(super) chunk: Option<mpsc::Receiver<StreamChunk>>,
202}
203
204impl StreamReceivers {
205 /// Create a new set of receivers from channel endpoints.
206 pub(super) fn new(
207 text: mpsc::Receiver<String>,
208 thought: mpsc::Receiver<String>,
209 tool_call: mpsc::Receiver<ToolCallEvent>,
210 error: mpsc::Receiver<StreamError>,
211 event: mpsc::Receiver<ResponseEvent>,
212 step: mpsc::Receiver<Step>,
213 chunk: mpsc::Receiver<StreamChunk>,
214 ) -> Self {
215 Self {
216 text: Some(text),
217 thought: Some(thought),
218 tool_call: Some(tool_call),
219 error: Some(error),
220 event: Some(event),
221 step: Some(step),
222 chunk: Some(chunk),
223 }
224 }
225}