1pub mod auth;
6pub mod policy;
7mod policy_user;
8mod tool_contract;
9mod worker_stream;
10mod worker_task_claim;
11mod worker_task_release;
12mod worker_task_stream;
13
14use crate::a2a;
15use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
16use crate::bus::{AgentBus, BusEnvelope};
17use crate::cli::ServeArgs;
18use crate::cloudevents::parse_cloud_event;
19use crate::cognition::{
20 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
21 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
22 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
23 executor::DecisionReceipt,
24};
25use crate::config::Config;
26use crate::k8s::K8sManager;
27use crate::tool::{PluginManifest, PluginRegistry as AgentPluginRegistry, hash_bytes, hash_file};
28use anyhow::Result;
29use auth::AuthState;
30use axum::{
31 Router,
32 body::Body,
33 extract::Path,
34 extract::{Query, State},
35 http::{HeaderMap, Method, Request, StatusCode},
36 middleware::{self, Next},
37 response::sse::{Event, KeepAlive, Sse},
38 response::{IntoResponse, Json, Response},
39 routing::{get, post},
40};
41use futures::{StreamExt, future::join_all, stream};
42use http::HeaderValue;
43use serde::{Deserialize, Serialize};
44use std::collections::HashMap;
45use std::convert::Infallible;
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::sync::{Mutex, broadcast};
49use tonic_web::GrpcWebLayer;
50use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
51use tower_http::trace::TraceLayer;
52
/// A task received from Knative Eventing (as a CloudEvent posted to `/task`)
/// and stored in the in-memory [`KnativeTaskQueue`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnativeTask {
    /// Task identifier: the event data's `task_id`, else its `id`, else the
    /// CloudEvent id.
    pub task_id: String,
    /// Title from the event data (`title`, falling back to `prompt`); may be empty.
    pub title: String,
    /// Description from the event data (`description`, falling back to `prompt`);
    /// may be empty.
    pub description: String,
    /// Agent type from the event data (`agent_type` or `agent`); `build` when absent.
    pub agent_type: String,
    /// Priority from the event data; 0 when absent.
    pub priority: i32,
    /// Server-side time at which the creating event was received.
    pub received_at: chrono::DateTime<chrono::Utc>,
    /// Free-form lifecycle status. Values written in this module include
    /// `queued`, `processing`, `completed`, `cancelled`, or whatever a
    /// `task.updated` event supplies.
    pub status: String,
}
64
65#[derive(Clone)]
67pub struct KnativeTaskQueue {
68 tasks: Arc<Mutex<Vec<KnativeTask>>>,
69}
70
71impl KnativeTaskQueue {
72 pub fn new() -> Self {
73 Self {
74 tasks: Arc::new(Mutex::new(Vec::new())),
75 }
76 }
77
78 pub async fn push(&self, task: KnativeTask) {
79 self.tasks.lock().await.push(task);
80 }
81
82 #[allow(dead_code)]
83 pub async fn pop(&self) -> Option<KnativeTask> {
84 self.tasks.lock().await.pop()
85 }
86
87 pub async fn list(&self) -> Vec<KnativeTask> {
88 self.tasks.lock().await.clone()
89 }
90
91 #[allow(dead_code)]
92 pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
93 self.tasks
94 .lock()
95 .await
96 .iter()
97 .find(|t| t.task_id == task_id)
98 .cloned()
99 }
100
101 pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
102 let mut tasks = self.tasks.lock().await;
103 if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
104 task.status = status.to_string();
105 true
106 } else {
107 false
108 }
109 }
110}
111
112impl Default for KnativeTaskQueue {
113 fn default() -> Self {
114 Self::new()
115 }
116}
117
/// A tool registered at runtime and held in [`ToolRegistry`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisteredTool {
    /// Tool id; also the key under which the registry stores the tool.
    pub id: String,
    /// Tool name.
    pub name: String,
    /// Tool description.
    pub description: String,
    /// Version string supplied at registration.
    pub version: String,
    /// Where the tool is reached. NOTE(review): presumably a URL — confirm.
    pub endpoint: String,
    /// Capability tags advertised by the tool.
    pub capabilities: Vec<String>,
    /// Parameter description as arbitrary JSON.
    /// NOTE(review): likely a JSON Schema — confirm with the registration handler.
    pub parameters: serde_json::Value,
    /// Execution mode; `tool_contract::is_agent_executable` uses it to decide
    /// whether agents may execute the tool.
    pub execution_mode: String,
    /// Registration timestamp.
    pub registered_at: chrono::DateTime<chrono::Utc>,
    /// Time of the most recent heartbeat.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Deadline after which the tool is hidden from `list` and removed by
    /// `cleanup`. Each heartbeat moves it 90 seconds into the future.
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
133
/// Serialized view of a [`RegisteredTool`].
///
/// The tool's own fields are flattened into the top-level JSON object, next to
/// a derived `agent_executable` flag.
#[derive(Serialize)]
struct RegisteredToolView {
    #[serde(flatten)]
    tool: RegisteredTool,
    /// Result of `tool_contract::is_agent_executable` on the tool's `execution_mode`.
    agent_executable: bool,
}
140
141impl From<RegisteredTool> for RegisteredToolView {
142 fn from(tool: RegisteredTool) -> Self {
143 let agent_executable = tool_contract::is_agent_executable(&tool.execution_mode);
144 Self {
145 tool,
146 agent_executable,
147 }
148 }
149}
150
151#[derive(Clone)]
153pub struct ToolRegistry {
154 tools: Arc<tokio::sync::RwLock<HashMap<String, RegisteredTool>>>,
155}
156
157impl Default for ToolRegistry {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163impl ToolRegistry {
164 pub fn new() -> Self {
165 Self {
166 tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
167 }
168 }
169
170 pub async fn register(&self, tool: RegisteredTool) {
172 let mut tools = self.tools.write().await;
173 tools.insert(tool.id.clone(), tool);
174 }
175
176 #[allow(dead_code)]
178 pub async fn get(&self, id: &str) -> Option<RegisteredTool> {
179 let tools = self.tools.read().await;
180 tools.get(id).cloned()
181 }
182
183 pub async fn list(&self) -> Vec<RegisteredTool> {
185 let tools = self.tools.read().await;
186 let now = chrono::Utc::now();
187 tools
188 .values()
189 .filter(|t| t.expires_at > now)
190 .cloned()
191 .collect()
192 }
193
194 pub async fn heartbeat(&self, id: &str) -> Option<RegisteredTool> {
196 let mut tools = self.tools.write().await;
197 if let Some(tool) = tools.get_mut(id) {
198 let now = chrono::Utc::now();
199 tool.last_heartbeat = now;
200 tool.expires_at = now + Duration::from_secs(90);
201 return Some(tool.clone());
202 }
203 None
204 }
205
206 pub async fn cleanup(&self) {
208 let mut tools = self.tools.write().await;
209 let now = chrono::Utc::now();
210 tools.retain(|_, t| t.expires_at > now);
211 }
212}
213
/// Shared state for the HTTP handlers and middleware.
///
/// Built once in [`serve`], then cloned into the router and into the audit
/// middleware.
#[derive(Clone)]
pub struct AppState {
    /// Configuration loaded at startup via `Config::load`.
    pub config: Arc<Config>,
    /// Cognition runtime, configured with the default runtime tool registry.
    pub cognition: Arc<CognitionRuntime>,
    /// Audit log; written by the audit middleware and by handlers such as the
    /// CloudEvent receiver.
    pub audit_log: AuditLog,
    /// Kubernetes manager used for self-deployment operations when available.
    pub k8s: Arc<K8sManager>,
    /// Bearer-token authentication state built by `AuthState::from_env`.
    pub auth: AuthState,
    /// Agent message bus; the same instance is also installed as the global bus.
    pub bus: Arc<AgentBus>,
    /// In-memory queue of tasks received from Knative CloudEvents.
    pub knative_tasks: KnativeTaskQueue,
    /// Registry of externally registered tools. A background task spawned in
    /// `serve` purges expired entries every 15 seconds.
    pub tool_registry: ToolRegistry,
    /// Plugin registry taken from the runtime tool registry handed to cognition.
    pub plugin_registry: AgentPluginRegistry,
    /// Hash of the crate version string (`CARGO_PKG_VERSION`).
    pub server_fingerprint: String,
}
228
229async fn audit_middleware(
231 State(state): State<AppState>,
232 request: Request<Body>,
233 next: Next,
234) -> Response {
235 let method = request.method().clone();
236 let path = request.uri().path().to_string();
237 let started = std::time::Instant::now();
238
239 let response = next.run(request).await;
240
241 let duration_ms = started.elapsed().as_millis() as u64;
242 let status = response.status().as_u16();
243 let outcome = if status < 400 {
244 AuditOutcome::Success
245 } else if status == 401 || status == 403 {
246 AuditOutcome::Denied
247 } else {
248 AuditOutcome::Failure
249 };
250
251 state
252 .audit_log
253 .record(audit::AuditEntry {
254 id: uuid::Uuid::new_v4().to_string(),
255 timestamp: chrono::Utc::now(),
256 category: AuditCategory::Api,
257 action: format!("{} {}", method, path),
258 principal: None,
259 outcome,
260 detail: Some(serde_json::json!({ "status": status })),
261 duration_ms: Some(duration_ms),
262 okr_id: None,
263 okr_run_id: None,
264 relay_id: None,
265 session_id: None,
266 })
267 .await;
268
269 response
270}
271
/// One entry of [`POLICY_RULES`]: a path pattern, an optional method filter,
/// and the permission required when the entry matches.
struct PolicyRule {
    /// Path pattern. With a trailing `/` it matches any path starting with the
    /// pattern. Without one, the path must equal the pattern or continue with `/`.
    pattern: &'static str,
    /// HTTP methods this entry applies to; `None` means every method.
    methods: Option<&'static [&'static str]>,
    /// Required permission. The empty string makes `policy_middleware` skip the
    /// permission check entirely.
    permission: &'static str,
}
279
/// Route-to-permission table consulted by `match_policy_rule`.
///
/// Order matters: the first entry whose pattern AND method filter both match
/// decides the permission. An entry whose pattern matches but whose method
/// filter excludes the request is skipped, so a later entry can still apply.
/// A path/method pair that matches no entry is rejected with `403 Forbidden`.
const POLICY_RULES: &[PolicyRule] = &[
    // Permission-free routes: the policy check is skipped entirely.
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/task",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/v1/knative/",
        methods: None,
        permission: "",
    },
    // OpenAI-compatible API.
    PolicyRule {
        pattern: "/v1/models",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/chat/completions",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Tool registry: listing is read-only; registration and heartbeats execute.
    PolicyRule {
        pattern: "/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/tools/register",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/tools/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // Kubernetes management requires admin access for every method listed.
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Also covers `DELETE /v1/k8s/subagent/{id}`; the GET-only entry above is skipped for it.
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Audit endpoints: every method requires admin access.
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    // Shadowed: `/v1/audit` above already matches this path for all methods.
    PolicyRule {
        pattern: "/v1/audit/replay",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Cognition loop control and read-only inspection.
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Swarm personas: mutations execute, reads are read-only.
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent bus.
    PolicyRule {
        pattern: "/v1/bus/stream",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/bus/publish",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Sessions. POSTs below `/api/session/` (e.g. `/{id}/prompt`) need
    // agent:execute, while creating a session (`POST /api/session`) needs
    // sessions:write.
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    // MCP-style tool listing.
    PolicyRule {
        pattern: "/mcp/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent tasks. Because `/v1/agent/tasks` also matches `/v1/agent/tasks/...`,
    // these two entries cover the nested output routes as well.
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Shadowed for GET by the `/v1/agent/tasks` GET entry above (same permission).
    PolicyRule {
        pattern: "/v1/agent/tasks/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Worker endpoints, all methods.
    // NOTE(review): this includes the POST claim/release routes, which only
    // need agent:read — confirm that is intended.
    PolicyRule {
        pattern: "/v1/worker/",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/workers",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Task dispatch.
    PolicyRule {
        pattern: "/v1/tasks/dispatch",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Voice sessions: reads are read-only; create/delete execute.
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["POST", "DELETE"]),
        permission: "agent:execute",
    },
    // Codebase session resume.
    PolicyRule {
        pattern: "/v1/agent/codebases/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Proposal approval (POST). Listed after `/v1/cognition/`, which only
    // applies to GET.
    PolicyRule {
        pattern: "/v1/cognition/proposals/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Informational API endpoints, any method.
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];
514
515fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
518 for rule in POLICY_RULES {
519 let matches = if rule.pattern.ends_with('/') {
520 path.starts_with(rule.pattern)
521 } else {
522 path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
523 };
524 if matches {
525 if let Some(allowed_methods) = rule.methods
526 && !allowed_methods.contains(&method)
527 {
528 continue;
529 }
530 return Some(rule.permission);
531 }
532 }
533 None
534}
535
536fn required_permission(path: &str, method: &str) -> Result<Option<&'static str>, StatusCode> {
537 match match_policy_rule(path, method) {
538 Some("") => Ok(None),
539 Some(permission) => Ok(Some(permission)),
540 None => Err(StatusCode::FORBIDDEN),
541 }
542}
543
544async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
549 let path = request.uri().path();
550 let method = request.method().as_str();
551
552 let permission = match required_permission(path, method)? {
553 None => return Ok(next.run(request).await),
554 Some(permission) => permission,
555 };
556
557 let user = policy_user::from_request(&request);
558
559 if let Err(status) = policy::enforce_policy(&user, permission, None).await {
560 tracing::warn!(
561 path = %path,
562 method = %method,
563 permission = %permission,
564 "Policy middleware denied request"
565 );
566 return Err(status);
567 }
568
569 Ok(next.run(request).await)
570}
571
572pub async fn serve(args: ServeArgs) -> Result<()> {
574 let t0 = std::time::Instant::now();
575 tracing::info!("[startup] begin");
576 let config = Config::load().await?;
577 tracing::info!(
578 elapsed_ms = t0.elapsed().as_millis(),
579 "[startup] config loaded"
580 );
581 let mut cognition = CognitionRuntime::new_from_env();
582 tracing::info!(
583 elapsed_ms = t0.elapsed().as_millis(),
584 "[startup] cognition runtime created"
585 );
586
587 let runtime_tools = Arc::new(crate::tool::ToolRegistry::with_defaults());
589 let plugin_registry = runtime_tools.plugins().clone();
590 cognition.set_tools(runtime_tools);
591 tracing::info!(
592 elapsed_ms = t0.elapsed().as_millis(),
593 "[startup] tools registered"
594 );
595 let cognition = Arc::new(cognition);
596
597 let audit_log = AuditLog::from_env();
599 let _ = audit::init_audit_log(audit_log.clone());
600
601 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
603 let k8s = Arc::new(K8sManager::new().await);
604 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
605 if k8s.is_available() {
606 tracing::info!("K8s self-deployment enabled");
607 }
608
609 let auth_state = AuthState::from_env();
611 tracing::info!(
612 token_len = auth_state.token().len(),
613 "Auth is mandatory. Only /health is served without a Bearer token."
614 );
615 tracing::info!(
616 audit_entries = audit_log.count().await,
617 "Audit log initialized"
618 );
619
620 let bus = AgentBus::new().into_arc();
622 crate::bus::set_global(bus.clone());
623
624 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
626
627 tracing::info!(
628 elapsed_ms = t0.elapsed().as_millis(),
629 "[startup] bus created"
630 );
631
632 if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
633 tracing::info!(
634 elapsed_ms = t0.elapsed().as_millis(),
635 "[startup] auto-starting cognition"
636 );
637 if let Err(error) = cognition.start(None).await {
638 tracing::warn!(%error, "Failed to auto-start cognition loop");
639 } else {
640 tracing::info!("Perpetual cognition auto-started");
641 }
642 }
643
644 tracing::info!(
645 elapsed_ms = t0.elapsed().as_millis(),
646 "[startup] building routes"
647 );
648 let addr = format!("{}:{}", args.hostname, args.port);
649
650 let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
652 let a2a_server = a2a::server::A2AServer::with_bus(agent_card.clone(), bus.clone());
653
654 let a2a_router = a2a_server.clone().router();
656 let a2a_nested_router = a2a_server.router();
657
658 let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
660 .ok()
661 .and_then(|p| p.parse::<u16>().ok())
662 .unwrap_or(50051);
663 let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
664 let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
665 let grpc_service = grpc_store.into_service();
666 let voice_service = crate::a2a::voice_grpc::VoiceServiceImpl::new(bus.clone()).into_service();
667 tokio::spawn(async move {
668 tracing::info!(addr = %grpc_addr, "Starting gRPC A2A server");
669
670 let allowed_origins = [
672 HeaderValue::from_static("https://codetether.run"),
673 HeaderValue::from_static("https://api.codetether.run"),
674 HeaderValue::from_static("https://docs.codetether.run"),
675 HeaderValue::from_static("https://codetether.com"),
676 HeaderValue::from_static("http://localhost:3000"),
677 HeaderValue::from_static("http://localhost:3001"),
678 ];
679 let cors = CorsLayer::new()
680 .allow_origin(AllowOrigin::list(allowed_origins))
681 .allow_methods(AllowMethods::any())
682 .allow_headers(AllowHeaders::any())
683 .expose_headers(tower_http::cors::ExposeHeaders::list([
684 http::header::HeaderName::from_static("grpc-status"),
685 http::header::HeaderName::from_static("grpc-message"),
686 ]));
687
688 if let Err(error) = tonic::transport::Server::builder()
689 .accept_http1(true)
690 .layer(cors)
691 .layer(GrpcWebLayer::new())
692 .add_service(grpc_service)
693 .add_service(voice_service)
694 .serve(grpc_addr)
695 .await
696 {
697 tracing::warn!(addr = %grpc_addr, error = ?error, "gRPC A2A server exited");
698 }
699 });
700
701 let state = AppState {
702 config: Arc::new(config),
703 cognition,
704 audit_log,
705 k8s,
706 auth: auth_state.clone(),
707 bus,
708 knative_tasks: KnativeTaskQueue::new(),
709 tool_registry: ToolRegistry::new(),
710 plugin_registry,
711 server_fingerprint: hash_bytes(env!("CARGO_PKG_VERSION").as_bytes()),
712 };
713
714 let tool_registry = state.tool_registry.clone();
716 tokio::spawn(async move {
717 let mut interval = tokio::time::interval(Duration::from_secs(15));
718 loop {
719 interval.tick().await;
720 tool_registry.cleanup().await;
721 tracing::debug!("Tool reaper ran cleanup");
722 }
723 });
724
725 let app = Router::new()
726 .route("/health", get(health))
728 .route("/task", post(receive_task_event))
730 .route("/v1/knative/tasks", get(list_knative_tasks))
732 .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
733 .route(
734 "/v1/knative/tasks/{task_id}/claim",
735 post(claim_knative_task),
736 )
737 .route(
738 "/v1/knative/tasks/{task_id}/complete",
739 post(complete_knative_task),
740 )
741 .route("/api/version", get(get_version))
743 .route("/api/session", get(list_sessions).post(create_session))
744 .route("/api/session/{id}", get(get_session))
745 .route("/api/session/{id}/prompt", post(prompt_session))
746 .route("/api/config", get(get_config))
747 .route("/api/provider", get(list_providers))
748 .route("/api/agent", get(list_agents))
749 .route("/v1/models", get(list_openai_models))
751 .route("/v1/chat/completions", post(openai_chat_completions))
752 .route("/v1/cognition/start", post(start_cognition))
754 .route("/v1/cognition/stop", post(stop_cognition))
755 .route("/v1/cognition/status", get(get_cognition_status))
756 .route("/v1/cognition/stream", get(stream_cognition))
757 .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
758 .route("/v1/swarm/personas", post(create_persona))
760 .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
761 .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
762 .route("/v1/swarm/lineage", get(get_swarm_lineage))
763 .route("/v1/cognition/beliefs", get(list_beliefs))
765 .route("/v1/cognition/beliefs/{id}", get(get_belief))
766 .route("/v1/cognition/attention", get(list_attention))
767 .route("/v1/cognition/proposals", get(list_proposals))
768 .route(
769 "/v1/cognition/proposals/{id}/approve",
770 post(approve_proposal),
771 )
772 .route("/v1/cognition/receipts", get(list_receipts))
773 .route("/v1/cognition/workspace", get(get_workspace))
774 .route("/v1/cognition/governance", get(get_governance))
775 .route("/v1/cognition/personas/{id}", get(get_persona))
776 .route("/v1/audit", get(list_audit_entries))
778 .route("/v1/audit/replay", get(replay_session_events))
780 .route("/v1/audit/replay/index", get(list_session_event_files))
781 .route("/v1/k8s/status", get(get_k8s_status))
783 .route("/v1/k8s/scale", post(k8s_scale))
784 .route("/v1/k8s/restart", post(k8s_restart))
785 .route("/v1/k8s/pods", get(k8s_list_pods))
786 .route("/v1/k8s/actions", get(k8s_actions))
787 .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
788 .route(
789 "/v1/k8s/subagent/{id}",
790 axum::routing::delete(k8s_delete_subagent),
791 )
792 .route("/v1/plugins", get(list_plugins))
794 .route("/v1/tools", get(list_tools))
796 .route("/v1/tools/register", post(register_tool))
797 .route("/v1/tools/{id}/heartbeat", post(tool_heartbeat))
798 .route("/mcp/v1/tools", get(list_tools))
800 .route(
802 "/v1/agent/tasks",
803 get(list_agent_tasks).post(create_agent_task),
804 )
805 .route("/v1/agent/tasks/{task_id}", get(get_agent_task))
806 .route(
807 "/v1/agent/tasks/{task_id}/output",
808 get(get_agent_task_output).post(agent_task_output),
809 )
810 .route(
811 "/v1/agent/tasks/{task_id}/output/stream",
812 get(stream_agent_task_output),
813 )
814 .route("/v1/worker/connected", get(list_connected_workers))
816 .route("/v1/agent/workers", get(list_connected_workers))
817 .route(
819 "/v1/worker/tasks/stream",
820 get(worker_task_stream::worker_task_stream),
821 )
822 .route(
823 "/v1/worker/tasks/claim",
824 post(worker_task_claim::worker_task_claim),
825 )
826 .route(
827 "/v1/worker/tasks/release",
828 post(worker_task_release::worker_task_release),
829 )
830 .route("/v1/tasks/dispatch", post(dispatch_task))
832 .route("/v1/voice/sessions", post(create_voice_session_rest))
834 .route(
835 "/v1/voice/sessions/{room_name}",
836 get(get_voice_session_rest).delete(delete_voice_session_rest),
837 )
838 .route("/v1/voice/voices", get(list_voices_rest))
839 .route(
841 "/v1/agent/codebases/{codebase_id}/sessions/{session_id}/resume",
842 post(resume_codebase_session),
843 )
844 .route("/v1/bus/stream", get(stream_bus_events))
846 .with_state(state.clone())
847 .merge(a2a_router)
850 .nest("/a2a", a2a_nested_router)
851 .layer(middleware::from_fn_with_state(
853 state.clone(),
854 audit_middleware,
855 ))
856 .layer(middleware::from_fn(policy_middleware))
857 .layer(middleware::from_fn(auth::require_auth))
858 .layer(axum::Extension(state.auth.clone()))
859 .layer(
862 CorsLayer::new()
863 .allow_origin([
864 "https://codetether.run".parse::<HeaderValue>().unwrap(),
865 "https://api.codetether.run".parse::<HeaderValue>().unwrap(),
866 "https://docs.codetether.run"
867 .parse::<HeaderValue>()
868 .unwrap(),
869 "https://codetether.com".parse::<HeaderValue>().unwrap(),
870 "http://localhost:3000".parse::<HeaderValue>().unwrap(),
871 "http://localhost:3001".parse::<HeaderValue>().unwrap(),
872 "http://localhost:8000".parse::<HeaderValue>().unwrap(),
873 ])
874 .allow_credentials(true)
875 .allow_methods([
876 Method::GET,
877 Method::POST,
878 Method::PUT,
879 Method::PATCH,
880 Method::DELETE,
881 Method::OPTIONS,
882 ])
883 .allow_headers([
884 http::header::AUTHORIZATION,
885 http::header::CONTENT_TYPE,
886 http::header::ACCEPT,
887 http::header::ORIGIN,
888 http::header::CACHE_CONTROL,
889 http::header::HeaderName::from_static("x-worker-id"),
890 http::header::HeaderName::from_static("x-agent-name"),
891 http::header::HeaderName::from_static("x-workspaces"),
892 http::header::HeaderName::from_static("x-requested-with"),
893 ]),
894 )
895 .layer(TraceLayer::new_for_http());
896
897 tracing::info!(
898 elapsed_ms = t0.elapsed().as_millis(),
899 "[startup] router built, binding"
900 );
901 let listener = tokio::net::TcpListener::bind(&addr).await?;
902 tracing::info!(
903 elapsed_ms = t0.elapsed().as_millis(),
904 "[startup] listening on http://{}",
905 addr
906 );
907
908 axum::serve(listener, app).await?;
909
910 Ok(())
911}
912
/// Liveness probe for `GET /health`; always answers with the plain-text body `ok`.
async fn health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
917
918async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
920 Json(state.knative_tasks.list().await)
921}
922
923async fn get_knative_task(
925 State(state): State<AppState>,
926 Path(task_id): Path<String>,
927) -> Result<Json<KnativeTask>, (StatusCode, String)> {
928 state
929 .knative_tasks
930 .get(&task_id)
931 .await
932 .map(Json)
933 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
934}
935
936async fn claim_knative_task(
938 State(state): State<AppState>,
939 Path(task_id): Path<String>,
940) -> Result<Json<KnativeTask>, (StatusCode, String)> {
941 let updated = state
943 .knative_tasks
944 .update_status(&task_id, "processing")
945 .await;
946 if !updated {
947 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
948 }
949
950 state
951 .knative_tasks
952 .get(&task_id)
953 .await
954 .map(Json)
955 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
956}
957
958async fn complete_knative_task(
960 State(state): State<AppState>,
961 Path(task_id): Path<String>,
962) -> Result<Json<KnativeTask>, (StatusCode, String)> {
963 let updated = state
965 .knative_tasks
966 .update_status(&task_id, "completed")
967 .await;
968 if !updated {
969 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
970 }
971
972 state
973 .knative_tasks
974 .get(&task_id)
975 .await
976 .map(Json)
977 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
978}
979
980async fn receive_task_event(
983 State(state): State<AppState>,
984 headers: HeaderMap,
985 Json(body): Json<serde_json::Value>,
986) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
987 let event =
988 parse_cloud_event(&headers, body).map_err(|error| (StatusCode::BAD_REQUEST, error))?;
989 tracing::info!(
990 event_type = %event.event_type,
991 event_id = %event.id,
992 "Received CloudEvent from Knative"
993 );
994
995 state
997 .audit_log
998 .log(
999 audit::AuditCategory::Api,
1000 format!("cloudevents:{}", event.event_type),
1001 AuditOutcome::Success,
1002 None,
1003 None,
1004 )
1005 .await;
1006
1007 match event.event_type.as_str() {
1009 "codetether.task.created" | "task.created" => {
1010 let task_id = event
1011 .data
1012 .get("task_id")
1013 .or_else(|| event.data.get("id"))
1014 .and_then(|v| v.as_str())
1015 .unwrap_or(event.id.as_str())
1016 .to_string();
1017 let title = event
1018 .data
1019 .get("title")
1020 .or_else(|| event.data.get("prompt"))
1021 .and_then(|v| v.as_str())
1022 .unwrap_or("")
1023 .to_string();
1024 let description = event
1025 .data
1026 .get("description")
1027 .or_else(|| event.data.get("prompt"))
1028 .and_then(|v| v.as_str())
1029 .unwrap_or("")
1030 .to_string();
1031 let agent_type = event
1032 .data
1033 .get("agent_type")
1034 .or_else(|| event.data.get("agent"))
1035 .and_then(|v| v.as_str())
1036 .unwrap_or("build")
1037 .to_string();
1038 let priority = event
1039 .data
1040 .get("priority")
1041 .and_then(|v| v.as_i64())
1042 .unwrap_or(0) as i32;
1043
1044 let task = KnativeTask {
1045 task_id: task_id.clone(),
1046 title,
1047 description,
1048 agent_type,
1049 priority,
1050 received_at: chrono::Utc::now(),
1051 status: "queued".to_string(),
1052 };
1053
1054 state.knative_tasks.push(task).await;
1055 tracing::info!(task_id = %task_id, "Task queued for execution");
1056 }
1057 "codetether.task.updated" | "task.updated" => {
1058 if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
1059 let status = event
1060 .data
1061 .get("status")
1062 .and_then(|v| v.as_str())
1063 .unwrap_or("updated");
1064 let _ = state.knative_tasks.update_status(task_id, status).await;
1065 tracing::info!(task_id = %task_id, status = %status, "Task status updated");
1066 }
1067 }
1068 "codetether.task.cancelled" => {
1069 tracing::info!("Task cancellation event received");
1070 if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
1072 let _ = state
1073 .knative_tasks
1074 .update_status(task_id, "cancelled")
1075 .await;
1076 tracing::info!(task_id = %task_id, "Task cancelled");
1077 }
1078 }
1079 _ => {
1080 tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
1081 }
1082 }
1083
1084 Ok(Json(CloudEventResponse {
1085 status: "accepted".to_string(),
1086 event_id: event.id,
1087 }))
1088}
1089
/// Acknowledgement body returned by `receive_task_event`.
#[derive(Serialize)]
struct CloudEventResponse {
    /// Always `"accepted"` in `receive_task_event`.
    status: String,
    /// Id of the CloudEvent that was processed.
    event_id: String,
}
1096
/// Response body for `GET /api/version`.
#[derive(Serialize)]
struct VersionInfo {
    /// Crate version (`CARGO_PKG_VERSION`).
    version: &'static str,
    /// Crate name (`CARGO_PKG_NAME`).
    name: &'static str,
    /// Hash of the running executable, or `None` if it could not be computed.
    binary_hash: Option<String>,
}
1104
1105async fn get_version() -> Json<VersionInfo> {
1106 let binary_hash = std::env::current_exe()
1107 .ok()
1108 .and_then(|p| hash_file(&p).ok());
1109 Json(VersionInfo {
1110 version: env!("CARGO_PKG_VERSION"),
1111 name: env!("CARGO_PKG_NAME"),
1112 binary_hash,
1113 })
1114}
1115
/// Query parameters for `GET /api/session`.
#[derive(Deserialize)]
struct ListSessionsQuery {
    /// Maximum number of sessions to return; `list_sessions` uses 100 when absent.
    limit: Option<usize>,
    /// Number of sessions to skip first; `list_sessions` uses 0 when absent.
    offset: Option<usize>,
}
1122
1123async fn list_sessions(
1124 Query(query): Query<ListSessionsQuery>,
1125) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
1126 let sessions = crate::session::list_sessions()
1127 .await
1128 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1129
1130 let offset = query.offset.unwrap_or(0);
1131 let limit = query.limit.unwrap_or(100);
1132 Ok(Json(
1133 sessions.into_iter().skip(offset).take(limit).collect(),
1134 ))
1135}
1136
/// JSON body for `POST /api/session`.
#[derive(Deserialize)]
struct CreateSessionRequest {
    /// Optional title for the new session.
    title: Option<String>,
    /// Optional agent name to assign to the new session.
    agent: Option<String>,
}
1143
1144async fn create_session(
1145 Json(req): Json<CreateSessionRequest>,
1146) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1147 let mut session = crate::session::Session::new()
1148 .await
1149 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1150
1151 session.title = req.title;
1152 if let Some(agent) = req.agent {
1153 session.set_agent_name(agent);
1154 }
1155
1156 session
1157 .save()
1158 .await
1159 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1160
1161 Ok(Json(session))
1162}
1163
1164async fn get_session(
1166 axum::extract::Path(id): axum::extract::Path<String>,
1167) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1168 let session = crate::session::Session::load(&id)
1169 .await
1170 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1171
1172 Ok(Json(session))
1173}
1174
/// JSON body for `POST /api/session/{id}/prompt`.
#[derive(Deserialize)]
struct PromptRequest {
    /// Prompt text; `prompt_session` rejects it when empty or whitespace-only.
    message: String,
}
1180
1181async fn prompt_session(
1182 axum::extract::Path(id): axum::extract::Path<String>,
1183 Json(req): Json<PromptRequest>,
1184) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
1185 if req.message.trim().is_empty() {
1187 return Err((
1188 StatusCode::BAD_REQUEST,
1189 "Message cannot be empty".to_string(),
1190 ));
1191 }
1192
1193 tracing::info!(
1195 session_id = %id,
1196 message_len = req.message.len(),
1197 "Received prompt request"
1198 );
1199
1200 let mut session = crate::session::Session::load(&id)
1201 .await
1202 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1203
1204 let result = session.prompt(&req.message).await.map_err(|e| {
1205 tracing::error!(session_id = %id, error = %e, "Session prompt failed");
1206 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1207 })?;
1208
1209 session.save().await.map_err(|e| {
1210 tracing::error!(session_id = %id, error = %e, "Failed to save session after prompt");
1211 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1212 })?;
1213
1214 Ok(Json(result))
1215}
1216
1217async fn get_config(State(state): State<AppState>) -> Json<Config> {
1219 Json((*state.config).clone())
1220}
1221
1222async fn list_providers() -> Json<Vec<String>> {
1224 Json(vec![
1225 "openai".to_string(),
1226 "anthropic".to_string(),
1227 "google".to_string(),
1228 ])
1229}
1230
1231async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
1233 let registry = crate::agent::AgentRegistry::with_builtins();
1234 Json(registry.list().into_iter().cloned().collect())
1235}
1236
/// Error side of the OpenAI-compatible handlers: an HTTP status plus a JSON body
/// (built by `openai_error` in the `{"error": {"message", "type", "code"}}` shape).
type OpenAiApiError = (StatusCode, Json<serde_json::Value>);
/// Result alias used by the OpenAI-compatible handlers and their conversion helpers.
type OpenAiApiResult<T> = Result<T, OpenAiApiError>;
1239
/// Request body accepted by `POST /v1/chat/completions` (subset of the OpenAI schema).
#[derive(Debug, Deserialize)]
struct OpenAiChatCompletionRequest {
    /// `provider/model`, `provider:model`, or a bare model id (see `openai_chat_completions`).
    model: String,
    /// Conversation history; must be non-empty.
    messages: Vec<OpenAiRequestMessage>,
    /// Tool definitions offered to the model; absent means none.
    #[serde(default)]
    tools: Vec<OpenAiRequestTool>,
    temperature: Option<f32>,
    top_p: Option<f32>,
    /// Legacy token limit; `max_completion_tokens` takes precedence when both are set.
    max_tokens: Option<usize>,
    max_completion_tokens: Option<usize>,
    /// Stop sequence(s): either a single string or an array of strings.
    stop: Option<OpenAiStop>,
    /// `true` selects the SSE streaming response; absent means non-streaming.
    stream: Option<bool>,
}

/// The OpenAI `stop` field, which may be a single string or a list of strings.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OpenAiStop {
    Single(String),
    Multiple(Vec<String>),
}

/// One entry of the request `messages` array.
#[derive(Debug, Deserialize)]
struct OpenAiRequestMessage {
    /// Role name, matched case-insensitively by `convert_openai_messages`.
    role: String,
    /// Either a string, an array of content parts, or a single content-part object.
    #[serde(default)]
    content: Option<serde_json::Value>,
    /// Required for `tool` messages: the id of the call this message answers.
    #[serde(default)]
    tool_call_id: Option<String>,
    /// Tool calls previously made by an `assistant` message.
    #[serde(default)]
    tool_calls: Vec<OpenAiRequestToolCall>,
}

/// A tool call echoed back inside an assistant message.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCall {
    id: String,
    /// Call type; defaults to `"function"` (the only supported kind) when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolCallFunction,
}

/// Function name and JSON-encoded arguments of an echoed tool call.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCallFunction {
    name: String,
    /// Arguments as a JSON string; defaults to `"{}"` when omitted.
    #[serde(default = "default_json_object_string")]
    arguments: String,
}

/// A tool offered to the model in the request `tools` array.
#[derive(Debug, Deserialize)]
struct OpenAiRequestTool {
    /// Tool type; defaults to `"function"` (the only supported kind) when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolDefinition,
}

/// Definition of a function tool.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolDefinition {
    name: String,
    #[serde(default)]
    description: String,
    /// JSON Schema for the arguments; defaults to an empty object when omitted.
    #[serde(default = "default_json_object")]
    parameters: serde_json::Value,
}
1302
/// Response body of `GET /v1/models`.
#[derive(Debug, Serialize)]
struct OpenAiModelsResponse {
    /// Always `"list"` as built by `list_openai_models`.
    object: String,
    data: Vec<OpenAiModel>,
}

/// One model entry in the `/v1/models` listing.
#[derive(Debug, Serialize)]
struct OpenAiModel {
    /// Public `provider/model` identifier (see `make_openai_model_id`).
    id: String,
    object: String,
    /// Unix timestamp (seconds); set to the listing time by `list_openai_models`.
    created: i64,
    /// Canonical provider name.
    owned_by: String,
}

/// Non-streaming `chat.completion` response body.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionResponse {
    id: String,
    object: String,
    created: i64,
    model: String,
    choices: Vec<OpenAiChatCompletionChoice>,
    usage: OpenAiUsage,
}

/// A single completion choice.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionChoice {
    index: usize,
    message: OpenAiChatCompletionMessage,
    finish_reason: String,
}

/// Assistant message in a completion choice; absent fields are omitted from the JSON.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionMessage {
    role: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_calls: Option<Vec<OpenAiResponseToolCall>>,
}

/// Tool call emitted by the model in a non-streaming response.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCall {
    id: String,
    #[serde(rename = "type")]
    kind: String,
    function: OpenAiResponseToolCallFunction,
}

/// Function name and JSON-encoded arguments of a response tool call.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCallFunction {
    name: String,
    arguments: String,
}

/// Token accounting copied from the provider's completion usage.
#[derive(Debug, Serialize)]
struct OpenAiUsage {
    prompt_tokens: usize,
    completion_tokens: usize,
    total_tokens: usize,
}
1363
/// Serde default for `type` fields on tools and tool calls.
fn default_function_type() -> String {
    String::from("function")
}
1367
1368fn default_json_object() -> serde_json::Value {
1369 serde_json::json!({})
1370}
1371
/// Serde default for tool-call `arguments`: the text of an empty JSON object.
fn default_json_object_string() -> String {
    String::from("{}")
}
1375
1376fn openai_error(
1377 status: StatusCode,
1378 message: impl Into<String>,
1379 error_type: &str,
1380 code: &str,
1381) -> OpenAiApiError {
1382 (
1383 status,
1384 Json(serde_json::json!({
1385 "error": {
1386 "message": message.into(),
1387 "type": error_type,
1388 "code": code,
1389 }
1390 })),
1391 )
1392}
1393
1394fn openai_bad_request(message: impl Into<String>) -> OpenAiApiError {
1395 openai_error(
1396 StatusCode::BAD_REQUEST,
1397 message,
1398 "invalid_request_error",
1399 "invalid_request",
1400 )
1401}
1402
1403fn openai_internal_error(message: impl Into<String>) -> OpenAiApiError {
1404 openai_error(
1405 StatusCode::INTERNAL_SERVER_ERROR,
1406 message,
1407 "server_error",
1408 "internal_error",
1409 )
1410}
1411
/// Maps provider aliases to canonical registry names.
///
/// `zhipuai` becomes `zai`; every other name is returned unchanged.
fn canonicalize_provider_name(provider: &str) -> &str {
    match provider {
        "zhipuai" => "zai",
        other => other,
    }
}
1419
/// Rewrites a `provider:model` reference into the `provider/model` form.
///
/// The input is trimmed first. The rewrite happens only when the reference contains no
/// `/`, and both sides of the first `:` are non-empty. Anything else is returned trimmed
/// but otherwise untouched.
fn normalize_model_reference(model: &str) -> String {
    let trimmed = model.trim();
    if trimmed.contains('/') {
        return trimmed.to_string();
    }
    match trimmed.split_once(':') {
        Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
            format!("{provider}/{model_id}")
        }
        _ => trimmed.to_string(),
    }
}
1431
1432fn make_openai_model_id(provider: &str, model_id: &str) -> String {
1433 let provider = canonicalize_provider_name(provider);
1434 let trimmed = model_id.trim_start_matches('/');
1435 if trimmed.starts_with(&format!("{provider}/")) {
1436 trimmed.to_string()
1437 } else {
1438 format!("{provider}/{trimmed}")
1439 }
1440}
1441
1442fn parse_openai_content_part(value: &serde_json::Value) -> Option<crate::provider::ContentPart> {
1443 let obj = value.as_object()?;
1444 let part_type = obj
1445 .get("type")
1446 .and_then(serde_json::Value::as_str)
1447 .unwrap_or("text");
1448
1449 match part_type {
1450 "text" | "input_text" => obj
1451 .get("text")
1452 .and_then(serde_json::Value::as_str)
1453 .map(|text| crate::provider::ContentPart::Text {
1454 text: text.to_string(),
1455 }),
1456 "image_url" => obj
1457 .get("image_url")
1458 .and_then(serde_json::Value::as_object)
1459 .and_then(|image| image.get("url"))
1460 .and_then(serde_json::Value::as_str)
1461 .map(|url| crate::provider::ContentPart::Image {
1462 url: url.to_string(),
1463 mime_type: None,
1464 }),
1465 _ => obj
1466 .get("text")
1467 .and_then(serde_json::Value::as_str)
1468 .map(|text| crate::provider::ContentPart::Text {
1469 text: text.to_string(),
1470 }),
1471 }
1472}
1473
1474fn parse_openai_content_parts(
1475 content: &Option<serde_json::Value>,
1476) -> Vec<crate::provider::ContentPart> {
1477 let Some(value) = content else {
1478 return Vec::new();
1479 };
1480
1481 match value {
1482 serde_json::Value::String(text) => {
1483 vec![crate::provider::ContentPart::Text { text: text.clone() }]
1484 }
1485 serde_json::Value::Array(parts) => {
1486 parts.iter().filter_map(parse_openai_content_part).collect()
1487 }
1488 serde_json::Value::Object(_) => parse_openai_content_part(value).into_iter().collect(),
1489 _ => Vec::new(),
1490 }
1491}
1492
1493fn parse_openai_tool_content(content: &Option<serde_json::Value>) -> String {
1494 parse_openai_content_parts(content)
1495 .into_iter()
1496 .filter_map(|part| match part {
1497 crate::provider::ContentPart::Text { text } => Some(text),
1498 _ => None,
1499 })
1500 .collect::<Vec<_>>()
1501 .join("\n")
1502}
1503
1504fn convert_openai_messages(
1505 messages: &[OpenAiRequestMessage],
1506) -> OpenAiApiResult<Vec<crate::provider::Message>> {
1507 let mut converted = Vec::with_capacity(messages.len());
1508
1509 for (index, message) in messages.iter().enumerate() {
1510 let role = message.role.to_ascii_lowercase();
1511 match role.as_str() {
1512 "system" | "user" => {
1513 let content = parse_openai_content_parts(&message.content);
1514 if content.is_empty() {
1515 return Err(openai_bad_request(format!(
1516 "messages[{index}] must include text content"
1517 )));
1518 }
1519
1520 converted.push(crate::provider::Message {
1521 role: if role == "system" {
1522 crate::provider::Role::System
1523 } else {
1524 crate::provider::Role::User
1525 },
1526 content,
1527 });
1528 }
1529 "assistant" => {
1530 let mut content = parse_openai_content_parts(&message.content);
1531 for tool_call in &message.tool_calls {
1532 if tool_call.kind != "function" {
1533 return Err(openai_bad_request(format!(
1534 "messages[{index}].tool_calls only support `function`"
1535 )));
1536 }
1537 if tool_call.function.name.trim().is_empty() {
1538 return Err(openai_bad_request(format!(
1539 "messages[{index}].tool_calls[].function.name is required"
1540 )));
1541 }
1542
1543 content.push(crate::provider::ContentPart::ToolCall {
1544 id: tool_call.id.clone(),
1545 name: tool_call.function.name.clone(),
1546 arguments: tool_call.function.arguments.clone(),
1547 thought_signature: None,
1548 });
1549 }
1550
1551 if content.is_empty() {
1552 return Err(openai_bad_request(format!(
1553 "messages[{index}] must include `content` or `tool_calls` for assistant role"
1554 )));
1555 }
1556
1557 converted.push(crate::provider::Message {
1558 role: crate::provider::Role::Assistant,
1559 content,
1560 });
1561 }
1562 "tool" => {
1563 let tool_call_id = message
1564 .tool_call_id
1565 .as_ref()
1566 .map(|s| s.trim())
1567 .filter(|s| !s.is_empty())
1568 .ok_or_else(|| {
1569 openai_bad_request(format!(
1570 "messages[{index}].tool_call_id is required for tool role"
1571 ))
1572 })?
1573 .to_string();
1574
1575 converted.push(crate::provider::Message {
1576 role: crate::provider::Role::Tool,
1577 content: vec![crate::provider::ContentPart::ToolResult {
1578 tool_call_id,
1579 content: parse_openai_tool_content(&message.content),
1580 }],
1581 });
1582 }
1583 _ => {
1584 return Err(openai_bad_request(format!(
1585 "messages[{index}].role `{}` is not supported",
1586 message.role
1587 )));
1588 }
1589 }
1590 }
1591
1592 Ok(converted)
1593}
1594
1595fn convert_openai_tools(
1596 tools: &[OpenAiRequestTool],
1597) -> OpenAiApiResult<Vec<crate::provider::ToolDefinition>> {
1598 let mut converted = Vec::with_capacity(tools.len());
1599
1600 for (index, tool) in tools.iter().enumerate() {
1601 if tool.kind != "function" {
1602 return Err(openai_bad_request(format!(
1603 "tools[{index}].type `{}` is not supported",
1604 tool.kind
1605 )));
1606 }
1607 if tool.function.name.trim().is_empty() {
1608 return Err(openai_bad_request(format!(
1609 "tools[{index}].function.name is required"
1610 )));
1611 }
1612
1613 converted.push(crate::provider::ToolDefinition {
1614 name: tool.function.name.clone(),
1615 description: tool.function.description.clone(),
1616 parameters: tool.function.parameters.clone(),
1617 });
1618 }
1619
1620 Ok(converted)
1621}
1622
1623fn convert_finish_reason(reason: crate::provider::FinishReason) -> &'static str {
1624 match reason {
1625 crate::provider::FinishReason::Stop => "stop",
1626 crate::provider::FinishReason::Length => "length",
1627 crate::provider::FinishReason::ToolCalls => "tool_calls",
1628 crate::provider::FinishReason::ContentFilter => "content_filter",
1629 crate::provider::FinishReason::Error => "error",
1630 }
1631}
1632
1633fn convert_response_message(
1634 message: &crate::provider::Message,
1635) -> (Option<String>, Option<Vec<OpenAiResponseToolCall>>) {
1636 let mut texts = Vec::new();
1637 let mut tool_calls = Vec::new();
1638
1639 for part in &message.content {
1640 match part {
1641 crate::provider::ContentPart::Text { text } => {
1642 if !text.is_empty() {
1643 texts.push(text.clone());
1644 }
1645 }
1646 crate::provider::ContentPart::ToolCall {
1647 id,
1648 name,
1649 arguments,
1650 ..
1651 } => {
1652 tool_calls.push(OpenAiResponseToolCall {
1653 id: id.clone(),
1654 kind: "function".to_string(),
1655 function: OpenAiResponseToolCallFunction {
1656 name: name.clone(),
1657 arguments: arguments.clone(),
1658 },
1659 });
1660 }
1661 _ => {}
1662 }
1663 }
1664
1665 let content = if texts.is_empty() {
1666 None
1667 } else {
1668 Some(texts.join("\n"))
1669 };
1670 let tool_calls = if tool_calls.is_empty() {
1671 None
1672 } else {
1673 Some(tool_calls)
1674 };
1675
1676 (content, tool_calls)
1677}
1678
1679fn openai_stream_chunk(
1680 id: &str,
1681 created: i64,
1682 model: &str,
1683 delta: serde_json::Value,
1684 finish_reason: Option<&str>,
1685) -> serde_json::Value {
1686 let finish_reason = finish_reason
1687 .map(|value| serde_json::Value::String(value.to_string()))
1688 .unwrap_or(serde_json::Value::Null);
1689
1690 serde_json::json!({
1691 "id": id,
1692 "object": "chat.completion.chunk",
1693 "created": created,
1694 "model": model,
1695 "choices": [{
1696 "index": 0,
1697 "delta": delta,
1698 "finish_reason": finish_reason,
1699 }]
1700 })
1701}
1702
1703fn openai_stream_event(payload: &serde_json::Value) -> Event {
1704 Event::default().data(payload.to_string())
1705}
1706
1707async fn list_openai_models() -> OpenAiApiResult<Json<OpenAiModelsResponse>> {
1708 let registry = crate::provider::ProviderRegistry::from_vault()
1709 .await
1710 .map_err(|error| {
1711 tracing::error!(error = %error, "Failed to load providers from Vault");
1712 openai_internal_error(format!("failed to load providers: {error}"))
1713 })?;
1714
1715 let model_futures = registry.list().into_iter().map(|provider_id| {
1716 let provider_id = provider_id.to_string();
1717 let provider = registry.get(&provider_id);
1718
1719 async move {
1720 let Some(provider) = provider else {
1721 return Vec::new();
1722 };
1723
1724 let now = chrono::Utc::now().timestamp();
1725 match provider.list_models().await {
1726 Ok(models) => models
1727 .into_iter()
1728 .map(|model| OpenAiModel {
1729 id: make_openai_model_id(&provider_id, &model.id),
1730 object: "model".to_string(),
1731 created: now,
1732 owned_by: canonicalize_provider_name(&provider_id).to_string(),
1733 })
1734 .collect(),
1735 Err(error) => {
1736 tracing::warn!(
1737 provider = %provider_id,
1738 error = %error,
1739 "Failed to list models for provider"
1740 );
1741 Vec::new()
1742 }
1743 }
1744 }
1745 });
1746
1747 let mut data: Vec<OpenAiModel> = join_all(model_futures)
1748 .await
1749 .into_iter()
1750 .flatten()
1751 .collect();
1752 data.sort_by(|a, b| a.owned_by.cmp(&b.owned_by).then(a.id.cmp(&b.id)));
1753
1754 Ok(Json(OpenAiModelsResponse {
1755 object: "list".to_string(),
1756 data,
1757 }))
1758}
1759
1760async fn openai_chat_completions(
1761 Json(req): Json<OpenAiChatCompletionRequest>,
1762) -> Result<Response, OpenAiApiError> {
1763 let is_stream = req.stream.unwrap_or(false);
1764 if req.model.trim().is_empty() {
1765 return Err(openai_bad_request("`model` is required"));
1766 }
1767 if req.messages.is_empty() {
1768 return Err(openai_bad_request("`messages` must not be empty"));
1769 }
1770
1771 let registry = crate::provider::ProviderRegistry::from_vault()
1772 .await
1773 .map_err(|error| {
1774 tracing::error!(error = %error, "Failed to load providers from Vault");
1775 openai_internal_error(format!("failed to load providers: {error}"))
1776 })?;
1777
1778 let providers = registry.list();
1779 if providers.is_empty() {
1780 return Err(openai_error(
1781 StatusCode::SERVICE_UNAVAILABLE,
1782 "No providers are configured in Vault",
1783 "server_error",
1784 "no_providers",
1785 ));
1786 }
1787
1788 let normalized_model = normalize_model_reference(&req.model);
1789 let (maybe_provider, model_id) = crate::provider::parse_model_string(&normalized_model);
1790 let model_id = if maybe_provider.is_some() {
1791 if model_id.trim().is_empty() {
1792 return Err(openai_bad_request(
1793 "Model must be in `provider/model` format",
1794 ));
1795 }
1796 model_id.to_string()
1797 } else {
1798 req.model.trim().to_string()
1799 };
1800
1801 let selected_provider = if let Some(provider_name) = maybe_provider {
1802 let provider_name = canonicalize_provider_name(provider_name);
1803 if providers.contains(&provider_name) {
1804 provider_name.to_string()
1805 } else {
1806 return Err(openai_bad_request(format!(
1807 "Provider `{provider_name}` is not configured in Vault"
1808 )));
1809 }
1810 } else if providers.len() == 1 {
1811 providers[0].to_string()
1812 } else if providers.contains(&"openai") {
1813 "openai".to_string()
1814 } else {
1815 return Err(openai_bad_request(
1816 "When multiple providers are configured, use `provider/model`. See GET /v1/models.",
1817 ));
1818 };
1819
1820 let provider = registry.get(&selected_provider).ok_or_else(|| {
1821 openai_internal_error(format!(
1822 "Provider `{selected_provider}` was available but could not be loaded"
1823 ))
1824 })?;
1825
1826 let messages = convert_openai_messages(&req.messages)?;
1827 let tools = convert_openai_tools(&req.tools)?;
1828 let stop = match req.stop {
1829 Some(OpenAiStop::Single(stop)) => vec![stop],
1830 Some(OpenAiStop::Multiple(stops)) => stops,
1831 None => Vec::new(),
1832 };
1833
1834 let completion_request = crate::provider::CompletionRequest {
1835 messages,
1836 tools,
1837 model: model_id.clone(),
1838 temperature: req.temperature,
1839 top_p: req.top_p,
1840 max_tokens: req.max_completion_tokens.or(req.max_tokens),
1841 stop,
1842 };
1843
1844 let chat_id = format!("chatcmpl-{}", uuid::Uuid::new_v4());
1845 let now = chrono::Utc::now().timestamp();
1846 let openai_model = make_openai_model_id(&selected_provider, &model_id);
1847
1848 if is_stream {
1849 let mut provider_stream =
1850 provider
1851 .complete_stream(completion_request)
1852 .await
1853 .map_err(|error| {
1854 tracing::warn!(
1855 provider = %selected_provider,
1856 model = %model_id,
1857 error = %error,
1858 "Streaming completion request failed"
1859 );
1860 openai_internal_error(error.to_string())
1861 })?;
1862
1863 let (stream_id, stream_model) = (chat_id.clone(), openai_model.clone());
1864 let event_stream = async_stream::stream! {
1865 let mut tool_call_indices: HashMap<String, usize> = HashMap::new();
1866 let mut next_tool_call_index = 0usize;
1867 let mut saw_text = false;
1868 let mut saw_tool_calls = false;
1869
1870 let role_chunk = openai_stream_chunk(
1871 &stream_id,
1872 now,
1873 &stream_model,
1874 serde_json::json!({ "role": "assistant" }),
1875 None,
1876 );
1877 yield Ok::<Event, Infallible>(openai_stream_event(&role_chunk));
1878
1879 while let Some(chunk) = provider_stream.next().await {
1880 match chunk {
1881 crate::provider::StreamChunk::Text(text) => {
1882 if text.is_empty() {
1883 continue;
1884 }
1885 saw_text = true;
1886 let content_chunk = openai_stream_chunk(
1887 &stream_id,
1888 now,
1889 &stream_model,
1890 serde_json::json!({ "content": text }),
1891 None,
1892 );
1893 yield Ok(openai_stream_event(&content_chunk));
1894 }
1895 crate::provider::StreamChunk::ToolCallStart { id, name } => {
1896 saw_tool_calls = true;
1897 let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1898 let value = next_tool_call_index;
1899 next_tool_call_index += 1;
1900 value
1901 });
1902 let tool_chunk = openai_stream_chunk(
1903 &stream_id,
1904 now,
1905 &stream_model,
1906 serde_json::json!({
1907 "tool_calls": [{
1908 "index": index,
1909 "id": id,
1910 "type": "function",
1911 "function": {
1912 "name": name,
1913 "arguments": "",
1914 }
1915 }]
1916 }),
1917 None,
1918 );
1919 yield Ok(openai_stream_event(&tool_chunk));
1920 }
1921 crate::provider::StreamChunk::ToolCallDelta { id, arguments_delta } => {
1922 if arguments_delta.is_empty() {
1923 continue;
1924 }
1925 saw_tool_calls = true;
1926 let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1927 let value = next_tool_call_index;
1928 next_tool_call_index += 1;
1929 value
1930 });
1931 let tool_delta_chunk = openai_stream_chunk(
1932 &stream_id,
1933 now,
1934 &stream_model,
1935 serde_json::json!({
1936 "tool_calls": [{
1937 "index": index,
1938 "id": id,
1939 "type": "function",
1940 "function": {
1941 "arguments": arguments_delta,
1942 }
1943 }]
1944 }),
1945 None,
1946 );
1947 yield Ok(openai_stream_event(&tool_delta_chunk));
1948 }
1949 crate::provider::StreamChunk::ToolCallEnd { .. } => {}
1950 crate::provider::StreamChunk::Done { .. }
1951 | crate::provider::StreamChunk::Thinking(_) => {}
1952 crate::provider::StreamChunk::Error(error) => {
1953 let error_chunk = openai_stream_chunk(
1954 &stream_id,
1955 now,
1956 &stream_model,
1957 serde_json::json!({ "content": error }),
1958 Some("error"),
1959 );
1960 yield Ok(openai_stream_event(&error_chunk));
1961 yield Ok(Event::default().data("[DONE]"));
1962 return;
1963 }
1964 }
1965 }
1966
1967 let finish_reason = if saw_tool_calls && !saw_text {
1968 "tool_calls"
1969 } else {
1970 "stop"
1971 };
1972 let final_chunk = openai_stream_chunk(
1973 &stream_id,
1974 now,
1975 &stream_model,
1976 serde_json::json!({}),
1977 Some(finish_reason),
1978 );
1979 yield Ok(openai_stream_event(&final_chunk));
1980 yield Ok(Event::default().data("[DONE]"));
1981 };
1982
1983 return Ok(Sse::new(event_stream)
1984 .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1985 .into_response());
1986 }
1987
1988 let completion = provider
1989 .complete(completion_request)
1990 .await
1991 .map_err(|error| {
1992 tracing::warn!(
1993 provider = %selected_provider,
1994 model = %model_id,
1995 error = %error,
1996 "Completion request failed"
1997 );
1998 openai_internal_error(error.to_string())
1999 })?;
2000
2001 let (content, tool_calls) = convert_response_message(&completion.message);
2002
2003 Ok(Json(OpenAiChatCompletionResponse {
2004 id: chat_id,
2005 object: "chat.completion".to_string(),
2006 created: now,
2007 model: openai_model,
2008 choices: vec![OpenAiChatCompletionChoice {
2009 index: 0,
2010 message: OpenAiChatCompletionMessage {
2011 role: "assistant".to_string(),
2012 content,
2013 tool_calls,
2014 },
2015 finish_reason: convert_finish_reason(completion.finish_reason).to_string(),
2016 }],
2017 usage: OpenAiUsage {
2018 prompt_tokens: completion.usage.prompt_tokens,
2019 completion_tokens: completion.usage.completion_tokens,
2020 total_tokens: completion.usage.total_tokens,
2021 },
2022 })
2023 .into_response())
2024}
2025
2026async fn start_cognition(
2027 State(state): State<AppState>,
2028 payload: Option<Json<StartCognitionRequest>>,
2029) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2030 state
2031 .cognition
2032 .start(payload.map(|Json(body)| body))
2033 .await
2034 .map(Json)
2035 .map_err(internal_error)
2036}
2037
2038async fn stop_cognition(
2039 State(state): State<AppState>,
2040 payload: Option<Json<StopCognitionRequest>>,
2041) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2042 let reason = payload.and_then(|Json(body)| body.reason);
2043 state
2044 .cognition
2045 .stop(reason)
2046 .await
2047 .map(Json)
2048 .map_err(internal_error)
2049}
2050
2051async fn get_cognition_status(
2052 State(state): State<AppState>,
2053) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2054 Ok(Json(state.cognition.status().await))
2055}
2056
2057async fn stream_cognition(
2058 State(state): State<AppState>,
2059) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2060 let rx = state.cognition.subscribe_events();
2061
2062 let event_stream = stream::unfold(rx, |mut rx| async move {
2063 match rx.recv().await {
2064 Ok(event) => {
2065 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
2066 let sse_event = Event::default().event("cognition").data(payload);
2067 Some((Ok(sse_event), rx))
2068 }
2069 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2070 let lag_event = Event::default()
2071 .event("lag")
2072 .data(format!("skipped {}", skipped));
2073 Some((Ok(lag_event), rx))
2074 }
2075 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2076 }
2077 });
2078
2079 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2080}
2081
2082async fn stream_bus_events(
2088 State(state): State<AppState>,
2089 req: Request<Body>,
2090) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2091 let allowed_topics: Vec<String> = req
2093 .extensions()
2094 .get::<crate::server::auth::JwtClaims>()
2095 .map(|claims| claims.topics.clone())
2096 .unwrap_or_default();
2097
2098 let bus_handle = state.bus.handle("stream_bus_events");
2100 let rx = bus_handle.into_receiver();
2101
2102 let event_stream = stream::unfold(rx, move |mut rx: broadcast::Receiver<BusEnvelope>| {
2103 let allowed_topics = allowed_topics.clone();
2104 async move {
2105 match rx.recv().await {
2106 Ok(envelope) => {
2107 let should_send = allowed_topics.is_empty()
2109 || allowed_topics
2110 .iter()
2111 .any(|pattern| topic_matches(&envelope.topic, pattern));
2112
2113 if should_send {
2114 let payload =
2115 serde_json::to_string(&envelope).unwrap_or_else(|_| "{}".to_string());
2116 let sse_event = Event::default().event("bus").data(payload);
2117 return Some((Ok(sse_event), rx));
2118 }
2119
2120 Some((Ok(Event::default().event("keepalive").data("")), rx))
2122 }
2123 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2124 let lag_event = Event::default()
2125 .event("lag")
2126 .data(format!("skipped {}", skipped));
2127 Some((Ok(lag_event), rx))
2128 }
2129 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2130 }
2131 }
2132 });
2133
2134 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2135}
2136
/// Returns whether a bus `topic` is permitted by an allow-list `pattern`.
///
/// Supported patterns:
/// - `*` (and the degenerate `.*`) matches every topic.
/// - `prefix.*` matches `prefix` itself and anything below it (`prefix.…`). It does not
///   match topics that merely share leading characters: `agent.*` does not match
///   `agentsecret.x`.
/// - `.*suffix` matches any topic ending in `suffix`.
/// - Any other pattern must equal the topic exactly.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    if let Some(prefix) = pattern.strip_suffix(".*") {
        if prefix.is_empty() {
            return true;
        }
        // Enforce a segment boundary so an allow-list entry cannot leak sibling topics
        // that only share a textual prefix.
        return match topic.strip_prefix(prefix) {
            Some(rest) => rest.is_empty() || rest.starts_with('.') || prefix.ends_with('.'),
            None => false,
        };
    }
    if let Some(suffix) = pattern.strip_prefix(".*") {
        return topic.ends_with(suffix);
    }
    topic == pattern
}
2151
2152async fn get_latest_snapshot(
2153 State(state): State<AppState>,
2154) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
2155 match state.cognition.latest_snapshot().await {
2156 Some(snapshot) => Ok(Json(snapshot)),
2157 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
2158 }
2159}
2160
2161async fn create_persona(
2162 State(state): State<AppState>,
2163 Json(req): Json<CreatePersonaRequest>,
2164) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2165 state
2166 .cognition
2167 .create_persona(req)
2168 .await
2169 .map(Json)
2170 .map_err(internal_error)
2171}
2172
2173async fn spawn_persona(
2174 State(state): State<AppState>,
2175 Path(id): Path<String>,
2176 Json(req): Json<SpawnPersonaRequest>,
2177) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2178 state
2179 .cognition
2180 .spawn_child(&id, req)
2181 .await
2182 .map(Json)
2183 .map_err(internal_error)
2184}
2185
2186async fn reap_persona(
2187 State(state): State<AppState>,
2188 Path(id): Path<String>,
2189 payload: Option<Json<ReapPersonaRequest>>,
2190) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
2191 let req = payload
2192 .map(|Json(body)| body)
2193 .unwrap_or(ReapPersonaRequest {
2194 cascade: Some(false),
2195 reason: None,
2196 });
2197
2198 state
2199 .cognition
2200 .reap_persona(&id, req)
2201 .await
2202 .map(Json)
2203 .map_err(internal_error)
2204}
2205
2206async fn get_swarm_lineage(
2207 State(state): State<AppState>,
2208) -> Result<Json<LineageGraph>, (StatusCode, String)> {
2209 Ok(Json(state.cognition.lineage_graph().await))
2210}
2211
/// Query-string filters accepted by `list_beliefs`.
#[derive(Deserialize)]
struct BeliefFilter {
    /// Keeps beliefs whose JSON-serialized `status` contains this text (substring match).
    status: Option<String>,
    /// Keeps beliefs whose `asserted_by` equals this value exactly.
    persona: Option<String>,
}
2219
2220async fn list_beliefs(
2221 State(state): State<AppState>,
2222 Query(filter): Query<BeliefFilter>,
2223) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
2224 let beliefs = state.cognition.get_beliefs().await;
2225 let mut result: Vec<Belief> = beliefs.into_values().collect();
2226
2227 if let Some(status) = &filter.status {
2228 result.retain(|b| {
2229 let s = serde_json::to_string(&b.status).unwrap_or_default();
2230 s.contains(status)
2231 });
2232 }
2233 if let Some(persona) = &filter.persona {
2234 result.retain(|b| &b.asserted_by == persona);
2235 }
2236
2237 result.sort_by(|a, b| {
2238 b.confidence
2239 .partial_cmp(&a.confidence)
2240 .unwrap_or(std::cmp::Ordering::Equal)
2241 });
2242 Ok(Json(result))
2243}
2244
2245async fn get_belief(
2246 State(state): State<AppState>,
2247 Path(id): Path<String>,
2248) -> Result<Json<Belief>, (StatusCode, String)> {
2249 match state.cognition.get_belief(&id).await {
2250 Some(belief) => Ok(Json(belief)),
2251 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
2252 }
2253}
2254
2255async fn list_attention(
2256 State(state): State<AppState>,
2257) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
2258 Ok(Json(state.cognition.get_attention_queue().await))
2259}
2260
2261async fn list_proposals(
2262 State(state): State<AppState>,
2263) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
2264 let proposals = state.cognition.get_proposals().await;
2265 let mut result: Vec<Proposal> = proposals.into_values().collect();
2266 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
2267 Ok(Json(result))
2268}
2269
2270async fn approve_proposal(
2271 State(state): State<AppState>,
2272 Path(id): Path<String>,
2273) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2274 state
2275 .cognition
2276 .approve_proposal(&id)
2277 .await
2278 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
2279 .map_err(internal_error)
2280}
2281
2282async fn list_receipts(
2283 State(state): State<AppState>,
2284) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
2285 Ok(Json(state.cognition.get_receipts().await))
2286}
2287
2288async fn get_workspace(
2289 State(state): State<AppState>,
2290) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
2291 Ok(Json(state.cognition.get_workspace().await))
2292}
2293
2294async fn get_governance(
2295 State(state): State<AppState>,
2296) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
2297 Ok(Json(state.cognition.get_governance().await))
2298}
2299
2300async fn get_persona(
2301 State(state): State<AppState>,
2302 axum::extract::Path(id): axum::extract::Path<String>,
2303) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2304 state
2305 .cognition
2306 .get_persona(&id)
2307 .await
2308 .map(Json)
2309 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
2310}
2311
/// Query-string parameters accepted by `list_audit_entries`.
#[derive(Deserialize)]
struct AuditQuery {
    /// Maximum number of entries; defaults to 100 and is capped at 1000.
    limit: Option<usize>,
    /// Optional category filter; one of `api`, `tool`/`tool_execution`, `session`,
    /// `cognition`, `swarm`, `auth`, `k8s`, `sandbox`, `config`.
    category: Option<String>,
}
2319
2320async fn list_audit_entries(
2321 State(state): State<AppState>,
2322 Query(query): Query<AuditQuery>,
2323) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
2324 let limit = query.limit.unwrap_or(100).min(1000);
2325
2326 let entries = if let Some(ref cat) = query.category {
2327 let category = match cat.as_str() {
2328 "api" => AuditCategory::Api,
2329 "tool" | "tool_execution" => AuditCategory::ToolExecution,
2330 "session" => AuditCategory::Session,
2331 "cognition" => AuditCategory::Cognition,
2332 "swarm" => AuditCategory::Swarm,
2333 "auth" => AuditCategory::Auth,
2334 "k8s" => AuditCategory::K8s,
2335 "sandbox" => AuditCategory::Sandbox,
2336 "config" => AuditCategory::Config,
2337 _ => {
2338 return Err((
2339 StatusCode::BAD_REQUEST,
2340 format!("Unknown category: {}", cat),
2341 ));
2342 }
2343 };
2344 state.audit_log.by_category(category, limit).await
2345 } else {
2346 state.audit_log.recent(limit).await
2347 };
2348
2349 Ok(Json(entries))
2350}
2351
/// Query-string parameters for the session event-stream endpoints
/// (`replay_session_events`, `list_session_event_files`).
#[derive(Deserialize)]
struct ReplayQuery {
    /// Session whose directory under `CODETETHER_EVENT_STREAM_PATH` is read.
    session_id: String,
    /// In `replay_session_events`: skip files whose end offset is `<=` this value.
    start_offset: Option<u64>,
    /// In `replay_session_events`: skip files whose start offset is `>=` this value.
    end_offset: Option<u64>,
    /// In `replay_session_events`: maximum events returned (default 1000, capped at 10000).
    limit: Option<usize>,
    /// In `replay_session_events`: keep only events whose `tool_name` equals this value.
    tool_name: Option<String>,
}
2369
2370async fn replay_session_events(
2374 Query(query): Query<ReplayQuery>,
2375) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
2376 use std::path::PathBuf;
2377
2378 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2379 .map(PathBuf::from)
2380 .ok()
2381 .ok_or_else(|| {
2382 (
2383 StatusCode::SERVICE_UNAVAILABLE,
2384 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2385 )
2386 })?;
2387
2388 let session_dir = base_dir.join(&query.session_id);
2389
2390 if !session_dir.exists() {
2392 return Err((
2393 StatusCode::NOT_FOUND,
2394 format!("Session not found: {}", query.session_id),
2395 ));
2396 }
2397
2398 let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
2399
2400 let entries = std::fs::read_dir(&session_dir)
2402 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2403
2404 for entry in entries {
2405 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2406 let path = entry.path();
2407
2408 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2409 continue;
2410 }
2411
2412 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2414
2415 if let Some(offsets) = filename
2416 .strip_prefix("T")
2417 .or_else(|| filename.strip_prefix("202"))
2418 {
2419 let parts: Vec<&str> = offsets.split('-').collect();
2421 if parts.len() >= 4 {
2422 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2423 let end: u64 = parts[parts.len() - 1]
2424 .trim_end_matches(".jsonl")
2425 .parse()
2426 .unwrap_or(0);
2427
2428 if let Some(query_start) = query.start_offset
2430 && end <= query_start
2431 {
2432 continue;
2433 }
2434 if let Some(query_end) = query.end_offset
2435 && start >= query_end
2436 {
2437 continue;
2438 }
2439
2440 let content = std::fs::read_to_string(&path)
2442 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2443
2444 for line in content.lines() {
2445 if line.trim().is_empty() {
2446 continue;
2447 }
2448 if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
2449 if let Some(ref tool_filter) = query.tool_name {
2451 if let Some(event_tool) =
2452 event.get("tool_name").and_then(|v| v.as_str())
2453 {
2454 if event_tool != tool_filter {
2455 continue;
2456 }
2457 } else {
2458 continue;
2459 }
2460 }
2461 all_events.push((start, end, event));
2462 }
2463 }
2464 }
2465 }
2466 }
2467
2468 all_events.sort_by_key(|(s, _, _)| *s);
2470
2471 let limit = query.limit.unwrap_or(1000).min(10000);
2473 let events: Vec<_> = all_events
2474 .into_iter()
2475 .take(limit)
2476 .map(|(_, _, e)| e)
2477 .collect();
2478
2479 Ok(Json(events))
2480}
2481
2482async fn list_session_event_files(
2484 Query(query): Query<ReplayQuery>,
2485) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
2486 use std::path::PathBuf;
2487
2488 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2489 .map(PathBuf::from)
2490 .ok()
2491 .ok_or_else(|| {
2492 (
2493 StatusCode::SERVICE_UNAVAILABLE,
2494 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2495 )
2496 })?;
2497
2498 let session_dir = base_dir.join(&query.session_id);
2499 if !session_dir.exists() {
2500 return Err((
2501 StatusCode::NOT_FOUND,
2502 format!("Session not found: {}", query.session_id),
2503 ));
2504 }
2505
2506 let mut files: Vec<EventFileMeta> = Vec::new();
2507
2508 let entries = std::fs::read_dir(&session_dir)
2510 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2511
2512 for entry in entries {
2513 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2514 let path = entry.path();
2515
2516 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2517 continue;
2518 }
2519
2520 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2521
2522 if let Some(offsets) = filename
2524 .strip_prefix("T")
2525 .or_else(|| filename.strip_prefix("202"))
2526 {
2527 let parts: Vec<&str> = offsets.split('-').collect();
2528 if parts.len() >= 4 {
2529 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2530 let end: u64 = parts[parts.len() - 1]
2531 .trim_end_matches(".jsonl")
2532 .parse()
2533 .unwrap_or(0);
2534
2535 let metadata = std::fs::metadata(&path)
2536 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2537
2538 files.push(EventFileMeta {
2539 filename: filename.to_string(),
2540 start_offset: start,
2541 end_offset: end,
2542 size_bytes: metadata.len(),
2543 });
2544 }
2545 }
2546 }
2547
2548 files.sort_by_key(|f| f.start_offset);
2549 Ok(Json(files))
2550}
2551
2552#[derive(Serialize)]
2553struct EventFileMeta {
2554 filename: String,
2555 start_offset: u64,
2556 end_offset: u64,
2557 size_bytes: u64,
2558}
2559
2560async fn get_k8s_status(
2563 State(state): State<AppState>,
2564) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
2565 Ok(Json(state.k8s.status().await))
2566}
2567
/// JSON body accepted by `k8s_scale`.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Desired replica count; `k8s_scale` rejects values outside `0..=100`.
    replicas: i32,
}
2572
2573async fn k8s_scale(
2574 State(state): State<AppState>,
2575 Json(req): Json<ScaleRequest>,
2576) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2577 if req.replicas < 0 || req.replicas > 100 {
2578 return Err((
2579 StatusCode::BAD_REQUEST,
2580 "Replicas must be between 0 and 100".to_string(),
2581 ));
2582 }
2583
2584 state
2585 .audit_log
2586 .log(
2587 AuditCategory::K8s,
2588 format!("scale:{}", req.replicas),
2589 AuditOutcome::Success,
2590 None,
2591 None,
2592 )
2593 .await;
2594
2595 state
2596 .k8s
2597 .scale(req.replicas)
2598 .await
2599 .map(Json)
2600 .map_err(internal_error)
2601}
2602
2603async fn k8s_restart(
2604 State(state): State<AppState>,
2605) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2606 state
2607 .audit_log
2608 .log(
2609 AuditCategory::K8s,
2610 "rolling_restart",
2611 AuditOutcome::Success,
2612 None,
2613 None,
2614 )
2615 .await;
2616
2617 state
2618 .k8s
2619 .rolling_restart()
2620 .await
2621 .map(Json)
2622 .map_err(internal_error)
2623}
2624
2625async fn k8s_list_pods(
2626 State(state): State<AppState>,
2627) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
2628 state
2629 .k8s
2630 .list_pods()
2631 .await
2632 .map(Json)
2633 .map_err(internal_error)
2634}
2635
2636async fn k8s_actions(
2637 State(state): State<AppState>,
2638) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
2639 Ok(Json(state.k8s.recent_actions(100).await))
2640}
2641
/// JSON body accepted by `k8s_spawn_subagent`.
///
/// Every field except `subagent_id` is optional in the payload. All fields
/// other than the id are forwarded unchanged into `crate::k8s::SubagentPodSpec`.
#[derive(Deserialize)]
struct SpawnSubagentRequest {
    /// Identifier passed to `spawn_subagent_pod_with_spec`.
    subagent_id: String,
    /// Optional container image override (presumably the manager has a default — confirm).
    #[serde(default)]
    image: Option<String>,
    /// Extra environment variables for the pod; empty when omitted.
    #[serde(default)]
    env_vars: std::collections::HashMap<String, String>,
    /// Extra pod labels; empty when omitted.
    #[serde(default)]
    labels: std::collections::HashMap<String, String>,
    /// Optional container command override.
    #[serde(default)]
    command: Option<Vec<String>>,
    /// Optional container arguments override.
    #[serde(default)]
    args: Option<Vec<String>>,
}
2656
2657async fn k8s_spawn_subagent(
2658 State(state): State<AppState>,
2659 Json(req): Json<SpawnSubagentRequest>,
2660) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2661 state
2662 .k8s
2663 .spawn_subagent_pod_with_spec(
2664 &req.subagent_id,
2665 crate::k8s::SubagentPodSpec {
2666 image: req.image,
2667 env_vars: req.env_vars,
2668 labels: req.labels,
2669 command: req.command,
2670 args: req.args,
2671 },
2672 )
2673 .await
2674 .map(Json)
2675 .map_err(internal_error)
2676}
2677
2678async fn k8s_delete_subagent(
2679 State(state): State<AppState>,
2680 axum::extract::Path(id): axum::extract::Path<String>,
2681) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2682 state
2683 .k8s
2684 .delete_subagent_pod(&id)
2685 .await
2686 .map(Json)
2687 .map_err(internal_error)
2688}
2689
/// JSON body accepted by `register_tool`.
///
/// All fields are required. They are copied verbatim into the stored
/// `RegisteredTool`.
#[derive(Debug, Deserialize)]
pub struct RegisterToolRequest {
    /// Tool identifier; `tool_heartbeat` later addresses the tool by this id.
    pub id: String,
    /// Human-readable tool name.
    pub name: String,
    /// Free-form description of the tool.
    pub description: String,
    /// Tool version string (format not validated here).
    pub version: String,
    /// Endpoint URL advertised by the tool (not contacted by `register_tool`).
    pub endpoint: String,
    /// Capability tags advertised by the tool.
    pub capabilities: Vec<String>,
    /// Arbitrary JSON parameter description (presumably a schema — confirm).
    pub parameters: serde_json::Value,
}
2701
/// Response returned by `register_tool`.
#[derive(Serialize)]
struct RegisterToolResponse {
    /// View of the tool as it was stored in the registry.
    tool: RegisteredToolView,
    /// Message from `tool_contract::discovery_registration_message()`.
    message: String,
}
2708
2709async fn list_tools(State(state): State<AppState>) -> Json<Vec<RegisteredToolView>> {
2711 Json(
2712 state
2713 .tool_registry
2714 .list()
2715 .await
2716 .into_iter()
2717 .map(RegisteredToolView::from)
2718 .collect(),
2719 )
2720}
2721
2722async fn register_tool(
2724 State(state): State<AppState>,
2725 Json(req): Json<RegisterToolRequest>,
2726) -> Result<Json<RegisterToolResponse>, (StatusCode, String)> {
2727 let now = chrono::Utc::now();
2728 let tool = RegisteredTool {
2729 id: req.id.clone(),
2730 name: req.name,
2731 description: req.description,
2732 version: req.version,
2733 endpoint: req.endpoint,
2734 capabilities: req.capabilities,
2735 parameters: req.parameters,
2736 execution_mode: tool_contract::DISCOVERY_ONLY_MODE.to_string(),
2737 registered_at: now,
2738 last_heartbeat: now,
2739 expires_at: now + Duration::from_secs(90),
2740 };
2741
2742 state.tool_registry.register(tool.clone()).await;
2743
2744 tracing::info!(tool_id = %tool.id, "Tool registered");
2745
2746 Ok(Json(RegisterToolResponse {
2747 tool: RegisteredToolView::from(tool),
2748 message: tool_contract::discovery_registration_message(),
2749 }))
2750}
2751
2752async fn tool_heartbeat(
2754 State(state): State<AppState>,
2755 Path(id): Path<String>,
2756) -> Result<Json<RegisteredToolView>, (StatusCode, String)> {
2757 state
2758 .tool_registry
2759 .heartbeat(&id)
2760 .await
2761 .map(|tool| {
2762 tracing::info!(tool_id = %id, "Tool heartbeat received");
2763 Json(RegisteredToolView::from(tool))
2764 })
2765 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Tool {} not found", id)))
2766}
2767
2768async fn list_plugins(State(state): State<AppState>) -> Json<PluginListResponse> {
2770 let test_sig =
2771 state
2772 .plugin_registry
2773 .signing_key()
2774 .sign("_probe", "0.0.0", &state.server_fingerprint);
2775 Json(PluginListResponse {
2776 server_fingerprint: state.server_fingerprint.clone(),
2777 signing_available: !test_sig.is_empty(),
2778 plugins: state.plugin_registry.list().await,
2779 })
2780}
2781
/// Response returned by `list_plugins`.
#[derive(Serialize)]
struct PluginListResponse {
    /// The server fingerprint held in `AppState`.
    server_fingerprint: String,
    /// Whether signing a probe value produced a non-empty signature.
    signing_available: bool,
    /// Manifests returned by the plugin registry's `list()`.
    plugins: Vec<PluginManifest>,
}
2788
2789async fn list_agent_tasks(
2795 State(state): State<AppState>,
2796 Query(params): Query<ListAgentTasksQuery>,
2797) -> Json<serde_json::Value> {
2798 let tasks = state.knative_tasks.list().await;
2799 let filtered: Vec<_> = tasks
2800 .into_iter()
2801 .filter(|t| params.status.as_ref().is_none_or(|s| t.status == *s))
2802 .filter(|t| {
2803 params
2804 .agent_type
2805 .as_ref()
2806 .is_none_or(|a| t.agent_type == *a)
2807 })
2808 .collect();
2809 Json(serde_json::json!({ "tasks": filtered, "total": filtered.len() }))
2810}
2811
/// Query-string filters for `list_agent_tasks`; absent fields do not filter.
#[derive(Deserialize)]
struct ListAgentTasksQuery {
    /// Keep only tasks whose `status` equals this value exactly.
    status: Option<String>,
    /// Keep only tasks whose `agent_type` equals this value exactly.
    agent_type: Option<String>,
}
2817
2818async fn create_agent_task(
2820 State(state): State<AppState>,
2821 Json(req): Json<CreateAgentTaskRequest>,
2822) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2823 let task_id = uuid::Uuid::new_v4().to_string();
2824 let task = KnativeTask {
2825 task_id: task_id.clone(),
2826 title: req.title,
2827 description: req.description,
2828 agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2829 priority: req.priority.unwrap_or(0),
2830 received_at: chrono::Utc::now(),
2831 status: "pending".to_string(),
2832 };
2833 state.knative_tasks.push(task).await;
2834 Ok(Json(serde_json::json!({
2835 "task_id": task_id,
2836 "status": "pending",
2837 })))
2838}
2839
/// JSON body accepted by `create_agent_task`.
#[derive(Deserialize)]
// `model` is deserialized but not read by `create_agent_task`, hence the allow.
#[allow(dead_code)]
struct CreateAgentTaskRequest {
    /// Task title.
    title: String,
    /// Task description.
    description: String,
    /// Agent type; `create_agent_task` uses `"build"` when omitted.
    agent_type: Option<String>,
    /// Requested model; currently unused.
    model: Option<String>,
    /// Task priority; `create_agent_task` uses `0` when omitted.
    priority: Option<i32>,
}
2849
2850async fn get_agent_task(
2852 State(state): State<AppState>,
2853 Path(task_id): Path<String>,
2854) -> Result<Json<KnativeTask>, (StatusCode, String)> {
2855 state
2856 .knative_tasks
2857 .get(&task_id)
2858 .await
2859 .map(Json)
2860 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
2861}
2862
2863async fn get_agent_task_output(
2865 State(state): State<AppState>,
2866 Path(task_id): Path<String>,
2867) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2868 let task = state
2869 .knative_tasks
2870 .get(&task_id)
2871 .await
2872 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2873 Ok(Json(serde_json::json!({
2874 "task_id": task.task_id,
2875 "status": task.status,
2876 "title": task.title,
2877 "output": null,
2878 })))
2879}
2880
/// JSON body accepted by `agent_task_output`; both fields may be omitted.
#[derive(Deserialize)]
struct TaskOutputPayload {
    /// Reporting worker id; deserialized but not read by `agent_task_output`.
    #[allow(dead_code)]
    #[serde(default)]
    worker_id: Option<String>,
    /// Output text; when present it is published on the bus as a `Working` task update.
    #[serde(default)]
    output: Option<String>,
}
2890
2891async fn agent_task_output(
2892 State(state): State<AppState>,
2893 Path(task_id): Path<String>,
2894 Json(payload): Json<TaskOutputPayload>,
2895) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2896 let task_exists = state.knative_tasks.get(&task_id).await.is_some();
2898 if !task_exists {
2899 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
2900 }
2901
2902 let _ = state.knative_tasks.update_status(&task_id, "working").await;
2904
2905 if let Some(ref output) = payload.output {
2907 let bus_handle = state.bus.handle("task-output");
2908 let _ = bus_handle.send(
2909 format!("task.{}", task_id),
2910 crate::bus::BusMessage::TaskUpdate {
2911 task_id: task_id.clone(),
2912 state: crate::a2a::types::TaskState::Working,
2913 message: Some(output.clone()),
2914 },
2915 );
2916 }
2917
2918 Ok(Json(serde_json::json!({
2919 "task_id": task_id,
2920 "status": "received",
2921 })))
2922}
2923
2924async fn stream_agent_task_output(
2926 State(state): State<AppState>,
2927 Path(task_id): Path<String>,
2928) -> Result<Sse<impl stream::Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
2929 state
2931 .knative_tasks
2932 .get(&task_id)
2933 .await
2934 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2935
2936 let bus_handle = state.bus.handle("task-stream");
2937 let rx = bus_handle.into_receiver();
2938 let topic_prefix = format!("task.{task_id}");
2939
2940 let stream = async_stream::try_stream! {
2941 let mut rx = rx;
2942 loop {
2943 match rx.recv().await {
2944 Ok(envelope) => {
2945 if !envelope.topic.starts_with(&topic_prefix) {
2946 continue;
2947 }
2948 let data = serde_json::to_string(&envelope.message).unwrap_or_default();
2949 yield Event::default().event("output").data(data);
2950 }
2951 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2952 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2953 }
2954 }
2955 };
2956
2957 Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
2958}
2959
2960async fn list_connected_workers(
2964 State(state): State<AppState>,
2965) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2966 let pods = state.k8s.list_pods().await.unwrap_or_default();
2968 let workers: Vec<serde_json::Value> = pods
2969 .iter()
2970 .filter(|p| p.name.contains("codetether") || p.name.contains("worker"))
2971 .map(|p| {
2972 serde_json::json!({
2973 "worker_id": p.name,
2974 "name": p.name,
2975 "status": p.phase,
2976 "is_sse_connected": p.phase == "Running",
2977 "last_seen": chrono::Utc::now().to_rfc3339(),
2978 })
2979 })
2980 .collect();
2981 Ok(Json(serde_json::json!({ "workers": workers })))
2982}
2983
2984async fn dispatch_task(
2988 State(state): State<AppState>,
2989 Json(req): Json<DispatchTaskRequest>,
2990) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2991 let task_id = uuid::Uuid::new_v4().to_string();
2992 let task = KnativeTask {
2993 task_id: task_id.clone(),
2994 title: req.title,
2995 description: req.description,
2996 agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2997 priority: req.priority.unwrap_or(0),
2998 received_at: chrono::Utc::now(),
2999 status: "pending".to_string(),
3000 };
3001
3002 let handle = state.bus.handle("task-dispatch");
3004 handle.send(
3005 format!("task.{}", task_id),
3006 crate::bus::BusMessage::TaskUpdate {
3007 task_id: task_id.clone(),
3008 state: crate::a2a::types::TaskState::Submitted,
3009 message: Some(format!("Task dispatched: {}", task.title)),
3010 },
3011 );
3012
3013 state.knative_tasks.push(task).await;
3014
3015 Ok(Json(serde_json::json!({
3016 "task_id": task_id,
3017 "status": "pending",
3018 "dispatched_via_knative": true,
3019 })))
3020}
3021
/// JSON body accepted by `dispatch_task`.
#[derive(Deserialize)]
// `model` and `metadata` are deserialized but not read by `dispatch_task`.
#[allow(dead_code)]
struct DispatchTaskRequest {
    /// Task title (also used in the bus announcement text).
    title: String,
    /// Task description.
    description: String,
    /// Agent type; `dispatch_task` uses `"build"` when omitted.
    agent_type: Option<String>,
    /// Requested model; currently unused.
    model: Option<String>,
    /// Task priority; `dispatch_task` uses `0` when omitted.
    priority: Option<i32>,
    /// Arbitrary extra metadata; currently unused.
    metadata: Option<serde_json::Value>,
}
3032
3033async fn create_voice_session_rest(
3039 State(state): State<AppState>,
3040 Json(req): Json<CreateVoiceSessionRequest>,
3041) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
3042 let voice_id = req.voice.unwrap_or_else(|| "960f89fc".to_string());
3043 let room_name = format!("voice-{}", uuid::Uuid::new_v4());
3044
3045 let handle = state.bus.handle("voice-rest");
3047 handle.send_voice_session_started(&room_name, &voice_id);
3048
3049 let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
3050
3051 Ok(Json(serde_json::json!({
3052 "room_name": room_name,
3053 "voice": voice_id,
3054 "mode": req.mode.unwrap_or_else(|| "chat".to_string()),
3055 "access_token": "",
3056 "livekit_url": livekit_url,
3057 "expires_at": (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339(),
3058 })))
3059}
3060
/// JSON body accepted by `create_voice_session_rest`.
#[derive(Deserialize)]
// `codebase_id` and `user_id` are deserialized but not read by the handler.
#[allow(dead_code)]
struct CreateVoiceSessionRequest {
    /// Voice id; the handler uses `"960f89fc"` when omitted.
    voice: Option<String>,
    /// Session mode; the handler uses `"chat"` when omitted.
    mode: Option<String>,
    /// Associated codebase; currently unused.
    codebase_id: Option<String>,
    /// Requesting user; currently unused.
    user_id: Option<String>,
}
3069
3070async fn get_voice_session_rest(Path(room_name): Path<String>) -> Json<serde_json::Value> {
3072 let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
3073 Json(serde_json::json!({
3074 "room_name": room_name,
3075 "state": "active",
3076 "agent_state": "listening",
3077 "access_token": "",
3078 "livekit_url": livekit_url,
3079 }))
3080}
3081
3082async fn delete_voice_session_rest(
3084 State(state): State<AppState>,
3085 Path(room_name): Path<String>,
3086) -> Json<serde_json::Value> {
3087 let handle = state.bus.handle("voice-rest");
3088 handle.send_voice_session_ended(&room_name, "user_ended");
3089 Json(serde_json::json!({ "status": "deleted" }))
3090}
3091
3092async fn list_voices_rest() -> Json<serde_json::Value> {
3094 Json(serde_json::json!({
3095 "voices": [
3096 { "voice_id": "960f89fc", "name": "Riley", "language": "english" },
3097 { "voice_id": "puck", "name": "Puck", "language": "english" },
3098 { "voice_id": "charon", "name": "Charon", "language": "english" },
3099 { "voice_id": "kore", "name": "Kore", "language": "english" },
3100 { "voice_id": "fenrir", "name": "Fenrir", "language": "english" },
3101 { "voice_id": "aoede", "name": "Aoede", "language": "english" },
3102 ]
3103 }))
3104}
3105
3106async fn resume_codebase_session(
3110 Path((_codebase_id, session_id)): Path<(String, String)>,
3111 Json(req): Json<ResumeSessionRequest>,
3112) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
3113 let mut session = match crate::session::Session::load(&session_id).await {
3115 Ok(s) => s,
3116 Err(_) => {
3117 let mut s = crate::session::Session::new()
3119 .await
3120 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3121 if let Some(agent) = &req.agent {
3122 s.set_agent_name(agent.clone());
3123 }
3124 s.save()
3125 .await
3126 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3127 s
3128 }
3129 };
3130
3131 if let Some(prompt) = &req.prompt
3133 && !prompt.is_empty()
3134 {
3135 match session.prompt(prompt).await {
3136 Ok(result) => {
3137 session.save().await.ok();
3138 return Ok(Json(serde_json::json!({
3139 "session_id": session.id,
3140 "active_session_id": session.id,
3141 "status": "completed",
3142 "result": result.text,
3143 })));
3144 }
3145 Err(e) => {
3146 return Ok(Json(serde_json::json!({
3147 "session_id": session.id,
3148 "active_session_id": session.id,
3149 "status": "failed",
3150 "error": e.to_string(),
3151 })));
3152 }
3153 }
3154 }
3155
3156 Ok(Json(serde_json::json!({
3158 "session_id": session.id,
3159 "active_session_id": session.id,
3160 "status": "ready",
3161 })))
3162}
3163
/// JSON body accepted by `resume_codebase_session`.
#[derive(Deserialize)]
// `model` is deserialized but not read by `resume_codebase_session`.
#[allow(dead_code)]
struct ResumeSessionRequest {
    /// Prompt to run; ignored when absent or empty.
    prompt: Option<String>,
    /// Agent name applied only when a new session has to be created.
    agent: Option<String>,
    /// Requested model; currently unused.
    model: Option<String>,
}
3171
3172fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
3173 let message = error.to_string();
3174 if message.contains("not found") {
3175 return (StatusCode::NOT_FOUND, message);
3176 }
3177 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
3178 return (StatusCode::BAD_REQUEST, message);
3179 }
3180 (StatusCode::INTERNAL_SERVER_ERROR, message)
3181}
3182
/// Reads environment variable `name` as a boolean.
///
/// The value is matched case-insensitively: `1`/`true`/`yes`/`on` mean
/// `true`, and `0`/`false`/`no`/`off` mean `false`. When the variable is
/// unset, not valid Unicode, or holds any other value, `default` is returned.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
3193
#[cfg(test)]
mod tests {
    use super::{
        RegisteredTool, RegisteredToolView, match_policy_rule, normalize_model_reference,
        required_permission,
    };
    use axum::http::StatusCode;
    use serde_json::json;

    /// Prompting an existing session requires `agent:execute`.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/api/session/abc123/prompt", "POST"),
            Some("agent:execute")
        );
    }

    /// Creating a session stays under `sessions:write`.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        assert_eq!(
            match_policy_rule("/api/session", "POST"),
            Some("sessions:write")
        );
    }

    /// Approving a cognition proposal requires `agent:execute`.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/cognition/proposals/p1/approve", "POST"),
            Some("agent:execute")
        );
    }

    /// The OpenAI-compatible model listing requires `agent:read`.
    #[test]
    fn policy_openai_models_requires_read_permission() {
        assert_eq!(match_policy_rule("/v1/models", "GET"), Some("agent:read"));
    }

    /// The OpenAI-compatible chat completions endpoint requires `agent:execute`.
    #[test]
    fn policy_openai_chat_completions_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/chat/completions", "POST"),
            Some("agent:execute")
        );
    }

    /// Routes without a policy mapping are rejected with `403 Forbidden`.
    #[test]
    fn policy_unmapped_route_is_denied() {
        assert_eq!(
            required_permission("/v1/new-route", "GET"),
            Err(StatusCode::FORBIDDEN)
        );
    }

    /// `provider:model` references are normalized to `provider/model`.
    #[test]
    fn normalize_model_reference_accepts_colon_format() {
        let normalized = normalize_model_reference("openai:gpt-4o");
        assert_eq!(normalized, "openai/gpt-4o".to_string());
    }

    /// A discovery-only tool serializes as non-executable by agents.
    #[test]
    fn registered_tool_view_derives_discovery_contract() {
        let timestamp = chrono::Utc::now();
        let view = RegisteredToolView::from(RegisteredTool {
            id: "demo".into(),
            name: "Demo".into(),
            description: "Demo tool".into(),
            version: "1.0.0".into(),
            endpoint: "https://example.test/tool".into(),
            capabilities: vec!["demo".into()],
            parameters: json!({}),
            execution_mode: super::tool_contract::DISCOVERY_ONLY_MODE.into(),
            registered_at: timestamp,
            last_heartbeat: timestamp,
            expires_at: timestamp,
        });

        let serialized = serde_json::to_value(view).unwrap();

        assert_eq!(serialized["execution_mode"], "discovery_only");
        assert_eq!(serialized["agent_executable"], false);
    }
}
3269}