1use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_system_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15 Router,
16 body::Bytes,
17 extract::{Json, Path, State},
18 http::StatusCode,
19 response::IntoResponse,
20 routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35 pub socket_addr: std::net::SocketAddr,
36 pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41 Self {
42 socket_addr,
43 handle: handle.map(Arc::new),
44 }
45 }
46
47 pub fn address(&self) -> String {
48 self.socket_addr.to_string()
49 }
50
51 pub fn hostname(&self) -> String {
52 self.socket_addr.ip().to_string()
53 }
54
55 pub fn port(&self) -> u16 {
56 self.socket_addr.port()
57 }
58}
59
60impl Clone for SystemStatusServerInfo {
61 fn clone(&self) -> Self {
62 Self {
63 socket_addr: self.socket_addr,
64 handle: self.handle.clone(),
65 }
66 }
67}
68
69pub struct SystemStatusState {
71 root_drt: Arc<crate::DistributedRuntime>,
73 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78 pub fn new(
80 drt: Arc<crate::DistributedRuntime>,
81 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82 ) -> anyhow::Result<Self> {
83 Ok(Self {
84 root_drt: drt,
85 discovery_metadata,
86 })
87 }
88
89 pub fn drt(&self) -> &crate::DistributedRuntime {
91 &self.root_drt
92 }
93
94 pub fn discovery_metadata(
96 &self,
97 ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98 self.discovery_metadata.as_ref()
99 }
100}
101
102#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105 pub lora_name: String,
106 pub source: LoraSource,
107}
108
109#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112 pub uri: String,
113}
114
115#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118 pub status: String,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub message: Option<String>,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 pub lora_name: Option<String>,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub lora_id: Option<u64>,
125 #[serde(skip_serializing_if = "Option::is_none")]
126 pub loras: Option<serde_json::Value>,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 pub count: Option<usize>,
129}
130
131pub async fn spawn_system_status_server(
133 host: &str,
134 port: u16,
135 cancel_token: CancellationToken,
136 drt: Arc<crate::DistributedRuntime>,
137 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139 let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141 let health_path = server_state
142 .drt()
143 .system_health()
144 .lock()
145 .health_path()
146 .to_string();
147 let live_path = server_state
148 .drt()
149 .system_health()
150 .lock()
151 .live_path()
152 .to_string();
153
154 let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156 .map(|v| v.to_lowercase() == "true")
157 .unwrap_or(false);
158
159 let mut app = Router::new()
160 .route(
161 &health_path,
162 get({
163 let state = Arc::clone(&server_state);
164 move || health_handler(state)
165 }),
166 )
167 .route(
168 &live_path,
169 get({
170 let state = Arc::clone(&server_state);
171 move || health_handler(state)
172 }),
173 )
174 .route(
175 "/metrics",
176 get({
177 let state = Arc::clone(&server_state);
178 move || metrics_handler(state)
179 }),
180 )
181 .route(
182 "/metadata",
183 get({
184 let state = Arc::clone(&server_state);
185 move || metadata_handler(state)
186 }),
187 )
188 .route(
189 "/engine/{*path}",
190 any({
191 let state = Arc::clone(&server_state);
192 move |path, body| engine_route_handler(state, path, body)
193 }),
194 );
195
196 if lora_enabled {
198 app = app
199 .route(
200 "/v1/loras",
201 get({
202 let state = Arc::clone(&server_state);
203 move || list_loras_handler(State(state))
204 })
205 .post({
206 let state = Arc::clone(&server_state);
207 move |body| load_lora_handler(State(state), body)
208 }),
209 )
210 .route(
211 "/v1/loras/{*lora_name}",
212 delete({
213 let state = Arc::clone(&server_state);
214 move |path| unload_lora_handler(State(state), path)
215 }),
216 );
217 }
218
219 app = app.route(
223 "/v1/metadata/{model_slug}/{model_suffix}/{*filename}",
224 get({
225 let state = Arc::clone(&server_state);
226 move |path| metadata_file_handler(State(state), path)
227 }),
228 );
229
230 let app = app
231 .fallback(|| async {
232 tracing::info!("[fallback handler] called");
233 (StatusCode::NOT_FOUND, "Route not found").into_response()
234 })
235 .layer(TraceLayer::new_for_http().make_span_with(make_system_request_span));
236
237 let address = format!("{}:{}", host, port);
238 tracing::info!("[spawn_system_status_server] binding to: {address}");
239
240 let listener = match TcpListener::bind(&address).await {
241 Ok(listener) => {
242 let actual_address = listener.local_addr()?;
244 tracing::info!(
245 "[spawn_system_status_server] system status server bound to: {}",
246 actual_address
247 );
248 (listener, actual_address)
249 }
250 Err(e) => {
251 tracing::error!("Failed to bind to address {}: {}", address, e);
252 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
253 }
254 };
255 let (listener, actual_address) = listener;
256
257 let observer = cancel_token.child_token();
258 let handle = tokio::spawn(async move {
260 if let Err(e) = axum::serve(listener, app)
261 .with_graceful_shutdown(observer.cancelled_owned())
262 .await
263 {
264 tracing::error!("System status server error: {e}");
265 }
266 });
267
268 Ok((actual_address, handle))
269}
270
271#[tracing::instrument(skip_all, level = "trace")]
273async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
274 let system_health = state.drt().system_health();
276 let system_health_lock = system_health.lock();
277 let (healthy, endpoints) = system_health_lock.get_health_status();
278 let uptime = Some(system_health_lock.uptime());
279 drop(system_health_lock);
280
281 let healthy_string = if healthy { "ready" } else { "notready" };
282 let status_code = if healthy {
283 StatusCode::OK
284 } else {
285 StatusCode::SERVICE_UNAVAILABLE
286 };
287
288 let response = json!({
289 "status": healthy_string,
290 "uptime": uptime,
291 "endpoints": endpoints,
292 });
293
294 tracing::trace!("Response {}", response.to_string());
295
296 (status_code, response.to_string())
297}
298
299#[tracing::instrument(skip_all, level = "trace")]
301async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
302 let response = match state.drt().metrics().prometheus_expfmt() {
309 Ok(r) => r,
310 Err(e) => {
311 tracing::error!("Failed to get metrics from registry: {e}");
312 return (
313 StatusCode::INTERNAL_SERVER_ERROR,
314 "Failed to get metrics".to_string(),
315 );
316 }
317 };
318
319 (StatusCode::OK, response)
320}
321
322#[tracing::instrument(skip_all, level = "trace")]
324async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
325 let metadata = match state.discovery_metadata() {
327 Some(metadata) => metadata,
328 None => {
329 tracing::debug!("Metadata endpoint called but no discovery metadata available");
330 return (
331 StatusCode::NOT_FOUND,
332 "Discovery metadata not available".to_string(),
333 )
334 .into_response();
335 }
336 };
337
338 let metadata_guard = metadata.read().await;
340
341 match serde_json::to_string(&*metadata_guard) {
343 Ok(json) => {
344 tracing::trace!("Returning metadata: {} bytes", json.len());
345 (StatusCode::OK, json).into_response()
346 }
347 Err(e) => {
348 tracing::error!("Failed to serialize metadata: {e}");
349 (
350 StatusCode::INTERNAL_SERVER_ERROR,
351 "Failed to serialize metadata".to_string(),
352 )
353 .into_response()
354 }
355 }
356}
357
358#[tracing::instrument(skip_all, level = "debug")]
360async fn load_lora_handler(
361 State(state): State<Arc<SystemStatusState>>,
362 Json(request): Json<LoadLoraRequest>,
363) -> impl IntoResponse {
364 tracing::info!("Loading LoRA: {}", request.lora_name);
365
366 match call_lora_endpoint(
368 state.drt(),
369 "load_lora",
370 json!({
371 "lora_name": request.lora_name,
372 "source": {
373 "uri": request.source.uri
374 },
375 }),
376 )
377 .await
378 {
379 Ok(response) => {
380 if response.status == "error" {
381 tracing::error!(
382 "Failed to load LoRA {}: {}",
383 request.lora_name,
384 response.message.as_deref().unwrap_or("Unknown error")
385 );
386 (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
387 } else {
388 tracing::info!("LoRA loaded successfully: {}", request.lora_name);
389 (StatusCode::OK, Json(response))
390 }
391 }
392 Err(e) => {
393 tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
394 (
395 StatusCode::INTERNAL_SERVER_ERROR,
396 Json(LoraResponse {
397 status: "error".to_string(),
398 message: Some(e.to_string()),
399 lora_name: Some(request.lora_name),
400 lora_id: None,
401 loras: None,
402 count: None,
403 }),
404 )
405 }
406 }
407}
408
409#[tracing::instrument(skip_all, level = "debug")]
411async fn unload_lora_handler(
412 State(state): State<Arc<SystemStatusState>>,
413 Path(lora_name): Path<String>,
414) -> impl IntoResponse {
415 let lora_name = lora_name
417 .strip_prefix('/')
418 .unwrap_or(&lora_name)
419 .to_string();
420 tracing::info!("Unloading LoRA: {lora_name}");
421
422 match call_lora_endpoint(
424 state.drt(),
425 "unload_lora",
426 json!({
427 "lora_name": lora_name.clone(),
428 }),
429 )
430 .await
431 {
432 Ok(response) => {
433 if response.status == "error" {
434 tracing::error!(
435 "Failed to unload LoRA {}: {}",
436 lora_name,
437 response.message.as_deref().unwrap_or("Unknown error")
438 );
439 (StatusCode::INTERNAL_SERVER_ERROR, Json(response))
440 } else {
441 tracing::info!("LoRA unloaded successfully: {lora_name}");
442 (StatusCode::OK, Json(response))
443 }
444 }
445 Err(e) => {
446 tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
447 (
448 StatusCode::INTERNAL_SERVER_ERROR,
449 Json(LoraResponse {
450 status: "error".to_string(),
451 message: Some(e.to_string()),
452 lora_name: Some(lora_name),
453 lora_id: None,
454 loras: None,
455 count: None,
456 }),
457 )
458 }
459 }
460}
461
462#[tracing::instrument(skip_all, level = "debug")]
464async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
465 tracing::info!("Listing all LoRAs");
466
467 match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
469 Ok(response) => {
470 tracing::info!("Successfully retrieved LoRA list");
471 (StatusCode::OK, Json(response))
472 }
473 Err(e) => {
474 tracing::error!("Failed to list LoRAs: {e}");
475 (
476 StatusCode::INTERNAL_SERVER_ERROR,
477 Json(LoraResponse {
478 status: "error".to_string(),
479 message: Some(e.to_string()),
480 lora_name: None,
481 lora_id: None,
482 loras: None,
483 count: None,
484 }),
485 )
486 }
487 }
488}
489
490async fn metadata_file_handler(
493 State(state): State<Arc<SystemStatusState>>,
494 Path((model_slug, model_suffix, filename)): Path<(String, String, String)>,
495) -> impl IntoResponse {
496 let path = match state
497 .drt()
498 .metadata_artifacts()
499 .get(&model_slug, &model_suffix, &filename)
500 {
501 Some(p) => p,
502 None => {
503 tracing::debug!(
504 model_slug,
505 model_suffix,
506 filename,
507 "metadata artifact not registered for self-host"
508 );
509 return (StatusCode::NOT_FOUND, "Not found").into_response();
510 }
511 };
512
513 match tokio::fs::read(&path).await {
514 Ok(bytes) => (StatusCode::OK, bytes).into_response(),
515 Err(err) => {
516 tracing::error!(
517 model_slug,
518 model_suffix,
519 filename,
520 path = %path.display(),
521 %err,
522 "failed to read self-hosted metadata file"
523 );
524 (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read file").into_response()
525 }
526 }
527}
528
529async fn call_lora_endpoint(
534 drt: &crate::DistributedRuntime,
535 endpoint_name: &str,
536 request_body: serde_json::Value,
537) -> anyhow::Result<LoraResponse> {
538 use crate::engine::AsyncEngine;
539
540 tracing::debug!("Calling local endpoint: '{endpoint_name}'");
541
542 let local_registry = drt.local_endpoint_registry();
544 let engine = local_registry
545 .get(endpoint_name)
546 .ok_or_else(|| {
547 anyhow::anyhow!(
548 "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
549 endpoint_name
550 )
551 })?;
552
553 tracing::debug!(
554 "Found endpoint '{}' in local registry, calling directly",
555 endpoint_name
556 );
557
558 let request = crate::pipeline::SingleIn::new(request_body);
560 let mut stream = engine.generate(request).await?;
561
562 if let Some(response) = stream.next().await {
564 let response_data = response.data.unwrap_or_default();
565
566 let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
568 .unwrap_or_else(|_| parse_lora_response(&response_data));
569
570 return Ok(lora_response);
571 }
572
573 anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
574}
575
576fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
578 LoraResponse {
579 status: response_data
580 .get("status")
581 .and_then(|s| s.as_str())
582 .unwrap_or("success")
583 .to_string(),
584 message: response_data
585 .get("message")
586 .and_then(|m| m.as_str())
587 .map(|s| s.to_string()),
588 lora_name: response_data
589 .get("lora_name")
590 .and_then(|n| n.as_str())
591 .map(|s| s.to_string()),
592 lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
593 loras: response_data.get("loras").cloned(),
594 count: response_data
595 .get("count")
596 .and_then(|c| c.as_u64())
597 .map(|c| c as usize),
598 }
599}
600
601#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
606async fn engine_route_handler(
607 state: Arc<SystemStatusState>,
608 Path(path): Path<String>,
609 body: Bytes,
610) -> impl IntoResponse {
611 tracing::trace!("Engine route request to /engine/{path}");
612
613 let body_json: serde_json::Value = if body.is_empty() {
615 serde_json::json!({})
616 } else {
617 match serde_json::from_slice(&body) {
618 Ok(json) => json,
619 Err(e) => {
620 tracing::warn!("Invalid JSON in request body: {e}");
621 return (
622 StatusCode::BAD_REQUEST,
623 json!({
624 "error": "Invalid JSON",
625 "message": format!("{}", e)
626 })
627 .to_string(),
628 )
629 .into_response();
630 }
631 }
632 };
633
634 let callback = match state.drt().engine_routes().get(&path) {
636 Some(cb) => cb,
637 None => {
638 tracing::debug!("Route /engine/{path} not found");
639 return (
640 StatusCode::NOT_FOUND,
641 json!({
642 "error": "Route not found",
643 "message": format!("Route /engine/{} not found", path)
644 })
645 .to_string(),
646 )
647 .into_response();
648 }
649 };
650
651 match callback(body_json).await {
653 Ok(response) => {
654 tracing::trace!("Engine route handler succeeded for /engine/{path}");
655 (StatusCode::OK, response.to_string()).into_response()
656 }
657 Err(e) => {
658 tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
659 (
660 StatusCode::INTERNAL_SERVER_ERROR,
661 json!({
662 "error": "Handler error",
663 "message": format!("{}", e)
664 })
665 .to_string(),
666 )
667 .into_response()
668 }
669 }
670}
671
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// A server started with graceful shutdown must exit promptly once its token is cancelled.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let shutdown = CancellationToken::new();
        let server_shutdown = shutdown.clone();

        let router = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        let server_task = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, router)
                .with_graceful_shutdown(server_shutdown.cancelled_owned())
                .await;
        });

        shutdown.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(5), server_task).await;
        assert!(
            joined.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
708
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
    use super::*;
    use crate::config::environment_names::logging as env_logging;
    use crate::config::environment_names::runtime::canary as env_canary;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use crate::metrics::MetricsHierarchy;
    use anyhow::Result;
    use rstest::rstest;
    use std::sync::Arc;
    use tokio::time::Duration;

    /// Uptime reported by system health must grow across a sleep.
    #[tokio::test]
    async fn test_uptime_from_system_health() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            // (The former `as_nanos() > 0 || is_zero()` check was a tautology and is removed.)
            let uptime = drt.system_health().lock().uptime();

            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            let uptime_after = drt.system_health().lock().uptime();
            assert!(uptime_after > uptime);
        })
        .await;
    }

    /// The metrics registry exposes the uptime gauge under the expected name.
    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let response = drt.metrics().prometheus_expfmt().unwrap();
            println!("Full metrics response:\n{}", response);

            assert!(
                response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            assert!(
                response.contains("dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }

    /// Uptime increases by at least the sleep duration between gauge updates.
    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let initial_uptime = drt.system_health().lock().uptime();

            drt.system_health().lock().update_uptime_gauge();

            tokio::time::sleep(std::time::Duration::from_millis(100)).await;

            let uptime_after_sleep = drt.system_health().lock().uptime();

            drt.system_health().lock().update_uptime_gauge();

            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= std::time::Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }

    /// Without `DYN_SYSTEM_PORT` the runtime must not start the status server.
    #[tokio::test]
    async fn test_http_requests_fail_when_system_disabled() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let system_info = drt.system_status_server_info();
            assert!(
                system_info.is_none(),
                "System status server should not be running when disabled"
            );

            println!("✓ System status server correctly disabled when not enabled");
        })
        .await;
    }

    /// Health/live probes honour the starting status and optional custom paths; unknown and
    /// replaced default paths return the 404 fallback.
    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        crate::logging::init();

        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some(starting_health_status),
                ),
                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }

    /// Exercises request tracing with W3C `traceparent`/`tracestate` headers.
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        #[allow(clippy::redundant_closure_call)]
        let outcome = temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
                (env_logging::DYN_LOGGING_JSONL, Some("1")),
                (env_logging::DYN_LOG, Some("trace")),
            ],
            (async || {
                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for path in ["/health", "/live", "/someRandomPathNotFoundHere"] {
                    let traceparent_value =
                        "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                    let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    tracing::info!(body = body, status = status.to_string());
                }

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await;
        // Propagate failures from the inner closure instead of silently discarding them.
        outcome?;
        Ok(())
    }

    /// Health flips from notready to ready once the tracked endpoint is served.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some(ENDPOINT_HEALTH_CONFIG),
                ),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info_opt = drt.system_status_server_info();

                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set (0 selects an ephemeral port), but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                    std::env::var(env_system::DYN_SYSTEM_PORT)
                );

                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(
                    status, 503,
                    "Health should be 503 (not ready) initially, got: {}",
                    status
                );
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Health should contain status notready"
                );

                let namespace = drt.namespace("ns1234").unwrap();
                let component = namespace.component("comp1234").unwrap();

                use crate::pipeline::{
                    async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error,
                    ManyOut, SingleIn,
                };
                use crate::protocols::annotated::Annotated;

                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>
                    for TestHandler
                {
                    async fn generate(
                        &self,
                        input: SingleIn<String>,
                    ) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context(),
                        ))
                    }
                }

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                tokio::spawn(async move {
                    let _ = component
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!(
                    "Health endpoint test results: {success_count}/200 requests succeeded"
                );
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                assert!(
                    success_count >= 150,
                    "Expected at least 150 out of 200 requests to succeed, but only {} succeeded",
                    success_count
                );
            },
        )
        .await;
    }

    /// Default routes answer as expected and unknown paths hit the 404 fallback.
    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expect_200, expect_body) in [
                    ("/health", true, "ready"),
                    ("/live", true, "ready"),
                    ("/someRandomPathNotFoundHere", false, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    if expect_200 {
                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
                    } else {
                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
                    }
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }

    /// Registering a health-check target and marking it ready flips `/health` to 200.
    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some("notready"),
                ),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")),
                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")),
                ("RUST_LOG", Some("info")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                {
                    let system_health = drt.system_health();
                    let system_health_lock = system_health.lock();
                    system_health_lock.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::Nats(
                                endpoint.to_string(),
                            ),
                            device_type: None,
                        },
                        health_check_payload.clone(),
                    );
                }

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                drt.system_health()
                    .lock()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                let endpoint_status = drt
                    .system_health()
                    .lock()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
}