1pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::{AgentBus, BusEnvelope};
11use crate::cli::ServeArgs;
12use crate::cognition::{
13 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16 executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24 Router,
25 body::Body,
26 extract::Path,
27 extract::{Query, State},
28 http::{Request, StatusCode},
29 middleware::{self, Next},
30 response::sse::{Event, KeepAlive, Sse},
31 response::{IntoResponse, Json, Response},
32 routing::{get, post},
33};
34use futures::{StreamExt, future::join_all, stream};
35use http::HeaderValue;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::convert::Infallible;
39use std::sync::Arc;
40use std::time::Duration;
41use tokio::sync::{Mutex, broadcast};
42use tonic_web::GrpcWebLayer;
43use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
44use tower_http::trace::TraceLayer;
45
/// A task received from Knative via a CloudEvent and tracked in-memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnativeTask {
    // Identifier supplied by the event producer ("unknown" when absent).
    pub task_id: String,
    pub title: String,
    pub description: String,
    // Which agent should handle the task (defaults to "build" on ingest).
    pub agent_type: String,
    pub priority: i32,
    // When this server received the CloudEvent.
    pub received_at: chrono::DateTime<chrono::Utc>,
    // Lifecycle state: "queued" -> "processing" -> "completed" / "cancelled".
    pub status: String,
}
57
/// Shared, in-memory store of Knative tasks; cloning is cheap (Arc inside).
#[derive(Clone)]
pub struct KnativeTaskQueue {
    // Every task received during this process lifetime; entries are never
    // pruned here, only status-updated or popped.
    tasks: Arc<Mutex<Vec<KnativeTask>>>,
}
63
64impl KnativeTaskQueue {
65 pub fn new() -> Self {
66 Self {
67 tasks: Arc::new(Mutex::new(Vec::new())),
68 }
69 }
70
71 pub async fn push(&self, task: KnativeTask) {
72 self.tasks.lock().await.push(task);
73 }
74
75 pub async fn pop(&self) -> Option<KnativeTask> {
76 self.tasks.lock().await.pop()
77 }
78
79 pub async fn list(&self) -> Vec<KnativeTask> {
80 self.tasks.lock().await.clone()
81 }
82
83 pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
84 self.tasks
85 .lock()
86 .await
87 .iter()
88 .find(|t| t.task_id == task_id)
89 .cloned()
90 }
91
92 pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
93 let mut tasks = self.tasks.lock().await;
94 if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
95 task.status = status.to_string();
96 true
97 } else {
98 false
99 }
100 }
101}
102
// `Default` simply delegates to `new` so the queue can be built via
// `..Default::default()` style construction.
impl Default for KnativeTaskQueue {
    fn default() -> Self {
        Self::new()
    }
}
108
/// A remote tool registered over HTTP, kept alive by periodic heartbeats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisteredTool {
    pub id: String,
    pub name: String,
    pub description: String,
    pub version: String,
    // Where the tool can be invoked (semantics of the URL are up to callers).
    pub endpoint: String,
    pub capabilities: Vec<String>,
    pub parameters: serde_json::Value,
    pub registered_at: chrono::DateTime<chrono::Utc>,
    // Updated by `ToolRegistry::heartbeat`.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    // Lease expiry; entries past this are hidden from `list` and reaped by
    // `cleanup`. Heartbeats extend it by 90 seconds.
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
123
/// Shared registry of remote tools keyed by id; cloning is cheap (Arc inside).
#[derive(Clone)]
pub struct ToolRegistry {
    // RwLock: reads (list/get) dominate; writes happen on register/heartbeat.
    tools: Arc<tokio::sync::RwLock<HashMap<String, RegisteredTool>>>,
}
129
// `Default` simply delegates to `new`.
impl Default for ToolRegistry {
    fn default() -> Self {
        Self::new()
    }
}
135
136impl ToolRegistry {
137 pub fn new() -> Self {
138 Self {
139 tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
140 }
141 }
142
143 pub async fn register(&self, tool: RegisteredTool) {
145 let mut tools = self.tools.write().await;
146 tools.insert(tool.id.clone(), tool);
147 }
148
149 pub async fn get(&self, id: &str) -> Option<RegisteredTool> {
151 let tools = self.tools.read().await;
152 tools.get(id).cloned()
153 }
154
155 pub async fn list(&self) -> Vec<RegisteredTool> {
157 let tools = self.tools.read().await;
158 let now = chrono::Utc::now();
159 tools
160 .values()
161 .filter(|t| t.expires_at > now)
162 .cloned()
163 .collect()
164 }
165
166 pub async fn heartbeat(&self, id: &str) -> Option<RegisteredTool> {
168 let mut tools = self.tools.write().await;
169 if let Some(tool) = tools.get_mut(id) {
170 let now = chrono::Utc::now();
171 tool.last_heartbeat = now;
172 tool.expires_at = now + Duration::from_secs(90);
173 return Some(tool.clone());
174 }
175 None
176 }
177
178 pub async fn cleanup(&self) {
180 let mut tools = self.tools.write().await;
181 let now = chrono::Utc::now();
182 tools.retain(|_, t| t.expires_at > now);
183 }
184}
185
/// Shared application state handed to every axum handler via `State`.
/// All fields are cheaply clonable handles (Arc or Arc-backed).
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<Config>,
    pub cognition: Arc<CognitionRuntime>,
    pub audit_log: AuditLog,
    pub k8s: Arc<K8sManager>,
    pub auth: AuthState,
    pub bus: Arc<AgentBus>,
    // In-memory queue fed by Knative CloudEvents on POST /task.
    pub knative_tasks: KnativeTaskQueue,
    // Remote tools registered via /v1/tools/register, reaped when stale.
    pub tool_registry: ToolRegistry,
}
198
199async fn audit_middleware(
201 State(state): State<AppState>,
202 request: Request<Body>,
203 next: Next,
204) -> Response {
205 let method = request.method().clone();
206 let path = request.uri().path().to_string();
207 let started = std::time::Instant::now();
208
209 let response = next.run(request).await;
210
211 let duration_ms = started.elapsed().as_millis() as u64;
212 let status = response.status().as_u16();
213 let outcome = if status < 400 {
214 AuditOutcome::Success
215 } else if status == 401 || status == 403 {
216 AuditOutcome::Denied
217 } else {
218 AuditOutcome::Failure
219 };
220
221 state
222 .audit_log
223 .record(audit::AuditEntry {
224 id: uuid::Uuid::new_v4().to_string(),
225 timestamp: chrono::Utc::now(),
226 category: AuditCategory::Api,
227 action: format!("{} {}", method, path),
228 principal: None,
229 outcome,
230 detail: Some(serde_json::json!({ "status": status })),
231 duration_ms: Some(duration_ms),
232 okr_id: None,
233 okr_run_id: None,
234 relay_id: None,
235 session_id: None,
236 })
237 .await;
238
239 response
240}
241
/// One row of the static route-authorization table.
struct PolicyRule {
    // Route pattern. A trailing '/' means pure prefix match; otherwise the
    // path must equal the pattern or continue with a '/' segment boundary.
    pattern: &'static str,
    // `None` = rule applies to every method; `Some` = only these methods
    // (other methods fall through to later rules).
    methods: Option<&'static [&'static str]>,
    // Required permission; "" marks the route as explicitly public.
    permission: &'static str,
}

// Evaluated top-to-bottom by `match_policy_rule`; FIRST match wins, so
// more specific patterns must precede broader prefixes for the same path.
const POLICY_RULES: &[PolicyRule] = &[
    // Public: liveness, Knative CloudEvents ingestion, and knative task APIs.
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/task",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/v1/knative/",
        methods: None,
        permission: "",
    },
    // OpenAI-compatible surface.
    PolicyRule {
        pattern: "/v1/models",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/chat/completions",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Tool registry.
    PolicyRule {
        pattern: "/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/tools/register",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/tools/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // A2A endpoints are public at this layer.
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // Kubernetes management (admin only).
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Audit log (admin only).
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    // NOTE: unreachable — shadowed by the "/v1/audit" all-methods rule
    // above; harmless because it grants the same permission.
    PolicyRule {
        pattern: "/v1/audit/replay",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Cognition lifecycle and read-only views.
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Swarm persona management.
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent bus.
    PolicyRule {
        pattern: "/v1/bus/stream",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/bus/publish",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Sessions: sub-paths (e.g. /api/session/{id}/prompt) before the
    // exact-path create/list rules.
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    PolicyRule {
        pattern: "/mcp/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent tasks and workers.
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/worker/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/workers",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/tasks/dispatch",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Voice sessions.
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["POST", "DELETE"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/agent/codebases/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/proposals/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Misc read-only API endpoints.
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];

/// Returns the permission required for `path`/`method` per the first
/// matching rule; `None` = no rule applies, `Some("")` = explicitly public.
///
/// Fix: the non-trailing-slash branch previously allocated a fresh
/// `format!("{}/", pattern)` string per rule per request; `strip_prefix`
/// performs the same segment-boundary check with zero allocation.
fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
    for rule in POLICY_RULES {
        // Trailing-slash patterns are pure prefixes; otherwise the path must
        // equal the pattern or continue with '/' (so "/v1/toolsX" does not
        // match "/v1/tools").
        let matches = if rule.pattern.ends_with('/') {
            path.starts_with(rule.pattern)
        } else {
            match path.strip_prefix(rule.pattern) {
                Some(rest) => rest.is_empty() || rest.starts_with('/'),
                None => false,
            }
        };
        if !matches {
            continue;
        }
        // A method-restricted rule lets other methods fall through to later
        // rules covering the same path.
        if let Some(allowed_methods) = rule.methods {
            if !allowed_methods.contains(&method) {
                continue;
            }
        }
        return Some(rule.permission);
    }
    None
}
505
/// Axum middleware enforcing the static `POLICY_RULES` table against each
/// request's path and method. Unmatched or explicitly-public routes pass
/// through untouched; matched routes require the rule's permission.
async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
    let path = request.uri().path().to_string();
    let method = request.method().as_str().to_string();

    // `None` (no rule) and `Some("")` (public route) both skip enforcement.
    let permission = match match_policy_rule(&path, &method) {
        None | Some("") => return Ok(next.run(request).await),
        Some(perm) => perm,
    };

    // NOTE(review): a fixed, fully-privileged user is synthesized here —
    // the authenticated principal is not consulted, so every request is
    // evaluated as an "admin" with a static id. Confirm this is an
    // intentional placeholder before relying on per-role policy decisions.
    let user = policy::PolicyUser {
        user_id: "bearer-token-user".to_string(),
        roles: vec!["admin".to_string()],
        tenant_id: None,
        scopes: vec![],
        auth_source: "static_token".to_string(),
    };

    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
        tracing::warn!(
            path = %path,
            method = %method,
            permission = %permission,
            "Policy middleware denied request"
        );
        return Err(status);
    }

    Ok(next.run(request).await)
}
544
/// Boots the server: loads config, wires the cognition runtime, audit log,
/// K8s manager, auth, and agent bus, starts a gRPC A2A/voice server on a
/// background task, then runs the axum HTTP API until the listener stops.
///
/// Startup phases are timed against `t0` and logged with `elapsed_ms` so
/// slow phases can be spotted from the logs.
pub async fn serve(args: ServeArgs) -> Result<()> {
    let t0 = std::time::Instant::now();
    tracing::info!("[startup] begin");
    let config = Config::load().await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] config loaded"
    );
    let mut cognition = CognitionRuntime::new_from_env();
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] cognition runtime created"
    );

    // Hand the runtime the default tool set before freezing it in an Arc.
    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] tools registered"
    );
    let cognition = Arc::new(cognition);

    let audit_log = AuditLog::from_env();
    // Best-effort global registration; the result is deliberately ignored.
    let _ = audit::init_audit_log(audit_log.clone());

    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
    let k8s = Arc::new(K8sManager::new().await);
    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
    if k8s.is_available() {
        tracing::info!("K8s self-deployment enabled");
    }

    let auth_state = AuthState::from_env();
    tracing::info!(
        token_len = auth_state.token().len(),
        "Auth is mandatory. Token required for all API endpoints."
    );
    tracing::info!(
        audit_entries = audit_log.count().await,
        "Audit log initialized"
    );

    let bus = AgentBus::new().into_arc();
    // Mirror bus traffic to S3 in the background.
    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] bus created"
    );

    // Auto-start the cognition loop unless disabled via env
    // (env_bool defaults to true here — presumably "on unless opted out";
    // see env_bool's definition to confirm).
    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
        tracing::info!(
            elapsed_ms = t0.elapsed().as_millis(),
            "[startup] auto-starting cognition"
        );
        if let Err(error) = cognition.start(None).await {
            tracing::warn!(%error, "Failed to auto-start cognition loop");
        } else {
            tracing::info!("Perpetual cognition auto-started");
        }
    }

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] building routes"
    );
    let addr = format!("{}:{}", args.hostname, args.port);

    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());

    let a2a_router = a2a_server.router();

    // The gRPC server (task store + voice) runs on its own port in a
    // detached task, with grpc-web + CORS so browsers can reach it.
    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
        .ok()
        .and_then(|p| p.parse::<u16>().ok())
        .unwrap_or(50051);
    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
    let grpc_service = grpc_store.into_service();
    let voice_service = crate::a2a::voice_grpc::VoiceServiceImpl::new(bus.clone()).into_service();
    tokio::spawn(async move {
        tracing::info!("gRPC A2A server listening on {}", grpc_addr);

        // NOTE: this gRPC allow-list omits localhost:8000, which the HTTP
        // CORS layer below does allow — confirm whether that is deliberate.
        let allowed_origins = [
            HeaderValue::from_static("https://codetether.run"),
            HeaderValue::from_static("https://api.codetether.run"),
            HeaderValue::from_static("https://docs.codetether.run"),
            HeaderValue::from_static("https://codetether.com"),
            HeaderValue::from_static("http://localhost:3000"),
            HeaderValue::from_static("http://localhost:3001"),
        ];
        let cors = CorsLayer::new()
            .allow_origin(AllowOrigin::list(allowed_origins))
            .allow_methods(AllowMethods::any())
            .allow_headers(AllowHeaders::any())
            // grpc-web clients need these trailer-equivalent headers exposed.
            .expose_headers(tower_http::cors::ExposeHeaders::list([
                http::header::HeaderName::from_static("grpc-status"),
                http::header::HeaderName::from_static("grpc-message"),
            ]));

        if let Err(e) = tonic::transport::Server::builder()
            .accept_http1(true)
            .layer(cors)
            .layer(GrpcWebLayer::new())
            .add_service(grpc_service)
            .add_service(voice_service)
            .serve(grpc_addr)
            .await
        {
            tracing::error!("gRPC server error: {}", e);
        }
    });

    let state = AppState {
        config: Arc::new(config),
        cognition,
        audit_log,
        k8s,
        auth: auth_state.clone(),
        bus,
        knative_tasks: KnativeTaskQueue::new(),
        tool_registry: ToolRegistry::new(),
    };

    // Background reaper: drop expired tool registrations every 15 seconds.
    let tool_registry = state.tool_registry.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(15));
        loop {
            interval.tick().await;
            tool_registry.cleanup().await;
            tracing::debug!("Tool reaper ran cleanup");
        }
    });

    let app = Router::new()
        // Liveness + Knative CloudEvents ingestion.
        .route("/health", get(health))
        .route("/task", post(receive_task_event))
        .route("/v1/knative/tasks", get(list_knative_tasks))
        .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
        .route(
            "/v1/knative/tasks/{task_id}/claim",
            post(claim_knative_task),
        )
        .route(
            "/v1/knative/tasks/{task_id}/complete",
            post(complete_knative_task),
        )
        // Session and metadata API.
        .route("/api/version", get(get_version))
        .route("/api/session", get(list_sessions).post(create_session))
        .route("/api/session/{id}", get(get_session))
        .route("/api/session/{id}/prompt", post(prompt_session))
        .route("/api/config", get(get_config))
        .route("/api/provider", get(list_providers))
        .route("/api/agent", get(list_agents))
        // OpenAI-compatible surface.
        .route("/v1/models", get(list_openai_models))
        .route("/v1/chat/completions", post(openai_chat_completions))
        // Cognition lifecycle and introspection.
        .route("/v1/cognition/start", post(start_cognition))
        .route("/v1/cognition/stop", post(stop_cognition))
        .route("/v1/cognition/status", get(get_cognition_status))
        .route("/v1/cognition/stream", get(stream_cognition))
        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
        // Swarm persona management.
        .route("/v1/swarm/personas", post(create_persona))
        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
        .route("/v1/swarm/lineage", get(get_swarm_lineage))
        .route("/v1/cognition/beliefs", get(list_beliefs))
        .route("/v1/cognition/beliefs/{id}", get(get_belief))
        .route("/v1/cognition/attention", get(list_attention))
        .route("/v1/cognition/proposals", get(list_proposals))
        .route(
            "/v1/cognition/proposals/{id}/approve",
            post(approve_proposal),
        )
        .route("/v1/cognition/receipts", get(list_receipts))
        .route("/v1/cognition/workspace", get(get_workspace))
        .route("/v1/cognition/governance", get(get_governance))
        .route("/v1/cognition/personas/{id}", get(get_persona))
        // Audit log access.
        .route("/v1/audit", get(list_audit_entries))
        .route("/v1/audit/replay", get(replay_session_events))
        .route("/v1/audit/replay/index", get(list_session_event_files))
        // Kubernetes management.
        .route("/v1/k8s/status", get(get_k8s_status))
        .route("/v1/k8s/scale", post(k8s_scale))
        .route("/v1/k8s/restart", post(k8s_restart))
        .route("/v1/k8s/pods", get(k8s_list_pods))
        .route("/v1/k8s/actions", get(k8s_actions))
        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
        .route(
            "/v1/k8s/subagent/{id}",
            axum::routing::delete(k8s_delete_subagent),
        )
        // Plugins and remote tool registry (also exposed under /mcp).
        .route("/v1/plugins", get(list_plugins))
        .route("/v1/tools", get(list_tools))
        .route("/v1/tools/register", post(register_tool))
        .route("/v1/tools/{id}/heartbeat", post(tool_heartbeat))
        .route("/mcp/v1/tools", get(list_tools))
        // Agent tasks, workers, dispatch.
        .route(
            "/v1/agent/tasks",
            get(list_agent_tasks).post(create_agent_task),
        )
        .route("/v1/agent/tasks/{task_id}", get(get_agent_task))
        .route(
            "/v1/agent/tasks/{task_id}/output",
            get(get_agent_task_output),
        )
        .route(
            "/v1/agent/tasks/{task_id}/output/stream",
            get(stream_agent_task_output),
        )
        .route("/v1/worker/connected", get(list_connected_workers))
        .route("/v1/agent/workers", get(list_connected_workers))
        .route("/v1/tasks/dispatch", post(dispatch_task))
        // Voice sessions (REST mirror of the gRPC voice service).
        .route("/v1/voice/sessions", post(create_voice_session_rest))
        .route(
            "/v1/voice/sessions/{room_name}",
            get(get_voice_session_rest).delete(delete_voice_session_rest),
        )
        .route("/v1/voice/voices", get(list_voices_rest))
        .route(
            "/v1/agent/codebases/{codebase_id}/sessions/{session_id}/resume",
            post(resume_codebase_session),
        )
        .route("/v1/bus/stream", get(stream_bus_events))
        .with_state(state.clone())
        .nest("/a2a", a2a_router)
        // Layers added later wrap the earlier ones, so per axum semantics a
        // request passes through trace -> CORS -> auth -> policy -> audit.
        .layer(middleware::from_fn_with_state(
            state.clone(),
            audit_middleware,
        ))
        .layer(middleware::from_fn(policy_middleware))
        .layer(middleware::from_fn(auth::require_auth))
        .layer(axum::Extension(state.auth.clone()))
        // NOTE(review): tower-http rejects `allow_credentials(true)` in
        // combination with wildcard methods/headers — confirm this layer
        // behaves as intended at runtime.
        .layer(
            CorsLayer::new()
                .allow_origin([
                    "https://codetether.run".parse::<HeaderValue>().unwrap(),
                    "https://api.codetether.run".parse::<HeaderValue>().unwrap(),
                    "https://docs.codetether.run"
                        .parse::<HeaderValue>()
                        .unwrap(),
                    "https://codetether.com".parse::<HeaderValue>().unwrap(),
                    "http://localhost:3000".parse::<HeaderValue>().unwrap(),
                    "http://localhost:3001".parse::<HeaderValue>().unwrap(),
                    "http://localhost:8000".parse::<HeaderValue>().unwrap(),
                ])
                .allow_credentials(true)
                .allow_methods(AllowMethods::any())
                .allow_headers(AllowHeaders::any()),
        )
        .layer(TraceLayer::new_for_http());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] router built, binding"
    );
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] listening on http://{}",
        addr
    );

    axum::serve(listener, app).await?;

    Ok(())
}
847
/// Liveness probe; always returns plain "ok".
async fn health() -> &'static str {
    "ok"
}
852
853async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
855 Json(state.knative_tasks.list().await)
856}
857
858async fn get_knative_task(
860 State(state): State<AppState>,
861 Path(task_id): Path<String>,
862) -> Result<Json<KnativeTask>, (StatusCode, String)> {
863 state
864 .knative_tasks
865 .get(&task_id)
866 .await
867 .map(Json)
868 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
869}
870
871async fn claim_knative_task(
873 State(state): State<AppState>,
874 Path(task_id): Path<String>,
875) -> Result<Json<KnativeTask>, (StatusCode, String)> {
876 let updated = state
878 .knative_tasks
879 .update_status(&task_id, "processing")
880 .await;
881 if !updated {
882 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
883 }
884
885 state
886 .knative_tasks
887 .get(&task_id)
888 .await
889 .map(Json)
890 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
891}
892
893async fn complete_knative_task(
895 State(state): State<AppState>,
896 Path(task_id): Path<String>,
897) -> Result<Json<KnativeTask>, (StatusCode, String)> {
898 let updated = state
900 .knative_tasks
901 .update_status(&task_id, "completed")
902 .await;
903 if !updated {
904 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
905 }
906
907 state
908 .knative_tasks
909 .get(&task_id)
910 .await
911 .map(Json)
912 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
913}
914
915async fn receive_task_event(
918 State(state): State<AppState>,
919 Json(event): Json<CloudEvent>,
920) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
921 tracing::info!(
922 event_type = %event.event_type,
923 event_id = %event.id,
924 "Received CloudEvent from Knative"
925 );
926
927 state
929 .audit_log
930 .log(
931 audit::AuditCategory::Api,
932 format!("cloudevents:{}", event.event_type),
933 AuditOutcome::Success,
934 None,
935 None,
936 )
937 .await;
938
939 match event.event_type.as_str() {
941 "codetether.task.created" | "task.created" => {
942 if let Some(data) = event.data {
944 let task_id = data
945 .get("task_id")
946 .and_then(|v| v.as_str())
947 .unwrap_or("unknown")
948 .to_string();
949 let title = data
950 .get("title")
951 .and_then(|v| v.as_str())
952 .unwrap_or("")
953 .to_string();
954 let description = data
955 .get("description")
956 .and_then(|v| v.as_str())
957 .unwrap_or("")
958 .to_string();
959 let agent_type = data
960 .get("agent_type")
961 .and_then(|v| v.as_str())
962 .unwrap_or("build")
963 .to_string();
964 let priority = data.get("priority").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
965
966 let task = KnativeTask {
967 task_id: task_id.clone(),
968 title,
969 description,
970 agent_type,
971 priority,
972 received_at: chrono::Utc::now(),
973 status: "queued".to_string(),
974 };
975
976 state.knative_tasks.push(task).await;
977 tracing::info!(task_id = %task_id, "Task queued for execution");
978 }
979 }
980 "codetether.task.cancelled" => {
981 tracing::info!("Task cancellation event received");
982 if let Some(data) = event.data {
984 if let Some(task_id) = data.get("task_id").and_then(|v| v.as_str()) {
985 let _ = state
986 .knative_tasks
987 .update_status(task_id, "cancelled")
988 .await;
989 tracing::info!(task_id = %task_id, "Task cancelled");
990 }
991 }
992 }
993 _ => {
994 tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
995 }
996 }
997
998 Ok(Json(CloudEventResponse {
999 status: "accepted".to_string(),
1000 event_id: event.id,
1001 }))
1002}
1003
/// Minimal CloudEvents envelope as delivered by Knative.
#[derive(Deserialize, Serialize)]
struct CloudEvent {
    id: String,
    source: String,
    // CloudEvents `type`, e.g. "codetether.task.created".
    #[serde(rename = "type")]
    event_type: String,
    #[serde(rename = "time")]
    timestamp: Option<String>,
    #[serde(rename = "specversion")]
    spec_version: Option<String>,
    // Free-form event payload; task fields are extracted from here.
    data: Option<serde_json::Value>,
}
1023
/// Acknowledgement returned to the CloudEvents sender; status is always
/// "accepted" and the original event id is echoed back.
#[derive(Serialize)]
struct CloudEventResponse {
    status: String,
    event_id: String,
}
1030
/// Payload of GET /api/version.
#[derive(Serialize)]
struct VersionInfo {
    version: &'static str,
    name: &'static str,
    // Hash of the running executable; None when the binary can't be hashed.
    binary_hash: Option<String>,
}
1038
1039async fn get_version() -> Json<VersionInfo> {
1040 let binary_hash = std::env::current_exe()
1041 .ok()
1042 .and_then(|p| hash_file(&p).ok());
1043 Json(VersionInfo {
1044 version: env!("CARGO_PKG_VERSION"),
1045 name: env!("CARGO_PKG_NAME"),
1046 binary_hash,
1047 })
1048}
1049
/// Query parameters for GET /api/session (paging).
#[derive(Deserialize)]
struct ListSessionsQuery {
    // Max entries to return; defaults to 100.
    limit: Option<usize>,
    // Entries to skip; defaults to 0.
    offset: Option<usize>,
}
1056
1057async fn list_sessions(
1058 Query(query): Query<ListSessionsQuery>,
1059) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
1060 let sessions = crate::session::list_sessions()
1061 .await
1062 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1063
1064 let offset = query.offset.unwrap_or(0);
1065 let limit = query.limit.unwrap_or(100);
1066 Ok(Json(
1067 sessions.into_iter().skip(offset).take(limit).collect(),
1068 ))
1069}
1070
/// Body of POST /api/session; both fields are optional overrides.
#[derive(Deserialize)]
struct CreateSessionRequest {
    title: Option<String>,
    agent: Option<String>,
}
1077
1078async fn create_session(
1079 Json(req): Json<CreateSessionRequest>,
1080) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1081 let mut session = crate::session::Session::new()
1082 .await
1083 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1084
1085 session.title = req.title;
1086 if let Some(agent) = req.agent {
1087 session.agent = agent;
1088 }
1089
1090 session
1091 .save()
1092 .await
1093 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1094
1095 Ok(Json(session))
1096}
1097
1098async fn get_session(
1100 axum::extract::Path(id): axum::extract::Path<String>,
1101) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1102 let session = crate::session::Session::load(&id)
1103 .await
1104 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1105
1106 Ok(Json(session))
1107}
1108
/// Body of POST /api/session/{id}/prompt.
#[derive(Deserialize)]
struct PromptRequest {
    message: String,
}
1114
1115async fn prompt_session(
1116 axum::extract::Path(id): axum::extract::Path<String>,
1117 Json(req): Json<PromptRequest>,
1118) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
1119 if req.message.trim().is_empty() {
1121 return Err((
1122 StatusCode::BAD_REQUEST,
1123 "Message cannot be empty".to_string(),
1124 ));
1125 }
1126
1127 tracing::info!(
1129 session_id = %id,
1130 message_len = req.message.len(),
1131 "Received prompt request"
1132 );
1133
1134 Err((
1136 StatusCode::NOT_IMPLEMENTED,
1137 "Prompt execution not yet implemented".to_string(),
1138 ))
1139}
1140
1141async fn get_config(State(state): State<AppState>) -> Json<Config> {
1143 Json((*state.config).clone())
1144}
1145
1146async fn list_providers() -> Json<Vec<String>> {
1148 Json(vec![
1149 "openai".to_string(),
1150 "anthropic".to_string(),
1151 "google".to_string(),
1152 ])
1153}
1154
1155async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
1157 let registry = crate::agent::AgentRegistry::with_builtins();
1158 Json(registry.list().into_iter().cloned().collect())
1159}
1160
// Error/result shape shared by the OpenAI-compatible endpoints: an HTTP
// status paired with an OpenAI-style JSON error envelope.
type OpenAiApiError = (StatusCode, Json<serde_json::Value>);
type OpenAiApiResult<T> = Result<T, OpenAiApiError>;
1163
/// Body of an OpenAI-compatible POST /v1/chat/completions request.
#[derive(Debug, Deserialize)]
struct OpenAiChatCompletionRequest {
    model: String,
    messages: Vec<OpenAiRequestMessage>,
    #[serde(default)]
    tools: Vec<OpenAiRequestTool>,
    temperature: Option<f32>,
    top_p: Option<f32>,
    max_tokens: Option<usize>,
    // Newer OpenAI clients send max_completion_tokens instead of max_tokens.
    max_completion_tokens: Option<usize>,
    stop: Option<OpenAiStop>,
    stream: Option<bool>,
}

/// `stop` may be a single string or a list of strings; `untagged` lets
/// serde accept either form.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OpenAiStop {
    Single(String),
    Multiple(Vec<String>),
}

/// One chat message in the request; `content` is left as raw JSON because
/// OpenAI allows both string and structured content.
#[derive(Debug, Deserialize)]
struct OpenAiRequestMessage {
    role: String,
    #[serde(default)]
    content: Option<serde_json::Value>,
    // Set on "tool" role messages to tie the result to a prior call.
    #[serde(default)]
    tool_call_id: Option<String>,
    #[serde(default)]
    tool_calls: Vec<OpenAiRequestToolCall>,
}

/// A tool call echoed back by the client inside an assistant message.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCall {
    id: String,
    // Always "function" in the current OpenAI API; defaulted when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolCallFunction,
}

#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCallFunction {
    name: String,
    // JSON-encoded argument object; defaults to "{}" when omitted.
    #[serde(default = "default_json_object_string")]
    arguments: String,
}

/// A tool made available to the model by the client.
#[derive(Debug, Deserialize)]
struct OpenAiRequestTool {
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolDefinition,
}

#[derive(Debug, Deserialize)]
struct OpenAiRequestToolDefinition {
    name: String,
    #[serde(default)]
    description: String,
    // JSON-schema parameter object; defaults to {} when omitted.
    #[serde(default = "default_json_object")]
    parameters: serde_json::Value,
}
1226
1227#[derive(Debug, Serialize)]
1228struct OpenAiModelsResponse {
1229 object: String,
1230 data: Vec<OpenAiModel>,
1231}
1232
1233#[derive(Debug, Serialize)]
1234struct OpenAiModel {
1235 id: String,
1236 object: String,
1237 created: i64,
1238 owned_by: String,
1239}
1240
1241#[derive(Debug, Serialize)]
1242struct OpenAiChatCompletionResponse {
1243 id: String,
1244 object: String,
1245 created: i64,
1246 model: String,
1247 choices: Vec<OpenAiChatCompletionChoice>,
1248 usage: OpenAiUsage,
1249}
1250
1251#[derive(Debug, Serialize)]
1252struct OpenAiChatCompletionChoice {
1253 index: usize,
1254 message: OpenAiChatCompletionMessage,
1255 finish_reason: String,
1256}
1257
1258#[derive(Debug, Serialize)]
1259struct OpenAiChatCompletionMessage {
1260 role: String,
1261 #[serde(skip_serializing_if = "Option::is_none")]
1262 content: Option<String>,
1263 #[serde(skip_serializing_if = "Option::is_none")]
1264 tool_calls: Option<Vec<OpenAiResponseToolCall>>,
1265}
1266
/// A tool call emitted in a response message (`kind` serializes as `type`).
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCall {
    id: String,
    #[serde(rename = "type")]
    kind: String,
    function: OpenAiResponseToolCallFunction,
}
1274
/// Function name plus JSON-encoded arguments of a response tool call.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCallFunction {
    name: String,
    arguments: String,
}
1280
/// Token accounting block copied from the provider's usage report.
#[derive(Debug, Serialize)]
struct OpenAiUsage {
    prompt_tokens: usize,
    completion_tokens: usize,
    total_tokens: usize,
}
1287
/// Serde default for tool/tool-call `type` fields; the OpenAI schema only
/// defines "function" here.
fn default_function_type() -> String {
    String::from("function")
}
1291
1292fn default_json_object() -> serde_json::Value {
1293 serde_json::json!({})
1294}
1295
/// Serde default for tool-call `arguments`: the empty JSON object as a string.
fn default_json_object_string() -> String {
    String::from("{}")
}
1299
1300fn openai_error(
1301 status: StatusCode,
1302 message: impl Into<String>,
1303 error_type: &str,
1304 code: &str,
1305) -> OpenAiApiError {
1306 (
1307 status,
1308 Json(serde_json::json!({
1309 "error": {
1310 "message": message.into(),
1311 "type": error_type,
1312 "code": code,
1313 }
1314 })),
1315 )
1316}
1317
1318fn openai_bad_request(message: impl Into<String>) -> OpenAiApiError {
1319 openai_error(
1320 StatusCode::BAD_REQUEST,
1321 message,
1322 "invalid_request_error",
1323 "invalid_request",
1324 )
1325}
1326
1327fn openai_internal_error(message: impl Into<String>) -> OpenAiApiError {
1328 openai_error(
1329 StatusCode::INTERNAL_SERVER_ERROR,
1330 message,
1331 "server_error",
1332 "internal_error",
1333 )
1334}
1335
/// Map legacy provider aliases onto their canonical names
/// ("zhipuai" is the historical spelling of "zai").
fn canonicalize_provider_name(provider: &str) -> &str {
    match provider {
        "zhipuai" => "zai",
        other => other,
    }
}
1343
/// Accept `provider:model` as an alias for `provider/model`.
///
/// The colon form is rewritten only when the input contains no `/` and both
/// sides of the colon are non-empty; everything else is returned trimmed.
fn normalize_model_reference(model: &str) -> String {
    let trimmed = model.trim();
    if !trimmed.contains('/') {
        if let Some((provider, model_id)) = trimmed.split_once(':') {
            if !provider.is_empty() && !model_id.is_empty() {
                return format!("{provider}/{model_id}");
            }
        }
    }
    trimmed.to_string()
}
1355
1356fn make_openai_model_id(provider: &str, model_id: &str) -> String {
1357 let provider = canonicalize_provider_name(provider);
1358 let trimmed = model_id.trim_start_matches('/');
1359 if trimmed.starts_with(&format!("{provider}/")) {
1360 trimmed.to_string()
1361 } else {
1362 format!("{provider}/{trimmed}")
1363 }
1364}
1365
1366fn parse_openai_content_part(value: &serde_json::Value) -> Option<crate::provider::ContentPart> {
1367 let obj = value.as_object()?;
1368 let part_type = obj
1369 .get("type")
1370 .and_then(serde_json::Value::as_str)
1371 .unwrap_or("text");
1372
1373 match part_type {
1374 "text" | "input_text" => obj
1375 .get("text")
1376 .and_then(serde_json::Value::as_str)
1377 .map(|text| crate::provider::ContentPart::Text {
1378 text: text.to_string(),
1379 }),
1380 "image_url" => obj
1381 .get("image_url")
1382 .and_then(serde_json::Value::as_object)
1383 .and_then(|image| image.get("url"))
1384 .and_then(serde_json::Value::as_str)
1385 .map(|url| crate::provider::ContentPart::Image {
1386 url: url.to_string(),
1387 mime_type: None,
1388 }),
1389 _ => obj
1390 .get("text")
1391 .and_then(serde_json::Value::as_str)
1392 .map(|text| crate::provider::ContentPart::Text {
1393 text: text.to_string(),
1394 }),
1395 }
1396}
1397
1398fn parse_openai_content_parts(
1399 content: &Option<serde_json::Value>,
1400) -> Vec<crate::provider::ContentPart> {
1401 let Some(value) = content else {
1402 return Vec::new();
1403 };
1404
1405 match value {
1406 serde_json::Value::String(text) => {
1407 vec![crate::provider::ContentPart::Text { text: text.clone() }]
1408 }
1409 serde_json::Value::Array(parts) => {
1410 parts.iter().filter_map(parse_openai_content_part).collect()
1411 }
1412 serde_json::Value::Object(_) => parse_openai_content_part(value).into_iter().collect(),
1413 _ => Vec::new(),
1414 }
1415}
1416
1417fn parse_openai_tool_content(content: &Option<serde_json::Value>) -> String {
1418 parse_openai_content_parts(content)
1419 .into_iter()
1420 .filter_map(|part| match part {
1421 crate::provider::ContentPart::Text { text } => Some(text),
1422 _ => None,
1423 })
1424 .collect::<Vec<_>>()
1425 .join("\n")
1426}
1427
/// Convert OpenAI chat messages into internal provider messages.
///
/// Supported roles (compared case-insensitively): `system`, `user`,
/// `assistant` (text and/or `function` tool calls), and `tool` (which
/// requires a non-blank `tool_call_id`). Any other role, or a message that
/// fails validation, produces a 400-style error naming the message index.
fn convert_openai_messages(
    messages: &[OpenAiRequestMessage],
) -> OpenAiApiResult<Vec<crate::provider::Message>> {
    let mut converted = Vec::with_capacity(messages.len());

    for (index, message) in messages.iter().enumerate() {
        let role = message.role.to_ascii_lowercase();
        match role.as_str() {
            "system" | "user" => {
                let content = parse_openai_content_parts(&message.content);
                // System/user turns must carry at least one usable part.
                if content.is_empty() {
                    return Err(openai_bad_request(format!(
                        "messages[{index}] must include text content"
                    )));
                }

                converted.push(crate::provider::Message {
                    role: if role == "system" {
                        crate::provider::Role::System
                    } else {
                        crate::provider::Role::User
                    },
                    content,
                });
            }
            "assistant" => {
                // Assistant content may combine text parts with tool calls.
                let mut content = parse_openai_content_parts(&message.content);
                for tool_call in &message.tool_calls {
                    // Only `function` tool calls exist in the OpenAI schema.
                    if tool_call.kind != "function" {
                        return Err(openai_bad_request(format!(
                            "messages[{index}].tool_calls only support `function`"
                        )));
                    }
                    if tool_call.function.name.trim().is_empty() {
                        return Err(openai_bad_request(format!(
                            "messages[{index}].tool_calls[].function.name is required"
                        )));
                    }

                    content.push(crate::provider::ContentPart::ToolCall {
                        id: tool_call.id.clone(),
                        name: tool_call.function.name.clone(),
                        arguments: tool_call.function.arguments.clone(),
                        thought_signature: None,
                    });
                }

                // An assistant turn with neither text nor tool calls is invalid.
                if content.is_empty() {
                    return Err(openai_bad_request(format!(
                        "messages[{index}] must include `content` or `tool_calls` for assistant role"
                    )));
                }

                converted.push(crate::provider::Message {
                    role: crate::provider::Role::Assistant,
                    content,
                });
            }
            "tool" => {
                // Tool results must reference the call they answer; blank or
                // missing ids are rejected.
                let tool_call_id = message
                    .tool_call_id
                    .as_ref()
                    .map(|s| s.trim())
                    .filter(|s| !s.is_empty())
                    .ok_or_else(|| {
                        openai_bad_request(format!(
                            "messages[{index}].tool_call_id is required for tool role"
                        ))
                    })?
                    .to_string();

                converted.push(crate::provider::Message {
                    role: crate::provider::Role::Tool,
                    content: vec![crate::provider::ContentPart::ToolResult {
                        tool_call_id,
                        content: parse_openai_tool_content(&message.content),
                    }],
                });
            }
            _ => {
                return Err(openai_bad_request(format!(
                    "messages[{index}].role `{}` is not supported",
                    message.role
                )));
            }
        }
    }

    Ok(converted)
}
1518
1519fn convert_openai_tools(
1520 tools: &[OpenAiRequestTool],
1521) -> OpenAiApiResult<Vec<crate::provider::ToolDefinition>> {
1522 let mut converted = Vec::with_capacity(tools.len());
1523
1524 for (index, tool) in tools.iter().enumerate() {
1525 if tool.kind != "function" {
1526 return Err(openai_bad_request(format!(
1527 "tools[{index}].type `{}` is not supported",
1528 tool.kind
1529 )));
1530 }
1531 if tool.function.name.trim().is_empty() {
1532 return Err(openai_bad_request(format!(
1533 "tools[{index}].function.name is required"
1534 )));
1535 }
1536
1537 converted.push(crate::provider::ToolDefinition {
1538 name: tool.function.name.clone(),
1539 description: tool.function.description.clone(),
1540 parameters: tool.function.parameters.clone(),
1541 });
1542 }
1543
1544 Ok(converted)
1545}
1546
1547fn convert_finish_reason(reason: crate::provider::FinishReason) -> &'static str {
1548 match reason {
1549 crate::provider::FinishReason::Stop => "stop",
1550 crate::provider::FinishReason::Length => "length",
1551 crate::provider::FinishReason::ToolCalls => "tool_calls",
1552 crate::provider::FinishReason::ContentFilter => "content_filter",
1553 crate::provider::FinishReason::Error => "error",
1554 }
1555}
1556
1557fn convert_response_message(
1558 message: &crate::provider::Message,
1559) -> (Option<String>, Option<Vec<OpenAiResponseToolCall>>) {
1560 let mut texts = Vec::new();
1561 let mut tool_calls = Vec::new();
1562
1563 for part in &message.content {
1564 match part {
1565 crate::provider::ContentPart::Text { text } => {
1566 if !text.is_empty() {
1567 texts.push(text.clone());
1568 }
1569 }
1570 crate::provider::ContentPart::ToolCall {
1571 id,
1572 name,
1573 arguments,
1574 ..
1575 } => {
1576 tool_calls.push(OpenAiResponseToolCall {
1577 id: id.clone(),
1578 kind: "function".to_string(),
1579 function: OpenAiResponseToolCallFunction {
1580 name: name.clone(),
1581 arguments: arguments.clone(),
1582 },
1583 });
1584 }
1585 _ => {}
1586 }
1587 }
1588
1589 let content = if texts.is_empty() {
1590 None
1591 } else {
1592 Some(texts.join("\n"))
1593 };
1594 let tool_calls = if tool_calls.is_empty() {
1595 None
1596 } else {
1597 Some(tool_calls)
1598 };
1599
1600 (content, tool_calls)
1601}
1602
1603fn openai_stream_chunk(
1604 id: &str,
1605 created: i64,
1606 model: &str,
1607 delta: serde_json::Value,
1608 finish_reason: Option<&str>,
1609) -> serde_json::Value {
1610 let finish_reason = finish_reason
1611 .map(|value| serde_json::Value::String(value.to_string()))
1612 .unwrap_or(serde_json::Value::Null);
1613
1614 serde_json::json!({
1615 "id": id,
1616 "object": "chat.completion.chunk",
1617 "created": created,
1618 "model": model,
1619 "choices": [{
1620 "index": 0,
1621 "delta": delta,
1622 "finish_reason": finish_reason,
1623 }]
1624 })
1625}
1626
/// Wrap a JSON payload as a plain SSE `data:` event.
fn openai_stream_event(payload: &serde_json::Value) -> Event {
    Event::default().data(payload.to_string())
}
1630
/// `GET /v1/models`: aggregate every configured provider's model catalog
/// into a single OpenAI-style list.
///
/// Providers that fail to enumerate models are logged and skipped rather
/// than failing the whole request; only a registry load failure is a hard
/// error (500).
async fn list_openai_models() -> OpenAiApiResult<Json<OpenAiModelsResponse>> {
    let registry = crate::provider::ProviderRegistry::from_vault()
        .await
        .map_err(|error| {
            tracing::error!(error = %error, "Failed to load providers from Vault");
            openai_internal_error(format!("failed to load providers: {error}"))
        })?;

    // One future per provider, resolved concurrently via `join_all` below.
    let model_futures = registry.list().into_iter().map(|provider_id| {
        let provider_id = provider_id.to_string();
        let provider = registry.get(&provider_id);

        async move {
            let Some(provider) = provider else {
                return Vec::new();
            };

            let now = chrono::Utc::now().timestamp();
            match provider.list_models().await {
                Ok(models) => models
                    .into_iter()
                    .map(|model| OpenAiModel {
                        id: make_openai_model_id(&provider_id, &model.id),
                        object: "model".to_string(),
                        created: now,
                        owned_by: canonicalize_provider_name(&provider_id).to_string(),
                    })
                    .collect(),
                Err(error) => {
                    // Best-effort: one broken provider must not hide the rest.
                    tracing::warn!(
                        provider = %provider_id,
                        error = %error,
                        "Failed to list models for provider"
                    );
                    Vec::new()
                }
            }
        }
    });

    let mut data: Vec<OpenAiModel> = join_all(model_futures)
        .await
        .into_iter()
        .flatten()
        .collect();
    // Stable presentation order: by provider, then by model id.
    data.sort_by(|a, b| a.owned_by.cmp(&b.owned_by).then(a.id.cmp(&b.id)));

    Ok(Json(OpenAiModelsResponse {
        object: "list".to_string(),
        data,
    }))
}
1683
/// `POST /v1/chat/completions`: OpenAI-compatible chat completions.
///
/// Resolves the target provider from the `model` field (`provider/model`, or
/// the legacy `provider:model` form), converts messages and tools into the
/// internal request shape, then either streams SSE chunks (`stream: true`)
/// or returns a single JSON completion.
async fn openai_chat_completions(
    Json(req): Json<OpenAiChatCompletionRequest>,
) -> Result<Response, OpenAiApiError> {
    let is_stream = req.stream.unwrap_or(false);
    if req.model.trim().is_empty() {
        return Err(openai_bad_request("`model` is required"));
    }
    if req.messages.is_empty() {
        return Err(openai_bad_request("`messages` must not be empty"));
    }

    let registry = crate::provider::ProviderRegistry::from_vault()
        .await
        .map_err(|error| {
            tracing::error!(error = %error, "Failed to load providers from Vault");
            openai_internal_error(format!("failed to load providers: {error}"))
        })?;

    let providers = registry.list();
    if providers.is_empty() {
        return Err(openai_error(
            StatusCode::SERVICE_UNAVAILABLE,
            "No providers are configured in Vault",
            "server_error",
            "no_providers",
        ));
    }

    // Split `provider/model`; a bare model name leaves the provider to be
    // inferred below.
    let normalized_model = normalize_model_reference(&req.model);
    let (maybe_provider, model_id) = crate::provider::parse_model_string(&normalized_model);
    let model_id = if maybe_provider.is_some() {
        if model_id.trim().is_empty() {
            return Err(openai_bad_request(
                "Model must be in `provider/model` format",
            ));
        }
        model_id.to_string()
    } else {
        req.model.trim().to_string()
    };

    // Provider selection: an explicit prefix wins; otherwise the single
    // configured provider; otherwise "openai" when present; else reject.
    let selected_provider = if let Some(provider_name) = maybe_provider {
        let provider_name = canonicalize_provider_name(provider_name);
        if providers.contains(&provider_name) {
            provider_name.to_string()
        } else {
            return Err(openai_bad_request(format!(
                "Provider `{provider_name}` is not configured in Vault"
            )));
        }
    } else if providers.len() == 1 {
        providers[0].to_string()
    } else if providers.contains(&"openai") {
        "openai".to_string()
    } else {
        return Err(openai_bad_request(
            "When multiple providers are configured, use `provider/model`. See GET /v1/models.",
        ));
    };

    let provider = registry.get(&selected_provider).ok_or_else(|| {
        openai_internal_error(format!(
            "Provider `{selected_provider}` was available but could not be loaded"
        ))
    })?;

    let messages = convert_openai_messages(&req.messages)?;
    let tools = convert_openai_tools(&req.tools)?;
    // `stop` accepts either a single string or an array of strings.
    let stop = match req.stop {
        Some(OpenAiStop::Single(stop)) => vec![stop],
        Some(OpenAiStop::Multiple(stops)) => stops,
        None => Vec::new(),
    };

    let completion_request = crate::provider::CompletionRequest {
        messages,
        tools,
        model: model_id.clone(),
        temperature: req.temperature,
        top_p: req.top_p,
        // The newer `max_completion_tokens` field takes precedence.
        max_tokens: req.max_completion_tokens.or(req.max_tokens),
        stop,
    };

    let chat_id = format!("chatcmpl-{}", uuid::Uuid::new_v4());
    let now = chrono::Utc::now().timestamp();
    let openai_model = make_openai_model_id(&selected_provider, &model_id);

    if is_stream {
        let mut provider_stream =
            provider
                .complete_stream(completion_request)
                .await
                .map_err(|error| {
                    tracing::warn!(
                        provider = %selected_provider,
                        model = %model_id,
                        error = %error,
                        "Streaming completion request failed"
                    );
                    openai_internal_error(error.to_string())
                })?;

        let stream_id = chat_id.clone();
        let stream_model = openai_model.clone();
        let event_stream = async_stream::stream! {
            // Maps provider tool-call ids to stable `tool_calls[].index`es,
            // assigned in order of first appearance.
            let mut tool_call_indices: HashMap<String, usize> = HashMap::new();
            let mut next_tool_call_index = 0usize;
            let mut saw_text = false;
            let mut saw_tool_calls = false;

            // OpenAI streams open with a role-only delta.
            let role_chunk = openai_stream_chunk(
                &stream_id,
                now,
                &stream_model,
                serde_json::json!({ "role": "assistant" }),
                None,
            );
            yield Ok::<Event, Infallible>(openai_stream_event(&role_chunk));

            while let Some(chunk) = provider_stream.next().await {
                match chunk {
                    crate::provider::StreamChunk::Text(text) => {
                        if text.is_empty() {
                            continue;
                        }
                        saw_text = true;
                        let content_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({ "content": text }),
                            None,
                        );
                        yield Ok(openai_stream_event(&content_chunk));
                    }
                    crate::provider::StreamChunk::ToolCallStart { id, name } => {
                        saw_tool_calls = true;
                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
                            let value = next_tool_call_index;
                            next_tool_call_index += 1;
                            value
                        });
                        // Announce the call with its name and empty arguments;
                        // argument text follows in delta chunks.
                        let tool_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({
                                "tool_calls": [{
                                    "index": index,
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": "",
                                    }
                                }]
                            }),
                            None,
                        );
                        yield Ok(openai_stream_event(&tool_chunk));
                    }
                    crate::provider::StreamChunk::ToolCallDelta { id, arguments_delta } => {
                        if arguments_delta.is_empty() {
                            continue;
                        }
                        saw_tool_calls = true;
                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
                            let value = next_tool_call_index;
                            next_tool_call_index += 1;
                            value
                        });
                        let tool_delta_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({
                                "tool_calls": [{
                                    "index": index,
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "arguments": arguments_delta,
                                    }
                                }]
                            }),
                            None,
                        );
                        yield Ok(openai_stream_event(&tool_delta_chunk));
                    }
                    crate::provider::StreamChunk::ToolCallEnd { .. } => {}
                    crate::provider::StreamChunk::Done { .. } => {}
                    crate::provider::StreamChunk::Error(error) => {
                        // Surface the error as a final content chunk, then
                        // terminate the stream like a normal completion.
                        let error_chunk = openai_stream_chunk(
                            &stream_id,
                            now,
                            &stream_model,
                            serde_json::json!({ "content": error }),
                            Some("error"),
                        );
                        yield Ok(openai_stream_event(&error_chunk));
                        yield Ok(Event::default().data("[DONE]"));
                        return;
                    }
                }
            }

            // Tool-call-only responses finish with "tool_calls", else "stop".
            let finish_reason = if saw_tool_calls && !saw_text {
                "tool_calls"
            } else {
                "stop"
            };
            let final_chunk = openai_stream_chunk(
                &stream_id,
                now,
                &stream_model,
                serde_json::json!({}),
                Some(finish_reason),
            );
            yield Ok(openai_stream_event(&final_chunk));
            yield Ok(Event::default().data("[DONE]"));
        };

        return Ok(Sse::new(event_stream)
            .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
            .into_response());
    }

    // Non-streaming path: one blocking completion call.
    let completion = provider
        .complete(completion_request)
        .await
        .map_err(|error| {
            tracing::warn!(
                provider = %selected_provider,
                model = %model_id,
                error = %error,
                "Completion request failed"
            );
            openai_internal_error(error.to_string())
        })?;

    let (content, tool_calls) = convert_response_message(&completion.message);

    Ok(Json(OpenAiChatCompletionResponse {
        id: chat_id,
        object: "chat.completion".to_string(),
        created: now,
        model: openai_model,
        choices: vec![OpenAiChatCompletionChoice {
            index: 0,
            message: OpenAiChatCompletionMessage {
                role: "assistant".to_string(),
                content,
                tool_calls,
            },
            finish_reason: convert_finish_reason(completion.finish_reason).to_string(),
        }],
        usage: OpenAiUsage {
            prompt_tokens: completion.usage.prompt_tokens,
            completion_tokens: completion.usage.completion_tokens,
            total_tokens: completion.usage.total_tokens,
        },
    })
    .into_response())
}
1949
1950async fn start_cognition(
1951 State(state): State<AppState>,
1952 payload: Option<Json<StartCognitionRequest>>,
1953) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1954 state
1955 .cognition
1956 .start(payload.map(|Json(body)| body))
1957 .await
1958 .map(Json)
1959 .map_err(internal_error)
1960}
1961
1962async fn stop_cognition(
1963 State(state): State<AppState>,
1964 payload: Option<Json<StopCognitionRequest>>,
1965) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1966 let reason = payload.and_then(|Json(body)| body.reason);
1967 state
1968 .cognition
1969 .stop(reason)
1970 .await
1971 .map(Json)
1972 .map_err(internal_error)
1973}
1974
1975async fn get_cognition_status(
1976 State(state): State<AppState>,
1977) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1978 Ok(Json(state.cognition.status().await))
1979}
1980
/// GET SSE endpoint streaming cognition runtime events.
///
/// Each broadcast event is serialized as an SSE `cognition` event. When this
/// subscriber lags behind the broadcast channel, a `lag` event reporting the
/// skipped count is emitted instead; the stream ends when the channel closes.
async fn stream_cognition(
    State(state): State<AppState>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
    let rx = state.cognition.subscribe_events();

    let event_stream = stream::unfold(rx, |mut rx| async move {
        match rx.recv().await {
            Ok(event) => {
                // Serialization failure degrades to an empty JSON object.
                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
                let sse_event = Event::default().event("cognition").data(payload);
                Some((Ok(sse_event), rx))
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                let lag_event = Event::default()
                    .event("lag")
                    .data(format!("skipped {}", skipped));
                Some((Ok(lag_event), rx))
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
        }
    });

    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}
2005
/// GET SSE endpoint streaming agent-bus envelopes.
///
/// Topic visibility comes from the caller's JWT claims (placed in request
/// extensions by the auth middleware); an empty claim list means no
/// restriction. Envelopes whose topic matches no allowed pattern are replaced
/// with empty `keepalive` events so the connection stays active without
/// leaking the payload.
async fn stream_bus_events(
    State(state): State<AppState>,
    req: Request<Body>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
    let allowed_topics: Vec<String> = req
        .extensions()
        .get::<crate::server::auth::JwtClaims>()
        .map(|claims| claims.topics.clone())
        .unwrap_or_default();

    let bus_handle = state.bus.handle("stream_bus_events");
    let rx = bus_handle.into_receiver();

    let event_stream = stream::unfold(rx, move |mut rx: broadcast::Receiver<BusEnvelope>| {
        let allowed_topics = allowed_topics.clone();
        async move {
            match rx.recv().await {
                Ok(envelope) => {
                    let should_send = allowed_topics.is_empty()
                        || allowed_topics
                            .iter()
                            .any(|pattern| topic_matches(&envelope.topic, pattern));

                    if should_send {
                        // Serialization failure degrades to an empty object.
                        let payload =
                            serde_json::to_string(&envelope).unwrap_or_else(|_| "{}".to_string());
                        let sse_event = Event::default().event("bus").data(payload);
                        return Some((Ok(sse_event), rx));
                    }

                    // Filtered out: emit a keepalive instead of the envelope.
                    Some((Ok(Event::default().event("keepalive").data("")), rx))
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                    let lag_event = Event::default()
                        .event("lag")
                        .data(format!("skipped {}", skipped));
                    Some((Ok(lag_event), rx))
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
            }
        }
    });

    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}
2060
/// Match a bus topic against a claim pattern.
///
/// `*` matches everything; a trailing `.*` does a prefix match on the text
/// before it; a leading `.*` does a suffix match on the text after it;
/// anything else must match exactly.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        return topic.starts_with(prefix);
    }
    if let Some(suffix) = pattern.strip_prefix(".*") {
        return topic.ends_with(suffix);
    }
    topic == pattern
}
2077
2078async fn get_latest_snapshot(
2079 State(state): State<AppState>,
2080) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
2081 match state.cognition.latest_snapshot().await {
2082 Some(snapshot) => Ok(Json(snapshot)),
2083 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
2084 }
2085}
2086
2087async fn create_persona(
2088 State(state): State<AppState>,
2089 Json(req): Json<CreatePersonaRequest>,
2090) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2091 state
2092 .cognition
2093 .create_persona(req)
2094 .await
2095 .map(Json)
2096 .map_err(internal_error)
2097}
2098
2099async fn spawn_persona(
2100 State(state): State<AppState>,
2101 Path(id): Path<String>,
2102 Json(req): Json<SpawnPersonaRequest>,
2103) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2104 state
2105 .cognition
2106 .spawn_child(&id, req)
2107 .await
2108 .map(Json)
2109 .map_err(internal_error)
2110}
2111
2112async fn reap_persona(
2113 State(state): State<AppState>,
2114 Path(id): Path<String>,
2115 payload: Option<Json<ReapPersonaRequest>>,
2116) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
2117 let req = payload
2118 .map(|Json(body)| body)
2119 .unwrap_or(ReapPersonaRequest {
2120 cascade: Some(false),
2121 reason: None,
2122 });
2123
2124 state
2125 .cognition
2126 .reap_persona(&id, req)
2127 .await
2128 .map(Json)
2129 .map_err(internal_error)
2130}
2131
2132async fn get_swarm_lineage(
2133 State(state): State<AppState>,
2134) -> Result<Json<LineageGraph>, (StatusCode, String)> {
2135 Ok(Json(state.cognition.lineage_graph().await))
2136}
2137
/// Query-string filters accepted by `list_beliefs`.
#[derive(Deserialize)]
struct BeliefFilter {
    /// Substring matched against the JSON-serialized belief status.
    status: Option<String>,
    /// Exact match against the belief's `asserted_by` persona.
    persona: Option<String>,
}
2145
/// GET handler: list beliefs, optionally filtered by status and asserting
/// persona, sorted by confidence (highest first).
async fn list_beliefs(
    State(state): State<AppState>,
    Query(filter): Query<BeliefFilter>,
) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
    let beliefs = state.cognition.get_beliefs().await;
    let mut result: Vec<Belief> = beliefs.into_values().collect();

    if let Some(status) = &filter.status {
        // NOTE(review): this is a substring test against the JSON-serialized
        // status enum, so a filter value can match more variants than an
        // exact name would — confirm the looseness is intended.
        result.retain(|b| {
            let s = serde_json::to_string(&b.status).unwrap_or_default();
            s.contains(status)
        });
    }
    if let Some(persona) = &filter.persona {
        result.retain(|b| &b.asserted_by == persona);
    }

    // Descending confidence; NaN comparisons fall back to "equal".
    result.sort_by(|a, b| {
        b.confidence
            .partial_cmp(&a.confidence)
            .unwrap_or(std::cmp::Ordering::Equal)
    });
    Ok(Json(result))
}
2170
2171async fn get_belief(
2172 State(state): State<AppState>,
2173 Path(id): Path<String>,
2174) -> Result<Json<Belief>, (StatusCode, String)> {
2175 match state.cognition.get_belief(&id).await {
2176 Some(belief) => Ok(Json(belief)),
2177 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
2178 }
2179}
2180
2181async fn list_attention(
2182 State(state): State<AppState>,
2183) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
2184 Ok(Json(state.cognition.get_attention_queue().await))
2185}
2186
2187async fn list_proposals(
2188 State(state): State<AppState>,
2189) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
2190 let proposals = state.cognition.get_proposals().await;
2191 let mut result: Vec<Proposal> = proposals.into_values().collect();
2192 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
2193 Ok(Json(result))
2194}
2195
2196async fn approve_proposal(
2197 State(state): State<AppState>,
2198 Path(id): Path<String>,
2199) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2200 state
2201 .cognition
2202 .approve_proposal(&id)
2203 .await
2204 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
2205 .map_err(internal_error)
2206}
2207
2208async fn list_receipts(
2209 State(state): State<AppState>,
2210) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
2211 Ok(Json(state.cognition.get_receipts().await))
2212}
2213
2214async fn get_workspace(
2215 State(state): State<AppState>,
2216) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
2217 Ok(Json(state.cognition.get_workspace().await))
2218}
2219
2220async fn get_governance(
2221 State(state): State<AppState>,
2222) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
2223 Ok(Json(state.cognition.get_governance().await))
2224}
2225
2226async fn get_persona(
2227 State(state): State<AppState>,
2228 axum::extract::Path(id): axum::extract::Path<String>,
2229) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2230 state
2231 .cognition
2232 .get_persona(&id)
2233 .await
2234 .map(Json)
2235 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
2236}
2237
/// Query-string parameters accepted by `list_audit_entries`.
#[derive(Deserialize)]
struct AuditQuery {
    /// Max entries to return (default 100, capped at 1000).
    limit: Option<usize>,
    /// Optional category name; see `list_audit_entries` for accepted values.
    category: Option<String>,
}
2245
2246async fn list_audit_entries(
2247 State(state): State<AppState>,
2248 Query(query): Query<AuditQuery>,
2249) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
2250 let limit = query.limit.unwrap_or(100).min(1000);
2251
2252 let entries = if let Some(ref cat) = query.category {
2253 let category = match cat.as_str() {
2254 "api" => AuditCategory::Api,
2255 "tool" | "tool_execution" => AuditCategory::ToolExecution,
2256 "session" => AuditCategory::Session,
2257 "cognition" => AuditCategory::Cognition,
2258 "swarm" => AuditCategory::Swarm,
2259 "auth" => AuditCategory::Auth,
2260 "k8s" => AuditCategory::K8s,
2261 "sandbox" => AuditCategory::Sandbox,
2262 "config" => AuditCategory::Config,
2263 _ => {
2264 return Err((
2265 StatusCode::BAD_REQUEST,
2266 format!("Unknown category: {}", cat),
2267 ));
2268 }
2269 };
2270 state.audit_log.by_category(category, limit).await
2271 } else {
2272 state.audit_log.recent(limit).await
2273 };
2274
2275 Ok(Json(entries))
2276}
2277
/// Query-string parameters shared by the session replay endpoints.
#[derive(Deserialize)]
struct ReplayQuery {
    /// Session whose event segments are read.
    session_id: String,
    /// Skip segment files whose range ends at or before this offset.
    start_offset: Option<u64>,
    /// Skip segment files whose range starts at or after this offset.
    end_offset: Option<u64>,
    /// Max events to return (default 1000, capped at 10000).
    limit: Option<usize>,
    /// Keep only events whose `tool_name` field equals this value.
    tool_name: Option<String>,
}
2295
/// GET handler: replay persisted session events from JSONL segment files.
///
/// Segments live under `$CODETETHER_EVENT_STREAM_PATH/<session_id>/` with
/// offset ranges encoded in the file name (last two dash-separated fields
/// before `.jsonl`). Files wholly outside the requested offset window are
/// skipped, events can be filtered by `tool_name`, and at most `limit`
/// (default 1000, cap 10000) events are returned in segment-start order.
async fn replay_session_events(
    Query(query): Query<ReplayQuery>,
) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
    use std::path::PathBuf;

    // 503 until the operator points us at the event store.
    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
        .map(PathBuf::from)
        .ok()
        .ok_or_else(|| {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
            )
        })?;

    let session_dir = base_dir.join(&query.session_id);

    if !session_dir.exists() {
        return Err((
            StatusCode::NOT_FOUND,
            format!("Session not found: {}", query.session_id),
        ));
    }

    // (segment start, segment end, event): segment offsets are kept so the
    // events can be ordered by originating segment before the limit applies.
    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();

    let entries = std::fs::read_dir(&session_dir)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    for entry in entries {
        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
        let path = entry.path();

        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
            continue;
        }

        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");

        // Segment names begin with "T…" or a "202…" timestamp; unparseable
        // offset fields default to 0.
        if let Some(offsets) = filename
            .strip_prefix("T")
            .or_else(|| filename.strip_prefix("202"))
        {
            let parts: Vec<&str> = offsets.split('-').collect();
            if parts.len() >= 4 {
                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
                let end: u64 = parts[parts.len() - 1]
                    .trim_end_matches(".jsonl")
                    .parse()
                    .unwrap_or(0);

                // Skip segments entirely outside the requested offset window.
                if let Some(query_start) = query.start_offset {
                    if end <= query_start {
                        continue;
                    }
                }
                if let Some(query_end) = query.end_offset {
                    if start >= query_end {
                        continue;
                    }
                }

                let content = std::fs::read_to_string(&path)
                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

                for line in content.lines() {
                    if line.trim().is_empty() {
                        continue;
                    }
                    // Unparseable lines are silently dropped.
                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
                        // Tool filter: events without a `tool_name` field are
                        // excluded whenever the filter is set.
                        if let Some(ref tool_filter) = query.tool_name {
                            if let Some(event_tool) =
                                event.get("tool_name").and_then(|v| v.as_str())
                            {
                                if event_tool != tool_filter {
                                    continue;
                                }
                            } else {
                                continue;
                            }
                        }
                        all_events.push((start, end, event));
                    }
                }
            }
        }
    }

    // Order by segment start offset; line order within a segment is preserved.
    all_events.sort_by_key(|(s, _, _)| *s);

    let limit = query.limit.unwrap_or(1000).min(10000);
    let events: Vec<_> = all_events
        .into_iter()
        .take(limit)
        .map(|(_, _, e)| e)
        .collect();

    Ok(Json(events))
}
2407
/// GET handler: list a session's JSONL segment files with their encoded
/// offset ranges and on-disk sizes, sorted by start offset.
async fn list_session_event_files(
    Query(query): Query<ReplayQuery>,
) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
    use std::path::PathBuf;

    // 503 until the operator points us at the event store.
    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
        .map(PathBuf::from)
        .ok()
        .ok_or_else(|| {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
            )
        })?;

    let session_dir = base_dir.join(&query.session_id);
    if !session_dir.exists() {
        return Err((
            StatusCode::NOT_FOUND,
            format!("Session not found: {}", query.session_id),
        ));
    }

    let mut files: Vec<EventFileMeta> = Vec::new();

    let entries = std::fs::read_dir(&session_dir)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    for entry in entries {
        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
        let path = entry.path();

        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
            continue;
        }

        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");

        // Segment names begin with "T…" or a "202…" timestamp; the last two
        // dash-separated fields are the start/end offsets (0 if unparseable).
        if let Some(offsets) = filename
            .strip_prefix("T")
            .or_else(|| filename.strip_prefix("202"))
        {
            let parts: Vec<&str> = offsets.split('-').collect();
            if parts.len() >= 4 {
                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
                let end: u64 = parts[parts.len() - 1]
                    .trim_end_matches(".jsonl")
                    .parse()
                    .unwrap_or(0);

                let metadata = std::fs::metadata(&path)
                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

                files.push(EventFileMeta {
                    filename: filename.to_string(),
                    start_offset: start,
                    end_offset: end,
                    size_bytes: metadata.len(),
                });
            }
        }
    }

    files.sort_by_key(|f| f.start_offset);
    Ok(Json(files))
}
2477
/// Metadata for one JSONL event shard, returned by `list_session_event_files`.
#[derive(Serialize)]
struct EventFileMeta {
    /// Bare file name of the shard (not the full path).
    filename: String,
    /// Starting byte offset parsed from the filename (0 when unparseable).
    start_offset: u64,
    /// Ending byte offset parsed from the filename (0 when unparseable).
    end_offset: u64,
    /// On-disk size of the shard in bytes.
    size_bytes: u64,
}
2485
2486async fn get_k8s_status(
2489 State(state): State<AppState>,
2490) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
2491 Ok(Json(state.k8s.status().await))
2492}
2493
/// Request body for `k8s_scale`.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Desired replica count; the handler rejects values outside 0..=100.
    replicas: i32,
}
2498
2499async fn k8s_scale(
2500 State(state): State<AppState>,
2501 Json(req): Json<ScaleRequest>,
2502) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2503 if req.replicas < 0 || req.replicas > 100 {
2504 return Err((
2505 StatusCode::BAD_REQUEST,
2506 "Replicas must be between 0 and 100".to_string(),
2507 ));
2508 }
2509
2510 state
2511 .audit_log
2512 .log(
2513 AuditCategory::K8s,
2514 format!("scale:{}", req.replicas),
2515 AuditOutcome::Success,
2516 None,
2517 None,
2518 )
2519 .await;
2520
2521 state
2522 .k8s
2523 .scale(req.replicas)
2524 .await
2525 .map(Json)
2526 .map_err(internal_error)
2527}
2528
2529async fn k8s_restart(
2530 State(state): State<AppState>,
2531) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2532 state
2533 .audit_log
2534 .log(
2535 AuditCategory::K8s,
2536 "rolling_restart",
2537 AuditOutcome::Success,
2538 None,
2539 None,
2540 )
2541 .await;
2542
2543 state
2544 .k8s
2545 .rolling_restart()
2546 .await
2547 .map(Json)
2548 .map_err(internal_error)
2549}
2550
2551async fn k8s_list_pods(
2552 State(state): State<AppState>,
2553) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
2554 state
2555 .k8s
2556 .list_pods()
2557 .await
2558 .map(Json)
2559 .map_err(internal_error)
2560}
2561
2562async fn k8s_actions(
2563 State(state): State<AppState>,
2564) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
2565 Ok(Json(state.k8s.recent_actions(100).await))
2566}
2567
/// Request body for `k8s_spawn_subagent`; its optional fields mirror
/// `crate::k8s::SubagentPodSpec`.
#[derive(Deserialize)]
struct SpawnSubagentRequest {
    /// Identifier of the subagent the pod is spawned for.
    subagent_id: String,
    /// Optional container image for the pod.
    #[serde(default)]
    image: Option<String>,
    /// Environment variables injected into the pod.
    #[serde(default)]
    env_vars: std::collections::HashMap<String, String>,
    /// Labels applied to the pod.
    #[serde(default)]
    labels: std::collections::HashMap<String, String>,
    /// Optional container command override.
    #[serde(default)]
    command: Option<Vec<String>>,
    /// Optional container argument override.
    #[serde(default)]
    args: Option<Vec<String>>,
}
2582
2583async fn k8s_spawn_subagent(
2584 State(state): State<AppState>,
2585 Json(req): Json<SpawnSubagentRequest>,
2586) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2587 state
2588 .k8s
2589 .spawn_subagent_pod_with_spec(
2590 &req.subagent_id,
2591 crate::k8s::SubagentPodSpec {
2592 image: req.image,
2593 env_vars: req.env_vars,
2594 labels: req.labels,
2595 command: req.command,
2596 args: req.args,
2597 },
2598 )
2599 .await
2600 .map(Json)
2601 .map_err(internal_error)
2602}
2603
2604async fn k8s_delete_subagent(
2605 State(state): State<AppState>,
2606 axum::extract::Path(id): axum::extract::Path<String>,
2607) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2608 state
2609 .k8s
2610 .delete_subagent_pod(&id)
2611 .await
2612 .map(Json)
2613 .map_err(internal_error)
2614}
2615
/// Request body for `register_tool`.
#[derive(Debug, Deserialize)]
pub struct RegisterToolRequest {
    /// Unique tool identifier; also used for heartbeats and lookups.
    pub id: String,
    pub name: String,
    pub description: String,
    pub version: String,
    /// Network endpoint of the tool.
    pub endpoint: String,
    /// Capability tags advertised by the tool.
    pub capabilities: Vec<String>,
    /// Free-form JSON describing the tool's parameters.
    pub parameters: serde_json::Value,
}
2627
/// Response body for `register_tool`.
#[derive(Serialize)]
struct RegisterToolResponse {
    /// The stored registry entry, including computed expiry.
    tool: RegisteredTool,
    /// Human-readable confirmation stating the heartbeat requirement.
    message: String,
}
2634
2635async fn list_tools(State(state): State<AppState>) -> Json<Vec<RegisteredTool>> {
2637 Json(state.tool_registry.list().await)
2638}
2639
2640async fn register_tool(
2642 State(state): State<AppState>,
2643 Json(req): Json<RegisterToolRequest>,
2644) -> Result<Json<RegisterToolResponse>, (StatusCode, String)> {
2645 let now = chrono::Utc::now();
2646 let tool = RegisteredTool {
2647 id: req.id.clone(),
2648 name: req.name,
2649 description: req.description,
2650 version: req.version,
2651 endpoint: req.endpoint,
2652 capabilities: req.capabilities,
2653 parameters: req.parameters,
2654 registered_at: now,
2655 last_heartbeat: now,
2656 expires_at: now + Duration::from_secs(90),
2657 };
2658
2659 state.tool_registry.register(tool.clone()).await;
2660
2661 tracing::info!(tool_id = %tool.id, "Tool registered");
2662
2663 Ok(Json(RegisterToolResponse {
2664 tool,
2665 message: "Tool registered successfully. Heartbeat required every 30s.".to_string(),
2666 }))
2667}
2668
2669async fn tool_heartbeat(
2671 State(state): State<AppState>,
2672 Path(id): Path<String>,
2673) -> Result<Json<RegisteredTool>, (StatusCode, String)> {
2674 state
2675 .tool_registry
2676 .heartbeat(&id)
2677 .await
2678 .map(|tool| {
2679 tracing::info!(tool_id = %id, "Tool heartbeat received");
2680 Json(tool)
2681 })
2682 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Tool {} not found", id)))
2683}
2684
2685async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
2687 let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
2688 let signing_key = SigningKey::from_env();
2689 let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
2690 Json(PluginListResponse {
2691 server_fingerprint,
2692 signing_available: !test_sig.is_empty(),
2693 plugins: Vec::<PluginManifest>::new(),
2694 })
2695}
2696
/// Response body for `list_plugins`.
#[derive(Serialize)]
struct PluginListResponse {
    /// Hash of the server's crate version.
    server_fingerprint: String,
    /// Whether a plugin-signing key is configured and usable.
    signing_available: bool,
    /// Known plugin manifests (currently always empty).
    plugins: Vec<PluginManifest>,
}
2703
2704async fn list_agent_tasks(
2710 State(state): State<AppState>,
2711 Query(params): Query<ListAgentTasksQuery>,
2712) -> Json<serde_json::Value> {
2713 let tasks = state.knative_tasks.list().await;
2714 let filtered: Vec<_> = tasks
2715 .into_iter()
2716 .filter(|t| params.status.as_ref().map_or(true, |s| t.status == *s))
2717 .filter(|t| {
2718 params
2719 .agent_type
2720 .as_ref()
2721 .map_or(true, |a| t.agent_type == *a)
2722 })
2723 .collect();
2724 Json(serde_json::json!({ "tasks": filtered, "total": filtered.len() }))
2725}
2726
/// Query parameters for `list_agent_tasks`; both filters are optional.
#[derive(Deserialize)]
struct ListAgentTasksQuery {
    /// Exact-match filter on task status.
    status: Option<String>,
    /// Exact-match filter on agent type.
    agent_type: Option<String>,
}
2732
2733async fn create_agent_task(
2735 State(state): State<AppState>,
2736 Json(req): Json<CreateAgentTaskRequest>,
2737) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2738 let task_id = uuid::Uuid::new_v4().to_string();
2739 let task = KnativeTask {
2740 task_id: task_id.clone(),
2741 title: req.title,
2742 description: req.description,
2743 agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2744 priority: req.priority.unwrap_or(0),
2745 received_at: chrono::Utc::now(),
2746 status: "pending".to_string(),
2747 };
2748 state.knative_tasks.push(task).await;
2749 Ok(Json(serde_json::json!({
2750 "task_id": task_id,
2751 "status": "pending",
2752 })))
2753}
2754
/// Request body for `create_agent_task`.
/// `model` is accepted but not read by the handler (hence `allow(dead_code)`).
#[derive(Deserialize)]
#[allow(dead_code)]
struct CreateAgentTaskRequest {
    title: String,
    description: String,
    /// Defaults to "build" when omitted.
    agent_type: Option<String>,
    /// Accepted for forward compatibility; currently unused.
    model: Option<String>,
    /// Defaults to 0 when omitted.
    priority: Option<i32>,
}
2764
2765async fn get_agent_task(
2767 State(state): State<AppState>,
2768 Path(task_id): Path<String>,
2769) -> Result<Json<KnativeTask>, (StatusCode, String)> {
2770 state
2771 .knative_tasks
2772 .get(&task_id)
2773 .await
2774 .map(Json)
2775 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
2776}
2777
2778async fn get_agent_task_output(
2780 State(state): State<AppState>,
2781 Path(task_id): Path<String>,
2782) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2783 let task = state
2784 .knative_tasks
2785 .get(&task_id)
2786 .await
2787 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2788 Ok(Json(serde_json::json!({
2789 "task_id": task.task_id,
2790 "status": task.status,
2791 "title": task.title,
2792 "output": null,
2793 })))
2794}
2795
/// SSE handler: stream agent-bus messages for one task as `output` events.
///
/// Returns 404 up front when the task id is unknown; otherwise subscribes to
/// the bus and forwards every envelope whose topic starts with
/// `task.<task_id>` until the bus channel closes.
async fn stream_agent_task_output(
    State(state): State<AppState>,
    Path(task_id): Path<String>,
) -> Result<Sse<impl stream::Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
    // Existence check only; the task value itself is not needed.
    state
        .knative_tasks
        .get(&task_id)
        .await
        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;

    let bus_handle = state.bus.handle("task-stream");
    let rx = bus_handle.into_receiver();
    let topic_prefix = format!("task.{task_id}");

    let stream = async_stream::try_stream! {
        let mut rx = rx;
        loop {
            match rx.recv().await {
                Ok(envelope) => {
                    // Drop traffic addressed to other tasks.
                    if !envelope.topic.starts_with(&topic_prefix) {
                        continue;
                    }
                    // Unserializable payloads become empty-data events rather
                    // than terminating the stream.
                    let data = serde_json::to_string(&envelope.message).unwrap_or_default();
                    yield Event::default().event("output").data(data);
                }
                // A lagged receiver skips missed messages and keeps going.
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}
2831
2832async fn list_connected_workers(
2836 State(state): State<AppState>,
2837) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2838 let pods = state.k8s.list_pods().await.unwrap_or_default();
2840 let workers: Vec<serde_json::Value> = pods
2841 .iter()
2842 .filter(|p| p.name.contains("codetether") || p.name.contains("worker"))
2843 .map(|p| {
2844 serde_json::json!({
2845 "worker_id": p.name,
2846 "name": p.name,
2847 "status": p.phase,
2848 "is_sse_connected": p.phase == "Running",
2849 "last_seen": chrono::Utc::now().to_rfc3339(),
2850 })
2851 })
2852 .collect();
2853 Ok(Json(serde_json::json!({ "workers": workers })))
2854}
2855
2856async fn dispatch_task(
2860 State(state): State<AppState>,
2861 Json(req): Json<DispatchTaskRequest>,
2862) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2863 let task_id = uuid::Uuid::new_v4().to_string();
2864 let task = KnativeTask {
2865 task_id: task_id.clone(),
2866 title: req.title,
2867 description: req.description,
2868 agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2869 priority: req.priority.unwrap_or(0),
2870 received_at: chrono::Utc::now(),
2871 status: "pending".to_string(),
2872 };
2873
2874 let handle = state.bus.handle("task-dispatch");
2876 handle.send(
2877 format!("task.{}", task_id),
2878 crate::bus::BusMessage::TaskUpdate {
2879 task_id: task_id.clone(),
2880 state: crate::a2a::types::TaskState::Submitted,
2881 message: Some(format!("Task dispatched: {}", task.title)),
2882 },
2883 );
2884
2885 state.knative_tasks.push(task).await;
2886
2887 Ok(Json(serde_json::json!({
2888 "task_id": task_id,
2889 "status": "pending",
2890 "dispatched_via_knative": true,
2891 })))
2892}
2893
/// Request body for `dispatch_task`.
/// `model` and `metadata` are accepted but not read by the handler.
#[derive(Deserialize)]
#[allow(dead_code)]
struct DispatchTaskRequest {
    title: String,
    description: String,
    /// Defaults to "build" when omitted.
    agent_type: Option<String>,
    model: Option<String>,
    /// Defaults to 0 when omitted.
    priority: Option<i32>,
    metadata: Option<serde_json::Value>,
}
2904
2905async fn create_voice_session_rest(
2911 State(state): State<AppState>,
2912 Json(req): Json<CreateVoiceSessionRequest>,
2913) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2914 let voice_id = req.voice.unwrap_or_else(|| "960f89fc".to_string());
2915 let room_name = format!("voice-{}", uuid::Uuid::new_v4());
2916
2917 let handle = state.bus.handle("voice-rest");
2919 handle.send_voice_session_started(&room_name, &voice_id);
2920
2921 let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
2922
2923 Ok(Json(serde_json::json!({
2924 "room_name": room_name,
2925 "voice": voice_id,
2926 "mode": req.mode.unwrap_or_else(|| "chat".to_string()),
2927 "access_token": "",
2928 "livekit_url": livekit_url,
2929 "expires_at": (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339(),
2930 })))
2931}
2932
/// Request body for `create_voice_session_rest`.
/// `codebase_id` and `user_id` are accepted but not read by the handler.
#[derive(Deserialize)]
#[allow(dead_code)]
struct CreateVoiceSessionRequest {
    /// Voice id; defaults to "960f89fc" when omitted.
    voice: Option<String>,
    /// Session mode; defaults to "chat" when omitted.
    mode: Option<String>,
    codebase_id: Option<String>,
    user_id: Option<String>,
}
2941
2942async fn get_voice_session_rest(Path(room_name): Path<String>) -> Json<serde_json::Value> {
2944 let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
2945 Json(serde_json::json!({
2946 "room_name": room_name,
2947 "state": "active",
2948 "agent_state": "listening",
2949 "access_token": "",
2950 "livekit_url": livekit_url,
2951 }))
2952}
2953
2954async fn delete_voice_session_rest(
2956 State(state): State<AppState>,
2957 Path(room_name): Path<String>,
2958) -> Json<serde_json::Value> {
2959 let handle = state.bus.handle("voice-rest");
2960 handle.send_voice_session_ended(&room_name, "user_ended");
2961 Json(serde_json::json!({ "status": "deleted" }))
2962}
2963
2964async fn list_voices_rest() -> Json<serde_json::Value> {
2966 Json(serde_json::json!({
2967 "voices": [
2968 { "voice_id": "960f89fc", "name": "Riley", "language": "english" },
2969 { "voice_id": "puck", "name": "Puck", "language": "english" },
2970 { "voice_id": "charon", "name": "Charon", "language": "english" },
2971 { "voice_id": "kore", "name": "Kore", "language": "english" },
2972 { "voice_id": "fenrir", "name": "Fenrir", "language": "english" },
2973 { "voice_id": "aoede", "name": "Aoede", "language": "english" },
2974 ]
2975 }))
2976}
2977
2978async fn resume_codebase_session(
2982 Path((_codebase_id, session_id)): Path<(String, String)>,
2983 Json(req): Json<ResumeSessionRequest>,
2984) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2985 let mut session = match crate::session::Session::load(&session_id).await {
2987 Ok(s) => s,
2988 Err(_) => {
2989 let mut s = crate::session::Session::new()
2991 .await
2992 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2993 if let Some(agent) = &req.agent {
2994 s.agent = agent.clone();
2995 }
2996 s.save()
2997 .await
2998 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2999 s
3000 }
3001 };
3002
3003 if let Some(prompt) = &req.prompt {
3005 if !prompt.is_empty() {
3006 match session.prompt(prompt).await {
3007 Ok(result) => {
3008 session.save().await.ok();
3009 return Ok(Json(serde_json::json!({
3010 "session_id": session.id,
3011 "active_session_id": session.id,
3012 "status": "completed",
3013 "result": result.text,
3014 })));
3015 }
3016 Err(e) => {
3017 return Ok(Json(serde_json::json!({
3018 "session_id": session.id,
3019 "active_session_id": session.id,
3020 "status": "failed",
3021 "error": e.to_string(),
3022 })));
3023 }
3024 }
3025 }
3026 }
3027
3028 Ok(Json(serde_json::json!({
3030 "session_id": session.id,
3031 "active_session_id": session.id,
3032 "status": "ready",
3033 })))
3034}
3035
/// Request body for `resume_codebase_session`.
/// `model` is accepted but not read by the handler.
#[derive(Deserialize)]
#[allow(dead_code)]
struct ResumeSessionRequest {
    /// Optional prompt to run immediately after resuming.
    prompt: Option<String>,
    /// Agent assigned when a fresh session has to be created.
    agent: Option<String>,
    model: Option<String>,
}
3043
3044fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
3045 let message = error.to_string();
3046 if message.contains("not found") {
3047 return (StatusCode::NOT_FOUND, message);
3048 }
3049 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
3050 return (StatusCode::BAD_REQUEST, message);
3051 }
3052 (StatusCode::INTERNAL_SERVER_ERROR, message)
3053}
3054
/// Read a boolean flag from the environment.
///
/// Accepts, case-insensitively and ignoring surrounding whitespace:
/// truthy `1`/`true`/`yes`/`on` and falsy `0`/`false`/`no`/`off`.
/// Any other value — or an unset variable — yields `default`.
///
/// Fix: the value is now trimmed before matching, so `FLAG=" true"` (common
/// fallout from shell quoting or config templating) no longer silently falls
/// back to the default.
fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        .and_then(|v| match v.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            // Unrecognized values fall through to the default.
            _ => None,
        })
        .unwrap_or(default)
}
3065
#[cfg(test)]
mod tests {
    // match_policy_rule and normalize_model_reference are defined elsewhere
    // in this module; these tests pin the route→permission policy table and
    // the model-reference normalization format.
    use super::{match_policy_rule, normalize_model_reference};

    // Prompting within an existing session must require agent:execute.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        let permission = match_policy_rule("/api/session/abc123/prompt", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    // Creating a session stays under the broader sessions:write permission.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        let permission = match_policy_rule("/api/session", "POST");
        assert_eq!(permission, Some("sessions:write"));
    }

    // Approving a cognition proposal is an execute-level action.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        let permission = match_policy_rule("/v1/cognition/proposals/p1/approve", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    // Listing models (OpenAI-compatible surface) only needs read access.
    #[test]
    fn policy_openai_models_requires_read_permission() {
        let permission = match_policy_rule("/v1/models", "GET");
        assert_eq!(permission, Some("agent:read"));
    }

    // Chat completions actually run the agent, so they need execute access.
    #[test]
    fn policy_openai_chat_completions_requires_execute_permission() {
        let permission = match_policy_rule("/v1/chat/completions", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    // "provider:model" must normalize to the canonical "provider/model" form.
    #[test]
    fn normalize_model_reference_accepts_colon_format() {
        assert_eq!(
            normalize_model_reference("openai:gpt-4o"),
            "openai/gpt-4o".to_string()
        );
    }
}
3107}