Skip to main content

clark_agent/
stream.rs

//! LLM transport extension point.
//!
//! `StreamFn` is the swappable boundary between the loop and any
//! provider: a real LLM API call (production), recorded-fixture replay
//! (eval), a scripted scenario (tests), or a remote proxy (gateway). One
//! trait method, typed request/response.
//!
//! The loop never imports a specific provider; the caller assembles a
//! `LoopConfig` with a `StreamFn` implementation of their choice.
10
use async_trait::async_trait;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio_util::sync::CancellationToken;

use crate::tool::ToolCall;
use crate::types::{AgentMessage, AssistantContent, StopReason};
19
20/// Inputs for one LLM call.
21#[derive(Debug, Clone)]
22pub struct StreamRequest {
23    /// System prompt prepended by the provider.
24    pub system_prompt: String,
25    /// Conversation transcript. The transport converts to provider-native
26    /// format before sending.
27    pub messages: Vec<AgentMessage>,
28    /// Tool schemas the provider should expose to the model.
29    pub tools: Vec<ToolSchema>,
30    /// Optional sampling controls. Implementations may ignore.
31    pub temperature: Option<f32>,
32    pub max_output_tokens: Option<u32>,
33    /// Reasoning-effort knob shipped to the provider. Typed contract;
34    /// each `StreamFn` impl owns the wire mapping (OpenRouter's
35    /// `reasoning: {effort}` block, Fireworks' top-level
36    /// `reasoning_effort`, etc.). Single source of truth — replaces the
37    /// stringly-typed `provider_config["reasoning_effort"]` knob and the
38    /// hardcoded per-bridge defaults.
39    pub reasoning: ReasoningEffort,
40    /// Provider-specific extras (e.g., response format, custom routing
41    /// pins). Open-by-design leaf — typed at the leaf is overkill given
42    /// provider fragmentation. Reasoning effort is NOT carried here; use
43    /// the typed `reasoning` field above.
44    #[allow(clippy::struct_field_names)]
45    pub provider_extras: Value,
46    /// When `true`, the wire request sets `tool_choice: "required"` —
47    /// the provider MUST emit a tool call (no plain-text or empty
48    /// completion). Coercive narrowing for the framing turn: paired
49    /// with `OpeningGate`'s compact catalog, this forces the model to
50    /// pick a message or planning tool instead of emitting
51    /// reasoning-only text and skipping the structured turn.
52    /// Default `false` leaves tool choice to the provider for every
53    /// other turn.
54    pub force_tool_call: bool,
55}
56
57/// Reasoning-effort levels accepted by every supported provider.
58///
59/// Wire mapping is owned by each `StreamFn` impl: OpenRouter sends
60/// `reasoning: {effort}`; Fireworks rejects `"minimal"` and gets
61/// `Minimal → "low"` remapped at the bridge. The enum stays
62/// provider-agnostic so the loop and the configuration surface speak
63/// one language.
64///
65/// Default is [`ReasoningEffort::Minimal`] — keeps the reasoning
66/// channel open (some providers reject requests that omit it on
67/// reasoning-capable models) without burning the visible-completion
68/// budget on hidden-thought tokens.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
70#[serde(rename_all = "lowercase")]
71pub enum ReasoningEffort {
72    /// Reasoning channel suppressed entirely. Use only when the model
73    /// is known to tolerate `effort: "none"` (Anthropic, xAI Grok,
74    /// most non-OpenAI families).
75    None,
76    /// Lowest non-empty reasoning budget. Universal default — works
77    /// on every provider and keeps tool-call latency low.
78    #[default]
79    Minimal,
80    Low,
81    Medium,
82    High,
83    /// OpenRouter-specific extra-high reasoning tier (~95% of max_tokens
84    /// on OpenAI/Grok/Anthropic; mapped down to `"high"` for Gemini 3).
85    XHigh,
86}
87
88impl ReasoningEffort {
89    /// Wire form sent to OpenRouter / OpenAI-compatible endpoints.
90    pub fn as_wire(self) -> &'static str {
91        match self {
92            ReasoningEffort::None => "none",
93            ReasoningEffort::Minimal => "minimal",
94            ReasoningEffort::Low => "low",
95            ReasoningEffort::Medium => "medium",
96            ReasoningEffort::High => "high",
97            ReasoningEffort::XHigh => "xhigh",
98        }
99    }
100
101    /// Parse the canonical wire string back into the enum. Unknown or
102    /// empty inputs return `None` so callers can decide whether to
103    /// fall back to the default or surface a configuration error.
104    pub fn from_wire(s: &str) -> Option<Self> {
105        match s.trim().to_ascii_lowercase().as_str() {
106            "none" => Some(ReasoningEffort::None),
107            "minimal" => Some(ReasoningEffort::Minimal),
108            "low" => Some(ReasoningEffort::Low),
109            "medium" => Some(ReasoningEffort::Medium),
110            "high" => Some(ReasoningEffort::High),
111            "xhigh" => Some(ReasoningEffort::XHigh),
112            _ => None,
113        }
114    }
115}
116
117/// Tool schema as the provider sees it.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct ToolSchema {
120    pub name: String,
121    pub description: String,
122    pub parameters: Value,
123}
124
125/// Streamed event from the provider.
126///
127/// Streaming surface is rich because UIs need to render token-by-token.
128/// The loop folds these into a single `StreamResponse` at the end.
129#[derive(Debug, Clone)]
130pub enum StreamEvent {
131    /// First event. Provider has accepted the request and started
132    /// generating. Carries the (so-far-empty) partial message so listeners
133    /// can render an empty assistant placeholder.
134    Start { partial: AgentMessage },
135
136    /// A streaming chunk. UI listeners render this; the loop folds it
137    /// into the assembled message.
138    Chunk(AssistantStreamChunk),
139
140    /// Final event. Provider finished generating. Carries the assembled
141    /// final message.
142    Done { message: AgentMessage },
143
144    /// Final event. Provider raised an error during streaming. Carries
145    /// the partial message produced so far and a human-readable error
146    /// description. Stream implementations encode their typed errors
147    /// here as strings; callers that need structured detail can parse
148    /// `kind`.
149    Error {
150        partial: AgentMessage,
151        kind: StreamErrorKind,
152        message: String,
153    },
154}
155
156/// One token-level chunk during streaming.
157///
158/// Apps render these as deltas. The loop ignores them (it only looks at
159/// `Start` for the placeholder and `Done` / `Error` for the assembled
160/// final message).
161#[derive(Debug, Clone, Serialize, Deserialize)]
162#[serde(tag = "kind", rename_all = "snake_case")]
163pub enum AssistantStreamChunk {
164    /// Text being appended to the visible assistant content.
165    Text { delta: String },
166    /// Hidden reasoning being appended (think-then-act block).
167    Thinking { delta: String },
168    /// Native provider reasoning being appended.
169    Reasoning { delta: String },
170    /// Native provider reasoning detail blocks being appended.
171    ReasoningDetails { delta: Vec<Value> },
172    /// Tool call accumulating: arguments JSON streaming in piece by piece.
173    ToolCallDelta {
174        index: usize,
175        id_delta: Option<String>,
176        name_delta: Option<String>,
177        arguments_delta: Option<String>,
178    },
179}
180
181/// Coarse classification of a stream error.
182///
183/// `ContextOverflow` is split out from `Fatal` so the loop can apply
184/// recovery (re-run with a smaller context) instead of terminating.
185/// Without this split, providers that surface a `prompt_too_long` /
186/// `context_length_exceeded` error look identical to permanent
187/// failures (auth, schema, model id) and the loop has no signal to
188/// distinguish "trim and retry" from "give up".
189#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
190#[serde(rename_all = "snake_case")]
191pub enum StreamErrorKind {
192    Transient,
193    /// The selected model/provider is temporarily rate-limited. This
194    /// remains retryable at the transport boundary, but surfaces as a
195    /// distinct terminal kind if every retry fails.
196    ProviderRateLimited,
197    /// Transport failed before the provider produced any actionable
198    /// assistant turn. Hidden reasoning/details, usage, or an unusable
199    /// burst of partial tool-call deltas may have arrived, but the loop
200    /// still has no runnable next step, so this is safer to replay than
201    /// a generic transient stream error.
202    ZeroOutputTransport,
203    Fatal,
204    Empty,
205    Aborted,
206    /// Provider rejected the request because the assembled context
207    /// exceeds the model's window. Distinct from `Fatal` so a future
208    /// recovery layer (Phase 2 in the compaction roadmap) can compact
209    /// more aggressively and retry rather than ending the run.
210    ContextOverflow,
211}
212
213/// Final assembled response from one LLM call. Producers that don't need
214/// fine-grained streaming can return a `BoxStream` that yields just one
215/// `Done` event.
216#[derive(Debug, Clone)]
217pub struct StreamResponse {
218    pub content: AssistantContent,
219    pub stop_reason: StopReason,
220    pub error_message: Option<String>,
221    pub tool_calls: Vec<ToolCall>,
222}
223
224/// The transport trait. One method, typed in/out.
225///
226/// Contract:
227/// - Must not panic on request/model/runtime failures. Encode failures in
228///   the returned stream as `StreamEvent::Error` with a partial message,
229///   not by returning `Err`.
230/// - The returned stream must yield exactly one terminal event
231///   (`Done` or `Error`). Apps that prefer non-streaming can yield a
232///   single `Start` + `Done` with the full final message.
233/// - Honor `signal` for cancellation. On cancel, yield `Error` with
234///   `StreamError::Aborted`-equivalent or simply drop the stream.
235#[async_trait]
236pub trait StreamFn: Send + Sync + 'static {
237    async fn stream(
238        &self,
239        request: StreamRequest,
240        signal: CancellationToken,
241    ) -> BoxStream<'static, StreamEvent>;
242}