1use async_trait::async_trait;
2use futures::StreamExt;
3use reqwest::Client;
4use serde_json::json;
5
6use super::{Brain, BrainEvent, BrainRequest, BrainStream, LatencyClass, ModelCaps};
7
8pub struct OpenAIResponsesAdapter {
13 model: String,
14 api_key: String,
15 base_url: String,
16 client: Client,
17 caps: ModelCaps,
18}
19
20impl OpenAIResponsesAdapter {
21 pub fn new(model: &str, api_key: impl Into<String>, base_url: Option<&str>) -> Self {
22 let model = model.to_string();
23 Self {
24 model,
25 api_key: api_key.into(),
26 base_url: base_url.unwrap_or("https://api.openai.com/v1").to_string(),
27 client: Client::new(),
28 caps: ModelCaps {
29 context_window: 128_000,
30 max_output: 16_000,
31 tools: true,
32 vision: true,
33 cost_input_per_mtok: 2.5,
34 cost_output_per_mtok: 10.0,
35 latency: LatencyClass::Medium,
36 },
37 }
38 }
39
40 pub fn with_caps(mut self, caps: ModelCaps) -> Self {
42 self.caps = caps;
43 self
44 }
45}
46
47fn build_responses_body(model: &str, req: &BrainRequest) -> serde_json::Value {
48 let mut input: Vec<serde_json::Value> = Vec::new();
50
51 if let Some(sys) = &req.system {
52 input.push(json!({
53 "role": "system",
54 "content": sys,
55 }));
56 }
57
58 for msg in &req.messages {
59 let mut reasoning_buf = String::new();
60 let mut content_blocks: Vec<serde_json::Value> = Vec::new();
61 for block in &msg.content {
62 match block {
63 super::ContentBlock::Text { text } => {
64 if msg.role == "assistant" {
65 content_blocks.push(json!({
66 "type": "output_text",
67 "text": text,
68 }));
69 } else {
70 content_blocks.push(json!({
71 "type": "input_text",
72 "text": text,
73 }));
74 }
75 }
76 super::ContentBlock::Image { source } => {
77 content_blocks.push(json!({
78 "type": "input_image",
79 "image_url": image_source_url(source),
80 }));
81 }
82 super::ContentBlock::Reasoning { text } => {
83 if !reasoning_buf.is_empty() {
84 reasoning_buf.push('\n');
85 }
86 reasoning_buf.push_str(text);
87 }
88 _ => {}
89 }
90 }
91 let content = if content_blocks.len() == 1
92 && content_blocks[0]["type"].as_str() == Some("input_text")
93 {
94 content_blocks[0]["text"].clone()
95 } else if content_blocks.len() == 1
96 && content_blocks[0]["type"].as_str() == Some("output_text")
97 {
98 content_blocks[0]["text"].clone()
99 } else {
100 json!(content_blocks)
101 };
102
103 let mut item = json!({
104 "role": msg.role,
105 "content": content,
106 });
107 if msg.role == "assistant" && !reasoning_buf.is_empty() {
108 item["reasoning_content"] = json!(reasoning_buf);
109 }
110 input.push(item);
111 }
112
113 let mut body = json!({
114 "model": model,
115 "input": input,
116 "stream": true,
117 "temperature": req.temperature,
118 "max_output_tokens": req.max_tokens,
119 });
120
121 if req.cache.enabled {
122 if let Some(key) = &req.cache.key {
123 body["prompt_cache_key"] = json!(key);
124 }
125 body["prompt_cache_retention"] = json!(req.cache.ttl.openai_retention());
126 }
127
128 body
129}
130
131fn image_source_url(source: &super::ImageSource) -> String {
132 match source {
133 super::ImageSource::Base64 { media_type, data } => {
134 format!("data:{};base64,{}", media_type, data)
135 }
136 super::ImageSource::Url { url } => url.clone(),
137 }
138}
139
140fn push_responses_events(val: &serde_json::Value, events: &mut Vec<BrainEvent>) {
141 let event_type = val["type"].as_str().unwrap_or("");
142 if let Some(delta) = val["delta"].as_str() {
143 if event_type.contains("reasoning") || event_type.contains("thinking") {
144 events.push(BrainEvent::ReasoningDelta(delta.to_string()));
145 } else {
146 events.push(BrainEvent::TextDelta(delta.to_string()));
147 }
148 }
149
150 for key in [
151 "reasoning_content",
152 "reasoning",
153 "thinking",
154 "reasoning_summary_text",
155 ] {
156 if let Some(text) = val.get(key).and_then(|v| v.as_str()) {
157 if !text.is_empty() {
158 events.push(BrainEvent::ReasoningDelta(text.to_string()));
159 }
160 }
161 }
162
163 if let Some(response) = val.get("response") {
164 collect_nested_reasoning(response, events);
165 }
166 if event_type == "response.completed" {
167 events.push(BrainEvent::Done(crate::event::StopReason::EndTurn));
168 }
169}
170
171fn collect_nested_reasoning(value: &serde_json::Value, events: &mut Vec<BrainEvent>) {
172 match value {
173 serde_json::Value::Array(items) => {
174 for item in items {
175 collect_nested_reasoning(item, events);
176 }
177 }
178 serde_json::Value::Object(map) => {
179 for (key, value) in map {
180 if matches!(
181 key.as_str(),
182 "reasoning_content" | "reasoning" | "thinking" | "reasoning_summary_text"
183 ) {
184 if let Some(text) = value.as_str() {
185 if !text.is_empty() {
186 events.push(BrainEvent::ReasoningDelta(text.to_string()));
187 }
188 }
189 }
190 collect_nested_reasoning(value, events);
191 }
192 }
193 _ => {}
194 }
195}
196
197#[async_trait]
198impl Brain for OpenAIResponsesAdapter {
199 fn id(&self) -> &str {
200 &self.model
201 }
202
203 fn caps(&self) -> ModelCaps {
204 self.caps.clone()
205 }
206
207 async fn complete(&self, req: BrainRequest) -> anyhow::Result<BrainStream> {
208 let body = build_responses_body(&self.model, &req);
209
210 let response = self
211 .client
212 .post(format!("{}/responses", self.base_url))
213 .header("Authorization", format!("Bearer {}", self.api_key))
214 .json(&body)
215 .send()
216 .await?;
217
218 if !response.status().is_success() {
219 let status = response.status().as_u16();
220 let body = response.text().await.unwrap_or_default();
221 return Err(anyhow::anyhow!(
222 "OpenAI Responses API error {}: {}",
223 status,
224 body
225 ));
226 }
227
228 let stream = response.bytes_stream();
229 let event_stream = futures::stream::unfold(
233 (stream, false, super::sse_buffer::LineBuffer::new()),
234 |(mut stream, done, mut buf)| async move {
235 if done {
236 return None;
237 }
238 match futures::StreamExt::next(&mut stream).await {
239 Some(Ok(bytes)) => {
240 let mut events = Vec::new();
241 for line in buf.push(&bytes) {
242 let line = line.trim();
243 if line.is_empty() || !line.starts_with("data: ") {
244 continue;
245 }
246 let data = &line[6..];
247 if data == "[DONE]" {
248 events.push(BrainEvent::Done(crate::event::StopReason::EndTurn));
249 continue;
250 }
251 if let Ok(val) = serde_json::from_str::<serde_json::Value>(data) {
252 push_responses_events(&val, &mut events);
253 }
254 }
255 Some((futures::stream::iter(events), (stream, false, buf)))
256 }
257 Some(Err(e)) => Some((
258 futures::stream::iter(vec![BrainEvent::Error(format!(
259 "stream error: {}",
260 e
261 ))]),
262 (stream, true, buf),
263 )),
264 None => None,
265 }
266 },
267 )
268 .flatten();
269
270 Ok(Box::pin(event_stream))
271 }
272}
273
274pub struct BedrockAdapter {
277 model_id: String,
278 #[allow(dead_code)]
280 region: String,
281 #[allow(dead_code)]
282 access_key: String,
283 #[allow(dead_code)]
284 secret_key: String,
285 #[allow(dead_code)]
286 client: Client,
287 caps: ModelCaps,
288}
289
290impl BedrockAdapter {
291 pub fn new(
292 model_id: &str,
293 region: &str,
294 access_key: impl Into<String>,
295 secret_key: impl Into<String>,
296 ) -> Self {
297 let model_id = model_id.to_string();
298 Self {
299 model_id,
300 region: region.to_string(),
301 access_key: access_key.into(),
302 secret_key: secret_key.into(),
303 client: Client::new(),
304 caps: ModelCaps {
305 context_window: 200_000,
306 max_output: 8_000,
307 tools: true,
308 vision: true,
309 cost_input_per_mtok: 3.0,
310 cost_output_per_mtok: 15.0,
311 latency: LatencyClass::Medium,
312 },
313 }
314 }
315}
316
317#[async_trait]
318impl Brain for BedrockAdapter {
319 fn id(&self) -> &str {
320 &self.model_id
321 }
322
323 fn caps(&self) -> ModelCaps {
324 self.caps.clone()
325 }
326
327 async fn complete(&self, _req: BrainRequest) -> anyhow::Result<BrainStream> {
328 anyhow::bail!(
336 "Bedrock provider is not implemented (model={}). \
337 AWS SigV4 signing + Bedrock EventStream parsing are missing. \
338 Use anthropic:* or openai:* directly, or pin a different provider in your config.",
339 self.model_id
340 )
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::provider::{ContentBlock, ImageSource, Msg, PromptCacheConfig, PromptCacheTtl};

    /// Builds a request holding a single message with the given role and
    /// content; every other field keeps its default.
    fn single_message_request(role: &str, content: Vec<ContentBlock>) -> BrainRequest {
        BrainRequest {
            messages: vec![Msg {
                role: role.into(),
                content,
            }],
            ..BrainRequest::default()
        }
    }

    /// Shorthand for a text content block.
    fn text_block(text: &str) -> ContentBlock {
        ContentBlock::Text { text: text.into() }
    }

    #[test]
    fn responses_body_adds_prompt_cache_controls() {
        let req = BrainRequest {
            system: Some("stable sparrow system".into()),
            cache: PromptCacheConfig {
                enabled: true,
                ttl: PromptCacheTtl::OneHour,
                key: Some("sparrow-repo-abc".into()),
            },
            ..single_message_request("user", vec![text_block("dynamic task")])
        };

        let body = build_responses_body("gpt-test", &req);

        assert_eq!(body["prompt_cache_key"], "sparrow-repo-abc");
        assert_eq!(body["prompt_cache_retention"], "in_memory");
    }

    #[test]
    fn responses_body_reinjects_assistant_reasoning_content() {
        let req = single_message_request(
            "assistant",
            vec![
                ContentBlock::Reasoning {
                    text: "private reasoning state".into(),
                },
                text_block("visible answer"),
            ],
        );

        let body = build_responses_body("gpt-test", &req);
        let item = &body["input"][0];

        assert_eq!(item["content"], "visible answer");
        assert_eq!(item["reasoning_content"], "private reasoning state");
    }

    #[test]
    fn responses_body_serializes_image_blocks() {
        let req = single_message_request(
            "user",
            vec![
                text_block("describe this"),
                ContentBlock::Image {
                    source: ImageSource::Base64 {
                        media_type: "image/png".into(),
                        data: "iVBORw0KGgo=".into(),
                    },
                },
            ],
        );

        let body = build_responses_body("gpt-test", &req);
        let parts = &body["input"][0]["content"];

        assert_eq!(parts[0]["type"], "input_text");
        assert_eq!(parts[1]["type"], "input_image");
        assert_eq!(parts[1]["image_url"], "data:image/png;base64,iVBORw0KGgo=");
    }

    #[test]
    fn responses_events_capture_reasoning_delta_without_visible_text() {
        let payload = json!({
            "type": "response.reasoning_summary_text.delta",
            "delta": "reasoning chunk"
        });

        let mut events = Vec::new();
        push_responses_events(&payload, &mut events);

        assert!(matches!(
            events.as_slice(),
            [BrainEvent::ReasoningDelta(text)] if text == "reasoning chunk"
        ));
    }
}