Skip to main content

agy_bridge/streaming/
types.rs

1//! Streaming types: chunks, events, errors, and shared state.
2
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::mpsc;
7
8use crate::types::{Step, UsageMetadata};
9
/// The result of draining a chat response via
/// [`ChatResponseHandle::text()`](super::handle::ChatResponseHandle::text).
///
/// Carries the full response text alongside optional metadata (token usage,
/// structured output). Dereferences to `&str` for ergonomic use:
///
/// ```rust
/// # #[tokio::main]
/// # async fn main() -> Result<(), agy_bridge::error::Error> {
/// # agy_bridge::load_dotenv();
/// # let bridge = agy_bridge::AgyBridge::builder().build()?;
/// # let agent = bridge.agent(
/// #     agy_bridge::config::AgentConfig::builder()
/// #         .system_instructions("Reply with 'Hello!' and nothing else. Never use tools.")
/// #         .capabilities(agy_bridge::config::CapabilitiesConfig::custom_tools_only())
/// #         .build()
/// # ).await?;
/// let result = agent
///     .chat("Reply with 'Hello!' and nothing else.")
///     .await?
///     .text()
///     .await?;
/// println!("{result}"); // prints text
/// if let Some(usage) = result.usage() { /* access metadata */ }
/// # agent.shutdown().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ChatResult {
    /// The full response text; exposed through `text()`, `Deref` and `Display`.
    pub(super) text: String,
    /// Token usage metadata, or `None` when none is available.
    pub(super) usage: Option<UsageMetadata>,
    /// Structured (JSON) output, or `None` when none is available.
    pub(super) structured_output: Option<serde_json::Value>,
}
43
44impl ChatResult {
45    /// The full response text.
46    #[must_use]
47    pub fn text(&self) -> &str {
48        &self.text
49    }
50
51    /// Consume the result and return the inner `String`.
52    #[must_use]
53    pub fn into_string(self) -> String {
54        self.text
55    }
56
57    /// Token usage metadata, if available.
58    #[must_use]
59    pub fn usage(&self) -> Option<&UsageMetadata> {
60        self.usage.as_ref()
61    }
62
63    /// Structured output (JSON), if the agent was configured with a
64    /// `response_schema` and the model returned valid JSON.
65    #[must_use]
66    pub fn structured_output(&self) -> Option<&serde_json::Value> {
67        self.structured_output.as_ref()
68    }
69}
70
71impl std::ops::Deref for ChatResult {
72    type Target = str;
73    fn deref(&self) -> &str {
74        &self.text
75    }
76}
77
78impl PartialEq<&str> for ChatResult {
79    fn eq(&self, other: &&str) -> bool {
80        self.text == *other
81    }
82}
83
84impl PartialEq<String> for ChatResult {
85    fn eq(&self, other: &String) -> bool {
86        self.text == *other
87    }
88}
89
90impl std::fmt::Display for ChatResult {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.write_str(&self.text)
93    }
94}
95
96impl From<ChatResult> for String {
97    fn from(result: ChatResult) -> Self {
98        result.text
99    }
100}
101
/// Brief timeout (50 milliseconds) used when draining the error channel after
/// the text stream closes. Shared with [`crate::interactive`].
pub(crate) const ERROR_DRAIN_TIMEOUT: Duration = Duration::from_millis(50);
105
/// A tool call event received during streaming.
///
/// Derives `Serialize`/`Deserialize` so the event can be converted to and
/// from serde-supported formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallEvent {
    /// Tool name (e.g. `"view_file"` or a custom tool name).
    pub name: String,
    /// Arguments as a JSON object.
    // NOTE(review): the type accepts any JSON value; nothing in this file
    // enforces that it is an object — confirm against the producer.
    pub args: serde_json::Value,
    /// Optional call identifier assigned by the backend.
    pub id: Option<String>,
    /// Optional canonical path for file tools.
    ///
    /// `#[serde(default)]` yields `None` when the key is absent from the input.
    #[serde(default)]
    pub canonical_path: Option<String>,
}
119
/// Error sent over the error channel when the Python stream fails.
///
/// Implements [`std::error::Error`]; its `Display` form is
/// `stream error: <message>`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamError {
    /// Error message from the Python side.
    pub message: String,
}
126
127impl std::fmt::Display for StreamError {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        write!(f, "stream error: {}", self.message)
130    }
131}
132
133impl std::error::Error for StreamError {}
134
/// An ordered event from a response timeline, produced by
/// [`ChatResponseHandle::resolve`](super::handle::ChatResponseHandle::resolve).
///
/// Mirrors the Python SDK's `ChatResponse.resolve()` which returns
/// `list[StreamChunk | ToolCall | ToolResult]`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResponseEvent {
    /// A text chunk from the model.
    TextChunk(String),
    /// A thinking/reasoning chunk from the model.
    ThoughtChunk(String),
    /// A tool call request from the model.
    ToolCall(ToolCallEvent),
    /// A tool execution result.
    ToolResult(crate::types::ToolResult),
}
151
/// A chunk from the streaming response, combining text, thought, and tool call events.
///
/// This provides a unified stream of all chunk types, unlike the separate
/// `take_text_stream()` / `take_thought_stream()` / `take_tool_call_stream()`
/// accessors which split events by kind.
///
/// Unlike [`ResponseEvent`], this enum has no tool-result variant. It is
/// `#[non_exhaustive]`, so code outside this crate must include a wildcard
/// arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StreamChunk {
    /// A text token from the model.
    Text(String),
    /// A thinking/reasoning token.
    Thought(String),
    /// A tool call event.
    ToolCall(ToolCallEvent),
}
167
/// Shared mutable state between the writer and handle.
///
/// The lock guarding this state is a `std::sync::Mutex` rather than a
/// `tokio::sync::Mutex` because the lock is held only for brief field
/// reads/clones (never across `.await`). This is safe from deadlocks and
/// cheaper than an async mutex.
///
/// NOTE(review): the mutex itself is not part of this struct; presumably the
/// writer and handle wrap it (e.g. in `Arc<Mutex<_>>`) — confirm at the use
/// sites.
///
/// `Default` leaves both fields `None`.
#[doc(hidden)]
#[derive(Debug, Default)]
pub struct ChatResponseSharedState {
    /// Token usage metadata, populated by the writer after the stream completes.
    pub usage: Option<UsageMetadata>,
    /// Structured output, populated by the writer after the stream completes.
    pub structured_output: Option<serde_json::Value>,
}
181
/// Grouped receivers for each independent stream channel.
///
/// Extracted from [`ChatResponseHandle`](super::handle::ChatResponseHandle)
/// so the seven channel receivers are logically grouped, keeping the handle's
/// field list manageable.
///
/// Every receiver is stored in an `Option`; `new` fills all of them with
/// `Some`. NOTE(review): presumably each is later moved out with
/// `Option::take` by the corresponding accessor — confirm in the handle.
#[derive(Debug)]
pub(crate) struct StreamReceivers {
    /// Receives text tokens as they arrive from the model.
    pub(super) text: Option<mpsc::Receiver<String>>,
    /// Receives thinking/reasoning tokens.
    pub(super) thought: Option<mpsc::Receiver<String>>,
    /// Receives tool call events.
    pub(super) tool_call: Option<mpsc::Receiver<ToolCallEvent>>,
    /// Receives at most one error if the stream fails.
    pub(super) error: Option<mpsc::Receiver<StreamError>>,
    /// Receives ordered [`ResponseEvent`]s for [`resolve()`](super::handle::ChatResponseHandle::resolve).
    pub(super) event: Option<mpsc::Receiver<ResponseEvent>>,
    /// Receives [`Step`] objects as they are produced.
    pub(super) step: Option<mpsc::Receiver<Step>>,
    /// Receives unified [`StreamChunk`]s (text, thought, and tool call events).
    pub(super) chunk: Option<mpsc::Receiver<StreamChunk>>,
}
203
204impl StreamReceivers {
205    /// Create a new set of receivers from channel endpoints.
206    pub(super) fn new(
207        text: mpsc::Receiver<String>,
208        thought: mpsc::Receiver<String>,
209        tool_call: mpsc::Receiver<ToolCallEvent>,
210        error: mpsc::Receiver<StreamError>,
211        event: mpsc::Receiver<ResponseEvent>,
212        step: mpsc::Receiver<Step>,
213        chunk: mpsc::Receiver<StreamChunk>,
214    ) -> Self {
215        Self {
216            text: Some(text),
217            thought: Some(thought),
218            tool_call: Some(tool_call),
219            error: Some(error),
220            event: Some(event),
221            step: Some(step),
222            chunk: Some(chunk),
223        }
224    }
225}