Skip to main content

agent_engine/runtime/openai/
mod.rs

//! OpenAI-compatible provider engine.
//!
//! Ported from the `openai-runtime` prototype. Translates between the
//! Anthropic-shaped messages, tools, and content blocks used as the internal
//! synaps representation and the OpenAI `chat/completions` SSE wire format.
6
7use std::collections::BTreeMap;
8
9pub mod types;
10pub mod wire;
11pub mod registry;
12pub mod catalog;
13pub mod reasoning;
14pub mod translate;
15pub mod stream;
16pub mod ping;
17
18use std::sync::Arc;
19
20use crate::extensions::manager::ExtensionManager;
21use crate::extensions::providers::ProviderRegistry;
22use crate::tools::{ToolCapabilities, ToolChannels, ToolContext, ToolLimits};
23
24pub use types::{
25    ChatMessage, ChatOptions, ChatRequest, FunctionCall, FunctionDefinition,
26    OaiEvent, ProviderConfig, StreamOptions, ToolCall, ToolChoice, ToolDefinition,
27};
28pub use wire::StreamDecoder;
29
30static EXTENSION_MANAGER: std::sync::RwLock<Option<Arc<tokio::sync::RwLock<ExtensionManager>>>> = std::sync::RwLock::new(None);
31
32pub fn set_extension_manager_for_routing(manager: Arc<tokio::sync::RwLock<ExtensionManager>>) {
33    *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = Some(manager);
34}
35
36pub fn clear_extension_manager_for_routing() {
37    *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = None;
38}
39
40pub fn extension_manager_for_routing() -> Option<Arc<tokio::sync::RwLock<ExtensionManager>>> {
41    EXTENSION_MANAGER
42        .read()
43        .expect("extension manager routing lock poisoned")
44        .clone()
45}
46
47/// Routing decision for a given model id.
48#[derive(Debug, Clone)]
49pub enum Provider {
50    /// Native Anthropic path (default, backward-compatible).
51    Anthropic,
52    /// OpenAI-compatible provider with a resolved config.
53    OpenAi(ProviderConfig),
54    /// ChatGPT subscription-backed Codex responses endpoint.
55    Codex(ProviderConfig),
56    /// Known provider prefix but no API key configured.
57    MissingKey(String),
58}
59
60/// Decide which backend a model should route to.
61///
62/// - `provider/model` shorthand where `provider` matches a known provider key → `OpenAi`
63/// - `claude-*` → `Anthropic`
64/// - anything else → `Anthropic` (backward compat)
65pub fn resolve_route(model: &str, provider_keys: &BTreeMap<String, String>) -> Provider {
66    if let Some((prefix, _rest)) = model.split_once('/') {
67        if prefix == "openai-codex" {
68            if let Some(cfg) = registry::resolve_codex_shorthand(model) {
69                return Provider::Codex(cfg);
70            }
71            return Provider::MissingKey(prefix.to_string());
72        }
73        if prefix == "local" || registry::providers().iter().any(|s| s.key == prefix) {
74            if let Some(cfg) = registry::resolve_shorthand(model, provider_keys) {
75                return Provider::OpenAi(cfg);
76            }
77            // Known provider but no key
78            return Provider::MissingKey(prefix.to_string());
79        }
80    }
81    Provider::Anthropic
82}
83
84/// Try to route a request through an OpenAI-compatible provider.
85///
86/// Returns `Some(Ok(value))` if the model resolved to an OpenAI provider and the
87/// request completed (successfully or with error). Returns `None` if the model
88/// should be handled by the Anthropic path.
89///
90/// This is the single routing entry point — both streaming and non-streaming
91/// callers in `api.rs` use this instead of duplicating the routing logic.
92#[allow(clippy::too_many_arguments)]
93pub async fn try_route(
94    model: &str,
95    client: &reqwest::Client,
96    tools_schema: &std::sync::Arc<Vec<serde_json::Value>>,
97    system_prompt: &Option<String>,
98    messages: &[serde_json::Value],
99    tx: &tokio::sync::mpsc::UnboundedSender<crate::runtime::types::StreamEvent>,
100    temperature: Option<f32>,
101    max_tokens: Option<u32>,
102    thinking_budget: u32,
103    cancel: &tokio_util::sync::CancellationToken,
104) -> Option<Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>> {
105    if let Some((plugin_id, provider_id, model_id)) = ProviderRegistry::parse_model_id(model) {
106        if let Some(manager) = extension_manager_for_routing() {
107            let provider_runtime_id = format!("{}:{}", plugin_id, provider_id);
108            let Some((handler, hook_bus, tools_shared, streaming, model_tool_use)) = ({
109                let manager = manager.read().await;
110                manager.provider(&provider_runtime_id).and_then(|provider| {
111                    provider.handler.as_ref().map(|handler| {
112                        let model_spec = provider.spec.models.iter().find(|m| m.id == model_id);
113                        let streaming = model_spec
114                            .and_then(|m| m.capabilities.get("streaming"))
115                            .and_then(|v| v.as_bool())
116                            .unwrap_or(false);
117                        let model_tool_use = model_spec
118                            .and_then(|m| m.capabilities.get("tool_use"))
119                            .and_then(|v| v.as_bool())
120                            .unwrap_or(false);
121                        (handler.clone(), manager.hook_bus().clone(), manager.tools_shared(), streaming, model_tool_use)
122                    })
123                })
124            }) else {
125                return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
126            };
127            // Per-provider trust gate: a disabled provider must not be invoked.
128            // The check runs before any IPC and we DO NOT silently fall back to
129            // built-in routing — instead return a clear routing error.
130            let trust = match crate::extensions::trust::load_trust_state() {
131                Ok(t) => t,
132                Err(e) => {
133                    tracing::warn!("trust.json corrupt or unreadable, failing closed: {e}");
134                    return Some(Err(
135                        format!("Cannot route to provider: trust state unreadable: {e}").into()
136                    ));
137                }
138            };
139            if !crate::extensions::trust::is_provider_enabled(&trust, &provider_runtime_id) {
140                let _ = crate::extensions::audit::append_audit_entry(
141                    &crate::extensions::audit::new_audit_entry(
142                        plugin_id,
143                        provider_id,
144                        model_id,
145                        false,
146                        0,
147                        false,
148                        "blocked",
149                        Some("trust_disabled".to_string()),
150                    ),
151                );
152                return Some(Err(format!(
153                    "Provider '{}' is disabled by user trust settings",
154                    provider_runtime_id
155                ).into()));
156            }
157            // Audit metadata captured up-front so each terminal branch can record an entry.
158            let audit_plugin = plugin_id.to_string();
159            let audit_provider = provider_id.to_string();
160            let audit_model = model_id.to_string();
161            let tools_exposed = !tools_schema.is_empty();
162            let emit_audit = |streamed: bool, outcome: &str, error_class: Option<&str>, tools_requested: u32| {
163                let _ = crate::extensions::audit::append_audit_entry(
164                    &crate::extensions::audit::new_audit_entry(
165                        audit_plugin.clone(),
166                        audit_provider.clone(),
167                        audit_model.clone(),
168                        tools_exposed,
169                        tools_requested,
170                        streamed,
171                        outcome,
172                        error_class.map(|s| s.to_string()),
173                    ),
174                );
175            };
176            if cancel.is_cancelled() {
177                emit_audit(false, "error", Some("canceled"), 0);
178                return Some(Err("operation canceled".into()));
179            }
180            let params = crate::extensions::runtime::process::ProviderCompleteParams {
181                provider_id: provider_id.to_string(),
182                model_id: model_id.to_string(),
183                model: model.to_string(),
184                messages: messages.to_vec(),
185                system_prompt: system_prompt.clone(),
186                tools: tools_schema.as_ref().clone(),
187                temperature,
188                max_tokens,
189                thinking_budget,
190            };
191            let has_active_tools = model_tool_use && !tools_schema.is_empty();
192            // Streaming path: forward TextDelta events as LlmEvent::Text deltas in real time.
193            if streaming && !has_active_tools {
194                let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<crate::extensions::runtime::process::ProviderStreamEvent>();
195                let tx_clone = tx.clone();
196                let forwarder = tokio::spawn(async move {
197                    use crate::extensions::runtime::process::ProviderStreamEvent;
198                    while let Some(event) = sink_rx.recv().await {
199                        match event {
200                            ProviderStreamEvent::TextDelta { text } => {
201                                let _ = tx_clone.send(crate::runtime::types::StreamEvent::Llm(
202                                    crate::runtime::types::LlmEvent::Text(text)
203                                ));
204                            }
205                            ProviderStreamEvent::ToolUse { .. } => {
206                                tracing::warn!("provider.stream tool_use event ignored (streaming tool-use not yet wired)");
207                            }
208                            // Usage / Done / ThinkingDelta / Error are absorbed; final result aggregates them.
209                            _ => {}
210                        }
211                    }
212                });
213                let stream_fut = handler.provider_stream(params, sink_tx);
214                let result = tokio::select! {
215                    biased;
216                    _ = cancel.cancelled() => {
217                        forwarder.abort();
218                        emit_audit(true, "error", Some("canceled"), 0);
219                        return Some(Err("operation canceled".into()));
220                    }
221                    res = stream_fut => res,
222                };
223                let _ = forwarder.await;
224                if cancel.is_cancelled() {
225                    emit_audit(true, "error", Some("canceled"), 0);
226                    return Some(Err("operation canceled".into()));
227                }
228                // TODO(audit): tools_requested is reported as 0 for the streaming
229                // path until ProviderStreamEvent::ToolUse is wired through the
230                // forwarder; tool-use over streaming is not yet routed.
231                match result {
232                    Ok(complete) => {
233                        emit_audit(true, "ok", None, 0);
234                        return Some(Ok(serde_json::json!({
235                            "content": complete.content,
236                            "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
237                            "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
238                        })));
239                    }
240                    Err(e) => {
241                        emit_audit(true, "error", Some("provider_error"), 0);
242                        return Some(Err(format!("extension provider: {e}").into()));
243                    }
244                }
245            }
246            let result = if let Some(tools) = tools_shared {
247                let registry = tools.read().await;
248                crate::extensions::runtime::process::complete_provider_with_tools(
249                    handler.clone(),
250                    params,
251                    &registry,
252                    &hook_bus,
253                    || ToolContext {
254                        channels: ToolChannels {
255                            tx_delta: None,
256                            tx_events: None,
257                        },
258                        capabilities: ToolCapabilities {
259                            watcher_exit_path: None,
260                            tool_register_tx: None,
261                            session_manager: None,
262                            subagent_registry: None,
263                            event_queue: None,
264                            secret_prompt: None,
265                        },
266                        limits: ToolLimits {
267                            max_tool_output: 30000,
268                            bash_timeout: 30,
269                            bash_max_timeout: 300,
270                            subagent_timeout: 300,
271                        },
272                    },
273                    30000,
274                    8,
275                ).await
276            } else {
277                handler.provider_complete(params).await
278            };
279            if cancel.is_cancelled() {
280                emit_audit(false, "error", Some("canceled"), 0);
281                return Some(Err("operation canceled".into()));
282            }
283            // TODO(audit): tools_requested is reported as 0 here; the
284            // complete_provider_with_tools helper does not yet expose its
285            // observed tool-use iteration count. Wire that through when the
286            // helper grows a return-tuple or counter argument.
287            match result {
288                Ok(complete) => {
289                    let text = complete
290                        .content
291                        .iter()
292                        .filter_map(|block| block.get("text").and_then(|v| v.as_str()))
293                        .collect::<Vec<_>>()
294                        .join("");
295                    if !text.is_empty() {
296                        let _ = tx.send(crate::runtime::types::StreamEvent::Llm(
297                            crate::runtime::types::LlmEvent::Text(text)
298                        ));
299                    }
300                    emit_audit(false, "ok", None, 0);
301                    return Some(Ok(serde_json::json!({
302                        "content": complete.content,
303                        "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
304                        "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
305                    })));
306                }
307                Err(e) => {
308                    emit_audit(false, "error", Some("provider_error"), 0);
309                    return Some(Err(format!("extension provider: {e}").into()));
310                }
311            }
312        }
313        return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
314    }
315
316    let provider_keys = crate::core::config::get_provider_keys();
317    match resolve_route(model, &provider_keys) {
318        Provider::OpenAi(cfg) => {
319            let result = stream::call_oai_stream_inner(
320                &cfg, client, tools_schema, system_prompt, messages, tx,
321                temperature, max_tokens, thinking_budget, cancel,
322            ).await;
323            Some(result)
324        }
325        Provider::Codex(cfg) => {
326            let result = stream::call_codex_stream_inner(
327                &cfg, client, tools_schema, system_prompt, messages, tx,
328                temperature, max_tokens, cancel,
329            ).await;
330            Some(result)
331        }
332        Provider::Anthropic => None,
333        Provider::MissingKey(provider) => {
334            Some(Err(format!(
335                "No API key for '{}'. Set provider.{} in ~/.synaps-cli/config or the corresponding env var.",
336                provider, provider
337            ).into()))
338        }
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// A Codex shorthand must resolve to a Codex route even when no access
    /// token is present in the environment and no provider keys are configured.
    #[test]
    fn resolves_openai_codex_without_requiring_eager_credentials() {
        std::env::remove_var("OPENAI_CODEX_ACCESS_TOKEN");
        let no_keys = BTreeMap::new();
        let route = resolve_route("openai-codex/gpt-5.1-codex-mini", &no_keys);
        match &route {
            Provider::Codex(cfg) => {
                assert_eq!(cfg.provider, "openai-codex");
                assert_eq!(cfg.model, "gpt-5.1-codex-mini");
                assert!(cfg.base_url.contains("chatgpt.com/backend-api"));
            }
            _ => panic!("expected Codex route, got {route:?}"),
        }
    }

    /// Installing a second manager replaces the first, and clearing empties the slot.
    #[test]
    fn set_extension_manager_for_routing_overwrites_previous_manager() {
        let fresh_manager = || {
            Arc::new(tokio::sync::RwLock::new(ExtensionManager::new(Arc::new(
                crate::extensions::hooks::HookBus::new(),
            ))))
        };

        clear_extension_manager_for_routing();
        let first = fresh_manager();
        let second = fresh_manager();

        set_extension_manager_for_routing(Arc::clone(&first));
        let installed = extension_manager_for_routing().unwrap();
        assert!(Arc::ptr_eq(&installed, &first));

        set_extension_manager_for_routing(Arc::clone(&second));
        let installed = extension_manager_for_routing().unwrap();
        assert!(Arc::ptr_eq(&installed, &second));

        clear_extension_manager_for_routing();
        assert!(extension_manager_for_routing().is_none());
    }
}