Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16    executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24    Router,
25    body::Body,
26    extract::Path,
27    extract::{Query, State},
28    http::{Request, StatusCode},
29    middleware::{self, Next},
30    response::sse::{Event, KeepAlive, Sse},
31    response::{Json, Response},
32    routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tokio::sync::Mutex;
39use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
40use tower_http::trace::TraceLayer;
41
42/// Task received from Knative Eventing
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct KnativeTask {
45    pub task_id: String,
46    pub title: String,
47    pub description: String,
48    pub agent_type: String,
49    pub priority: i32,
50    pub received_at: chrono::DateTime<chrono::Utc>,
51    pub status: String,
52}
53
54/// Queue for Knative tasks waiting to be processed
55#[derive(Clone)]
56pub struct KnativeTaskQueue {
57    tasks: Arc<Mutex<Vec<KnativeTask>>>,
58}
59
60impl KnativeTaskQueue {
61    pub fn new() -> Self {
62        Self {
63            tasks: Arc::new(Mutex::new(Vec::new())),
64        }
65    }
66
67    pub async fn push(&self, task: KnativeTask) {
68        self.tasks.lock().await.push(task);
69    }
70
71    pub async fn pop(&self) -> Option<KnativeTask> {
72        self.tasks.lock().await.pop()
73    }
74
75    pub async fn list(&self) -> Vec<KnativeTask> {
76        self.tasks.lock().await.clone()
77    }
78
79    pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
80        self.tasks
81            .lock()
82            .await
83            .iter()
84            .find(|t| t.task_id == task_id)
85            .cloned()
86    }
87
88    pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
89        let mut tasks = self.tasks.lock().await;
90        if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
91            task.status = status.to_string();
92            true
93        } else {
94            false
95        }
96    }
97}
98
99impl Default for KnativeTaskQueue {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105/// Server state shared across handlers
106#[derive(Clone)]
107pub struct AppState {
108    pub config: Arc<Config>,
109    pub cognition: Arc<CognitionRuntime>,
110    pub audit_log: AuditLog,
111    pub k8s: Arc<K8sManager>,
112    pub auth: AuthState,
113    pub bus: Arc<AgentBus>,
114    pub knative_tasks: KnativeTaskQueue,
115}
116
117/// Audit middleware — logs every request/response to the audit trail.
118async fn audit_middleware(
119    State(state): State<AppState>,
120    request: Request<Body>,
121    next: Next,
122) -> Response {
123    let method = request.method().clone();
124    let path = request.uri().path().to_string();
125    let started = std::time::Instant::now();
126
127    let response = next.run(request).await;
128
129    let duration_ms = started.elapsed().as_millis() as u64;
130    let status = response.status().as_u16();
131    let outcome = if status < 400 {
132        AuditOutcome::Success
133    } else if status == 401 || status == 403 {
134        AuditOutcome::Denied
135    } else {
136        AuditOutcome::Failure
137    };
138
139    state
140        .audit_log
141        .record(audit::AuditEntry {
142            id: uuid::Uuid::new_v4().to_string(),
143            timestamp: chrono::Utc::now(),
144            category: AuditCategory::Api,
145            action: format!("{} {}", method, path),
146            principal: None,
147            outcome,
148            detail: Some(serde_json::json!({ "status": status })),
149            duration_ms: Some(duration_ms),
150            okr_id: None,
151            okr_run_id: None,
152            relay_id: None,
153            session_id: None,
154        })
155        .await;
156
157    response
158}
159
160/// Mapping from (path pattern, HTTP method) → OPA permission action.
161/// The first matching rule wins.
162struct PolicyRule {
163    pattern: &'static str,
164    methods: Option<&'static [&'static str]>,
165    permission: &'static str,
166}
167
168const POLICY_RULES: &[PolicyRule] = &[
169    // Public / exempt
170    PolicyRule {
171        pattern: "/health",
172        methods: None,
173        permission: "",
174    },
175    PolicyRule {
176        pattern: "/task",
177        methods: None,
178        permission: "",
179    },
180    PolicyRule {
181        pattern: "/v1/knative/",
182        methods: None,
183        permission: "",
184    },
185    PolicyRule {
186        pattern: "/a2a/",
187        methods: None,
188        permission: "",
189    },
190    // K8s management — admin only
191    PolicyRule {
192        pattern: "/v1/k8s/scale",
193        methods: Some(&["POST"]),
194        permission: "admin:access",
195    },
196    PolicyRule {
197        pattern: "/v1/k8s/restart",
198        methods: Some(&["POST"]),
199        permission: "admin:access",
200    },
201    PolicyRule {
202        pattern: "/v1/k8s/",
203        methods: Some(&["GET"]),
204        permission: "admin:access",
205    },
206    // K8s sub-agent lifecycle — admin only
207    PolicyRule {
208        pattern: "/v1/k8s/subagent",
209        methods: Some(&["POST", "DELETE"]),
210        permission: "admin:access",
211    },
212    // Plugin registry — read
213    PolicyRule {
214        pattern: "/v1/plugins",
215        methods: Some(&["GET"]),
216        permission: "agent:read",
217    },
218    // Audit — admin
219    PolicyRule {
220        pattern: "/v1/audit",
221        methods: None,
222        permission: "admin:access",
223    },
224    // Event stream replay — admin (compliance feature)
225    PolicyRule {
226        pattern: "/v1/audit/replay",
227        methods: Some(&["GET"]),
228        permission: "admin:access",
229    },
230    // Cognition — write operations
231    PolicyRule {
232        pattern: "/v1/cognition/start",
233        methods: Some(&["POST"]),
234        permission: "agent:execute",
235    },
236    PolicyRule {
237        pattern: "/v1/cognition/stop",
238        methods: Some(&["POST"]),
239        permission: "agent:execute",
240    },
241    PolicyRule {
242        pattern: "/v1/cognition/",
243        methods: Some(&["GET"]),
244        permission: "agent:read",
245    },
246    // Swarm persona lifecycle
247    PolicyRule {
248        pattern: "/v1/swarm/personas",
249        methods: Some(&["POST"]),
250        permission: "agent:execute",
251    },
252    PolicyRule {
253        pattern: "/v1/swarm/",
254        methods: Some(&["POST"]),
255        permission: "agent:execute",
256    },
257    PolicyRule {
258        pattern: "/v1/swarm/",
259        methods: Some(&["GET"]),
260        permission: "agent:read",
261    },
262    // Session management
263    // Session prompt execution — requires execute permission
264    PolicyRule {
265        pattern: "/api/session/",
266        methods: Some(&["POST"]),
267        permission: "agent:execute",
268    },
269    PolicyRule {
270        pattern: "/api/session",
271        methods: Some(&["POST"]),
272        permission: "sessions:write",
273    },
274    PolicyRule {
275        pattern: "/api/session",
276        methods: Some(&["GET"]),
277        permission: "sessions:read",
278    },
279    // Proposal approval — governance action
280    PolicyRule {
281        pattern: "/v1/cognition/proposals/",
282        methods: Some(&["POST"]),
283        permission: "agent:execute",
284    },
285    // Config, version, providers, agents — read
286    PolicyRule {
287        pattern: "/api/version",
288        methods: None,
289        permission: "agent:read",
290    },
291    PolicyRule {
292        pattern: "/api/config",
293        methods: None,
294        permission: "agent:read",
295    },
296    PolicyRule {
297        pattern: "/api/provider",
298        methods: None,
299        permission: "agent:read",
300    },
301    PolicyRule {
302        pattern: "/api/agent",
303        methods: None,
304        permission: "agent:read",
305    },
306];
307
308/// Find the required permission for a given path + method.
309/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched (pass-through).
310fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
311    for rule in POLICY_RULES {
312        let matches = if rule.pattern.ends_with('/') {
313            path.starts_with(rule.pattern)
314        } else {
315            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
316        };
317        if matches {
318            if let Some(allowed_methods) = rule.methods {
319                if !allowed_methods.contains(&method) {
320                    continue;
321                }
322            }
323            return Some(rule.permission);
324        }
325    }
326    None
327}
328
329/// Policy authorization middleware for Axum.
330///
331/// Maps request paths to OPA permission strings and enforces authorization.
332/// Runs after `require_auth` so the bearer token is already validated.
333/// Currently maps the static bearer token to an admin role since
334/// codetether-agent uses a single shared token model.
335async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
336    let path = request.uri().path().to_string();
337    let method = request.method().as_str().to_string();
338
339    let permission = match match_policy_rule(&path, &method) {
340        None | Some("") => return Ok(next.run(request).await),
341        Some(perm) => perm,
342    };
343
344    // The current auth model uses a single static token for all access.
345    // When this is the case, the authenticated user effectively has admin role.
346    // Future: extract user claims from JWT and build a proper PolicyUser.
347    let user = policy::PolicyUser {
348        user_id: "bearer-token-user".to_string(),
349        roles: vec!["admin".to_string()],
350        tenant_id: None,
351        scopes: vec![],
352        auth_source: "static_token".to_string(),
353    };
354
355    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
356        tracing::warn!(
357            path = %path,
358            method = %method,
359            permission = %permission,
360            "Policy middleware denied request"
361        );
362        return Err(status);
363    }
364
365    Ok(next.run(request).await)
366}
367
368/// Start the HTTP server
369pub async fn serve(args: ServeArgs) -> Result<()> {
370    let t0 = std::time::Instant::now();
371    tracing::info!("[startup] begin");
372    let config = Config::load().await?;
373    tracing::info!(
374        elapsed_ms = t0.elapsed().as_millis(),
375        "[startup] config loaded"
376    );
377    let mut cognition = CognitionRuntime::new_from_env();
378    tracing::info!(
379        elapsed_ms = t0.elapsed().as_millis(),
380        "[startup] cognition runtime created"
381    );
382
383    // Set up tool registry for cognition execution engine.
384    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
385    tracing::info!(
386        elapsed_ms = t0.elapsed().as_millis(),
387        "[startup] tools registered"
388    );
389    let cognition = Arc::new(cognition);
390
391    // Initialize audit log.
392    let audit_log = AuditLog::from_env();
393    let _ = audit::init_audit_log(audit_log.clone());
394
395    // Initialize K8s manager.
396    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
397    let k8s = Arc::new(K8sManager::new().await);
398    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
399    if k8s.is_available() {
400        tracing::info!("K8s self-deployment enabled");
401    }
402
403    // Initialize mandatory auth.
404    let auth_state = AuthState::from_env();
405    tracing::info!(
406        token_len = auth_state.token().len(),
407        "Auth is mandatory. Token required for all API endpoints."
408    );
409    tracing::info!(
410        audit_entries = audit_log.count().await,
411        "Audit log initialized"
412    );
413
414    // Create agent bus for in-process communication
415    let bus = AgentBus::new().into_arc();
416    tracing::info!(
417        elapsed_ms = t0.elapsed().as_millis(),
418        "[startup] bus created"
419    );
420
421    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
422        tracing::info!(
423            elapsed_ms = t0.elapsed().as_millis(),
424            "[startup] auto-starting cognition"
425        );
426        if let Err(error) = cognition.start(None).await {
427            tracing::warn!(%error, "Failed to auto-start cognition loop");
428        } else {
429            tracing::info!("Perpetual cognition auto-started");
430        }
431    }
432
433    tracing::info!(
434        elapsed_ms = t0.elapsed().as_millis(),
435        "[startup] building routes"
436    );
437    let addr = format!("{}:{}", args.hostname, args.port);
438
439    // Build the agent card
440    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
441    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
442
443    // Build A2A router separately
444    let a2a_router = a2a_server.router();
445
446    // Start gRPC transport on a separate port
447    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
448        .ok()
449        .and_then(|p| p.parse::<u16>().ok())
450        .unwrap_or(50051);
451    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
452    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
453    let grpc_service = grpc_store.into_service();
454    tokio::spawn(async move {
455        tracing::info!("gRPC A2A server listening on {}", grpc_addr);
456        if let Err(e) = tonic::transport::Server::builder()
457            .add_service(grpc_service)
458            .serve(grpc_addr)
459            .await
460        {
461            tracing::error!("gRPC server error: {}", e);
462        }
463    });
464
465    let state = AppState {
466        config: Arc::new(config),
467        cognition,
468        audit_log,
469        k8s,
470        auth: auth_state.clone(),
471        bus,
472        knative_tasks: KnativeTaskQueue::new(),
473    };
474
475    let app = Router::new()
476        // Health check (public — auth exempt)
477        .route("/health", get(health))
478        // CloudEvent receiver for Knative Eventing (public — auth exempt)
479        .route("/task", post(receive_task_event))
480        // Knative task queue APIs
481        .route("/v1/knative/tasks", get(list_knative_tasks))
482        .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
483        .route(
484            "/v1/knative/tasks/{task_id}/claim",
485            post(claim_knative_task),
486        )
487        .route(
488            "/v1/knative/tasks/{task_id}/complete",
489            post(complete_knative_task),
490        )
491        // API routes
492        .route("/api/version", get(get_version))
493        .route("/api/session", get(list_sessions).post(create_session))
494        .route("/api/session/{id}", get(get_session))
495        .route("/api/session/{id}/prompt", post(prompt_session))
496        .route("/api/config", get(get_config))
497        .route("/api/provider", get(list_providers))
498        .route("/api/agent", get(list_agents))
499        // Perpetual cognition APIs
500        .route("/v1/cognition/start", post(start_cognition))
501        .route("/v1/cognition/stop", post(stop_cognition))
502        .route("/v1/cognition/status", get(get_cognition_status))
503        .route("/v1/cognition/stream", get(stream_cognition))
504        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
505        // Swarm persona lifecycle APIs
506        .route("/v1/swarm/personas", post(create_persona))
507        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
508        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
509        .route("/v1/swarm/lineage", get(get_swarm_lineage))
510        // Belief, attention, governance, workspace APIs
511        .route("/v1/cognition/beliefs", get(list_beliefs))
512        .route("/v1/cognition/beliefs/{id}", get(get_belief))
513        .route("/v1/cognition/attention", get(list_attention))
514        .route("/v1/cognition/proposals", get(list_proposals))
515        .route(
516            "/v1/cognition/proposals/{id}/approve",
517            post(approve_proposal),
518        )
519        .route("/v1/cognition/receipts", get(list_receipts))
520        .route("/v1/cognition/workspace", get(get_workspace))
521        .route("/v1/cognition/governance", get(get_governance))
522        .route("/v1/cognition/personas/{id}", get(get_persona))
523        // Audit trail API
524        .route("/v1/audit", get(list_audit_entries))
525        // Event stream replay API (for SOC 2, FedRAMP, ATO compliance)
526        .route("/v1/audit/replay", get(replay_session_events))
527        .route("/v1/audit/replay/index", get(list_session_event_files))
528        // K8s self-deployment APIs
529        .route("/v1/k8s/status", get(get_k8s_status))
530        .route("/v1/k8s/scale", post(k8s_scale))
531        .route("/v1/k8s/restart", post(k8s_restart))
532        .route("/v1/k8s/pods", get(k8s_list_pods))
533        .route("/v1/k8s/actions", get(k8s_actions))
534        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
535        .route(
536            "/v1/k8s/subagent/{id}",
537            axum::routing::delete(k8s_delete_subagent),
538        )
539        // Plugin registry API
540        .route("/v1/plugins", get(list_plugins))
541        .with_state(state.clone())
542        // A2A routes (nested to work with different state type)
543        .nest("/a2a", a2a_router)
544        // Mandatory auth middleware — applies to all routes
545        .layer(middleware::from_fn_with_state(
546            state.clone(),
547            audit_middleware,
548        ))
549        .layer(middleware::from_fn(policy_middleware))
550        .layer(middleware::from_fn(auth::require_auth))
551        .layer(axum::Extension(state.auth.clone()))
552        // CORS + tracing
553        .layer(
554            CorsLayer::new()
555                .allow_origin(AllowOrigin::mirror_request())
556                .allow_credentials(true)
557                .allow_methods(AllowMethods::mirror_request())
558                .allow_headers(AllowHeaders::mirror_request()),
559        )
560        .layer(TraceLayer::new_for_http());
561
562    tracing::info!(
563        elapsed_ms = t0.elapsed().as_millis(),
564        "[startup] router built, binding"
565    );
566    let listener = tokio::net::TcpListener::bind(&addr).await?;
567    tracing::info!(
568        elapsed_ms = t0.elapsed().as_millis(),
569        "[startup] listening on http://{}",
570        addr
571    );
572
573    axum::serve(listener, app).await?;
574
575    Ok(())
576}
577
578/// Health check response
579async fn health() -> &'static str {
580    "ok"
581}
582
583/// List all Knative tasks in queue
584async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
585    Json(state.knative_tasks.list().await)
586}
587
588/// Get a specific Knative task by ID
589async fn get_knative_task(
590    State(state): State<AppState>,
591    Path(task_id): Path<String>,
592) -> Result<Json<KnativeTask>, (StatusCode, String)> {
593    state
594        .knative_tasks
595        .get(&task_id)
596        .await
597        .map(Json)
598        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
599}
600
601/// Claim/start processing a Knative task
602async fn claim_knative_task(
603    State(state): State<AppState>,
604    Path(task_id): Path<String>,
605) -> Result<Json<KnativeTask>, (StatusCode, String)> {
606    // Update status to processing
607    let updated = state
608        .knative_tasks
609        .update_status(&task_id, "processing")
610        .await;
611    if !updated {
612        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
613    }
614
615    state
616        .knative_tasks
617        .get(&task_id)
618        .await
619        .map(Json)
620        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
621}
622
623/// Complete a Knative task
624async fn complete_knative_task(
625    State(state): State<AppState>,
626    Path(task_id): Path<String>,
627) -> Result<Json<KnativeTask>, (StatusCode, String)> {
628    // Update status to completed
629    let updated = state
630        .knative_tasks
631        .update_status(&task_id, "completed")
632        .await;
633    if !updated {
634        return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
635    }
636
637    state
638        .knative_tasks
639        .get(&task_id)
640        .await
641        .map(Json)
642        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
643}
644
645/// CloudEvent handler for Knative Eventing
646/// Receives task events from Knative Broker and triggers execution
647async fn receive_task_event(
648    State(state): State<AppState>,
649    Json(event): Json<CloudEvent>,
650) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
651    tracing::info!(
652        event_type = %event.event_type,
653        event_id = %event.id,
654        "Received CloudEvent from Knative"
655    );
656
657    // Log the incoming event for audit
658    state
659        .audit_log
660        .log(
661            audit::AuditCategory::Api,
662            format!("cloudevents:{}", event.event_type),
663            AuditOutcome::Success,
664            None,
665            None,
666        )
667        .await;
668
669    // Process based on event type
670    match event.event_type.as_str() {
671        "codetether.task.created" | "task.created" => {
672            // Extract task data and queue for execution
673            if let Some(data) = event.data {
674                let task_id = data
675                    .get("task_id")
676                    .and_then(|v| v.as_str())
677                    .unwrap_or("unknown")
678                    .to_string();
679                let title = data
680                    .get("title")
681                    .and_then(|v| v.as_str())
682                    .unwrap_or("")
683                    .to_string();
684                let description = data
685                    .get("description")
686                    .and_then(|v| v.as_str())
687                    .unwrap_or("")
688                    .to_string();
689                let agent_type = data
690                    .get("agent_type")
691                    .and_then(|v| v.as_str())
692                    .unwrap_or("build")
693                    .to_string();
694                let priority = data.get("priority").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
695
696                let task = KnativeTask {
697                    task_id: task_id.clone(),
698                    title,
699                    description,
700                    agent_type,
701                    priority,
702                    received_at: chrono::Utc::now(),
703                    status: "queued".to_string(),
704                };
705
706                state.knative_tasks.push(task).await;
707                tracing::info!(task_id = %task_id, "Task queued for execution");
708            }
709        }
710        "codetether.task.cancelled" => {
711            tracing::info!("Task cancellation event received");
712            // Update task status if we have the task_id
713            if let Some(data) = event.data {
714                if let Some(task_id) = data.get("task_id").and_then(|v| v.as_str()) {
715                    let _ = state
716                        .knative_tasks
717                        .update_status(task_id, "cancelled")
718                        .await;
719                    tracing::info!(task_id = %task_id, "Task cancelled");
720                }
721            }
722        }
723        _ => {
724            tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
725        }
726    }
727
728    Ok(Json(CloudEventResponse {
729        status: "accepted".to_string(),
730        event_id: event.id,
731    }))
732}
733
734/// CloudEvent structure (CloudEvents v1.0 spec)
735#[derive(Deserialize, Serialize)]
736struct CloudEvent {
737    /// Event unique identifier
738    id: String,
739    /// Event source (e.g., knative://broker/a2a-server)
740    source: String,
741    /// Event type (e.g., codetether.task.created)
742    #[serde(rename = "type")]
743    event_type: String,
744    /// Event timestamp (RFC 3339)
745    #[serde(rename = "time")]
746    timestamp: Option<String>,
747    /// CloudEvents spec version
748    #[serde(rename = "specversion")]
749    spec_version: Option<String>,
750    /// Event data payload
751    data: Option<serde_json::Value>,
752}
753
754/// Response to CloudEvent acknowledgment
755#[derive(Serialize)]
756struct CloudEventResponse {
757    status: String,
758    event_id: String,
759}
760
761/// Version info
762#[derive(Serialize)]
763struct VersionInfo {
764    version: &'static str,
765    name: &'static str,
766    binary_hash: Option<String>,
767}
768
769async fn get_version() -> Json<VersionInfo> {
770    let binary_hash = std::env::current_exe()
771        .ok()
772        .and_then(|p| hash_file(&p).ok());
773    Json(VersionInfo {
774        version: env!("CARGO_PKG_VERSION"),
775        name: env!("CARGO_PKG_NAME"),
776        binary_hash,
777    })
778}
779
780/// List sessions
781#[derive(Deserialize)]
782struct ListSessionsQuery {
783    limit: Option<usize>,
784    offset: Option<usize>,
785}
786
787async fn list_sessions(
788    Query(query): Query<ListSessionsQuery>,
789) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
790    let sessions = crate::session::list_sessions()
791        .await
792        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
793
794    let offset = query.offset.unwrap_or(0);
795    let limit = query.limit.unwrap_or(100);
796    Ok(Json(sessions.into_iter().skip(offset).take(limit).collect()))
797}
798
799/// Create a new session
800#[derive(Deserialize)]
801struct CreateSessionRequest {
802    title: Option<String>,
803    agent: Option<String>,
804}
805
806async fn create_session(
807    Json(req): Json<CreateSessionRequest>,
808) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
809    let mut session = crate::session::Session::new()
810        .await
811        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
812
813    session.title = req.title;
814    if let Some(agent) = req.agent {
815        session.agent = agent;
816    }
817
818    session
819        .save()
820        .await
821        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
822
823    Ok(Json(session))
824}
825
826/// Get a session by ID
827async fn get_session(
828    axum::extract::Path(id): axum::extract::Path<String>,
829) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
830    let session = crate::session::Session::load(&id)
831        .await
832        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
833
834    Ok(Json(session))
835}
836
837/// Prompt a session
838#[derive(Deserialize)]
839struct PromptRequest {
840    message: String,
841}
842
843async fn prompt_session(
844    axum::extract::Path(id): axum::extract::Path<String>,
845    Json(req): Json<PromptRequest>,
846) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
847    // Validate the message is not empty
848    if req.message.trim().is_empty() {
849        return Err((
850            StatusCode::BAD_REQUEST,
851            "Message cannot be empty".to_string(),
852        ));
853    }
854
855    // Log the prompt request (uses the message field)
856    tracing::info!(
857        session_id = %id,
858        message_len = req.message.len(),
859        "Received prompt request"
860    );
861
862    // TODO: Implement actual prompting
863    Err((
864        StatusCode::NOT_IMPLEMENTED,
865        "Prompt execution not yet implemented".to_string(),
866    ))
867}
868
869/// Get configuration
870async fn get_config(State(state): State<AppState>) -> Json<Config> {
871    Json((*state.config).clone())
872}
873
874/// List providers
875async fn list_providers() -> Json<Vec<String>> {
876    Json(vec![
877        "openai".to_string(),
878        "anthropic".to_string(),
879        "google".to_string(),
880    ])
881}
882
883/// List agents
884async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
885    let registry = crate::agent::AgentRegistry::with_builtins();
886    Json(registry.list().into_iter().cloned().collect())
887}
888
889async fn start_cognition(
890    State(state): State<AppState>,
891    payload: Option<Json<StartCognitionRequest>>,
892) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
893    state
894        .cognition
895        .start(payload.map(|Json(body)| body))
896        .await
897        .map(Json)
898        .map_err(internal_error)
899}
900
901async fn stop_cognition(
902    State(state): State<AppState>,
903    payload: Option<Json<StopCognitionRequest>>,
904) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
905    let reason = payload.and_then(|Json(body)| body.reason);
906    state
907        .cognition
908        .stop(reason)
909        .await
910        .map(Json)
911        .map_err(internal_error)
912}
913
914async fn get_cognition_status(
915    State(state): State<AppState>,
916) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
917    Ok(Json(state.cognition.status().await))
918}
919
920async fn stream_cognition(
921    State(state): State<AppState>,
922) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
923    let rx = state.cognition.subscribe_events();
924
925    let event_stream = stream::unfold(rx, |mut rx| async move {
926        match rx.recv().await {
927            Ok(event) => {
928                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
929                let sse_event = Event::default().event("cognition").data(payload);
930                Some((Ok(sse_event), rx))
931            }
932            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
933                let lag_event = Event::default()
934                    .event("lag")
935                    .data(format!("skipped {}", skipped));
936                Some((Ok(lag_event), rx))
937            }
938            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
939        }
940    });
941
942    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
943}
944
945async fn get_latest_snapshot(
946    State(state): State<AppState>,
947) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
948    match state.cognition.latest_snapshot().await {
949        Some(snapshot) => Ok(Json(snapshot)),
950        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
951    }
952}
953
954async fn create_persona(
955    State(state): State<AppState>,
956    Json(req): Json<CreatePersonaRequest>,
957) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
958    state
959        .cognition
960        .create_persona(req)
961        .await
962        .map(Json)
963        .map_err(internal_error)
964}
965
966async fn spawn_persona(
967    State(state): State<AppState>,
968    Path(id): Path<String>,
969    Json(req): Json<SpawnPersonaRequest>,
970) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
971    state
972        .cognition
973        .spawn_child(&id, req)
974        .await
975        .map(Json)
976        .map_err(internal_error)
977}
978
979async fn reap_persona(
980    State(state): State<AppState>,
981    Path(id): Path<String>,
982    payload: Option<Json<ReapPersonaRequest>>,
983) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
984    let req = payload
985        .map(|Json(body)| body)
986        .unwrap_or(ReapPersonaRequest {
987            cascade: Some(false),
988            reason: None,
989        });
990
991    state
992        .cognition
993        .reap_persona(&id, req)
994        .await
995        .map(Json)
996        .map_err(internal_error)
997}
998
999async fn get_swarm_lineage(
1000    State(state): State<AppState>,
1001) -> Result<Json<LineageGraph>, (StatusCode, String)> {
1002    Ok(Json(state.cognition.lineage_graph().await))
1003}
1004
1005// ── Belief, Attention, Governance, Workspace handlers ──
1006
1007#[derive(Deserialize)]
1008struct BeliefFilter {
1009    status: Option<String>,
1010    persona: Option<String>,
1011}
1012
1013async fn list_beliefs(
1014    State(state): State<AppState>,
1015    Query(filter): Query<BeliefFilter>,
1016) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
1017    let beliefs = state.cognition.get_beliefs().await;
1018    let mut result: Vec<Belief> = beliefs.into_values().collect();
1019
1020    if let Some(status) = &filter.status {
1021        result.retain(|b| {
1022            let s = serde_json::to_string(&b.status).unwrap_or_default();
1023            s.contains(status)
1024        });
1025    }
1026    if let Some(persona) = &filter.persona {
1027        result.retain(|b| &b.asserted_by == persona);
1028    }
1029
1030    result.sort_by(|a, b| {
1031        b.confidence
1032            .partial_cmp(&a.confidence)
1033            .unwrap_or(std::cmp::Ordering::Equal)
1034    });
1035    Ok(Json(result))
1036}
1037
1038async fn get_belief(
1039    State(state): State<AppState>,
1040    Path(id): Path<String>,
1041) -> Result<Json<Belief>, (StatusCode, String)> {
1042    match state.cognition.get_belief(&id).await {
1043        Some(belief) => Ok(Json(belief)),
1044        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
1045    }
1046}
1047
1048async fn list_attention(
1049    State(state): State<AppState>,
1050) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
1051    Ok(Json(state.cognition.get_attention_queue().await))
1052}
1053
1054async fn list_proposals(
1055    State(state): State<AppState>,
1056) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
1057    let proposals = state.cognition.get_proposals().await;
1058    let mut result: Vec<Proposal> = proposals.into_values().collect();
1059    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1060    Ok(Json(result))
1061}
1062
1063async fn approve_proposal(
1064    State(state): State<AppState>,
1065    Path(id): Path<String>,
1066) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
1067    state
1068        .cognition
1069        .approve_proposal(&id)
1070        .await
1071        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
1072        .map_err(internal_error)
1073}
1074
1075async fn list_receipts(
1076    State(state): State<AppState>,
1077) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
1078    Ok(Json(state.cognition.get_receipts().await))
1079}
1080
1081async fn get_workspace(
1082    State(state): State<AppState>,
1083) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
1084    Ok(Json(state.cognition.get_workspace().await))
1085}
1086
1087async fn get_governance(
1088    State(state): State<AppState>,
1089) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
1090    Ok(Json(state.cognition.get_governance().await))
1091}
1092
1093async fn get_persona(
1094    State(state): State<AppState>,
1095    axum::extract::Path(id): axum::extract::Path<String>,
1096) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
1097    state
1098        .cognition
1099        .get_persona(&id)
1100        .await
1101        .map(Json)
1102        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
1103}
1104
1105// ── Audit trail endpoints ──
1106
1107#[derive(Deserialize)]
1108struct AuditQuery {
1109    limit: Option<usize>,
1110    category: Option<String>,
1111}
1112
1113async fn list_audit_entries(
1114    State(state): State<AppState>,
1115    Query(query): Query<AuditQuery>,
1116) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
1117    let limit = query.limit.unwrap_or(100).min(1000);
1118
1119    let entries = if let Some(ref cat) = query.category {
1120        let category = match cat.as_str() {
1121            "api" => AuditCategory::Api,
1122            "tool" | "tool_execution" => AuditCategory::ToolExecution,
1123            "session" => AuditCategory::Session,
1124            "cognition" => AuditCategory::Cognition,
1125            "swarm" => AuditCategory::Swarm,
1126            "auth" => AuditCategory::Auth,
1127            "k8s" => AuditCategory::K8s,
1128            "sandbox" => AuditCategory::Sandbox,
1129            "config" => AuditCategory::Config,
1130            _ => {
1131                return Err((
1132                    StatusCode::BAD_REQUEST,
1133                    format!("Unknown category: {}", cat),
1134                ));
1135            }
1136        };
1137        state.audit_log.by_category(category, limit).await
1138    } else {
1139        state.audit_log.recent(limit).await
1140    };
1141
1142    Ok(Json(entries))
1143}
1144
1145// ── Event Stream Replay API ──
1146// Enables auditors to reconstruct sessions from byte-range offsets.
1147// This is the key compliance feature for SOC 2, FedRAMP, and ATO processes.
1148
1149#[derive(Deserialize)]
1150struct ReplayQuery {
1151    /// Session ID to replay
1152    session_id: String,
1153    /// Optional: starting byte offset (for seeking to specific point)
1154    start_offset: Option<u64>,
1155    /// Optional: ending byte offset
1156    end_offset: Option<u64>,
1157    /// Optional: limit number of events to return
1158    limit: Option<usize>,
1159    /// Optional: filter by tool name
1160    tool_name: Option<String>,
1161}
1162
1163/// Replay session events from the JSONL event stream by byte-range offsets.
1164/// This allows auditors to reconstruct exactly what happened in a session,
1165/// including tool execution durations and success/failure status.
1166async fn replay_session_events(
1167    Query(query): Query<ReplayQuery>,
1168) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
1169    use std::path::PathBuf;
1170
1171    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
1172        .map(PathBuf::from)
1173        .ok()
1174        .ok_or_else(|| {
1175            (
1176                StatusCode::SERVICE_UNAVAILABLE,
1177                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
1178            )
1179        })?;
1180
1181    let session_dir = base_dir.join(&query.session_id);
1182
1183    // Check if session directory exists
1184    if !session_dir.exists() {
1185        return Err((
1186            StatusCode::NOT_FOUND,
1187            format!("Session not found: {}", query.session_id),
1188        ));
1189    }
1190
1191    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
1192
1193    // Read all event files in the session directory using std::fs for simplicity
1194    let entries = std::fs::read_dir(&session_dir)
1195        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1196
1197    for entry in entries {
1198        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1199        let path = entry.path();
1200
1201        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
1202            continue;
1203        }
1204
1205        // Parse byte range from filename: {timestamp}-chat-events-{start}-{end}.jsonl
1206        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
1207
1208        if let Some(offsets) = filename
1209            .strip_prefix("T")
1210            .or_else(|| filename.strip_prefix("202"))
1211        {
1212            // Extract start and end offsets from filename
1213            let parts: Vec<&str> = offsets.split('-').collect();
1214            if parts.len() >= 4 {
1215                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
1216                let end: u64 = parts[parts.len() - 1]
1217                    .trim_end_matches(".jsonl")
1218                    .parse()
1219                    .unwrap_or(0);
1220
1221                // Filter by byte range if specified
1222                if let Some(query_start) = query.start_offset {
1223                    if end <= query_start {
1224                        continue;
1225                    }
1226                }
1227                if let Some(query_end) = query.end_offset {
1228                    if start >= query_end {
1229                        continue;
1230                    }
1231                }
1232
1233                // Read and parse events from this file
1234                let content = std::fs::read_to_string(&path)
1235                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1236
1237                for line in content.lines() {
1238                    if line.trim().is_empty() {
1239                        continue;
1240                    }
1241                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
1242                        // Filter by tool name if specified
1243                        if let Some(ref tool_filter) = query.tool_name {
1244                            if let Some(event_tool) =
1245                                event.get("tool_name").and_then(|v| v.as_str())
1246                            {
1247                                if event_tool != tool_filter {
1248                                    continue;
1249                                }
1250                            } else {
1251                                continue;
1252                            }
1253                        }
1254                        all_events.push((start, end, event));
1255                    }
1256                }
1257            }
1258        }
1259    }
1260
1261    // Sort by start offset
1262    all_events.sort_by_key(|(s, _, _)| *s);
1263
1264    // Apply limit
1265    let limit = query.limit.unwrap_or(1000).min(10000);
1266    let events: Vec<_> = all_events
1267        .into_iter()
1268        .take(limit)
1269        .map(|(_, _, e)| e)
1270        .collect();
1271
1272    Ok(Json(events))
1273}
1274
1275/// Get session event files metadata (for audit index)
1276async fn list_session_event_files(
1277    Query(query): Query<ReplayQuery>,
1278) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
1279    use std::path::PathBuf;
1280
1281    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
1282        .map(PathBuf::from)
1283        .ok()
1284        .ok_or_else(|| {
1285            (
1286                StatusCode::SERVICE_UNAVAILABLE,
1287                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
1288            )
1289        })?;
1290
1291    let session_dir = base_dir.join(&query.session_id);
1292    if !session_dir.exists() {
1293        return Err((
1294            StatusCode::NOT_FOUND,
1295            format!("Session not found: {}", query.session_id),
1296        ));
1297    }
1298
1299    let mut files: Vec<EventFileMeta> = Vec::new();
1300
1301    // Use std::fs for simplicity
1302    let entries = std::fs::read_dir(&session_dir)
1303        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1304
1305    for entry in entries {
1306        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1307        let path = entry.path();
1308
1309        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
1310            continue;
1311        }
1312
1313        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
1314
1315        // Parse byte range from filename
1316        if let Some(offsets) = filename
1317            .strip_prefix("T")
1318            .or_else(|| filename.strip_prefix("202"))
1319        {
1320            let parts: Vec<&str> = offsets.split('-').collect();
1321            if parts.len() >= 4 {
1322                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
1323                let end: u64 = parts[parts.len() - 1]
1324                    .trim_end_matches(".jsonl")
1325                    .parse()
1326                    .unwrap_or(0);
1327
1328                let metadata = std::fs::metadata(&path)
1329                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1330
1331                files.push(EventFileMeta {
1332                    filename: filename.to_string(),
1333                    start_offset: start,
1334                    end_offset: end,
1335                    size_bytes: metadata.len(),
1336                });
1337            }
1338        }
1339    }
1340
1341    files.sort_by_key(|f| f.start_offset);
1342    Ok(Json(files))
1343}
1344
1345#[derive(Serialize)]
1346struct EventFileMeta {
1347    filename: String,
1348    start_offset: u64,
1349    end_offset: u64,
1350    size_bytes: u64,
1351}
1352
1353// ── K8s self-deployment endpoints ──
1354
1355async fn get_k8s_status(
1356    State(state): State<AppState>,
1357) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
1358    Ok(Json(state.k8s.status().await))
1359}
1360
1361#[derive(Deserialize)]
1362struct ScaleRequest {
1363    replicas: i32,
1364}
1365
1366async fn k8s_scale(
1367    State(state): State<AppState>,
1368    Json(req): Json<ScaleRequest>,
1369) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1370    if req.replicas < 0 || req.replicas > 100 {
1371        return Err((
1372            StatusCode::BAD_REQUEST,
1373            "Replicas must be between 0 and 100".to_string(),
1374        ));
1375    }
1376
1377    state
1378        .audit_log
1379        .log(
1380            AuditCategory::K8s,
1381            format!("scale:{}", req.replicas),
1382            AuditOutcome::Success,
1383            None,
1384            None,
1385        )
1386        .await;
1387
1388    state
1389        .k8s
1390        .scale(req.replicas)
1391        .await
1392        .map(Json)
1393        .map_err(internal_error)
1394}
1395
1396async fn k8s_restart(
1397    State(state): State<AppState>,
1398) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1399    state
1400        .audit_log
1401        .log(
1402            AuditCategory::K8s,
1403            "rolling_restart",
1404            AuditOutcome::Success,
1405            None,
1406            None,
1407        )
1408        .await;
1409
1410    state
1411        .k8s
1412        .rolling_restart()
1413        .await
1414        .map(Json)
1415        .map_err(internal_error)
1416}
1417
1418async fn k8s_list_pods(
1419    State(state): State<AppState>,
1420) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
1421    state
1422        .k8s
1423        .list_pods()
1424        .await
1425        .map(Json)
1426        .map_err(internal_error)
1427}
1428
1429async fn k8s_actions(
1430    State(state): State<AppState>,
1431) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
1432    Ok(Json(state.k8s.recent_actions(100).await))
1433}
1434
1435#[derive(Deserialize)]
1436struct SpawnSubagentRequest {
1437    subagent_id: String,
1438    #[serde(default)]
1439    image: Option<String>,
1440    #[serde(default)]
1441    env_vars: std::collections::HashMap<String, String>,
1442}
1443
1444async fn k8s_spawn_subagent(
1445    State(state): State<AppState>,
1446    Json(req): Json<SpawnSubagentRequest>,
1447) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1448    state
1449        .k8s
1450        .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
1451        .await
1452        .map(Json)
1453        .map_err(internal_error)
1454}
1455
1456async fn k8s_delete_subagent(
1457    State(state): State<AppState>,
1458    axum::extract::Path(id): axum::extract::Path<String>,
1459) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1460    state
1461        .k8s
1462        .delete_subagent_pod(&id)
1463        .await
1464        .map(Json)
1465        .map_err(internal_error)
1466}
1467
1468/// List registered plugins.
1469async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
1470    let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
1471    let signing_key = SigningKey::from_env();
1472    let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
1473    Json(PluginListResponse {
1474        server_fingerprint,
1475        signing_available: !test_sig.is_empty(),
1476        plugins: Vec::<PluginManifest>::new(),
1477    })
1478}
1479
1480#[derive(Serialize)]
1481struct PluginListResponse {
1482    server_fingerprint: String,
1483    signing_available: bool,
1484    plugins: Vec<PluginManifest>,
1485}
1486
1487fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
1488    let message = error.to_string();
1489    if message.contains("not found") {
1490        return (StatusCode::NOT_FOUND, message);
1491    }
1492    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
1493        return (StatusCode::BAD_REQUEST, message);
1494    }
1495    (StatusCode::INTERNAL_SERVER_ERROR, message)
1496}
1497
1498fn env_bool(name: &str, default: bool) -> bool {
1499    std::env::var(name)
1500        .ok()
1501        .and_then(|v| match v.to_ascii_lowercase().as_str() {
1502            "1" | "true" | "yes" | "on" => Some(true),
1503            "0" | "false" | "no" | "off" => Some(false),
1504            _ => None,
1505        })
1506        .unwrap_or(default)
1507}
1508
1509#[cfg(test)]
1510mod tests {
1511    use super::match_policy_rule;
1512
1513    #[test]
1514    fn policy_prompt_session_requires_execute_permission() {
1515        let permission = match_policy_rule("/api/session/abc123/prompt", "POST");
1516        assert_eq!(permission, Some("agent:execute"));
1517    }
1518
1519    #[test]
1520    fn policy_create_session_keeps_sessions_write_permission() {
1521        let permission = match_policy_rule("/api/session", "POST");
1522        assert_eq!(permission, Some("sessions:write"));
1523    }
1524
1525    #[test]
1526    fn policy_proposal_approval_requires_execute_permission() {
1527        let permission = match_policy_rule("/v1/cognition/proposals/p1/approve", "POST");
1528        assert_eq!(permission, Some("agent:execute"));
1529    }
1530}