1pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16 executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24 Router,
25 body::Body,
26 extract::Path,
27 extract::{Query, State},
28 http::{Request, StatusCode},
29 middleware::{self, Next},
30 response::sse::{Event, KeepAlive, Sse},
31 response::{Json, Response},
32 routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
39use tower_http::trace::TraceLayer;
40
41#[derive(Clone)]
43pub struct AppState {
44 pub config: Arc<Config>,
45 pub cognition: Arc<CognitionRuntime>,
46 pub audit_log: AuditLog,
47 pub k8s: Arc<K8sManager>,
48 pub auth: AuthState,
49 pub bus: Arc<AgentBus>,
50}
51
52async fn audit_middleware(
54 State(state): State<AppState>,
55 request: Request<Body>,
56 next: Next,
57) -> Response {
58 let method = request.method().clone();
59 let path = request.uri().path().to_string();
60 let started = std::time::Instant::now();
61
62 let response = next.run(request).await;
63
64 let duration_ms = started.elapsed().as_millis() as u64;
65 let status = response.status().as_u16();
66 let outcome = if status < 400 {
67 AuditOutcome::Success
68 } else if status == 401 || status == 403 {
69 AuditOutcome::Denied
70 } else {
71 AuditOutcome::Failure
72 };
73
74 state
75 .audit_log
76 .record(audit::AuditEntry {
77 id: uuid::Uuid::new_v4().to_string(),
78 timestamp: chrono::Utc::now(),
79 category: AuditCategory::Api,
80 action: format!("{} {}", method, path),
81 principal: None,
82 outcome,
83 detail: Some(serde_json::json!({ "status": status })),
84 duration_ms: Some(duration_ms),
85 })
86 .await;
87
88 response
89}
90
/// One row of the static route → permission table consulted by
/// `match_policy_rule` / `policy_middleware`.
struct PolicyRule {
    /// Path pattern. With a trailing `/` it matches that prefix or the path
    /// without the slash; without one it matches the exact path or anything
    /// nested beneath it (`pattern` followed by `/`).
    pattern: &'static str,
    /// HTTP methods the rule applies to; `None` means every method.
    methods: Option<&'static [&'static str]>,
    /// Permission the caller must hold; an empty string marks the route public.
    permission: &'static str,
}
98
99const POLICY_RULES: &[PolicyRule] = &[
100 PolicyRule {
102 pattern: "/health",
103 methods: None,
104 permission: "",
105 },
106 PolicyRule {
107 pattern: "/a2a/",
108 methods: None,
109 permission: "",
110 },
111 PolicyRule {
113 pattern: "/v1/k8s/scale",
114 methods: Some(&["POST"]),
115 permission: "admin:access",
116 },
117 PolicyRule {
118 pattern: "/v1/k8s/restart",
119 methods: Some(&["POST"]),
120 permission: "admin:access",
121 },
122 PolicyRule {
123 pattern: "/v1/k8s/",
124 methods: Some(&["GET"]),
125 permission: "admin:access",
126 },
127 PolicyRule {
129 pattern: "/v1/k8s/subagent",
130 methods: Some(&["POST", "DELETE"]),
131 permission: "admin:access",
132 },
133 PolicyRule {
135 pattern: "/v1/plugins",
136 methods: Some(&["GET"]),
137 permission: "agent:read",
138 },
139 PolicyRule {
141 pattern: "/v1/audit",
142 methods: None,
143 permission: "admin:access",
144 },
145 PolicyRule {
147 pattern: "/v1/cognition/start",
148 methods: Some(&["POST"]),
149 permission: "agent:execute",
150 },
151 PolicyRule {
152 pattern: "/v1/cognition/stop",
153 methods: Some(&["POST"]),
154 permission: "agent:execute",
155 },
156 PolicyRule {
157 pattern: "/v1/cognition/",
158 methods: Some(&["GET"]),
159 permission: "agent:read",
160 },
161 PolicyRule {
163 pattern: "/v1/swarm/personas",
164 methods: Some(&["POST"]),
165 permission: "agent:execute",
166 },
167 PolicyRule {
168 pattern: "/v1/swarm/",
169 methods: Some(&["POST"]),
170 permission: "agent:execute",
171 },
172 PolicyRule {
173 pattern: "/v1/swarm/",
174 methods: Some(&["GET"]),
175 permission: "agent:read",
176 },
177 PolicyRule {
179 pattern: "/api/session",
180 methods: Some(&["POST"]),
181 permission: "sessions:write",
182 },
183 PolicyRule {
184 pattern: "/api/session/",
185 methods: Some(&["POST"]),
186 permission: "sessions:write",
187 },
188 PolicyRule {
189 pattern: "/api/session",
190 methods: Some(&["GET"]),
191 permission: "sessions:read",
192 },
193 PolicyRule {
195 pattern: "/api/version",
196 methods: None,
197 permission: "agent:read",
198 },
199 PolicyRule {
200 pattern: "/api/config",
201 methods: None,
202 permission: "agent:read",
203 },
204 PolicyRule {
205 pattern: "/api/provider",
206 methods: None,
207 permission: "agent:read",
208 },
209 PolicyRule {
210 pattern: "/api/agent",
211 methods: None,
212 permission: "agent:read",
213 },
214];
215
216fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
219 for rule in POLICY_RULES {
220 let matches = if rule.pattern.ends_with('/') {
221 path.starts_with(rule.pattern) || path == &rule.pattern[..rule.pattern.len() - 1]
222 } else {
223 path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
224 };
225 if matches {
226 if let Some(allowed_methods) = rule.methods {
227 if !allowed_methods.contains(&method) {
228 continue;
229 }
230 }
231 return Some(rule.permission);
232 }
233 }
234 None
235}
236
237async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
244 let path = request.uri().path().to_string();
245 let method = request.method().as_str().to_string();
246
247 let permission = match match_policy_rule(&path, &method) {
248 None | Some("") => return Ok(next.run(request).await),
249 Some(perm) => perm,
250 };
251
252 let user = policy::PolicyUser {
256 user_id: "bearer-token-user".to_string(),
257 roles: vec!["admin".to_string()],
258 tenant_id: None,
259 scopes: vec![],
260 auth_source: "static_token".to_string(),
261 };
262
263 if let Err(status) = policy::enforce_policy(&user, permission, None).await {
264 tracing::warn!(
265 path = %path,
266 method = %method,
267 permission = %permission,
268 "Policy middleware denied request"
269 );
270 return Err(status);
271 }
272
273 Ok(next.run(request).await)
274}
275
276pub async fn serve(args: ServeArgs) -> Result<()> {
278 let t0 = std::time::Instant::now();
279 tracing::info!("[startup] begin");
280 let config = Config::load().await?;
281 tracing::info!(
282 elapsed_ms = t0.elapsed().as_millis(),
283 "[startup] config loaded"
284 );
285 let mut cognition = CognitionRuntime::new_from_env();
286 tracing::info!(
287 elapsed_ms = t0.elapsed().as_millis(),
288 "[startup] cognition runtime created"
289 );
290
291 cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
293 tracing::info!(
294 elapsed_ms = t0.elapsed().as_millis(),
295 "[startup] tools registered"
296 );
297 let cognition = Arc::new(cognition);
298
299 let audit_log = AuditLog::from_env();
301 let _ = audit::init_audit_log(audit_log.clone());
302
303 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
305 let k8s = Arc::new(K8sManager::new().await);
306 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
307 if k8s.is_available() {
308 tracing::info!("K8s self-deployment enabled");
309 }
310
311 let auth_state = AuthState::from_env();
313 tracing::info!(
314 token_len = auth_state.token().len(),
315 "Auth is mandatory. Token required for all API endpoints."
316 );
317 tracing::info!(
318 audit_entries = audit_log.count().await,
319 "Audit log initialized"
320 );
321
322 let bus = AgentBus::new().into_arc();
324 tracing::info!(
325 elapsed_ms = t0.elapsed().as_millis(),
326 "[startup] bus created"
327 );
328
329 if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
330 tracing::info!(
331 elapsed_ms = t0.elapsed().as_millis(),
332 "[startup] auto-starting cognition"
333 );
334 if let Err(error) = cognition.start(None).await {
335 tracing::warn!(%error, "Failed to auto-start cognition loop");
336 } else {
337 tracing::info!("Perpetual cognition auto-started");
338 }
339 }
340
341 tracing::info!(
342 elapsed_ms = t0.elapsed().as_millis(),
343 "[startup] building routes"
344 );
345 let addr = format!("{}:{}", args.hostname, args.port);
346
347 let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
349 let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
350
351 let a2a_router = a2a_server.router();
353
354 let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
356 .ok()
357 .and_then(|p| p.parse::<u16>().ok())
358 .unwrap_or(50051);
359 let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
360 let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
361 let grpc_service = grpc_store.into_service();
362 tokio::spawn(async move {
363 tracing::info!("gRPC A2A server listening on {}", grpc_addr);
364 if let Err(e) = tonic::transport::Server::builder()
365 .add_service(grpc_service)
366 .serve(grpc_addr)
367 .await
368 {
369 tracing::error!("gRPC server error: {}", e);
370 }
371 });
372
373 let state = AppState {
374 config: Arc::new(config),
375 cognition,
376 audit_log,
377 k8s,
378 auth: auth_state.clone(),
379 bus,
380 };
381
382 let app = Router::new()
383 .route("/health", get(health))
385 .route("/api/version", get(get_version))
387 .route("/api/session", get(list_sessions).post(create_session))
388 .route("/api/session/{id}", get(get_session))
389 .route("/api/session/{id}/prompt", post(prompt_session))
390 .route("/api/config", get(get_config))
391 .route("/api/provider", get(list_providers))
392 .route("/api/agent", get(list_agents))
393 .route("/v1/cognition/start", post(start_cognition))
395 .route("/v1/cognition/stop", post(stop_cognition))
396 .route("/v1/cognition/status", get(get_cognition_status))
397 .route("/v1/cognition/stream", get(stream_cognition))
398 .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
399 .route("/v1/swarm/personas", post(create_persona))
401 .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
402 .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
403 .route("/v1/swarm/lineage", get(get_swarm_lineage))
404 .route("/v1/cognition/beliefs", get(list_beliefs))
406 .route("/v1/cognition/beliefs/{id}", get(get_belief))
407 .route("/v1/cognition/attention", get(list_attention))
408 .route("/v1/cognition/proposals", get(list_proposals))
409 .route(
410 "/v1/cognition/proposals/{id}/approve",
411 post(approve_proposal),
412 )
413 .route("/v1/cognition/receipts", get(list_receipts))
414 .route("/v1/cognition/workspace", get(get_workspace))
415 .route("/v1/cognition/governance", get(get_governance))
416 .route("/v1/cognition/personas/{id}", get(get_persona))
417 .route("/v1/audit", get(list_audit_entries))
419 .route("/v1/k8s/status", get(get_k8s_status))
421 .route("/v1/k8s/scale", post(k8s_scale))
422 .route("/v1/k8s/restart", post(k8s_restart))
423 .route("/v1/k8s/pods", get(k8s_list_pods))
424 .route("/v1/k8s/actions", get(k8s_actions))
425 .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
426 .route(
427 "/v1/k8s/subagent/{id}",
428 axum::routing::delete(k8s_delete_subagent),
429 )
430 .route("/v1/plugins", get(list_plugins))
432 .with_state(state.clone())
433 .nest("/a2a", a2a_router)
435 .layer(middleware::from_fn_with_state(
437 state.clone(),
438 audit_middleware,
439 ))
440 .layer(axum::Extension(state.auth.clone()))
441 .layer(middleware::from_fn(policy_middleware))
442 .layer(middleware::from_fn(auth::require_auth))
443 .layer(
445 CorsLayer::new()
446 .allow_origin(AllowOrigin::mirror_request())
447 .allow_credentials(true)
448 .allow_methods(AllowMethods::mirror_request())
449 .allow_headers(AllowHeaders::mirror_request()),
450 )
451 .layer(TraceLayer::new_for_http());
452
453 tracing::info!(
454 elapsed_ms = t0.elapsed().as_millis(),
455 "[startup] router built, binding"
456 );
457 let listener = tokio::net::TcpListener::bind(&addr).await?;
458 tracing::info!(
459 elapsed_ms = t0.elapsed().as_millis(),
460 "[startup] listening on http://{}",
461 addr
462 );
463
464 axum::serve(listener, app).await?;
465
466 Ok(())
467}
468
/// Liveness probe: always answers with the plain-text body `ok`.
async fn health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
473
474#[derive(Serialize)]
476struct VersionInfo {
477 version: &'static str,
478 name: &'static str,
479 binary_hash: Option<String>,
480}
481
482async fn get_version() -> Json<VersionInfo> {
483 let binary_hash = std::env::current_exe()
484 .ok()
485 .and_then(|p| hash_file(&p).ok());
486 Json(VersionInfo {
487 version: env!("CARGO_PKG_VERSION"),
488 name: env!("CARGO_PKG_NAME"),
489 binary_hash,
490 })
491}
492
493#[derive(Deserialize)]
495struct ListSessionsQuery {
496 limit: Option<usize>,
497}
498
499async fn list_sessions(
500 Query(query): Query<ListSessionsQuery>,
501) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
502 let sessions = crate::session::list_sessions()
503 .await
504 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
505
506 let limit = query.limit.unwrap_or(50);
507 Ok(Json(sessions.into_iter().take(limit).collect()))
508}
509
510#[derive(Deserialize)]
512struct CreateSessionRequest {
513 title: Option<String>,
514 agent: Option<String>,
515}
516
517async fn create_session(
518 Json(req): Json<CreateSessionRequest>,
519) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
520 let mut session = crate::session::Session::new()
521 .await
522 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
523
524 session.title = req.title;
525 if let Some(agent) = req.agent {
526 session.agent = agent;
527 }
528
529 session
530 .save()
531 .await
532 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
533
534 Ok(Json(session))
535}
536
537async fn get_session(
539 axum::extract::Path(id): axum::extract::Path<String>,
540) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
541 let session = crate::session::Session::load(&id)
542 .await
543 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
544
545 Ok(Json(session))
546}
547
548#[derive(Deserialize)]
550struct PromptRequest {
551 message: String,
552}
553
554async fn prompt_session(
555 axum::extract::Path(id): axum::extract::Path<String>,
556 Json(req): Json<PromptRequest>,
557) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
558 if req.message.trim().is_empty() {
560 return Err((
561 StatusCode::BAD_REQUEST,
562 "Message cannot be empty".to_string(),
563 ));
564 }
565
566 tracing::info!(
568 session_id = %id,
569 message_len = req.message.len(),
570 "Received prompt request"
571 );
572
573 Err((
575 StatusCode::NOT_IMPLEMENTED,
576 "Prompt execution not yet implemented".to_string(),
577 ))
578}
579
580async fn get_config(State(state): State<AppState>) -> Json<Config> {
582 Json((*state.config).clone())
583}
584
585async fn list_providers() -> Json<Vec<String>> {
587 Json(vec![
588 "openai".to_string(),
589 "anthropic".to_string(),
590 "google".to_string(),
591 ])
592}
593
594async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
596 let registry = crate::agent::AgentRegistry::with_builtins();
597 Json(registry.list().into_iter().cloned().collect())
598}
599
600async fn start_cognition(
601 State(state): State<AppState>,
602 payload: Option<Json<StartCognitionRequest>>,
603) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
604 state
605 .cognition
606 .start(payload.map(|Json(body)| body))
607 .await
608 .map(Json)
609 .map_err(internal_error)
610}
611
612async fn stop_cognition(
613 State(state): State<AppState>,
614 payload: Option<Json<StopCognitionRequest>>,
615) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
616 let reason = payload.and_then(|Json(body)| body.reason);
617 state
618 .cognition
619 .stop(reason)
620 .await
621 .map(Json)
622 .map_err(internal_error)
623}
624
625async fn get_cognition_status(
626 State(state): State<AppState>,
627) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
628 Ok(Json(state.cognition.status().await))
629}
630
631async fn stream_cognition(
632 State(state): State<AppState>,
633) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
634 let rx = state.cognition.subscribe_events();
635
636 let event_stream = stream::unfold(rx, |mut rx| async move {
637 match rx.recv().await {
638 Ok(event) => {
639 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
640 let sse_event = Event::default().event("cognition").data(payload);
641 Some((Ok(sse_event), rx))
642 }
643 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
644 let lag_event = Event::default()
645 .event("lag")
646 .data(format!("skipped {}", skipped));
647 Some((Ok(lag_event), rx))
648 }
649 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
650 }
651 });
652
653 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
654}
655
656async fn get_latest_snapshot(
657 State(state): State<AppState>,
658) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
659 match state.cognition.latest_snapshot().await {
660 Some(snapshot) => Ok(Json(snapshot)),
661 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
662 }
663}
664
665async fn create_persona(
666 State(state): State<AppState>,
667 Json(req): Json<CreatePersonaRequest>,
668) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
669 state
670 .cognition
671 .create_persona(req)
672 .await
673 .map(Json)
674 .map_err(internal_error)
675}
676
677async fn spawn_persona(
678 State(state): State<AppState>,
679 Path(id): Path<String>,
680 Json(req): Json<SpawnPersonaRequest>,
681) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
682 state
683 .cognition
684 .spawn_child(&id, req)
685 .await
686 .map(Json)
687 .map_err(internal_error)
688}
689
690async fn reap_persona(
691 State(state): State<AppState>,
692 Path(id): Path<String>,
693 payload: Option<Json<ReapPersonaRequest>>,
694) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
695 let req = payload
696 .map(|Json(body)| body)
697 .unwrap_or(ReapPersonaRequest {
698 cascade: Some(false),
699 reason: None,
700 });
701
702 state
703 .cognition
704 .reap_persona(&id, req)
705 .await
706 .map(Json)
707 .map_err(internal_error)
708}
709
710async fn get_swarm_lineage(
711 State(state): State<AppState>,
712) -> Result<Json<LineageGraph>, (StatusCode, String)> {
713 Ok(Json(state.cognition.lineage_graph().await))
714}
715
716#[derive(Deserialize)]
719struct BeliefFilter {
720 status: Option<String>,
721 persona: Option<String>,
722}
723
724async fn list_beliefs(
725 State(state): State<AppState>,
726 Query(filter): Query<BeliefFilter>,
727) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
728 let beliefs = state.cognition.get_beliefs().await;
729 let mut result: Vec<Belief> = beliefs.into_values().collect();
730
731 if let Some(status) = &filter.status {
732 result.retain(|b| {
733 let s = serde_json::to_string(&b.status).unwrap_or_default();
734 s.contains(status)
735 });
736 }
737 if let Some(persona) = &filter.persona {
738 result.retain(|b| &b.asserted_by == persona);
739 }
740
741 result.sort_by(|a, b| {
742 b.confidence
743 .partial_cmp(&a.confidence)
744 .unwrap_or(std::cmp::Ordering::Equal)
745 });
746 Ok(Json(result))
747}
748
749async fn get_belief(
750 State(state): State<AppState>,
751 Path(id): Path<String>,
752) -> Result<Json<Belief>, (StatusCode, String)> {
753 match state.cognition.get_belief(&id).await {
754 Some(belief) => Ok(Json(belief)),
755 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
756 }
757}
758
759async fn list_attention(
760 State(state): State<AppState>,
761) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
762 Ok(Json(state.cognition.get_attention_queue().await))
763}
764
765async fn list_proposals(
766 State(state): State<AppState>,
767) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
768 let proposals = state.cognition.get_proposals().await;
769 let mut result: Vec<Proposal> = proposals.into_values().collect();
770 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
771 Ok(Json(result))
772}
773
774async fn approve_proposal(
775 State(state): State<AppState>,
776 Path(id): Path<String>,
777) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
778 state
779 .cognition
780 .approve_proposal(&id)
781 .await
782 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
783 .map_err(internal_error)
784}
785
786async fn list_receipts(
787 State(state): State<AppState>,
788) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
789 Ok(Json(state.cognition.get_receipts().await))
790}
791
792async fn get_workspace(
793 State(state): State<AppState>,
794) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
795 Ok(Json(state.cognition.get_workspace().await))
796}
797
798async fn get_governance(
799 State(state): State<AppState>,
800) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
801 Ok(Json(state.cognition.get_governance().await))
802}
803
804async fn get_persona(
805 State(state): State<AppState>,
806 axum::extract::Path(id): axum::extract::Path<String>,
807) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
808 state
809 .cognition
810 .get_persona(&id)
811 .await
812 .map(Json)
813 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
814}
815
816#[derive(Deserialize)]
819struct AuditQuery {
820 limit: Option<usize>,
821 category: Option<String>,
822}
823
824async fn list_audit_entries(
825 State(state): State<AppState>,
826 Query(query): Query<AuditQuery>,
827) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
828 let limit = query.limit.unwrap_or(100).min(1000);
829
830 let entries = if let Some(ref cat) = query.category {
831 let category = match cat.as_str() {
832 "api" => AuditCategory::Api,
833 "tool" | "tool_execution" => AuditCategory::ToolExecution,
834 "session" => AuditCategory::Session,
835 "cognition" => AuditCategory::Cognition,
836 "swarm" => AuditCategory::Swarm,
837 "auth" => AuditCategory::Auth,
838 "k8s" => AuditCategory::K8s,
839 "sandbox" => AuditCategory::Sandbox,
840 "config" => AuditCategory::Config,
841 _ => {
842 return Err((
843 StatusCode::BAD_REQUEST,
844 format!("Unknown category: {}", cat),
845 ));
846 }
847 };
848 state.audit_log.by_category(category, limit).await
849 } else {
850 state.audit_log.recent(limit).await
851 };
852
853 Ok(Json(entries))
854}
855
856async fn get_k8s_status(
859 State(state): State<AppState>,
860) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
861 Ok(Json(state.k8s.status().await))
862}
863
864#[derive(Deserialize)]
865struct ScaleRequest {
866 replicas: i32,
867}
868
869async fn k8s_scale(
870 State(state): State<AppState>,
871 Json(req): Json<ScaleRequest>,
872) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
873 if req.replicas < 0 || req.replicas > 100 {
874 return Err((
875 StatusCode::BAD_REQUEST,
876 "Replicas must be between 0 and 100".to_string(),
877 ));
878 }
879
880 state
881 .audit_log
882 .log(
883 AuditCategory::K8s,
884 format!("scale:{}", req.replicas),
885 AuditOutcome::Success,
886 None,
887 None,
888 )
889 .await;
890
891 state
892 .k8s
893 .scale(req.replicas)
894 .await
895 .map(Json)
896 .map_err(internal_error)
897}
898
899async fn k8s_restart(
900 State(state): State<AppState>,
901) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
902 state
903 .audit_log
904 .log(
905 AuditCategory::K8s,
906 "rolling_restart",
907 AuditOutcome::Success,
908 None,
909 None,
910 )
911 .await;
912
913 state
914 .k8s
915 .rolling_restart()
916 .await
917 .map(Json)
918 .map_err(internal_error)
919}
920
921async fn k8s_list_pods(
922 State(state): State<AppState>,
923) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
924 state
925 .k8s
926 .list_pods()
927 .await
928 .map(Json)
929 .map_err(internal_error)
930}
931
932async fn k8s_actions(
933 State(state): State<AppState>,
934) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
935 Ok(Json(state.k8s.recent_actions(100).await))
936}
937
938#[derive(Deserialize)]
939struct SpawnSubagentRequest {
940 subagent_id: String,
941 #[serde(default)]
942 image: Option<String>,
943 #[serde(default)]
944 env_vars: std::collections::HashMap<String, String>,
945}
946
947async fn k8s_spawn_subagent(
948 State(state): State<AppState>,
949 Json(req): Json<SpawnSubagentRequest>,
950) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
951 state
952 .k8s
953 .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
954 .await
955 .map(Json)
956 .map_err(internal_error)
957}
958
959async fn k8s_delete_subagent(
960 State(state): State<AppState>,
961 axum::extract::Path(id): axum::extract::Path<String>,
962) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
963 state
964 .k8s
965 .delete_subagent_pod(&id)
966 .await
967 .map(Json)
968 .map_err(internal_error)
969}
970
971async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
973 let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
974 let signing_key = SigningKey::from_env();
975 let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
976 Json(PluginListResponse {
977 server_fingerprint,
978 signing_available: !test_sig.is_empty(),
979 plugins: Vec::<PluginManifest>::new(),
980 })
981}
982
983#[derive(Serialize)]
984struct PluginListResponse {
985 server_fingerprint: String,
986 signing_available: bool,
987 plugins: Vec<PluginManifest>,
988}
989
990fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
991 let message = error.to_string();
992 if message.contains("not found") {
993 return (StatusCode::NOT_FOUND, message);
994 }
995 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
996 return (StatusCode::BAD_REQUEST, message);
997 }
998 (StatusCode::INTERNAL_SERVER_ERROR, message)
999}
1000
/// Reads a boolean flag from the environment variable `name`.
///
/// Surrounding whitespace (e.g. a trailing newline from a mounted config file)
/// is ignored, and matching is case-insensitive:
/// - `1`, `true`, `yes`, `on` mean `true`.
/// - `0`, `false`, `no`, `off` mean `false`.
///
/// Returns `default` when the variable is unset, not valid Unicode, or holds
/// any other value.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        Ok(raw) => match raw.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        },
        Err(_) => default,
    }
}