Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5use crate::a2a;
6use crate::cli::ServeArgs;
7use crate::cognition::{
8    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
9    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
10    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
11    executor::DecisionReceipt,
12};
13use crate::config::Config;
14use anyhow::Result;
15use axum::{
16    Router,
17    extract::Path,
18    extract::{Query, State},
19    http::StatusCode,
20    response::Json,
21    response::sse::{Event, KeepAlive, Sse},
22    routing::{get, post},
23};
24use futures::stream;
25use serde::{Deserialize, Serialize};
26use std::convert::Infallible;
27use std::sync::Arc;
28use tower_http::cors::{Any, CorsLayer};
29use tower_http::trace::TraceLayer;
30
31/// Server state shared across handlers
32#[derive(Clone)]
33pub struct AppState {
34    pub config: Arc<Config>,
35    pub cognition: Arc<CognitionRuntime>,
36}
37
38/// Start the HTTP server
39pub async fn serve(args: ServeArgs) -> Result<()> {
40    let config = Config::load().await?;
41    let cognition = Arc::new(CognitionRuntime::new_from_env());
42
43    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
44        if let Err(error) = cognition.start(None).await {
45            tracing::warn!(%error, "Failed to auto-start cognition loop");
46        } else {
47            tracing::info!("Perpetual cognition auto-started");
48        }
49    }
50
51    let state = AppState {
52        config: Arc::new(config),
53        cognition,
54    };
55
56    let addr = format!("{}:{}", args.hostname, args.port);
57
58    // Build the agent card
59    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
60    let a2a_server = a2a::server::A2AServer::new(agent_card);
61
62    // Build A2A router separately
63    let a2a_router = a2a_server.router();
64
65    let app = Router::new()
66        // Health check
67        .route("/health", get(health))
68        // API routes
69        .route("/api/version", get(get_version))
70        .route("/api/session", get(list_sessions).post(create_session))
71        .route("/api/session/{id}", get(get_session))
72        .route("/api/session/{id}/prompt", post(prompt_session))
73        .route("/api/config", get(get_config))
74        .route("/api/provider", get(list_providers))
75        .route("/api/agent", get(list_agents))
76        // Perpetual cognition APIs
77        .route("/v1/cognition/start", post(start_cognition))
78        .route("/v1/cognition/stop", post(stop_cognition))
79        .route("/v1/cognition/status", get(get_cognition_status))
80        .route("/v1/cognition/stream", get(stream_cognition))
81        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
82        // Swarm persona lifecycle APIs
83        .route("/v1/swarm/personas", post(create_persona))
84        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
85        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
86        .route("/v1/swarm/lineage", get(get_swarm_lineage))
87        // Belief, attention, governance, workspace APIs
88        .route("/v1/cognition/beliefs", get(list_beliefs))
89        .route("/v1/cognition/beliefs/{id}", get(get_belief))
90        .route("/v1/cognition/attention", get(list_attention))
91        .route("/v1/cognition/proposals", get(list_proposals))
92        .route(
93            "/v1/cognition/proposals/{id}/approve",
94            post(approve_proposal),
95        )
96        .route("/v1/cognition/receipts", get(list_receipts))
97        .route("/v1/cognition/workspace", get(get_workspace))
98        .with_state(state)
99        // A2A routes (nested to work with different state type)
100        .nest("/a2a", a2a_router)
101        // Middleware
102        .layer(
103            CorsLayer::new()
104                .allow_origin(Any)
105                .allow_methods(Any)
106                .allow_headers(Any),
107        )
108        .layer(TraceLayer::new_for_http());
109
110    let listener = tokio::net::TcpListener::bind(&addr).await?;
111    tracing::info!("Server listening on http://{}", addr);
112
113    axum::serve(listener, app).await?;
114
115    Ok(())
116}
117
/// Liveness probe for `GET /health`; always answers with the plain text `ok`.
async fn health() -> &'static str {
    const HEALTHY: &str = "ok";
    HEALTHY
}
122
123/// Version info
124#[derive(Serialize)]
125struct VersionInfo {
126    version: &'static str,
127    name: &'static str,
128}
129
130async fn get_version() -> Json<VersionInfo> {
131    Json(VersionInfo {
132        version: env!("CARGO_PKG_VERSION"),
133        name: env!("CARGO_PKG_NAME"),
134    })
135}
136
137/// List sessions
138#[derive(Deserialize)]
139struct ListSessionsQuery {
140    limit: Option<usize>,
141}
142
143async fn list_sessions(
144    Query(query): Query<ListSessionsQuery>,
145) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
146    let sessions = crate::session::list_sessions()
147        .await
148        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
149
150    let limit = query.limit.unwrap_or(50);
151    Ok(Json(sessions.into_iter().take(limit).collect()))
152}
153
154/// Create a new session
155#[derive(Deserialize)]
156struct CreateSessionRequest {
157    title: Option<String>,
158    agent: Option<String>,
159}
160
161async fn create_session(
162    Json(req): Json<CreateSessionRequest>,
163) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
164    let mut session = crate::session::Session::new()
165        .await
166        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
167
168    session.title = req.title;
169    if let Some(agent) = req.agent {
170        session.agent = agent;
171    }
172
173    session
174        .save()
175        .await
176        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
177
178    Ok(Json(session))
179}
180
181/// Get a session by ID
182async fn get_session(
183    axum::extract::Path(id): axum::extract::Path<String>,
184) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
185    let session = crate::session::Session::load(&id)
186        .await
187        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
188
189    Ok(Json(session))
190}
191
192/// Prompt a session
193#[derive(Deserialize)]
194struct PromptRequest {
195    message: String,
196}
197
198async fn prompt_session(
199    axum::extract::Path(id): axum::extract::Path<String>,
200    Json(req): Json<PromptRequest>,
201) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
202    // Validate the message is not empty
203    if req.message.trim().is_empty() {
204        return Err((
205            StatusCode::BAD_REQUEST,
206            "Message cannot be empty".to_string(),
207        ));
208    }
209
210    // Log the prompt request (uses the message field)
211    tracing::info!(
212        session_id = %id,
213        message_len = req.message.len(),
214        "Received prompt request"
215    );
216
217    // TODO: Implement actual prompting
218    Err((
219        StatusCode::NOT_IMPLEMENTED,
220        "Prompt execution not yet implemented".to_string(),
221    ))
222}
223
224/// Get configuration
225async fn get_config(State(state): State<AppState>) -> Json<Config> {
226    Json((*state.config).clone())
227}
228
229/// List providers
230async fn list_providers() -> Json<Vec<String>> {
231    Json(vec![
232        "openai".to_string(),
233        "anthropic".to_string(),
234        "google".to_string(),
235    ])
236}
237
238/// List agents
239async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
240    let registry = crate::agent::AgentRegistry::with_builtins();
241    Json(registry.list().into_iter().cloned().collect())
242}
243
244async fn start_cognition(
245    State(state): State<AppState>,
246    payload: Option<Json<StartCognitionRequest>>,
247) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
248    state
249        .cognition
250        .start(payload.map(|Json(body)| body))
251        .await
252        .map(Json)
253        .map_err(internal_error)
254}
255
256async fn stop_cognition(
257    State(state): State<AppState>,
258    payload: Option<Json<StopCognitionRequest>>,
259) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
260    let reason = payload.and_then(|Json(body)| body.reason);
261    state
262        .cognition
263        .stop(reason)
264        .await
265        .map(Json)
266        .map_err(internal_error)
267}
268
269async fn get_cognition_status(
270    State(state): State<AppState>,
271) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
272    Ok(Json(state.cognition.status().await))
273}
274
275async fn stream_cognition(
276    State(state): State<AppState>,
277) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
278    let rx = state.cognition.subscribe_events();
279
280    let event_stream = stream::unfold(rx, |mut rx| async move {
281        match rx.recv().await {
282            Ok(event) => {
283                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
284                let sse_event = Event::default().event("cognition").data(payload);
285                Some((Ok(sse_event), rx))
286            }
287            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
288                let lag_event = Event::default()
289                    .event("lag")
290                    .data(format!("skipped {}", skipped));
291                Some((Ok(lag_event), rx))
292            }
293            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
294        }
295    });
296
297    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
298}
299
300async fn get_latest_snapshot(
301    State(state): State<AppState>,
302) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
303    match state.cognition.latest_snapshot().await {
304        Some(snapshot) => Ok(Json(snapshot)),
305        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
306    }
307}
308
309async fn create_persona(
310    State(state): State<AppState>,
311    Json(req): Json<CreatePersonaRequest>,
312) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
313    state
314        .cognition
315        .create_persona(req)
316        .await
317        .map(Json)
318        .map_err(internal_error)
319}
320
321async fn spawn_persona(
322    State(state): State<AppState>,
323    Path(id): Path<String>,
324    Json(req): Json<SpawnPersonaRequest>,
325) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
326    state
327        .cognition
328        .spawn_child(&id, req)
329        .await
330        .map(Json)
331        .map_err(internal_error)
332}
333
334async fn reap_persona(
335    State(state): State<AppState>,
336    Path(id): Path<String>,
337    payload: Option<Json<ReapPersonaRequest>>,
338) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
339    let req = payload
340        .map(|Json(body)| body)
341        .unwrap_or(ReapPersonaRequest {
342            cascade: Some(false),
343            reason: None,
344        });
345
346    state
347        .cognition
348        .reap_persona(&id, req)
349        .await
350        .map(Json)
351        .map_err(internal_error)
352}
353
354async fn get_swarm_lineage(
355    State(state): State<AppState>,
356) -> Result<Json<LineageGraph>, (StatusCode, String)> {
357    Ok(Json(state.cognition.lineage_graph().await))
358}
359
360// ── Belief, Attention, Governance, Workspace handlers ──
361
362#[derive(Deserialize)]
363struct BeliefFilter {
364    status: Option<String>,
365    persona: Option<String>,
366}
367
368async fn list_beliefs(
369    State(state): State<AppState>,
370    Query(filter): Query<BeliefFilter>,
371) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
372    let beliefs = state.cognition.get_beliefs().await;
373    let mut result: Vec<Belief> = beliefs.into_values().collect();
374
375    if let Some(status) = &filter.status {
376        result.retain(|b| {
377            let s = serde_json::to_string(&b.status).unwrap_or_default();
378            s.contains(status)
379        });
380    }
381    if let Some(persona) = &filter.persona {
382        result.retain(|b| &b.asserted_by == persona);
383    }
384
385    result.sort_by(|a, b| {
386        b.confidence
387            .partial_cmp(&a.confidence)
388            .unwrap_or(std::cmp::Ordering::Equal)
389    });
390    Ok(Json(result))
391}
392
393async fn get_belief(
394    State(state): State<AppState>,
395    Path(id): Path<String>,
396) -> Result<Json<Belief>, (StatusCode, String)> {
397    match state.cognition.get_belief(&id).await {
398        Some(belief) => Ok(Json(belief)),
399        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
400    }
401}
402
403async fn list_attention(
404    State(state): State<AppState>,
405) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
406    Ok(Json(state.cognition.get_attention_queue().await))
407}
408
409async fn list_proposals(
410    State(state): State<AppState>,
411) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
412    let proposals = state.cognition.get_proposals().await;
413    let mut result: Vec<Proposal> = proposals.into_values().collect();
414    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
415    Ok(Json(result))
416}
417
418async fn approve_proposal(
419    State(state): State<AppState>,
420    Path(id): Path<String>,
421) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
422    state
423        .cognition
424        .approve_proposal(&id)
425        .await
426        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
427        .map_err(internal_error)
428}
429
430async fn list_receipts(
431    State(state): State<AppState>,
432) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
433    Ok(Json(state.cognition.get_receipts().await))
434}
435
436async fn get_workspace(
437    State(state): State<AppState>,
438) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
439    Ok(Json(state.cognition.get_workspace().await))
440}
441
442fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
443    let message = error.to_string();
444    if message.contains("not found") {
445        return (StatusCode::NOT_FOUND, message);
446    }
447    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
448        return (StatusCode::BAD_REQUEST, message);
449    }
450    (StatusCode::INTERNAL_SERVER_ERROR, message)
451}
452
/// Read a boolean flag from the environment variable `name`.
///
/// The value is trimmed of surrounding whitespace and compared
/// case-insensitively (ASCII only):
/// * `1`, `true`, `yes`, `on` → `true`
/// * `0`, `false`, `no`, `off` → `false`
///
/// `default` is returned when the variable is unset, is not valid Unicode,
/// or holds any other text.
fn env_bool(name: &str, default: bool) -> bool {
    match std::env::var(name) {
        // Trim first so values such as " true " or "on\n" (common with
        // hand-edited env files) are honoured instead of silently ignored.
        Ok(raw) => match raw.trim().to_ascii_lowercase().as_str() {
            "1" | "true" | "yes" | "on" => true,
            "0" | "false" | "no" | "off" => false,
            _ => default,
        },
        Err(_) => default,
    }
}