Skip to main content

synaps_cli/runtime/openai/
mod.rs

1//! OpenAI-compatible provider engine.
2//!
3//! Ported from the `openai-runtime` prototype. Translates between Anthropic-shaped
4//! messages/tools/content-blocks (the internal synaps representation) and
5//! OpenAI `chat/completions` SSE wire.
6
7use std::collections::BTreeMap;
8
9pub mod types;
10pub mod wire;
11pub mod registry;
12pub mod catalog;
13pub mod reasoning;
14pub mod translate;
15pub mod stream;
16pub mod ping;
17
18use std::sync::Arc;
19
20use crate::extensions::manager::ExtensionManager;
21use crate::extensions::providers::ProviderRegistry;
22use crate::tools::{ToolCapabilities, ToolChannels, ToolContext, ToolLimits};
23
24pub use types::{
25    ChatMessage, ChatOptions, ChatRequest, FunctionCall, FunctionDefinition,
26    OaiEvent, ProviderConfig, StreamOptions, ToolCall, ToolChoice, ToolDefinition,
27};
28pub use wire::StreamDecoder;
29
30static EXTENSION_MANAGER: std::sync::RwLock<Option<Arc<tokio::sync::RwLock<ExtensionManager>>>> = std::sync::RwLock::new(None);
31
32pub fn set_extension_manager_for_routing(manager: Arc<tokio::sync::RwLock<ExtensionManager>>) {
33    *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = Some(manager);
34}
35
36pub fn clear_extension_manager_for_routing() {
37    *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = None;
38}
39
40pub fn extension_manager_for_routing() -> Option<Arc<tokio::sync::RwLock<ExtensionManager>>> {
41    EXTENSION_MANAGER
42        .read()
43        .expect("extension manager routing lock poisoned")
44        .clone()
45}
46
47/// Routing decision for a given model id.
48#[derive(Debug, Clone)]
49pub enum Provider {
50    /// Native Anthropic path (default, backward-compatible).
51    Anthropic,
52    /// OpenAI-compatible provider with a resolved config.
53    OpenAi(ProviderConfig),
54    /// ChatGPT subscription-backed Codex responses endpoint.
55    Codex(ProviderConfig),
56    /// Known provider prefix but no API key configured.
57    MissingKey(String),
58}
59
60/// Decide which backend a model should route to.
61///
62/// - `provider/model` shorthand where `provider` matches a known provider key → `OpenAi`
63/// - `claude-*` → `Anthropic`
64/// - anything else → `Anthropic` (backward compat)
65pub fn resolve_route(model: &str, provider_keys: &BTreeMap<String, String>) -> Provider {
66    if let Some((prefix, _rest)) = model.split_once('/') {
67        if prefix == "openai-codex" {
68            if let Some(cfg) = registry::resolve_codex_shorthand(model) {
69                return Provider::Codex(cfg);
70            }
71            return Provider::MissingKey(prefix.to_string());
72        }
73        if prefix == "local" || registry::providers().iter().any(|s| s.key == prefix) {
74            if let Some(cfg) = registry::resolve_shorthand(model, provider_keys) {
75                return Provider::OpenAi(cfg);
76            }
77            // Known provider but no key
78            return Provider::MissingKey(prefix.to_string());
79        }
80    }
81    Provider::Anthropic
82}
83
84/// Try to route a request through an OpenAI-compatible provider.
85///
86/// Returns `Some(Ok(value))` if the model resolved to an OpenAI provider and the
87/// request completed (successfully or with error). Returns `None` if the model
88/// should be handled by the Anthropic path.
89///
90/// This is the single routing entry point — both streaming and non-streaming
91/// callers in `api.rs` use this instead of duplicating the routing logic.
92pub async fn try_route(
93    model: &str,
94    client: &reqwest::Client,
95    tools_schema: &std::sync::Arc<Vec<serde_json::Value>>,
96    system_prompt: &Option<String>,
97    messages: &[serde_json::Value],
98    tx: &tokio::sync::mpsc::UnboundedSender<crate::runtime::types::StreamEvent>,
99    temperature: Option<f32>,
100    max_tokens: Option<u32>,
101    thinking_budget: u32,
102    cancel: &tokio_util::sync::CancellationToken,
103) -> Option<Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>> {
104    if let Some((plugin_id, provider_id, model_id)) = ProviderRegistry::parse_model_id(model) {
105        if let Some(manager) = extension_manager_for_routing() {
106            let provider_runtime_id = format!("{}:{}", plugin_id, provider_id);
107            let Some((handler, hook_bus, tools_shared, streaming, model_tool_use)) = ({
108                let manager = manager.read().await;
109                manager.provider(&provider_runtime_id).and_then(|provider| {
110                    provider.handler.as_ref().map(|handler| {
111                        let model_spec = provider.spec.models.iter().find(|m| m.id == model_id);
112                        let streaming = model_spec
113                            .and_then(|m| m.capabilities.get("streaming"))
114                            .and_then(|v| v.as_bool())
115                            .unwrap_or(false);
116                        let model_tool_use = model_spec
117                            .and_then(|m| m.capabilities.get("tool_use"))
118                            .and_then(|v| v.as_bool())
119                            .unwrap_or(false);
120                        (handler.clone(), manager.hook_bus().clone(), manager.tools_shared(), streaming, model_tool_use)
121                    })
122                })
123            }) else {
124                return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
125            };
126            // Per-provider trust gate: a disabled provider must not be invoked.
127            // The check runs before any IPC and we DO NOT silently fall back to
128            // built-in routing — instead return a clear routing error.
129            let trust = match crate::extensions::trust::load_trust_state() {
130                Ok(t) => t,
131                Err(e) => {
132                    tracing::warn!("trust.json corrupt or unreadable, failing closed: {e}");
133                    return Some(Err(
134                        format!("Cannot route to provider: trust state unreadable: {e}").into()
135                    ));
136                }
137            };
138            if !crate::extensions::trust::is_provider_enabled(&trust, &provider_runtime_id) {
139                let _ = crate::extensions::audit::append_audit_entry(
140                    &crate::extensions::audit::new_audit_entry(
141                        plugin_id,
142                        provider_id,
143                        model_id,
144                        false,
145                        0,
146                        false,
147                        "blocked",
148                        Some("trust_disabled".to_string()),
149                    ),
150                );
151                return Some(Err(format!(
152                    "Provider '{}' is disabled by user trust settings",
153                    provider_runtime_id
154                ).into()));
155            }
156            // Audit metadata captured up-front so each terminal branch can record an entry.
157            let audit_plugin = plugin_id.to_string();
158            let audit_provider = provider_id.to_string();
159            let audit_model = model_id.to_string();
160            let tools_exposed = !tools_schema.is_empty();
161            let emit_audit = |streamed: bool, outcome: &str, error_class: Option<&str>, tools_requested: u32| {
162                let _ = crate::extensions::audit::append_audit_entry(
163                    &crate::extensions::audit::new_audit_entry(
164                        audit_plugin.clone(),
165                        audit_provider.clone(),
166                        audit_model.clone(),
167                        tools_exposed,
168                        tools_requested,
169                        streamed,
170                        outcome,
171                        error_class.map(|s| s.to_string()),
172                    ),
173                );
174            };
175            if cancel.is_cancelled() {
176                emit_audit(false, "error", Some("canceled"), 0);
177                return Some(Err("operation canceled".into()));
178            }
179            let params = crate::extensions::runtime::process::ProviderCompleteParams {
180                provider_id: provider_id.to_string(),
181                model_id: model_id.to_string(),
182                model: model.to_string(),
183                messages: messages.to_vec(),
184                system_prompt: system_prompt.clone(),
185                tools: tools_schema.as_ref().clone(),
186                temperature,
187                max_tokens,
188                thinking_budget,
189            };
190            let has_active_tools = model_tool_use && !tools_schema.is_empty();
191            // Streaming path: forward TextDelta events as LlmEvent::Text deltas in real time.
192            if streaming && !has_active_tools {
193                let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<crate::extensions::runtime::process::ProviderStreamEvent>();
194                let tx_clone = tx.clone();
195                let forwarder = tokio::spawn(async move {
196                    use crate::extensions::runtime::process::ProviderStreamEvent;
197                    while let Some(event) = sink_rx.recv().await {
198                        match event {
199                            ProviderStreamEvent::TextDelta { text } => {
200                                let _ = tx_clone.send(crate::runtime::types::StreamEvent::Llm(
201                                    crate::runtime::types::LlmEvent::Text(text)
202                                ));
203                            }
204                            ProviderStreamEvent::ToolUse { .. } => {
205                                tracing::warn!("provider.stream tool_use event ignored (streaming tool-use not yet wired)");
206                            }
207                            // Usage / Done / ThinkingDelta / Error are absorbed; final result aggregates them.
208                            _ => {}
209                        }
210                    }
211                });
212                let stream_fut = handler.provider_stream(params, sink_tx);
213                let result = tokio::select! {
214                    biased;
215                    _ = cancel.cancelled() => {
216                        forwarder.abort();
217                        emit_audit(true, "error", Some("canceled"), 0);
218                        return Some(Err("operation canceled".into()));
219                    }
220                    res = stream_fut => res,
221                };
222                let _ = forwarder.await;
223                if cancel.is_cancelled() {
224                    emit_audit(true, "error", Some("canceled"), 0);
225                    return Some(Err("operation canceled".into()));
226                }
227                // TODO(audit): tools_requested is reported as 0 for the streaming
228                // path until ProviderStreamEvent::ToolUse is wired through the
229                // forwarder; tool-use over streaming is not yet routed.
230                match result {
231                    Ok(complete) => {
232                        emit_audit(true, "ok", None, 0);
233                        return Some(Ok(serde_json::json!({
234                            "content": complete.content,
235                            "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
236                            "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
237                        })));
238                    }
239                    Err(e) => {
240                        emit_audit(true, "error", Some("provider_error"), 0);
241                        return Some(Err(format!("extension provider: {e}").into()));
242                    }
243                }
244            }
245            let result = if let Some(tools) = tools_shared {
246                let registry = tools.read().await;
247                crate::extensions::runtime::process::complete_provider_with_tools(
248                    handler.clone(),
249                    params,
250                    &registry,
251                    &hook_bus,
252                    || ToolContext {
253                        channels: ToolChannels {
254                            tx_delta: None,
255                            tx_events: None,
256                        },
257                        capabilities: ToolCapabilities {
258                            watcher_exit_path: None,
259                            tool_register_tx: None,
260                            session_manager: None,
261                            subagent_registry: None,
262                            event_queue: None,
263                            secret_prompt: None,
264                        },
265                        limits: ToolLimits {
266                            max_tool_output: 30000,
267                            bash_timeout: 30,
268                            bash_max_timeout: 300,
269                            subagent_timeout: 300,
270                        },
271                    },
272                    30000,
273                    8,
274                ).await
275            } else {
276                handler.provider_complete(params).await
277            };
278            if cancel.is_cancelled() {
279                emit_audit(false, "error", Some("canceled"), 0);
280                return Some(Err("operation canceled".into()));
281            }
282            // TODO(audit): tools_requested is reported as 0 here; the
283            // complete_provider_with_tools helper does not yet expose its
284            // observed tool-use iteration count. Wire that through when the
285            // helper grows a return-tuple or counter argument.
286            match result {
287                Ok(complete) => {
288                    let text = complete
289                        .content
290                        .iter()
291                        .filter_map(|block| block.get("text").and_then(|v| v.as_str()))
292                        .collect::<Vec<_>>()
293                        .join("");
294                    if !text.is_empty() {
295                        let _ = tx.send(crate::runtime::types::StreamEvent::Llm(
296                            crate::runtime::types::LlmEvent::Text(text)
297                        ));
298                    }
299                    emit_audit(false, "ok", None, 0);
300                    return Some(Ok(serde_json::json!({
301                        "content": complete.content,
302                        "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
303                        "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
304                    })));
305                }
306                Err(e) => {
307                    emit_audit(false, "error", Some("provider_error"), 0);
308                    return Some(Err(format!("extension provider: {e}").into()));
309                }
310            }
311        }
312        return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
313    }
314
315    let provider_keys = crate::core::config::get_provider_keys();
316    match resolve_route(model, &provider_keys) {
317        Provider::OpenAi(cfg) => {
318            let result = stream::call_oai_stream_inner(
319                &cfg, client, tools_schema, system_prompt, messages, tx,
320                temperature, max_tokens, thinking_budget, cancel,
321            ).await;
322            Some(result)
323        }
324        Provider::Codex(cfg) => {
325            let result = stream::call_codex_stream_inner(
326                &cfg, client, tools_schema, system_prompt, messages, tx,
327                temperature, max_tokens, cancel,
328            ).await;
329            Some(result)
330        }
331        Provider::Anthropic => None,
332        Provider::MissingKey(provider) => {
333            Some(Err(format!(
334                "No API key for '{}'. Set provider.{} in ~/.synaps-cli/config or the corresponding env var.",
335                provider, provider
336            ).into()))
337        }
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh extension manager, wrapped the same way the routing slot stores it.
    fn empty_manager() -> Arc<tokio::sync::RwLock<ExtensionManager>> {
        let hooks = Arc::new(crate::extensions::hooks::HookBus::new());
        Arc::new(tokio::sync::RwLock::new(ExtensionManager::new(hooks)))
    }

    #[test]
    fn resolves_openai_codex_without_requiring_eager_credentials() {
        // With no access token in the environment, routing must still choose
        // Codex. Credentials are resolved later, not at routing time.
        std::env::remove_var("OPENAI_CODEX_ACCESS_TOKEN");
        let route = resolve_route("openai-codex/gpt-5.1-codex-mini", &BTreeMap::new());
        if let Provider::Codex(cfg) = &route {
            assert_eq!(cfg.provider, "openai-codex");
            assert_eq!(cfg.model, "gpt-5.1-codex-mini");
            assert!(cfg.base_url.contains("chatgpt.com/backend-api"));
        } else {
            panic!("expected Codex route, got {route:?}");
        }
    }

    #[test]
    fn set_extension_manager_for_routing_overwrites_previous_manager() {
        clear_extension_manager_for_routing();
        let first = empty_manager();
        let second = empty_manager();

        // Each install must replace whatever was there before.
        for expected in [&first, &second] {
            set_extension_manager_for_routing(Arc::clone(expected));
            let installed = extension_manager_for_routing().unwrap();
            assert!(Arc::ptr_eq(&installed, expected));
        }

        clear_extension_manager_for_routing();
        assert!(extension_manager_for_routing().is_none());
    }
}