1pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16 executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24 Router,
25 body::Body,
26 extract::Path,
27 extract::{Query, State},
28 http::{Request, StatusCode},
29 middleware::{self, Next},
30 response::sse::{Event, KeepAlive, Sse},
31 response::{Json, Response},
32 routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
39use tower_http::trace::TraceLayer;
40
41#[derive(Clone)]
43pub struct AppState {
44 pub config: Arc<Config>,
45 pub cognition: Arc<CognitionRuntime>,
46 pub audit_log: AuditLog,
47 pub k8s: Arc<K8sManager>,
48 pub auth: AuthState,
49 pub bus: Arc<AgentBus>,
50}
51
52async fn audit_middleware(
54 State(state): State<AppState>,
55 request: Request<Body>,
56 next: Next,
57) -> Response {
58 let method = request.method().clone();
59 let path = request.uri().path().to_string();
60 let started = std::time::Instant::now();
61
62 let response = next.run(request).await;
63
64 let duration_ms = started.elapsed().as_millis() as u64;
65 let status = response.status().as_u16();
66 let outcome = if status < 400 {
67 AuditOutcome::Success
68 } else if status == 401 || status == 403 {
69 AuditOutcome::Denied
70 } else {
71 AuditOutcome::Failure
72 };
73
74 state
75 .audit_log
76 .record(audit::AuditEntry {
77 id: uuid::Uuid::new_v4().to_string(),
78 timestamp: chrono::Utc::now(),
79 category: AuditCategory::Api,
80 action: format!("{} {}", method, path),
81 principal: None,
82 outcome,
83 detail: Some(serde_json::json!({ "status": status })),
84 duration_ms: Some(duration_ms),
85 })
86 .await;
87
88 response
89}
90
91struct PolicyRule {
94 pattern: &'static str,
95 methods: Option<&'static [&'static str]>,
96 permission: &'static str,
97}
98
99const POLICY_RULES: &[PolicyRule] = &[
100 PolicyRule {
102 pattern: "/health",
103 methods: None,
104 permission: "",
105 },
106 PolicyRule {
107 pattern: "/a2a/",
108 methods: None,
109 permission: "",
110 },
111 PolicyRule {
113 pattern: "/v1/k8s/scale",
114 methods: Some(&["POST"]),
115 permission: "admin:access",
116 },
117 PolicyRule {
118 pattern: "/v1/k8s/restart",
119 methods: Some(&["POST"]),
120 permission: "admin:access",
121 },
122 PolicyRule {
123 pattern: "/v1/k8s/",
124 methods: Some(&["GET"]),
125 permission: "admin:access",
126 },
127 PolicyRule {
129 pattern: "/v1/k8s/subagent",
130 methods: Some(&["POST", "DELETE"]),
131 permission: "admin:access",
132 },
133 PolicyRule {
135 pattern: "/v1/plugins",
136 methods: Some(&["GET"]),
137 permission: "agent:read",
138 },
139 PolicyRule {
141 pattern: "/v1/audit",
142 methods: None,
143 permission: "admin:access",
144 },
145 PolicyRule {
147 pattern: "/v1/cognition/start",
148 methods: Some(&["POST"]),
149 permission: "agent:execute",
150 },
151 PolicyRule {
152 pattern: "/v1/cognition/stop",
153 methods: Some(&["POST"]),
154 permission: "agent:execute",
155 },
156 PolicyRule {
157 pattern: "/v1/cognition/",
158 methods: Some(&["GET"]),
159 permission: "agent:read",
160 },
161 PolicyRule {
163 pattern: "/v1/swarm/personas",
164 methods: Some(&["POST"]),
165 permission: "agent:execute",
166 },
167 PolicyRule {
168 pattern: "/v1/swarm/",
169 methods: Some(&["POST"]),
170 permission: "agent:execute",
171 },
172 PolicyRule {
173 pattern: "/v1/swarm/",
174 methods: Some(&["GET"]),
175 permission: "agent:read",
176 },
177 PolicyRule {
180 pattern: "/api/session/",
181 methods: Some(&["POST"]),
182 permission: "agent:execute",
183 },
184 PolicyRule {
185 pattern: "/api/session",
186 methods: Some(&["POST"]),
187 permission: "sessions:write",
188 },
189 PolicyRule {
190 pattern: "/api/session",
191 methods: Some(&["GET"]),
192 permission: "sessions:read",
193 },
194 PolicyRule {
196 pattern: "/v1/cognition/proposals/",
197 methods: Some(&["POST"]),
198 permission: "agent:execute",
199 },
200 PolicyRule {
202 pattern: "/api/version",
203 methods: None,
204 permission: "agent:read",
205 },
206 PolicyRule {
207 pattern: "/api/config",
208 methods: None,
209 permission: "agent:read",
210 },
211 PolicyRule {
212 pattern: "/api/provider",
213 methods: None,
214 permission: "agent:read",
215 },
216 PolicyRule {
217 pattern: "/api/agent",
218 methods: None,
219 permission: "agent:read",
220 },
221];
222
223fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
226 for rule in POLICY_RULES {
227 let matches = if rule.pattern.ends_with('/') {
228 path.starts_with(rule.pattern)
229 } else {
230 path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
231 };
232 if matches {
233 if let Some(allowed_methods) = rule.methods {
234 if !allowed_methods.contains(&method) {
235 continue;
236 }
237 }
238 return Some(rule.permission);
239 }
240 }
241 None
242}
243
244async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
251 let path = request.uri().path().to_string();
252 let method = request.method().as_str().to_string();
253
254 let permission = match match_policy_rule(&path, &method) {
255 None | Some("") => return Ok(next.run(request).await),
256 Some(perm) => perm,
257 };
258
259 let user = policy::PolicyUser {
263 user_id: "bearer-token-user".to_string(),
264 roles: vec!["admin".to_string()],
265 tenant_id: None,
266 scopes: vec![],
267 auth_source: "static_token".to_string(),
268 };
269
270 if let Err(status) = policy::enforce_policy(&user, permission, None).await {
271 tracing::warn!(
272 path = %path,
273 method = %method,
274 permission = %permission,
275 "Policy middleware denied request"
276 );
277 return Err(status);
278 }
279
280 Ok(next.run(request).await)
281}
282
283pub async fn serve(args: ServeArgs) -> Result<()> {
285 let t0 = std::time::Instant::now();
286 tracing::info!("[startup] begin");
287 let config = Config::load().await?;
288 tracing::info!(
289 elapsed_ms = t0.elapsed().as_millis(),
290 "[startup] config loaded"
291 );
292 let mut cognition = CognitionRuntime::new_from_env();
293 tracing::info!(
294 elapsed_ms = t0.elapsed().as_millis(),
295 "[startup] cognition runtime created"
296 );
297
298 cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
300 tracing::info!(
301 elapsed_ms = t0.elapsed().as_millis(),
302 "[startup] tools registered"
303 );
304 let cognition = Arc::new(cognition);
305
306 let audit_log = AuditLog::from_env();
308 let _ = audit::init_audit_log(audit_log.clone());
309
310 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
312 let k8s = Arc::new(K8sManager::new().await);
313 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
314 if k8s.is_available() {
315 tracing::info!("K8s self-deployment enabled");
316 }
317
318 let auth_state = AuthState::from_env();
320 tracing::info!(
321 token_len = auth_state.token().len(),
322 "Auth is mandatory. Token required for all API endpoints."
323 );
324 tracing::info!(
325 audit_entries = audit_log.count().await,
326 "Audit log initialized"
327 );
328
329 let bus = AgentBus::new().into_arc();
331 tracing::info!(
332 elapsed_ms = t0.elapsed().as_millis(),
333 "[startup] bus created"
334 );
335
336 if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
337 tracing::info!(
338 elapsed_ms = t0.elapsed().as_millis(),
339 "[startup] auto-starting cognition"
340 );
341 if let Err(error) = cognition.start(None).await {
342 tracing::warn!(%error, "Failed to auto-start cognition loop");
343 } else {
344 tracing::info!("Perpetual cognition auto-started");
345 }
346 }
347
348 tracing::info!(
349 elapsed_ms = t0.elapsed().as_millis(),
350 "[startup] building routes"
351 );
352 let addr = format!("{}:{}", args.hostname, args.port);
353
354 let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
356 let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
357
358 let a2a_router = a2a_server.router();
360
361 let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
363 .ok()
364 .and_then(|p| p.parse::<u16>().ok())
365 .unwrap_or(50051);
366 let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
367 let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
368 let grpc_service = grpc_store.into_service();
369 tokio::spawn(async move {
370 tracing::info!("gRPC A2A server listening on {}", grpc_addr);
371 if let Err(e) = tonic::transport::Server::builder()
372 .add_service(grpc_service)
373 .serve(grpc_addr)
374 .await
375 {
376 tracing::error!("gRPC server error: {}", e);
377 }
378 });
379
380 let state = AppState {
381 config: Arc::new(config),
382 cognition,
383 audit_log,
384 k8s,
385 auth: auth_state.clone(),
386 bus,
387 };
388
389 let app = Router::new()
390 .route("/health", get(health))
392 .route("/api/version", get(get_version))
394 .route("/api/session", get(list_sessions).post(create_session))
395 .route("/api/session/{id}", get(get_session))
396 .route("/api/session/{id}/prompt", post(prompt_session))
397 .route("/api/config", get(get_config))
398 .route("/api/provider", get(list_providers))
399 .route("/api/agent", get(list_agents))
400 .route("/v1/cognition/start", post(start_cognition))
402 .route("/v1/cognition/stop", post(stop_cognition))
403 .route("/v1/cognition/status", get(get_cognition_status))
404 .route("/v1/cognition/stream", get(stream_cognition))
405 .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
406 .route("/v1/swarm/personas", post(create_persona))
408 .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
409 .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
410 .route("/v1/swarm/lineage", get(get_swarm_lineage))
411 .route("/v1/cognition/beliefs", get(list_beliefs))
413 .route("/v1/cognition/beliefs/{id}", get(get_belief))
414 .route("/v1/cognition/attention", get(list_attention))
415 .route("/v1/cognition/proposals", get(list_proposals))
416 .route(
417 "/v1/cognition/proposals/{id}/approve",
418 post(approve_proposal),
419 )
420 .route("/v1/cognition/receipts", get(list_receipts))
421 .route("/v1/cognition/workspace", get(get_workspace))
422 .route("/v1/cognition/governance", get(get_governance))
423 .route("/v1/cognition/personas/{id}", get(get_persona))
424 .route("/v1/audit", get(list_audit_entries))
426 .route("/v1/k8s/status", get(get_k8s_status))
428 .route("/v1/k8s/scale", post(k8s_scale))
429 .route("/v1/k8s/restart", post(k8s_restart))
430 .route("/v1/k8s/pods", get(k8s_list_pods))
431 .route("/v1/k8s/actions", get(k8s_actions))
432 .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
433 .route(
434 "/v1/k8s/subagent/{id}",
435 axum::routing::delete(k8s_delete_subagent),
436 )
437 .route("/v1/plugins", get(list_plugins))
439 .with_state(state.clone())
440 .nest("/a2a", a2a_router)
442 .layer(middleware::from_fn_with_state(
444 state.clone(),
445 audit_middleware,
446 ))
447 .layer(axum::Extension(state.auth.clone()))
448 .layer(middleware::from_fn(policy_middleware))
449 .layer(middleware::from_fn(auth::require_auth))
450 .layer(
452 CorsLayer::new()
453 .allow_origin(AllowOrigin::mirror_request())
454 .allow_credentials(true)
455 .allow_methods(AllowMethods::mirror_request())
456 .allow_headers(AllowHeaders::mirror_request()),
457 )
458 .layer(TraceLayer::new_for_http());
459
460 tracing::info!(
461 elapsed_ms = t0.elapsed().as_millis(),
462 "[startup] router built, binding"
463 );
464 let listener = tokio::net::TcpListener::bind(&addr).await?;
465 tracing::info!(
466 elapsed_ms = t0.elapsed().as_millis(),
467 "[startup] listening on http://{}",
468 addr
469 );
470
471 axum::serve(listener, app).await?;
472
473 Ok(())
474}
475
476async fn health() -> &'static str {
478 "ok"
479}
480
481#[derive(Serialize)]
483struct VersionInfo {
484 version: &'static str,
485 name: &'static str,
486 binary_hash: Option<String>,
487}
488
489async fn get_version() -> Json<VersionInfo> {
490 let binary_hash = std::env::current_exe()
491 .ok()
492 .and_then(|p| hash_file(&p).ok());
493 Json(VersionInfo {
494 version: env!("CARGO_PKG_VERSION"),
495 name: env!("CARGO_PKG_NAME"),
496 binary_hash,
497 })
498}
499
500#[derive(Deserialize)]
502struct ListSessionsQuery {
503 limit: Option<usize>,
504}
505
506async fn list_sessions(
507 Query(query): Query<ListSessionsQuery>,
508) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
509 let sessions = crate::session::list_sessions()
510 .await
511 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
512
513 let limit = query.limit.unwrap_or(50);
514 Ok(Json(sessions.into_iter().take(limit).collect()))
515}
516
517#[derive(Deserialize)]
519struct CreateSessionRequest {
520 title: Option<String>,
521 agent: Option<String>,
522}
523
524async fn create_session(
525 Json(req): Json<CreateSessionRequest>,
526) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
527 let mut session = crate::session::Session::new()
528 .await
529 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
530
531 session.title = req.title;
532 if let Some(agent) = req.agent {
533 session.agent = agent;
534 }
535
536 session
537 .save()
538 .await
539 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
540
541 Ok(Json(session))
542}
543
544async fn get_session(
546 axum::extract::Path(id): axum::extract::Path<String>,
547) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
548 let session = crate::session::Session::load(&id)
549 .await
550 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
551
552 Ok(Json(session))
553}
554
555#[derive(Deserialize)]
557struct PromptRequest {
558 message: String,
559}
560
561async fn prompt_session(
562 axum::extract::Path(id): axum::extract::Path<String>,
563 Json(req): Json<PromptRequest>,
564) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
565 if req.message.trim().is_empty() {
567 return Err((
568 StatusCode::BAD_REQUEST,
569 "Message cannot be empty".to_string(),
570 ));
571 }
572
573 tracing::info!(
575 session_id = %id,
576 message_len = req.message.len(),
577 "Received prompt request"
578 );
579
580 Err((
582 StatusCode::NOT_IMPLEMENTED,
583 "Prompt execution not yet implemented".to_string(),
584 ))
585}
586
587async fn get_config(State(state): State<AppState>) -> Json<Config> {
589 Json((*state.config).clone())
590}
591
592async fn list_providers() -> Json<Vec<String>> {
594 Json(vec![
595 "openai".to_string(),
596 "anthropic".to_string(),
597 "google".to_string(),
598 ])
599}
600
601async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
603 let registry = crate::agent::AgentRegistry::with_builtins();
604 Json(registry.list().into_iter().cloned().collect())
605}
606
607async fn start_cognition(
608 State(state): State<AppState>,
609 payload: Option<Json<StartCognitionRequest>>,
610) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
611 state
612 .cognition
613 .start(payload.map(|Json(body)| body))
614 .await
615 .map(Json)
616 .map_err(internal_error)
617}
618
619async fn stop_cognition(
620 State(state): State<AppState>,
621 payload: Option<Json<StopCognitionRequest>>,
622) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
623 let reason = payload.and_then(|Json(body)| body.reason);
624 state
625 .cognition
626 .stop(reason)
627 .await
628 .map(Json)
629 .map_err(internal_error)
630}
631
632async fn get_cognition_status(
633 State(state): State<AppState>,
634) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
635 Ok(Json(state.cognition.status().await))
636}
637
638async fn stream_cognition(
639 State(state): State<AppState>,
640) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
641 let rx = state.cognition.subscribe_events();
642
643 let event_stream = stream::unfold(rx, |mut rx| async move {
644 match rx.recv().await {
645 Ok(event) => {
646 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
647 let sse_event = Event::default().event("cognition").data(payload);
648 Some((Ok(sse_event), rx))
649 }
650 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
651 let lag_event = Event::default()
652 .event("lag")
653 .data(format!("skipped {}", skipped));
654 Some((Ok(lag_event), rx))
655 }
656 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
657 }
658 });
659
660 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
661}
662
663async fn get_latest_snapshot(
664 State(state): State<AppState>,
665) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
666 match state.cognition.latest_snapshot().await {
667 Some(snapshot) => Ok(Json(snapshot)),
668 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
669 }
670}
671
672async fn create_persona(
673 State(state): State<AppState>,
674 Json(req): Json<CreatePersonaRequest>,
675) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
676 state
677 .cognition
678 .create_persona(req)
679 .await
680 .map(Json)
681 .map_err(internal_error)
682}
683
684async fn spawn_persona(
685 State(state): State<AppState>,
686 Path(id): Path<String>,
687 Json(req): Json<SpawnPersonaRequest>,
688) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
689 state
690 .cognition
691 .spawn_child(&id, req)
692 .await
693 .map(Json)
694 .map_err(internal_error)
695}
696
697async fn reap_persona(
698 State(state): State<AppState>,
699 Path(id): Path<String>,
700 payload: Option<Json<ReapPersonaRequest>>,
701) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
702 let req = payload
703 .map(|Json(body)| body)
704 .unwrap_or(ReapPersonaRequest {
705 cascade: Some(false),
706 reason: None,
707 });
708
709 state
710 .cognition
711 .reap_persona(&id, req)
712 .await
713 .map(Json)
714 .map_err(internal_error)
715}
716
717async fn get_swarm_lineage(
718 State(state): State<AppState>,
719) -> Result<Json<LineageGraph>, (StatusCode, String)> {
720 Ok(Json(state.cognition.lineage_graph().await))
721}
722
723#[derive(Deserialize)]
726struct BeliefFilter {
727 status: Option<String>,
728 persona: Option<String>,
729}
730
731async fn list_beliefs(
732 State(state): State<AppState>,
733 Query(filter): Query<BeliefFilter>,
734) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
735 let beliefs = state.cognition.get_beliefs().await;
736 let mut result: Vec<Belief> = beliefs.into_values().collect();
737
738 if let Some(status) = &filter.status {
739 result.retain(|b| {
740 let s = serde_json::to_string(&b.status).unwrap_or_default();
741 s.contains(status)
742 });
743 }
744 if let Some(persona) = &filter.persona {
745 result.retain(|b| &b.asserted_by == persona);
746 }
747
748 result.sort_by(|a, b| {
749 b.confidence
750 .partial_cmp(&a.confidence)
751 .unwrap_or(std::cmp::Ordering::Equal)
752 });
753 Ok(Json(result))
754}
755
756async fn get_belief(
757 State(state): State<AppState>,
758 Path(id): Path<String>,
759) -> Result<Json<Belief>, (StatusCode, String)> {
760 match state.cognition.get_belief(&id).await {
761 Some(belief) => Ok(Json(belief)),
762 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
763 }
764}
765
766async fn list_attention(
767 State(state): State<AppState>,
768) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
769 Ok(Json(state.cognition.get_attention_queue().await))
770}
771
772async fn list_proposals(
773 State(state): State<AppState>,
774) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
775 let proposals = state.cognition.get_proposals().await;
776 let mut result: Vec<Proposal> = proposals.into_values().collect();
777 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
778 Ok(Json(result))
779}
780
781async fn approve_proposal(
782 State(state): State<AppState>,
783 Path(id): Path<String>,
784) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
785 state
786 .cognition
787 .approve_proposal(&id)
788 .await
789 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
790 .map_err(internal_error)
791}
792
793async fn list_receipts(
794 State(state): State<AppState>,
795) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
796 Ok(Json(state.cognition.get_receipts().await))
797}
798
799async fn get_workspace(
800 State(state): State<AppState>,
801) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
802 Ok(Json(state.cognition.get_workspace().await))
803}
804
805async fn get_governance(
806 State(state): State<AppState>,
807) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
808 Ok(Json(state.cognition.get_governance().await))
809}
810
811async fn get_persona(
812 State(state): State<AppState>,
813 axum::extract::Path(id): axum::extract::Path<String>,
814) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
815 state
816 .cognition
817 .get_persona(&id)
818 .await
819 .map(Json)
820 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
821}
822
823#[derive(Deserialize)]
826struct AuditQuery {
827 limit: Option<usize>,
828 category: Option<String>,
829}
830
831async fn list_audit_entries(
832 State(state): State<AppState>,
833 Query(query): Query<AuditQuery>,
834) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
835 let limit = query.limit.unwrap_or(100).min(1000);
836
837 let entries = if let Some(ref cat) = query.category {
838 let category = match cat.as_str() {
839 "api" => AuditCategory::Api,
840 "tool" | "tool_execution" => AuditCategory::ToolExecution,
841 "session" => AuditCategory::Session,
842 "cognition" => AuditCategory::Cognition,
843 "swarm" => AuditCategory::Swarm,
844 "auth" => AuditCategory::Auth,
845 "k8s" => AuditCategory::K8s,
846 "sandbox" => AuditCategory::Sandbox,
847 "config" => AuditCategory::Config,
848 _ => {
849 return Err((
850 StatusCode::BAD_REQUEST,
851 format!("Unknown category: {}", cat),
852 ));
853 }
854 };
855 state.audit_log.by_category(category, limit).await
856 } else {
857 state.audit_log.recent(limit).await
858 };
859
860 Ok(Json(entries))
861}
862
863async fn get_k8s_status(
866 State(state): State<AppState>,
867) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
868 Ok(Json(state.k8s.status().await))
869}
870
871#[derive(Deserialize)]
872struct ScaleRequest {
873 replicas: i32,
874}
875
876async fn k8s_scale(
877 State(state): State<AppState>,
878 Json(req): Json<ScaleRequest>,
879) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
880 if req.replicas < 0 || req.replicas > 100 {
881 return Err((
882 StatusCode::BAD_REQUEST,
883 "Replicas must be between 0 and 100".to_string(),
884 ));
885 }
886
887 state
888 .audit_log
889 .log(
890 AuditCategory::K8s,
891 format!("scale:{}", req.replicas),
892 AuditOutcome::Success,
893 None,
894 None,
895 )
896 .await;
897
898 state
899 .k8s
900 .scale(req.replicas)
901 .await
902 .map(Json)
903 .map_err(internal_error)
904}
905
906async fn k8s_restart(
907 State(state): State<AppState>,
908) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
909 state
910 .audit_log
911 .log(
912 AuditCategory::K8s,
913 "rolling_restart",
914 AuditOutcome::Success,
915 None,
916 None,
917 )
918 .await;
919
920 state
921 .k8s
922 .rolling_restart()
923 .await
924 .map(Json)
925 .map_err(internal_error)
926}
927
928async fn k8s_list_pods(
929 State(state): State<AppState>,
930) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
931 state
932 .k8s
933 .list_pods()
934 .await
935 .map(Json)
936 .map_err(internal_error)
937}
938
939async fn k8s_actions(
940 State(state): State<AppState>,
941) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
942 Ok(Json(state.k8s.recent_actions(100).await))
943}
944
945#[derive(Deserialize)]
946struct SpawnSubagentRequest {
947 subagent_id: String,
948 #[serde(default)]
949 image: Option<String>,
950 #[serde(default)]
951 env_vars: std::collections::HashMap<String, String>,
952}
953
954async fn k8s_spawn_subagent(
955 State(state): State<AppState>,
956 Json(req): Json<SpawnSubagentRequest>,
957) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
958 state
959 .k8s
960 .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
961 .await
962 .map(Json)
963 .map_err(internal_error)
964}
965
966async fn k8s_delete_subagent(
967 State(state): State<AppState>,
968 axum::extract::Path(id): axum::extract::Path<String>,
969) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
970 state
971 .k8s
972 .delete_subagent_pod(&id)
973 .await
974 .map(Json)
975 .map_err(internal_error)
976}
977
978async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
980 let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
981 let signing_key = SigningKey::from_env();
982 let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
983 Json(PluginListResponse {
984 server_fingerprint,
985 signing_available: !test_sig.is_empty(),
986 plugins: Vec::<PluginManifest>::new(),
987 })
988}
989
990#[derive(Serialize)]
991struct PluginListResponse {
992 server_fingerprint: String,
993 signing_available: bool,
994 plugins: Vec<PluginManifest>,
995}
996
997fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
998 let message = error.to_string();
999 if message.contains("not found") {
1000 return (StatusCode::NOT_FOUND, message);
1001 }
1002 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
1003 return (StatusCode::BAD_REQUEST, message);
1004 }
1005 (StatusCode::INTERNAL_SERVER_ERROR, message)
1006}
1007
1008fn env_bool(name: &str, default: bool) -> bool {
1009 std::env::var(name)
1010 .ok()
1011 .and_then(|v| match v.to_ascii_lowercase().as_str() {
1012 "1" | "true" | "yes" | "on" => Some(true),
1013 "0" | "false" | "no" | "off" => Some(false),
1014 _ => None,
1015 })
1016 .unwrap_or(default)
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021 use super::match_policy_rule;
1022
1023 #[test]
1024 fn policy_prompt_session_requires_execute_permission() {
1025 let permission = match_policy_rule("/api/session/abc123/prompt", "POST");
1026 assert_eq!(permission, Some("agent:execute"));
1027 }
1028
1029 #[test]
1030 fn policy_create_session_keeps_sessions_write_permission() {
1031 let permission = match_policy_rule("/api/session", "POST");
1032 assert_eq!(permission, Some("sessions:write"));
1033 }
1034
1035 #[test]
1036 fn policy_proposal_approval_requires_execute_permission() {
1037 let permission = match_policy_rule("/v1/cognition/proposals/p1/approve", "POST");
1038 assert_eq!(permission, Some("agent:execute"));
1039 }
1040}