1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
37use std::sync::Arc;
38use std::time::Instant;
39
40use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
41use tokio::net::TcpListener;
42use tokio::signal;
43use tracing::{debug, error, info, instrument, warn};
44
45use crate::agent::{self, AgentLoopConfig};
46use crate::background::BackgroundTaskManager;
47use crate::config::Config;
48use crate::llm::{self, ChatMessage, LLMProviderTrait};
49use crate::tools::{ToolCall, ToolRegistry};
50
/// Counters and timestamps shared by the HTTP handlers.
///
/// Every field is an [`AtomicU64`], so it can be updated through a shared
/// reference. The values feed the `/metrics` output and the `/ready` cache.
#[derive(Debug, Default)]
pub struct ServerMetrics {
    /// Number of requests seen; bumped by `record_request`.
    pub requests_total: AtomicU64,
    /// Number of LLM requests; bumped by `record_llm_request`.
    pub llm_requests_total: AtomicU64,
    /// Number of tool calls; bumped by `record_tool_call`.
    pub tool_calls_total: AtomicU64,
    /// Number of errors; bumped by `record_error`.
    pub errors_total: AtomicU64,
    /// Accumulated token count; increased by `record_tokens`.
    pub tokens_total: AtomicU64,
    /// Construction time in seconds since the Unix epoch (set by `new`).
    pub start_time: AtomicU64,
    /// Unix-epoch seconds at which the last readiness probe result was cached.
    pub ready_cache_time: AtomicU64,
    /// Cached readiness probe result: 1 means the probe succeeded, 0 that it failed.
    pub ready_cache_result: AtomicU64,
}
73
74impl Clone for ServerMetrics {
76 fn clone(&self) -> Self {
77 Self {
78 requests_total: AtomicU64::new(self.requests_total.load(Ordering::Relaxed)),
79 llm_requests_total: AtomicU64::new(self.llm_requests_total.load(Ordering::Relaxed)),
80 tool_calls_total: AtomicU64::new(self.tool_calls_total.load(Ordering::Relaxed)),
81 errors_total: AtomicU64::new(self.errors_total.load(Ordering::Relaxed)),
82 tokens_total: AtomicU64::new(self.tokens_total.load(Ordering::Relaxed)),
83 start_time: AtomicU64::new(self.start_time.load(Ordering::Relaxed)),
84 ready_cache_time: AtomicU64::new(self.ready_cache_time.load(Ordering::Relaxed)),
85 ready_cache_result: AtomicU64::new(self.ready_cache_result.load(Ordering::Relaxed)),
86 }
87 }
88}
89
90impl ServerMetrics {
91 fn new() -> Self {
92 let metrics = Self::default();
93 metrics.start_time.store(
94 std::time::SystemTime::now()
95 .duration_since(std::time::UNIX_EPOCH)
96 .unwrap_or_default()
97 .as_secs(),
98 Ordering::Relaxed,
99 );
100 metrics
101 }
102
103 fn record_request(&self) {
104 self.requests_total.fetch_add(1, Ordering::Relaxed);
105 }
106
107 fn record_llm_request(&self) {
108 self.llm_requests_total.fetch_add(1, Ordering::Relaxed);
109 }
110
111 fn record_tool_call(&self) {
112 self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
113 }
114
115 fn record_error(&self) {
116 self.errors_total.fetch_add(1, Ordering::Relaxed);
117 }
118
119 #[cfg_attr(not(test), allow(dead_code))]
120 fn record_tokens(&self, count: u64) {
121 self.tokens_total.fetch_add(count, Ordering::Relaxed);
122 }
123
124 fn uptime_secs(&self) -> u64 {
125 let now = std::time::SystemTime::now()
126 .duration_since(std::time::UNIX_EPOCH)
127 .unwrap_or_default()
128 .as_secs();
129 let start = self.start_time.load(Ordering::Relaxed);
130 now.saturating_sub(start)
131 }
132}
133
134pub struct ServerState {
138 pub ready: AtomicBool,
140 pub metrics: ServerMetrics,
142 pub config: Config,
144 #[allow(dead_code)]
146 pub start_time: Instant,
147 pub llm: Option<Arc<dyn LLMProviderTrait>>,
149 pub tool_registry: Option<ToolRegistry>,
151 pub bg_manager: Option<BackgroundTaskManager>,
153 pub mcp_manager: Option<crate::mcp::McpClientManager>,
155}
156
157impl ServerState {
158 fn new(config: Config) -> Self {
159 Self {
160 ready: AtomicBool::new(false),
161 metrics: ServerMetrics::new(),
162 config,
163 start_time: Instant::now(),
164 llm: None,
165 tool_registry: None,
166 bg_manager: None,
167 mcp_manager: None,
168 }
169 }
170
171 fn mark_ready(&self) {
173 self.ready.store(true, Ordering::Relaxed);
174 info!("Server marked as ready");
175 }
176}
177
/// Body of the liveness endpoint: always the two bytes `OK`.
fn health_response() -> Vec<u8> {
    Vec::from(&b"OK"[..])
}
183
184async fn ready_response(state: &ServerState) -> (Vec<u8>, &'static str) {
185 if !state.ready.load(Ordering::Relaxed) {
186 return (b"NOT READY".to_vec(), "503 Service Unavailable");
187 }
188
189 if let Some(ref llm) = state.llm {
191 let now = std::time::SystemTime::now()
193 .duration_since(std::time::UNIX_EPOCH)
194 .unwrap_or_default()
195 .as_secs();
196 let cache_time = state.metrics.ready_cache_time.load(Ordering::Relaxed);
197 let cache_ttl: u64 = 30;
198
199 if now.saturating_sub(cache_time) < cache_ttl {
200 if state.metrics.ready_cache_result.load(Ordering::Relaxed) == 1 {
202 return (b"READY".to_vec(), "200 OK");
203 } else {
204 return (
205 b"NOT READY: LLM unreachable".to_vec(),
206 "503 Service Unavailable",
207 );
208 }
209 }
210
211 let result = match tokio::time::timeout(
213 std::time::Duration::from_secs(5),
214 llm.chat(vec![ChatMessage {
215 role: "user".to_string(),
216 content: "Respond with exactly one word: ready".to_string(),
217 }]),
218 )
219 .await
220 {
221 Ok(Ok(_)) => {
222 state.metrics.ready_cache_result.store(1, Ordering::Relaxed);
224 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
225 (b"READY".to_vec(), "200 OK")
226 }
227 Ok(Err(e)) => {
228 warn!(error = %e, "Readiness check: LLM connectivity failed");
229 state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
230 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
231 (
232 b"NOT READY: LLM unreachable".to_vec(),
233 "503 Service Unavailable",
234 )
235 }
236 Err(_) => {
237 warn!("Readiness check: LLM connectivity timed out");
238 state.metrics.ready_cache_result.store(0, Ordering::Relaxed);
239 state.metrics.ready_cache_time.store(now, Ordering::Relaxed);
240 (
241 b"NOT READY: LLM timeout".to_vec(),
242 "503 Service Unavailable",
243 )
244 }
245 };
246 result
247 } else {
248 (b"READY".to_vec(), "200 OK")
249 }
250}
251
252fn metrics_response(state: &ServerState) -> Vec<u8> {
253 let metrics = &state.metrics;
254 format!(
255 "# HELP ravenclaws_requests_total Total HTTP requests served\n\
256 # TYPE ravenclaws_requests_total counter\n\
257 ravenclaws_requests_total {}\n\
258 \n\
259 # HELP ravenclaws_llm_requests_total Total LLM requests made\n\
260 # TYPE ravenclaws_llm_requests_total counter\n\
261 ravenclaws_llm_requests_total {}\n\
262 \n\
263 # HELP ravenclaws_tool_calls_total Total tool calls executed\n\
264 # TYPE ravenclaws_tool_calls_total counter\n\
265 ravenclaws_tool_calls_total {}\n\
266 \n\
267 # HELP ravenclaws_errors_total Total errors encountered\n\
268 # TYPE ravenclaws_errors_total counter\n\
269 ravenclaws_errors_total {}\n\
270 \n\
271 # HELP ravenclaws_tokens_total Total tokens consumed (estimated)\n\
272 # TYPE ravenclaws_tokens_total counter\n\
273 ravenclaws_tokens_total {}\n\
274 \n\
275 # HELP ravenclaws_uptime_seconds Server uptime in seconds\n\
276 # TYPE ravenclaws_uptime_seconds gauge\n\
277 ravenclaws_uptime_seconds {}\n\
278 \n\
279 # HELP ravenclaws_start_time_seconds Server start time (Unix epoch)\n\
280 # TYPE ravenclaws_start_time_seconds gauge\n\
281 ravenclaws_start_time_seconds {}\n",
282 metrics.requests_total.load(Ordering::Relaxed),
283 metrics.llm_requests_total.load(Ordering::Relaxed),
284 metrics.tool_calls_total.load(Ordering::Relaxed),
285 metrics.errors_total.load(Ordering::Relaxed),
286 metrics.tokens_total.load(Ordering::Relaxed),
287 metrics.uptime_secs(),
288 metrics.start_time.load(Ordering::Relaxed),
289 )
290 .into_bytes()
291}
292
/// Extracts the `Content-Length` value from raw `name: value` header lines.
///
/// The header name is matched case-insensitively, so any casing is accepted.
/// Only the first matching header is considered. Returns 0 when the header is
/// absent or its value is not a valid non-negative integer.
fn parse_content_length(headers: &[String]) -> usize {
    headers
        .iter()
        .filter_map(|line| line.split_once(':'))
        .find(|(name, _)| name.eq_ignore_ascii_case("content-length"))
        .map_or(0, |(_, value)| value.trim().parse().unwrap_or(0))
}
307
308async fn read_body(
310 reader: &mut BufReader<&mut tokio::net::TcpStream>,
311 content_length: usize,
312) -> Vec<u8> {
313 if content_length == 0 {
314 return Vec::new();
315 }
316 let mut body = vec![0u8; content_length];
317 if let Err(e) = reader.read_exact(&mut body).await {
318 warn!(error = %e, "Failed to read request body");
319 return Vec::new();
320 }
321 body
322}
323
324#[instrument(skip_all, fields(peer = ?stream.peer_addr().ok()))]
326async fn handle_connection(mut stream: tokio::net::TcpStream, state: Arc<ServerState>) {
327 let peer = stream.peer_addr().ok();
328 let mut reader = BufReader::new(&mut stream);
329 let mut request_line = String::new();
330
331 if reader.read_line(&mut request_line).await.is_err() {
333 return;
334 }
335
336 let request_line = request_line.trim();
337 if request_line.is_empty() {
338 return;
339 }
340
341 state.metrics.record_request();
342
343 let parts: Vec<&str> = request_line.split_whitespace().collect();
345 let method = parts.first().unwrap_or(&"GET");
346 let path = parts.get(1).unwrap_or(&"/");
347
348 let mut headers: Vec<String> = Vec::new();
350 let mut header_line = String::new();
351 loop {
352 header_line.clear();
353 if reader.read_line(&mut header_line).await.is_err() {
354 return;
355 }
356 let trimmed = header_line.trim();
357 if trimmed.is_empty() {
358 break;
359 }
360 headers.push(trimmed.to_lowercase());
361 }
362
363 let content_length = parse_content_length(&headers);
365 let body = read_body(&mut reader, content_length).await;
366
367 let (response_body, status_line, content_type) = match (*method, *path) {
369 ("GET", "/health") => (health_response(), "200 OK", "text/plain"),
370 ("GET", "/ready") => {
371 let (body, status) = ready_response(&state).await;
372 (body, status, "text/plain")
373 }
374 ("GET", "/metrics") => (
375 metrics_response(&state),
376 "200 OK",
377 "text/plain; charset=utf-8",
378 ),
379 ("GET", "/health/deep") => match handle_health_deep(&state).await {
380 Ok(body) => (body, "200 OK", "application/json"),
381 Err(e) => {
382 state.metrics.record_error();
383 (
384 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
385 "503 Service Unavailable",
386 "application/json",
387 )
388 }
389 },
390 ("POST", "/chat") => match handle_chat(&state, &body).await {
391 Ok(body) => (body, "200 OK", "application/json"),
392 Err(e) => {
393 state.metrics.record_error();
394 (
395 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
396 "400 Bad Request",
397 "application/json",
398 )
399 }
400 },
401 ("POST", "/reload") => match handle_reload(&state).await {
402 Ok(body) => (body, "200 OK", "application/json"),
403 Err(e) => {
404 state.metrics.record_error();
405 (
406 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
407 "500 Internal Server Error",
408 "application/json",
409 )
410 }
411 },
412 ("POST", "/execute") => match handle_execute(&state, &body).await {
413 Ok(body) => (body, "200 OK", "application/json"),
414 Err(e) => {
415 state.metrics.record_error();
416 (
417 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
418 "400 Bad Request",
419 "application/json",
420 )
421 }
422 },
423 ("GET", path) if path.starts_with("/tasks/") => {
424 let task_id = path.strip_prefix("/tasks/").unwrap_or("");
425 match handle_task_status(&state, task_id).await {
426 Ok(body) => (body, "200 OK", "application/json"),
427 Err(e) => {
428 state.metrics.record_error();
429 (
430 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
431 "404 Not Found",
432 "application/json",
433 )
434 }
435 }
436 }
437 ("GET", "/tools") => match handle_list_tools(&state) {
438 Ok(body) => (body, "200 OK", "application/json"),
439 Err(e) => {
440 state.metrics.record_error();
441 (
442 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
443 "500 Internal Server Error",
444 "application/json",
445 )
446 }
447 },
448 ("GET", path) if path.starts_with("/tools/") => {
449 let tool_name = path.strip_prefix("/tools/").unwrap_or("");
450 match handle_get_tool(&state, tool_name) {
451 Ok(body) => (body, "200 OK", "application/json"),
452 Err(e) => {
453 state.metrics.record_error();
454 (
455 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
456 "404 Not Found",
457 "application/json",
458 )
459 }
460 }
461 }
462 ("POST", path) if path.starts_with("/tools/") => {
463 let tool_name = path.strip_prefix("/tools/").unwrap_or("");
464 match handle_execute_tool(&state, tool_name, &body).await {
465 Ok(body) => (body, "200 OK", "application/json"),
466 Err(e) => {
467 state.metrics.record_error();
468 let status = if e.to_string().contains("not found")
469 || e.to_string().contains("No tool")
470 {
471 "404 Not Found"
472 } else {
473 "400 Bad Request"
474 };
475 (
476 format!("{{\"error\":\"{}\"}}", e).into_bytes(),
477 status,
478 "application/json",
479 )
480 }
481 }
482 }
483 _ => {
484 state.metrics.record_error();
485 (b"Not Found".to_vec(), "404 Not Found", "text/plain")
486 }
487 };
488
489 let response = format!(
491 "HTTP/1.1 {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
492 status_line,
493 content_type,
494 response_body.len(),
495 );
496
497 if let Err(e) = stream.write_all(response.as_bytes()).await {
498 warn!(error = %e, "Failed to write response headers");
499 return;
500 }
501 if let Err(e) = stream.write_all(&response_body).await {
502 warn!(error = %e, "Failed to write response body");
503 return;
504 }
505 if let Err(e) = stream.flush().await {
506 warn!(error = %e, "Failed to flush response");
507 return;
508 }
509
510 debug!(path = %path, status = %status_line, peer = ?peer, "Request handled");
511}
512
513async fn wait_for_shutdown() {
517 let ctrl_c = async {
518 signal::ctrl_c()
519 .await
520 .expect("Failed to install Ctrl+C handler");
521 };
522
523 #[cfg(unix)]
524 let terminate = async {
525 signal::unix::signal(signal::unix::SignalKind::terminate())
526 .expect("Failed to install SIGTERM handler")
527 .recv()
528 .await;
529 };
530
531 #[cfg(not(unix))]
532 let terminate = std::future::pending::<()>();
533
534 tokio::select! {
535 _ = ctrl_c => {
536 info!("Received Ctrl+C, shutting down gracefully...");
537 }
538 _ = terminate => {
539 info!("Received SIGTERM, shutting down gracefully...");
540 }
541 }
542}
543
544#[cfg(unix)]
548async fn wait_for_sighup() -> bool {
549 use tokio::signal::unix::SignalKind;
550 let mut stream = match signal::unix::signal(SignalKind::hangup()) {
551 Ok(s) => s,
552 Err(e) => {
553 warn!(error = %e, "Failed to install SIGHUP handler");
554 return false;
555 }
556 };
557 stream.recv().await;
558 info!("Received SIGHUP — reloading configuration");
559 true
560}
561
/// Non-Unix stand-in for the SIGHUP listener: the returned future never
/// completes, so no reload is ever triggered from it.
#[cfg(not(unix))]
async fn wait_for_sighup() -> bool {
    // `pending` never resolves, so no `bool` is ever actually produced.
    std::future::pending::<bool>().await
}
568
569async fn handle_chat(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
573 let llm = state
574 .llm
575 .as_ref()
576 .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
577
578 #[derive(serde::Deserialize)]
579 struct ChatRequest {
580 messages: Vec<ChatMessage>,
581 #[serde(default)]
582 #[allow(dead_code)]
583 stream: bool,
584 #[serde(default)]
585 max_iterations: Option<usize>,
586 }
587
588 let req: ChatRequest =
589 serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
590
591 if req.messages.is_empty() {
592 return Err(anyhow::anyhow!("No messages provided"));
593 }
594
595 let system_prompt = req
597 .messages
598 .iter()
599 .find(|m| m.role == "system")
600 .map(|m| m.content.clone())
601 .unwrap_or_else(|| state.config.llm.system_prompt.clone());
602
603 let user_message = req
605 .messages
606 .iter()
607 .rev()
608 .find(|m| m.role == "user")
609 .map(|m| m.content.clone())
610 .ok_or_else(|| anyhow::anyhow!("No user message found"))?;
611
612 let metrics = state.metrics.clone();
613 let loop_config = AgentLoopConfig {
614 max_iterations: req.max_iterations.unwrap_or(10),
615 enable_tools: true,
616 require_approval: false,
617 prompt_injection_protection: state.config.security.prompt_injection_protection,
618 token_lifetime_secs: state.config.security.token_lifetime_secs,
619 no_final_required: true,
620 fallback_chain: None,
621 token_budget: None,
622 ravenfabric: None,
623 checkpoint_dir: None,
624 session_id: None,
625 metrics_callback: Some(Box::new(move |tokens, tool_calls| {
626 if tokens > 0 {
627 metrics.tokens_total.fetch_add(tokens, Ordering::Relaxed);
628 }
629 if tool_calls > 0 {
630 metrics
631 .tool_calls_total
632 .fetch_add(tool_calls, Ordering::Relaxed);
633 }
634 })),
635 };
636
637 let tool_registry = state.tool_registry.clone();
638
639 let response = agent::run_agent_loop_with_registry(
640 llm.clone(),
641 &user_message,
642 &system_prompt,
643 loop_config,
644 tool_registry,
645 )
646 .await?;
647
648 state.metrics.record_llm_request();
649
650 let result = serde_json::json!({
651 "response": response,
652 "model": llm.model(),
653 "provider": llm.provider_name(),
654 });
655
656 Ok(serde_json::to_vec(&result)?)
657}
658
659async fn handle_execute(state: &ServerState, body: &[u8]) -> anyhow::Result<Vec<u8>> {
661 let bg_manager = state
662 .bg_manager
663 .as_ref()
664 .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
665
666 #[derive(serde::Deserialize)]
667 struct ExecuteRequest {
668 prompt: String,
669 #[serde(default)]
670 system_prompt: Option<String>,
671 }
672
673 let req: ExecuteRequest =
674 serde_json::from_slice(body).map_err(|e| anyhow::anyhow!("Invalid request body: {}", e))?;
675
676 if req.prompt.trim().is_empty() {
677 return Err(anyhow::anyhow!("Prompt cannot be empty"));
678 }
679
680 let system_prompt = req
681 .system_prompt
682 .unwrap_or_else(|| state.config.llm.system_prompt.clone());
683
684 let task_id = bg_manager
685 .submit(req.prompt, system_prompt)
686 .await
687 .map_err(|e| anyhow::anyhow!("Failed to submit task: {}", e))?;
688
689 if let Some(ref llm) = state.llm {
691 let bg = bg_manager.clone();
692 let tid = task_id.clone();
693 let llm_clone = llm.clone();
694 tokio::spawn(async move {
695 if let Err(e) = bg.execute(&tid, llm_clone).await {
696 warn!(task_id = %tid, error = %e, "Background task execution failed");
697 }
698 });
699 }
700
701 let result = serde_json::json!({
702 "task_id": task_id,
703 "status": "pending",
704 });
705
706 Ok(serde_json::to_vec(&result)?)
707}
708
709async fn handle_task_status(state: &ServerState, task_id: &str) -> anyhow::Result<Vec<u8>> {
711 let bg_manager = state
712 .bg_manager
713 .as_ref()
714 .ok_or_else(|| anyhow::anyhow!("No background task manager configured"))?;
715
716 let task = bg_manager
717 .get_task(task_id)
718 .await
719 .map_err(|e| anyhow::anyhow!("Task not found: {}", e))?;
720
721 let result = serde_json::json!({
722 "task_id": task.id,
723 "status": task.status.to_string(),
724 "result": task.result,
725 "error": task.error,
726 "created_at": task.created_at,
727 "updated_at": task.updated_at,
728 "iterations": task.iterations,
729 "provider": task.provider,
730 "model": task.model,
731 });
732
733 Ok(serde_json::to_vec(&result)?)
734}
735
736fn handle_list_tools(state: &ServerState) -> anyhow::Result<Vec<u8>> {
738 let registry = state
739 .tool_registry
740 .as_ref()
741 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
742
743 let tools: Vec<serde_json::Value> = registry
744 .definitions()
745 .iter()
746 .map(|def| {
747 serde_json::json!({
748 "name": def.name,
749 "description": def.description,
750 "parameters": def.parameters,
751 "category": def.category,
752 "requires_approval": def.requires_approval,
753 })
754 })
755 .collect();
756
757 let result = serde_json::json!({
758 "tools": tools,
759 "count": tools.len(),
760 });
761
762 Ok(serde_json::to_vec(&result)?)
763}
764
765fn handle_get_tool(state: &ServerState, tool_name: &str) -> anyhow::Result<Vec<u8>> {
767 let registry = state
768 .tool_registry
769 .as_ref()
770 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
771
772 let definitions = registry.definitions();
773 let def = definitions
774 .iter()
775 .find(|d| d.name == tool_name)
776 .ok_or_else(|| anyhow::anyhow!("Tool '{}' not found", tool_name))?;
777
778 let result = serde_json::json!({
779 "name": def.name,
780 "description": def.description,
781 "parameters": def.parameters,
782 "category": def.category,
783 "requires_approval": def.requires_approval,
784 });
785
786 Ok(serde_json::to_vec(&result)?)
787}
788
789async fn handle_execute_tool(
791 state: &ServerState,
792 tool_name: &str,
793 body: &[u8],
794) -> anyhow::Result<Vec<u8>> {
795 let registry = state
796 .tool_registry
797 .as_ref()
798 .ok_or_else(|| anyhow::anyhow!("No tool registry configured"))?;
799
800 let args: serde_json::Value = if body.is_empty() {
801 serde_json::Value::Object(serde_json::Map::new())
802 } else {
803 serde_json::from_slice(body)
804 .map_err(|e| anyhow::anyhow!("Invalid arguments JSON: {}", e))?
805 };
806
807 let call = ToolCall {
808 name: tool_name.to_string(),
809 arguments: args,
810 id: None,
811 };
812
813 let result = registry.execute(call).await?;
814 state.metrics.record_tool_call();
815
816 let response = serde_json::json!({
817 "tool": result.tool_name,
818 "success": result.success,
819 "output": result.output,
820 "error": result.error,
821 "exit_code": result.exit_code,
822 "duration_ms": result.duration_ms,
823 });
824
825 Ok(serde_json::to_vec(&response)?)
826}
827
828async fn handle_health_deep(state: &ServerState) -> anyhow::Result<Vec<u8>> {
830 let llm = state
831 .llm
832 .as_ref()
833 .ok_or_else(|| anyhow::anyhow!("No LLM client configured"))?;
834
835 let messages = vec![ChatMessage {
837 role: "user".to_string(),
838 content: "Respond with exactly: OK".to_string(),
839 }];
840
841 let response = llm
842 .chat(messages)
843 .await
844 .map_err(|e| anyhow::anyhow!("LLM connectivity check failed: {}", e))?;
845
846 let content = response
847 .choices
848 .first()
849 .map(|c| c.message.content.clone())
850 .unwrap_or_default();
851
852 let result = serde_json::json!({
853 "status": "ok",
854 "provider": llm.provider_name(),
855 "model": llm.model(),
856 "response": content,
857 "uptime_seconds": state.metrics.uptime_secs(),
858 });
859
860 Ok(serde_json::to_vec(&result)?)
861}
862
863async fn handle_reload(_state: &ServerState) -> anyhow::Result<Vec<u8>> {
871 info!("Reloading configuration via POST /reload...");
872 let config_path = std::env::var("RAVENCLAWS_CONFIG").ok();
873 match Config::load(config_path.as_deref()) {
874 Ok(_new_config) => {
875 info!("Configuration reloaded successfully");
880 let result = serde_json::json!({
881 "status": "ok",
882 "message": "Configuration reloaded successfully. Note: LLM client and tool registry hot-reload requires a server restart.",
883 });
884 Ok(serde_json::to_vec(&result)?)
885 }
886 Err(e) => {
887 warn!(error = %e, "Failed to reload configuration via POST /reload");
888 Err(anyhow::anyhow!("Failed to reload configuration: {}", e))
889 }
890 }
891}
892
893#[instrument(skip_all, fields(bind_addr))]
901pub async fn run_server(config: Config) -> anyhow::Result<()> {
902 let host = config
903 .runtime
904 .host
905 .clone()
906 .unwrap_or_else(|| "0.0.0.0".to_string());
907 let port = config.runtime.port;
908 let bind_addr = format!("{}:{}", host, port);
909
910 let mut state = ServerState::new(config);
911
912 info!("Initializing LLM client for server mode");
914 let llm = llm::create_client(&state.config.llm)?;
915 state.llm = Some(llm);
916 info!(
917 provider = state
918 .llm
919 .as_ref()
920 .map(|l| l.provider_name())
921 .unwrap_or("unknown"),
922 model = state.llm.as_ref().map(|l| l.model()).unwrap_or("unknown"),
923 "LLM client initialized for server mode"
924 );
925
926 info!("Initializing tool registry");
928 let registry = ToolRegistry::with_config(&state.config);
929 state.tool_registry = Some(registry);
930 info!(
931 tool_count = state.tool_registry.as_ref().map(|r| r.len()).unwrap_or(0),
932 "Tool registry initialized"
933 );
934
935 info!("Initializing background task manager");
937 let bg_manager = BackgroundTaskManager::from_config(&state.config.runtime).await?;
938 state.bg_manager = Some(bg_manager);
939 info!("Background task manager initialized");
940
941 if !state.config.mcp.servers.is_empty() {
943 info!(
944 server_count = state.config.mcp.servers.len(),
945 "Initializing MCP clients from config"
946 );
947 let mcp_manager = crate::mcp::McpClientManager::from_config(&state.config.mcp).await;
948 let registered = if !mcp_manager.is_empty() {
949 if let Some(ref mut registry) = state.tool_registry {
950 mcp_manager.register_all_tools(registry).await
951 } else {
952 0
953 }
954 } else {
955 0
956 };
957 info!(
958 connected = mcp_manager.len(),
959 tools_registered = registered,
960 "MCP client initialization complete"
961 );
962 state.mcp_manager = Some(mcp_manager);
963 } else {
964 info!("No MCP servers configured, skipping MCP client initialization");
965 }
966
967 let state = Arc::new(state);
968 let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
969 error!(address = %bind_addr, error = %e, "Failed to bind HTTP server");
970 anyhow::anyhow!("Failed to bind to {}: {}", bind_addr, e)
971 })?;
972
973 info!(
974 address = %bind_addr,
975 "HTTP server started — endpoints: /health, /ready, /metrics, /health/deep, /chat, /execute, /tasks/:id, /tools, /tools/:name, /reload"
976 );
977
978 state.mark_ready();
980
981 loop {
983 tokio::select! {
984 accept_result = listener.accept() => {
985 match accept_result {
986 Ok((stream, peer)) => {
987 debug!(peer = %peer, "Accepted connection");
988 let state = Arc::clone(&state);
989 tokio::spawn(async move {
990 handle_connection(stream, state).await;
991 });
992 }
993 Err(e) => {
994 warn!(error = %e, "Failed to accept connection");
995 state.metrics.record_error();
996 }
997 }
998 }
999 _ = wait_for_shutdown() => {
1000 info!("Shutting down HTTP server gracefully...");
1001 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1003 info!("HTTP server shutdown complete");
1004 break;
1005 }
1006 _ = wait_for_sighup() => {
1007 info!("Reloading configuration from SIGHUP...");
1008 let config_path = std::env::var("RAVENCLAWS_CONFIG")
1010 .ok();
1011 match Config::load(config_path.as_deref()) {
1012 Ok(_new_config) => {
1013 info!("Configuration reloaded successfully");
1016 }
1020 Err(e) => {
1021 warn!(error = %e, "Failed to reload configuration on SIGHUP");
1022 }
1023 }
1024 }
1025 }
1026 }
1027
1028 Ok(())
1029}
1030
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::RuntimeConfig;

    /// Default configuration bound to loopback with an OS-assigned port.
    fn test_config() -> Config {
        Config {
            runtime: RuntimeConfig {
                host: Some("127.0.0.1".to_string()),
                port: 0,
                ..RuntimeConfig::default()
            },
            ..Config::default()
        }
    }

    /// Serves exactly one connection on an ephemeral loopback port and
    /// returns the response to `GET <path>`.
    ///
    /// Panics if the request fails, so a broken server cannot make a test
    /// pass vacuously. System proxy settings are bypassed so the request
    /// always goes straight to the in-process listener.
    async fn get_once(path: &str) -> reqwest::Response {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let state = Arc::new(ServerState::new(test_config()));
        state.mark_ready();

        let server = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            handle_connection(stream, state).await;
        });

        let client = reqwest::Client::builder()
            .no_proxy()
            .build()
            .expect("failed to build HTTP client");
        let response = client
            .get(format!("http://{}{}", addr, path))
            .send()
            .await;

        server.await.unwrap();
        response.expect("request to in-process test server failed")
    }

    #[tokio::test]
    async fn test_health_response() {
        let body = health_response();
        assert_eq!(body, b"OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_ready() {
        let state = ServerState::new(test_config());
        state.mark_ready();
        let (body, status) = ready_response(&state).await;
        assert_eq!(body, b"READY");
        assert_eq!(status, "200 OK");
    }

    #[tokio::test]
    async fn test_ready_response_when_not_ready() {
        let state = ServerState::new(test_config());
        let (body, status) = ready_response(&state).await;
        assert_eq!(body, b"NOT READY");
        assert_eq!(status, "503 Service Unavailable");
    }

    #[tokio::test]
    async fn test_metrics_response_format() {
        let state = ServerState::new(test_config());
        let body = metrics_response(&state);
        let output = String::from_utf8_lossy(&body);

        assert!(output.contains("ravenclaws_requests_total"));
        assert!(output.contains("ravenclaws_llm_requests_total"));
        assert!(output.contains("ravenclaws_tool_calls_total"));
        assert!(output.contains("ravenclaws_errors_total"));
        assert!(output.contains("ravenclaws_tokens_total"));
        assert!(output.contains("ravenclaws_uptime_seconds"));
        assert!(output.contains("ravenclaws_start_time_seconds"));
        assert!(output.contains("# HELP"));
        assert!(output.contains("# TYPE"));
    }

    #[tokio::test]
    async fn test_metrics_counters_increment() {
        let state = ServerState::new(test_config());

        state.metrics.record_request();
        state.metrics.record_request();
        state.metrics.record_error();
        state.metrics.record_tokens(150);

        assert_eq!(state.metrics.requests_total.load(Ordering::Relaxed), 2);
        assert_eq!(state.metrics.errors_total.load(Ordering::Relaxed), 1);
        assert_eq!(state.metrics.tokens_total.load(Ordering::Relaxed), 150);
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let state = ServerState::new(test_config());
        let uptime1 = state.metrics.uptime_secs();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        let uptime2 = state.metrics.uptime_secs();
        // Uptime has one-second resolution, so only monotonicity is checked.
        assert!(uptime2 >= uptime1);
    }

    #[tokio::test]
    async fn test_server_binds_and_responds() {
        let resp = get_once("/health").await;
        assert_eq!(resp.status(), 200);
        let body = resp.text().await.unwrap();
        assert_eq!(body, "OK");
    }

    #[tokio::test]
    async fn test_server_404() {
        let resp = get_once("/unknown").await;
        assert_eq!(resp.status(), 404);
    }

    #[tokio::test]
    async fn test_server_metrics_endpoint() {
        let resp = get_once("/metrics").await;
        assert_eq!(resp.status(), 200);
        let body = resp.text().await.unwrap();
        assert!(body.contains("ravenclaws_requests_total"));
    }
}