1use crate::config::constants::{models, urls};
2use crate::llm::client::LLMClient;
3use crate::llm::error_display;
4use crate::llm::provider::{
5 FinishReason, LLMError, LLMProvider, LLMRequest, LLMResponse, LLMStream, LLMStreamEvent,
6 Message, MessageRole, ToolCall, ToolChoice, ToolDefinition, Usage,
7};
8use crate::llm::types as llm_types;
9use async_stream::try_stream;
10use async_trait::async_trait;
11use futures::StreamExt;
12use reqwest::Client as HttpClient;
13use serde_json::{Map, Value, json};
14
15use super::extract_reasoning_trace;
16
/// Incrementally assembles one tool call from streaming deltas.
///
/// Streamed chunks may deliver the id, name, and argument fragments across
/// separate events; fields stay `None`/empty until the relevant delta arrives.
#[derive(Default, Clone)]
struct ToolCallBuilder {
    // Tool call id; a synthetic fallback id is generated at finalize time if absent.
    id: Option<String>,
    // Function name; required — a builder that never receives a name is dropped.
    name: Option<String>,
    // Concatenation of streamed JSON argument fragments.
    arguments: String,
}
23
24impl ToolCallBuilder {
25 fn finalize(self, fallback_index: usize) -> Option<ToolCall> {
26 let name = self.name?;
27 let id = self
28 .id
29 .unwrap_or_else(|| format!("tool_call_{}", fallback_index));
30 let arguments = if self.arguments.is_empty() {
31 "{}".to_string()
32 } else {
33 self.arguments
34 };
35 Some(ToolCall::function(id, name, arguments))
36 }
37}
38
39fn update_tool_calls(builders: &mut Vec<ToolCallBuilder>, deltas: &[Value]) {
40 for (index, delta) in deltas.iter().enumerate() {
41 if builders.len() <= index {
42 builders.push(ToolCallBuilder::default());
43 }
44 let builder = builders
45 .get_mut(index)
46 .expect("tool call builder must exist after push");
47
48 if let Some(id) = delta.get("id").and_then(|v| v.as_str()) {
49 builder.id = Some(id.to_string());
50 }
51
52 if let Some(function) = delta.get("function") {
53 if let Some(name) = function.get("name").and_then(|v| v.as_str()) {
54 builder.name = Some(name.to_string());
55 }
56
57 if let Some(arguments_value) = function.get("arguments") {
58 if let Some(arguments) = arguments_value.as_str() {
59 builder.arguments.push_str(arguments);
60 } else if arguments_value.is_object() || arguments_value.is_array() {
61 builder.arguments.push_str(&arguments_value.to_string());
62 }
63 }
64 }
65 }
66}
67
68fn finalize_tool_calls(builders: Vec<ToolCallBuilder>) -> Option<Vec<ToolCall>> {
69 let calls: Vec<ToolCall> = builders
70 .into_iter()
71 .enumerate()
72 .filter_map(|(index, builder)| builder.finalize(index))
73 .collect();
74
75 if calls.is_empty() { None } else { Some(calls) }
76}
77
/// A piece of streamed output: either user-visible content or reasoning text.
#[derive(Debug, PartialEq, Eq)]
enum StreamFragment {
    Content(String),
    Reasoning(String),
}

/// Ordered collection of fragments produced while parsing stream payloads.
/// `push_content`/`push_reasoning` coalesce with a trailing fragment of the
/// same kind; `extend` simply concatenates.
#[derive(Default, Debug)]
struct StreamDelta {
    fragments: Vec<StreamFragment>,
}

impl StreamDelta {
    /// Appends content text, merging into a trailing content fragment.
    /// Empty text is ignored.
    fn push_content(&mut self, text: &str) {
        if text.is_empty() {
            return;
        }
        if let Some(StreamFragment::Content(existing)) = self.fragments.last_mut() {
            existing.push_str(text);
        } else {
            self.fragments.push(StreamFragment::Content(text.to_owned()));
        }
    }

    /// Appends reasoning text, merging into a trailing reasoning fragment.
    /// Empty text is ignored.
    fn push_reasoning(&mut self, text: &str) {
        if text.is_empty() {
            return;
        }
        if let Some(StreamFragment::Reasoning(existing)) = self.fragments.last_mut() {
            existing.push_str(text);
        } else {
            self.fragments.push(StreamFragment::Reasoning(text.to_owned()));
        }
    }

    /// True when no fragments have been collected.
    fn is_empty(&self) -> bool {
        self.fragments.is_empty()
    }

    /// Consumes the delta, yielding the fragments in arrival order.
    fn into_fragments(self) -> Vec<StreamFragment> {
        self.fragments
    }

    /// Appends all fragments from `other`, preserving order (no coalescing
    /// across the boundary).
    fn extend(&mut self, other: StreamDelta) {
        self.fragments.extend(other.fragments);
    }
}
128
/// Accumulates reasoning text from streaming chunks, normalizing whitespace,
/// de-duplicating consecutive identical chunks, and inserting separator
/// spaces only where the surrounding text needs them.
#[derive(Default, Clone)]
struct ReasoningBuffer {
    text: String,
    last_chunk: Option<String>,
}

impl ReasoningBuffer {
    /// Appends `chunk` and returns exactly the text that was added (including
    /// any separator space), or `None` when the chunk is blank or repeats the
    /// immediately preceding chunk.
    fn push(&mut self, chunk: &str) -> Option<String> {
        if chunk.trim().is_empty() {
            return None;
        }

        let normalized = Self::normalize_chunk(chunk);
        if normalized.is_empty() || self.last_chunk.as_deref() == Some(&normalized) {
            return None;
        }

        // Insert a separating space only when neither side already provides
        // spacing and no punctuation/connector rule suppresses it.
        let needs_space = !self.text.is_empty()
            && !(self.text.ends_with(' ') || self.text.ends_with('\n'))
            && !chunk.chars().next().map_or(false, char::is_whitespace)
            && !Self::is_leading_punctuation(chunk)
            && !Self::ends_with_connector(&self.text);

        let mut delta = String::with_capacity(normalized.len() + 1);
        if needs_space {
            delta.push(' ');
        }
        delta.push_str(&normalized);

        self.text.push_str(&delta);
        self.last_chunk = Some(normalized);
        Some(delta)
    }

    /// Returns the trimmed accumulated text, or `None` if nothing usable was
    /// gathered.
    fn finalize(self) -> Option<String> {
        let trimmed = self.text.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    /// Collapses every run of whitespace to a single space and trims the ends.
    fn normalize_chunk(chunk: &str) -> String {
        chunk.split_whitespace().collect::<Vec<_>>().join(" ")
    }

    /// True when the first non-whitespace char is closing punctuation that
    /// should attach directly to the preceding text.
    fn is_leading_punctuation(chunk: &str) -> bool {
        chunk
            .chars()
            .find(|ch| !ch.is_whitespace())
            .map_or(false, |ch| {
                matches!(ch, ',' | '.' | '!' | '?' | ':' | ';' | ')' | ']' | '}')
            })
    }

    /// True when the buffer ends with an opening bracket or joining character
    /// that the next chunk should attach to without a space.
    fn ends_with_connector(text: &str) -> bool {
        text.chars()
            .rev()
            .find(|ch| !ch.is_whitespace())
            .map_or(false, |ch| matches!(ch, '(' | '[' | '{' | '/' | '-'))
    }
}
214
/// Applies a tool-call delta that arrived embedded in a content part (as
/// opposed to the top-level `delta.tool_calls` array).
///
/// `container` may wrap the actual payload in a nested `"delta"` object
/// and/or a `"tool_call"` object, each optionally carrying an `"index"` that
/// selects which builder to update (defaulting to 0). The extracted
/// `id`/`function` fields are forwarded to [`update_tool_calls`] positioned
/// at the resolved index.
fn apply_tool_call_delta_from_content(
    builders: &mut Vec<ToolCallBuilder>,
    container: &Map<String, Value>,
) {
    // Recurse first so a nested `"delta"` object is applied before any
    // fields on the container itself.
    if let Some(nested) = container.get("delta").and_then(|value| value.as_object()) {
        apply_tool_call_delta_from_content(builders, nested);
    }

    // Resolve which builder slot this delta targets and which object
    // actually holds the `id`/`function` payload.
    let (index, delta_source) = if let Some(tool_call_value) = container.get("tool_call") {
        match tool_call_value.as_object() {
            Some(tool_call) => {
                let idx = tool_call
                    .get("index")
                    .and_then(|value| value.as_u64())
                    .unwrap_or(0) as usize;
                (idx, tool_call)
            }
            // `tool_call` present but not an object: fall back to the container.
            None => (0usize, container),
        }
    } else {
        let idx = container
            .get("index")
            .and_then(|value| value.as_u64())
            .unwrap_or(0) as usize;
        (idx, container)
    };

    // Copy only the fields update_tool_calls understands.
    let mut delta_map = Map::new();

    if let Some(id_value) = delta_source.get("id") {
        delta_map.insert("id".to_string(), id_value.clone());
    }

    if let Some(function_value) = delta_source.get("function") {
        delta_map.insert("function".to_string(), function_value.clone());
    }

    if delta_map.is_empty() {
        return;
    }

    if builders.len() <= index {
        builders.resize_with(index + 1, ToolCallBuilder::default);
    }

    // Pad with nulls so the delta lands at `index` when update_tool_calls
    // walks the slice; nulls carry no fields and are therefore no-ops.
    let mut deltas = vec![Value::Null; index + 1];
    deltas[index] = Value::Object(delta_map);
    update_tool_calls(builders, &deltas);
}
264
/// Interprets one object-shaped content part from a streaming payload.
///
/// Typed parts (`reasoning`/`thinking`/`analysis` and
/// `tool_call`/`tool_call_delta`) are routed to the reasoning buffer or the
/// tool-call builders; plain text is appended to `aggregated_content` and
/// recorded as a content fragment; known container keys are recursed into.
/// Each branch returns early, so only the first matching interpretation wins.
fn process_content_object(
    map: &Map<String, Value>,
    aggregated_content: &mut String,
    reasoning: &mut ReasoningBuffer,
    tool_call_builders: &mut Vec<ToolCallBuilder>,
    deltas: &mut StreamDelta,
) {
    if let Some(content_type) = map.get("type").and_then(|value| value.as_str()) {
        match content_type {
            // Reasoning-style parts: text lives in `text` or `output_text`.
            "reasoning" | "thinking" | "analysis" => {
                if let Some(text_value) = map.get("text").and_then(|value| value.as_str()) {
                    if let Some(delta) = reasoning.push(text_value) {
                        deltas.push_reasoning(&delta);
                    }
                } else if let Some(text_value) =
                    map.get("output_text").and_then(|value| value.as_str())
                {
                    if let Some(delta) = reasoning.push(text_value) {
                        deltas.push_reasoning(&delta);
                    }
                }
                return;
            }
            "tool_call_delta" | "tool_call" => {
                apply_tool_call_delta_from_content(tool_call_builders, map);
                return;
            }
            _ => {}
        }
    }

    // Untyped part that still embeds a tool_call object.
    if let Some(tool_call_value) = map.get("tool_call").and_then(|value| value.as_object()) {
        apply_tool_call_delta_from_content(tool_call_builders, tool_call_value);
        return;
    }

    // Plain text under `text`, `output_text`, or `output_text_delta`.
    if let Some(text_value) = map.get("text").and_then(|value| value.as_str()) {
        if !text_value.is_empty() {
            aggregated_content.push_str(text_value);
            deltas.push_content(text_value);
        }
        return;
    }

    if let Some(text_value) = map.get("output_text").and_then(|value| value.as_str()) {
        if !text_value.is_empty() {
            aggregated_content.push_str(text_value);
            deltas.push_content(text_value);
        }
        return;
    }

    if let Some(text_value) = map
        .get("output_text_delta")
        .and_then(|value| value.as_str())
    {
        if !text_value.is_empty() {
            aggregated_content.push_str(text_value);
            deltas.push_content(text_value);
        }
        return;
    }

    // Recurse into known nesting containers.
    for key in ["content", "items", "output", "outputs", "delta"] {
        if let Some(inner) = map.get(key) {
            process_content_value(
                inner,
                aggregated_content,
                reasoning,
                tool_call_builders,
                deltas,
            );
        }
    }
}
340
341fn process_content_part(
342 part: &Value,
343 aggregated_content: &mut String,
344 reasoning: &mut ReasoningBuffer,
345 tool_call_builders: &mut Vec<ToolCallBuilder>,
346 deltas: &mut StreamDelta,
347) {
348 if let Some(text) = part.as_str() {
349 if !text.is_empty() {
350 aggregated_content.push_str(text);
351 deltas.push_content(text);
352 }
353 return;
354 }
355
356 if let Some(map) = part.as_object() {
357 process_content_object(
358 map,
359 aggregated_content,
360 reasoning,
361 tool_call_builders,
362 deltas,
363 );
364 return;
365 }
366
367 if part.is_array() {
368 process_content_value(
369 part,
370 aggregated_content,
371 reasoning,
372 tool_call_builders,
373 deltas,
374 );
375 }
376}
377
378fn process_content_value(
379 value: &Value,
380 aggregated_content: &mut String,
381 reasoning: &mut ReasoningBuffer,
382 tool_call_builders: &mut Vec<ToolCallBuilder>,
383 deltas: &mut StreamDelta,
384) {
385 match value {
386 Value::String(text) => {
387 if !text.is_empty() {
388 aggregated_content.push_str(text);
389 deltas.push_content(text);
390 }
391 }
392 Value::Array(parts) => {
393 for part in parts {
394 process_content_part(
395 part,
396 aggregated_content,
397 reasoning,
398 tool_call_builders,
399 deltas,
400 );
401 }
402 }
403 Value::Object(map) => {
404 process_content_object(
405 map,
406 aggregated_content,
407 reasoning,
408 tool_call_builders,
409 deltas,
410 );
411 }
412 _ => {}
413 }
414}
415
416fn extract_reasoning_from_message_content(message: &Value) -> Option<String> {
417 let parts = message.get("content")?.as_array()?;
418 let mut segments: Vec<String> = Vec::new();
419
420 for part in parts {
421 match part {
422 Value::Object(map) => {
423 let part_type = map
424 .get("type")
425 .and_then(|value| value.as_str())
426 .unwrap_or("");
427
428 if matches!(part_type, "reasoning" | "thinking" | "analysis") {
429 if let Some(extracted) = extract_reasoning_trace(part) {
430 if !extracted.trim().is_empty() {
431 segments.push(extracted);
432 continue;
433 }
434 }
435
436 if let Some(text) = map.get("text").and_then(|value| value.as_str()) {
437 let trimmed = text.trim();
438 if !trimmed.is_empty() {
439 segments.push(trimmed.to_string());
440 }
441 }
442 }
443 }
444 Value::String(text) => {
445 let trimmed = text.trim();
446 if !trimmed.is_empty() {
447 segments.push(trimmed.to_string());
448 }
449 }
450 _ => {}
451 }
452 }
453
454 if segments.is_empty() {
455 None
456 } else {
457 let mut combined = String::new();
458 for (idx, segment) in segments.iter().enumerate() {
459 if idx > 0 {
460 combined.push('\n');
461 }
462 combined.push_str(segment);
463 }
464 Some(combined)
465 }
466}
467
468fn parse_usage_value(value: &Value) -> Usage {
469 Usage {
470 prompt_tokens: value
471 .get("prompt_tokens")
472 .and_then(|pt| pt.as_u64())
473 .unwrap_or(0) as u32,
474 completion_tokens: value
475 .get("completion_tokens")
476 .and_then(|ct| ct.as_u64())
477 .unwrap_or(0) as u32,
478 total_tokens: value
479 .get("total_tokens")
480 .and_then(|tt| tt.as_u64())
481 .unwrap_or(0) as u32,
482 }
483}
484
485fn map_finish_reason(reason: &str) -> FinishReason {
486 match reason {
487 "stop" | "completed" | "done" | "finished" => FinishReason::Stop,
488 "length" => FinishReason::Length,
489 "tool_calls" => FinishReason::ToolCalls,
490 "content_filter" => FinishReason::ContentFilter,
491 other => FinishReason::Error(other.to_string()),
492 }
493}
494
495fn push_reasoning_value(reasoning: &mut ReasoningBuffer, value: &Value, deltas: &mut StreamDelta) {
496 if let Some(reasoning_text) = extract_reasoning_trace(value) {
497 if let Some(delta) = reasoning.push(&reasoning_text) {
498 deltas.push_reasoning(&delta);
499 }
500 } else if let Some(text_value) = value.get("text").and_then(|v| v.as_str()) {
501 if let Some(delta) = reasoning.push(text_value) {
502 deltas.push_reasoning(&delta);
503 }
504 }
505}
506
/// Applies one OpenAI-style `chat.completion.chunk` payload to the streaming
/// state, returning the fragments newly emitted by this chunk.
///
/// Only the first choice is inspected. Content/reasoning text and tool-call
/// deltas are merged into the accumulators; `finish_reason`, when present,
/// overwrites the current value.
fn parse_chat_completion_chunk(
    payload: &Value,
    aggregated_content: &mut String,
    tool_call_builders: &mut Vec<ToolCallBuilder>,
    reasoning: &mut ReasoningBuffer,
    finish_reason: &mut FinishReason,
) -> StreamDelta {
    let mut deltas = StreamDelta::default();

    if let Some(choices) = payload.get("choices").and_then(|c| c.as_array()) {
        if let Some(choice) = choices.first() {
            if let Some(delta) = choice.get("delta") {
                if let Some(content_value) = delta.get("content") {
                    process_content_value(
                        content_value,
                        aggregated_content,
                        reasoning,
                        tool_call_builders,
                        &mut deltas,
                    );
                }

                // Some providers stream reasoning under `delta.reasoning`...
                if let Some(reasoning_value) = delta.get("reasoning") {
                    push_reasoning_value(reasoning, reasoning_value, &mut deltas);
                }

                if let Some(tool_calls_value) = delta.get("tool_calls").and_then(|v| v.as_array()) {
                    update_tool_calls(tool_call_builders, tool_calls_value);
                }
            }

            // ...or at the choice level rather than inside the delta.
            if let Some(reasoning_value) = choice.get("reasoning") {
                push_reasoning_value(reasoning, reasoning_value, &mut deltas);
            }

            if let Some(reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
                *finish_reason = map_finish_reason(reason);
            }
        }
    }

    deltas
}
550
/// Applies one Responses-API-style streaming event to the accumulators and
/// returns the fragments this event emitted.
///
/// Handles both the generic top-level `delta` value and typed events
/// (`response.reasoning.delta`, `response.tool_call.delta`, and the
/// completion events, which can carry the full response object). A `response`
/// object is also mined for content when nothing has aggregated yet, so a
/// final event still produces output if no incremental deltas arrived.
fn parse_response_chunk(
    payload: &Value,
    aggregated_content: &mut String,
    tool_call_builders: &mut Vec<ToolCallBuilder>,
    reasoning: &mut ReasoningBuffer,
    finish_reason: &mut FinishReason,
) -> StreamDelta {
    let mut deltas = StreamDelta::default();

    // Generic delta: may hold text, typed parts, or nested containers.
    if let Some(delta_value) = payload.get("delta") {
        process_content_value(
            delta_value,
            aggregated_content,
            reasoning,
            tool_call_builders,
            &mut deltas,
        );
    }

    if let Some(event_type) = payload.get("type").and_then(|v| v.as_str()) {
        match event_type {
            "response.reasoning.delta" => {
                if let Some(delta_value) = payload.get("delta") {
                    push_reasoning_value(reasoning, delta_value, &mut deltas);
                }
            }
            "response.tool_call.delta" => {
                if let Some(delta_object) = payload.get("delta").and_then(|v| v.as_object()) {
                    apply_tool_call_delta_from_content(tool_call_builders, delta_object);
                }
            }
            "response.completed" | "response.done" | "response.finished" => {
                if let Some(response_obj) = payload.get("response") {
                    // Only fall back to the final response body when
                    // streaming produced no content, to avoid duplication.
                    if aggregated_content.is_empty() {
                        process_content_value(
                            response_obj,
                            aggregated_content,
                            reasoning,
                            tool_call_builders,
                            &mut deltas,
                        );
                    }

                    // `stop_reason` wins over `status` when both are present.
                    if let Some(reason) = response_obj
                        .get("stop_reason")
                        .and_then(|value| value.as_str())
                        .or_else(|| response_obj.get("status").and_then(|value| value.as_str()))
                    {
                        *finish_reason = map_finish_reason(reason);
                    }
                }
            }
            _ => {}
        }
    }

    // Regardless of event type, mine an attached response object for content
    // if nothing has been aggregated yet.
    if let Some(response_obj) = payload.get("response") {
        if aggregated_content.is_empty() {
            if let Some(content_value) = response_obj
                .get("output_text")
                .or_else(|| response_obj.get("output"))
                .or_else(|| response_obj.get("content"))
            {
                process_content_value(
                    content_value,
                    aggregated_content,
                    reasoning,
                    tool_call_builders,
                    &mut deltas,
                );
            }
        }
    }

    // Top-level reasoning payloads can appear outside the delta as well.
    if let Some(reasoning_value) = payload.get("reasoning") {
        push_reasoning_value(reasoning, reasoning_value, &mut deltas);
    }

    deltas
}
631
632fn update_usage_from_value(source: &Value, usage: &mut Option<Usage>) {
633 if let Some(usage_value) = source.get("usage") {
634 *usage = Some(parse_usage_value(usage_value));
635 }
636}
637
/// Extracts the `data:` payload from one Server-Sent-Events block.
///
/// Strips trailing `\r`, skips blank lines and `:` comment lines, collects
/// every `data:` line (leading whitespace after the prefix removed), and
/// joins multi-line data with `\n` per the SSE convention. Returns `None`
/// when the event carries no data lines.
fn extract_data_payload(event: &str) -> Option<String> {
    let data_lines: Vec<&str> = event
        .lines()
        .map(|raw_line| raw_line.trim_end_matches('\r'))
        .filter(|line| !line.is_empty() && !line.starts_with(':'))
        .filter_map(|line| line.strip_prefix("data:"))
        .map(str::trim_start)
        .collect();

    if data_lines.is_empty() {
        None
    } else {
        Some(data_lines.join("\n"))
    }
}
658
659fn parse_stream_payload(
660 payload: &Value,
661 aggregated_content: &mut String,
662 tool_call_builders: &mut Vec<ToolCallBuilder>,
663 reasoning: &mut ReasoningBuffer,
664 usage: &mut Option<Usage>,
665 finish_reason: &mut FinishReason,
666) -> Option<StreamDelta> {
667 let mut emitted_delta = StreamDelta::default();
668
669 let chat_delta = parse_chat_completion_chunk(
670 payload,
671 aggregated_content,
672 tool_call_builders,
673 reasoning,
674 finish_reason,
675 );
676 emitted_delta.extend(chat_delta);
677
678 let response_delta = parse_response_chunk(
679 payload,
680 aggregated_content,
681 tool_call_builders,
682 reasoning,
683 finish_reason,
684 );
685 emitted_delta.extend(response_delta);
686
687 update_usage_from_value(payload, usage);
688 if let Some(response_obj) = payload.get("response") {
689 update_usage_from_value(response_obj, usage);
690 if let Some(reason) = response_obj
691 .get("finish_reason")
692 .and_then(|value| value.as_str())
693 {
694 *finish_reason = map_finish_reason(reason);
695 }
696 }
697
698 if emitted_delta.is_empty() {
699 None
700 } else {
701 Some(emitted_delta)
702 }
703}
704
705fn finalize_stream_response(
706 aggregated_content: String,
707 tool_call_builders: Vec<ToolCallBuilder>,
708 usage: Option<Usage>,
709 finish_reason: FinishReason,
710 reasoning: ReasoningBuffer,
711) -> LLMResponse {
712 let content = if aggregated_content.is_empty() {
713 None
714 } else {
715 Some(aggregated_content)
716 };
717
718 let reasoning = reasoning.finalize();
719
720 LLMResponse {
721 content,
722 tool_calls: finalize_tool_calls(tool_call_builders),
723 usage,
724 finish_reason,
725 reasoning,
726 }
727}
728
/// LLM provider that speaks OpenRouter's OpenAI-compatible chat API.
pub struct OpenRouterProvider {
    // API key sent as a bearer token on every request.
    api_key: String,
    // Reusable HTTP client shared across requests.
    http_client: HttpClient,
    // API base URL; overridable via `from_config` (e.g. for proxies).
    base_url: String,
    // Default model used when a request does not specify one.
    model: String,
}
735
736impl OpenRouterProvider {
737 pub fn new(api_key: String) -> Self {
738 Self::with_model(api_key, models::openrouter::DEFAULT_MODEL.to_string())
739 }
740
741 pub fn with_model(api_key: String, model: String) -> Self {
742 Self {
743 api_key,
744 http_client: HttpClient::new(),
745 base_url: urls::OPENROUTER_API_BASE.to_string(),
746 model,
747 }
748 }
749
750 pub fn from_config(
751 api_key: Option<String>,
752 model: Option<String>,
753 base_url: Option<String>,
754 ) -> Self {
755 let api_key_value = api_key.unwrap_or_default();
756 let mut provider = if let Some(model_value) = model {
757 Self::with_model(api_key_value, model_value)
758 } else {
759 Self::new(api_key_value)
760 };
761 if let Some(base) = base_url {
762 provider.base_url = base;
763 }
764 provider
765 }
766
767 fn default_request(&self, prompt: &str) -> LLMRequest {
768 LLMRequest {
769 messages: vec![Message::user(prompt.to_string())],
770 system_prompt: None,
771 tools: None,
772 model: self.model.clone(),
773 max_tokens: None,
774 temperature: None,
775 stream: false,
776 tool_choice: None,
777 parallel_tool_calls: None,
778 parallel_tool_config: None,
779 reasoning_effort: None,
780 }
781 }
782
783 fn parse_client_prompt(&self, prompt: &str) -> LLMRequest {
784 let trimmed = prompt.trim_start();
785 if trimmed.starts_with('{') {
786 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
787 if let Some(request) = self.parse_chat_request(&value) {
788 return request;
789 }
790 }
791 }
792
793 self.default_request(prompt)
794 }
795
    /// Parses an OpenAI-style chat-completions JSON body into an
    /// [`LLMRequest`].
    ///
    /// Returns `None` when `messages` is absent, not an array, or yields no
    /// messages. The first non-empty `system` message becomes
    /// `system_prompt`; `assistant` messages keep their tool calls; `tool`
    /// messages keep their `tool_call_id`; any other role is treated as a
    /// user message. Optional fields (`tools`, `max_tokens`, `temperature`,
    /// `stream`, `tool_choice`, `parallel_tool_calls`, `reasoning_effort`,
    /// `model`) are lifted when present; the model defaults to this
    /// provider's configured model.
    fn parse_chat_request(&self, value: &Value) -> Option<LLMRequest> {
        let messages_value = value.get("messages")?.as_array()?;
        let mut system_prompt = None;
        let mut messages = Vec::new();

        for entry in messages_value {
            let role = entry
                .get("role")
                .and_then(|r| r.as_str())
                .unwrap_or(crate::config::constants::message_roles::USER);
            let content = entry.get("content");
            let text_content = content.map(Self::extract_content_text).unwrap_or_default();

            match role {
                "system" => {
                    // Only the first non-empty system message is honored.
                    if system_prompt.is_none() && !text_content.is_empty() {
                        system_prompt = Some(text_content);
                    }
                }
                "assistant" => {
                    let tool_calls = entry
                        .get("tool_calls")
                        .and_then(|tc| tc.as_array())
                        .map(|calls| {
                            calls
                                .iter()
                                .filter_map(|call| {
                                    let id = call.get("id").and_then(|v| v.as_str())?;
                                    let function = call.get("function")?;
                                    let name = function.get("name").and_then(|v| v.as_str())?;
                                    let arguments = function.get("arguments");
                                    // Arguments may be a JSON string or
                                    // structured JSON; keep a string either way.
                                    let serialized = arguments.map_or("{}".to_string(), |value| {
                                        if value.is_string() {
                                            value.as_str().unwrap_or("").to_string()
                                        } else {
                                            value.to_string()
                                        }
                                    });
                                    Some(ToolCall::function(
                                        id.to_string(),
                                        name.to_string(),
                                        serialized,
                                    ))
                                })
                                .collect::<Vec<_>>()
                        })
                        .filter(|calls| !calls.is_empty());

                    let message = if let Some(calls) = tool_calls {
                        Message {
                            role: MessageRole::Assistant,
                            content: text_content,
                            tool_calls: Some(calls),
                            tool_call_id: None,
                        }
                    } else {
                        Message::assistant(text_content)
                    };
                    messages.push(message);
                }
                "tool" => {
                    let tool_call_id = entry
                        .get("tool_call_id")
                        .and_then(|id| id.as_str())
                        .map(|s| s.to_string());
                    // Prefer the extracted text; otherwise serialize the raw
                    // content value so nothing is silently dropped.
                    let content_value = entry
                        .get("content")
                        .map(|value| {
                            if text_content.is_empty() {
                                value.to_string()
                            } else {
                                text_content.clone()
                            }
                        })
                        .unwrap_or_else(|| text_content.clone());
                    messages.push(Message {
                        role: MessageRole::Tool,
                        content: content_value,
                        tool_calls: None,
                        tool_call_id,
                    });
                }
                _ => {
                    // Unknown roles (including "user") become user messages.
                    messages.push(Message::user(text_content));
                }
            }
        }

        if messages.is_empty() {
            return None;
        }

        // Convert OpenAI-style tool definitions; missing descriptions become
        // empty strings and a missing parameter schema becomes `{}`.
        let tools = value.get("tools").and_then(|tools_value| {
            let tools_array = tools_value.as_array()?;
            let converted: Vec<_> = tools_array
                .iter()
                .filter_map(|tool| {
                    let function = tool.get("function")?;
                    let name = function.get("name").and_then(|n| n.as_str())?;
                    let description = function
                        .get("description")
                        .and_then(|d| d.as_str())
                        .unwrap_or("")
                        .to_string();
                    let parameters = function
                        .get("parameters")
                        .cloned()
                        .unwrap_or_else(|| json!({}));
                    Some(ToolDefinition::function(
                        name.to_string(),
                        description,
                        parameters,
                    ))
                })
                .collect();

            if converted.is_empty() {
                None
            } else {
                Some(converted)
            }
        });

        let max_tokens = value
            .get("max_tokens")
            .and_then(|v| v.as_u64())
            .map(|v| v as u32);
        let temperature = value
            .get("temperature")
            .and_then(|v| v.as_f64())
            .map(|v| v as f32);
        let stream = value
            .get("stream")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        let tool_choice = value.get("tool_choice").and_then(Self::parse_tool_choice);
        let parallel_tool_calls = value.get("parallel_tool_calls").and_then(|v| v.as_bool());
        // Accept both the flat `reasoning_effort` field and the nested
        // `reasoning.effort` form.
        let reasoning_effort = value
            .get("reasoning_effort")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string())
            .or_else(|| {
                value
                    .get("reasoning")
                    .and_then(|r| r.get("effort"))
                    .and_then(|effort| effort.as_str())
                    .map(|s| s.to_string())
            });

        let model = value
            .get("model")
            .and_then(|m| m.as_str())
            .unwrap_or(&self.model)
            .to_string();

        Some(LLMRequest {
            messages,
            system_prompt,
            tools,
            model,
            max_tokens,
            temperature,
            stream,
            tool_choice,
            parallel_tool_calls,
            parallel_tool_config: None,
            reasoning_effort,
        })
    }
965
966 fn extract_content_text(content: &Value) -> String {
967 match content {
968 Value::String(text) => text.to_string(),
969 Value::Array(parts) => parts
970 .iter()
971 .filter_map(|part| {
972 if let Some(text) = part.get("text").and_then(|t| t.as_str()) {
973 Some(text.to_string())
974 } else if let Some(Value::String(text)) = part.get("content") {
975 Some(text.clone())
976 } else {
977 None
978 }
979 })
980 .collect::<Vec<_>>()
981 .join(""),
982 _ => String::new(),
983 }
984 }
985
986 fn parse_tool_choice(choice: &Value) -> Option<ToolChoice> {
987 match choice {
988 Value::String(value) => match value.as_str() {
989 "auto" => Some(ToolChoice::auto()),
990 "none" => Some(ToolChoice::none()),
991 "required" => Some(ToolChoice::any()),
992 _ => None,
993 },
994 Value::Object(map) => {
995 let choice_type = map.get("type").and_then(|t| t.as_str())?;
996 match choice_type {
997 "function" => map
998 .get("function")
999 .and_then(|f| f.get("name"))
1000 .and_then(|n| n.as_str())
1001 .map(|name| ToolChoice::function(name.to_string())),
1002 "auto" => Some(ToolChoice::auto()),
1003 "none" => Some(ToolChoice::none()),
1004 "any" | "required" => Some(ToolChoice::any()),
1005 _ => None,
1006 }
1007 }
1008 _ => None,
1009 }
1010 }
1011
    /// Serializes an [`LLMRequest`] into the OpenAI-compatible JSON body that
    /// OpenRouter's `/chat/completions` endpoint expects.
    ///
    /// The system prompt is prepended as a `system` message; assistant tool
    /// calls and tool-message `tool_call_id`s are preserved. Optional request
    /// fields are added only when set.
    ///
    /// # Errors
    /// Returns `LLMError::InvalidRequest` when there are no messages at all.
    fn convert_to_openrouter_format(&self, request: &LLMRequest) -> Result<Value, LLMError> {
        let mut messages = Vec::new();

        if let Some(system_prompt) = &request.system_prompt {
            messages.push(json!({
                "role": crate::config::constants::message_roles::SYSTEM,
                "content": system_prompt
            }));
        }

        for msg in &request.messages {
            let role = msg.role.as_openai_str();
            let mut message = json!({
                "role": role,
                "content": msg.content
            });

            // Assistant messages carry their tool calls in OpenAI format.
            if msg.role == MessageRole::Assistant {
                if let Some(tool_calls) = &msg.tool_calls {
                    if !tool_calls.is_empty() {
                        let tool_calls_json: Vec<Value> = tool_calls
                            .iter()
                            .map(|tc| {
                                json!({
                                    "id": tc.id,
                                    "type": "function",
                                    "function": {
                                        "name": tc.function.name,
                                        "arguments": tc.function.arguments
                                    }
                                })
                            })
                            .collect();
                        message["tool_calls"] = Value::Array(tool_calls_json);
                    }
                }
            }

            // Tool results must reference the call they answer.
            if msg.role == MessageRole::Tool {
                if let Some(tool_call_id) = &msg.tool_call_id {
                    message["tool_call_id"] = Value::String(tool_call_id.clone());
                }
            }

            messages.push(message);
        }

        if messages.is_empty() {
            let formatted_error =
                error_display::format_llm_error("OpenRouter", "No messages provided");
            return Err(LLMError::InvalidRequest(formatted_error));
        }

        // Fall back to the provider default when the request model is blank.
        let mut provider_request = json!({
            "model": if request.model.trim().is_empty() {
                &self.model
            } else {
                &request.model
            },
            "messages": messages,
            "stream": request.stream
        });

        if let Some(max_tokens) = request.max_tokens {
            provider_request["max_tokens"] = json!(max_tokens);
        }

        if let Some(temperature) = request.temperature {
            provider_request["temperature"] = json!(temperature);
        }

        if let Some(tools) = &request.tools {
            if !tools.is_empty() {
                let tools_json: Vec<Value> = tools
                    .iter()
                    .map(|tool| {
                        json!({
                            "type": "function",
                            "function": {
                                "name": tool.function.name,
                                "description": tool.function.description,
                                "parameters": tool.function.parameters
                            }
                        })
                    })
                    .collect();
                provider_request["tools"] = Value::Array(tools_json);
            }
        }

        if let Some(tool_choice) = &request.tool_choice {
            provider_request["tool_choice"] = tool_choice.to_provider_format("openai");
        }

        if let Some(parallel) = request.parallel_tool_calls {
            provider_request["parallel_tool_calls"] = Value::Bool(parallel);
        }

        // Reasoning effort is only forwarded for models known to accept it.
        if let Some(effort) = request.reasoning_effort.as_deref() {
            if self.supports_reasoning_effort(&request.model) {
                provider_request["reasoning"] = json!({ "effort": effort });
            }
        }

        Ok(provider_request)
    }
1118
    /// Converts a non-streaming OpenRouter chat-completions response body
    /// into an [`LLMResponse`].
    ///
    /// # Errors
    /// Returns `LLMError::Provider` when `choices` is missing or empty, or
    /// when the first choice has no `message`.
    fn parse_openrouter_response(&self, response_json: Value) -> Result<LLMResponse, LLMError> {
        let choices = response_json
            .get("choices")
            .and_then(|c| c.as_array())
            .ok_or_else(|| {
                let formatted_error = error_display::format_llm_error(
                    "OpenRouter",
                    "Invalid response format: missing choices",
                );
                LLMError::Provider(formatted_error)
            })?;

        if choices.is_empty() {
            let formatted_error =
                error_display::format_llm_error("OpenRouter", "No choices in response");
            return Err(LLMError::Provider(formatted_error));
        }

        let choice = &choices[0];
        let message = choice.get("message").ok_or_else(|| {
            let formatted_error = error_display::format_llm_error(
                "OpenRouter",
                "Invalid response format: missing message",
            );
            LLMError::Provider(formatted_error)
        })?;

        // Content may be a plain string or an array of text parts.
        let content = match message.get("content") {
            Some(Value::String(text)) => Some(text.to_string()),
            Some(Value::Array(parts)) => {
                let text = parts
                    .iter()
                    .filter_map(|part| part.get("text").and_then(|t| t.as_str()))
                    .collect::<Vec<_>>()
                    .join("");
                if text.is_empty() { None } else { Some(text) }
            }
            _ => None,
        };

        let tool_calls = message
            .get("tool_calls")
            .and_then(|tc| tc.as_array())
            .map(|calls| {
                calls
                    .iter()
                    .filter_map(|call| {
                        let id = call.get("id").and_then(|v| v.as_str())?;
                        let function = call.get("function")?;
                        let name = function.get("name").and_then(|v| v.as_str())?;
                        let arguments = function.get("arguments");
                        // Arguments may be a JSON string or structured JSON.
                        let serialized = arguments.map_or("{}".to_string(), |value| {
                            if value.is_string() {
                                value.as_str().unwrap_or("").to_string()
                            } else {
                                value.to_string()
                            }
                        });
                        Some(ToolCall::function(
                            id.to_string(),
                            name.to_string(),
                            serialized,
                        ))
                    })
                    .collect::<Vec<_>>()
            })
            .filter(|calls| !calls.is_empty());

        // Reasoning can live on the message, the choice, or inside typed
        // content parts — checked in that order.
        let mut reasoning = message
            .get("reasoning")
            .and_then(extract_reasoning_trace)
            .or_else(|| choice.get("reasoning").and_then(extract_reasoning_trace));

        if reasoning.is_none() {
            reasoning = extract_reasoning_from_message_content(message);
        }

        let finish_reason = choice
            .get("finish_reason")
            .and_then(|fr| fr.as_str())
            .map(map_finish_reason)
            .unwrap_or(FinishReason::Stop);

        let usage = response_json.get("usage").map(parse_usage_value);

        Ok(LLMResponse {
            content,
            tool_calls,
            usage,
            finish_reason,
            reasoning,
        })
    }
1212}
1213
1214#[async_trait]
1215impl LLMProvider for OpenRouterProvider {
    /// Provider identifier used in configuration and logging.
    fn name(&self) -> &str {
        "openrouter"
    }
1219
    /// This provider implements `stream` via SSE, so streaming is supported.
    fn supports_streaming(&self) -> bool {
        true
    }
1223
    /// Always `false` for every model.
    // NOTE(review): the streaming parser in this file does surface reasoning
    // deltas when the wire carries them — confirm this flag is intentionally
    // hard-coded to false rather than mirroring REASONING_MODELS.
    fn supports_reasoning(&self, _model: &str) -> bool {
        false
    }
1227
1228 fn supports_reasoning_effort(&self, model: &str) -> bool {
1229 let requested = if model.trim().is_empty() {
1230 self.model.as_str()
1231 } else {
1232 model
1233 };
1234 models::openrouter::REASONING_MODELS
1235 .iter()
1236 .any(|candidate| *candidate == requested)
1237 }
1238
1239 async fn stream(&self, request: LLMRequest) -> Result<LLMStream, LLMError> {
1240 let mut provider_request = self.convert_to_openrouter_format(&request)?;
1241 provider_request["stream"] = Value::Bool(true);
1242
1243 let url = format!("{}/chat/completions", self.base_url);
1244
1245 let response = self
1246 .http_client
1247 .post(&url)
1248 .bearer_auth(&self.api_key)
1249 .json(&provider_request)
1250 .send()
1251 .await
1252 .map_err(|e| {
1253 let formatted_error =
1254 error_display::format_llm_error("OpenRouter", &format!("Network error: {}", e));
1255 LLMError::Network(formatted_error)
1256 })?;
1257
1258 if !response.status().is_success() {
1259 let status = response.status();
1260 let error_text = response.text().await.unwrap_or_default();
1261
1262 if status.as_u16() == 429 || error_text.contains("quota") {
1263 return Err(LLMError::RateLimit);
1264 }
1265
1266 let formatted_error = error_display::format_llm_error(
1267 "OpenRouter",
1268 &format!("HTTP {}: {}", status, error_text),
1269 );
1270 return Err(LLMError::Provider(formatted_error));
1271 }
1272
1273 fn find_sse_boundary(buffer: &str) -> Option<(usize, usize)> {
1274 let newline_boundary = buffer.find("\n\n").map(|idx| (idx, 2));
1275 let carriage_boundary = buffer.find("\r\n\r\n").map(|idx| (idx, 4));
1276
1277 match (newline_boundary, carriage_boundary) {
1278 (Some((n_idx, n_len)), Some((c_idx, c_len))) => {
1279 if n_idx <= c_idx {
1280 Some((n_idx, n_len))
1281 } else {
1282 Some((c_idx, c_len))
1283 }
1284 }
1285 (Some(boundary), None) => Some(boundary),
1286 (None, Some(boundary)) => Some(boundary),
1287 (None, None) => None,
1288 }
1289 }
1290
1291 let stream = try_stream! {
1292 let mut body_stream = response.bytes_stream();
1293 let mut buffer = String::new();
1294 let mut aggregated_content = String::new();
1295 let mut tool_call_builders: Vec<ToolCallBuilder> = Vec::new();
1296 let mut reasoning = ReasoningBuffer::default();
1297 let mut usage: Option<Usage> = None;
1298 let mut finish_reason = FinishReason::Stop;
1299 let mut done = false;
1300
1301 while let Some(chunk_result) = body_stream.next().await {
1302 let chunk = chunk_result.map_err(|err| {
1303 let formatted_error = error_display::format_llm_error(
1304 "OpenRouter",
1305 &format!("Streaming error: {}", err),
1306 );
1307 LLMError::Network(formatted_error)
1308 })?;
1309
1310 buffer.push_str(&String::from_utf8_lossy(&chunk));
1311
1312 while let Some((split_idx, delimiter_len)) = find_sse_boundary(&buffer) {
1313 let event = buffer[..split_idx].to_string();
1314 buffer.drain(..split_idx + delimiter_len);
1315
1316 if let Some(data_payload) = extract_data_payload(&event) {
1317 let trimmed_payload = data_payload.trim();
1318 if trimmed_payload == "[DONE]" {
1319 done = true;
1320 break;
1321 }
1322
1323 if !trimmed_payload.is_empty() {
1324 let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
1325 let formatted_error = error_display::format_llm_error(
1326 "OpenRouter",
1327 &format!("Failed to parse stream payload: {}", err),
1328 );
1329 LLMError::Provider(formatted_error)
1330 })?;
1331
1332 if let Some(delta) = parse_stream_payload(
1333 &payload,
1334 &mut aggregated_content,
1335 &mut tool_call_builders,
1336 &mut reasoning,
1337 &mut usage,
1338 &mut finish_reason,
1339 ) {
1340 for fragment in delta.into_fragments() {
1341 match fragment {
1342 StreamFragment::Content(text) if !text.is_empty() => {
1343 yield LLMStreamEvent::Token { delta: text };
1344 }
1345 StreamFragment::Reasoning(text) if !text.is_empty() => {
1346 yield LLMStreamEvent::Reasoning { delta: text };
1347 }
1348 _ => {}
1349 }
1350 }
1351 }
1352 }
1353 }
1354 }
1355
1356 if done {
1357 break;
1358 }
1359 }
1360
1361 if !done && !buffer.trim().is_empty() {
1362 if let Some(data_payload) = extract_data_payload(&buffer) {
1363 let trimmed_payload = data_payload.trim();
1364 if trimmed_payload != "[DONE]" && !trimmed_payload.is_empty() {
1365 let payload: Value = serde_json::from_str(trimmed_payload).map_err(|err| {
1366 let formatted_error = error_display::format_llm_error(
1367 "OpenRouter",
1368 &format!("Failed to parse stream payload: {}", err),
1369 );
1370 LLMError::Provider(formatted_error)
1371 })?;
1372
1373 if let Some(delta) = parse_stream_payload(
1374 &payload,
1375 &mut aggregated_content,
1376 &mut tool_call_builders,
1377 &mut reasoning,
1378 &mut usage,
1379 &mut finish_reason,
1380 ) {
1381 for fragment in delta.into_fragments() {
1382 match fragment {
1383 StreamFragment::Content(text) if !text.is_empty() => {
1384 yield LLMStreamEvent::Token { delta: text };
1385 }
1386 StreamFragment::Reasoning(text) if !text.is_empty() => {
1387 yield LLMStreamEvent::Reasoning { delta: text };
1388 }
1389 _ => {}
1390 }
1391 }
1392 }
1393 }
1394 }
1395 }
1396
1397 let response = finalize_stream_response(
1398 aggregated_content,
1399 tool_call_builders,
1400 usage,
1401 finish_reason,
1402 reasoning,
1403 );
1404
1405 yield LLMStreamEvent::Completed { response };
1406 };
1407
1408 Ok(Box::pin(stream))
1409 }
1410
1411 async fn generate(&self, request: LLMRequest) -> Result<LLMResponse, LLMError> {
1412 let provider_request = self.convert_to_openrouter_format(&request)?;
1413 let url = format!("{}/chat/completions", self.base_url);
1414
1415 let response = self
1416 .http_client
1417 .post(&url)
1418 .bearer_auth(&self.api_key)
1419 .json(&provider_request)
1420 .send()
1421 .await
1422 .map_err(|e| {
1423 let formatted_error =
1424 error_display::format_llm_error("OpenRouter", &format!("Network error: {}", e));
1425 LLMError::Network(formatted_error)
1426 })?;
1427
1428 if !response.status().is_success() {
1429 let status = response.status();
1430 let error_text = response.text().await.unwrap_or_default();
1431
1432 if status.as_u16() == 429 || error_text.contains("quota") {
1433 return Err(LLMError::RateLimit);
1434 }
1435
1436 let formatted_error = error_display::format_llm_error(
1437 "OpenRouter",
1438 &format!("HTTP {}: {}", status, error_text),
1439 );
1440 return Err(LLMError::Provider(formatted_error));
1441 }
1442
1443 let openrouter_response: Value = response.json().await.map_err(|e| {
1444 let formatted_error = error_display::format_llm_error(
1445 "OpenRouter",
1446 &format!("Failed to parse response: {}", e),
1447 );
1448 LLMError::Provider(formatted_error)
1449 })?;
1450
1451 self.parse_openrouter_response(openrouter_response)
1452 }
1453
1454 fn supported_models(&self) -> Vec<String> {
1455 models::openrouter::SUPPORTED_MODELS
1456 .iter()
1457 .map(|s| s.to_string())
1458 .collect()
1459 }
1460
1461 fn validate_request(&self, request: &LLMRequest) -> Result<(), LLMError> {
1462 if request.messages.is_empty() {
1463 let formatted_error =
1464 error_display::format_llm_error("OpenRouter", "Messages cannot be empty");
1465 return Err(LLMError::InvalidRequest(formatted_error));
1466 }
1467
1468 for message in &request.messages {
1469 if let Err(err) = message.validate_for_provider("openai") {
1470 let formatted = error_display::format_llm_error("OpenRouter", &err);
1471 return Err(LLMError::InvalidRequest(formatted));
1472 }
1473 }
1474
1475 if request.model.trim().is_empty() {
1476 let formatted_error =
1477 error_display::format_llm_error("OpenRouter", "Model must be provided");
1478 return Err(LLMError::InvalidRequest(formatted_error));
1479 }
1480
1481 Ok(())
1482 }
1483}
1484
1485#[async_trait]
1486impl LLMClient for OpenRouterProvider {
1487 async fn generate(&mut self, prompt: &str) -> Result<llm_types::LLMResponse, LLMError> {
1488 let request = self.parse_client_prompt(prompt);
1489 let request_model = request.model.clone();
1490 let response = LLMProvider::generate(self, request).await?;
1491
1492 Ok(llm_types::LLMResponse {
1493 content: response.content.unwrap_or_default(),
1494 model: request_model,
1495 usage: response.usage.map(|u| llm_types::Usage {
1496 prompt_tokens: u.prompt_tokens as usize,
1497 completion_tokens: u.completion_tokens as usize,
1498 total_tokens: u.total_tokens as usize,
1499 }),
1500 reasoning: response.reasoning,
1501 })
1502 }
1503
1504 fn backend_kind(&self) -> llm_types::BackendKind {
1505 llm_types::BackendKind::OpenRouter
1506 }
1507
1508 fn model_id(&self) -> &str {
1509 &self.model
1510 }
1511}
1512
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A chat-completions style chunk (choices[].delta.content) yields a
    /// single content fragment and appends to the aggregate buffer.
    #[test]
    fn test_parse_stream_payload_chat_chunk() {
        let payload = json!({
            "choices": [{
                "delta": {
                    "content": [
                        {"type": "output_text", "text": "Hello"}
                    ]
                }
            }]
        });

        let mut content_acc = String::new();
        let mut call_builders: Vec<ToolCallBuilder> = Vec::new();
        let mut reasoning_buf = ReasoningBuffer::default();
        let mut usage_acc = None;
        let mut finish = FinishReason::Stop;

        let delta = parse_stream_payload(
            &payload,
            &mut content_acc,
            &mut call_builders,
            &mut reasoning_buf,
            &mut usage_acc,
            &mut finish,
        )
        .expect("delta should exist");

        assert_eq!(
            delta.into_fragments(),
            vec![StreamFragment::Content("Hello".to_string())]
        );
        assert_eq!(content_acc, "Hello");
        assert!(call_builders.is_empty());
        assert!(usage_acc.is_none());
        assert!(reasoning_buf.finalize().is_none());
    }

    /// A Responses-API style event (type: response.delta) also yields a
    /// content fragment and aggregates the text.
    #[test]
    fn test_parse_stream_payload_response_delta() {
        let payload = json!({
            "type": "response.delta",
            "delta": {
                "type": "output_text_delta",
                "text": "Stream"
            }
        });

        let mut content_acc = String::new();
        let mut call_builders: Vec<ToolCallBuilder> = Vec::new();
        let mut reasoning_buf = ReasoningBuffer::default();
        let mut usage_acc = None;
        let mut finish = FinishReason::Stop;

        let delta = parse_stream_payload(
            &payload,
            &mut content_acc,
            &mut call_builders,
            &mut reasoning_buf,
            &mut usage_acc,
            &mut finish,
        )
        .expect("delta should exist");

        assert_eq!(
            delta.into_fragments(),
            vec![StreamFragment::Content("Stream".to_string())]
        );
        assert_eq!(content_acc, "Stream");
    }

    /// Comment lines are dropped and consecutive `data:` lines are joined
    /// with a newline.
    #[test]
    fn test_extract_data_payload_joins_multiline_events() {
        let event = ": keep-alive\ndata: {\"a\":1}\ndata: {\"b\":2}\n";
        let payload = extract_data_payload(event);
        assert_eq!(payload.as_deref(), Some("{\"a\":1}\n{\"b\":2}"));
    }
}