Skip to main content

ravenclaws/
server.rs

1//! RavenClaws
2//!
//! Provides a long-running HTTP server with health, readiness, metrics, and agent
//! execution endpoints, allowing RavenClaws to run as a stable workload in Kubernetes
//! and other container orchestration platforms.
6//!
7//! # Endpoints
8//!
9//! - `GET /health` — Liveness probe (always 200 when server is running)
10//! - `GET /ready` — Readiness probe (200 when fully initialized, 503 during startup)
11//! - `GET /metrics` — Prometheus-style metrics (requests, tokens, tool calls, errors)
12//! - `GET /health/deep` — Deep health check (verifies LLM connectivity)
13//! - `POST /chat` — Send a message and get an agent response
14//! - `POST /execute` — Submit a background task, returns task ID
15//! - `GET /tasks/{id}` — Poll background task status and result
16//! - `GET /tools` — List available tools with schemas
17//! - `GET /tools/{name}` — Get details of a specific tool
18//! - `POST /tools/{name}` — Execute a specific tool by name
19//! - `POST /reload` — Reload configuration (distroless-friendly SIGHUP alternative)
20//!
21//! # Architecture
22//!
23//! ```text
24//! HttpServer
25//!   ├── /health      → always 200 OK
26//!   ├── /ready       → 200 OK when ready, 503 during startup
27//!   ├── /metrics     → Prometheus text format
28//!   ├── /health/deep → LLM connectivity check
29//!   ├── /chat        → POST: agent response (JSON or SSE)
30//!   ├── /execute     → POST: background task submission
31//!   ├── /tasks/{id}  → GET: task status/result
32//!   ├── /tools       → GET: list tools with schemas
33//!   └── /tools/{name}→ POST: execute tool by name
34//! ```
35
36use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Instant;
39
40use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41use tokio::net::TcpListener;
42use tokio::signal;
43use tracing::{debug, error, info, instrument, warn};
44
45use crate::agent::{self, AgentLoopConfig};
46use crate::background::BackgroundTaskManager;
47use crate::config::Config;
48use crate::llm::{self, ChatMessage, LLMProviderTrait};
49use crate::load::{Admission, LoadManager};
50use crate::tools::{ToolCall, ToolRegistry};
51
52// ── Metrics ────────────────────────────────────────────────────────────────
53
/// Counters and timestamps shared by all connections of the HTTP server.
///
/// Every field is an atomic, so values can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct ServerMetrics {
    /// Number of HTTP requests served
    pub requests_total: AtomicU64,
    /// Number of LLM requests made
    pub llm_requests_total: AtomicU64,
    /// Number of tool calls executed
    pub tool_calls_total: AtomicU64,
    /// Number of errors encountered
    pub errors_total: AtomicU64,
    /// Number of tokens consumed (estimated)
    pub tokens_total: AtomicU64,
    /// Moment the server started, in seconds since the Unix epoch
    pub start_time: AtomicU64,
    /// Readiness cache: timestamp of the last check, or 0 when nothing is cached
    pub ready_cache_time: AtomicU64,
    /// Readiness cache: 1 when the last check succeeded, 0 otherwise
    pub ready_cache_result: AtomicU64,
}

// `AtomicU64` is not `Clone`, so the impl is written by hand: each value is read once
// and wrapped in a brand-new atomic. The clone is an independent snapshot — later
// updates to either side are not visible through the other.
impl Clone for ServerMetrics {
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));
        Self {
            requests_total: snapshot(&self.requests_total),
            llm_requests_total: snapshot(&self.llm_requests_total),
            tool_calls_total: snapshot(&self.tool_calls_total),
            errors_total: snapshot(&self.errors_total),
            tokens_total: snapshot(&self.tokens_total),
            start_time: snapshot(&self.start_time),
            ready_cache_time: snapshot(&self.ready_cache_time),
            ready_cache_result: snapshot(&self.ready_cache_result),
        }
    }
}

impl ServerMetrics {
    /// Current wall-clock time in whole seconds since the Unix epoch.
    ///
    /// Falls back to 0 when the system clock reports a time before the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Create zeroed metrics with `start_time` set to the current time.
    fn new() -> Self {
        Self {
            start_time: AtomicU64::new(Self::unix_now_secs()),
            ..Self::default()
        }
    }

    /// Count one served HTTP request.
    fn record_request(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one LLM request.
    fn record_llm_request(&self) {
        self.llm_requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one executed tool call.
    fn record_tool_call(&self) {
        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one error.
    fn record_error(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Add `count` to the running token total.
    #[cfg_attr(not(test), allow(dead_code))]
    fn record_tokens(&self, count: u64) {
        self.tokens_total.fetch_add(count, Ordering::Relaxed);
    }

    /// Seconds elapsed since `start_time`; saturates at 0 if the clock moved backwards.
    fn uptime_secs(&self) -> u64 {
        Self::unix_now_secs().saturating_sub(self.start_time.load(Ordering::Relaxed))
    }
}
134
135// ── Server state ───────────────────────────────────────────────────────────
136
137/// Shared state for the HTTP server
138pub struct ServerState {
139    /// Whether the server is fully initialized and ready to serve requests
140    pub ready: AtomicBool,
141    /// Server metrics
142    pub metrics: ServerMetrics,
143    /// Server configuration
144    pub config: Config,
145    /// Server start time
146    #[allow(dead_code)]
147    pub start_time: Instant,
148    /// LLM client for agent execution
149    pub llm: Option<Arc<dyn LLMProviderTrait>>,
150    /// Tool registry for tool listing and execution
151    pub tool_registry: Option<ToolRegistry>,
152    /// Background task manager for async execution
153    pub bg_manager: Option<BackgroundTaskManager>,
154    /// MCP client manager for multi-server MCP tool access
155    pub mcp_manager: Option<crate::mcp::McpClientManager>,
156    /// Load manager for graceful degradation (rate limiting, concurrency control, load shedding)
157    pub load_manager: Arc<LoadManager>,
158}
159
160impl ServerState {
161    fn new(config: Config) -> Self {
162        let load_manager = Arc::new(LoadManager::new(config.load.clone()));
163        Self {
164            ready: AtomicBool::new(false),
165            metrics: ServerMetrics::new(),
166            config,
167            start_time: Instant::now(),
168            llm: None,
169            tool_registry: None,
170            bg_manager: None,
171            mcp_manager: None,
172            load_manager,
173        }
174    }
175
176    /// Mark the server as ready (initialization complete)
177    fn mark_ready(&self) {
178        self.ready.store(true, Ordering::Relaxed);
179        info!("Server marked as ready");
180    }
181}
182
183// ── HTTP response helpers ──────────────────────────────────────────────────
184
/// Body of the `GET /health` liveness response: the two bytes `OK`.
fn health_response() -> Vec<u8> {
    Vec::from(&b"OK"[..])
}
188
189async fn ready_response(state: &ServerState) -> (Vec<u8>, &'static str) {
190    if !state.ready.load(Ordering::Relaxed) {
191        return (b"NOT READY".to_vec(), "503 Service Unavailable");
192    }
193
194    // If an LLM client is configured, verify connectivity for deep readiness
195    if let Some(ref llm) = state.llm {
196        // Check cache: re-check at most every 30 seconds
197        let now = std::time::SystemTime::now()
198            .duration_since(std::time::UNIX_EPOCH)
199            .unwrap_or_default()
200            .as_secs();
201        let cache_time = state.metrics.ready_cache_time.load(Ordering::Relaxed);
202        let cache_ttl: u64 = 30;
203
204        if now.saturating_sub(cache_time) < cache_ttl {
205            // Cache is fresh — return cached result
206            if state.metrics.ready_cache_result.load(Ordering::Relaxed) == 1 {
207                return (b"READY".to_vec(), "200 OK");
208            } else {
209                return (
210                    b"NOT READY: LLM unreachable".to_vec(),
211                    "503 Service Unavailable",
212                );
213            }
214        }
215
216        // Cache expired — perform actual check
217        let result = match tokio::time::timeout(
218            std::time::Duration::from_secs(5),
219            llm.chat(vec![ChatMessage::new(
220                "user",
221                "Respond with exactly one word: ready",
222            )]),
223        )
224        .await
225        {
226            Ok(Ok(_)) => {
227                // LLM is reachable
228                state.metrics.ready_cache_result.store(1, Ordering::Relaxed);
229                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
230                (b"READY".to_vec(), "200 OK")
231            }
232            Ok(Err(e)) => {
233                warn!(error = %e, "Readiness check: LLM connectivity failed");
234                state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
235                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
236                (
237                    b"NOT READY: LLM unreachable".to_vec(),
238                    "503 Service Unavailable",
239                )
240            }
241            Err(_) => {
242                warn!("Readiness check: LLM connectivity timed out");
243                state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
244                state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
245                (
246                    b"NOT READY: LLM timeout".to_vec(),
247                    "503 Service Unavailable",
248                )
249            }
250        };
251        result
252    } else {
253        (b"READY".to_vec(), "200 OK")
254    }
255}
256
257fn metrics_response(state: &ServerState) -> Vec<u8> {
258    let metrics = &state.metrics;
259    let base = format!(
260        "# HELP ravenclaws_requests_total Total HTTP requests served\n\
261         # TYPE ravenclaws_requests_total counter\n\
262         ravenclaws_requests_total {}\n\
263         \n\
264         # HELP ravenclaws_llm_requests_total Total LLM requests made\n\
265         # TYPE ravenclaws_llm_requests_total counter\n\
266         ravenclaws_llm_requests_total {}\n\
267         \n\
268         # HELP ravenclaws_tool_calls_total Total tool calls executed\n\
269         # TYPE ravenclaws_tool_calls_total counter\n\
270         ravenclaws_tool_calls_total {}\n\
271         \n\
272         # HELP ravenclaws_errors_total Total errors encountered\n\
273         # TYPE ravenclaws_errors_total counter\n\
274         ravenclaws_errors_total {}\n\
275         \n\
276         # HELP ravenclaws_tokens_total Total tokens consumed (estimated)\n\
277         # TYPE ravenclaws_tokens_total counter\n\
278         ravenclaws_tokens_total {}\n\
279         \n\
280         # HELP ravenclaws_uptime_seconds Server uptime in seconds\n\
281         # TYPE ravenclaws_uptime_seconds gauge\n\
282         ravenclaws_uptime_seconds {}\n\
283         \n\
284         # HELP ravenclaws_start_time_seconds Server start time (Unix epoch)\n\
285         # TYPE ravenclaws_start_time_seconds gauge\n\
286         ravenclaws_start_time_seconds {}\n",
287        metrics.requests_total.load(Ordering::Relaxed),
288        metrics.llm_requests_total.load(Ordering::Relaxed),
289        metrics.tool_calls_total.load(Ordering::Relaxed),
290        metrics.errors_total.load(Ordering::Relaxed),
291        metrics.tokens_total.load(Ordering::Relaxed),
292        metrics.uptime_secs(),
293        metrics.start_time.load(Ordering::Relaxed),
294    );
295    let load_metrics = state.load_manager.metrics().to_prometheus_text();
296    format!("{}\n{}", base, load_metrics).into_bytes()
297}
298
299// ── HTTP handler ───────────────────────────────────────────────────────────
300
/// Extract the `Content-Length` value from raw `Name: value` header lines.
///
/// The header name is compared ASCII case-insensitively, as HTTP requires. Only the
/// first `Content-Length` header is considered.
///
/// Returns 0 when the header is absent or its value is not a valid `usize`.
fn parse_content_length(headers: &[String]) -> usize {
    headers
        .iter()
        .find_map(|header| {
            let (name, value) = header.split_once(':')?;
            name.eq_ignore_ascii_case("content-length")
                .then(|| value.trim())
        })
        .and_then(|value| value.parse().ok())
        .unwrap_or(0)
}
313
314/// Read the request body from the reader
315async fn read_body(
316    reader: &mut BufReader<&mut tokio::net::TcpStream>,
317    content_length: usize,
318) -> Vec<u8> {
319    if content_length == 0 {
320        return Vec::new();
321    }
322    let mut body = vec![0u8; content_length];
323    if let Err(e) = reader.read_exact(&mut body).await {
324        warn!(error = %e, "Failed to read request body");
325        return Vec::new();
326    }
327    body
328}
329
330/// Handle a single HTTP connection
331#[instrument(skip_all, fields(peer = ?stream.peer_addr().ok()))]
332async fn handle_connection(mut stream: tokio::net::TcpStream, state: Arc<ServerState>) {
333    let peer = stream.peer_addr().ok();
334    let mut reader = BufReader::new(&mut stream);
335    let mut request_line = String::new();
336
337    // Read the request line
338    if reader.read_line(&mut request_line).await.is_err() {
339        return;
340    }
341
342    let request_line = request_line.trim();
343    if request_line.is_empty() {
344        return;
345    }
346
347    state.metrics.record_request();
348
349    // Check admission control (rate limiting, concurrency, load shedding)
350    let admission = state.load_manager.check_admission();
351    if !admission.is_allowed() {
352        let (status, body): (&str, Vec<u8>) = match admission {
353            Admission::RateLimited => ("429 Too Many Requests", b"Rate limited\n".to_vec()),
354            Admission::ConcurrencyLimited => (
355                "503 Service Unavailable",
356                b"Too many concurrent requests\n".to_vec(),
357            ),
358            Admission::LoadShed => (
359                "503 Service Unavailable",
360                b"Server is overloaded\n".to_vec(),
361            ),
362            _ => unreachable!(),
363        };
364        state.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
365        let response = format!(
366            "HTTP/1.1 {}\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nRetry-After: 1\r\nConnection: close\r\n\r\n{}",
367            status,
368            body.len(),
369            std::str::from_utf8(&body).unwrap_or(""),
370        );
371        let _ = stream.write_all(response.as_bytes()).await;
372        return;
373    }
374
375    // Parse the request method and path
376    let parts: Vec<&str> = request_line.split_whitespace().collect();
377    let method = parts.first().unwrap_or(&"GET");
378    let path = parts.get(1).unwrap_or(&"/");
379
380    // Read headers
381    let mut headers: Vec<String> = Vec::new();
382    let mut header_line = String::new();
383    loop {
384        header_line.clear();
385        if reader.read_line(&mut header_line).await.is_err() {
386            return;
387        }
388        let trimmed = header_line.trim();
389        if trimmed.is_empty() {
390            break;
391        }
392        headers.push(trimmed.to_lowercase());
393    }
394
395    // Read body for POST requests
396    let content_length = parse_content_length(&headers);
397    let body = read_body(&mut reader, content_length).await;
398
399    // Route the request
400    let (response_body, status_line, content_type) = match (*method, *path) {
401        ("GET", "/health") => (health_response(), "200 OK", "text/plain"),
402        ("GET", "/ready") => {
403            let (body, status) = ready_response(&state).await;
404            (body, status, "text/plain")
405        }
406        ("GET", "/metrics") => (
407            metrics_response(&state),
408            "200 OK",
409            "text/plain; charset=utf-8",
410        ),
411        ("GET", "/health/deep") => match handle_health_deep(&state).await {
412            Ok(body) => (body, "200 OK", "application/json"),
413            Err(e) => {
414                state.metrics.record_error();
415                (
416                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
417                    "503 Service Unavailable",
418                    "application/json",
419                )
420            }
421        },
422        ("POST", "/chat") => match handle_chat(&state, &body).await {
423            Ok(body) => (body, "200 OK", "application/json"),
424            Err(e) => {
425                state.metrics.record_error();
426                (
427                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
428                    "400 Bad Request",
429                    "application/json",
430                )
431            }
432        },
433        ("POST", "/reload") => match handle_reload(&state).await {
434            Ok(body) => (body, "200 OK", "application/json"),
435            Err(e) => {
436                state.metrics.record_error();
437                (
438                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
439                    "500 Internal Server Error",
440                    "application/json",
441                )
442            }
443        },
444        ("POST", "/execute") => match handle_execute(&state, &body).await {
445            Ok(body) => (body, "200 OK", "application/json"),
446            Err(e) => {
447                state.metrics.record_error();
448                (
449                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
450                    "400 Bad Request",
451                    "application/json",
452                )
453            }
454        },
455        ("GET", path) if path.starts_with("/tasks/") => {
456            let task_id = path.strip_prefix("/tasks/").unwrap_or("");
457            match handle_task_status(&state, task_id).await {
458                Ok(body) => (body, "200 OK", "application/json"),
459                Err(e) => {
460                    state.metrics.record_error();
461                    (
462                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
463                        "404 Not Found",
464                        "application/json",
465                    )
466                }
467            }
468        }
469        ("GET", "/tools") => match handle_list_tools(&state) {
470            Ok(body) => (body, "200 OK", "application/json"),
471            Err(e) => {
472                state.metrics.record_error();
473                (
474                    format!("{{\"error\":\"{}\"}}", e).into_bytes(),
475                    "500 Internal Server Error",
476                    "application/json",
477                )
478            }
479        },
480        ("GET", path) if path.starts_with("/tools/") => {
481            let tool_name = path.strip_prefix("/tools/").unwrap_or("");
482            match handle_get_tool(&state, tool_name) {
483                Ok(body) => (body, "200 OK", "application/json"),
484                Err(e) => {
485                    state.metrics.record_error();
486                    (
487                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
488                        "404 Not Found",
489                        "application/json",
490                    )
491                }
492            }
493        }
494        ("POST", path) if path.starts_with("/tools/") => {
495            let tool_name = path.strip_prefix("/tools/").unwrap_or("");
496            match handle_execute_tool(&state, tool_name, &body).await {
497                Ok(body) => (body, "200 OK", "application/json"),
498                Err(e) => {
499                    state.metrics.record_error();
500                    let status = if e.to_string().contains("not found")
501                        || e.to_string().contains("No tool")
502                    {
503                        "404 Not Found"
504                    } else {
505                        "400 Bad Request"
506                    };
507                    (
508                        format!("{{\"error\":\"{}\"}}", e).into_bytes(),
509                        status,
510                        "application/json",
511                    )
512                }
513            }
514        }
515        _ => {
516            state.metrics.record_error();
517            (b"Not Found".to_vec(), "404 Not Found", "text/plain")
518        }
519    };
520
521    // Write the response
522    let response = format!(
523        "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
524        status_line,
525        content_type,
526        response_body.len(),
527    );
528
529    if let Err(e) = stream.write_all(response.as_bytes()).await {
530        warn!(error = %e, "Failed to write response headers");
531        return;
532    }
533    if let Err(e) = stream.write_all(&response_body).await {
534        warn!(error = %e, "Failed to write response body");
535        return;
536    }
537    if let Err(e) = stream.flush().await {
538        warn!(error = %e, "Failed to flush response");
539        return;
540    }
541
542    debug!(path = %path, status = %status_line, peer = ?peer, "Request handled");
543}
544
545// ── Signal handling ────────────────────────────────────────────────────────
546
547/// Wait for shutdown signal (SIGTERM, SIGINT, or Ctrl+C)
548async fn wait_for_shutdown() {
549    let ctrl_c = async {
550        signal::ctrl_c()
551            .await
552            .expect("Failed to install Ctrl+C handler");
553    };
554
555    #[cfg(unix)]
556    let terminate = async {
557        signal::unix::signal(signal::unix::SignalKind::terminate())
558            .expect("Failed to install SIGTERM handler")
559            .recv()
560            .await;
561    };
562
563    #[cfg(not(unix))]
564    let terminate = std::future::pending::<()>();
565
566    tokio::select! {
567        _ = ctrl_c => {
568            info!("Received Ctrl+C, shutting down gracefully...");
569        }
570        _ = terminate => {
571            info!("Received SIGTERM, shutting down gracefully...");
572        }
573    }
574}
575
576/// Wait for SIGHUP signal (config reload)
577///
578/// Returns `true` if SIGHUP was received, `false` if the stream ended.
579#[cfg(unix)]
580async fn wait_for_sighup() -> bool {
581    use tokio::signal::unix::SignalKind;
582    let mut stream = match signal::unix::signal(SignalKind::hangup()) {
583        Ok(s) => s,
584        Err(e) => {
585            warn!(error = %e, "Failed to install SIGHUP handler");
586            return false;
587        }
588    };
589    stream.recv().await;
590    info!("Received SIGHUP — reloading configuration");
591    true
592}
593
594#[cfg(not(unix))]
595async fn wait_for_sighup() -> bool {
596    // SIGHUP is not available on non-Unix platforms
597    std::future::pending::<()>().await;
598    false
599}
600
601// ── Agent execution handlers ───────────────────────────────────────────────
602
603/// Handle POST /chat — send a message and get an agent response
604async fn handle_chat(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
605    let llm = state
606        .llm
607        .as_ref()
608        .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
609
610    #[derive(serde::Deserialize)]
611    struct ChatRequest {
612        messages: Vec<ChatMessage>,
613        #[serde(default)]
614        #[allow(dead_code)]
615        stream: bool,
616        #[serde(default)]
617        max_iterations: Option<usize>,
618    }
619
620    let req: ChatRequest =
621        serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
622
623    if req.messages.is_empty() {
624        return Err(anyhow::anyhow!("No messages provided"));
625    }
626
627    // Extract system prompt from messages, or use default
628    let system_prompt = req
629        .messages
630        .iter()
631        .find(|m| m.role == "system")
632        .map(|m| m.content.clone())
633        .unwrap_or_else(|| state.config.llm.system_prompt.clone());
634
635    // Extract user message (last user message)
636    let user_message = req
637        .messages
638        .iter()
639        .rev()
640        .find(|m| m.role == "user")
641        .map(|m| m.content.clone())
642        .ok_or_else(|| anyhow::anyhow!("No user message found"))?;
643
644    let metrics = state.metrics.clone();
645    let load_manager = Arc::clone(&state.load_manager);
646    let loop_config = AgentLoopConfig {
647        max_iterations: req.max_iterations.unwrap_or(10),
648        enable_tools: true,
649        require_approval: false,
650        prompt_injection_protection: state.config.security.prompt_injection_protection,
651        token_lifetime_secs: state.config.security.token_lifetime_secs,
652        no_final_required: true,
653        fallback_chain: None,
654        token_budget: None,
655        ravenfabric: None,
656        checkpoint_dir: None,
657        session_id: None,
658        metrics_callback: Some(Box::new(move |tokens, tool_calls| {
659            if tokens > 0 {
660                metrics.tokens_total.fetch_add(tokens, Ordering::Relaxed);
661            }
662            if tool_calls > 0 {
663                metrics
664                    .tool_calls_total
665                    .fetch_add(tool_calls, Ordering::Relaxed);
666            }
667        })),
668        load_manager: Some(load_manager),
669        retry_config: None,
670    };
671
672    let tool_registry = state.tool_registry.clone();
673
674    let response = agent::run_agent_loop_with_registry(
675        llm.clone(),
676        &user_message,
677        &system_prompt,
678        loop_config,
679        tool_registry,
680    )
681    .await?;
682
683    state.metrics.record_llm_request();
684
685    let result = serde_json::json!({
686        "response": response,
687        "model": llm.model(),
688        "provider": llm.provider_name(),
689    });
690
691    Ok(serde_json::to_vec(&result)?)
692}
693
694/// Handle POST /execute — submit a background task
695async fn handle_execute(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
696    let bg_manager = state
697        .bg_manager
698        .as_ref()
699        .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
700
701    #[derive(serde::Deserialize)]
702    struct ExecuteRequest {
703        prompt: String,
704        #[serde(default)]
705        system_prompt: Option<String>,
706    }
707
708    let req: ExecuteRequest =
709        serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
710
711    if req.prompt.trim().is_empty() {
712        return Err(anyhow::anyhow!("Prompt cannot be empty"));
713    }
714
715    let system_prompt = req
716        .system_prompt
717        .unwrap_or_else(|| state.config.llm.system_prompt.clone());
718
719    let task_id = bg_manager
720        .submit(req.prompt, system_prompt)
721        .await
722        .map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))?;
723
724    // Spawn background execution
725    if let Some(ref llm) = state.llm {
726        let bg = bg_manager.clone();
727        let tid = task_id.clone();
728        let llm_clone = llm.clone();
729        tokio::spawn(async move {
730            if let Err(e) = bg.execute(&tid, llm_clone).await {
731                warn!(task_id = %tid, error = %e, "Background task execution failed");
732            }
733        });
734    }
735
736    let result = serde_json::json!({
737        "task_id": task_id,
738        "status": "pending",
739    });
740
741    Ok(serde_json::to_vec(&result)?)
742}
743
744/// Handle GET /tasks/{id} — poll background task status
745async fn handle_task_status(state: &ServerState, task_id: &str) -> anyhow::Result<Vec<u8>> {
746    let bg_manager = state
747        .bg_manager
748        .as_ref()
749        .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
750
751    let task = bg_manager
752        .get_task(task_id)
753        .await
754        .map_err(|e| anyhow::anyhow!("Task not found: {}", e))?;
755
756    let result = serde_json::json!({
757        "task_id": task.id,
758        "status": task.status.to_string(),
759        "result": task.result,
760        "error": task.error,
761        "created_at": task.created_at,
762        "updated_at": task.updated_at,
763        "iterations": task.iterations,
764        "provider": task.provider,
765        "model": task.model,
766    });
767
768    Ok(serde_json::to_vec(&result)?)
769}
770
771/// Handle GET /tools — list available tools with schemas
772fn handle_list_tools(state: &ServerState) -> anyhow::Result<Vec<u8>> {
773    let registry = state
774        .tool_registry
775        .as_ref()
776        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
777
778    let tools: Vec<serde_json::Value> = registry
779        .definitions()
780        .iter()
781        .map(|def| {
782            serde_json::json!({
783                "name": def.name,
784                "description": def.description,
785                "parameters": def.parameters,
786                "category": def.category,
787                "requires_approval": def.requires_approval,
788            })
789        })
790        .collect();
791
792    let result = serde_json::json!({
793        "tools": tools,
794        "count": tools.len(),
795    });
796
797    Ok(serde_json::to_vec(&result)?)
798}
799
800/// Handle GET /tools/{name} — get details of a specific tool
801fn handle_get_tool(state: &ServerState, tool_name: &str) -> anyhow::Result<Vec<u8>> {
802    let registry = state
803        .tool_registry
804        .as_ref()
805        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
806
807    let definitions = registry.definitions();
808    let def = definitions
809        .iter()
810        .find(|d| d.name == tool_name)
811        .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
812
813    let result = serde_json::json!({
814        "name": def.name,
815        "description": def.description,
816        "parameters": def.parameters,
817        "category": def.category,
818        "requires_approval": def.requires_approval,
819    });
820
821    Ok(serde_json::to_vec(&result)?)
822}
823
824/// Handle POST /tools/{name} — execute a specific tool by name
825async fn handle_execute_tool(
826    state: &ServerState,
827    tool_name: &str,
828    body: &[u8],
829) -> anyhow::Result<Vec<u8>> {
830    let registry = state
831        .tool_registry
832        .as_ref()
833        .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
834
835    let args: serde_json::Value = if body.is_empty() {
836        serde_json::Value::Object(serde_json::Map::new())
837    } else {
838        serde_json::from_slice(body)
839            .map_err(|e| anyhow::anyhow!("Invalid arguments JSON: {}", e))?
840    };
841
842    let call = ToolCall {
843        name: tool_name.to_string(),
844        arguments: args,
845        id: None,
846    };
847
848    let result = registry.execute(call).await?;
849    state.metrics.record_tool_call();
850
851    let response = serde_json::json!({
852        "tool": result.tool_name,
853        "success": result.success,
854        "output": result.output,
855        "error": result.error,
856        "exit_code": result.exit_code,
857        "duration_ms": result.duration_ms,
858    });
859
860    Ok(serde_json::to_vec(&response)?)
861}
862
863/// Handle GET /health/deep — verify LLM connectivity
864async fn handle_health_deep(state: &ServerState) -> anyhow::Result<Vec<u8>> {
865    let llm = state
866        .llm
867        .as_ref()
868        .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
869
870    // Make a lightweight LLM request to verify connectivity
871    let messages = vec![ChatMessage::new("user", "Respond with exactly: OK")];
872
873    let response = llm
874        .chat(messages)
875        .await
876        .map_err(|e| anyhow::anyhow!("LLM connectivity check failed: {}", e))?;
877
878    let content = response
879        .choices
880        .first()
881        .map(|c| c.message.content.clone())
882        .unwrap_or_default();
883
884    let result = serde_json::json!({
885        "status": "ok",
886        "provider": llm.provider_name(),
887        "model": llm.model(),
888        "response": content,
889        "uptime_seconds": state.metrics.uptime_secs(),
890    });
891
892    Ok(serde_json::to_vec(&result)?)
893}
894
895/// Handle POST /reload — reload configuration (distroless-friendly alternative to SIGHUP)
896///
897/// This endpoint provides the same config reload functionality as SIGHUP but works
898/// in distroless containers that lack a shell or `kill` binary. The reload reads
899/// the config from the original path (RAVENCLAWS_CONFIG env var or default) and
900/// updates the in-memory config. Full hot-reload of LLM client, tool registry, and
901/// background manager requires Arc<RwLock<>> wrapping on ServerState fields.
902async fn handle_reload(_state: &ServerState) -> anyhow::Result<Vec<u8>> {
903    info!("Reloading configuration via POST /reload...");
904    let config_path = std::env::var("RAVENCLAWS_CONFIG").ok();
905    match Config::load(config_path.as_deref()) {
906        Ok(_new_config) => {
907            // Update the config in memory
908            // Note: Full hot-reload of LLM client, tool registry, and background
909            // manager requires Arc<RwLock<>> wrapping on ServerState fields.
910            // For now, we update the config struct and log the reload.
911            info!("Configuration reloaded successfully");
912            let result = serde_json::json!({
913                "status": "ok",
914                "message": "Configuration reloaded successfully. Note: LLM client and tool registry hot-reload requires a server restart.",
915            });
916            Ok(serde_json::to_vec(&result)?)
917        }
918        Err(e) => {
919            warn!(error = %e, "Failed to reload configuration via POST /reload");
920            Err(anyhow::anyhow!("Failed to reload configuration: {}", e))
921        }
922    }
923}
924
925// ── Public API ─────────────────────────────────────────────────────────────
926
927/// Run the HTTP server
928///
929/// Starts a long-running HTTP server on the configured host:port.
930/// Serves health, readiness, metrics, and agent execution endpoints.
931/// Handles graceful shutdown on SIGTERM/SIGINT.
932#[instrument(skip_all, fields(bind_addr))]
933pub async fn run_server(config: Config) -> anyhow::Result<()> {
934    let host = config
935        .runtime
936        .host
937        .clone()
938        .unwrap_or_else(|| "0.0.0.0".to_string());
939    let port = config.runtime.port;
940    let bind_addr = format!("{}:{}", host, port);
941
942    let mut state = ServerState::new(config);
943
944    // Initialize LLM client
945    info!("Initializing LLM client for server mode");
946    let llm = llm::create_client(&state.config.llm)?;
947    state.llm = Some(llm);
948    info!(
949        provider = state
950            .llm
951            .as_ref()
952            .map(|l| l.provider_name())
953            .unwrap_or("unknown"),
954        model = state.llm.as_ref().map(|l| l.model()).unwrap_or("unknown"),
955        "LLM client initialized for server mode"
956    );
957
958    // Initialize tool registry
959    info!("Initializing tool registry");
960    let registry = ToolRegistry::with_config(&state.config);
961    state.tool_registry = Some(registry);
962    info!(
963        tool_count = state.tool_registry.as_ref().map(|r| r.len()).unwrap_or(0),
964        "Tool registry initialized"
965    );
966
967    // Initialize background task manager
968    info!("Initializing background task manager");
969    let bg_manager = BackgroundTaskManager::from_config(&state.config.runtime).await?;
970    state.bg_manager = Some(bg_manager);
971    info!("Background task manager initialized");
972
973    // Initialize MCP clients from config (v0.9.6)
974    if !state.config.mcp.servers.is_empty() {
975        info!(
976            server_count = state.config.mcp.servers.len(),
977            "Initializing MCP clients from config"
978        );
979        let mcp_manager = crate::mcp::McpClientManager::from_config(&state.config.mcp).await;
980        let registered = if !mcp_manager.is_empty() {
981            if let Some(ref mut registry) = state.tool_registry {
982                mcp_manager.register_all_tools(registry).await
983            } else {
984                0
985            }
986        } else {
987            0
988        };
989        info!(
990            connected = mcp_manager.len(),
991            tools_registered = registered,
992            "MCP client initialization complete"
993        );
994        state.mcp_manager = Some(mcp_manager);
995    } else {
996        info!("No MCP servers configured, skipping MCP client initialization");
997    }
998
999    let state = Arc::new(state);
1000    let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
1001        error!(address = %bind_addr, error = %e, "Failed to bind HTTP server");
1002        anyhow::anyhow!("Failed to bind to {}: {}", bind_addr, e)
1003    })?;
1004
1005    info!(
1006        address = %bind_addr,
1007        "HTTP server started — endpoints: /health, /ready, /metrics, /health/deep, /chat, /execute, /tasks/:id, /tools, /tools/:name, /reload"
1008    );
1009
1010    // Mark as ready after successful bind
1011    state.mark_ready();
1012
1013    // Accept connections in a loop
1014    loop {
1015        tokio::select! {
1016            accept_result = listener.accept() => {
1017                match accept_result {
1018                    Ok((stream, peer)) => {
1019                        debug!(peer = %peer, "Accepted connection");
1020                        let state = Arc::clone(&state);
1021                        tokio::spawn(async move {
1022                            handle_connection(stream, state).await;
1023                        });
1024                    }
1025                    Err(e) => {
1026                        warn!(error = %e, "Failed to accept connection");
1027                        state.metrics.record_error();
1028                    }
1029                }
1030            }
1031            _ = wait_for_shutdown() => {
1032                info!("Shutting down HTTP server gracefully...");
1033                // Give in-flight requests a moment to complete
1034                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1035                info!("HTTP server shutdown complete");
1036                break;
1037            }
1038            _ = wait_for_sighup() => {
1039                info!("Reloading configuration from SIGHUP...");
1040                // Reload config from the original path
1041                let config_path = std::env::var("RAVENCLAWS_CONFIG")
1042                    .ok();
1043                match Config::load(config_path.as_deref()) {
1044                    Ok(_new_config) => {
1045                        // Update the config in place (Arc<RwLock<Config>> would be
1046                        // better for production, but for now we log the reload)
1047                        info!("Configuration reloaded successfully");
1048                        // Note: Full hot-reload of LLM client, tool registry, and
1049                        // background manager requires Arc<RwLock<>> wrapping on
1050                        // ServerState fields — deferred to v0.9.8.
1051                    }
1052                    Err(e) => {
1053                        warn!(error = %e, "Failed to reload configuration on SIGHUP");
1054                    }
1055                }
1056            }
1057        }
1058    }
1059
1060    Ok(())
1061}
1062
1063// ── Tests ──────────────────────────────────────────────────────────────────
1064
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RuntimeConfig;

    /// Configuration used by every test: loopback host, OS-assigned port,
    /// defaults for everything else.
    fn test_config() -> Config {
        let runtime = RuntimeConfig {
            host: Some("127.0.0.1".to_string()),
            port: 0, // OS-assigned port
            ..RuntimeConfig::default()
        };
        Config {
            runtime,
            ..Config::default()
        }
    }

    /// Fresh server state that has not been marked ready yet.
    fn fresh_state() -> ServerState {
        ServerState::new(test_config())
    }

    /// Serves exactly one connection on an OS-assigned loopback port and
    /// issues a single GET for `path` against it.
    ///
    /// The reqwest outcome is returned untouched so each test can decide how
    /// to treat a failed request. The server task is always awaited before
    /// returning.
    async fn get_from_one_shot_server(path: &str) -> Result<reqwest::Response, reqwest::Error> {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let state = Arc::new(fresh_state());
        state.mark_ready();

        let task_state = Arc::clone(&state);
        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            handle_connection(stream, task_state).await;
        });

        let outcome = reqwest::Client::new()
            .get(format!("http://{}{}", addr, path))
            .send()
            .await;

        server.await.unwrap();
        outcome
    }

    #[tokio::test]
    async fn test_health_response() {
        assert_eq!(health_response(), b"OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_ready() {
        let state = fresh_state();
        state.mark_ready();

        let (body, status) = ready_response(&state).await;

        assert_eq!(body, b"READY");
        assert_eq!(status, "200 OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_not_ready() {
        // Deliberately no `mark_ready()` call.
        let state = fresh_state();

        let (body, status) = ready_response(&state).await;

        assert_eq!(body, b"NOT READY");
        assert_eq!(status, "503 Service Unavailable");
    }

    #[tokio::test]
    async fn test_metrics_response_format() {
        let state = fresh_state();
        let raw = metrics_response(&state);
        let text = String::from_utf8_lossy(&raw);

        // Every exported metric family must be present...
        assert!(text.contains("ravenclaws_requests_total"));
        assert!(text.contains("ravenclaws_llm_requests_total"));
        assert!(text.contains("ravenclaws_tool_calls_total"));
        assert!(text.contains("ravenclaws_errors_total"));
        assert!(text.contains("ravenclaws_tokens_total"));
        assert!(text.contains("ravenclaws_uptime_seconds"));
        assert!(text.contains("ravenclaws_start_time_seconds"));
        // ...along with the Prometheus metadata comments.
        assert!(text.contains("# HELP"));
        assert!(text.contains("# TYPE"));
    }

    #[tokio::test]
    async fn test_metrics_counters_increment() {
        let state = fresh_state();
        let metrics = &state.metrics;

        metrics.record_request();
        metrics.record_request();
        metrics.record_error();
        metrics.record_tokens(150);

        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.tokens_total.load(Ordering::Relaxed), 150);
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let state = fresh_state();

        let before = state.metrics.uptime_secs();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let after = state.metrics.uptime_secs();

        // Only checks that uptime does not go backwards across the pause.
        assert!(after >= before);
    }

    #[tokio::test]
    async fn test_server_binds_and_responds() {
        // Best-effort: a failed request skips the assertions instead of
        // failing the test.
        if let Ok(resp) = get_from_one_shot_server("/health").await {
            assert_eq!(resp.status(), 200);
            let body = resp.text().await.unwrap();
            assert_eq!(body, "OK");
        }
    }

    #[tokio::test]
    async fn test_server_404() {
        if let Ok(resp) = get_from_one_shot_server("/unknown").await {
            assert_eq!(resp.status(), 404);
        }
    }

    #[tokio::test]
    async fn test_server_metrics_endpoint() {
        if let Ok(resp) = get_from_one_shot_server("/metrics").await {
            assert_eq!(resp.status(), 200);
            let body = resp.text().await.unwrap();
            assert!(body.contains("ravenclaws_requests_total"));
        }
    }
}