Skip to main content

ravenclaws/
server.rs

//! RavenClaws HTTP server.
//!
//! Provides a long-running HTTP server with health, readiness, metrics, and agent
//! execution endpoints, allowing RavenClaws to run as a stable workload in Kubernetes
//! and other container orchestration platforms.
//!
//! # Endpoints
//!
//! - `GET /health` — Liveness probe (always 200 when server is running)
//! - `GET /ready` — Readiness probe (200 when fully initialized, 503 during startup or
//!   when a configured LLM is unreachable)
//! - `GET /metrics` — Prometheus-style metrics (requests, tokens, tool calls, errors)
//! - `GET /health/deep` — Deep health check (verifies LLM connectivity)
//! - `POST /chat` — Send a message and get an agent response
//! - `POST /execute` — Submit a background task, returns task ID
//! - `GET /tasks/{id}` — Poll background task status and result
//! - `GET /tools` — List available tools with schemas
//! - `GET /tools/{name}` — Get details of a specific tool
//! - `POST /tools/{name}` — Execute a specific tool by name
//! - `POST /reload` — Reload configuration (distroless-friendly SIGHUP alternative)
//!
//! # Architecture
//!
//! ```text
//! HttpServer
//!   ├── /health       → always 200 OK
//!   ├── /ready        → 200 OK when ready, 503 during startup or if the LLM is unreachable
//!   ├── /metrics      → Prometheus text format
//!   ├── /health/deep  → LLM connectivity check
//!   ├── /chat         → POST: agent response (JSON)
//!   ├── /execute      → POST: background task submission
//!   ├── /tasks/{id}   → GET: task status/result
//!   ├── /tools        → GET: list tools with schemas
//!   ├── /tools/{name} → GET: tool details; POST: execute tool by name
//!   └── /reload       → POST: reload configuration
//! ```
35
36use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Instant;
39
40use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41use tokio::net::TcpListener;
42use tokio::signal;
43use tracing::{debug, error, info, instrument, warn};
44
45use crate::agent::{self, AgentLoopConfig};
46use crate::background::BackgroundTaskManager;
47use crate::config::Config;
48use crate::llm::{self, ChatMessage, LLMProviderTrait};
49use crate::load::{Admission, LoadManager};
50use crate::tools::{ToolCall, ToolRegistry};
51
52// ── Metrics ────────────────────────────────────────────────────────────────
53
54/// Shared metrics state for the HTTP server
55#[derive(Debug, Default)]
56pub struct ServerMetrics {
57    /// Total HTTP requests served
58    pub requests_total: AtomicU64,
59    /// Total LLM requests made
60    pub llm_requests_total: AtomicU64,
61    /// Total tool calls executed
62    pub tool_calls_total: AtomicU64,
63    /// Total errors encountered
64    pub errors_total: AtomicU64,
65    /// Total tokens consumed (estimated)
66    pub tokens_total: AtomicU64,
67    /// Server start timestamp (seconds since epoch)
68    pub start_time: AtomicU64,
69    /// Readiness check cache: 0 = not cached, otherwise timestamp of last check
70    pub ready_cache_time: AtomicU64,
71    /// Readiness check cache: 1 = ready, 0 = not ready
72    pub ready_cache_result: AtomicU64,
73}
74
75// Manual Clone — AtomicU64 doesn't implement Clone, so we construct new atomics
76impl Clone for ServerMetrics {
77    fn clone(&self) -> Self {
78        Self {
79            requests_total: AtomicU64::new(self.requests_total.load(Ordering::Relaxed)),
80            llm_requests_total: AtomicU64::new(self.llm_requests_total.load(Ordering::Relaxed)),
81            tool_calls_total: AtomicU64::new(self.tool_calls_total.load(Ordering::Relaxed)),
82            errors_total: AtomicU64::new(self.errors_total.load(Ordering::Relaxed)),
83            tokens_total: AtomicU64::new(self.tokens_total.load(Ordering::Relaxed)),
84            start_time: AtomicU64::new(self.start_time.load(Ordering::Relaxed)),
85            ready_cache_time: AtomicU64::new(self.ready_cache_time.load(Ordering::Relaxed)),
86            ready_cache_result: AtomicU64::new(self.ready_cache_result.load(Ordering::Relaxed)),
87        }
88    }
89}
90
91impl ServerMetrics {
92    fn new() -> Self {
93        let metrics = Self::default();
94        metrics.start_time.store(
95            std::time::SystemTime::now()
96                .duration_since(std::time::UNIX_EPOCH)
97                .unwrap_or_default()
98                .as_secs(),
99            Ordering::Relaxed,
100        );
101        metrics
102    }
103
104    fn record_request(&self) {
105        self.requests_total.fetch_add(1, Ordering::Relaxed);
106    }
107
108    fn record_llm_request(&self) {
109        self.llm_requests_total.fetch_add(1, Ordering::Relaxed);
110    }
111
112    fn record_tool_call(&self) {
113        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
114    }
115
116    fn record_error(&self) {
117        self.errors_total.fetch_add(1, Ordering::Relaxed);
118    }
119
120    #[cfg_attr(not(test), allow(dead_code))]
121    fn record_tokens(&self, count: u64) {
122        self.tokens_total.fetch_add(count, Ordering::Relaxed);
123    }
124
125    fn uptime_secs(&self) -> u64 {
126        let now = std::time::SystemTime::now()
127            .duration_since(std::time::UNIX_EPOCH)
128            .unwrap_or_default()
129            .as_secs();
130        let start = self.start_time.load(Ordering::Relaxed);
131        now.saturating_sub(start)
132    }
133}
134
135// ── Server state ───────────────────────────────────────────────────────────
136
137/// Shared state for the HTTP server
138pub struct ServerState {
139    /// Whether the server is fully initialized and ready to serve requests
140    pub ready: AtomicBool,
141    /// Server metrics
142    pub metrics: ServerMetrics,
143    /// Server configuration
144    pub config: Config,
145    /// Server start time
146    #[allow(dead_code)]
147    pub start_time: Instant,
148    /// LLM client for agent execution
149    pub llm: Option<Arc<dyn LLMProviderTrait>>,
150    /// Tool registry for tool listing and execution
151    pub tool_registry: Option<ToolRegistry>,
152    /// Background task manager for async execution
153    pub bg_manager: Option<BackgroundTaskManager>,
154    /// MCP client manager for multi-server MCP tool access
155    pub mcp_manager: Option<crate::mcp::McpClientManager>,
156    /// Load manager for graceful degradation (rate limiting, concurrency control, load shedding)
157    pub load_manager: Arc<LoadManager>,
158}
159
160impl ServerState {
161    fn new(config: Config) -> Self {
162        let load_manager = Arc::new(LoadManager::new(config.load.clone()));
163        Self {
164            ready: AtomicBool::new(false),
165            metrics: ServerMetrics::new(),
166            config,
167            start_time: Instant::now(),
168            llm: None,
169            tool_registry: None,
170            bg_manager: None,
171            mcp_manager: None,
172            load_manager,
173        }
174    }
175
176    /// Mark the server as ready (initialization complete)
177    fn mark_ready(&self) {
178        self.ready.store(true, Ordering::Relaxed);
179        info!("Server marked as ready");
180    }
181}
182
183// ── HTTP response helpers ──────────────────────────────────────────────────
184
185fn health_response() -> Vec<u8> {
186    b"OK".to_vec()
187}
188
189async fn ready_response(state: &ServerState) -> (Vec<u8>, &'static str) {
190    if !state.ready.load(Ordering::Relaxed) {
191        return (b"NOT READY".to_vec(), "503 Service Unavailable");
192    }
193
194    // If an LLM client is configured, verify connectivity for deep readiness
195    if let Some(ref llm) = state.llm {
196        // Check cache: re-check at most every 30 seconds
197        let now = std::time::SystemTime::now()
198            .duration_since(std::time::UNIX_EPOCH)
199            .unwrap_or_default()
200            .as_secs();
201        let cache_time = state.metrics.ready_cache_time.load(Ordering::Relaxed);
202        let cache_ttl: u64 = 30;
203
204        if now.saturating_sub(cache_time) < cache_ttl {
205            // Cache is fresh — return cached result
206            if state.metrics.ready_cache_result.load(Ordering::Relaxed) == 1 {
207                return (b"READY".to_vec(), "200 OK");
208            } else {
209                return (
210                    b"NOT READY: LLM unreachable".to_vec(),
211                    "503 Service Unavailable",
212                );
213            }
214        }
215
216        // Cache expired — perform actual check
217        let result = match tokio::time::timeout(
218            std::time::Duration::from_secs(5),
219            llm.chat(vec![ChatMessage::new(
220                "user",
221                "Respond with exactly one word: ready",
222            )]),
223        )
224        .await
225        {
226            Ok(Ok(_)) => {
227                // LLM is reachable
228                state.metrics.ready_cache_result.store(1, Ordering::Relaxed);
229                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
230                (b"READY".to_vec(), "200 OK")
231            }
232            Ok(Err(e)) => {
233                warn!(error = %e, "Readiness check: LLM connectivity failed");
234                state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
235                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
236                (
237                    b"NOT READY: LLM unreachable".to_vec(),
238                    "503 Service Unavailable",
239                )
240            }
241            Err(_) => {
242                warn!("Readiness check: LLM connectivity timed out");
243                state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
244                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
245                (
246                    b"NOT READY: LLM timeout".to_vec(),
247                    "503 Service Unavailable",
248                )
249            }
250        };
251        result
252    } else {
253        (b"READY".to_vec(), "200 OK")
254    }
255}
256
257fn metrics_response(state: &ServerState) -> Vec<u8> {
258    let metrics = &state.metrics;
259    let base = format!(
260        "# HELP ravenclaws_requests_total Total HTTP requests served\n\
261         # TYPE ravenclaws_requests_total counter\n\
262         ravenclaws_requests_total {}\n\
263         \n\
264         # HELP ravenclaws_llm_requests_total Total LLM requests made\n\
265         # TYPE ravenclaws_llm_requests_total counter\n\
266         ravenclaws_llm_requests_total {}\n\
267         \n\
268         # HELP ravenclaws_tool_calls_total Total tool calls executed\n\
269         # TYPE ravenclaws_tool_calls_total counter\n\
270         ravenclaws_tool_calls_total {}\n\
271         \n\
272         # HELP ravenclaws_errors_total Total errors encountered\n\
273         # TYPE ravenclaws_errors_total counter\n\
274         ravenclaws_errors_total {}\n\
275         \n\
276         # HELP ravenclaws_tokens_total Total tokens consumed (estimated)\n\
277         # TYPE ravenclaws_tokens_total counter\n\
278         ravenclaws_tokens_total {}\n\
279         \n\
280         # HELP ravenclaws_uptime_seconds Server uptime in seconds\n\
281         # TYPE ravenclaws_uptime_seconds gauge\n\
282         ravenclaws_uptime_seconds {}\n\
283         \n\
284         # HELP ravenclaws_start_time_seconds Server start time (Unix epoch)\n\
285         # TYPE ravenclaws_start_time_seconds gauge\n\
286         ravenclaws_start_time_seconds {}\n",
287        metrics.requests_total.load(Ordering::Relaxed),
288        metrics.llm_requests_total.load(Ordering::Relaxed),
289        metrics.tool_calls_total.load(Ordering::Relaxed),
290        metrics.errors_total.load(Ordering::Relaxed),
291        metrics.tokens_total.load(Ordering::Relaxed),
292        metrics.uptime_secs(),
293        metrics.start_time.load(Ordering::Relaxed),
294    );
295    let load_metrics = state.load_manager.metrics().to_prometheus_text();
296    format!("{}\n{}", base, load_metrics).into_bytes()
297}
298
299// ── HTTP handler ───────────────────────────────────────────────────────────
300
301/// Parse the Content-Length header from the request headers
302fn parse_content_length(headers: &[String]) -> usize {
303    for header in headers {
304        if let Some(value) = header
305            .strip_prefix("content-length:")
306            .or_else(|| header.strip_prefix("Content-Length:"))
307        {
308            return value.trim().parse().unwrap_or(0);
309        }
310    }
311    0
312}
313
314/// Read the request body from the reader
315async fn read_body(
316    reader: &mut BufReader<&mut tokio::net::TcpStream>,
317    content_length: usize,
318) -> Vec<u8> {
319    if content_length == 0 {
320        return Vec::new();
321    }
322    let mut body = vec![0u8; content_length];
323    if let Err(e) = reader.read_exact(&mut body).await {
324        warn!(error = %e, "Failed to read request body");
325        return Vec::new();
326    }
327    body
328}
329
330/// Handle a single HTTP connection
331#[instrument(skip_all, fields(peer = ?stream.peer_addr().ok()))]
332async fn handle_connection(mut stream: tokio::net::TcpStream, state: Arc<ServerState>) {
333    let peer = stream.peer_addr().ok();
334    let mut reader = BufReader::new(&mut stream);
335    let mut request_line = String::new();
336
337    // Read the request line
338    if reader.read_line(&mut request_line).await.is_err() {
339        return;
340    }
341
342    let request_line = request_line.trim();
343    if request_line.is_empty() {
344        return;
345    }
346
347    state.metrics.record_request();
348
349    // Check admission control (rate limiting, concurrency, load shedding)
350    let admission = state.load_manager.check_admission();
351    if !admission.is_allowed() {
352        let (status, body): (&str, Vec<u8>) = match admission {
353            Admission::RateLimited => ("429 Too Many Requests", b"Rate limited\n".to_vec()),
354            Admission::ConcurrencyLimited => (
355                "503 Service Unavailable",
356                b"Too many concurrent requests\n".to_vec(),
357            ),
358            Admission::LoadShed => (
359                "503 Service Unavailable",
360                b"Server is overloaded\n".to_vec(),
361            ),
362            _ => unreachable!(),
363        };
364        state.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
365        let response = format!(
366            "HTTP/1.1 {}\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nRetry-After: 1\r\nConnection: close\r\n\r\n{}",
367            status,
368            body.len(),
369            std::str::from_utf8(&body).unwrap_or(""),
370        );
371        let _ = stream.write_all(response.as_bytes()).await;
372        return;
373    }
374
375    // Parse the request method and path
376    let parts: Vec<&str> = request_line.split_whitespace().collect();
377    let method = parts.first().unwrap_or(&"GET");
378    let path = parts.get(1).unwrap_or(&"/");
379
380    // Read headers
381    let mut headers: Vec<String> = Vec::new();
382    let mut header_line = String::new();
383    loop {
384        header_line.clear();
385        if reader.read_line(&mut header_line).await.is_err() {
386            return;
387        }
388        let trimmed = header_line.trim();
389        if trimmed.is_empty() {
390            break;
391        }
392        headers.push(trimmed.to_lowercase());
393    }
394
395    // Read body for POST requests
396    let content_length = parse_content_length(&headers);
397    let body = read_body(&mut reader, content_length).await;
398
399    // Route the request
400    let (response_body, status_line, content_type) = match (*method, *path) {
401        ("GET", "/health") => (health_response(), "200 OK", "text/plain"),
402        ("GET", "/ready") => {
403            let (body, status) = ready_response(&state).await;
404            (body, status, "text/plain")
405        }
406        ("GET", "/metrics") => (
407            metrics_response(&state),
408            "200 OK",
409            "text/plain; charset=utf-8",
410        ),
411        ("GET", "/health/deep") => match handle_health_deep(&state).await {
412            Ok(body) => (body, "200 OK", "application/json"),
413            Err(e) => {
414                state.metrics.record_error();
415                (
416                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
417                    "503 Service Unavailable",
418                    "application/json",
419                )
420            }
421        },
422        ("POST", "/chat") => match handle_chat(&state, &body).await {
423            Ok(body) => (body, "200 OK", "application/json"),
424            Err(e) => {
425                state.metrics.record_error();
426                (
427                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
428                    "400 Bad Request",
429                    "application/json",
430                )
431            }
432        },
433        ("POST", "/reload") => match handle_reload(&state).await {
434            Ok(body) => (body, "200 OK", "application/json"),
435            Err(e) => {
436                state.metrics.record_error();
437                (
438                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
439                    "500 Internal Server Error",
440                    "application/json",
441                )
442            }
443        },
444        ("POST", "/execute") => match handle_execute(&state, &body).await {
445            Ok(body) => (body, "200 OK", "application/json"),
446            Err(e) => {
447                state.metrics.record_error();
448                (
449                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
450                    "400 Bad Request",
451                    "application/json",
452                )
453            }
454        },
455        ("GET", path) if path.starts_with("/tasks/") => {
456            let task_id = path.strip_prefix("/tasks/").unwrap_or("");
457            match handle_task_status(&state, task_id).await {
458                Ok(body) => (body, "200 OK", "application/json"),
459                Err(e) => {
460                    state.metrics.record_error();
461                    (
462                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
463                        "404 Not Found",
464                        "application/json",
465                    )
466                }
467            }
468        }
469        ("GET", "/tools") => match handle_list_tools(&state) {
470            Ok(body) => (body, "200 OK", "application/json"),
471            Err(e) => {
472                state.metrics.record_error();
473                (
474                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
475                    "500 Internal Server Error",
476                    "application/json",
477                )
478            }
479        },
480        ("GET", path) if path.starts_with("/tools/") => {
481            let tool_name = path.strip_prefix("/tools/").unwrap_or("");
482            match handle_get_tool(&state, tool_name) {
483                Ok(body) => (body, "200 OK", "application/json"),
484                Err(e) => {
485                    state.metrics.record_error();
486                    (
487                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
488                        "404 Not Found",
489                        "application/json",
490                    )
491                }
492            }
493        }
494        ("POST", path) if path.starts_with("/tools/") => {
495            let tool_name = path.strip_prefix("/tools/").unwrap_or("");
496            match handle_execute_tool(&state, tool_name, &body).await {
497                Ok(body) => (body, "200 OK", "application/json"),
498                Err(e) => {
499                    state.metrics.record_error();
500                    let status = if e.to_string().contains("not found")
501                        || e.to_string().contains("No tool")
502                    {
503                        "404 Not Found"
504                    } else {
505                        "400 Bad Request"
506                    };
507                    (
508                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
509                        status,
510                        "application/json",
511                    )
512                }
513            }
514        }
515        _ => {
516            state.metrics.record_error();
517            (b"Not Found".to_vec(), "404 Not Found", "text/plain")
518        }
519    };
520
521    // Write the response
522    let response = format!(
523        "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
524        status_line,
525        content_type,
526        response_body.len(),
527    );
528
529    if let Err(e) = stream.write_all(response.as_bytes()).await {
530        warn!(error = %e, "Failed to write response headers");
531        return;
532    }
533    if let Err(e) = stream.write_all(&response_body).await {
534        warn!(error = %e, "Failed to write response body");
535        return;
536    }
537    if let Err(e) = stream.flush().await {
538        warn!(error = %e, "Failed to flush response");
539        return;
540    }
541
542    debug!(path = %path, status = %status_line, peer = ?peer, "Request handled");
543}
544
545// ── Signal handling ────────────────────────────────────────────────────────
546
547/// Wait for shutdown signal (SIGTERM, SIGINT, or Ctrl+C)
548async fn wait_for_shutdown() {
549    let ctrl_c = async {
550        signal::ctrl_c()
551            .await
552            .expect("Failed to install Ctrl+C handler");
553    };
554
555    #[cfg(unix)]
556    let terminate = async {
557        signal::unix::signal(signal::unix::SignalKind::terminate())
558            .expect("Failed to install SIGTERM handler")
559            .recv()
560            .await;
561    };
562
563    #[cfg(not(unix))]
564    let terminate = std::future::pending::<()>();
565
566    tokio::select! {
567        _ = ctrl_c => {
568            info!("Received Ctrl+C, shutting down gracefully...");
569        }
570        _ = terminate => {
571            info!("Received SIGTERM, shutting down gracefully...");
572        }
573    }
574}
575
576/// Wait for SIGHUP signal (config reload)
577///
578/// Returns `true` if SIGHUP was received, `false` if the stream ended.
579#[cfg(unix)]
580async fn wait_for_sighup() -> bool {
581    use tokio::signal::unix::SignalKind;
582    let mut stream = match signal::unix::signal(SignalKind::hangup()) {
583        Ok(s) => s,
584        Err(e) => {
585            warn!(error = %e, "Failed to install SIGHUP handler");
586            return false;
587        }
588    };
589    stream.recv().await;
590    info!("Received SIGHUP — reloading configuration");
591    true
592}
593
594#[cfg(not(unix))]
595async fn wait_for_sighup() -> bool {
596    // SIGHUP is not available on non-Unix platforms
597    std::future::pending::<()>().await;
598    false
599}
600
601// ── Agent execution handlers ───────────────────────────────────────────────
602
603/// Handle POST /chat — send a message and get an agent response
604async fn handle_chat(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
605    let llm = state
606        .llm
607        .as_ref()
608        .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
609
610    #[derive(serde::Deserialize)]
611    struct ChatRequest {
612        messages: Vec<ChatMessage>,
613        #[serde(default)]
614        #[allow(dead_code)]
615        stream: bool,
616        #[serde(default)]
617        max_iterations: Option<usize>,
618    }
619
620    let req: ChatRequest =
621        serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
622
623    if req.messages.is_empty() {
624        return Err(anyhow::anyhow!("No messages provided"));
625    }
626
627    // Extract system prompt from messages, or use default
628    let system_prompt = req
629        .messages
630        .iter()
631        .find(|m| m.role == "system")
632        .map(|m| m.content.clone())
633        .unwrap_or_else(|| state.config.llm.system_prompt.clone());
634
635    // Extract user message (last user message)
636    let user_message = req
637        .messages
638        .iter()
639        .rev()
640        .find(|m| m.role == "user")
641        .map(|m| m.content.clone())
642        .ok_or_else(|| anyhow::anyhow!("No user message found"))?;
643
644    let metrics = state.metrics.clone();
645    let load_manager = Arc::clone(&state.load_manager);
646    let loop_config = AgentLoopConfig {
647        max_iterations: req.max_iterations.unwrap_or(10),
648        enable_tools: true,
649        require_approval: false,
650        prompt_injection_protection: state.config.security.prompt_injection_protection,
651        token_lifetime_secs: state.config.security.token_lifetime_secs,
652        no_final_required: true,
653        fallback_chain: None,
654        token_budget: None,
655        ravenfabric: None,
656        checkpoint_dir: None,
657        session_id: None,
658        metrics_callback: Some(Box::new(move |tokens, tool_calls| {
659            if tokens > 0 {
660                metrics.tokens_total.fetch_add(tokens, Ordering::Relaxed);
661            }
662            if tool_calls > 0 {
663                metrics
664                    .tool_calls_total
665                    .fetch_add(tool_calls, Ordering::Relaxed);
666            }
667        })),
668        load_manager: Some(load_manager),
669    };
670
671    let tool_registry = state.tool_registry.clone();
672
673    let response = agent::run_agent_loop_with_registry(
674        llm.clone(),
675        &user_message,
676        &system_prompt,
677        loop_config,
678        tool_registry,
679    )
680    .await?;
681
682    state.metrics.record_llm_request();
683
684    let result = serde_json::json!({
685        "response": response,
686        "model": llm.model(),
687        "provider": llm.provider_name(),
688    });
689
690    Ok(serde_json::to_vec(&result)?)
691}
692
693/// Handle POST /execute — submit a background task
694async fn handle_execute(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
695    let bg_manager = state
696        .bg_manager
697        .as_ref()
698        .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
699
700    #[derive(serde::Deserialize)]
701    struct ExecuteRequest {
702        prompt: String,
703        #[serde(default)]
704        system_prompt: Option<String>,
705    }
706
707    let req: ExecuteRequest =
708        serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
709
710    if req.prompt.trim().is_empty() {
711        return Err(anyhow::anyhow!("Prompt cannot be empty"));
712    }
713
714    let system_prompt = req
715        .system_prompt
716        .unwrap_or_else(|| state.config.llm.system_prompt.clone());
717
718    let task_id = bg_manager
719        .submit(req.prompt, system_prompt)
720        .await
721        .map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))?;
722
723    // Spawn background execution
724    if let Some(ref llm) = state.llm {
725        let bg = bg_manager.clone();
726        let tid = task_id.clone();
727        let llm_clone = llm.clone();
728        tokio::spawn(async move {
729            if let Err(e) = bg.execute(&tid, llm_clone).await {
730                warn!(task_id = %tid, error = %e, "Background task execution failed");
731            }
732        });
733    }
734
735    let result = serde_json::json!({
736        "task_id": task_id,
737        "status": "pending",
738    });
739
740    Ok(serde_json::to_vec(&result)?)
741}
742
743/// Handle GET /tasks/{id} — poll background task status
744async fn handle_task_status(state: &ServerState, task_id: &str) -> anyhow::Result<Vec<u8>> {
745    let bg_manager = state
746        .bg_manager
747        .as_ref()
748        .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
749
750    let task = bg_manager
751        .get_task(task_id)
752        .await
753        .map_err(|e| anyhow::anyhow!("Task not found: {}", e))?;
754
755    let result = serde_json::json!({
756        "task_id": task.id,
757        "status": task.status.to_string(),
758        "result": task.result,
759        "error": task.error,
760        "created_at": task.created_at,
761        "updated_at": task.updated_at,
762        "iterations": task.iterations,
763        "provider": task.provider,
764        "model": task.model,
765    });
766
767    Ok(serde_json::to_vec(&result)?)
768}
769
770/// Handle GET /tools — list available tools with schemas
771fn handle_list_tools(state: &ServerState) -> anyhow::Result<Vec<u8>> {
772    let registry = state
773        .tool_registry
774        .as_ref()
775        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
776
777    let tools: Vec<serde_json::Value> = registry
778        .definitions()
779        .iter()
780        .map(|def| {
781            serde_json::json!({
782                "name": def.name,
783                "description": def.description,
784                "parameters": def.parameters,
785                "category": def.category,
786                "requires_approval": def.requires_approval,
787            })
788        })
789        .collect();
790
791    let result = serde_json::json!({
792        "tools": tools,
793        "count": tools.len(),
794    });
795
796    Ok(serde_json::to_vec(&result)?)
797}
798
799/// Handle GET /tools/{name} — get details of a specific tool
800fn handle_get_tool(state: &ServerState, tool_name: &str) -> anyhow::Result<Vec<u8>> {
801    let registry = state
802        .tool_registry
803        .as_ref()
804        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
805
806    let definitions = registry.definitions();
807    let def = definitions
808        .iter()
809        .find(|d| d.name == tool_name)
810        .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
811
812    let result = serde_json::json!({
813        "name": def.name,
814        "description": def.description,
815        "parameters": def.parameters,
816        "category": def.category,
817        "requires_approval": def.requires_approval,
818    });
819
820    Ok(serde_json::to_vec(&result)?)
821}
822
823/// Handle POST /tools/{name} — execute a specific tool by name
824async fn handle_execute_tool(
825    state: &ServerState,
826    tool_name: &str,
827    body: &[u8],
828) -> anyhow::Result<Vec<u8>> {
829    let registry = state
830        .tool_registry
831        .as_ref()
832        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
833
834    let args: serde_json::Value = if body.is_empty() {
835        serde_json::Value::Object(serde_json::Map::new())
836    } else {
837        serde_json::from_slice(body)
838            .map_err(|e| anyhow::anyhow!("Invalid arguments JSON: {}", e))?
839    };
840
841    let call = ToolCall {
842        name: tool_name.to_string(),
843        arguments: args,
844        id: None,
845    };
846
847    let result = registry.execute(call).await?;
848    state.metrics.record_tool_call();
849
850    let response = serde_json::json!({
851        "tool": result.tool_name,
852        "success": result.success,
853        "output": result.output,
854        "error": result.error,
855        "exit_code": result.exit_code,
856        "duration_ms": result.duration_ms,
857    });
858
859    Ok(serde_json::to_vec(&response)?)
860}
861
862/// Handle GET /health/deep — verify LLM connectivity
863async fn handle_health_deep(state: &ServerState) -> anyhow::Result<Vec<u8>> {
864    let llm = state
865        .llm
866        .as_ref()
867        .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
868
869    // Make a lightweight LLM request to verify connectivity
870    let messages = vec![ChatMessage::new("user", "Respond with exactly: OK")];
871
872    let response = llm
873        .chat(messages)
874        .await
875        .map_err(|e| anyhow::anyhow!("LLM connectivity check failed: {}", e))?;
876
877    let content = response
878        .choices
879        .first()
880        .map(|c| c.message.content.clone())
881        .unwrap_or_default();
882
883    let result = serde_json::json!({
884        "status": "ok",
885        "provider": llm.provider_name(),
886        "model": llm.model(),
887        "response": content,
888        "uptime_seconds": state.metrics.uptime_secs(),
889    });
890
891    Ok(serde_json::to_vec(&result)?)
892}
893
894/// Handle POST /reload — reload configuration (distroless-friendly alternative to SIGHUP)
895///
896/// This endpoint provides the same config reload functionality as SIGHUP but works
897/// in distroless containers that lack a shell or `kill` binary. The reload reads
898/// the config from the original path (RAVENCLAWS_CONFIG env var or default) and
899/// updates the in-memory config. Full hot-reload of LLM client, tool registry, and
900/// background manager requires Arc<RwLock<>> wrapping on ServerState fields.
901async fn handle_reload(_state: &ServerState) -> anyhow::Result<Vec<u8>> {
902    info!("Reloading configuration via POST /reload...");
903    let config_path = std::env::var("RAVENCLAWS_CONFIG").ok();
904    match Config::load(config_path.as_deref()) {
905        Ok(_new_config) => {
906            // Update the config in memory
907            // Note: Full hot-reload of LLM client, tool registry, and background
908            // manager requires Arc<RwLock<>> wrapping on ServerState fields.
909            // For now, we update the config struct and log the reload.
910            info!("Configuration reloaded successfully");
911            let result = serde_json::json!({
912                "status": "ok",
913                "message": "Configuration reloaded successfully. Note: LLM client and tool registry hot-reload requires a server restart.",
914            });
915            Ok(serde_json::to_vec(&result)?)
916        }
917        Err(e) => {
918            warn!(error = %e, "Failed to reload configuration via POST /reload");
919            Err(anyhow::anyhow!("Failed to reload configuration: {}", e))
920        }
921    }
922}
923
924// ── Public API ─────────────────────────────────────────────────────────────
925
926/// Run the HTTP server
927///
928/// Starts a long-running HTTP server on the configured host:port.
929/// Serves health, readiness, metrics, and agent execution endpoints.
930/// Handles graceful shutdown on SIGTERM/SIGINT.
931#[instrument(skip_all, fields(bind_addr))]
932pub async fn run_server(config: Config) -> anyhow::Result<()> {
933    let host = config
934        .runtime
935        .host
936        .clone()
937        .unwrap_or_else(|| "0.0.0.0".to_string());
938    let port = config.runtime.port;
939    let bind_addr = format!("{}:{}", host, port);
940
941    let mut state = ServerState::new(config);
942
943    // Initialize LLM client
944    info!("Initializing LLM client for server mode");
945    let llm = llm::create_client(&state.config.llm)?;
946    state.llm = Some(llm);
947    info!(
948        provider = state
949            .llm
950            .as_ref()
951            .map(|l| l.provider_name())
952            .unwrap_or("unknown"),
953        model = state.llm.as_ref().map(|l| l.model()).unwrap_or("unknown"),
954        "LLM client initialized for server mode"
955    );
956
957    // Initialize tool registry
958    info!("Initializing tool registry");
959    let registry = ToolRegistry::with_config(&state.config);
960    state.tool_registry = Some(registry);
961    info!(
962        tool_count = state.tool_registry.as_ref().map(|r| r.len()).unwrap_or(0),
963        "Tool registry initialized"
964    );
965
966    // Initialize background task manager
967    info!("Initializing background task manager");
968    let bg_manager = BackgroundTaskManager::from_config(&state.config.runtime).await?;
969    state.bg_manager = Some(bg_manager);
970    info!("Background task manager initialized");
971
972    // Initialize MCP clients from config (v0.9.6)
973    if !state.config.mcp.servers.is_empty() {
974        info!(
975            server_count = state.config.mcp.servers.len(),
976            "Initializing MCP clients from config"
977        );
978        let mcp_manager = crate::mcp::McpClientManager::from_config(&state.config.mcp).await;
979        let registered = if !mcp_manager.is_empty() {
980            if let Some(ref mut registry) = state.tool_registry {
981                mcp_manager.register_all_tools(registry).await
982            } else {
983                0
984            }
985        } else {
986            0
987        };
988        info!(
989            connected = mcp_manager.len(),
990            tools_registered = registered,
991            "MCP client initialization complete"
992        );
993        state.mcp_manager = Some(mcp_manager);
994    } else {
995        info!("No MCP servers configured, skipping MCP client initialization");
996    }
997
998    let state = Arc::new(state);
999    let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
1000        error!(address = %bind_addr, error = %e, "Failed to bind HTTP server");
1001        anyhow::anyhow!("Failed to bind to {}: {}", bind_addr, e)
1002    })?;
1003
1004    info!(
1005        address = %bind_addr,
1006        "HTTP server started — endpoints: /health, /ready, /metrics, /health/deep, /chat, /execute, /tasks/:id, /tools, /tools/:name, /reload"
1007    );
1008
1009    // Mark as ready after successful bind
1010    state.mark_ready();
1011
1012    // Accept connections in a loop
1013    loop {
1014        tokio::select! {
1015            accept_result = listener.accept() => {
1016                match accept_result {
1017                    Ok((stream, peer)) => {
1018                        debug!(peer = %peer, "Accepted connection");
1019                        let state = Arc::clone(&state);
1020                        tokio::spawn(async move {
1021                            handle_connection(stream, state).await;
1022                        });
1023                    }
1024                    Err(e) => {
1025                        warn!(error = %e, "Failed to accept connection");
1026                        state.metrics.record_error();
1027                    }
1028                }
1029            }
1030            _ = wait_for_shutdown() => {
1031                info!("Shutting down HTTP server gracefully...");
1032                // Give in-flight requests a moment to complete
1033                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1034                info!("HTTP server shutdown complete");
1035                break;
1036            }
1037            _ = wait_for_sighup() => {
1038                info!("Reloading configuration from SIGHUP...");
1039                // Reload config from the original path
1040                let config_path = std::env::var("RAVENCLAWS_CONFIG")
1041                    .ok();
1042                match Config::load(config_path.as_deref()) {
1043                    Ok(_new_config) => {
1044                        // Update the config in place (Arc<RwLock<Config>> would be
1045                        // better for production, but for now we log the reload)
1046                        info!("Configuration reloaded successfully");
1047                        // Note: Full hot-reload of LLM client, tool registry, and
1048                        // background manager requires Arc<RwLock<>> wrapping on
1049                        // ServerState fields — deferred to v0.9.8.
1050                    }
1051                    Err(e) => {
1052                        warn!(error = %e, "Failed to reload configuration on SIGHUP");
1053                    }
1054                }
1055            }
1056        }
1057    }
1058
1059    Ok(())
1060}
1061
1062// ── Tests ──────────────────────────────────────────────────────────────────
1063
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RuntimeConfig;

    /// Config bound to localhost with an OS-assigned port.
    fn test_config() -> Config {
        Config {
            runtime: RuntimeConfig {
                host: Some("127.0.0.1".to_string()),
                port: 0, // OS-assigned port
                ..RuntimeConfig::default()
            },
            ..Config::default()
        }
    }

    /// Bind an ephemeral localhost port and serve exactly one connection with
    /// `handle_connection` on a spawned task, using a ready `ServerState`.
    ///
    /// Returns the bound address and the task handle. Await the handle after
    /// the request completes so that a panic inside the handler fails the test.
    async fn serve_one_connection() -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let state = Arc::new(ServerState::new(test_config()));
        state.mark_ready();

        let handle = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            handle_connection(stream, state).await;
        });
        (addr, handle)
    }

    /// Send `GET {path}` to a one-shot test server, panicking on transport errors.
    ///
    /// Proxies are disabled so `HTTP_PROXY`-style environment variables cannot
    /// reroute the loopback request and make the test flaky.
    async fn get(addr: std::net::SocketAddr, path: &str) -> reqwest::Response {
        reqwest::Client::builder()
            .no_proxy()
            .build()
            .expect("failed to build reqwest client")
            .get(format!("http://{}{}", addr, path))
            .send()
            .await
            .unwrap_or_else(|e| panic!("GET {} failed: {}", path, e))
    }

    #[tokio::test]
    async fn test_health_response() {
        let body = health_response();
        assert_eq!(body, b"OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_ready() {
        let state = ServerState::new(test_config());
        state.mark_ready();
        let (body, status) = ready_response(&state).await;
        assert_eq!(body, b"READY");
        assert_eq!(status, "200 OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_not_ready() {
        let state = ServerState::new(test_config());
        let (body, status) = ready_response(&state).await;
        assert_eq!(body, b"NOT READY");
        assert_eq!(status, "503 Service Unavailable");
    }

    #[tokio::test]
    async fn test_metrics_response_format() {
        let state = ServerState::new(test_config());
        let body = metrics_response(&state);
        let output = String::from_utf8_lossy(&body);

        // Check Prometheus format
        assert!(output.contains("ravenclaws_requests_total"));
        assert!(output.contains("ravenclaws_llm_requests_total"));
        assert!(output.contains("ravenclaws_tool_calls_total"));
        assert!(output.contains("ravenclaws_errors_total"));
        assert!(output.contains("ravenclaws_tokens_total"));
        assert!(output.contains("ravenclaws_uptime_seconds"));
        assert!(output.contains("ravenclaws_start_time_seconds"));
        assert!(output.contains("# HELP"));
        assert!(output.contains("# TYPE"));
    }

    #[tokio::test]
    async fn test_metrics_counters_increment() {
        let state = ServerState::new(test_config());

        state.metrics.record_request();
        state.metrics.record_request();
        state.metrics.record_error();
        state.metrics.record_tokens(150);

        assert_eq!(state.metrics.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(state.metrics.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(state.metrics.tokens_total.load(Ordering::Relaxed), 150);
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let state = ServerState::new(test_config());
        let uptime1 = state.metrics.uptime_secs();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let uptime2 = state.metrics.uptime_secs();
        assert!(uptime2 >= uptime1);
    }

    #[tokio::test]
    async fn test_server_binds_and_responds() {
        let (addr, handle) = serve_one_connection().await;

        // A transport failure now fails the test instead of silently skipping
        // every assertion.
        let resp = get(addr, "/health").await;
        handle.await.unwrap();

        assert_eq!(resp.status(), 200);
        assert_eq!(resp.text().await.unwrap(), "OK");
    }

    #[tokio::test]
    async fn test_server_404() {
        let (addr, handle) = serve_one_connection().await;

        let resp = get(addr, "/unknown").await;
        handle.await.unwrap();

        assert_eq!(resp.status(), 404);
    }

    #[tokio::test]
    async fn test_server_metrics_endpoint() {
        let (addr, handle) = serve_one_connection().await;

        let resp = get(addr, "/metrics").await;
        handle.await.unwrap();

        assert_eq!(resp.status(), 200);
        let body = resp.text().await.unwrap();
        assert!(body.contains("ravenclaws_requests_total"));
    }
}