Skip to main content

codetether_agent/server/
mod.rs

1//! HTTP Server
2//!
3//! Main API server for the CodeTether Agent
4
5use crate::a2a;
6use crate::cli::ServeArgs;
7use crate::cognition::{
8    CognitionRuntime, CognitionStatus, CreatePersonaRequest, LineageGraph, MemorySnapshot,
9    ReapPersonaRequest, ReapPersonaResponse, SpawnPersonaRequest, StartCognitionRequest,
10    StopCognitionRequest,
11};
12use crate::config::Config;
13use anyhow::Result;
14use axum::{
15    Router,
16    extract::Path,
17    extract::{Query, State},
18    http::StatusCode,
19    response::Json,
20    response::sse::{Event, KeepAlive, Sse},
21    routing::{get, post},
22};
23use futures::stream;
24use serde::{Deserialize, Serialize};
25use std::convert::Infallible;
26use std::sync::Arc;
27use tower_http::cors::{Any, CorsLayer};
28use tower_http::trace::TraceLayer;
29
30/// Server state shared across handlers
31#[derive(Clone)]
32pub struct AppState {
33    pub config: Arc<Config>,
34    pub cognition: Arc<CognitionRuntime>,
35}
36
37/// Start the HTTP server
38pub async fn serve(args: ServeArgs) -> Result<()> {
39    let config = Config::load().await?;
40    let cognition = Arc::new(CognitionRuntime::new_from_env());
41
42    if cognition.is_enabled() && env_bool("CODETETHER_COGNITION_AUTO_START", true) {
43        if let Err(error) = cognition.start(None).await {
44            tracing::warn!(%error, "Failed to auto-start cognition loop");
45        } else {
46            tracing::info!("Perpetual cognition auto-started");
47        }
48    }
49
50    let state = AppState {
51        config: Arc::new(config),
52        cognition,
53    };
54
55    let addr = format!("{}:{}", args.hostname, args.port);
56
57    // Build the agent card
58    let agent_card = a2a::server::A2AServer::default_card(&format!("http://{}", addr));
59    let a2a_server = a2a::server::A2AServer::new(agent_card);
60
61    // Build A2A router separately
62    let a2a_router = a2a_server.router();
63
64    let app = Router::new()
65        // Health check
66        .route("/health", get(health))
67        // API routes
68        .route("/api/version", get(get_version))
69        .route("/api/session", get(list_sessions).post(create_session))
70        .route("/api/session/{id}", get(get_session))
71        .route("/api/session/{id}/prompt", post(prompt_session))
72        .route("/api/config", get(get_config))
73        .route("/api/provider", get(list_providers))
74        .route("/api/agent", get(list_agents))
75        // Perpetual cognition APIs
76        .route("/v1/cognition/start", post(start_cognition))
77        .route("/v1/cognition/stop", post(stop_cognition))
78        .route("/v1/cognition/status", get(get_cognition_status))
79        .route("/v1/cognition/stream", get(stream_cognition))
80        .route("/v1/cognition/snapshots/latest", get(get_latest_snapshot))
81        // Swarm persona lifecycle APIs
82        .route("/v1/swarm/personas", post(create_persona))
83        .route("/v1/swarm/personas/{id}/spawn", post(spawn_persona))
84        .route("/v1/swarm/personas/{id}/reap", post(reap_persona))
85        .route("/v1/swarm/lineage", get(get_swarm_lineage))
86        .with_state(state)
87        // A2A routes (nested to work with different state type)
88        .nest("/a2a", a2a_router)
89        // Middleware
90        .layer(
91            CorsLayer::new()
92                .allow_origin(Any)
93                .allow_methods(Any)
94                .allow_headers(Any),
95        )
96        .layer(TraceLayer::new_for_http());
97
98    let listener = tokio::net::TcpListener::bind(&addr).await?;
99    tracing::info!("Server listening on http://{}", addr);
100
101    axum::serve(listener, app).await?;
102
103    Ok(())
104}
105
106/// Health check response
107async fn health() -> &'static str {
108    "ok"
109}
110
111/// Version info
112#[derive(Serialize)]
113struct VersionInfo {
114    version: &'static str,
115    name: &'static str,
116}
117
118async fn get_version() -> Json<VersionInfo> {
119    Json(VersionInfo {
120        version: env!("CARGO_PKG_VERSION"),
121        name: env!("CARGO_PKG_NAME"),
122    })
123}
124
125/// List sessions
126#[derive(Deserialize)]
127struct ListSessionsQuery {
128    limit: Option<usize>,
129}
130
131async fn list_sessions(
132    Query(query): Query<ListSessionsQuery>,
133) -> Result<Json<Vec<crate::session::SessionSummary>>, (StatusCode, String)> {
134    let sessions = crate::session::list_sessions()
135        .await
136        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
137
138    let limit = query.limit.unwrap_or(50);
139    Ok(Json(sessions.into_iter().take(limit).collect()))
140}
141
142/// Create a new session
143#[derive(Deserialize)]
144struct CreateSessionRequest {
145    title: Option<String>,
146    agent: Option<String>,
147}
148
149async fn create_session(
150    Json(req): Json<CreateSessionRequest>,
151) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
152    let mut session = crate::session::Session::new()
153        .await
154        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
155
156    session.title = req.title;
157    if let Some(agent) = req.agent {
158        session.agent = agent;
159    }
160
161    session
162        .save()
163        .await
164        .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
165
166    Ok(Json(session))
167}
168
169/// Get a session by ID
170async fn get_session(
171    axum::extract::Path(id): axum::extract::Path<String>,
172) -> Result<Json<crate::session::Session>, (StatusCode, String)> {
173    let session = crate::session::Session::load(&id)
174        .await
175        .map_err(|e| (StatusCode::NOT_FOUND, e.to_string()))?;
176
177    Ok(Json(session))
178}
179
180/// Prompt a session
181#[derive(Deserialize)]
182struct PromptRequest {
183    message: String,
184}
185
186async fn prompt_session(
187    axum::extract::Path(id): axum::extract::Path<String>,
188    Json(req): Json<PromptRequest>,
189) -> Result<Json<crate::session::SessionResult>, (StatusCode, String)> {
190    // Validate the message is not empty
191    if req.message.trim().is_empty() {
192        return Err((
193            StatusCode::BAD_REQUEST,
194            "Message cannot be empty".to_string(),
195        ));
196    }
197
198    // Log the prompt request (uses the message field)
199    tracing::info!(
200        session_id = %id,
201        message_len = req.message.len(),
202        "Received prompt request"
203    );
204
205    // TODO: Implement actual prompting
206    Err((
207        StatusCode::NOT_IMPLEMENTED,
208        "Prompt execution not yet implemented".to_string(),
209    ))
210}
211
212/// Get configuration
213async fn get_config(State(state): State<AppState>) -> Json<Config> {
214    Json((*state.config).clone())
215}
216
217/// List providers
218async fn list_providers() -> Json<Vec<String>> {
219    Json(vec![
220        "openai".to_string(),
221        "anthropic".to_string(),
222        "google".to_string(),
223    ])
224}
225
226/// List agents
227async fn list_agents() -> Json<Vec<crate::agent::AgentInfo>> {
228    let registry = crate::agent::AgentRegistry::with_builtins();
229    Json(registry.list().into_iter().cloned().collect())
230}
231
232async fn start_cognition(
233    State(state): State<AppState>,
234    payload: Option<Json<StartCognitionRequest>>,
235) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
236    state
237        .cognition
238        .start(payload.map(|Json(body)| body))
239        .await
240        .map(Json)
241        .map_err(internal_error)
242}
243
244async fn stop_cognition(
245    State(state): State<AppState>,
246    payload: Option<Json<StopCognitionRequest>>,
247) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
248    let reason = payload.and_then(|Json(body)| body.reason);
249    state
250        .cognition
251        .stop(reason)
252        .await
253        .map(Json)
254        .map_err(internal_error)
255}
256
257async fn get_cognition_status(
258    State(state): State<AppState>,
259) -> Result<Json<CognitionStatus>, (StatusCode, String)> {
260    Ok(Json(state.cognition.status().await))
261}
262
263async fn stream_cognition(
264    State(state): State<AppState>,
265) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
266    let rx = state.cognition.subscribe_events();
267
268    let event_stream = stream::unfold(rx, |mut rx| async move {
269        match rx.recv().await {
270            Ok(event) => {
271                let payload = serde_json::to_string(&event).unwrap_or_else(|_| "{}".to_string());
272                let sse_event = Event::default().event("cognition").data(payload);
273                Some((Ok(sse_event), rx))
274            }
275            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
276                let lag_event = Event::default()
277                    .event("lag")
278                    .data(format!("skipped {}", skipped));
279                Some((Ok(lag_event), rx))
280            }
281            Err(tokio::sync::broadcast::error::RecvError::Closed) => None,
282        }
283    });
284
285    Sse::new(event_stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
286}
287
288async fn get_latest_snapshot(
289    State(state): State<AppState>,
290) -> Result<Json<MemorySnapshot>, (StatusCode, String)> {
291    match state.cognition.latest_snapshot().await {
292        Some(snapshot) => Ok(Json(snapshot)),
293        None => Err((StatusCode::NOT_FOUND, "No snapshots available".to_string())),
294    }
295}
296
297async fn create_persona(
298    State(state): State<AppState>,
299    Json(req): Json<CreatePersonaRequest>,
300) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
301    state
302        .cognition
303        .create_persona(req)
304        .await
305        .map(Json)
306        .map_err(internal_error)
307}
308
309async fn spawn_persona(
310    State(state): State<AppState>,
311    Path(id): Path<String>,
312    Json(req): Json<SpawnPersonaRequest>,
313) -> Result<Json<crate::cognition::PersonaRuntimeState>, (StatusCode, String)> {
314    state
315        .cognition
316        .spawn_child(&id, req)
317        .await
318        .map(Json)
319        .map_err(internal_error)
320}
321
322async fn reap_persona(
323    State(state): State<AppState>,
324    Path(id): Path<String>,
325    payload: Option<Json<ReapPersonaRequest>>,
326) -> Result<Json<ReapPersonaResponse>, (StatusCode, String)> {
327    let req = payload
328        .map(|Json(body)| body)
329        .unwrap_or(ReapPersonaRequest {
330            cascade: Some(false),
331            reason: None,
332        });
333
334    state
335        .cognition
336        .reap_persona(&id, req)
337        .await
338        .map(Json)
339        .map_err(internal_error)
340}
341
342async fn get_swarm_lineage(
343    State(state): State<AppState>,
344) -> Result<Json<LineageGraph>, (StatusCode, String)> {
345    Ok(Json(state.cognition.lineage_graph().await))
346}
347
348fn internal_error(error: anyhow::Error) -> (StatusCode, String) {
349    let message = error.to_string();
350    if message.contains("not found") {
351        return (StatusCode::NOT_FOUND, message);
352    }
353    if message.contains("disabled") || message.contains("exceeds") || message.contains("limit") {
354        return (StatusCode::BAD_REQUEST, message);
355    }
356    (StatusCode::INTERNAL_SERVER_ERROR, message)
357}
358
359fn env_bool(name: &str, default: bool) -> bool {
360    std::env::var(name)
361        .ok()
362        .and_then(|v| match v.to_ascii_lowercase().as_str() {
363            "1" | "true" | "yes" | "on" => Some(true),
364            "0" | "false" | "no" | "off" => Some(false),
365            _ => None,
366        })
367        .unwrap_or(default)
368}