Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::{AgentBus, BusEnvelope};
11use crate::cli::ServeArgs;
12use crate::cognition::{
13    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16    executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24    Router,
25    body::Body,
26    extract::Path,
27    extract::{Query, State},
28    http::{Request, StatusCode},
29    middleware::{self, Next},
30    response::sse::{Event, KeepAlive, Sse},
31    response::{IntoResponse, Json, Response},
32    routing::{get, post},
33};
34use futures::{StreamExt, future::join_all, stream};
35use http::HeaderValue;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::convert::Infallible;
39use std::sync::Arc;
40use std::time::Duration;
41use tokio::sync::{Mutex, broadcast};
42use tonic_web::GrpcWebLayer;
43use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
44use tower_http::trace::TraceLayer;
45
46/// Task received from Knative Eventing
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct KnativeTask {
49    pub task_id: String,
50    pub title: String,
51    pub description: String,
52    pub agent_type: String,
53    pub priority: i32,
54    pub received_at: chrono::DateTime<chrono::Utc>,
55    pub status: String,
56}
57
58/// Queue for Knative tasks waiting to be processed
59#[derive(Clone)]
60pub struct KnativeTaskQueue {
61    tasks: Arc<Mutex<Vec<KnativeTask>>>,
62}
63
64impl KnativeTaskQueue {
65    pub fn new() -> Self {
66        Self {
67            tasks: Arc::new(Mutex::new(Vec::new())),
68        }
69    }
70
71    pub async fn push(&self, task: KnativeTask) {
72        self.tasks.lock().await.push(task);
73    }
74
75    pub async fn pop(&self) -> Option<KnativeTask> {
76        self.tasks.lock().await.pop()
77    }
78
79    pub async fn list(&self) -> Vec<KnativeTask> {
80        self.tasks.lock().await.clone()
81    }
82
83    pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
84        self.tasks
85            .lock()
86            .await
87            .iter()
88            .find(|t| t.task_id == task_id)
89            .cloned()
90    }
91
92    pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
93        let mut tasks = self.tasks.lock().await;
94        if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
95            task.status = status.to_string();
96            true
97        } else {
98            false
99        }
100    }
101}
102
103impl Default for KnativeTaskQueue {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109/// A registered tool with TTL tracking
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct RegisteredTool {
112    pub id: String,
113    pub name: String,
114    pub description: String,
115    pub version: String,
116    pub endpoint: String,
117    pub capabilities: Vec<String>,
118    pub parameters: serde_json::Value,
119    pub registered_at: chrono::DateTime<chrono::Utc>,
120    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
121    pub expires_at: chrono::DateTime<chrono::Utc>,
122}
123
124/// Tool registry with TTL-based expiry
125#[derive(Clone)]
126pub struct ToolRegistry {
127    tools: Arc<tokio::sync::RwLock<HashMap<String, RegisteredTool>>>,
128}
129
130impl Default for ToolRegistry {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl ToolRegistry {
137    pub fn new() -> Self {
138        Self {
139            tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
140        }
141    }
142
143    /// Register a new tool
144    pub async fn register(&self, tool: RegisteredTool) {
145        let mut tools = self.tools.write().await;
146        tools.insert(tool.id.clone(), tool);
147    }
148
149    /// Get a tool by ID
150    pub async fn get(&self, id: &str) -> Option<RegisteredTool> {
151        let tools = self.tools.read().await;
152        tools.get(id).cloned()
153    }
154
155    /// List all tools (excluding expired)
156    pub async fn list(&self) -> Vec<RegisteredTool> {
157        let tools = self.tools.read().await;
158        let now = chrono::Utc::now();
159        tools
160            .values()
161            .filter(|t| t.expires_at > now)
162            .cloned()
163            .collect()
164    }
165
166    /// Update heartbeat for a tool (extends TTL by 30s)
167    pub async fn heartbeat(&self, id: &str) -> Option<RegisteredTool> {
168        let mut tools = self.tools.write().await;
169        if let Some(tool) = tools.get_mut(id) {
170            let now = chrono::Utc::now();
171            tool.last_heartbeat = now;
172            tool.expires_at = now + Duration::from_secs(90);
173            return Some(tool.clone());
174        }
175        None
176    }
177
178    /// Clean up expired tools
179    pub async fn cleanup(&self) {
180        let mut tools = self.tools.write().await;
181        let now = chrono::Utc::now();
182        tools.retain(|_, t| t.expires_at > now);
183    }
184}
185
186/// Server state shared across handlers
187#[derive(Clone)]
188pub struct AppState {
189    pub config: Arc<Config>,
190    pub cognition: Arc<CognitionRuntime>,
191    pub audit_log: AuditLog,
192    pub k8s: Arc<K8sManager>,
193    pub auth: AuthState,
194    pub bus: Arc<AgentBus>,
195    pub knative_tasks: KnativeTaskQueue,
196    pub tool_registry: ToolRegistry,
197}
198
199/// Audit middleware — logs every request/response to the audit trail.
200async fn audit_middleware(
201    State(state): State<AppState>,
202    request: Request<Body>,
203    next: Next,
204) -> Response {
205    let method = request.method().clone();
206    let path = request.uri().path().to_string();
207    let started = std::time::Instant::now();
208
209    let response = next.run(request).await;
210
211    let duration_ms = started.elapsed().as_millis() as u64;
212    let status = response.status().as_u16();
213    let outcome = if status < 400 {
214        AuditOutcome::Success
215    } else if status == 401 || status == 403 {
216        AuditOutcome::Denied
217    } else {
218        AuditOutcome::Failure
219    };
220
221    state
222        .audit_log
223        .record(audit::AuditEntry {
224            id: uuid::Uuid::new_v4().to_string(),
225            timestamp: chrono::Utc::now(),
226            category: AuditCategory::Api,
227            action: format!("{} {}", method, path),
228            principal: None,
229            outcome,
230            detail: Some(serde_json::json!({ "status": status })),
231            duration_ms: Some(duration_ms),
232            okr_id: None,
233            okr_run_id: None,
234            relay_id: None,
235            session_id: None,
236        })
237        .await;
238
239    response
240}
241
242/// Mapping from (path pattern, HTTP method) → OPA permission action.
243/// The first matching rule wins.
244struct PolicyRule {
245    pattern: &'static str,
246    methods: Option<&'static [&'static str]>,
247    permission: &'static str,
248}
249
250const POLICY_RULES: &[PolicyRule] = &[
251    // Public / exempt
252    PolicyRule {
253        pattern: "/health",
254        methods: None,
255        permission: "",
256    },
257    PolicyRule {
258        pattern: "/task",
259        methods: None,
260        permission: "",
261    },
262    PolicyRule {
263        pattern: "/v1/knative/",
264        methods: None,
265        permission: "",
266    },
267    // OpenAI-compatible model discovery and chat completions
268    PolicyRule {
269        pattern: "/v1/models",
270        methods: Some(&["GET"]),
271        permission: "agent:read",
272    },
273    PolicyRule {
274        pattern: "/v1/chat/completions",
275        methods: Some(&["POST"]),
276        permission: "agent:execute",
277    },
278    // Tool registry — read/write
279    PolicyRule {
280        pattern: "/v1/tools",
281        methods: Some(&["GET"]),
282        permission: "agent:read",
283    },
284    PolicyRule {
285        pattern: "/v1/tools/register",
286        methods: Some(&["POST"]),
287        permission: "agent:execute",
288    },
289    PolicyRule {
290        pattern: "/v1/tools/",
291        methods: Some(&["POST"]),
292        permission: "agent:execute",
293    },
294    PolicyRule {
295        pattern: "/a2a/",
296        methods: None,
297        permission: "",
298    },
299    // K8s management — admin only
300    PolicyRule {
301        pattern: "/v1/k8s/scale",
302        methods: Some(&["POST"]),
303        permission: "admin:access",
304    },
305    PolicyRule {
306        pattern: "/v1/k8s/restart",
307        methods: Some(&["POST"]),
308        permission: "admin:access",
309    },
310    PolicyRule {
311        pattern: "/v1/k8s/",
312        methods: Some(&["GET"]),
313        permission: "admin:access",
314    },
315    // K8s sub-agent lifecycle — admin only
316    PolicyRule {
317        pattern: "/v1/k8s/subagent",
318        methods: Some(&["POST", "DELETE"]),
319        permission: "admin:access",
320    },
321    // Plugin registry — read
322    PolicyRule {
323        pattern: "/v1/plugins",
324        methods: Some(&["GET"]),
325        permission: "agent:read",
326    },
327    // Audit — admin
328    PolicyRule {
329        pattern: "/v1/audit",
330        methods: None,
331        permission: "admin:access",
332    },
333    // Event stream replay — admin (compliance feature)
334    PolicyRule {
335        pattern: "/v1/audit/replay",
336        methods: Some(&["GET"]),
337        permission: "admin:access",
338    },
339    // Cognition — write operations
340    PolicyRule {
341        pattern: "/v1/cognition/start",
342        methods: Some(&["POST"]),
343        permission: "agent:execute",
344    },
345    PolicyRule {
346        pattern: "/v1/cognition/stop",
347        methods: Some(&["POST"]),
348        permission: "agent:execute",
349    },
350    PolicyRule {
351        pattern: "/v1/cognition/",
352        methods: Some(&["GET"]),
353        permission: "agent:read",
354    },
355    // Swarm persona lifecycle
356    PolicyRule {
357        pattern: "/v1/swarm/personas",
358        methods: Some(&["POST"]),
359        permission: "agent:execute",
360    },
361    PolicyRule {
362        pattern: "/v1/swarm/",
363        methods: Some(&["POST"]),
364        permission: "agent:execute",
365    },
366    PolicyRule {
367        pattern: "/v1/swarm/",
368        methods: Some(&["GET"]),
369        permission: "agent:read",
370    },
371    // Agent Bus — read access for stream, filtered by JWT topic claims
372    PolicyRule {
373        pattern: "/v1/bus/stream",
374        methods: Some(&["GET"]),
375        permission: "agent:read",
376    },
377    // Agent Bus — publish messages (requires write permission)
378    PolicyRule {
379        pattern: "/v1/bus/publish",
380        methods: Some(&["POST"]),
381        permission: "agent:execute",
382    },
383    // Session management
384    // Session prompt execution — requires execute permission
385    PolicyRule {
386        pattern: "/api/session/",
387        methods: Some(&["POST"]),
388        permission: "agent:execute",
389    },
390    PolicyRule {
391        pattern: "/api/session",
392        methods: Some(&["POST"]),
393        permission: "sessions:write",
394    },
395    PolicyRule {
396        pattern: "/api/session",
397        methods: Some(&["GET"]),
398        permission: "sessions:read",
399    },
400    // MCP compatibility alias
401    PolicyRule {
402        pattern: "/mcp/v1/tools",
403        methods: Some(&["GET"]),
404        permission: "agent:read",
405    },
406    // Agent task APIs (dashboard compatibility)
407    PolicyRule {
408        pattern: "/v1/agent/tasks",
409        methods: Some(&["GET"]),
410        permission: "agent:read",
411    },
412    PolicyRule {
413        pattern: "/v1/agent/tasks",
414        methods: Some(&["POST"]),
415        permission: "agent:execute",
416    },
417    PolicyRule {
418        pattern: "/v1/agent/tasks/",
419        methods: Some(&["GET"]),
420        permission: "agent:read",
421    },
422    // Worker connectivity
423    PolicyRule {
424        pattern: "/v1/worker/",
425        methods: Some(&["GET"]),
426        permission: "agent:read",
427    },
428    PolicyRule {
429        pattern: "/v1/agent/workers",
430        methods: Some(&["GET"]),
431        permission: "agent:read",
432    },
433    // Task dispatch
434    PolicyRule {
435        pattern: "/v1/tasks/dispatch",
436        methods: Some(&["POST"]),
437        permission: "agent:execute",
438    },
439    // Voice REST bridge
440    PolicyRule {
441        pattern: "/v1/voice/",
442        methods: Some(&["GET"]),
443        permission: "agent:read",
444    },
445    PolicyRule {
446        pattern: "/v1/voice/",
447        methods: Some(&["POST", "DELETE"]),
448        permission: "agent:execute",
449    },
450    // Session resume
451    PolicyRule {
452        pattern: "/v1/agent/codebases/",
453        methods: Some(&["POST"]),
454        permission: "agent:execute",
455    },
456    // Proposal approval — governance action
457    PolicyRule {
458        pattern: "/v1/cognition/proposals/",
459        methods: Some(&["POST"]),
460        permission: "agent:execute",
461    },
462    // Config, version, providers, agents — read
463    PolicyRule {
464        pattern: "/api/version",
465        methods: None,
466        permission: "agent:read",
467    },
468    PolicyRule {
469        pattern: "/api/config",
470        methods: None,
471        permission: "agent:read",
472    },
473    PolicyRule {
474        pattern: "/api/provider",
475        methods: None,
476        permission: "agent:read",
477    },
478    PolicyRule {
479        pattern: "/api/agent",
480        methods: None,
481        permission: "agent:read",
482    },
483];
484
485/// Find the required permission for a given path + method.
486/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched (pass-through).
487fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
488    for rule in POLICY_RULES {
489        let matches = if rule.pattern.ends_with('/') {
490            path.starts_with(rule.pattern)
491        } else {
492            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
493        };
494        if matches {
495            if let Some(allowed_methods) = rule.methods {
496                if !allowed_methods.contains(&method) {
497                    continue;
498                }
499            }
500            return Some(rule.permission);
501        }
502    }
503    None
504}
505
506/// Policy authorization middleware for Axum.
507///
508/// Maps request paths to OPA permission strings and enforces authorization.
509/// Runs after `require_auth` so the bearer token is already validated.
510/// Currently maps the static bearer token to an admin role since
511/// codetether-agent uses a single shared token model.
512async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
513    let path = request.uri().path().to_string();
514    let method = request.method().as_str().to_string();
515
516    let permission = match match_policy_rule(&path, &method) {
517        None | Some("") => return Ok(next.run(request).await),
518        Some(perm) => perm,
519    };
520
521    // The current auth model uses a single static token for all access.
522    // When this is the case, the authenticated user effectively has admin role.
523    // Future: extract user claims from JWT and build a proper PolicyUser.
524    let user = policy::PolicyUser {
525        user_id: "bearer-token-user".to_string(),
526        roles: vec!["admin".to_string()],
527        tenant_id: None,
528        scopes: vec![],
529        auth_source: "static_token".to_string(),
530    };
531
532    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
533        tracing::warn!(
534            path = %path,
535            method = %method,
536            permission = %permission,
537            "Policy middleware denied request"
538        );
539        return Err(status);
540    }
541
542    Ok(next.run(request).await)
543}
544
545/// Start the HTTP server
546pub async fn serve(args: ServeArgs) -> Result<()> {
547    let t0 = std::time::Instant::now();
548    tracing::info!("[startup] begin");
549    let config = Config::load().await?;
550    tracing::info!(
551        elapsed_ms = t0.elapsed().as_millis(),
552        "[startup] config loaded"
553    );
554    let mut cognition = CognitionRuntime::new_from_env();
555    tracing::info!(
556        elapsed_ms = t0.elapsed().as_millis(),
557        "[startup] cognition runtime created"
558    );
559
560    // Set up tool registry for cognition execution engine.
561    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
562    tracing::info!(
563        elapsed_ms = t0.elapsed().as_millis(),
564        "[startup] tools registered"
565    );
566    let cognition = Arc::new(cognition);
567
568    // Initialize audit log.
569    let audit_log = AuditLog::from_env();
570    let _ = audit::init_audit_log(audit_log.clone());
571
572    // Initialize K8s manager.
573    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
574    let k8s = Arc::new(K8sManager::new().await);
575    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
576    if k8s.is_available() {
577        tracing::info!("K8s self-deployment enabled");
578    }
579
580    // Initialize mandatory auth.
581    let auth_state = AuthState::from_env();
582    tracing::info!(
583        token_len = auth_state.token().len(),
584        "Auth is mandatory. Token required for all API endpoints."
585    );
586    tracing::info!(
587        audit_entries = audit_log.count().await,
588        "Audit log initialized"
589    );
590
591    // Create agent bus for in-process communication
592    let bus = AgentBus::new().into_arc();
593
594    // Auto-start S3 sink if MinIO is configured (set MINIO_ENDPOINT to enable)
595    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
596
597    tracing::info!(
598        elapsed_ms = t0.elapsed().as_millis(),
599        "[startup] bus created"
600    );
601
602    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
603        tracing::info!(
604            elapsed_ms = t0.elapsed().as_millis(),
605            "[startup] auto-starting cognition"
606        );
607        if let Err(error) = cognition.start(None).await {
608            tracing::warn!(%error, "Failed to auto-start cognition loop");
609        } else {
610            tracing::info!("Perpetual cognition auto-started");
611        }
612    }
613
614    tracing::info!(
615        elapsed_ms = t0.elapsed().as_millis(),
616        "[startup] building routes"
617    );
618    let addr = format!("{}:{}", args.hostname, args.port);
619
620    // Build the agent card
621    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
622    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
623
624    // Build A2A router separately
625    let a2a_router = a2a_server.router();
626
627    // Start gRPC transport on a separate port
628    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
629        .ok()
630        .and_then(|p| p.parse::<u16>().ok())
631        .unwrap_or(50051);
632    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
633    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
634    let grpc_service = grpc_store.into_service();
635    let voice_service = crate::a2a::voice_grpc::VoiceServiceImpl::new(bus.clone()).into_service();
636    tokio::spawn(async move {
637        tracing::info!("gRPC A2A server listening on {}", grpc_addr);
638
639        // Configure CORS for marketing site (production + local dev)
640        let allowed_origins = [
641            HeaderValue::from_static("https://codetether.run"),
642            HeaderValue::from_static("https://api.codetether.run"),
643            HeaderValue::from_static("https://docs.codetether.run"),
644            HeaderValue::from_static("https://codetether.com"),
645            HeaderValue::from_static("http://localhost:3000"),
646            HeaderValue::from_static("http://localhost:3001"),
647        ];
648        let cors = CorsLayer::new()
649            .allow_origin(AllowOrigin::list(allowed_origins))
650            .allow_methods(AllowMethods::any())
651            .allow_headers(AllowHeaders::any())
652            .expose_headers(tower_http::cors::ExposeHeaders::list([
653                http::header::HeaderName::from_static("grpc-status"),
654                http::header::HeaderName::from_static("grpc-message"),
655            ]));
656
657        if let Err(e) = tonic::transport::Server::builder()
658            .accept_http1(true)
659            .layer(cors)
660            .layer(GrpcWebLayer::new())
661            .add_service(grpc_service)
662            .add_service(voice_service)
663            .serve(grpc_addr)
664            .await
665        {
666            tracing::error!("gRPC server error: {}", e);
667        }
668    });
669
670    let state = AppState {
671        config: Arc::new(config),
672        cognition,
673        audit_log,
674        k8s,
675        auth: auth_state.clone(),
676        bus,
677        knative_tasks: KnativeTaskQueue::new(),
678        tool_registry: ToolRegistry::new(),
679    };
680
681    // Spawn the tool reaper background task (runs every 15s to clean up expired tools)
682    let tool_registry = state.tool_registry.clone();
683    tokio::spawn(async move {
684        let mut interval = tokio::time::interval(Duration::from_secs(15));
685        loop {
686            interval.tick().await;
687            tool_registry.cleanup().await;
688            tracing::debug!("Tool reaper ran cleanup");
689        }
690    });
691
692    let app = Router::new()
693        // Health check (public — auth exempt)
694        .route("/health", get(health))
695        // CloudEvent receiver for Knative Eventing (public — auth exempt)
696        .route("/task", post(receive_task_event))
697        // Knative task queue APIs
698        .route("/v1/knative/tasks", get(list_knative_tasks))
699        .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
700        .route(
701            "/v1/knative/tasks/{task_id}/claim",
702            post(claim_knative_task),
703        )
704        .route(
705            "/v1/knative/tasks/{task_id}/complete",
706            post(complete_knative_task),
707        )
708        // API routes
709        .route("/api/version", get(get_version))
710        .route("/api/session", get(list_sessions).post(create_session))
711        .route("/api/session/{id}", get(get_session))
712        .route("/api/session/{id}/prompt", post(prompt_session))
713        .route("/api/config", get(get_config))
714        .route("/api/provider", get(list_providers))
715        .route("/api/agent", get(list_agents))
716        // OpenAI-compatible APIs
717        .route("/v1/models", get(list_openai_models))
718        .route("/v1/chat/completions", post(openai_chat_completions))
719        // Perpetual cognition APIs
720        .route("/v1/cognition/start", post(start_cognition))
721        .route("/v1/cognition/stop", post(stop_cognition))
722        .route("/v1/cognition/status", get(get_cognition_status))
723        .route("/v1/cognition/stream", get(stream_cognition))
724        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
725        // Swarm persona lifecycle APIs
726        .route("/v1/swarm/personas", post(create_persona))
727        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
728        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
729        .route("/v1/swarm/lineage", get(get_swarm_lineage))
730        // Belief, attention, governance, workspace APIs
731        .route("/v1/cognition/beliefs", get(list_beliefs))
732        .route("/v1/cognition/beliefs/{id}", get(get_belief))
733        .route("/v1/cognition/attention", get(list_attention))
734        .route("/v1/cognition/proposals", get(list_proposals))
735        .route(
736            "/v1/cognition/proposals/{id}/approve",
737            post(approve_proposal),
738        )
739        .route("/v1/cognition/receipts", get(list_receipts))
740        .route("/v1/cognition/workspace", get(get_workspace))
741        .route("/v1/cognition/governance", get(get_governance))
742        .route("/v1/cognition/personas/{id}", get(get_persona))
743        // Audit trail API
744        .route("/v1/audit", get(list_audit_entries))
745        // Event stream replay API (for SOC 2, FedRAMP, ATO compliance)
746        .route("/v1/audit/replay", get(replay_session_events))
747        .route("/v1/audit/replay/index", get(list_session_event_files))
748        // K8s self-deployment APIs
749        .route("/v1/k8s/status", get(get_k8s_status))
750        .route("/v1/k8s/scale", post(k8s_scale))
751        .route("/v1/k8s/restart", post(k8s_restart))
752        .route("/v1/k8s/pods", get(k8s_list_pods))
753        .route("/v1/k8s/actions", get(k8s_actions))
754        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
755        .route(
756            "/v1/k8s/subagent/{id}",
757            axum::routing::delete(k8s_delete_subagent),
758        )
759        // Plugin registry API
760        .route("/v1/plugins", get(list_plugins))
761        // Tool registry API
762        .route("/v1/tools", get(list_tools))
763        .route("/v1/tools/register", post(register_tool))
764        .route("/v1/tools/{id}/heartbeat", post(tool_heartbeat))
765        // MCP compatibility alias (marketing site SDK expects /mcp/v1/tools)
766        .route("/mcp/v1/tools", get(list_tools))
767        // Agent task APIs (compatibility surface for dashboard)
768        .route(
769            "/v1/agent/tasks",
770            get(list_agent_tasks).post(create_agent_task),
771        )
772        .route("/v1/agent/tasks/{task_id}", get(get_agent_task))
773        .route(
774            "/v1/agent/tasks/{task_id}/output",
775            get(get_agent_task_output),
776        )
777        .route(
778            "/v1/agent/tasks/{task_id}/output/stream",
779            get(stream_agent_task_output),
780        )
781        // Worker connectivity (dashboard polls this)
782        .route("/v1/worker/connected", get(list_connected_workers))
783        .route("/v1/agent/workers", get(list_connected_workers))
784        // Task dispatch (Knative-backed)
785        .route("/v1/tasks/dispatch", post(dispatch_task))
786        // Voice REST bridge (dashboard expects REST, server has gRPC)
787        .route("/v1/voice/sessions", post(create_voice_session_rest))
788        .route(
789            "/v1/voice/sessions/{room_name}",
790            get(get_voice_session_rest).delete(delete_voice_session_rest),
791        )
792        .route("/v1/voice/voices", get(list_voices_rest))
793        // Session resume (dashboard uses this for codebase-scoped resume)
794        .route(
795            "/v1/agent/codebases/{codebase_id}/sessions/{session_id}/resume",
796            post(resume_codebase_session),
797        )
798        // Agent Bus — SSE stream + publish
799        .route("/v1/bus/stream", get(stream_bus_events))
800        .with_state(state.clone())
801        // A2A routes (nested to work with different state type)
802        .nest("/a2a", a2a_router)
803        // Mandatory auth middleware — applies to all routes
804        .layer(middleware::from_fn_with_state(
805            state.clone(),
806            audit_middleware,
807        ))
808        .layer(middleware::from_fn(policy_middleware))
809        .layer(middleware::from_fn(auth::require_auth))
810        .layer(axum::Extension(state.auth.clone()))
811        // CORS + tracing — explicit origin allowlist so headers are present on
812        // ALL responses (including 4xx/5xx) and not dependent on request mirroring.
813        .layer(
814            CorsLayer::new()
815                .allow_origin([
816                    "https://codetether.run".parse::<HeaderValue>().unwrap(),
817                    "https://api.codetether.run".parse::<HeaderValue>().unwrap(),
818                    "https://docs.codetether.run"
819                        .parse::<HeaderValue>()
820                        .unwrap(),
821                    "https://codetether.com".parse::<HeaderValue>().unwrap(),
822                    "http://localhost:3000".parse::<HeaderValue>().unwrap(),
823                    "http://localhost:3001".parse::<HeaderValue>().unwrap(),
824                    "http://localhost:8000".parse::<HeaderValue>().unwrap(),
825                ])
826                .allow_credentials(true)
827                .allow_methods(AllowMethods::any())
828                .allow_headers(AllowHeaders::any()),
829        )
830        .layer(TraceLayer::new_for_http());
831
832    tracing::info!(
833        elapsed_ms = t0.elapsed().as_millis(),
834        "[startup] router built, binding"
835    );
836    let listener = tokio::net::TcpListener::bind(&addr).await?;
837    tracing::info!(
838        elapsed_ms = t0.elapsed().as_millis(),
839        "[startup] listening on http://{}",
840        addr
841    );
842
843    axum::serve(listener, app).await?;
844
845    Ok(())
846}
847
848/// Health check response
849async fn health() -> &'static str {
850    "ok"
851}
852
853/// List all Knative tasks in queue
854async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
855    Json(state.knative_tasks.list().await)
856}
857
858/// Get a specific Knative task by ID
859async fn get_knative_task(
860    State(state): State<AppState>,
861    Path(task_id): Path<String>,
862) -> Result<Json<KnativeTask>, (StatusCode, String)> {
863    state
864        .knative_tasks
865        .get(&task_id)
866        .await
867        .map(Json)
868        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
869}
870
871/// Claim/start processing a Knative task
872async fn claim_knative_task(
873    State(state): State<AppState>,
874    Path(task_id): Path<String>,
875) -> Result<Json<KnativeTask>, (StatusCode, String)> {
876    // Update status to processing
877    let updated = state
878        .knative_tasks
879        .update_status(&task_id, "processing")
880        .await;
881    if !updated {
882        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
883    }
884
885    state
886        .knative_tasks
887        .get(&task_id)
888        .await
889        .map(Json)
890        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
891}
892
893/// Complete a Knative task
894async fn complete_knative_task(
895    State(state): State<AppState>,
896    Path(task_id): Path<String>,
897) -> Result<Json<KnativeTask>, (StatusCode, String)> {
898    // Update status to completed
899    let updated = state
900        .knative_tasks
901        .update_status(&task_id, "completed")
902        .await;
903    if !updated {
904        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
905    }
906
907    state
908        .knative_tasks
909        .get(&task_id)
910        .await
911        .map(Json)
912        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
913}
914
915/// CloudEvent handler for Knative Eventing
916/// Receives task events from Knative Broker and triggers execution
917async fn receive_task_event(
918    State(state): State<AppState>,
919    Json(event): Json<CloudEvent>,
920) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
921    tracing::info!(
922        event_type = %event.event_type,
923        event_id = %event.id,
924        "Received CloudEvent from Knative"
925    );
926
927    // Log the incoming event for audit
928    state
929        .audit_log
930        .log(
931            audit::AuditCategory::Api,
932            format!("cloudevents:{}", event.event_type),
933            AuditOutcome::Success,
934            None,
935            None,
936        )
937        .await;
938
939    // Process based on event type
940    match event.event_type.as_str() {
941        "codetether.task.created" | "task.created" => {
942            // Extract task data and queue for execution
943            if let Some(data) = event.data {
944                let task_id = data
945                    .get("task_id")
946                    .and_then(|v| v.as_str())
947                    .unwrap_or("unknown")
948                    .to_string();
949                let title = data
950                    .get("title")
951                    .and_then(|v| v.as_str())
952                    .unwrap_or("")
953                    .to_string();
954                let description = data
955                    .get("description")
956                    .and_then(|v| v.as_str())
957                    .unwrap_or("")
958                    .to_string();
959                let agent_type = data
960                    .get("agent_type")
961                    .and_then(|v| v.as_str())
962                    .unwrap_or("build")
963                    .to_string();
964                let priority = data.get("priority").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
965
966                let task = KnativeTask {
967                    task_id: task_id.clone(),
968                    title,
969                    description,
970                    agent_type,
971                    priority,
972                    received_at: chrono::Utc::now(),
973                    status: "queued".to_string(),
974                };
975
976                state.knative_tasks.push(task).await;
977                tracing::info!(task_id = %task_id, "Task queued for execution");
978            }
979        }
980        "codetether.task.cancelled" => {
981            tracing::info!("Task cancellation event received");
982            // Update task status if we have the task_id
983            if let Some(data) = event.data {
984                if let Some(task_id) = data.get("task_id").and_then(|v| v.as_str()) {
985                    let _ = state
986                        .knative_tasks
987                        .update_status(task_id, "cancelled")
988                        .await;
989                    tracing::info!(task_id = %task_id, "Task cancelled");
990                }
991            }
992        }
993        _ => {
994            tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
995        }
996    }
997
998    Ok(Json(CloudEventResponse {
999        status: "accepted".to_string(),
1000        event_id: event.id,
1001    }))
1002}
1003
1004/// CloudEvent structure (CloudEvents v1.0 spec)
1005#[derive(Deserialize, Serialize)]
1006struct CloudEvent {
1007    /// Event unique identifier
1008    id: String,
1009    /// Event source (e.g., knative://broker/a2a-server)
1010    source: String,
1011    /// Event type (e.g., codetether.task.created)
1012    #[serde(rename = "type")]
1013    event_type: String,
1014    /// Event timestamp (RFC 3339)
1015    #[serde(rename = "time")]
1016    timestamp: Option<String>,
1017    /// CloudEvents spec version
1018    #[serde(rename = "specversion")]
1019    spec_version: Option<String>,
1020    /// Event data payload
1021    data: Option<serde_json::Value>,
1022}
1023
1024/// Response to CloudEvent acknowledgment
1025#[derive(Serialize)]
1026struct CloudEventResponse {
1027    status: String,
1028    event_id: String,
1029}
1030
1031/// Version info
1032#[derive(Serialize)]
1033struct VersionInfo {
1034    version: &'static str,
1035    name: &'static str,
1036    binary_hash: Option<String>,
1037}
1038
1039async fn get_version() -> Json<VersionInfo> {
1040    let binary_hash = std::env::current_exe()
1041        .ok()
1042        .and_then(|p| hash_file(&p).ok());
1043    Json(VersionInfo {
1044        version: env!("CARGO_PKG_VERSION"),
1045        name: env!("CARGO_PKG_NAME"),
1046        binary_hash,
1047    })
1048}
1049
1050/// List sessions
1051#[derive(Deserialize)]
1052struct ListSessionsQuery {
1053    limit: Option<usize>,
1054    offset: Option<usize>,
1055}
1056
1057async fn list_sessions(
1058    Query(query): Query<ListSessionsQuery>,
1059) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
1060    let sessions = crate::session::list_sessions()
1061        .await
1062        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1063
1064    let offset = query.offset.unwrap_or(0);
1065    let limit = query.limit.unwrap_or(100);
1066    Ok(Json(
1067        sessions.into_iter().skip(offset).take(limit).collect(),
1068    ))
1069}
1070
1071/// Create a new session
1072#[derive(Deserialize)]
1073struct CreateSessionRequest {
1074    title: Option<String>,
1075    agent: Option<String>,
1076}
1077
1078async fn create_session(
1079    Json(req): Json<CreateSessionRequest>,
1080) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1081    let mut session = crate::session::Session::new()
1082        .await
1083        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1084
1085    session.title = req.title;
1086    if let Some(agent) = req.agent {
1087        session.agent = agent;
1088    }
1089
1090    session
1091        .save()
1092        .await
1093        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1094
1095    Ok(Json(session))
1096}
1097
1098/// Get a session by ID
1099async fn get_session(
1100    axum::extract::Path(id): axum::extract::Path<String>,
1101) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1102    let session = crate::session::Session::load(&id)
1103        .await
1104        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1105
1106    Ok(Json(session))
1107}
1108
1109/// Prompt a session
1110#[derive(Deserialize)]
1111struct PromptRequest {
1112    message: String,
1113}
1114
1115async fn prompt_session(
1116    axum::extract::Path(id): axum::extract::Path<String>,
1117    Json(req): Json<PromptRequest>,
1118) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
1119    // Validate the message is not empty
1120    if req.message.trim().is_empty() {
1121        return Err((
1122            StatusCode::BAD_REQUEST,
1123            "Message cannot be empty".to_string(),
1124        ));
1125    }
1126
1127    // Log the prompt request (uses the message field)
1128    tracing::info!(
1129        session_id = %id,
1130        message_len = req.message.len(),
1131        "Received prompt request"
1132    );
1133
1134    // TODO: Implement actual prompting
1135    Err((
1136        StatusCode::NOT_IMPLEMENTED,
1137        "Prompt execution not yet implemented".to_string(),
1138    ))
1139}
1140
1141/// Get configuration
1142async fn get_config(State(state): State<AppState>) -> Json<Config> {
1143    Json((*state.config).clone())
1144}
1145
1146/// List providers
1147async fn list_providers() -> Json<Vec<String>> {
1148    Json(vec![
1149        "openai".to_string(),
1150        "anthropic".to_string(),
1151        "google".to_string(),
1152    ])
1153}
1154
1155/// List agents
1156async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
1157    let registry = crate::agent::AgentRegistry::with_builtins();
1158    Json(registry.list().into_iter().cloned().collect())
1159}
1160
1161type OpenAiApiError = (StatusCode, Json<serde_json::Value>);
1162type OpenAiApiResult<T> = Result<T, OpenAiApiError>;
1163
1164#[derive(Debug, Deserialize)]
1165struct OpenAiChatCompletionRequest {
1166    model: String,
1167    messages: Vec<OpenAiRequestMessage>,
1168    #[serde(default)]
1169    tools: Vec<OpenAiRequestTool>,
1170    temperature: Option<f32>,
1171    top_p: Option<f32>,
1172    max_tokens: Option<usize>,
1173    max_completion_tokens: Option<usize>,
1174    stop: Option<OpenAiStop>,
1175    stream: Option<bool>,
1176}
1177
1178#[derive(Debug, Deserialize)]
1179#[serde(untagged)]
1180enum OpenAiStop {
1181    Single(String),
1182    Multiple(Vec<String>),
1183}
1184
1185#[derive(Debug, Deserialize)]
1186struct OpenAiRequestMessage {
1187    role: String,
1188    #[serde(default)]
1189    content: Option<serde_json::Value>,
1190    #[serde(default)]
1191    tool_call_id: Option<String>,
1192    #[serde(default)]
1193    tool_calls: Vec<OpenAiRequestToolCall>,
1194}
1195
1196#[derive(Debug, Deserialize)]
1197struct OpenAiRequestToolCall {
1198    id: String,
1199    #[serde(rename = "type", default = "default_function_type")]
1200    kind: String,
1201    function: OpenAiRequestToolCallFunction,
1202}
1203
1204#[derive(Debug, Deserialize)]
1205struct OpenAiRequestToolCallFunction {
1206    name: String,
1207    #[serde(default = "default_json_object_string")]
1208    arguments: String,
1209}
1210
1211#[derive(Debug, Deserialize)]
1212struct OpenAiRequestTool {
1213    #[serde(rename = "type", default = "default_function_type")]
1214    kind: String,
1215    function: OpenAiRequestToolDefinition,
1216}
1217
1218#[derive(Debug, Deserialize)]
1219struct OpenAiRequestToolDefinition {
1220    name: String,
1221    #[serde(default)]
1222    description: String,
1223    #[serde(default = "default_json_object")]
1224    parameters: serde_json::Value,
1225}
1226
1227#[derive(Debug, Serialize)]
1228struct OpenAiModelsResponse {
1229    object: String,
1230    data: Vec<OpenAiModel>,
1231}
1232
1233#[derive(Debug, Serialize)]
1234struct OpenAiModel {
1235    id: String,
1236    object: String,
1237    created: i64,
1238    owned_by: String,
1239}
1240
1241#[derive(Debug, Serialize)]
1242struct OpenAiChatCompletionResponse {
1243    id: String,
1244    object: String,
1245    created: i64,
1246    model: String,
1247    choices: Vec<OpenAiChatCompletionChoice>,
1248    usage: OpenAiUsage,
1249}
1250
1251#[derive(Debug, Serialize)]
1252struct OpenAiChatCompletionChoice {
1253    index: usize,
1254    message: OpenAiChatCompletionMessage,
1255    finish_reason: String,
1256}
1257
1258#[derive(Debug, Serialize)]
1259struct OpenAiChatCompletionMessage {
1260    role: String,
1261    #[serde(skip_serializing_if = "Option::is_none")]
1262    content: Option<String>,
1263    #[serde(skip_serializing_if = "Option::is_none")]
1264    tool_calls: Option<Vec<OpenAiResponseToolCall>>,
1265}
1266
1267#[derive(Debug, Serialize)]
1268struct OpenAiResponseToolCall {
1269    id: String,
1270    #[serde(rename = "type")]
1271    kind: String,
1272    function: OpenAiResponseToolCallFunction,
1273}
1274
1275#[derive(Debug, Serialize)]
1276struct OpenAiResponseToolCallFunction {
1277    name: String,
1278    arguments: String,
1279}
1280
1281#[derive(Debug, Serialize)]
1282struct OpenAiUsage {
1283    prompt_tokens: usize,
1284    completion_tokens: usize,
1285    total_tokens: usize,
1286}
1287
1288fn default_function_type() -> String {
1289    "function".to_string()
1290}
1291
1292fn default_json_object() -> serde_json::Value {
1293    serde_json::json!({})
1294}
1295
1296fn default_json_object_string() -> String {
1297    "{}".to_string()
1298}
1299
1300fn openai_error(
1301    status: StatusCode,
1302    message: impl Into<String>,
1303    error_type: &str,
1304    code: &str,
1305) -> OpenAiApiError {
1306    (
1307        status,
1308        Json(serde_json::json!({
1309            "error": {
1310                "message": message.into(),
1311                "type": error_type,
1312                "code": code,
1313            }
1314        })),
1315    )
1316}
1317
1318fn openai_bad_request(message: impl Into<String>) -> OpenAiApiError {
1319    openai_error(
1320        StatusCode::BAD_REQUEST,
1321        message,
1322        "invalid_request_error",
1323        "invalid_request",
1324    )
1325}
1326
1327fn openai_internal_error(message: impl Into<String>) -> OpenAiApiError {
1328    openai_error(
1329        StatusCode::INTERNAL_SERVER_ERROR,
1330        message,
1331        "server_error",
1332        "internal_error",
1333    )
1334}
1335
1336fn canonicalize_provider_name(provider: &str) -> &str {
1337    if provider == "zhipuai" {
1338        "zai"
1339    } else {
1340        provider
1341    }
1342}
1343
1344fn normalize_model_reference(model: &str) -> String {
1345    let trimmed = model.trim();
1346    if let Some((provider, model_id)) = trimmed.split_once(':')
1347        && !provider.is_empty()
1348        && !model_id.is_empty()
1349        && !trimmed.contains('/')
1350    {
1351        return format!("{provider}/{model_id}");
1352    }
1353    trimmed.to_string()
1354}
1355
1356fn make_openai_model_id(provider: &str, model_id: &str) -> String {
1357    let provider = canonicalize_provider_name(provider);
1358    let trimmed = model_id.trim_start_matches('/');
1359    if trimmed.starts_with(&format!("{provider}/")) {
1360        trimmed.to_string()
1361    } else {
1362        format!("{provider}/{trimmed}")
1363    }
1364}
1365
1366fn parse_openai_content_part(value: &serde_json::Value) -> Option<crate::provider::ContentPart> {
1367    let obj = value.as_object()?;
1368    let part_type = obj
1369        .get("type")
1370        .and_then(serde_json::Value::as_str)
1371        .unwrap_or("text");
1372
1373    match part_type {
1374        "text" | "input_text" => obj
1375            .get("text")
1376            .and_then(serde_json::Value::as_str)
1377            .map(|text| crate::provider::ContentPart::Text {
1378                text: text.to_string(),
1379            }),
1380        "image_url" => obj
1381            .get("image_url")
1382            .and_then(serde_json::Value::as_object)
1383            .and_then(|image| image.get("url"))
1384            .and_then(serde_json::Value::as_str)
1385            .map(|url| crate::provider::ContentPart::Image {
1386                url: url.to_string(),
1387                mime_type: None,
1388            }),
1389        _ => obj
1390            .get("text")
1391            .and_then(serde_json::Value::as_str)
1392            .map(|text| crate::provider::ContentPart::Text {
1393                text: text.to_string(),
1394            }),
1395    }
1396}
1397
1398fn parse_openai_content_parts(
1399    content: &Option<serde_json::Value>,
1400) -> Vec<crate::provider::ContentPart> {
1401    let Some(value) = content else {
1402        return Vec::new();
1403    };
1404
1405    match value {
1406        serde_json::Value::String(text) => {
1407            vec![crate::provider::ContentPart::Text { text: text.clone() }]
1408        }
1409        serde_json::Value::Array(parts) => {
1410            parts.iter().filter_map(parse_openai_content_part).collect()
1411        }
1412        serde_json::Value::Object(_) => parse_openai_content_part(value).into_iter().collect(),
1413        _ => Vec::new(),
1414    }
1415}
1416
1417fn parse_openai_tool_content(content: &Option<serde_json::Value>) -> String {
1418    parse_openai_content_parts(content)
1419        .into_iter()
1420        .filter_map(|part| match part {
1421            crate::provider::ContentPart::Text { text } => Some(text),
1422            _ => None,
1423        })
1424        .collect::<Vec<_>>()
1425        .join("\n")
1426}
1427
1428fn convert_openai_messages(
1429    messages: &[OpenAiRequestMessage],
1430) -> OpenAiApiResult<Vec<crate::provider::Message>> {
1431    let mut converted = Vec::with_capacity(messages.len());
1432
1433    for (index, message) in messages.iter().enumerate() {
1434        let role = message.role.to_ascii_lowercase();
1435        match role.as_str() {
1436            "system" | "user" => {
1437                let content = parse_openai_content_parts(&message.content);
1438                if content.is_empty() {
1439                    return Err(openai_bad_request(format!(
1440                        "messages[{index}] must include text content"
1441                    )));
1442                }
1443
1444                converted.push(crate::provider::Message {
1445                    role: if role == "system" {
1446                        crate::provider::Role::System
1447                    } else {
1448                        crate::provider::Role::User
1449                    },
1450                    content,
1451                });
1452            }
1453            "assistant" => {
1454                let mut content = parse_openai_content_parts(&message.content);
1455                for tool_call in &message.tool_calls {
1456                    if tool_call.kind != "function" {
1457                        return Err(openai_bad_request(format!(
1458                            "messages[{index}].tool_calls only support `function`"
1459                        )));
1460                    }
1461                    if tool_call.function.name.trim().is_empty() {
1462                        return Err(openai_bad_request(format!(
1463                            "messages[{index}].tool_calls[].function.name is required"
1464                        )));
1465                    }
1466
1467                    content.push(crate::provider::ContentPart::ToolCall {
1468                        id: tool_call.id.clone(),
1469                        name: tool_call.function.name.clone(),
1470                        arguments: tool_call.function.arguments.clone(),
1471                        thought_signature: None,
1472                    });
1473                }
1474
1475                if content.is_empty() {
1476                    return Err(openai_bad_request(format!(
1477                        "messages[{index}] must include `content` or `tool_calls` for assistant role"
1478                    )));
1479                }
1480
1481                converted.push(crate::provider::Message {
1482                    role: crate::provider::Role::Assistant,
1483                    content,
1484                });
1485            }
1486            "tool" => {
1487                let tool_call_id = message
1488                    .tool_call_id
1489                    .as_ref()
1490                    .map(|s| s.trim())
1491                    .filter(|s| !s.is_empty())
1492                    .ok_or_else(|| {
1493                        openai_bad_request(format!(
1494                            "messages[{index}].tool_call_id is required for tool role"
1495                        ))
1496                    })?
1497                    .to_string();
1498
1499                converted.push(crate::provider::Message {
1500                    role: crate::provider::Role::Tool,
1501                    content: vec![crate::provider::ContentPart::ToolResult {
1502                        tool_call_id,
1503                        content: parse_openai_tool_content(&message.content),
1504                    }],
1505                });
1506            }
1507            _ => {
1508                return Err(openai_bad_request(format!(
1509                    "messages[{index}].role `{}` is not supported",
1510                    message.role
1511                )));
1512            }
1513        }
1514    }
1515
1516    Ok(converted)
1517}
1518
1519fn convert_openai_tools(
1520    tools: &[OpenAiRequestTool],
1521) -> OpenAiApiResult<Vec<crate::provider::ToolDefinition>> {
1522    let mut converted = Vec::with_capacity(tools.len());
1523
1524    for (index, tool) in tools.iter().enumerate() {
1525        if tool.kind != "function" {
1526            return Err(openai_bad_request(format!(
1527                "tools[{index}].type `{}` is not supported",
1528                tool.kind
1529            )));
1530        }
1531        if tool.function.name.trim().is_empty() {
1532            return Err(openai_bad_request(format!(
1533                "tools[{index}].function.name is required"
1534            )));
1535        }
1536
1537        converted.push(crate::provider::ToolDefinition {
1538            name: tool.function.name.clone(),
1539            description: tool.function.description.clone(),
1540            parameters: tool.function.parameters.clone(),
1541        });
1542    }
1543
1544    Ok(converted)
1545}
1546
1547fn convert_finish_reason(reason: crate::provider::FinishReason) -> &'static str {
1548    match reason {
1549        crate::provider::FinishReason::Stop => "stop",
1550        crate::provider::FinishReason::Length => "length",
1551        crate::provider::FinishReason::ToolCalls => "tool_calls",
1552        crate::provider::FinishReason::ContentFilter => "content_filter",
1553        crate::provider::FinishReason::Error => "error",
1554    }
1555}
1556
1557fn convert_response_message(
1558    message: &crate::provider::Message,
1559) -> (Option<String>, Option<Vec<OpenAiResponseToolCall>>) {
1560    let mut texts = Vec::new();
1561    let mut tool_calls = Vec::new();
1562
1563    for part in &message.content {
1564        match part {
1565            crate::provider::ContentPart::Text { text } => {
1566                if !text.is_empty() {
1567                    texts.push(text.clone());
1568                }
1569            }
1570            crate::provider::ContentPart::ToolCall {
1571                id,
1572                name,
1573                arguments,
1574                ..
1575            } => {
1576                tool_calls.push(OpenAiResponseToolCall {
1577                    id: id.clone(),
1578                    kind: "function".to_string(),
1579                    function: OpenAiResponseToolCallFunction {
1580                        name: name.clone(),
1581                        arguments: arguments.clone(),
1582                    },
1583                });
1584            }
1585            _ => {}
1586        }
1587    }
1588
1589    let content = if texts.is_empty() {
1590        None
1591    } else {
1592        Some(texts.join("\n"))
1593    };
1594    let tool_calls = if tool_calls.is_empty() {
1595        None
1596    } else {
1597        Some(tool_calls)
1598    };
1599
1600    (content, tool_calls)
1601}
1602
1603fn openai_stream_chunk(
1604    id: &str,
1605    created: i64,
1606    model: &str,
1607    delta: serde_json::Value,
1608    finish_reason: Option<&str>,
1609) -> serde_json::Value {
1610    let finish_reason = finish_reason
1611        .map(|value| serde_json::Value::String(value.to_string()))
1612        .unwrap_or(serde_json::Value::Null);
1613
1614    serde_json::json!({
1615        "id": id,
1616        "object": "chat.completion.chunk",
1617        "created": created,
1618        "model": model,
1619        "choices": [{
1620            "index": 0,
1621            "delta": delta,
1622            "finish_reason": finish_reason,
1623        }]
1624    })
1625}
1626
1627fn openai_stream_event(payload: &serde_json::Value) -> Event {
1628    Event::default().data(payload.to_string())
1629}
1630
1631async fn list_openai_models() -> OpenAiApiResult<Json<OpenAiModelsResponse>> {
1632    let registry = crate::provider::ProviderRegistry::from_vault()
1633        .await
1634        .map_err(|error| {
1635            tracing::error!(error = %error, "Failed to load providers from Vault");
1636            openai_internal_error(format!("failed to load providers: {error}"))
1637        })?;
1638
1639    let model_futures = registry.list().into_iter().map(|provider_id| {
1640        let provider_id = provider_id.to_string();
1641        let provider = registry.get(&provider_id);
1642
1643        async move {
1644            let Some(provider) = provider else {
1645                return Vec::new();
1646            };
1647
1648            let now = chrono::Utc::now().timestamp();
1649            match provider.list_models().await {
1650                Ok(models) => models
1651                    .into_iter()
1652                    .map(|model| OpenAiModel {
1653                        id: make_openai_model_id(&provider_id, &model.id),
1654                        object: "model".to_string(),
1655                        created: now,
1656                        owned_by: canonicalize_provider_name(&provider_id).to_string(),
1657                    })
1658                    .collect(),
1659                Err(error) => {
1660                    tracing::warn!(
1661                        provider = %provider_id,
1662                        error = %error,
1663                        "Failed to list models for provider"
1664                    );
1665                    Vec::new()
1666                }
1667            }
1668        }
1669    });
1670
1671    let mut data: Vec<OpenAiModel> = join_all(model_futures)
1672        .await
1673        .into_iter()
1674        .flatten()
1675        .collect();
1676    data.sort_by(|a, b| a.owned_by.cmp(&b.owned_by).then(a.id.cmp(&b.id)));
1677
1678    Ok(Json(OpenAiModelsResponse {
1679        object: "list".to_string(),
1680        data,
1681    }))
1682}
1683
1684async fn openai_chat_completions(
1685    Json(req): Json<OpenAiChatCompletionRequest>,
1686) -> Result<Response, OpenAiApiError> {
1687    let is_stream = req.stream.unwrap_or(false);
1688    if req.model.trim().is_empty() {
1689        return Err(openai_bad_request("`model` is required"));
1690    }
1691    if req.messages.is_empty() {
1692        return Err(openai_bad_request("`messages` must not be empty"));
1693    }
1694
1695    let registry = crate::provider::ProviderRegistry::from_vault()
1696        .await
1697        .map_err(|error| {
1698            tracing::error!(error = %error, "Failed to load providers from Vault");
1699            openai_internal_error(format!("failed to load providers: {error}"))
1700        })?;
1701
1702    let providers = registry.list();
1703    if providers.is_empty() {
1704        return Err(openai_error(
1705            StatusCode::SERVICE_UNAVAILABLE,
1706            "No providers are configured in Vault",
1707            "server_error",
1708            "no_providers",
1709        ));
1710    }
1711
1712    let normalized_model = normalize_model_reference(&req.model);
1713    let (maybe_provider, model_id) = crate::provider::parse_model_string(&normalized_model);
1714    let model_id = if maybe_provider.is_some() {
1715        if model_id.trim().is_empty() {
1716            return Err(openai_bad_request(
1717                "Model must be in `provider/model` format",
1718            ));
1719        }
1720        model_id.to_string()
1721    } else {
1722        req.model.trim().to_string()
1723    };
1724
1725    let selected_provider = if let Some(provider_name) = maybe_provider {
1726        let provider_name = canonicalize_provider_name(provider_name);
1727        if providers.contains(&provider_name) {
1728            provider_name.to_string()
1729        } else {
1730            return Err(openai_bad_request(format!(
1731                "Provider `{provider_name}` is not configured in Vault"
1732            )));
1733        }
1734    } else if providers.len() == 1 {
1735        providers[0].to_string()
1736    } else if providers.contains(&"openai") {
1737        "openai".to_string()
1738    } else {
1739        return Err(openai_bad_request(
1740            "When multiple providers are configured, use `provider/model`. See GET /v1/models.",
1741        ));
1742    };
1743
1744    let provider = registry.get(&selected_provider).ok_or_else(|| {
1745        openai_internal_error(format!(
1746            "Provider `{selected_provider}` was available but could not be loaded"
1747        ))
1748    })?;
1749
1750    let messages = convert_openai_messages(&req.messages)?;
1751    let tools = convert_openai_tools(&req.tools)?;
1752    let stop = match req.stop {
1753        Some(OpenAiStop::Single(stop)) => vec![stop],
1754        Some(OpenAiStop::Multiple(stops)) => stops,
1755        None => Vec::new(),
1756    };
1757
1758    let completion_request = crate::provider::CompletionRequest {
1759        messages,
1760        tools,
1761        model: model_id.clone(),
1762        temperature: req.temperature,
1763        top_p: req.top_p,
1764        max_tokens: req.max_completion_tokens.or(req.max_tokens),
1765        stop,
1766    };
1767
1768    let chat_id = format!("chatcmpl-{}", uuid::Uuid::new_v4());
1769    let now = chrono::Utc::now().timestamp();
1770    let openai_model = make_openai_model_id(&selected_provider, &model_id);
1771
1772    if is_stream {
1773        let mut provider_stream =
1774            provider
1775                .complete_stream(completion_request)
1776                .await
1777                .map_err(|error| {
1778                    tracing::warn!(
1779                        provider = %selected_provider,
1780                        model = %model_id,
1781                        error = %error,
1782                        "Streaming completion request failed"
1783                    );
1784                    openai_internal_error(error.to_string())
1785                })?;
1786
1787        let stream_id = chat_id.clone();
1788        let stream_model = openai_model.clone();
1789        let event_stream = async_stream::stream! {
1790            let mut tool_call_indices: HashMap<String, usize> = HashMap::new();
1791            let mut next_tool_call_index = 0usize;
1792            let mut saw_text = false;
1793            let mut saw_tool_calls = false;
1794
1795            let role_chunk = openai_stream_chunk(
1796                &stream_id,
1797                now,
1798                &stream_model,
1799                serde_json::json!({ "role": "assistant" }),
1800                None,
1801            );
1802            yield Ok::<Event, Infallible>(openai_stream_event(&role_chunk));
1803
1804            while let Some(chunk) = provider_stream.next().await {
1805                match chunk {
1806                    crate::provider::StreamChunk::Text(text) => {
1807                        if text.is_empty() {
1808                            continue;
1809                        }
1810                        saw_text = true;
1811                        let content_chunk = openai_stream_chunk(
1812                            &stream_id,
1813                            now,
1814                            &stream_model,
1815                            serde_json::json!({ "content": text }),
1816                            None,
1817                        );
1818                        yield Ok(openai_stream_event(&content_chunk));
1819                    }
1820                    crate::provider::StreamChunk::ToolCallStart { id, name } => {
1821                        saw_tool_calls = true;
1822                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1823                            let value = next_tool_call_index;
1824                            next_tool_call_index += 1;
1825                            value
1826                        });
1827                        let tool_chunk = openai_stream_chunk(
1828                            &stream_id,
1829                            now,
1830                            &stream_model,
1831                            serde_json::json!({
1832                                "tool_calls": [{
1833                                    "index": index,
1834                                    "id": id,
1835                                    "type": "function",
1836                                    "function": {
1837                                        "name": name,
1838                                        "arguments": "",
1839                                    }
1840                                }]
1841                            }),
1842                            None,
1843                        );
1844                        yield Ok(openai_stream_event(&tool_chunk));
1845                    }
1846                    crate::provider::StreamChunk::ToolCallDelta { id, arguments_delta } => {
1847                        if arguments_delta.is_empty() {
1848                            continue;
1849                        }
1850                        saw_tool_calls = true;
1851                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1852                            let value = next_tool_call_index;
1853                            next_tool_call_index += 1;
1854                            value
1855                        });
1856                        let tool_delta_chunk = openai_stream_chunk(
1857                            &stream_id,
1858                            now,
1859                            &stream_model,
1860                            serde_json::json!({
1861                                "tool_calls": [{
1862                                    "index": index,
1863                                    "id": id,
1864                                    "type": "function",
1865                                    "function": {
1866                                        "arguments": arguments_delta,
1867                                    }
1868                                }]
1869                            }),
1870                            None,
1871                        );
1872                        yield Ok(openai_stream_event(&tool_delta_chunk));
1873                    }
1874                    crate::provider::StreamChunk::ToolCallEnd { .. } => {}
1875                    crate::provider::StreamChunk::Done { .. } => {}
1876                    crate::provider::StreamChunk::Error(error) => {
1877                        let error_chunk = openai_stream_chunk(
1878                            &stream_id,
1879                            now,
1880                            &stream_model,
1881                            serde_json::json!({ "content": error }),
1882                            Some("error"),
1883                        );
1884                        yield Ok(openai_stream_event(&error_chunk));
1885                        yield Ok(Event::default().data("[DONE]"));
1886                        return;
1887                    }
1888                }
1889            }
1890
1891            let finish_reason = if saw_tool_calls && !saw_text {
1892                "tool_calls"
1893            } else {
1894                "stop"
1895            };
1896            let final_chunk = openai_stream_chunk(
1897                &stream_id,
1898                now,
1899                &stream_model,
1900                serde_json::json!({}),
1901                Some(finish_reason),
1902            );
1903            yield Ok(openai_stream_event(&final_chunk));
1904            yield Ok(Event::default().data("[DONE]"));
1905        };
1906
1907        return Ok(Sse::new(event_stream)
1908            .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1909            .into_response());
1910    }
1911
1912    let completion = provider
1913        .complete(completion_request)
1914        .await
1915        .map_err(|error| {
1916            tracing::warn!(
1917                provider = %selected_provider,
1918                model = %model_id,
1919                error = %error,
1920                "Completion request failed"
1921            );
1922            openai_internal_error(error.to_string())
1923        })?;
1924
1925    let (content, tool_calls) = convert_response_message(&completion.message);
1926
1927    Ok(Json(OpenAiChatCompletionResponse {
1928        id: chat_id,
1929        object: "chat.completion".to_string(),
1930        created: now,
1931        model: openai_model,
1932        choices: vec![OpenAiChatCompletionChoice {
1933            index: 0,
1934            message: OpenAiChatCompletionMessage {
1935                role: "assistant".to_string(),
1936                content,
1937                tool_calls,
1938            },
1939            finish_reason: convert_finish_reason(completion.finish_reason).to_string(),
1940        }],
1941        usage: OpenAiUsage {
1942            prompt_tokens: completion.usage.prompt_tokens,
1943            completion_tokens: completion.usage.completion_tokens,
1944            total_tokens: completion.usage.total_tokens,
1945        },
1946    })
1947    .into_response())
1948}
1949
1950async fn start_cognition(
1951    State(state): State<AppState>,
1952    payload: Option<Json<StartCognitionRequest>>,
1953) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1954    state
1955        .cognition
1956        .start(payload.map(|Json(body)| body))
1957        .await
1958        .map(Json)
1959        .map_err(internal_error)
1960}
1961
1962async fn stop_cognition(
1963    State(state): State<AppState>,
1964    payload: Option<Json<StopCognitionRequest>>,
1965) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1966    let reason = payload.and_then(|Json(body)| body.reason);
1967    state
1968        .cognition
1969        .stop(reason)
1970        .await
1971        .map(Json)
1972        .map_err(internal_error)
1973}
1974
1975async fn get_cognition_status(
1976    State(state): State<AppState>,
1977) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1978    Ok(Json(state.cognition.status().await))
1979}
1980
1981async fn stream_cognition(
1982    State(state): State<AppState>,
1983) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
1984    let rx = state.cognition.subscribe_events();
1985
1986    let event_stream = stream::unfold(rx, |mut rx| async move {
1987        match rx.recv().await {
1988            Ok(event) => {
1989                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
1990                let sse_event = Event::default().event("cognition").data(payload);
1991                Some((Ok(sse_event), rx))
1992            }
1993            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
1994                let lag_event = Event::default()
1995                    .event("lag")
1996                    .data(format!("skipped {}", skipped));
1997                Some((Ok(lag_event), rx))
1998            }
1999            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2000        }
2001    });
2002
2003    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2004}
2005
2006/// Stream bus events as SSE, filtered by JWT topic claims.
2007///
2008/// The JWT must contain a `topics` claim with an array of topic patterns
2009/// to subscribe to. Events are filtered to only include envelopes whose
2010/// topic matches one of the allowed patterns.
2011async fn stream_bus_events(
2012    State(state): State<AppState>,
2013    req: Request<Body>,
2014) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2015    // Extract JWT claims from request extensions (set by auth middleware)
2016    let allowed_topics: Vec<String> = req
2017        .extensions()
2018        .get::<crate::server::auth::JwtClaims>()
2019        .map(|claims| claims.topics.clone())
2020        .unwrap_or_default();
2021
2022    // Subscribe to the bus
2023    let bus_handle = state.bus.handle("stream_bus_events");
2024    let rx = bus_handle.into_receiver();
2025
2026    let event_stream = stream::unfold(rx, move |mut rx: broadcast::Receiver<BusEnvelope>| {
2027        let allowed_topics = allowed_topics.clone();
2028        async move {
2029            match rx.recv().await {
2030                Ok(envelope) => {
2031                    // Filter by allowed topics if any are specified
2032                    let should_send = allowed_topics.is_empty()
2033                        || allowed_topics
2034                            .iter()
2035                            .any(|pattern| topic_matches(&envelope.topic, pattern));
2036
2037                    if should_send {
2038                        let payload =
2039                            serde_json::to_string(&envelope).unwrap_or_else(|_| "{}".to_string());
2040                        let sse_event = Event::default().event("bus").data(payload);
2041                        return Some((Ok(sse_event), rx));
2042                    }
2043
2044                    // Skip this event but keep the receiver
2045                    Some((Ok(Event::default().event("keepalive").data("")), rx))
2046                }
2047                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2048                    let lag_event = Event::default()
2049                        .event("lag")
2050                        .data(format!("skipped {}", skipped));
2051                    Some((Ok(lag_event), rx))
2052                }
2053                Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2054            }
2055        }
2056    });
2057
2058    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2059}
2060
2061/// Check if a topic matches a pattern.
2062/// Supports wildcards: `agent.*` matches `agent.123`, `agent.*.events` matches `agent.456.events`
2063fn topic_matches(topic: &str, pattern: &str) -> bool {
2064    if pattern == "*" {
2065        return true;
2066    }
2067    if pattern.ends_with(".*") {
2068        let prefix = &pattern[..pattern.len() - 2];
2069        return topic.starts_with(prefix);
2070    }
2071    if pattern.starts_with(".*") {
2072        let suffix = &pattern[2..];
2073        return topic.ends_with(suffix);
2074    }
2075    topic == pattern
2076}
2077
2078async fn get_latest_snapshot(
2079    State(state): State<AppState>,
2080) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
2081    match state.cognition.latest_snapshot().await {
2082        Some(snapshot) => Ok(Json(snapshot)),
2083        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
2084    }
2085}
2086
2087async fn create_persona(
2088    State(state): State<AppState>,
2089    Json(req): Json<CreatePersonaRequest>,
2090) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2091    state
2092        .cognition
2093        .create_persona(req)
2094        .await
2095        .map(Json)
2096        .map_err(internal_error)
2097}
2098
2099async fn spawn_persona(
2100    State(state): State<AppState>,
2101    Path(id): Path<String>,
2102    Json(req): Json<SpawnPersonaRequest>,
2103) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2104    state
2105        .cognition
2106        .spawn_child(&id, req)
2107        .await
2108        .map(Json)
2109        .map_err(internal_error)
2110}
2111
2112async fn reap_persona(
2113    State(state): State<AppState>,
2114    Path(id): Path<String>,
2115    payload: Option<Json<ReapPersonaRequest>>,
2116) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
2117    let req = payload
2118        .map(|Json(body)| body)
2119        .unwrap_or(ReapPersonaRequest {
2120            cascade: Some(false),
2121            reason: None,
2122        });
2123
2124    state
2125        .cognition
2126        .reap_persona(&id, req)
2127        .await
2128        .map(Json)
2129        .map_err(internal_error)
2130}
2131
2132async fn get_swarm_lineage(
2133    State(state): State<AppState>,
2134) -> Result<Json<LineageGraph>, (StatusCode, String)> {
2135    Ok(Json(state.cognition.lineage_graph().await))
2136}
2137
2138// ── Belief, Attention, Governance, Workspace handlers ──
2139
2140#[derive(Deserialize)]
2141struct BeliefFilter {
2142    status: Option<String>,
2143    persona: Option<String>,
2144}
2145
2146async fn list_beliefs(
2147    State(state): State<AppState>,
2148    Query(filter): Query<BeliefFilter>,
2149) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
2150    let beliefs = state.cognition.get_beliefs().await;
2151    let mut result: Vec<Belief> = beliefs.into_values().collect();
2152
2153    if let Some(status) = &filter.status {
2154        result.retain(|b| {
2155            let s = serde_json::to_string(&b.status).unwrap_or_default();
2156            s.contains(status)
2157        });
2158    }
2159    if let Some(persona) = &filter.persona {
2160        result.retain(|b| &b.asserted_by == persona);
2161    }
2162
2163    result.sort_by(|a, b| {
2164        b.confidence
2165            .partial_cmp(&a.confidence)
2166            .unwrap_or(std::cmp::Ordering::Equal)
2167    });
2168    Ok(Json(result))
2169}
2170
2171async fn get_belief(
2172    State(state): State<AppState>,
2173    Path(id): Path<String>,
2174) -> Result<Json<Belief>, (StatusCode, String)> {
2175    match state.cognition.get_belief(&id).await {
2176        Some(belief) => Ok(Json(belief)),
2177        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
2178    }
2179}
2180
2181async fn list_attention(
2182    State(state): State<AppState>,
2183) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
2184    Ok(Json(state.cognition.get_attention_queue().await))
2185}
2186
2187async fn list_proposals(
2188    State(state): State<AppState>,
2189) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
2190    let proposals = state.cognition.get_proposals().await;
2191    let mut result: Vec<Proposal> = proposals.into_values().collect();
2192    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
2193    Ok(Json(result))
2194}
2195
2196async fn approve_proposal(
2197    State(state): State<AppState>,
2198    Path(id): Path<String>,
2199) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2200    state
2201        .cognition
2202        .approve_proposal(&id)
2203        .await
2204        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
2205        .map_err(internal_error)
2206}
2207
2208async fn list_receipts(
2209    State(state): State<AppState>,
2210) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
2211    Ok(Json(state.cognition.get_receipts().await))
2212}
2213
2214async fn get_workspace(
2215    State(state): State<AppState>,
2216) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
2217    Ok(Json(state.cognition.get_workspace().await))
2218}
2219
2220async fn get_governance(
2221    State(state): State<AppState>,
2222) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
2223    Ok(Json(state.cognition.get_governance().await))
2224}
2225
2226async fn get_persona(
2227    State(state): State<AppState>,
2228    axum::extract::Path(id): axum::extract::Path<String>,
2229) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2230    state
2231        .cognition
2232        .get_persona(&id)
2233        .await
2234        .map(Json)
2235        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
2236}
2237
2238// ── Audit trail endpoints ──
2239
2240#[derive(Deserialize)]
2241struct AuditQuery {
2242    limit: Option<usize>,
2243    category: Option<String>,
2244}
2245
2246async fn list_audit_entries(
2247    State(state): State<AppState>,
2248    Query(query): Query<AuditQuery>,
2249) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
2250    let limit = query.limit.unwrap_or(100).min(1000);
2251
2252    let entries = if let Some(ref cat) = query.category {
2253        let category = match cat.as_str() {
2254            "api" => AuditCategory::Api,
2255            "tool" | "tool_execution" => AuditCategory::ToolExecution,
2256            "session" => AuditCategory::Session,
2257            "cognition" => AuditCategory::Cognition,
2258            "swarm" => AuditCategory::Swarm,
2259            "auth" => AuditCategory::Auth,
2260            "k8s" => AuditCategory::K8s,
2261            "sandbox" => AuditCategory::Sandbox,
2262            "config" => AuditCategory::Config,
2263            _ => {
2264                return Err((
2265                    StatusCode::BAD_REQUEST,
2266                    format!("Unknown category: {}", cat),
2267                ));
2268            }
2269        };
2270        state.audit_log.by_category(category, limit).await
2271    } else {
2272        state.audit_log.recent(limit).await
2273    };
2274
2275    Ok(Json(entries))
2276}
2277
2278// ── Event Stream Replay API ──
2279// Enables auditors to reconstruct sessions from byte-range offsets.
2280// This is the key compliance feature for SOC 2, FedRAMP, and ATO processes.
2281
2282#[derive(Deserialize)]
2283struct ReplayQuery {
2284    /// Session ID to replay
2285    session_id: String,
2286    /// Optional: starting byte offset (for seeking to specific point)
2287    start_offset: Option<u64>,
2288    /// Optional: ending byte offset
2289    end_offset: Option<u64>,
2290    /// Optional: limit number of events to return
2291    limit: Option<usize>,
2292    /// Optional: filter by tool name
2293    tool_name: Option<String>,
2294}
2295
2296/// Replay session events from the JSONL event stream by byte-range offsets.
2297/// This allows auditors to reconstruct exactly what happened in a session,
2298/// including tool execution durations and success/failure status.
2299async fn replay_session_events(
2300    Query(query): Query<ReplayQuery>,
2301) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
2302    use std::path::PathBuf;
2303
2304    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2305        .map(PathBuf::from)
2306        .ok()
2307        .ok_or_else(|| {
2308            (
2309                StatusCode::SERVICE_UNAVAILABLE,
2310                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2311            )
2312        })?;
2313
2314    let session_dir = base_dir.join(&query.session_id);
2315
2316    // Check if session directory exists
2317    if !session_dir.exists() {
2318        return Err((
2319            StatusCode::NOT_FOUND,
2320            format!("Session not found: {}", query.session_id),
2321        ));
2322    }
2323
2324    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
2325
2326    // Read all event files in the session directory using std::fs for simplicity
2327    let entries = std::fs::read_dir(&session_dir)
2328        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2329
2330    for entry in entries {
2331        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2332        let path = entry.path();
2333
2334        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2335            continue;
2336        }
2337
2338        // Parse byte range from filename: {timestamp}-chat-events-{start}-{end}.jsonl
2339        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2340
2341        if let Some(offsets) = filename
2342            .strip_prefix("T")
2343            .or_else(|| filename.strip_prefix("202"))
2344        {
2345            // Extract start and end offsets from filename
2346            let parts: Vec<&str> = offsets.split('-').collect();
2347            if parts.len() >= 4 {
2348                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2349                let end: u64 = parts[parts.len() - 1]
2350                    .trim_end_matches(".jsonl")
2351                    .parse()
2352                    .unwrap_or(0);
2353
2354                // Filter by byte range if specified
2355                if let Some(query_start) = query.start_offset {
2356                    if end <= query_start {
2357                        continue;
2358                    }
2359                }
2360                if let Some(query_end) = query.end_offset {
2361                    if start >= query_end {
2362                        continue;
2363                    }
2364                }
2365
2366                // Read and parse events from this file
2367                let content = std::fs::read_to_string(&path)
2368                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2369
2370                for line in content.lines() {
2371                    if line.trim().is_empty() {
2372                        continue;
2373                    }
2374                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
2375                        // Filter by tool name if specified
2376                        if let Some(ref tool_filter) = query.tool_name {
2377                            if let Some(event_tool) =
2378                                event.get("tool_name").and_then(|v| v.as_str())
2379                            {
2380                                if event_tool != tool_filter {
2381                                    continue;
2382                                }
2383                            } else {
2384                                continue;
2385                            }
2386                        }
2387                        all_events.push((start, end, event));
2388                    }
2389                }
2390            }
2391        }
2392    }
2393
2394    // Sort by start offset
2395    all_events.sort_by_key(|(s, _, _)| *s);
2396
2397    // Apply limit
2398    let limit = query.limit.unwrap_or(1000).min(10000);
2399    let events: Vec<_> = all_events
2400        .into_iter()
2401        .take(limit)
2402        .map(|(_, _, e)| e)
2403        .collect();
2404
2405    Ok(Json(events))
2406}
2407
2408/// Get session event files metadata (for audit index)
2409async fn list_session_event_files(
2410    Query(query): Query<ReplayQuery>,
2411) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
2412    use std::path::PathBuf;
2413
2414    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2415        .map(PathBuf::from)
2416        .ok()
2417        .ok_or_else(|| {
2418            (
2419                StatusCode::SERVICE_UNAVAILABLE,
2420                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2421            )
2422        })?;
2423
2424    let session_dir = base_dir.join(&query.session_id);
2425    if !session_dir.exists() {
2426        return Err((
2427            StatusCode::NOT_FOUND,
2428            format!("Session not found: {}", query.session_id),
2429        ));
2430    }
2431
2432    let mut files: Vec<EventFileMeta> = Vec::new();
2433
2434    // Use std::fs for simplicity
2435    let entries = std::fs::read_dir(&session_dir)
2436        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2437
2438    for entry in entries {
2439        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2440        let path = entry.path();
2441
2442        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2443            continue;
2444        }
2445
2446        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2447
2448        // Parse byte range from filename
2449        if let Some(offsets) = filename
2450            .strip_prefix("T")
2451            .or_else(|| filename.strip_prefix("202"))
2452        {
2453            let parts: Vec<&str> = offsets.split('-').collect();
2454            if parts.len() >= 4 {
2455                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2456                let end: u64 = parts[parts.len() - 1]
2457                    .trim_end_matches(".jsonl")
2458                    .parse()
2459                    .unwrap_or(0);
2460
2461                let metadata = std::fs::metadata(&path)
2462                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2463
2464                files.push(EventFileMeta {
2465                    filename: filename.to_string(),
2466                    start_offset: start,
2467                    end_offset: end,
2468                    size_bytes: metadata.len(),
2469                });
2470            }
2471        }
2472    }
2473
2474    files.sort_by_key(|f| f.start_offset);
2475    Ok(Json(files))
2476}
2477
2478#[derive(Serialize)]
2479struct EventFileMeta {
2480    filename: String,
2481    start_offset: u64,
2482    end_offset: u64,
2483    size_bytes: u64,
2484}
2485
2486// ── K8s self-deployment endpoints ──
2487
2488async fn get_k8s_status(
2489    State(state): State<AppState>,
2490) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
2491    Ok(Json(state.k8s.status().await))
2492}
2493
2494#[derive(Deserialize)]
2495struct ScaleRequest {
2496    replicas: i32,
2497}
2498
2499async fn k8s_scale(
2500    State(state): State<AppState>,
2501    Json(req): Json<ScaleRequest>,
2502) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2503    if req.replicas < 0 || req.replicas > 100 {
2504        return Err((
2505            StatusCode::BAD_REQUEST,
2506            "Replicas must be between 0 and 100".to_string(),
2507        ));
2508    }
2509
2510    state
2511        .audit_log
2512        .log(
2513            AuditCategory::K8s,
2514            format!("scale:{}", req.replicas),
2515            AuditOutcome::Success,
2516            None,
2517            None,
2518        )
2519        .await;
2520
2521    state
2522        .k8s
2523        .scale(req.replicas)
2524        .await
2525        .map(Json)
2526        .map_err(internal_error)
2527}
2528
2529async fn k8s_restart(
2530    State(state): State<AppState>,
2531) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2532    state
2533        .audit_log
2534        .log(
2535            AuditCategory::K8s,
2536            "rolling_restart",
2537            AuditOutcome::Success,
2538            None,
2539            None,
2540        )
2541        .await;
2542
2543    state
2544        .k8s
2545        .rolling_restart()
2546        .await
2547        .map(Json)
2548        .map_err(internal_error)
2549}
2550
2551async fn k8s_list_pods(
2552    State(state): State<AppState>,
2553) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
2554    state
2555        .k8s
2556        .list_pods()
2557        .await
2558        .map(Json)
2559        .map_err(internal_error)
2560}
2561
2562async fn k8s_actions(
2563    State(state): State<AppState>,
2564) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
2565    Ok(Json(state.k8s.recent_actions(100).await))
2566}
2567
2568#[derive(Deserialize)]
2569struct SpawnSubagentRequest {
2570    subagent_id: String,
2571    #[serde(default)]
2572    image: Option<String>,
2573    #[serde(default)]
2574    env_vars: std::collections::HashMap<String, String>,
2575    #[serde(default)]
2576    labels: std::collections::HashMap<String, String>,
2577    #[serde(default)]
2578    command: Option<Vec<String>>,
2579    #[serde(default)]
2580    args: Option<Vec<String>>,
2581}
2582
2583async fn k8s_spawn_subagent(
2584    State(state): State<AppState>,
2585    Json(req): Json<SpawnSubagentRequest>,
2586) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2587    state
2588        .k8s
2589        .spawn_subagent_pod_with_spec(
2590            &req.subagent_id,
2591            crate::k8s::SubagentPodSpec {
2592                image: req.image,
2593                env_vars: req.env_vars,
2594                labels: req.labels,
2595                command: req.command,
2596                args: req.args,
2597            },
2598        )
2599        .await
2600        .map(Json)
2601        .map_err(internal_error)
2602}
2603
2604async fn k8s_delete_subagent(
2605    State(state): State<AppState>,
2606    axum::extract::Path(id): axum::extract::Path<String>,
2607) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2608    state
2609        .k8s
2610        .delete_subagent_pod(&id)
2611        .await
2612        .map(Json)
2613        .map_err(internal_error)
2614}
2615
2616/// Request to register a tool
2617#[derive(Debug, Deserialize)]
2618pub struct RegisterToolRequest {
2619    pub id: String,
2620    pub name: String,
2621    pub description: String,
2622    pub version: String,
2623    pub endpoint: String,
2624    pub capabilities: Vec<String>,
2625    pub parameters: serde_json::Value,
2626}
2627
2628/// Response from registering a tool
2629#[derive(Serialize)]
2630struct RegisterToolResponse {
2631    tool: RegisteredTool,
2632    message: String,
2633}
2634
2635/// List all registered tools (active, non-expired)
2636async fn list_tools(State(state): State<AppState>) -> Json<Vec<RegisteredTool>> {
2637    Json(state.tool_registry.list().await)
2638}
2639
2640/// Register a new tool
2641async fn register_tool(
2642    State(state): State<AppState>,
2643    Json(req): Json<RegisterToolRequest>,
2644) -> Result<Json<RegisterToolResponse>, (StatusCode, String)> {
2645    let now = chrono::Utc::now();
2646    let tool = RegisteredTool {
2647        id: req.id.clone(),
2648        name: req.name,
2649        description: req.description,
2650        version: req.version,
2651        endpoint: req.endpoint,
2652        capabilities: req.capabilities,
2653        parameters: req.parameters,
2654        registered_at: now,
2655        last_heartbeat: now,
2656        expires_at: now + Duration::from_secs(90),
2657    };
2658
2659    state.tool_registry.register(tool.clone()).await;
2660
2661    tracing::info!(tool_id = %tool.id, "Tool registered");
2662
2663    Ok(Json(RegisterToolResponse {
2664        tool,
2665        message: "Tool registered successfully. Heartbeat required every 30s.".to_string(),
2666    }))
2667}
2668
2669/// Heartbeat endpoint to extend tool TTL
2670async fn tool_heartbeat(
2671    State(state): State<AppState>,
2672    Path(id): Path<String>,
2673) -> Result<Json<RegisteredTool>, (StatusCode, String)> {
2674    state
2675        .tool_registry
2676        .heartbeat(&id)
2677        .await
2678        .map(|tool| {
2679            tracing::info!(tool_id = %id, "Tool heartbeat received");
2680            Json(tool)
2681        })
2682        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Tool {} not found", id)))
2683}
2684
2685/// List registered plugins.
2686async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
2687    let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
2688    let signing_key = SigningKey::from_env();
2689    let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
2690    Json(PluginListResponse {
2691        server_fingerprint,
2692        signing_available: !test_sig.is_empty(),
2693        plugins: Vec::<PluginManifest>::new(),
2694    })
2695}
2696
2697#[derive(Serialize)]
2698struct PluginListResponse {
2699    server_fingerprint: String,
2700    signing_available: bool,
2701    plugins: Vec<PluginManifest>,
2702}
2703
2704// ── Agent task compatibility surface ─────────────────────────────────────
2705// These handlers provide the /v1/agent/tasks/* surface the marketing-site
2706// dashboard expects, backed by the existing KnativeTaskQueue.
2707
2708/// List all agent tasks (wraps Knative task queue)
2709async fn list_agent_tasks(
2710    State(state): State<AppState>,
2711    Query(params): Query<ListAgentTasksQuery>,
2712) -> Json<serde_json::Value> {
2713    let tasks = state.knative_tasks.list().await;
2714    let filtered: Vec<_> = tasks
2715        .into_iter()
2716        .filter(|t| params.status.as_ref().map_or(true, |s| t.status == *s))
2717        .filter(|t| {
2718            params
2719                .agent_type
2720                .as_ref()
2721                .map_or(true, |a| t.agent_type == *a)
2722        })
2723        .collect();
2724    Json(serde_json::json!({ "tasks": filtered, "total": filtered.len() }))
2725}
2726
2727#[derive(Deserialize)]
2728struct ListAgentTasksQuery {
2729    status: Option<String>,
2730    agent_type: Option<String>,
2731}
2732
2733/// Create a new agent task
2734async fn create_agent_task(
2735    State(state): State<AppState>,
2736    Json(req): Json<CreateAgentTaskRequest>,
2737) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2738    let task_id = uuid::Uuid::new_v4().to_string();
2739    let task = KnativeTask {
2740        task_id: task_id.clone(),
2741        title: req.title,
2742        description: req.description,
2743        agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2744        priority: req.priority.unwrap_or(0),
2745        received_at: chrono::Utc::now(),
2746        status: "pending".to_string(),
2747    };
2748    state.knative_tasks.push(task).await;
2749    Ok(Json(serde_json::json!({
2750        "task_id": task_id,
2751        "status": "pending",
2752    })))
2753}
2754
2755#[derive(Deserialize)]
2756#[allow(dead_code)]
2757struct CreateAgentTaskRequest {
2758    title: String,
2759    description: String,
2760    agent_type: Option<String>,
2761    model: Option<String>,
2762    priority: Option<i32>,
2763}
2764
2765/// Get a single agent task by ID
2766async fn get_agent_task(
2767    State(state): State<AppState>,
2768    Path(task_id): Path<String>,
2769) -> Result<Json<KnativeTask>, (StatusCode, String)> {
2770    state
2771        .knative_tasks
2772        .get(&task_id)
2773        .await
2774        .map(Json)
2775        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
2776}
2777
2778/// Get task output (returns task details including result)
2779async fn get_agent_task_output(
2780    State(state): State<AppState>,
2781    Path(task_id): Path<String>,
2782) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2783    let task = state
2784        .knative_tasks
2785        .get(&task_id)
2786        .await
2787        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2788    Ok(Json(serde_json::json!({
2789        "task_id": task.task_id,
2790        "status": task.status,
2791        "title": task.title,
2792        "output": null,
2793    })))
2794}
2795
2796/// Stream task output as SSE events
2797async fn stream_agent_task_output(
2798    State(state): State<AppState>,
2799    Path(task_id): Path<String>,
2800) -> Result<Sse<impl stream::Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
2801    // Verify task exists
2802    state
2803        .knative_tasks
2804        .get(&task_id)
2805        .await
2806        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2807
2808    let bus_handle = state.bus.handle("task-stream");
2809    let rx = bus_handle.into_receiver();
2810    let topic_prefix = format!("task.{task_id}");
2811
2812    let stream = async_stream::try_stream! {
2813        let mut rx = rx;
2814        loop {
2815            match rx.recv().await {
2816                Ok(envelope) => {
2817                    if !envelope.topic.starts_with(&topic_prefix) {
2818                        continue;
2819                    }
2820                    let data = serde_json::to_string(&envelope.message).unwrap_or_default();
2821                    yield Event::default().event("output").data(data);
2822                }
2823                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2824                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2825            }
2826        }
2827    };
2828
2829    Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
2830}
2831
2832// ── Worker connectivity ─────────────────────────────────────────────────
2833
2834/// List connected workers (K8s pods with agent label)
2835async fn list_connected_workers(
2836    State(state): State<AppState>,
2837) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2838    // Use K8s pod listing as proxy for connected workers
2839    let pods = state.k8s.list_pods().await.unwrap_or_default();
2840    let workers: Vec<serde_json::Value> = pods
2841        .iter()
2842        .filter(|p| p.name.contains("codetether") || p.name.contains("worker"))
2843        .map(|p| {
2844            serde_json::json!({
2845                "worker_id": p.name,
2846                "name": p.name,
2847                "status": p.phase,
2848                "is_sse_connected": p.phase == "Running",
2849                "last_seen": chrono::Utc::now().to_rfc3339(),
2850            })
2851        })
2852        .collect();
2853    Ok(Json(serde_json::json!({ "workers": workers })))
2854}
2855
2856// ── Task dispatch ───────────────────────────────────────────────────────
2857
2858/// Dispatch a task (creates a Knative task and returns immediately)
2859async fn dispatch_task(
2860    State(state): State<AppState>,
2861    Json(req): Json<DispatchTaskRequest>,
2862) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2863    let task_id = uuid::Uuid::new_v4().to_string();
2864    let task = KnativeTask {
2865        task_id: task_id.clone(),
2866        title: req.title,
2867        description: req.description,
2868        agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2869        priority: req.priority.unwrap_or(0),
2870        received_at: chrono::Utc::now(),
2871        status: "pending".to_string(),
2872    };
2873
2874    // Publish task event to agent bus for connected workers
2875    let handle = state.bus.handle("task-dispatch");
2876    handle.send(
2877        format!("task.{}", task_id),
2878        crate::bus::BusMessage::TaskUpdate {
2879            task_id: task_id.clone(),
2880            state: crate::a2a::types::TaskState::Submitted,
2881            message: Some(format!("Task dispatched: {}", task.title)),
2882        },
2883    );
2884
2885    state.knative_tasks.push(task).await;
2886
2887    Ok(Json(serde_json::json!({
2888        "task_id": task_id,
2889        "status": "pending",
2890        "dispatched_via_knative": true,
2891    })))
2892}
2893
2894#[derive(Deserialize)]
2895#[allow(dead_code)]
2896struct DispatchTaskRequest {
2897    title: String,
2898    description: String,
2899    agent_type: Option<String>,
2900    model: Option<String>,
2901    priority: Option<i32>,
2902    metadata: Option<serde_json::Value>,
2903}
2904
2905// ── Voice REST bridge ───────────────────────────────────────────────────
2906// Bridges the REST /v1/voice/* surface the dashboard expects to the
2907// existing gRPC VoiceService implementation via the AgentBus.
2908
2909/// Create a voice session (REST bridge for VoiceService::CreateVoiceSession)
2910async fn create_voice_session_rest(
2911    State(state): State<AppState>,
2912    Json(req): Json<CreateVoiceSessionRequest>,
2913) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2914    let voice_id = req.voice.unwrap_or_else(|| "960f89fc".to_string());
2915    let room_name = format!("voice-{}", uuid::Uuid::new_v4());
2916
2917    // Publish session started event to bus
2918    let handle = state.bus.handle("voice-rest");
2919    handle.send_voice_session_started(&room_name, &voice_id);
2920
2921    let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
2922
2923    Ok(Json(serde_json::json!({
2924        "room_name": room_name,
2925        "voice": voice_id,
2926        "mode": req.mode.unwrap_or_else(|| "chat".to_string()),
2927        "access_token": "",
2928        "livekit_url": livekit_url,
2929        "expires_at": (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339(),
2930    })))
2931}
2932
2933#[derive(Deserialize)]
2934#[allow(dead_code)]
2935struct CreateVoiceSessionRequest {
2936    voice: Option<String>,
2937    mode: Option<String>,
2938    codebase_id: Option<String>,
2939    user_id: Option<String>,
2940}
2941
2942/// Get a voice session (REST bridge)
2943async fn get_voice_session_rest(Path(room_name): Path<String>) -> Json<serde_json::Value> {
2944    let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
2945    Json(serde_json::json!({
2946        "room_name": room_name,
2947        "state": "active",
2948        "agent_state": "listening",
2949        "access_token": "",
2950        "livekit_url": livekit_url,
2951    }))
2952}
2953
2954/// Delete a voice session (REST bridge)
2955async fn delete_voice_session_rest(
2956    State(state): State<AppState>,
2957    Path(room_name): Path<String>,
2958) -> Json<serde_json::Value> {
2959    let handle = state.bus.handle("voice-rest");
2960    handle.send_voice_session_ended(&room_name, "user_ended");
2961    Json(serde_json::json!({ "status": "deleted" }))
2962}
2963
2964/// List available voices (REST bridge)
2965async fn list_voices_rest() -> Json<serde_json::Value> {
2966    Json(serde_json::json!({
2967        "voices": [
2968            { "voice_id": "960f89fc", "name": "Riley", "language": "english" },
2969            { "voice_id": "puck", "name": "Puck", "language": "english" },
2970            { "voice_id": "charon", "name": "Charon", "language": "english" },
2971            { "voice_id": "kore", "name": "Kore", "language": "english" },
2972            { "voice_id": "fenrir", "name": "Fenrir", "language": "english" },
2973            { "voice_id": "aoede", "name": "Aoede", "language": "english" },
2974        ]
2975    }))
2976}
2977
2978// ── Session resume ──────────────────────────────────────────────────────
2979
2980/// Resume a codebase-scoped session (what the dashboard session resume flow calls)
2981async fn resume_codebase_session(
2982    Path((_codebase_id, session_id)): Path<(String, String)>,
2983    Json(req): Json<ResumeSessionRequest>,
2984) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2985    // Load the session if it exists, or create a new one
2986    let mut session = match crate::session::Session::load(&session_id).await {
2987        Ok(s) => s,
2988        Err(_) => {
2989            // Create a new session for this codebase
2990            let mut s = crate::session::Session::new()
2991                .await
2992                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2993            if let Some(agent) = &req.agent {
2994                s.agent = agent.clone();
2995            }
2996            s.save()
2997                .await
2998                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2999            s
3000        }
3001    };
3002
3003    // If there's a prompt, execute it as a task
3004    if let Some(prompt) = &req.prompt {
3005        if !prompt.is_empty() {
3006            match session.prompt(prompt).await {
3007                Ok(result) => {
3008                    session.save().await.ok();
3009                    return Ok(Json(serde_json::json!({
3010                        "session_id": session.id,
3011                        "active_session_id": session.id,
3012                        "status": "completed",
3013                        "result": result.text,
3014                    })));
3015                }
3016                Err(e) => {
3017                    return Ok(Json(serde_json::json!({
3018                        "session_id": session.id,
3019                        "active_session_id": session.id,
3020                        "status": "failed",
3021                        "error": e.to_string(),
3022                    })));
3023                }
3024            }
3025        }
3026    }
3027
3028    // No prompt = just resume/check session
3029    Ok(Json(serde_json::json!({
3030        "session_id": session.id,
3031        "active_session_id": session.id,
3032        "status": "ready",
3033    })))
3034}
3035
3036#[derive(Deserialize)]
3037#[allow(dead_code)]
3038struct ResumeSessionRequest {
3039    prompt: Option<String>,
3040    agent: Option<String>,
3041    model: Option<String>,
3042}
3043
3044fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
3045    let message = error.to_string();
3046    if message.contains("not found") {
3047        return (StatusCode::NOT_FOUND, message);
3048    }
3049    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
3050        return (StatusCode::BAD_REQUEST, message);
3051    }
3052    (StatusCode::INTERNAL_SERVER_ERROR, message)
3053}
3054
3055fn env_bool(name: &str, default: bool) -> bool {
3056    std::env::var(name)
3057        .ok()
3058        .and_then(|v| match v.to_ascii_lowercase().as_str() {
3059            "1" | "true" | "yes" | "on" => Some(true),
3060            "0" | "false" | "no" | "off" => Some(false),
3061            _ => None,
3062        })
3063        .unwrap_or(default)
3064}
3065
3066#[cfg(test)]
3067mod tests {
3068    use super::{match_policy_rule, normalize_model_reference};
3069
3070    #[test]
3071    fn policy_prompt_session_requires_execute_permission() {
3072        let permission = match_policy_rule("/api/session/abc123/prompt", "POST");
3073        assert_eq!(permission, Some("agent:execute"));
3074    }
3075
3076    #[test]
3077    fn policy_create_session_keeps_sessions_write_permission() {
3078        let permission = match_policy_rule("/api/session", "POST");
3079        assert_eq!(permission, Some("sessions:write"));
3080    }
3081
3082    #[test]
3083    fn policy_proposal_approval_requires_execute_permission() {
3084        let permission = match_policy_rule("/v1/cognition/proposals/p1/approve", "POST");
3085        assert_eq!(permission, Some("agent:execute"));
3086    }
3087
3088    #[test]
3089    fn policy_openai_models_requires_read_permission() {
3090        let permission = match_policy_rule("/v1/models", "GET");
3091        assert_eq!(permission, Some("agent:read"));
3092    }
3093
3094    #[test]
3095    fn policy_openai_chat_completions_requires_execute_permission() {
3096        let permission = match_policy_rule("/v1/chat/completions", "POST");
3097        assert_eq!(permission, Some("agent:execute"));
3098    }
3099
3100    #[test]
3101    fn normalize_model_reference_accepts_colon_format() {
3102        assert_eq!(
3103            normalize_model_reference("openai:gpt-4o"),
3104            "openai/gpt-4o".to_string()
3105        );
3106    }
3107}