1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
pub mod attachments;
pub mod router;
pub mod streaming;
pub mod types;
pub use router::{ModelRouter, ModelTier, TaskComplexity};
pub use streaming::{StreamAccumulator, StreamBox, StreamDelta};
pub use types::*;
use anyhow::Result;
use async_trait::async_trait;
use futures::StreamExt;
use crate::model_capabilities::{
ModelCapabilities, default_max_output_tokens, get_model_capabilities,
};
/// Common interface implemented by every LLM backend.
///
/// Implementors must supply [`chat`](Self::chat), [`model`](Self::model) and
/// [`provider`](Self::provider). Every other method has a default
/// implementation built on top of those three.
#[async_trait]
pub trait LlmProvider: Send + Sync {
    /// Run a chat completion and return the finished outcome in one piece.
    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome>;

    /// Run a chat completion and expose it as a stream of [`StreamDelta`] events.
    ///
    /// The default implementation does not stream incrementally: it awaits
    /// [`chat()`](Self::chat) and then replays the finished outcome as deltas.
    ///
    /// * On success, text, thinking and tool-use blocks are emitted in order
    ///   (each tagged with its index in the response content), followed by a
    ///   `Usage` delta and a `Done` delta.
    /// * `RateLimited` and `ServerError` outcomes become a recoverable
    ///   `Error` delta; `InvalidRequest` becomes a non-recoverable one.
    /// * An `Err` from `chat()` is forwarded as an `Err` stream item.
    ///
    /// Providers with native streaming should override this method.
    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
        Box::pin(async_stream::stream! {
            match self.chat(request).await {
                Ok(ChatOutcome::Success(response)) => {
                    let ChatResponse { content, usage, stop_reason, .. } = response;

                    // Replay every block as the delta(s) a real stream would produce.
                    for (block_index, block) in content.iter().enumerate() {
                        match block {
                            ContentBlock::ToolUse { id, name, input, thought_signature } => {
                                yield Ok(StreamDelta::ToolUseStart {
                                    id: id.clone(),
                                    name: name.clone(),
                                    block_index,
                                    thought_signature: thought_signature.clone(),
                                });
                                // The whole tool input is sent as a single JSON chunk;
                                // a serialization failure degrades to an empty string.
                                let serialized_input =
                                    serde_json::to_string(input).unwrap_or_default();
                                yield Ok(StreamDelta::ToolInputDelta {
                                    id: id.clone(),
                                    delta: serialized_input,
                                    block_index,
                                });
                            }
                            ContentBlock::Thinking { thinking, .. } => {
                                let delta = thinking.clone();
                                yield Ok(StreamDelta::ThinkingDelta { delta, block_index });
                            }
                            ContentBlock::Text { text } => {
                                let delta = text.clone();
                                yield Ok(StreamDelta::TextDelta { delta, block_index });
                            }
                            // These block kinds produce no delta in the default implementation.
                            ContentBlock::RedactedThinking { .. }
                            | ContentBlock::ToolResult { .. }
                            | ContentBlock::Image { .. }
                            | ContentBlock::Document { .. } => {}
                        }
                    }

                    yield Ok(StreamDelta::Usage(usage));
                    yield Ok(StreamDelta::Done { stop_reason });
                }
                Ok(ChatOutcome::RateLimited) => {
                    yield Ok(StreamDelta::Error {
                        message: "Rate limited".to_string(),
                        recoverable: true,
                    });
                }
                Ok(ChatOutcome::InvalidRequest(message)) => {
                    yield Ok(StreamDelta::Error {
                        message,
                        recoverable: false,
                    });
                }
                Ok(ChatOutcome::ServerError(message)) => {
                    yield Ok(StreamDelta::Error {
                        message,
                        recoverable: true,
                    });
                }
                Err(err) => yield Err(err),
            }
        })
    }

    /// Model name; combined with [`provider`](Self::provider) to look up
    /// capability metadata and default token limits.
    fn model(&self) -> &str;

    /// Provider name; used as the primary key for capability lookups.
    fn provider(&self) -> &'static str;

    /// Thinking configuration owned by the provider itself, if any.
    ///
    /// The default implementation reports none.
    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
        None
    }

    /// Capability metadata for this provider/model pair, if known.
    ///
    /// The exact provider name is tried first. When that lookup misses, a
    /// few provider names fall back to another provider's table:
    /// `openai-responses` / `openai-codex` use `openai`, and `vertex` uses
    /// `anthropic` for `claude-*` models and `gemini` otherwise.
    fn capabilities(&self) -> Option<&'static ModelCapabilities> {
        if let Some(direct) = get_model_capabilities(self.provider(), self.model()) {
            return Some(direct);
        }

        let fallback_provider = match self.provider() {
            "openai-responses" | "openai-codex" => "openai",
            "vertex" if self.model().starts_with("claude-") => "anthropic",
            "vertex" => "gemini",
            _ => return None,
        };
        get_model_capabilities(fallback_provider, self.model())
    }

    /// Check a thinking configuration against the provider/model capabilities.
    ///
    /// `None` is always accepted. When capabilities are unknown, thinking in
    /// general is allowed, but adaptive thinking is rejected because it needs
    /// a positive capability flag.
    ///
    /// # Errors
    ///
    /// Returns an error when the requested thinking mode is not supported by
    /// the active provider/model capability set.
    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
        let Some(requested) = thinking else {
            return Ok(());
        };

        // Only a known capability set that explicitly lacks thinking rejects here.
        if matches!(self.capabilities(), Some(caps) if !caps.supports_thinking) {
            anyhow::bail!(
                "thinking is not supported for provider={} model={}",
                self.provider(),
                self.model()
            );
        }

        // Adaptive mode must be positively advertised; unknown capabilities fail.
        let adaptive_requested = matches!(requested.mode, ThinkingMode::Adaptive);
        if adaptive_requested
            && !matches!(self.capabilities(), Some(caps) if caps.supports_adaptive_thinking)
        {
            anyhow::bail!(
                "adaptive thinking is not supported for provider={} model={}",
                self.provider(),
                self.model()
            );
        }

        Ok(())
    }

    /// Work out which thinking configuration applies to a request.
    ///
    /// A request-level configuration takes precedence; otherwise the
    /// provider-owned default from
    /// [`configured_thinking`](Self::configured_thinking) is used.
    ///
    /// # Errors
    ///
    /// Returns an error when the resolved thinking configuration is not
    /// supported by the active provider/model capability set.
    fn resolve_thinking_config(
        &self,
        request_thinking: Option<&ThinkingConfig>,
    ) -> Result<Option<ThinkingConfig>> {
        let effective = if request_thinking.is_some() {
            request_thinking
        } else {
            self.configured_thinking()
        };
        self.validate_thinking_config(effective)?;
        Ok(effective.cloned())
    }

    /// Default maximum output tokens for this provider/model when the caller
    /// does not explicitly override `AgentConfig.max_tokens`.
    ///
    /// Resolution order: the capability table's `max_output_tokens`, then
    /// `default_max_output_tokens`, then a fixed fallback of 4096.
    fn default_max_tokens(&self) -> u32 {
        const FALLBACK_MAX_TOKENS: u32 = 4096;

        if let Some(limit) = self.capabilities().and_then(|caps| caps.max_output_tokens) {
            return limit;
        }
        default_max_output_tokens(self.provider(), self.model()).unwrap_or(FALLBACK_MAX_TOKENS)
    }
}
/// Helper function to consume a stream and collect it into a `ChatResponse`.
///
/// This is useful for providers that want to test their streaming implementation
/// or for cases where you need the full response after streaming.
///
/// Every delta is fed into a [`StreamAccumulator`]. If the stream carried one
/// or more `StreamDelta::Error` events, the last one decides the outcome:
///
/// * not recoverable: `ChatOutcome::InvalidRequest`
/// * recoverable and the message mentions "rate limit" (case-insensitive):
///   `ChatOutcome::RateLimited`
/// * any other recoverable error: `ChatOutcome::ServerError`
///
/// Otherwise the accumulated content, usage and stop reason are returned as
/// `ChatOutcome::Success`. Usage defaults to all zeros when the stream never
/// reported it, and the response `id` is left empty.
///
/// # Errors
///
/// Returns an error if the stream yields an error result.
pub async fn collect_stream(mut stream: StreamBox<'_>, model: String) -> Result<ChatOutcome> {
    let mut accumulator = StreamAccumulator::new();
    let mut last_error: Option<(String, bool)> = None;

    while let Some(result) = stream.next().await {
        // An `Err` item aborts collection immediately.
        let delta = result?;
        if let StreamDelta::Error {
            message,
            recoverable,
        } = &delta
        {
            // Keep only the most recent error; it determines the final outcome.
            last_error = Some((message.clone(), *recoverable));
        }
        accumulator.apply(&delta);
    }

    // If we encountered an error during streaming, return it
    if let Some((message, recoverable)) = last_error {
        if !recoverable {
            return Ok(ChatOutcome::InvalidRequest(message));
        }
        // Match case-insensitively so that provider messages such as
        // "Rate limit reached ..." or "Rate Limit Exceeded" are recognised,
        // in addition to the "Rate limited" text emitted by the default
        // `chat_stream` implementation.
        if message.to_ascii_lowercase().contains("rate limit") {
            return Ok(ChatOutcome::RateLimited);
        }
        return Ok(ChatOutcome::ServerError(message));
    }

    // Extract usage and stop_reason before consuming the accumulator
    let usage = accumulator.take_usage().unwrap_or(Usage {
        input_tokens: 0,
        output_tokens: 0,
        cached_input_tokens: 0,
    });
    let stop_reason = accumulator.take_stop_reason();
    let content = accumulator.into_content_blocks();

    // Log accumulated response for debugging
    log::debug!(
        "Collected stream response: model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
        model,
        stop_reason,
        usage.input_tokens,
        usage.output_tokens,
        content.len()
    );
    // One debug line per block. Text and thinking are logged by length only;
    // tool-use blocks log their full input.
    for (i, block) in content.iter().enumerate() {
        match block {
            ContentBlock::Text { text } => {
                log::debug!(" content_block[{}]: Text (len={})", i, text.len());
            }
            ContentBlock::Thinking { thinking, .. } => {
                log::debug!(" content_block[{}]: Thinking (len={})", i, thinking.len());
            }
            ContentBlock::RedactedThinking { .. } => {
                log::debug!(" content_block[{i}]: RedactedThinking");
            }
            ContentBlock::ToolUse {
                id, name, input, ..
            } => {
                log::debug!(" content_block[{i}]: ToolUse id={id} name={name} input={input}");
            }
            ContentBlock::ToolResult {
                tool_use_id,
                content: result_content,
                is_error,
            } => {
                log::debug!(
                    " content_block[{}]: ToolResult tool_use_id={} is_error={:?} content_len={}",
                    i,
                    tool_use_id,
                    is_error,
                    result_content.len()
                );
            }
            ContentBlock::Image { source } => {
                log::debug!(
                    " content_block[{i}]: Image media_type={}",
                    source.media_type
                );
            }
            ContentBlock::Document { source } => {
                log::debug!(
                    " content_block[{i}]: Document media_type={}",
                    source.media_type
                );
            }
        }
    }

    Ok(ChatOutcome::Success(ChatResponse {
        // Nothing collected here supplies a response id, so it stays empty.
        id: String::new(),
        content,
        model,
        stop_reason,
        usage,
    }))
}