Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16    executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24    Router,
25    body::Body,
26    extract::Path,
27    extract::{Query, State},
28    http::{Request, StatusCode},
29    middleware::{self, Next},
30    response::sse::{Event, KeepAlive, Sse},
31    response::{Json, Response},
32    routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
39use tower_http::trace::TraceLayer;
40
41/// Server state shared across handlers
42#[derive(Clone)]
43pub struct AppState {
44    pub config: Arc<Config>,
45    pub cognition: Arc<CognitionRuntime>,
46    pub audit_log: AuditLog,
47    pub k8s: Arc<K8sManager>,
48    pub auth: AuthState,
49    pub bus: Arc<AgentBus>,
50}
51
52/// Audit middleware — logs every request/response to the audit trail.
53async fn audit_middleware(
54    State(state): State<AppState>,
55    request: Request<Body>,
56    next: Next,
57) -> Response {
58    let method = request.method().clone();
59    let path = request.uri().path().to_string();
60    let started = std::time::Instant::now();
61
62    let response = next.run(request).await;
63
64    let duration_ms = started.elapsed().as_millis() as u64;
65    let status = response.status().as_u16();
66    let outcome = if status < 400 {
67        AuditOutcome::Success
68    } else if status == 401 || status == 403 {
69        AuditOutcome::Denied
70    } else {
71        AuditOutcome::Failure
72    };
73
74    state
75        .audit_log
76        .record(audit::AuditEntry {
77            id: uuid::Uuid::new_v4().to_string(),
78            timestamp: chrono::Utc::now(),
79            category: AuditCategory::Api,
80            action: format!("{} {}", method, path),
81            principal: None,
82            outcome,
83            detail: Some(serde_json::json!({ "status": status })),
84            duration_ms: Some(duration_ms),
85        })
86        .await;
87
88    response
89}
90
/// One row of the route → permission table.
///
/// A rule applies to a request when its path pattern matches and, if `methods`
/// is set, the request method is listed. The first applicable rule in
/// [`POLICY_RULES`] wins.
struct PolicyRule {
    /// Path prefix, matched on whole segments. A trailing `/` is optional and
    /// does not change what the pattern matches.
    pattern: &'static str,
    /// HTTP methods the rule applies to; `None` means every method.
    methods: Option<&'static [&'static str]>,
    /// Permission handed to the policy engine; `""` marks the route as exempt.
    permission: &'static str,
}

impl PolicyRule {
    /// True when `path` equals the pattern (ignoring the pattern's trailing
    /// slash) or lies below it, i.e. continues with a `/`.
    fn matches_path(&self, path: &str) -> bool {
        let base = self.pattern.strip_suffix('/').unwrap_or(self.pattern);
        path.strip_prefix(base)
            .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
    }

    /// True when the rule is method-agnostic or lists `method`.
    fn allows_method(&self, method: &str) -> bool {
        match self.methods {
            None => true,
            Some(allowed) => allowed.contains(&method),
        }
    }
}

const POLICY_RULES: &[PolicyRule] = &[
    // Public / exempt
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // K8s management — admin only
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // K8s sub-agent lifecycle — admin only
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    // Plugin registry — read
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Audit — admin
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    // Cognition — write operations
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Catch-all for the remaining cognition writes (e.g. proposal approval),
    // which would otherwise match no rule and skip policy enforcement.
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Swarm persona lifecycle
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Session management
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    // Config, version, providers, agents — read
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];

/// Find the required permission for a given path + method.
///
/// Returns `Some("")` for exempt routes, `Some(perm)` when a permission is
/// required, and `None` when no rule applies (the caller passes such requests
/// through unchecked).
///
/// `HEAD` is evaluated as `GET`: axum answers `HEAD` requests through `get`
/// handlers, so a GET-only rule must cover them as well.
fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
    let method = if method == "HEAD" { "GET" } else { method };
    POLICY_RULES
        .iter()
        .find(|rule| rule.matches_path(path) && rule.allows_method(method))
        .map(|rule| rule.permission)
}
236
237/// Policy authorization middleware for Axum.
238///
239/// Maps request paths to OPA permission strings and enforces authorization.
240/// Runs after `require_auth` so the bearer token is already validated.
241/// Currently maps the static bearer token to an admin role since
242/// codetether-agent uses a single shared token model.
243async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
244    let path = request.uri().path().to_string();
245    let method = request.method().as_str().to_string();
246
247    let permission = match match_policy_rule(&path, &method) {
248        None | Some("") => return Ok(next.run(request).await),
249        Some(perm) => perm,
250    };
251
252    // The current auth model uses a single static token for all access.
253    // When this is the case, the authenticated user effectively has admin role.
254    // Future: extract user claims from JWT and build a proper PolicyUser.
255    let user = policy::PolicyUser {
256        user_id: "bearer-token-user".to_string(),
257        roles: vec!["admin".to_string()],
258        tenant_id: None,
259        scopes: vec![],
260        auth_source: "static_token".to_string(),
261    };
262
263    if let Err(status) = policy::enforce_policy(&user, permission, None).await {
264        tracing::warn!(
265            path = %path,
266            method = %method,
267            permission = %permission,
268            "Policy middleware denied request"
269        );
270        return Err(status);
271    }
272
273    Ok(next.run(request).await)
274}
275
276/// Start the HTTP server
277pub async fn serve(args: ServeArgs) -> Result<()> {
278    let t0 = std::time::Instant::now();
279    tracing::info!("[startup] begin");
280    let config = Config::load().await?;
281    tracing::info!(
282        elapsed_ms = t0.elapsed().as_millis(),
283        "[startup] config loaded"
284    );
285    let mut cognition = CognitionRuntime::new_from_env();
286    tracing::info!(
287        elapsed_ms = t0.elapsed().as_millis(),
288        "[startup] cognition runtime created"
289    );
290
291    // Set up tool registry for cognition execution engine.
292    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
293    tracing::info!(
294        elapsed_ms = t0.elapsed().as_millis(),
295        "[startup] tools registered"
296    );
297    let cognition = Arc::new(cognition);
298
299    // Initialize audit log.
300    let audit_log = AuditLog::from_env();
301    let _ = audit::init_audit_log(audit_log.clone());
302
303    // Initialize K8s manager.
304    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
305    let k8s = Arc::new(K8sManager::new().await);
306    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
307    if k8s.is_available() {
308        tracing::info!("K8s self-deployment enabled");
309    }
310
311    // Initialize mandatory auth.
312    let auth_state = AuthState::from_env();
313    tracing::info!(
314        token_len = auth_state.token().len(),
315        "Auth is mandatory. Token required for all API endpoints."
316    );
317    tracing::info!(
318        audit_entries = audit_log.count().await,
319        "Audit log initialized"
320    );
321
322    // Create agent bus for in-process communication
323    let bus = AgentBus::new().into_arc();
324    tracing::info!(
325        elapsed_ms = t0.elapsed().as_millis(),
326        "[startup] bus created"
327    );
328
329    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
330        tracing::info!(
331            elapsed_ms = t0.elapsed().as_millis(),
332            "[startup] auto-starting cognition"
333        );
334        if let Err(error) = cognition.start(None).await {
335            tracing::warn!(%error, "Failed to auto-start cognition loop");
336        } else {
337            tracing::info!("Perpetual cognition auto-started");
338        }
339    }
340
341    tracing::info!(
342        elapsed_ms = t0.elapsed().as_millis(),
343        "[startup] building routes"
344    );
345    let addr = format!("{}:{}", args.hostname, args.port);
346
347    // Build the agent card
348    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
349    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
350
351    // Build A2A router separately
352    let a2a_router = a2a_server.router();
353
354    // Start gRPC transport on a separate port
355    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
356        .ok()
357        .and_then(|p| p.parse::<u16>().ok())
358        .unwrap_or(50051);
359    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
360    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
361    let grpc_service = grpc_store.into_service();
362    tokio::spawn(async move {
363        tracing::info!("gRPC A2A server listening on {}", grpc_addr);
364        if let Err(e) = tonic::transport::Server::builder()
365            .add_service(grpc_service)
366            .serve(grpc_addr)
367            .await
368        {
369            tracing::error!("gRPC server error: {}", e);
370        }
371    });
372
373    let state = AppState {
374        config: Arc::new(config),
375        cognition,
376        audit_log,
377        k8s,
378        auth: auth_state.clone(),
379        bus,
380    };
381
382    let app = Router::new()
383        // Health check (public — auth exempt)
384        .route("/health", get(health))
385        // API routes
386        .route("/api/version", get(get_version))
387        .route("/api/session", get(list_sessions).post(create_session))
388        .route("/api/session/{id}", get(get_session))
389        .route("/api/session/{id}/prompt", post(prompt_session))
390        .route("/api/config", get(get_config))
391        .route("/api/provider", get(list_providers))
392        .route("/api/agent", get(list_agents))
393        // Perpetual cognition APIs
394        .route("/v1/cognition/start", post(start_cognition))
395        .route("/v1/cognition/stop", post(stop_cognition))
396        .route("/v1/cognition/status", get(get_cognition_status))
397        .route("/v1/cognition/stream", get(stream_cognition))
398        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
399        // Swarm persona lifecycle APIs
400        .route("/v1/swarm/personas", post(create_persona))
401        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
402        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
403        .route("/v1/swarm/lineage", get(get_swarm_lineage))
404        // Belief, attention, governance, workspace APIs
405        .route("/v1/cognition/beliefs", get(list_beliefs))
406        .route("/v1/cognition/beliefs/{id}", get(get_belief))
407        .route("/v1/cognition/attention", get(list_attention))
408        .route("/v1/cognition/proposals", get(list_proposals))
409        .route(
410            "/v1/cognition/proposals/{id}/approve",
411            post(approve_proposal),
412        )
413        .route("/v1/cognition/receipts", get(list_receipts))
414        .route("/v1/cognition/workspace", get(get_workspace))
415        .route("/v1/cognition/governance", get(get_governance))
416        .route("/v1/cognition/personas/{id}", get(get_persona))
417        // Audit trail API
418        .route("/v1/audit", get(list_audit_entries))
419        // K8s self-deployment APIs
420        .route("/v1/k8s/status", get(get_k8s_status))
421        .route("/v1/k8s/scale", post(k8s_scale))
422        .route("/v1/k8s/restart", post(k8s_restart))
423        .route("/v1/k8s/pods", get(k8s_list_pods))
424        .route("/v1/k8s/actions", get(k8s_actions))
425        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
426        .route(
427            "/v1/k8s/subagent/{id}",
428            axum::routing::delete(k8s_delete_subagent),
429        )
430        // Plugin registry API
431        .route("/v1/plugins", get(list_plugins))
432        .with_state(state.clone())
433        // A2A routes (nested to work with different state type)
434        .nest("/a2a", a2a_router)
435        // Mandatory auth middleware — applies to all routes
436        .layer(middleware::from_fn_with_state(
437            state.clone(),
438            audit_middleware,
439        ))
440        .layer(axum::Extension(state.auth.clone()))
441        .layer(middleware::from_fn(policy_middleware))
442        .layer(middleware::from_fn(auth::require_auth))
443        // CORS + tracing
444        .layer(
445            CorsLayer::new()
446                .allow_origin(AllowOrigin::mirror_request())
447                .allow_credentials(true)
448                .allow_methods(AllowMethods::mirror_request())
449                .allow_headers(AllowHeaders::mirror_request()),
450        )
451        .layer(TraceLayer::new_for_http());
452
453    tracing::info!(
454        elapsed_ms = t0.elapsed().as_millis(),
455        "[startup] router built, binding"
456    );
457    let listener = tokio::net::TcpListener::bind(&addr).await?;
458    tracing::info!(
459        elapsed_ms = t0.elapsed().as_millis(),
460        "[startup] listening on http://{}",
461        addr
462    );
463
464    axum::serve(listener, app).await?;
465
466    Ok(())
467}
468
/// Liveness probe: always answers with the plain-text body `ok`.
async fn health() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
473
474/// Version info
475#[derive(Serialize)]
476struct VersionInfo {
477    version: &'static str,
478    name: &'static str,
479    binary_hash: Option<String>,
480}
481
482async fn get_version() -> Json<VersionInfo> {
483    let binary_hash = std::env::current_exe()
484        .ok()
485        .and_then(|p| hash_file(&p).ok());
486    Json(VersionInfo {
487        version: env!("CARGO_PKG_VERSION"),
488        name: env!("CARGO_PKG_NAME"),
489        binary_hash,
490    })
491}
492
493/// List sessions
494#[derive(Deserialize)]
495struct ListSessionsQuery {
496    limit: Option<usize>,
497}
498
499async fn list_sessions(
500    Query(query): Query<ListSessionsQuery>,
501) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
502    let sessions = crate::session::list_sessions()
503        .await
504        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
505
506    let limit = query.limit.unwrap_or(50);
507    Ok(Json(sessions.into_iter().take(limit).collect()))
508}
509
510/// Create a new session
511#[derive(Deserialize)]
512struct CreateSessionRequest {
513    title: Option<String>,
514    agent: Option<String>,
515}
516
517async fn create_session(
518    Json(req): Json<CreateSessionRequest>,
519) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
520    let mut session = crate::session::Session::new()
521        .await
522        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
523
524    session.title = req.title;
525    if let Some(agent) = req.agent {
526        session.agent = agent;
527    }
528
529    session
530        .save()
531        .await
532        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
533
534    Ok(Json(session))
535}
536
537/// Get a session by ID
538async fn get_session(
539    axum::extract::Path(id): axum::extract::Path<String>,
540) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
541    let session = crate::session::Session::load(&id)
542        .await
543        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
544
545    Ok(Json(session))
546}
547
548/// Prompt a session
549#[derive(Deserialize)]
550struct PromptRequest {
551    message: String,
552}
553
554async fn prompt_session(
555    axum::extract::Path(id): axum::extract::Path<String>,
556    Json(req): Json<PromptRequest>,
557) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
558    // Validate the message is not empty
559    if req.message.trim().is_empty() {
560        return Err((
561            StatusCode::BAD_REQUEST,
562            "Message cannot be empty".to_string(),
563        ));
564    }
565
566    // Log the prompt request (uses the message field)
567    tracing::info!(
568        session_id = %id,
569        message_len = req.message.len(),
570        "Received prompt request"
571    );
572
573    // TODO: Implement actual prompting
574    Err((
575        StatusCode::NOT_IMPLEMENTED,
576        "Prompt execution not yet implemented".to_string(),
577    ))
578}
579
580/// Get configuration
581async fn get_config(State(state): State<AppState>) -> Json<Config> {
582    Json((*state.config).clone())
583}
584
585/// List providers
586async fn list_providers() -> Json<Vec<String>> {
587    Json(vec![
588        "openai".to_string(),
589        "anthropic".to_string(),
590        "google".to_string(),
591    ])
592}
593
594/// List agents
595async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
596    let registry = crate::agent::AgentRegistry::with_builtins();
597    Json(registry.list().into_iter().cloned().collect())
598}
599
600async fn start_cognition(
601    State(state): State<AppState>,
602    payload: Option<Json<StartCognitionRequest>>,
603) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
604    state
605        .cognition
606        .start(payload.map(|Json(body)| body))
607        .await
608        .map(Json)
609        .map_err(internal_error)
610}
611
612async fn stop_cognition(
613    State(state): State<AppState>,
614    payload: Option<Json<StopCognitionRequest>>,
615) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
616    let reason = payload.and_then(|Json(body)| body.reason);
617    state
618        .cognition
619        .stop(reason)
620        .await
621        .map(Json)
622        .map_err(internal_error)
623}
624
625async fn get_cognition_status(
626    State(state): State<AppState>,
627) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
628    Ok(Json(state.cognition.status().await))
629}
630
631async fn stream_cognition(
632    State(state): State<AppState>,
633) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
634    let rx = state.cognition.subscribe_events();
635
636    let event_stream = stream::unfold(rx, |mut rx| async move {
637        match rx.recv().await {
638            Ok(event) => {
639                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
640                let sse_event = Event::default().event("cognition").data(payload);
641                Some((Ok(sse_event), rx))
642            }
643            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
644                let lag_event = Event::default()
645                    .event("lag")
646                    .data(format!("skipped {}", skipped));
647                Some((Ok(lag_event), rx))
648            }
649            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
650        }
651    });
652
653    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
654}
655
656async fn get_latest_snapshot(
657    State(state): State<AppState>,
658) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
659    match state.cognition.latest_snapshot().await {
660        Some(snapshot) => Ok(Json(snapshot)),
661        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
662    }
663}
664
665async fn create_persona(
666    State(state): State<AppState>,
667    Json(req): Json<CreatePersonaRequest>,
668) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
669    state
670        .cognition
671        .create_persona(req)
672        .await
673        .map(Json)
674        .map_err(internal_error)
675}
676
677async fn spawn_persona(
678    State(state): State<AppState>,
679    Path(id): Path<String>,
680    Json(req): Json<SpawnPersonaRequest>,
681) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
682    state
683        .cognition
684        .spawn_child(&id, req)
685        .await
686        .map(Json)
687        .map_err(internal_error)
688}
689
690async fn reap_persona(
691    State(state): State<AppState>,
692    Path(id): Path<String>,
693    payload: Option<Json<ReapPersonaRequest>>,
694) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
695    let req = payload
696        .map(|Json(body)| body)
697        .unwrap_or(ReapPersonaRequest {
698            cascade: Some(false),
699            reason: None,
700        });
701
702    state
703        .cognition
704        .reap_persona(&id, req)
705        .await
706        .map(Json)
707        .map_err(internal_error)
708}
709
710async fn get_swarm_lineage(
711    State(state): State<AppState>,
712) -> Result<Json<LineageGraph>, (StatusCode, String)> {
713    Ok(Json(state.cognition.lineage_graph().await))
714}
715
716// ── Belief, Attention, Governance, Workspace handlers ──
717
718#[derive(Deserialize)]
719struct BeliefFilter {
720    status: Option<String>,
721    persona: Option<String>,
722}
723
724async fn list_beliefs(
725    State(state): State<AppState>,
726    Query(filter): Query<BeliefFilter>,
727) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
728    let beliefs = state.cognition.get_beliefs().await;
729    let mut result: Vec<Belief> = beliefs.into_values().collect();
730
731    if let Some(status) = &filter.status {
732        result.retain(|b| {
733            let s = serde_json::to_string(&b.status).unwrap_or_default();
734            s.contains(status)
735        });
736    }
737    if let Some(persona) = &filter.persona {
738        result.retain(|b| &b.asserted_by == persona);
739    }
740
741    result.sort_by(|a, b| {
742        b.confidence
743            .partial_cmp(&a.confidence)
744            .unwrap_or(std::cmp::Ordering::Equal)
745    });
746    Ok(Json(result))
747}
748
749async fn get_belief(
750    State(state): State<AppState>,
751    Path(id): Path<String>,
752) -> Result<Json<Belief>, (StatusCode, String)> {
753    match state.cognition.get_belief(&id).await {
754        Some(belief) => Ok(Json(belief)),
755        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
756    }
757}
758
759async fn list_attention(
760    State(state): State<AppState>,
761) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
762    Ok(Json(state.cognition.get_attention_queue().await))
763}
764
765async fn list_proposals(
766    State(state): State<AppState>,
767) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
768    let proposals = state.cognition.get_proposals().await;
769    let mut result: Vec<Proposal> = proposals.into_values().collect();
770    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
771    Ok(Json(result))
772}
773
774async fn approve_proposal(
775    State(state): State<AppState>,
776    Path(id): Path<String>,
777) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
778    state
779        .cognition
780        .approve_proposal(&id)
781        .await
782        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
783        .map_err(internal_error)
784}
785
786async fn list_receipts(
787    State(state): State<AppState>,
788) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
789    Ok(Json(state.cognition.get_receipts().await))
790}
791
792async fn get_workspace(
793    State(state): State<AppState>,
794) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
795    Ok(Json(state.cognition.get_workspace().await))
796}
797
798async fn get_governance(
799    State(state): State<AppState>,
800) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
801    Ok(Json(state.cognition.get_governance().await))
802}
803
804async fn get_persona(
805    State(state): State<AppState>,
806    axum::extract::Path(id): axum::extract::Path<String>,
807) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
808    state
809        .cognition
810        .get_persona(&id)
811        .await
812        .map(Json)
813        .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
814}
815
816// ── Audit trail endpoints ──
817
818#[derive(Deserialize)]
819struct AuditQuery {
820    limit: Option<usize>,
821    category: Option<String>,
822}
823
824async fn list_audit_entries(
825    State(state): State<AppState>,
826    Query(query): Query<AuditQuery>,
827) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
828    let limit = query.limit.unwrap_or(100).min(1000);
829
830    let entries = if let Some(ref cat) = query.category {
831        let category = match cat.as_str() {
832            "api" => AuditCategory::Api,
833            "tool" | "tool_execution" => AuditCategory::ToolExecution,
834            "session" => AuditCategory::Session,
835            "cognition" => AuditCategory::Cognition,
836            "swarm" => AuditCategory::Swarm,
837            "auth" => AuditCategory::Auth,
838            "k8s" => AuditCategory::K8s,
839            "sandbox" => AuditCategory::Sandbox,
840            "config" => AuditCategory::Config,
841            _ => {
842                return Err((
843                    StatusCode::BAD_REQUEST,
844                    format!("Unknown category: {}", cat),
845                ));
846            }
847        };
848        state.audit_log.by_category(category, limit).await
849    } else {
850        state.audit_log.recent(limit).await
851    };
852
853    Ok(Json(entries))
854}
855
856// ── K8s self-deployment endpoints ──
857
858async fn get_k8s_status(
859    State(state): State<AppState>,
860) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
861    Ok(Json(state.k8s.status().await))
862}
863
864#[derive(Deserialize)]
865struct ScaleRequest {
866    replicas: i32,
867}
868
869async fn k8s_scale(
870    State(state): State<AppState>,
871    Json(req): Json<ScaleRequest>,
872) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
873    if req.replicas < 0 || req.replicas > 100 {
874        return Err((
875            StatusCode::BAD_REQUEST,
876            "Replicas must be between 0 and 100".to_string(),
877        ));
878    }
879
880    state
881        .audit_log
882        .log(
883            AuditCategory::K8s,
884            format!("scale:{}", req.replicas),
885            AuditOutcome::Success,
886            None,
887            None,
888        )
889        .await;
890
891    state
892        .k8s
893        .scale(req.replicas)
894        .await
895        .map(Json)
896        .map_err(internal_error)
897}
898
899async fn k8s_restart(
900    State(state): State<AppState>,
901) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
902    state
903        .audit_log
904        .log(
905            AuditCategory::K8s,
906            "rolling_restart",
907            AuditOutcome::Success,
908            None,
909            None,
910        )
911        .await;
912
913    state
914        .k8s
915        .rolling_restart()
916        .await
917        .map(Json)
918        .map_err(internal_error)
919}
920
921async fn k8s_list_pods(
922    State(state): State<AppState>,
923) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
924    state
925        .k8s
926        .list_pods()
927        .await
928        .map(Json)
929        .map_err(internal_error)
930}
931
932async fn k8s_actions(
933    State(state): State<AppState>,
934) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
935    Ok(Json(state.k8s.recent_actions(100).await))
936}
937
938#[derive(Deserialize)]
939struct SpawnSubagentRequest {
940    subagent_id: String,
941    #[serde(default)]
942    image: Option<String>,
943    #[serde(default)]
944    env_vars: std::collections::HashMap<String, String>,
945}
946
947async fn k8s_spawn_subagent(
948    State(state): State<AppState>,
949    Json(req): Json<SpawnSubagentRequest>,
950) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
951    state
952        .k8s
953        .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
954        .await
955        .map(Json)
956        .map_err(internal_error)
957}
958
959async fn k8s_delete_subagent(
960    State(state): State<AppState>,
961    axum::extract::Path(id): axum::extract::Path<String>,
962) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
963    state
964        .k8s
965        .delete_subagent_pod(&id)
966        .await
967        .map(Json)
968        .map_err(internal_error)
969}
970
971/// List registered plugins.
972async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
973    let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
974    let signing_key = SigningKey::from_env();
975    let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
976    Json(PluginListResponse {
977        server_fingerprint,
978        signing_available: !test_sig.is_empty(),
979        plugins: Vec::<PluginManifest>::new(),
980    })
981}
982
983#[derive(Serialize)]
984struct PluginListResponse {
985    server_fingerprint: String,
986    signing_available: bool,
987    plugins: Vec<PluginManifest>,
988}
989
990fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
991    let message = error.to_string();
992    if message.contains("not found") {
993        return (StatusCode::NOT_FOUND, message);
994    }
995    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
996        return (StatusCode::BAD_REQUEST, message);
997    }
998    (StatusCode::INTERNAL_SERVER_ERROR, message)
999}
1000
/// Read a boolean flag from the environment.
///
/// Recognizes `1`/`true`/`yes`/`on` and `0`/`false`/`no`/`off`, ignoring ASCII
/// case and surrounding whitespace (so a value with a trailing newline still
/// parses). Returns `default` when the variable is unset, not valid Unicode,
/// or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let Ok(raw) = std::env::var(name) else {
        return default;
    };
    let value = raw.trim();
    let is_any = |candidates: &[&str]| candidates.iter().any(|&c| value.eq_ignore_ascii_case(c));

    if is_any(&TRUTHY) {
        true
    } else if is_any(&FALSY) {
        false
    } else {
        default
    }
}