1use std::collections::BTreeMap;
8
9pub mod types;
10pub mod wire;
11pub mod registry;
12pub mod catalog;
13pub mod reasoning;
14pub mod translate;
15pub mod stream;
16pub mod ping;
17
18use std::sync::Arc;
19
20use crate::extensions::manager::ExtensionManager;
21use crate::extensions::providers::ProviderRegistry;
22use crate::tools::{ToolCapabilities, ToolChannels, ToolContext, ToolLimits};
23
24pub use types::{
25 ChatMessage, ChatOptions, ChatRequest, FunctionCall, FunctionDefinition,
26 OaiEvent, ProviderConfig, StreamOptions, ToolCall, ToolChoice, ToolDefinition,
27};
28pub use wire::StreamDecoder;
29
30static EXTENSION_MANAGER: std::sync::RwLock<Option<Arc<tokio::sync::RwLock<ExtensionManager>>>> = std::sync::RwLock::new(None);
31
32pub fn set_extension_manager_for_routing(manager: Arc<tokio::sync::RwLock<ExtensionManager>>) {
33 *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = Some(manager);
34}
35
36pub fn clear_extension_manager_for_routing() {
37 *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = None;
38}
39
40pub fn extension_manager_for_routing() -> Option<Arc<tokio::sync::RwLock<ExtensionManager>>> {
41 EXTENSION_MANAGER
42 .read()
43 .expect("extension manager routing lock poisoned")
44 .clone()
45}
46
47#[derive(Debug, Clone)]
49pub enum Provider {
50 Anthropic,
52 OpenAi(ProviderConfig),
54 Codex(ProviderConfig),
56 MissingKey(String),
58}
59
60pub fn resolve_route(model: &str, provider_keys: &BTreeMap<String, String>) -> Provider {
66 if let Some((prefix, _rest)) = model.split_once('/') {
67 if prefix == "openai-codex" {
68 if let Some(cfg) = registry::resolve_codex_shorthand(model) {
69 return Provider::Codex(cfg);
70 }
71 return Provider::MissingKey(prefix.to_string());
72 }
73 if prefix == "local" || registry::providers().iter().any(|s| s.key == prefix) {
74 if let Some(cfg) = registry::resolve_shorthand(model, provider_keys) {
75 return Provider::OpenAi(cfg);
76 }
77 return Provider::MissingKey(prefix.to_string());
79 }
80 }
81 Provider::Anthropic
82}
83
84pub async fn try_route(
93 model: &str,
94 client: &reqwest::Client,
95 tools_schema: &std::sync::Arc<Vec<serde_json::Value>>,
96 system_prompt: &Option<String>,
97 messages: &[serde_json::Value],
98 tx: &tokio::sync::mpsc::UnboundedSender<crate::runtime::types::StreamEvent>,
99 temperature: Option<f32>,
100 max_tokens: Option<u32>,
101 thinking_budget: u32,
102 cancel: &tokio_util::sync::CancellationToken,
103) -> Option<Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>> {
104 if let Some((plugin_id, provider_id, model_id)) = ProviderRegistry::parse_model_id(model) {
105 if let Some(manager) = extension_manager_for_routing() {
106 let provider_runtime_id = format!("{}:{}", plugin_id, provider_id);
107 let Some((handler, hook_bus, tools_shared, streaming, model_tool_use)) = ({
108 let manager = manager.read().await;
109 manager.provider(&provider_runtime_id).and_then(|provider| {
110 provider.handler.as_ref().map(|handler| {
111 let model_spec = provider.spec.models.iter().find(|m| m.id == model_id);
112 let streaming = model_spec
113 .and_then(|m| m.capabilities.get("streaming"))
114 .and_then(|v| v.as_bool())
115 .unwrap_or(false);
116 let model_tool_use = model_spec
117 .and_then(|m| m.capabilities.get("tool_use"))
118 .and_then(|v| v.as_bool())
119 .unwrap_or(false);
120 (handler.clone(), manager.hook_bus().clone(), manager.tools_shared(), streaming, model_tool_use)
121 })
122 })
123 }) else {
124 return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
125 };
126 let trust = match crate::extensions::trust::load_trust_state() {
130 Ok(t) => t,
131 Err(e) => {
132 tracing::warn!("trust.json corrupt or unreadable, failing closed: {e}");
133 return Some(Err(
134 format!("Cannot route to provider: trust state unreadable: {e}").into()
135 ));
136 }
137 };
138 if !crate::extensions::trust::is_provider_enabled(&trust, &provider_runtime_id) {
139 let _ = crate::extensions::audit::append_audit_entry(
140 &crate::extensions::audit::new_audit_entry(
141 plugin_id,
142 provider_id,
143 model_id,
144 false,
145 0,
146 false,
147 "blocked",
148 Some("trust_disabled".to_string()),
149 ),
150 );
151 return Some(Err(format!(
152 "Provider '{}' is disabled by user trust settings",
153 provider_runtime_id
154 ).into()));
155 }
156 let audit_plugin = plugin_id.to_string();
158 let audit_provider = provider_id.to_string();
159 let audit_model = model_id.to_string();
160 let tools_exposed = !tools_schema.is_empty();
161 let emit_audit = |streamed: bool, outcome: &str, error_class: Option<&str>, tools_requested: u32| {
162 let _ = crate::extensions::audit::append_audit_entry(
163 &crate::extensions::audit::new_audit_entry(
164 audit_plugin.clone(),
165 audit_provider.clone(),
166 audit_model.clone(),
167 tools_exposed,
168 tools_requested,
169 streamed,
170 outcome,
171 error_class.map(|s| s.to_string()),
172 ),
173 );
174 };
175 if cancel.is_cancelled() {
176 emit_audit(false, "error", Some("canceled"), 0);
177 return Some(Err("operation canceled".into()));
178 }
179 let params = crate::extensions::runtime::process::ProviderCompleteParams {
180 provider_id: provider_id.to_string(),
181 model_id: model_id.to_string(),
182 model: model.to_string(),
183 messages: messages.to_vec(),
184 system_prompt: system_prompt.clone(),
185 tools: tools_schema.as_ref().clone(),
186 temperature,
187 max_tokens,
188 thinking_budget,
189 };
190 let has_active_tools = model_tool_use && !tools_schema.is_empty();
191 if streaming && !has_active_tools {
193 let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<crate::extensions::runtime::process::ProviderStreamEvent>();
194 let tx_clone = tx.clone();
195 let forwarder = tokio::spawn(async move {
196 use crate::extensions::runtime::process::ProviderStreamEvent;
197 while let Some(event) = sink_rx.recv().await {
198 match event {
199 ProviderStreamEvent::TextDelta { text } => {
200 let _ = tx_clone.send(crate::runtime::types::StreamEvent::Llm(
201 crate::runtime::types::LlmEvent::Text(text)
202 ));
203 }
204 ProviderStreamEvent::ToolUse { .. } => {
205 tracing::warn!("provider.stream tool_use event ignored (streaming tool-use not yet wired)");
206 }
207 _ => {}
209 }
210 }
211 });
212 let stream_fut = handler.provider_stream(params, sink_tx);
213 let result = tokio::select! {
214 biased;
215 _ = cancel.cancelled() => {
216 forwarder.abort();
217 emit_audit(true, "error", Some("canceled"), 0);
218 return Some(Err("operation canceled".into()));
219 }
220 res = stream_fut => res,
221 };
222 let _ = forwarder.await;
223 if cancel.is_cancelled() {
224 emit_audit(true, "error", Some("canceled"), 0);
225 return Some(Err("operation canceled".into()));
226 }
227 match result {
231 Ok(complete) => {
232 emit_audit(true, "ok", None, 0);
233 return Some(Ok(serde_json::json!({
234 "content": complete.content,
235 "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
236 "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
237 })));
238 }
239 Err(e) => {
240 emit_audit(true, "error", Some("provider_error"), 0);
241 return Some(Err(format!("extension provider: {e}").into()));
242 }
243 }
244 }
245 let result = if let Some(tools) = tools_shared {
246 let registry = tools.read().await;
247 crate::extensions::runtime::process::complete_provider_with_tools(
248 handler.clone(),
249 params,
250 ®istry,
251 &hook_bus,
252 || ToolContext {
253 channels: ToolChannels {
254 tx_delta: None,
255 tx_events: None,
256 },
257 capabilities: ToolCapabilities {
258 watcher_exit_path: None,
259 tool_register_tx: None,
260 session_manager: None,
261 subagent_registry: None,
262 event_queue: None,
263 secret_prompt: None,
264 },
265 limits: ToolLimits {
266 max_tool_output: 30000,
267 bash_timeout: 30,
268 bash_max_timeout: 300,
269 subagent_timeout: 300,
270 },
271 },
272 30000,
273 8,
274 ).await
275 } else {
276 handler.provider_complete(params).await
277 };
278 if cancel.is_cancelled() {
279 emit_audit(false, "error", Some("canceled"), 0);
280 return Some(Err("operation canceled".into()));
281 }
282 match result {
287 Ok(complete) => {
288 let text = complete
289 .content
290 .iter()
291 .filter_map(|block| block.get("text").and_then(|v| v.as_str()))
292 .collect::<Vec<_>>()
293 .join("");
294 if !text.is_empty() {
295 let _ = tx.send(crate::runtime::types::StreamEvent::Llm(
296 crate::runtime::types::LlmEvent::Text(text)
297 ));
298 }
299 emit_audit(false, "ok", None, 0);
300 return Some(Ok(serde_json::json!({
301 "content": complete.content,
302 "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
303 "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
304 })));
305 }
306 Err(e) => {
307 emit_audit(false, "error", Some("provider_error"), 0);
308 return Some(Err(format!("extension provider: {e}").into()));
309 }
310 }
311 }
312 return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
313 }
314
315 let provider_keys = crate::core::config::get_provider_keys();
316 match resolve_route(model, &provider_keys) {
317 Provider::OpenAi(cfg) => {
318 let result = stream::call_oai_stream_inner(
319 &cfg, client, tools_schema, system_prompt, messages, tx,
320 temperature, max_tokens, thinking_budget, cancel,
321 ).await;
322 Some(result)
323 }
324 Provider::Codex(cfg) => {
325 let result = stream::call_codex_stream_inner(
326 &cfg, client, tools_schema, system_prompt, messages, tx,
327 temperature, max_tokens, cancel,
328 ).await;
329 Some(result)
330 }
331 Provider::Anthropic => None,
332 Provider::MissingKey(provider) => {
333 Some(Err(format!(
334 "No API key for '{}'. Set provider.{} in ~/.synaps-cli/config or the corresponding env var.",
335 provider, provider
336 ).into()))
337 }
338 }
339}
340
341#[cfg(test)]
342mod tests {
343 use super::*;
344
345 #[test]
346 fn resolves_openai_codex_without_requiring_eager_credentials() {
347 std::env::remove_var("OPENAI_CODEX_ACCESS_TOKEN");
348 match resolve_route("openai-codex/gpt-5.1-codex-mini", &BTreeMap::new()) {
349 Provider::Codex(cfg) => {
350 assert_eq!(cfg.provider, "openai-codex");
351 assert_eq!(cfg.model, "gpt-5.1-codex-mini");
352 assert!(cfg.base_url.contains("chatgpt.com/backend-api"));
353 }
354 other => panic!("expected Codex route, got {other:?}"),
355 }
356 }
357
358 #[test]
359 fn set_extension_manager_for_routing_overwrites_previous_manager() {
360 clear_extension_manager_for_routing();
361 let first = Arc::new(tokio::sync::RwLock::new(ExtensionManager::new(Arc::new(crate::extensions::hooks::HookBus::new()))));
362 let second = Arc::new(tokio::sync::RwLock::new(ExtensionManager::new(Arc::new(crate::extensions::hooks::HookBus::new()))));
363
364 set_extension_manager_for_routing(first.clone());
365 assert!(Arc::ptr_eq(&extension_manager_for_routing().unwrap(), &first));
366
367 set_extension_manager_for_routing(second.clone());
368 assert!(Arc::ptr_eq(&extension_manager_for_routing().unwrap(), &second));
369
370 clear_extension_manager_for_routing();
371 assert!(extension_manager_for_routing().is_none());
372 }
373}