1use std::collections::BTreeMap;
8
9pub mod types;
10pub mod wire;
11pub mod registry;
12pub mod catalog;
13pub mod reasoning;
14pub mod translate;
15pub mod stream;
16pub mod ping;
17
18use std::sync::Arc;
19
20use crate::extensions::manager::ExtensionManager;
21use crate::extensions::providers::ProviderRegistry;
22use crate::tools::{ToolCapabilities, ToolChannels, ToolContext, ToolLimits};
23
24pub use types::{
25 ChatMessage, ChatOptions, ChatRequest, FunctionCall, FunctionDefinition,
26 OaiEvent, ProviderConfig, StreamOptions, ToolCall, ToolChoice, ToolDefinition,
27};
28pub use wire::StreamDecoder;
29
30static EXTENSION_MANAGER: std::sync::RwLock<Option<Arc<tokio::sync::RwLock<ExtensionManager>>>> = std::sync::RwLock::new(None);
31
32pub fn set_extension_manager_for_routing(manager: Arc<tokio::sync::RwLock<ExtensionManager>>) {
33 *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = Some(manager);
34}
35
36pub fn clear_extension_manager_for_routing() {
37 *EXTENSION_MANAGER.write().expect("extension manager routing lock poisoned") = None;
38}
39
40pub fn extension_manager_for_routing() -> Option<Arc<tokio::sync::RwLock<ExtensionManager>>> {
41 EXTENSION_MANAGER
42 .read()
43 .expect("extension manager routing lock poisoned")
44 .clone()
45}
46
47#[derive(Debug, Clone)]
49pub enum Provider {
50 Anthropic,
52 OpenAi(ProviderConfig),
54 Codex(ProviderConfig),
56 MissingKey(String),
58}
59
60pub fn resolve_route(model: &str, provider_keys: &BTreeMap<String, String>) -> Provider {
66 if let Some((prefix, _rest)) = model.split_once('/') {
67 if prefix == "openai-codex" {
68 if let Some(cfg) = registry::resolve_codex_shorthand(model) {
69 return Provider::Codex(cfg);
70 }
71 return Provider::MissingKey(prefix.to_string());
72 }
73 if prefix == "local" || registry::providers().iter().any(|s| s.key == prefix) {
74 if let Some(cfg) = registry::resolve_shorthand(model, provider_keys) {
75 return Provider::OpenAi(cfg);
76 }
77 return Provider::MissingKey(prefix.to_string());
79 }
80 }
81 Provider::Anthropic
82}
83
84#[allow(clippy::too_many_arguments)]
93pub async fn try_route(
94 model: &str,
95 client: &reqwest::Client,
96 tools_schema: &std::sync::Arc<Vec<serde_json::Value>>,
97 system_prompt: &Option<String>,
98 messages: &[serde_json::Value],
99 tx: &tokio::sync::mpsc::UnboundedSender<crate::runtime::types::StreamEvent>,
100 temperature: Option<f32>,
101 max_tokens: Option<u32>,
102 thinking_budget: u32,
103 cancel: &tokio_util::sync::CancellationToken,
104) -> Option<Result<serde_json::Value, Box<dyn std::error::Error + Send + Sync>>> {
105 if let Some((plugin_id, provider_id, model_id)) = ProviderRegistry::parse_model_id(model) {
106 if let Some(manager) = extension_manager_for_routing() {
107 let provider_runtime_id = format!("{}:{}", plugin_id, provider_id);
108 let Some((handler, hook_bus, tools_shared, streaming, model_tool_use)) = ({
109 let manager = manager.read().await;
110 manager.provider(&provider_runtime_id).and_then(|provider| {
111 provider.handler.as_ref().map(|handler| {
112 let model_spec = provider.spec.models.iter().find(|m| m.id == model_id);
113 let streaming = model_spec
114 .and_then(|m| m.capabilities.get("streaming"))
115 .and_then(|v| v.as_bool())
116 .unwrap_or(false);
117 let model_tool_use = model_spec
118 .and_then(|m| m.capabilities.get("tool_use"))
119 .and_then(|v| v.as_bool())
120 .unwrap_or(false);
121 (handler.clone(), manager.hook_bus().clone(), manager.tools_shared(), streaming, model_tool_use)
122 })
123 })
124 }) else {
125 return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
126 };
127 let trust = match crate::extensions::trust::load_trust_state() {
131 Ok(t) => t,
132 Err(e) => {
133 tracing::warn!("trust.json corrupt or unreadable, failing closed: {e}");
134 return Some(Err(
135 format!("Cannot route to provider: trust state unreadable: {e}").into()
136 ));
137 }
138 };
139 if !crate::extensions::trust::is_provider_enabled(&trust, &provider_runtime_id) {
140 let _ = crate::extensions::audit::append_audit_entry(
141 &crate::extensions::audit::new_audit_entry(
142 plugin_id,
143 provider_id,
144 model_id,
145 false,
146 0,
147 false,
148 "blocked",
149 Some("trust_disabled".to_string()),
150 ),
151 );
152 return Some(Err(format!(
153 "Provider '{}' is disabled by user trust settings",
154 provider_runtime_id
155 ).into()));
156 }
157 let audit_plugin = plugin_id.to_string();
159 let audit_provider = provider_id.to_string();
160 let audit_model = model_id.to_string();
161 let tools_exposed = !tools_schema.is_empty();
162 let emit_audit = |streamed: bool, outcome: &str, error_class: Option<&str>, tools_requested: u32| {
163 let _ = crate::extensions::audit::append_audit_entry(
164 &crate::extensions::audit::new_audit_entry(
165 audit_plugin.clone(),
166 audit_provider.clone(),
167 audit_model.clone(),
168 tools_exposed,
169 tools_requested,
170 streamed,
171 outcome,
172 error_class.map(|s| s.to_string()),
173 ),
174 );
175 };
176 if cancel.is_cancelled() {
177 emit_audit(false, "error", Some("canceled"), 0);
178 return Some(Err("operation canceled".into()));
179 }
180 let params = crate::extensions::runtime::process::ProviderCompleteParams {
181 provider_id: provider_id.to_string(),
182 model_id: model_id.to_string(),
183 model: model.to_string(),
184 messages: messages.to_vec(),
185 system_prompt: system_prompt.clone(),
186 tools: tools_schema.as_ref().clone(),
187 temperature,
188 max_tokens,
189 thinking_budget,
190 };
191 let has_active_tools = model_tool_use && !tools_schema.is_empty();
192 if streaming && !has_active_tools {
194 let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel::<crate::extensions::runtime::process::ProviderStreamEvent>();
195 let tx_clone = tx.clone();
196 let forwarder = tokio::spawn(async move {
197 use crate::extensions::runtime::process::ProviderStreamEvent;
198 while let Some(event) = sink_rx.recv().await {
199 match event {
200 ProviderStreamEvent::TextDelta { text } => {
201 let _ = tx_clone.send(crate::runtime::types::StreamEvent::Llm(
202 crate::runtime::types::LlmEvent::Text(text)
203 ));
204 }
205 ProviderStreamEvent::ToolUse { .. } => {
206 tracing::warn!("provider.stream tool_use event ignored (streaming tool-use not yet wired)");
207 }
208 _ => {}
210 }
211 }
212 });
213 let stream_fut = handler.provider_stream(params, sink_tx);
214 let result = tokio::select! {
215 biased;
216 _ = cancel.cancelled() => {
217 forwarder.abort();
218 emit_audit(true, "error", Some("canceled"), 0);
219 return Some(Err("operation canceled".into()));
220 }
221 res = stream_fut => res,
222 };
223 let _ = forwarder.await;
224 if cancel.is_cancelled() {
225 emit_audit(true, "error", Some("canceled"), 0);
226 return Some(Err("operation canceled".into()));
227 }
228 match result {
232 Ok(complete) => {
233 emit_audit(true, "ok", None, 0);
234 return Some(Ok(serde_json::json!({
235 "content": complete.content,
236 "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
237 "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
238 })));
239 }
240 Err(e) => {
241 emit_audit(true, "error", Some("provider_error"), 0);
242 return Some(Err(format!("extension provider: {e}").into()));
243 }
244 }
245 }
246 let result = if let Some(tools) = tools_shared {
247 let registry = tools.read().await;
248 crate::extensions::runtime::process::complete_provider_with_tools(
249 handler.clone(),
250 params,
251 ®istry,
252 &hook_bus,
253 || ToolContext {
254 channels: ToolChannels {
255 tx_delta: None,
256 tx_events: None,
257 },
258 capabilities: ToolCapabilities {
259 watcher_exit_path: None,
260 tool_register_tx: None,
261 session_manager: None,
262 subagent_registry: None,
263 event_queue: None,
264 secret_prompt: None,
265 },
266 limits: ToolLimits {
267 max_tool_output: 30000,
268 bash_timeout: 30,
269 bash_max_timeout: 300,
270 subagent_timeout: 300,
271 },
272 },
273 30000,
274 8,
275 ).await
276 } else {
277 handler.provider_complete(params).await
278 };
279 if cancel.is_cancelled() {
280 emit_audit(false, "error", Some("canceled"), 0);
281 return Some(Err("operation canceled".into()));
282 }
283 match result {
288 Ok(complete) => {
289 let text = complete
290 .content
291 .iter()
292 .filter_map(|block| block.get("text").and_then(|v| v.as_str()))
293 .collect::<Vec<_>>()
294 .join("");
295 if !text.is_empty() {
296 let _ = tx.send(crate::runtime::types::StreamEvent::Llm(
297 crate::runtime::types::LlmEvent::Text(text)
298 ));
299 }
300 emit_audit(false, "ok", None, 0);
301 return Some(Ok(serde_json::json!({
302 "content": complete.content,
303 "stop_reason": complete.stop_reason.unwrap_or_else(|| "end_turn".to_string()),
304 "usage": complete.usage.unwrap_or_else(|| serde_json::json!({}))
305 })));
306 }
307 Err(e) => {
308 emit_audit(false, "error", Some("provider_error"), 0);
309 return Some(Err(format!("extension provider: {e}").into()));
310 }
311 }
312 }
313 return Some(Err(format!("Extension provider model '{}' is not available", model).into()));
314 }
315
316 let provider_keys = crate::core::config::get_provider_keys();
317 match resolve_route(model, &provider_keys) {
318 Provider::OpenAi(cfg) => {
319 let result = stream::call_oai_stream_inner(
320 &cfg, client, tools_schema, system_prompt, messages, tx,
321 temperature, max_tokens, thinking_budget, cancel,
322 ).await;
323 Some(result)
324 }
325 Provider::Codex(cfg) => {
326 let result = stream::call_codex_stream_inner(
327 &cfg, client, tools_schema, system_prompt, messages, tx,
328 temperature, max_tokens, cancel,
329 ).await;
330 Some(result)
331 }
332 Provider::Anthropic => None,
333 Provider::MissingKey(provider) => {
334 Some(Err(format!(
335 "No API key for '{}'. Set provider.{} in ~/.synaps-cli/config or the corresponding env var.",
336 provider, provider
337 ).into()))
338 }
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a separately allocated manager handle so tests can compare
    /// handles by pointer identity.
    fn fresh_manager() -> Arc<tokio::sync::RwLock<ExtensionManager>> {
        let bus = Arc::new(crate::extensions::hooks::HookBus::new());
        Arc::new(tokio::sync::RwLock::new(ExtensionManager::new(bus)))
    }

    /// With the access-token variable removed and no provider keys supplied,
    /// a codex model must still resolve to the Codex route.
    #[test]
    fn resolves_openai_codex_without_requiring_eager_credentials() {
        std::env::remove_var("OPENAI_CODEX_ACCESS_TOKEN");
        let no_keys = BTreeMap::new();
        let cfg = match resolve_route("openai-codex/gpt-5.1-codex-mini", &no_keys) {
            Provider::Codex(cfg) => cfg,
            other => panic!("expected Codex route, got {other:?}"),
        };
        assert_eq!(cfg.provider, "openai-codex");
        assert_eq!(cfg.model, "gpt-5.1-codex-mini");
        assert!(cfg.base_url.contains("chatgpt.com/backend-api"));
    }

    /// Setting a manager twice keeps only the latest one; clearing empties the slot.
    #[test]
    fn set_extension_manager_for_routing_overwrites_previous_manager() {
        clear_extension_manager_for_routing();
        let first = fresh_manager();
        let second = fresh_manager();

        set_extension_manager_for_routing(first.clone());
        let installed = extension_manager_for_routing().unwrap();
        assert!(Arc::ptr_eq(&installed, &first));

        set_extension_manager_for_routing(second.clone());
        let installed = extension_manager_for_routing().unwrap();
        assert!(Arc::ptr_eq(&installed, &second));

        clear_extension_manager_for_routing();
        assert!(extension_manager_for_routing().is_none());
    }
}
374}