Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16    executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24    Router,
25    body::Body,
26    extract::Path,
27    extract::{Query, State},
28    http::{Request, StatusCode},
29    middleware::{self, Next},
30    response::sse::{Event, KeepAlive, Sse},
31    response::{Json, Response},
32    routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
39use tower_http::trace::TraceLayer;
40
41/// Server state shared across handlers
42#[derive(Clone)]
43pub struct AppState {
44    pub config: Arc<Config>,
45    pub cognition: Arc<CognitionRuntime>,
46    pub audit_log: AuditLog,
47    pub k8s: Arc<K8sManager>,
48    pub auth: AuthState,
49    pub bus: Arc<AgentBus>,
50}
51
52/// Audit middleware — logs every request/response to the audit trail.
53async fn audit_middleware(
54    State(state): State<AppState>,
55    request: Request<Body>,
56    next: Next,
57) -> Response {
58    let method = request.method().clone();
59    let path = request.uri().path().to_string();
60    let started = std::time::Instant::now();
61
62    let response = next.run(request).await;
63
64    let duration_ms = started.elapsed().as_millis() as u64;
65    let status = response.status().as_u16();
66    let outcome = if status < 400 {
67        AuditOutcome::Success
68    } else if status == 401 || status == 403 {
69        AuditOutcome::Denied
70    } else {
71        AuditOutcome::Failure
72    };
73
74    state
75        .audit_log
76        .record(audit::AuditEntry {
77            id: uuid::Uuid::new_v4().to_string(),
78            timestamp: chrono::Utc::now(),
79            category: AuditCategory::Api,
80            action: format!("{} {}", method, path),
81            principal: None,
82            outcome,
83            detail: Some(serde_json::json!({ "status": status })),
84            duration_ms: Some(duration_ms),
85        })
86        .await;
87
88    response
89}
90
/// One row of the (path pattern, HTTP method) → OPA permission table.
///
/// Rows are evaluated in declaration order and the first match wins.
struct PolicyRule {
    /// Path pattern. A pattern ending in `/` matches every path that starts
    /// with it; any other pattern matches the exact path or a path that
    /// continues with `/` after the pattern.
    pattern: &'static str,
    /// HTTP methods the rule applies to; `None` applies to every method.
    methods: Option<&'static [&'static str]>,
    /// Permission required for matching requests. The empty string marks the
    /// route as exempt from policy enforcement.
    permission: &'static str,
}
98
99const POLICY_RULES: &[PolicyRule] = &[
100    // Public / exempt
101    PolicyRule {
102        pattern: "/health",
103        methods: None,
104        permission: "",
105    },
106    PolicyRule {
107        pattern: "/a2a/",
108        methods: None,
109        permission: "",
110    },
111    // K8s management — admin only
112    PolicyRule {
113        pattern: "/v1/k8s/scale",
114        methods: Some(&["POST"]),
115        permission: "admin:access",
116    },
117    PolicyRule {
118        pattern: "/v1/k8s/restart",
119        methods: Some(&["POST"]),
120        permission: "admin:access",
121    },
122    PolicyRule {
123        pattern: "/v1/k8s/",
124        methods: Some(&["GET"]),
125        permission: "admin:access",
126    },
127    // K8s sub-agent lifecycle — admin only
128    PolicyRule {
129        pattern: "/v1/k8s/subagent",
130        methods: Some(&["POST", "DELETE"]),
131        permission: "admin:access",
132    },
133    // Plugin registry — read
134    PolicyRule {
135        pattern: "/v1/plugins",
136        methods: Some(&["GET"]),
137        permission: "agent:read",
138    },
139    // Audit — admin
140    PolicyRule {
141        pattern: "/v1/audit",
142        methods: None,
143        permission: "admin:access",
144    },
145    // Event stream replay — admin (compliance feature)
146    PolicyRule {
147        pattern: "/v1/audit/replay",
148        methods: Some(&["GET"]),
149        permission: "admin:access",
150    },
151    // Cognition — write operations
152    PolicyRule {
153        pattern: "/v1/cognition/start",
154        methods: Some(&["POST"]),
155        permission: "agent:execute",
156    },
157    PolicyRule {
158        pattern: "/v1/cognition/stop",
159        methods: Some(&["POST"]),
160        permission: "agent:execute",
161    },
162    PolicyRule {
163        pattern: "/v1/cognition/",
164        methods: Some(&["GET"]),
165        permission: "agent:read",
166    },
167    // Swarm persona lifecycle
168    PolicyRule {
169        pattern: "/v1/swarm/personas",
170        methods: Some(&["POST"]),
171        permission: "agent:execute",
172    },
173    PolicyRule {
174        pattern: "/v1/swarm/",
175        methods: Some(&["POST"]),
176        permission: "agent:execute",
177    },
178    PolicyRule {
179        pattern: "/v1/swarm/",
180        methods: Some(&["GET"]),
181        permission: "agent:read",
182    },
183    // Session management
184    // Session prompt execution — requires execute permission
185    PolicyRule {
186        pattern: "/api/session/",
187        methods: Some(&["POST"]),
188        permission: "agent:execute",
189    },
190    PolicyRule {
191        pattern: "/api/session",
192        methods: Some(&["POST"]),
193        permission: "sessions:write",
194    },
195    PolicyRule {
196        pattern: "/api/session",
197        methods: Some(&["GET"]),
198        permission: "sessions:read",
199    },
200    // Proposal approval — governance action
201    PolicyRule {
202        pattern: "/v1/cognition/proposals/",
203        methods: Some(&["POST"]),
204        permission: "agent:execute",
205    },
206    // Config, version, providers, agents — read
207    PolicyRule {
208        pattern: "/api/version",
209        methods: None,
210        permission: "agent:read",
211    },
212    PolicyRule {
213        pattern: "/api/config",
214        methods: None,
215        permission: "agent:read",
216    },
217    PolicyRule {
218        pattern: "/api/provider",
219        methods: None,
220        permission: "agent:read",
221    },
222    PolicyRule {
223        pattern: "/api/agent",
224        methods: None,
225        permission: "agent:read",
226    },
227];
228
229/// Find the required permission for a given path + method.
230/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched (pass-through).
231fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
232    for rule in POLICY_RULES {
233        let matches = if rule.pattern.ends_with('/') {
234            path.starts_with(rule.pattern)
235        } else {
236            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
237        };
238        if matches {
239            if let Some(allowed_methods) = rule.methods {
240                if !allowed_methods.contains(&method) {
241                    continue;
242                }
243            }
244            return Some(rule.permission);
245        }
246    }
247    None
248}
249
250/// Policy authorization middleware for Axum.
251///
252/// Maps request paths to OPA permission strings and enforces authorization.
253/// Runs after `require_auth` so the bearer token is already validated.
254/// Currently maps the static bearer token to an admin role since
255/// codetether-agent uses a single shared token model.
256async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
257    let path = request.uri().path().to_string();
258    let method = request.method().as_str().to_string();
259
260    let permission = match match_policy_rule(&path, &method) {
261        None | Some("") => return Ok(next.run(request).await),
262        Some(perm) => perm,
263    };
264
265    // The current auth model uses a single static token for all access.
266    // When this is the case, the authenticated user effectively has admin role.
267    // Future: extract user claims from JWT and build a proper PolicyUser.
268    let user = policy::PolicyUser {
269        user_id: "bearer-token-user".to_string(),
270        roles: vec!["admin".to_string()],
271        tenant_id: None,
272        scopes: vec![],
273        auth_source: "static_token".to_string(),
274    };
275
276    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
277        tracing::warn!(
278            path = %path,
279            method = %method,
280            permission = %permission,
281            "Policy middleware denied request"
282        );
283        return Err(status);
284    }
285
286    Ok(next.run(request).await)
287}
288
289/// Start the HTTP server
290pub async fn serve(args: ServeArgs) -> Result<()> {
291    let t0 = std::time::Instant::now();
292    tracing::info!("[startup] begin");
293    let config = Config::load().await?;
294    tracing::info!(
295        elapsed_ms = t0.elapsed().as_millis(),
296        "[startup] config loaded"
297    );
298    let mut cognition = CognitionRuntime::new_from_env();
299    tracing::info!(
300        elapsed_ms = t0.elapsed().as_millis(),
301        "[startup] cognition runtime created"
302    );
303
304    // Set up tool registry for cognition execution engine.
305    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
306    tracing::info!(
307        elapsed_ms = t0.elapsed().as_millis(),
308        "[startup] tools registered"
309    );
310    let cognition = Arc::new(cognition);
311
312    // Initialize audit log.
313    let audit_log = AuditLog::from_env();
314    let _ = audit::init_audit_log(audit_log.clone());
315
316    // Initialize K8s manager.
317    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
318    let k8s = Arc::new(K8sManager::new().await);
319    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
320    if k8s.is_available() {
321        tracing::info!("K8s self-deployment enabled");
322    }
323
324    // Initialize mandatory auth.
325    let auth_state = AuthState::from_env();
326    tracing::info!(
327        token_len = auth_state.token().len(),
328        "Auth is mandatory. Token required for all API endpoints."
329    );
330    tracing::info!(
331        audit_entries = audit_log.count().await,
332        "Audit log initialized"
333    );
334
335    // Create agent bus for in-process communication
336    let bus = AgentBus::new().into_arc();
337    tracing::info!(
338        elapsed_ms = t0.elapsed().as_millis(),
339        "[startup] bus created"
340    );
341
342    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
343        tracing::info!(
344            elapsed_ms = t0.elapsed().as_millis(),
345            "[startup] auto-starting cognition"
346        );
347        if let Err(error) = cognition.start(None).await {
348            tracing::warn!(%error, "Failed to auto-start cognition loop");
349        } else {
350            tracing::info!("Perpetual cognition auto-started");
351        }
352    }
353
354    tracing::info!(
355        elapsed_ms = t0.elapsed().as_millis(),
356        "[startup] building routes"
357    );
358    let addr = format!("{}:{}", args.hostname, args.port);
359
360    // Build the agent card
361    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
362    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
363
364    // Build A2A router separately
365    let a2a_router = a2a_server.router();
366
367    // Start gRPC transport on a separate port
368    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
369        .ok()
370        .and_then(|p| p.parse::<u16>().ok())
371        .unwrap_or(50051);
372    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
373    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
374    let grpc_service = grpc_store.into_service();
375    tokio::spawn(async move {
376        tracing::info!("gRPC A2A server listening on {}", grpc_addr);
377        if let Err(e) = tonic::transport::Server::builder()
378            .add_service(grpc_service)
379            .serve(grpc_addr)
380            .await
381        {
382            tracing::error!("gRPC server error: {}", e);
383        }
384    });
385
386    let state = AppState {
387        config: Arc::new(config),
388        cognition,
389        audit_log,
390        k8s,
391        auth: auth_state.clone(),
392        bus,
393    };
394
395    let app = Router::new()
396        // Health check (public — auth exempt)
397        .route("/health", get(health))
398        // API routes
399        .route("/api/version", get(get_version))
400        .route("/api/session", get(list_sessions).post(create_session))
401        .route("/api/session/{id}", get(get_session))
402        .route("/api/session/{id}/prompt", post(prompt_session))
403        .route("/api/config", get(get_config))
404        .route("/api/provider", get(list_providers))
405        .route("/api/agent", get(list_agents))
406        // Perpetual cognition APIs
407        .route("/v1/cognition/start", post(start_cognition))
408        .route("/v1/cognition/stop", post(stop_cognition))
409        .route("/v1/cognition/status", get(get_cognition_status))
410        .route("/v1/cognition/stream", get(stream_cognition))
411        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
412        // Swarm persona lifecycle APIs
413        .route("/v1/swarm/personas", post(create_persona))
414        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
415        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
416        .route("/v1/swarm/lineage", get(get_swarm_lineage))
417        // Belief, attention, governance, workspace APIs
418        .route("/v1/cognition/beliefs", get(list_beliefs))
419        .route("/v1/cognition/beliefs/{id}", get(get_belief))
420        .route("/v1/cognition/attention", get(list_attention))
421        .route("/v1/cognition/proposals", get(list_proposals))
422        .route(
423            "/v1/cognition/proposals/{id}/approve",
424            post(approve_proposal),
425        )
426        .route("/v1/cognition/receipts", get(list_receipts))
427        .route("/v1/cognition/workspace", get(get_workspace))
428        .route("/v1/cognition/governance", get(get_governance))
429        .route("/v1/cognition/personas/{id}", get(get_persona))
430        // Audit trail API
431        .route("/v1/audit", get(list_audit_entries))
432        // Event stream replay API (for SOC 2, FedRAMP, ATO compliance)
433        .route("/v1/audit/replay", get(replay_session_events))
434        .route("/v1/audit/replay/index", get(list_session_event_files))
435        // K8s self-deployment APIs
436        .route("/v1/k8s/status", get(get_k8s_status))
437        .route("/v1/k8s/scale", post(k8s_scale))
438        .route("/v1/k8s/restart", post(k8s_restart))
439        .route("/v1/k8s/pods", get(k8s_list_pods))
440        .route("/v1/k8s/actions", get(k8s_actions))
441        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
442        .route(
443            "/v1/k8s/subagent/{id}",
444            axum::routing::delete(k8s_delete_subagent),
445        )
446        // Plugin registry API
447        .route("/v1/plugins", get(list_plugins))
448        .with_state(state.clone())
449        // A2A routes (nested to work with different state type)
450        .nest("/a2a", a2a_router)
451        // Mandatory auth middleware — applies to all routes
452        .layer(middleware::from_fn_with_state(
453            state.clone(),
454            audit_middleware,
455        ))
456        .layer(axum::Extension(state.auth.clone()))
457        .layer(middleware::from_fn(policy_middleware))
458        .layer(middleware::from_fn(auth::require_auth))
459        // CORS + tracing
460        .layer(
461            CorsLayer::new()
462                .allow_origin(AllowOrigin::mirror_request())
463                .allow_credentials(true)
464                .allow_methods(AllowMethods::mirror_request())
465                .allow_headers(AllowHeaders::mirror_request()),
466        )
467        .layer(TraceLayer::new_for_http());
468
469    tracing::info!(
470        elapsed_ms = t0.elapsed().as_millis(),
471        "[startup] router built, binding"
472    );
473    let listener = tokio::net::TcpListener::bind(&addr).await?;
474    tracing::info!(
475        elapsed_ms = t0.elapsed().as_millis(),
476        "[startup] listening on http://{}",
477        addr
478    );
479
480    axum::serve(listener, app).await?;
481
482    Ok(())
483}
484
/// Health check handler: always answers with the plain-text body `ok`.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
489
490/// Version info
491#[derive(Serialize)]
492struct VersionInfo {
493    version: &'static str,
494    name: &'static str,
495    binary_hash: Option<String>,
496}
497
498async fn get_version() -> Json<VersionInfo> {
499    let binary_hash = std::env::current_exe()
500        .ok()
501        .and_then(|p| hash_file(&p).ok());
502    Json(VersionInfo {
503        version: env!("CARGO_PKG_VERSION"),
504        name: env!("CARGO_PKG_NAME"),
505        binary_hash,
506    })
507}
508
509/// List sessions
510#[derive(Deserialize)]
511struct ListSessionsQuery {
512    limit: Option<usize>,
513}
514
515async fn list_sessions(
516    Query(query): Query<ListSessionsQuery>,
517) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
518    let sessions = crate::session::list_sessions()
519        .await
520        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
521
522    let limit = query.limit.unwrap_or(50);
523    Ok(Json(sessions.into_iter().take(limit).collect()))
524}
525
526/// Create a new session
527#[derive(Deserialize)]
528struct CreateSessionRequest {
529    title: Option<String>,
530    agent: Option<String>,
531}
532
533async fn create_session(
534    Json(req): Json<CreateSessionRequest>,
535) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
536    let mut session = crate::session::Session::new()
537        .await
538        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
539
540    session.title = req.title;
541    if let Some(agent) = req.agent {
542        session.agent = agent;
543    }
544
545    session
546        .save()
547        .await
548        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
549
550    Ok(Json(session))
551}
552
553/// Get a session by ID
554async fn get_session(
555    axum::extract::Path(id): axum::extract::Path<String>,
556) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
557    let session = crate::session::Session::load(&id)
558        .await
559        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
560
561    Ok(Json(session))
562}
563
564/// Prompt a session
565#[derive(Deserialize)]
566struct PromptRequest {
567    message: String,
568}
569
570async fn prompt_session(
571    axum::extract::Path(id): axum::extract::Path<String>,
572    Json(req): Json<PromptRequest>,
573) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
574    // Validate the message is not empty
575    if req.message.trim().is_empty() {
576        return Err((
577            StatusCode::BAD_REQUEST,
578            "Message cannot be empty".to_string(),
579        ));
580    }
581
582    // Log the prompt request (uses the message field)
583    tracing::info!(
584        session_id = %id,
585        message_len = req.message.len(),
586        "Received prompt request"
587    );
588
589    // TODO: Implement actual prompting
590    Err((
591        StatusCode::NOT_IMPLEMENTED,
592        "Prompt execution not yet implemented".to_string(),
593    ))
594}
595
596/// Get configuration
597async fn get_config(State(state): State<AppState>) -> Json<Config> {
598    Json((*state.config).clone())
599}
600
601/// List providers
602async fn list_providers() -> Json<Vec<String>> {
603    Json(vec![
604        "openai".to_string(),
605        "anthropic".to_string(),
606        "google".to_string(),
607    ])
608}
609
610/// List agents
611async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
612    let registry = crate::agent::AgentRegistry::with_builtins();
613    Json(registry.list().into_iter().cloned().collect())
614}
615
616async fn start_cognition(
617    State(state): State<AppState>,
618    payload: Option<Json<StartCognitionRequest>>,
619) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
620    state
621        .cognition
622        .start(payload.map(|Json(body)| body))
623        .await
624        .map(Json)
625        .map_err(internal_error)
626}
627
628async fn stop_cognition(
629    State(state): State<AppState>,
630    payload: Option<Json<StopCognitionRequest>>,
631) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
632    let reason = payload.and_then(|Json(body)| body.reason);
633    state
634        .cognition
635        .stop(reason)
636        .await
637        .map(Json)
638        .map_err(internal_error)
639}
640
641async fn get_cognition_status(
642    State(state): State<AppState>,
643) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
644    Ok(Json(state.cognition.status().await))
645}
646
647async fn stream_cognition(
648    State(state): State<AppState>,
649) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
650    let rx = state.cognition.subscribe_events();
651
652    let event_stream = stream::unfold(rx, |mut rx| async move {
653        match rx.recv().await {
654            Ok(event) => {
655                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
656                let sse_event = Event::default().event("cognition").data(payload);
657                Some((Ok(sse_event), rx))
658            }
659            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
660                let lag_event = Event::default()
661                    .event("lag")
662                    .data(format!("skipped {}", skipped));
663                Some((Ok(lag_event), rx))
664            }
665            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
666        }
667    });
668
669    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
670}
671
672async fn get_latest_snapshot(
673    State(state): State<AppState>,
674) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
675    match state.cognition.latest_snapshot().await {
676        Some(snapshot) => Ok(Json(snapshot)),
677        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
678    }
679}
680
681async fn create_persona(
682    State(state): State<AppState>,
683    Json(req): Json<CreatePersonaRequest>,
684) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
685    state
686        .cognition
687        .create_persona(req)
688        .await
689        .map(Json)
690        .map_err(internal_error)
691}
692
693async fn spawn_persona(
694    State(state): State<AppState>,
695    Path(id): Path<String>,
696    Json(req): Json<SpawnPersonaRequest>,
697) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
698    state
699        .cognition
700        .spawn_child(&id, req)
701        .await
702        .map(Json)
703        .map_err(internal_error)
704}
705
706async fn reap_persona(
707    State(state): State<AppState>,
708    Path(id): Path<String>,
709    payload: Option<Json<ReapPersonaRequest>>,
710) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
711    let req = payload
712        .map(|Json(body)| body)
713        .unwrap_or(ReapPersonaRequest {
714            cascade: Some(false),
715            reason: None,
716        });
717
718    state
719        .cognition
720        .reap_persona(&id, req)
721        .await
722        .map(Json)
723        .map_err(internal_error)
724}
725
726async fn get_swarm_lineage(
727    State(state): State<AppState>,
728) -> Result<Json<LineageGraph>, (StatusCode, String)> {
729    Ok(Json(state.cognition.lineage_graph().await))
730}
731
732// ── Belief, Attention, Governance, Workspace handlers ──
733
734#[derive(Deserialize)]
735struct BeliefFilter {
736    status: Option<String>,
737    persona: Option<String>,
738}
739
740async fn list_beliefs(
741    State(state): State<AppState>,
742    Query(filter): Query<BeliefFilter>,
743) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
744    let beliefs = state.cognition.get_beliefs().await;
745    let mut result: Vec<Belief> = beliefs.into_values().collect();
746
747    if let Some(status) = &filter.status {
748        result.retain(|b| {
749            let s = serde_json::to_string(&b.status).unwrap_or_default();
750            s.contains(status)
751        });
752    }
753    if let Some(persona) = &filter.persona {
754        result.retain(|b| &b.asserted_by == persona);
755    }
756
757    result.sort_by(|a, b| {
758        b.confidence
759            .partial_cmp(&a.confidence)
760            .unwrap_or(std::cmp::Ordering::Equal)
761    });
762    Ok(Json(result))
763}
764
765async fn get_belief(
766    State(state): State<AppState>,
767    Path(id): Path<String>,
768) -> Result<Json<Belief>, (StatusCode, String)> {
769    match state.cognition.get_belief(&id).await {
770        Some(belief) => Ok(Json(belief)),
771        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
772    }
773}
774
775async fn list_attention(
776    State(state): State<AppState>,
777) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
778    Ok(Json(state.cognition.get_attention_queue().await))
779}
780
781async fn list_proposals(
782    State(state): State<AppState>,
783) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
784    let proposals = state.cognition.get_proposals().await;
785    let mut result: Vec<Proposal> = proposals.into_values().collect();
786    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
787    Ok(Json(result))
788}
789
790async fn approve_proposal(
791    State(state): State<AppState>,
792    Path(id): Path<String>,
793) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
794    state
795        .cognition
796        .approve_proposal(&id)
797        .await
798        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
799        .map_err(internal_error)
800}
801
802async fn list_receipts(
803    State(state): State<AppState>,
804) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
805    Ok(Json(state.cognition.get_receipts().await))
806}
807
808async fn get_workspace(
809    State(state): State<AppState>,
810) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
811    Ok(Json(state.cognition.get_workspace().await))
812}
813
814async fn get_governance(
815    State(state): State<AppState>,
816) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
817    Ok(Json(state.cognition.get_governance().await))
818}
819
820async fn get_persona(
821    State(state): State<AppState>,
822    axum::extract::Path(id): axum::extract::Path<String>,
823) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
824    state
825        .cognition
826        .get_persona(&id)
827        .await
828        .map(Json)
829        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
830}
831
832// ── Audit trail endpoints ──
833
834#[derive(Deserialize)]
835struct AuditQuery {
836    limit: Option<usize>,
837    category: Option<String>,
838}
839
840async fn list_audit_entries(
841    State(state): State<AppState>,
842    Query(query): Query<AuditQuery>,
843) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
844    let limit = query.limit.unwrap_or(100).min(1000);
845
846    let entries = if let Some(ref cat) = query.category {
847        let category = match cat.as_str() {
848            "api" => AuditCategory::Api,
849            "tool" | "tool_execution" => AuditCategory::ToolExecution,
850            "session" => AuditCategory::Session,
851            "cognition" => AuditCategory::Cognition,
852            "swarm" => AuditCategory::Swarm,
853            "auth" => AuditCategory::Auth,
854            "k8s" => AuditCategory::K8s,
855            "sandbox" => AuditCategory::Sandbox,
856            "config" => AuditCategory::Config,
857            _ => {
858                return Err((
859                    StatusCode::BAD_REQUEST,
860                    format!("Unknown category: {}", cat),
861                ));
862            }
863        };
864        state.audit_log.by_category(category, limit).await
865    } else {
866        state.audit_log.recent(limit).await
867    };
868
869    Ok(Json(entries))
870}
871
872// ── Event Stream Replay API ──
873// Enables auditors to reconstruct sessions from byte-range offsets.
874// This is the key compliance feature for SOC 2, FedRAMP, and ATO processes.
875
876#[derive(Deserialize)]
877struct ReplayQuery {
878    /// Session ID to replay
879    session_id: String,
880    /// Optional: starting byte offset (for seeking to specific point)
881    start_offset: Option<u64>,
882    /// Optional: ending byte offset
883    end_offset: Option<u64>,
884    /// Optional: limit number of events to return
885    limit: Option<usize>,
886    /// Optional: filter by tool name
887    tool_name: Option<String>,
888}
889
890/// Replay session events from the JSONL event stream by byte-range offsets.
891/// This allows auditors to reconstruct exactly what happened in a session,
892/// including tool execution durations and success/failure status.
893async fn replay_session_events(
894    Query(query): Query<ReplayQuery>,
895) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
896    use std::path::PathBuf;
897
898    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
899        .map(PathBuf::from)
900        .ok()
901        .ok_or_else(|| {
902            (
903                StatusCode::SERVICE_UNAVAILABLE,
904                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
905            )
906        })?;
907
908    let session_dir = base_dir.join(&query.session_id);
909
910    // Check if session directory exists
911    if !session_dir.exists() {
912        return Err((
913            StatusCode::NOT_FOUND,
914            format!("Session not found: {}", query.session_id),
915        ));
916    }
917
918    let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
919
920    // Read all event files in the session directory using std::fs for simplicity
921    let entries = std::fs::read_dir(&session_dir)
922        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
923
924    for entry in entries {
925        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
926        let path = entry.path();
927
928        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
929            continue;
930        }
931
932        // Parse byte range from filename: {timestamp}-chat-events-{start}-{end}.jsonl
933        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
934
935        if let Some(offsets) = filename
936            .strip_prefix("T")
937            .or_else(|| filename.strip_prefix("202"))
938        {
939            // Extract start and end offsets from filename
940            let parts: Vec<&str> = offsets.split('-').collect();
941            if parts.len() >= 4 {
942                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
943                let end: u64 = parts[parts.len() - 1]
944                    .trim_end_matches(".jsonl")
945                    .parse()
946                    .unwrap_or(0);
947
948                // Filter by byte range if specified
949                if let Some(query_start) = query.start_offset {
950                    if end <= query_start {
951                        continue;
952                    }
953                }
954                if let Some(query_end) = query.end_offset {
955                    if start >= query_end {
956                        continue;
957                    }
958                }
959
960                // Read and parse events from this file
961                let content = std::fs::read_to_string(&path)
962                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
963
964                for line in content.lines() {
965                    if line.trim().is_empty() {
966                        continue;
967                    }
968                    if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
969                        // Filter by tool name if specified
970                        if let Some(ref tool_filter) = query.tool_name {
971                            if let Some(event_tool) =
972                                event.get("tool_name").and_then(|v| v.as_str())
973                            {
974                                if event_tool != tool_filter {
975                                    continue;
976                                }
977                            } else {
978                                continue;
979                            }
980                        }
981                        all_events.push((start, end, event));
982                    }
983                }
984            }
985        }
986    }
987
988    // Sort by start offset
989    all_events.sort_by_key(|(s, _, _)| *s);
990
991    // Apply limit
992    let limit = query.limit.unwrap_or(1000).min(10000);
993    let events: Vec<_> = all_events
994        .into_iter()
995        .take(limit)
996        .map(|(_, _, e)| e)
997        .collect();
998
999    Ok(Json(events))
1000}
1001
1002/// Get session event files metadata (for audit index)
1003async fn list_session_event_files(
1004    Query(query): Query<ReplayQuery>,
1005) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
1006    use std::path::PathBuf;
1007
1008    let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
1009        .map(PathBuf::from)
1010        .ok()
1011        .ok_or_else(|| {
1012            (
1013                StatusCode::SERVICE_UNAVAILABLE,
1014                "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
1015            )
1016        })?;
1017
1018    let session_dir = base_dir.join(&query.session_id);
1019    if !session_dir.exists() {
1020        return Err((
1021            StatusCode::NOT_FOUND,
1022            format!("Session not found: {}", query.session_id),
1023        ));
1024    }
1025
1026    let mut files: Vec<EventFileMeta> = Vec::new();
1027
1028    // Use std::fs for simplicity
1029    let entries = std::fs::read_dir(&session_dir)
1030        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1031
1032    for entry in entries {
1033        let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1034        let path = entry.path();
1035
1036        if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
1037            continue;
1038        }
1039
1040        let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
1041
1042        // Parse byte range from filename
1043        if let Some(offsets) = filename
1044            .strip_prefix("T")
1045            .or_else(|| filename.strip_prefix("202"))
1046        {
1047            let parts: Vec<&str> = offsets.split('-').collect();
1048            if parts.len() >= 4 {
1049                let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
1050                let end: u64 = parts[parts.len() - 1]
1051                    .trim_end_matches(".jsonl")
1052                    .parse()
1053                    .unwrap_or(0);
1054
1055                let metadata = std::fs::metadata(&path)
1056                    .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1057
1058                files.push(EventFileMeta {
1059                    filename: filename.to_string(),
1060                    start_offset: start,
1061                    end_offset: end,
1062                    size_bytes: metadata.len(),
1063                });
1064            }
1065        }
1066    }
1067
1068    files.sort_by_key(|f| f.start_offset);
1069    Ok(Json(files))
1070}
1071
/// Metadata for one JSONL event file of a session, as returned by
/// `list_session_event_files`.
#[derive(Serialize)]
struct EventFileMeta {
    /// File name of the event file, without its directory.
    filename: String,
    /// Start offset parsed from the second-to-last `-`-separated segment of the
    /// file name (0 when that segment is not a number).
    start_offset: u64,
    /// End offset parsed from the last `-`-separated segment of the file name,
    /// with the `.jsonl` suffix removed (0 when it is not a number).
    end_offset: u64,
    /// Length of the file in bytes, as reported by the filesystem metadata.
    size_bytes: u64,
}
1079
1080// ── K8s self-deployment endpoints ──
1081
1082async fn get_k8s_status(
1083    State(state): State<AppState>,
1084) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
1085    Ok(Json(state.k8s.status().await))
1086}
1087
/// JSON request body accepted by `k8s_scale`.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Desired replica count; `k8s_scale` rejects values outside `0..=100`.
    replicas: i32,
}
1092
1093async fn k8s_scale(
1094    State(state): State<AppState>,
1095    Json(req): Json<ScaleRequest>,
1096) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1097    if req.replicas < 0 || req.replicas > 100 {
1098        return Err((
1099            StatusCode::BAD_REQUEST,
1100            "Replicas must be between 0 and 100".to_string(),
1101        ));
1102    }
1103
1104    state
1105        .audit_log
1106        .log(
1107            AuditCategory::K8s,
1108            format!("scale:{}", req.replicas),
1109            AuditOutcome::Success,
1110            None,
1111            None,
1112        )
1113        .await;
1114
1115    state
1116        .k8s
1117        .scale(req.replicas)
1118        .await
1119        .map(Json)
1120        .map_err(internal_error)
1121}
1122
1123async fn k8s_restart(
1124    State(state): State<AppState>,
1125) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1126    state
1127        .audit_log
1128        .log(
1129            AuditCategory::K8s,
1130            "rolling_restart",
1131            AuditOutcome::Success,
1132            None,
1133            None,
1134        )
1135        .await;
1136
1137    state
1138        .k8s
1139        .rolling_restart()
1140        .await
1141        .map(Json)
1142        .map_err(internal_error)
1143}
1144
1145async fn k8s_list_pods(
1146    State(state): State<AppState>,
1147) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
1148    state
1149        .k8s
1150        .list_pods()
1151        .await
1152        .map(Json)
1153        .map_err(internal_error)
1154}
1155
1156async fn k8s_actions(
1157    State(state): State<AppState>,
1158) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
1159    Ok(Json(state.k8s.recent_actions(100).await))
1160}
1161
/// JSON request body accepted by `k8s_spawn_subagent`.
#[derive(Deserialize)]
struct SpawnSubagentRequest {
    /// Identifier of the sub-agent, passed to `spawn_subagent_pod`.
    subagent_id: String,
    /// Optional image passed to `spawn_subagent_pod`; `None` when omitted.
    #[serde(default)]
    image: Option<String>,
    /// Key/value pairs passed to `spawn_subagent_pod`; empty when omitted.
    /// Presumably set as environment variables on the pod (confirm in `K8sManager`).
    #[serde(default)]
    env_vars: std::collections::HashMap<String, String>,
}
1170
1171async fn k8s_spawn_subagent(
1172    State(state): State<AppState>,
1173    Json(req): Json<SpawnSubagentRequest>,
1174) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1175    state
1176        .k8s
1177        .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
1178        .await
1179        .map(Json)
1180        .map_err(internal_error)
1181}
1182
1183async fn k8s_delete_subagent(
1184    State(state): State<AppState>,
1185    axum::extract::Path(id): axum::extract::Path<String>,
1186) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1187    state
1188        .k8s
1189        .delete_subagent_pod(&id)
1190        .await
1191        .map(Json)
1192        .map_err(internal_error)
1193}
1194
1195/// List registered plugins.
1196async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
1197    let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
1198    let signing_key = SigningKey::from_env();
1199    let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
1200    Json(PluginListResponse {
1201        server_fingerprint,
1202        signing_available: !test_sig.is_empty(),
1203        plugins: Vec::<PluginManifest>::new(),
1204    })
1205}
1206
/// JSON response body returned by `list_plugins`.
#[derive(Serialize)]
struct PluginListResponse {
    /// Hash of the crate version string (`CARGO_PKG_VERSION`).
    server_fingerprint: String,
    /// `true` when signing a probe value produced a non-empty signature.
    signing_available: bool,
    /// Registered plugin manifests; `list_plugins` currently always returns none.
    plugins: Vec<PluginManifest>,
}
1213
1214fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
1215    let message = error.to_string();
1216    if message.contains("not found") {
1217        return (StatusCode::NOT_FOUND, message);
1218    }
1219    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
1220        return (StatusCode::BAD_REQUEST, message);
1221    }
1222    (StatusCode::INTERNAL_SERVER_ERROR, message)
1223}
1224
/// Read a boolean flag from the environment variable `name`.
///
/// Recognizes `1`/`true`/`yes`/`on` as `true` and `0`/`false`/`no`/`off` as
/// `false`, ignoring ASCII case and surrounding whitespace (values injected
/// from files or config maps often carry a trailing newline). Returns
/// `default` when the variable is unset, not valid Unicode, or holds any
/// other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(value) => value,
        Err(_) => return default,
    };
    let flag = raw.trim();

    if TRUTHY.iter().any(|word| flag.eq_ignore_ascii_case(word)) {
        true
    } else if FALSY.iter().any(|word| flag.eq_ignore_ascii_case(word)) {
        false
    } else {
        default
    }
}
1235
#[cfg(test)]
mod tests {
    use super::match_policy_rule;

    /// Prompting an existing session maps to the execute permission.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/api/session/abc123/prompt", "POST"),
            Some("agent:execute"),
        );
    }

    /// Creating a session maps to the sessions write permission.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        assert_eq!(
            match_policy_rule("/api/session", "POST"),
            Some("sessions:write"),
        );
    }

    /// Approving a cognition proposal maps to the execute permission.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/cognition/proposals/p1/approve", "POST"),
            Some("agent:execute"),
        );
    }
}