Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5use crate::a2a;
6use crate::cli::ServeArgs;
7use crate::cognition::{
8    AttentionItem, CognitionRuntime, CognitionStatus, CreatePersonaRequest, GlobalWorkspace,
9    LineageGraph, MemorySnapshot, Proposal, ReapPersonaRequest, ReapPersonaResponse,
10    SpawnPersonaRequest, StartCognitionRequest, StopCognitionRequest, beliefs::Belief,
11    executor::DecisionReceipt,
12};
13use crate::config::Config;
14use anyhow::Result;
15use axum::{
16    Router,
17    extract::Path,
18    extract::{Query, State},
19    http::StatusCode,
20    response::Json,
21    response::sse::{Event, KeepAlive, Sse},
22    routing::{get, post},
23};
24use futures::stream;
25use serde::{Deserialize, Serialize};
26use std::convert::Infallible;
27use std::sync::Arc;
28use tower_http::cors::{AllowHeaders, AllowMethods, AllowOrigin, CorsLayer};
29use tower_http::trace::TraceLayer;
30
31/// Server state shared across handlers
32#[derive(Clone)]
33pub struct AppState {
34    pub config: Arc<Config>,
35    pub cognition: Arc<CognitionRuntime>,
36}
37
38/// Start the HTTP server
39pub async fn serve(args: ServeArgs) -> Result<()> {
40    let config = Config::load().await?;
41    let cognition = Arc::new(CognitionRuntime::new_from_env());
42
43    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
44        if let Err(error) = cognition.start(None).await {
45            tracing::warn!(%error, "Failed to auto-start cognition loop");
46        } else {
47            tracing::info!("Perpetual cognition auto-started");
48        }
49    }
50
51    let state = AppState {
52        config: Arc::new(config),
53        cognition,
54    };
55
56    let addr = format!("{}:{}", args.hostname, args.port);
57
58    // Build the agent card
59    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
60    let a2a_server = a2a::server::A2AServer::new(agent_card);
61
62    // Build A2A router separately
63    let a2a_router = a2a_server.router();
64
65    let app = Router::new()
66        // Health check
67        .route("/health", get(health))
68        // API routes
69        .route("/api/version", get(get_version))
70        .route("/api/session", get(list_sessions).post(create_session))
71        .route("/api/session/{id}", get(get_session))
72        .route("/api/session/{id}/prompt", post(prompt_session))
73        .route("/api/config", get(get_config))
74        .route("/api/provider", get(list_providers))
75        .route("/api/agent", get(list_agents))
76        // Perpetual cognition APIs
77        .route("/v1/cognition/start", post(start_cognition))
78        .route("/v1/cognition/stop", post(stop_cognition))
79        .route("/v1/cognition/status", get(get_cognition_status))
80        .route("/v1/cognition/stream", get(stream_cognition))
81        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
82        // Swarm persona lifecycle APIs
83        .route("/v1/swarm/personas", post(create_persona))
84        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
85        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
86        .route("/v1/swarm/lineage", get(get_swarm_lineage))
87        // Belief, attention, governance, workspace APIs
88        .route("/v1/cognition/beliefs", get(list_beliefs))
89        .route("/v1/cognition/beliefs/{id}", get(get_belief))
90        .route("/v1/cognition/attention", get(list_attention))
91        .route("/v1/cognition/proposals", get(list_proposals))
92        .route(
93            "/v1/cognition/proposals/{id}/approve",
94            post(approve_proposal),
95        )
96        .route("/v1/cognition/receipts", get(list_receipts))
97        .route("/v1/cognition/workspace", get(get_workspace))
98        .with_state(state)
99        // A2A routes (nested to work with different state type)
100        .nest("/a2a", a2a_router)
101        // Middleware
102        .layer(
103            // Mirror request origin so credentialed browser requests do not fail CORS.
104            CorsLayer::new()
105                .allow_origin(AllowOrigin::mirror_request())
106                .allow_credentials(true)
107                .allow_methods(AllowMethods::mirror_request())
108                .allow_headers(AllowHeaders::mirror_request()),
109        )
110        .layer(TraceLayer::new_for_http());
111
112    let listener = tokio::net::TcpListener::bind(&addr).await?;
113    tracing::info!("Server listening on http://{}", addr);
114
115    axum::serve(listener, app).await?;
116
117    Ok(())
118}
119
120/// Health check response
121async fn health() -> &'static str {
122    "ok"
123}
124
125/// Version info
126#[derive(Serialize)]
127struct VersionInfo {
128    version: &'static str,
129    name: &'static str,
130}
131
132async fn get_version() -> Json<VersionInfo> {
133    Json(VersionInfo {
134        version: env!("CARGO_PKG_VERSION"),
135        name: env!("CARGO_PKG_NAME"),
136    })
137}
138
139/// List sessions
140#[derive(Deserialize)]
141struct ListSessionsQuery {
142    limit: Option<usize>,
143}
144
145async fn list_sessions(
146    Query(query): Query<ListSessionsQuery>,
147) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
148    let sessions = crate::session::list_sessions()
149        .await
150        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
151
152    let limit = query.limit.unwrap_or(50);
153    Ok(Json(sessions.into_iter().take(limit).collect()))
154}
155
156/// Create a new session
157#[derive(Deserialize)]
158struct CreateSessionRequest {
159    title: Option<String>,
160    agent: Option<String>,
161}
162
163async fn create_session(
164    Json(req): Json<CreateSessionRequest>,
165) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
166    let mut session = crate::session::Session::new()
167        .await
168        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
169
170    session.title = req.title;
171    if let Some(agent) = req.agent {
172        session.agent = agent;
173    }
174
175    session
176        .save()
177        .await
178        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
179
180    Ok(Json(session))
181}
182
183/// Get a session by ID
184async fn get_session(
185    axum::extract::Path(id): axum::extract::Path<String>,
186) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
187    let session = crate::session::Session::load(&id)
188        .await
189        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
190
191    Ok(Json(session))
192}
193
194/// Prompt a session
195#[derive(Deserialize)]
196struct PromptRequest {
197    message: String,
198}
199
200async fn prompt_session(
201    axum::extract::Path(id): axum::extract::Path<String>,
202    Json(req): Json<PromptRequest>,
203) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
204    // Validate the message is not empty
205    if req.message.trim().is_empty() {
206        return Err((
207            StatusCode::BAD_REQUEST,
208            "Message cannot be empty".to_string(),
209        ));
210    }
211
212    // Log the prompt request (uses the message field)
213    tracing::info!(
214        session_id = %id,
215        message_len = req.message.len(),
216        "Received prompt request"
217    );
218
219    // TODO: Implement actual prompting
220    Err((
221        StatusCode::NOT_IMPLEMENTED,
222        "Prompt execution not yet implemented".to_string(),
223    ))
224}
225
226/// Get configuration
227async fn get_config(State(state): State<AppState>) -> Json<Config> {
228    Json((*state.config).clone())
229}
230
231/// List providers
232async fn list_providers() -> Json<Vec<String>> {
233    Json(vec![
234        "openai".to_string(),
235        "anthropic".to_string(),
236        "google".to_string(),
237    ])
238}
239
240/// List agents
241async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
242    let registry = crate::agent::AgentRegistry::with_builtins();
243    Json(registry.list().into_iter().cloned().collect())
244}
245
246async fn start_cognition(
247    State(state): State<AppState>,
248    payload: Option<Json<StartCognitionRequest>>,
249) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
250    state
251        .cognition
252        .start(payload.map(|Json(body)| body))
253        .await
254        .map(Json)
255        .map_err(internal_error)
256}
257
258async fn stop_cognition(
259    State(state): State<AppState>,
260    payload: Option<Json<StopCognitionRequest>>,
261) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
262    let reason = payload.and_then(|Json(body)| body.reason);
263    state
264        .cognition
265        .stop(reason)
266        .await
267        .map(Json)
268        .map_err(internal_error)
269}
270
271async fn get_cognition_status(
272    State(state): State<AppState>,
273) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
274    Ok(Json(state.cognition.status().await))
275}
276
277async fn stream_cognition(
278    State(state): State<AppState>,
279) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
280    let rx = state.cognition.subscribe_events();
281
282    let event_stream = stream::unfold(rx, |mut rx| async move {
283        match rx.recv().await {
284            Ok(event) => {
285                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
286                let sse_event = Event::default().event("cognition").data(payload);
287                Some((Ok(sse_event), rx))
288            }
289            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
290                let lag_event = Event::default()
291                    .event("lag")
292                    .data(format!("skipped {}", skipped));
293                Some((Ok(lag_event), rx))
294            }
295            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
296        }
297    });
298
299    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
300}
301
302async fn get_latest_snapshot(
303    State(state): State<AppState>,
304) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
305    match state.cognition.latest_snapshot().await {
306        Some(snapshot) => Ok(Json(snapshot)),
307        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
308    }
309}
310
311async fn create_persona(
312    State(state): State<AppState>,
313    Json(req): Json<CreatePersonaRequest>,
314) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
315    state
316        .cognition
317        .create_persona(req)
318        .await
319        .map(Json)
320        .map_err(internal_error)
321}
322
323async fn spawn_persona(
324    State(state): State<AppState>,
325    Path(id): Path<String>,
326    Json(req): Json<SpawnPersonaRequest>,
327) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
328    state
329        .cognition
330        .spawn_child(&id, req)
331        .await
332        .map(Json)
333        .map_err(internal_error)
334}
335
336async fn reap_persona(
337    State(state): State<AppState>,
338    Path(id): Path<String>,
339    payload: Option<Json<ReapPersonaRequest>>,
340) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
341    let req = payload
342        .map(|Json(body)| body)
343        .unwrap_or(ReapPersonaRequest {
344            cascade: Some(false),
345            reason: None,
346        });
347
348    state
349        .cognition
350        .reap_persona(&id, req)
351        .await
352        .map(Json)
353        .map_err(internal_error)
354}
355
356async fn get_swarm_lineage(
357    State(state): State<AppState>,
358) -> Result<Json<LineageGraph>, (StatusCode, String)> {
359    Ok(Json(state.cognition.lineage_graph().await))
360}
361
362// ── Belief, Attention, Governance, Workspace handlers ──
363
364#[derive(Deserialize)]
365struct BeliefFilter {
366    status: Option<String>,
367    persona: Option<String>,
368}
369
370async fn list_beliefs(
371    State(state): State<AppState>,
372    Query(filter): Query<BeliefFilter>,
373) -> Result<Json<Vec<Belief>>, (StatusCode, String)> {
374    let beliefs = state.cognition.get_beliefs().await;
375    let mut result: Vec<Belief> = beliefs.into_values().collect();
376
377    if let Some(status) = &filter.status {
378        result.retain(|b| {
379            let s = serde_json::to_string(&b.status).unwrap_or_default();
380            s.contains(status)
381        });
382    }
383    if let Some(persona) = &filter.persona {
384        result.retain(|b| &b.asserted_by == persona);
385    }
386
387    result.sort_by(|a, b| {
388        b.confidence
389            .partial_cmp(&a.confidence)
390            .unwrap_or(std::cmp::Ordering::Equal)
391    });
392    Ok(Json(result))
393}
394
395async fn get_belief(
396    State(state): State<AppState>,
397    Path(id): Path<String>,
398) -> Result<Json<Belief>, (StatusCode, String)> {
399    match state.cognition.get_belief(&id).await {
400        Some(belief) => Ok(Json(belief)),
401        None => Err((StatusCode::NOT_FOUND, format!("Belief not found: {}", id))),
402    }
403}
404
405async fn list_attention(
406    State(state): State<AppState>,
407) -> Result<Json<Vec<AttentionItem>>, (StatusCode, String)> {
408    Ok(Json(state.cognition.get_attention_queue().await))
409}
410
411async fn list_proposals(
412    State(state): State<AppState>,
413) -> Result<Json<Vec<Proposal>>, (StatusCode, String)> {
414    let proposals = state.cognition.get_proposals().await;
415    let mut result: Vec<Proposal> = proposals.into_values().collect();
416    result.sort_by(|a, b| b.created_at.cmp(&a.created_at));
417    Ok(Json(result))
418}
419
420async fn approve_proposal(
421    State(state): State<AppState>,
422    Path(id): Path<String>,
423) -> Result<Json<serde_json::Value>, (StatusCode, String)> {
424    state
425        .cognition
426        .approve_proposal(&id)
427        .await
428        .map(|_| Json(serde_json::json!({ "approved": true, "proposal_id": id })))
429        .map_err(internal_error)
430}
431
432async fn list_receipts(
433    State(state): State<AppState>,
434) -> Result<Json<Vec<DecisionReceipt>>, (StatusCode, String)> {
435    Ok(Json(state.cognition.get_receipts().await))
436}
437
438async fn get_workspace(
439    State(state): State<AppState>,
440) -> Result<Json<GlobalWorkspace>, (StatusCode, String)> {
441    Ok(Json(state.cognition.get_workspace().await))
442}
443
444fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
445    let message = error.to_string();
446    if message.contains("not found") {
447        return (StatusCode::NOT_FOUND, message);
448    }
449    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
450        return (StatusCode::BAD_REQUEST, message);
451    }
452    (StatusCode::INTERNAL_SERVER_ERROR, message)
453}
454
455fn env_bool(name: &str, default: bool) -> bool {
456    std::env::var(name)
457        .ok()
458        .and_then(|v| match v.to_ascii_lowercase().as_str() {
459            "1" | "true" | "yes" | "on" => Some(true),
460            "0" | "false" | "no" | "off" => Some(false),
461            _ => None,
462        })
463        .unwrap_or(default)
464}