1use crate::a2a;
6use crate::cli::ServeArgs;
7use crate::cognition::{
8 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
9 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
10 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
11 executor::DecisionReceipt,
12};
13use crate::config::Config;
14use anyhow::Result;
15use axum::{
16 Router,
17 extract::Path,
18 extract::{Query, State},
19 http::StatusCode,
20 response::Json,
21 response::sse::{Event, KeepAlive, Sse},
22 routing::{get, post},
23};
24use futures::stream;
25use serde::{Deserialize, Serialize};
26use std::convert::Infallible;
27use std::sync::Arc;
28use tower_http::cors::{Any, CorsLayer};
29use tower_http::trace::TraceLayer;
30
31#[derive(Clone)]
33pub struct AppState {
34 pub config: Arc<Config>,
35 pub cognition: Arc<CognitionRuntime>,
36}
37
38pub async fn serve(args: ServeArgs) -> Result<()> {
40 let config = Config::load().await?;
41 let cognition = Arc::new(CognitionRuntime::new_from_env());
42
43 if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
44 if let Err(error) = cognition.start(None).await {
45 tracing::warn!(%error, "Failed to auto-start cognition loop");
46 } else {
47 tracing::info!("Perpetual cognition auto-started");
48 }
49 }
50
51 let state = AppState {
52 config: Arc::new(config),
53 cognition,
54 };
55
56 let addr = format!("{}:{}", args.hostname, args.port);
57
58 let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
60 let a2a_server = a2a::server::A2AServer::new(agent_card);
61
62 let a2a_router = a2a_server.router();
64
65 let app = Router::new()
66 .route("/health", get(health))
68 .route("/api/version", get(get_version))
70 .route("/api/session", get(list_sessions).post(create_session))
71 .route("/api/session/{id}", get(get_session))
72 .route("/api/session/{id}/prompt", post(prompt_session))
73 .route("/api/config", get(get_config))
74 .route("/api/provider", get(list_providers))
75 .route("/api/agent", get(list_agents))
76 .route("/v1/cognition/start", post(start_cognition))
78 .route("/v1/cognition/stop", post(stop_cognition))
79 .route("/v1/cognition/status", get(get_cognition_status))
80 .route("/v1/cognition/stream", get(stream_cognition))
81 .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
82 .route("/v1/swarm/personas", post(create_persona))
84 .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
85 .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
86 .route("/v1/swarm/lineage", get(get_swarm_lineage))
87 .route("/v1/cognition/beliefs", get(list_beliefs))
89 .route("/v1/cognition/beliefs/{id}", get(get_belief))
90 .route("/v1/cognition/attention", get(list_attention))
91 .route("/v1/cognition/proposals", get(list_proposals))
92 .route(
93 "/v1/cognition/proposals/{id}/approve",
94 post(approve_proposal),
95 )
96 .route("/v1/cognition/receipts", get(list_receipts))
97 .route("/v1/cognition/workspace", get(get_workspace))
98 .with_state(state)
99 .nest("/a2a", a2a_router)
101 .layer(
103 CorsLayer::new()
104 .allow_origin(Any)
105 .allow_methods(Any)
106 .allow_headers(Any),
107 )
108 .layer(TraceLayer::new_for_http());
109
110 let listener = tokio::net::TcpListener::bind(&addr).await?;
111 tracing::info!("Server listening on http://{}", addr);
112
113 axum::serve(listener, app).await?;
114
115 Ok(())
116}
117
118async fn health() -> &'static str {
120 "ok"
121}
122
123#[derive(Serialize)]
125struct VersionInfo {
126 version: &'static str,
127 name: &'static str,
128}
129
130async fn get_version() -> Json<VersionInfo> {
131 Json(VersionInfo {
132 version: env!("CARGO_PKG_VERSION"),
133 name: env!("CARGO_PKG_NAME"),
134 })
135}
136
137#[derive(Deserialize)]
139struct ListSessionsQuery {
140 limit: Option<usize>,
141}
142
143async fn list_sessions(
144 Query(query): Query<ListSessionsQuery>,
145) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
146 let sessions = crate::session::list_sessions()
147 .await
148 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
149
150 let limit = query.limit.unwrap_or(50);
151 Ok(Json(sessions.into_iter().take(limit).collect()))
152}
153
154#[derive(Deserialize)]
156struct CreateSessionRequest {
157 title: Option<String>,
158 agent: Option<String>,
159}
160
161async fn create_session(
162 Json(req): Json<CreateSessionRequest>,
163) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
164 let mut session = crate::session::Session::new()
165 .await
166 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
167
168 session.title = req.title;
169 if let Some(agent) = req.agent {
170 session.agent = agent;
171 }
172
173 session
174 .save()
175 .await
176 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
177
178 Ok(Json(session))
179}
180
181async fn get_session(
183 axum::extract::Path(id): axum::extract::Path<String>,
184) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
185 let session = crate::session::Session::load(&id)
186 .await
187 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
188
189 Ok(Json(session))
190}
191
192#[derive(Deserialize)]
194struct PromptRequest {
195 message: String,
196}
197
198async fn prompt_session(
199 axum::extract::Path(id): axum::extract::Path<String>,
200 Json(req): Json<PromptRequest>,
201) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
202 if req.message.trim().is_empty() {
204 return Err((
205 StatusCode::BAD_REQUEST,
206 "Message cannot be empty".to_string(),
207 ));
208 }
209
210 tracing::info!(
212 session_id = %id,
213 message_len = req.message.len(),
214 "Received prompt request"
215 );
216
217 Err((
219 StatusCode::NOT_IMPLEMENTED,
220 "Prompt execution not yet implemented".to_string(),
221 ))
222}
223
224async fn get_config(State(state): State<AppState>) -> Json<Config> {
226 Json((*state.config).clone())
227}
228
229async fn list_providers() -> Json<Vec<String>> {
231 Json(vec![
232 "openai".to_string(),
233 "anthropic".to_string(),
234 "google".to_string(),
235 ])
236}
237
238async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
240 let registry = crate::agent::AgentRegistry::with_builtins();
241 Json(registry.list().into_iter().cloned().collect())
242}
243
244async fn start_cognition(
245 State(state): State<AppState>,
246 payload: Option<Json<StartCognitionRequest>>,
247) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
248 state
249 .cognition
250 .start(payload.map(|Json(body)| body))
251 .await
252 .map(Json)
253 .map_err(internal_error)
254}
255
256async fn stop_cognition(
257 State(state): State<AppState>,
258 payload: Option<Json<StopCognitionRequest>>,
259) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
260 let reason = payload.and_then(|Json(body)| body.reason);
261 state
262 .cognition
263 .stop(reason)
264 .await
265 .map(Json)
266 .map_err(internal_error)
267}
268
269async fn get_cognition_status(
270 State(state): State<AppState>,
271) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
272 Ok(Json(state.cognition.status().await))
273}
274
275async fn stream_cognition(
276 State(state): State<AppState>,
277) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
278 let rx = state.cognition.subscribe_events();
279
280 let event_stream = stream::unfold(rx, |mut rx| async move {
281 match rx.recv().await {
282 Ok(event) => {
283 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
284 let sse_event = Event::default().event("cognition").data(payload);
285 Some((Ok(sse_event), rx))
286 }
287 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
288 let lag_event = Event::default()
289 .event("lag")
290 .data(format!("skipped {}", skipped));
291 Some((Ok(lag_event), rx))
292 }
293 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
294 }
295 });
296
297 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
298}
299
300async fn get_latest_snapshot(
301 State(state): State<AppState>,
302) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
303 match state.cognition.latest_snapshot().await {
304 Some(snapshot) => Ok(Json(snapshot)),
305 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
306 }
307}
308
309async fn create_persona(
310 State(state): State<AppState>,
311 Json(req): Json<CreatePersonaRequest>,
312) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
313 state
314 .cognition
315 .create_persona(req)
316 .await
317 .map(Json)
318 .map_err(internal_error)
319}
320
321async fn spawn_persona(
322 State(state): State<AppState>,
323 Path(id): Path<String>,
324 Json(req): Json<SpawnPersonaRequest>,
325) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
326 state
327 .cognition
328 .spawn_child(&id, req)
329 .await
330 .map(Json)
331 .map_err(internal_error)
332}
333
334async fn reap_persona(
335 State(state): State<AppState>,
336 Path(id): Path<String>,
337 payload: Option<Json<ReapPersonaRequest>>,
338) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
339 let req = payload
340 .map(|Json(body)| body)
341 .unwrap_or(ReapPersonaRequest {
342 cascade: Some(false),
343 reason: None,
344 });
345
346 state
347 .cognition
348 .reap_persona(&id, req)
349 .await
350 .map(Json)
351 .map_err(internal_error)
352}
353
354async fn get_swarm_lineage(
355 State(state): State<AppState>,
356) -> Result<Json<LineageGraph>, (StatusCode, String)> {
357 Ok(Json(state.cognition.lineage_graph().await))
358}
359
360#[derive(Deserialize)]
363struct BeliefFilter {
364 status: Option<String>,
365 persona: Option<String>,
366}
367
368async fn list_beliefs(
369 State(state): State<AppState>,
370 Query(filter): Query<BeliefFilter>,
371) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
372 let beliefs = state.cognition.get_beliefs().await;
373 let mut result: Vec<Belief> = beliefs.into_values().collect();
374
375 if let Some(status) = &filter.status {
376 result.retain(|b| {
377 let s = serde_json::to_string(&b.status).unwrap_or_default();
378 s.contains(status)
379 });
380 }
381 if let Some(persona) = &filter.persona {
382 result.retain(|b| &b.asserted_by == persona);
383 }
384
385 result.sort_by(|a, b| {
386 b.confidence
387 .partial_cmp(&a.confidence)
388 .unwrap_or(std::cmp::Ordering::Equal)
389 });
390 Ok(Json(result))
391}
392
393async fn get_belief(
394 State(state): State<AppState>,
395 Path(id): Path<String>,
396) -> Result<Json<Belief>, (StatusCode, String)> {
397 match state.cognition.get_belief(&id).await {
398 Some(belief) => Ok(Json(belief)),
399 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
400 }
401}
402
403async fn list_attention(
404 State(state): State<AppState>,
405) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
406 Ok(Json(state.cognition.get_attention_queue().await))
407}
408
409async fn list_proposals(
410 State(state): State<AppState>,
411) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
412 let proposals = state.cognition.get_proposals().await;
413 let mut result: Vec<Proposal> = proposals.into_values().collect();
414 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
415 Ok(Json(result))
416}
417
418async fn approve_proposal(
419 State(state): State<AppState>,
420 Path(id): Path<String>,
421) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
422 state
423 .cognition
424 .approve_proposal(&id)
425 .await
426 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
427 .map_err(internal_error)
428}
429
430async fn list_receipts(
431 State(state): State<AppState>,
432) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
433 Ok(Json(state.cognition.get_receipts().await))
434}
435
436async fn get_workspace(
437 State(state): State<AppState>,
438) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
439 Ok(Json(state.cognition.get_workspace().await))
440}
441
442fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
443 let message = error.to_string();
444 if message.contains("not found") {
445 return (StatusCode::NOT_FOUND, message);
446 }
447 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
448 return (StatusCode::BAD_REQUEST, message);
449 }
450 (StatusCode::INTERNAL_SERVER_ERROR, message)
451}
452
453fn env_bool(name: &str, default: bool) -> bool {
454 std::env::var(name)
455 .ok()
456 .and_then(|v| match v.to_ascii_lowercase().as_str() {
457 "1" | "true" | "yes" | "on" => Some(true),
458 "0" | "false" | "no" | "off" => Some(false),
459 _ => None,
460 })
461 .unwrap_or(default)
462}