clark_agent/stream.rs
1//! LLM transport extension point.
2//!
3//! `StreamFn` is the swappable boundary between the loop and any
4//! provider: real LLM API call (production), recorded-fixture replay
5//! (eval), scripted scenario (tests), remote proxy (gateway). One trait
6//! method, typed request/response.
7//!
8//! The loop never imports a specific provider; the caller assembles a
9//! `LoopConfig` with a `StreamFn` implementation of their choice.
10
11use async_trait::async_trait;
12use futures::stream::BoxStream;
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use tokio_util::sync::CancellationToken;
16
17use crate::tool::ToolCall;
18use crate::types::{AgentMessage, AssistantContent, StopReason};
19
20/// Inputs for one LLM call.
21#[derive(Debug, Clone)]
22pub struct StreamRequest {
23 /// System prompt prepended by the provider.
24 pub system_prompt: String,
25 /// Conversation transcript. The transport converts to provider-native
26 /// format before sending.
27 pub messages: Vec<AgentMessage>,
28 /// Tool schemas the provider should expose to the model.
29 pub tools: Vec<ToolSchema>,
30 /// Optional sampling controls. Implementations may ignore.
31 pub temperature: Option<f32>,
32 pub max_output_tokens: Option<u32>,
33 /// Reasoning-effort knob shipped to the provider. Typed contract;
34 /// each `StreamFn` impl owns the wire mapping (OpenRouter's
35 /// `reasoning: {effort}` block, Fireworks' top-level
36 /// `reasoning_effort`, etc.). Single source of truth — replaces the
37 /// stringly-typed `provider_config["reasoning_effort"]` knob and the
38 /// hardcoded per-bridge defaults.
39 pub reasoning: ReasoningEffort,
40 /// Provider-specific extras (e.g., response format, custom routing
41 /// pins). Open-by-design leaf — typed at the leaf is overkill given
42 /// provider fragmentation. Reasoning effort is NOT carried here; use
43 /// the typed `reasoning` field above.
44 #[allow(clippy::struct_field_names)]
45 pub provider_extras: Value,
46 /// When `true`, the wire request sets `tool_choice: "required"` —
47 /// the provider MUST emit a tool call (no plain-text or empty
48 /// completion). Coercive narrowing for the framing turn: paired
49 /// with `OpeningGate`'s compact catalog, this forces the model to
50 /// pick a message or planning tool instead of emitting
51 /// reasoning-only text and skipping the structured turn.
52 /// Default `false` leaves tool choice to the provider for every
53 /// other turn.
54 pub force_tool_call: bool,
55}
56
57/// Reasoning-effort levels accepted by every supported provider.
58///
59/// Wire mapping is owned by each `StreamFn` impl: OpenRouter sends
60/// `reasoning: {effort}`; Fireworks rejects `"minimal"` and gets
61/// `Minimal → "low"` remapped at the bridge. The enum stays
62/// provider-agnostic so the loop and the configuration surface speak
63/// one language.
64///
65/// Default is [`ReasoningEffort::Minimal`] — keeps the reasoning
66/// channel open (some providers reject requests that omit it on
67/// reasoning-capable models) without burning the visible-completion
68/// budget on hidden-thought tokens.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
70#[serde(rename_all = "lowercase")]
71pub enum ReasoningEffort {
72 /// Reasoning channel suppressed entirely. Use only when the model
73 /// is known to tolerate `effort: "none"` (Anthropic, xAI Grok,
74 /// most non-OpenAI families).
75 None,
76 /// Lowest non-empty reasoning budget. Universal default — works
77 /// on every provider and keeps tool-call latency low.
78 #[default]
79 Minimal,
80 Low,
81 Medium,
82 High,
83 /// OpenRouter-specific extra-high reasoning tier (~95% of max_tokens
84 /// on OpenAI/Grok/Anthropic; mapped down to `"high"` for Gemini 3).
85 XHigh,
86}
87
88impl ReasoningEffort {
89 /// Wire form sent to OpenRouter / OpenAI-compatible endpoints.
90 pub fn as_wire(self) -> &'static str {
91 match self {
92 ReasoningEffort::None => "none",
93 ReasoningEffort::Minimal => "minimal",
94 ReasoningEffort::Low => "low",
95 ReasoningEffort::Medium => "medium",
96 ReasoningEffort::High => "high",
97 ReasoningEffort::XHigh => "xhigh",
98 }
99 }
100
101 /// Parse the canonical wire string back into the enum. Unknown or
102 /// empty inputs return `None` so callers can decide whether to
103 /// fall back to the default or surface a configuration error.
104 pub fn from_wire(s: &str) -> Option<Self> {
105 match s.trim().to_ascii_lowercase().as_str() {
106 "none" => Some(ReasoningEffort::None),
107 "minimal" => Some(ReasoningEffort::Minimal),
108 "low" => Some(ReasoningEffort::Low),
109 "medium" => Some(ReasoningEffort::Medium),
110 "high" => Some(ReasoningEffort::High),
111 "xhigh" => Some(ReasoningEffort::XHigh),
112 _ => None,
113 }
114 }
115}
116
117/// Tool schema as the provider sees it.
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct ToolSchema {
120 pub name: String,
121 pub description: String,
122 pub parameters: Value,
123}
124
125/// Streamed event from the provider.
126///
127/// Streaming surface is rich because UIs need to render token-by-token.
128/// The loop folds these into a single `StreamResponse` at the end.
129#[derive(Debug, Clone)]
130pub enum StreamEvent {
131 /// First event. Provider has accepted the request and started
132 /// generating. Carries the (so-far-empty) partial message so listeners
133 /// can render an empty assistant placeholder.
134 Start { partial: AgentMessage },
135
136 /// A streaming chunk. UI listeners render this; the loop folds it
137 /// into the assembled message.
138 Chunk(AssistantStreamChunk),
139
140 /// Final event. Provider finished generating. Carries the assembled
141 /// final message.
142 Done { message: AgentMessage },
143
144 /// Final event. Provider raised an error during streaming. Carries
145 /// the partial message produced so far and a human-readable error
146 /// description. Stream implementations encode their typed errors
147 /// here as strings; callers that need structured detail can parse
148 /// `kind`.
149 Error {
150 partial: AgentMessage,
151 kind: StreamErrorKind,
152 message: String,
153 },
154}
155
156/// One token-level chunk during streaming.
157///
158/// Apps render these as deltas. The loop ignores them (it only looks at
159/// `Start` for the placeholder and `Done` / `Error` for the assembled
160/// final message).
161#[derive(Debug, Clone, Serialize, Deserialize)]
162#[serde(tag = "kind", rename_all = "snake_case")]
163pub enum AssistantStreamChunk {
164 /// Text being appended to the visible assistant content.
165 Text { delta: String },
166 /// Hidden reasoning being appended (think-then-act block).
167 Thinking { delta: String },
168 /// Native provider reasoning being appended.
169 Reasoning { delta: String },
170 /// Native provider reasoning detail blocks being appended.
171 ReasoningDetails { delta: Vec<Value> },
172 /// Tool call accumulating: arguments JSON streaming in piece by piece.
173 ToolCallDelta {
174 index: usize,
175 id_delta: Option<String>,
176 name_delta: Option<String>,
177 arguments_delta: Option<String>,
178 },
179}
180
181/// Coarse classification of a stream error.
182///
183/// `ContextOverflow` is split out from `Fatal` so the loop can apply
184/// recovery (re-run with a smaller context) instead of terminating.
185/// Without this split, providers that surface a `prompt_too_long` /
186/// `context_length_exceeded` error look identical to permanent
187/// failures (auth, schema, model id) and the loop has no signal to
188/// distinguish "trim and retry" from "give up".
189#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
190#[serde(rename_all = "snake_case")]
191pub enum StreamErrorKind {
192 Transient,
193 /// The selected model/provider is temporarily rate-limited. This
194 /// remains retryable at the transport boundary, but surfaces as a
195 /// distinct terminal kind if every retry fails.
196 ProviderRateLimited,
197 /// Transport failed before the provider produced any actionable
198 /// assistant turn. Hidden reasoning/details, usage, or an unusable
199 /// burst of partial tool-call deltas may have arrived, but the loop
200 /// still has no runnable next step, so this is safer to replay than
201 /// a generic transient stream error.
202 ZeroOutputTransport,
203 Fatal,
204 Empty,
205 Aborted,
206 /// Provider rejected the request because the assembled context
207 /// exceeds the model's window. Distinct from `Fatal` so a future
208 /// recovery layer (Phase 2 in the compaction roadmap) can compact
209 /// more aggressively and retry rather than ending the run.
210 ContextOverflow,
211}
212
213/// Final assembled response from one LLM call. Producers that don't need
214/// fine-grained streaming can return a `BoxStream` that yields just one
215/// `Done` event.
216#[derive(Debug, Clone)]
217pub struct StreamResponse {
218 pub content: AssistantContent,
219 pub stop_reason: StopReason,
220 pub error_message: Option<String>,
221 pub tool_calls: Vec<ToolCall>,
222}
223
224/// The transport trait. One method, typed in/out.
225///
226/// Contract:
227/// - Must not panic on request/model/runtime failures. Encode failures in
228/// the returned stream as `StreamEvent::Error` with a partial message,
229/// not by returning `Err`.
230/// - The returned stream must yield exactly one terminal event
231/// (`Done` or `Error`). Apps that prefer non-streaming can yield a
232/// single `Start` + `Done` with the full final message.
233/// - Honor `signal` for cancellation. On cancel, yield `Error` with
234/// `StreamError::Aborted`-equivalent or simply drop the stream.
235#[async_trait]
236pub trait StreamFn: Send + Sync + 'static {
237 async fn stream(
238 &self,
239 request: StreamRequest,
240 signal: CancellationToken,
241 ) -> BoxStream<'static, StreamEvent>;
242}