// codetether_agent/server/mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16    executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24    Router,
25    body::Body,
26    extract::Path,
27    extract::{Query, State},
28    http::{Request, StatusCode},
29    middleware::{self, Next},
30    response::sse::{Event, KeepAlive, Sse},
31    response::{Json, Response},
32    routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
39use tower_http::trace::TraceLayer;
40
41/// Server state shared across handlers
42#[derive(Clone)]
43pub struct AppState {
44    pub config: Arc<Config>,
45    pub cognition: Arc<CognitionRuntime>,
46    pub audit_log: AuditLog,
47    pub k8s: Arc<K8sManager>,
48    pub auth: AuthState,
49    pub bus: Arc<AgentBus>,
50}
51
52/// Audit middleware — logs every request/response to the audit trail.
53async fn audit_middleware(
54    State(state): State<AppState>,
55    request: Request<Body>,
56    next: Next,
57) -> Response {
58    let method = request.method().clone();
59    let path = request.uri().path().to_string();
60    let started = std::time::Instant::now();
61
62    let response = next.run(request).await;
63
64    let duration_ms = started.elapsed().as_millis() as u64;
65    let status = response.status().as_u16();
66    let outcome = if status < 400 {
67        AuditOutcome::Success
68    } else if status == 401 || status == 403 {
69        AuditOutcome::Denied
70    } else {
71        AuditOutcome::Failure
72    };
73
74    state
75        .audit_log
76        .record(audit::AuditEntry {
77            id: uuid::Uuid::new_v4().to_string(),
78            timestamp: chrono::Utc::now(),
79            category: AuditCategory::Api,
80            action: format!("{} {}", method, path),
81            principal: None,
82            outcome,
83            detail: Some(serde_json::json!({ "status": status })),
84            duration_ms: Some(duration_ms),
85        })
86        .await;
87
88    response
89}
90
/// One entry of the route → OPA permission table.
///
/// Rules are evaluated in table order and the first rule whose pattern and
/// method both match wins.
struct PolicyRule {
    /// Path pattern. A pattern ending in `/` matches any path that starts with
    /// it; any other pattern matches that exact path or a sub-path below it.
    pattern: &'static str,
    /// HTTP methods the rule applies to; `None` means every method.
    methods: Option<&'static [&'static str]>,
    /// Required permission; the empty string marks the route as exempt.
    permission: &'static str,
}
98
99const POLICY_RULES: &[PolicyRule] = &[
100    // Public / exempt
101    PolicyRule {
102        pattern: "/health",
103        methods: None,
104        permission: "",
105    },
106    PolicyRule {
107        pattern: "/a2a/",
108        methods: None,
109        permission: "",
110    },
111    // K8s management — admin only
112    PolicyRule {
113        pattern: "/v1/k8s/scale",
114        methods: Some(&["POST"]),
115        permission: "admin:access",
116    },
117    PolicyRule {
118        pattern: "/v1/k8s/restart",
119        methods: Some(&["POST"]),
120        permission: "admin:access",
121    },
122    PolicyRule {
123        pattern: "/v1/k8s/",
124        methods: Some(&["GET"]),
125        permission: "admin:access",
126    },
127    // K8s sub-agent lifecycle — admin only
128    PolicyRule {
129        pattern: "/v1/k8s/subagent",
130        methods: Some(&["POST", "DELETE"]),
131        permission: "admin:access",
132    },
133    // Plugin registry — read
134    PolicyRule {
135        pattern: "/v1/plugins",
136        methods: Some(&["GET"]),
137        permission: "agent:read",
138    },
139    // Audit — admin
140    PolicyRule {
141        pattern: "/v1/audit",
142        methods: None,
143        permission: "admin:access",
144    },
145    // Cognition — write operations
146    PolicyRule {
147        pattern: "/v1/cognition/start",
148        methods: Some(&["POST"]),
149        permission: "agent:execute",
150    },
151    PolicyRule {
152        pattern: "/v1/cognition/stop",
153        methods: Some(&["POST"]),
154        permission: "agent:execute",
155    },
156    PolicyRule {
157        pattern: "/v1/cognition/",
158        methods: Some(&["GET"]),
159        permission: "agent:read",
160    },
161    // Swarm persona lifecycle
162    PolicyRule {
163        pattern: "/v1/swarm/personas",
164        methods: Some(&["POST"]),
165        permission: "agent:execute",
166    },
167    PolicyRule {
168        pattern: "/v1/swarm/",
169        methods: Some(&["POST"]),
170        permission: "agent:execute",
171    },
172    PolicyRule {
173        pattern: "/v1/swarm/",
174        methods: Some(&["GET"]),
175        permission: "agent:read",
176    },
177    // Session management
178    // Session prompt execution — requires execute permission
179    PolicyRule {
180        pattern: "/api/session/",
181        methods: Some(&["POST"]),
182        permission: "agent:execute",
183    },
184    PolicyRule {
185        pattern: "/api/session",
186        methods: Some(&["POST"]),
187        permission: "sessions:write",
188    },
189    PolicyRule {
190        pattern: "/api/session",
191        methods: Some(&["GET"]),
192        permission: "sessions:read",
193    },
194    // Proposal approval — governance action
195    PolicyRule {
196        pattern: "/v1/cognition/proposals/",
197        methods: Some(&["POST"]),
198        permission: "agent:execute",
199    },
200    // Config, version, providers, agents — read
201    PolicyRule {
202        pattern: "/api/version",
203        methods: None,
204        permission: "agent:read",
205    },
206    PolicyRule {
207        pattern: "/api/config",
208        methods: None,
209        permission: "agent:read",
210    },
211    PolicyRule {
212        pattern: "/api/provider",
213        methods: None,
214        permission: "agent:read",
215    },
216    PolicyRule {
217        pattern: "/api/agent",
218        methods: None,
219        permission: "agent:read",
220    },
221];
222
223/// Find the required permission for a given path + method.
224/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched (pass-through).
225fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
226    for rule in POLICY_RULES {
227        let matches = if rule.pattern.ends_with('/') {
228            path.starts_with(rule.pattern)
229        } else {
230            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
231        };
232        if matches {
233            if let Some(allowed_methods) = rule.methods {
234                if !allowed_methods.contains(&method) {
235                    continue;
236                }
237            }
238            return Some(rule.permission);
239        }
240    }
241    None
242}
243
244/// Policy authorization middleware for Axum.
245///
246/// Maps request paths to OPA permission strings and enforces authorization.
247/// Runs after `require_auth` so the bearer token is already validated.
248/// Currently maps the static bearer token to an admin role since
249/// codetether-agent uses a single shared token model.
250async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
251    let path = request.uri().path().to_string();
252    let method = request.method().as_str().to_string();
253
254    let permission = match match_policy_rule(&path, &method) {
255        None | Some("") => return Ok(next.run(request).await),
256        Some(perm) => perm,
257    };
258
259    // The current auth model uses a single static token for all access.
260    // When this is the case, the authenticated user effectively has admin role.
261    // Future: extract user claims from JWT and build a proper PolicyUser.
262    let user = policy::PolicyUser {
263        user_id: "bearer-token-user".to_string(),
264        roles: vec!["admin".to_string()],
265        tenant_id: None,
266        scopes: vec![],
267        auth_source: "static_token".to_string(),
268    };
269
270    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
271        tracing::warn!(
272            path = %path,
273            method = %method,
274            permission = %permission,
275            "Policy middleware denied request"
276        );
277        return Err(status);
278    }
279
280    Ok(next.run(request).await)
281}
282
283/// Start the HTTP server
284pub async fn serve(args: ServeArgs) -> Result<()> {
285    let t0 = std::time::Instant::now();
286    tracing::info!("[startup] begin");
287    let config = Config::load().await?;
288    tracing::info!(
289        elapsed_ms = t0.elapsed().as_millis(),
290        "[startup] config loaded"
291    );
292    let mut cognition = CognitionRuntime::new_from_env();
293    tracing::info!(
294        elapsed_ms = t0.elapsed().as_millis(),
295        "[startup] cognition runtime created"
296    );
297
298    // Set up tool registry for cognition execution engine.
299    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
300    tracing::info!(
301        elapsed_ms = t0.elapsed().as_millis(),
302        "[startup] tools registered"
303    );
304    let cognition = Arc::new(cognition);
305
306    // Initialize audit log.
307    let audit_log = AuditLog::from_env();
308    let _ = audit::init_audit_log(audit_log.clone());
309
310    // Initialize K8s manager.
311    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
312    let k8s = Arc::new(K8sManager::new().await);
313    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
314    if k8s.is_available() {
315        tracing::info!("K8s self-deployment enabled");
316    }
317
318    // Initialize mandatory auth.
319    let auth_state = AuthState::from_env();
320    tracing::info!(
321        token_len = auth_state.token().len(),
322        "Auth is mandatory. Token required for all API endpoints."
323    );
324    tracing::info!(
325        audit_entries = audit_log.count().await,
326        "Audit log initialized"
327    );
328
329    // Create agent bus for in-process communication
330    let bus = AgentBus::new().into_arc();
331    tracing::info!(
332        elapsed_ms = t0.elapsed().as_millis(),
333        "[startup] bus created"
334    );
335
336    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
337        tracing::info!(
338            elapsed_ms = t0.elapsed().as_millis(),
339            "[startup] auto-starting cognition"
340        );
341        if let Err(error) = cognition.start(None).await {
342            tracing::warn!(%error, "Failed to auto-start cognition loop");
343        } else {
344            tracing::info!("Perpetual cognition auto-started");
345        }
346    }
347
348    tracing::info!(
349        elapsed_ms = t0.elapsed().as_millis(),
350        "[startup] building routes"
351    );
352    let addr = format!("{}:{}", args.hostname, args.port);
353
354    // Build the agent card
355    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
356    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
357
358    // Build A2A router separately
359    let a2a_router = a2a_server.router();
360
361    // Start gRPC transport on a separate port
362    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
363        .ok()
364        .and_then(|p| p.parse::<u16>().ok())
365        .unwrap_or(50051);
366    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
367    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
368    let grpc_service = grpc_store.into_service();
369    tokio::spawn(async move {
370        tracing::info!("gRPC A2A server listening on {}", grpc_addr);
371        if let Err(e) = tonic::transport::Server::builder()
372            .add_service(grpc_service)
373            .serve(grpc_addr)
374            .await
375        {
376            tracing::error!("gRPC server error: {}", e);
377        }
378    });
379
380    let state = AppState {
381        config: Arc::new(config),
382        cognition,
383        audit_log,
384        k8s,
385        auth: auth_state.clone(),
386        bus,
387    };
388
389    let app = Router::new()
390        // Health check (public — auth exempt)
391        .route("/health", get(health))
392        // API routes
393        .route("/api/version", get(get_version))
394        .route("/api/session", get(list_sessions).post(create_session))
395        .route("/api/session/{id}", get(get_session))
396        .route("/api/session/{id}/prompt", post(prompt_session))
397        .route("/api/config", get(get_config))
398        .route("/api/provider", get(list_providers))
399        .route("/api/agent", get(list_agents))
400        // Perpetual cognition APIs
401        .route("/v1/cognition/start", post(start_cognition))
402        .route("/v1/cognition/stop", post(stop_cognition))
403        .route("/v1/cognition/status", get(get_cognition_status))
404        .route("/v1/cognition/stream", get(stream_cognition))
405        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
406        // Swarm persona lifecycle APIs
407        .route("/v1/swarm/personas", post(create_persona))
408        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
409        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
410        .route("/v1/swarm/lineage", get(get_swarm_lineage))
411        // Belief, attention, governance, workspace APIs
412        .route("/v1/cognition/beliefs", get(list_beliefs))
413        .route("/v1/cognition/beliefs/{id}", get(get_belief))
414        .route("/v1/cognition/attention", get(list_attention))
415        .route("/v1/cognition/proposals", get(list_proposals))
416        .route(
417            "/v1/cognition/proposals/{id}/approve",
418            post(approve_proposal),
419        )
420        .route("/v1/cognition/receipts", get(list_receipts))
421        .route("/v1/cognition/workspace", get(get_workspace))
422        .route("/v1/cognition/governance", get(get_governance))
423        .route("/v1/cognition/personas/{id}", get(get_persona))
424        // Audit trail API
425        .route("/v1/audit", get(list_audit_entries))
426        // K8s self-deployment APIs
427        .route("/v1/k8s/status", get(get_k8s_status))
428        .route("/v1/k8s/scale", post(k8s_scale))
429        .route("/v1/k8s/restart", post(k8s_restart))
430        .route("/v1/k8s/pods", get(k8s_list_pods))
431        .route("/v1/k8s/actions", get(k8s_actions))
432        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
433        .route(
434            "/v1/k8s/subagent/{id}",
435            axum::routing::delete(k8s_delete_subagent),
436        )
437        // Plugin registry API
438        .route("/v1/plugins", get(list_plugins))
439        .with_state(state.clone())
440        // A2A routes (nested to work with different state type)
441        .nest("/a2a", a2a_router)
442        // Mandatory auth middleware — applies to all routes
443        .layer(middleware::from_fn_with_state(
444            state.clone(),
445            audit_middleware,
446        ))
447        .layer(axum::Extension(state.auth.clone()))
448        .layer(middleware::from_fn(policy_middleware))
449        .layer(middleware::from_fn(auth::require_auth))
450        // CORS + tracing
451        .layer(
452            CorsLayer::new()
453                .allow_origin(AllowOrigin::mirror_request())
454                .allow_credentials(true)
455                .allow_methods(AllowMethods::mirror_request())
456                .allow_headers(AllowHeaders::mirror_request()),
457        )
458        .layer(TraceLayer::new_for_http());
459
460    tracing::info!(
461        elapsed_ms = t0.elapsed().as_millis(),
462        "[startup] router built, binding"
463    );
464    let listener = tokio::net::TcpListener::bind(&addr).await?;
465    tracing::info!(
466        elapsed_ms = t0.elapsed().as_millis(),
467        "[startup] listening on http://{}",
468        addr
469    );
470
471    axum::serve(listener, app).await?;
472
473    Ok(())
474}
475
/// Liveness probe for `GET /health`; always answers with the literal body `ok`.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
480
481/// Version info
482#[derive(Serialize)]
483struct VersionInfo {
484    version: &'static str,
485    name: &'static str,
486    binary_hash: Option<String>,
487}
488
489async fn get_version() -> Json<VersionInfo> {
490    let binary_hash = std::env::current_exe()
491        .ok()
492        .and_then(|p| hash_file(&p).ok());
493    Json(VersionInfo {
494        version: env!("CARGO_PKG_VERSION"),
495        name: env!("CARGO_PKG_NAME"),
496        binary_hash,
497    })
498}
499
500/// List sessions
501#[derive(Deserialize)]
502struct ListSessionsQuery {
503    limit: Option<usize>,
504}
505
506async fn list_sessions(
507    Query(query): Query<ListSessionsQuery>,
508) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
509    let sessions = crate::session::list_sessions()
510        .await
511        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
512
513    let limit = query.limit.unwrap_or(50);
514    Ok(Json(sessions.into_iter().take(limit).collect()))
515}
516
517/// Create a new session
518#[derive(Deserialize)]
519struct CreateSessionRequest {
520    title: Option<String>,
521    agent: Option<String>,
522}
523
524async fn create_session(
525    Json(req): Json<CreateSessionRequest>,
526) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
527    let mut session = crate::session::Session::new()
528        .await
529        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
530
531    session.title = req.title;
532    if let Some(agent) = req.agent {
533        session.agent = agent;
534    }
535
536    session
537        .save()
538        .await
539        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
540
541    Ok(Json(session))
542}
543
544/// Get a session by ID
545async fn get_session(
546    axum::extract::Path(id): axum::extract::Path<String>,
547) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
548    let session = crate::session::Session::load(&id)
549        .await
550        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
551
552    Ok(Json(session))
553}
554
555/// Prompt a session
556#[derive(Deserialize)]
557struct PromptRequest {
558    message: String,
559}
560
561async fn prompt_session(
562    axum::extract::Path(id): axum::extract::Path<String>,
563    Json(req): Json<PromptRequest>,
564) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
565    // Validate the message is not empty
566    if req.message.trim().is_empty() {
567        return Err((
568            StatusCode::BAD_REQUEST,
569            "Message cannot be empty".to_string(),
570        ));
571    }
572
573    // Log the prompt request (uses the message field)
574    tracing::info!(
575        session_id = %id,
576        message_len = req.message.len(),
577        "Received prompt request"
578    );
579
580    // TODO: Implement actual prompting
581    Err((
582        StatusCode::NOT_IMPLEMENTED,
583        "Prompt execution not yet implemented".to_string(),
584    ))
585}
586
587/// Get configuration
588async fn get_config(State(state): State<AppState>) -> Json<Config> {
589    Json((*state.config).clone())
590}
591
592/// List providers
593async fn list_providers() -> Json<Vec<String>> {
594    Json(vec![
595        "openai".to_string(),
596        "anthropic".to_string(),
597        "google".to_string(),
598    ])
599}
600
601/// List agents
602async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
603    let registry = crate::agent::AgentRegistry::with_builtins();
604    Json(registry.list().into_iter().cloned().collect())
605}
606
607async fn start_cognition(
608    State(state): State<AppState>,
609    payload: Option<Json<StartCognitionRequest>>,
610) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
611    state
612        .cognition
613        .start(payload.map(|Json(body)| body))
614        .await
615        .map(Json)
616        .map_err(internal_error)
617}
618
619async fn stop_cognition(
620    State(state): State<AppState>,
621    payload: Option<Json<StopCognitionRequest>>,
622) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
623    let reason = payload.and_then(|Json(body)| body.reason);
624    state
625        .cognition
626        .stop(reason)
627        .await
628        .map(Json)
629        .map_err(internal_error)
630}
631
632async fn get_cognition_status(
633    State(state): State<AppState>,
634) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
635    Ok(Json(state.cognition.status().await))
636}
637
638async fn stream_cognition(
639    State(state): State<AppState>,
640) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
641    let rx = state.cognition.subscribe_events();
642
643    let event_stream = stream::unfold(rx, |mut rx| async move {
644        match rx.recv().await {
645            Ok(event) => {
646                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
647                let sse_event = Event::default().event("cognition").data(payload);
648                Some((Ok(sse_event), rx))
649            }
650            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
651                let lag_event = Event::default()
652                    .event("lag")
653                    .data(format!("skipped {}", skipped));
654                Some((Ok(lag_event), rx))
655            }
656            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
657        }
658    });
659
660    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
661}
662
663async fn get_latest_snapshot(
664    State(state): State<AppState>,
665) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
666    match state.cognition.latest_snapshot().await {
667        Some(snapshot) => Ok(Json(snapshot)),
668        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
669    }
670}
671
672async fn create_persona(
673    State(state): State<AppState>,
674    Json(req): Json<CreatePersonaRequest>,
675) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
676    state
677        .cognition
678        .create_persona(req)
679        .await
680        .map(Json)
681        .map_err(internal_error)
682}
683
684async fn spawn_persona(
685    State(state): State<AppState>,
686    Path(id): Path<String>,
687    Json(req): Json<SpawnPersonaRequest>,
688) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
689    state
690        .cognition
691        .spawn_child(&id, req)
692        .await
693        .map(Json)
694        .map_err(internal_error)
695}
696
697async fn reap_persona(
698    State(state): State<AppState>,
699    Path(id): Path<String>,
700    payload: Option<Json<ReapPersonaRequest>>,
701) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
702    let req = payload
703        .map(|Json(body)| body)
704        .unwrap_or(ReapPersonaRequest {
705            cascade: Some(false),
706            reason: None,
707        });
708
709    state
710        .cognition
711        .reap_persona(&id, req)
712        .await
713        .map(Json)
714        .map_err(internal_error)
715}
716
717async fn get_swarm_lineage(
718    State(state): State<AppState>,
719) -> Result<Json<LineageGraph>, (StatusCode, String)> {
720    Ok(Json(state.cognition.lineage_graph().await))
721}
722
723// ── Belief, Attention, Governance, Workspace handlers ──
724
725#[derive(Deserialize)]
726struct BeliefFilter {
727    status: Option<String>,
728    persona: Option<String>,
729}
730
731async fn list_beliefs(
732    State(state): State<AppState>,
733    Query(filter): Query<BeliefFilter>,
734) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
735    let beliefs = state.cognition.get_beliefs().await;
736    let mut result: Vec<Belief> = beliefs.into_values().collect();
737
738    if let Some(status) = &filter.status {
739        result.retain(|b| {
740            let s = serde_json::to_string(&b.status).unwrap_or_default();
741            s.contains(status)
742        });
743    }
744    if let Some(persona) = &filter.persona {
745        result.retain(|b| &b.asserted_by == persona);
746    }
747
748    result.sort_by(|a, b| {
749        b.confidence
750            .partial_cmp(&a.confidence)
751            .unwrap_or(std::cmp::Ordering::Equal)
752    });
753    Ok(Json(result))
754}
755
756async fn get_belief(
757    State(state): State<AppState>,
758    Path(id): Path<String>,
759) -> Result<Json<Belief>, (StatusCode, String)> {
760    match state.cognition.get_belief(&id).await {
761        Some(belief) => Ok(Json(belief)),
762        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
763    }
764}
765
766async fn list_attention(
767    State(state): State<AppState>,
768) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
769    Ok(Json(state.cognition.get_attention_queue().await))
770}
771
772async fn list_proposals(
773    State(state): State<AppState>,
774) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
775    let proposals = state.cognition.get_proposals().await;
776    let mut result: Vec<Proposal> = proposals.into_values().collect();
777    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
778    Ok(Json(result))
779}
780
781async fn approve_proposal(
782    State(state): State<AppState>,
783    Path(id): Path<String>,
784) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
785    state
786        .cognition
787        .approve_proposal(&id)
788        .await
789        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
790        .map_err(internal_error)
791}
792
793async fn list_receipts(
794    State(state): State<AppState>,
795) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
796    Ok(Json(state.cognition.get_receipts().await))
797}
798
799async fn get_workspace(
800    State(state): State<AppState>,
801) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
802    Ok(Json(state.cognition.get_workspace().await))
803}
804
805async fn get_governance(
806    State(state): State<AppState>,
807) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
808    Ok(Json(state.cognition.get_governance().await))
809}
810
811async fn get_persona(
812    State(state): State<AppState>,
813    axum::extract::Path(id): axum::extract::Path<String>,
814) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
815    state
816        .cognition
817        .get_persona(&id)
818        .await
819        .map(Json)
820        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
821}
822
823// ── Audit trail endpoints ──
824
825#[derive(Deserialize)]
826struct AuditQuery {
827    limit: Option<usize>,
828    category: Option<String>,
829}
830
831async fn list_audit_entries(
832    State(state): State<AppState>,
833    Query(query): Query<AuditQuery>,
834) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
835    let limit = query.limit.unwrap_or(100).min(1000);
836
837    let entries = if let Some(ref cat) = query.category {
838        let category = match cat.as_str() {
839            "api" => AuditCategory::Api,
840            "tool" | "tool_execution" => AuditCategory::ToolExecution,
841            "session" => AuditCategory::Session,
842            "cognition" => AuditCategory::Cognition,
843            "swarm" => AuditCategory::Swarm,
844            "auth" => AuditCategory::Auth,
845            "k8s" => AuditCategory::K8s,
846            "sandbox" => AuditCategory::Sandbox,
847            "config" => AuditCategory::Config,
848            _ => {
849                return Err((
850                    StatusCode::BAD_REQUEST,
851                    format!("Unknown category: {}", cat),
852                ));
853            }
854        };
855        state.audit_log.by_category(category, limit).await
856    } else {
857        state.audit_log.recent(limit).await
858    };
859
860    Ok(Json(entries))
861}
862
863// ── K8s self-deployment endpoints ──
864
865async fn get_k8s_status(
866    State(state): State<AppState>,
867) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
868    Ok(Json(state.k8s.status().await))
869}
870
871#[derive(Deserialize)]
872struct ScaleRequest {
873    replicas: i32,
874}
875
876async fn k8s_scale(
877    State(state): State<AppState>,
878    Json(req): Json<ScaleRequest>,
879) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
880    if req.replicas < 0 || req.replicas > 100 {
881        return Err((
882            StatusCode::BAD_REQUEST,
883            "Replicas must be between 0 and 100".to_string(),
884        ));
885    }
886
887    state
888        .audit_log
889        .log(
890            AuditCategory::K8s,
891            format!("scale:{}", req.replicas),
892            AuditOutcome::Success,
893            None,
894            None,
895        )
896        .await;
897
898    state
899        .k8s
900        .scale(req.replicas)
901        .await
902        .map(Json)
903        .map_err(internal_error)
904}
905
906async fn k8s_restart(
907    State(state): State<AppState>,
908) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
909    state
910        .audit_log
911        .log(
912            AuditCategory::K8s,
913            "rolling_restart",
914            AuditOutcome::Success,
915            None,
916            None,
917        )
918        .await;
919
920    state
921        .k8s
922        .rolling_restart()
923        .await
924        .map(Json)
925        .map_err(internal_error)
926}
927
928async fn k8s_list_pods(
929    State(state): State<AppState>,
930) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
931    state
932        .k8s
933        .list_pods()
934        .await
935        .map(Json)
936        .map_err(internal_error)
937}
938
939async fn k8s_actions(
940    State(state): State<AppState>,
941) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
942    Ok(Json(state.k8s.recent_actions(100).await))
943}
944
945#[derive(Deserialize)]
946struct SpawnSubagentRequest {
947    subagent_id: String,
948    #[serde(default)]
949    image: Option<String>,
950    #[serde(default)]
951    env_vars: std::collections::HashMap<String, String>,
952}
953
954async fn k8s_spawn_subagent(
955    State(state): State<AppState>,
956    Json(req): Json<SpawnSubagentRequest>,
957) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
958    state
959        .k8s
960        .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
961        .await
962        .map(Json)
963        .map_err(internal_error)
964}
965
966async fn k8s_delete_subagent(
967    State(state): State<AppState>,
968    axum::extract::Path(id): axum::extract::Path<String>,
969) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
970    state
971        .k8s
972        .delete_subagent_pod(&id)
973        .await
974        .map(Json)
975        .map_err(internal_error)
976}
977
978/// List registered plugins.
979async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
980    let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
981    let signing_key = SigningKey::from_env();
982    let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
983    Json(PluginListResponse {
984        server_fingerprint,
985        signing_available: !test_sig.is_empty(),
986        plugins: Vec::<PluginManifest>::new(),
987    })
988}
989
990#[derive(Serialize)]
991struct PluginListResponse {
992    server_fingerprint: String,
993    signing_available: bool,
994    plugins: Vec<PluginManifest>,
995}
996
997fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
998    let message = error.to_string();
999    if message.contains("not found") {
1000        return (StatusCode::NOT_FOUND, message);
1001    }
1002    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
1003        return (StatusCode::BAD_REQUEST, message);
1004    }
1005    (StatusCode::INTERNAL_SERVER_ERROR, message)
1006}
1007
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepts `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, ignoring ASCII
/// case and surrounding whitespace (so a value with a trailing newline still
/// parses). Returns `default` when the variable is unset, not valid Unicode,
/// or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };
    let value = raw.trim();

    if TRUTHY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        true
    } else if FALSY.iter().any(|word| value.eq_ignore_ascii_case(word)) {
        false
    } else {
        default
    }
}
1018
#[cfg(test)]
mod tests {
    use super::match_policy_rule;

    /// Asserts that `method path` resolves to exactly the permission `expected`.
    #[track_caller]
    fn assert_permission(path: &str, method: &str, expected: &'static str) {
        assert_eq!(match_policy_rule(path, method), Some(expected));
    }

    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        assert_permission("/api/session/abc123/prompt", "POST", "agent:execute");
    }

    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        assert_permission("/api/session", "POST", "sessions:write");
    }

    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        assert_permission(
            "/v1/cognition/proposals/p1/approve",
            "POST",
            "agent:execute",
        );
    }
}