use crate::a2a;
use crate::cli::ServeArgs;
use crate::cognition::{
    CognitionRuntime, CognitionStatus, CreatePersonaRequest, LineageGraph, MemorySnapshot,
    ReapPersonaRequest, ReapPersonaResponse, SpawnPersonaRequest, StartCognitionRequest,
    StopCognitionRequest,
};
use crate::config::Config;
use anyhow::Result;
use axum::{
    Router,
    extract::{Path, Query, State},
    http::StatusCode,
    response::Json,
    response::sse::{Event, KeepAlive, Sse},
    routing::{get, post},
};
use futures::stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;

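/// Shared state handed to every handler: the loaded configuration plus the
/// perpetual-cognition runtime, both behind `Arc` so the router clones cheaply.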
#[derive(Clone)]
pub struct AppState {
    pub config: Arc<Config>,
    pub cognition: Arc<CognitionRuntime>,
}

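/// Runs the HTTP server for the `serve` CLI command: loads configuration,
/// optionally auto-starts the cognition loop (when cognition is enabled and
/// `CODETETHER_COGNITION_AUTO_START` allows it), mounts the REST, SSE, and A2A
/// routes, and serves until the process exits.
///
/// A quick smoke test once it is listening (hostname and port come from
/// `ServeArgs`; the values below are illustrative only):
///
/// ```text
/// curl http://127.0.0.1:8080/health        # -> ok
/// curl http://127.0.0.1:8080/api/version   # -> {"version":"…","name":"…"}
/// ```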
pub async fn serve(args: ServeArgs) -> Result<()> {
    let config = Config::load().await?;
    let cognition = Arc::new(CognitionRuntime::new_from_env());

    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
        if let Err(error) = cognition.start(None).await {
            tracing::warn!(%error, "Failed to auto-start cognition loop");
        } else {
            tracing::info!("Perpetual cognition auto-started");
        }
    }

    let state = AppState {
        config: Arc::new(config),
        cognition,
    };

    let addr = format!("{}:{}", args.hostname, args.port);

    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
    let a2a_server = a2a::server::A2AServer::new(agent_card);

    let a2a_router = a2a_server.router();

    let app = Router::new()
        .route("/health", get(health))
        .route("/api/version", get(get_version))
        .route("/api/session", get(list_sessions).post(create_session))
        .route("/api/session/{id}", get(get_session))
        .route("/api/session/{id}/prompt", post(prompt_session))
        .route("/api/config", get(get_config))
        .route("/api/provider", get(list_providers))
        .route("/api/agent", get(list_agents))
        // Perpetual cognition endpoints
        .route("/v1/cognition/start", post(start_cognition))
        .route("/v1/cognition/stop", post(stop_cognition))
        .route("/v1/cognition/status", get(get_cognition_status))
        .route("/v1/cognition/stream", get(stream_cognition))
        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
        // Swarm persona endpoints
        .route("/v1/swarm/personas", post(create_persona))
        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
        .route("/v1/swarm/lineage", get(get_swarm_lineage))
        .with_state(state)
        .nest("/a2a", a2a_router)
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(&addr).await?;
    tracing::info!("Server listening on http://{}", addr);

    axum::serve(listener, app).await?;

    Ok(())
}

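/// Liveness probe for `GET /health`; always answers with the literal string `"ok"`.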
async fn health() -> &'static str {
    "ok"
}

#[derive(Serialize)]
struct VersionInfo {
    version: &'static str,
    name: &'static str,
}

async fn get_version() -> Json<VersionInfo> {
    Json(VersionInfo {
        version: env!("CARGO_PKG_VERSION"),
        name: env!("CARGO_PKG_NAME"),
    })
}

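/// Query string for `GET /api/session`; `limit` caps how many session summaries
/// are returned (50 when omitted).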
#[derive(Deserialize)]
struct ListSessionsQuery {
    limit: Option<usize>,
}

async fn list_sessions(
    Query(query): Query<ListSessionsQuery>,
) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
    let sessions = crate::session::list_sessions()
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    let limit = query.limit.unwrap_or(50);
    Ok(Json(sessions.into_iter().take(limit).collect()))
}

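/// JSON body for `POST /api/session`; both fields are optional, and when `agent`
/// is omitted the default chosen by `Session::new` is kept.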
#[derive(Deserialize)]
struct CreateSessionRequest {
    title: Option<String>,
    agent: Option<String>,
}

async fn create_session(
    Json(req): Json<CreateSessionRequest>,
) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
    let mut session = crate::session::Session::new()
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    session.title = req.title;
    if let Some(agent) = req.agent {
        session.agent = agent;
    }

    session
        .save()
        .await
        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

    Ok(Json(session))
}

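/// `GET /api/session/{id}`: loads a persisted session, mapping any load failure to 404.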
async fn get_session(
    Path(id): Path<String>,
) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
    let session = crate::session::Session::load(&id)
        .await
        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;

    Ok(Json(session))
}

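/// JSON body for `POST /api/session/{id}/prompt`.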
#[derive(Deserialize)]
struct PromptRequest {
    message: String,
}

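/// `POST /api/session/{id}/prompt`: validates and logs the prompt, then returns
/// 501 because prompt execution has not been wired into the server yet.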
async fn prompt_session(
    Path(id): Path<String>,
    Json(req): Json<PromptRequest>,
) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
    if req.message.trim().is_empty() {
        return Err((
            StatusCode::BAD_REQUEST,
            "Message cannot be empty".to_string(),
        ));
    }

    tracing::info!(
        session_id = %id,
        message_len = req.message.len(),
        "Received prompt request"
    );

    Err((
        StatusCode::NOT_IMPLEMENTED,
        "Prompt execution not yet implemented".to_string(),
    ))
}

async fn get_config(State(state): State<AppState>) -> Json<Config> {
    Json((*state.config).clone())
}

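/// `GET /api/provider`: currently a hard-coded list of supported provider names.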
async fn list_providers() -> Json<Vec<String>> {
    Json(vec![
        "openai".to_string(),
        "anthropic".to_string(),
        "google".to_string(),
    ])
}

async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
    let registry = crate::agent::AgentRegistry::with_builtins();
    Json(registry.list().into_iter().cloned().collect())
}

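/// `POST /v1/cognition/start`: the optional JSON body is forwarded to the runtime;
/// failures are translated to HTTP statuses by `internal_error`.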
async fn start_cognition(
    State(state): State<AppState>,
    payload: Option<Json<StartCognitionRequest>>,
) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
    state
        .cognition
        .start(payload.map(|Json(body)| body))
        .await
        .map(Json)
        .map_err(internal_error)
}

async fn stop_cognition(
    State(state): State<AppState>,
    payload: Option<Json<StopCognitionRequest>>,
) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
    let reason = payload.and_then(|Json(body)| body.reason);
    state
        .cognition
        .stop(reason)
        .await
        .map(Json)
        .map_err(internal_error)
}

async fn get_cognition_status(
    State(state): State<AppState>,
) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
    Ok(Json(state.cognition.status().await))
}

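/// `GET /v1/cognition/stream`: bridges the runtime's broadcast channel onto an SSE
/// stream. Each event is serialized as JSON under the `cognition` event name; a
/// consumer that falls behind gets a `lag` event with the number of skipped
/// messages, and the stream ends when the channel closes. Keep-alives go out
/// every 15 seconds.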
async fn stream_cognition(
    State(state): State<AppState>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
    let rx = state.cognition.subscribe_events();

    let event_stream = stream::unfold(rx, |mut rx| async move {
        match rx.recv().await {
            Ok(event) => {
                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
                let sse_event = Event::default().event("cognition").data(payload);
                Some((Ok(sse_event), rx))
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                let lag_event = Event::default()
                    .event("lag")
                    .data(format!("skipped {}", skipped));
                Some((Ok(lag_event), rx))
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
        }
    });

    Sse::new(event_stream)
        .keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}

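/// `GET /v1/cognition/snapshots/latest`: 404 until the runtime has produced at
/// least one memory snapshot.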
async fn get_latest_snapshot(
    State(state): State<AppState>,
) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
    match state.cognition.latest_snapshot().await {
        Some(snapshot) => Ok(Json(snapshot)),
        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
    }
}

async fn create_persona(
    State(state): State<AppState>,
    Json(req): Json<CreatePersonaRequest>,
) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
    state
        .cognition
        .create_persona(req)
        .await
        .map(Json)
        .map_err(internal_error)
}

async fn spawn_persona(
    State(state): State<AppState>,
    Path(id): Path<String>,
    Json(req): Json<SpawnPersonaRequest>,
) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
    state
        .cognition
        .spawn_child(&id, req)
        .await
        .map(Json)
        .map_err(internal_error)
}

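/// `POST /v1/swarm/personas/{id}/reap`: when no body is supplied, defaults to a
/// non-cascading reap with no reason recorded.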
async fn reap_persona(
    State(state): State<AppState>,
    Path(id): Path<String>,
    payload: Option<Json<ReapPersonaRequest>>,
) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
    let req = payload
        .map(|Json(body)| body)
        .unwrap_or(ReapPersonaRequest {
            cascade: Some(false),
            reason: None,
        });

    state
        .cognition
        .reap_persona(&id, req)
        .await
        .map(Json)
        .map_err(internal_error)
}

async fn get_swarm_lineage(
    State(state): State<AppState>,
) -> Result<Json<LineageGraph>, (StatusCode, String)> {
    Ok(Json(state.cognition.lineage_graph().await))
}

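/// Maps runtime errors onto HTTP statuses by inspecting the error message:
/// "not found" becomes 404, disabled/limit-style messages become 400, and
/// everything else falls through to 500.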
fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
    let message = error.to_string();
    if message.contains("not found") {
        return (StatusCode::NOT_FOUND, message);
    }
    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
        return (StatusCode::BAD_REQUEST, message);
    }
    (StatusCode::INTERNAL_SERVER_ERROR, message)
}

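/// Reads a boolean-ish environment variable: "1"/"true"/"yes"/"on" map to true,
/// "0"/"false"/"no"/"off" to false, anything else (or unset) falls back to `default`.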
fn env_bool(name: &str, default: bool) -> bool {
    std::env::var(name)
        .ok()
        .and_then(|v| match v.to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => Some(true),
            "0" | "false" | "no" | "off" => Some(false),
            _ => None,
        })
        .unwrap_or(default)
}