1use crate::config::HealthStatus;
8use crate::config::environment_names::logging as env_logging;
9use crate::config::environment_names::runtime::canary as env_canary;
10use crate::config::environment_names::runtime::system as env_system;
11use crate::logging::make_request_span;
12use crate::metrics::MetricsHierarchy;
13use crate::traits::DistributedRuntimeProvider;
14use axum::{
15 Router,
16 body::Bytes,
17 extract::{Json, Path, State},
18 http::StatusCode,
19 response::IntoResponse,
20 routing::{any, delete, get, post},
21};
22use futures::StreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::json;
25use std::collections::HashMap;
26use std::sync::{Arc, OnceLock};
27use std::time::Instant;
28use tokio::{net::TcpListener, task::JoinHandle};
29use tokio_util::sync::CancellationToken;
30use tower_http::trace::TraceLayer;
31
32#[derive(Debug)]
34pub struct SystemStatusServerInfo {
35 pub socket_addr: std::net::SocketAddr,
36 pub handle: Option<Arc<JoinHandle<()>>>,
37}
38
39impl SystemStatusServerInfo {
40 pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
41 Self {
42 socket_addr,
43 handle: handle.map(Arc::new),
44 }
45 }
46
47 pub fn address(&self) -> String {
48 self.socket_addr.to_string()
49 }
50
51 pub fn hostname(&self) -> String {
52 self.socket_addr.ip().to_string()
53 }
54
55 pub fn port(&self) -> u16 {
56 self.socket_addr.port()
57 }
58}
59
60impl Clone for SystemStatusServerInfo {
61 fn clone(&self) -> Self {
62 Self {
63 socket_addr: self.socket_addr,
64 handle: self.handle.clone(),
65 }
66 }
67}
68
69pub struct SystemStatusState {
71 root_drt: Arc<crate::DistributedRuntime>,
73 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
75}
76
77impl SystemStatusState {
78 pub fn new(
80 drt: Arc<crate::DistributedRuntime>,
81 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
82 ) -> anyhow::Result<Self> {
83 Ok(Self {
84 root_drt: drt,
85 discovery_metadata,
86 })
87 }
88
89 pub fn drt(&self) -> &crate::DistributedRuntime {
91 &self.root_drt
92 }
93
94 pub fn discovery_metadata(
96 &self,
97 ) -> Option<&Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>> {
98 self.discovery_metadata.as_ref()
99 }
100}
101
102#[derive(Debug, Clone, Deserialize, Serialize)]
104pub struct LoadLoraRequest {
105 pub lora_name: String,
106 pub source: LoraSource,
107}
108
109#[derive(Debug, Clone, Deserialize, Serialize)]
111pub struct LoraSource {
112 pub uri: String,
113}
114
115#[derive(Debug, Clone, Deserialize, Serialize)]
117pub struct LoraResponse {
118 pub status: String,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub message: Option<String>,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 pub lora_name: Option<String>,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub lora_id: Option<u64>,
125 #[serde(skip_serializing_if = "Option::is_none")]
126 pub loras: Option<serde_json::Value>,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 pub count: Option<usize>,
129}
130
131pub async fn spawn_system_status_server(
133 host: &str,
134 port: u16,
135 cancel_token: CancellationToken,
136 drt: Arc<crate::DistributedRuntime>,
137 discovery_metadata: Option<Arc<tokio::sync::RwLock<crate::discovery::DiscoveryMetadata>>>,
138) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
139 let server_state = Arc::new(SystemStatusState::new(drt, discovery_metadata)?);
141 let health_path = server_state
142 .drt()
143 .system_health()
144 .lock()
145 .health_path()
146 .to_string();
147 let live_path = server_state
148 .drt()
149 .system_health()
150 .lock()
151 .live_path()
152 .to_string();
153
154 let lora_enabled = std::env::var(crate::config::environment_names::llm::DYN_LORA_ENABLED)
156 .map(|v| v.to_lowercase() == "true")
157 .unwrap_or(false);
158
159 let mut app = Router::new()
160 .route(
161 &health_path,
162 get({
163 let state = Arc::clone(&server_state);
164 move || health_handler(state)
165 }),
166 )
167 .route(
168 &live_path,
169 get({
170 let state = Arc::clone(&server_state);
171 move || health_handler(state)
172 }),
173 )
174 .route(
175 "/metrics",
176 get({
177 let state = Arc::clone(&server_state);
178 move || metrics_handler(state)
179 }),
180 )
181 .route(
182 "/metadata",
183 get({
184 let state = Arc::clone(&server_state);
185 move || metadata_handler(state)
186 }),
187 )
188 .route(
189 "/engine/{*path}",
190 any({
191 let state = Arc::clone(&server_state);
192 move |path, body| engine_route_handler(state, path, body)
193 }),
194 );
195
196 if lora_enabled {
198 app = app
199 .route(
200 "/v1/loras",
201 get({
202 let state = Arc::clone(&server_state);
203 move || list_loras_handler(State(state))
204 })
205 .post({
206 let state = Arc::clone(&server_state);
207 move |body| load_lora_handler(State(state), body)
208 }),
209 )
210 .route(
211 "/v1/loras/{*lora_name}",
212 delete({
213 let state = Arc::clone(&server_state);
214 move |path| unload_lora_handler(State(state), path)
215 }),
216 );
217 }
218
219 let app = app
220 .fallback(|| async {
221 tracing::info!("[fallback handler] called");
222 (StatusCode::NOT_FOUND, "Route not found").into_response()
223 })
224 .layer(TraceLayer::new_for_http().make_span_with(make_request_span));
225
226 let address = format!("{}:{}", host, port);
227 tracing::info!("[spawn_system_status_server] binding to: {}", address);
228
229 let listener = match TcpListener::bind(&address).await {
230 Ok(listener) => {
231 let actual_address = listener.local_addr()?;
233 tracing::info!(
234 "[spawn_system_status_server] system status server bound to: {}",
235 actual_address
236 );
237 (listener, actual_address)
238 }
239 Err(e) => {
240 tracing::error!("Failed to bind to address {}: {}", address, e);
241 return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
242 }
243 };
244 let (listener, actual_address) = listener;
245
246 let observer = cancel_token.child_token();
247 let handle = tokio::spawn(async move {
249 if let Err(e) = axum::serve(listener, app)
250 .with_graceful_shutdown(observer.cancelled_owned())
251 .await
252 {
253 tracing::error!("System status server error: {}", e);
254 }
255 });
256
257 Ok((actual_address, handle))
258}
259
260#[tracing::instrument(skip_all, level = "trace")]
262async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
263 let system_health = state.drt().system_health();
265 let system_health_lock = system_health.lock();
266 let (healthy, endpoints) = system_health_lock.get_health_status();
267 let uptime = Some(system_health_lock.uptime());
268 drop(system_health_lock);
269
270 let healthy_string = if healthy { "ready" } else { "notready" };
271 let status_code = if healthy {
272 StatusCode::OK
273 } else {
274 StatusCode::SERVICE_UNAVAILABLE
275 };
276
277 let response = json!({
278 "status": healthy_string,
279 "uptime": uptime,
280 "endpoints": endpoints,
281 });
282
283 tracing::trace!("Response {}", response.to_string());
284
285 (status_code, response.to_string())
286}
287
288#[tracing::instrument(skip_all, level = "trace")]
290async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
291 state.drt().system_health().lock().update_uptime_gauge();
293
294 let response = match state.drt().metrics().prometheus_expfmt() {
300 Ok(r) => r,
301 Err(e) => {
302 tracing::error!("Failed to get metrics from registry: {}", e);
303 return (
304 StatusCode::INTERNAL_SERVER_ERROR,
305 "Failed to get metrics".to_string(),
306 );
307 }
308 };
309
310 (StatusCode::OK, response)
311}
312
313#[tracing::instrument(skip_all, level = "trace")]
315async fn metadata_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
316 let metadata = match state.discovery_metadata() {
318 Some(metadata) => metadata,
319 None => {
320 tracing::debug!("Metadata endpoint called but no discovery metadata available");
321 return (
322 StatusCode::NOT_FOUND,
323 "Discovery metadata not available".to_string(),
324 )
325 .into_response();
326 }
327 };
328
329 let metadata_guard = metadata.read().await;
331
332 match serde_json::to_string(&*metadata_guard) {
334 Ok(json) => {
335 tracing::trace!("Returning metadata: {} bytes", json.len());
336 (StatusCode::OK, json).into_response()
337 }
338 Err(e) => {
339 tracing::error!("Failed to serialize metadata: {}", e);
340 (
341 StatusCode::INTERNAL_SERVER_ERROR,
342 "Failed to serialize metadata".to_string(),
343 )
344 .into_response()
345 }
346 }
347}
348
349#[tracing::instrument(skip_all, level = "debug")]
351async fn load_lora_handler(
352 State(state): State<Arc<SystemStatusState>>,
353 Json(request): Json<LoadLoraRequest>,
354) -> impl IntoResponse {
355 tracing::info!("Loading LoRA: {}", request.lora_name);
356
357 match call_lora_endpoint(
359 state.drt(),
360 "load_lora",
361 json!({
362 "lora_name": request.lora_name,
363 "source": {
364 "uri": request.source.uri
365 },
366 }),
367 )
368 .await
369 {
370 Ok(response) => {
371 tracing::info!("LoRA loaded successfully: {}", request.lora_name);
372 (StatusCode::OK, Json(response))
373 }
374 Err(e) => {
375 tracing::error!("Failed to load LoRA {}: {}", request.lora_name, e);
376 (
377 StatusCode::INTERNAL_SERVER_ERROR,
378 Json(LoraResponse {
379 status: "error".to_string(),
380 message: Some(e.to_string()),
381 lora_name: Some(request.lora_name),
382 lora_id: None,
383 loras: None,
384 count: None,
385 }),
386 )
387 }
388 }
389}
390
391#[tracing::instrument(skip_all, level = "debug")]
393async fn unload_lora_handler(
394 State(state): State<Arc<SystemStatusState>>,
395 Path(lora_name): Path<String>,
396) -> impl IntoResponse {
397 let lora_name = lora_name
399 .strip_prefix('/')
400 .unwrap_or(&lora_name)
401 .to_string();
402 tracing::info!("Unloading LoRA: {}", lora_name);
403
404 match call_lora_endpoint(
406 state.drt(),
407 "unload_lora",
408 json!({
409 "lora_name": lora_name.clone(),
410 }),
411 )
412 .await
413 {
414 Ok(response) => {
415 tracing::info!("LoRA unloaded successfully: {}", lora_name);
416 (StatusCode::OK, Json(response))
417 }
418 Err(e) => {
419 tracing::error!("Failed to unload LoRA {}: {}", lora_name, e);
420 (
421 StatusCode::INTERNAL_SERVER_ERROR,
422 Json(LoraResponse {
423 status: "error".to_string(),
424 message: Some(e.to_string()),
425 lora_name: Some(lora_name),
426 lora_id: None,
427 loras: None,
428 count: None,
429 }),
430 )
431 }
432 }
433}
434
435#[tracing::instrument(skip_all, level = "debug")]
437async fn list_loras_handler(State(state): State<Arc<SystemStatusState>>) -> impl IntoResponse {
438 tracing::info!("Listing all LoRAs");
439
440 match call_lora_endpoint(state.drt(), "list_loras", json!({})).await {
442 Ok(response) => {
443 tracing::info!("Successfully retrieved LoRA list");
444 (StatusCode::OK, Json(response))
445 }
446 Err(e) => {
447 tracing::error!("Failed to list LoRAs: {}", e);
448 (
449 StatusCode::INTERNAL_SERVER_ERROR,
450 Json(LoraResponse {
451 status: "error".to_string(),
452 message: Some(e.to_string()),
453 lora_name: None,
454 lora_id: None,
455 loras: None,
456 count: None,
457 }),
458 )
459 }
460 }
461}
462
463async fn call_lora_endpoint(
468 drt: &crate::DistributedRuntime,
469 endpoint_name: &str,
470 request_body: serde_json::Value,
471) -> anyhow::Result<LoraResponse> {
472 use crate::engine::AsyncEngine;
473
474 tracing::debug!("Calling local endpoint: '{}'", endpoint_name);
475
476 let local_registry = drt.local_endpoint_registry();
478 let engine = local_registry
479 .get(endpoint_name)
480 .ok_or_else(|| {
481 anyhow::anyhow!(
482 "Endpoint '{}' not found in local registry. Make sure it's registered with .register_local_engine()",
483 endpoint_name
484 )
485 })?;
486
487 tracing::debug!(
488 "Found endpoint '{}' in local registry, calling directly",
489 endpoint_name
490 );
491
492 let request = crate::pipeline::SingleIn::new(request_body);
494 let mut stream = engine.generate(request).await?;
495
496 if let Some(response) = stream.next().await {
498 let response_data = response.data.unwrap_or_default();
499
500 let lora_response = serde_json::from_value::<LoraResponse>(response_data.clone())
502 .unwrap_or_else(|_| parse_lora_response(&response_data));
503
504 return Ok(lora_response);
505 }
506
507 anyhow::bail!("No response received from endpoint '{}'", endpoint_name)
508}
509
510fn parse_lora_response(response_data: &serde_json::Value) -> LoraResponse {
512 LoraResponse {
513 status: response_data
514 .get("status")
515 .and_then(|s| s.as_str())
516 .unwrap_or("success")
517 .to_string(),
518 message: response_data
519 .get("message")
520 .and_then(|m| m.as_str())
521 .map(|s| s.to_string()),
522 lora_name: response_data
523 .get("lora_name")
524 .and_then(|n| n.as_str())
525 .map(|s| s.to_string()),
526 lora_id: response_data.get("lora_id").and_then(|id| id.as_u64()),
527 loras: response_data.get("loras").cloned(),
528 count: response_data
529 .get("count")
530 .and_then(|c| c.as_u64())
531 .map(|c| c as usize),
532 }
533}
534
535#[tracing::instrument(skip_all, level = "trace", fields(path = %path))]
540async fn engine_route_handler(
541 state: Arc<SystemStatusState>,
542 Path(path): Path<String>,
543 body: Bytes,
544) -> impl IntoResponse {
545 tracing::trace!("Engine route request to /engine/{}", path);
546
547 let body_json: serde_json::Value = if body.is_empty() {
549 serde_json::json!({})
550 } else {
551 match serde_json::from_slice(&body) {
552 Ok(json) => json,
553 Err(e) => {
554 tracing::warn!("Invalid JSON in request body: {}", e);
555 return (
556 StatusCode::BAD_REQUEST,
557 json!({
558 "error": "Invalid JSON",
559 "message": format!("{}", e)
560 })
561 .to_string(),
562 )
563 .into_response();
564 }
565 }
566 };
567
568 let callback = match state.drt().engine_routes().get(&path) {
570 Some(cb) => cb,
571 None => {
572 tracing::debug!("Route /engine/{} not found", path);
573 return (
574 StatusCode::NOT_FOUND,
575 json!({
576 "error": "Route not found",
577 "message": format!("Route /engine/{} not found", path)
578 })
579 .to_string(),
580 )
581 .into_response();
582 }
583 };
584
585 match callback(body_json).await {
587 Ok(response) => {
588 tracing::trace!("Engine route handler succeeded for /engine/{}", path);
589 (StatusCode::OK, response.to_string()).into_response()
590 }
591 Err(e) => {
592 tracing::error!("Engine route handler error for /engine/{}: {}", path, e);
593 (
594 StatusCode::INTERNAL_SERVER_ERROR,
595 json!({
596 "error": "Handler error",
597 "message": format!("{}", e)
598 })
599 .to_string(),
600 )
601 .into_response()
602 }
603 }
604}
605
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::Duration;

    /// An axum server with graceful shutdown must exit once its cancellation token fires.
    #[tokio::test]
    async fn test_http_server_lifecycle() {
        let shutdown = CancellationToken::new();
        let server_shutdown = shutdown.clone();

        let router = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));

        let server = tokio::spawn(async move {
            let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
            let _ = axum::serve(listener, router)
                .with_graceful_shutdown(server_shutdown.cancelled_owned())
                .await;
        });

        shutdown.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(5), server).await;
        assert!(
            joined.is_ok(),
            "HTTP server should shut down when cancel token is cancelled"
        );
    }
}
642
/// Integration tests that build a real `DistributedRuntime` via `create_test_drt_async`
/// and, where `DYN_SYSTEM_PORT` is set, talk to its system status server over HTTP.
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
    use super::*;
    use crate::config::environment_names::logging as env_logging;
    use crate::config::environment_names::runtime::canary as env_canary;
    use crate::distributed::distributed_test_utils::create_test_drt_async;
    use crate::metrics::MetricsHierarchy;
    use anyhow::Result;
    use rstest::rstest;
    use std::sync::Arc;
    use tokio::time::Duration;

    #[tokio::test]
    async fn test_uptime_from_system_health() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            // Uptime must strictly increase while the runtime stays alive.
            let uptime = drt.system_health().lock().uptime();
            tokio::time::sleep(Duration::from_millis(100)).await;
            let uptime_after = drt.system_health().lock().uptime();
            assert!(uptime_after > uptime);
        })
        .await;
    }

    #[tokio::test]
    async fn test_runtime_metrics_initialization_and_namespace() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;
            let response = drt.metrics().prometheus_expfmt().unwrap();
            println!("Full metrics response:\n{}", response);

            assert!(
                response.contains("# HELP dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds help text"
            );
            assert!(
                response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
                "Should contain uptime_seconds type"
            );
            assert!(
                response.contains("dynamo_component_uptime_seconds"),
                "Should contain uptime_seconds metric with correct namespace"
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_uptime_gauge_updates() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let initial_uptime = drt.system_health().lock().uptime();
            drt.system_health().lock().update_uptime_gauge();

            tokio::time::sleep(Duration::from_millis(100)).await;

            let uptime_after_sleep = drt.system_health().lock().uptime();
            drt.system_health().lock().update_uptime_gauge();

            let elapsed = uptime_after_sleep - initial_uptime;
            assert!(
                elapsed >= Duration::from_millis(100),
                "Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
                elapsed
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_http_requests_fail_when_system_disabled() {
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;

            let system_info = drt.system_status_server_info();
            assert!(
                system_info.is_none(),
                "System status server should not be running when disabled"
            );

            println!("✓ System status server correctly disabled when not enabled");
        })
        .await;
    }

    #[rstest]
    #[case("ready", 200, "ready", None, None, 3)]
    #[case("notready", 503, "notready", None, None, 3)]
    #[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
    #[case(
        "notready",
        503,
        "notready",
        Some("/custom/health"),
        Some("/custom/live"),
        5
    )]
    #[tokio::test]
    async fn test_health_endpoints(
        #[case] starting_health_status: &'static str,
        #[case] expected_status: u16,
        #[case] expected_body: &'static str,
        #[case] custom_health_path: Option<&'static str>,
        #[case] custom_live_path: Option<&'static str>,
        #[case] expected_num_tests: usize,
    ) {
        crate::logging::init();

        #[allow(clippy::redundant_closure_call)]
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some(starting_health_status),
                ),
                (env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
                (env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
            ],
            (async || {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();

                // A custom path replaces the default one, which must then 404.
                let mut test_cases = vec![];
                match custom_health_path {
                    None => {
                        test_cases.push(("/health", expected_status, expected_body));
                    }
                    Some(chp) => {
                        test_cases.push(("/health", 404, "Route not found"));
                        test_cases.push((chp, expected_status, expected_body));
                    }
                }
                match custom_live_path {
                    None => {
                        test_cases.push(("/live", expected_status, expected_body));
                    }
                    Some(clp) => {
                        test_cases.push(("/live", 404, "Route not found"));
                        test_cases.push((clp, expected_status, expected_body));
                    }
                }
                test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
                assert_eq!(test_cases.len(), expected_num_tests);

                for (path, expect_status, expect_body) in test_cases {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    assert_eq!(
                        status, expect_status,
                        "Response: status={}, body={:?}",
                        status, body
                    );
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            })(),
        )
        .await;
    }

    // The closure's `Result` is propagated so failures inside it (header construction,
    // request errors) fail the test instead of being silently discarded.
    #[allow(clippy::redundant_closure_call)]
    #[tokio::test]
    async fn test_health_endpoint_tracing() -> Result<()> {
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
                (env_logging::DYN_LOGGING_JSONL, Some("1")),
                (env_logging::DYN_LOG, Some("trace")),
            ],
            (async || {
                crate::logging::init();

                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();

                let traceparent_value = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
                let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";

                for path in ["/health", "/live", "/someRandomPathNotFoundHere"] {
                    let mut headers = reqwest::header::HeaderMap::new();
                    headers.insert(
                        reqwest::header::HeaderName::from_static("traceparent"),
                        reqwest::header::HeaderValue::from_str(traceparent_value)?,
                    );
                    headers.insert(
                        reqwest::header::HeaderName::from_static("tracestate"),
                        reqwest::header::HeaderValue::from_str(tracestate_value)?,
                    );
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).headers(headers).send().await?;
                    let status = response.status();
                    let body = response.text().await?;
                    tracing::info!(body = body, status = status.to_string());
                }

                Ok::<(), anyhow::Error>(())
            })(),
        )
        .await
    }

    #[tokio::test]
    async fn test_health_endpoint_with_changing_health_status() {
        const ENDPOINT_NAME: &str = "generate";
        const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some(ENDPOINT_HEALTH_CONFIG),
                ),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info_opt = drt.system_status_server_info();
                assert!(
                    system_info_opt.is_some(),
                    "System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
                    std::env::var(env_system::DYN_SYSTEM_PORT)
                );

                let system_info = system_info_opt.unwrap();
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(
                    status, 503,
                    "Health should be 503 (not ready) initially, got: {}",
                    status
                );
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Health should contain status notready"
                );

                let namespace = drt.namespace("ns1234").unwrap();
                let component = namespace.component("comp1234").unwrap();

                use crate::pipeline::{
                    async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider,
                    ManyOut, SingleIn,
                };
                use crate::protocols::annotated::Annotated;

                struct TestHandler;

                #[async_trait]
                impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, anyhow::Error>
                    for TestHandler
                {
                    async fn generate(
                        &self,
                        input: SingleIn<String>,
                    ) -> anyhow::Result<ManyOut<Annotated<String>>> {
                        let (data, ctx) = input.into_parts();
                        let response = Annotated::from_data(format!("You responded: {}", data));
                        Ok(crate::pipeline::ResponseStream::new(
                            Box::pin(crate::stream::iter(vec![response])),
                            ctx.context(),
                        ))
                    }
                }

                let ingress = Ingress::for_engine(Arc::new(TestHandler)).unwrap();

                // Start the endpoint in the background so the polling below can observe
                // the health status flip to ready.
                tokio::spawn(async move {
                    let _ = component
                        .endpoint(ENDPOINT_NAME)
                        .endpoint_builder()
                        .handler(ingress)
                        .health_check_payload(serde_json::json!({
                            "test": "health_check"
                        }))
                        .start()
                        .await;
                });

                let mut success_count = 0;
                let mut failures = Vec::new();

                for i in 1..=200 {
                    let response = client.get(&health_url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();

                    if status == 200 && body.contains("\"status\":\"ready\"") {
                        success_count += 1;
                    } else {
                        failures.push((i, status.as_u16(), body.clone()));
                        if failures.len() <= 5 {
                            tracing::warn!("Request {}: status={}, body={}", i, status, body);
                        }
                    }
                }

                tracing::info!(
                    "Health endpoint test results: {}/200 requests succeeded",
                    success_count
                );
                if !failures.is_empty() {
                    tracing::warn!("Failed requests: {}", failures.len());
                }

                assert!(
                    success_count >= 150,
                    "Expected at least 150 out of 200 requests to succeed, but only {} succeeded",
                    success_count
                );
            },
        )
        .await;
    }

    #[tokio::test]
    async fn test_spawn_system_status_server_endpoints() {
        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started by DRT");
                let addr = system_info.socket_addr;
                let client = reqwest::Client::new();
                for (path, expect_200, expect_body) in [
                    ("/health", true, "ready"),
                    ("/live", true, "ready"),
                    ("/someRandomPathNotFoundHere", false, "Route not found"),
                ] {
                    println!("[test] Sending request to {}", path);
                    let url = format!("http://{}{}", addr, path);
                    let response = client.get(&url).send().await.unwrap();
                    let status = response.status();
                    let body = response.text().await.unwrap();
                    println!(
                        "[test] Response for {}: status={}, body={:?}",
                        path, status, body
                    );
                    if expect_200 {
                        assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
                    } else {
                        assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
                    }
                    assert!(
                        body.contains(expect_body),
                        "Response: status={}, body={:?}",
                        status,
                        body
                    );
                }
            },
        )
        .await;
    }

    #[tokio::test]
    async fn test_health_check_with_payload_and_timeout() {
        crate::logging::init();

        temp_env::async_with_vars(
            [
                (env_system::DYN_SYSTEM_PORT, Some("0")),
                (
                    env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
                    Some("notready"),
                ),
                (
                    env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
                    Some("[\"test.endpoint\"]"),
                ),
                ("DYN_HEALTH_CHECK_ENABLED", Some("true")),
                (env_canary::DYN_CANARY_WAIT_TIME, Some("1")),
                ("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")),
                ("RUST_LOG", Some("info")),
            ],
            async {
                let drt = Arc::new(create_test_drt_async().await);

                let system_info = drt
                    .system_status_server_info()
                    .expect("System status server should be started");
                let addr = system_info.socket_addr;

                let client = reqwest::Client::new();
                let health_url = format!("http://{}/health", addr);

                let endpoint = "test.endpoint";
                let health_check_payload = serde_json::json!({
                    "prompt": "health check test",
                    "_health_check": true
                });

                {
                    let system_health = drt.system_health();
                    let system_health_lock = system_health.lock();
                    system_health_lock.register_health_check_target(
                        endpoint,
                        crate::component::Instance {
                            component: "test_component".to_string(),
                            endpoint: "health".to_string(),
                            namespace: "test_namespace".to_string(),
                            instance_id: 1,
                            transport: crate::component::TransportType::Nats(endpoint.to_string()),
                        },
                        health_check_payload.clone(),
                    );
                }

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();
                assert_eq!(status, 503, "Should be unhealthy initially (default state)");
                assert!(
                    body.contains("\"status\":\"notready\""),
                    "Should show notready status initially"
                );

                drt.system_health()
                    .lock()
                    .set_endpoint_health_status(endpoint, HealthStatus::Ready);

                let response = client.get(&health_url).send().await.unwrap();
                let status = response.status();
                let body = response.text().await.unwrap();

                assert_eq!(status, 200, "Should be healthy due to recent response");
                assert!(
                    body.contains("\"status\":\"ready\""),
                    "Should show ready status after response"
                );

                let endpoint_status = drt
                    .system_health()
                    .lock()
                    .get_endpoint_health_status(endpoint);
                assert_eq!(
                    endpoint_status,
                    Some(HealthStatus::Ready),
                    "SystemHealth should show endpoint as Ready after response"
                );
            },
        )
        .await;
    }
}
1179}