1use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15 Router,
16 body::Bytes,
17 extract::{Json, Path, State},
18 http::StatusCode,
19 response::IntoResponse,
20 routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35 pub socket_addr: std::net::SocketAddr,
36 pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41 Self {
42 socket_addr,
43 handle: handle.map(Arc::new),
44 }
45 }
46
47 pub fn address(&self) -> String {
48 self.socket_addr.to_string()
49 }
50
51 pub fn hostname(&self) -> String {
52 self.socket_addr.ip().to_string()
53 }
54
55 pub fn port(&self) -> u16 {
56 self.socket_addr.port()
57 }
58}
59
60impl Clone for SystemStatusServerInfo {
61 fn clone(&self) -> Self {
62 Self {
63 socket_addr: self.socket_addr,
64 handle: self.handle.clone(),
65 }
66 }
67}
68
69pub struct SystemStatusState {
71 root_drt: Arc<crate::DistributedRuntime>,
73 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78 pub fn new(
80 drt: Arc<crate::DistributedRuntime>,
81 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82 ) -> anyhow::Result<Self> {
83 Ok(Self {
84 root_drt: drt,
85 discovery_metadata,
86 })
87 }
88
89 pub fn drt(&self) -> &crate::DistributedRuntime {
91 &self.root_drt
92 }
93
94 pub fn discovery_metadata(
96 &self,
97 ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98 self.discovery_metadata.as_ref()
99 }
100}
101
102#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105 pub lora_name: String,
106 pub source: LoraSource,
107}
108
109#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112 pub uri: String,
113}
114
115#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118 pub status: String,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub message: Option<String>,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 pub lora_name: Option<String>,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub lora_id: Option<u64>,
125 #[serde(skip_serializing_if = "Option::is_none")]
126 pub loras: Option<serde_json::Value>,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 pub count: Option<usize>,
129}
130
131pub async fn spawn_system_status_server(
133 host: &str,
134 port: u16,
135 cancel_token: CancellationToken,
136 drt: Arc<crate::DistributedRuntime>,
137 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139 let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141 let health_path = server_state
142 .drt()
143 .system_health()
144 .lock()
145 .health_path()
146 .to_string();
147 let live_path = server_state
148 .drt()
149 .system_health()
150 .lock()
151 .live_path()
152 .to_string();
153
154 let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156 .map(|v| v.to_lowercase() == "true")
157 .unwrap_or(false);
158
159 let mut app = Router::new()
160 .route(
161 &health_path,
162 get({
163 let state = Arc::clone(&server_state);
164 move || health_handler(state)
165 }),
166 )
167 .route(
168 &live_path,
169 get({
170 let state = Arc::clone(&server_state);
171 move || health_handler(state)
172 }),
173 )
174 .route(
175 "/metrics",
176 get({
177 let state = Arc::clone(&server_state);
178 move || metrics_handler(state)
179 }),
180 )
181 .route(
182 "/metadata",
183 get({
184 let state = Arc::clone(&server_state);
185 move || metadata_handler(state)
186 }),
187 )
188 .route(
189 "/engine/{*path}",
190 any({
191 let state = Arc::clone(&server_state);
192 move |path, body| engine_route_handler(state, path, body)
193 }),
194 );
195
196 if lora_enabled {
198 app = app
199 .route(
200 "/v1/loras",
201 get({
202 let state = Arc::clone(&server_state);
203 move || list_loras_handler(State(state))
204 })
205 .post({
206 let state = Arc::clone(&server_state);
207 move |body| load_lora_handler(State(state), body)
208 }),
209 )
210 .route(
211 "/v1/loras/{*lora_name}",
212 delete({
213 let state = Arc::clone(&server_state);
214 move |path| unload_lora_handler(State(state), path)
215 }),
216 );
217 }
218
219 let app = app
220 .fallback(|| async {
221 tracing::info!("[fallback handler] called");
222 (StatusCode::NOT_FOUND, "Route not found").into_response()
223 })
224 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
225
226 let address = format!("{}:{}", host, port);
227 tracing::info!("[spawn_system_status_server] binding to: {}", address);
228
229 let listener = match TcpListener::bind(&address).await {
230 Ok(listener) => {
231 let actual_address = listener.local_addr()?;
233 tracing::info!(
234 "[spawn_system_status_server] system status server bound to: {}",
235 actual_address
236 );
237 (listener, actual_address)
238 }
239 Err(e) => {
240 tracing::error!("Failed to bind to address {}: {}", address, e);
241 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
242 }
243 };
244 let (listener, actual_address) = listener;
245
246 let observer = cancel_token.child_token();
247 let handle = tokio::spawn(async move {
249 if let Err(e) = axum::serve(listener, app)
250 .with_graceful_shutdown(observer.cancelled_owned())
251 .await
252 {
253 tracing::error!("System status server error: {}", e);
254 }
255 });
256
257 Ok((actual_address, handle))
258}
259
260#[tracing::instrument(skip_all, level = "trace")]
262async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
263 let system_health = state.drt().system_health();
265 let system_health_lock = system_health.lock();
266 let (healthy, endpoints) = system_health_lock.get_health_status();
267 let uptime = Some(system_health_lock.uptime());
268 drop(system_health_lock);
269
270 let healthy_string = if healthy { "ready" } else { "notready" };
271 let status_code = if healthy {
272 StatusCode::OK
273 } else {
274 StatusCode::SERVICE_UNAVAILABLE
275 };
276
277 let response = json!({
278 "status": healthy_string,
279 "uptime": uptime,
280 "endpoints": endpoints,
281 });
282
283 tracing::trace!("Response {}", response.to_string());
284
285 (status_code, response.to_string())
286}
287
288#[tracing::instrument(skip_all, level = "trace")]
290async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
291 let response = match state.drt().metrics().prometheus_expfmt() {
298 Ok(r) => r,
299 Err(e) => {
300 tracing::error!("Failed to get metrics from registry: {}", e);
301 return (
302 StatusCode::INTERNAL_SERVER_ERROR,
303 "Failed to get metrics".to_string(),
304 );
305 }
306 };
307
308 (StatusCode::OK, response)
309}
310
311#[tracing::instrument(skip_all, level = "trace")]
313async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
314 let metadata = match state.discovery_metadata() {
316 Some(metadata) => metadata,
317 None => {
318 tracing::debug!("Metadata endpoint called but no discovery metadata available");
319 return (
320 StatusCode::NOT_FOUND,
321 "Discovery metadata not available".to_string(),
322 )
323 .into_response();
324 }
325 };
326
327 let metadata_guard = metadata.read().await;
329
330 match serde_json::to_string(&*metadata_guard) {
332 Ok(json) => {
333 tracing::trace!("Returning metadata: {} bytes", json.len());
334 (StatusCode::OK, json).into_response()
335 }
336 Err(e) => {
337 tracing::error!("Failed to serialize metadata: {}", e);
338 (
339 StatusCode::INTERNAL_SERVER_ERROR,
340 "Failed to serialize metadata".to_string(),
341 )
342 .into_response()
343 }
344 }
345}
346
347#[tracing::instrument(skip_all, level = "debug")]
349async fn load_lora_handler(
350 State(state): State<Arc<SystemStatusState>>,
351 Json(request): Json<LoadLoraRequest>,
352) -> impl IntoResponse {
353 tracing::info!("Loading LoRA: {}", request.lora_name);
354
355 match call_lora_endpoint(
357 state.drt(),
358 "load_lora",
359 json!({
360 "lora_name": request.lora_name,
361 "source": {
362 "uri": request.source.uri
363 },
364 }),
365 )
366 .await
367 {
368 Ok(response) => {
369 if response.status == "error" {
370 tracing::error!(
371 "Failed to load LoRA {}: {}",
372 request.lora_name,
373 response.message.as_deref().unwrap_or("Unknown error")
374 );
375 (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
376 } else {
377 tracing::info!("LoRA loaded successfully: {}", request.lora_name);
378 (StatusCode::OK, Json(response))
379 }
380 }
381 Err(e) => {
382 tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
383 (
384 StatusCode::INTERNAL_SERVER_ERROR,
385 Json(LoraResponse {
386 status: "error".to_string(),
387 message: Some(e.to_string()),
388 lora_name: Some(request.lora_name),
389 lora_id: None,
390 loras: None,
391 count: None,
392 }),
393 )
394 }
395 }
396}
397
398#[tracing::instrument(skip_all, level = "debug")]
400async fn unload_lora_handler(
401 State(state): State<Arc<SystemStatusState>>,
402 Path(lora_name): Path<String>,
403) -> impl IntoResponse {
404 let lora_name = lora_name
406 .strip_prefix('/')
407 .unwrap_or(&lora_name)
408 .to_string();
409 tracing::info!("Unloading LoRA: {}", lora_name);
410
411 match call_lora_endpoint(
413 state.drt(),
414 "unload_lora",
415 json!({
416 "lora_name": lora_name.clone(),
417 }),
418 )
419 .await
420 {
421 Ok(response) => {
422 if response.status == "error" {
423 tracing::error!(
424 "Failed to unload LoRA {}: {}",
425 lora_name,
426 response.message.as_deref().unwrap_or("Unknown error")
427 );
428 (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
429 } else {
430 tracing::info!("LoRA unloaded successfully: {}", lora_name);
431 (StatusCode::OK, Json(response))
432 }
433 }
434 Err(e) => {
435 tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
436 (
437 StatusCode::INTERNAL_SERVER_ERROR,
438 Json(LoraResponse {
439 status: "error".to_string(),
440 message: Some(e.to_string()),
441 lora_name: Some(lora_name),
442 lora_id: None,
443 loras: None,
444 count: None,
445 }),
446 )
447 }
448 }
449}
450
451#[tracing::instrument(skip_all, level = "debug")]
453async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
454 tracing::info!("Listing all LoRAs");
455
456 match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
458 Ok(response) => {
459 tracing::info!("Successfully retrieved LoRA list");
460 (StatusCode::OK, Json(response))
461 }
462 Err(e) => {
463 tracing::error!("Failed to list LoRAs: {}", e);
464 (
465 StatusCode::INTERNAL_SERVER_ERROR,
466 Json(LoraResponse {
467 status: "error".to_string(),
468 message: Some(e.to_string()),
469 lora_name: None,
470 lora_id: None,
471 loras: None,
472 count: None,
473 }),
474 )
475 }
476 }
477}
478
479async fn call_lora_endpoint(
484 drt: &crate::DistributedRuntime,
485 endpoint_name: &str,
486 request_body: serde_json::Value,
487) -> anyhow::Result<LoraResponse> {
488 use crate::engine::AsyncEngine;
489
490 tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
491
492 let local_registry = drt.local_endpoint_registry();
494 let engine = local_registry
495 .get(endpoint_name)
496 .ok_or_else(|| {
497 anyhow::anyhow!(
498 "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
499 endpoint_name
500 )
501 })?;
502
503 tracing::debug!(
504 "Found endpoint '{}' in local registry, calling directly",
505 endpoint_name
506 );
507
508 let request = crate::pipeline::SingleIn::new(request_body);
510 let mut stream = engine.generate(request).await?;
511
512 if let Some(response) = stream.next().await {
514 let response_data = response.data.unwrap_or_default();
515
516 let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
518 .unwrap_or_else(|_| parse_lora_response(&response_data));
519
520 return Ok(lora_response);
521 }
522
523 anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
524}
525
526fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
528 LoraResponse {
529 status: response_data
530 .get("status")
531 .and_then(|s| s.as_str())
532 .unwrap_or("success")
533 .to_string(),
534 message: response_data
535 .get("message")
536 .and_then(|m| m.as_str())
537 .map(|s| s.to_string()),
538 lora_name: response_data
539 .get("lora_name")
540 .and_then(|n| n.as_str())
541 .map(|s| s.to_string()),
542 lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
543 loras: response_data.get("loras").cloned(),
544 count: response_data
545 .get("count")
546 .and_then(|c| c.as_u64())
547 .map(|c| c as usize),
548 }
549}
550
551#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
556async fn engine_route_handler(
557 state: Arc<SystemStatusState>,
558 Path(path): Path<String>,
559 body: Bytes,
560) -> impl IntoResponse {
561 tracing::trace!("Engine route request to /engine/{}", path);
562
563 let body_json: serde_json::Value = if body.is_empty() {
565 serde_json::json!({})
566 } else {
567 match serde_json::from_slice(&body) {
568 Ok(json) => json,
569 Err(e) => {
570 tracing::warn!("Invalid JSON in request body: {}", e);
571 return (
572 StatusCode::BAD_REQUEST,
573 json!({
574 "error": "Invalid JSON",
575 "message": format!("{}", e)
576 })
577 .to_string(),
578 )
579 .into_response();
580 }
581 }
582 };
583
584 let callback = match state.drt().engine_routes().get(&path) {
586 Some(cb) => cb,
587 None => {
588 tracing::debug!("Route /engine/{} not found", path);
589 return (
590 StatusCode::NOT_FOUND,
591 json!({
592 "error": "Route not found",
593 "message": format!("Route /engine/{} not found", path)
594 })
595 .to_string(),
596 )
597 .into_response();
598 }
599 };
600
601 match callback(body_json).await {
603 Ok(response) => {
604 tracing::trace!("Engine route handler succeeded for /engine/{}", path);
605 (StatusCode::OK, response.to_string()).into_response()
606 }
607 Err(e) => {
608 tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
609 (
610 StatusCode::INTERNAL_SERVER_ERROR,
611 json!({
612 "error": "Handler error",
613 "message": format!("{}", e)
614 })
615 .to_string(),
616 )
617 .into_response()
618 }
619 }
620}
621
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// An axum server with graceful shutdown must exit once its token is cancelled.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let shutdown = CancellationToken::new();
        let server_shutdown = shutdown.clone();

        let router = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        let server = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, router)
                .with_graceful_shutdown(server_shutdown.cancelled_owned())
                .await;
        });

        shutdown.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(5), server).await;
        assert!(
            joined.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
658
// Integration tests: each test builds a real runtime with
// `create_test_drt_async` and scopes its configuration with `temp_env`.
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
    use super::*;
    use crate::config::environment_names::logging as env_logging;
    use crate::config::environment_names::runtime::canary as env_canary;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use crate::metrics::MetricsHierarchy;
    use anyhow::Result;
    use rstest::rstest;
    use std::sync::Arc;
    use tokio::time::Duration;

    /// Uptime reported by the runtime's system health increases over time.
    #[tokio::test]
    async fn test_uptime_from_system_health() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let uptime = drt.system_health().lock().uptime();

            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            let uptime_after = drt.system_health().lock().uptime();
            assert!(
                uptime_after > uptime,
                "Uptime should increase over time: before={:?}, after={:?}",
                uptime,
                uptime_after
            );
        })
        .await;
    }

    /// The uptime gauge is registered under the `dynamo_component` namespace.
    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let response = drt.metrics().prometheus_expfmt().unwrap();
            println!("Full metrics response:\n{}", response);

            assert!(
                response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            assert!(
                response.contains("dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }

    /// Uptime advances by at least the time slept between two gauge updates.
    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let initial_uptime = drt.system_health().lock().uptime();

            drt.system_health().lock().update_uptime_gauge();

            tokio::time::sleep(std::time::Duration::from_millis(100)).await;

            let uptime_after_sleep = drt.system_health().lock().uptime();

            drt.system_health().lock().update_uptime_gauge();

            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= std::time::Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }

    /// Without `DYN_SYSTEM_PORT` the runtime must not start the status server.
    #[tokio::test]
    async fn test_http_requests_fail_when_system_disabled() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let system_info = drt.system_status_server_info();
            assert!(
                system_info.is_none(),
                "System status server should not be running when disabled"
            );

            println!("✓ System status server correctly disabled when not enabled");
        })
        .await;
    }

    /// Probes honor the starting health status and custom paths; the default
    /// paths return 404 once custom ones are configured.
    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        crate::logging::init();

        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some(starting_health_status),
                ),
                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }

    /// Requests carrying W3C trace headers are served (exercises request-span tracing).
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        // Propagate the closure's result: discarding it would silently swallow
        // every `?` failure below and make the test always pass.
        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
                (env_logging::DYN_LOGGING_JSONL, Some("1")),
                (env_logging::DYN_LOG, Some("trace")),
            ],
            (async || {
                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for path in ["/health", "/live", "/someRandomPathNotFoundHere"] {
                    let traceparent_value =
                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await?;
                    let status = response.status();
                    let body = response.text().await?;
                    tracing::info!(body = body, status = status.to_string());
                }

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await?;
        Ok(())
    }

    /// Health flips from notready to ready once the watched endpoint starts serving.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some(ENDPOINT_HEALTH_CONFIG),
                ),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info_opt = drt.system_status_server_info();

                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                    std::env::var(env_system::DYN_SYSTEM_PORT)
                );

                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(
                    status, 503,
                    "Health should be 503 (not ready) initially, got: {}",
                    status
                );
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Health should contain status notready"
                );

                let namespace = drt.namespace("ns1234").unwrap();
                let component = namespace.component("comp1234").unwrap();

                use crate::pipeline::{
                    async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error,
                    ManyOut, SingleIn,
                };
                use crate::protocols::annotated::Annotated;

                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>
                    for TestHandler
                {
                    async fn generate(
                        &self,
                        input: SingleIn<String>,
                    ) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context(),
                        ))
                    }
                }

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                tokio::spawn(async move {
                    let _ = component
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!(
                    "Health endpoint test results: {}/200 requests succeeded",
                    success_count
                );
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                assert!(
                    success_count >= 150,
                    "Expected at least 150 out of 200 requests to succeed, but only {} succeeded",
                    success_count
                );
            },
        )
        .await;
    }

    /// Default probe paths answer `ready`; unknown paths hit the 404 fallback.
    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expect_200, expect_body) in [
                    ("/health", true, "ready"),
                    ("/live", true, "ready"),
                    ("/someRandomPathNotFoundHere", false, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    if expect_200 {
                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
                    } else {
                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
                    }
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }

    /// A registered health-check target starts notready and reports ready once
    /// its endpoint status is set to `Ready`.
    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some("notready"),
                ),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")),
                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")),
                ("RUST_LOG", Some("info")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                {
                    let system_health = drt.system_health();
                    let system_health_lock = system_health.lock();
                    system_health_lock.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
                        },
                        health_check_payload.clone(),
                    );
                }

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                drt.system_health()
                    .lock()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                let endpoint_status = drt
                    .system_health()
                    .lock()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
}
1195}