1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Instant;
39
40use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41use tokio::net::TcpListener;
42use tokio::signal;
43use tracing::{debug, error, info, instrument, warn};
44
45use crate::agent::{self, AgentLoopConfig};
46use crate::background::BackgroundTaskManager;
47use crate::config::Config;
48use crate::llm::{self, ChatMessage, LLMProviderTrait};
49use crate::load::{Admission, LoadManager};
50use crate::tools::{ToolCall, ToolRegistry};
51
/// Counters and timestamps exported by the `/metrics` endpoint.
///
/// Every field is an atomic, so values can be updated through a shared
/// reference.
#[derive(Debug, Default)]
pub struct ServerMetrics {
    /// Number of HTTP requests counted by the connection handler.
    pub requests_total: AtomicU64,
    /// Number of LLM requests recorded.
    pub llm_requests_total: AtomicU64,
    /// Number of tool calls recorded.
    pub tool_calls_total: AtomicU64,
    /// Number of errors recorded.
    pub errors_total: AtomicU64,
    /// Number of tokens recorded.
    pub tokens_total: AtomicU64,
    /// Server start time, in seconds since the Unix epoch.
    pub start_time: AtomicU64,
    /// Time of the last LLM readiness probe, in seconds since the Unix epoch.
    pub ready_cache_time: AtomicU64,
    /// Outcome of the last LLM readiness probe: `1` for success, `0` otherwise.
    pub ready_cache_result: AtomicU64,
}

impl Clone for ServerMetrics {
    /// Returns an independent snapshot of the current values.
    ///
    /// The copy owns fresh atomics: updating it does not affect `self`.
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));
        Self {
            requests_total: snapshot(&self.requests_total),
            llm_requests_total: snapshot(&self.llm_requests_total),
            tool_calls_total: snapshot(&self.tool_calls_total),
            errors_total: snapshot(&self.errors_total),
            tokens_total: snapshot(&self.tokens_total),
            start_time: snapshot(&self.start_time),
            ready_cache_time: snapshot(&self.ready_cache_time),
            ready_cache_result: snapshot(&self.ready_cache_result),
        }
    }
}

impl ServerMetrics {
    /// Current wall-clock time in whole seconds since the Unix epoch, or `0`
    /// when the system clock reports a time before the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or_default()
    }

    /// Creates zeroed metrics with `start_time` set to the current time.
    fn new() -> Self {
        Self {
            start_time: AtomicU64::new(Self::unix_now_secs()),
            ..Self::default()
        }
    }

    /// Counts one HTTP request.
    fn record_request(&self) {
        self.requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one LLM request.
    fn record_llm_request(&self) {
        self.llm_requests_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one tool call.
    fn record_tool_call(&self) {
        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one error.
    fn record_error(&self) {
        self.errors_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `count` to the token total.
    #[cfg_attr(not(test), allow(dead_code))]
    fn record_tokens(&self, count: u64) {
        self.tokens_total.fetch_add(count, Ordering::Relaxed);
    }

    /// Seconds elapsed since `start_time`, clamped at zero if the wall clock
    /// has moved backwards.
    fn uptime_secs(&self) -> u64 {
        let now = Self::unix_now_secs();
        now.saturating_sub(self.start_time.load(Ordering::Relaxed))
    }
}
134
/// State shared by every connection handler.
///
/// `ServerState::new` creates it with all optional components unset;
/// `run_server` fills them in before wrapping the state in an `Arc`.
pub struct ServerState {
    /// Set to `true` by `mark_ready`; `/ready` answers 503 while it is `false`.
    pub ready: AtomicBool,
    /// Request, error and token counters served by `/metrics`.
    pub metrics: ServerMetrics,
    /// Configuration the server was started with.
    pub config: Config,
    /// Monotonic instant captured when the state was constructed.
    #[allow(dead_code)]
    pub start_time: Instant,
    /// LLM client; `None` until `run_server` installs one.
    pub llm: Option<Arc<dyn LLMProviderTrait>>,
    /// Tool registry; `None` until `run_server` installs one.
    pub tool_registry: Option<ToolRegistry>,
    /// Background task manager; `None` until `run_server` installs one.
    pub bg_manager: Option<BackgroundTaskManager>,
    /// MCP client manager; only set when MCP servers are configured.
    pub mcp_manager: Option<crate::mcp::McpClientManager>,
    /// Admission controller consulted for every incoming request.
    pub load_manager: Arc<LoadManager>,
}
159
160impl ServerState {
161 fn new(config: Config) -> Self {
162 let load_manager = Arc::new(LoadManager::new(config.load.clone()));
163 Self {
164 ready: AtomicBool::new(false),
165 metrics: ServerMetrics::new(),
166 config,
167 start_time: Instant::now(),
168 llm: None,
169 tool_registry: None,
170 bg_manager: None,
171 mcp_manager: None,
172 load_manager,
173 }
174 }
175
176 fn mark_ready(&self) {
178 self.ready.store(true, Ordering::Relaxed);
179 info!("Server marked as ready");
180 }
181}
182
/// Body of the liveness endpoint: the two bytes `OK`.
fn health_response() -> Vec<u8> {
    Vec::from(&b"OK"[..])
}
188
189async fn ready_response(state: &ServerState) -> (Vec<u8>, &'static str) {
190 if !state.ready.load(Ordering::Relaxed) {
191 return (b"NOT READY".to_vec(), "503 Service Unavailable");
192 }
193
194 if let Some(ref llm) = state.llm {
196 let now = std::time::SystemTime::now()
198 .duration_since(std::time::UNIX_EPOCH)
199 .unwrap_or_default()
200 .as_secs();
201 let cache_time = state.metrics.ready_cache_time.load(Ordering::Relaxed);
202 let cache_ttl: u64 = 30;
203
204 if now.saturating_sub(cache_time) < cache_ttl {
205 if state.metrics.ready_cache_result.load(Ordering::Relaxed) == 1 {
207 return (b"READY".to_vec(), "200 OK");
208 } else {
209 return (
210 b"NOT READY: LLM unreachable".to_vec(),
211 "503 Service Unavailable",
212 );
213 }
214 }
215
216 let result = match tokio::time::timeout(
218 std::time::Duration::from_secs(5),
219 llm.chat(vec![ChatMessage::new(
220 "user",
221 "Respond with exactly one word: ready",
222 )]),
223 )
224 .await
225 {
226 Ok(Ok(_)) => {
227 state.metrics.ready_cache_result.store(1, Ordering::Relaxed);
229 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
230 (b"READY".to_vec(), "200 OK")
231 }
232 Ok(Err(e)) => {
233 warn!(error = %e, "Readiness check: LLM connectivity failed");
234 state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
235 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
236 (
237 b"NOT READY: LLM unreachable".to_vec(),
238 "503 Service Unavailable",
239 )
240 }
241 Err(_) => {
242 warn!("Readiness check: LLM connectivity timed out");
243 state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
244 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
245 (
246 b"NOT READY: LLM timeout".to_vec(),
247 "503 Service Unavailable",
248 )
249 }
250 };
251 result
252 } else {
253 (b"READY".to_vec(), "200 OK")
254 }
255}
256
257fn metrics_response(state: &ServerState) -> Vec<u8> {
258 let metrics = &state.metrics;
259 let base = format!(
260 "# HELP ravenclaws_requests_total Total HTTP requests served\n\
261 # TYPE ravenclaws_requests_total counter\n\
262 ravenclaws_requests_total {}\n\
263 \n\
264 # HELP ravenclaws_llm_requests_total Total LLM requests made\n\
265 # TYPE ravenclaws_llm_requests_total counter\n\
266 ravenclaws_llm_requests_total {}\n\
267 \n\
268 # HELP ravenclaws_tool_calls_total Total tool calls executed\n\
269 # TYPE ravenclaws_tool_calls_total counter\n\
270 ravenclaws_tool_calls_total {}\n\
271 \n\
272 # HELP ravenclaws_errors_total Total errors encountered\n\
273 # TYPE ravenclaws_errors_total counter\n\
274 ravenclaws_errors_total {}\n\
275 \n\
276 # HELP ravenclaws_tokens_total Total tokens consumed (estimated)\n\
277 # TYPE ravenclaws_tokens_total counter\n\
278 ravenclaws_tokens_total {}\n\
279 \n\
280 # HELP ravenclaws_uptime_seconds Server uptime in seconds\n\
281 # TYPE ravenclaws_uptime_seconds gauge\n\
282 ravenclaws_uptime_seconds {}\n\
283 \n\
284 # HELP ravenclaws_start_time_seconds Server start time (Unix epoch)\n\
285 # TYPE ravenclaws_start_time_seconds gauge\n\
286 ravenclaws_start_time_seconds {}\n",
287 metrics.requests_total.load(Ordering::Relaxed),
288 metrics.llm_requests_total.load(Ordering::Relaxed),
289 metrics.tool_calls_total.load(Ordering::Relaxed),
290 metrics.errors_total.load(Ordering::Relaxed),
291 metrics.tokens_total.load(Ordering::Relaxed),
292 metrics.uptime_secs(),
293 metrics.start_time.load(Ordering::Relaxed),
294 );
295 let load_metrics = state.load_manager.metrics().to_prometheus_text();
296 format!("{}\n{}", base, load_metrics).into_bytes()
297}
298
/// Extracts the `Content-Length` value from raw `name: value` header lines.
///
/// HTTP field names are case-insensitive, so the name is compared without
/// regard to ASCII case (`content-length`, `Content-Length`,
/// `CONTENT-LENGTH`, ...). The first matching header wins.
///
/// Returns `0` when the header is absent or its value is not a valid
/// non-negative integer.
fn parse_content_length(headers: &[String]) -> usize {
    headers
        .iter()
        .find_map(|header| {
            let (name, value) = header.split_once(':')?;
            name.eq_ignore_ascii_case("content-length")
                .then(|| value.trim().parse::<usize>().unwrap_or(0))
        })
        .unwrap_or(0)
}
313
314async fn read_body(
316 reader: &mut BufReader<&mut tokio::net::TcpStream>,
317 content_length: usize,
318) -> Vec<u8> {
319 if content_length == 0 {
320 return Vec::new();
321 }
322 let mut body = vec![0u8; content_length];
323 if let Err(e) = reader.read_exact(&mut body).await {
324 warn!(error = %e, "Failed to read request body");
325 return Vec::new();
326 }
327 body
328}
329
330#[instrument(skip_all, fields(peer = ?stream.peer_addr().ok()))]
332async fn handle_connection(mut stream: tokio::net::TcpStream, state: Arc<ServerState>) {
333 let peer = stream.peer_addr().ok();
334 let mut reader = BufReader::new(&mut stream);
335 let mut request_line = String::new();
336
337 if reader.read_line(&mut request_line).await.is_err() {
339 return;
340 }
341
342 let request_line = request_line.trim();
343 if request_line.is_empty() {
344 return;
345 }
346
347 state.metrics.record_request();
348
349 let admission = state.load_manager.check_admission();
351 if !admission.is_allowed() {
352 let (status, body): (&str, Vec<u8>) = match admission {
353 Admission::RateLimited => ("429 Too Many Requests", b"Rate limited\n".to_vec()),
354 Admission::ConcurrencyLimited => (
355 "503 Service Unavailable",
356 b"Too many concurrent requests\n".to_vec(),
357 ),
358 Admission::LoadShed => (
359 "503 Service Unavailable",
360 b"Server is overloaded\n".to_vec(),
361 ),
362 _ => unreachable!(),
363 };
364 state.metrics.errors_total.fetch_add(1, Ordering::Relaxed);
365 let response = format!(
366 "HTTP/1.1 {}\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nRetry-After: 1\r\nConnection: close\r\n\r\n{}",
367 status,
368 body.len(),
369 std::str::from_utf8(&body).unwrap_or(""),
370 );
371 let _ = stream.write_all(response.as_bytes()).await;
372 return;
373 }
374
375 let parts: Vec<&str> = request_line.split_whitespace().collect();
377 let method = parts.first().unwrap_or(&"GET");
378 let path = parts.get(1).unwrap_or(&"/");
379
380 let mut headers: Vec<String> = Vec::new();
382 let mut header_line = String::new();
383 loop {
384 header_line.clear();
385 if reader.read_line(&mut header_line).await.is_err() {
386 return;
387 }
388 let trimmed = header_line.trim();
389 if trimmed.is_empty() {
390 break;
391 }
392 headers.push(trimmed.to_lowercase());
393 }
394
395 let content_length = parse_content_length(&headers);
397 let body = read_body(&mut reader, content_length).await;
398
399 let (response_body, status_line, content_type) = match (*method, *path) {
401 ("GET", "/health") => (health_response(), "200 OK", "text/plain"),
402 ("GET", "/ready") => {
403 let (body, status) = ready_response(&state).await;
404 (body, status, "text/plain")
405 }
406 ("GET", "/metrics") => (
407 metrics_response(&state),
408 "200 OK",
409 "text/plain; charset=utf-8",
410 ),
411 ("GET", "/health/deep") => match handle_health_deep(&state).await {
412 Ok(body) => (body, "200 OK", "application/json"),
413 Err(e) => {
414 state.metrics.record_error();
415 (
416 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
417 "503 Service Unavailable",
418 "application/json",
419 )
420 }
421 },
422 ("POST", "/chat") => match handle_chat(&state, &body).await {
423 Ok(body) => (body, "200 OK", "application/json"),
424 Err(e) => {
425 state.metrics.record_error();
426 (
427 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
428 "400 Bad Request",
429 "application/json",
430 )
431 }
432 },
433 ("POST", "/reload") => match handle_reload(&state).await {
434 Ok(body) => (body, "200 OK", "application/json"),
435 Err(e) => {
436 state.metrics.record_error();
437 (
438 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
439 "500 Internal Server Error",
440 "application/json",
441 )
442 }
443 },
444 ("POST", "/execute") => match handle_execute(&state, &body).await {
445 Ok(body) => (body, "200 OK", "application/json"),
446 Err(e) => {
447 state.metrics.record_error();
448 (
449 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
450 "400 Bad Request",
451 "application/json",
452 )
453 }
454 },
455 ("GET", path) if path.starts_with("/tasks/") => {
456 let task_id = path.strip_prefix("/tasks/").unwrap_or("");
457 match handle_task_status(&state, task_id).await {
458 Ok(body) => (body, "200 OK", "application/json"),
459 Err(e) => {
460 state.metrics.record_error();
461 (
462 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
463 "404 Not Found",
464 "application/json",
465 )
466 }
467 }
468 }
469 ("GET", "/tools") => match handle_list_tools(&state) {
470 Ok(body) => (body, "200 OK", "application/json"),
471 Err(e) => {
472 state.metrics.record_error();
473 (
474 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
475 "500 Internal Server Error",
476 "application/json",
477 )
478 }
479 },
480 ("GET", path) if path.starts_with("/tools/") => {
481 let tool_name = path.strip_prefix("/tools/").unwrap_or("");
482 match handle_get_tool(&state, tool_name) {
483 Ok(body) => (body, "200 OK", "application/json"),
484 Err(e) => {
485 state.metrics.record_error();
486 (
487 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
488 "404 Not Found",
489 "application/json",
490 )
491 }
492 }
493 }
494 ("POST", path) if path.starts_with("/tools/") => {
495 let tool_name = path.strip_prefix("/tools/").unwrap_or("");
496 match handle_execute_tool(&state, tool_name, &body).await {
497 Ok(body) => (body, "200 OK", "application/json"),
498 Err(e) => {
499 state.metrics.record_error();
500 let status = if e.to_string().contains("not found")
501 || e.to_string().contains("No tool")
502 {
503 "404 Not Found"
504 } else {
505 "400 Bad Request"
506 };
507 (
508 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
509 status,
510 "application/json",
511 )
512 }
513 }
514 }
515 _ => {
516 state.metrics.record_error();
517 (b"Not Found".to_vec(), "404 Not Found", "text/plain")
518 }
519 };
520
521 let response = format!(
523 "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
524 status_line,
525 content_type,
526 response_body.len(),
527 );
528
529 if let Err(e) = stream.write_all(response.as_bytes()).await {
530 warn!(error = %e, "Failed to write response headers");
531 return;
532 }
533 if let Err(e) = stream.write_all(&response_body).await {
534 warn!(error = %e, "Failed to write response body");
535 return;
536 }
537 if let Err(e) = stream.flush().await {
538 warn!(error = %e, "Failed to flush response");
539 return;
540 }
541
542 debug!(path = %path, status = %status_line, peer = ?peer, "Request handled");
543}
544
545async fn wait_for_shutdown() {
549 let ctrl_c = async {
550 signal::ctrl_c()
551 .await
552 .expect("Failed to install Ctrl+C handler");
553 };
554
555 #[cfg(unix)]
556 let terminate = async {
557 signal::unix::signal(signal::unix::SignalKind::terminate())
558 .expect("Failed to install SIGTERM handler")
559 .recv()
560 .await;
561 };
562
563 #[cfg(not(unix))]
564 let terminate = std::future::pending::<()>();
565
566 tokio::select! {
567 _ = ctrl_c => {
568 info!("Received Ctrl+C, shutting down gracefully...");
569 }
570 _ = terminate => {
571 info!("Received SIGTERM, shutting down gracefully...");
572 }
573 }
574}
575
576#[cfg(unix)]
580async fn wait_for_sighup() -> bool {
581 use tokio::signal::unix::SignalKind;
582 let mut stream = match signal::unix::signal(SignalKind::hangup()) {
583 Ok(s) => s,
584 Err(e) => {
585 warn!(error = %e, "Failed to install SIGHUP handler");
586 return false;
587 }
588 };
589 stream.recv().await;
590 info!("Received SIGHUP — reloading configuration");
591 true
592}
593
/// Non-Unix stand-in for the SIGHUP waiter: never completes.
#[cfg(not(unix))]
async fn wait_for_sighup() -> bool {
    let never = std::future::pending::<()>();
    never.await;
    false
}
600
601async fn handle_chat(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
605 let llm = state
606 .llm
607 .as_ref()
608 .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
609
610 #[derive(serde::Deserialize)]
611 struct ChatRequest {
612 messages: Vec<ChatMessage>,
613 #[serde(default)]
614 #[allow(dead_code)]
615 stream: bool,
616 #[serde(default)]
617 max_iterations: Option<usize>,
618 }
619
620 let req: ChatRequest =
621 serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
622
623 if req.messages.is_empty() {
624 return Err(anyhow::anyhow!("No messages provided"));
625 }
626
627 let system_prompt = req
629 .messages
630 .iter()
631 .find(|m| m.role == "system")
632 .map(|m| m.content.clone())
633 .unwrap_or_else(|| state.config.llm.system_prompt.clone());
634
635 let user_message = req
637 .messages
638 .iter()
639 .rev()
640 .find(|m| m.role == "user")
641 .map(|m| m.content.clone())
642 .ok_or_else(|| anyhow::anyhow!("No user message found"))?;
643
644 let metrics = state.metrics.clone();
645 let load_manager = Arc::clone(&state.load_manager);
646 let loop_config = AgentLoopConfig {
647 max_iterations: req.max_iterations.unwrap_or(10),
648 enable_tools: true,
649 require_approval: false,
650 prompt_injection_protection: state.config.security.prompt_injection_protection,
651 token_lifetime_secs: state.config.security.token_lifetime_secs,
652 no_final_required: true,
653 fallback_chain: None,
654 token_budget: None,
655 ravenfabric: None,
656 checkpoint_dir: None,
657 session_id: None,
658 metrics_callback: Some(Box::new(move |tokens, tool_calls| {
659 if tokens > 0 {
660 metrics.tokens_total.fetch_add(tokens, Ordering::Relaxed);
661 }
662 if tool_calls > 0 {
663 metrics
664 .tool_calls_total
665 .fetch_add(tool_calls, Ordering::Relaxed);
666 }
667 })),
668 load_manager: Some(load_manager),
669 };
670
671 let tool_registry = state.tool_registry.clone();
672
673 let response = agent::run_agent_loop_with_registry(
674 llm.clone(),
675 &user_message,
676 &system_prompt,
677 loop_config,
678 tool_registry,
679 )
680 .await?;
681
682 state.metrics.record_llm_request();
683
684 let result = serde_json::json!({
685 "response": response,
686 "model": llm.model(),
687 "provider": llm.provider_name(),
688 });
689
690 Ok(serde_json::to_vec(&result)?)
691}
692
693async fn handle_execute(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
695 let bg_manager = state
696 .bg_manager
697 .as_ref()
698 .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
699
700 #[derive(serde::Deserialize)]
701 struct ExecuteRequest {
702 prompt: String,
703 #[serde(default)]
704 system_prompt: Option<String>,
705 }
706
707 let req: ExecuteRequest =
708 serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
709
710 if req.prompt.trim().is_empty() {
711 return Err(anyhow::anyhow!("Prompt cannot be empty"));
712 }
713
714 let system_prompt = req
715 .system_prompt
716 .unwrap_or_else(|| state.config.llm.system_prompt.clone());
717
718 let task_id = bg_manager
719 .submit(req.prompt, system_prompt)
720 .await
721 .map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))?;
722
723 if let Some(ref llm) = state.llm {
725 let bg = bg_manager.clone();
726 let tid = task_id.clone();
727 let llm_clone = llm.clone();
728 tokio::spawn(async move {
729 if let Err(e) = bg.execute(&tid, llm_clone).await {
730 warn!(task_id = %tid, error = %e, "Background task execution failed");
731 }
732 });
733 }
734
735 let result = serde_json::json!({
736 "task_id": task_id,
737 "status": "pending",
738 });
739
740 Ok(serde_json::to_vec(&result)?)
741}
742
743async fn handle_task_status(state: &ServerState, task_id: &str) -> anyhow::Result<Vec<u8>> {
745 let bg_manager = state
746 .bg_manager
747 .as_ref()
748 .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
749
750 let task = bg_manager
751 .get_task(task_id)
752 .await
753 .map_err(|e| anyhow::anyhow!("Task not found: {}", e))?;
754
755 let result = serde_json::json!({
756 "task_id": task.id,
757 "status": task.status.to_string(),
758 "result": task.result,
759 "error": task.error,
760 "created_at": task.created_at,
761 "updated_at": task.updated_at,
762 "iterations": task.iterations,
763 "provider": task.provider,
764 "model": task.model,
765 });
766
767 Ok(serde_json::to_vec(&result)?)
768}
769
770fn handle_list_tools(state: &ServerState) -> anyhow::Result<Vec<u8>> {
772 let registry = state
773 .tool_registry
774 .as_ref()
775 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
776
777 let tools: Vec<serde_json::Value> = registry
778 .definitions()
779 .iter()
780 .map(|def| {
781 serde_json::json!({
782 "name": def.name,
783 "description": def.description,
784 "parameters": def.parameters,
785 "category": def.category,
786 "requires_approval": def.requires_approval,
787 })
788 })
789 .collect();
790
791 let result = serde_json::json!({
792 "tools": tools,
793 "count": tools.len(),
794 });
795
796 Ok(serde_json::to_vec(&result)?)
797}
798
799fn handle_get_tool(state: &ServerState, tool_name: &str) -> anyhow::Result<Vec<u8>> {
801 let registry = state
802 .tool_registry
803 .as_ref()
804 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
805
806 let definitions = registry.definitions();
807 let def = definitions
808 .iter()
809 .find(|d| d.name == tool_name)
810 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
811
812 let result = serde_json::json!({
813 "name": def.name,
814 "description": def.description,
815 "parameters": def.parameters,
816 "category": def.category,
817 "requires_approval": def.requires_approval,
818 });
819
820 Ok(serde_json::to_vec(&result)?)
821}
822
823async fn handle_execute_tool(
825 state: &ServerState,
826 tool_name: &str,
827 body: &[u8],
828) -> anyhow::Result<Vec<u8>> {
829 let registry = state
830 .tool_registry
831 .as_ref()
832 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
833
834 let args: serde_json::Value = if body.is_empty() {
835 serde_json::Value::Object(serde_json::Map::new())
836 } else {
837 serde_json::from_slice(body)
838 .map_err(|e| anyhow::anyhow!("Invalid arguments JSON: {}", e))?
839 };
840
841 let call = ToolCall {
842 name: tool_name.to_string(),
843 arguments: args,
844 id: None,
845 };
846
847 let result = registry.execute(call).await?;
848 state.metrics.record_tool_call();
849
850 let response = serde_json::json!({
851 "tool": result.tool_name,
852 "success": result.success,
853 "output": result.output,
854 "error": result.error,
855 "exit_code": result.exit_code,
856 "duration_ms": result.duration_ms,
857 });
858
859 Ok(serde_json::to_vec(&response)?)
860}
861
862async fn handle_health_deep(state: &ServerState) -> anyhow::Result<Vec<u8>> {
864 let llm = state
865 .llm
866 .as_ref()
867 .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
868
869 let messages = vec![ChatMessage::new("user", "Respond with exactly: OK")];
871
872 let response = llm
873 .chat(messages)
874 .await
875 .map_err(|e| anyhow::anyhow!("LLM connectivity check failed: {}", e))?;
876
877 let content = response
878 .choices
879 .first()
880 .map(|c| c.message.content.clone())
881 .unwrap_or_default();
882
883 let result = serde_json::json!({
884 "status": "ok",
885 "provider": llm.provider_name(),
886 "model": llm.model(),
887 "response": content,
888 "uptime_seconds": state.metrics.uptime_secs(),
889 });
890
891 Ok(serde_json::to_vec(&result)?)
892}
893
894async fn handle_reload(_state: &ServerState) -> anyhow::Result<Vec<u8>> {
902 info!("Reloading configuration via POST /reload...");
903 let config_path = std::env::var("RAVENCLAWS_CONFIG").ok();
904 match Config::load(config_path.as_deref()) {
905 Ok(_new_config) => {
906 info!("Configuration reloaded successfully");
911 let result = serde_json::json!({
912 "status": "ok",
913 "message": "Configuration reloaded successfully. Note: LLM client and tool registry hot-reload requires a server restart.",
914 });
915 Ok(serde_json::to_vec(&result)?)
916 }
917 Err(e) => {
918 warn!(error = %e, "Failed to reload configuration via POST /reload");
919 Err(anyhow::anyhow!("Failed to reload configuration: {}", e))
920 }
921 }
922}
923
924#[instrument(skip_all, fields(bind_addr))]
932pub async fn run_server(config: Config) -> anyhow::Result<()> {
933 let host = config
934 .runtime
935 .host
936 .clone()
937 .unwrap_or_else(|| "0.0.0.0".to_string());
938 let port = config.runtime.port;
939 let bind_addr = format!("{}:{}", host, port);
940
941 let mut state = ServerState::new(config);
942
943 info!("Initializing LLM client for server mode");
945 let llm = llm::create_client(&state.config.llm)?;
946 state.llm = Some(llm);
947 info!(
948 provider = state
949 .llm
950 .as_ref()
951 .map(|l| l.provider_name())
952 .unwrap_or("unknown"),
953 model = state.llm.as_ref().map(|l| l.model()).unwrap_or("unknown"),
954 "LLM client initialized for server mode"
955 );
956
957 info!("Initializing tool registry");
959 let registry = ToolRegistry::with_config(&state.config);
960 state.tool_registry = Some(registry);
961 info!(
962 tool_count = state.tool_registry.as_ref().map(|r| r.len()).unwrap_or(0),
963 "Tool registry initialized"
964 );
965
966 info!("Initializing background task manager");
968 let bg_manager = BackgroundTaskManager::from_config(&state.config.runtime).await?;
969 state.bg_manager = Some(bg_manager);
970 info!("Background task manager initialized");
971
972 if !state.config.mcp.servers.is_empty() {
974 info!(
975 server_count = state.config.mcp.servers.len(),
976 "Initializing MCP clients from config"
977 );
978 let mcp_manager = crate::mcp::McpClientManager::from_config(&state.config.mcp).await;
979 let registered = if !mcp_manager.is_empty() {
980 if let Some(ref mut registry) = state.tool_registry {
981 mcp_manager.register_all_tools(registry).await
982 } else {
983 0
984 }
985 } else {
986 0
987 };
988 info!(
989 connected = mcp_manager.len(),
990 tools_registered = registered,
991 "MCP client initialization complete"
992 );
993 state.mcp_manager = Some(mcp_manager);
994 } else {
995 info!("No MCP servers configured, skipping MCP client initialization");
996 }
997
998 let state = Arc::new(state);
999 let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
1000 error!(address = %bind_addr, error = %e, "Failed to bind HTTP server");
1001 anyhow::anyhow!("Failed to bind to {}: {}", bind_addr, e)
1002 })?;
1003
1004 info!(
1005 address = %bind_addr,
1006 "HTTP server started — endpoints: /health, /ready, /metrics, /health/deep, /chat, /execute, /tasks/:id, /tools, /tools/:name, /reload"
1007 );
1008
1009 state.mark_ready();
1011
1012 loop {
1014 tokio::select! {
1015 accept_result = listener.accept() => {
1016 match accept_result {
1017 Ok((stream, peer)) => {
1018 debug!(peer = %peer, "Accepted connection");
1019 let state = Arc::clone(&state);
1020 tokio::spawn(async move {
1021 handle_connection(stream, state).await;
1022 });
1023 }
1024 Err(e) => {
1025 warn!(error = %e, "Failed to accept connection");
1026 state.metrics.record_error();
1027 }
1028 }
1029 }
1030 _ = wait_for_shutdown() => {
1031 info!("Shutting down HTTP server gracefully...");
1032 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1034 info!("HTTP server shutdown complete");
1035 break;
1036 }
1037 _ = wait_for_sighup() => {
1038 info!("Reloading configuration from SIGHUP...");
1039 let config_path = std::env::var("RAVENCLAWS_CONFIG")
1041 .ok();
1042 match Config::load(config_path.as_deref()) {
1043 Ok(_new_config) => {
1044 info!("Configuration reloaded successfully");
1047 }
1051 Err(e) => {
1052 warn!(error = %e, "Failed to reload configuration on SIGHUP");
1053 }
1054 }
1055 }
1056 }
1057 }
1058
1059 Ok(())
1060}
1061
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RuntimeConfig;

    /// Default configuration bound to loopback on an OS-assigned port.
    fn test_config() -> Config {
        let runtime = RuntimeConfig {
            host: Some("127.0.0.1".to_string()),
            port: 0,
            ..RuntimeConfig::default()
        };
        Config {
            runtime,
            ..Config::default()
        }
    }

    /// Serves exactly one connection with `handle_connection` on an ephemeral
    /// loopback port and issues `GET <path>` against it.
    ///
    /// Returns whatever the HTTP client produced, including transport errors.
    async fn request_once(path: &str) -> Result<reqwest::Response, reqwest::Error> {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let state = Arc::new(ServerState::new(test_config()));
        state.mark_ready();

        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            handle_connection(stream, state).await;
        });

        let response = reqwest::Client::new()
            .get(format!("http://{}{}", addr, path))
            .send()
            .await;

        server.await.unwrap();
        response
    }

    #[tokio::test]
    async fn test_health_response() {
        assert_eq!(health_response(), b"OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_ready() {
        let state = ServerState::new(test_config());
        state.mark_ready();

        let (body, status) = ready_response(&state).await;

        assert_eq!(body, b"READY");
        assert_eq!(status, "200 OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_not_ready() {
        let state = ServerState::new(test_config());

        let (body, status) = ready_response(&state).await;

        assert_eq!(body, b"NOT READY");
        assert_eq!(status, "503 Service Unavailable");
    }

    #[tokio::test]
    async fn test_metrics_response_format() {
        let state = ServerState::new(test_config());
        let body = metrics_response(&state);
        let output = String::from_utf8_lossy(&body);

        let expected_fragments = [
            "ravenclaws_requests_total",
            "ravenclaws_llm_requests_total",
            "ravenclaws_tool_calls_total",
            "ravenclaws_errors_total",
            "ravenclaws_tokens_total",
            "ravenclaws_uptime_seconds",
            "ravenclaws_start_time_seconds",
            "# HELP",
            "# TYPE",
        ];
        for fragment in expected_fragments.iter() {
            assert!(output.contains(fragment));
        }
    }

    #[tokio::test]
    async fn test_metrics_counters_increment() {
        let state = ServerState::new(test_config());
        let metrics = &state.metrics;

        metrics.record_request();
        metrics.record_request();
        metrics.record_error();
        metrics.record_tokens(150);

        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.tokens_total.load(Ordering::Relaxed), 150);
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let state = ServerState::new(test_config());

        let before = state.metrics.uptime_secs();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let after = state.metrics.uptime_secs();

        assert!(after >= before);
    }

    // The end-to-end tests below only assert when the HTTP client got a
    // response; a transport-level failure is tolerated.

    #[tokio::test]
    async fn test_server_binds_and_responds() {
        if let Ok(resp) = request_once("/health").await {
            assert_eq!(resp.status(), 200);
            let body = resp.text().await.unwrap();
            assert_eq!(body, "OK");
        }
    }

    #[tokio::test]
    async fn test_server_404() {
        if let Ok(resp) = request_once("/unknown").await {
            assert_eq!(resp.status(), 404);
        }
    }

    #[tokio::test]
    async fn test_server_metrics_endpoint() {
        if let Ok(resp) = request_once("/metrics").await {
            assert_eq!(resp.status(), 200);
            let body = resp.text().await.unwrap();
            assert!(body.contains("ravenclaws_requests_total"));
        }
    }
}