Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::cli::ServeArgs;
11use crate::cognition::{
12    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
13    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
14    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
15    executor::DecisionReceipt,
16};
17use crate::config::Config;
18use crate::k8s::K8sManager;
19use anyhow::Result;
20use auth::AuthState;
21use axum::{
22    Router,
23    body::Body,
24    extract::Path,
25    extract::{Query, State},
26    http::{Request, StatusCode},
27    middleware::{self, Next},
28    response::sse::{Event, KeepAlive, Sse},
29    response::{Json, Response},
30    routing::{get, post},
31};
32use futures::stream;
33use serde::{Deserialize, Serialize};
34use std::convert::Infallible;
35use std::sync::Arc;
36use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
37use tower_http::trace::TraceLayer;
38
39/// Server state shared across handlers
40#[derive(Clone)]
41pub struct AppState {
42    pub config: Arc<Config>,
43    pub cognition: Arc<CognitionRuntime>,
44    pub audit_log: AuditLog,
45    pub k8s: Arc<K8sManager>,
46    pub auth: AuthState,
47}
48
49/// Audit middleware — logs every request/response to the audit trail.
50async fn audit_middleware(
51    State(state): State<AppState>,
52    request: Request<Body>,
53    next: Next,
54) -> Response {
55    let method = request.method().clone();
56    let path = request.uri().path().to_string();
57    let started = std::time::Instant::now();
58
59    let response = next.run(request).await;
60
61    let duration_ms = started.elapsed().as_millis() as u64;
62    let status = response.status().as_u16();
63    let outcome = if status < 400 {
64        AuditOutcome::Success
65    } else if status == 401 || status == 403 {
66        AuditOutcome::Denied
67    } else {
68        AuditOutcome::Failure
69    };
70
71    state
72        .audit_log
73        .record(audit::AuditEntry {
74            id: uuid::Uuid::new_v4().to_string(),
75            timestamp: chrono::Utc::now(),
76            category: AuditCategory::Api,
77            action: format!("{} {}", method, path),
78            principal: None,
79            outcome,
80            detail: Some(serde_json::json!({ "status": status })),
81            duration_ms: Some(duration_ms),
82        })
83        .await;
84
85    response
86}
87
88/// Mapping from (path pattern, HTTP method) → OPA permission action.
89/// The first matching rule wins.
90struct PolicyRule {
91    pattern: &'static str,
92    methods: Option<&'static [&'static str]>,
93    permission: &'static str,
94}
95
96const POLICY_RULES: &[PolicyRule] = &[
97    // Public / exempt
98    PolicyRule {
99        pattern: "/health",
100        methods: None,
101        permission: "",
102    },
103    PolicyRule {
104        pattern: "/a2a/",
105        methods: None,
106        permission: "",
107    },
108    // K8s management — admin only
109    PolicyRule {
110        pattern: "/v1/k8s/scale",
111        methods: Some(&["POST"]),
112        permission: "admin:access",
113    },
114    PolicyRule {
115        pattern: "/v1/k8s/restart",
116        methods: Some(&["POST"]),
117        permission: "admin:access",
118    },
119    PolicyRule {
120        pattern: "/v1/k8s/",
121        methods: Some(&["GET"]),
122        permission: "admin:access",
123    },
124    // Audit — admin
125    PolicyRule {
126        pattern: "/v1/audit",
127        methods: None,
128        permission: "admin:access",
129    },
130    // Cognition — write operations
131    PolicyRule {
132        pattern: "/v1/cognition/start",
133        methods: Some(&["POST"]),
134        permission: "agent:execute",
135    },
136    PolicyRule {
137        pattern: "/v1/cognition/stop",
138        methods: Some(&["POST"]),
139        permission: "agent:execute",
140    },
141    PolicyRule {
142        pattern: "/v1/cognition/",
143        methods: Some(&["GET"]),
144        permission: "agent:read",
145    },
146    // Swarm persona lifecycle
147    PolicyRule {
148        pattern: "/v1/swarm/personas",
149        methods: Some(&["POST"]),
150        permission: "agent:execute",
151    },
152    PolicyRule {
153        pattern: "/v1/swarm/",
154        methods: Some(&["POST"]),
155        permission: "agent:execute",
156    },
157    PolicyRule {
158        pattern: "/v1/swarm/",
159        methods: Some(&["GET"]),
160        permission: "agent:read",
161    },
162    // Session management
163    PolicyRule {
164        pattern: "/api/session",
165        methods: Some(&["POST"]),
166        permission: "sessions:write",
167    },
168    PolicyRule {
169        pattern: "/api/session/",
170        methods: Some(&["POST"]),
171        permission: "sessions:write",
172    },
173    PolicyRule {
174        pattern: "/api/session",
175        methods: Some(&["GET"]),
176        permission: "sessions:read",
177    },
178    // Config, version, providers, agents — read
179    PolicyRule {
180        pattern: "/api/version",
181        methods: None,
182        permission: "agent:read",
183    },
184    PolicyRule {
185        pattern: "/api/config",
186        methods: None,
187        permission: "agent:read",
188    },
189    PolicyRule {
190        pattern: "/api/provider",
191        methods: None,
192        permission: "agent:read",
193    },
194    PolicyRule {
195        pattern: "/api/agent",
196        methods: None,
197        permission: "agent:read",
198    },
199];
200
201/// Find the required permission for a given path + method.
202/// Returns `Some("")` for exempt, `Some(perm)` for required, `None` for unmatched (pass-through).
203fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
204    for rule in POLICY_RULES {
205        let matches = if rule.pattern.ends_with('/') {
206            path.starts_with(rule.pattern) || path == &rule.pattern[..rule.pattern.len() - 1]
207        } else {
208            path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
209        };
210        if matches {
211            if let Some(allowed_methods) = rule.methods {
212                if !allowed_methods.contains(&method) {
213                    continue;
214                }
215            }
216            return Some(rule.permission);
217        }
218    }
219    None
220}
221
222/// Policy authorization middleware for Axum.
223///
224/// Maps request paths to OPA permission strings and enforces authorization.
225/// Runs after `require_auth` so the bearer token is already validated.
226/// Currently maps the static bearer token to an admin role since
227/// codetether-agent uses a single shared token model.
228async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
229    let path = request.uri().path().to_string();
230    let method = request.method().as_str().to_string();
231
232    let permission = match match_policy_rule(&path, &method) {
233        None | Some("") => return Ok(next.run(request).await),
234        Some(perm) => perm,
235    };
236
237    // The current auth model uses a single static token for all access.
238    // When this is the case, the authenticated user effectively has admin role.
239    // Future: extract user claims from JWT and build a proper PolicyUser.
240    let user = policy::PolicyUser {
241        user_id: "bearer-token-user".to_string(),
242        roles: vec!["admin".to_string()],
243        tenant_id: None,
244        scopes: vec![],
245        auth_source: "static_token".to_string(),
246    };
247
248    if !policy::check_policy(&user, permission, None).await {
249        tracing::warn!(
250            path = %path,
251            method = %method,
252            permission = %permission,
253            "Policy middleware denied request"
254        );
255        return Err(StatusCode::FORBIDDEN);
256    }
257
258    Ok(next.run(request).await)
259}
260
261/// Start the HTTP server
262pub async fn serve(args: ServeArgs) -> Result<()> {
263    let config = Config::load().await?;
264    let cognition = Arc::new(CognitionRuntime::new_from_env());
265
266    // Initialize audit log.
267    let audit_log = AuditLog::from_env();
268    let _ = audit::init_audit_log(audit_log.clone());
269
270    // Initialize K8s manager.
271    let k8s = Arc::new(K8sManager::new().await);
272    if k8s.is_available() {
273        tracing::info!("K8s self-deployment enabled");
274    }
275
276    // Initialize mandatory auth.
277    let auth_state = AuthState::from_env();
278    tracing::info!("Auth is mandatory. Token required for all API endpoints.");
279
280    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
281        if let Err(error) = cognition.start(None).await {
282            tracing::warn!(%error, "Failed to auto-start cognition loop");
283        } else {
284            tracing::info!("Perpetual cognition auto-started");
285        }
286    }
287
288    let state = AppState {
289        config: Arc::new(config),
290        cognition,
291        audit_log,
292        k8s,
293        auth: auth_state.clone(),
294    };
295
296    let addr = format!("{}:{}", args.hostname, args.port);
297
298    // Build the agent card
299    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
300    let a2a_server = a2a::server::A2AServer::new(agent_card);
301
302    // Build A2A router separately
303    let a2a_router = a2a_server.router();
304
305    let app = Router::new()
306        // Health check (public — auth exempt)
307        .route("/health", get(health))
308        // API routes
309        .route("/api/version", get(get_version))
310        .route("/api/session", get(list_sessions).post(create_session))
311        .route("/api/session/{id}", get(get_session))
312        .route("/api/session/{id}/prompt", post(prompt_session))
313        .route("/api/config", get(get_config))
314        .route("/api/provider", get(list_providers))
315        .route("/api/agent", get(list_agents))
316        // Perpetual cognition APIs
317        .route("/v1/cognition/start", post(start_cognition))
318        .route("/v1/cognition/stop", post(stop_cognition))
319        .route("/v1/cognition/status", get(get_cognition_status))
320        .route("/v1/cognition/stream", get(stream_cognition))
321        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
322        // Swarm persona lifecycle APIs
323        .route("/v1/swarm/personas", post(create_persona))
324        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
325        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
326        .route("/v1/swarm/lineage", get(get_swarm_lineage))
327        // Belief, attention, governance, workspace APIs
328        .route("/v1/cognition/beliefs", get(list_beliefs))
329        .route("/v1/cognition/beliefs/{id}", get(get_belief))
330        .route("/v1/cognition/attention", get(list_attention))
331        .route("/v1/cognition/proposals", get(list_proposals))
332        .route(
333            "/v1/cognition/proposals/{id}/approve",
334            post(approve_proposal),
335        )
336        .route("/v1/cognition/receipts", get(list_receipts))
337        .route("/v1/cognition/workspace", get(get_workspace))
338        // Audit trail API
339        .route("/v1/audit", get(list_audit_entries))
340        // K8s self-deployment APIs
341        .route("/v1/k8s/status", get(get_k8s_status))
342        .route("/v1/k8s/scale", post(k8s_scale))
343        .route("/v1/k8s/restart", post(k8s_restart))
344        .route("/v1/k8s/pods", get(k8s_list_pods))
345        .route("/v1/k8s/actions", get(k8s_actions))
346        .with_state(state.clone())
347        // A2A routes (nested to work with different state type)
348        .nest("/a2a", a2a_router)
349        // Mandatory auth middleware — applies to all routes
350        .layer(middleware::from_fn_with_state(
351            state.clone(),
352            audit_middleware,
353        ))
354        .layer(axum::Extension(state.auth.clone()))
355        .layer(middleware::from_fn(policy_middleware))
356        .layer(middleware::from_fn(auth::require_auth))
357        // CORS + tracing
358        .layer(
359            CorsLayer::new()
360                .allow_origin(AllowOrigin::mirror_request())
361                .allow_credentials(true)
362                .allow_methods(AllowMethods::mirror_request())
363                .allow_headers(AllowHeaders::mirror_request()),
364        )
365        .layer(TraceLayer::new_for_http());
366
367    let listener = tokio::net::TcpListener::bind(&addr).await?;
368    tracing::info!("Server listening on http://{}", addr);
369
370    axum::serve(listener, app).await?;
371
372    Ok(())
373}
374
375/// Health check response
376async fn health() -> &'static str {
377    "ok"
378}
379
380/// Version info
381#[derive(Serialize)]
382struct VersionInfo {
383    version: &'static str,
384    name: &'static str,
385}
386
387async fn get_version() -> Json<VersionInfo> {
388    Json(VersionInfo {
389        version: env!("CARGO_PKG_VERSION"),
390        name: env!("CARGO_PKG_NAME"),
391    })
392}
393
394/// List sessions
395#[derive(Deserialize)]
396struct ListSessionsQuery {
397    limit: Option<usize>,
398}
399
400async fn list_sessions(
401    Query(query): Query<ListSessionsQuery>,
402) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
403    let sessions = crate::session::list_sessions()
404        .await
405        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
406
407    let limit = query.limit.unwrap_or(50);
408    Ok(Json(sessions.into_iter().take(limit).collect()))
409}
410
411/// Create a new session
412#[derive(Deserialize)]
413struct CreateSessionRequest {
414    title: Option<String>,
415    agent: Option<String>,
416}
417
418async fn create_session(
419    Json(req): Json<CreateSessionRequest>,
420) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
421    let mut session = crate::session::Session::new()
422        .await
423        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
424
425    session.title = req.title;
426    if let Some(agent) = req.agent {
427        session.agent = agent;
428    }
429
430    session
431        .save()
432        .await
433        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
434
435    Ok(Json(session))
436}
437
438/// Get a session by ID
439async fn get_session(
440    axum::extract::Path(id): axum::extract::Path<String>,
441) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
442    let session = crate::session::Session::load(&id)
443        .await
444        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
445
446    Ok(Json(session))
447}
448
449/// Prompt a session
450#[derive(Deserialize)]
451struct PromptRequest {
452    message: String,
453}
454
455async fn prompt_session(
456    axum::extract::Path(id): axum::extract::Path<String>,
457    Json(req): Json<PromptRequest>,
458) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
459    // Validate the message is not empty
460    if req.message.trim().is_empty() {
461        return Err((
462            StatusCode::BAD_REQUEST,
463            "Message cannot be empty".to_string(),
464        ));
465    }
466
467    // Log the prompt request (uses the message field)
468    tracing::info!(
469        session_id = %id,
470        message_len = req.message.len(),
471        "Received prompt request"
472    );
473
474    // TODO: Implement actual prompting
475    Err((
476        StatusCode::NOT_IMPLEMENTED,
477        "Prompt execution not yet implemented".to_string(),
478    ))
479}
480
481/// Get configuration
482async fn get_config(State(state): State<AppState>) -> Json<Config> {
483    Json((*state.config).clone())
484}
485
486/// List providers
487async fn list_providers() -> Json<Vec<String>> {
488    Json(vec![
489        "openai".to_string(),
490        "anthropic".to_string(),
491        "google".to_string(),
492    ])
493}
494
495/// List agents
496async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
497    let registry = crate::agent::AgentRegistry::with_builtins();
498    Json(registry.list().into_iter().cloned().collect())
499}
500
501async fn start_cognition(
502    State(state): State<AppState>,
503    payload: Option<Json<StartCognitionRequest>>,
504) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
505    state
506        .cognition
507        .start(payload.map(|Json(body)| body))
508        .await
509        .map(Json)
510        .map_err(internal_error)
511}
512
513async fn stop_cognition(
514    State(state): State<AppState>,
515    payload: Option<Json<StopCognitionRequest>>,
516) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
517    let reason = payload.and_then(|Json(body)| body.reason);
518    state
519        .cognition
520        .stop(reason)
521        .await
522        .map(Json)
523        .map_err(internal_error)
524}
525
526async fn get_cognition_status(
527    State(state): State<AppState>,
528) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
529    Ok(Json(state.cognition.status().await))
530}
531
532async fn stream_cognition(
533    State(state): State<AppState>,
534) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
535    let rx = state.cognition.subscribe_events();
536
537    let event_stream = stream::unfold(rx, |mut rx| async move {
538        match rx.recv().await {
539            Ok(event) => {
540                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
541                let sse_event = Event::default().event("cognition").data(payload);
542                Some((Ok(sse_event), rx))
543            }
544            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
545                let lag_event = Event::default()
546                    .event("lag")
547                    .data(format!("skipped {}", skipped));
548                Some((Ok(lag_event), rx))
549            }
550            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
551        }
552    });
553
554    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
555}
556
557async fn get_latest_snapshot(
558    State(state): State<AppState>,
559) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
560    match state.cognition.latest_snapshot().await {
561        Some(snapshot) => Ok(Json(snapshot)),
562        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
563    }
564}
565
566async fn create_persona(
567    State(state): State<AppState>,
568    Json(req): Json<CreatePersonaRequest>,
569) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
570    state
571        .cognition
572        .create_persona(req)
573        .await
574        .map(Json)
575        .map_err(internal_error)
576}
577
578async fn spawn_persona(
579    State(state): State<AppState>,
580    Path(id): Path<String>,
581    Json(req): Json<SpawnPersonaRequest>,
582) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
583    state
584        .cognition
585        .spawn_child(&id, req)
586        .await
587        .map(Json)
588        .map_err(internal_error)
589}
590
591async fn reap_persona(
592    State(state): State<AppState>,
593    Path(id): Path<String>,
594    payload: Option<Json<ReapPersonaRequest>>,
595) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
596    let req = payload
597        .map(|Json(body)| body)
598        .unwrap_or(ReapPersonaRequest {
599            cascade: Some(false),
600            reason: None,
601        });
602
603    state
604        .cognition
605        .reap_persona(&id, req)
606        .await
607        .map(Json)
608        .map_err(internal_error)
609}
610
611async fn get_swarm_lineage(
612    State(state): State<AppState>,
613) -> Result<Json<LineageGraph>, (StatusCode, String)> {
614    Ok(Json(state.cognition.lineage_graph().await))
615}
616
617// ── Belief, Attention, Governance, Workspace handlers ──
618
619#[derive(Deserialize)]
620struct BeliefFilter {
621    status: Option<String>,
622    persona: Option<String>,
623}
624
625async fn list_beliefs(
626    State(state): State<AppState>,
627    Query(filter): Query<BeliefFilter>,
628) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
629    let beliefs = state.cognition.get_beliefs().await;
630    let mut result: Vec<Belief> = beliefs.into_values().collect();
631
632    if let Some(status) = &filter.status {
633        result.retain(|b| {
634            let s = serde_json::to_string(&b.status).unwrap_or_default();
635            s.contains(status)
636        });
637    }
638    if let Some(persona) = &filter.persona {
639        result.retain(|b| &b.asserted_by == persona);
640    }
641
642    result.sort_by(|a, b| {
643        b.confidence
644            .partial_cmp(&a.confidence)
645            .unwrap_or(std::cmp::Ordering::Equal)
646    });
647    Ok(Json(result))
648}
649
650async fn get_belief(
651    State(state): State<AppState>,
652    Path(id): Path<String>,
653) -> Result<Json<Belief>, (StatusCode, String)> {
654    match state.cognition.get_belief(&id).await {
655        Some(belief) => Ok(Json(belief)),
656        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
657    }
658}
659
660async fn list_attention(
661    State(state): State<AppState>,
662) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
663    Ok(Json(state.cognition.get_attention_queue().await))
664}
665
666async fn list_proposals(
667    State(state): State<AppState>,
668) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
669    let proposals = state.cognition.get_proposals().await;
670    let mut result: Vec<Proposal> = proposals.into_values().collect();
671    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
672    Ok(Json(result))
673}
674
675async fn approve_proposal(
676    State(state): State<AppState>,
677    Path(id): Path<String>,
678) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
679    state
680        .cognition
681        .approve_proposal(&id)
682        .await
683        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
684        .map_err(internal_error)
685}
686
687async fn list_receipts(
688    State(state): State<AppState>,
689) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
690    Ok(Json(state.cognition.get_receipts().await))
691}
692
693async fn get_workspace(
694    State(state): State<AppState>,
695) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
696    Ok(Json(state.cognition.get_workspace().await))
697}
698
699// ── Audit trail endpoints ──
700
701#[derive(Deserialize)]
702struct AuditQuery {
703    limit: Option<usize>,
704    category: Option<String>,
705}
706
707async fn list_audit_entries(
708    State(state): State<AppState>,
709    Query(query): Query<AuditQuery>,
710) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
711    let limit = query.limit.unwrap_or(100).min(1000);
712
713    let entries = if let Some(ref cat) = query.category {
714        let category = match cat.as_str() {
715            "api" => AuditCategory::Api,
716            "tool" | "tool_execution" => AuditCategory::ToolExecution,
717            "session" => AuditCategory::Session,
718            "cognition" => AuditCategory::Cognition,
719            "swarm" => AuditCategory::Swarm,
720            "auth" => AuditCategory::Auth,
721            "k8s" => AuditCategory::K8s,
722            "sandbox" => AuditCategory::Sandbox,
723            "config" => AuditCategory::Config,
724            _ => {
725                return Err((
726                    StatusCode::BAD_REQUEST,
727                    format!("Unknown category: {}", cat),
728                ));
729            }
730        };
731        state.audit_log.by_category(category, limit).await
732    } else {
733        state.audit_log.recent(limit).await
734    };
735
736    Ok(Json(entries))
737}
738
739// ── K8s self-deployment endpoints ──
740
741async fn get_k8s_status(
742    State(state): State<AppState>,
743) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
744    Ok(Json(state.k8s.status().await))
745}
746
747#[derive(Deserialize)]
748struct ScaleRequest {
749    replicas: i32,
750}
751
752async fn k8s_scale(
753    State(state): State<AppState>,
754    Json(req): Json<ScaleRequest>,
755) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
756    if req.replicas < 0 || req.replicas > 100 {
757        return Err((
758            StatusCode::BAD_REQUEST,
759            "Replicas must be between 0 and 100".to_string(),
760        ));
761    }
762
763    state
764        .audit_log
765        .log(
766            AuditCategory::K8s,
767            format!("scale:{}", req.replicas),
768            AuditOutcome::Success,
769            None,
770            None,
771        )
772        .await;
773
774    state
775        .k8s
776        .scale(req.replicas)
777        .await
778        .map(Json)
779        .map_err(internal_error)
780}
781
782async fn k8s_restart(
783    State(state): State<AppState>,
784) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
785    state
786        .audit_log
787        .log(
788            AuditCategory::K8s,
789            "rolling_restart",
790            AuditOutcome::Success,
791            None,
792            None,
793        )
794        .await;
795
796    state
797        .k8s
798        .rolling_restart()
799        .await
800        .map(Json)
801        .map_err(internal_error)
802}
803
804async fn k8s_list_pods(
805    State(state): State<AppState>,
806) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
807    state
808        .k8s
809        .list_pods()
810        .await
811        .map(Json)
812        .map_err(internal_error)
813}
814
815async fn k8s_actions(
816    State(state): State<AppState>,
817) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
818    Ok(Json(state.k8s.recent_actions(100).await))
819}
820
821fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
822    let message = error.to_string();
823    if message.contains("not found") {
824        return (StatusCode::NOT_FOUND, message);
825    }
826    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
827        return (StatusCode::BAD_REQUEST, message);
828    }
829    (StatusCode::INTERNAL_SERVER_ERROR, message)
830}
831
832fn env_bool(name: &str, default: bool) -> bool {
833    std::env::var(name)
834        .ok()
835        .and_then(|v| match v.to_ascii_lowercase().as_str() {
836            "1" | "true" | "yes" | "on" => Some(true),
837            "0" | "false" | "no" | "off" => Some(false),
838            _ => None,
839        })
840        .unwrap_or(default)
841}