Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::{AgentBus, BusEnvelope};
11use crate::cli::ServeArgs;
12use crate::cloudevents::parse_cloud_event;
13use crate::cognition::{
14    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
15    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
16    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
17    executor::DecisionReceipt,
18};
19use crate::config::Config;
20use crate::k8s::K8sManager;
21use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
22use anyhow::Result;
23use auth::AuthState;
24use axum::{
25    Router,
26    body::Body,
27    extract::Path,
28    extract::{Query, State},
29    http::{HeaderMap, Method, Request, StatusCode},
30    middleware::{self, Next},
31    response::sse::{Event, KeepAlive, Sse},
32    response::{IntoResponse, Json, Response},
33    routing::{get, post},
34};
35use futures::{StreamExt, future::join_all, stream};
36use http::HeaderValue;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::convert::Infallible;
40use std::sync::Arc;
41use std::time::Duration;
42use tokio::sync::{Mutex, broadcast};
43use tonic_web::GrpcWebLayer;
44use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
45use tower_http::trace::TraceLayer;
46
/// Task received from Knative Eventing
///
/// Queued in [`KnativeTaskQueue`] until a worker claims it via the
/// `/v1/knative/tasks/{task_id}/claim` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KnativeTask {
    /// Unique identifier used for lookup and status updates.
    pub task_id: String,
    /// Short human-readable title.
    pub title: String,
    /// Full task description.
    pub description: String,
    /// Which kind of agent should handle this task.
    pub agent_type: String,
    /// Scheduling priority (assumed higher = more urgent — TODO confirm ordering).
    pub priority: i32,
    /// When this server received the originating event.
    pub received_at: chrono::DateTime<chrono::Utc>,
    /// Lifecycle state; set to "processing" on claim and "completed" on completion.
    pub status: String,
}
58
/// Queue for Knative tasks waiting to be processed
///
/// Cheap to clone: all clones share the same underlying task list.
#[derive(Clone)]
pub struct KnativeTaskQueue {
    /// Shared task list guarded by an async mutex so handlers can await the lock.
    tasks: Arc<Mutex<Vec<KnativeTask>>>,
}
64
65impl KnativeTaskQueue {
66    pub fn new() -> Self {
67        Self {
68            tasks: Arc::new(Mutex::new(Vec::new())),
69        }
70    }
71
72    pub async fn push(&self, task: KnativeTask) {
73        self.tasks.lock().await.push(task);
74    }
75
76    #[allow(dead_code)]
77    pub async fn pop(&self) -> Option<KnativeTask> {
78        self.tasks.lock().await.pop()
79    }
80
81    pub async fn list(&self) -> Vec<KnativeTask> {
82        self.tasks.lock().await.clone()
83    }
84
85    #[allow(dead_code)]
86    pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
87        self.tasks
88            .lock()
89            .await
90            .iter()
91            .find(|t| t.task_id == task_id)
92            .cloned()
93    }
94
95    pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
96        let mut tasks = self.tasks.lock().await;
97        if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
98            task.status = status.to_string();
99            true
100        } else {
101            false
102        }
103    }
104}
105
106impl Default for KnativeTaskQueue {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
/// A registered tool with TTL tracking
///
/// Entries past `expires_at` are filtered out of listings and periodically
/// evicted by the reaper task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisteredTool {
    /// Unique tool id (registry key).
    pub id: String,
    /// Human-readable tool name.
    pub name: String,
    /// What the tool does.
    pub description: String,
    /// Tool version string.
    pub version: String,
    /// Where to invoke the tool.
    pub endpoint: String,
    /// Capability tags advertised by the tool.
    pub capabilities: Vec<String>,
    /// Free-form parameter schema (shape defined by the registering client).
    pub parameters: serde_json::Value,
    /// When the tool first registered.
    pub registered_at: chrono::DateTime<chrono::Utc>,
    /// Last time a heartbeat was received.
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Expiry deadline; refreshed to now + 90s on each heartbeat.
    pub expires_at: chrono::DateTime<chrono::Utc>,
}
126
/// Tool registry with TTL-based expiry
///
/// Cheap to clone; all clones share the same map. Reads (list/get) take a
/// shared lock, writes (register/heartbeat/cleanup) an exclusive one.
#[derive(Clone)]
pub struct ToolRegistry {
    /// Registered tools keyed by tool id.
    tools: Arc<tokio::sync::RwLock<HashMap<String, RegisteredTool>>>,
}
132
133impl Default for ToolRegistry {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
139impl ToolRegistry {
140    pub fn new() -> Self {
141        Self {
142            tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
143        }
144    }
145
146    /// Register a new tool
147    pub async fn register(&self, tool: RegisteredTool) {
148        let mut tools = self.tools.write().await;
149        tools.insert(tool.id.clone(), tool);
150    }
151
152    /// Get a tool by ID
153    #[allow(dead_code)]
154    pub async fn get(&self, id: &str) -> Option<RegisteredTool> {
155        let tools = self.tools.read().await;
156        tools.get(id).cloned()
157    }
158
159    /// List all tools (excluding expired)
160    pub async fn list(&self) -> Vec<RegisteredTool> {
161        let tools = self.tools.read().await;
162        let now = chrono::Utc::now();
163        tools
164            .values()
165            .filter(|t| t.expires_at > now)
166            .cloned()
167            .collect()
168    }
169
170    /// Update heartbeat for a tool (extends TTL by 30s)
171    pub async fn heartbeat(&self, id: &str) -> Option<RegisteredTool> {
172        let mut tools = self.tools.write().await;
173        if let Some(tool) = tools.get_mut(id) {
174            let now = chrono::Utc::now();
175            tool.last_heartbeat = now;
176            tool.expires_at = now + Duration::from_secs(90);
177            return Some(tool.clone());
178        }
179        None
180    }
181
182    /// Clean up expired tools
183    pub async fn cleanup(&self) {
184        let mut tools = self.tools.write().await;
185        let now = chrono::Utc::now();
186        tools.retain(|_, t| t.expires_at > now);
187    }
188}
189
/// Server state shared across handlers
#[derive(Clone)]
pub struct AppState {
    /// Loaded application configuration.
    pub config: Arc<Config>,
    /// Perpetual cognition runtime (may be auto-started at boot).
    pub cognition: Arc<CognitionRuntime>,
    /// Audit trail; every HTTP request is recorded here by `audit_middleware`.
    pub audit_log: AuditLog,
    /// Kubernetes self-deployment manager (may be unavailable outside a cluster).
    pub k8s: Arc<K8sManager>,
    /// Bearer-token auth state (auth is mandatory for all routes but /health).
    pub auth: AuthState,
    /// In-process agent bus shared with the A2A/gRPC transports.
    pub bus: Arc<AgentBus>,
    /// Queue of tasks received from Knative Eventing.
    pub knative_tasks: KnativeTaskQueue,
    /// TTL-based registry of externally registered tools.
    pub tool_registry: ToolRegistry,
}
202
203/// Audit middleware — logs every request/response to the audit trail.
204async fn audit_middleware(
205    State(state): State<AppState>,
206    request: Request<Body>,
207    next: Next,
208) -> Response {
209    let method = request.method().clone();
210    let path = request.uri().path().to_string();
211    let started = std::time::Instant::now();
212
213    let response = next.run(request).await;
214
215    let duration_ms = started.elapsed().as_millis() as u64;
216    let status = response.status().as_u16();
217    let outcome = if status < 400 {
218        AuditOutcome::Success
219    } else if status == 401 || status == 403 {
220        AuditOutcome::Denied
221    } else {
222        AuditOutcome::Failure
223    };
224
225    state
226        .audit_log
227        .record(audit::AuditEntry {
228            id: uuid::Uuid::new_v4().to_string(),
229            timestamp: chrono::Utc::now(),
230            category: AuditCategory::Api,
231            action: format!("{} {}", method, path),
232            principal: None,
233            outcome,
234            detail: Some(serde_json::json!({ "status": status })),
235            duration_ms: Some(duration_ms),
236            okr_id: None,
237            okr_run_id: None,
238            relay_id: None,
239            session_id: None,
240        })
241        .await;
242
243    response
244}
245
/// Mapping from (path pattern, HTTP method) → OPA permission action.
/// The first matching rule wins.
struct PolicyRule {
    /// Path to match. A trailing '/' means pure prefix match; any other
    /// pattern matches the exact path or the path followed by a '/'-delimited
    /// suffix (see `match_policy_rule`).
    pattern: &'static str,
    /// HTTP methods this rule applies to; `None` matches every method.
    methods: Option<&'static [&'static str]>,
    /// Required OPA permission; the empty string marks the route as exempt.
    permission: &'static str,
}
253
/// Ordered authorization table consumed by `match_policy_rule`: the first
/// rule whose pattern AND method match decides the required permission.
/// Empty permission = exempt from policy checks.
const POLICY_RULES: &[PolicyRule] = &[
    // Public / exempt
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/task",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/v1/knative/",
        methods: None,
        permission: "",
    },
    // OpenAI-compatible model discovery and chat completions
    PolicyRule {
        pattern: "/v1/models",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/chat/completions",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Tool registry — read/write
    PolicyRule {
        pattern: "/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/tools/register",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/tools/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // K8s management — admin only
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // K8s sub-agent lifecycle — admin only
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    // Plugin registry — read
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Audit — admin
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    // Event stream replay — admin (compliance feature)
    // NOTE(review): this rule is unreachable — the "/v1/audit" rule above
    // already prefix-matches "/v1/audit/replay" for all methods. Harmless
    // today because both map to "admin:access", but it will silently mask
    // any future attempt to give replay a different permission.
    PolicyRule {
        pattern: "/v1/audit/replay",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Cognition — write operations
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Swarm persona lifecycle
    // NOTE(review): this rule is redundant with the "/v1/swarm/" POST rule
    // right below (same method, same permission); kept for clarity.
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent Bus — read access for stream, filtered by JWT topic claims
    PolicyRule {
        pattern: "/v1/bus/stream",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent Bus — publish messages (requires write permission)
    PolicyRule {
        pattern: "/v1/bus/publish",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Session management
    // Session prompt execution — requires execute permission
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    // MCP compatibility alias
    PolicyRule {
        pattern: "/mcp/v1/tools",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Agent task APIs (dashboard compatibility)
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/agent/tasks/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Worker connectivity
    PolicyRule {
        pattern: "/v1/worker/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/agent/workers",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Task dispatch
    PolicyRule {
        pattern: "/v1/tasks/dispatch",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Voice REST bridge
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/v1/voice/",
        methods: Some(&["POST", "DELETE"]),
        permission: "agent:execute",
    },
    // Session resume
    PolicyRule {
        pattern: "/v1/agent/codebases/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Proposal approval — governance action
    PolicyRule {
        pattern: "/v1/cognition/proposals/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Config, version, providers, agents — read
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];
488
489/// Find the required permission for a given path + method.
490/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched (pass-through).
491fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
492    for rule in POLICY_RULES {
493        let matches = if rule.pattern.ends_with('/') {
494            path.starts_with(rule.pattern)
495        } else {
496            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
497        };
498        if matches {
499            if let Some(allowed_methods) = rule.methods
500                && !allowed_methods.contains(&method)
501            {
502                continue;
503            }
504            return Some(rule.permission);
505        }
506    }
507    None
508}
509
510/// Policy authorization middleware for Axum.
511///
512/// Maps request paths to OPA permission strings and enforces authorization.
513/// Runs after `require_auth` so the bearer token is already validated.
514/// Currently maps the static bearer token to an admin role since
515/// codetether-agent uses a single shared token model.
516async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
517    let path = request.uri().path().to_string();
518    let method = request.method().as_str().to_string();
519
520    let permission = match match_policy_rule(&path, &method) {
521        None | Some("") => return Ok(next.run(request).await),
522        Some(perm) => perm,
523    };
524
525    // The current auth model uses a single static token for all access.
526    // When this is the case, the authenticated user effectively has admin role.
527    // Future: extract user claims from JWT and build a proper PolicyUser.
528    let user = policy::PolicyUser {
529        user_id: "bearer-token-user".to_string(),
530        roles: vec!["admin".to_string()],
531        tenant_id: None,
532        scopes: vec![],
533        auth_source: "static_token".to_string(),
534    };
535
536    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
537        tracing::warn!(
538            path = %path,
539            method = %method,
540            permission = %permission,
541            "Policy middleware denied request"
542        );
543        return Err(status);
544    }
545
546    Ok(next.run(request).await)
547}
548
/// Start the HTTP server
///
/// Boots the whole agent: loads config, wires the cognition runtime, audit
/// log, K8s manager, auth state and agent bus, spawns the gRPC transport and
/// the tool-registry reaper, builds the Axum router (with audit, policy,
/// auth, CORS and tracing layers) and serves it until shutdown.
pub async fn serve(args: ServeArgs) -> Result<()> {
    let t0 = std::time::Instant::now();
    tracing::info!("[startup] begin");
    let config = Config::load().await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] config loaded"
    );
    let mut cognition = CognitionRuntime::new_from_env();
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] cognition runtime created"
    );

    // Set up tool registry for cognition execution engine.
    // NOTE: this is `crate::tool::ToolRegistry`, a different type from the
    // HTTP-facing `ToolRegistry` defined in this module.
    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] tools registered"
    );
    let cognition = Arc::new(cognition);

    // Initialize audit log.
    let audit_log = AuditLog::from_env();
    let _ = audit::init_audit_log(audit_log.clone());

    // Initialize K8s manager.
    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
    let k8s = Arc::new(K8sManager::new().await);
    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
    if k8s.is_available() {
        tracing::info!("K8s self-deployment enabled");
    }

    // Initialize mandatory auth.
    let auth_state = AuthState::from_env();
    tracing::info!(
        token_len = auth_state.token().len(),
        "Auth is mandatory. Only /health is served without a Bearer token."
    );
    tracing::info!(
        audit_entries = audit_log.count().await,
        "Audit log initialized"
    );

    // Create agent bus for in-process communication
    let bus = AgentBus::new().into_arc();
    crate::bus::set_global(bus.clone());

    // Auto-start S3 sink if MinIO is configured (set MINIO_ENDPOINT to enable)
    crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] bus created"
    );

    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
        tracing::info!(
            elapsed_ms = t0.elapsed().as_millis(),
            "[startup] auto-starting cognition"
        );
        // Auto-start failure is non-fatal: the loop can still be started
        // later via POST /v1/cognition/start.
        if let Err(error) = cognition.start(None).await {
            tracing::warn!(%error, "Failed to auto-start cognition loop");
        } else {
            tracing::info!("Perpetual cognition auto-started");
        }
    }

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] building routes"
    );
    let addr = format!("{}:{}", args.hostname, args.port);

    // Build the agent card
    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
    let a2a_server = a2a::server::A2AServer::with_bus(agent_card.clone(), bus.clone());

    // Build A2A router separately
    let a2a_router = a2a_server.clone().router();
    let a2a_nested_router = a2a_server.router();

    // Start gRPC transport on a separate port
    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
        .ok()
        .and_then(|p| p.parse::<u16>().ok())
        .unwrap_or(50051);
    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
    let grpc_service = grpc_store.into_service();
    let voice_service = crate::a2a::voice_grpc::VoiceServiceImpl::new(bus.clone()).into_service();
    // The gRPC server runs on its own task; if it exits, only a warning is
    // logged — the HTTP server keeps running.
    tokio::spawn(async move {
        tracing::info!(addr = %grpc_addr, "Starting gRPC A2A server");

        // Configure CORS for marketing site (production + local dev)
        let allowed_origins = [
            HeaderValue::from_static("https://codetether.run"),
            HeaderValue::from_static("https://api.codetether.run"),
            HeaderValue::from_static("https://docs.codetether.run"),
            HeaderValue::from_static("https://codetether.com"),
            HeaderValue::from_static("http://localhost:3000"),
            HeaderValue::from_static("http://localhost:3001"),
        ];
        let cors = CorsLayer::new()
            .allow_origin(AllowOrigin::list(allowed_origins))
            .allow_methods(AllowMethods::any())
            .allow_headers(AllowHeaders::any())
            // grpc-web clients need the grpc status trailers exposed.
            .expose_headers(tower_http::cors::ExposeHeaders::list([
                http::header::HeaderName::from_static("grpc-status"),
                http::header::HeaderName::from_static("grpc-message"),
            ]));

        // accept_http1 + GrpcWebLayer enable grpc-web over HTTP/1.1 browsers.
        if let Err(error) = tonic::transport::Server::builder()
            .accept_http1(true)
            .layer(cors)
            .layer(GrpcWebLayer::new())
            .add_service(grpc_service)
            .add_service(voice_service)
            .serve(grpc_addr)
            .await
        {
            tracing::warn!(addr = %grpc_addr, error = ?error, "gRPC A2A server exited");
        }
    });

    let state = AppState {
        config: Arc::new(config),
        cognition,
        audit_log,
        k8s,
        auth: auth_state.clone(),
        bus,
        knative_tasks: KnativeTaskQueue::new(),
        tool_registry: ToolRegistry::new(),
    };

    // Spawn the tool reaper background task (runs every 15s to clean up expired tools)
    let tool_registry = state.tool_registry.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(15));
        loop {
            interval.tick().await;
            tool_registry.cleanup().await;
            tracing::debug!("Tool reaper ran cleanup");
        }
    });

    let app = Router::new()
        // Health check (public — auth exempt)
        .route("/health", get(health))
        // CloudEvent receiver for Knative Eventing (public — auth exempt)
        .route("/task", post(receive_task_event))
        // Knative task queue APIs
        .route("/v1/knative/tasks", get(list_knative_tasks))
        .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
        .route(
            "/v1/knative/tasks/{task_id}/claim",
            post(claim_knative_task),
        )
        .route(
            "/v1/knative/tasks/{task_id}/complete",
            post(complete_knative_task),
        )
        // API routes
        .route("/api/version", get(get_version))
        .route("/api/session", get(list_sessions).post(create_session))
        .route("/api/session/{id}", get(get_session))
        .route("/api/session/{id}/prompt", post(prompt_session))
        .route("/api/config", get(get_config))
        .route("/api/provider", get(list_providers))
        .route("/api/agent", get(list_agents))
        // OpenAI-compatible APIs
        .route("/v1/models", get(list_openai_models))
        .route("/v1/chat/completions", post(openai_chat_completions))
        // Perpetual cognition APIs
        .route("/v1/cognition/start", post(start_cognition))
        .route("/v1/cognition/stop", post(stop_cognition))
        .route("/v1/cognition/status", get(get_cognition_status))
        .route("/v1/cognition/stream", get(stream_cognition))
        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
        // Swarm persona lifecycle APIs
        .route("/v1/swarm/personas", post(create_persona))
        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
        .route("/v1/swarm/lineage", get(get_swarm_lineage))
        // Belief, attention, governance, workspace APIs
        .route("/v1/cognition/beliefs", get(list_beliefs))
        .route("/v1/cognition/beliefs/{id}", get(get_belief))
        .route("/v1/cognition/attention", get(list_attention))
        .route("/v1/cognition/proposals", get(list_proposals))
        .route(
            "/v1/cognition/proposals/{id}/approve",
            post(approve_proposal),
        )
        .route("/v1/cognition/receipts", get(list_receipts))
        .route("/v1/cognition/workspace", get(get_workspace))
        .route("/v1/cognition/governance", get(get_governance))
        .route("/v1/cognition/personas/{id}", get(get_persona))
        // Audit trail API
        .route("/v1/audit", get(list_audit_entries))
        // Event stream replay API (for SOC 2, FedRAMP, ATO compliance)
        .route("/v1/audit/replay", get(replay_session_events))
        .route("/v1/audit/replay/index", get(list_session_event_files))
        // K8s self-deployment APIs
        .route("/v1/k8s/status", get(get_k8s_status))
        .route("/v1/k8s/scale", post(k8s_scale))
        .route("/v1/k8s/restart", post(k8s_restart))
        .route("/v1/k8s/pods", get(k8s_list_pods))
        .route("/v1/k8s/actions", get(k8s_actions))
        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
        .route(
            "/v1/k8s/subagent/{id}",
            axum::routing::delete(k8s_delete_subagent),
        )
        // Plugin registry API
        .route("/v1/plugins", get(list_plugins))
        // Tool registry API
        .route("/v1/tools", get(list_tools))
        .route("/v1/tools/register", post(register_tool))
        .route("/v1/tools/{id}/heartbeat", post(tool_heartbeat))
        // MCP compatibility alias (marketing site SDK expects /mcp/v1/tools)
        .route("/mcp/v1/tools", get(list_tools))
        // Agent task APIs (compatibility surface for dashboard)
        .route(
            "/v1/agent/tasks",
            get(list_agent_tasks).post(create_agent_task),
        )
        .route("/v1/agent/tasks/{task_id}", get(get_agent_task))
        .route(
            "/v1/agent/tasks/{task_id}/output",
            get(get_agent_task_output).post(agent_task_output),
        )
        .route(
            "/v1/agent/tasks/{task_id}/output/stream",
            get(stream_agent_task_output),
        )
        // Worker connectivity (dashboard polls this)
        .route("/v1/worker/connected", get(list_connected_workers))
        .route("/v1/agent/workers", get(list_connected_workers))
        // Task dispatch (Knative-backed)
        .route("/v1/tasks/dispatch", post(dispatch_task))
        // Voice REST bridge (dashboard expects REST, server has gRPC)
        .route("/v1/voice/sessions", post(create_voice_session_rest))
        .route(
            "/v1/voice/sessions/{room_name}",
            get(get_voice_session_rest).delete(delete_voice_session_rest),
        )
        .route("/v1/voice/voices", get(list_voices_rest))
        // Session resume (dashboard uses this for codebase-scoped resume)
        .route(
            "/v1/agent/codebases/{codebase_id}/sessions/{session_id}/resume",
            post(resume_codebase_session),
        )
        // Agent Bus — SSE stream + publish
        .route("/v1/bus/stream", get(stream_bus_events))
        .with_state(state.clone())
        // A2A routes at both the root and /a2a so local runtimes can discover
        // and invoke the same server through either shape.
        .merge(a2a_router)
        .nest("/a2a", a2a_nested_router)
        // Mandatory auth middleware — applies to all routes
        // Layers added LAST run FIRST, so the request order here is:
        // auth → policy → audit → handler.
        .layer(middleware::from_fn_with_state(
            state.clone(),
            audit_middleware,
        ))
        .layer(middleware::from_fn(policy_middleware))
        .layer(middleware::from_fn(auth::require_auth))
        .layer(axum::Extension(state.auth.clone()))
        // CORS + tracing — explicit origin allowlist so headers are present on
        // ALL responses (including 4xx/5xx) and not dependent on request mirroring.
        .layer(
            CorsLayer::new()
                .allow_origin([
                    "https://codetether.run".parse::<HeaderValue>().unwrap(),
                    "https://api.codetether.run".parse::<HeaderValue>().unwrap(),
                    "https://docs.codetether.run"
                        .parse::<HeaderValue>()
                        .unwrap(),
                    "https://codetether.com".parse::<HeaderValue>().unwrap(),
                    "http://localhost:3000".parse::<HeaderValue>().unwrap(),
                    "http://localhost:3001".parse::<HeaderValue>().unwrap(),
                    "http://localhost:8000".parse::<HeaderValue>().unwrap(),
                ])
                .allow_credentials(true)
                .allow_methods([
                    Method::GET,
                    Method::POST,
                    Method::PUT,
                    Method::PATCH,
                    Method::DELETE,
                    Method::OPTIONS,
                ])
                .allow_headers([
                    http::header::AUTHORIZATION,
                    http::header::CONTENT_TYPE,
                    http::header::ACCEPT,
                    http::header::ORIGIN,
                    http::header::CACHE_CONTROL,
                    http::header::HeaderName::from_static("x-worker-id"),
                    http::header::HeaderName::from_static("x-agent-name"),
                    http::header::HeaderName::from_static("x-workspaces"),
                    http::header::HeaderName::from_static("x-requested-with"),
                ]),
        )
        .layer(TraceLayer::new_for_http());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] router built, binding"
    );
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] listening on http://{}",
        addr
    );

    axum::serve(listener, app).await?;

    Ok(())
}
872
/// Health check response
///
/// Liveness probe endpoint: always replies with the literal body `"ok"`.
/// This is the only route served without a Bearer token.
async fn health() -> &'static str {
    const BODY: &'static str = "ok";
    BODY
}
877
878/// List all Knative tasks in queue
879async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
880    Json(state.knative_tasks.list().await)
881}
882
883/// Get a specific Knative task by ID
884async fn get_knative_task(
885    State(state): State<AppState>,
886    Path(task_id): Path<String>,
887) -> Result<Json<KnativeTask>, (StatusCode, String)> {
888    state
889        .knative_tasks
890        .get(&task_id)
891        .await
892        .map(Json)
893        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
894}
895
896/// Claim/start processing a Knative task
897async fn claim_knative_task(
898    State(state): State<AppState>,
899    Path(task_id): Path<String>,
900) -> Result<Json<KnativeTask>, (StatusCode, String)> {
901    // Update status to processing
902    let updated = state
903        .knative_tasks
904        .update_status(&task_id, "processing")
905        .await;
906    if !updated {
907        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
908    }
909
910    state
911        .knative_tasks
912        .get(&task_id)
913        .await
914        .map(Json)
915        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
916}
917
918/// Complete a Knative task
919async fn complete_knative_task(
920    State(state): State<AppState>,
921    Path(task_id): Path<String>,
922) -> Result<Json<KnativeTask>, (StatusCode, String)> {
923    // Update status to completed
924    let updated = state
925        .knative_tasks
926        .update_status(&task_id, "completed")
927        .await;
928    if !updated {
929        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
930    }
931
932    state
933        .knative_tasks
934        .get(&task_id)
935        .await
936        .map(Json)
937        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
938}
939
940/// CloudEvent handler for Knative Eventing
941/// Receives task events from Knative Broker and triggers execution
942async fn receive_task_event(
943    State(state): State<AppState>,
944    headers: HeaderMap,
945    Json(body): Json<serde_json::Value>,
946) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
947    let event =
948        parse_cloud_event(&headers, body).map_err(|error| (StatusCode::BAD_REQUEST, error))?;
949    tracing::info!(
950        event_type = %event.event_type,
951        event_id = %event.id,
952        "Received CloudEvent from Knative"
953    );
954
955    // Log the incoming event for audit
956    state
957        .audit_log
958        .log(
959            audit::AuditCategory::Api,
960            format!("cloudevents:{}", event.event_type),
961            AuditOutcome::Success,
962            None,
963            None,
964        )
965        .await;
966
967    // Process based on event type
968    match event.event_type.as_str() {
969        "codetether.task.created" | "task.created" => {
970            let task_id = event
971                .data
972                .get("task_id")
973                .or_else(|| event.data.get("id"))
974                .and_then(|v| v.as_str())
975                .unwrap_or(event.id.as_str())
976                .to_string();
977            let title = event
978                .data
979                .get("title")
980                .or_else(|| event.data.get("prompt"))
981                .and_then(|v| v.as_str())
982                .unwrap_or("")
983                .to_string();
984            let description = event
985                .data
986                .get("description")
987                .or_else(|| event.data.get("prompt"))
988                .and_then(|v| v.as_str())
989                .unwrap_or("")
990                .to_string();
991            let agent_type = event
992                .data
993                .get("agent_type")
994                .or_else(|| event.data.get("agent"))
995                .and_then(|v| v.as_str())
996                .unwrap_or("build")
997                .to_string();
998            let priority = event
999                .data
1000                .get("priority")
1001                .and_then(|v| v.as_i64())
1002                .unwrap_or(0) as i32;
1003
1004            let task = KnativeTask {
1005                task_id: task_id.clone(),
1006                title,
1007                description,
1008                agent_type,
1009                priority,
1010                received_at: chrono::Utc::now(),
1011                status: "queued".to_string(),
1012            };
1013
1014            state.knative_tasks.push(task).await;
1015            tracing::info!(task_id = %task_id, "Task queued for execution");
1016        }
1017        "codetether.task.updated" | "task.updated" => {
1018            if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
1019                let status = event
1020                    .data
1021                    .get("status")
1022                    .and_then(|v| v.as_str())
1023                    .unwrap_or("updated");
1024                let _ = state.knative_tasks.update_status(task_id, status).await;
1025                tracing::info!(task_id = %task_id, status = %status, "Task status updated");
1026            }
1027        }
1028        "codetether.task.cancelled" => {
1029            tracing::info!("Task cancellation event received");
1030            // Update task status if we have the task_id
1031            if let Some(task_id) = event.data.get("task_id").and_then(|v| v.as_str()) {
1032                let _ = state
1033                    .knative_tasks
1034                    .update_status(task_id, "cancelled")
1035                    .await;
1036                tracing::info!(task_id = %task_id, "Task cancelled");
1037            }
1038        }
1039        _ => {
1040            tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
1041        }
1042    }
1043
1044    Ok(Json(CloudEventResponse {
1045        status: "accepted".to_string(),
1046        event_id: event.id,
1047    }))
1048}
1049
/// Response to CloudEvent acknowledgment
#[derive(Serialize)]
struct CloudEventResponse {
    /// Always "accepted": the event was received (not necessarily processed).
    status: String,
    /// Echo of the CloudEvent `id` so the sender can correlate the ack.
    event_id: String,
}
1056
/// Version info
#[derive(Serialize)]
struct VersionInfo {
    /// Crate version (CARGO_PKG_VERSION, fixed at build time).
    version: &'static str,
    /// Crate name (CARGO_PKG_NAME, fixed at build time).
    name: &'static str,
    /// Hash of the running executable; `None` when it could not be computed.
    binary_hash: Option<String>,
}
1064
1065async fn get_version() -> Json<VersionInfo> {
1066    let binary_hash = std::env::current_exe()
1067        .ok()
1068        .and_then(|p| hash_file(&p).ok());
1069    Json(VersionInfo {
1070        version: env!("CARGO_PKG_VERSION"),
1071        name: env!("CARGO_PKG_NAME"),
1072        binary_hash,
1073    })
1074}
1075
/// List sessions
///
/// Pagination query parameters for `list_sessions`.
#[derive(Deserialize)]
struct ListSessionsQuery {
    /// Maximum number of sessions to return (default 100).
    limit: Option<usize>,
    /// Number of sessions to skip from the start (default 0).
    offset: Option<usize>,
}
1082
1083async fn list_sessions(
1084    Query(query): Query<ListSessionsQuery>,
1085) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
1086    let sessions = crate::session::list_sessions()
1087        .await
1088        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1089
1090    let offset = query.offset.unwrap_or(0);
1091    let limit = query.limit.unwrap_or(100);
1092    Ok(Json(
1093        sessions.into_iter().skip(offset).take(limit).collect(),
1094    ))
1095}
1096
/// Create a new session
///
/// Request body for `create_session`; both fields are optional.
#[derive(Deserialize)]
struct CreateSessionRequest {
    /// Optional human-readable session title.
    title: Option<String>,
    /// Optional agent name to bind the session to.
    agent: Option<String>,
}
1103
1104async fn create_session(
1105    Json(req): Json<CreateSessionRequest>,
1106) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1107    let mut session = crate::session::Session::new()
1108        .await
1109        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1110
1111    session.title = req.title;
1112    if let Some(agent) = req.agent {
1113        session.set_agent_name(agent);
1114    }
1115
1116    session
1117        .save()
1118        .await
1119        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1120
1121    Ok(Json(session))
1122}
1123
1124/// Get a session by ID
1125async fn get_session(
1126    axum::extract::Path(id): axum::extract::Path<String>,
1127) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
1128    let session = crate::session::Session::load(&id)
1129        .await
1130        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1131
1132    Ok(Json(session))
1133}
1134
/// Prompt a session
///
/// Request body for `prompt_session`; the message must be non-blank.
#[derive(Deserialize)]
struct PromptRequest {
    /// User message to send to the session.
    message: String,
}
1140
1141async fn prompt_session(
1142    axum::extract::Path(id): axum::extract::Path<String>,
1143    Json(req): Json<PromptRequest>,
1144) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
1145    // Validate the message is not empty
1146    if req.message.trim().is_empty() {
1147        return Err((
1148            StatusCode::BAD_REQUEST,
1149            "Message cannot be empty".to_string(),
1150        ));
1151    }
1152
1153    // Log the prompt request (uses the message field)
1154    tracing::info!(
1155        session_id = %id,
1156        message_len = req.message.len(),
1157        "Received prompt request"
1158    );
1159
1160    let mut session = crate::session::Session::load(&id)
1161        .await
1162        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
1163
1164    let result = session.prompt(&req.message).await.map_err(|e| {
1165        tracing::error!(session_id = %id, error = %e, "Session prompt failed");
1166        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1167    })?;
1168
1169    session.save().await.map_err(|e| {
1170        tracing::error!(session_id = %id, error = %e, "Failed to save session after prompt");
1171        (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
1172    })?;
1173
1174    Ok(Json(result))
1175}
1176
1177/// Get configuration
1178async fn get_config(State(state): State<AppState>) -> Json<Config> {
1179    Json((*state.config).clone())
1180}
1181
1182/// List providers
1183async fn list_providers() -> Json<Vec<String>> {
1184    Json(vec![
1185        "openai".to_string(),
1186        "anthropic".to_string(),
1187        "google".to_string(),
1188    ])
1189}
1190
1191/// List agents
1192async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
1193    let registry = crate::agent::AgentRegistry::with_builtins();
1194    Json(registry.list().into_iter().cloned().collect())
1195}
1196
/// Error payload for the OpenAI-compatible endpoints: an HTTP status plus an
/// OpenAI-style `{"error": {...}}` JSON body.
type OpenAiApiError = (StatusCode, Json<serde_json::Value>);
/// Convenience alias for handlers on the OpenAI-compatible surface.
type OpenAiApiResult<T> = Result<T, OpenAiApiError>;

/// Incoming body of the OpenAI-compatible chat-completions endpoint.
#[derive(Debug, Deserialize)]
struct OpenAiChatCompletionRequest {
    /// Model reference; both `provider/model` and `provider:model` are accepted.
    model: String,
    /// Conversation so far, oldest first. Must be non-empty.
    messages: Vec<OpenAiRequestMessage>,
    /// Tools the model may call; empty when the client sends none.
    #[serde(default)]
    tools: Vec<OpenAiRequestTool>,
    temperature: Option<f32>,
    top_p: Option<f32>,
    /// Legacy token cap; `max_completion_tokens` wins when both are set.
    max_tokens: Option<usize>,
    max_completion_tokens: Option<usize>,
    /// Stop sequence(s): a single string or an array of strings.
    stop: Option<OpenAiStop>,
    /// When `true`, respond with SSE chunks instead of one JSON body.
    stream: Option<bool>,
}

/// OpenAI allows `stop` to be either one string or a list of strings.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OpenAiStop {
    Single(String),
    Multiple(Vec<String>),
}
1220
/// One chat message in the request (role plus content and/or tool calls).
#[derive(Debug, Deserialize)]
struct OpenAiRequestMessage {
    /// "system", "user", "assistant", or "tool" (matched case-insensitively).
    role: String,
    /// String, array of parts, or a single part object; may be absent.
    #[serde(default)]
    content: Option<serde_json::Value>,
    /// Required when `role` is "tool": the id of the call being answered.
    #[serde(default)]
    tool_call_id: Option<String>,
    /// Assistant-issued tool calls being replayed to the model.
    #[serde(default)]
    tool_calls: Vec<OpenAiRequestToolCall>,
}

/// A tool call echoed back inside an assistant message.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCall {
    id: String,
    /// Only "function" is supported; defaults to it when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolCallFunction,
}

/// Function name plus its JSON-encoded argument string.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolCallFunction {
    name: String,
    /// JSON object encoded as a string; defaults to "{}" when omitted.
    #[serde(default = "default_json_object_string")]
    arguments: String,
}

/// A tool made available to the model for this request.
#[derive(Debug, Deserialize)]
struct OpenAiRequestTool {
    /// Only "function" is supported; defaults to it when omitted.
    #[serde(rename = "type", default = "default_function_type")]
    kind: String,
    function: OpenAiRequestToolDefinition,
}

/// Declaration of a callable function: name, description, parameter schema.
#[derive(Debug, Deserialize)]
struct OpenAiRequestToolDefinition {
    name: String,
    #[serde(default)]
    description: String,
    /// JSON schema for the arguments; defaults to the empty object.
    #[serde(default = "default_json_object")]
    parameters: serde_json::Value,
}
1262
/// Response body of the OpenAI-compatible models listing.
#[derive(Debug, Serialize)]
struct OpenAiModelsResponse {
    /// Always "list".
    object: String,
    data: Vec<OpenAiModel>,
}

/// One entry in the model list.
#[derive(Debug, Serialize)]
struct OpenAiModel {
    /// `provider/model` identifier.
    id: String,
    /// Always "model".
    object: String,
    /// Unix timestamp; stamped at listing time, not a model release date.
    created: i64,
    /// Canonical provider name.
    owned_by: String,
}

/// Non-streaming response body of the chat-completions endpoint.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionResponse {
    id: String,
    object: String,
    created: i64,
    model: String,
    choices: Vec<OpenAiChatCompletionChoice>,
    usage: OpenAiUsage,
}

/// A single completion choice.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionChoice {
    index: usize,
    message: OpenAiChatCompletionMessage,
    /// One of "stop", "length", "tool_calls", "content_filter", "error".
    finish_reason: String,
}

/// Assistant message in a response; omits fields that are `None`.
#[derive(Debug, Serialize)]
struct OpenAiChatCompletionMessage {
    role: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_calls: Option<Vec<OpenAiResponseToolCall>>,
}

/// Tool call emitted by the model in a response.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCall {
    id: String,
    /// Serialized as `type`; always "function" where constructed in this file.
    #[serde(rename = "type")]
    kind: String,
    function: OpenAiResponseToolCallFunction,
}

/// Function name plus JSON-encoded arguments for a response tool call.
#[derive(Debug, Serialize)]
struct OpenAiResponseToolCallFunction {
    name: String,
    arguments: String,
}

/// Token accounting attached to a completion response.
#[derive(Debug, Serialize)]
struct OpenAiUsage {
    prompt_tokens: usize,
    completion_tokens: usize,
    total_tokens: usize,
}
1323
/// Serde default for tool/tool-call `type` fields: "function".
fn default_function_type() -> String {
    String::from("function")
}
1327
1328fn default_json_object() -> serde_json::Value {
1329    serde_json::json!({})
1330}
1331
/// Serde default for tool-call `arguments`: the empty JSON object as a string.
fn default_json_object_string() -> String {
    String::from("{}")
}
1335
1336fn openai_error(
1337    status: StatusCode,
1338    message: impl Into<String>,
1339    error_type: &str,
1340    code: &str,
1341) -> OpenAiApiError {
1342    (
1343        status,
1344        Json(serde_json::json!({
1345            "error": {
1346                "message": message.into(),
1347                "type": error_type,
1348                "code": code,
1349            }
1350        })),
1351    )
1352}
1353
1354fn openai_bad_request(message: impl Into<String>) -> OpenAiApiError {
1355    openai_error(
1356        StatusCode::BAD_REQUEST,
1357        message,
1358        "invalid_request_error",
1359        "invalid_request",
1360    )
1361}
1362
1363fn openai_internal_error(message: impl Into<String>) -> OpenAiApiError {
1364    openai_error(
1365        StatusCode::INTERNAL_SERVER_ERROR,
1366        message,
1367        "server_error",
1368        "internal_error",
1369    )
1370}
1371
/// Map legacy provider spellings onto their canonical names.
/// Currently only `zhipuai` -> `zai`; every other name passes through.
fn canonicalize_provider_name(provider: &str) -> &str {
    match provider {
        "zhipuai" => "zai",
        other => other,
    }
}
1379
/// Normalize a model reference: trim whitespace and rewrite the
/// `provider:model` spelling as `provider/model`.
///
/// The rewrite only fires when both sides of the first `:` are non-empty and
/// the string contains no `/` already; anything else is returned trimmed.
fn normalize_model_reference(model: &str) -> String {
    let trimmed = model.trim();
    if !trimmed.contains('/') {
        if let Some((provider, model_id)) = trimmed.split_once(':') {
            if !provider.is_empty() && !model_id.is_empty() {
                return format!("{provider}/{model_id}");
            }
        }
    }
    trimmed.to_string()
}
1391
1392fn make_openai_model_id(provider: &str, model_id: &str) -> String {
1393    let provider = canonicalize_provider_name(provider);
1394    let trimmed = model_id.trim_start_matches('/');
1395    if trimmed.starts_with(&format!("{provider}/")) {
1396        trimmed.to_string()
1397    } else {
1398        format!("{provider}/{trimmed}")
1399    }
1400}
1401
1402fn parse_openai_content_part(value: &serde_json::Value) -> Option<crate::provider::ContentPart> {
1403    let obj = value.as_object()?;
1404    let part_type = obj
1405        .get("type")
1406        .and_then(serde_json::Value::as_str)
1407        .unwrap_or("text");
1408
1409    match part_type {
1410        "text" | "input_text" => obj
1411            .get("text")
1412            .and_then(serde_json::Value::as_str)
1413            .map(|text| crate::provider::ContentPart::Text {
1414                text: text.to_string(),
1415            }),
1416        "image_url" => obj
1417            .get("image_url")
1418            .and_then(serde_json::Value::as_object)
1419            .and_then(|image| image.get("url"))
1420            .and_then(serde_json::Value::as_str)
1421            .map(|url| crate::provider::ContentPart::Image {
1422                url: url.to_string(),
1423                mime_type: None,
1424            }),
1425        _ => obj
1426            .get("text")
1427            .and_then(serde_json::Value::as_str)
1428            .map(|text| crate::provider::ContentPart::Text {
1429                text: text.to_string(),
1430            }),
1431    }
1432}
1433
1434fn parse_openai_content_parts(
1435    content: &Option<serde_json::Value>,
1436) -> Vec<crate::provider::ContentPart> {
1437    let Some(value) = content else {
1438        return Vec::new();
1439    };
1440
1441    match value {
1442        serde_json::Value::String(text) => {
1443            vec![crate::provider::ContentPart::Text { text: text.clone() }]
1444        }
1445        serde_json::Value::Array(parts) => {
1446            parts.iter().filter_map(parse_openai_content_part).collect()
1447        }
1448        serde_json::Value::Object(_) => parse_openai_content_part(value).into_iter().collect(),
1449        _ => Vec::new(),
1450    }
1451}
1452
1453fn parse_openai_tool_content(content: &Option<serde_json::Value>) -> String {
1454    parse_openai_content_parts(content)
1455        .into_iter()
1456        .filter_map(|part| match part {
1457            crate::provider::ContentPart::Text { text } => Some(text),
1458            _ => None,
1459        })
1460        .collect::<Vec<_>>()
1461        .join("\n")
1462}
1463
/// Convert OpenAI wire-format messages into internal provider messages.
///
/// Validation mirrors OpenAI's invalid-request errors:
/// * system/user messages must carry text content
/// * assistant messages need `content` and/or well-formed `function` tool calls
/// * tool messages must reference the call they answer via `tool_call_id`
/// * any other role is rejected
fn convert_openai_messages(
    messages: &[OpenAiRequestMessage],
) -> OpenAiApiResult<Vec<crate::provider::Message>> {
    let mut converted = Vec::with_capacity(messages.len());

    for (index, message) in messages.iter().enumerate() {
        // Roles are matched case-insensitively.
        let role = message.role.to_ascii_lowercase();
        match role.as_str() {
            "system" | "user" => {
                let content = parse_openai_content_parts(&message.content);
                if content.is_empty() {
                    return Err(openai_bad_request(format!(
                        "messages[{index}] must include text content"
                    )));
                }

                converted.push(crate::provider::Message {
                    role: if role == "system" {
                        crate::provider::Role::System
                    } else {
                        crate::provider::Role::User
                    },
                    content,
                });
            }
            "assistant" => {
                // Text parts first, then tool calls appended in request order.
                let mut content = parse_openai_content_parts(&message.content);
                for tool_call in &message.tool_calls {
                    if tool_call.kind != "function" {
                        return Err(openai_bad_request(format!(
                            "messages[{index}].tool_calls only support `function`"
                        )));
                    }
                    if tool_call.function.name.trim().is_empty() {
                        return Err(openai_bad_request(format!(
                            "messages[{index}].tool_calls[].function.name is required"
                        )));
                    }

                    content.push(crate::provider::ContentPart::ToolCall {
                        id: tool_call.id.clone(),
                        name: tool_call.function.name.clone(),
                        arguments: tool_call.function.arguments.clone(),
                        thought_signature: None,
                    });
                }

                if content.is_empty() {
                    return Err(openai_bad_request(format!(
                        "messages[{index}] must include `content` or `tool_calls` for assistant role"
                    )));
                }

                converted.push(crate::provider::Message {
                    role: crate::provider::Role::Assistant,
                    content,
                });
            }
            "tool" => {
                // A blank or whitespace-only tool_call_id counts as missing.
                let tool_call_id = message
                    .tool_call_id
                    .as_ref()
                    .map(|s| s.trim())
                    .filter(|s| !s.is_empty())
                    .ok_or_else(|| {
                        openai_bad_request(format!(
                            "messages[{index}].tool_call_id is required for tool role"
                        ))
                    })?
                    .to_string();

                converted.push(crate::provider::Message {
                    role: crate::provider::Role::Tool,
                    content: vec![crate::provider::ContentPart::ToolResult {
                        tool_call_id,
                        content: parse_openai_tool_content(&message.content),
                    }],
                });
            }
            _ => {
                return Err(openai_bad_request(format!(
                    "messages[{index}].role `{}` is not supported",
                    message.role
                )));
            }
        }
    }

    Ok(converted)
}
1554
1555fn convert_openai_tools(
1556    tools: &[OpenAiRequestTool],
1557) -> OpenAiApiResult<Vec<crate::provider::ToolDefinition>> {
1558    let mut converted = Vec::with_capacity(tools.len());
1559
1560    for (index, tool) in tools.iter().enumerate() {
1561        if tool.kind != "function" {
1562            return Err(openai_bad_request(format!(
1563                "tools[{index}].type `{}` is not supported",
1564                tool.kind
1565            )));
1566        }
1567        if tool.function.name.trim().is_empty() {
1568            return Err(openai_bad_request(format!(
1569                "tools[{index}].function.name is required"
1570            )));
1571        }
1572
1573        converted.push(crate::provider::ToolDefinition {
1574            name: tool.function.name.clone(),
1575            description: tool.function.description.clone(),
1576            parameters: tool.function.parameters.clone(),
1577        });
1578    }
1579
1580    Ok(converted)
1581}
1582
1583fn convert_finish_reason(reason: crate::provider::FinishReason) -> &'static str {
1584    match reason {
1585        crate::provider::FinishReason::Stop => "stop",
1586        crate::provider::FinishReason::Length => "length",
1587        crate::provider::FinishReason::ToolCalls => "tool_calls",
1588        crate::provider::FinishReason::ContentFilter => "content_filter",
1589        crate::provider::FinishReason::Error => "error",
1590    }
1591}
1592
1593fn convert_response_message(
1594    message: &crate::provider::Message,
1595) -> (Option<String>, Option<Vec<OpenAiResponseToolCall>>) {
1596    let mut texts = Vec::new();
1597    let mut tool_calls = Vec::new();
1598
1599    for part in &message.content {
1600        match part {
1601            crate::provider::ContentPart::Text { text } => {
1602                if !text.is_empty() {
1603                    texts.push(text.clone());
1604                }
1605            }
1606            crate::provider::ContentPart::ToolCall {
1607                id,
1608                name,
1609                arguments,
1610                ..
1611            } => {
1612                tool_calls.push(OpenAiResponseToolCall {
1613                    id: id.clone(),
1614                    kind: "function".to_string(),
1615                    function: OpenAiResponseToolCallFunction {
1616                        name: name.clone(),
1617                        arguments: arguments.clone(),
1618                    },
1619                });
1620            }
1621            _ => {}
1622        }
1623    }
1624
1625    let content = if texts.is_empty() {
1626        None
1627    } else {
1628        Some(texts.join("\n"))
1629    };
1630    let tool_calls = if tool_calls.is_empty() {
1631        None
1632    } else {
1633        Some(tool_calls)
1634    };
1635
1636    (content, tool_calls)
1637}
1638
1639fn openai_stream_chunk(
1640    id: &str,
1641    created: i64,
1642    model: &str,
1643    delta: serde_json::Value,
1644    finish_reason: Option<&str>,
1645) -> serde_json::Value {
1646    let finish_reason = finish_reason
1647        .map(|value| serde_json::Value::String(value.to_string()))
1648        .unwrap_or(serde_json::Value::Null);
1649
1650    serde_json::json!({
1651        "id": id,
1652        "object": "chat.completion.chunk",
1653        "created": created,
1654        "model": model,
1655        "choices": [{
1656            "index": 0,
1657            "delta": delta,
1658            "finish_reason": finish_reason,
1659        }]
1660    })
1661}
1662
1663fn openai_stream_event(payload: &serde_json::Value) -> Event {
1664    Event::default().data(payload.to_string())
1665}
1666
/// OpenAI-compatible models listing: enumerate models across all providers.
///
/// Providers are queried concurrently; a provider that fails to list models
/// is logged and skipped rather than failing the whole response. Returns 500
/// only when the provider registry itself cannot be loaded from Vault.
async fn list_openai_models() -> OpenAiApiResult<Json<OpenAiModelsResponse>> {
    let registry = crate::provider::ProviderRegistry::from_vault()
        .await
        .map_err(|error| {
            tracing::error!(error = %error, "Failed to load providers from Vault");
            openai_internal_error(format!("failed to load providers: {error}"))
        })?;

    let model_futures = registry.list().into_iter().map(|provider_id| {
        let provider_id = provider_id.to_string();
        let provider = registry.get(&provider_id);

        async move {
            let Some(provider) = provider else {
                return Vec::new();
            };

            // `created` has no upstream source here; stamp with "now".
            let now = chrono::Utc::now().timestamp();
            match provider.list_models().await {
                Ok(models) => models
                    .into_iter()
                    .map(|model| OpenAiModel {
                        id: make_openai_model_id(&provider_id, &model.id),
                        object: "model".to_string(),
                        created: now,
                        owned_by: canonicalize_provider_name(&provider_id).to_string(),
                    })
                    .collect(),
                Err(error) => {
                    // Best-effort: a failing provider contributes no models.
                    tracing::warn!(
                        provider = %provider_id,
                        error = %error,
                        "Failed to list models for provider"
                    );
                    Vec::new()
                }
            }
        }
    });

    // Run all provider queries concurrently and flatten the per-provider lists.
    let mut data: Vec<OpenAiModel> = join_all(model_futures)
        .await
        .into_iter()
        .flatten()
        .collect();
    // Stable ordering: by provider, then by model id.
    data.sort_by(|a, b| a.owned_by.cmp(&b.owned_by).then(a.id.cmp(&b.id)));

    Ok(Json(OpenAiModelsResponse {
        object: "list".to_string(),
        data,
    }))
}
1719
1720async fn openai_chat_completions(
1721    Json(req): Json<OpenAiChatCompletionRequest>,
1722) -> Result<Response, OpenAiApiError> {
1723    let is_stream = req.stream.unwrap_or(false);
1724    if req.model.trim().is_empty() {
1725        return Err(openai_bad_request("`model` is required"));
1726    }
1727    if req.messages.is_empty() {
1728        return Err(openai_bad_request("`messages` must not be empty"));
1729    }
1730
1731    let registry = crate::provider::ProviderRegistry::from_vault()
1732        .await
1733        .map_err(|error| {
1734            tracing::error!(error = %error, "Failed to load providers from Vault");
1735            openai_internal_error(format!("failed to load providers: {error}"))
1736        })?;
1737
1738    let providers = registry.list();
1739    if providers.is_empty() {
1740        return Err(openai_error(
1741            StatusCode::SERVICE_UNAVAILABLE,
1742            "No providers are configured in Vault",
1743            "server_error",
1744            "no_providers",
1745        ));
1746    }
1747
1748    let normalized_model = normalize_model_reference(&req.model);
1749    let (maybe_provider, model_id) = crate::provider::parse_model_string(&normalized_model);
1750    let model_id = if maybe_provider.is_some() {
1751        if model_id.trim().is_empty() {
1752            return Err(openai_bad_request(
1753                "Model must be in `provider/model` format",
1754            ));
1755        }
1756        model_id.to_string()
1757    } else {
1758        req.model.trim().to_string()
1759    };
1760
1761    let selected_provider = if let Some(provider_name) = maybe_provider {
1762        let provider_name = canonicalize_provider_name(provider_name);
1763        if providers.contains(&provider_name) {
1764            provider_name.to_string()
1765        } else {
1766            return Err(openai_bad_request(format!(
1767                "Provider `{provider_name}` is not configured in Vault"
1768            )));
1769        }
1770    } else if providers.len() == 1 {
1771        providers[0].to_string()
1772    } else if providers.contains(&"openai") {
1773        "openai".to_string()
1774    } else {
1775        return Err(openai_bad_request(
1776            "When multiple providers are configured, use `provider/model`. See GET /v1/models.",
1777        ));
1778    };
1779
1780    let provider = registry.get(&selected_provider).ok_or_else(|| {
1781        openai_internal_error(format!(
1782            "Provider `{selected_provider}` was available but could not be loaded"
1783        ))
1784    })?;
1785
1786    let messages = convert_openai_messages(&req.messages)?;
1787    let tools = convert_openai_tools(&req.tools)?;
1788    let stop = match req.stop {
1789        Some(OpenAiStop::Single(stop)) => vec![stop],
1790        Some(OpenAiStop::Multiple(stops)) => stops,
1791        None => Vec::new(),
1792    };
1793
1794    let completion_request = crate::provider::CompletionRequest {
1795        messages,
1796        tools,
1797        model: model_id.clone(),
1798        temperature: req.temperature,
1799        top_p: req.top_p,
1800        max_tokens: req.max_completion_tokens.or(req.max_tokens),
1801        stop,
1802    };
1803
1804    let chat_id = format!("chatcmpl-{}", uuid::Uuid::new_v4());
1805    let now = chrono::Utc::now().timestamp();
1806    let openai_model = make_openai_model_id(&selected_provider, &model_id);
1807
1808    if is_stream {
1809        let mut provider_stream =
1810            provider
1811                .complete_stream(completion_request)
1812                .await
1813                .map_err(|error| {
1814                    tracing::warn!(
1815                        provider = %selected_provider,
1816                        model = %model_id,
1817                        error = %error,
1818                        "Streaming completion request failed"
1819                    );
1820                    openai_internal_error(error.to_string())
1821                })?;
1822
1823        let stream_id = chat_id.clone();
1824        let stream_model = openai_model.clone();
1825        let event_stream = async_stream::stream! {
1826            let mut tool_call_indices: HashMap<String, usize> = HashMap::new();
1827            let mut next_tool_call_index = 0usize;
1828            let mut saw_text = false;
1829            let mut saw_tool_calls = false;
1830
1831            let role_chunk = openai_stream_chunk(
1832                &stream_id,
1833                now,
1834                &stream_model,
1835                serde_json::json!({ "role": "assistant" }),
1836                None,
1837            );
1838            yield Ok::<Event, Infallible>(openai_stream_event(&role_chunk));
1839
1840            while let Some(chunk) = provider_stream.next().await {
1841                match chunk {
1842                    crate::provider::StreamChunk::Text(text) => {
1843                        if text.is_empty() {
1844                            continue;
1845                        }
1846                        saw_text = true;
1847                        let content_chunk = openai_stream_chunk(
1848                            &stream_id,
1849                            now,
1850                            &stream_model,
1851                            serde_json::json!({ "content": text }),
1852                            None,
1853                        );
1854                        yield Ok(openai_stream_event(&content_chunk));
1855                    }
1856                    crate::provider::StreamChunk::ToolCallStart { id, name } => {
1857                        saw_tool_calls = true;
1858                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1859                            let value = next_tool_call_index;
1860                            next_tool_call_index += 1;
1861                            value
1862                        });
1863                        let tool_chunk = openai_stream_chunk(
1864                            &stream_id,
1865                            now,
1866                            &stream_model,
1867                            serde_json::json!({
1868                                "tool_calls": [{
1869                                    "index": index,
1870                                    "id": id,
1871                                    "type": "function",
1872                                    "function": {
1873                                        "name": name,
1874                                        "arguments": "",
1875                                    }
1876                                }]
1877                            }),
1878                            None,
1879                        );
1880                        yield Ok(openai_stream_event(&tool_chunk));
1881                    }
1882                    crate::provider::StreamChunk::ToolCallDelta { id, arguments_delta } => {
1883                        if arguments_delta.is_empty() {
1884                            continue;
1885                        }
1886                        saw_tool_calls = true;
1887                        let index = *tool_call_indices.entry(id.clone()).or_insert_with(|| {
1888                            let value = next_tool_call_index;
1889                            next_tool_call_index += 1;
1890                            value
1891                        });
1892                        let tool_delta_chunk = openai_stream_chunk(
1893                            &stream_id,
1894                            now,
1895                            &stream_model,
1896                            serde_json::json!({
1897                                "tool_calls": [{
1898                                    "index": index,
1899                                    "id": id,
1900                                    "type": "function",
1901                                    "function": {
1902                                        "arguments": arguments_delta,
1903                                    }
1904                                }]
1905                            }),
1906                            None,
1907                        );
1908                        yield Ok(openai_stream_event(&tool_delta_chunk));
1909                    }
1910                    crate::provider::StreamChunk::ToolCallEnd { .. } => {}
1911                    crate::provider::StreamChunk::Done { .. } => {}
1912                    crate::provider::StreamChunk::Error(error) => {
1913                        let error_chunk = openai_stream_chunk(
1914                            &stream_id,
1915                            now,
1916                            &stream_model,
1917                            serde_json::json!({ "content": error }),
1918                            Some("error"),
1919                        );
1920                        yield Ok(openai_stream_event(&error_chunk));
1921                        yield Ok(Event::default().data("[DONE]"));
1922                        return;
1923                    }
1924                }
1925            }
1926
1927            let finish_reason = if saw_tool_calls && !saw_text {
1928                "tool_calls"
1929            } else {
1930                "stop"
1931            };
1932            let final_chunk = openai_stream_chunk(
1933                &stream_id,
1934                now,
1935                &stream_model,
1936                serde_json::json!({}),
1937                Some(finish_reason),
1938            );
1939            yield Ok(openai_stream_event(&final_chunk));
1940            yield Ok(Event::default().data("[DONE]"));
1941        };
1942
1943        return Ok(Sse::new(event_stream)
1944            .keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
1945            .into_response());
1946    }
1947
1948    let completion = provider
1949        .complete(completion_request)
1950        .await
1951        .map_err(|error| {
1952            tracing::warn!(
1953                provider = %selected_provider,
1954                model = %model_id,
1955                error = %error,
1956                "Completion request failed"
1957            );
1958            openai_internal_error(error.to_string())
1959        })?;
1960
1961    let (content, tool_calls) = convert_response_message(&completion.message);
1962
1963    Ok(Json(OpenAiChatCompletionResponse {
1964        id: chat_id,
1965        object: "chat.completion".to_string(),
1966        created: now,
1967        model: openai_model,
1968        choices: vec![OpenAiChatCompletionChoice {
1969            index: 0,
1970            message: OpenAiChatCompletionMessage {
1971                role: "assistant".to_string(),
1972                content,
1973                tool_calls,
1974            },
1975            finish_reason: convert_finish_reason(completion.finish_reason).to_string(),
1976        }],
1977        usage: OpenAiUsage {
1978            prompt_tokens: completion.usage.prompt_tokens,
1979            completion_tokens: completion.usage.completion_tokens,
1980            total_tokens: completion.usage.total_tokens,
1981        },
1982    })
1983    .into_response())
1984}
1985
1986async fn start_cognition(
1987    State(state): State<AppState>,
1988    payload: Option<Json<StartCognitionRequest>>,
1989) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
1990    state
1991        .cognition
1992        .start(payload.map(|Json(body)| body))
1993        .await
1994        .map(Json)
1995        .map_err(internal_error)
1996}
1997
1998async fn stop_cognition(
1999    State(state): State<AppState>,
2000    payload: Option<Json<StopCognitionRequest>>,
2001) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2002    let reason = payload.and_then(|Json(body)| body.reason);
2003    state
2004        .cognition
2005        .stop(reason)
2006        .await
2007        .map(Json)
2008        .map_err(internal_error)
2009}
2010
2011async fn get_cognition_status(
2012    State(state): State<AppState>,
2013) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
2014    Ok(Json(state.cognition.status().await))
2015}
2016
2017async fn stream_cognition(
2018    State(state): State<AppState>,
2019) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2020    let rx = state.cognition.subscribe_events();
2021
2022    let event_stream = stream::unfold(rx, |mut rx| async move {
2023        match rx.recv().await {
2024            Ok(event) => {
2025                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
2026                let sse_event = Event::default().event("cognition").data(payload);
2027                Some((Ok(sse_event), rx))
2028            }
2029            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2030                let lag_event = Event::default()
2031                    .event("lag")
2032                    .data(format!("skipped {}", skipped));
2033                Some((Ok(lag_event), rx))
2034            }
2035            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2036        }
2037    });
2038
2039    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2040}
2041
2042/// Stream bus events as SSE, filtered by JWT topic claims.
2043///
2044/// The JWT must contain a `topics` claim with an array of topic patterns
2045/// to subscribe to. Events are filtered to only include envelopes whose
2046/// topic matches one of the allowed patterns.
2047async fn stream_bus_events(
2048    State(state): State<AppState>,
2049    req: Request<Body>,
2050) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
2051    // Extract JWT claims from request extensions (set by auth middleware)
2052    let allowed_topics: Vec<String> = req
2053        .extensions()
2054        .get::<crate::server::auth::JwtClaims>()
2055        .map(|claims| claims.topics.clone())
2056        .unwrap_or_default();
2057
2058    // Subscribe to the bus
2059    let bus_handle = state.bus.handle("stream_bus_events");
2060    let rx = bus_handle.into_receiver();
2061
2062    let event_stream = stream::unfold(rx, move |mut rx: broadcast::Receiver<BusEnvelope>| {
2063        let allowed_topics = allowed_topics.clone();
2064        async move {
2065            match rx.recv().await {
2066                Ok(envelope) => {
2067                    // Filter by allowed topics if any are specified
2068                    let should_send = allowed_topics.is_empty()
2069                        || allowed_topics
2070                            .iter()
2071                            .any(|pattern| topic_matches(&envelope.topic, pattern));
2072
2073                    if should_send {
2074                        let payload =
2075                            serde_json::to_string(&envelope).unwrap_or_else(|_| "{}".to_string());
2076                        let sse_event = Event::default().event("bus").data(payload);
2077                        return Some((Ok(sse_event), rx));
2078                    }
2079
2080                    // Skip this event but keep the receiver
2081                    Some((Ok(Event::default().event("keepalive").data("")), rx))
2082                }
2083                Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
2084                    let lag_event = Event::default()
2085                        .event("lag")
2086                        .data(format!("skipped {}", skipped));
2087                    Some((Ok(lag_event), rx))
2088                }
2089                Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
2090            }
2091        }
2092    });
2093
2094    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
2095}
2096
/// Check if a topic matches a pattern.
///
/// Supported forms:
/// - `*` matches every topic
/// - a trailing `.*` matches the prefix at a segment boundary, any depth:
///   `agent.*` matches `agent.123` and `agent.123.events`, but NOT `agents.x`
/// - `*` as a whole segment matches exactly one segment anywhere in the
///   pattern: `agent.*.events` matches `agent.456.events`
/// - any other pattern must match the topic exactly
fn topic_matches(topic: &str, pattern: &str) -> bool {
    if pattern == "*" {
        return true;
    }

    // Trailing `.*`: prefix match anchored on a `.` boundary. The previous
    // bare `starts_with(prefix)` wrongly matched `agents.foo` for `agent.*`.
    if let Some(prefix) = pattern.strip_suffix(".*") {
        if !prefix.contains('*') {
            return topic == prefix
                || topic
                    .strip_prefix(prefix)
                    .is_some_and(|rest| rest.starts_with('.'));
        }
    }

    // General case: segment-wise comparison where a bare `*` segment matches
    // exactly one topic segment. This implements the documented
    // `agent.*.events` form (and `*.events`), which the old code never
    // matched: it only checked a literal `.*` prefix, not `*.`.
    let topic_parts: Vec<&str> = topic.split('.').collect();
    let pattern_parts: Vec<&str> = pattern.split('.').collect();
    topic_parts.len() == pattern_parts.len()
        && pattern_parts
            .iter()
            .zip(&topic_parts)
            .all(|(p, t)| *p == "*" || p == t)
}
2111
2112async fn get_latest_snapshot(
2113    State(state): State<AppState>,
2114) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
2115    match state.cognition.latest_snapshot().await {
2116        Some(snapshot) => Ok(Json(snapshot)),
2117        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
2118    }
2119}
2120
2121async fn create_persona(
2122    State(state): State<AppState>,
2123    Json(req): Json<CreatePersonaRequest>,
2124) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2125    state
2126        .cognition
2127        .create_persona(req)
2128        .await
2129        .map(Json)
2130        .map_err(internal_error)
2131}
2132
2133async fn spawn_persona(
2134    State(state): State<AppState>,
2135    Path(id): Path<String>,
2136    Json(req): Json<SpawnPersonaRequest>,
2137) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2138    state
2139        .cognition
2140        .spawn_child(&id, req)
2141        .await
2142        .map(Json)
2143        .map_err(internal_error)
2144}
2145
2146async fn reap_persona(
2147    State(state): State<AppState>,
2148    Path(id): Path<String>,
2149    payload: Option<Json<ReapPersonaRequest>>,
2150) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
2151    let req = payload
2152        .map(|Json(body)| body)
2153        .unwrap_or(ReapPersonaRequest {
2154            cascade: Some(false),
2155            reason: None,
2156        });
2157
2158    state
2159        .cognition
2160        .reap_persona(&id, req)
2161        .await
2162        .map(Json)
2163        .map_err(internal_error)
2164}
2165
2166async fn get_swarm_lineage(
2167    State(state): State<AppState>,
2168) -> Result<Json<LineageGraph>, (StatusCode, String)> {
2169    Ok(Json(state.cognition.lineage_graph().await))
2170}
2171
2172// ── Belief, Attention, Governance, Workspace handlers ──
2173
/// Query-string filters accepted by `list_beliefs`.
#[derive(Deserialize)]
struct BeliefFilter {
    // Substring matched against the JSON-serialized belief status.
    status: Option<String>,
    // Exact match against the persona that asserted the belief.
    persona: Option<String>,
}
2179
2180async fn list_beliefs(
2181    State(state): State<AppState>,
2182    Query(filter): Query<BeliefFilter>,
2183) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
2184    let beliefs = state.cognition.get_beliefs().await;
2185    let mut result: Vec<Belief> = beliefs.into_values().collect();
2186
2187    if let Some(status) = &filter.status {
2188        result.retain(|b| {
2189            let s = serde_json::to_string(&b.status).unwrap_or_default();
2190            s.contains(status)
2191        });
2192    }
2193    if let Some(persona) = &filter.persona {
2194        result.retain(|b| &b.asserted_by == persona);
2195    }
2196
2197    result.sort_by(|a, b| {
2198        b.confidence
2199            .partial_cmp(&a.confidence)
2200            .unwrap_or(std::cmp::Ordering::Equal)
2201    });
2202    Ok(Json(result))
2203}
2204
2205async fn get_belief(
2206    State(state): State<AppState>,
2207    Path(id): Path<String>,
2208) -> Result<Json<Belief>, (StatusCode, String)> {
2209    match state.cognition.get_belief(&id).await {
2210        Some(belief) => Ok(Json(belief)),
2211        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
2212    }
2213}
2214
2215async fn list_attention(
2216    State(state): State<AppState>,
2217) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
2218    Ok(Json(state.cognition.get_attention_queue().await))
2219}
2220
2221async fn list_proposals(
2222    State(state): State<AppState>,
2223) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
2224    let proposals = state.cognition.get_proposals().await;
2225    let mut result: Vec<Proposal> = proposals.into_values().collect();
2226    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
2227    Ok(Json(result))
2228}
2229
2230async fn approve_proposal(
2231    State(state): State<AppState>,
2232    Path(id): Path<String>,
2233) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2234    state
2235        .cognition
2236        .approve_proposal(&id)
2237        .await
2238        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
2239        .map_err(internal_error)
2240}
2241
2242async fn list_receipts(
2243    State(state): State<AppState>,
2244) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
2245    Ok(Json(state.cognition.get_receipts().await))
2246}
2247
2248async fn get_workspace(
2249    State(state): State<AppState>,
2250) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
2251    Ok(Json(state.cognition.get_workspace().await))
2252}
2253
2254async fn get_governance(
2255    State(state): State<AppState>,
2256) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
2257    Ok(Json(state.cognition.get_governance().await))
2258}
2259
2260async fn get_persona(
2261    State(state): State<AppState>,
2262    axum::extract::Path(id): axum::extract::Path<String>,
2263) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
2264    state
2265        .cognition
2266        .get_persona(&id)
2267        .await
2268        .map(Json)
2269        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
2270}
2271
2272// ── Audit trail endpoints ──
2273
/// Query-string parameters accepted by `list_audit_entries`.
#[derive(Deserialize)]
struct AuditQuery {
    // Maximum number of entries to return (default 100, capped at 1000).
    limit: Option<usize>,
    // Optional category name, e.g. "api", "tool", "session".
    category: Option<String>,
}
2279
2280async fn list_audit_entries(
2281    State(state): State<AppState>,
2282    Query(query): Query<AuditQuery>,
2283) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
2284    let limit = query.limit.unwrap_or(100).min(1000);
2285
2286    let entries = if let Some(ref cat) = query.category {
2287        let category = match cat.as_str() {
2288            "api" => AuditCategory::Api,
2289            "tool" | "tool_execution" => AuditCategory::ToolExecution,
2290            "session" => AuditCategory::Session,
2291            "cognition" => AuditCategory::Cognition,
2292            "swarm" => AuditCategory::Swarm,
2293            "auth" => AuditCategory::Auth,
2294            "k8s" => AuditCategory::K8s,
2295            "sandbox" => AuditCategory::Sandbox,
2296            "config" => AuditCategory::Config,
2297            _ => {
2298                return Err((
2299                    StatusCode::BAD_REQUEST,
2300                    format!("Unknown category: {}", cat),
2301                ));
2302            }
2303        };
2304        state.audit_log.by_category(category, limit).await
2305    } else {
2306        state.audit_log.recent(limit).await
2307    };
2308
2309    Ok(Json(entries))
2310}
2311
2312// ── Event Stream Replay API ──
2313// Enables auditors to reconstruct sessions from byte-range offsets.
2314// This is the key compliance feature for SOC 2, FedRAMP, and ATO processes.
2315
/// Query parameters shared by the event-stream replay endpoints.
#[derive(Deserialize)]
struct ReplayQuery {
    /// Session ID to replay (directory name under `CODETETHER_EVENT_STREAM_PATH`)
    session_id: String,
    /// Optional: starting byte offset (for seeking to specific point)
    start_offset: Option<u64>,
    /// Optional: ending byte offset
    end_offset: Option<u64>,
    /// Optional: limit number of events to return (default 1000, capped at 10000)
    limit: Option<usize>,
    /// Optional: filter by tool name
    tool_name: Option<String>,
}
2329
/// Replay session events from the JSONL event stream by byte-range offsets.
/// This allows auditors to reconstruct exactly what happened in a session,
/// including tool execution durations and success/failure status.
///
/// Reads `$CODETETHER_EVENT_STREAM_PATH/{session_id}/*.jsonl`, keeps only
/// files whose filename-encoded byte range overlaps the requested
/// `start_offset`/`end_offset` window, parses each JSON line, optionally
/// filters by `tool_name`, sorts by start offset, and truncates to `limit`.
///
/// Errors: 503 when the env var is unset, 404 for an unknown session,
/// 500 on filesystem errors.
async fn replay_session_events(
    Query(query): Query<ReplayQuery>,
) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
    use std::path::PathBuf;

    // The event-stream root directory must be configured via environment.
    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
        .map(PathBuf::from)
        .ok()
        .ok_or_else(|| {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
            )
        })?;

    let session_dir = base_dir.join(&query.session_id);

    // Check if session directory exists
    if !session_dir.exists() {
        return Err((
            StatusCode::NOT_FOUND,
            format!("Session not found: {}", query.session_id),
        ));
    }

    // (file start offset, file end offset, parsed event) triples, kept so
    // events can later be ordered by their file's position in the stream.
    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();

    // Read all event files in the session directory using std::fs for simplicity
    let entries = std::fs::read_dir(&session_dir)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    for entry in entries {
        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
        let path = entry.path();

        // Only JSONL event files are considered.
        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
            continue;
        }

        // Parse byte range from filename: {timestamp}-chat-events-{start}-{end}.jsonl
        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");

        // NOTE(review): this gate only accepts filenames starting with "T" or
        // "202" (timestamps in the 2020s); files timestamped 2030 onward would
        // be silently skipped — confirm the timestamp format assumption.
        if let Some(offsets) = filename
            .strip_prefix("T")
            .or_else(|| filename.strip_prefix("202"))
        {
            // Extract start and end offsets from filename
            let parts: Vec<&str> = offsets.split('-').collect();
            if parts.len() >= 4 {
                // Unparseable offsets fall back to 0 rather than erroring.
                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
                let end: u64 = parts[parts.len() - 1]
                    .trim_end_matches(".jsonl")
                    .parse()
                    .unwrap_or(0);

                // Filter by byte range if specified
                if let Some(query_start) = query.start_offset
                    && end <= query_start
                {
                    continue;
                }
                if let Some(query_end) = query.end_offset
                    && start >= query_end
                {
                    continue;
                }

                // Read and parse events from this file
                let content = std::fs::read_to_string(&path)
                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

                for line in content.lines() {
                    if line.trim().is_empty() {
                        continue;
                    }
                    // Malformed JSON lines are skipped silently.
                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
                        // Filter by tool name if specified
                        if let Some(ref tool_filter) = query.tool_name {
                            if let Some(event_tool) =
                                event.get("tool_name").and_then(|v| v.as_str())
                            {
                                if event_tool != tool_filter {
                                    continue;
                                }
                            } else {
                                // Events with no tool_name are excluded whenever
                                // a tool filter is active.
                                continue;
                            }
                        }
                        all_events.push((start, end, event));
                    }
                }
            }
        }
    }

    // Sort by start offset
    all_events.sort_by_key(|(s, _, _)| *s);

    // Apply limit
    let limit = query.limit.unwrap_or(1000).min(10000);
    let events: Vec<_> = all_events
        .into_iter()
        .take(limit)
        .map(|(_, _, e)| e)
        .collect();

    Ok(Json(events))
}
2441
/// Get session event files metadata (for audit index)
///
/// Lists every `*.jsonl` file in the session's directory whose filename
/// encodes a byte range, returning filename, offsets, and on-disk size,
/// sorted by start offset. Only `session_id` from `ReplayQuery` is used.
async fn list_session_event_files(
    Query(query): Query<ReplayQuery>,
) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
    use std::path::PathBuf;

    // The event-stream root directory must be configured via environment.
    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
        .map(PathBuf::from)
        .ok()
        .ok_or_else(|| {
            (
                StatusCode::SERVICE_UNAVAILABLE,
                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
            )
        })?;

    let session_dir = base_dir.join(&query.session_id);
    if !session_dir.exists() {
        return Err((
            StatusCode::NOT_FOUND,
            format!("Session not found: {}", query.session_id),
        ));
    }

    let mut files: Vec<EventFileMeta> = Vec::new();

    // Use std::fs for simplicity
    let entries = std::fs::read_dir(&session_dir)
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    for entry in entries {
        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
        let path = entry.path();

        // Only JSONL event files are considered.
        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
            continue;
        }

        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");

        // Parse byte range from filename
        // NOTE(review): same "T"/"202" prefix gate as replay_session_events —
        // filenames timestamped 2030 onward would be skipped; confirm format.
        if let Some(offsets) = filename
            .strip_prefix("T")
            .or_else(|| filename.strip_prefix("202"))
        {
            let parts: Vec<&str> = offsets.split('-').collect();
            if parts.len() >= 4 {
                // Unparseable offsets fall back to 0 rather than erroring.
                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
                let end: u64 = parts[parts.len() - 1]
                    .trim_end_matches(".jsonl")
                    .parse()
                    .unwrap_or(0);

                let metadata = std::fs::metadata(&path)
                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

                files.push(EventFileMeta {
                    filename: filename.to_string(),
                    start_offset: start,
                    end_offset: end,
                    size_bytes: metadata.len(),
                });
            }
        }
    }

    // Sorted so auditors can read the stream in byte order.
    files.sort_by_key(|f| f.start_offset);
    Ok(Json(files))
}
2511
/// Metadata describing one JSONL event file within a session directory.
#[derive(Serialize)]
struct EventFileMeta {
    // File name as it appears on disk.
    filename: String,
    // Starting byte offset parsed from the filename.
    start_offset: u64,
    // Ending byte offset parsed from the filename.
    end_offset: u64,
    // Actual on-disk file size in bytes.
    size_bytes: u64,
}
2519
2520// ── K8s self-deployment endpoints ──
2521
2522async fn get_k8s_status(
2523    State(state): State<AppState>,
2524) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
2525    Ok(Json(state.k8s.status().await))
2526}
2527
/// Body for the k8s scale endpoint.
#[derive(Deserialize)]
struct ScaleRequest {
    // Desired replica count; the handler rejects values outside 0..=100.
    replicas: i32,
}
2532
2533async fn k8s_scale(
2534    State(state): State<AppState>,
2535    Json(req): Json<ScaleRequest>,
2536) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2537    if req.replicas < 0 || req.replicas > 100 {
2538        return Err((
2539            StatusCode::BAD_REQUEST,
2540            "Replicas must be between 0 and 100".to_string(),
2541        ));
2542    }
2543
2544    state
2545        .audit_log
2546        .log(
2547            AuditCategory::K8s,
2548            format!("scale:{}", req.replicas),
2549            AuditOutcome::Success,
2550            None,
2551            None,
2552        )
2553        .await;
2554
2555    state
2556        .k8s
2557        .scale(req.replicas)
2558        .await
2559        .map(Json)
2560        .map_err(internal_error)
2561}
2562
2563async fn k8s_restart(
2564    State(state): State<AppState>,
2565) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2566    state
2567        .audit_log
2568        .log(
2569            AuditCategory::K8s,
2570            "rolling_restart",
2571            AuditOutcome::Success,
2572            None,
2573            None,
2574        )
2575        .await;
2576
2577    state
2578        .k8s
2579        .rolling_restart()
2580        .await
2581        .map(Json)
2582        .map_err(internal_error)
2583}
2584
2585async fn k8s_list_pods(
2586    State(state): State<AppState>,
2587) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
2588    state
2589        .k8s
2590        .list_pods()
2591        .await
2592        .map(Json)
2593        .map_err(internal_error)
2594}
2595
2596async fn k8s_actions(
2597    State(state): State<AppState>,
2598) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
2599    Ok(Json(state.k8s.recent_actions(100).await))
2600}
2601
2602#[derive(Deserialize)]
2603struct SpawnSubagentRequest {
2604    subagent_id: String,
2605    #[serde(default)]
2606    image: Option<String>,
2607    #[serde(default)]
2608    env_vars: std::collections::HashMap<String, String>,
2609    #[serde(default)]
2610    labels: std::collections::HashMap<String, String>,
2611    #[serde(default)]
2612    command: Option<Vec<String>>,
2613    #[serde(default)]
2614    args: Option<Vec<String>>,
2615}
2616
2617async fn k8s_spawn_subagent(
2618    State(state): State<AppState>,
2619    Json(req): Json<SpawnSubagentRequest>,
2620) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2621    state
2622        .k8s
2623        .spawn_subagent_pod_with_spec(
2624            &req.subagent_id,
2625            crate::k8s::SubagentPodSpec {
2626                image: req.image,
2627                env_vars: req.env_vars,
2628                labels: req.labels,
2629                command: req.command,
2630                args: req.args,
2631            },
2632        )
2633        .await
2634        .map(Json)
2635        .map_err(internal_error)
2636}
2637
2638async fn k8s_delete_subagent(
2639    State(state): State<AppState>,
2640    axum::extract::Path(id): axum::extract::Path<String>,
2641) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
2642    state
2643        .k8s
2644        .delete_subagent_pod(&id)
2645        .await
2646        .map(Json)
2647        .map_err(internal_error)
2648}
2649
/// Request to register a tool
#[derive(Debug, Deserialize)]
pub struct RegisterToolRequest {
    // Unique tool identifier (also used for heartbeats and lookups).
    pub id: String,
    pub name: String,
    pub description: String,
    pub version: String,
    // Endpoint where the tool can be reached.
    pub endpoint: String,
    pub capabilities: Vec<String>,
    // Parameter description, stored verbatim as JSON.
    pub parameters: serde_json::Value,
}
2661
2662/// Response from registering a tool
2663#[derive(Serialize)]
2664struct RegisterToolResponse {
2665    tool: RegisteredTool,
2666    message: String,
2667}
2668
2669/// List all registered tools (active, non-expired)
2670async fn list_tools(State(state): State<AppState>) -> Json<Vec<RegisteredTool>> {
2671    Json(state.tool_registry.list().await)
2672}
2673
2674/// Register a new tool
2675async fn register_tool(
2676    State(state): State<AppState>,
2677    Json(req): Json<RegisterToolRequest>,
2678) -> Result<Json<RegisterToolResponse>, (StatusCode, String)> {
2679    let now = chrono::Utc::now();
2680    let tool = RegisteredTool {
2681        id: req.id.clone(),
2682        name: req.name,
2683        description: req.description,
2684        version: req.version,
2685        endpoint: req.endpoint,
2686        capabilities: req.capabilities,
2687        parameters: req.parameters,
2688        registered_at: now,
2689        last_heartbeat: now,
2690        expires_at: now + Duration::from_secs(90),
2691    };
2692
2693    state.tool_registry.register(tool.clone()).await;
2694
2695    tracing::info!(tool_id = %tool.id, "Tool registered");
2696
2697    Ok(Json(RegisterToolResponse {
2698        tool,
2699        message: "Tool registered successfully. Heartbeat required every 30s.".to_string(),
2700    }))
2701}
2702
2703/// Heartbeat endpoint to extend tool TTL
2704async fn tool_heartbeat(
2705    State(state): State<AppState>,
2706    Path(id): Path<String>,
2707) -> Result<Json<RegisteredTool>, (StatusCode, String)> {
2708    state
2709        .tool_registry
2710        .heartbeat(&id)
2711        .await
2712        .map(|tool| {
2713            tracing::info!(tool_id = %id, "Tool heartbeat received");
2714            Json(tool)
2715        })
2716        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Tool {} not found", id)))
2717}
2718
2719/// List registered plugins.
2720async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
2721    let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
2722    let signing_key = SigningKey::from_env();
2723    let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
2724    Json(PluginListResponse {
2725        server_fingerprint,
2726        signing_available: !test_sig.is_empty(),
2727        plugins: Vec::<PluginManifest>::new(),
2728    })
2729}
2730
/// Response body for the plugin listing endpoint.
#[derive(Serialize)]
struct PluginListResponse {
    /// Hash identifying this server build (derived from the crate version).
    server_fingerprint: String,
    /// Whether a signing key is configured and produced a non-empty signature.
    signing_available: bool,
    /// Registered plugin manifests; currently always empty.
    plugins: Vec<PluginManifest>,
}
2737
2738// ── Agent task compatibility surface ─────────────────────────────────────
2739// These handlers provide the /v1/agent/tasks/* surface the marketing-site
2740// dashboard expects, backed by the existing KnativeTaskQueue.
2741
2742/// List all agent tasks (wraps Knative task queue)
2743async fn list_agent_tasks(
2744    State(state): State<AppState>,
2745    Query(params): Query<ListAgentTasksQuery>,
2746) -> Json<serde_json::Value> {
2747    let tasks = state.knative_tasks.list().await;
2748    let filtered: Vec<_> = tasks
2749        .into_iter()
2750        .filter(|t| params.status.as_ref().is_none_or(|s| t.status == *s))
2751        .filter(|t| {
2752            params
2753                .agent_type
2754                .as_ref()
2755                .is_none_or(|a| t.agent_type == *a)
2756        })
2757        .collect();
2758    Json(serde_json::json!({ "tasks": filtered, "total": filtered.len() }))
2759}
2760
/// Query parameters for `list_agent_tasks`; both filters are optional.
#[derive(Deserialize)]
struct ListAgentTasksQuery {
    /// Only return tasks with exactly this status (e.g. "pending").
    status: Option<String>,
    /// Only return tasks for this agent type.
    agent_type: Option<String>,
}
2766
2767/// Create a new agent task
2768async fn create_agent_task(
2769    State(state): State<AppState>,
2770    Json(req): Json<CreateAgentTaskRequest>,
2771) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2772    let task_id = uuid::Uuid::new_v4().to_string();
2773    let task = KnativeTask {
2774        task_id: task_id.clone(),
2775        title: req.title,
2776        description: req.description,
2777        agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2778        priority: req.priority.unwrap_or(0),
2779        received_at: chrono::Utc::now(),
2780        status: "pending".to_string(),
2781    };
2782    state.knative_tasks.push(task).await;
2783    Ok(Json(serde_json::json!({
2784        "task_id": task_id,
2785        "status": "pending",
2786    })))
2787}
2788
/// Request body for `create_agent_task`.
#[derive(Deserialize)]
#[allow(dead_code)]
struct CreateAgentTaskRequest {
    /// Short task title.
    title: String,
    /// Full task description.
    description: String,
    /// Target agent type; defaults to "build" when omitted.
    agent_type: Option<String>,
    /// Accepted for API compatibility; currently unused by the handler.
    model: Option<String>,
    /// Task priority; defaults to 0 when omitted.
    priority: Option<i32>,
}
2798
2799/// Get a single agent task by ID
2800async fn get_agent_task(
2801    State(state): State<AppState>,
2802    Path(task_id): Path<String>,
2803) -> Result<Json<KnativeTask>, (StatusCode, String)> {
2804    state
2805        .knative_tasks
2806        .get(&task_id)
2807        .await
2808        .map(Json)
2809        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
2810}
2811
2812/// Get task output (returns task details including result)
2813async fn get_agent_task_output(
2814    State(state): State<AppState>,
2815    Path(task_id): Path<String>,
2816) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2817    let task = state
2818        .knative_tasks
2819        .get(&task_id)
2820        .await
2821        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2822    Ok(Json(serde_json::json!({
2823        "task_id": task.task_id,
2824        "status": task.status,
2825        "title": task.title,
2826        "output": null,
2827    })))
2828}
2829
/// Receive task output from worker and broadcast via bus for SSE streaming
#[derive(Deserialize)]
struct TaskOutputPayload {
    /// Id of the reporting worker; accepted but currently unused.
    #[allow(dead_code)]
    #[serde(default)]
    worker_id: Option<String>,
    /// Output text to relay to SSE subscribers; when absent, nothing is broadcast.
    #[serde(default)]
    output: Option<String>,
}
2839
2840async fn agent_task_output(
2841    State(state): State<AppState>,
2842    Path(task_id): Path<String>,
2843    Json(payload): Json<TaskOutputPayload>,
2844) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2845    // Verify task exists
2846    let task_exists = state.knative_tasks.get(&task_id).await.is_some();
2847    if !task_exists {
2848        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
2849    }
2850
2851    // Update task status to Working if it's still pending
2852    let _ = state.knative_tasks.update_status(&task_id, "working").await;
2853
2854    // Broadcast output via bus for SSE subscribers
2855    if let Some(ref output) = payload.output {
2856        let bus_handle = state.bus.handle("task-output");
2857        let _ = bus_handle.send(
2858            format!("task.{}", task_id),
2859            crate::bus::BusMessage::TaskUpdate {
2860                task_id: task_id.clone(),
2861                state: crate::a2a::types::TaskState::Working,
2862                message: Some(output.clone()),
2863            },
2864        );
2865    }
2866
2867    Ok(Json(serde_json::json!({
2868        "task_id": task_id,
2869        "status": "received",
2870    })))
2871}
2872
2873/// Stream task output as SSE events
2874async fn stream_agent_task_output(
2875    State(state): State<AppState>,
2876    Path(task_id): Path<String>,
2877) -> Result<Sse<impl stream::Stream<Item = Result<Event, Infallible>>>, (StatusCode, String)> {
2878    // Verify task exists
2879    state
2880        .knative_tasks
2881        .get(&task_id)
2882        .await
2883        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))?;
2884
2885    let bus_handle = state.bus.handle("task-stream");
2886    let rx = bus_handle.into_receiver();
2887    let topic_prefix = format!("task.{task_id}");
2888
2889    let stream = async_stream::try_stream! {
2890        let mut rx = rx;
2891        loop {
2892            match rx.recv().await {
2893                Ok(envelope) => {
2894                    if !envelope.topic.starts_with(&topic_prefix) {
2895                        continue;
2896                    }
2897                    let data = serde_json::to_string(&envelope.message).unwrap_or_default();
2898                    yield Event::default().event("output").data(data);
2899                }
2900                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
2901                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
2902            }
2903        }
2904    };
2905
2906    Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
2907}
2908
2909// ── Worker connectivity ─────────────────────────────────────────────────
2910
2911/// List connected workers (K8s pods with agent label)
2912async fn list_connected_workers(
2913    State(state): State<AppState>,
2914) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2915    // Use K8s pod listing as proxy for connected workers
2916    let pods = state.k8s.list_pods().await.unwrap_or_default();
2917    let workers: Vec<serde_json::Value> = pods
2918        .iter()
2919        .filter(|p| p.name.contains("codetether") || p.name.contains("worker"))
2920        .map(|p| {
2921            serde_json::json!({
2922                "worker_id": p.name,
2923                "name": p.name,
2924                "status": p.phase,
2925                "is_sse_connected": p.phase == "Running",
2926                "last_seen": chrono::Utc::now().to_rfc3339(),
2927            })
2928        })
2929        .collect();
2930    Ok(Json(serde_json::json!({ "workers": workers })))
2931}
2932
2933// ── Task dispatch ───────────────────────────────────────────────────────
2934
2935/// Dispatch a task (creates a Knative task and returns immediately)
2936async fn dispatch_task(
2937    State(state): State<AppState>,
2938    Json(req): Json<DispatchTaskRequest>,
2939) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2940    let task_id = uuid::Uuid::new_v4().to_string();
2941    let task = KnativeTask {
2942        task_id: task_id.clone(),
2943        title: req.title,
2944        description: req.description,
2945        agent_type: req.agent_type.unwrap_or_else(|| "build".to_string()),
2946        priority: req.priority.unwrap_or(0),
2947        received_at: chrono::Utc::now(),
2948        status: "pending".to_string(),
2949    };
2950
2951    // Publish task event to agent bus for connected workers
2952    let handle = state.bus.handle("task-dispatch");
2953    handle.send(
2954        format!("task.{}", task_id),
2955        crate::bus::BusMessage::TaskUpdate {
2956            task_id: task_id.clone(),
2957            state: crate::a2a::types::TaskState::Submitted,
2958            message: Some(format!("Task dispatched: {}", task.title)),
2959        },
2960    );
2961
2962    state.knative_tasks.push(task).await;
2963
2964    Ok(Json(serde_json::json!({
2965        "task_id": task_id,
2966        "status": "pending",
2967        "dispatched_via_knative": true,
2968    })))
2969}
2970
/// Request body for `dispatch_task`.
#[derive(Deserialize)]
#[allow(dead_code)]
struct DispatchTaskRequest {
    /// Short task title (echoed into the dispatch bus message).
    title: String,
    /// Full task description.
    description: String,
    /// Target agent type; defaults to "build" when omitted.
    agent_type: Option<String>,
    /// Accepted for API compatibility; currently unused by the handler.
    model: Option<String>,
    /// Task priority; defaults to 0 when omitted.
    priority: Option<i32>,
    /// Accepted for API compatibility; currently unused by the handler.
    metadata: Option<serde_json::Value>,
}
2981
2982// ── Voice REST bridge ───────────────────────────────────────────────────
2983// Bridges the REST /v1/voice/* surface the dashboard expects to the
2984// existing gRPC VoiceService implementation via the AgentBus.
2985
2986/// Create a voice session (REST bridge for VoiceService::CreateVoiceSession)
2987async fn create_voice_session_rest(
2988    State(state): State<AppState>,
2989    Json(req): Json<CreateVoiceSessionRequest>,
2990) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
2991    let voice_id = req.voice.unwrap_or_else(|| "960f89fc".to_string());
2992    let room_name = format!("voice-{}", uuid::Uuid::new_v4());
2993
2994    // Publish session started event to bus
2995    let handle = state.bus.handle("voice-rest");
2996    handle.send_voice_session_started(&room_name, &voice_id);
2997
2998    let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
2999
3000    Ok(Json(serde_json::json!({
3001        "room_name": room_name,
3002        "voice": voice_id,
3003        "mode": req.mode.unwrap_or_else(|| "chat".to_string()),
3004        "access_token": "",
3005        "livekit_url": livekit_url,
3006        "expires_at": (chrono::Utc::now() + chrono::Duration::hours(1)).to_rfc3339(),
3007    })))
3008}
3009
/// Request body for `create_voice_session_rest`.
#[derive(Deserialize)]
#[allow(dead_code)]
struct CreateVoiceSessionRequest {
    /// Voice id; defaults to "960f89fc" (Riley) when omitted.
    voice: Option<String>,
    /// Session mode; defaults to "chat" when omitted.
    mode: Option<String>,
    /// Accepted for API compatibility; currently unused by the handler.
    codebase_id: Option<String>,
    /// Accepted for API compatibility; currently unused by the handler.
    user_id: Option<String>,
}
3018
3019/// Get a voice session (REST bridge)
3020async fn get_voice_session_rest(Path(room_name): Path<String>) -> Json<serde_json::Value> {
3021    let livekit_url = std::env::var("LIVEKIT_URL").unwrap_or_default();
3022    Json(serde_json::json!({
3023        "room_name": room_name,
3024        "state": "active",
3025        "agent_state": "listening",
3026        "access_token": "",
3027        "livekit_url": livekit_url,
3028    }))
3029}
3030
3031/// Delete a voice session (REST bridge)
3032async fn delete_voice_session_rest(
3033    State(state): State<AppState>,
3034    Path(room_name): Path<String>,
3035) -> Json<serde_json::Value> {
3036    let handle = state.bus.handle("voice-rest");
3037    handle.send_voice_session_ended(&room_name, "user_ended");
3038    Json(serde_json::json!({ "status": "deleted" }))
3039}
3040
3041/// List available voices (REST bridge)
3042async fn list_voices_rest() -> Json<serde_json::Value> {
3043    Json(serde_json::json!({
3044        "voices": [
3045            { "voice_id": "960f89fc", "name": "Riley", "language": "english" },
3046            { "voice_id": "puck", "name": "Puck", "language": "english" },
3047            { "voice_id": "charon", "name": "Charon", "language": "english" },
3048            { "voice_id": "kore", "name": "Kore", "language": "english" },
3049            { "voice_id": "fenrir", "name": "Fenrir", "language": "english" },
3050            { "voice_id": "aoede", "name": "Aoede", "language": "english" },
3051        ]
3052    }))
3053}
3054
3055// ── Session resume ──────────────────────────────────────────────────────
3056
3057/// Resume a codebase-scoped session (what the dashboard session resume flow calls)
3058async fn resume_codebase_session(
3059    Path((_codebase_id, session_id)): Path<(String, String)>,
3060    Json(req): Json<ResumeSessionRequest>,
3061) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
3062    // Load the session if it exists, or create a new one
3063    let mut session = match crate::session::Session::load(&session_id).await {
3064        Ok(s) => s,
3065        Err(_) => {
3066            // Create a new session for this codebase
3067            let mut s = crate::session::Session::new()
3068                .await
3069                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3070            if let Some(agent) = &req.agent {
3071                s.set_agent_name(agent.clone());
3072            }
3073            s.save()
3074                .await
3075                .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
3076            s
3077        }
3078    };
3079
3080    // If there's a prompt, execute it as a task
3081    if let Some(prompt) = &req.prompt
3082        && !prompt.is_empty()
3083    {
3084        match session.prompt(prompt).await {
3085            Ok(result) => {
3086                session.save().await.ok();
3087                return Ok(Json(serde_json::json!({
3088                    "session_id": session.id,
3089                    "active_session_id": session.id,
3090                    "status": "completed",
3091                    "result": result.text,
3092                })));
3093            }
3094            Err(e) => {
3095                return Ok(Json(serde_json::json!({
3096                    "session_id": session.id,
3097                    "active_session_id": session.id,
3098                    "status": "failed",
3099                    "error": e.to_string(),
3100                })));
3101            }
3102        }
3103    }
3104
3105    // No prompt = just resume/check session
3106    Ok(Json(serde_json::json!({
3107        "session_id": session.id,
3108        "active_session_id": session.id,
3109        "status": "ready",
3110    })))
3111}
3112
/// Request body for `resume_codebase_session`.
#[derive(Deserialize)]
#[allow(dead_code)]
struct ResumeSessionRequest {
    /// Optional prompt to execute after resuming; empty/absent means resume only.
    prompt: Option<String>,
    /// Agent name to assign when a new session has to be created.
    agent: Option<String>,
    /// Accepted for API compatibility; currently unused by the handler.
    model: Option<String>,
}
3120
3121fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
3122    let message = error.to_string();
3123    if message.contains("not found") {
3124        return (StatusCode::NOT_FOUND, message);
3125    }
3126    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
3127        return (StatusCode::BAD_REQUEST, message);
3128    }
3129    (StatusCode::INTERNAL_SERVER_ERROR, message)
3130}
3131
/// Read a boolean flag from the environment.
///
/// Recognizes the usual truthy ("1", "true", "yes", "on") and falsy
/// ("0", "false", "no", "off") spellings, case-insensitively. Unset or
/// unrecognized values fall back to `default`.
fn env_bool(name: &str, default: bool) -> bool {
    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    match raw.to_ascii_lowercase().as_str() {
        "1" | "true" | "yes" | "on" => true,
        "0" | "false" | "no" | "off" => false,
        _ => default,
    }
}
3142
#[cfg(test)]
mod tests {
    use super::{match_policy_rule, normalize_model_reference};

    // Prompting an existing session mutates agent state, so it must map to
    // the execute permission rather than plain session write.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        let permission = match_policy_rule("/api/session/abc123/prompt", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    // Creating a session is a plain write, not an execute action.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        let permission = match_policy_rule("/api/session", "POST");
        assert_eq!(permission, Some("sessions:write"));
    }

    // Approving a cognition proposal triggers agent action → execute.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        let permission = match_policy_rule("/v1/cognition/proposals/p1/approve", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    // Model listing is read-only.
    #[test]
    fn policy_openai_models_requires_read_permission() {
        let permission = match_policy_rule("/v1/models", "GET");
        assert_eq!(permission, Some("agent:read"));
    }

    // Chat completions run the agent → execute.
    #[test]
    fn policy_openai_chat_completions_requires_execute_permission() {
        let permission = match_policy_rule("/v1/chat/completions", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    // "provider:model" is normalized to the canonical "provider/model" form.
    #[test]
    fn normalize_model_reference_accepts_colon_format() {
        assert_eq!(
            normalize_model_reference("openai:gpt-4o"),
            "openai/gpt-4o".to_string()
        );
    }
}