1pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::{AgentBus, BusEnvelope};
11use crate::cli::ServeArgs;
12use crate::cloudevents::parse_cloud_event;
13use crate::cognition::{
14 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
15 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
16 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
17 executor::DecisionReceipt,
18};
19use crate::config::Config;
20use crate::k8s::K8sManager;
21use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
22use anyhow::Result;
23use auth::AuthState;
24use axum::{
25 Router,
26 body::Body,
27 extract::Path,
28 extract::{Query, State},
29 http::{HeaderMap, Method, Request, StatusCode},
30 middleware::{self, Next},
31 response::sse::{Event, KeepAlive, Sse},
32 response::{IntoResponse, Json, Response},
33 routing::{get, post},
34};
35use futures::{StreamExt, future::join_all, stream};
36use http::HeaderValue;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::convert::Infallible;
40use std::sync::Arc;
41use std::time::Duration;
42use tokio::sync::{Mutex, broadcast};
43use tonic_web::GrpcWebLayer;
44use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
45use tower_http::trace::TraceLayer;
46
/// In-memory record of a task delivered by a Knative CloudEvent
/// (see `receive_task_event` for how the fields are populated).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnativeTask {
    // Taken from the event payload, falling back to the CloudEvent id.
    pub task_id: String,
    pub title: String,
    pub description: String,
    // Defaults to "build" when the event carries no agent hint.
    pub agent_type: String,
    pub priority: i32,
    // When this server received the event, not when it was emitted.
    pub received_at: chrono::DateTime<chrono::Utc>,
    // Lifecycle string: "queued" -> "processing" -> "completed"/"cancelled".
    pub status: String,
}
58
/// Shared, mutex-guarded in-memory list of Knative tasks.
/// `Clone` is cheap: clones share the same `Arc`-backed storage.
#[derive(Clone)]
pub struct KnativeTaskQueue {
    tasks: Arc<Mutex<Vec<KnativeTask>>>,
}
64
65impl KnativeTaskQueue {
66 pub fn new() -> Self {
67 Self {
68 tasks: Arc::new(Mutex::new(Vec::new())),
69 }
70 }
71
72 pub async fn push(&self, task: KnativeTask) {
73 self.tasks.lock().await.push(task);
74 }
75
76 #[allow(dead_code)]
77 pub async fn pop(&self) -> Option<KnativeTask> {
78 self.tasks.lock().await.pop()
79 }
80
81 pub async fn list(&self) -> Vec<KnativeTask> {
82 self.tasks.lock().await.clone()
83 }
84
85 #[allow(dead_code)]
86 pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
87 self.tasks
88 .lock()
89 .await
90 .iter()
91 .find(|t| t.task_id == task_id)
92 .cloned()
93 }
94
95 pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
96 let mut tasks = self.tasks.lock().await;
97 if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
98 task.status = status.to_string();
99 true
100 } else {
101 false
102 }
103 }
104}
105
106impl Default for KnativeTaskQueue {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
/// A dynamically registered external tool plus its liveness bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisteredTool {
    pub id: String,
    pub name: String,
    pub description: String,
    pub version: String,
    // URL at which the tool can be reached.
    pub endpoint: String,
    pub capabilities: Vec<String>,
    // Free-form JSON; presumably a parameter schema — confirm with registrants.
    pub parameters: serde_json::Value,
    pub registered_at: chrono::DateTime<chrono::Utc>,
    // Refreshed by `ToolRegistry::heartbeat`.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    // Tools past this instant are hidden by `list` and reaped by `cleanup`.
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
126
/// Shared map of registered tools keyed by tool id.
/// `Clone` is cheap: clones share the same `Arc`-backed storage.
#[derive(Clone)]
pub struct ToolRegistry {
    tools: Arc<tokio::sync::RwLock<HashMap<String, RegisteredTool>>>,
}
132
133impl Default for ToolRegistry {
134 fn default() -> Self {
135 Self::new()
136 }
137}
138
139impl ToolRegistry {
140 pub fn new() -> Self {
141 Self {
142 tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
143 }
144 }
145
146 pub async fn register(&self, tool: RegisteredTool) {
148 let mut tools = self.tools.write().await;
149 tools.insert(tool.id.clone(), tool);
150 }
151
152 #[allow(dead_code)]
154 pub async fn get(&self, id: &str) -> Option<RegisteredTool> {
155 let tools = self.tools.read().await;
156 tools.get(id).cloned()
157 }
158
159 pub async fn list(&self) -> Vec<RegisteredTool> {
161 let tools = self.tools.read().await;
162 let now = chrono::Utc::now();
163 tools
164 .values()
165 .filter(|t| t.expires_at > now)
166 .cloned()
167 .collect()
168 }
169
170 pub async fn heartbeat(&self, id: &str) -> Option<RegisteredTool> {
172 let mut tools = self.tools.write().await;
173 if let Some(tool) = tools.get_mut(id) {
174 let now = chrono::Utc::now();
175 tool.last_heartbeat = now;
176 tool.expires_at = now + Duration::from_secs(90);
177 return Some(tool.clone());
178 }
179 None
180 }
181
182 pub async fn cleanup(&self) {
184 let mut tools = self.tools.write().await;
185 let now = chrono::Utc::now();
186 tools.retain(|_, t| t.expires_at > now);
187 }
188}
189
/// Shared application state handed to every axum handler via `State`.
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<Config>,
    pub cognition: Arc<CognitionRuntime>,
    pub audit_log: AuditLog,
    pub k8s: Arc<K8sManager>,
    pub auth: AuthState,
    pub bus: Arc<AgentBus>,
    // Tasks received from Knative CloudEvents (POST /task).
    pub knative_tasks: KnativeTaskQueue,
    // Externally registered tools, expired via heartbeats.
    pub tool_registry: ToolRegistry,
}
202
203async fn audit_middleware(
205 State(state): State<AppState>,
206 request: Request<Body>,
207 next: Next,
208) -> Response {
209 let method = request.method().clone();
210 let path = request.uri().path().to_string();
211 let started = std::time::Instant::now();
212
213 let response = next.run(request).await;
214
215 let duration_ms = started.elapsed().as_millis() as u64;
216 let status = response.status().as_u16();
217 let outcome = if status < 400 {
218 AuditOutcome::Success
219 } else if status == 401 || status == 403 {
220 AuditOutcome::Denied
221 } else {
222 AuditOutcome::Failure
223 };
224
225 state
226 .audit_log
227 .record(audit::AuditEntry {
228 id: uuid::Uuid::new_v4().to_string(),
229 timestamp: chrono::Utc::now(),
230 category: AuditCategory::Api,
231 action: format!("{} {}", method, path),
232 principal: None,
233 outcome,
234 detail: Some(serde_json::json!({ "status": status })),
235 duration_ms: Some(duration_ms),
236 okr_id: None,
237 okr_run_id: None,
238 relay_id: None,
239 session_id: None,
240 })
241 .await;
242
243 response
244}
245
/// One entry in the static route-authorization table (`POLICY_RULES`).
struct PolicyRule {
    // Path to match. A trailing '/' makes this a pure prefix; otherwise it
    // matches the exact path or any sub-path (see `match_policy_rule`).
    pattern: &'static str,
    // Restrict the rule to these HTTP methods; `None` matches any method.
    methods: Option<&'static [&'static str]>,
    // Required permission; the empty string exempts the route from checks.
    permission: &'static str,
}
253
/// Route → permission table consumed by [`match_policy_rule`].
///
/// Order matters: the first rule whose pattern matches wins, except that a
/// rule restricted to other HTTP methods is skipped so a later rule for the
/// same path can still apply (this is how the paired GET/POST rules below
/// work). An empty `permission` marks the route as exempt from policy
/// enforcement.
const POLICY_RULES: &[PolicyRule] = &[
    // --- Exempt routes (no permission required) ---
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/task",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/v1/knative/",
        methods: None,
        permission: "",
    },
    // --- OpenAI-compatible API ---
    PolicyRule {
        pattern: "/v1/models",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/chat/completions",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // --- Tool registry ---
    PolicyRule {
        pattern: "/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/tools/register",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/tools/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // A2A endpoints are exempt here (they carry their own handling).
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // --- Kubernetes management (admin only) ---
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Reached for POST/DELETE because the GET-only prefix rule above is
    // skipped on a method mismatch.
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    // --- Plugins ---
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // --- Audit (admin only). NOTE(review): this prefix rule with
    // `methods: None` also covers /v1/audit/replay, so the rule below it is
    // unreachable — harmless today because both grant "admin:access".
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/audit/replay",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // --- Cognition runtime ---
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // --- Swarm personas: writes need execute, reads need read ---
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // --- Agent bus ---
    PolicyRule {
        pattern: "/v1/bus/stream",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/bus/publish",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // --- Sessions. The trailing-slash rule covers sub-paths such as
    // /api/session/{id}/prompt; the exact-path rules cover /api/session.
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    // --- MCP tool listing ---
    PolicyRule {
        pattern: "/mcp/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // --- Agent task API ---
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // --- Workers ---
    PolicyRule {
        pattern: "/v1/worker/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/workers",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // --- Task dispatch ---
    PolicyRule {
        pattern: "/v1/tasks/dispatch",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // --- Voice sessions ---
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["POST", "DELETE"]),
        permission: "agent:execute",
    },
    // --- Codebase session resume ---
    PolicyRule {
        pattern: "/v1/agent/codebases/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // --- Proposal approval ---
    PolicyRule {
        pattern: "/v1/cognition/proposals/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // --- Misc read-only API surfaces ---
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];
488
489fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
492 for rule in POLICY_RULES {
493 let matches = if rule.pattern.ends_with('/') {
494 path.starts_with(rule.pattern)
495 } else {
496 path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
497 };
498 if matches {
499 if let Some(allowed_methods) = rule.methods
500 && !allowed_methods.contains(&method)
501 {
502 continue;
503 }
504 return Some(rule.permission);
505 }
506 }
507 None
508}
509
/// Axum middleware enforcing the static `POLICY_RULES` table before a
/// request reaches its handler. Returns the `StatusCode` from
/// `policy::enforce_policy` on denial.
async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
    let path = request.uri().path().to_string();
    let method = request.method().as_str().to_string();

    // No matching rule, or a rule with an empty permission string, means the
    // route is exempt and passes straight through.
    let permission = match match_policy_rule(&path, &method) {
        None | Some("") => return Ok(next.run(request).await),
        Some(perm) => perm,
    };

    // NOTE(review): every request is evaluated as a hard-coded principal
    // ("bearer-token-user" with the "admin" role) rather than an identity
    // derived from the actual bearer token — presumably a placeholder until
    // real principal extraction exists; confirm before relying on this for
    // per-user authorization.
    let user = policy::PolicyUser {
        user_id: "bearer-token-user".to_string(),
        roles: vec!["admin".to_string()],
        tenant_id: None,
        scopes: vec![],
        auth_source: "static_token".to_string(),
    };

    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
        tracing::warn!(
            path = %path,
            method = %method,
            permission = %permission,
            "Policy middleware denied request"
        );
        return Err(status);
    }

    Ok(next.run(request).await)
}
548
/// Entry point for the HTTP/gRPC server: loads config, wires up runtime
/// subsystems (cognition, audit, k8s, bus), spawns the gRPC A2A sidecar and
/// the tool-registry reaper, builds the axum router, and serves until the
/// listener shuts down.
///
/// Startup steps are timed against `t0` and logged with "[startup]" markers.
pub async fn serve(args: ServeArgs) -> Result<()> {
    let t0 = std::time::Instant::now();
    tracing::info!("[startup] begin");
    let config = Config::load().await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] config loaded"
    );
    let mut cognition = CognitionRuntime::new_from_env();
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] cognition runtime created"
    );

    // Note: this is crate::tool::ToolRegistry (the tool executor registry),
    // not the HTTP-facing ToolRegistry defined in this module.
    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] tools registered"
    );
    let cognition = Arc::new(cognition);

    let audit_log = AuditLog::from_env();
    // Best-effort: ignore failure if a global audit log was already set.
    let _ = audit::init_audit_log(audit_log.clone());

    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
    let k8s = Arc::new(K8sManager::new().await);
    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
    if k8s.is_available() {
        tracing::info!("K8s self-deployment enabled");
    }

    let auth_state = AuthState::from_env();
    tracing::info!(
        token_len = auth_state.token().len(),
        "Auth is mandatory. Only /health is served without a Bearer token."
    );
    tracing::info!(
        audit_entries = audit_log.count().await,
        "Audit log initialized"
    );

    // Agent bus: shared in-process, registered globally, mirrored to S3.
    let bus = AgentBus::new().into_arc();
    crate::bus::set_global(bus.clone());

    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] bus created"
    );

    // Auto-start the cognition loop unless disabled via env var.
    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
        tracing::info!(
            elapsed_ms = t0.elapsed().as_millis(),
            "[startup] auto-starting cognition"
        );
        if let Err(error) = cognition.start(None).await {
            tracing::warn!(%error, "Failed to auto-start cognition loop");
        } else {
            tracing::info!("Perpetual cognition auto-started");
        }
    }

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] building routes"
    );
    let addr = format!("{}:{}", args.hostname, args.port);

    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
    let a2a_server = a2a::server::A2AServer::with_bus(agent_card.clone(), bus.clone());

    // Two routers from the same server: one merged at the root, one nested
    // under /a2a.
    let a2a_router = a2a_server.clone().router();
    let a2a_nested_router = a2a_server.router();

    // gRPC A2A sidecar on its own port (default 50051), with grpc-web + CORS
    // so browsers can reach it.
    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
        .ok()
        .and_then(|p| p.parse::<u16>().ok())
        .unwrap_or(50051);
    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
    let grpc_service = grpc_store.into_service();
    let voice_service = crate::a2a::voice_grpc::VoiceServiceImpl::new(bus.clone()).into_service();
    tokio::spawn(async move {
        tracing::info!(addr = %grpc_addr, "Starting gRPC A2A server");

        let allowed_origins = [
            HeaderValue::from_static("https://codetether.run"),
            HeaderValue::from_static("https://api.codetether.run"),
            HeaderValue::from_static("https://docs.codetether.run"),
            HeaderValue::from_static("https://codetether.com"),
            HeaderValue::from_static("http://localhost:3000"),
            HeaderValue::from_static("http://localhost:3001"),
        ];
        let cors = CorsLayer::new()
            .allow_origin(AllowOrigin::list(allowed_origins))
            .allow_methods(AllowMethods::any())
            .allow_headers(AllowHeaders::any())
            // grpc-web clients need these trailer-equivalent headers exposed.
            .expose_headers(tower_http::cors::ExposeHeaders::list([
                http::header::HeaderName::from_static("grpc-status"),
                http::header::HeaderName::from_static("grpc-message"),
            ]));

        if let Err(error) = tonic::transport::Server::builder()
            .accept_http1(true)
            .layer(cors)
            .layer(GrpcWebLayer::new())
            .add_service(grpc_service)
            .add_service(voice_service)
            .serve(grpc_addr)
            .await
        {
            tracing::warn!(addr = %grpc_addr, error = ?error, "gRPC A2A server exited");
        }
    });

    let state = AppState {
        config: Arc::new(config),
        cognition,
        audit_log,
        k8s,
        auth: auth_state.clone(),
        bus,
        knative_tasks: KnativeTaskQueue::new(),
        tool_registry: ToolRegistry::new(),
    };

    // Reaper: drop expired tool registrations every 15 seconds.
    let tool_registry = state.tool_registry.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(15));
        loop {
            interval.tick().await;
            tool_registry.cleanup().await;
            tracing::debug!("Tool reaper ran cleanup");
        }
    });

    let app = Router::new()
        // Health (unauthenticated per auth layer).
        .route("/health", get(health))
        // Knative CloudEvents ingress + task queue.
        .route("/task", post(receive_task_event))
        .route("/v1/knative/tasks", get(list_knative_tasks))
        .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
        .route(
            "/v1/knative/tasks/{task_id}/claim",
            post(claim_knative_task),
        )
        .route(
            "/v1/knative/tasks/{task_id}/complete",
            post(complete_knative_task),
        )
        // Core API.
        .route("/api/version", get(get_version))
        .route("/api/session", get(list_sessions).post(create_session))
        .route("/api/session/{id}", get(get_session))
        .route("/api/session/{id}/prompt", post(prompt_session))
        .route("/api/config", get(get_config))
        .route("/api/provider", get(list_providers))
        .route("/api/agent", get(list_agents))
        // OpenAI-compatible endpoints.
        .route("/v1/models", get(list_openai_models))
        .route("/v1/chat/completions", post(openai_chat_completions))
        // Cognition runtime.
        .route("/v1/cognition/start", post(start_cognition))
        .route("/v1/cognition/stop", post(stop_cognition))
        .route("/v1/cognition/status", get(get_cognition_status))
        .route("/v1/cognition/stream", get(stream_cognition))
        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
        // Swarm personas.
        .route("/v1/swarm/personas", post(create_persona))
        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
        .route("/v1/swarm/lineage", get(get_swarm_lineage))
        // Cognition introspection.
        .route("/v1/cognition/beliefs", get(list_beliefs))
        .route("/v1/cognition/beliefs/{id}", get(get_belief))
        .route("/v1/cognition/attention", get(list_attention))
        .route("/v1/cognition/proposals", get(list_proposals))
        .route(
            "/v1/cognition/proposals/{id}/approve",
            post(approve_proposal),
        )
        .route("/v1/cognition/receipts", get(list_receipts))
        .route("/v1/cognition/workspace", get(get_workspace))
        .route("/v1/cognition/governance", get(get_governance))
        .route("/v1/cognition/personas/{id}", get(get_persona))
        // Audit.
        .route("/v1/audit", get(list_audit_entries))
        .route("/v1/audit/replay", get(replay_session_events))
        .route("/v1/audit/replay/index", get(list_session_event_files))
        // Kubernetes management.
        .route("/v1/k8s/status", get(get_k8s_status))
        .route("/v1/k8s/scale", post(k8s_scale))
        .route("/v1/k8s/restart", post(k8s_restart))
        .route("/v1/k8s/pods", get(k8s_list_pods))
        .route("/v1/k8s/actions", get(k8s_actions))
        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
        .route(
            "/v1/k8s/subagent/{id}",
            axum::routing::delete(k8s_delete_subagent),
        )
        // Plugins and tools (same handler backs /v1 and /mcp listings).
        .route("/v1/plugins", get(list_plugins))
        .route("/v1/tools", get(list_tools))
        .route("/v1/tools/register", post(register_tool))
        .route("/v1/tools/{id}/heartbeat", post(tool_heartbeat))
        .route("/mcp/v1/tools", get(list_tools))
        // Agent tasks and workers.
        .route(
            "/v1/agent/tasks",
            get(list_agent_tasks).post(create_agent_task),
        )
        .route("/v1/agent/tasks/{task_id}", get(get_agent_task))
        .route(
            "/v1/agent/tasks/{task_id}/output",
            get(get_agent_task_output).post(agent_task_output),
        )
        .route(
            "/v1/agent/tasks/{task_id}/output/stream",
            get(stream_agent_task_output),
        )
        .route("/v1/worker/connected", get(list_connected_workers))
        .route("/v1/agent/workers", get(list_connected_workers))
        .route("/v1/tasks/dispatch", post(dispatch_task))
        // Voice sessions.
        .route("/v1/voice/sessions", post(create_voice_session_rest))
        .route(
            "/v1/voice/sessions/{room_name}",
            get(get_voice_session_rest).delete(delete_voice_session_rest),
        )
        .route("/v1/voice/voices", get(list_voices_rest))
        .route(
            "/v1/agent/codebases/{codebase_id}/sessions/{session_id}/resume",
            post(resume_codebase_session),
        )
        .route("/v1/bus/stream", get(stream_bus_events))
        .with_state(state.clone())
        .merge(a2a_router)
        .nest("/a2a", a2a_nested_router)
        // Layer order matters: later `.layer` calls wrap earlier ones, so at
        // request time trace/CORS/auth run before policy, and audit runs
        // innermost (closest to the handlers).
        .layer(middleware::from_fn_with_state(
            state.clone(),
            audit_middleware,
        ))
        .layer(middleware::from_fn(policy_middleware))
        .layer(middleware::from_fn(auth::require_auth))
        .layer(axum::Extension(state.auth.clone()))
        .layer(
            CorsLayer::new()
                .allow_origin([
                    "https://codetether.run".parse::<HeaderValue>().unwrap(),
                    "https://api.codetether.run".parse::<HeaderValue>().unwrap(),
                    "https://docs.codetether.run"
                        .parse::<HeaderValue>()
                        .unwrap(),
                    "https://codetether.com".parse::<HeaderValue>().unwrap(),
                    "http://localhost:3000".parse::<HeaderValue>().unwrap(),
                    "http://localhost:3001".parse::<HeaderValue>().unwrap(),
                    "http://localhost:8000".parse::<HeaderValue>().unwrap(),
                ])
                .allow_credentials(true)
                .allow_methods([
                    Method::GET,
                    Method::POST,
                    Method::PUT,
                    Method::PATCH,
                    Method::DELETE,
                    Method::OPTIONS,
                ])
                .allow_headers([
                    http::header::AUTHORIZATION,
                    http::header::CONTENT_TYPE,
                    http::header::ACCEPT,
                    http::header::ORIGIN,
                    http::header::CACHE_CONTROL,
                    http::header::HeaderName::from_static("x-worker-id"),
                    http::header::HeaderName::from_static("x-agent-name"),
                    http::header::HeaderName::from_static("x-workspaces"),
                    http::header::HeaderName::from_static("x-requested-with"),
                ]),
        )
        .layer(TraceLayer::new_for_http());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] router built, binding"
    );
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] listening on http://{}",
        addr
    );

    axum::serve(listener, app).await?;

    Ok(())
}
872
/// Liveness probe; per the startup log, the only route served without a
/// Bearer token.
async fn health() -> &'static str {
    "ok"
}
877
878async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
880 Json(state.knative_tasks.list().await)
881}
882
883async fn get_knative_task(
885 State(state): State<AppState>,
886 Path(task_id): Path<String>,
887) -> Result<Json<KnativeTask>, (StatusCode, String)> {
888 state
889 .knative_tasks
890 .get(&task_id)
891 .await
892 .map(Json)
893 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
894}
895
896async fn claim_knative_task(
898 State(state): State<AppState>,
899 Path(task_id): Path<String>,
900) -> Result<Json<KnativeTask>, (StatusCode, String)> {
901 let updated = state
903 .knative_tasks
904 .update_status(&task_id, "processing")
905 .await;
906 if !updated {
907 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
908 }
909
910 state
911 .knative_tasks
912 .get(&task_id)
913 .await
914 .map(Json)
915 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
916}
917
918async fn complete_knative_task(
920 State(state): State<AppState>,
921 Path(task_id): Path<String>,
922) -> Result<Json<KnativeTask>, (StatusCode, String)> {
923 let updated = state
925 .knative_tasks
926 .update_status(&task_id, "completed")
927 .await;
928 if !updated {
929 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
930 }
931
932 state
933 .knative_tasks
934 .get(&task_id)
935 .await
936 .map(Json)
937 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
938}
939
/// POST /task — Knative CloudEvents ingress.
///
/// Parses the CloudEvent from headers + JSON body, writes an audit entry,
/// and mutates the in-memory task queue according to the event type.
/// Unknown event types are logged and still acknowledged with "accepted".
async fn receive_task_event(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(body): Json<serde_json::Value>,
) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
    let event =
        parse_cloud_event(&headers, body).map_err(|error| (StatusCode::BAD_REQUEST, error))?;
    tracing::info!(
        event_type = %event.event_type,
        event_id = %event.id,
        "Received CloudEvent from Knative"
    );

    // Audit every event, even types we do not recognize.
    state
        .audit_log
        .log(
            audit::AuditCategory::Api,
            format!("cloudevents:{}", event.event_type),
            AuditOutcome::Success,
            None,
            None,
        )
        .await;

    match event.event_type.as_str() {
        "codetether.task.created" | "task.created" => {
            // Field extraction is deliberately lenient: each field tries one
            // or two fallback keys and then a default, so partial payloads
            // still enqueue a task.
            let task_id = event
                .data
                .get("task_id")
                .or_else(|| event.data.get("id"))
                .and_then(|v| v.as_str())
                .unwrap_or(event.id.as_str())
                .to_string();
            let title = event
                .data
                .get("title")
                .or_else(|| event.data.get("prompt"))
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            let description = event
                .data
                .get("description")
                .or_else(|| event.data.get("prompt"))
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            let agent_type = event
                .data
                .get("agent_type")
                .or_else(|| event.data.get("agent"))
                .and_then(|v| v.as_str())
                .unwrap_or("build")
                .to_string();
            let priority = event
                .data
                .get("priority")
                .and_then(|v| v.as_i64())
                .unwrap_or(0) as i32;

            let task = KnativeTask {
                task_id: task_id.clone(),
                title,
                description,
                agent_type,
                priority,
                received_at: chrono::Utc::now(),
                status: "queued".to_string(),
            };

            state.knative_tasks.push(task).await;
            tracing::info!(task_id = %task_id, "Task queued for execution");
        }
        "codetether.task.updated" | "task.updated" => {
            // NOTE(review): unlike the "created" branch, only "task_id" is
            // consulted here (no "id" fallback) — confirm this asymmetry is
            // intentional.
            if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
                let status = event
                    .data
                    .get("status")
                    .and_then(|v| v.as_str())
                    .unwrap_or("updated");
                let _ = state.knative_tasks.update_status(task_id, status).await;
                tracing::info!(task_id = %task_id, status = %status, "Task status updated");
            }
        }
        "codetether.task.cancelled" => {
            tracing::info!("Task cancellation event received");
            if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
                let _ = state
                    .knative_tasks
                    .update_status(task_id, "cancelled")
                    .await;
                tracing::info!(task_id = %task_id, "Task cancelled");
            }
        }
        _ => {
            tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
        }
    }

    // Always acknowledge; parse errors were already rejected above.
    Ok(Json(CloudEventResponse {
        status: "accepted".to_string(),
        event_id: event.id,
    }))
}
1049
/// Ack body returned to Knative after a CloudEvent is processed.
#[derive(Serialize)]
struct CloudEventResponse {
    // Always "accepted" in the current implementation.
    status: String,
    // Echo of the CloudEvent id.
    event_id: String,
}
1056
/// Payload for GET /api/version.
#[derive(Serialize)]
struct VersionInfo {
    version: &'static str,
    name: &'static str,
    // Hash of the running executable; `None` when it cannot be computed.
    binary_hash: Option<String>,
}
1064
1065async fn get_version() -> Json<VersionInfo> {
1066 let binary_hash = std::env::current_exe()
1067 .ok()
1068 .and_then(|p| hash_file(&p).ok());
1069 Json(VersionInfo {
1070 version: env!("CARGO_PKG_VERSION"),
1071 name: env!("CARGO_PKG_NAME"),
1072 binary_hash,
1073 })
1074}
1075
/// Pagination query parameters for GET /api/session.
#[derive(Deserialize)]
struct ListSessionsQuery {
    // Maximum number of sessions to return (default 100).
    limit: Option<usize>,
    // Number of sessions to skip (default 0).
    offset: Option<usize>,
}
1082
1083async fn list_sessions(
1084 Query(query): Query<ListSessionsQuery>,
1085) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
1086 let sessions = crate::session::list_sessions()
1087 .await
1088 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1089
1090 let offset = query.offset.unwrap_or(0);
1091 let limit = query.limit.unwrap_or(100);
1092 Ok(Json(
1093 sessions.into_iter().skip(offset).take(limit).collect(),
1094 ))
1095}
1096
/// Body for POST /api/session.
#[derive(Deserialize)]
struct CreateSessionRequest {
    // Optional human-readable session title.
    title: Option<String>,
    // Optional agent name to attach to the session.
    agent: Option<String>,
}
1103
1104async fn create_session(
1105 Json(req): Json<CreateSessionRequest>,
1106) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1107 let mut session = crate::session::Session::new()
1108 .await
1109 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1110
1111 session.title = req.title;
1112 if let Some(agent) = req.agent {
1113 session.set_agent_name(agent);
1114 }
1115
1116 session
1117 .save()
1118 .await
1119 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1120
1121 Ok(Json(session))
1122}
1123
1124async fn get_session(
1126 axum::extract::Path(id): axum::extract::Path<String>,
1127) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1128 let session = crate::session::Session::load(&id)
1129 .await
1130 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1131
1132 Ok(Json(session))
1133}
1134
/// Body for POST /api/session/{id}/prompt.
#[derive(Deserialize)]
struct PromptRequest {
    // The user message; must not be empty or whitespace-only.
    message: String,
}
1140
1141async fn prompt_session(
1142 axum::extract::Path(id): axum::extract::Path<String>,
1143 Json(req): Json<PromptRequest>,
1144) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
1145 if req.message.trim().is_empty() {
1147 return Err((
1148 StatusCode::BAD_REQUEST,
1149 "Message cannot be empty".to_string(),
1150 ));
1151 }
1152
1153 tracing::info!(
1155 session_id = %id,
1156 message_len = req.message.len(),
1157 "Received prompt request"
1158 );
1159
1160 let mut session = crate::session::Session::load(&id)
1161 .await
1162 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1163
1164 let result = session.prompt(&req.message).await.map_err(|e| {
1165 tracing::error!(session_id = %id, error = %e, "Session prompt failed");
1166 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1167 })?;
1168
1169 session.save().await.map_err(|e| {
1170 tracing::error!(session_id = %id, error = %e, "Failed to save session after prompt");
1171 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1172 })?;
1173
1174 Ok(Json(result))
1175}
1176
1177async fn get_config(State(state): State<AppState>) -> Json<Config> {
1179 Json((*state.config).clone())
1180}
1181
1182async fn list_providers() -> Json<Vec<String>> {
1184 Json(vec![
1185 "openai".to_string(),
1186 "anthropic".to_string(),
1187 "google".to_string(),
1188 ])
1189}
1190
1191async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
1193 let registry = crate::agent::AgentRegistry::with_builtins();
1194 Json(registry.list().into_iter().cloned().collect())
1195}
1196
/// Error shape for the OpenAI-compatible endpoints: HTTP status plus a JSON
/// error body.
type OpenAiApiError = (StatusCode, Json<serde_json::Value>);
/// Convenience result alias for OpenAI-compatible handlers.
type OpenAiApiResult<T> = Result<T, OpenAiApiError>;
1199
/// Incoming body of the OpenAI-compatible POST /v1/chat/completions.
#[derive(Debug, Deserialize)]
struct OpenAiChatCompletionRequest {
    model: String,
    messages: Vec<OpenAiRequestMessage>,
    #[serde(default)]
    tools: Vec<OpenAiRequestTool>,
    temperature: Option<f32>,
    top_p: Option<f32>,
    max_tokens: Option<usize>,
    // Newer OpenAI name for the completion-token cap.
    max_completion_tokens: Option<usize>,
    // Either a single stop string or a list (see `OpenAiStop`).
    stop: Option<OpenAiStop>,
    stream: Option<bool>,
}
1213
/// `stop` may arrive as one string or a list of strings; `untagged` lets
/// serde accept either JSON shape.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OpenAiStop {
    Single(String),
    Multiple(Vec<String>),
}
1220
/// One chat message in an OpenAI-compatible request.
#[derive(Debug, Deserialize)]
struct OpenAiRequestMessage {
    role: String,
    // Kept as raw JSON: OpenAI allows a string or structured content parts.
    #[serde(default)]
    content: Option<serde_json::Value>,
    // Present on tool-result messages to link back to the originating call.
    #[serde(default)]
    tool_call_id: Option<String>,
    #[serde(default)]
    tool_calls: Vec<OpenAiRequestToolCall>,
}
1231
/// A tool call echoed back by the client inside an assistant message.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCall {
    id: String,
    // The wire field is "type"; default supplied by `default_function_type`
    // (defined elsewhere in this file — presumably "function").
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolCallFunction,
}
1239
/// Function name plus JSON-encoded arguments of a tool call.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCallFunction {
    name: String,
    // Raw JSON string; default supplied by `default_json_object_string`
    // (defined elsewhere in this file — presumably "{}").
    #[serde(default = "default_json_object_string")]
    arguments: String,
}
1246
/// A tool made available to the model for this request.
#[derive(Debug, Deserialize)]
struct OpenAiRequestTool {
    // Only "function" is accepted downstream; defaults to "function" when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolDefinition,
}
1253
/// Function-tool definition: name, optional description, JSON-schema parameters.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolDefinition {
    name: String,
    #[serde(default)]
    description: String,
    // JSON Schema for the arguments; defaults to an empty object.
    #[serde(default = "default_json_object")]
    parameters: serde_json::Value,
}
1262
/// Response envelope for the OpenAI-compatible models listing (`object: "list"`).
#[derive(Debug, Serialize)]
struct OpenAiModelsResponse {
    object: String,
    data: Vec<OpenAiModel>,
}
1268
/// One model entry in the models listing, in OpenAI wire format.
#[derive(Debug, Serialize)]
struct OpenAiModel {
    // Fully qualified id, "provider/model" (see `make_openai_model_id`).
    id: String,
    object: String,
    // Unix timestamp; set to "now" at listing time, not a true creation date.
    created: i64,
    owned_by: String,
}
1276
/// Non-streaming chat-completion response in OpenAI wire format.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionResponse {
    id: String,
    object: String,
    created: i64,
    model: String,
    choices: Vec<OpenAiChatCompletionChoice>,
    usage: OpenAiUsage,
}
1286
/// One completion choice; this server always emits exactly one (index 0).
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionChoice {
    index: usize,
    message: OpenAiChatCompletionMessage,
    finish_reason: String,
}
1293
/// The assistant message of a completion choice. `content` and `tool_calls`
/// are omitted from the JSON when absent, matching OpenAI's format.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionMessage {
    role: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_calls: Option<Vec<OpenAiResponseToolCall>>,
}
1302
/// A tool call emitted by the model, serialized with `"type": "function"`.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCall {
    id: String,
    #[serde(rename = "type")]
    kind: String,
    function: OpenAiResponseToolCallFunction,
}
1310
/// Function name and JSON-encoded argument string of a response tool call.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCallFunction {
    name: String,
    arguments: String,
}
1316
/// Token accounting block, copied through from the provider's usage counts.
#[derive(Debug, Serialize)]
struct OpenAiUsage {
    prompt_tokens: usize,
    completion_tokens: usize,
    total_tokens: usize,
}
1323
/// Serde default for the tool-call `type` field: always `"function"`.
fn default_function_type() -> String {
    String::from("function")
}
1327
1328fn default_json_object() -> serde_json::Value {
1329 serde_json::json!({})
1330}
1331
/// Serde default for tool-call arguments: the textual empty JSON object.
fn default_json_object_string() -> String {
    String::from("{}")
}
1335
1336fn openai_error(
1337 status: StatusCode,
1338 message: impl Into<String>,
1339 error_type: &str,
1340 code: &str,
1341) -> OpenAiApiError {
1342 (
1343 status,
1344 Json(serde_json::json!({
1345 "error": {
1346 "message": message.into(),
1347 "type": error_type,
1348 "code": code,
1349 }
1350 })),
1351 )
1352}
1353
1354fn openai_bad_request(message: impl Into<String>) -> OpenAiApiError {
1355 openai_error(
1356 StatusCode::BAD_REQUEST,
1357 message,
1358 "invalid_request_error",
1359 "invalid_request",
1360 )
1361}
1362
1363fn openai_internal_error(message: impl Into<String>) -> OpenAiApiError {
1364 openai_error(
1365 StatusCode::INTERNAL_SERVER_ERROR,
1366 message,
1367 "server_error",
1368 "internal_error",
1369 )
1370}
1371
/// Map legacy provider aliases to their canonical name ("zhipuai" -> "zai");
/// every other name passes through unchanged.
fn canonicalize_provider_name(provider: &str) -> &str {
    match provider {
        "zhipuai" => "zai",
        other => other,
    }
}
1379
/// Normalize a model reference: rewrite `provider:model` to `provider/model`
/// when both halves are non-empty and the string has no `/` already;
/// otherwise return the trimmed input unchanged.
fn normalize_model_reference(model: &str) -> String {
    let trimmed = model.trim();
    match trimmed.split_once(':') {
        Some((provider, model_id))
            if !provider.is_empty() && !model_id.is_empty() && !trimmed.contains('/') =>
        {
            format!("{provider}/{model_id}")
        }
        _ => trimmed.to_string(),
    }
}
1391
1392fn make_openai_model_id(provider: &str, model_id: &str) -> String {
1393 let provider = canonicalize_provider_name(provider);
1394 let trimmed = model_id.trim_start_matches('/');
1395 if trimmed.starts_with(&format!("{provider}/")) {
1396 trimmed.to_string()
1397 } else {
1398 format!("{provider}/{trimmed}")
1399 }
1400}
1401
1402fn parse_openai_content_part(value: &serde_json::Value) -> Option<crate::provider::ContentPart> {
1403 let obj = value.as_object()?;
1404 let part_type = obj
1405 .get("type")
1406 .and_then(serde_json::Value::as_str)
1407 .unwrap_or("text");
1408
1409 match part_type {
1410 "text" | "input_text" => obj
1411 .get("text")
1412 .and_then(serde_json::Value::as_str)
1413 .map(|text| crate::provider::ContentPart::Text {
1414 text: text.to_string(),
1415 }),
1416 "image_url" => obj
1417 .get("image_url")
1418 .and_then(serde_json::Value::as_object)
1419 .and_then(|image| image.get("url"))
1420 .and_then(serde_json::Value::as_str)
1421 .map(|url| crate::provider::ContentPart::Image {
1422 url: url.to_string(),
1423 mime_type: None,
1424 }),
1425 _ => obj
1426 .get("text")
1427 .and_then(serde_json::Value::as_str)
1428 .map(|text| crate::provider::ContentPart::Text {
1429 text: text.to_string(),
1430 }),
1431 }
1432}
1433
1434fn parse_openai_content_parts(
1435 content: &Option<serde_json::Value>,
1436) -> Vec<crate::provider::ContentPart> {
1437 let Some(value) = content else {
1438 return Vec::new();
1439 };
1440
1441 match value {
1442 serde_json::Value::String(text) => {
1443 vec![crate::provider::ContentPart::Text { text: text.clone() }]
1444 }
1445 serde_json::Value::Array(parts) => {
1446 parts.iter().filter_map(parse_openai_content_part).collect()
1447 }
1448 serde_json::Value::Object(_) => parse_openai_content_part(value).into_iter().collect(),
1449 _ => Vec::new(),
1450 }
1451}
1452
1453fn parse_openai_tool_content(content: &Option<serde_json::Value>) -> String {
1454 parse_openai_content_parts(content)
1455 .into_iter()
1456 .filter_map(|part| match part {
1457 crate::provider::ContentPart::Text { text } => Some(text),
1458 _ => None,
1459 })
1460 .collect::<Vec<_>>()
1461 .join("\n")
1462}
1463
/// Validate and convert OpenAI-format request messages into internal
/// `provider::Message`s.
///
/// Role handling (case-insensitive):
/// - system/user: must carry non-empty text content.
/// - assistant: content and/or `function` tool calls; at least one required.
/// - tool: must carry a non-blank `tool_call_id`; content is flattened to text.
/// Any other role is rejected with a 400-style OpenAI error.
fn convert_openai_messages(
    messages: &[OpenAiRequestMessage],
) -> OpenAiApiResult<Vec<crate::provider::Message>> {
    let mut converted = Vec::with_capacity(messages.len());

    for (index, message) in messages.iter().enumerate() {
        let role = message.role.to_ascii_lowercase();
        match role.as_str() {
            "system" | "user" => {
                let content = parse_openai_content_parts(&message.content);
                // Empty content here means the message had no usable text parts.
                if content.is_empty() {
                    return Err(openai_bad_request(format!(
                        "messages[{index}] must include text content"
                    )));
                }

                converted.push(crate::provider::Message {
                    role: if role == "system" {
                        crate::provider::Role::System
                    } else {
                        crate::provider::Role::User
                    },
                    content,
                });
            }
            "assistant" => {
                // Start from any text/image parts, then append tool calls.
                let mut content = parse_openai_content_parts(&message.content);
                for tool_call in &message.tool_calls {
                    if tool_call.kind != "function" {
                        return Err(openai_bad_request(format!(
                            "messages[{index}].tool_calls only support `function`"
                        )));
                    }
                    if tool_call.function.name.trim().is_empty() {
                        return Err(openai_bad_request(format!(
                            "messages[{index}].tool_calls[].function.name is required"
                        )));
                    }

                    content.push(crate::provider::ContentPart::ToolCall {
                        id: tool_call.id.clone(),
                        name: tool_call.function.name.clone(),
                        arguments: tool_call.function.arguments.clone(),
                        thought_signature: None,
                    });
                }

                // An assistant turn with neither text nor tool calls is invalid.
                if content.is_empty() {
                    return Err(openai_bad_request(format!(
                        "messages[{index}] must include `content` or `tool_calls` for assistant role"
                    )));
                }

                converted.push(crate::provider::Message {
                    role: crate::provider::Role::Assistant,
                    content,
                });
            }
            "tool" => {
                // `tool_call_id` is mandatory and must be non-blank after trimming.
                let tool_call_id = message
                    .tool_call_id
                    .as_ref()
                    .map(|s| s.trim())
                    .filter(|s| !s.is_empty())
                    .ok_or_else(|| {
                        openai_bad_request(format!(
                            "messages[{index}].tool_call_id is required for tool role"
                        ))
                    })?
                    .to_string();

                converted.push(crate::provider::Message {
                    role: crate::provider::Role::Tool,
                    content: vec![crate::provider::ContentPart::ToolResult {
                        tool_call_id,
                        content: parse_openai_tool_content(&message.content),
                    }],
                });
            }
            _ => {
                return Err(openai_bad_request(format!(
                    "messages[{index}].role `{}` is not supported",
                    message.role
                )));
            }
        }
    }

    Ok(converted)
}
1554
1555fn convert_openai_tools(
1556 tools: &[OpenAiRequestTool],
1557) -> OpenAiApiResult<Vec<crate::provider::ToolDefinition>> {
1558 let mut converted = Vec::with_capacity(tools.len());
1559
1560 for (index, tool) in tools.iter().enumerate() {
1561 if tool.kind != "function" {
1562 return Err(openai_bad_request(format!(
1563 "tools[{index}].type `{}` is not supported",
1564 tool.kind
1565 )));
1566 }
1567 if tool.function.name.trim().is_empty() {
1568 return Err(openai_bad_request(format!(
1569 "tools[{index}].function.name is required"
1570 )));
1571 }
1572
1573 converted.push(crate::provider::ToolDefinition {
1574 name: tool.function.name.clone(),
1575 description: tool.function.description.clone(),
1576 parameters: tool.function.parameters.clone(),
1577 });
1578 }
1579
1580 Ok(converted)
1581}
1582
1583fn convert_finish_reason(reason: crate::provider::FinishReason) -> &'static str {
1584 match reason {
1585 crate::provider::FinishReason::Stop => "stop",
1586 crate::provider::FinishReason::Length => "length",
1587 crate::provider::FinishReason::ToolCalls => "tool_calls",
1588 crate::provider::FinishReason::ContentFilter => "content_filter",
1589 crate::provider::FinishReason::Error => "error",
1590 }
1591}
1592
1593fn convert_response_message(
1594 message: &crate::provider::Message,
1595) -> (Option<String>, Option<Vec<OpenAiResponseToolCall>>) {
1596 let mut texts = Vec::new();
1597 let mut tool_calls = Vec::new();
1598
1599 for part in &message.content {
1600 match part {
1601 crate::provider::ContentPart::Text { text } => {
1602 if !text.is_empty() {
1603 texts.push(text.clone());
1604 }
1605 }
1606 crate::provider::ContentPart::ToolCall {
1607 id,
1608 name,
1609 arguments,
1610 ..
1611 } => {
1612 tool_calls.push(OpenAiResponseToolCall {
1613 id: id.clone(),
1614 kind: "function".to_string(),
1615 function: OpenAiResponseToolCallFunction {
1616 name: name.clone(),
1617 arguments: arguments.clone(),
1618 },
1619 });
1620 }
1621 _ => {}
1622 }
1623 }
1624
1625 let content = if texts.is_empty() {
1626 None
1627 } else {
1628 Some(texts.join("\n"))
1629 };
1630 let tool_calls = if tool_calls.is_empty() {
1631 None
1632 } else {
1633 Some(tool_calls)
1634 };
1635
1636 (content, tool_calls)
1637}
1638
1639fn openai_stream_chunk(
1640 id: &str,
1641 created: i64,
1642 model: &str,
1643 delta: serde_json::Value,
1644 finish_reason: Option<&str>,
1645) -> serde_json::Value {
1646 let finish_reason = finish_reason
1647 .map(|value| serde_json::Value::String(value.to_string()))
1648 .unwrap_or(serde_json::Value::Null);
1649
1650 serde_json::json!({
1651 "id": id,
1652 "object": "chat.completion.chunk",
1653 "created": created,
1654 "model": model,
1655 "choices": [{
1656 "index": 0,
1657 "delta": delta,
1658 "finish_reason": finish_reason,
1659 }]
1660 })
1661}
1662
/// Wrap a chunk payload as a bare SSE data event (no event name), matching
/// the OpenAI streaming wire format.
fn openai_stream_event(payload: &serde_json::Value) -> Event {
    Event::default().data(payload.to_string())
}
1666
/// `GET /v1/models`: list every model from every Vault-configured provider in
/// OpenAI wire format.
///
/// Providers are queried concurrently; a provider that fails to list models
/// is logged and skipped rather than failing the whole request. Results are
/// sorted by owner, then id.
async fn list_openai_models() -> OpenAiApiResult<Json<OpenAiModelsResponse>> {
    let registry = crate::provider::ProviderRegistry::from_vault()
        .await
        .map_err(|error| {
            tracing::error!(error = %error, "Failed to load providers from Vault");
            openai_internal_error(format!("failed to load providers: {error}"))
        })?;

    // One future per provider; each resolves to that provider's model entries.
    let model_futures = registry.list().into_iter().map(|provider_id| {
        let provider_id = provider_id.to_string();
        let provider = registry.get(&provider_id);

        async move {
            let Some(provider) = provider else {
                return Vec::new();
            };

            // `created` is stamped with the listing time, not a real creation date.
            let now = chrono::Utc::now().timestamp();
            match provider.list_models().await {
                Ok(models) => models
                    .into_iter()
                    .map(|model| OpenAiModel {
                        id: make_openai_model_id(&provider_id, &model.id),
                        object: "model".to_string(),
                        created: now,
                        owned_by: canonicalize_provider_name(&provider_id).to_string(),
                    })
                    .collect(),
                Err(error) => {
                    // Best-effort: a failing provider contributes no models.
                    tracing::warn!(
                        provider = %provider_id,
                        error = %error,
                        "Failed to list models for provider"
                    );
                    Vec::new()
                }
            }
        }
    });

    let mut data: Vec<OpenAiModel> = join_all(model_futures)
        .await
        .into_iter()
        .flatten()
        .collect();
    data.sort_by(|a, b| a.owned_by.cmp(&b.owned_by).then(a.id.cmp(&b.id)));

    Ok(Json(OpenAiModelsResponse {
        object: "list".to_string(),
        data,
    }))
}
1719
/// `POST /v1/chat/completions`: OpenAI-compatible chat completion.
///
/// Validates the request, resolves the target provider/model from the
/// `model` field (with single-provider and "openai" fallbacks), then either
/// streams SSE chunks (`stream: true`) or returns a single JSON completion.
async fn openai_chat_completions(
    Json(req): Json<OpenAiChatCompletionRequest>,
) -> Result<Response, OpenAiApiError> {
    let is_stream = req.stream.unwrap_or(false);
    if req.model.trim().is_empty() {
        return Err(openai_bad_request("`model` is required"));
    }
    if req.messages.is_empty() {
        return Err(openai_bad_request("`messages` must not be empty"));
    }

    // Provider credentials/configuration live in Vault; loaded per request.
    let registry = crate::provider::ProviderRegistry::from_vault()
        .await
        .map_err(|error| {
            tracing::error!(error = %error, "Failed to load providers from Vault");
            openai_internal_error(format!("failed to load providers: {error}"))
        })?;

    let providers = registry.list();
    if providers.is_empty() {
        return Err(openai_error(
            StatusCode::SERVICE_UNAVAILABLE,
            "No providers are configured in Vault",
            "server_error",
            "no_providers",
        ));
    }

    // Accept `provider/model`, `provider:model`, or a bare model id.
    let normalized_model = normalize_model_reference(&req.model);
    let (maybe_provider, model_id) = crate::provider::parse_model_string(&normalized_model);
    let model_id = if maybe_provider.is_some() {
        if model_id.trim().is_empty() {
            return Err(openai_bad_request(
                "Model must be in `provider/model` format",
            ));
        }
        model_id.to_string()
    } else {
        req.model.trim().to_string()
    };

    // Pick the provider: explicit prefix wins; otherwise fall back to the
    // sole configured provider, then to "openai"; else require a prefix.
    let selected_provider = if let Some(provider_name) = maybe_provider {
        let provider_name = canonicalize_provider_name(provider_name);
        if providers.contains(&provider_name) {
            provider_name.to_string()
        } else {
            return Err(openai_bad_request(format!(
                "Provider `{provider_name}` is not configured in Vault"
            )));
        }
    } else if providers.len() == 1 {
        providers[0].to_string()
    } else if providers.contains(&"openai") {
        "openai".to_string()
    } else {
        return Err(openai_bad_request(
            "When multiple providers are configured, use `provider/model`. See GET /v1/models.",
        ));
    };

    let provider = registry.get(&selected_provider).ok_or_else(|| {
        openai_internal_error(format!(
            "Provider `{selected_provider}` was available but could not be loaded"
        ))
    })?;

    let messages = convert_openai_messages(&req.messages)?;
    let tools = convert_openai_tools(&req.tools)?;
    let stop = match req.stop {
        Some(OpenAiStop::Single(stop)) => vec![stop],
        Some(OpenAiStop::Multiple(stops)) => stops,
        None => Vec::new(),
    };

    let completion_request = crate::provider::CompletionRequest {
        messages,
        tools,
        model: model_id.clone(),
        temperature: req.temperature,
        top_p: req.top_p,
        // `max_completion_tokens` (newer field) takes precedence over `max_tokens`.
        max_tokens: req.max_completion_tokens.or(req.max_tokens),
        stop,
    };

    let chat_id = format!("chatcmpl-{}", uuid::Uuid::new_v4());
    let now = chrono::Utc::now().timestamp();
    let openai_model = make_openai_model_id(&selected_provider, &model_id);

    if is_stream {
        let mut provider_stream =
            provider
                .complete_stream(completion_request)
                .await
                .map_err(|error| {
                    tracing::warn!(
                        provider = %selected_provider,
                        model = %model_id,
                        error = %error,
                        "Streaming completion request failed"
                    );
                    openai_internal_error(error.to_string())
                })?;

        let stream_id = chat_id.clone();
        let stream_model = openai_model.clone();
        // Translate provider stream chunks into OpenAI `chat.completion.chunk`
        // SSE events, assigning stable tool-call indices per call id.
        let event_stream = async_stream::stream! {
            let mut tool_call_indices: HashMap<String, usize> = HashMap::new();
            let mut next_tool_call_index = 0usize;
            let mut saw_text = false;
            let mut saw_tool_calls = false;

            // First chunk announces the assistant role, per OpenAI convention.
            let role_chunk = openai_stream_chunk(
                &stream_id,
                now,
                &stream_model,
                serde_json::json!({ "role": "assistant" }),
                None,
            );
            yield Ok::<Event, Infallible>(openai_stream_event(&role_chunk));

            while let Some(chunk) = provider_stream.next().await {
                match chunk {
                    crate::provider::StreamChunk::Text(text) => {
                        if text.is_empty() {
                            continue;
                        }
                        saw_text = true;
                        let content_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({ "content": text }),
                            None,
                        );
                        yield Ok(openai_stream_event(&content_chunk));
                    }
                    crate::provider::StreamChunk::ToolCallStart { id, name } => {
                        saw_tool_calls = true;
                        // Reuse the index if this call id was seen before.
                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
                            let value = next_tool_call_index;
                            next_tool_call_index += 1;
                            value
                        });
                        let tool_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({
                                "tool_calls": [{
                                    "index": index,
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": "",
                                    }
                                }]
                            }),
                            None,
                        );
                        yield Ok(openai_stream_event(&tool_chunk));
                    }
                    crate::provider::StreamChunk::ToolCallDelta { id, arguments_delta } => {
                        if arguments_delta.is_empty() {
                            continue;
                        }
                        saw_tool_calls = true;
                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
                            let value = next_tool_call_index;
                            next_tool_call_index += 1;
                            value
                        });
                        let tool_delta_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({
                                "tool_calls": [{
                                    "index": index,
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "arguments": arguments_delta,
                                    }
                                }]
                            }),
                            None,
                        );
                        yield Ok(openai_stream_event(&tool_delta_chunk));
                    }
                    crate::provider::StreamChunk::ToolCallEnd { .. } => {}
                    crate::provider::StreamChunk::Done { .. } => {}
                    crate::provider::StreamChunk::Error(error) => {
                        // Surface the error as a final chunk, then terminate.
                        let error_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({ "content": error }),
                            Some("error"),
                        );
                        yield Ok(openai_stream_event(&error_chunk));
                        yield Ok(Event::default().data("[DONE]"));
                        return;
                    }
                }
            }

            // Tool-call-only responses finish with "tool_calls"; otherwise "stop".
            let finish_reason = if saw_tool_calls && !saw_text {
                "tool_calls"
            } else {
                "stop"
            };
            let final_chunk = openai_stream_chunk(
                &stream_id,
                now,
                &stream_model,
                serde_json::json!({}),
                Some(finish_reason),
            );
            yield Ok(openai_stream_event(&final_chunk));
            yield Ok(Event::default().data("[DONE]"));
        };

        return Ok(Sse::new(event_stream)
            .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
            .into_response());
    }

    // Non-streaming path: single completion, converted to OpenAI format.
    let completion = provider
        .complete(completion_request)
        .await
        .map_err(|error| {
            tracing::warn!(
                provider = %selected_provider,
                model = %model_id,
                error = %error,
                "Completion request failed"
            );
            openai_internal_error(error.to_string())
        })?;

    let (content, tool_calls) = convert_response_message(&completion.message);

    Ok(Json(OpenAiChatCompletionResponse {
        id: chat_id,
        object: "chat.completion".to_string(),
        created: now,
        model: openai_model,
        choices: vec![OpenAiChatCompletionChoice {
            index: 0,
            message: OpenAiChatCompletionMessage {
                role: "assistant".to_string(),
                content,
                tool_calls,
            },
            finish_reason: convert_finish_reason(completion.finish_reason).to_string(),
        }],
        usage: OpenAiUsage {
            prompt_tokens: completion.usage.prompt_tokens,
            completion_tokens: completion.usage.completion_tokens,
            total_tokens: completion.usage.total_tokens,
        },
    })
    .into_response())
}
1985
1986async fn start_cognition(
1987 State(state): State<AppState>,
1988 payload: Option<Json<StartCognitionRequest>>,
1989) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1990 state
1991 .cognition
1992 .start(payload.map(|Json(body)| body))
1993 .await
1994 .map(Json)
1995 .map_err(internal_error)
1996}
1997
1998async fn stop_cognition(
1999 State(state): State<AppState>,
2000 payload: Option<Json<StopCognitionRequest>>,
2001) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2002 let reason = payload.and_then(|Json(body)| body.reason);
2003 state
2004 .cognition
2005 .stop(reason)
2006 .await
2007 .map(Json)
2008 .map_err(internal_error)
2009}
2010
2011async fn get_cognition_status(
2012 State(state): State<AppState>,
2013) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2014 Ok(Json(state.cognition.status().await))
2015}
2016
/// Stream cognition runtime events to the client over SSE.
///
/// Each broadcast event is forwarded as a "cognition" SSE event; when the
/// receiver falls behind, a "lag" event reports how many were skipped. The
/// stream ends when the broadcast channel closes.
async fn stream_cognition(
    State(state): State<AppState>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
    let rx = state.cognition.subscribe_events();

    let event_stream = stream::unfold(rx, |mut rx| async move {
        match rx.recv().await {
            Ok(event) => {
                // Serialization failure degrades to an empty JSON object
                // rather than dropping the event.
                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
                let sse_event = Event::default().event("cognition").data(payload);
                Some((Ok(sse_event), rx))
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                let lag_event = Event::default()
                    .event("lag")
                    .data(format!("skipped {}", skipped));
                Some((Ok(lag_event), rx))
            }
            // Channel closed: terminate the SSE stream.
            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
        }
    });

    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}
2041
/// Stream agent-bus envelopes to the client over SSE, filtered by the topic
/// patterns granted in the caller's JWT claims.
///
/// No claims (or an empty topic list) means no filtering. Envelopes whose
/// topic does not match any allowed pattern are replaced by an empty
/// "keepalive" event so the connection stays active without leaking data.
async fn stream_bus_events(
    State(state): State<AppState>,
    req: Request<Body>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
    // Topic grants are injected into request extensions by the auth middleware.
    let allowed_topics: Vec<String> = req
        .extensions()
        .get::<crate::server::auth::JwtClaims>()
        .map(|claims| claims.topics.clone())
        .unwrap_or_default();

    let bus_handle = state.bus.handle("stream_bus_events");
    let rx = bus_handle.into_receiver();

    let event_stream = stream::unfold(rx, move |mut rx: broadcast::Receiver<BusEnvelope>| {
        let allowed_topics = allowed_topics.clone();
        async move {
            match rx.recv().await {
                Ok(envelope) => {
                    let should_send = allowed_topics.is_empty()
                        || allowed_topics
                            .iter()
                            .any(|pattern| topic_matches(&envelope.topic, pattern));

                    if should_send {
                        let payload =
                            serde_json::to_string(&envelope).unwrap_or_else(|_| "{}".to_string());
                        let sse_event = Event::default().event("bus").data(payload);
                        return Some((Ok(sse_event), rx));
                    }

                    // Filtered out: emit a keepalive instead of the envelope.
                    Some((Ok(Event::default().event("keepalive").data("")), rx))
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                    let lag_event = Event::default()
                        .event("lag")
                        .data(format!("skipped {}", skipped));
                    Some((Ok(lag_event), rx))
                }
                // Bus shut down: end the SSE stream.
                Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
            }
        }
    });

    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}
2096
/// Check whether `topic` is granted by `pattern`.
///
/// Supported forms:
/// - `*` (or a bare `.*`): matches every topic.
/// - `seg.*`: matches `seg` itself and any topic under `seg.` — crucially NOT
///   unrelated topics that merely share the prefix string (e.g. `news.*`
///   must not match `newsletter`). The previous bare `starts_with` check
///   over-granted exactly that case, which matters because this function
///   gates bus-topic access from JWT claims.
/// - `.*seg`: mirror image — matches `seg` and any topic ending in `.seg`.
/// - Anything else: exact match.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" || pattern == ".*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        // Require the `.` segment boundary after the prefix.
        return topic == prefix
            || topic
                .strip_prefix(prefix)
                .is_some_and(|rest| rest.starts_with('.'));
    }
    if let Some(suffix) = pattern.strip_prefix(".*") {
        // Require the `.` segment boundary before the suffix.
        return topic == suffix
            || topic
                .strip_suffix(suffix)
                .is_some_and(|rest| rest.ends_with('.'));
    }
    topic == pattern
}
2111
2112async fn get_latest_snapshot(
2113 State(state): State<AppState>,
2114) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
2115 match state.cognition.latest_snapshot().await {
2116 Some(snapshot) => Ok(Json(snapshot)),
2117 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
2118 }
2119}
2120
2121async fn create_persona(
2122 State(state): State<AppState>,
2123 Json(req): Json<CreatePersonaRequest>,
2124) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2125 state
2126 .cognition
2127 .create_persona(req)
2128 .await
2129 .map(Json)
2130 .map_err(internal_error)
2131}
2132
2133async fn spawn_persona(
2134 State(state): State<AppState>,
2135 Path(id): Path<String>,
2136 Json(req): Json<SpawnPersonaRequest>,
2137) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2138 state
2139 .cognition
2140 .spawn_child(&id, req)
2141 .await
2142 .map(Json)
2143 .map_err(internal_error)
2144}
2145
2146async fn reap_persona(
2147 State(state): State<AppState>,
2148 Path(id): Path<String>,
2149 payload: Option<Json<ReapPersonaRequest>>,
2150) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
2151 let req = payload
2152 .map(|Json(body)| body)
2153 .unwrap_or(ReapPersonaRequest {
2154 cascade: Some(false),
2155 reason: None,
2156 });
2157
2158 state
2159 .cognition
2160 .reap_persona(&id, req)
2161 .await
2162 .map(Json)
2163 .map_err(internal_error)
2164}
2165
2166async fn get_swarm_lineage(
2167 State(state): State<AppState>,
2168) -> Result<Json<LineageGraph>, (StatusCode, String)> {
2169 Ok(Json(state.cognition.lineage_graph().await))
2170}
2171
/// Query-string filters for the belief listing endpoint.
#[derive(Deserialize)]
struct BeliefFilter {
    // Substring matched against the JSON-serialized belief status.
    status: Option<String>,
    // Exact match against the asserting persona's id.
    persona: Option<String>,
}
2179
2180async fn list_beliefs(
2181 State(state): State<AppState>,
2182 Query(filter): Query<BeliefFilter>,
2183) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
2184 let beliefs = state.cognition.get_beliefs().await;
2185 let mut result: Vec<Belief> = beliefs.into_values().collect();
2186
2187 if let Some(status) = &filter.status {
2188 result.retain(|b| {
2189 let s = serde_json::to_string(&b.status).unwrap_or_default();
2190 s.contains(status)
2191 });
2192 }
2193 if let Some(persona) = &filter.persona {
2194 result.retain(|b| &b.asserted_by == persona);
2195 }
2196
2197 result.sort_by(|a, b| {
2198 b.confidence
2199 .partial_cmp(&a.confidence)
2200 .unwrap_or(std::cmp::Ordering::Equal)
2201 });
2202 Ok(Json(result))
2203}
2204
2205async fn get_belief(
2206 State(state): State<AppState>,
2207 Path(id): Path<String>,
2208) -> Result<Json<Belief>, (StatusCode, String)> {
2209 match state.cognition.get_belief(&id).await {
2210 Some(belief) => Ok(Json(belief)),
2211 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
2212 }
2213}
2214
2215async fn list_attention(
2216 State(state): State<AppState>,
2217) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
2218 Ok(Json(state.cognition.get_attention_queue().await))
2219}
2220
2221async fn list_proposals(
2222 State(state): State<AppState>,
2223) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
2224 let proposals = state.cognition.get_proposals().await;
2225 let mut result: Vec<Proposal> = proposals.into_values().collect();
2226 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
2227 Ok(Json(result))
2228}
2229
2230async fn approve_proposal(
2231 State(state): State<AppState>,
2232 Path(id): Path<String>,
2233) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2234 state
2235 .cognition
2236 .approve_proposal(&id)
2237 .await
2238 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
2239 .map_err(internal_error)
2240}
2241
2242async fn list_receipts(
2243 State(state): State<AppState>,
2244) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
2245 Ok(Json(state.cognition.get_receipts().await))
2246}
2247
2248async fn get_workspace(
2249 State(state): State<AppState>,
2250) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
2251 Ok(Json(state.cognition.get_workspace().await))
2252}
2253
2254async fn get_governance(
2255 State(state): State<AppState>,
2256) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
2257 Ok(Json(state.cognition.get_governance().await))
2258}
2259
2260async fn get_persona(
2261 State(state): State<AppState>,
2262 axum::extract::Path(id): axum::extract::Path<String>,
2263) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2264 state
2265 .cognition
2266 .get_persona(&id)
2267 .await
2268 .map(Json)
2269 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
2270}
2271
/// Query-string parameters for the audit-log listing endpoint.
#[derive(Deserialize)]
struct AuditQuery {
    // Max entries to return; defaults to 100, hard-capped at 1000.
    limit: Option<usize>,
    // Optional category name (e.g. "api", "tool", "session").
    category: Option<String>,
}
2279
2280async fn list_audit_entries(
2281 State(state): State<AppState>,
2282 Query(query): Query<AuditQuery>,
2283) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
2284 let limit = query.limit.unwrap_or(100).min(1000);
2285
2286 let entries = if let Some(ref cat) = query.category {
2287 let category = match cat.as_str() {
2288 "api" => AuditCategory::Api,
2289 "tool" | "tool_execution" => AuditCategory::ToolExecution,
2290 "session" => AuditCategory::Session,
2291 "cognition" => AuditCategory::Cognition,
2292 "swarm" => AuditCategory::Swarm,
2293 "auth" => AuditCategory::Auth,
2294 "k8s" => AuditCategory::K8s,
2295 "sandbox" => AuditCategory::Sandbox,
2296 "config" => AuditCategory::Config,
2297 _ => {
2298 return Err((
2299 StatusCode::BAD_REQUEST,
2300 format!("Unknown category: {}", cat),
2301 ));
2302 }
2303 };
2304 state.audit_log.by_category(category, limit).await
2305 } else {
2306 state.audit_log.recent(limit).await
2307 };
2308
2309 Ok(Json(entries))
2310}
2311
/// Query-string parameters for the session event-replay endpoints.
#[derive(Deserialize)]
struct ReplayQuery {
    // Session whose event files should be read.
    session_id: String,
    // Only include files whose end offset is after this value.
    start_offset: Option<u64>,
    // Only include files whose start offset is before this value.
    end_offset: Option<u64>,
    // Max events to return; defaults to 1000, hard-capped at 10000.
    limit: Option<usize>,
    // When set, only events whose `tool_name` field equals this value.
    tool_name: Option<String>,
}
2329
/// Replay recorded session events from JSONL files on disk.
///
/// Reads `$CODETETHER_EVENT_STREAM_PATH/<session_id>/*.jsonl`, skips files
/// outside the requested offset window, optionally filters by `tool_name`,
/// sorts by file start offset, and returns up to `limit` events.
async fn replay_session_events(
    Query(query): Query<ReplayQuery>,
) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
    use std::path::PathBuf;

    // 503 when the event-stream directory is not configured.
    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
        .map(PathBuf::from)
        .ok()
        .ok_or_else(|| {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
            )
        })?;

    let session_dir = base_dir.join(&query.session_id);

    if !session_dir.exists() {
        return Err((
            StatusCode::NOT_FOUND,
            format!("Session not found: {}", query.session_id),
        ));
    }

    // (file start offset, file end offset, event) — sorted by start below.
    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();

    let entries = std::fs::read_dir(&session_dir)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    for entry in entries {
        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
        let path = entry.path();

        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
            continue;
        }

        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");

        // Filenames apparently start with "T" or a "202x" timestamp and end
        // with "-<start>-<end>.jsonl" offsets — assumed from the parsing
        // below; TODO confirm against the writer's naming scheme.
        if let Some(offsets) = filename
            .strip_prefix("T")
            .or_else(|| filename.strip_prefix("202"))
        {
            let parts: Vec<&str> = offsets.split('-').collect();
            if parts.len() >= 4 {
                // Unparseable offsets degrade to 0 rather than erroring.
                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
                let end: u64 = parts[parts.len() - 1]
                    .trim_end_matches(".jsonl")
                    .parse()
                    .unwrap_or(0);

                // Skip files entirely outside the requested offset window.
                if let Some(query_start) = query.start_offset
                    && end <= query_start
                {
                    continue;
                }
                if let Some(query_end) = query.end_offset
                    && start >= query_end
                {
                    continue;
                }

                let content = std::fs::read_to_string(&path)
                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

                for line in content.lines() {
                    if line.trim().is_empty() {
                        continue;
                    }
                    // Malformed JSON lines are silently skipped.
                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
                        // Tool filter: events without a `tool_name` field are
                        // excluded whenever a filter is set.
                        if let Some(ref tool_filter) = query.tool_name {
                            if let Some(event_tool) =
                                event.get("tool_name").and_then(|v| v.as_str())
                            {
                                if event_tool != tool_filter {
                                    continue;
                                }
                            } else {
                                continue;
                            }
                        }
                        all_events.push((start, end, event));
                    }
                }
            }
        }
    }

    // Order by file start offset; events within a file keep their line order.
    all_events.sort_by_key(|(s, _, _)| *s);

    let limit = query.limit.unwrap_or(1000).min(10000);
    let events: Vec<_> = all_events
        .into_iter()
        .take(limit)
        .map(|(_, _, e)| e)
        .collect();

    Ok(Json(events))
}
2441
2442async fn list_session_event_files(
2444 Query(query): Query<ReplayQuery>,
2445) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
2446 use std::path::PathBuf;
2447
2448 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2449 .map(PathBuf::from)
2450 .ok()
2451 .ok_or_else(|| {
2452 (
2453 StatusCode::SERVICE_UNAVAILABLE,
2454 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2455 )
2456 })?;
2457
2458 let session_dir = base_dir.join(&query.session_id);
2459 if !session_dir.exists() {
2460 return Err((
2461 StatusCode::NOT_FOUND,
2462 format!("Session not found: {}", query.session_id),
2463 ));
2464 }
2465
2466 let mut files: Vec<EventFileMeta> = Vec::new();
2467
2468 let entries = std::fs::read_dir(&session_dir)
2470 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2471
2472 for entry in entries {
2473 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2474 let path = entry.path();
2475
2476 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2477 continue;
2478 }
2479
2480 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2481
2482 if let Some(offsets) = filename
2484 .strip_prefix("T")
2485 .or_else(|| filename.strip_prefix("202"))
2486 {
2487 let parts: Vec<&str> = offsets.split('-').collect();
2488 if parts.len() >= 4 {
2489 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2490 let end: u64 = parts[parts.len() - 1]
2491 .trim_end_matches(".jsonl")
2492 .parse()
2493 .unwrap_or(0);
2494
2495 let metadata = std::fs::metadata(&path)
2496 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2497
2498 files.push(EventFileMeta {
2499 filename: filename.to_string(),
2500 start_offset: start,
2501 end_offset: end,
2502 size_bytes: metadata.len(),
2503 });
2504 }
2505 }
2506 }
2507
2508 files.sort_by_key(|f| f.start_offset);
2509 Ok(Json(files))
2510}
2511
/// Metadata for one on-disk `.jsonl` event file of a session, as returned by
/// `list_session_event_files`.
#[derive(Serialize)]
struct EventFileMeta {
    /// File name within the session directory (e.g. "T…-<start>-<end>.jsonl").
    filename: String,
    /// First event offset covered by this file (parsed from the file name).
    start_offset: u64,
    /// Last event offset covered by this file (parsed from the file name).
    end_offset: u64,
    /// File size on disk, in bytes.
    size_bytes: u64,
}
2519
2520async fn get_k8s_status(
2523 State(state): State<AppState>,
2524) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
2525 Ok(Json(state.k8s.status().await))
2526}
2527
/// Request body for the k8s scale endpoint.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Desired replica count; the handler rejects values outside 0..=100.
    replicas: i32,
}
2532
2533async fn k8s_scale(
2534 State(state): State<AppState>,
2535 Json(req): Json<ScaleRequest>,
2536) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2537 if req.replicas < 0 || req.replicas > 100 {
2538 return Err((
2539 StatusCode::BAD_REQUEST,
2540 "Replicas must be between 0 and 100".to_string(),
2541 ));
2542 }
2543
2544 state
2545 .audit_log
2546 .log(
2547 AuditCategory::K8s,
2548 format!("scale:{}", req.replicas),
2549 AuditOutcome::Success,
2550 None,
2551 None,
2552 )
2553 .await;
2554
2555 state
2556 .k8s
2557 .scale(req.replicas)
2558 .await
2559 .map(Json)
2560 .map_err(internal_error)
2561}
2562
2563async fn k8s_restart(
2564 State(state): State<AppState>,
2565) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2566 state
2567 .audit_log
2568 .log(
2569 AuditCategory::K8s,
2570 "rolling_restart",
2571 AuditOutcome::Success,
2572 None,
2573 None,
2574 )
2575 .await;
2576
2577 state
2578 .k8s
2579 .rolling_restart()
2580 .await
2581 .map(Json)
2582 .map_err(internal_error)
2583}
2584
2585async fn k8s_list_pods(
2586 State(state): State<AppState>,
2587) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
2588 state
2589 .k8s
2590 .list_pods()
2591 .await
2592 .map(Json)
2593 .map_err(internal_error)
2594}
2595
2596async fn k8s_actions(
2597 State(state): State<AppState>,
2598) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
2599 Ok(Json(state.k8s.recent_actions(100).await))
2600}
2601
2602#[derive(Deserialize)]
2603struct SpawnSubagentRequest {
2604 subagent_id: String,
2605 #[serde(default)]
2606 image: Option<String>,
2607 #[serde(default)]
2608 env_vars: std::collections::HashMap<String, String>,
2609 #[serde(default)]
2610 labels: std::collections::HashMap<String, String>,
2611 #[serde(default)]
2612 command: Option<Vec<String>>,
2613 #[serde(default)]
2614 args: Option<Vec<String>>,
2615}
2616
2617async fn k8s_spawn_subagent(
2618 State(state): State<AppState>,
2619 Json(req): Json<SpawnSubagentRequest>,
2620) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2621 state
2622 .k8s
2623 .spawn_subagent_pod_with_spec(
2624 &req.subagent_id,
2625 crate::k8s::SubagentPodSpec {
2626 image: req.image,
2627 env_vars: req.env_vars,
2628 labels: req.labels,
2629 command: req.command,
2630 args: req.args,
2631 },
2632 )
2633 .await
2634 .map(Json)
2635 .map_err(internal_error)
2636}
2637
2638async fn k8s_delete_subagent(
2639 State(state): State<AppState>,
2640 axum::extract::Path(id): axum::extract::Path<String>,
2641) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2642 state
2643 .k8s
2644 .delete_subagent_pod(&id)
2645 .await
2646 .map(Json)
2647 .map_err(internal_error)
2648}
2649
/// Request body for registering an external tool with the tool registry.
#[derive(Debug, Deserialize)]
pub struct RegisterToolRequest {
    /// Caller-chosen unique tool id.
    pub id: String,
    /// Human-readable tool name.
    pub name: String,
    /// Free-form description of what the tool does.
    pub description: String,
    /// Tool version string.
    pub version: String,
    /// URL the server should call to invoke the tool.
    pub endpoint: String,
    /// Capability tags advertised by the tool.
    pub capabilities: Vec<String>,
    /// JSON schema (or similar) describing the tool's parameters.
    pub parameters: serde_json::Value,
}
2661
/// Response body for a successful tool registration.
#[derive(Serialize)]
struct RegisterToolResponse {
    /// The stored registration, including computed expiry timestamps.
    tool: RegisteredTool,
    /// Human-readable confirmation, including the heartbeat requirement.
    message: String,
}
2668
2669async fn list_tools(State(state): State<AppState>) -> Json<Vec<RegisteredTool>> {
2671 Json(state.tool_registry.list().await)
2672}
2673
2674async fn register_tool(
2676 State(state): State<AppState>,
2677 Json(req): Json<RegisterToolRequest>,
2678) -> Result<Json<RegisterToolResponse>, (StatusCode, String)> {
2679 let now = chrono::Utc::now();
2680 let tool = RegisteredTool {
2681 id: req.id.clone(),
2682 name: req.name,
2683 description: req.description,
2684 version: req.version,
2685 endpoint: req.endpoint,
2686 capabilities: req.capabilities,
2687 parameters: req.parameters,
2688 registered_at: now,
2689 last_heartbeat: now,
2690 expires_at: now + Duration::from_secs(90),
2691 };
2692
2693 state.tool_registry.register(tool.clone()).await;
2694
2695 tracing::info!(tool_id = %tool.id, "Tool registered");
2696
2697 Ok(Json(RegisterToolResponse {
2698 tool,
2699 message: "Tool registered successfully. Heartbeat required every 30s.".to_string(),
2700 }))
2701}
2702
2703async fn tool_heartbeat(
2705 State(state): State<AppState>,
2706 Path(id): Path<String>,
2707) -> Result<Json<RegisteredTool>, (StatusCode, String)> {
2708 state
2709 .tool_registry
2710 .heartbeat(&id)
2711 .await
2712 .map(|tool| {
2713 tracing::info!(tool_id = %id, "Tool heartbeat received");
2714 Json(tool)
2715 })
2716 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Tool {} not found", id)))
2717}
2718
2719async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
2721 let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
2722 let signing_key = SigningKey::from_env();
2723 let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
2724 Json(PluginListResponse {
2725 server_fingerprint,
2726 signing_available: !test_sig.is_empty(),
2727 plugins: Vec::<PluginManifest>::new(),
2728 })
2729}
2730
/// Response body for the plugin listing endpoint.
#[derive(Serialize)]
struct PluginListResponse {
    /// Hash of the server's crate version, used as a signing fingerprint.
    server_fingerprint: String,
    /// Whether a plugin signing key is configured in the environment.
    signing_available: bool,
    /// Registered plugin manifests (currently always empty).
    plugins: Vec<PluginManifest>,
}
2737
2738async fn list_agent_tasks(
2744 State(state): State<AppState>,
2745 Query(params): Query<ListAgentTasksQuery>,
2746) -> Json<serde_json::Value> {
2747 let tasks = state.knative_tasks.list().await;
2748 let filtered: Vec<_> = tasks
2749 .into_iter()
2750 .filter(|t| params.status.as_ref().is_none_or(|s| t.status == *s))
2751 .filter(|t| {
2752 params
2753 .agent_type
2754 .as_ref()
2755 .is_none_or(|a| t.agent_type == *a)
2756 })
2757 .collect();
2758 Json(serde_json::json!({ "tasks": filtered, "total": filtered.len() }))
2759}
2760
/// Query parameters for listing agent tasks; both filters are optional.
#[derive(Deserialize)]
struct ListAgentTasksQuery {
    /// Only return tasks with exactly this status (e.g. "pending").
    status: Option<String>,
    /// Only return tasks with exactly this agent type (e.g. "build").
    agent_type: Option<String>,
}
2766
2767async fn create_agent_task(
2769 State(state): State<AppState>,
2770 Json(req): Json<CreateAgentTaskRequest>,
2771) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2772 let task_id = uuid::Uuid::new_v4().to_string();
2773 let task = KnativeTask {
2774 task_id: task_id.clone(),
2775 title: req.title,
2776 description: req.description,
2777 agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2778 priority: req.priority.unwrap_or(0),
2779 received_at: chrono::Utc::now(),
2780 status: "pending".to_string(),
2781 };
2782 state.knative_tasks.push(task).await;
2783 Ok(Json(serde_json::json!({
2784 "task_id": task_id,
2785 "status": "pending",
2786 })))
2787}
2788
/// Request body for creating an agent task.
#[derive(Deserialize)]
#[allow(dead_code)]
struct CreateAgentTaskRequest {
    /// Short task title.
    title: String,
    /// Full task description handed to the agent.
    description: String,
    /// Agent type; defaults to "build" when omitted.
    agent_type: Option<String>,
    /// Model hint; currently accepted but unused (hence `dead_code` above).
    model: Option<String>,
    /// Scheduling priority; defaults to 0.
    priority: Option<i32>,
}
2798
2799async fn get_agent_task(
2801 State(state): State<AppState>,
2802 Path(task_id): Path<String>,
2803) -> Result<Json<KnativeTask>, (StatusCode, String)> {
2804 state
2805 .knative_tasks
2806 .get(&task_id)
2807 .await
2808 .map(Json)
2809 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
2810}
2811
2812async fn get_agent_task_output(
2814 State(state): State<AppState>,
2815 Path(task_id): Path<String>,
2816) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2817 let task = state
2818 .knative_tasks
2819 .get(&task_id)
2820 .await
2821 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2822 Ok(Json(serde_json::json!({
2823 "task_id": task.task_id,
2824 "status": task.status,
2825 "title": task.title,
2826 "output": null,
2827 })))
2828}
2829
/// Request body a worker POSTs when reporting task output.
#[derive(Deserialize)]
struct TaskOutputPayload {
    /// Reporting worker's id; currently accepted but unused.
    #[allow(dead_code)]
    #[serde(default)]
    worker_id: Option<String>,
    /// Output chunk to relay to bus subscribers, if any.
    #[serde(default)]
    output: Option<String>,
}
2839
2840async fn agent_task_output(
2841 State(state): State<AppState>,
2842 Path(task_id): Path<String>,
2843 Json(payload): Json<TaskOutputPayload>,
2844) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2845 let task_exists = state.knative_tasks.get(&task_id).await.is_some();
2847 if !task_exists {
2848 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
2849 }
2850
2851 let _ = state.knative_tasks.update_status(&task_id, "working").await;
2853
2854 if let Some(ref output) = payload.output {
2856 let bus_handle = state.bus.handle("task-output");
2857 let _ = bus_handle.send(
2858 format!("task.{}", task_id),
2859 crate::bus::BusMessage::TaskUpdate {
2860 task_id: task_id.clone(),
2861 state: crate::a2a::types::TaskState::Working,
2862 message: Some(output.clone()),
2863 },
2864 );
2865 }
2866
2867 Ok(Json(serde_json::json!({
2868 "task_id": task_id,
2869 "status": "received",
2870 })))
2871}
2872
2873async fn stream_agent_task_output(
2875 State(state): State<AppState>,
2876 Path(task_id): Path<String>,
2877) -> Result<Sse<impl stream::Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
2878 state
2880 .knative_tasks
2881 .get(&task_id)
2882 .await
2883 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2884
2885 let bus_handle = state.bus.handle("task-stream");
2886 let rx = bus_handle.into_receiver();
2887 let topic_prefix = format!("task.{task_id}");
2888
2889 let stream = async_stream::try_stream! {
2890 let mut rx = rx;
2891 loop {
2892 match rx.recv().await {
2893 Ok(envelope) => {
2894 if !envelope.topic.starts_with(&topic_prefix) {
2895 continue;
2896 }
2897 let data = serde_json::to_string(&envelope.message).unwrap_or_default();
2898 yield Event::default().event("output").data(data);
2899 }
2900 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2901 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2902 }
2903 }
2904 };
2905
2906 Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
2907}
2908
2909async fn list_connected_workers(
2913 State(state): State<AppState>,
2914) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2915 let pods = state.k8s.list_pods().await.unwrap_or_default();
2917 let workers: Vec<serde_json::Value> = pods
2918 .iter()
2919 .filter(|p| p.name.contains("codetether") || p.name.contains("worker"))
2920 .map(|p| {
2921 serde_json::json!({
2922 "worker_id": p.name,
2923 "name": p.name,
2924 "status": p.phase,
2925 "is_sse_connected": p.phase == "Running",
2926 "last_seen": chrono::Utc::now().to_rfc3339(),
2927 })
2928 })
2929 .collect();
2930 Ok(Json(serde_json::json!({ "workers": workers })))
2931}
2932
2933async fn dispatch_task(
2937 State(state): State<AppState>,
2938 Json(req): Json<DispatchTaskRequest>,
2939) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2940 let task_id = uuid::Uuid::new_v4().to_string();
2941 let task = KnativeTask {
2942 task_id: task_id.clone(),
2943 title: req.title,
2944 description: req.description,
2945 agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2946 priority: req.priority.unwrap_or(0),
2947 received_at: chrono::Utc::now(),
2948 status: "pending".to_string(),
2949 };
2950
2951 let handle = state.bus.handle("task-dispatch");
2953 handle.send(
2954 format!("task.{}", task_id),
2955 crate::bus::BusMessage::TaskUpdate {
2956 task_id: task_id.clone(),
2957 state: crate::a2a::types::TaskState::Submitted,
2958 message: Some(format!("Task dispatched: {}", task.title)),
2959 },
2960 );
2961
2962 state.knative_tasks.push(task).await;
2963
2964 Ok(Json(serde_json::json!({
2965 "task_id": task_id,
2966 "status": "pending",
2967 "dispatched_via_knative": true,
2968 })))
2969}
2970
/// Request body for dispatching a task via the Knative path.
#[derive(Deserialize)]
#[allow(dead_code)]
struct DispatchTaskRequest {
    /// Short task title.
    title: String,
    /// Full task description handed to the agent.
    description: String,
    /// Agent type; defaults to "build" when omitted.
    agent_type: Option<String>,
    /// Model hint; currently accepted but unused (hence `dead_code` above).
    model: Option<String>,
    /// Scheduling priority; defaults to 0.
    priority: Option<i32>,
    /// Arbitrary extra metadata; currently accepted but unused.
    metadata: Option<serde_json::Value>,
}
2981
2982async fn create_voice_session_rest(
2988 State(state): State<AppState>,
2989 Json(req): Json<CreateVoiceSessionRequest>,
2990) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2991 let voice_id = req.voice.unwrap_or_else(|| "960f89fc".to_string());
2992 let room_name = format!("voice-{}", uuid::Uuid::new_v4());
2993
2994 let handle = state.bus.handle("voice-rest");
2996 handle.send_voice_session_started(&room_name, &voice_id);
2997
2998 let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
2999
3000 Ok(Json(serde_json::json!({
3001 "room_name": room_name,
3002 "voice": voice_id,
3003 "mode": req.mode.unwrap_or_else(|| "chat".to_string()),
3004 "access_token": "",
3005 "livekit_url": livekit_url,
3006 "expires_at": (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339(),
3007 })))
3008}
3009
/// Request body for creating a voice session.
#[derive(Deserialize)]
#[allow(dead_code)]
struct CreateVoiceSessionRequest {
    /// Voice id; defaults to "960f89fc" when omitted.
    voice: Option<String>,
    /// Session mode; defaults to "chat" when omitted.
    mode: Option<String>,
    /// Codebase context id; currently accepted but unused.
    codebase_id: Option<String>,
    /// Requesting user id; currently accepted but unused.
    user_id: Option<String>,
}
3018
3019async fn get_voice_session_rest(Path(room_name): Path<String>) -> Json<serde_json::Value> {
3021 let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
3022 Json(serde_json::json!({
3023 "room_name": room_name,
3024 "state": "active",
3025 "agent_state": "listening",
3026 "access_token": "",
3027 "livekit_url": livekit_url,
3028 }))
3029}
3030
3031async fn delete_voice_session_rest(
3033 State(state): State<AppState>,
3034 Path(room_name): Path<String>,
3035) -> Json<serde_json::Value> {
3036 let handle = state.bus.handle("voice-rest");
3037 handle.send_voice_session_ended(&room_name, "user_ended");
3038 Json(serde_json::json!({ "status": "deleted" }))
3039}
3040
3041async fn list_voices_rest() -> Json<serde_json::Value> {
3043 Json(serde_json::json!({
3044 "voices": [
3045 { "voice_id": "960f89fc", "name": "Riley", "language": "english" },
3046 { "voice_id": "puck", "name": "Puck", "language": "english" },
3047 { "voice_id": "charon", "name": "Charon", "language": "english" },
3048 { "voice_id": "kore", "name": "Kore", "language": "english" },
3049 { "voice_id": "fenrir", "name": "Fenrir", "language": "english" },
3050 { "voice_id": "aoede", "name": "Aoede", "language": "english" },
3051 ]
3052 }))
3053}
3054
3055async fn resume_codebase_session(
3059 Path((_codebase_id, session_id)): Path<(String, String)>,
3060 Json(req): Json<ResumeSessionRequest>,
3061) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
3062 let mut session = match crate::session::Session::load(&session_id).await {
3064 Ok(s) => s,
3065 Err(_) => {
3066 let mut s = crate::session::Session::new()
3068 .await
3069 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3070 if let Some(agent) = &req.agent {
3071 s.set_agent_name(agent.clone());
3072 }
3073 s.save()
3074 .await
3075 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3076 s
3077 }
3078 };
3079
3080 if let Some(prompt) = &req.prompt
3082 && !prompt.is_empty()
3083 {
3084 match session.prompt(prompt).await {
3085 Ok(result) => {
3086 session.save().await.ok();
3087 return Ok(Json(serde_json::json!({
3088 "session_id": session.id,
3089 "active_session_id": session.id,
3090 "status": "completed",
3091 "result": result.text,
3092 })));
3093 }
3094 Err(e) => {
3095 return Ok(Json(serde_json::json!({
3096 "session_id": session.id,
3097 "active_session_id": session.id,
3098 "status": "failed",
3099 "error": e.to_string(),
3100 })));
3101 }
3102 }
3103 }
3104
3105 Ok(Json(serde_json::json!({
3107 "session_id": session.id,
3108 "active_session_id": session.id,
3109 "status": "ready",
3110 })))
3111}
3112
/// Request body for resuming a codebase session.
#[derive(Deserialize)]
#[allow(dead_code)]
struct ResumeSessionRequest {
    /// Optional prompt to run immediately after resuming; empty strings are ignored.
    prompt: Option<String>,
    /// Agent name applied only when a brand-new session has to be created.
    agent: Option<String>,
    /// Model hint; currently accepted but unused (hence `dead_code` above).
    model: Option<String>,
}
3120
3121fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
3122 let message = error.to_string();
3123 if message.contains("not found") {
3124 return (StatusCode::NOT_FOUND, message);
3125 }
3126 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
3127 return (StatusCode::BAD_REQUEST, message);
3128 }
3129 (StatusCode::INTERNAL_SERVER_ERROR, message)
3130}
3131
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepts "1"/"true"/"yes"/"on" (case-insensitive) as true and
/// "0"/"false"/"no"/"off" as false; any other value — or an unset
/// variable — yields `default`.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => match raw.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        },
        Err(_) => default,
    }
}
3142
#[cfg(test)]
mod tests {
    use super::{match_policy_rule, normalize_model_reference};

    // Route-policy mapping: each (path, method) pair must resolve to the
    // expected permission string.

    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/api/session/abc123/prompt", "POST"),
            Some("agent:execute")
        );
    }

    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        assert_eq!(
            match_policy_rule("/api/session", "POST"),
            Some("sessions:write")
        );
    }

    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/cognition/proposals/p1/approve", "POST"),
            Some("agent:execute")
        );
    }

    #[test]
    fn policy_openai_models_requires_read_permission() {
        assert_eq!(match_policy_rule("/v1/models", "GET"), Some("agent:read"));
    }

    #[test]
    fn policy_openai_chat_completions_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/chat/completions", "POST"),
            Some("agent:execute")
        );
    }

    // Model references using the "provider:model" form normalize to
    // the canonical "provider/model" form.
    #[test]
    fn normalize_model_reference_accepts_colon_format() {
        assert_eq!(
            normalize_model_reference("openai:gpt-4o"),
            "openai/gpt-4o".to_string()
        );
    }
}