1use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15 Router,
16 body::Bytes,
17 extract::{Json, Path, State},
18 http::StatusCode,
19 response::IntoResponse,
20 routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35 pub socket_addr: std::net::SocketAddr,
36 pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41 Self {
42 socket_addr,
43 handle: handle.map(Arc::new),
44 }
45 }
46
47 pub fn address(&self) -> String {
48 self.socket_addr.to_string()
49 }
50
51 pub fn hostname(&self) -> String {
52 self.socket_addr.ip().to_string()
53 }
54
55 pub fn port(&self) -> u16 {
56 self.socket_addr.port()
57 }
58}
59
60impl Clone for SystemStatusServerInfo {
61 fn clone(&self) -> Self {
62 Self {
63 socket_addr: self.socket_addr,
64 handle: self.handle.clone(),
65 }
66 }
67}
68
/// Shared state handed to the system status HTTP handlers.
pub struct SystemStatusState {
    /// Distributed runtime the handlers query (system health, metrics, engine
    /// routes and the local endpoint registry).
    root_drt: Arc<crate::DistributedRuntime>,
    /// Discovery metadata served by the `/metadata` route; `None` when not available.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
}
76
77impl SystemStatusState {
78 pub fn new(
80 drt: Arc<crate::DistributedRuntime>,
81 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82 ) -> anyhow::Result<Self> {
83 Ok(Self {
84 root_drt: drt,
85 discovery_metadata,
86 })
87 }
88
89 pub fn drt(&self) -> &crate::DistributedRuntime {
91 &self.root_drt
92 }
93
94 pub fn discovery_metadata(
96 &self,
97 ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98 self.discovery_metadata.as_ref()
99 }
100}
101
/// JSON body accepted by the `POST /v1/loras` route.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoadLoraRequest {
    /// Name of the LoRA to load; forwarded to the `load_lora` endpoint.
    pub lora_name: String,
    /// Where the LoRA should be loaded from.
    pub source: LoraSource,
}
108
/// Source location of a LoRA, as supplied in a [`LoadLoraRequest`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoraSource {
    /// URI string forwarded unchanged to the `load_lora` endpoint.
    pub uri: String,
}
114
/// JSON reply returned by the LoRA management routes.
///
/// Every optional field is omitted from the serialized output when it is `None`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoraResponse {
    /// Outcome label; the handlers in this file set `"error"` on failure.
    pub status: String,
    /// Optional human-readable detail (the error text on failure).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    /// Name of the LoRA the reply refers to, when applicable.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lora_name: Option<String>,
    /// Numeric identifier reported by the endpoint, when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lora_id: Option<u64>,
    /// Arbitrary JSON describing loaded LoRAs, passed through as received.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub loras: Option<serde_json::Value>,
    /// Count reported by the endpoint, when present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub count: Option<usize>,
}
130
131pub async fn spawn_system_status_server(
133 host: &str,
134 port: u16,
135 cancel_token: CancellationToken,
136 drt: Arc<crate::DistributedRuntime>,
137 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139 let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141 let health_path = server_state
142 .drt()
143 .system_health()
144 .lock()
145 .health_path()
146 .to_string();
147 let live_path = server_state
148 .drt()
149 .system_health()
150 .lock()
151 .live_path()
152 .to_string();
153
154 let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156 .map(|v| v.to_lowercase() == "true")
157 .unwrap_or(false);
158
159 let mut app = Router::new()
160 .route(
161 &health_path,
162 get({
163 let state = Arc::clone(&server_state);
164 move || health_handler(state)
165 }),
166 )
167 .route(
168 &live_path,
169 get({
170 let state = Arc::clone(&server_state);
171 move || health_handler(state)
172 }),
173 )
174 .route(
175 "/metrics",
176 get({
177 let state = Arc::clone(&server_state);
178 move || metrics_handler(state)
179 }),
180 )
181 .route(
182 "/metadata",
183 get({
184 let state = Arc::clone(&server_state);
185 move || metadata_handler(state)
186 }),
187 )
188 .route(
189 "/engine/{*path}",
190 any({
191 let state = Arc::clone(&server_state);
192 move |path, body| engine_route_handler(state, path, body)
193 }),
194 );
195
196 if lora_enabled {
198 app = app
199 .route(
200 "/v1/loras",
201 get({
202 let state = Arc::clone(&server_state);
203 move || list_loras_handler(State(state))
204 })
205 .post({
206 let state = Arc::clone(&server_state);
207 move |body| load_lora_handler(State(state), body)
208 }),
209 )
210 .route(
211 "/v1/loras/{*lora_name}",
212 delete({
213 let state = Arc::clone(&server_state);
214 move |path| unload_lora_handler(State(state), path)
215 }),
216 );
217 }
218
219 let app = app
220 .fallback(|| async {
221 tracing::info!("[fallback handler] called");
222 (StatusCode::NOT_FOUND, "Route not found").into_response()
223 })
224 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
225
226 let address = format!("{}:{}", host, port);
227 tracing::info!("[spawn_system_status_server] binding to: {}", address);
228
229 let listener = match TcpListener::bind(&address).await {
230 Ok(listener) => {
231 let actual_address = listener.local_addr()?;
233 tracing::info!(
234 "[spawn_system_status_server] system status server bound to: {}",
235 actual_address
236 );
237 (listener, actual_address)
238 }
239 Err(e) => {
240 tracing::error!("Failed to bind to address {}: {}", address, e);
241 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
242 }
243 };
244 let (listener, actual_address) = listener;
245
246 let observer = cancel_token.child_token();
247 let handle = tokio::spawn(async move {
249 if let Err(e) = axum::serve(listener, app)
250 .with_graceful_shutdown(observer.cancelled_owned())
251 .await
252 {
253 tracing::error!("System status server error: {}", e);
254 }
255 });
256
257 Ok((actual_address, handle))
258}
259
260#[tracing::instrument(skip_all, level = "trace")]
262async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
263 let system_health = state.drt().system_health();
265 let system_health_lock = system_health.lock();
266 let (healthy, endpoints) = system_health_lock.get_health_status();
267 let uptime = Some(system_health_lock.uptime());
268 drop(system_health_lock);
269
270 let healthy_string = if healthy { "ready" } else { "notready" };
271 let status_code = if healthy {
272 StatusCode::OK
273 } else {
274 StatusCode::SERVICE_UNAVAILABLE
275 };
276
277 let response = json!({
278 "status": healthy_string,
279 "uptime": uptime,
280 "endpoints": endpoints,
281 });
282
283 tracing::trace!("Response {}", response.to_string());
284
285 (status_code, response.to_string())
286}
287
288#[tracing::instrument(skip_all, level = "trace")]
290async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
291 state.drt().system_health().lock().update_uptime_gauge();
293
294 let response = match state.drt().metrics().prometheus_expfmt() {
299 Ok(r) => r,
300 Err(e) => {
301 tracing::error!("Failed to get metrics from registry: {}", e);
302 return (
303 StatusCode::INTERNAL_SERVER_ERROR,
304 "Failed to get metrics".to_string(),
305 );
306 }
307 };
308
309 (StatusCode::OK, response)
310}
311
312#[tracing::instrument(skip_all, level = "trace")]
314async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
315 let metadata = match state.discovery_metadata() {
317 Some(metadata) => metadata,
318 None => {
319 tracing::debug!("Metadata endpoint called but no discovery metadata available");
320 return (
321 StatusCode::NOT_FOUND,
322 "Discovery metadata not available".to_string(),
323 )
324 .into_response();
325 }
326 };
327
328 let metadata_guard = metadata.read().await;
330
331 match serde_json::to_string(&*metadata_guard) {
333 Ok(json) => {
334 tracing::trace!("Returning metadata: {} bytes", json.len());
335 (StatusCode::OK, json).into_response()
336 }
337 Err(e) => {
338 tracing::error!("Failed to serialize metadata: {}", e);
339 (
340 StatusCode::INTERNAL_SERVER_ERROR,
341 "Failed to serialize metadata".to_string(),
342 )
343 .into_response()
344 }
345 }
346}
347
348#[tracing::instrument(skip_all, level = "debug")]
350async fn load_lora_handler(
351 State(state): State<Arc<SystemStatusState>>,
352 Json(request): Json<LoadLoraRequest>,
353) -> impl IntoResponse {
354 tracing::info!("Loading LoRA: {}", request.lora_name);
355
356 match call_lora_endpoint(
358 state.drt(),
359 "load_lora",
360 json!({
361 "lora_name": request.lora_name,
362 "source": {
363 "uri": request.source.uri
364 },
365 }),
366 )
367 .await
368 {
369 Ok(response) => {
370 tracing::info!("LoRA loaded successfully: {}", request.lora_name);
371 (StatusCode::OK, Json(response))
372 }
373 Err(e) => {
374 tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
375 (
376 StatusCode::INTERNAL_SERVER_ERROR,
377 Json(LoraResponse {
378 status: "error".to_string(),
379 message: Some(e.to_string()),
380 lora_name: Some(request.lora_name),
381 lora_id: None,
382 loras: None,
383 count: None,
384 }),
385 )
386 }
387 }
388}
389
390#[tracing::instrument(skip_all, level = "debug")]
392async fn unload_lora_handler(
393 State(state): State<Arc<SystemStatusState>>,
394 Path(lora_name): Path<String>,
395) -> impl IntoResponse {
396 let lora_name = lora_name
398 .strip_prefix('/')
399 .unwrap_or(&lora_name)
400 .to_string();
401 tracing::info!("Unloading LoRA: {}", lora_name);
402
403 match call_lora_endpoint(
405 state.drt(),
406 "unload_lora",
407 json!({
408 "lora_name": lora_name.clone(),
409 }),
410 )
411 .await
412 {
413 Ok(response) => {
414 tracing::info!("LoRA unloaded successfully: {}", lora_name);
415 (StatusCode::OK, Json(response))
416 }
417 Err(e) => {
418 tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
419 (
420 StatusCode::INTERNAL_SERVER_ERROR,
421 Json(LoraResponse {
422 status: "error".to_string(),
423 message: Some(e.to_string()),
424 lora_name: Some(lora_name),
425 lora_id: None,
426 loras: None,
427 count: None,
428 }),
429 )
430 }
431 }
432}
433
434#[tracing::instrument(skip_all, level = "debug")]
436async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
437 tracing::info!("Listing all LoRAs");
438
439 match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
441 Ok(response) => {
442 tracing::info!("Successfully retrieved LoRA list");
443 (StatusCode::OK, Json(response))
444 }
445 Err(e) => {
446 tracing::error!("Failed to list LoRAs: {}", e);
447 (
448 StatusCode::INTERNAL_SERVER_ERROR,
449 Json(LoraResponse {
450 status: "error".to_string(),
451 message: Some(e.to_string()),
452 lora_name: None,
453 lora_id: None,
454 loras: None,
455 count: None,
456 }),
457 )
458 }
459 }
460}
461
462async fn call_lora_endpoint(
467 drt: &crate::DistributedRuntime,
468 endpoint_name: &str,
469 request_body: serde_json::Value,
470) -> anyhow::Result<LoraResponse> {
471 use crate::engine::AsyncEngine;
472
473 tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
474
475 let local_registry = drt.local_endpoint_registry();
477 let engine = local_registry
478 .get(endpoint_name)
479 .ok_or_else(|| {
480 anyhow::anyhow!(
481 "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
482 endpoint_name
483 )
484 })?;
485
486 tracing::debug!(
487 "Found endpoint '{}' in local registry, calling directly",
488 endpoint_name
489 );
490
491 let request = crate::pipeline::SingleIn::new(request_body);
493 let mut stream = engine.generate(request).await?;
494
495 if let Some(response) = stream.next().await {
497 let response_data = response.data.unwrap_or_default();
498
499 let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
501 .unwrap_or_else(|_| parse_lora_response(&response_data));
502
503 return Ok(lora_response);
504 }
505
506 anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
507}
508
509fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
511 LoraResponse {
512 status: response_data
513 .get("status")
514 .and_then(|s| s.as_str())
515 .unwrap_or("success")
516 .to_string(),
517 message: response_data
518 .get("message")
519 .and_then(|m| m.as_str())
520 .map(|s| s.to_string()),
521 lora_name: response_data
522 .get("lora_name")
523 .and_then(|n| n.as_str())
524 .map(|s| s.to_string()),
525 lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
526 loras: response_data.get("loras").cloned(),
527 count: response_data
528 .get("count")
529 .and_then(|c| c.as_u64())
530 .map(|c| c as usize),
531 }
532}
533
534#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
539async fn engine_route_handler(
540 state: Arc<SystemStatusState>,
541 Path(path): Path<String>,
542 body: Bytes,
543) -> impl IntoResponse {
544 tracing::trace!("Engine route request to /engine/{}", path);
545
546 let body_json: serde_json::Value = if body.is_empty() {
548 serde_json::json!({})
549 } else {
550 match serde_json::from_slice(&body) {
551 Ok(json) => json,
552 Err(e) => {
553 tracing::warn!("Invalid JSON in request body: {}", e);
554 return (
555 StatusCode::BAD_REQUEST,
556 json!({
557 "error": "Invalid JSON",
558 "message": format!("{}", e)
559 })
560 .to_string(),
561 )
562 .into_response();
563 }
564 }
565 };
566
567 let callback = match state.drt().engine_routes().get(&path) {
569 Some(cb) => cb,
570 None => {
571 tracing::debug!("Route /engine/{} not found", path);
572 return (
573 StatusCode::NOT_FOUND,
574 json!({
575 "error": "Route not found",
576 "message": format!("Route /engine/{} not found", path)
577 })
578 .to_string(),
579 )
580 .into_response();
581 }
582 };
583
584 match callback(body_json).await {
586 Ok(response) => {
587 tracing::trace!("Engine route handler succeeded for /engine/{}", path);
588 (StatusCode::OK, response.to_string()).into_response()
589 }
590 Err(e) => {
591 tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
592 (
593 StatusCode::INTERNAL_SERVER_ERROR,
594 json!({
595 "error": "Handler error",
596 "message": format!("{}", e)
597 })
598 .to_string(),
599 )
600 .into_response()
601 }
602 }
603}
604
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// A served router must terminate once its cancellation token fires.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let shutdown = CancellationToken::new();
        let server_shutdown = shutdown.clone();

        let router = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        let server_task = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, router)
                .with_graceful_shutdown(server_shutdown.cancelled_owned())
                .await;
        });

        shutdown.cancel();

        // The join must complete within the timeout; only the timeout is checked.
        let joined = tokio::time::timeout(Duration::from_secs(5), server_task).await;
        assert!(
            joined.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
641
642#[cfg(all(test, feature = "integration"))]
644mod integration_tests {
645 use super::*;
646 use crate::config::environment_names::logging as env_logging;
647 use crate::config::environment_names::runtime::canary as env_canary;
648 use crate::distributed::distributed_test_utils::create_test_drt_async;
649 use crate::metrics::MetricsHierarchy;
650 use anyhow::Result;
651 use rstest::rstest;
652 use std::sync::Arc;
653 use tokio::time::Duration;
654
655 #[tokio::test]
656 async fn test_uptime_from_system_health() {
657 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
659 let drt = create_test_drt_async().await;
660
661 let uptime = drt.system_health().lock().uptime();
663 assert!(uptime.as_nanos() > 0 || uptime.is_zero());
665
666 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
668 let uptime_after = drt.system_health().lock().uptime();
669 assert!(uptime_after > uptime);
670 })
671 .await;
672 }
673
674 #[tokio::test]
675 async fn test_runtime_metrics_initialization_and_namespace() {
676 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
678 let drt = create_test_drt_async().await;
679 let response = drt.metrics().prometheus_expfmt().unwrap();
684 println!("Full metrics response:\n{}", response);
685
686 assert!(
688 response.contains("# HELP dynamo_component_uptime_seconds"),
689 "Should contain uptime_seconds help text"
690 );
691 assert!(
692 response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
693 "Should contain uptime_seconds type"
694 );
695 assert!(
696 response.contains("dynamo_component_uptime_seconds"),
697 "Should contain uptime_seconds metric with correct namespace"
698 );
699 })
700 .await;
701 }
702
703 #[tokio::test]
704 async fn test_uptime_gauge_updates() {
705 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
707 let drt = create_test_drt_async().await;
708
709 let initial_uptime = drt.system_health().lock().uptime();
711
712 drt.system_health().lock().update_uptime_gauge();
714
715 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
717
718 let uptime_after_sleep = drt.system_health().lock().uptime();
720
721 drt.system_health().lock().update_uptime_gauge();
723
724 let elapsed = uptime_after_sleep - initial_uptime;
726 assert!(
727 elapsed >= std::time::Duration::from_millis(100),
728 "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
729 elapsed
730 );
731 })
732 .await;
733 }
734
735 #[tokio::test]
736 async fn test_http_requests_fail_when_system_disabled() {
737 temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
739 let drt = create_test_drt_async().await;
740
741 let system_info = drt.system_status_server_info();
743 assert!(
744 system_info.is_none(),
745 "System status server should not be running when disabled"
746 );
747
748 println!("✓ System status server correctly disabled when not enabled");
749 })
750 .await;
751 }
752
753 #[rstest]
760 #[case("ready", 200, "ready", None, None, 3)]
761 #[case("notready", 503, "notready", None, None, 3)]
762 #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
763 #[case(
764 "notready",
765 503,
766 "notready",
767 Some("/custom/health"),
768 Some("/custom/live"),
769 5
770 )]
771 #[tokio::test]
772 #[cfg(feature = "integration")]
773 async fn test_health_endpoints(
774 #[case] starting_health_status: &'static str,
775 #[case] expected_status: u16,
776 #[case] expected_body: &'static str,
777 #[case] custom_health_path: Option<&'static str>,
778 #[case] custom_live_path: Option<&'static str>,
779 #[case] expected_num_tests: usize,
780 ) {
781 use std::sync::Arc;
782 crate::logging::init();
788
789 #[allow(clippy::redundant_closure_call)]
790 temp_env::async_with_vars(
791 [
792 (env_system::DYN_SYSTEM_PORT, Some("0")),
793 (
794 env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
795 Some(starting_health_status),
796 ),
797 (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
798 (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
799 ],
800 (async || {
801 let drt = Arc::new(create_test_drt_async().await);
802
803 let system_info = drt
805 .system_status_server_info()
806 .expect("System status server should be started by DRT");
807 let addr = system_info.socket_addr;
808
809 let client = reqwest::Client::new();
810
811 let mut test_cases = vec![];
813 match custom_health_path {
814 None => {
815 test_cases.push(("/health", expected_status, expected_body));
817 }
818 Some(chp) => {
819 test_cases.push(("/health", 404, "Route not found"));
821 test_cases.push((chp, expected_status, expected_body));
822 }
823 }
824 match custom_live_path {
825 None => {
826 test_cases.push(("/live", expected_status, expected_body));
828 }
829 Some(clp) => {
830 test_cases.push(("/live", 404, "Route not found"));
832 test_cases.push((clp, expected_status, expected_body));
833 }
834 }
835 test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
836 assert_eq!(test_cases.len(), expected_num_tests);
837
838 for (path, expect_status, expect_body) in test_cases {
839 println!("[test] Sending request to {}", path);
840 let url = format!("http://{}{}", addr, path);
841 let response = client.get(&url).send().await.unwrap();
842 let status = response.status();
843 let body = response.text().await.unwrap();
844 println!(
845 "[test] Response for {}: status={}, body={:?}",
846 path, status, body
847 );
848 assert_eq!(
849 status, expect_status,
850 "Response: status={}, body={:?}",
851 status, body
852 );
853 assert!(
854 body.contains(expect_body),
855 "Response: status={}, body={:?}",
856 status,
857 body
858 );
859 }
860 })(),
861 )
862 .await;
863 }
864
865 #[tokio::test]
866 async fn test_health_endpoint_tracing() -> Result<()> {
867 use std::sync::Arc;
868
869 #[allow(clippy::redundant_closure_call)]
872 let _ = temp_env::async_with_vars(
873 [
874 (env_system::DYN_SYSTEM_PORT, Some("0")),
875 (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
876 (env_logging::DYN_LOGGING_JSONL, Some("1")),
877 (env_logging::DYN_LOG, Some("trace")),
878 ],
879 (async || {
880 crate::logging::init();
884
885 let drt = Arc::new(create_test_drt_async().await);
886
887 let system_info = drt
889 .system_status_server_info()
890 .expect("System status server should be started by DRT");
891 let addr = system_info.socket_addr;
892 let client = reqwest::Client::new();
893 for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
894 let traceparent_value =
895 "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
896 let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
897 let mut headers = reqwest::header::HeaderMap::new();
898 headers.insert(
899 reqwest::header::HeaderName::from_static("traceparent"),
900 reqwest::header::HeaderValue::from_str(traceparent_value)?,
901 );
902 headers.insert(
903 reqwest::header::HeaderName::from_static("tracestate"),
904 reqwest::header::HeaderValue::from_str(tracestate_value)?,
905 );
906 let url = format!("http://{}{}", addr, path);
907 let response = client.get(&url).headers(headers).send().await.unwrap();
908 let status = response.status();
909 let body = response.text().await.unwrap();
910 tracing::info!(body = body, status = status.to_string());
911 }
912
913 Ok::<(), anyhow::Error>(())
914 })(),
915 )
916 .await;
917 Ok(())
918 }
919
    /// Verifies that `/health` moves from not-ready to ready once the endpoint
    /// named in `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS` has been started.
    ///
    /// The endpoint is started on a background task, then `/health` is polled
    /// 200 times back-to-back; at least 150 of those must report ready.
    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        // JSON list of endpoint names; it contains the endpoint started below.
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
                (env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info_opt = drt.system_status_server_info();

                // Fail with a descriptive message if the runtime did not start the server.
                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                    std::env::var(env_system::DYN_SYSTEM_PORT)
                );

                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                // Baseline: before any endpoint exists the server must report not-ready.
                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
                assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");

                let namespace = drt.namespace("ns1234").unwrap();
                let component = namespace.component("comp1234").unwrap();

                use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
                use crate::protocols::annotated::Annotated;

                // Minimal engine that answers every request with a single formatted item.
                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error> for TestHandler {
                    async fn generate(&self, input: SingleIn<String>) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context()
                        ))
                    }
                }

                let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();

                // Start the endpoint on a background task; its result is ignored
                // and the polling below begins immediately, without waiting.
                tokio::spawn(async move {
                    let _ = component.endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                let mut success_count = 0;
                let mut failures = Vec::new();

                // Poll with no delay between requests. Early requests may still
                // see not-ready, which the threshold below tolerates.
                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        // Log only the first five failures to keep output short.
                        if failures.len() <= 5 { tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                // Up to 50 non-ready responses are accepted.
                assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
            },
        )
        .await;
    }
1029
1030 #[tokio::test]
1031 async fn test_spawn_system_status_server_endpoints() {
1032 temp_env::async_with_vars(
1034 [
1035 (env_system::DYN_SYSTEM_PORT, Some("0")),
1036 (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
1037 ],
1038 async {
1039 let drt = Arc::new(create_test_drt_async().await);
1040
1041 let system_info = drt
1043 .system_status_server_info()
1044 .expect("System status server should be started by DRT");
1045 let addr = system_info.socket_addr;
1046 let client = reqwest::Client::new();
1047 for (path, expect_200, expect_body) in [
1048 ("/health", true, "ready"),
1049 ("/live", true, "ready"),
1050 ("/someRandomPathNotFoundHere", false, "Route not found"),
1051 ] {
1052 println!("[test] Sending request to {}", path);
1053 let url = format!("http://{}{}", addr, path);
1054 let response = client.get(&url).send().await.unwrap();
1055 let status = response.status();
1056 let body = response.text().await.unwrap();
1057 println!(
1058 "[test] Response for {}: status={}, body={:?}",
1059 path, status, body
1060 );
1061 if expect_200 {
1062 assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
1063 } else {
1064 assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
1065 }
1066 assert!(
1067 body.contains(expect_body),
1068 "Response: status={}, body={:?}",
1069 status,
1070 body
1071 );
1072 }
1073 },
1075 )
1076 .await;
1077 }
1078
    /// Registers a health-check target for `test.endpoint`, then verifies that
    /// `/health` goes from 503/notready to 200/ready once that endpoint's
    /// status is set to `Ready`.
    ///
    /// The status is set directly through `set_endpoint_health_status`; the
    /// test does not wait for any background health check to run.
    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some("notready"),
                ),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                // NOTE(review): the units of these two "1" values are not visible
                // here - presumably seconds; confirm against the config parser.
                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")), ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), ("RUST_LOG", Some("info")), ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                // Scope the lock so it is released before any HTTP request is made.
                {
                    let system_health = drt.system_health();
                    let system_health_lock = system_health.lock();
                    system_health_lock.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
                        },
                        health_check_payload.clone(),
                    );
                }

                // Baseline: nothing has marked the endpoint ready yet.
                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                // Mark the endpoint ready directly on `SystemHealth`.
                drt.system_health()
                    .lock()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                // Cross-check the stored per-endpoint status as well as the HTTP view.
                let endpoint_status = drt
                    .system_health()
                    .lock()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
1178}