Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7mod policy_user;
8mod tool_contract;
9mod worker_stream;
10mod worker_task_claim;
11mod worker_task_release;
12mod worker_task_stream;
13
14use crate::a2a;
15use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
16use crate::bus::{AgentBus, BusEnvelope};
17use crate::cli::ServeArgs;
18use crate::cloudevents::parse_cloud_event;
19use crate::cognition::{
20    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
21    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
22    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
23    executor::DecisionReceipt,
24};
25use crate::config::Config;
26use crate::k8s::K8sManager;
27use crate::tool::{PluginManifest, PluginRegistry as AgentPluginRegistry, hash_bytes, hash_file};
28use anyhow::Result;
29use auth::AuthState;
30use axum::{
31    Router,
32    body::Body,
33    extract::Path,
34    extract::{Query, State},
35    http::{HeaderMap, Method, Request, StatusCode},
36    middleware::{self, Next},
37    response::sse::{Event, KeepAlive, Sse},
38    response::{IntoResponse, Json, Response},
39    routing::{get, post},
40};
41use futures::{StreamExt, future::join_all, stream};
42use http::HeaderValue;
43use serde::{Deserialize, Serialize};
44use std::collections::HashMap;
45use std::convert::Infallible;
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::sync::{Mutex, broadcast};
49use tonic_web::GrpcWebLayer;
50use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
51use tower_http::trace::TraceLayer;
52
/// Task received from Knative Eventing
///
/// Instances are stored in [`KnativeTaskQueue`] and returned verbatim as JSON
/// by the `/v1/knative/tasks` handlers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnativeTask {
    /// Identifier used to look the task up in the queue.
    pub task_id: String,
    pub title: String,
    pub description: String,
    /// NOTE(review): presumably names the kind of agent that should handle the
    /// task — confirm against the event producer.
    pub agent_type: String,
    pub priority: i32,
    /// UTC timestamp; presumably the moment the event arrived — confirm.
    pub received_at: chrono::DateTime<chrono::Utc>,
    /// Free-form lifecycle status; the claim handler sets it to `"processing"`.
    pub status: String,
}
64
/// Queue for Knative tasks waiting to be processed
///
/// Clones share the same underlying task list through the inner `Arc`.
#[derive(Clone)]
pub struct KnativeTaskQueue {
    /// Shared task list guarded by an async (`tokio::sync`) mutex.
    tasks: Arc<Mutex<Vec<KnativeTask>>>,
}
70
71impl KnativeTaskQueue {
72    pub fn new() -> Self {
73        Self {
74            tasks: Arc::new(Mutex::new(Vec::new())),
75        }
76    }
77
78    pub async fn push(&self, task: KnativeTask) {
79        self.tasks.lock().await.push(task);
80    }
81
82    #[allow(dead_code)]
83    pub async fn pop(&self) -> Option<KnativeTask> {
84        self.tasks.lock().await.pop()
85    }
86
87    pub async fn list(&self) -> Vec<KnativeTask> {
88        self.tasks.lock().await.clone()
89    }
90
91    #[allow(dead_code)]
92    pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
93        self.tasks
94            .lock()
95            .await
96            .iter()
97            .find(|t| t.task_id == task_id)
98            .cloned()
99    }
100
101    pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
102        let mut tasks = self.tasks.lock().await;
103        if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
104            task.status = status.to_string();
105            true
106        } else {
107            false
108        }
109    }
110}
111
112impl Default for KnativeTaskQueue {
113    fn default() -> Self {
114        Self::new()
115    }
116}
117
/// A registered tool with TTL tracking
///
/// `expires_at` is the TTL deadline: `ToolRegistry::list` hides entries whose
/// deadline has passed and `ToolRegistry::cleanup` removes them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisteredTool {
    /// Registry key; registering another tool with the same id replaces this one.
    pub id: String,
    pub name: String,
    pub description: String,
    pub version: String,
    pub endpoint: String,
    pub capabilities: Vec<String>,
    /// Arbitrary JSON; presumably the tool's parameter schema — confirm.
    pub parameters: serde_json::Value,
    /// Passed to `tool_contract::is_agent_executable` to derive the
    /// `agent_executable` flag on `RegisteredToolView`.
    pub execution_mode: String,
    pub registered_at: chrono::DateTime<chrono::Utc>,
    /// Set to the current time by `ToolRegistry::heartbeat`.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Moment after which the tool counts as expired.
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
133
/// Serialization-only view of a [`RegisteredTool`] plus a derived flag.
#[derive(Serialize)]
struct RegisteredToolView {
    /// Flattened, so the tool's fields appear at the top level of the output
    /// alongside `agent_executable`.
    #[serde(flatten)]
    tool: RegisteredTool,
    /// Result of `tool_contract::is_agent_executable` for the tool's
    /// `execution_mode`.
    agent_executable: bool,
}
140
141impl From<RegisteredTool> for RegisteredToolView {
142    fn from(tool: RegisteredTool) -> Self {
143        let agent_executable = tool_contract::is_agent_executable(&tool.execution_mode);
144        Self {
145            tool,
146            agent_executable,
147        }
148    }
149}
150
/// Tool registry with TTL-based expiry
///
/// Clones share the same underlying map through the inner `Arc`.
#[derive(Clone)]
pub struct ToolRegistry {
    /// Registered tools keyed by `RegisteredTool::id`, behind an async
    /// read/write lock.
    tools: Arc<tokio::sync::RwLock<HashMap<String, RegisteredTool>>>,
}
156
157impl Default for ToolRegistry {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl ToolRegistry {
164    pub fn new() -> Self {
165        Self {
166            tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
167        }
168    }
169
170    /// Register a new tool
171    pub async fn register(&self, tool: RegisteredTool) {
172        let mut tools = self.tools.write().await;
173        tools.insert(tool.id.clone(), tool);
174    }
175
176    /// Get a tool by ID
177    #[allow(dead_code)]
178    pub async fn get(&self, id: &str) -> Option<RegisteredTool> {
179        let tools = self.tools.read().await;
180        tools.get(id).cloned()
181    }
182
183    /// List all tools (excluding expired)
184    pub async fn list(&self) -> Vec<RegisteredTool> {
185        let tools = self.tools.read().await;
186        let now = chrono::Utc::now();
187        tools
188            .values()
189            .filter(|t| t.expires_at > now)
190            .cloned()
191            .collect()
192    }
193
194    /// Update heartbeat for a tool (extends TTL by 30s)
195    pub async fn heartbeat(&self, id: &str) -> Option<RegisteredTool> {
196        let mut tools = self.tools.write().await;
197        if let Some(tool) = tools.get_mut(id) {
198            let now = chrono::Utc::now();
199            tool.last_heartbeat = now;
200            tool.expires_at = now + Duration::from_secs(90);
201            return Some(tool.clone());
202        }
203        None
204    }
205
206    /// Clean up expired tools
207    pub async fn cleanup(&self) {
208        let mut tools = self.tools.write().await;
209        let now = chrono::Utc::now();
210        tools.retain(|_, t| t.expires_at > now);
211    }
212}
213
/// Server state shared across handlers
///
/// Built once in `serve` and cloned into the router and middleware.
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<Config>,
    pub cognition: Arc<CognitionRuntime>,
    /// Audit trail written to by `audit_middleware`.
    pub audit_log: AuditLog,
    pub k8s: Arc<K8sManager>,
    pub auth: AuthState,
    /// In-process agent bus; `serve` also installs the same bus as the global one.
    pub bus: Arc<AgentBus>,
    /// Tasks exposed through the `/v1/knative/tasks` routes.
    pub knative_tasks: KnativeTaskQueue,
    /// Remotely registered tools; expired entries are reaped by a background
    /// task spawned in `serve`.
    pub tool_registry: ToolRegistry,
    pub plugin_registry: AgentPluginRegistry,
    /// Hash of the crate version (`CARGO_PKG_VERSION`), computed in `serve`.
    pub server_fingerprint: String,
}
228
229/// Audit middleware — logs every request/response to the audit trail.
230async fn audit_middleware(
231    State(state): State<AppState>,
232    request: Request<Body>,
233    next: Next,
234) -> Response {
235    let method = request.method().clone();
236    let path = request.uri().path().to_string();
237    let started = std::time::Instant::now();
238
239    let response = next.run(request).await;
240
241    let duration_ms = started.elapsed().as_millis() as u64;
242    let status = response.status().as_u16();
243    let outcome = if status < 400 {
244        AuditOutcome::Success
245    } else if status == 401 || status == 403 {
246        AuditOutcome::Denied
247    } else {
248        AuditOutcome::Failure
249    };
250
251    state
252        .audit_log
253        .record(audit::AuditEntry {
254            id: uuid::Uuid::new_v4().to_string(),
255            timestamp: chrono::Utc::now(),
256            category: AuditCategory::Api,
257            action: format!("{} {}", method, path),
258            principal: None,
259            outcome,
260            detail: Some(serde_json::json!({ "status": status })),
261            duration_ms: Some(duration_ms),
262            okr_id: None,
263            okr_run_id: None,
264            relay_id: None,
265            session_id: None,
266        })
267        .await;
268
269    response
270}
271
/// Mapping from (path pattern, HTTP method) → OPA permission action.
/// The first matching rule wins.
struct PolicyRule {
    /// Path pattern. A pattern ending in `/` matches any path that starts with
    /// it; any other pattern matches the exact path or the path followed by `/…`.
    pattern: &'static str,
    /// Upper-case HTTP method names the rule applies to; `None` means every method.
    methods: Option<&'static [&'static str]>,
    /// Permission string to enforce; the empty string marks the route as exempt.
    permission: &'static str,
}
279
/// Ordered route → permission table consulted by `match_policy_rule`.
///
/// Order is significant: rules are tried top to bottom and the first rule whose
/// pattern and method both match decides the permission. A path/method pair
/// that matches no rule is rejected by `required_permission`.
const POLICY_RULES: &[PolicyRule] = &[
    // Public / exempt (empty permission string = no policy check)
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/task",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/v1/knative/",
        methods: None,
        permission: "",
    },
    // OpenAI-compatible model discovery and chat completions
    PolicyRule {
        pattern: "/v1/models",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/chat/completions",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Tool registry — read/write
    PolicyRule {
        pattern: "/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Redundant with the "/v1/tools/" POST rule below (same permission).
    PolicyRule {
        pattern: "/v1/tools/register",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/tools/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // A2A routes nested under /a2a — exempt from policy checks
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // K8s management — admin only
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // K8s sub-agent lifecycle — admin only
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    // Plugin registry — read
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Audit — admin (any method; also covers every path below /v1/audit/)
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    // Event stream replay — admin (compliance feature)
    // Never reached: the "/v1/audit" rule above already matches these paths
    // for every method. Harmless because both require the same permission.
    PolicyRule {
        pattern: "/v1/audit/replay",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Cognition — write operations
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Cognition — read operations
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Swarm persona lifecycle
    // Redundant with the "/v1/swarm/" POST rule below (same permission).
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent Bus — read access for stream, filtered by JWT topic claims
    PolicyRule {
        pattern: "/v1/bus/stream",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent Bus — publish messages (requires execute permission)
    PolicyRule {
        pattern: "/v1/bus/publish",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Session management
    // Session prompt execution — requires execute permission.
    // Must stay ahead of the "/api/session" POST rule, which would otherwise
    // also match sub-paths and apply "sessions:write" instead.
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    // MCP compatibility alias
    PolicyRule {
        pattern: "/mcp/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent task APIs (dashboard compatibility)
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Never reached: the "/v1/agent/tasks" GET rule above already matches
    // every sub-path. Harmless because both require the same permission.
    PolicyRule {
        pattern: "/v1/agent/tasks/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Worker connectivity
    // NOTE(review): applies to every method, so the POST claim/release routes
    // under /v1/worker/ only require "agent:read" — confirm this is intended.
    PolicyRule {
        pattern: "/v1/worker/",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/workers",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Task dispatch
    PolicyRule {
        pattern: "/v1/tasks/dispatch",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Voice REST bridge
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["POST", "DELETE"]),
        permission: "agent:execute",
    },
    // Session resume
    PolicyRule {
        pattern: "/v1/agent/codebases/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Proposal approval — governance action
    // (no earlier rule matches POST under /v1/cognition/proposals/, so this
    // rule is reached despite sitting far from the other cognition rules)
    PolicyRule {
        pattern: "/v1/cognition/proposals/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Config, version, providers, agents — read
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];
514
515/// Find the required permission for a given path + method.
516/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched.
517fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
518    for rule in POLICY_RULES {
519        let matches = if rule.pattern.ends_with('/') {
520            path.starts_with(rule.pattern)
521        } else {
522            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
523        };
524        if matches {
525            if let Some(allowed_methods) = rule.methods
526                && !allowed_methods.contains(&method)
527            {
528                continue;
529            }
530            return Some(rule.permission);
531        }
532    }
533    None
534}
535
536fn required_permission(path: &str, method: &str) -> Result<Option<&'static str>, StatusCode> {
537    match match_policy_rule(path, method) {
538        Some("") => Ok(None),
539        Some(permission) => Ok(Some(permission)),
540        None => Err(StatusCode::FORBIDDEN),
541    }
542}
543
544/// Policy authorization middleware for Axum.
545///
546/// Maps request paths to OPA permission strings and enforces authorization.
547/// Runs after `require_auth` so the bearer token is already validated.
548async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
549    let path = request.uri().path();
550    let method = request.method().as_str();
551
552    let permission = match required_permission(path, method)? {
553        None => return Ok(next.run(request).await),
554        Some(permission) => permission,
555    };
556
557    let user = policy_user::from_request(&request);
558
559    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
560        tracing::warn!(
561            path = %path,
562            method = %method,
563            permission = %permission,
564            "Policy middleware denied request"
565        );
566        return Err(status);
567    }
568
569    Ok(next.run(request).await)
570}
571
572/// Start the HTTP server
573pub async fn serve(args: ServeArgs) -> Result<()> {
574    let t0 = std::time::Instant::now();
575    tracing::info!("[startup] begin");
576    let config = Config::load().await?;
577    tracing::info!(
578        elapsed_ms = t0.elapsed().as_millis(),
579        "[startup] config loaded"
580    );
581    let mut cognition = CognitionRuntime::new_from_env();
582    tracing::info!(
583        elapsed_ms = t0.elapsed().as_millis(),
584        "[startup] cognition runtime created"
585    );
586
587    // Set up tool registry for cognition execution engine.
588    let runtime_tools = Arc::new(crate::tool::ToolRegistry::with_defaults());
589    let plugin_registry = runtime_tools.plugins().clone();
590    cognition.set_tools(runtime_tools);
591    tracing::info!(
592        elapsed_ms = t0.elapsed().as_millis(),
593        "[startup] tools registered"
594    );
595    let cognition = Arc::new(cognition);
596
597    // Initialize audit log.
598    let audit_log = AuditLog::from_env();
599    let _ = audit::init_audit_log(audit_log.clone());
600
601    // Initialize K8s manager.
602    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
603    let k8s = Arc::new(K8sManager::new().await);
604    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
605    if k8s.is_available() {
606        tracing::info!("K8s self-deployment enabled");
607    }
608
609    // Initialize mandatory auth.
610    let auth_state = AuthState::from_env();
611    tracing::info!(
612        token_len = auth_state.token().len(),
613        "Auth is mandatory. Only /health is served without a Bearer token."
614    );
615    tracing::info!(
616        audit_entries = audit_log.count().await,
617        "Audit log initialized"
618    );
619
620    // Create agent bus for in-process communication
621    let bus = AgentBus::new().into_arc();
622    crate::bus::set_global(bus.clone());
623
624    // Auto-start S3 sink if MinIO is configured (set MINIO_ENDPOINT to enable)
625    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
626
627    tracing::info!(
628        elapsed_ms = t0.elapsed().as_millis(),
629        "[startup] bus created"
630    );
631
632    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
633        tracing::info!(
634            elapsed_ms = t0.elapsed().as_millis(),
635            "[startup] auto-starting cognition"
636        );
637        if let Err(error) = cognition.start(None).await {
638            tracing::warn!(%error, "Failed to auto-start cognition loop");
639        } else {
640            tracing::info!("Perpetual cognition auto-started");
641        }
642    }
643
644    tracing::info!(
645        elapsed_ms = t0.elapsed().as_millis(),
646        "[startup] building routes"
647    );
648    let addr = format!("{}:{}", args.hostname, args.port);
649
650    // Build the agent card
651    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
652    let a2a_server = a2a::server::A2AServer::with_bus(agent_card.clone(), bus.clone());
653
654    // Build A2A router separately
655    let a2a_router = a2a_server.clone().router();
656    let a2a_nested_router = a2a_server.router();
657
658    // Start gRPC transport on a separate port
659    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
660        .ok()
661        .and_then(|p| p.parse::<u16>().ok())
662        .unwrap_or(50051);
663    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
664    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
665    let grpc_service = grpc_store.into_service();
666    let voice_service = crate::a2a::voice_grpc::VoiceServiceImpl::new(bus.clone()).into_service();
667    tokio::spawn(async move {
668        tracing::info!(addr = %grpc_addr, "Starting gRPC A2A server");
669
670        // Configure CORS for marketing site (production + local dev)
671        let allowed_origins = [
672            HeaderValue::from_static("https://codetether.run"),
673            HeaderValue::from_static("https://api.codetether.run"),
674            HeaderValue::from_static("https://docs.codetether.run"),
675            HeaderValue::from_static("https://codetether.com"),
676            HeaderValue::from_static("http://localhost:3000"),
677            HeaderValue::from_static("http://localhost:3001"),
678        ];
679        let cors = CorsLayer::new()
680            .allow_origin(AllowOrigin::list(allowed_origins))
681            .allow_methods(AllowMethods::any())
682            .allow_headers(AllowHeaders::any())
683            .expose_headers(tower_http::cors::ExposeHeaders::list([
684                http::header::HeaderName::from_static("grpc-status"),
685                http::header::HeaderName::from_static("grpc-message"),
686            ]));
687
688        if let Err(error) = tonic::transport::Server::builder()
689            .accept_http1(true)
690            .layer(cors)
691            .layer(GrpcWebLayer::new())
692            .add_service(grpc_service)
693            .add_service(voice_service)
694            .serve(grpc_addr)
695            .await
696        {
697            tracing::warn!(addr = %grpc_addr, error = ?error, "gRPC A2A server exited");
698        }
699    });
700
701    let state = AppState {
702        config: Arc::new(config),
703        cognition,
704        audit_log,
705        k8s,
706        auth: auth_state.clone(),
707        bus,
708        knative_tasks: KnativeTaskQueue::new(),
709        tool_registry: ToolRegistry::new(),
710        plugin_registry,
711        server_fingerprint: hash_bytes(env!("CARGO_PKG_VERSION").as_bytes()),
712    };
713
714    // Spawn the tool reaper background task (runs every 15s to clean up expired tools)
715    let tool_registry = state.tool_registry.clone();
716    tokio::spawn(async move {
717        let mut interval = tokio::time::interval(Duration::from_secs(15));
718        loop {
719            interval.tick().await;
720            tool_registry.cleanup().await;
721            tracing::debug!("Tool reaper ran cleanup");
722        }
723    });
724
725    let app = Router::new()
726        // Health check (public — auth exempt)
727        .route("/health", get(health))
728        // CloudEvent receiver for Knative Eventing (public — auth exempt)
729        .route("/task", post(receive_task_event))
730        // Knative task queue APIs
731        .route("/v1/knative/tasks", get(list_knative_tasks))
732        .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
733        .route(
734            "/v1/knative/tasks/{task_id}/claim",
735            post(claim_knative_task),
736        )
737        .route(
738            "/v1/knative/tasks/{task_id}/complete",
739            post(complete_knative_task),
740        )
741        // API routes
742        .route("/api/version", get(get_version))
743        .route("/api/session", get(list_sessions).post(create_session))
744        .route("/api/session/{id}", get(get_session))
745        .route("/api/session/{id}/prompt", post(prompt_session))
746        .route("/api/config", get(get_config))
747        .route("/api/provider", get(list_providers))
748        .route("/api/agent", get(list_agents))
749        // OpenAI-compatible APIs
750        .route("/v1/models", get(list_openai_models))
751        .route("/v1/chat/completions", post(openai_chat_completions))
752        // Perpetual cognition APIs
753        .route("/v1/cognition/start", post(start_cognition))
754        .route("/v1/cognition/stop", post(stop_cognition))
755        .route("/v1/cognition/status", get(get_cognition_status))
756        .route("/v1/cognition/stream", get(stream_cognition))
757        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
758        // Swarm persona lifecycle APIs
759        .route("/v1/swarm/personas", post(create_persona))
760        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
761        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
762        .route("/v1/swarm/lineage", get(get_swarm_lineage))
763        // Belief, attention, governance, workspace APIs
764        .route("/v1/cognition/beliefs", get(list_beliefs))
765        .route("/v1/cognition/beliefs/{id}", get(get_belief))
766        .route("/v1/cognition/attention", get(list_attention))
767        .route("/v1/cognition/proposals", get(list_proposals))
768        .route(
769            "/v1/cognition/proposals/{id}/approve",
770            post(approve_proposal),
771        )
772        .route("/v1/cognition/receipts", get(list_receipts))
773        .route("/v1/cognition/workspace", get(get_workspace))
774        .route("/v1/cognition/governance", get(get_governance))
775        .route("/v1/cognition/personas/{id}", get(get_persona))
776        // Audit trail API
777        .route("/v1/audit", get(list_audit_entries))
778        // Event stream replay API (for SOC 2, FedRAMP, ATO compliance)
779        .route("/v1/audit/replay", get(replay_session_events))
780        .route("/v1/audit/replay/index", get(list_session_event_files))
781        // K8s self-deployment APIs
782        .route("/v1/k8s/status", get(get_k8s_status))
783        .route("/v1/k8s/scale", post(k8s_scale))
784        .route("/v1/k8s/restart", post(k8s_restart))
785        .route("/v1/k8s/pods", get(k8s_list_pods))
786        .route("/v1/k8s/actions", get(k8s_actions))
787        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
788        .route(
789            "/v1/k8s/subagent/{id}",
790            axum::routing::delete(k8s_delete_subagent),
791        )
792        // Plugin registry API
793        .route("/v1/plugins", get(list_plugins))
794        // Tool registry API
795        .route("/v1/tools", get(list_tools))
796        .route("/v1/tools/register", post(register_tool))
797        .route("/v1/tools/{id}/heartbeat", post(tool_heartbeat))
798        // MCP compatibility alias (marketing site SDK expects /mcp/v1/tools)
799        .route("/mcp/v1/tools", get(list_tools))
800        // Agent task APIs (compatibility surface for dashboard)
801        .route(
802            "/v1/agent/tasks",
803            get(list_agent_tasks).post(create_agent_task),
804        )
805        .route("/v1/agent/tasks/{task_id}", get(get_agent_task))
806        .route(
807            "/v1/agent/tasks/{task_id}/output",
808            get(get_agent_task_output).post(agent_task_output),
809        )
810        .route(
811            "/v1/agent/tasks/{task_id}/output/stream",
812            get(stream_agent_task_output),
813        )
814        // Worker connectivity (dashboard polls this)
815        .route("/v1/worker/connected", get(list_connected_workers))
816        .route("/v1/agent/workers", get(list_connected_workers))
817        // Worker task lifecycle (what the Rust worker calls)
818        .route(
819            "/v1/worker/tasks/stream",
820            get(worker_task_stream::worker_task_stream),
821        )
822        .route(
823            "/v1/worker/tasks/claim",
824            post(worker_task_claim::worker_task_claim),
825        )
826        .route(
827            "/v1/worker/tasks/release",
828            post(worker_task_release::worker_task_release),
829        )
830        // Task dispatch (Knative-backed)
831        .route("/v1/tasks/dispatch", post(dispatch_task))
832        // Voice REST bridge (dashboard expects REST, server has gRPC)
833        .route("/v1/voice/sessions", post(create_voice_session_rest))
834        .route(
835            "/v1/voice/sessions/{room_name}",
836            get(get_voice_session_rest).delete(delete_voice_session_rest),
837        )
838        .route("/v1/voice/voices", get(list_voices_rest))
839        // Session resume (dashboard uses this for codebase-scoped resume)
840        .route(
841            "/v1/agent/codebases/{codebase_id}/sessions/{session_id}/resume",
842            post(resume_codebase_session),
843        )
844        // Agent Bus — SSE stream + publish
845        .route("/v1/bus/stream", get(stream_bus_events))
846        .with_state(state.clone())
847        // A2A routes at both the root and /a2a so local runtimes can discover
848        // and invoke the same server through either shape.
849        .merge(a2a_router)
850        .nest("/a2a", a2a_nested_router)
851        // Mandatory auth middleware — applies to all routes
852        .layer(middleware::from_fn_with_state(
853            state.clone(),
854            audit_middleware,
855        ))
856        .layer(middleware::from_fn(policy_middleware))
857        .layer(middleware::from_fn(auth::require_auth))
858        .layer(axum::Extension(state.auth.clone()))
859        // CORS + tracing — explicit origin allowlist so headers are present on
860        // ALL responses (including 4xx/5xx) and not dependent on request mirroring.
861        .layer(
862            CorsLayer::new()
863                .allow_origin([
864                    "https://codetether.run".parse::<HeaderValue>().unwrap(),
865                    "https://api.codetether.run".parse::<HeaderValue>().unwrap(),
866                    "https://docs.codetether.run"
867                        .parse::<HeaderValue>()
868                        .unwrap(),
869                    "https://codetether.com".parse::<HeaderValue>().unwrap(),
870                    "http://localhost:3000".parse::<HeaderValue>().unwrap(),
871                    "http://localhost:3001".parse::<HeaderValue>().unwrap(),
872                    "http://localhost:8000".parse::<HeaderValue>().unwrap(),
873                ])
874                .allow_credentials(true)
875                .allow_methods([
876                    Method::GET,
877                    Method::POST,
878                    Method::PUT,
879                    Method::PATCH,
880                    Method::DELETE,
881                    Method::OPTIONS,
882                ])
883                .allow_headers([
884                    http::header::AUTHORIZATION,
885                    http::header::CONTENT_TYPE,
886                    http::header::ACCEPT,
887                    http::header::ORIGIN,
888                    http::header::CACHE_CONTROL,
889                    http::header::HeaderName::from_static("x-worker-id"),
890                    http::header::HeaderName::from_static("x-agent-name"),
891                    http::header::HeaderName::from_static("x-workspaces"),
892                    http::header::HeaderName::from_static("x-requested-with"),
893                ]),
894        )
895        .layer(TraceLayer::new_for_http());
896
897    tracing::info!(
898        elapsed_ms = t0.elapsed().as_millis(),
899        "[startup] router built, binding"
900    );
901    let listener = tokio::net::TcpListener::bind(&addr).await?;
902    tracing::info!(
903        elapsed_ms = t0.elapsed().as_millis(),
904        "[startup] listening on http://{}",
905        addr
906    );
907
908    axum::serve(listener, app).await?;
909
910    Ok(())
911}
912
/// Liveness probe handler.
///
/// Consults no state: the response body is always the static string `"ok"`.
async fn health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
917
918/// List all Knative tasks in queue
919async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
920    Json(state.knative_tasks.list().await)
921}
922
923/// Get a specific Knative task by ID
924async fn get_knative_task(
925    State(state): State<AppState>,
926    Path(task_id): Path<String>,
927) -> Result<Json<KnativeTask>, (StatusCode, String)> {
928    state
929        .knative_tasks
930        .get(&task_id)
931        .await
932        .map(Json)
933        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
934}
935
936/// Claim/start processing a Knative task
937async fn claim_knative_task(
938    State(state): State<AppState>,
939    Path(task_id): Path<String>,
940) -> Result<Json<KnativeTask>, (StatusCode, String)> {
941    // Update status to processing
942    let updated = state
943        .knative_tasks
944        .update_status(&task_id, "processing")
945        .await;
946    if !updated {
947        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
948    }
949
950    state
951        .knative_tasks
952        .get(&task_id)
953        .await
954        .map(Json)
955        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
956}
957
958/// Complete a Knative task
959async fn complete_knative_task(
960    State(state): State<AppState>,
961    Path(task_id): Path<String>,
962) -> Result<Json<KnativeTask>, (StatusCode, String)> {
963    // Update status to completed
964    let updated = state
965        .knative_tasks
966        .update_status(&task_id, "completed")
967        .await;
968    if !updated {
969        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
970    }
971
972    state
973        .knative_tasks
974        .get(&task_id)
975        .await
976        .map(Json)
977        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
978}
979
980/// CloudEvent handler for Knative Eventing
981/// Receives task events from Knative Broker and triggers execution
982async fn receive_task_event(
983    State(state): State<AppState>,
984    headers: HeaderMap,
985    Json(body): Json<serde_json::Value>,
986) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
987    let event =
988        parse_cloud_event(&headers, body).map_err(|error| (StatusCode::BAD_REQUEST, error))?;
989    tracing::info!(
990        event_type = %event.event_type,
991        event_id = %event.id,
992        "Received CloudEvent from Knative"
993    );
994
995    // Log the incoming event for audit
996    state
997        .audit_log
998        .log(
999            audit::AuditCategory::Api,
1000            format!("cloudevents:{}", event.event_type),
1001            AuditOutcome::Success,
1002            None,
1003            None,
1004        )
1005        .await;
1006
1007    // Process based on event type
1008    match event.event_type.as_str() {
1009        "codetether.task.created" | "task.created" => {
1010            let task_id = event
1011                .data
1012                .get("task_id")
1013                .or_else(|| event.data.get("id"))
1014                .and_then(|v| v.as_str())
1015                .unwrap_or(event.id.as_str())
1016                .to_string();
1017            let title = event
1018                .data
1019                .get("title")
1020                .or_else(|| event.data.get("prompt"))
1021                .and_then(|v| v.as_str())
1022                .unwrap_or("")
1023                .to_string();
1024            let description = event
1025                .data
1026                .get("description")
1027                .or_else(|| event.data.get("prompt"))
1028                .and_then(|v| v.as_str())
1029                .unwrap_or("")
1030                .to_string();
1031            let agent_type = event
1032                .data
1033                .get("agent_type")
1034                .or_else(|| event.data.get("agent"))
1035                .and_then(|v| v.as_str())
1036                .unwrap_or("build")
1037                .to_string();
1038            let priority = event
1039                .data
1040                .get("priority")
1041                .and_then(|v| v.as_i64())
1042                .unwrap_or(0) as i32;
1043
1044            let task = KnativeTask {
1045                task_id: task_id.clone(),
1046                title,
1047                description,
1048                agent_type,
1049                priority,
1050                received_at: chrono::Utc::now(),
1051                status: "queued".to_string(),
1052            };
1053
1054            state.knative_tasks.push(task).await;
1055            tracing::info!(task_id = %task_id, "Task queued for execution");
1056        }
1057        "codetether.task.updated" | "task.updated" => {
1058            if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
1059                let status = event
1060                    .data
1061                    .get("status")
1062                    .and_then(|v| v.as_str())
1063                    .unwrap_or("updated");
1064                let _ = state.knative_tasks.update_status(task_id, status).await;
1065                tracing::info!(task_id = %task_id, status = %status, "Task status updated");
1066            }
1067        }
1068        "codetether.task.cancelled" => {
1069            tracing::info!("Task cancellation event received");
1070            // Update task status if we have the task_id
1071            if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
1072                let _ = state
1073                    .knative_tasks
1074                    .update_status(task_id, "cancelled")
1075                    .await;
1076                tracing::info!(task_id = %task_id, "Task cancelled");
1077            }
1078        }
1079        _ => {
1080            tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
1081        }
1082    }
1083
1084    Ok(Json(CloudEventResponse {
1085        status: "accepted".to_string(),
1086        event_id: event.id,
1087    }))
1088}
1089
/// Response to CloudEvent acknowledgment
#[derive(Serialize)]
struct CloudEventResponse {
    /// Acknowledgment status; `receive_task_event` sets this to `"accepted"`.
    status: String,
    /// ID of the CloudEvent being acknowledged.
    event_id: String,
}
1096
/// Version info
#[derive(Serialize)]
struct VersionInfo {
    /// Crate version, taken from `CARGO_PKG_VERSION` at compile time.
    version: &'static str,
    /// Crate name, taken from `CARGO_PKG_NAME` at compile time.
    name: &'static str,
    /// Hash of the running executable's file, or `None` when the executable
    /// path or its hash could not be obtained.
    binary_hash: Option<String>,
}
1104
1105async fn get_version() -> Json<VersionInfo> {
1106    let binary_hash = std::env::current_exe()
1107        .ok()
1108        .and_then(|p| hash_file(&p).ok());
1109    Json(VersionInfo {
1110        version: env!("CARGO_PKG_VERSION"),
1111        name: env!("CARGO_PKG_NAME"),
1112        binary_hash,
1113    })
1114}
1115
/// Query parameters accepted when listing sessions.
#[derive(Deserialize)]
struct ListSessionsQuery {
    /// Maximum number of sessions to return; `list_sessions` uses 100 when omitted.
    limit: Option<usize>,
    /// Number of sessions to skip from the start; `list_sessions` uses 0 when omitted.
    offset: Option<usize>,
}
1122
1123async fn list_sessions(
1124    Query(query): Query<ListSessionsQuery>,
1125) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
1126    let sessions = crate::session::list_sessions()
1127        .await
1128        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1129
1130    let offset = query.offset.unwrap_or(0);
1131    let limit = query.limit.unwrap_or(100);
1132    Ok(Json(
1133        sessions.into_iter().skip(offset).take(limit).collect(),
1134    ))
1135}
1136
/// Request body for creating a new session.
#[derive(Deserialize)]
struct CreateSessionRequest {
    /// Title assigned to the new session; may be omitted.
    title: Option<String>,
    /// Agent name to set on the new session; left untouched when omitted.
    agent: Option<String>,
}
1143
1144async fn create_session(
1145    Json(req): Json<CreateSessionRequest>,
1146) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1147    let mut session = crate::session::Session::new()
1148        .await
1149        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1150
1151    session.title = req.title;
1152    if let Some(agent) = req.agent {
1153        session.set_agent_name(agent);
1154    }
1155
1156    session
1157        .save()
1158        .await
1159        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1160
1161    Ok(Json(session))
1162}
1163
1164/// Get a session by ID
1165async fn get_session(
1166    axum::extract::Path(id): axum::extract::Path<String>,
1167) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1168    let session = crate::session::Session::load(&id)
1169        .await
1170        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1171
1172    Ok(Json(session))
1173}
1174
/// Request body for prompting a session.
#[derive(Deserialize)]
struct PromptRequest {
    /// Prompt text; `prompt_session` rejects it when blank after trimming.
    message: String,
}
1180
1181async fn prompt_session(
1182    axum::extract::Path(id): axum::extract::Path<String>,
1183    Json(req): Json<PromptRequest>,
1184) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
1185    // Validate the message is not empty
1186    if req.message.trim().is_empty() {
1187        return Err((
1188            StatusCode::BAD_REQUEST,
1189            "Message cannot be empty".to_string(),
1190        ));
1191    }
1192
1193    // Log the prompt request (uses the message field)
1194    tracing::info!(
1195        session_id = %id,
1196        message_len = req.message.len(),
1197        "Received prompt request"
1198    );
1199
1200    let mut session = crate::session::Session::load(&id)
1201        .await
1202        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1203
1204    let result = session.prompt(&req.message).await.map_err(|e| {
1205        tracing::error!(session_id = %id, error = %e, "Session prompt failed");
1206        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1207    })?;
1208
1209    session.save().await.map_err(|e| {
1210        tracing::error!(session_id = %id, error = %e, "Failed to save session after prompt");
1211        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1212    })?;
1213
1214    Ok(Json(result))
1215}
1216
1217/// Get configuration
1218async fn get_config(State(state): State<AppState>) -> Json<Config> {
1219    Json((*state.config).clone())
1220}
1221
1222/// List providers
1223async fn list_providers() -> Json<Vec<String>> {
1224    Json(vec![
1225        "openai".to_string(),
1226        "anthropic".to_string(),
1227        "google".to_string(),
1228    ])
1229}
1230
1231/// List agents
1232async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
1233    let registry = crate::agent::AgentRegistry::with_builtins();
1234    Json(registry.list().into_iter().cloned().collect())
1235}
1236
/// Error half of the OpenAI-compatible handlers: an HTTP status plus a JSON error body.
type OpenAiApiError = (StatusCode, Json<serde_json::Value>);
/// Result alias used by the OpenAI-compatible handlers and their helpers.
type OpenAiApiResult<T> = Result<T, OpenAiApiError>;
1239
/// Request body of the OpenAI-compatible chat completions endpoint.
#[derive(Debug, Deserialize)]
struct OpenAiChatCompletionRequest {
    /// Model reference; the handler rejects a blank value.
    model: String,
    /// Conversation messages; the handler rejects an empty list.
    messages: Vec<OpenAiRequestMessage>,
    /// Tool definitions offered to the model; empty when omitted.
    #[serde(default)]
    tools: Vec<OpenAiRequestTool>,
    /// Sampling temperature, passed through to the provider request.
    temperature: Option<f32>,
    /// Nucleus sampling parameter, passed through to the provider request.
    top_p: Option<f32>,
    /// Token limit; used only when `max_completion_tokens` is absent.
    max_tokens: Option<usize>,
    /// Token limit; takes precedence over `max_tokens` when both are given.
    max_completion_tokens: Option<usize>,
    /// Stop sequence(s), given either as one string or as a list.
    stop: Option<OpenAiStop>,
    /// Whether to stream the response; treated as `false` when omitted.
    stream: Option<bool>,
}
1253
/// The `stop` request field, which may be a bare string or an array of strings.
///
/// Untagged: the variant is chosen from the JSON shape alone.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OpenAiStop {
    /// A single stop sequence.
    Single(String),
    /// Several stop sequences.
    Multiple(Vec<String>),
}
1260
/// One entry of the request's `messages` array.
#[derive(Debug, Deserialize)]
struct OpenAiRequestMessage {
    /// Message role; `convert_openai_messages` matches it case-insensitively.
    role: String,
    /// Message content: a string, an array of content parts, or a single
    /// content-part object. `None` when omitted.
    #[serde(default)]
    content: Option<serde_json::Value>,
    /// ID of the tool call this message answers; required for the `tool` role.
    #[serde(default)]
    tool_call_id: Option<String>,
    /// Tool calls carried by an assistant message; empty when omitted.
    #[serde(default)]
    tool_calls: Vec<OpenAiRequestToolCall>,
}
1271
/// A tool call attached to an assistant message in the request.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCall {
    /// Identifier of the tool call.
    id: String,
    /// The JSON `type` field; defaults to `"function"` when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    /// The function name and arguments of the call.
    function: OpenAiRequestToolCallFunction,
}
1279
/// Function payload of a request tool call.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCallFunction {
    /// Name of the function being called.
    name: String,
    /// Call arguments as a string; defaults to `"{}"` when omitted.
    #[serde(default = "default_json_object_string")]
    arguments: String,
}
1286
/// One entry of the request's `tools` array.
#[derive(Debug, Deserialize)]
struct OpenAiRequestTool {
    /// The JSON `type` field; defaults to `"function"` when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    /// The function definition being offered.
    function: OpenAiRequestToolDefinition,
}
1293
/// Function definition inside a request tool entry.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolDefinition {
    /// Function name; `convert_openai_tools` rejects a blank name.
    name: String,
    /// Human-readable description; empty when omitted.
    #[serde(default)]
    description: String,
    /// Parameter schema as raw JSON; defaults to an empty object when omitted.
    #[serde(default = "default_json_object")]
    parameters: serde_json::Value,
}
1302
/// Response body of the OpenAI-compatible model listing.
#[derive(Debug, Serialize)]
struct OpenAiModelsResponse {
    /// Object type tag; `list_openai_models` sets it to `"list"`.
    object: String,
    /// The listed models.
    data: Vec<OpenAiModel>,
}
1308
/// One model entry in the model listing.
#[derive(Debug, Serialize)]
struct OpenAiModel {
    /// Model identifier in `provider/model` form.
    id: String,
    /// Object type tag; `list_openai_models` sets it to `"model"`.
    object: String,
    /// Unix timestamp in seconds; `list_openai_models` fills in the time of listing.
    created: i64,
    /// Canonical name of the provider that reported the model.
    owned_by: String,
}
1316
/// Response body of a non-streaming chat completion.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionResponse {
    /// Identifier of this completion.
    id: String,
    /// Object type tag.
    object: String,
    /// Creation time as a Unix timestamp in seconds.
    created: i64,
    /// Identifier of the model that produced the completion.
    model: String,
    /// Completion choices.
    choices: Vec<OpenAiChatCompletionChoice>,
    /// Token accounting for the request.
    usage: OpenAiUsage,
}
1326
/// One choice within a chat completion response.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionChoice {
    /// Position of this choice in the `choices` array.
    index: usize,
    /// The generated message.
    message: OpenAiChatCompletionMessage,
    /// Why generation ended (see `convert_finish_reason` for the possible strings).
    finish_reason: String,
}
1333
/// The message carried by a completion choice.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionMessage {
    /// Role of the message author.
    role: String,
    /// Text content; the field is left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    content: Option<String>,
    /// Tool calls requested by the model; the field is left out of the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_calls: Option<Vec<OpenAiResponseToolCall>>,
}
1342
/// A tool call emitted in a completion response.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCall {
    /// Identifier of the tool call.
    id: String,
    /// Serialized as the JSON `type` field; `convert_response_message` sets it to `"function"`.
    #[serde(rename = "type")]
    kind: String,
    /// The function name and arguments of the call.
    function: OpenAiResponseToolCallFunction,
}
1350
/// Function payload of a response tool call.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCallFunction {
    /// Name of the function to call.
    name: String,
    /// Call arguments, carried as a string.
    arguments: String,
}
1356
/// Token usage reported with a chat completion.
#[derive(Debug, Serialize)]
struct OpenAiUsage {
    /// Tokens counted for the prompt.
    prompt_tokens: usize,
    /// Tokens counted for the completion.
    completion_tokens: usize,
    /// Total token count (presumably prompt + completion — confirm where it is populated).
    total_tokens: usize,
}
1363
/// Serde default for tool `type` fields: the string `"function"`.
fn default_function_type() -> String {
    String::from("function")
}
1367
1368fn default_json_object() -> serde_json::Value {
1369    serde_json::json!({})
1370}
1371
/// Serde default for tool-call arguments: the text of an empty JSON object.
fn default_json_object_string() -> String {
    String::from("{}")
}
1375
1376fn openai_error(
1377    status: StatusCode,
1378    message: impl Into<String>,
1379    error_type: &str,
1380    code: &str,
1381) -> OpenAiApiError {
1382    (
1383        status,
1384        Json(serde_json::json!({
1385            "error": {
1386                "message": message.into(),
1387                "type": error_type,
1388                "code": code,
1389            }
1390        })),
1391    )
1392}
1393
1394fn openai_bad_request(message: impl Into<String>) -> OpenAiApiError {
1395    openai_error(
1396        StatusCode::BAD_REQUEST,
1397        message,
1398        "invalid_request_error",
1399        "invalid_request",
1400    )
1401}
1402
1403fn openai_internal_error(message: impl Into<String>) -> OpenAiApiError {
1404    openai_error(
1405        StatusCode::INTERNAL_SERVER_ERROR,
1406        message,
1407        "server_error",
1408        "internal_error",
1409    )
1410}
1411
/// Map a provider alias to its canonical name.
///
/// `"zhipuai"` becomes `"zai"`; every other name is returned unchanged. The
/// comparison is exact (case-sensitive, no trimming).
fn canonicalize_provider_name(provider: &str) -> &str {
    match provider {
        "zhipuai" => "zai",
        other => other,
    }
}
1419
/// Normalize a model reference to the `provider/model` spelling.
///
/// The input is trimmed. If it contains no `/` and has the form
/// `provider:model` with both sides non-empty, the first `:` is replaced by
/// `/`. Anything else is returned trimmed but otherwise unchanged.
fn normalize_model_reference(model: &str) -> String {
    let reference = model.trim();

    // A slash means the reference is already path-style; leave it alone.
    if reference.contains('/') {
        return reference.to_string();
    }

    match reference.split_once(':') {
        Some((provider, model_id)) if !provider.is_empty() && !model_id.is_empty() => {
            format!("{provider}/{model_id}")
        }
        _ => reference.to_string(),
    }
}
1431
1432fn make_openai_model_id(provider: &str, model_id: &str) -> String {
1433    let provider = canonicalize_provider_name(provider);
1434    let trimmed = model_id.trim_start_matches('/');
1435    if trimmed.starts_with(&format!("{provider}/")) {
1436        trimmed.to_string()
1437    } else {
1438        format!("{provider}/{trimmed}")
1439    }
1440}
1441
1442fn parse_openai_content_part(value: &serde_json::Value) -> Option<crate::provider::ContentPart> {
1443    let obj = value.as_object()?;
1444    let part_type = obj
1445        .get("type")
1446        .and_then(serde_json::Value::as_str)
1447        .unwrap_or("text");
1448
1449    match part_type {
1450        "text" | "input_text" => obj
1451            .get("text")
1452            .and_then(serde_json::Value::as_str)
1453            .map(|text| crate::provider::ContentPart::Text {
1454                text: text.to_string(),
1455            }),
1456        "image_url" => obj
1457            .get("image_url")
1458            .and_then(serde_json::Value::as_object)
1459            .and_then(|image| image.get("url"))
1460            .and_then(serde_json::Value::as_str)
1461            .map(|url| crate::provider::ContentPart::Image {
1462                url: url.to_string(),
1463                mime_type: None,
1464            }),
1465        _ => obj
1466            .get("text")
1467            .and_then(serde_json::Value::as_str)
1468            .map(|text| crate::provider::ContentPart::Text {
1469                text: text.to_string(),
1470            }),
1471    }
1472}
1473
1474fn parse_openai_content_parts(
1475    content: &Option<serde_json::Value>,
1476) -> Vec<crate::provider::ContentPart> {
1477    let Some(value) = content else {
1478        return Vec::new();
1479    };
1480
1481    match value {
1482        serde_json::Value::String(text) => {
1483            vec![crate::provider::ContentPart::Text { text: text.clone() }]
1484        }
1485        serde_json::Value::Array(parts) => {
1486            parts.iter().filter_map(parse_openai_content_part).collect()
1487        }
1488        serde_json::Value::Object(_) => parse_openai_content_part(value).into_iter().collect(),
1489        _ => Vec::new(),
1490    }
1491}
1492
1493fn parse_openai_tool_content(content: &Option<serde_json::Value>) -> String {
1494    parse_openai_content_parts(content)
1495        .into_iter()
1496        .filter_map(|part| match part {
1497            crate::provider::ContentPart::Text { text } => Some(text),
1498            _ => None,
1499        })
1500        .collect::<Vec<_>>()
1501        .join("\n")
1502}
1503
1504fn convert_openai_messages(
1505    messages: &[OpenAiRequestMessage],
1506) -> OpenAiApiResult<Vec<crate::provider::Message>> {
1507    let mut converted = Vec::with_capacity(messages.len());
1508
1509    for (index, message) in messages.iter().enumerate() {
1510        let role = message.role.to_ascii_lowercase();
1511        match role.as_str() {
1512            "system" | "user" => {
1513                let content = parse_openai_content_parts(&message.content);
1514                if content.is_empty() {
1515                    return Err(openai_bad_request(format!(
1516                        "messages[{index}] must include text content"
1517                    )));
1518                }
1519
1520                converted.push(crate::provider::Message {
1521                    role: if role == "system" {
1522                        crate::provider::Role::System
1523                    } else {
1524                        crate::provider::Role::User
1525                    },
1526                    content,
1527                });
1528            }
1529            "assistant" => {
1530                let mut content = parse_openai_content_parts(&message.content);
1531                for tool_call in &message.tool_calls {
1532                    if tool_call.kind != "function" {
1533                        return Err(openai_bad_request(format!(
1534                            "messages[{index}].tool_calls only support `function`"
1535                        )));
1536                    }
1537                    if tool_call.function.name.trim().is_empty() {
1538                        return Err(openai_bad_request(format!(
1539                            "messages[{index}].tool_calls[].function.name is required"
1540                        )));
1541                    }
1542
1543                    content.push(crate::provider::ContentPart::ToolCall {
1544                        id: tool_call.id.clone(),
1545                        name: tool_call.function.name.clone(),
1546                        arguments: tool_call.function.arguments.clone(),
1547                        thought_signature: None,
1548                    });
1549                }
1550
1551                if content.is_empty() {
1552                    return Err(openai_bad_request(format!(
1553                        "messages[{index}] must include `content` or `tool_calls` for assistant role"
1554                    )));
1555                }
1556
1557                converted.push(crate::provider::Message {
1558                    role: crate::provider::Role::Assistant,
1559                    content,
1560                });
1561            }
1562            "tool" => {
1563                let tool_call_id = message
1564                    .tool_call_id
1565                    .as_ref()
1566                    .map(|s| s.trim())
1567                    .filter(|s| !s.is_empty())
1568                    .ok_or_else(|| {
1569                        openai_bad_request(format!(
1570                            "messages[{index}].tool_call_id is required for tool role"
1571                        ))
1572                    })?
1573                    .to_string();
1574
1575                converted.push(crate::provider::Message {
1576                    role: crate::provider::Role::Tool,
1577                    content: vec![crate::provider::ContentPart::ToolResult {
1578                        tool_call_id,
1579                        content: parse_openai_tool_content(&message.content),
1580                    }],
1581                });
1582            }
1583            _ => {
1584                return Err(openai_bad_request(format!(
1585                    "messages[{index}].role `{}` is not supported",
1586                    message.role
1587                )));
1588            }
1589        }
1590    }
1591
1592    Ok(converted)
1593}
1594
1595fn convert_openai_tools(
1596    tools: &[OpenAiRequestTool],
1597) -> OpenAiApiResult<Vec<crate::provider::ToolDefinition>> {
1598    let mut converted = Vec::with_capacity(tools.len());
1599
1600    for (index, tool) in tools.iter().enumerate() {
1601        if tool.kind != "function" {
1602            return Err(openai_bad_request(format!(
1603                "tools[{index}].type `{}` is not supported",
1604                tool.kind
1605            )));
1606        }
1607        if tool.function.name.trim().is_empty() {
1608            return Err(openai_bad_request(format!(
1609                "tools[{index}].function.name is required"
1610            )));
1611        }
1612
1613        converted.push(crate::provider::ToolDefinition {
1614            name: tool.function.name.clone(),
1615            description: tool.function.description.clone(),
1616            parameters: tool.function.parameters.clone(),
1617        });
1618    }
1619
1620    Ok(converted)
1621}
1622
1623fn convert_finish_reason(reason: crate::provider::FinishReason) -> &'static str {
1624    match reason {
1625        crate::provider::FinishReason::Stop => "stop",
1626        crate::provider::FinishReason::Length => "length",
1627        crate::provider::FinishReason::ToolCalls => "tool_calls",
1628        crate::provider::FinishReason::ContentFilter => "content_filter",
1629        crate::provider::FinishReason::Error => "error",
1630    }
1631}
1632
1633fn convert_response_message(
1634    message: &crate::provider::Message,
1635) -> (Option<String>, Option<Vec<OpenAiResponseToolCall>>) {
1636    let mut texts = Vec::new();
1637    let mut tool_calls = Vec::new();
1638
1639    for part in &message.content {
1640        match part {
1641            crate::provider::ContentPart::Text { text } => {
1642                if !text.is_empty() {
1643                    texts.push(text.clone());
1644                }
1645            }
1646            crate::provider::ContentPart::ToolCall {
1647                id,
1648                name,
1649                arguments,
1650                ..
1651            } => {
1652                tool_calls.push(OpenAiResponseToolCall {
1653                    id: id.clone(),
1654                    kind: "function".to_string(),
1655                    function: OpenAiResponseToolCallFunction {
1656                        name: name.clone(),
1657                        arguments: arguments.clone(),
1658                    },
1659                });
1660            }
1661            _ => {}
1662        }
1663    }
1664
1665    let content = if texts.is_empty() {
1666        None
1667    } else {
1668        Some(texts.join("\n"))
1669    };
1670    let tool_calls = if tool_calls.is_empty() {
1671        None
1672    } else {
1673        Some(tool_calls)
1674    };
1675
1676    (content, tool_calls)
1677}
1678
1679fn openai_stream_chunk(
1680    id: &str,
1681    created: i64,
1682    model: &str,
1683    delta: serde_json::Value,
1684    finish_reason: Option<&str>,
1685) -> serde_json::Value {
1686    let finish_reason = finish_reason
1687        .map(|value| serde_json::Value::String(value.to_string()))
1688        .unwrap_or(serde_json::Value::Null);
1689
1690    serde_json::json!({
1691        "id": id,
1692        "object": "chat.completion.chunk",
1693        "created": created,
1694        "model": model,
1695        "choices": [{
1696            "index": 0,
1697            "delta": delta,
1698            "finish_reason": finish_reason,
1699        }]
1700    })
1701}
1702
1703fn openai_stream_event(payload: &serde_json::Value) -> Event {
1704    Event::default().data(payload.to_string())
1705}
1706
1707async fn list_openai_models() -> OpenAiApiResult<Json<OpenAiModelsResponse>> {
1708    let registry = crate::provider::ProviderRegistry::from_vault()
1709        .await
1710        .map_err(|error| {
1711            tracing::error!(error = %error, "Failed to load providers from Vault");
1712            openai_internal_error(format!("failed to load providers: {error}"))
1713        })?;
1714
1715    let model_futures = registry.list().into_iter().map(|provider_id| {
1716        let provider_id = provider_id.to_string();
1717        let provider = registry.get(&provider_id);
1718
1719        async move {
1720            let Some(provider) = provider else {
1721                return Vec::new();
1722            };
1723
1724            let now = chrono::Utc::now().timestamp();
1725            match provider.list_models().await {
1726                Ok(models) => models
1727                    .into_iter()
1728                    .map(|model| OpenAiModel {
1729                        id: make_openai_model_id(&provider_id, &model.id),
1730                        object: "model".to_string(),
1731                        created: now,
1732                        owned_by: canonicalize_provider_name(&provider_id).to_string(),
1733                    })
1734                    .collect(),
1735                Err(error) => {
1736                    tracing::warn!(
1737                        provider = %provider_id,
1738                        error = %error,
1739                        "Failed to list models for provider"
1740                    );
1741                    Vec::new()
1742                }
1743            }
1744        }
1745    });
1746
1747    let mut data: Vec<OpenAiModel> = join_all(model_futures)
1748        .await
1749        .into_iter()
1750        .flatten()
1751        .collect();
1752    data.sort_by(|a, b| a.owned_by.cmp(&b.owned_by).then(a.id.cmp(&b.id)));
1753
1754    Ok(Json(OpenAiModelsResponse {
1755        object: "list".to_string(),
1756        data,
1757    }))
1758}
1759
1760async fn openai_chat_completions(
1761    Json(req): Json<OpenAiChatCompletionRequest>,
1762) -> Result<Response, OpenAiApiError> {
1763    let is_stream = req.stream.unwrap_or(false);
1764    if req.model.trim().is_empty() {
1765        return Err(openai_bad_request("`model` is required"));
1766    }
1767    if req.messages.is_empty() {
1768        return Err(openai_bad_request("`messages` must not be empty"));
1769    }
1770
1771    let registry = crate::provider::ProviderRegistry::from_vault()
1772        .await
1773        .map_err(|error| {
1774            tracing::error!(error = %error, "Failed to load providers from Vault");
1775            openai_internal_error(format!("failed to load providers: {error}"))
1776        })?;
1777
1778    let providers = registry.list();
1779    if providers.is_empty() {
1780        return Err(openai_error(
1781            StatusCode::SERVICE_UNAVAILABLE,
1782            "No providers are configured in Vault",
1783            "server_error",
1784            "no_providers",
1785        ));
1786    }
1787
1788    let normalized_model = normalize_model_reference(&req.model);
1789    let (maybe_provider, model_id) = crate::provider::parse_model_string(&normalized_model);
1790    let model_id = if maybe_provider.is_some() {
1791        if model_id.trim().is_empty() {
1792            return Err(openai_bad_request(
1793                "Model must be in `provider/model` format",
1794            ));
1795        }
1796        model_id.to_string()
1797    } else {
1798        req.model.trim().to_string()
1799    };
1800
1801    let selected_provider = if let Some(provider_name) = maybe_provider {
1802        let provider_name = canonicalize_provider_name(provider_name);
1803        if providers.contains(&provider_name) {
1804            provider_name.to_string()
1805        } else {
1806            return Err(openai_bad_request(format!(
1807                "Provider `{provider_name}` is not configured in Vault"
1808            )));
1809        }
1810    } else if providers.len() == 1 {
1811        providers[0].to_string()
1812    } else if providers.contains(&"openai") {
1813        "openai".to_string()
1814    } else {
1815        return Err(openai_bad_request(
1816            "When multiple providers are configured, use `provider/model`. See GET /v1/models.",
1817        ));
1818    };
1819
1820    let provider = registry.get(&selected_provider).ok_or_else(|| {
1821        openai_internal_error(format!(
1822            "Provider `{selected_provider}` was available but could not be loaded"
1823        ))
1824    })?;
1825
1826    let messages = convert_openai_messages(&req.messages)?;
1827    let tools = convert_openai_tools(&req.tools)?;
1828    let stop = match req.stop {
1829        Some(OpenAiStop::Single(stop)) => vec![stop],
1830        Some(OpenAiStop::Multiple(stops)) => stops,
1831        None => Vec::new(),
1832    };
1833
1834    let completion_request = crate::provider::CompletionRequest {
1835        messages,
1836        tools,
1837        model: model_id.clone(),
1838        temperature: req.temperature,
1839        top_p: req.top_p,
1840        max_tokens: req.max_completion_tokens.or(req.max_tokens),
1841        stop,
1842    };
1843
1844    let chat_id = format!("chatcmpl-{}", uuid::Uuid::new_v4());
1845    let now = chrono::Utc::now().timestamp();
1846    let openai_model = make_openai_model_id(&selected_provider, &model_id);
1847
1848    if is_stream {
1849        let mut provider_stream =
1850            provider
1851                .complete_stream(completion_request)
1852                .await
1853                .map_err(|error| {
1854                    tracing::warn!(
1855                        provider = %selected_provider,
1856                        model = %model_id,
1857                        error = %error,
1858                        "Streaming completion request failed"
1859                    );
1860                    openai_internal_error(error.to_string())
1861                })?;
1862
1863        let (stream_id, stream_model) = (chat_id.clone(), openai_model.clone());
1864        let event_stream = async_stream::stream! {
1865            let mut tool_call_indices: HashMap<String, usize> = HashMap::new();
1866            let mut next_tool_call_index = 0usize;
1867            let mut saw_text = false;
1868            let mut saw_tool_calls = false;
1869
1870            let role_chunk = openai_stream_chunk(
1871                &stream_id,
1872                now,
1873                &stream_model,
1874                serde_json::json!({ "role": "assistant" }),
1875                None,
1876            );
1877            yield Ok::<Event, Infallible>(openai_stream_event(&role_chunk));
1878
1879            while let Some(chunk) = provider_stream.next().await {
1880                match chunk {
1881                    crate::provider::StreamChunk::Text(text) => {
1882                        if text.is_empty() {
1883                            continue;
1884                        }
1885                        saw_text = true;
1886                        let content_chunk = openai_stream_chunk(
1887                            &stream_id,
1888                            now,
1889                            &stream_model,
1890                            serde_json::json!({ "content": text }),
1891                            None,
1892                        );
1893                        yield Ok(openai_stream_event(&content_chunk));
1894                    }
1895                    crate::provider::StreamChunk::ToolCallStart { id, name } => {
1896                        saw_tool_calls = true;
1897                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1898                            let value = next_tool_call_index;
1899                            next_tool_call_index += 1;
1900                            value
1901                        });
1902                        let tool_chunk = openai_stream_chunk(
1903                            &stream_id,
1904                            now,
1905                            &stream_model,
1906                            serde_json::json!({
1907                                "tool_calls": [{
1908                                    "index": index,
1909                                    "id": id,
1910                                    "type": "function",
1911                                    "function": {
1912                                        "name": name,
1913                                        "arguments": "",
1914                                    }
1915                                }]
1916                            }),
1917                            None,
1918                        );
1919                        yield Ok(openai_stream_event(&tool_chunk));
1920                    }
1921                    crate::provider::StreamChunk::ToolCallDelta { id, arguments_delta } => {
1922                        if arguments_delta.is_empty() {
1923                            continue;
1924                        }
1925                        saw_tool_calls = true;
1926                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1927                            let value = next_tool_call_index;
1928                            next_tool_call_index += 1;
1929                            value
1930                        });
1931                        let tool_delta_chunk = openai_stream_chunk(
1932                            &stream_id,
1933                            now,
1934                            &stream_model,
1935                            serde_json::json!({
1936                                "tool_calls": [{
1937                                    "index": index,
1938                                    "id": id,
1939                                    "type": "function",
1940                                    "function": {
1941                                        "arguments": arguments_delta,
1942                                    }
1943                                }]
1944                            }),
1945                            None,
1946                        );
1947                        yield Ok(openai_stream_event(&tool_delta_chunk));
1948                    }
1949                    crate::provider::StreamChunk::ToolCallEnd { .. } => {}
1950                    crate::provider::StreamChunk::Done { .. }
1951                    | crate::provider::StreamChunk::Thinking(_) => {}
1952                    crate::provider::StreamChunk::Error(error) => {
1953                        let error_chunk = openai_stream_chunk(
1954                            &stream_id,
1955                            now,
1956                            &stream_model,
1957                            serde_json::json!({ "content": error }),
1958                            Some("error"),
1959                        );
1960                        yield Ok(openai_stream_event(&error_chunk));
1961                        yield Ok(Event::default().data("[DONE]"));
1962                        return;
1963                    }
1964                }
1965            }
1966
1967            let finish_reason = if saw_tool_calls && !saw_text {
1968                "tool_calls"
1969            } else {
1970                "stop"
1971            };
1972            let final_chunk = openai_stream_chunk(
1973                &stream_id,
1974                now,
1975                &stream_model,
1976                serde_json::json!({}),
1977                Some(finish_reason),
1978            );
1979            yield Ok(openai_stream_event(&final_chunk));
1980            yield Ok(Event::default().data("[DONE]"));
1981        };
1982
1983        return Ok(Sse::new(event_stream)
1984            .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1985            .into_response());
1986    }
1987
1988    let completion = provider
1989        .complete(completion_request)
1990        .await
1991        .map_err(|error| {
1992            tracing::warn!(
1993                provider = %selected_provider,
1994                model = %model_id,
1995                error = %error,
1996                "Completion request failed"
1997            );
1998            openai_internal_error(error.to_string())
1999        })?;
2000
2001    let (content, tool_calls) = convert_response_message(&completion.message);
2002
2003    Ok(Json(OpenAiChatCompletionResponse {
2004        id: chat_id,
2005        object: "chat.completion".to_string(),
2006        created: now,
2007        model: openai_model,
2008        choices: vec![OpenAiChatCompletionChoice {
2009            index: 0,
2010            message: OpenAiChatCompletionMessage {
2011                role: "assistant".to_string(),
2012                content,
2013                tool_calls,
2014            },
2015            finish_reason: convert_finish_reason(completion.finish_reason).to_string(),
2016        }],
2017        usage: OpenAiUsage {
2018            prompt_tokens: completion.usage.prompt_tokens,
2019            completion_tokens: completion.usage.completion_tokens,
2020            total_tokens: completion.usage.total_tokens,
2021        },
2022    })
2023    .into_response())
2024}
2025
2026async fn start_cognition(
2027    State(state): State<AppState>,
2028    payload: Option<Json<StartCognitionRequest>>,
2029) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2030    state
2031        .cognition
2032        .start(payload.map(|Json(body)| body))
2033        .await
2034        .map(Json)
2035        .map_err(internal_error)
2036}
2037
2038async fn stop_cognition(
2039    State(state): State<AppState>,
2040    payload: Option<Json<StopCognitionRequest>>,
2041) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2042    let reason = payload.and_then(|Json(body)| body.reason);
2043    state
2044        .cognition
2045        .stop(reason)
2046        .await
2047        .map(Json)
2048        .map_err(internal_error)
2049}
2050
2051async fn get_cognition_status(
2052    State(state): State<AppState>,
2053) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2054    Ok(Json(state.cognition.status().await))
2055}
2056
2057async fn stream_cognition(
2058    State(state): State<AppState>,
2059) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2060    let rx = state.cognition.subscribe_events();
2061
2062    let event_stream = stream::unfold(rx, |mut rx| async move {
2063        match rx.recv().await {
2064            Ok(event) => {
2065                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
2066                let sse_event = Event::default().event("cognition").data(payload);
2067                Some((Ok(sse_event), rx))
2068            }
2069            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2070                let lag_event = Event::default()
2071                    .event("lag")
2072                    .data(format!("skipped {}", skipped));
2073                Some((Ok(lag_event), rx))
2074            }
2075            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2076        }
2077    });
2078
2079    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2080}
2081
2082/// Stream bus events as SSE, filtered by JWT topic claims.
2083///
2084/// The JWT must contain a `topics` claim with an array of topic patterns
2085/// to subscribe to. Events are filtered to only include envelopes whose
2086/// topic matches one of the allowed patterns.
2087async fn stream_bus_events(
2088    State(state): State<AppState>,
2089    req: Request<Body>,
2090) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2091    // Extract JWT claims from request extensions (set by auth middleware)
2092    let allowed_topics: Vec<String> = req
2093        .extensions()
2094        .get::<crate::server::auth::JwtClaims>()
2095        .map(|claims| claims.topics.clone())
2096        .unwrap_or_default();
2097
2098    // Subscribe to the bus
2099    let bus_handle = state.bus.handle("stream_bus_events");
2100    let rx = bus_handle.into_receiver();
2101
2102    let event_stream = stream::unfold(rx, move |mut rx: broadcast::Receiver<BusEnvelope>| {
2103        let allowed_topics = allowed_topics.clone();
2104        async move {
2105            match rx.recv().await {
2106                Ok(envelope) => {
2107                    // Filter by allowed topics if any are specified
2108                    let should_send = allowed_topics.is_empty()
2109                        || allowed_topics
2110                            .iter()
2111                            .any(|pattern| topic_matches(&envelope.topic, pattern));
2112
2113                    if should_send {
2114                        let payload =
2115                            serde_json::to_string(&envelope).unwrap_or_else(|_| "{}".to_string());
2116                        let sse_event = Event::default().event("bus").data(payload);
2117                        return Some((Ok(sse_event), rx));
2118                    }
2119
2120                    // Skip this event but keep the receiver
2121                    Some((Ok(Event::default().event("keepalive").data("")), rx))
2122                }
2123                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2124                    let lag_event = Event::default()
2125                        .event("lag")
2126                        .data(format!("skipped {}", skipped));
2127                    Some((Ok(lag_event), rx))
2128                }
2129                Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2130            }
2131        }
2132    });
2133
2134    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2135}
2136
/// Check if a dot-separated topic matches a subscription pattern.
///
/// Matching is performed per `.`-delimited segment:
/// - `*` on its own matches every topic.
/// - A `*` segment in the middle of a pattern matches exactly one topic
///   segment: `agent.*.events` matches `agent.456.events`.
/// - A trailing `*` segment matches one or more remaining segments:
///   `agent.*` matches `agent.123` and `agent.123.events`, but not the bare
///   topic `agent` and not `agentx.123`.
/// - Patterns starting with `.*` keep their legacy meaning: the topic must
///   end with the text following `.*`.
/// - Any other pattern must equal the topic exactly.
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    // Legacy suffix form, kept so existing claims continue to work.
    if let Some(suffix) = pattern.strip_prefix(".*") {
        return topic.ends_with(suffix);
    }

    let mut topic_segments = topic.split('.');
    let mut pattern_segments = pattern.split('.').peekable();

    while let Some(expected) = pattern_segments.next() {
        // The pattern still has segments left but the topic has run out.
        let Some(actual) = topic_segments.next() else {
            return false;
        };
        if expected == "*" {
            // A trailing wildcard swallows this and every remaining segment.
            if pattern_segments.peek().is_none() {
                return true;
            }
        } else if expected != actual {
            return false;
        }
    }

    // Every pattern segment matched; the topic must not have extra segments.
    topic_segments.next().is_none()
}
2151
2152async fn get_latest_snapshot(
2153    State(state): State<AppState>,
2154) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
2155    match state.cognition.latest_snapshot().await {
2156        Some(snapshot) => Ok(Json(snapshot)),
2157        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
2158    }
2159}
2160
2161async fn create_persona(
2162    State(state): State<AppState>,
2163    Json(req): Json<CreatePersonaRequest>,
2164) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2165    state
2166        .cognition
2167        .create_persona(req)
2168        .await
2169        .map(Json)
2170        .map_err(internal_error)
2171}
2172
2173async fn spawn_persona(
2174    State(state): State<AppState>,
2175    Path(id): Path<String>,
2176    Json(req): Json<SpawnPersonaRequest>,
2177) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2178    state
2179        .cognition
2180        .spawn_child(&id, req)
2181        .await
2182        .map(Json)
2183        .map_err(internal_error)
2184}
2185
2186async fn reap_persona(
2187    State(state): State<AppState>,
2188    Path(id): Path<String>,
2189    payload: Option<Json<ReapPersonaRequest>>,
2190) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
2191    let req = payload
2192        .map(|Json(body)| body)
2193        .unwrap_or(ReapPersonaRequest {
2194            cascade: Some(false),
2195            reason: None,
2196        });
2197
2198    state
2199        .cognition
2200        .reap_persona(&id, req)
2201        .await
2202        .map(Json)
2203        .map_err(internal_error)
2204}
2205
2206async fn get_swarm_lineage(
2207    State(state): State<AppState>,
2208) -> Result<Json<LineageGraph>, (StatusCode, String)> {
2209    Ok(Json(state.cognition.lineage_graph().await))
2210}
2211
2212// ── Belief, Attention, Governance, Workspace handlers ──
2213
/// Optional query-string filters accepted by `list_beliefs`.
#[derive(Deserialize)]
struct BeliefFilter {
    /// Keep only beliefs whose JSON-serialized status contains this text.
    status: Option<String>,
    /// Keep only beliefs whose `asserted_by` equals this value.
    persona: Option<String>,
}
2219
2220async fn list_beliefs(
2221    State(state): State<AppState>,
2222    Query(filter): Query<BeliefFilter>,
2223) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
2224    let beliefs = state.cognition.get_beliefs().await;
2225    let mut result: Vec<Belief> = beliefs.into_values().collect();
2226
2227    if let Some(status) = &filter.status {
2228        result.retain(|b| {
2229            let s = serde_json::to_string(&b.status).unwrap_or_default();
2230            s.contains(status)
2231        });
2232    }
2233    if let Some(persona) = &filter.persona {
2234        result.retain(|b| &b.asserted_by == persona);
2235    }
2236
2237    result.sort_by(|a, b| {
2238        b.confidence
2239            .partial_cmp(&a.confidence)
2240            .unwrap_or(std::cmp::Ordering::Equal)
2241    });
2242    Ok(Json(result))
2243}
2244
2245async fn get_belief(
2246    State(state): State<AppState>,
2247    Path(id): Path<String>,
2248) -> Result<Json<Belief>, (StatusCode, String)> {
2249    match state.cognition.get_belief(&id).await {
2250        Some(belief) => Ok(Json(belief)),
2251        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
2252    }
2253}
2254
2255async fn list_attention(
2256    State(state): State<AppState>,
2257) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
2258    Ok(Json(state.cognition.get_attention_queue().await))
2259}
2260
2261async fn list_proposals(
2262    State(state): State<AppState>,
2263) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
2264    let proposals = state.cognition.get_proposals().await;
2265    let mut result: Vec<Proposal> = proposals.into_values().collect();
2266    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
2267    Ok(Json(result))
2268}
2269
2270async fn approve_proposal(
2271    State(state): State<AppState>,
2272    Path(id): Path<String>,
2273) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2274    state
2275        .cognition
2276        .approve_proposal(&id)
2277        .await
2278        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
2279        .map_err(internal_error)
2280}
2281
2282async fn list_receipts(
2283    State(state): State<AppState>,
2284) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
2285    Ok(Json(state.cognition.get_receipts().await))
2286}
2287
2288async fn get_workspace(
2289    State(state): State<AppState>,
2290) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
2291    Ok(Json(state.cognition.get_workspace().await))
2292}
2293
2294async fn get_governance(
2295    State(state): State<AppState>,
2296) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
2297    Ok(Json(state.cognition.get_governance().await))
2298}
2299
2300async fn get_persona(
2301    State(state): State<AppState>,
2302    axum::extract::Path(id): axum::extract::Path<String>,
2303) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2304    state
2305        .cognition
2306        .get_persona(&id)
2307        .await
2308        .map(Json)
2309        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
2310}
2311
2312// ── Audit trail endpoints ──
2313
/// Query-string parameters accepted by `list_audit_entries`.
#[derive(Deserialize)]
struct AuditQuery {
    /// Maximum number of entries to return; the handler defaults this to 100
    /// and caps it at 1000.
    limit: Option<usize>,
    /// Optional category name (e.g. `api`, `tool`, `k8s`); unknown names are
    /// rejected by the handler with `400 Bad Request`.
    category: Option<String>,
}
2319
2320async fn list_audit_entries(
2321    State(state): State<AppState>,
2322    Query(query): Query<AuditQuery>,
2323) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
2324    let limit = query.limit.unwrap_or(100).min(1000);
2325
2326    let entries = if let Some(ref cat) = query.category {
2327        let category = match cat.as_str() {
2328            "api" => AuditCategory::Api,
2329            "tool" | "tool_execution" => AuditCategory::ToolExecution,
2330            "session" => AuditCategory::Session,
2331            "cognition" => AuditCategory::Cognition,
2332            "swarm" => AuditCategory::Swarm,
2333            "auth" => AuditCategory::Auth,
2334            "k8s" => AuditCategory::K8s,
2335            "sandbox" => AuditCategory::Sandbox,
2336            "config" => AuditCategory::Config,
2337            _ => {
2338                return Err((
2339                    StatusCode::BAD_REQUEST,
2340                    format!("Unknown category: {}", cat),
2341                ));
2342            }
2343        };
2344        state.audit_log.by_category(category, limit).await
2345    } else {
2346        state.audit_log.recent(limit).await
2347    };
2348
2349    Ok(Json(entries))
2350}
2351
2352// ── Event Stream Replay API ──
2353// Enables auditors to reconstruct sessions from byte-range offsets.
2354// This is the key compliance feature for SOC 2, FedRAMP, and ATO processes.
2355
/// Query-string parameters shared by the event replay endpoints
/// (`replay_session_events` and `list_session_event_files`).
#[derive(Deserialize)]
struct ReplayQuery {
    /// Session ID to replay
    session_id: String,
    /// Optional: starting byte offset (for seeking to specific point)
    start_offset: Option<u64>,
    /// Optional: ending byte offset
    end_offset: Option<u64>,
    /// Optional: limit number of events to return
    limit: Option<usize>,
    /// Optional: filter by tool name
    tool_name: Option<String>,
}
2369
2370/// Replay session events from the JSONL event stream by byte-range offsets.
2371/// This allows auditors to reconstruct exactly what happened in a session,
2372/// including tool execution durations and success/failure status.
2373async fn replay_session_events(
2374    Query(query): Query<ReplayQuery>,
2375) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
2376    use std::path::PathBuf;
2377
2378    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2379        .map(PathBuf::from)
2380        .ok()
2381        .ok_or_else(|| {
2382            (
2383                StatusCode::SERVICE_UNAVAILABLE,
2384                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2385            )
2386        })?;
2387
2388    let session_dir = base_dir.join(&query.session_id);
2389
2390    // Check if session directory exists
2391    if !session_dir.exists() {
2392        return Err((
2393            StatusCode::NOT_FOUND,
2394            format!("Session not found: {}", query.session_id),
2395        ));
2396    }
2397
2398    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
2399
2400    // Read all event files in the session directory using std::fs for simplicity
2401    let entries = std::fs::read_dir(&session_dir)
2402        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2403
2404    for entry in entries {
2405        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2406        let path = entry.path();
2407
2408        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2409            continue;
2410        }
2411
2412        // Parse byte range from filename: {timestamp}-chat-events-{start}-{end}.jsonl
2413        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2414
2415        if let Some(offsets) = filename
2416            .strip_prefix("T")
2417            .or_else(|| filename.strip_prefix("202"))
2418        {
2419            // Extract start and end offsets from filename
2420            let parts: Vec<&str> = offsets.split('-').collect();
2421            if parts.len() >= 4 {
2422                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2423                let end: u64 = parts[parts.len() - 1]
2424                    .trim_end_matches(".jsonl")
2425                    .parse()
2426                    .unwrap_or(0);
2427
2428                // Filter by byte range if specified
2429                if let Some(query_start) = query.start_offset
2430                    && end <= query_start
2431                {
2432                    continue;
2433                }
2434                if let Some(query_end) = query.end_offset
2435                    && start >= query_end
2436                {
2437                    continue;
2438                }
2439
2440                // Read and parse events from this file
2441                let content = std::fs::read_to_string(&path)
2442                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2443
2444                for line in content.lines() {
2445                    if line.trim().is_empty() {
2446                        continue;
2447                    }
2448                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
2449                        // Filter by tool name if specified
2450                        if let Some(ref tool_filter) = query.tool_name {
2451                            if let Some(event_tool) =
2452                                event.get("tool_name").and_then(|v| v.as_str())
2453                            {
2454                                if event_tool != tool_filter {
2455                                    continue;
2456                                }
2457                            } else {
2458                                continue;
2459                            }
2460                        }
2461                        all_events.push((start, end, event));
2462                    }
2463                }
2464            }
2465        }
2466    }
2467
2468    // Sort by start offset
2469    all_events.sort_by_key(|(s, _, _)| *s);
2470
2471    // Apply limit
2472    let limit = query.limit.unwrap_or(1000).min(10000);
2473    let events: Vec<_> = all_events
2474        .into_iter()
2475        .take(limit)
2476        .map(|(_, _, e)| e)
2477        .collect();
2478
2479    Ok(Json(events))
2480}
2481
2482/// Get session event files metadata (for audit index)
2483async fn list_session_event_files(
2484    Query(query): Query<ReplayQuery>,
2485) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
2486    use std::path::PathBuf;
2487
2488    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
2489        .map(PathBuf::from)
2490        .ok()
2491        .ok_or_else(|| {
2492            (
2493                StatusCode::SERVICE_UNAVAILABLE,
2494                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
2495            )
2496        })?;
2497
2498    let session_dir = base_dir.join(&query.session_id);
2499    if !session_dir.exists() {
2500        return Err((
2501            StatusCode::NOT_FOUND,
2502            format!("Session not found: {}", query.session_id),
2503        ));
2504    }
2505
2506    let mut files: Vec<EventFileMeta> = Vec::new();
2507
2508    // Use std::fs for simplicity
2509    let entries = std::fs::read_dir(&session_dir)
2510        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2511
2512    for entry in entries {
2513        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2514        let path = entry.path();
2515
2516        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
2517            continue;
2518        }
2519
2520        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
2521
2522        // Parse byte range from filename
2523        if let Some(offsets) = filename
2524            .strip_prefix("T")
2525            .or_else(|| filename.strip_prefix("202"))
2526        {
2527            let parts: Vec<&str> = offsets.split('-').collect();
2528            if parts.len() >= 4 {
2529                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
2530                let end: u64 = parts[parts.len() - 1]
2531                    .trim_end_matches(".jsonl")
2532                    .parse()
2533                    .unwrap_or(0);
2534
2535                let metadata = std::fs::metadata(&path)
2536                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
2537
2538                files.push(EventFileMeta {
2539                    filename: filename.to_string(),
2540                    start_offset: start,
2541                    end_offset: end,
2542                    size_bytes: metadata.len(),
2543                });
2544            }
2545        }
2546    }
2547
2548    files.sort_by_key(|f| f.start_offset);
2549    Ok(Json(files))
2550}
2551
/// Metadata describing one event-stream file, as returned by
/// `list_session_event_files`.
#[derive(Serialize)]
struct EventFileMeta {
    /// File name (without directory).
    filename: String,
    /// Start offset parsed from the file name (0 if it could not be parsed).
    start_offset: u64,
    /// End offset parsed from the file name (0 if it could not be parsed).
    end_offset: u64,
    /// Size of the file on disk, in bytes, from filesystem metadata.
    size_bytes: u64,
}
2559
2560// ── K8s self-deployment endpoints ──
2561
2562async fn get_k8s_status(
2563    State(state): State<AppState>,
2564) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
2565    Ok(Json(state.k8s.status().await))
2566}
2567
/// JSON body accepted by `k8s_scale`.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Requested replica count; the handler only accepts values from 0 to 100.
    replicas: i32,
}
2572
2573async fn k8s_scale(
2574    State(state): State<AppState>,
2575    Json(req): Json<ScaleRequest>,
2576) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2577    if req.replicas < 0 || req.replicas > 100 {
2578        return Err((
2579            StatusCode::BAD_REQUEST,
2580            "Replicas must be between 0 and 100".to_string(),
2581        ));
2582    }
2583
2584    state
2585        .audit_log
2586        .log(
2587            AuditCategory::K8s,
2588            format!("scale:{}", req.replicas),
2589            AuditOutcome::Success,
2590            None,
2591            None,
2592        )
2593        .await;
2594
2595    state
2596        .k8s
2597        .scale(req.replicas)
2598        .await
2599        .map(Json)
2600        .map_err(internal_error)
2601}
2602
2603async fn k8s_restart(
2604    State(state): State<AppState>,
2605) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2606    state
2607        .audit_log
2608        .log(
2609            AuditCategory::K8s,
2610            "rolling_restart",
2611            AuditOutcome::Success,
2612            None,
2613            None,
2614        )
2615        .await;
2616
2617    state
2618        .k8s
2619        .rolling_restart()
2620        .await
2621        .map(Json)
2622        .map_err(internal_error)
2623}
2624
2625async fn k8s_list_pods(
2626    State(state): State<AppState>,
2627) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
2628    state
2629        .k8s
2630        .list_pods()
2631        .await
2632        .map(Json)
2633        .map_err(internal_error)
2634}
2635
2636async fn k8s_actions(
2637    State(state): State<AppState>,
2638) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
2639    Ok(Json(state.k8s.recent_actions(100).await))
2640}
2641
/// JSON body accepted by `k8s_spawn_subagent`.
///
/// Everything except `subagent_id` is optional and falls back to its
/// `Default` when omitted; the optional fields are copied unchanged into a
/// `SubagentPodSpec`.
#[derive(Deserialize)]
struct SpawnSubagentRequest {
    /// Identifier of the sub-agent to spawn a pod for.
    subagent_id: String,
    /// Optional image forwarded to the pod spec.
    #[serde(default)]
    image: Option<String>,
    /// Environment variables forwarded to the pod spec.
    #[serde(default)]
    env_vars: std::collections::HashMap<String, String>,
    /// Labels forwarded to the pod spec.
    #[serde(default)]
    labels: std::collections::HashMap<String, String>,
    /// Optional command forwarded to the pod spec.
    #[serde(default)]
    command: Option<Vec<String>>,
    /// Optional arguments forwarded to the pod spec.
    #[serde(default)]
    args: Option<Vec<String>>,
}
2656
2657async fn k8s_spawn_subagent(
2658    State(state): State<AppState>,
2659    Json(req): Json<SpawnSubagentRequest>,
2660) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2661    state
2662        .k8s
2663        .spawn_subagent_pod_with_spec(
2664            &req.subagent_id,
2665            crate::k8s::SubagentPodSpec {
2666                image: req.image,
2667                env_vars: req.env_vars,
2668                labels: req.labels,
2669                command: req.command,
2670                args: req.args,
2671            },
2672        )
2673        .await
2674        .map(Json)
2675        .map_err(internal_error)
2676}
2677
2678async fn k8s_delete_subagent(
2679    State(state): State<AppState>,
2680    axum::extract::Path(id): axum::extract::Path<String>,
2681) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2682    state
2683        .k8s
2684        .delete_subagent_pod(&id)
2685        .await
2686        .map(Json)
2687        .map_err(internal_error)
2688}
2689
/// Request to register a tool
///
/// All fields are required and are copied unchanged into the stored
/// `RegisteredTool` by `register_tool`.
#[derive(Debug, Deserialize)]
pub struct RegisterToolRequest {
    /// Identifier of the tool; also used in the registration log line.
    pub id: String,
    /// Tool name.
    pub name: String,
    /// Tool description.
    pub description: String,
    /// Tool version string.
    pub version: String,
    /// Tool endpoint, stored as given.
    pub endpoint: String,
    /// Capability names advertised by the tool.
    pub capabilities: Vec<String>,
    /// Arbitrary JSON describing the tool's parameters.
    pub parameters: serde_json::Value,
}
2701
/// Response from registering a tool
#[derive(Serialize)]
struct RegisterToolResponse {
    /// View of the tool as it was registered.
    tool: RegisteredToolView,
    /// Message produced by `tool_contract::discovery_registration_message`.
    message: String,
}
2708
2709/// List all registered tools (active, non-expired)
2710async fn list_tools(State(state): State<AppState>) -> Json<Vec<RegisteredToolView>> {
2711    Json(
2712        state
2713            .tool_registry
2714            .list()
2715            .await
2716            .into_iter()
2717            .map(RegisteredToolView::from)
2718            .collect(),
2719    )
2720}
2721
2722/// Register a new tool
2723async fn register_tool(
2724    State(state): State<AppState>,
2725    Json(req): Json<RegisterToolRequest>,
2726) -> Result<Json<RegisterToolResponse>, (StatusCode, String)> {
2727    let now = chrono::Utc::now();
2728    let tool = RegisteredTool {
2729        id: req.id.clone(),
2730        name: req.name,
2731        description: req.description,
2732        version: req.version,
2733        endpoint: req.endpoint,
2734        capabilities: req.capabilities,
2735        parameters: req.parameters,
2736        execution_mode: tool_contract::DISCOVERY_ONLY_MODE.to_string(),
2737        registered_at: now,
2738        last_heartbeat: now,
2739        expires_at: now + Duration::from_secs(90),
2740    };
2741
2742    state.tool_registry.register(tool.clone()).await;
2743
2744    tracing::info!(tool_id = %tool.id, "Tool registered");
2745
2746    Ok(Json(RegisterToolResponse {
2747        tool: RegisteredToolView::from(tool),
2748        message: tool_contract::discovery_registration_message(),
2749    }))
2750}
2751
2752/// Heartbeat endpoint to extend tool TTL
2753async fn tool_heartbeat(
2754    State(state): State<AppState>,
2755    Path(id): Path<String>,
2756) -> Result<Json<RegisteredToolView>, (StatusCode, String)> {
2757    state
2758        .tool_registry
2759        .heartbeat(&id)
2760        .await
2761        .map(|tool| {
2762            tracing::info!(tool_id = %id, "Tool heartbeat received");
2763            Json(RegisteredToolView::from(tool))
2764        })
2765        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Tool {} not found", id)))
2766}
2767
2768/// List registered plugins.
2769async fn list_plugins(State(state): State<AppState>) -> Json<PluginListResponse> {
2770    let test_sig =
2771        state
2772            .plugin_registry
2773            .signing_key()
2774            .sign("_probe", "0.0.0", &state.server_fingerprint);
2775    Json(PluginListResponse {
2776        server_fingerprint: state.server_fingerprint.clone(),
2777        signing_available: !test_sig.is_empty(),
2778        plugins: state.plugin_registry.list().await,
2779    })
2780}
2781
2782#[derive(Serialize)]
2783struct PluginListResponse {
2784    server_fingerprint: String,
2785    signing_available: bool,
2786    plugins: Vec<PluginManifest>,
2787}
2788
2789// ── Agent task compatibility surface ─────────────────────────────────────
2790// These handlers provide the /v1/agent/tasks/* surface the marketing-site
2791// dashboard expects, backed by the existing KnativeTaskQueue.
2792
2793/// List all agent tasks (wraps Knative task queue)
2794async fn list_agent_tasks(
2795    State(state): State<AppState>,
2796    Query(params): Query<ListAgentTasksQuery>,
2797) -> Json<serde_json::Value> {
2798    let tasks = state.knative_tasks.list().await;
2799    let filtered: Vec<_> = tasks
2800        .into_iter()
2801        .filter(|t| params.status.as_ref().is_none_or(|s| t.status == *s))
2802        .filter(|t| {
2803            params
2804                .agent_type
2805                .as_ref()
2806                .is_none_or(|a| t.agent_type == *a)
2807        })
2808        .collect();
2809    Json(serde_json::json!({ "tasks": filtered, "total": filtered.len() }))
2810}
2811
2812#[derive(Deserialize)]
2813struct ListAgentTasksQuery {
2814    status: Option<String>,
2815    agent_type: Option<String>,
2816}
2817
2818/// Create a new agent task
2819async fn create_agent_task(
2820    State(state): State<AppState>,
2821    Json(req): Json<CreateAgentTaskRequest>,
2822) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2823    let task_id = uuid::Uuid::new_v4().to_string();
2824    let task = KnativeTask {
2825        task_id: task_id.clone(),
2826        title: req.title,
2827        description: req.description,
2828        agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2829        priority: req.priority.unwrap_or(0),
2830        received_at: chrono::Utc::now(),
2831        status: "pending".to_string(),
2832    };
2833    state.knative_tasks.push(task).await;
2834    Ok(Json(serde_json::json!({
2835        "task_id": task_id,
2836        "status": "pending",
2837    })))
2838}
2839
2840#[derive(Deserialize)]
2841#[allow(dead_code)]
2842struct CreateAgentTaskRequest {
2843    title: String,
2844    description: String,
2845    agent_type: Option<String>,
2846    model: Option<String>,
2847    priority: Option<i32>,
2848}
2849
2850/// Get a single agent task by ID
2851async fn get_agent_task(
2852    State(state): State<AppState>,
2853    Path(task_id): Path<String>,
2854) -> Result<Json<KnativeTask>, (StatusCode, String)> {
2855    state
2856        .knative_tasks
2857        .get(&task_id)
2858        .await
2859        .map(Json)
2860        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
2861}
2862
2863/// Get task output (returns task details including result)
2864async fn get_agent_task_output(
2865    State(state): State<AppState>,
2866    Path(task_id): Path<String>,
2867) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2868    let task = state
2869        .knative_tasks
2870        .get(&task_id)
2871        .await
2872        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2873    Ok(Json(serde_json::json!({
2874        "task_id": task.task_id,
2875        "status": task.status,
2876        "title": task.title,
2877        "output": null,
2878    })))
2879}
2880
2881/// Receive task output from worker and broadcast via bus for SSE streaming
2882#[derive(Deserialize)]
2883struct TaskOutputPayload {
2884    #[allow(dead_code)]
2885    #[serde(default)]
2886    worker_id: Option<String>,
2887    #[serde(default)]
2888    output: Option<String>,
2889}
2890
2891async fn agent_task_output(
2892    State(state): State<AppState>,
2893    Path(task_id): Path<String>,
2894    Json(payload): Json<TaskOutputPayload>,
2895) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2896    // Verify task exists
2897    let task_exists = state.knative_tasks.get(&task_id).await.is_some();
2898    if !task_exists {
2899        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
2900    }
2901
2902    // Update task status to Working if it's still pending
2903    let _ = state.knative_tasks.update_status(&task_id, "working").await;
2904
2905    // Broadcast output via bus for SSE subscribers
2906    if let Some(ref output) = payload.output {
2907        let bus_handle = state.bus.handle("task-output");
2908        let _ = bus_handle.send(
2909            format!("task.{}", task_id),
2910            crate::bus::BusMessage::TaskUpdate {
2911                task_id: task_id.clone(),
2912                state: crate::a2a::types::TaskState::Working,
2913                message: Some(output.clone()),
2914            },
2915        );
2916    }
2917
2918    Ok(Json(serde_json::json!({
2919        "task_id": task_id,
2920        "status": "received",
2921    })))
2922}
2923
2924/// Stream task output as SSE events
2925async fn stream_agent_task_output(
2926    State(state): State<AppState>,
2927    Path(task_id): Path<String>,
2928) -> Result<Sse<impl stream::Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
2929    // Verify task exists
2930    state
2931        .knative_tasks
2932        .get(&task_id)
2933        .await
2934        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2935
2936    let bus_handle = state.bus.handle("task-stream");
2937    let rx = bus_handle.into_receiver();
2938    let topic_prefix = format!("task.{task_id}");
2939
2940    let stream = async_stream::try_stream! {
2941        let mut rx = rx;
2942        loop {
2943            match rx.recv().await {
2944                Ok(envelope) => {
2945                    if !envelope.topic.starts_with(&topic_prefix) {
2946                        continue;
2947                    }
2948                    let data = serde_json::to_string(&envelope.message).unwrap_or_default();
2949                    yield Event::default().event("output").data(data);
2950                }
2951                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2952                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2953            }
2954        }
2955    };
2956
2957    Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
2958}
2959
2960// ── Worker connectivity ─────────────────────────────────────────────────
2961
2962/// List connected workers (K8s pods with agent label)
2963async fn list_connected_workers(
2964    State(state): State<AppState>,
2965) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2966    // Use K8s pod listing as proxy for connected workers
2967    let pods = state.k8s.list_pods().await.unwrap_or_default();
2968    let workers: Vec<serde_json::Value> = pods
2969        .iter()
2970        .filter(|p| p.name.contains("codetether") || p.name.contains("worker"))
2971        .map(|p| {
2972            serde_json::json!({
2973                "worker_id": p.name,
2974                "name": p.name,
2975                "status": p.phase,
2976                "is_sse_connected": p.phase == "Running",
2977                "last_seen": chrono::Utc::now().to_rfc3339(),
2978            })
2979        })
2980        .collect();
2981    Ok(Json(serde_json::json!({ "workers": workers })))
2982}
2983
2984// ── Task dispatch ───────────────────────────────────────────────────────
2985
2986/// Dispatch a task (creates a Knative task and returns immediately)
2987async fn dispatch_task(
2988    State(state): State<AppState>,
2989    Json(req): Json<DispatchTaskRequest>,
2990) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2991    let task_id = uuid::Uuid::new_v4().to_string();
2992    let task = KnativeTask {
2993        task_id: task_id.clone(),
2994        title: req.title,
2995        description: req.description,
2996        agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2997        priority: req.priority.unwrap_or(0),
2998        received_at: chrono::Utc::now(),
2999        status: "pending".to_string(),
3000    };
3001
3002    // Publish task event to agent bus for connected workers
3003    let handle = state.bus.handle("task-dispatch");
3004    handle.send(
3005        format!("task.{}", task_id),
3006        crate::bus::BusMessage::TaskUpdate {
3007            task_id: task_id.clone(),
3008            state: crate::a2a::types::TaskState::Submitted,
3009            message: Some(format!("Task dispatched: {}", task.title)),
3010        },
3011    );
3012
3013    state.knative_tasks.push(task).await;
3014
3015    Ok(Json(serde_json::json!({
3016        "task_id": task_id,
3017        "status": "pending",
3018        "dispatched_via_knative": true,
3019    })))
3020}
3021
3022#[derive(Deserialize)]
3023#[allow(dead_code)]
3024struct DispatchTaskRequest {
3025    title: String,
3026    description: String,
3027    agent_type: Option<String>,
3028    model: Option<String>,
3029    priority: Option<i32>,
3030    metadata: Option<serde_json::Value>,
3031}
3032
3033// ── Voice REST bridge ───────────────────────────────────────────────────
3034// Bridges the REST /v1/voice/* surface the dashboard expects to the
3035// existing gRPC VoiceService implementation via the AgentBus.
3036
3037/// Create a voice session (REST bridge for VoiceService::CreateVoiceSession)
3038async fn create_voice_session_rest(
3039    State(state): State<AppState>,
3040    Json(req): Json<CreateVoiceSessionRequest>,
3041) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
3042    let voice_id = req.voice.unwrap_or_else(|| "960f89fc".to_string());
3043    let room_name = format!("voice-{}", uuid::Uuid::new_v4());
3044
3045    // Publish session started event to bus
3046    let handle = state.bus.handle("voice-rest");
3047    handle.send_voice_session_started(&room_name, &voice_id);
3048
3049    let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
3050
3051    Ok(Json(serde_json::json!({
3052        "room_name": room_name,
3053        "voice": voice_id,
3054        "mode": req.mode.unwrap_or_else(|| "chat".to_string()),
3055        "access_token": "",
3056        "livekit_url": livekit_url,
3057        "expires_at": (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339(),
3058    })))
3059}
3060
3061#[derive(Deserialize)]
3062#[allow(dead_code)]
3063struct CreateVoiceSessionRequest {
3064    voice: Option<String>,
3065    mode: Option<String>,
3066    codebase_id: Option<String>,
3067    user_id: Option<String>,
3068}
3069
3070/// Get a voice session (REST bridge)
3071async fn get_voice_session_rest(Path(room_name): Path<String>) -> Json<serde_json::Value> {
3072    let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
3073    Json(serde_json::json!({
3074        "room_name": room_name,
3075        "state": "active",
3076        "agent_state": "listening",
3077        "access_token": "",
3078        "livekit_url": livekit_url,
3079    }))
3080}
3081
3082/// Delete a voice session (REST bridge)
3083async fn delete_voice_session_rest(
3084    State(state): State<AppState>,
3085    Path(room_name): Path<String>,
3086) -> Json<serde_json::Value> {
3087    let handle = state.bus.handle("voice-rest");
3088    handle.send_voice_session_ended(&room_name, "user_ended");
3089    Json(serde_json::json!({ "status": "deleted" }))
3090}
3091
3092/// List available voices (REST bridge)
3093async fn list_voices_rest() -> Json<serde_json::Value> {
3094    Json(serde_json::json!({
3095        "voices": [
3096            { "voice_id": "960f89fc", "name": "Riley", "language": "english" },
3097            { "voice_id": "puck", "name": "Puck", "language": "english" },
3098            { "voice_id": "charon", "name": "Charon", "language": "english" },
3099            { "voice_id": "kore", "name": "Kore", "language": "english" },
3100            { "voice_id": "fenrir", "name": "Fenrir", "language": "english" },
3101            { "voice_id": "aoede", "name": "Aoede", "language": "english" },
3102        ]
3103    }))
3104}
3105
3106// ── Session resume ──────────────────────────────────────────────────────
3107
3108/// Resume a codebase-scoped session (what the dashboard session resume flow calls)
3109async fn resume_codebase_session(
3110    Path((_codebase_id, session_id)): Path<(String, String)>,
3111    Json(req): Json<ResumeSessionRequest>,
3112) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
3113    // Load the session if it exists, or create a new one
3114    let mut session = match crate::session::Session::load(&session_id).await {
3115        Ok(s) => s,
3116        Err(_) => {
3117            // Create a new session for this codebase
3118            let mut s = crate::session::Session::new()
3119                .await
3120                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3121            if let Some(agent) = &req.agent {
3122                s.set_agent_name(agent.clone());
3123            }
3124            s.save()
3125                .await
3126                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3127            s
3128        }
3129    };
3130
3131    // If there's a prompt, execute it as a task
3132    if let Some(prompt) = &req.prompt
3133        && !prompt.is_empty()
3134    {
3135        match session.prompt(prompt).await {
3136            Ok(result) => {
3137                session.save().await.ok();
3138                return Ok(Json(serde_json::json!({
3139                    "session_id": session.id,
3140                    "active_session_id": session.id,
3141                    "status": "completed",
3142                    "result": result.text,
3143                })));
3144            }
3145            Err(e) => {
3146                return Ok(Json(serde_json::json!({
3147                    "session_id": session.id,
3148                    "active_session_id": session.id,
3149                    "status": "failed",
3150                    "error": e.to_string(),
3151                })));
3152            }
3153        }
3154    }
3155
3156    // No prompt = just resume/check session
3157    Ok(Json(serde_json::json!({
3158        "session_id": session.id,
3159        "active_session_id": session.id,
3160        "status": "ready",
3161    })))
3162}
3163
3164#[derive(Deserialize)]
3165#[allow(dead_code)]
3166struct ResumeSessionRequest {
3167    prompt: Option<String>,
3168    agent: Option<String>,
3169    model: Option<String>,
3170}
3171
3172fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
3173    let message = error.to_string();
3174    if message.contains("not found") {
3175        return (StatusCode::NOT_FOUND, message);
3176    }
3177    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
3178        return (StatusCode::BAD_REQUEST, message);
3179    }
3180    (StatusCode::INTERNAL_SERVER_ERROR, message)
3181}
3182
/// Read a boolean flag from the environment variable `name`.
///
/// Recognized values, compared ASCII case-insensitively after trimming
/// surrounding whitespace:
/// * `1`, `true`, `yes`, `on` → `true`
/// * `0`, `false`, `no`, `off` → `false`
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    match std::env::var(name) {
        Ok(raw) => {
            // Values from shell scripts or files often carry stray
            // whitespace or a trailing newline.
            let value = raw.trim();
            if TRUTHY.iter().any(|t| value.eq_ignore_ascii_case(t)) {
                true
            } else if FALSY.iter().any(|f| value.eq_ignore_ascii_case(f)) {
                false
            } else {
                default
            }
        }
        Err(_) => default,
    }
}
3193
#[cfg(test)]
mod tests {
    use super::{
        RegisteredTool, RegisteredToolView, match_policy_rule, normalize_model_reference,
        required_permission,
    };
    use axum::http::StatusCode;
    use serde_json::json;

    // Prompting an existing session maps to the execute permission.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/api/session/abc123/prompt", "POST"),
            Some("agent:execute")
        );
    }

    // Creating a session maps to the sessions write permission.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        assert_eq!(
            match_policy_rule("/api/session", "POST"),
            Some("sessions:write")
        );
    }

    // Approving a cognition proposal maps to the execute permission.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/cognition/proposals/p1/approve", "POST"),
            Some("agent:execute")
        );
    }

    // The OpenAI-compatible model listing maps to the read permission.
    #[test]
    fn policy_openai_models_requires_read_permission() {
        assert_eq!(match_policy_rule("/v1/models", "GET"), Some("agent:read"));
    }

    // The OpenAI-compatible chat completions route maps to the execute permission.
    #[test]
    fn policy_openai_chat_completions_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/chat/completions", "POST"),
            Some("agent:execute")
        );
    }

    // A route without a policy mapping is rejected as forbidden.
    #[test]
    fn policy_unmapped_route_is_denied() {
        assert_eq!(
            required_permission("/v1/new-route", "GET"),
            Err(StatusCode::FORBIDDEN)
        );
    }

    // `provider:model` is rewritten to `provider/model`.
    #[test]
    fn normalize_model_reference_accepts_colon_format() {
        let normalized = normalize_model_reference("openai:gpt-4o");
        assert_eq!(normalized, "openai/gpt-4o".to_string());
    }

    // A discovery-only tool serializes with that mode and is not agent-executable.
    #[test]
    fn registered_tool_view_derives_discovery_contract() {
        let timestamp = chrono::Utc::now();
        let tool = RegisteredTool {
            id: "demo".into(),
            name: "Demo".into(),
            description: "Demo tool".into(),
            version: "1.0.0".into(),
            endpoint: "https://example.test/tool".into(),
            capabilities: vec!["demo".into()],
            parameters: json!({}),
            execution_mode: super::tool_contract::DISCOVERY_ONLY_MODE.into(),
            registered_at: timestamp,
            last_heartbeat: timestamp,
            expires_at: timestamp,
        };

        let view = RegisteredToolView::from(tool);
        let value = serde_json::to_value(view).unwrap();

        assert_eq!(value["execution_mode"], "discovery_only");
        assert_eq!(value["agent_executable"], false);
    }
}