1pub mod auth;
6pub mod policy;
7
8use crate::a2a;
9use crate::audit::{self, AuditCategory, AuditLog, AuditOutcome};
10use crate::bus::AgentBus;
11use crate::cli::ServeArgs;
12use crate::cognition::{
13 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
14 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
15 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
16 executor::DecisionReceipt,
17};
18use crate::config::Config;
19use crate::k8s::K8sManager;
20use crate::tool::{PluginManifest, SigningKey, hash_bytes, hash_file};
21use anyhow::Result;
22use auth::AuthState;
23use axum::{
24 Router,
25 body::Body,
26 extract::Path,
27 extract::{Query, State},
28 http::{Request, StatusCode},
29 middleware::{self, Next},
30 response::sse::{Event, KeepAlive, Sse},
31 response::{Json, Response},
32 routing::{get, post},
33};
34use futures::stream;
35use serde::{Deserialize, Serialize};
36use std::convert::Infallible;
37use std::sync::Arc;
38use tokio::sync::Mutex;
39use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
40use tower_http::trace::TraceLayer;
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct KnativeTask {
45 pub task_id: String,
46 pub title: String,
47 pub description: String,
48 pub agent_type: String,
49 pub priority: i32,
50 pub received_at: chrono::DateTime<chrono::Utc>,
51 pub status: String,
52}
53
54#[derive(Clone)]
56pub struct KnativeTaskQueue {
57 tasks: Arc<Mutex<Vec<KnativeTask>>>,
58}
59
60impl KnativeTaskQueue {
61 pub fn new() -> Self {
62 Self {
63 tasks: Arc::new(Mutex::new(Vec::new())),
64 }
65 }
66
67 pub async fn push(&self, task: KnativeTask) {
68 self.tasks.lock().await.push(task);
69 }
70
71 pub async fn pop(&self) -> Option<KnativeTask> {
72 self.tasks.lock().await.pop()
73 }
74
75 pub async fn list(&self) -> Vec<KnativeTask> {
76 self.tasks.lock().await.clone()
77 }
78
79 pub async fn get(&self, task_id: &str) -> Option<KnativeTask> {
80 self.tasks
81 .lock()
82 .await
83 .iter()
84 .find(|t| t.task_id == task_id)
85 .cloned()
86 }
87
88 pub async fn update_status(&self, task_id: &str, status: &str) -> bool {
89 let mut tasks = self.tasks.lock().await;
90 if let Some(task) = tasks.iter_mut().find(|t| t.task_id == task_id) {
91 task.status = status.to_string();
92 true
93 } else {
94 false
95 }
96 }
97}
98
99impl Default for KnativeTaskQueue {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105#[derive(Clone)]
107pub struct AppState {
108 pub config: Arc<Config>,
109 pub cognition: Arc<CognitionRuntime>,
110 pub audit_log: AuditLog,
111 pub k8s: Arc<K8sManager>,
112 pub auth: AuthState,
113 pub bus: Arc<AgentBus>,
114 pub knative_tasks: KnativeTaskQueue,
115}
116
117async fn audit_middleware(
119 State(state): State<AppState>,
120 request: Request<Body>,
121 next: Next,
122) -> Response {
123 let method = request.method().clone();
124 let path = request.uri().path().to_string();
125 let started = std::time::Instant::now();
126
127 let response = next.run(request).await;
128
129 let duration_ms = started.elapsed().as_millis() as u64;
130 let status = response.status().as_u16();
131 let outcome = if status < 400 {
132 AuditOutcome::Success
133 } else if status == 401 || status == 403 {
134 AuditOutcome::Denied
135 } else {
136 AuditOutcome::Failure
137 };
138
139 state
140 .audit_log
141 .record(audit::AuditEntry {
142 id: uuid::Uuid::new_v4().to_string(),
143 timestamp: chrono::Utc::now(),
144 category: AuditCategory::Api,
145 action: format!("{} {}", method, path),
146 principal: None,
147 outcome,
148 detail: Some(serde_json::json!({ "status": status })),
149 duration_ms: Some(duration_ms),
150 okr_id: None,
151 okr_run_id: None,
152 relay_id: None,
153 session_id: None,
154 })
155 .await;
156
157 response
158}
159
/// One entry of the static route-to-permission table.
struct PolicyRule {
    /// Path pattern. A pattern ending in `/` is a plain prefix match; any other
    /// pattern matches the exact path or any path below it (`pattern` + `/…`).
    pattern: &'static str,
    /// HTTP methods the rule applies to; `None` means every method.
    methods: Option<&'static [&'static str]>,
    /// Permission required for a matching request. The empty string means the
    /// route needs no policy check.
    permission: &'static str,
}

impl PolicyRule {
    /// Compact constructor so the table below stays one line per rule.
    const fn new(
        pattern: &'static str,
        methods: Option<&'static [&'static str]>,
        permission: &'static str,
    ) -> Self {
        Self {
            pattern,
            methods,
            permission,
        }
    }

    /// Whether `path` falls under this rule's pattern.
    fn matches_path(&self, path: &str) -> bool {
        if self.pattern.ends_with('/') {
            path.starts_with(self.pattern)
        } else {
            // Exact match, or the pattern followed by a path separator.
            // `strip_prefix` avoids allocating `pattern + "/"` on every request.
            path.strip_prefix(self.pattern)
                .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
        }
    }

    /// Whether this rule applies to `method`.
    fn allows_method(&self, method: &str) -> bool {
        self.methods
            .map_or(true, |allowed| allowed.iter().any(|m| *m == method))
    }
}

/// Route-to-permission table, evaluated top to bottom; the first rule whose
/// pattern and method both match wins, so order is significant.
const POLICY_RULES: &[PolicyRule] = &[
    // Routes that need no policy check.
    PolicyRule::new("/health", None, ""),
    PolicyRule::new("/task", None, ""),
    PolicyRule::new("/v1/knative/", None, ""),
    PolicyRule::new("/a2a/", None, ""),
    // Kubernetes management.
    PolicyRule::new("/v1/k8s/scale", Some(&["POST"]), "admin:access"),
    PolicyRule::new("/v1/k8s/restart", Some(&["POST"]), "admin:access"),
    PolicyRule::new("/v1/k8s/", Some(&["GET"]), "admin:access"),
    PolicyRule::new("/v1/k8s/subagent", Some(&["POST", "DELETE"]), "admin:access"),
    // Plugins.
    PolicyRule::new("/v1/plugins", Some(&["GET"]), "agent:read"),
    // Audit. The first rule already covers every `/v1/audit/...` path and
    // method, so the replay rule is never reached; it carries the same
    // permission and is kept for documentation value.
    PolicyRule::new("/v1/audit", None, "admin:access"),
    PolicyRule::new("/v1/audit/replay", Some(&["GET"]), "admin:access"),
    // Cognition control and read access.
    PolicyRule::new("/v1/cognition/start", Some(&["POST"]), "agent:execute"),
    PolicyRule::new("/v1/cognition/stop", Some(&["POST"]), "agent:execute"),
    PolicyRule::new("/v1/cognition/", Some(&["GET"]), "agent:read"),
    // Swarm.
    PolicyRule::new("/v1/swarm/personas", Some(&["POST"]), "agent:execute"),
    PolicyRule::new("/v1/swarm/", Some(&["POST"]), "agent:execute"),
    PolicyRule::new("/v1/swarm/", Some(&["GET"]), "agent:read"),
    // Sessions: POST below a session executes, POST on the collection writes.
    PolicyRule::new("/api/session/", Some(&["POST"]), "agent:execute"),
    PolicyRule::new("/api/session", Some(&["POST"]), "sessions:write"),
    PolicyRule::new("/api/session", Some(&["GET"]), "sessions:read"),
    // Proposal actions (POST is not claimed by the GET-only cognition rule).
    PolicyRule::new("/v1/cognition/proposals/", Some(&["POST"]), "agent:execute"),
    // Read-only metadata endpoints.
    PolicyRule::new("/api/version", None, "agent:read"),
    PolicyRule::new("/api/config", None, "agent:read"),
    PolicyRule::new("/api/provider", None, "agent:read"),
    PolicyRule::new("/api/agent", None, "agent:read"),
];

/// Looks up the permission required for `method` on `path`.
///
/// Returns the `permission` of the first rule in [`POLICY_RULES`] whose pattern
/// matches `path` and whose method list (if any) contains `method`. A rule that
/// matches the path but not the method is skipped, so a later rule may still
/// apply. Returns `None` when no rule matches; `Some("")` denotes a route that
/// needs no policy check.
fn match_policy_rule(path: &str, method: &str) -> Option<&'static str> {
    POLICY_RULES
        .iter()
        .find(|rule| rule.matches_path(path) && rule.allows_method(method))
        .map(|rule| rule.permission)
}
328
329async fn policy_middleware(request: Request<Body>, next: Next) -> Result<Response, StatusCode> {
336 let path = request.uri().path().to_string();
337 let method = request.method().as_str().to_string();
338
339 let permission = match match_policy_rule(&path, &method) {
340 None | Some("") => return Ok(next.run(request).await),
341 Some(perm) => perm,
342 };
343
344 let user = policy::PolicyUser {
348 user_id: "bearer-token-user".to_string(),
349 roles: vec!["admin".to_string()],
350 tenant_id: None,
351 scopes: vec![],
352 auth_source: "static_token".to_string(),
353 };
354
355 if let Err(status) = policy::enforce_policy(&user, permission, None).await {
356 tracing::warn!(
357 path = %path,
358 method = %method,
359 permission = %permission,
360 "Policy middleware denied request"
361 );
362 return Err(status);
363 }
364
365 Ok(next.run(request).await)
366}
367
368pub async fn serve(args: ServeArgs) -> Result<()> {
370 let t0 = std::time::Instant::now();
371 tracing::info!("[startup] begin");
372 let config = Config::load().await?;
373 tracing::info!(
374 elapsed_ms = t0.elapsed().as_millis(),
375 "[startup] config loaded"
376 );
377 let mut cognition = CognitionRuntime::new_from_env();
378 tracing::info!(
379 elapsed_ms = t0.elapsed().as_millis(),
380 "[startup] cognition runtime created"
381 );
382
383 cognition.set_tools(Arc::new(crate::tool::ToolRegistry::with_defaults()));
385 tracing::info!(
386 elapsed_ms = t0.elapsed().as_millis(),
387 "[startup] tools registered"
388 );
389 let cognition = Arc::new(cognition);
390
391 let audit_log = AuditLog::from_env();
393 let _ = audit::init_audit_log(audit_log.clone());
394
395 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] pre-k8s");
397 let k8s = Arc::new(K8sManager::new().await);
398 tracing::info!(elapsed_ms = t0.elapsed().as_millis(), "[startup] k8s done");
399 if k8s.is_available() {
400 tracing::info!("K8s self-deployment enabled");
401 }
402
403 let auth_state = AuthState::from_env();
405 tracing::info!(
406 token_len = auth_state.token().len(),
407 "Auth is mandatory. Token required for all API endpoints."
408 );
409 tracing::info!(
410 audit_entries = audit_log.count().await,
411 "Audit log initialized"
412 );
413
414 let bus = AgentBus::new().into_arc();
416 tracing::info!(
417 elapsed_ms = t0.elapsed().as_millis(),
418 "[startup] bus created"
419 );
420
421 if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
422 tracing::info!(
423 elapsed_ms = t0.elapsed().as_millis(),
424 "[startup] auto-starting cognition"
425 );
426 if let Err(error) = cognition.start(None).await {
427 tracing::warn!(%error, "Failed to auto-start cognition loop");
428 } else {
429 tracing::info!("Perpetual cognition auto-started");
430 }
431 }
432
433 tracing::info!(
434 elapsed_ms = t0.elapsed().as_millis(),
435 "[startup] building routes"
436 );
437 let addr = format!("{}:{}", args.hostname, args.port);
438
439 let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
441 let a2a_server = a2a::server::A2AServer::new(agent_card.clone());
442
443 let a2a_router = a2a_server.router();
445
446 let grpc_port = std::env::var("CODETETHER_GRPC_PORT")
448 .ok()
449 .and_then(|p| p.parse::<u16>().ok())
450 .unwrap_or(50051);
451 let grpc_addr: std::net::SocketAddr = format!("{}:{}", args.hostname, grpc_port).parse()?;
452 let grpc_store = crate::a2a::grpc::GrpcTaskStore::with_bus(agent_card, bus.clone());
453 let grpc_service = grpc_store.into_service();
454 tokio::spawn(async move {
455 tracing::info!("gRPC A2A server listening on {}", grpc_addr);
456 if let Err(e) = tonic::transport::Server::builder()
457 .add_service(grpc_service)
458 .serve(grpc_addr)
459 .await
460 {
461 tracing::error!("gRPC server error: {}", e);
462 }
463 });
464
465 let state = AppState {
466 config: Arc::new(config),
467 cognition,
468 audit_log,
469 k8s,
470 auth: auth_state.clone(),
471 bus,
472 knative_tasks: KnativeTaskQueue::new(),
473 };
474
475 let app = Router::new()
476 .route("/health", get(health))
478 .route("/task", post(receive_task_event))
480 .route("/v1/knative/tasks", get(list_knative_tasks))
482 .route("/v1/knative/tasks/{task_id}", get(get_knative_task))
483 .route(
484 "/v1/knative/tasks/{task_id}/claim",
485 post(claim_knative_task),
486 )
487 .route(
488 "/v1/knative/tasks/{task_id}/complete",
489 post(complete_knative_task),
490 )
491 .route("/api/version", get(get_version))
493 .route("/api/session", get(list_sessions).post(create_session))
494 .route("/api/session/{id}", get(get_session))
495 .route("/api/session/{id}/prompt", post(prompt_session))
496 .route("/api/config", get(get_config))
497 .route("/api/provider", get(list_providers))
498 .route("/api/agent", get(list_agents))
499 .route("/v1/cognition/start", post(start_cognition))
501 .route("/v1/cognition/stop", post(stop_cognition))
502 .route("/v1/cognition/status", get(get_cognition_status))
503 .route("/v1/cognition/stream", get(stream_cognition))
504 .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
505 .route("/v1/swarm/personas", post(create_persona))
507 .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
508 .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
509 .route("/v1/swarm/lineage", get(get_swarm_lineage))
510 .route("/v1/cognition/beliefs", get(list_beliefs))
512 .route("/v1/cognition/beliefs/{id}", get(get_belief))
513 .route("/v1/cognition/attention", get(list_attention))
514 .route("/v1/cognition/proposals", get(list_proposals))
515 .route(
516 "/v1/cognition/proposals/{id}/approve",
517 post(approve_proposal),
518 )
519 .route("/v1/cognition/receipts", get(list_receipts))
520 .route("/v1/cognition/workspace", get(get_workspace))
521 .route("/v1/cognition/governance", get(get_governance))
522 .route("/v1/cognition/personas/{id}", get(get_persona))
523 .route("/v1/audit", get(list_audit_entries))
525 .route("/v1/audit/replay", get(replay_session_events))
527 .route("/v1/audit/replay/index", get(list_session_event_files))
528 .route("/v1/k8s/status", get(get_k8s_status))
530 .route("/v1/k8s/scale", post(k8s_scale))
531 .route("/v1/k8s/restart", post(k8s_restart))
532 .route("/v1/k8s/pods", get(k8s_list_pods))
533 .route("/v1/k8s/actions", get(k8s_actions))
534 .route("/v1/k8s/subagent", post(k8s_spawn_subagent))
535 .route(
536 "/v1/k8s/subagent/{id}",
537 axum::routing::delete(k8s_delete_subagent),
538 )
539 .route("/v1/plugins", get(list_plugins))
541 .with_state(state.clone())
542 .nest("/a2a", a2a_router)
544 .layer(middleware::from_fn_with_state(
546 state.clone(),
547 audit_middleware,
548 ))
549 .layer(middleware::from_fn(policy_middleware))
550 .layer(middleware::from_fn(auth::require_auth))
551 .layer(axum::Extension(state.auth.clone()))
552 .layer(
554 CorsLayer::new()
555 .allow_origin(AllowOrigin::mirror_request())
556 .allow_credentials(true)
557 .allow_methods(AllowMethods::mirror_request())
558 .allow_headers(AllowHeaders::mirror_request()),
559 )
560 .layer(TraceLayer::new_for_http());
561
562 tracing::info!(
563 elapsed_ms = t0.elapsed().as_millis(),
564 "[startup] router built, binding"
565 );
566 let listener = tokio::net::TcpListener::bind(&addr).await?;
567 tracing::info!(
568 elapsed_ms = t0.elapsed().as_millis(),
569 "[startup] listening on http://{}",
570 addr
571 );
572
573 axum::serve(listener, app).await?;
574
575 Ok(())
576}
577
/// Liveness probe handler: always answers with the plain-text body `ok`.
async fn health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
582
583async fn list_knative_tasks(State(state): State<AppState>) -> Json<Vec<KnativeTask>> {
585 Json(state.knative_tasks.list().await)
586}
587
588async fn get_knative_task(
590 State(state): State<AppState>,
591 Path(task_id): Path<String>,
592) -> Result<Json<KnativeTask>, (StatusCode, String)> {
593 state
594 .knative_tasks
595 .get(&task_id)
596 .await
597 .map(Json)
598 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
599}
600
601async fn claim_knative_task(
603 State(state): State<AppState>,
604 Path(task_id): Path<String>,
605) -> Result<Json<KnativeTask>, (StatusCode, String)> {
606 let updated = state
608 .knative_tasks
609 .update_status(&task_id, "processing")
610 .await;
611 if !updated {
612 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
613 }
614
615 state
616 .knative_tasks
617 .get(&task_id)
618 .await
619 .map(Json)
620 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
621}
622
623async fn complete_knative_task(
625 State(state): State<AppState>,
626 Path(task_id): Path<String>,
627) -> Result<Json<KnativeTask>, (StatusCode, String)> {
628 let updated = state
630 .knative_tasks
631 .update_status(&task_id, "completed")
632 .await;
633 if !updated {
634 return Err((StatusCode::NOT_FOUND, format!("Task {} not found", task_id)));
635 }
636
637 state
638 .knative_tasks
639 .get(&task_id)
640 .await
641 .map(Json)
642 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Task {} not found", task_id)))
643}
644
645async fn receive_task_event(
648 State(state): State<AppState>,
649 Json(event): Json<CloudEvent>,
650) -> Result<Json<CloudEventResponse>, (StatusCode, String)> {
651 tracing::info!(
652 event_type = %event.event_type,
653 event_id = %event.id,
654 "Received CloudEvent from Knative"
655 );
656
657 state
659 .audit_log
660 .log(
661 audit::AuditCategory::Api,
662 format!("cloudevents:{}", event.event_type),
663 AuditOutcome::Success,
664 None,
665 None,
666 )
667 .await;
668
669 match event.event_type.as_str() {
671 "codetether.task.created" | "task.created" => {
672 if let Some(data) = event.data {
674 let task_id = data
675 .get("task_id")
676 .and_then(|v| v.as_str())
677 .unwrap_or("unknown")
678 .to_string();
679 let title = data
680 .get("title")
681 .and_then(|v| v.as_str())
682 .unwrap_or("")
683 .to_string();
684 let description = data
685 .get("description")
686 .and_then(|v| v.as_str())
687 .unwrap_or("")
688 .to_string();
689 let agent_type = data
690 .get("agent_type")
691 .and_then(|v| v.as_str())
692 .unwrap_or("build")
693 .to_string();
694 let priority = data.get("priority").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
695
696 let task = KnativeTask {
697 task_id: task_id.clone(),
698 title,
699 description,
700 agent_type,
701 priority,
702 received_at: chrono::Utc::now(),
703 status: "queued".to_string(),
704 };
705
706 state.knative_tasks.push(task).await;
707 tracing::info!(task_id = %task_id, "Task queued for execution");
708 }
709 }
710 "codetether.task.cancelled" => {
711 tracing::info!("Task cancellation event received");
712 if let Some(data) = event.data {
714 if let Some(task_id) = data.get("task_id").and_then(|v| v.as_str()) {
715 let _ = state
716 .knative_tasks
717 .update_status(task_id, "cancelled")
718 .await;
719 tracing::info!(task_id = %task_id, "Task cancelled");
720 }
721 }
722 }
723 _ => {
724 tracing::warn!(event_type = %event.event_type, "Unknown CloudEvent type");
725 }
726 }
727
728 Ok(Json(CloudEventResponse {
729 status: "accepted".to_string(),
730 event_id: event.id,
731 }))
732}
733
734#[derive(Deserialize, Serialize)]
736struct CloudEvent {
737 id: String,
739 source: String,
741 #[serde(rename = "type")]
743 event_type: String,
744 #[serde(rename = "time")]
746 timestamp: Option<String>,
747 #[serde(rename = "specversion")]
749 spec_version: Option<String>,
750 data: Option<serde_json::Value>,
752}
753
754#[derive(Serialize)]
756struct CloudEventResponse {
757 status: String,
758 event_id: String,
759}
760
761#[derive(Serialize)]
763struct VersionInfo {
764 version: &'static str,
765 name: &'static str,
766 binary_hash: Option<String>,
767}
768
769async fn get_version() -> Json<VersionInfo> {
770 let binary_hash = std::env::current_exe()
771 .ok()
772 .and_then(|p| hash_file(&p).ok());
773 Json(VersionInfo {
774 version: env!("CARGO_PKG_VERSION"),
775 name: env!("CARGO_PKG_NAME"),
776 binary_hash,
777 })
778}
779
780#[derive(Deserialize)]
782struct ListSessionsQuery {
783 limit: Option<usize>,
784 offset: Option<usize>,
785}
786
787async fn list_sessions(
788 Query(query): Query<ListSessionsQuery>,
789) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
790 let sessions = crate::session::list_sessions()
791 .await
792 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
793
794 let offset = query.offset.unwrap_or(0);
795 let limit = query.limit.unwrap_or(100);
796 Ok(Json(sessions.into_iter().skip(offset).take(limit).collect()))
797}
798
799#[derive(Deserialize)]
801struct CreateSessionRequest {
802 title: Option<String>,
803 agent: Option<String>,
804}
805
806async fn create_session(
807 Json(req): Json<CreateSessionRequest>,
808) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
809 let mut session = crate::session::Session::new()
810 .await
811 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
812
813 session.title = req.title;
814 if let Some(agent) = req.agent {
815 session.agent = agent;
816 }
817
818 session
819 .save()
820 .await
821 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
822
823 Ok(Json(session))
824}
825
826async fn get_session(
828 axum::extract::Path(id): axum::extract::Path<String>,
829) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
830 let session = crate::session::Session::load(&id)
831 .await
832 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
833
834 Ok(Json(session))
835}
836
837#[derive(Deserialize)]
839struct PromptRequest {
840 message: String,
841}
842
843async fn prompt_session(
844 axum::extract::Path(id): axum::extract::Path<String>,
845 Json(req): Json<PromptRequest>,
846) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
847 if req.message.trim().is_empty() {
849 return Err((
850 StatusCode::BAD_REQUEST,
851 "Message cannot be empty".to_string(),
852 ));
853 }
854
855 tracing::info!(
857 session_id = %id,
858 message_len = req.message.len(),
859 "Received prompt request"
860 );
861
862 Err((
864 StatusCode::NOT_IMPLEMENTED,
865 "Prompt execution not yet implemented".to_string(),
866 ))
867}
868
869async fn get_config(State(state): State<AppState>) -> Json<Config> {
871 Json((*state.config).clone())
872}
873
874async fn list_providers() -> Json<Vec<String>> {
876 Json(vec![
877 "openai".to_string(),
878 "anthropic".to_string(),
879 "google".to_string(),
880 ])
881}
882
883async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
885 let registry = crate::agent::AgentRegistry::with_builtins();
886 Json(registry.list().into_iter().cloned().collect())
887}
888
889async fn start_cognition(
890 State(state): State<AppState>,
891 payload: Option<Json<StartCognitionRequest>>,
892) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
893 state
894 .cognition
895 .start(payload.map(|Json(body)| body))
896 .await
897 .map(Json)
898 .map_err(internal_error)
899}
900
901async fn stop_cognition(
902 State(state): State<AppState>,
903 payload: Option<Json<StopCognitionRequest>>,
904) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
905 let reason = payload.and_then(|Json(body)| body.reason);
906 state
907 .cognition
908 .stop(reason)
909 .await
910 .map(Json)
911 .map_err(internal_error)
912}
913
914async fn get_cognition_status(
915 State(state): State<AppState>,
916) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
917 Ok(Json(state.cognition.status().await))
918}
919
920async fn stream_cognition(
921 State(state): State<AppState>,
922) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
923 let rx = state.cognition.subscribe_events();
924
925 let event_stream = stream::unfold(rx, |mut rx| async move {
926 match rx.recv().await {
927 Ok(event) => {
928 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
929 let sse_event = Event::default().event("cognition").data(payload);
930 Some((Ok(sse_event), rx))
931 }
932 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
933 let lag_event = Event::default()
934 .event("lag")
935 .data(format!("skipped {}", skipped));
936 Some((Ok(lag_event), rx))
937 }
938 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
939 }
940 });
941
942 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
943}
944
945async fn get_latest_snapshot(
946 State(state): State<AppState>,
947) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
948 match state.cognition.latest_snapshot().await {
949 Some(snapshot) => Ok(Json(snapshot)),
950 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
951 }
952}
953
954async fn create_persona(
955 State(state): State<AppState>,
956 Json(req): Json<CreatePersonaRequest>,
957) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
958 state
959 .cognition
960 .create_persona(req)
961 .await
962 .map(Json)
963 .map_err(internal_error)
964}
965
966async fn spawn_persona(
967 State(state): State<AppState>,
968 Path(id): Path<String>,
969 Json(req): Json<SpawnPersonaRequest>,
970) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
971 state
972 .cognition
973 .spawn_child(&id, req)
974 .await
975 .map(Json)
976 .map_err(internal_error)
977}
978
979async fn reap_persona(
980 State(state): State<AppState>,
981 Path(id): Path<String>,
982 payload: Option<Json<ReapPersonaRequest>>,
983) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
984 let req = payload
985 .map(|Json(body)| body)
986 .unwrap_or(ReapPersonaRequest {
987 cascade: Some(false),
988 reason: None,
989 });
990
991 state
992 .cognition
993 .reap_persona(&id, req)
994 .await
995 .map(Json)
996 .map_err(internal_error)
997}
998
999async fn get_swarm_lineage(
1000 State(state): State<AppState>,
1001) -> Result<Json<LineageGraph>, (StatusCode, String)> {
1002 Ok(Json(state.cognition.lineage_graph().await))
1003}
1004
1005#[derive(Deserialize)]
1008struct BeliefFilter {
1009 status: Option<String>,
1010 persona: Option<String>,
1011}
1012
1013async fn list_beliefs(
1014 State(state): State<AppState>,
1015 Query(filter): Query<BeliefFilter>,
1016) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
1017 let beliefs = state.cognition.get_beliefs().await;
1018 let mut result: Vec<Belief> = beliefs.into_values().collect();
1019
1020 if let Some(status) = &filter.status {
1021 result.retain(|b| {
1022 let s = serde_json::to_string(&b.status).unwrap_or_default();
1023 s.contains(status)
1024 });
1025 }
1026 if let Some(persona) = &filter.persona {
1027 result.retain(|b| &b.asserted_by == persona);
1028 }
1029
1030 result.sort_by(|a, b| {
1031 b.confidence
1032 .partial_cmp(&a.confidence)
1033 .unwrap_or(std::cmp::Ordering::Equal)
1034 });
1035 Ok(Json(result))
1036}
1037
1038async fn get_belief(
1039 State(state): State<AppState>,
1040 Path(id): Path<String>,
1041) -> Result<Json<Belief>, (StatusCode, String)> {
1042 match state.cognition.get_belief(&id).await {
1043 Some(belief) => Ok(Json(belief)),
1044 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
1045 }
1046}
1047
1048async fn list_attention(
1049 State(state): State<AppState>,
1050) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
1051 Ok(Json(state.cognition.get_attention_queue().await))
1052}
1053
1054async fn list_proposals(
1055 State(state): State<AppState>,
1056) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
1057 let proposals = state.cognition.get_proposals().await;
1058 let mut result: Vec<Proposal> = proposals.into_values().collect();
1059 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
1060 Ok(Json(result))
1061}
1062
1063async fn approve_proposal(
1064 State(state): State<AppState>,
1065 Path(id): Path<String>,
1066) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
1067 state
1068 .cognition
1069 .approve_proposal(&id)
1070 .await
1071 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
1072 .map_err(internal_error)
1073}
1074
1075async fn list_receipts(
1076 State(state): State<AppState>,
1077) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
1078 Ok(Json(state.cognition.get_receipts().await))
1079}
1080
1081async fn get_workspace(
1082 State(state): State<AppState>,
1083) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
1084 Ok(Json(state.cognition.get_workspace().await))
1085}
1086
1087async fn get_governance(
1088 State(state): State<AppState>,
1089) -> Result<Json<crate::cognition::SwarmGovernance>, (StatusCode, String)> {
1090 Ok(Json(state.cognition.get_governance().await))
1091}
1092
1093async fn get_persona(
1094 State(state): State<AppState>,
1095 axum::extract::Path(id): axum::extract::Path<String>,
1096) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
1097 state
1098 .cognition
1099 .get_persona(&id)
1100 .await
1101 .map(Json)
1102 .ok_or_else(|| (StatusCode::NOT_FOUND, format!("Persona not found: {}", id)))
1103}
1104
1105#[derive(Deserialize)]
1108struct AuditQuery {
1109 limit: Option<usize>,
1110 category: Option<String>,
1111}
1112
1113async fn list_audit_entries(
1114 State(state): State<AppState>,
1115 Query(query): Query<AuditQuery>,
1116) -> Result<Json<Vec<audit::AuditEntry>>, (StatusCode, String)> {
1117 let limit = query.limit.unwrap_or(100).min(1000);
1118
1119 let entries = if let Some(ref cat) = query.category {
1120 let category = match cat.as_str() {
1121 "api" => AuditCategory::Api,
1122 "tool" | "tool_execution" => AuditCategory::ToolExecution,
1123 "session" => AuditCategory::Session,
1124 "cognition" => AuditCategory::Cognition,
1125 "swarm" => AuditCategory::Swarm,
1126 "auth" => AuditCategory::Auth,
1127 "k8s" => AuditCategory::K8s,
1128 "sandbox" => AuditCategory::Sandbox,
1129 "config" => AuditCategory::Config,
1130 _ => {
1131 return Err((
1132 StatusCode::BAD_REQUEST,
1133 format!("Unknown category: {}", cat),
1134 ));
1135 }
1136 };
1137 state.audit_log.by_category(category, limit).await
1138 } else {
1139 state.audit_log.recent(limit).await
1140 };
1141
1142 Ok(Json(entries))
1143}
1144
1145#[derive(Deserialize)]
1150struct ReplayQuery {
1151 session_id: String,
1153 start_offset: Option<u64>,
1155 end_offset: Option<u64>,
1157 limit: Option<usize>,
1159 tool_name: Option<String>,
1161}
1162
1163async fn replay_session_events(
1167 Query(query): Query<ReplayQuery>,
1168) -> Result<Json<Vec<serde_json::Value>>, (StatusCode, String)> {
1169 use std::path::PathBuf;
1170
1171 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
1172 .map(PathBuf::from)
1173 .ok()
1174 .ok_or_else(|| {
1175 (
1176 StatusCode::SERVICE_UNAVAILABLE,
1177 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
1178 )
1179 })?;
1180
1181 let session_dir = base_dir.join(&query.session_id);
1182
1183 if !session_dir.exists() {
1185 return Err((
1186 StatusCode::NOT_FOUND,
1187 format!("Session not found: {}", query.session_id),
1188 ));
1189 }
1190
1191 let mut all_events: Vec<(u64, u64, serde_json::Value)> = Vec::new();
1192
1193 let entries = std::fs::read_dir(&session_dir)
1195 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1196
1197 for entry in entries {
1198 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1199 let path = entry.path();
1200
1201 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
1202 continue;
1203 }
1204
1205 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
1207
1208 if let Some(offsets) = filename
1209 .strip_prefix("T")
1210 .or_else(|| filename.strip_prefix("202"))
1211 {
1212 let parts: Vec<&str> = offsets.split('-').collect();
1214 if parts.len() >= 4 {
1215 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
1216 let end: u64 = parts[parts.len() - 1]
1217 .trim_end_matches(".jsonl")
1218 .parse()
1219 .unwrap_or(0);
1220
1221 if let Some(query_start) = query.start_offset {
1223 if end <= query_start {
1224 continue;
1225 }
1226 }
1227 if let Some(query_end) = query.end_offset {
1228 if start >= query_end {
1229 continue;
1230 }
1231 }
1232
1233 let content = std::fs::read_to_string(&path)
1235 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1236
1237 for line in content.lines() {
1238 if line.trim().is_empty() {
1239 continue;
1240 }
1241 if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
1242 if let Some(ref tool_filter) = query.tool_name {
1244 if let Some(event_tool) =
1245 event.get("tool_name").and_then(|v| v.as_str())
1246 {
1247 if event_tool != tool_filter {
1248 continue;
1249 }
1250 } else {
1251 continue;
1252 }
1253 }
1254 all_events.push((start, end, event));
1255 }
1256 }
1257 }
1258 }
1259 }
1260
1261 all_events.sort_by_key(|(s, _, _)| *s);
1263
1264 let limit = query.limit.unwrap_or(1000).min(10000);
1266 let events: Vec<_> = all_events
1267 .into_iter()
1268 .take(limit)
1269 .map(|(_, _, e)| e)
1270 .collect();
1271
1272 Ok(Json(events))
1273}
1274
1275async fn list_session_event_files(
1277 Query(query): Query<ReplayQuery>,
1278) -> Result<Json<Vec<EventFileMeta>>, (StatusCode, String)> {
1279 use std::path::PathBuf;
1280
1281 let base_dir = std::env::var("CODETETHER_EVENT_STREAM_PATH")
1282 .map(PathBuf::from)
1283 .ok()
1284 .ok_or_else(|| {
1285 (
1286 StatusCode::SERVICE_UNAVAILABLE,
1287 "Event stream not configured. Set CODETETHER_EVENT_STREAM_PATH.".to_string(),
1288 )
1289 })?;
1290
1291 let session_dir = base_dir.join(&query.session_id);
1292 if !session_dir.exists() {
1293 return Err((
1294 StatusCode::NOT_FOUND,
1295 format!("Session not found: {}", query.session_id),
1296 ));
1297 }
1298
1299 let mut files: Vec<EventFileMeta> = Vec::new();
1300
1301 let entries = std::fs::read_dir(&session_dir)
1303 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1304
1305 for entry in entries {
1306 let entry = entry.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1307 let path = entry.path();
1308
1309 if path.extension().and_then(|s| s.to_str()) != Some("jsonl") {
1310 continue;
1311 }
1312
1313 let filename = path.file_name().and_then(|s| s.to_str()).unwrap_or("");
1314
1315 if let Some(offsets) = filename
1317 .strip_prefix("T")
1318 .or_else(|| filename.strip_prefix("202"))
1319 {
1320 let parts: Vec<&str> = offsets.split('-').collect();
1321 if parts.len() >= 4 {
1322 let start: u64 = parts[parts.len() - 2].parse().unwrap_or(0);
1323 let end: u64 = parts[parts.len() - 1]
1324 .trim_end_matches(".jsonl")
1325 .parse()
1326 .unwrap_or(0);
1327
1328 let metadata = std::fs::metadata(&path)
1329 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
1330
1331 files.push(EventFileMeta {
1332 filename: filename.to_string(),
1333 start_offset: start,
1334 end_offset: end,
1335 size_bytes: metadata.len(),
1336 });
1337 }
1338 }
1339 }
1340
1341 files.sort_by_key(|f| f.start_offset);
1342 Ok(Json(files))
1343}
1344
/// Metadata for one event file of a session, as returned by
/// `list_session_event_files`.
#[derive(Serialize)]
struct EventFileMeta {
    /// File name (without directory) of the `.jsonl` event file.
    filename: String,
    /// Start offset, parsed from the second-to-last `-`-separated part of the file name.
    start_offset: u64,
    /// End offset, parsed from the last `-`-separated part of the file name.
    end_offset: u64,
    /// Size of the file in bytes, taken from filesystem metadata.
    size_bytes: u64,
}
1352
1353async fn get_k8s_status(
1356 State(state): State<AppState>,
1357) -> Result<Json<crate::k8s::K8sStatus>, (StatusCode, String)> {
1358 Ok(Json(state.k8s.status().await))
1359}
1360
/// JSON request body accepted by `k8s_scale`.
#[derive(Deserialize)]
struct ScaleRequest {
    /// Desired replica count; `k8s_scale` rejects values outside `0..=100`.
    replicas: i32,
}
1365
1366async fn k8s_scale(
1367 State(state): State<AppState>,
1368 Json(req): Json<ScaleRequest>,
1369) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1370 if req.replicas < 0 || req.replicas > 100 {
1371 return Err((
1372 StatusCode::BAD_REQUEST,
1373 "Replicas must be between 0 and 100".to_string(),
1374 ));
1375 }
1376
1377 state
1378 .audit_log
1379 .log(
1380 AuditCategory::K8s,
1381 format!("scale:{}", req.replicas),
1382 AuditOutcome::Success,
1383 None,
1384 None,
1385 )
1386 .await;
1387
1388 state
1389 .k8s
1390 .scale(req.replicas)
1391 .await
1392 .map(Json)
1393 .map_err(internal_error)
1394}
1395
1396async fn k8s_restart(
1397 State(state): State<AppState>,
1398) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1399 state
1400 .audit_log
1401 .log(
1402 AuditCategory::K8s,
1403 "rolling_restart",
1404 AuditOutcome::Success,
1405 None,
1406 None,
1407 )
1408 .await;
1409
1410 state
1411 .k8s
1412 .rolling_restart()
1413 .await
1414 .map(Json)
1415 .map_err(internal_error)
1416}
1417
1418async fn k8s_list_pods(
1419 State(state): State<AppState>,
1420) -> Result<Json<Vec<crate::k8s::PodInfo>>, (StatusCode, String)> {
1421 state
1422 .k8s
1423 .list_pods()
1424 .await
1425 .map(Json)
1426 .map_err(internal_error)
1427}
1428
1429async fn k8s_actions(
1430 State(state): State<AppState>,
1431) -> Result<Json<Vec<crate::k8s::DeployAction>>, (StatusCode, String)> {
1432 Ok(Json(state.k8s.recent_actions(100).await))
1433}
1434
/// JSON request body accepted by `k8s_spawn_subagent`.
#[derive(Deserialize)]
struct SpawnSubagentRequest {
    /// Identifier of the sub-agent; forwarded to `spawn_subagent_pod`.
    subagent_id: String,
    /// Optional image override; `None` when the field is omitted from the body.
    #[serde(default)]
    image: Option<String>,
    /// Environment variables forwarded to `spawn_subagent_pod`; empty when omitted.
    #[serde(default)]
    env_vars: std::collections::HashMap<String, String>,
}
1443
1444async fn k8s_spawn_subagent(
1445 State(state): State<AppState>,
1446 Json(req): Json<SpawnSubagentRequest>,
1447) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1448 state
1449 .k8s
1450 .spawn_subagent_pod(&req.subagent_id, req.image.as_deref(), req.env_vars)
1451 .await
1452 .map(Json)
1453 .map_err(internal_error)
1454}
1455
1456async fn k8s_delete_subagent(
1457 State(state): State<AppState>,
1458 axum::extract::Path(id): axum::extract::Path<String>,
1459) -> Result<Json<crate::k8s::DeployAction>, (StatusCode, String)> {
1460 state
1461 .k8s
1462 .delete_subagent_pod(&id)
1463 .await
1464 .map(Json)
1465 .map_err(internal_error)
1466}
1467
1468async fn list_plugins(State(_state): State<AppState>) -> Json<PluginListResponse> {
1470 let server_fingerprint = hash_bytes(env!("CARGO_PKG_VERSION").as_bytes());
1471 let signing_key = SigningKey::from_env();
1472 let test_sig = signing_key.sign("_probe", "0.0.0", &server_fingerprint);
1473 Json(PluginListResponse {
1474 server_fingerprint,
1475 signing_available: !test_sig.is_empty(),
1476 plugins: Vec::<PluginManifest>::new(),
1477 })
1478}
1479
/// Response body produced by `list_plugins`.
#[derive(Serialize)]
struct PluginListResponse {
    /// Hash of the crate version string, as computed by `list_plugins`.
    server_fingerprint: String,
    /// `true` when a probe signature made with the environment signing key is non-empty.
    signing_available: bool,
    /// Plugin manifests; `list_plugins` currently always returns an empty list.
    plugins: Vec<PluginManifest>,
}
1486
1487fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
1488 let message = error.to_string();
1489 if message.contains("not found") {
1490 return (StatusCode::NOT_FOUND, message);
1491 }
1492 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
1493 return (StatusCode::BAD_REQUEST, message);
1494 }
1495 (StatusCode::INTERNAL_SERVER_ERROR, message)
1496}
1497
/// Reads a boolean flag from the environment variable `name`.
///
/// Recognised values (ASCII case-insensitive, surrounding whitespace ignored):
///
/// * `1`, `true`, `yes`, `on` -> `true`
/// * `0`, `false`, `no`, `off` -> `false`
///
/// Returns `default` when the variable is unset, is not valid Unicode, or
/// holds any other value.
fn env_bool(name: &str, default: bool) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    const FALSY: [&str; 4] = ["0", "false", "no", "off"];

    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };

    // Values injected from files or manifests often carry stray whitespace or
    // a trailing newline; trim so they are not silently treated as "unset".
    let value = raw.trim();

    if TRUTHY.iter().any(|t| value.eq_ignore_ascii_case(t)) {
        true
    } else if FALSY.iter().any(|f| value.eq_ignore_ascii_case(f)) {
        false
    } else {
        default
    }
}
1508
#[cfg(test)]
mod tests {
    use super::match_policy_rule;

    /// POST to a session's prompt endpoint must map to `agent:execute`.
    #[test]
    fn policy_prompt_session_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/api/session/abc123/prompt", "POST"),
            Some("agent:execute"),
        );
    }

    /// POST to the session collection must map to `sessions:write`.
    #[test]
    fn policy_create_session_keeps_sessions_write_permission() {
        assert_eq!(
            match_policy_rule("/api/session", "POST"),
            Some("sessions:write"),
        );
    }

    /// POST to a proposal's approve endpoint must map to `agent:execute`.
    #[test]
    fn policy_proposal_approval_requires_execute_permission() {
        assert_eq!(
            match_policy_rule("/v1/cognition/proposals/p1/approve", "POST"),
            Some("agent:execute"),
        );
    }
}