1use crate::a2a;
6use crate::cli::ServeArgs;
7use crate::cognition::{
8 AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
9 LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
10 SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
11 executor::DecisionReceipt,
12};
13use crate::config::Config;
14use anyhow::Result;
15use axum::{
16 Router,
17 extract::Path,
18 extract::{Query, State},
19 http::StatusCode,
20 response::Json,
21 response::sse::{Event, KeepAlive, Sse},
22 routing::{get, post},
23};
24use futures::stream;
25use serde::{Deserialize, Serialize};
26use std::convert::Infallible;
27use std::sync::Arc;
28use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
29use tower_http::trace::TraceLayer;
30
/// Shared state handed to the routes registered in [`serve`].
///
/// Both fields are `Arc`s, so cloning the state only bumps reference counts.
#[derive(Clone)]
pub struct AppState {
    /// Configuration loaded once at startup via `Config::load`.
    pub config: Arc<Config>,
    /// Handle to the cognition runtime used by the `/v1/cognition/*` and `/v1/swarm/*` handlers.
    pub cognition: Arc<CognitionRuntime>,
}
37
38pub async fn serve(args: ServeArgs) -> Result<()> {
40 let config = Config::load().await?;
41 let cognition = Arc::new(CognitionRuntime::new_from_env());
42
43 if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
44 if let Err(error) = cognition.start(None).await {
45 tracing::warn!(%error, "Failed to auto-start cognition loop");
46 } else {
47 tracing::info!("Perpetual cognition auto-started");
48 }
49 }
50
51 let state = AppState {
52 config: Arc::new(config),
53 cognition,
54 };
55
56 let addr = format!("{}:{}", args.hostname, args.port);
57
58 let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
60 let a2a_server = a2a::server::A2AServer::new(agent_card);
61
62 let a2a_router = a2a_server.router();
64
65 let app = Router::new()
66 .route("/health", get(health))
68 .route("/api/version", get(get_version))
70 .route("/api/session", get(list_sessions).post(create_session))
71 .route("/api/session/{id}", get(get_session))
72 .route("/api/session/{id}/prompt", post(prompt_session))
73 .route("/api/config", get(get_config))
74 .route("/api/provider", get(list_providers))
75 .route("/api/agent", get(list_agents))
76 .route("/v1/cognition/start", post(start_cognition))
78 .route("/v1/cognition/stop", post(stop_cognition))
79 .route("/v1/cognition/status", get(get_cognition_status))
80 .route("/v1/cognition/stream", get(stream_cognition))
81 .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
82 .route("/v1/swarm/personas", post(create_persona))
84 .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
85 .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
86 .route("/v1/swarm/lineage", get(get_swarm_lineage))
87 .route("/v1/cognition/beliefs", get(list_beliefs))
89 .route("/v1/cognition/beliefs/{id}", get(get_belief))
90 .route("/v1/cognition/attention", get(list_attention))
91 .route("/v1/cognition/proposals", get(list_proposals))
92 .route(
93 "/v1/cognition/proposals/{id}/approve",
94 post(approve_proposal),
95 )
96 .route("/v1/cognition/receipts", get(list_receipts))
97 .route("/v1/cognition/workspace", get(get_workspace))
98 .with_state(state)
99 .nest("/a2a", a2a_router)
101 .layer(
103 CorsLayer::new()
105 .allow_origin(AllowOrigin::mirror_request())
106 .allow_credentials(true)
107 .allow_methods(AllowMethods::mirror_request())
108 .allow_headers(AllowHeaders::mirror_request()),
109 )
110 .layer(TraceLayer::new_for_http());
111
112 let listener = tokio::net::TcpListener::bind(&addr).await?;
113 tracing::info!("Server listening on http://{}", addr);
114
115 axum::serve(listener, app).await?;
116
117 Ok(())
118}
119
/// Liveness probe for `/health`: always answers with the plain text `ok`.
async fn health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
124
/// JSON body returned by the `/api/version` route.
#[derive(Serialize)]
struct VersionInfo {
    /// Crate version, filled from `CARGO_PKG_VERSION` at compile time.
    version: &'static str,
    /// Crate name, filled from `CARGO_PKG_NAME` at compile time.
    name: &'static str,
}
131
132async fn get_version() -> Json<VersionInfo> {
133 Json(VersionInfo {
134 version: env!("CARGO_PKG_VERSION"),
135 name: env!("CARGO_PKG_NAME"),
136 })
137}
138
/// Query-string parameters accepted by the session listing route.
#[derive(Deserialize)]
struct ListSessionsQuery {
    /// Maximum number of sessions to return; the handler falls back to 50 when omitted.
    limit: Option<usize>,
}
144
145async fn list_sessions(
146 Query(query): Query<ListSessionsQuery>,
147) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
148 let sessions = crate::session::list_sessions()
149 .await
150 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
151
152 let limit = query.limit.unwrap_or(50);
153 Ok(Json(sessions.into_iter().take(limit).collect()))
154}
155
/// JSON body accepted when creating a session.
#[derive(Deserialize)]
struct CreateSessionRequest {
    /// Optional title; copied onto the new session as-is (including `None`).
    title: Option<String>,
    /// Optional agent name; when absent the new session keeps the agent it was created with.
    agent: Option<String>,
}
162
163async fn create_session(
164 Json(req): Json<CreateSessionRequest>,
165) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
166 let mut session = crate::session::Session::new()
167 .await
168 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
169
170 session.title = req.title;
171 if let Some(agent) = req.agent {
172 session.agent = agent;
173 }
174
175 session
176 .save()
177 .await
178 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
179
180 Ok(Json(session))
181}
182
183async fn get_session(
185 axum::extract::Path(id): axum::extract::Path<String>,
186) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
187 let session = crate::session::Session::load(&id)
188 .await
189 .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
190
191 Ok(Json(session))
192}
193
/// JSON body accepted by the session prompt route.
#[derive(Deserialize)]
struct PromptRequest {
    /// Prompt text; the handler rejects it when it is empty after trimming whitespace.
    message: String,
}
199
200async fn prompt_session(
201 axum::extract::Path(id): axum::extract::Path<String>,
202 Json(req): Json<PromptRequest>,
203) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
204 if req.message.trim().is_empty() {
206 return Err((
207 StatusCode::BAD_REQUEST,
208 "Message cannot be empty".to_string(),
209 ));
210 }
211
212 tracing::info!(
214 session_id = %id,
215 message_len = req.message.len(),
216 "Received prompt request"
217 );
218
219 Err((
221 StatusCode::NOT_IMPLEMENTED,
222 "Prompt execution not yet implemented".to_string(),
223 ))
224}
225
226async fn get_config(State(state): State<AppState>) -> Json<Config> {
228 Json((*state.config).clone())
229}
230
231async fn list_providers() -> Json<Vec<String>> {
233 Json(vec![
234 "openai".to_string(),
235 "anthropic".to_string(),
236 "google".to_string(),
237 ])
238}
239
240async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
242 let registry = crate::agent::AgentRegistry::with_builtins();
243 Json(registry.list().into_iter().cloned().collect())
244}
245
246async fn start_cognition(
247 State(state): State<AppState>,
248 payload: Option<Json<StartCognitionRequest>>,
249) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
250 state
251 .cognition
252 .start(payload.map(|Json(body)| body))
253 .await
254 .map(Json)
255 .map_err(internal_error)
256}
257
258async fn stop_cognition(
259 State(state): State<AppState>,
260 payload: Option<Json<StopCognitionRequest>>,
261) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
262 let reason = payload.and_then(|Json(body)| body.reason);
263 state
264 .cognition
265 .stop(reason)
266 .await
267 .map(Json)
268 .map_err(internal_error)
269}
270
271async fn get_cognition_status(
272 State(state): State<AppState>,
273) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
274 Ok(Json(state.cognition.status().await))
275}
276
277async fn stream_cognition(
278 State(state): State<AppState>,
279) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
280 let rx = state.cognition.subscribe_events();
281
282 let event_stream = stream::unfold(rx, |mut rx| async move {
283 match rx.recv().await {
284 Ok(event) => {
285 let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
286 let sse_event = Event::default().event("cognition").data(payload);
287 Some((Ok(sse_event), rx))
288 }
289 Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
290 let lag_event = Event::default()
291 .event("lag")
292 .data(format!("skipped {}", skipped));
293 Some((Ok(lag_event), rx))
294 }
295 Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
296 }
297 });
298
299 Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
300}
301
302async fn get_latest_snapshot(
303 State(state): State<AppState>,
304) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
305 match state.cognition.latest_snapshot().await {
306 Some(snapshot) => Ok(Json(snapshot)),
307 None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
308 }
309}
310
311async fn create_persona(
312 State(state): State<AppState>,
313 Json(req): Json<CreatePersonaRequest>,
314) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
315 state
316 .cognition
317 .create_persona(req)
318 .await
319 .map(Json)
320 .map_err(internal_error)
321}
322
323async fn spawn_persona(
324 State(state): State<AppState>,
325 Path(id): Path<String>,
326 Json(req): Json<SpawnPersonaRequest>,
327) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
328 state
329 .cognition
330 .spawn_child(&id, req)
331 .await
332 .map(Json)
333 .map_err(internal_error)
334}
335
336async fn reap_persona(
337 State(state): State<AppState>,
338 Path(id): Path<String>,
339 payload: Option<Json<ReapPersonaRequest>>,
340) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
341 let req = payload
342 .map(|Json(body)| body)
343 .unwrap_or(ReapPersonaRequest {
344 cascade: Some(false),
345 reason: None,
346 });
347
348 state
349 .cognition
350 .reap_persona(&id, req)
351 .await
352 .map(Json)
353 .map_err(internal_error)
354}
355
356async fn get_swarm_lineage(
357 State(state): State<AppState>,
358) -> Result<Json<LineageGraph>, (StatusCode, String)> {
359 Ok(Json(state.cognition.lineage_graph().await))
360}
361
/// Query-string filters accepted by the belief listing route.
#[derive(Deserialize)]
struct BeliefFilter {
    /// When present, restricts the result to beliefs whose serialized status matches.
    status: Option<String>,
    /// When present, restricts the result to beliefs whose `asserted_by` equals this value.
    persona: Option<String>,
}
369
370async fn list_beliefs(
371 State(state): State<AppState>,
372 Query(filter): Query<BeliefFilter>,
373) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
374 let beliefs = state.cognition.get_beliefs().await;
375 let mut result: Vec<Belief> = beliefs.into_values().collect();
376
377 if let Some(status) = &filter.status {
378 result.retain(|b| {
379 let s = serde_json::to_string(&b.status).unwrap_or_default();
380 s.contains(status)
381 });
382 }
383 if let Some(persona) = &filter.persona {
384 result.retain(|b| &b.asserted_by == persona);
385 }
386
387 result.sort_by(|a, b| {
388 b.confidence
389 .partial_cmp(&a.confidence)
390 .unwrap_or(std::cmp::Ordering::Equal)
391 });
392 Ok(Json(result))
393}
394
395async fn get_belief(
396 State(state): State<AppState>,
397 Path(id): Path<String>,
398) -> Result<Json<Belief>, (StatusCode, String)> {
399 match state.cognition.get_belief(&id).await {
400 Some(belief) => Ok(Json(belief)),
401 None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
402 }
403}
404
405async fn list_attention(
406 State(state): State<AppState>,
407) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
408 Ok(Json(state.cognition.get_attention_queue().await))
409}
410
411async fn list_proposals(
412 State(state): State<AppState>,
413) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
414 let proposals = state.cognition.get_proposals().await;
415 let mut result: Vec<Proposal> = proposals.into_values().collect();
416 result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
417 Ok(Json(result))
418}
419
420async fn approve_proposal(
421 State(state): State<AppState>,
422 Path(id): Path<String>,
423) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
424 state
425 .cognition
426 .approve_proposal(&id)
427 .await
428 .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
429 .map_err(internal_error)
430}
431
432async fn list_receipts(
433 State(state): State<AppState>,
434) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
435 Ok(Json(state.cognition.get_receipts().await))
436}
437
438async fn get_workspace(
439 State(state): State<AppState>,
440) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
441 Ok(Json(state.cognition.get_workspace().await))
442}
443
444fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
445 let message = error.to_string();
446 if message.contains("not found") {
447 return (StatusCode::NOT_FOUND, message);
448 }
449 if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
450 return (StatusCode::BAD_REQUEST, message);
451 }
452 (StatusCode::INTERNAL_SERVER_ERROR, message)
453}
454
/// Reads a boolean flag from the environment variable `name`.
///
/// Surrounding whitespace is ignored and matching is ASCII-case-insensitive, so values
/// such as `"true\n"` or `" ON "` are understood rather than silently falling back to
/// the default.
///
/// - `1`, `true`, `yes`, `on` → `true`
/// - `0`, `false`, `no`, `off` → `false`
///
/// Returns `default` when the variable is unset, is not valid Unicode, or holds any
/// other value.
fn env_bool(name: &str, default: bool) -> bool {
    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        // Unset or not valid Unicode.
        Err(_) => return default,
    };

    let value = raw.trim();
    let is_one_of = |candidates: &[&str]| {
        candidates
            .iter()
            .any(|candidate| value.eq_ignore_ascii_case(candidate))
    };

    if is_one_of(&["1", "true", "yes", "on"]) {
        true
    } else if is_one_of(&["0", "false", "no", "off"]) {
        false
    } else {
        default
    }
}