1pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16 executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24 Router,
25 body::Body,
26 extract::Path,
27 extract::{Query, State},
28 http::{Request, StatusCode},
29 middleware::{self, Next},
30 response::sse::{Event, KeepAlive, Sse},
31 response::{Json, Response},
32 routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
39use tower_http::trace::TraceLayer;
40
/// Shared state handed to the HTTP handlers and to the state-aware middleware.
///
/// `Clone` is derived because `serve` clones the value into both the router
/// (`with_state`) and the audit middleware (`from_fn_with_state`).
#[derive(Clone)]
pub struct AppState {
    /// Configuration loaded once in `serve` via `Config::load`.
    pub config: Arc<Config>,
    /// Cognition runtime used by the `/v1/cognition/*` and `/v1/swarm/*` handlers.
    pub cognition: Arc<CognitionRuntime>,
    /// Audit log written by `audit_middleware` and read by `list_audit_entries`.
    pub audit_log: AuditLog,
    /// Kubernetes manager used by the `/v1/k8s/*` handlers.
    pub k8s: Arc<K8sManager>,
    /// Authentication state; `serve` also installs a clone as a request extension.
    pub auth: AuthState,
    /// Agent bus; `serve` shares a clone with the gRPC task store.
    pub bus: Arc<AgentBus>,
}
51
52async fn audit_middleware(
54 State(state): State<AppState>,
55 request: Request<Body>,
56 next: Next,
57) -> Response {
58 let method = request.method().clone();
59 let path = request.uri().path().to_string();
60 let started = std::time::Instant::now();
61
62 let response = next.run(request).await;
63
64 let duration_ms = started.elapsed().as_millis() as u64;
65 let status = response.status().as_u16();
66 let outcome = if status < 400 {
67 AuditOutcome::Success
68 } else if status == 401 || status == 403 {
69 AuditOutcome::Denied
70 } else {
71 AuditOutcome::Failure
72 };
73
74 state
75 .audit_log
76 .record(audit::AuditEntry {
77 id: uuid::Uuid::new_v4().to_string(),
78 timestamp: chrono::Utc::now(),
79 category: AuditCategory::Api,
80 action: format!("{} {}", method, path),
81 principal: None,
82 outcome,
83 detail: Some(serde_json::json!({ "status": status })),
84 duration_ms: Some(duration_ms),
85 })
86 .await;
87
88 response
89}
90
/// One row of the path-to-permission table consulted by `match_policy_rule`.
struct PolicyRule {
    /// Path pattern. With a trailing `/` it is a plain prefix match; without one
    /// it matches the exact path or any sub-path (`pattern` followed by `/`).
    pattern: &'static str,
    /// HTTP method names the rule applies to; `None` applies to every method.
    methods: Option<&'static [&'static str]>,
    /// Permission string to enforce. An empty string means the route needs no
    /// permission check (see `policy_middleware`).
    permission: &'static str,
}
98
/// Ordered path-to-permission table.
///
/// `match_policy_rule` walks this slice from top to bottom and returns the first
/// rule whose pattern AND method list both match, so the order of entries is
/// significant: a broader rule placed earlier shadows narrower rules below it
/// for the methods it covers.
const POLICY_RULES: &[PolicyRule] = &[
    // Public routes: empty permission means no policy check.
    PolicyRule {
        pattern: "/health",
        methods: None,
        permission: "",
    },
    PolicyRule {
        pattern: "/a2a/",
        methods: None,
        permission: "",
    },
    // Kubernetes management.
    PolicyRule {
        pattern: "/v1/k8s/scale",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/restart",
        methods: Some(&["POST"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/k8s/subagent",
        methods: Some(&["POST", "DELETE"]),
        permission: "admin:access",
    },
    // Plugin listing.
    PolicyRule {
        pattern: "/v1/plugins",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Audit log. The first rule has no method filter and also matches every
    // sub-path, so it already covers `/v1/audit/replay`; the second rule is
    // never reached but asks for the same permission.
    PolicyRule {
        pattern: "/v1/audit",
        methods: None,
        permission: "admin:access",
    },
    PolicyRule {
        pattern: "/v1/audit/replay",
        methods: Some(&["GET"]),
        permission: "admin:access",
    },
    // Cognition control and read access.
    PolicyRule {
        pattern: "/v1/cognition/start",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/stop",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/cognition/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Swarm personas and lineage.
    PolicyRule {
        pattern: "/v1/swarm/personas",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/v1/swarm/",
        methods: Some(&["GET"]),
        permission: "agent:read",
    },
    // Sessions. The trailing-slash rule comes first so that POSTs to
    // `/api/session/<id>/...` need `agent:execute`, while a POST to exactly
    // `/api/session` falls through to `sessions:write`.
    PolicyRule {
        pattern: "/api/session/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["POST"]),
        permission: "sessions:write",
    },
    PolicyRule {
        pattern: "/api/session",
        methods: Some(&["GET"]),
        permission: "sessions:read",
    },
    // Proposal approval. Reached for POSTs because the earlier
    // `/v1/cognition/` rule is limited to GET.
    PolicyRule {
        pattern: "/v1/cognition/proposals/",
        methods: Some(&["POST"]),
        permission: "agent:execute",
    },
    // Read-only informational endpoints.
    PolicyRule {
        pattern: "/api/version",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/config",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/provider",
        methods: None,
        permission: "agent:read",
    },
    PolicyRule {
        pattern: "/api/agent",
        methods: None,
        permission: "agent:read",
    },
];
228
229fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
232 for rule in POLICY_RULES {
233 let matches = if rule.pattern.ends_with('/') {
234 path.starts_with(rule.pattern)
235 } else {
236 path == rule.pattern || path.starts_with(&format!("{}/", rule.pattern))
237 };
238 if matches {
239 if let Some(allowed_methods) = rule.methods {
240 if !allowed_methods.contains(&method) {
241 continue;
242 }
243 }
244 return Some(rule.permission);
245 }
246 }
247 None
248}
249
250async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
257 let path = request.uri().path().to_string();
258 let method = request.method().as_str().to_string();
259
260 let permission = match match_policy_rule(&path, &method) {
261 None | Some("") => return Ok(next.run(request).await),
262 Some(perm) => perm,
263 };
264
265 let user = policy::PolicyUser {
269 user_id: "bearer-token-user".to_string(),
270 roles: vec!["admin".to_string()],
271 tenant_id: None,
272 scopes: vec![],
273 auth_source: "static_token".to_string(),
274 };
275
276 if let Err(status) = policy::enforce_policy(&user, permission, None).await {
277 tracing::warn!(
278 path = %path,
279 method = %method,
280 permission = %permission,
281 "Policy middleware denied request"
282 );
283 return Err(status);
284 }
285
286 Ok(next.run(request).await)
287}
288
/// Builds all server components and runs the HTTP API until it stops.
///
/// Startup sequence, with elapsed-time log lines between steps:
/// 1. load `Config`, create the cognition runtime and register default tools;
/// 2. create the audit log, the Kubernetes manager, the auth state and the bus;
/// 3. optionally auto-start the cognition loop;
/// 4. spawn the gRPC A2A server on its own task;
/// 5. build the axum router with its middleware stack and serve it.
///
/// # Errors
/// Returns an error if the configuration cannot be loaded, the gRPC address
/// does not parse, the HTTP listener cannot bind, or `axum::serve` fails.
pub async fn serve(args: ServeArgs) -> Result<()> {
    // Reference point for the `elapsed_ms` fields in the startup logs.
    let t0 = std::time::Instant::now();
    tracing::info!("[startup] begin");
    let config = Config::load().await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] config loaded"
    );
    let mut cognition = CognitionRuntime::new_from_env();
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] cognition runtime created"
    );

    // Tools must be attached while the runtime is still uniquely owned (before `Arc::new`).
    cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] tools registered"
    );
    let cognition = Arc::new(cognition);

    let audit_log = AuditLog::from_env();
    // The result of registering the audit log is deliberately discarded here.
    let _ = audit::init_audit_log(audit_log.clone());

    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
    let k8s = Arc::new(K8sManager::new().await);
    tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
    if k8s.is_available() {
        tracing::info!("K8s self-deployment enabled");
    }

    let auth_state = AuthState::from_env();
    // Only the token length is logged, never the token itself.
    tracing::info!(
        token_len = auth_state.token().len(),
        "Auth is mandatory. Token required for all API endpoints."
    );
    tracing::info!(
        audit_entries = audit_log.count().await,
        "Audit log initialized"
    );

    let bus = AgentBus::new().into_arc();
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] bus created"
    );

    // Auto-start is on by default; CODETETHER_COGNITION_AUTO_START can disable it.
    // A failure to start is logged and does not abort server startup.
    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
        tracing::info!(
            elapsed_ms = t0.elapsed().as_millis(),
            "[startup] auto-starting cognition"
        );
        if let Err(error) = cognition.start(None).await {
            tracing::warn!(%error, "Failed to auto-start cognition loop");
        } else {
            tracing::info!("Perpetual cognition auto-started");
        }
    }

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] building routes"
    );
    let addr = format!("{}:{}", args.hostname, args.port);

    // The agent card advertises the HTTP bind address as its URL.
    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
    let a2a_server = a2a::server::A2AServer::new(agent_card.clone());

    let a2a_router = a2a_server.router();

    // gRPC listens on the same hostname; port comes from CODETETHER_GRPC_PORT
    // and falls back to 50051 when unset or unparsable.
    let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
        .ok()
        .and_then(|p| p.parse::<u16>().ok())
        .unwrap_or(50051);
    let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
    let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
    let grpc_service = grpc_store.into_service();
    // The gRPC server runs detached; its errors are logged, not propagated.
    tokio::spawn(async move {
        tracing::info!("gRPC A2A server listening on {}", grpc_addr);
        if let Err(e) = tonic::transport::Server::builder()
            .add_service(grpc_service)
            .serve(grpc_addr)
            .await
        {
            tracing::error!("gRPC server error: {}", e);
        }
    });

    let state = AppState {
        config: Arc::new(config),
        cognition,
        audit_log,
        k8s,
        auth: auth_state.clone(),
        bus,
    };

    let app = Router::new()
        .route("/health", get(health))
        // Session / informational API.
        .route("/api/version", get(get_version))
        .route("/api/session", get(list_sessions).post(create_session))
        .route("/api/session/{id}", get(get_session))
        .route("/api/session/{id}/prompt", post(prompt_session))
        .route("/api/config", get(get_config))
        .route("/api/provider", get(list_providers))
        .route("/api/agent", get(list_agents))
        // Cognition loop control and inspection.
        .route("/v1/cognition/start", post(start_cognition))
        .route("/v1/cognition/stop", post(stop_cognition))
        .route("/v1/cognition/status", get(get_cognition_status))
        .route("/v1/cognition/stream", get(stream_cognition))
        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
        // Swarm personas.
        .route("/v1/swarm/personas", post(create_persona))
        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
        .route("/v1/swarm/lineage", get(get_swarm_lineage))
        // Beliefs, attention, proposals, receipts, workspace, governance.
        .route("/v1/cognition/beliefs", get(list_beliefs))
        .route("/v1/cognition/beliefs/{id}", get(get_belief))
        .route("/v1/cognition/attention", get(list_attention))
        .route("/v1/cognition/proposals", get(list_proposals))
        .route(
            "/v1/cognition/proposals/{id}/approve",
            post(approve_proposal),
        )
        .route("/v1/cognition/receipts", get(list_receipts))
        .route("/v1/cognition/workspace", get(get_workspace))
        .route("/v1/cognition/governance", get(get_governance))
        .route("/v1/cognition/personas/{id}", get(get_persona))
        // Audit log and session event replay.
        .route("/v1/audit", get(list_audit_entries))
        .route("/v1/audit/replay", get(replay_session_events))
        .route("/v1/audit/replay/index", get(list_session_event_files))
        // Kubernetes management.
        .route("/v1/k8s/status", get(get_k8s_status))
        .route("/v1/k8s/scale", post(k8s_scale))
        .route("/v1/k8s/restart", post(k8s_restart))
        .route("/v1/k8s/pods", get(k8s_list_pods))
        .route("/v1/k8s/actions", get(k8s_actions))
        .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
        .route(
            "/v1/k8s/subagent/{id}",
            axum::routing::delete(k8s_delete_subagent),
        )
        .route("/v1/plugins", get(list_plugins))
        .with_state(state.clone())
        .nest("/a2a", a2a_router)
        // Middleware stack. In axum, a layer added later wraps the ones added
        // before it, so a request passes through these bottom-to-top:
        // trace -> CORS -> require_auth -> policy -> Extension -> audit -> handler.
        // NOTE(review): with this order the audit middleware is innermost, so
        // requests rejected by `require_auth` or `policy_middleware` would not
        // produce an audit entry, and the `AuthState` extension is inserted
        // only after `require_auth` has run — confirm both are intended.
        .layer(middleware::from_fn_with_state(
            state.clone(),
            audit_middleware,
        ))
        .layer(axum::Extension(state.auth.clone()))
        .layer(middleware::from_fn(policy_middleware))
        .layer(middleware::from_fn(auth::require_auth))
        .layer(
            // Mirrors the request's origin, method and headers back to the
            // caller and allows credentials.
            // NOTE(review): this accepts credentialed requests from any
            // origin — confirm that is acceptable for this deployment.
            CorsLayer::new()
                .allow_origin(AllowOrigin::mirror_request())
                .allow_credentials(true)
                .allow_methods(AllowMethods::mirror_request())
                .allow_headers(AllowHeaders::mirror_request()),
        )
        .layer(TraceLayer::new_for_http());

    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] router built, binding"
    );
    let listener = tokio::net::TcpListener::bind(&addr).await?;
    tracing::info!(
        elapsed_ms = t0.elapsed().as_millis(),
        "[startup] listening on http://{}",
        addr
    );

    // Serves until the server future completes; errors are propagated.
    axum::serve(listener, app).await?;

    Ok(())
}
484
/// Liveness probe for `GET /health`; always answers with the literal body `ok`.
async fn health() -> &'static str {
    "ok"
}
489
/// JSON body returned by `get_version`.
#[derive(Serialize)]
struct VersionInfo {
    /// Crate version taken from `CARGO_PKG_VERSION` at compile time.
    version: &'static str,
    /// Crate name taken from `CARGO_PKG_NAME` at compile time.
    name: &'static str,
    /// Hash of the running executable, or `None` when it cannot be computed.
    binary_hash: Option<String>,
}
497
498async fn get_version() -> Json<VersionInfo> {
499 let binary_hash = std::env::current_exe()
500 .ok()
501 .and_then(|p| hash_file(&p).ok());
502 Json(VersionInfo {
503 version: env!("CARGO_PKG_VERSION"),
504 name: env!("CARGO_PKG_NAME"),
505 binary_hash,
506 })
507}
508
/// Query-string parameters accepted by `list_sessions`.
#[derive(Deserialize)]
struct ListSessionsQuery {
    /// Maximum number of sessions to return; `list_sessions` defaults it to 50.
    limit: Option<usize>,
}
514
515async fn list_sessions(
516 Query(query): Query<ListSessionsQuery>,
517) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
518 let sessions = crate::session::list_sessions()
519 .await
520 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
521
522 let limit = query.limit.unwrap_or(50);
523 Ok(Json(sessions.into_iter().take(limit).collect()))
524}
525
/// JSON body accepted by `create_session`.
#[derive(Deserialize)]
struct CreateSessionRequest {
    /// Title assigned to the new session; copied as-is, including `None`.
    title: Option<String>,
    /// Agent name for the session; when absent the session keeps the agent
    /// it was created with.
    agent: Option<String>,
}
532
533async fn create_session(
534 Json(req): Json<CreateSessionRequest>,
535) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
536 let mut session = crate::session::Session::new()
537 .await
538 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
539
540 session.title = req.title;
541 if let Some(agent) = req.agent {
542 session.agent = agent;
543 }
544
545 session
546 .save()
547 .await
548 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
549
550 Ok(Json(session))
551}
552
553async fn get_session(
555 axum::extract::Path(id): axum::extract::Path<String>,
556) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
557 let session = crate::session::Session::load(&id)
558 .await
559 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
560
561 Ok(Json(session))
562}
563
/// JSON body accepted by `prompt_session`.
#[derive(Deserialize)]
struct PromptRequest {
    /// Prompt text; rejected by `prompt_session` when empty or whitespace-only.
    message: String,
}
569
570async fn prompt_session(
571 axum::extract::Path(id): axum::extract::Path<String>,
572 Json(req): Json<PromptRequest>,
573) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
574 if req.message.trim().is_empty() {
576 return Err((
577 StatusCode::BAD_REQUEST,
578 "Message cannot be empty".to_string(),
579 ));
580 }
581
582 tracing::info!(
584 session_id = %id,
585 message_len = req.message.len(),
586 "Received prompt request"
587 );
588
589 Err((
591 StatusCode::NOT_IMPLEMENTED,
592 "Prompt execution not yet implemented".to_string(),
593 ))
594}
595
596async fn get_config(State(state): State<AppState>) -> Json<Config> {
598 Json((*state.config).clone())
599}
600
601async fn list_providers() -> Json<Vec<String>> {
603 Json(vec![
604 "openai".to_string(),
605 "anthropic".to_string(),
606 "google".to_string(),
607 ])
608}
609
610async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
612 let registry = crate::agent::AgentRegistry::with_builtins();
613 Json(registry.list().into_iter().cloned().collect())
614}
615
616async fn start_cognition(
617 State(state): State<AppState>,
618 payload: Option<Json<StartCognitionRequest>>,
619) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
620 state
621 .cognition
622 .start(payload.map(|Json(body)| body))
623 .await
624 .map(Json)
625 .map_err(internal_error)
626}
627
628async fn stop_cognition(
629 State(state): State<AppState>,
630 payload: Option<Json<StopCognitionRequest>>,
631) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
632 let reason = payload.and_then(|Json(body)| body.reason);
633 state
634 .cognition
635 .stop(reason)
636 .await
637 .map(Json)
638 .map_err(internal_error)
639}
640
641async fn get_cognition_status(
642 State(state): State<AppState>,
643) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
644 Ok(Json(state.cognition.status().await))
645}
646
647async fn stream_cognition(
648 State(state): State<AppState>,
649) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
650 let rx = state.cognition.subscribe_events();
651
652 let event_stream = stream::unfold(rx, |mut rx| async move {
653 match rx.recv().await {
654 Ok(event) => {
655 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
656 let sse_event = Event::default().event("cognition").data(payload);
657 Some((Ok(sse_event), rx))
658 }
659 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
660 let lag_event = Event::default()
661 .event("lag")
662 .data(format!("skipped {}", skipped));
663 Some((Ok(lag_event), rx))
664 }
665 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
666 }
667 });
668
669 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
670}
671
672async fn get_latest_snapshot(
673 State(state): State<AppState>,
674) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
675 match state.cognition.latest_snapshot().await {
676 Some(snapshot) => Ok(Json(snapshot)),
677 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
678 }
679}
680
681async fn create_persona(
682 State(state): State<AppState>,
683 Json(req): Json<CreatePersonaRequest>,
684) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
685 state
686 .cognition
687 .create_persona(req)
688 .await
689 .map(Json)
690 .map_err(internal_error)
691}
692
693async fn spawn_persona(
694 State(state): State<AppState>,
695 Path(id): Path<String>,
696 Json(req): Json<SpawnPersonaRequest>,
697) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
698 state
699 .cognition
700 .spawn_child(&id, req)
701 .await
702 .map(Json)
703 .map_err(internal_error)
704}
705
706async fn reap_persona(
707 State(state): State<AppState>,
708 Path(id): Path<String>,
709 payload: Option<Json<ReapPersonaRequest>>,
710) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
711 let req = payload
712 .map(|Json(body)| body)
713 .unwrap_or(ReapPersonaRequest {
714 cascade: Some(false),
715 reason: None,
716 });
717
718 state
719 .cognition
720 .reap_persona(&id, req)
721 .await
722 .map(Json)
723 .map_err(internal_error)
724}
725
726async fn get_swarm_lineage(
727 State(state): State<AppState>,
728) -> Result<Json<LineageGraph>, (StatusCode, String)> {
729 Ok(Json(state.cognition.lineage_graph().await))
730}
731
/// Query-string filters accepted by `list_beliefs`.
#[derive(Deserialize)]
struct BeliefFilter {
    /// Keeps beliefs whose JSON-serialized status contains this text
    /// (substring match, not equality).
    status: Option<String>,
    /// Keeps beliefs whose `asserted_by` equals this value.
    persona: Option<String>,
}
739
740async fn list_beliefs(
741 State(state): State<AppState>,
742 Query(filter): Query<BeliefFilter>,
743) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
744 let beliefs = state.cognition.get_beliefs().await;
745 let mut result: Vec<Belief> = beliefs.into_values().collect();
746
747 if let Some(status) = &filter.status {
748 result.retain(|b| {
749 let s = serde_json::to_string(&b.status).unwrap_or_default();
750 s.contains(status)
751 });
752 }
753 if let Some(persona) = &filter.persona {
754 result.retain(|b| &b.asserted_by == persona);
755 }
756
757 result.sort_by(|a, b| {
758 b.confidence
759 .partial_cmp(&a.confidence)
760 .unwrap_or(std::cmp::Ordering::Equal)
761 });
762 Ok(Json(result))
763}
764
765async fn get_belief(
766 State(state): State<AppState>,
767 Path(id): Path<String>,
768) -> Result<Json<Belief>, (StatusCode, String)> {
769 match state.cognition.get_belief(&id).await {
770 Some(belief) => Ok(Json(belief)),
771 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
772 }
773}
774
775async fn list_attention(
776 State(state): State<AppState>,
777) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
778 Ok(Json(state.cognition.get_attention_queue().await))
779}
780
781async fn list_proposals(
782 State(state): State<AppState>,
783) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
784 let proposals = state.cognition.get_proposals().await;
785 let mut result: Vec<Proposal> = proposals.into_values().collect();
786 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
787 Ok(Json(result))
788}
789
790async fn approve_proposal(
791 State(state): State<AppState>,
792 Path(id): Path<String>,
793) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
794 state
795 .cognition
796 .approve_proposal(&id)
797 .await
798 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
799 .map_err(internal_error)
800}
801
802async fn list_receipts(
803 State(state): State<AppState>,
804) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
805 Ok(Json(state.cognition.get_receipts().await))
806}
807
808async fn get_workspace(
809 State(state): State<AppState>,
810) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
811 Ok(Json(state.cognition.get_workspace().await))
812}
813
814async fn get_governance(
815 State(state): State<AppState>,
816) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
817 Ok(Json(state.cognition.get_governance().await))
818}
819
820async fn get_persona(
821 State(state): State<AppState>,
822 axum::extract::Path(id): axum::extract::Path<String>,
823) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
824 state
825 .cognition
826 .get_persona(&id)
827 .await
828 .map(Json)
829 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
830}
831
/// Query-string parameters accepted by `list_audit_entries`.
#[derive(Deserialize)]
struct AuditQuery {
    /// Maximum number of entries; `list_audit_entries` defaults it to 100 and
    /// caps it at 1000.
    limit: Option<usize>,
    /// Optional category name (e.g. `api`, `tool`, `k8s`); unknown names are
    /// rejected by `list_audit_entries`.
    category: Option<String>,
}
839
840async fn list_audit_entries(
841 State(state): State<AppState>,
842 Query(query): Query<AuditQuery>,
843) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
844 let limit = query.limit.unwrap_or(100).min(1000);
845
846 let entries = if let Some(ref cat) = query.category {
847 let category = match cat.as_str() {
848 "api" => AuditCategory::Api,
849 "tool" | "tool_execution" => AuditCategory::ToolExecution,
850 "session" => AuditCategory::Session,
851 "cognition" => AuditCategory::Cognition,
852 "swarm" => AuditCategory::Swarm,
853 "auth" => AuditCategory::Auth,
854 "k8s" => AuditCategory::K8s,
855 "sandbox" => AuditCategory::Sandbox,
856 "config" => AuditCategory::Config,
857 _ => {
858 return Err((
859 StatusCode::BAD_REQUEST,
860 format!("Unknown category: {}", cat),
861 ));
862 }
863 };
864 state.audit_log.by_category(category, limit).await
865 } else {
866 state.audit_log.recent(limit).await
867 };
868
869 Ok(Json(entries))
870}
871
/// Query-string parameters for the session event replay endpoints
/// (`replay_session_events` and `list_session_event_files`).
#[derive(Deserialize)]
struct ReplayQuery {
    /// Session whose event directory is read; joined onto the directory named
    /// by `CODETETHER_EVENT_STREAM_PATH`.
    session_id: String,
    /// Skips event files whose end offset is at or before this value.
    start_offset: Option<u64>,
    /// Skips event files whose start offset is at or after this value.
    end_offset: Option<u64>,
    /// Maximum number of events returned; `replay_session_events` defaults it
    /// to 1000 and caps it at 10000.
    limit: Option<usize>,
    /// Keeps only events whose `tool_name` field equals this value.
    tool_name: Option<String>,
}
889
890async fn replay_session_events(
894 Query(query): Query<ReplayQuery>,
895) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
896 use std::path::PathBuf;
897
898 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
899 .map(PathBuf::from)
900 .ok()
901 .ok_or_else(|| {
902 (
903 StatusCode::SERVICE_UNAVAILABLE,
904 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
905 )
906 })?;
907
908 let session_dir = base_dir.join(&query.session_id);
909
910 if !session_dir.exists() {
912 return Err((
913 StatusCode::NOT_FOUND,
914 format!("Session not found: {}", query.session_id),
915 ));
916 }
917
918 let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
919
920 let entries = std::fs::read_dir(&session_dir)
922 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
923
924 for entry in entries {
925 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
926 let path = entry.path();
927
928 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
929 continue;
930 }
931
932 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
934
935 if let Some(offsets) = filename
936 .strip_prefix("T")
937 .or_else(|| filename.strip_prefix("202"))
938 {
939 let parts: Vec<&str> = offsets.split('-').collect();
941 if parts.len() >= 4 {
942 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
943 let end: u64 = parts[parts.len() - 1]
944 .trim_end_matches(".jsonl")
945 .parse()
946 .unwrap_or(0);
947
948 if let Some(query_start) = query.start_offset {
950 if end <= query_start {
951 continue;
952 }
953 }
954 if let Some(query_end) = query.end_offset {
955 if start >= query_end {
956 continue;
957 }
958 }
959
960 let content = std::fs::read_to_string(&path)
962 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
963
964 for line in content.lines() {
965 if line.trim().is_empty() {
966 continue;
967 }
968 if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
969 if let Some(ref tool_filter) = query.tool_name {
971 if let Some(event_tool) =
972 event.get("tool_name").and_then(|v| v.as_str())
973 {
974 if event_tool != tool_filter {
975 continue;
976 }
977 } else {
978 continue;
979 }
980 }
981 all_events.push((start, end, event));
982 }
983 }
984 }
985 }
986 }
987
988 all_events.sort_by_key(|(s, _, _)| *s);
990
991 let limit = query.limit.unwrap_or(1000).min(10000);
993 let events: Vec<_> = all_events
994 .into_iter()
995 .take(limit)
996 .map(|(_, _, e)| e)
997 .collect();
998
999 Ok(Json(events))
1000}
1001
1002async fn list_session_event_files(
1004 Query(query): Query<ReplayQuery>,
1005) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
1006 use std::path::PathBuf;
1007
1008 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
1009 .map(PathBuf::from)
1010 .ok()
1011 .ok_or_else(|| {
1012 (
1013 StatusCode::SERVICE_UNAVAILABLE,
1014 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
1015 )
1016 })?;
1017
1018 let session_dir = base_dir.join(&query.session_id);
1019 if !session_dir.exists() {
1020 return Err((
1021 StatusCode::NOT_FOUND,
1022 format!("Session not found: {}", query.session_id),
1023 ));
1024 }
1025
1026 let mut files: Vec<EventFileMeta> = Vec::new();
1027
1028 let entries = std::fs::read_dir(&session_dir)
1030 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1031
1032 for entry in entries {
1033 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1034 let path = entry.path();
1035
1036 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
1037 continue;
1038 }
1039
1040 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
1041
1042 if let Some(offsets) = filename
1044 .strip_prefix("T")
1045 .or_else(|| filename.strip_prefix("202"))
1046 {
1047 let parts: Vec<&str> = offsets.split('-').collect();
1048 if parts.len() >= 4 {
1049 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
1050 let end: u64 = parts[parts.len() - 1]
1051 .trim_end_matches(".jsonl")
1052 .parse()
1053 .unwrap_or(0);
1054
1055 let metadata = std::fs::metadata(&path)
1056 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1057
1058 files.push(EventFileMeta {
1059 filename: filename.to_string(),
1060 start_offset: start,
1061 end_offset: end,
1062 size_bytes: metadata.len(),
1063 });
1064 }
1065 }
1066 }
1067
1068 files.sort_by_key(|f| f.start_offset);
1069 Ok(Json(files))
1070}
1071
/// Description of one session event file, as returned by
/// `list_session_event_files`.
#[derive(Serialize)]
struct EventFileMeta {
    /// File name inside the session directory.
    filename: String,
    /// Start offset parsed from the file name (0 when unparsable).
    start_offset: u64,
    /// End offset parsed from the file name (0 when unparsable).
    end_offset: u64,
    /// File size in bytes from filesystem metadata.
    size_bytes: u64,
}
1079
1080async fn get_k8s_status(
1083 State(state): State<AppState>,
1084) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
1085 Ok(Json(state.k8s.status().await))
1086}
1087
/// JSON body accepted by `k8s_scale`.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Desired replica count; `k8s_scale` accepts 0 through 100 inclusive.
    replicas: i32,
}
1092
1093async fn k8s_scale(
1094 State(state): State<AppState>,
1095 Json(req): Json<ScaleRequest>,
1096) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1097 if req.replicas < 0 || req.replicas > 100 {
1098 return Err((
1099 StatusCode::BAD_REQUEST,
1100 "Replicas must be between 0 and 100".to_string(),
1101 ));
1102 }
1103
1104 state
1105 .audit_log
1106 .log(
1107 AuditCategory::K8s,
1108 format!("scale:{}", req.replicas),
1109 AuditOutcome::Success,
1110 None,
1111 None,
1112 )
1113 .await;
1114
1115 state
1116 .k8s
1117 .scale(req.replicas)
1118 .await
1119 .map(Json)
1120 .map_err(internal_error)
1121}
1122
1123async fn k8s_restart(
1124 State(state): State<AppState>,
1125) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1126 state
1127 .audit_log
1128 .log(
1129 AuditCategory::K8s,
1130 "rolling_restart",
1131 AuditOutcome::Success,
1132 None,
1133 None,
1134 )
1135 .await;
1136
1137 state
1138 .k8s
1139 .rolling_restart()
1140 .await
1141 .map(Json)
1142 .map_err(internal_error)
1143}
1144
1145async fn k8s_list_pods(
1146 State(state): State<AppState>,
1147) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
1148 state
1149 .k8s
1150 .list_pods()
1151 .await
1152 .map(Json)
1153 .map_err(internal_error)
1154}
1155
1156async fn k8s_actions(
1157 State(state): State<AppState>,
1158) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
1159 Ok(Json(state.k8s.recent_actions(100).await))
1160}
1161
/// JSON body accepted by `k8s_spawn_subagent`.
#[derive(Deserialize)]
struct SpawnSubagentRequest {
    /// Identifier passed to `spawn_subagent_pod` for the new pod.
    subagent_id: String,
    /// Optional container image; omitted means `None` is passed on.
    #[serde(default)]
    image: Option<String>,
    /// Environment variables for the pod; defaults to an empty map.
    #[serde(default)]
    env_vars: std::collections::HashMap<String, String>,
}
1170
1171async fn k8s_spawn_subagent(
1172 State(state): State<AppState>,
1173 Json(req): Json<SpawnSubagentRequest>,
1174) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1175 state
1176 .k8s
1177 .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
1178 .await
1179 .map(Json)
1180 .map_err(internal_error)
1181}
1182
1183async fn k8s_delete_subagent(
1184 State(state): State<AppState>,
1185 axum::extract::Path(id): axum::extract::Path<String>,
1186) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1187 state
1188 .k8s
1189 .delete_subagent_pod(&id)
1190 .await
1191 .map(Json)
1192 .map_err(internal_error)
1193}
1194
1195async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
1197 let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
1198 let signing_key = SigningKey::from_env();
1199 let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
1200 Json(PluginListResponse {
1201 server_fingerprint,
1202 signing_available: !test_sig.is_empty(),
1203 plugins: Vec::<PluginManifest>::new(),
1204 })
1205}
1206
/// JSON body returned by `list_plugins`.
#[derive(Serialize)]
struct PluginListResponse {
    /// Hash of the crate version string.
    server_fingerprint: String,
    /// `true` when a probe signature made with the environment's key is non-empty.
    signing_available: bool,
    /// Plugin manifests; `list_plugins` currently always returns an empty list.
    plugins: Vec<PluginManifest>,
}
1213
1214fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
1215 let message = error.to_string();
1216 if message.contains("not found") {
1217 return (StatusCode::NOT_FOUND, message);
1218 }
1219 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
1220 return (StatusCode::BAD_REQUEST, message);
1221 }
1222 (StatusCode::INTERNAL_SERVER_ERROR, message)
1223}
1224
/// Reads a boolean flag from the environment variable `name`.
///
/// Accepted values (case-insensitive, surrounding whitespace ignored):
/// `1`, `true`, `yes`, `on` for `true` and `0`, `false`, `no`, `off` for
/// `false`. Returns `default` when the variable is unset, not valid Unicode,
/// or holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(value) => value,
        Err(_) => return default,
    };

    // Trim so values like "true " or "on\n" (common in env files) are honoured
    // instead of silently falling back to the default.
    let value = raw.trim();
    let is_one_of = |candidates: &[&str]| {
        candidates
            .iter()
            .any(|candidate| value.eq_ignore_ascii_case(candidate))
    };

    if is_one_of(&TRUTHY) {
        true
    } else if is_one_of(&FALSY) {
        false
    } else {
        default
    }
}
1235
/// Unit tests pinning the permissions that `match_policy_rule` resolves for
/// routes where the order of entries in `POLICY_RULES` matters.
#[cfg(test)]
mod tests {
    use super::match_policy_rule;

    /// A POST below `/api/session/` must hit the `agent:execute` rule, not the
    /// `sessions:write` rule for `/api/session`.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        let permission = match_policy_rule("/api/session/abc123/prompt", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }

    /// A POST to exactly `/api/session` must still resolve to `sessions:write`.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        let permission = match_policy_rule("/api/session", "POST");
        assert_eq!(permission, Some("sessions:write"));
    }

    /// The GET-only `/v1/cognition/` rule must not swallow proposal approval
    /// POSTs; they must resolve to `agent:execute`.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        let permission = match_policy_rule("/v1/cognition/proposals/p1/approve", "POST");
        assert_eq!(permission, Some("agent:execute"));
    }
}