1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Instant;
39
40use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41use tokio::net::TcpListener;
42use tokio::signal;
43use tracing::{debug, error, info, instrument, warn};
44
45use crate::agent::{self, AgentLoopConfig};
46use crate::background::BackgroundTaskManager;
47use crate::config::Config;
48use crate::llm::{self, ChatMessage, LLMProviderTrait};
49use crate::load::{Admission, LoadManager};
50use crate::tools::{ToolCall, ToolRegistry};
51
/// Process-wide counters exported on `/metrics`, plus the cached outcome of the
/// most recent `/ready` LLM probe.
///
/// Every field is a standalone atomic accessed with `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct ServerMetrics {
    /// HTTP requests that delivered a non-empty request line.
    pub requests_total: AtomicU64,
    /// LLM requests recorded through `record_llm_request`.
    pub llm_requests_total: AtomicU64,
    /// Tool executions recorded through `record_tool_call` or direct increments.
    pub tool_calls_total: AtomicU64,
    /// Errors recorded through `record_error` or direct increments.
    pub errors_total: AtomicU64,
    /// Tokens recorded through `record_tokens` or direct increments.
    pub tokens_total: AtomicU64,
    /// Unix timestamp (seconds) captured by `ServerMetrics::new`.
    pub start_time: AtomicU64,
    /// Unix timestamp (seconds) of the last cached readiness probe; 0 = never probed.
    pub ready_cache_time: AtomicU64,
    /// Result of the last cached readiness probe: 1 = reachable, otherwise unreachable.
    pub ready_cache_result: AtomicU64,
}

/// Cloning takes a point-in-time snapshot: the copy's counters start at the
/// values read from `self` and are NOT shared with it afterwards.
impl Clone for ServerMetrics {
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));
        Self {
            requests_total: snapshot(&self.requests_total),
            llm_requests_total: snapshot(&self.llm_requests_total),
            tool_calls_total: snapshot(&self.tool_calls_total),
            errors_total: snapshot(&self.errors_total),
            tokens_total: snapshot(&self.tokens_total),
            start_time: snapshot(&self.start_time),
            ready_cache_time: snapshot(&self.ready_cache_time),
            ready_cache_result: snapshot(&self.ready_cache_result),
        }
    }
}

impl ServerMetrics {
    /// Current wall-clock time as whole seconds since the Unix epoch
    /// (0 if the system clock reports a time before the epoch).
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0)
    }

    /// Zeroed counters with `start_time` stamped to "now".
    fn new() -> Self {
        let fresh = Self::default();
        fresh
            .start_time
            .store(Self::unix_now_secs(), Ordering::Relaxed);
        fresh
    }

    /// Counts one served HTTP request.
    fn record_request(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one LLM request.
    fn record_llm_request(&self) {
        self.llm_requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one tool execution.
    fn record_tool_call(&self) {
        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one error.
    fn record_error(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` tokens to the running total.
    #[cfg_attr(not(test), allow(dead_code))]
    fn record_tokens(&self, count: u64) {
        self.tokens_total.fetch_add(count, Ordering::Relaxed);
    }

    /// Whole seconds elapsed since `start_time`; never negative (saturates at 0).
    fn uptime_secs(&self) -> u64 {
        let now = Self::unix_now_secs();
        let started = self.start_time.load(Ordering::Relaxed);
        now.saturating_sub(started)
    }
}
134
/// State shared by every connection handler; `run_server` fills in the optional
/// components and then wraps the whole struct in an `Arc`.
pub struct ServerState {
    /// Flipped to `true` by `mark_ready`; `/ready` answers 503 while it is `false`.
    pub ready: AtomicBool,
    /// Counters exported on `/metrics` and the `/ready` probe cache.
    pub metrics: ServerMetrics,
    /// Configuration the server was constructed with (reload paths in this module
    /// only validate a new config; they do not replace this value).
    pub config: Config,
    // Not read anywhere in this module; kept public for other users — TODO confirm
    // it is still needed before removing the allow.
    #[allow(dead_code)]
    pub start_time: Instant,
    /// LLM client; `None` until `run_server` creates it.
    pub llm: Option<Arc<dyn LLMProviderTrait>>,
    /// Tool registry backing `/tools` and the agent loop; `None` until initialized.
    pub tool_registry: Option<ToolRegistry>,
    /// Background task manager backing `/execute` and `/tasks/:id`; `None` until initialized.
    pub bg_manager: Option<BackgroundTaskManager>,
    /// MCP client manager; only set when MCP servers are configured.
    pub mcp_manager: Option<crate::mcp::McpClientManager>,
    /// Admission control consulted for every request.
    pub load_manager: Arc<LoadManager>,
}
159
160impl ServerState {
161 fn new(config: Config) -> Self {
162 let load_manager = Arc::new(LoadManager::new(config.load.clone()));
163 Self {
164 ready: AtomicBool::new(false),
165 metrics: ServerMetrics::new(),
166 config,
167 start_time: Instant::now(),
168 llm: None,
169 tool_registry: None,
170 bg_manager: None,
171 mcp_manager: None,
172 load_manager,
173 }
174 }
175
176 fn mark_ready(&self) {
178 self.ready.store(true, Ordering::Relaxed);
179 info!("Server marked as ready");
180 }
181}
182
/// Body for `GET /health`: a constant liveness answer that consults no dependency.
fn health_response() -> Vec<u8> {
    Vec::from("OK")
}
188
189async fn ready_response(state: &ServerState) -> (Vec<u8>, &'static str) {
190 if !state.ready.load(Ordering::Relaxed) {
191 return (b"NOT READY".to_vec(), "503 Service Unavailable");
192 }
193
194 if let Some(ref llm) = state.llm {
196 let now = std::time::SystemTime::now()
198 .duration_since(std::time::UNIX_EPOCH)
199 .unwrap_or_default()
200 .as_secs();
201 let cache_time = state.metrics.ready_cache_time.load(Ordering::Relaxed);
202 let cache_ttl: u64 = 30;
203
204 if now.saturating_sub(cache_time) < cache_ttl {
205 if state.metrics.ready_cache_result.load(Ordering::Relaxed) == 1 {
207 return (b"READY".to_vec(), "200 OK");
208 } else {
209 return (
210 b"NOT READY: LLM unreachable".to_vec(),
211 "503 Service Unavailable",
212 );
213 }
214 }
215
216 let result = match tokio::time::timeout(
218 std::time::Duration::from_secs(5),
219 llm.chat(vec![ChatMessage::new(
220 "user",
221 "Respond with exactly one word: ready",
222 )]),
223 )
224 .await
225 {
226 Ok(Ok(_)) => {
227 state.metrics.ready_cache_result.store(1, Ordering::Relaxed);
229 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
230 (b"READY".to_vec(), "200 OK")
231 }
232 Ok(Err(e)) => {
233 warn!(error = %e, "Readiness check: LLM connectivity failed");
234 state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
235 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
236 (
237 b"NOT READY: LLM unreachable".to_vec(),
238 "503 Service Unavailable",
239 )
240 }
241 Err(_) => {
242 warn!("Readiness check: LLM connectivity timed out");
243 state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
244 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
245 (
246 b"NOT READY: LLM timeout".to_vec(),
247 "503 Service Unavailable",
248 )
249 }
250 };
251 result
252 } else {
253 (b"READY".to_vec(), "200 OK")
254 }
255}
256
257fn metrics_response(state: &ServerState) -> Vec<u8> {
258 let metrics = &state.metrics;
259 let base = format!(
260 "# HELP ravenclaws_requests_total Total HTTP requests served\n\
261 # TYPE ravenclaws_requests_total counter\n\
262 ravenclaws_requests_total {}\n\
263 \n\
264 # HELP ravenclaws_llm_requests_total Total LLM requests made\n\
265 # TYPE ravenclaws_llm_requests_total counter\n\
266 ravenclaws_llm_requests_total {}\n\
267 \n\
268 # HELP ravenclaws_tool_calls_total Total tool calls executed\n\
269 # TYPE ravenclaws_tool_calls_total counter\n\
270 ravenclaws_tool_calls_total {}\n\
271 \n\
272 # HELP ravenclaws_errors_total Total errors encountered\n\
273 # TYPE ravenclaws_errors_total counter\n\
274 ravenclaws_errors_total {}\n\
275 \n\
276 # HELP ravenclaws_tokens_total Total tokens consumed (estimated)\n\
277 # TYPE ravenclaws_tokens_total counter\n\
278 ravenclaws_tokens_total {}\n\
279 \n\
280 # HELP ravenclaws_uptime_seconds Server uptime in seconds\n\
281 # TYPE ravenclaws_uptime_seconds gauge\n\
282 ravenclaws_uptime_seconds {}\n\
283 \n\
284 # HELP ravenclaws_start_time_seconds Server start time (Unix epoch)\n\
285 # TYPE ravenclaws_start_time_seconds gauge\n\
286 ravenclaws_start_time_seconds {}\n",
287 metrics.requests_total.load(Ordering::Relaxed),
288 metrics.llm_requests_total.load(Ordering::Relaxed),
289 metrics.tool_calls_total.load(Ordering::Relaxed),
290 metrics.errors_total.load(Ordering::Relaxed),
291 metrics.tokens_total.load(Ordering::Relaxed),
292 metrics.uptime_secs(),
293 metrics.start_time.load(Ordering::Relaxed),
294 );
295 let load_metrics = state.load_manager.metrics().to_prometheus_text();
296 format!("{}\n{}", base, load_metrics).into_bytes()
297}
298
/// Returns the request's `Content-Length`, or 0 when absent or unparsable.
///
/// `headers` are raw `name: value` lines. Header names are case-insensitive
/// (RFC 9110 §5.1), so any casing of `Content-Length` is accepted. The first
/// matching header wins; an unparsable value yields 0.
fn parse_content_length(headers: &[String]) -> usize {
    headers
        .iter()
        .find_map(|header| {
            let (name, value) = header.split_once(':')?;
            name.eq_ignore_ascii_case("content-length")
                .then(|| value.trim().parse().unwrap_or(0))
        })
        .unwrap_or(0)
}
313
314async fn read_body(
316 reader: &mut BufReader<&mut tokio::net::TcpStream>,
317 content_length: usize,
318) -> Vec<u8> {
319 if content_length == 0 {
320 return Vec::new();
321 }
322 let mut body = vec![0u8; content_length];
323 if let Err(e) = reader.read_exact(&mut body).await {
324 warn!(error = %e, "Failed to read request body");
325 return Vec::new();
326 }
327 body
328}
329
330#[instrument(skip_all, fields(peer = ?stream.peer_addr().ok()))]
332async fn handle_connection(mut stream: tokio::net::TcpStream, state: Arc<ServerState>) {
333 let peer = stream.peer_addr().ok();
334 let mut reader = BufReader::new(&mut stream);
335 let mut request_line = String::new();
336
337 if reader.read_line(&mut request_line).await.is_err() {
339 return;
340 }
341
342 let request_line = request_line.trim();
343 if request_line.is_empty() {
344 return;
345 }
346
347 state.metrics.record_request();
348
349 let admission = state.load_manager.check_admission();
351 if !admission.is_allowed() {
352 let (status, body): (&str, Vec<u8>) = match admission {
353 Admission::RateLimited => ("429 Too Many Requests", b"Rate limited\n".to_vec()),
354 Admission::ConcurrencyLimited => (
355 "503 Service Unavailable",
356 b"Too many concurrent requests\n".to_vec(),
357 ),
358 Admission::LoadShed => (
359 "503 Service Unavailable",
360 b"Server is overloaded\n".to_vec(),
361 ),
362 _ => unreachable!(),
363 };
364 state.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
365 let response = format!(
366 "HTTP/1.1 {}\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nRetry-After: 1\r\nConnection: close\r\n\r\n{}",
367 status,
368 body.len(),
369 std::str::from_utf8(&body).unwrap_or(""),
370 );
371 let _ = stream.write_all(response.as_bytes()).await;
372 return;
373 }
374
375 let parts: Vec<&str> = request_line.split_whitespace().collect();
377 let method = parts.first().unwrap_or(&"GET");
378 let path = parts.get(1).unwrap_or(&"/");
379
380 let mut headers: Vec<String> = Vec::new();
382 let mut header_line = String::new();
383 loop {
384 header_line.clear();
385 if reader.read_line(&mut header_line).await.is_err() {
386 return;
387 }
388 let trimmed = header_line.trim();
389 if trimmed.is_empty() {
390 break;
391 }
392 headers.push(trimmed.to_lowercase());
393 }
394
395 let content_length = parse_content_length(&headers);
397 let body = read_body(&mut reader, content_length).await;
398
399 let (response_body, status_line, content_type) = match (*method, *path) {
401 ("GET", "/health") => (health_response(), "200 OK", "text/plain"),
402 ("GET", "/ready") => {
403 let (body, status) = ready_response(&state).await;
404 (body, status, "text/plain")
405 }
406 ("GET", "/metrics") => (
407 metrics_response(&state),
408 "200 OK",
409 "text/plain; charset=utf-8",
410 ),
411 ("GET", "/health/deep") => match handle_health_deep(&state).await {
412 Ok(body) => (body, "200 OK", "application/json"),
413 Err(e) => {
414 state.metrics.record_error();
415 (
416 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
417 "503 Service Unavailable",
418 "application/json",
419 )
420 }
421 },
422 ("POST", "/chat") => match handle_chat(&state, &body).await {
423 Ok(body) => (body, "200 OK", "application/json"),
424 Err(e) => {
425 state.metrics.record_error();
426 (
427 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
428 "400 Bad Request",
429 "application/json",
430 )
431 }
432 },
433 ("POST", "/reload") => match handle_reload(&state).await {
434 Ok(body) => (body, "200 OK", "application/json"),
435 Err(e) => {
436 state.metrics.record_error();
437 (
438 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
439 "500 Internal Server Error",
440 "application/json",
441 )
442 }
443 },
444 ("POST", "/execute") => match handle_execute(&state, &body).await {
445 Ok(body) => (body, "200 OK", "application/json"),
446 Err(e) => {
447 state.metrics.record_error();
448 (
449 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
450 "400 Bad Request",
451 "application/json",
452 )
453 }
454 },
455 ("GET", path) if path.starts_with("/tasks/") => {
456 let task_id = path.strip_prefix("/tasks/").unwrap_or("");
457 match handle_task_status(&state, task_id).await {
458 Ok(body) => (body, "200 OK", "application/json"),
459 Err(e) => {
460 state.metrics.record_error();
461 (
462 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
463 "404 Not Found",
464 "application/json",
465 )
466 }
467 }
468 }
469 ("GET", "/tools") => match handle_list_tools(&state) {
470 Ok(body) => (body, "200 OK", "application/json"),
471 Err(e) => {
472 state.metrics.record_error();
473 (
474 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
475 "500 Internal Server Error",
476 "application/json",
477 )
478 }
479 },
480 ("GET", path) if path.starts_with("/tools/") => {
481 let tool_name = path.strip_prefix("/tools/").unwrap_or("");
482 match handle_get_tool(&state, tool_name) {
483 Ok(body) => (body, "200 OK", "application/json"),
484 Err(e) => {
485 state.metrics.record_error();
486 (
487 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
488 "404 Not Found",
489 "application/json",
490 )
491 }
492 }
493 }
494 ("POST", path) if path.starts_with("/tools/") => {
495 let tool_name = path.strip_prefix("/tools/").unwrap_or("");
496 match handle_execute_tool(&state, tool_name, &body).await {
497 Ok(body) => (body, "200 OK", "application/json"),
498 Err(e) => {
499 state.metrics.record_error();
500 let status = if e.to_string().contains("not found")
501 || e.to_string().contains("No tool")
502 {
503 "404 Not Found"
504 } else {
505 "400 Bad Request"
506 };
507 (
508 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
509 status,
510 "application/json",
511 )
512 }
513 }
514 }
515 _ => {
516 state.metrics.record_error();
517 (b"Not Found".to_vec(), "404 Not Found", "text/plain")
518 }
519 };
520
521 let response = format!(
523 "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
524 status_line,
525 content_type,
526 response_body.len(),
527 );
528
529 if let Err(e) = stream.write_all(response.as_bytes()).await {
530 warn!(error = %e, "Failed to write response headers");
531 return;
532 }
533 if let Err(e) = stream.write_all(&response_body).await {
534 warn!(error = %e, "Failed to write response body");
535 return;
536 }
537 if let Err(e) = stream.flush().await {
538 warn!(error = %e, "Failed to flush response");
539 return;
540 }
541
542 debug!(path = %path, status = %status_line, peer = ?peer, "Request handled");
543}
544
545async fn wait_for_shutdown() {
549 let ctrl_c = async {
550 signal::ctrl_c()
551 .await
552 .expect("Failed to install Ctrl+C handler");
553 };
554
555 #[cfg(unix)]
556 let terminate = async {
557 signal::unix::signal(signal::unix::SignalKind::terminate())
558 .expect("Failed to install SIGTERM handler")
559 .recv()
560 .await;
561 };
562
563 #[cfg(not(unix))]
564 let terminate = std::future::pending::<()>();
565
566 tokio::select! {
567 _ = ctrl_c => {
568 info!("Received Ctrl+C, shutting down gracefully...");
569 }
570 _ = terminate => {
571 info!("Received SIGTERM, shutting down gracefully...");
572 }
573 }
574}
575
576#[cfg(unix)]
580async fn wait_for_sighup() -> bool {
581 use tokio::signal::unix::SignalKind;
582 let mut stream = match signal::unix::signal(SignalKind::hangup()) {
583 Ok(s) => s,
584 Err(e) => {
585 warn!(error = %e, "Failed to install SIGHUP handler");
586 return false;
587 }
588 };
589 stream.recv().await;
590 info!("Received SIGHUP — reloading configuration");
591 true
592}
593
/// Non-Unix stand-in: SIGHUP does not exist, so this future never resolves and
/// the corresponding `select!` branch simply never fires.
#[cfg(not(unix))]
async fn wait_for_sighup() -> bool {
    std::future::pending::<bool>().await
}
600
601async fn handle_chat(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
605 let llm = state
606 .llm
607 .as_ref()
608 .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
609
610 #[derive(serde::Deserialize)]
611 struct ChatRequest {
612 messages: Vec<ChatMessage>,
613 #[serde(default)]
614 #[allow(dead_code)]
615 stream: bool,
616 #[serde(default)]
617 max_iterations: Option<usize>,
618 }
619
620 let req: ChatRequest =
621 serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
622
623 if req.messages.is_empty() {
624 return Err(anyhow::anyhow!("No messages provided"));
625 }
626
627 let system_prompt = req
629 .messages
630 .iter()
631 .find(|m| m.role == "system")
632 .map(|m| m.content.clone())
633 .unwrap_or_else(|| state.config.llm.system_prompt.clone());
634
635 let user_message = req
637 .messages
638 .iter()
639 .rev()
640 .find(|m| m.role == "user")
641 .map(|m| m.content.clone())
642 .ok_or_else(|| anyhow::anyhow!("No user message found"))?;
643
644 let metrics = state.metrics.clone();
645 let load_manager = Arc::clone(&state.load_manager);
646 let loop_config = AgentLoopConfig {
647 max_iterations: req.max_iterations.unwrap_or(10),
648 enable_tools: true,
649 require_approval: false,
650 prompt_injection_protection: state.config.security.prompt_injection_protection,
651 token_lifetime_secs: state.config.security.token_lifetime_secs,
652 no_final_required: true,
653 fallback_chain: None,
654 token_budget: None,
655 ravenfabric: None,
656 checkpoint_dir: None,
657 session_id: None,
658 metrics_callback: Some(Box::new(move |tokens, tool_calls| {
659 if tokens > 0 {
660 metrics.tokens_total.fetch_add(tokens, Ordering::Relaxed);
661 }
662 if tool_calls > 0 {
663 metrics
664 .tool_calls_total
665 .fetch_add(tool_calls, Ordering::Relaxed);
666 }
667 })),
668 load_manager: Some(load_manager),
669 retry_config: None,
670 };
671
672 let tool_registry = state.tool_registry.clone();
673
674 let response = agent::run_agent_loop_with_registry(
675 llm.clone(),
676 &user_message,
677 &system_prompt,
678 loop_config,
679 tool_registry,
680 )
681 .await?;
682
683 state.metrics.record_llm_request();
684
685 let result = serde_json::json!({
686 "response": response,
687 "model": llm.model(),
688 "provider": llm.provider_name(),
689 });
690
691 Ok(serde_json::to_vec(&result)?)
692}
693
694async fn handle_execute(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
696 let bg_manager = state
697 .bg_manager
698 .as_ref()
699 .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
700
701 #[derive(serde::Deserialize)]
702 struct ExecuteRequest {
703 prompt: String,
704 #[serde(default)]
705 system_prompt: Option<String>,
706 }
707
708 let req: ExecuteRequest =
709 serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
710
711 if req.prompt.trim().is_empty() {
712 return Err(anyhow::anyhow!("Prompt cannot be empty"));
713 }
714
715 let system_prompt = req
716 .system_prompt
717 .unwrap_or_else(|| state.config.llm.system_prompt.clone());
718
719 let task_id = bg_manager
720 .submit(req.prompt, system_prompt)
721 .await
722 .map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))?;
723
724 if let Some(ref llm) = state.llm {
726 let bg = bg_manager.clone();
727 let tid = task_id.clone();
728 let llm_clone = llm.clone();
729 tokio::spawn(async move {
730 if let Err(e) = bg.execute(&tid, llm_clone).await {
731 warn!(task_id = %tid, error = %e, "Background task execution failed");
732 }
733 });
734 }
735
736 let result = serde_json::json!({
737 "task_id": task_id,
738 "status": "pending",
739 });
740
741 Ok(serde_json::to_vec(&result)?)
742}
743
744async fn handle_task_status(state: &ServerState, task_id: &str) -> anyhow::Result<Vec<u8>> {
746 let bg_manager = state
747 .bg_manager
748 .as_ref()
749 .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
750
751 let task = bg_manager
752 .get_task(task_id)
753 .await
754 .map_err(|e| anyhow::anyhow!("Task not found: {}", e))?;
755
756 let result = serde_json::json!({
757 "task_id": task.id,
758 "status": task.status.to_string(),
759 "result": task.result,
760 "error": task.error,
761 "created_at": task.created_at,
762 "updated_at": task.updated_at,
763 "iterations": task.iterations,
764 "provider": task.provider,
765 "model": task.model,
766 });
767
768 Ok(serde_json::to_vec(&result)?)
769}
770
771fn handle_list_tools(state: &ServerState) -> anyhow::Result<Vec<u8>> {
773 let registry = state
774 .tool_registry
775 .as_ref()
776 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
777
778 let tools: Vec<serde_json::Value> = registry
779 .definitions()
780 .iter()
781 .map(|def| {
782 serde_json::json!({
783 "name": def.name,
784 "description": def.description,
785 "parameters": def.parameters,
786 "category": def.category,
787 "requires_approval": def.requires_approval,
788 })
789 })
790 .collect();
791
792 let result = serde_json::json!({
793 "tools": tools,
794 "count": tools.len(),
795 });
796
797 Ok(serde_json::to_vec(&result)?)
798}
799
800fn handle_get_tool(state: &ServerState, tool_name: &str) -> anyhow::Result<Vec<u8>> {
802 let registry = state
803 .tool_registry
804 .as_ref()
805 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
806
807 let definitions = registry.definitions();
808 let def = definitions
809 .iter()
810 .find(|d| d.name == tool_name)
811 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
812
813 let result = serde_json::json!({
814 "name": def.name,
815 "description": def.description,
816 "parameters": def.parameters,
817 "category": def.category,
818 "requires_approval": def.requires_approval,
819 });
820
821 Ok(serde_json::to_vec(&result)?)
822}
823
824async fn handle_execute_tool(
826 state: &ServerState,
827 tool_name: &str,
828 body: &[u8],
829) -> anyhow::Result<Vec<u8>> {
830 let registry = state
831 .tool_registry
832 .as_ref()
833 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
834
835 let args: serde_json::Value = if body.is_empty() {
836 serde_json::Value::Object(serde_json::Map::new())
837 } else {
838 serde_json::from_slice(body)
839 .map_err(|e| anyhow::anyhow!("Invalid arguments JSON: {}", e))?
840 };
841
842 let call = ToolCall {
843 name: tool_name.to_string(),
844 arguments: args,
845 id: None,
846 };
847
848 let result = registry.execute(call).await?;
849 state.metrics.record_tool_call();
850
851 let response = serde_json::json!({
852 "tool": result.tool_name,
853 "success": result.success,
854 "output": result.output,
855 "error": result.error,
856 "exit_code": result.exit_code,
857 "duration_ms": result.duration_ms,
858 });
859
860 Ok(serde_json::to_vec(&response)?)
861}
862
863async fn handle_health_deep(state: &ServerState) -> anyhow::Result<Vec<u8>> {
865 let llm = state
866 .llm
867 .as_ref()
868 .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
869
870 let messages = vec![ChatMessage::new("user", "Respond with exactly: OK")];
872
873 let response = llm
874 .chat(messages)
875 .await
876 .map_err(|e| anyhow::anyhow!("LLM connectivity check failed: {}", e))?;
877
878 let content = response
879 .choices
880 .first()
881 .map(|c| c.message.content.clone())
882 .unwrap_or_default();
883
884 let result = serde_json::json!({
885 "status": "ok",
886 "provider": llm.provider_name(),
887 "model": llm.model(),
888 "response": content,
889 "uptime_seconds": state.metrics.uptime_secs(),
890 });
891
892 Ok(serde_json::to_vec(&result)?)
893}
894
895async fn handle_reload(_state: &ServerState) -> anyhow::Result<Vec<u8>> {
903 info!("Reloading configuration via POST /reload...");
904 let config_path = std::env::var("RAVENCLAWS_CONFIG").ok();
905 match Config::load(config_path.as_deref()) {
906 Ok(_new_config) => {
907 info!("Configuration reloaded successfully");
912 let result = serde_json::json!({
913 "status": "ok",
914 "message": "Configuration reloaded successfully. Note: LLM client and tool registry hot-reload requires a server restart.",
915 });
916 Ok(serde_json::to_vec(&result)?)
917 }
918 Err(e) => {
919 warn!(error = %e, "Failed to reload configuration via POST /reload");
920 Err(anyhow::anyhow!("Failed to reload configuration: {}", e))
921 }
922 }
923}
924
925#[instrument(skip_all, fields(bind_addr))]
933pub async fn run_server(config: Config) -> anyhow::Result<()> {
934 let host = config
935 .runtime
936 .host
937 .clone()
938 .unwrap_or_else(|| "0.0.0.0".to_string());
939 let port = config.runtime.port;
940 let bind_addr = format!("{}:{}", host, port);
941
942 let mut state = ServerState::new(config);
943
944 info!("Initializing LLM client for server mode");
946 let llm = llm::create_client(&state.config.llm)?;
947 state.llm = Some(llm);
948 info!(
949 provider = state
950 .llm
951 .as_ref()
952 .map(|l| l.provider_name())
953 .unwrap_or("unknown"),
954 model = state.llm.as_ref().map(|l| l.model()).unwrap_or("unknown"),
955 "LLM client initialized for server mode"
956 );
957
958 info!("Initializing tool registry");
960 let registry = ToolRegistry::with_config(&state.config);
961 state.tool_registry = Some(registry);
962 info!(
963 tool_count = state.tool_registry.as_ref().map(|r| r.len()).unwrap_or(0),
964 "Tool registry initialized"
965 );
966
967 info!("Initializing background task manager");
969 let bg_manager = BackgroundTaskManager::from_config(&state.config.runtime).await?;
970 state.bg_manager = Some(bg_manager);
971 info!("Background task manager initialized");
972
973 if !state.config.mcp.servers.is_empty() {
975 info!(
976 server_count = state.config.mcp.servers.len(),
977 "Initializing MCP clients from config"
978 );
979 let mcp_manager = crate::mcp::McpClientManager::from_config(&state.config.mcp).await;
980 let registered = if !mcp_manager.is_empty() {
981 if let Some(ref mut registry) = state.tool_registry {
982 mcp_manager.register_all_tools(registry).await
983 } else {
984 0
985 }
986 } else {
987 0
988 };
989 info!(
990 connected = mcp_manager.len(),
991 tools_registered = registered,
992 "MCP client initialization complete"
993 );
994 state.mcp_manager = Some(mcp_manager);
995 } else {
996 info!("No MCP servers configured, skipping MCP client initialization");
997 }
998
999 let state = Arc::new(state);
1000 let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
1001 error!(address = %bind_addr, error = %e, "Failed to bind HTTP server");
1002 anyhow::anyhow!("Failed to bind to {}: {}", bind_addr, e)
1003 })?;
1004
1005 info!(
1006 address = %bind_addr,
1007 "HTTP server started — endpoints: /health, /ready, /metrics, /health/deep, /chat, /execute, /tasks/:id, /tools, /tools/:name, /reload"
1008 );
1009
1010 state.mark_ready();
1012
1013 loop {
1015 tokio::select! {
1016 accept_result = listener.accept() => {
1017 match accept_result {
1018 Ok((stream, peer)) => {
1019 debug!(peer = %peer, "Accepted connection");
1020 let state = Arc::clone(&state);
1021 tokio::spawn(async move {
1022 handle_connection(stream, state).await;
1023 });
1024 }
1025 Err(e) => {
1026 warn!(error = %e, "Failed to accept connection");
1027 state.metrics.record_error();
1028 }
1029 }
1030 }
1031 _ = wait_for_shutdown() => {
1032 info!("Shutting down HTTP server gracefully...");
1033 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1035 info!("HTTP server shutdown complete");
1036 break;
1037 }
1038 _ = wait_for_sighup() => {
1039 info!("Reloading configuration from SIGHUP...");
1040 let config_path = std::env::var("RAVENCLAWS_CONFIG")
1042 .ok();
1043 match Config::load(config_path.as_deref()) {
1044 Ok(_new_config) => {
1045 info!("Configuration reloaded successfully");
1048 }
1052 Err(e) => {
1053 warn!(error = %e, "Failed to reload configuration on SIGHUP");
1054 }
1055 }
1056 }
1057 }
1058 }
1059
1060 Ok(())
1061}
1062
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RuntimeConfig;

    /// Minimal config bound to loopback on an ephemeral port.
    fn test_config() -> Config {
        Config {
            runtime: RuntimeConfig {
                host: Some("127.0.0.1".to_string()),
                port: 0,
                ..RuntimeConfig::default()
            },
            ..Config::default()
        }
    }

    /// Serves exactly one connection with `handle_connection` and returns the raw
    /// HTTP response written for `GET {path}`.
    ///
    /// Speaks plain HTTP/1.1 over a `TcpStream` instead of using an HTTP client, so
    /// proxy environment variables cannot reroute the request. Any connection
    /// failure fails the test instead of silently skipping its assertions.
    async fn get_raw(path: &str) -> String {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let state = Arc::new(ServerState::new(test_config()));
        state.mark_ready();

        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            handle_connection(stream, state).await;
        });

        let mut client = tokio::net::TcpStream::connect(addr)
            .await
            .expect("connect to test server");
        let request = format!(
            "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
            path, addr
        );
        client
            .write_all(request.as_bytes())
            .await
            .expect("send request");
        let mut response = String::new();
        client
            .read_to_string(&mut response)
            .await
            .expect("read response");

        server.await.unwrap();
        response
    }

    #[tokio::test]
    async fn test_health_response() {
        let body = health_response();
        assert_eq!(body, b"OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_ready() {
        let state = ServerState::new(test_config());
        state.mark_ready();
        let (body, status) = ready_response(&state).await;
        assert_eq!(body, b"READY");
        assert_eq!(status, "200 OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_not_ready() {
        let state = ServerState::new(test_config());
        let (body, status) = ready_response(&state).await;
        assert_eq!(body, b"NOT READY");
        assert_eq!(status, "503 Service Unavailable");
    }

    #[tokio::test]
    async fn test_metrics_response_format() {
        let state = ServerState::new(test_config());
        let body = metrics_response(&state);
        let output = String::from_utf8_lossy(&body);

        assert!(output.contains("ravenclaws_requests_total"));
        assert!(output.contains("ravenclaws_llm_requests_total"));
        assert!(output.contains("ravenclaws_tool_calls_total"));
        assert!(output.contains("ravenclaws_errors_total"));
        assert!(output.contains("ravenclaws_tokens_total"));
        assert!(output.contains("ravenclaws_uptime_seconds"));
        assert!(output.contains("ravenclaws_start_time_seconds"));
        assert!(output.contains("# HELP"));
        assert!(output.contains("# TYPE"));
    }

    #[tokio::test]
    async fn test_metrics_counters_increment() {
        let state = ServerState::new(test_config());

        state.metrics.record_request();
        state.metrics.record_request();
        state.metrics.record_error();
        state.metrics.record_tokens(150);

        assert_eq!(state.metrics.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(state.metrics.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(state.metrics.tokens_total.load(Ordering::Relaxed), 150);
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let state = ServerState::new(test_config());
        let uptime1 = state.metrics.uptime_secs();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let uptime2 = state.metrics.uptime_secs();
        assert!(uptime2 >= uptime1);
    }

    #[tokio::test]
    async fn test_server_binds_and_responds() {
        let response = get_raw("/health").await;
        assert!(
            response.starts_with("HTTP/1.1 200 OK\r\n"),
            "unexpected response: {:?}",
            response
        );
        assert!(response.ends_with("\r\n\r\nOK"), "unexpected body: {:?}", response);
    }

    #[tokio::test]
    async fn test_server_404() {
        let response = get_raw("/unknown").await;
        assert!(
            response.starts_with("HTTP/1.1 404 Not Found\r\n"),
            "unexpected response: {:?}",
            response
        );
    }

    #[tokio::test]
    async fn test_server_metrics_endpoint() {
        let response = get_raw("/metrics").await;
        assert!(
            response.starts_with("HTTP/1.1 200 OK\r\n"),
            "unexpected response: {:?}",
            response
        );
        // The request that fetched the metrics was itself counted before rendering.
        assert!(
            response.contains("\nravenclaws_requests_total 1\n"),
            "unexpected body: {:?}",
            response
        );
    }
}