Skip to main content

ravenclaws/
server.rs

1//! RavenClaws
2//!
3//! Provides a long-running HTTP server with health, readiness, metrics, and agent
4//! execution endpoints. Enables RavenClaws to run as a stable workload in Kubernetes and
5//! other container orchestration platforms.
6//!
7//! # Endpoints
8//!
9//! - `GET /health` — Liveness probe (always 200 when server is running)
10//! - `GET /ready` — Readiness probe (200 when fully initialized, 503 during startup)
11//! - `GET /metrics` — Prometheus-style metrics (requests, tokens, tool calls, errors)
12//! - `GET /health/deep` — Deep health check (verifies LLM connectivity)
13//! - `POST /chat` — Send a message and get an agent response
14//! - `POST /execute` — Submit a background task, returns task ID
15//! - `GET /tasks/{id}` — Poll background task status and result
16//! - `GET /tools` — List available tools with schemas
17//! - `GET /tools/{name}` — Get details of a specific tool
18//! - `POST /tools/{name}` — Execute a specific tool by name
19//! - `POST /reload` — Reload configuration (distroless-friendly SIGHUP alternative)
20//!
21//! # Architecture
22//!
23//! ```text
24//! HttpServer
25//!   ├── /health      → always 200 OK
26//!   ├── /ready       → 200 OK when ready, 503 during startup
27//!   ├── /metrics     → Prometheus text format
28//!   ├── /health/deep → LLM connectivity check
29//!   ├── /chat        → POST: agent response (JSON or SSE)
30//!   ├── /execute     → POST: background task submission
31//!   ├── /tasks/{id}  → GET: task status/result
32//!   ├── /tools       → GET: list tools with schemas
33//!   └── /tools/{name}→ POST: execute tool by name
34//! ```
35
36use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Instant;
39
40use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41use tokio::net::TcpListener;
42use tokio::signal;
43use tracing::{debug, error, info, instrument, warn};
44
45use crate::agent::{self, AgentLoopConfig};
46use crate::background::BackgroundTaskManager;
47use crate::config::Config;
48use crate::llm::{self, ChatMessage, LLMProviderTrait};
49use crate::tools::{ToolCall, ToolRegistry};
50
51// ── Metrics ────────────────────────────────────────────────────────────────
52
/// Counters and timestamps shared by the HTTP server's request handlers.
///
/// Every field is an atomic, so values can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct ServerMetrics {
    /// Number of HTTP requests served
    pub requests_total: AtomicU64,
    /// Number of LLM requests made
    pub llm_requests_total: AtomicU64,
    /// Number of tool calls executed
    pub tool_calls_total: AtomicU64,
    /// Number of errors encountered
    pub errors_total: AtomicU64,
    /// Tokens consumed (estimated)
    pub tokens_total: AtomicU64,
    /// Moment the server started, in seconds since the Unix epoch
    pub start_time: AtomicU64,
    /// Readiness cache: timestamp of the last check, or 0 when nothing is cached
    pub ready_cache_time: AtomicU64,
    /// Readiness cache: 1 when the last check succeeded, 0 otherwise
    pub ready_cache_result: AtomicU64,
}

// `AtomicU64` is not `Clone`, so the impl is written by hand. Each field of the copy
// is a brand-new atomic seeded with the value observed at clone time; later updates
// to the copy are NOT reflected in the original (and vice versa).
impl Clone for ServerMetrics {
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));
        Self {
            requests_total: snapshot(&self.requests_total),
            llm_requests_total: snapshot(&self.llm_requests_total),
            tool_calls_total: snapshot(&self.tool_calls_total),
            errors_total: snapshot(&self.errors_total),
            tokens_total: snapshot(&self.tokens_total),
            start_time: snapshot(&self.start_time),
            ready_cache_time: snapshot(&self.ready_cache_time),
            ready_cache_result: snapshot(&self.ready_cache_result),
        }
    }
}

impl ServerMetrics {
    /// Current wall-clock time in whole seconds since the Unix epoch
    /// (0 if the system clock reports a time before the epoch).
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Create zeroed metrics with `start_time` set to the current time.
    fn new() -> Self {
        let fresh = Self::default();
        fresh
            .start_time
            .store(Self::unix_now_secs(), Ordering::Relaxed);
        fresh
    }

    /// Count one served HTTP request.
    fn record_request(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one LLM request.
    fn record_llm_request(&self) {
        self.llm_requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one executed tool call.
    fn record_tool_call(&self) {
        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one error.
    fn record_error(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Add `count` to the running token total.
    #[cfg_attr(not(test), allow(dead_code))]
    fn record_tokens(&self, count: u64) {
        self.tokens_total.fetch_add(count, Ordering::Relaxed);
    }

    /// Seconds elapsed since `start_time`; saturates at 0 if the clock went backwards.
    fn uptime_secs(&self) -> u64 {
        let started = self.start_time.load(Ordering::Relaxed);
        Self::unix_now_secs().saturating_sub(started)
    }
}
133
134// ── Server state ───────────────────────────────────────────────────────────
135
136/// Shared state for the HTTP server
137pub struct ServerState {
138    /// Whether the server is fully initialized and ready to serve requests
139    pub ready: AtomicBool,
140    /// Server metrics
141    pub metrics: ServerMetrics,
142    /// Server configuration
143    pub config: Config,
144    /// Server start time
145    #[allow(dead_code)]
146    pub start_time: Instant,
147    /// LLM client for agent execution
148    pub llm: Option<Arc<dyn LLMProviderTrait>>,
149    /// Tool registry for tool listing and execution
150    pub tool_registry: Option<ToolRegistry>,
151    /// Background task manager for async execution
152    pub bg_manager: Option<BackgroundTaskManager>,
153    /// MCP client manager for multi-server MCP tool access
154    pub mcp_manager: Option<crate::mcp::McpClientManager>,
155}
156
157impl ServerState {
158    fn new(config: Config) -> Self {
159        Self {
160            ready: AtomicBool::new(false),
161            metrics: ServerMetrics::new(),
162            config,
163            start_time: Instant::now(),
164            llm: None,
165            tool_registry: None,
166            bg_manager: None,
167            mcp_manager: None,
168        }
169    }
170
171    /// Mark the server as ready (initialization complete)
172    fn mark_ready(&self) {
173        self.ready.store(true, Ordering::Relaxed);
174        info!("Server marked as ready");
175    }
176}
177
178// ── HTTP response helpers ──────────────────────────────────────────────────
179
/// Body of the liveness probe: the two ASCII bytes `OK`.
fn health_response() -> Vec<u8> {
    Vec::from(&b"OK"[..])
}
183
184async fn ready_response(state: &ServerState) -> (Vec<u8>, &'static str) {
185    if !state.ready.load(Ordering::Relaxed) {
186        return (b"NOT READY".to_vec(), "503 Service Unavailable");
187    }
188
189    // If an LLM client is configured, verify connectivity for deep readiness
190    if let Some(ref llm) = state.llm {
191        // Check cache: re-check at most every 30 seconds
192        let now = std::time::SystemTime::now()
193            .duration_since(std::time::UNIX_EPOCH)
194            .unwrap_or_default()
195            .as_secs();
196        let cache_time = state.metrics.ready_cache_time.load(Ordering::Relaxed);
197        let cache_ttl: u64 = 30;
198
199        if now.saturating_sub(cache_time) < cache_ttl {
200            // Cache is fresh — return cached result
201            if state.metrics.ready_cache_result.load(Ordering::Relaxed) == 1 {
202                return (b"READY".to_vec(), "200 OK");
203            } else {
204                return (
205                    b"NOT READY: LLM unreachable".to_vec(),
206                    "503 Service Unavailable",
207                );
208            }
209        }
210
211        // Cache expired — perform actual check
212        let result = match tokio::time::timeout(
213            std::time::Duration::from_secs(5),
214            llm.chat(vec![ChatMessage {
215                role: "user".to_string(),
216                content: "Respond with exactly one word: ready".to_string(),
217            }]),
218        )
219        .await
220        {
221            Ok(Ok(_)) => {
222                // LLM is reachable
223                state.metrics.ready_cache_result.store(1, Ordering::Relaxed);
224                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
225                (b"READY".to_vec(), "200 OK")
226            }
227            Ok(Err(e)) => {
228                warn!(error = %e, "Readiness check: LLM connectivity failed");
229                state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
230                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
231                (
232                    b"NOT READY: LLM unreachable".to_vec(),
233                    "503 Service Unavailable",
234                )
235            }
236            Err(_) => {
237                warn!("Readiness check: LLM connectivity timed out");
238                state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
239                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
240                (
241                    b"NOT READY: LLM timeout".to_vec(),
242                    "503 Service Unavailable",
243                )
244            }
245        };
246        result
247    } else {
248        (b"READY".to_vec(), "200 OK")
249    }
250}
251
252fn metrics_response(state: &ServerState) -> Vec<u8> {
253    let metrics = &state.metrics;
254    format!(
255        "# HELP ravenclaws_requests_total Total HTTP requests served\n\
256         # TYPE ravenclaws_requests_total counter\n\
257         ravenclaws_requests_total {}\n\
258         \n\
259         # HELP ravenclaws_llm_requests_total Total LLM requests made\n\
260         # TYPE ravenclaws_llm_requests_total counter\n\
261         ravenclaws_llm_requests_total {}\n\
262         \n\
263         # HELP ravenclaws_tool_calls_total Total tool calls executed\n\
264         # TYPE ravenclaws_tool_calls_total counter\n\
265         ravenclaws_tool_calls_total {}\n\
266         \n\
267         # HELP ravenclaws_errors_total Total errors encountered\n\
268         # TYPE ravenclaws_errors_total counter\n\
269         ravenclaws_errors_total {}\n\
270         \n\
271         # HELP ravenclaws_tokens_total Total tokens consumed (estimated)\n\
272         # TYPE ravenclaws_tokens_total counter\n\
273         ravenclaws_tokens_total {}\n\
274         \n\
275         # HELP ravenclaws_uptime_seconds Server uptime in seconds\n\
276         # TYPE ravenclaws_uptime_seconds gauge\n\
277         ravenclaws_uptime_seconds {}\n\
278         \n\
279         # HELP ravenclaws_start_time_seconds Server start time (Unix epoch)\n\
280         # TYPE ravenclaws_start_time_seconds gauge\n\
281         ravenclaws_start_time_seconds {}\n",
282        metrics.requests_total.load(Ordering::Relaxed),
283        metrics.llm_requests_total.load(Ordering::Relaxed),
284        metrics.tool_calls_total.load(Ordering::Relaxed),
285        metrics.errors_total.load(Ordering::Relaxed),
286        metrics.tokens_total.load(Ordering::Relaxed),
287        metrics.uptime_secs(),
288        metrics.start_time.load(Ordering::Relaxed),
289    )
290    .into_bytes()
291}
292
293// ── HTTP handler ───────────────────────────────────────────────────────────
294
/// Extract the `Content-Length` value from raw `name: value` header lines.
///
/// HTTP field names are case-insensitive, so the name is compared without regard
/// to ASCII case (`content-length`, `Content-Length`, `CONTENT-LENGTH`, ...).
/// The first matching header wins.
///
/// Returns 0 when no such header is present or when its value is not a valid
/// non-negative integer.
fn parse_content_length(headers: &[String]) -> usize {
    headers
        .iter()
        .find_map(|header| {
            let (name, value) = header.split_once(':')?;
            name.eq_ignore_ascii_case("content-length")
                .then(|| value.trim().parse::<usize>().unwrap_or(0))
        })
        .unwrap_or(0)
}
307
308/// Read the request body from the reader
309async fn read_body(
310    reader: &mut BufReader<&mut tokio::net::TcpStream>,
311    content_length: usize,
312) -> Vec<u8> {
313    if content_length == 0 {
314        return Vec::new();
315    }
316    let mut body = vec![0u8; content_length];
317    if let Err(e) = reader.read_exact(&mut body).await {
318        warn!(error = %e, "Failed to read request body");
319        return Vec::new();
320    }
321    body
322}
323
324/// Handle a single HTTP connection
325#[instrument(skip_all, fields(peer = ?stream.peer_addr().ok()))]
326async fn handle_connection(mut stream: tokio::net::TcpStream, state: Arc<ServerState>) {
327    let peer = stream.peer_addr().ok();
328    let mut reader = BufReader::new(&mut stream);
329    let mut request_line = String::new();
330
331    // Read the request line
332    if reader.read_line(&mut request_line).await.is_err() {
333        return;
334    }
335
336    let request_line = request_line.trim();
337    if request_line.is_empty() {
338        return;
339    }
340
341    state.metrics.record_request();
342
343    // Parse the request method and path
344    let parts: Vec<&str> = request_line.split_whitespace().collect();
345    let method = parts.first().unwrap_or(&"GET");
346    let path = parts.get(1).unwrap_or(&"/");
347
348    // Read headers
349    let mut headers: Vec<String> = Vec::new();
350    let mut header_line = String::new();
351    loop {
352        header_line.clear();
353        if reader.read_line(&mut header_line).await.is_err() {
354            return;
355        }
356        let trimmed = header_line.trim();
357        if trimmed.is_empty() {
358            break;
359        }
360        headers.push(trimmed.to_lowercase());
361    }
362
363    // Read body for POST requests
364    let content_length = parse_content_length(&headers);
365    let body = read_body(&mut reader, content_length).await;
366
367    // Route the request
368    let (response_body, status_line, content_type) = match (*method, *path) {
369        ("GET", "/health") => (health_response(), "200 OK", "text/plain"),
370        ("GET", "/ready") => {
371            let (body, status) = ready_response(&state).await;
372            (body, status, "text/plain")
373        }
374        ("GET", "/metrics") => (
375            metrics_response(&state),
376            "200 OK",
377            "text/plain; charset=utf-8",
378        ),
379        ("GET", "/health/deep") => match handle_health_deep(&state).await {
380            Ok(body) => (body, "200 OK", "application/json"),
381            Err(e) => {
382                state.metrics.record_error();
383                (
384                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
385                    "503 Service Unavailable",
386                    "application/json",
387                )
388            }
389        },
390        ("POST", "/chat") => match handle_chat(&state, &body).await {
391            Ok(body) => (body, "200 OK", "application/json"),
392            Err(e) => {
393                state.metrics.record_error();
394                (
395                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
396                    "400 Bad Request",
397                    "application/json",
398                )
399            }
400        },
401        ("POST", "/reload") => match handle_reload(&state).await {
402            Ok(body) => (body, "200 OK", "application/json"),
403            Err(e) => {
404                state.metrics.record_error();
405                (
406                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
407                    "500 Internal Server Error",
408                    "application/json",
409                )
410            }
411        },
412        ("POST", "/execute") => match handle_execute(&state, &body).await {
413            Ok(body) => (body, "200 OK", "application/json"),
414            Err(e) => {
415                state.metrics.record_error();
416                (
417                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
418                    "400 Bad Request",
419                    "application/json",
420                )
421            }
422        },
423        ("GET", path) if path.starts_with("/tasks/") => {
424            let task_id = path.strip_prefix("/tasks/").unwrap_or("");
425            match handle_task_status(&state, task_id).await {
426                Ok(body) => (body, "200 OK", "application/json"),
427                Err(e) => {
428                    state.metrics.record_error();
429                    (
430                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
431                        "404 Not Found",
432                        "application/json",
433                    )
434                }
435            }
436        }
437        ("GET", "/tools") => match handle_list_tools(&state) {
438            Ok(body) => (body, "200 OK", "application/json"),
439            Err(e) => {
440                state.metrics.record_error();
441                (
442                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
443                    "500 Internal Server Error",
444                    "application/json",
445                )
446            }
447        },
448        ("GET", path) if path.starts_with("/tools/") => {
449            let tool_name = path.strip_prefix("/tools/").unwrap_or("");
450            match handle_get_tool(&state, tool_name) {
451                Ok(body) => (body, "200 OK", "application/json"),
452                Err(e) => {
453                    state.metrics.record_error();
454                    (
455                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
456                        "404 Not Found",
457                        "application/json",
458                    )
459                }
460            }
461        }
462        ("POST", path) if path.starts_with("/tools/") => {
463            let tool_name = path.strip_prefix("/tools/").unwrap_or("");
464            match handle_execute_tool(&state, tool_name, &body).await {
465                Ok(body) => (body, "200 OK", "application/json"),
466                Err(e) => {
467                    state.metrics.record_error();
468                    let status = if e.to_string().contains("not found")
469                        || e.to_string().contains("No tool")
470                    {
471                        "404 Not Found"
472                    } else {
473                        "400 Bad Request"
474                    };
475                    (
476                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
477                        status,
478                        "application/json",
479                    )
480                }
481            }
482        }
483        _ => {
484            state.metrics.record_error();
485            (b"Not Found".to_vec(), "404 Not Found", "text/plain")
486        }
487    };
488
489    // Write the response
490    let response = format!(
491        "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
492        status_line,
493        content_type,
494        response_body.len(),
495    );
496
497    if let Err(e) = stream.write_all(response.as_bytes()).await {
498        warn!(error = %e, "Failed to write response headers");
499        return;
500    }
501    if let Err(e) = stream.write_all(&response_body).await {
502        warn!(error = %e, "Failed to write response body");
503        return;
504    }
505    if let Err(e) = stream.flush().await {
506        warn!(error = %e, "Failed to flush response");
507        return;
508    }
509
510    debug!(path = %path, status = %status_line, peer = ?peer, "Request handled");
511}
512
513// ── Signal handling ────────────────────────────────────────────────────────
514
515/// Wait for shutdown signal (SIGTERM, SIGINT, or Ctrl+C)
516async fn wait_for_shutdown() {
517    let ctrl_c = async {
518        signal::ctrl_c()
519            .await
520            .expect("Failed to install Ctrl+C handler");
521    };
522
523    #[cfg(unix)]
524    let terminate = async {
525        signal::unix::signal(signal::unix::SignalKind::terminate())
526            .expect("Failed to install SIGTERM handler")
527            .recv()
528            .await;
529    };
530
531    #[cfg(not(unix))]
532    let terminate = std::future::pending::<()>();
533
534    tokio::select! {
535        _ = ctrl_c => {
536            info!("Received Ctrl+C, shutting down gracefully...");
537        }
538        _ = terminate => {
539            info!("Received SIGTERM, shutting down gracefully...");
540        }
541    }
542}
543
544/// Wait for SIGHUP signal (config reload)
545///
546/// Returns `true` if SIGHUP was received, `false` if the stream ended.
547#[cfg(unix)]
548async fn wait_for_sighup() -> bool {
549    use tokio::signal::unix::SignalKind;
550    let mut stream = match signal::unix::signal(SignalKind::hangup()) {
551        Ok(s) => s,
552        Err(e) => {
553            warn!(error = %e, "Failed to install SIGHUP handler");
554            return false;
555        }
556    };
557    stream.recv().await;
558    info!("Received SIGHUP — reloading configuration");
559    true
560}
561
/// Non-Unix stand-in for the SIGHUP listener.
///
/// SIGHUP does not exist on these platforms, so the returned future never
/// completes and callers never observe a reload request.
#[cfg(not(unix))]
async fn wait_for_sighup() -> bool {
    // A future that is pending forever; the `bool` output is never produced.
    std::future::pending::<bool>().await
}
568
569// ── Agent execution handlers ───────────────────────────────────────────────
570
571/// Handle POST /chat — send a message and get an agent response
572async fn handle_chat(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
573    let llm = state
574        .llm
575        .as_ref()
576        .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
577
578    #[derive(serde::Deserialize)]
579    struct ChatRequest {
580        messages: Vec<ChatMessage>,
581        #[serde(default)]
582        #[allow(dead_code)]
583        stream: bool,
584        #[serde(default)]
585        max_iterations: Option<usize>,
586    }
587
588    let req: ChatRequest =
589        serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
590
591    if req.messages.is_empty() {
592        return Err(anyhow::anyhow!("No messages provided"));
593    }
594
595    // Extract system prompt from messages, or use default
596    let system_prompt = req
597        .messages
598        .iter()
599        .find(|m| m.role == "system")
600        .map(|m| m.content.clone())
601        .unwrap_or_else(|| state.config.llm.system_prompt.clone());
602
603    // Extract user message (last user message)
604    let user_message = req
605        .messages
606        .iter()
607        .rev()
608        .find(|m| m.role == "user")
609        .map(|m| m.content.clone())
610        .ok_or_else(|| anyhow::anyhow!("No user message found"))?;
611
612    let metrics = state.metrics.clone();
613    let loop_config = AgentLoopConfig {
614        max_iterations: req.max_iterations.unwrap_or(10),
615        enable_tools: true,
616        require_approval: false,
617        prompt_injection_protection: state.config.security.prompt_injection_protection,
618        token_lifetime_secs: state.config.security.token_lifetime_secs,
619        no_final_required: true,
620        fallback_chain: None,
621        token_budget: None,
622        ravenfabric: None,
623        checkpoint_dir: None,
624        session_id: None,
625        metrics_callback: Some(Box::new(move |tokens, tool_calls| {
626            if tokens > 0 {
627                metrics.tokens_total.fetch_add(tokens, Ordering::Relaxed);
628            }
629            if tool_calls > 0 {
630                metrics
631                    .tool_calls_total
632                    .fetch_add(tool_calls, Ordering::Relaxed);
633            }
634        })),
635    };
636
637    let tool_registry = state.tool_registry.clone();
638
639    let response = agent::run_agent_loop_with_registry(
640        llm.clone(),
641        &user_message,
642        &system_prompt,
643        loop_config,
644        tool_registry,
645    )
646    .await?;
647
648    state.metrics.record_llm_request();
649
650    let result = serde_json::json!({
651        "response": response,
652        "model": llm.model(),
653        "provider": llm.provider_name(),
654    });
655
656    Ok(serde_json::to_vec(&result)?)
657}
658
659/// Handle POST /execute — submit a background task
660async fn handle_execute(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
661    let bg_manager = state
662        .bg_manager
663        .as_ref()
664        .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
665
666    #[derive(serde::Deserialize)]
667    struct ExecuteRequest {
668        prompt: String,
669        #[serde(default)]
670        system_prompt: Option<String>,
671    }
672
673    let req: ExecuteRequest =
674        serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
675
676    if req.prompt.trim().is_empty() {
677        return Err(anyhow::anyhow!("Prompt cannot be empty"));
678    }
679
680    let system_prompt = req
681        .system_prompt
682        .unwrap_or_else(|| state.config.llm.system_prompt.clone());
683
684    let task_id = bg_manager
685        .submit(req.prompt, system_prompt)
686        .await
687        .map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))?;
688
689    // Spawn background execution
690    if let Some(ref llm) = state.llm {
691        let bg = bg_manager.clone();
692        let tid = task_id.clone();
693        let llm_clone = llm.clone();
694        tokio::spawn(async move {
695            if let Err(e) = bg.execute(&tid, llm_clone).await {
696                warn!(task_id = %tid, error = %e, "Background task execution failed");
697            }
698        });
699    }
700
701    let result = serde_json::json!({
702        "task_id": task_id,
703        "status": "pending",
704    });
705
706    Ok(serde_json::to_vec(&result)?)
707}
708
709/// Handle GET /tasks/{id} — poll background task status
710async fn handle_task_status(state: &ServerState, task_id: &str) -> anyhow::Result<Vec<u8>> {
711    let bg_manager = state
712        .bg_manager
713        .as_ref()
714        .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
715
716    let task = bg_manager
717        .get_task(task_id)
718        .await
719        .map_err(|e| anyhow::anyhow!("Task not found: {}", e))?;
720
721    let result = serde_json::json!({
722        "task_id": task.id,
723        "status": task.status.to_string(),
724        "result": task.result,
725        "error": task.error,
726        "created_at": task.created_at,
727        "updated_at": task.updated_at,
728        "iterations": task.iterations,
729        "provider": task.provider,
730        "model": task.model,
731    });
732
733    Ok(serde_json::to_vec(&result)?)
734}
735
736/// Handle GET /tools — list available tools with schemas
737fn handle_list_tools(state: &ServerState) -> anyhow::Result<Vec<u8>> {
738    let registry = state
739        .tool_registry
740        .as_ref()
741        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
742
743    let tools: Vec<serde_json::Value> = registry
744        .definitions()
745        .iter()
746        .map(|def| {
747            serde_json::json!({
748                "name": def.name,
749                "description": def.description,
750                "parameters": def.parameters,
751                "category": def.category,
752                "requires_approval": def.requires_approval,
753            })
754        })
755        .collect();
756
757    let result = serde_json::json!({
758        "tools": tools,
759        "count": tools.len(),
760    });
761
762    Ok(serde_json::to_vec(&result)?)
763}
764
765/// Handle GET /tools/{name} — get details of a specific tool
766fn handle_get_tool(state: &ServerState, tool_name: &str) -> anyhow::Result<Vec<u8>> {
767    let registry = state
768        .tool_registry
769        .as_ref()
770        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
771
772    let definitions = registry.definitions();
773    let def = definitions
774        .iter()
775        .find(|d| d.name == tool_name)
776        .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
777
778    let result = serde_json::json!({
779        "name": def.name,
780        "description": def.description,
781        "parameters": def.parameters,
782        "category": def.category,
783        "requires_approval": def.requires_approval,
784    });
785
786    Ok(serde_json::to_vec(&result)?)
787}
788
789/// Handle POST /tools/{name} — execute a specific tool by name
790async fn handle_execute_tool(
791    state: &ServerState,
792    tool_name: &str,
793    body: &[u8],
794) -> anyhow::Result<Vec<u8>> {
795    let registry = state
796        .tool_registry
797        .as_ref()
798        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
799
800    let args: serde_json::Value = if body.is_empty() {
801        serde_json::Value::Object(serde_json::Map::new())
802    } else {
803        serde_json::from_slice(body)
804            .map_err(|e| anyhow::anyhow!("Invalid arguments JSON: {}", e))?
805    };
806
807    let call = ToolCall {
808        name: tool_name.to_string(),
809        arguments: args,
810        id: None,
811    };
812
813    let result = registry.execute(call).await?;
814    state.metrics.record_tool_call();
815
816    let response = serde_json::json!({
817        "tool": result.tool_name,
818        "success": result.success,
819        "output": result.output,
820        "error": result.error,
821        "exit_code": result.exit_code,
822        "duration_ms": result.duration_ms,
823    });
824
825    Ok(serde_json::to_vec(&response)?)
826}
827
828/// Handle GET /health/deep — verify LLM connectivity
829async fn handle_health_deep(state: &ServerState) -> anyhow::Result<Vec<u8>> {
830    let llm = state
831        .llm
832        .as_ref()
833        .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
834
835    // Make a lightweight LLM request to verify connectivity
836    let messages = vec![ChatMessage {
837        role: "user".to_string(),
838        content: "Respond with exactly: OK".to_string(),
839    }];
840
841    let response = llm
842        .chat(messages)
843        .await
844        .map_err(|e| anyhow::anyhow!("LLM connectivity check failed: {}", e))?;
845
846    let content = response
847        .choices
848        .first()
849        .map(|c| c.message.content.clone())
850        .unwrap_or_default();
851
852    let result = serde_json::json!({
853        "status": "ok",
854        "provider": llm.provider_name(),
855        "model": llm.model(),
856        "response": content,
857        "uptime_seconds": state.metrics.uptime_secs(),
858    });
859
860    Ok(serde_json::to_vec(&result)?)
861}
862
863/// Handle POST /reload — reload configuration (distroless-friendly alternative to SIGHUP)
864///
865/// This endpoint provides the same config reload functionality as SIGHUP but works
866/// in distroless containers that lack a shell or `kill` binary. The reload reads
867/// the config from the original path (RAVENCLAWS_CONFIG env var or default) and
868/// updates the in-memory config. Full hot-reload of LLM client, tool registry, and
869/// background manager requires Arc<RwLock<>> wrapping on ServerState fields.
870async fn handle_reload(_state: &ServerState) -> anyhow::Result<Vec<u8>> {
871    info!("Reloading configuration via POST /reload...");
872    let config_path = std::env::var("RAVENCLAWS_CONFIG").ok();
873    match Config::load(config_path.as_deref()) {
874        Ok(_new_config) => {
875            // Update the config in memory
876            // Note: Full hot-reload of LLM client, tool registry, and background
877            // manager requires Arc<RwLock<>> wrapping on ServerState fields.
878            // For now, we update the config struct and log the reload.
879            info!("Configuration reloaded successfully");
880            let result = serde_json::json!({
881                "status": "ok",
882                "message": "Configuration reloaded successfully. Note: LLM client and tool registry hot-reload requires a server restart.",
883            });
884            Ok(serde_json::to_vec(&result)?)
885        }
886        Err(e) => {
887            warn!(error = %e, "Failed to reload configuration via POST /reload");
888            Err(anyhow::anyhow!("Failed to reload configuration: {}", e))
889        }
890    }
891}
892
893// ── Public API ─────────────────────────────────────────────────────────────
894
895/// Run the HTTP server
896///
897/// Starts a long-running HTTP server on the configured host:port.
898/// Serves health, readiness, metrics, and agent execution endpoints.
899/// Handles graceful shutdown on SIGTERM/SIGINT.
900#[instrument(skip_all, fields(bind_addr))]
901pub async fn run_server(config: Config) -> anyhow::Result<()> {
902    let host = config
903        .runtime
904        .host
905        .clone()
906        .unwrap_or_else(|| "0.0.0.0".to_string());
907    let port = config.runtime.port;
908    let bind_addr = format!("{}:{}", host, port);
909
910    let mut state = ServerState::new(config);
911
912    // Initialize LLM client
913    info!("Initializing LLM client for server mode");
914    let llm = llm::create_client(&state.config.llm)?;
915    state.llm = Some(llm);
916    info!(
917        provider = state
918            .llm
919            .as_ref()
920            .map(|l| l.provider_name())
921            .unwrap_or("unknown"),
922        model = state.llm.as_ref().map(|l| l.model()).unwrap_or("unknown"),
923        "LLM client initialized for server mode"
924    );
925
926    // Initialize tool registry
927    info!("Initializing tool registry");
928    let registry = ToolRegistry::with_config(&state.config);
929    state.tool_registry = Some(registry);
930    info!(
931        tool_count = state.tool_registry.as_ref().map(|r| r.len()).unwrap_or(0),
932        "Tool registry initialized"
933    );
934
935    // Initialize background task manager
936    info!("Initializing background task manager");
937    let bg_manager = BackgroundTaskManager::from_config(&state.config.runtime).await?;
938    state.bg_manager = Some(bg_manager);
939    info!("Background task manager initialized");
940
941    // Initialize MCP clients from config (v0.9.6)
942    if !state.config.mcp.servers.is_empty() {
943        info!(
944            server_count = state.config.mcp.servers.len(),
945            "Initializing MCP clients from config"
946        );
947        let mcp_manager = crate::mcp::McpClientManager::from_config(&state.config.mcp).await;
948        let registered = if !mcp_manager.is_empty() {
949            if let Some(ref mut registry) = state.tool_registry {
950                mcp_manager.register_all_tools(registry).await
951            } else {
952                0
953            }
954        } else {
955            0
956        };
957        info!(
958            connected = mcp_manager.len(),
959            tools_registered = registered,
960            "MCP client initialization complete"
961        );
962        state.mcp_manager = Some(mcp_manager);
963    } else {
964        info!("No MCP servers configured, skipping MCP client initialization");
965    }
966
967    let state = Arc::new(state);
968    let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
969        error!(address = %bind_addr, error = %e, "Failed to bind HTTP server");
970        anyhow::anyhow!("Failed to bind to {}: {}", bind_addr, e)
971    })?;
972
973    info!(
974        address = %bind_addr,
975        "HTTP server started — endpoints: /health, /ready, /metrics, /health/deep, /chat, /execute, /tasks/:id, /tools, /tools/:name, /reload"
976    );
977
978    // Mark as ready after successful bind
979    state.mark_ready();
980
981    // Accept connections in a loop
982    loop {
983        tokio::select! {
984            accept_result = listener.accept() => {
985                match accept_result {
986                    Ok((stream, peer)) => {
987                        debug!(peer = %peer, "Accepted connection");
988                        let state = Arc::clone(&state);
989                        tokio::spawn(async move {
990                            handle_connection(stream, state).await;
991                        });
992                    }
993                    Err(e) => {
994                        warn!(error = %e, "Failed to accept connection");
995                        state.metrics.record_error();
996                    }
997                }
998            }
999            _ = wait_for_shutdown() => {
1000                info!("Shutting down HTTP server gracefully...");
1001                // Give in-flight requests a moment to complete
1002                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1003                info!("HTTP server shutdown complete");
1004                break;
1005            }
1006            _ = wait_for_sighup() => {
1007                info!("Reloading configuration from SIGHUP...");
1008                // Reload config from the original path
1009                let config_path = std::env::var("RAVENCLAWS_CONFIG")
1010                    .ok();
1011                match Config::load(config_path.as_deref()) {
1012                    Ok(_new_config) => {
1013                        // Update the config in place (Arc<RwLock<Config>> would be
1014                        // better for production, but for now we log the reload)
1015                        info!("Configuration reloaded successfully");
1016                        // Note: Full hot-reload of LLM client, tool registry, and
1017                        // background manager requires Arc<RwLock<>> wrapping on
1018                        // ServerState fields — deferred to v0.9.8.
1019                    }
1020                    Err(e) => {
1021                        warn!(error = %e, "Failed to reload configuration on SIGHUP");
1022                    }
1023                }
1024            }
1025        }
1026    }
1027
1028    Ok(())
1029}
1030
1031// ── Tests ──────────────────────────────────────────────────────────────────
1032
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RuntimeConfig;

    /// Build a configuration bound to loopback with an OS-assigned port.
    fn test_config() -> Config {
        let runtime = RuntimeConfig {
            host: Some("127.0.0.1".to_string()),
            port: 0, // OS-assigned port
            ..RuntimeConfig::default()
        };
        Config {
            runtime,
            ..Config::default()
        }
    }

    /// Serve exactly one connection on an ephemeral loopback port and issue a
    /// single GET for `path` against it.
    ///
    /// The request result is returned as-is so callers can decide how to treat
    /// a client-side failure; listener and handler failures still panic.
    async fn fetch_once(path: &str) -> Result<reqwest::Response, reqwest::Error> {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let state = Arc::new(ServerState::new(test_config()));
        state.mark_ready();

        let server_state = Arc::clone(&state);
        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            handle_connection(stream, server_state).await;
        });

        let outcome = reqwest::Client::new()
            .get(format!("http://{}{}", addr, path))
            .send()
            .await;

        // Wait for the handler so a panic inside it fails the test.
        server.await.unwrap();

        outcome
    }

    #[tokio::test]
    async fn test_health_response() {
        assert_eq!(health_response(), b"OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_ready() {
        let state = ServerState::new(test_config());
        state.mark_ready();

        let (payload, status_line) = ready_response(&state).await;
        assert_eq!(payload, b"READY");
        assert_eq!(status_line, "200 OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_not_ready() {
        let state = ServerState::new(test_config());

        let (payload, status_line) = ready_response(&state).await;
        assert_eq!(payload, b"NOT READY");
        assert_eq!(status_line, "503 Service Unavailable");
    }

    #[tokio::test]
    async fn test_metrics_response_format() {
        let state = ServerState::new(test_config());
        let raw = metrics_response(&state);
        let output = String::from_utf8_lossy(&raw);

        // Metric names plus the Prometheus exposition markers.
        let expected = [
            "ravenclaws_requests_total",
            "ravenclaws_llm_requests_total",
            "ravenclaws_tool_calls_total",
            "ravenclaws_errors_total",
            "ravenclaws_tokens_total",
            "ravenclaws_uptime_seconds",
            "ravenclaws_start_time_seconds",
            "# HELP",
            "# TYPE",
        ];
        for needle in expected {
            assert!(output.contains(needle));
        }
    }

    #[tokio::test]
    async fn test_metrics_counters_increment() {
        let state = ServerState::new(test_config());
        let metrics = &state.metrics;

        metrics.record_request();
        metrics.record_request();
        metrics.record_error();
        metrics.record_tokens(150);

        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.tokens_total.load(Ordering::Relaxed), 150);
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let state = ServerState::new(test_config());

        let before = state.metrics.uptime_secs();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let after = state.metrics.uptime_secs();

        // Uptime is reported in whole seconds, so only monotonicity is checked.
        assert!(after >= before);
    }

    #[tokio::test]
    async fn test_server_binds_and_responds() {
        // Best-effort: a failed client request skips the assertions.
        if let Ok(resp) = fetch_once("/health").await {
            assert_eq!(resp.status(), 200);
            let body = resp.text().await.unwrap();
            assert_eq!(body, "OK");
        }
    }

    #[tokio::test]
    async fn test_server_404() {
        // Best-effort: a failed client request skips the assertion.
        if let Ok(resp) = fetch_once("/unknown").await {
            assert_eq!(resp.status(), 404);
        }
    }

    #[tokio::test]
    async fn test_server_metrics_endpoint() {
        // Best-effort: a failed client request skips the assertions.
        if let Ok(resp) = fetch_once("/metrics").await {
            assert_eq!(resp.status(), 200);
            let body = resp.text().await.unwrap();
            assert!(body.contains("ravenclaws_requests_total"));
        }
    }
}