dynamo_runtime/
config.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::Result;
5use derive_builder::Builder;
6use figment::{
7    Figment,
8    providers::{Env, Format, Serialized, Toml},
9};
10use serde::{Deserialize, Serialize};
11use std::fmt;
12use validator::Validate;
13
/// Default host of the system status server that exposes the health and metrics endpoints.
const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";

/// Default port of the system status server that exposes the health and metrics endpoints.
const DEFAULT_SYSTEM_PORT: u16 = 9090;

/// Default path of the health endpoint.
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
/// Default path of the live endpoint.
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";

/// Default canary wait time, in seconds: how long to wait without any activity before a
/// canary health check is sent.
pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
/// Default timeout, in seconds, applied to each individual health check request.
pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct WorkerConfig {
32    /// Grace shutdown period for the system server.
33    pub graceful_shutdown_timeout: u64,
34}
35
36impl WorkerConfig {
37    /// Instantiates and reads server configurations from appropriate sources.
38    /// Panics on invalid configuration.
39    pub fn from_settings() -> Self {
40        // All calls should be global and thread safe.
41        Figment::new()
42            .merge(Serialized::defaults(Self::default()))
43            .merge(Env::prefixed("DYN_WORKER_"))
44            .extract()
45            .unwrap() // safety: Called on startup, so panic is reasonable
46    }
47}
48
49impl Default for WorkerConfig {
50    fn default() -> Self {
51        WorkerConfig {
52            graceful_shutdown_timeout: if cfg!(debug_assertions) {
53                1 // Debug build: 1 second
54            } else {
55                30 // Release build: 30 seconds
56            },
57        }
58    }
59}
60
61#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
62#[serde(rename_all = "lowercase")]
63pub enum HealthStatus {
64    Ready,
65    NotReady,
66}
67
68/// Runtime configuration
69/// Defines the configuration for Tokio runtimes
70#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
71#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
72pub struct RuntimeConfig {
73    /// Number of async worker threads
74    /// If set to 1, the runtime will run in single-threaded mode
75    /// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
76    /// number of cores.
77    #[validate(range(min = 1))]
78    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
79    pub num_worker_threads: Option<usize>,
80
81    /// Maximum number of blocking threads
82    /// Blocking threads are used for blocking operations, this value must be greater than 0.
83    /// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
84    /// 512.
85    #[validate(range(min = 1))]
86    #[builder(default = "512")]
87    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
88    pub max_blocking_threads: usize,
89
90    /// System status server host for health and metrics endpoints
91    /// Set this at runtime with environment variable DYN_SYSTEM_HOST
92    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
93    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
94    pub system_host: String,
95
96    /// System status server port for health and metrics endpoints
97    /// If set to 0, the system will assign a random available port
98    /// Set this at runtime with environment variable DYN_SYSTEM_PORT
99    #[builder(default = "DEFAULT_SYSTEM_PORT")]
100    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
101    pub system_port: u16,
102
103    /// Health and metrics System status server enabled
104    /// Set this at runtime with environment variable DYN_SYSTEM_ENABLED
105    #[builder(default = "false")]
106    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
107    pub system_enabled: bool,
108
109    /// Starting Health Status
110    /// Set this at runtime with environment variable DYN_SYSTEM_STARTING_HEALTH_STATUS
111    #[builder(default = "HealthStatus::NotReady")]
112    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
113    pub starting_health_status: HealthStatus,
114
115    /// Use Endpoint Health Status
116    /// When using endpoint health status, health status
117    /// is the AND of individual endpoint health
118    /// Set this at runtime with environment variable DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
119    /// with the list of endpoints to consider for system health
120    #[builder(default = "vec![]")]
121    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
122    pub use_endpoint_health_status: Vec<String>,
123
124    /// Health endpoint paths
125    /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH
126    #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
127    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
128    pub system_health_path: String,
129    /// Set this at runtime with environment variable DYN_SYSTEM_LIVE_PATH
130    #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
131    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
132    pub system_live_path: String,
133
134    /// Enable active health checking with payloads
135    /// Set this at runtime with environment variable DYN_HEALTH_CHECK_ENABLED
136    #[builder(default = "false")]
137    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
138    pub health_check_enabled: bool,
139
140    /// Canary wait time in seconds (time to wait before sending health check when no activity)
141    /// Set this at runtime with environment variable DYN_CANARY_WAIT_TIME
142    #[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
143    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
144    pub canary_wait_time_secs: u64,
145
146    /// Health check request timeout in seconds
147    /// Set this at runtime with environment variable DYN_HEALTH_CHECK_REQUEST_TIMEOUT
148    #[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
149    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
150    pub health_check_request_timeout_secs: u64,
151}
152
153impl fmt::Display for RuntimeConfig {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        // If None, it defaults to "number of cores", so we indicate that.
156        match self.num_worker_threads {
157            Some(val) => write!(f, "num_worker_threads={val}, ")?,
158            None => write!(f, "num_worker_threads=default (num_cores), ")?,
159        }
160
161        write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
162        write!(f, "system_host={}, ", self.system_host)?;
163        write!(f, "system_port={}, ", self.system_port)?;
164        write!(f, "system_enabled={}", self.system_enabled)?;
165        write!(
166            f,
167            "use_endpoint_health_status={:?}",
168            self.use_endpoint_health_status
169        )?;
170        write!(
171            f,
172            "starting_health_status={:?}",
173            self.starting_health_status
174        )?;
175        write!(f, ", system_health_path={}", self.system_health_path)?;
176        write!(f, ", system_live_path={}", self.system_live_path)?;
177        write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
178        write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
179        write!(
180            f,
181            ", health_check_request_timeout_secs={}",
182            self.health_check_request_timeout_secs
183        )?;
184
185        Ok(())
186    }
187}
188
189impl RuntimeConfig {
190    pub fn builder() -> RuntimeConfigBuilder {
191        RuntimeConfigBuilder::default()
192    }
193
194    pub(crate) fn figment() -> Figment {
195        Figment::new()
196            .merge(Serialized::defaults(RuntimeConfig::default()))
197            .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
198            .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
199            .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
200                let full_key = format!("DYN_RUNTIME_{}", k.as_str());
201                // filters out empty environment variables
202                match std::env::var(&full_key) {
203                    Ok(v) if !v.is_empty() => Some(k.into()),
204                    _ => None,
205                }
206            }))
207            .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
208                let full_key = format!("DYN_SYSTEM_{}", k.as_str());
209                // filters out empty environment variables
210                match std::env::var(&full_key) {
211                    Ok(v) if !v.is_empty() => {
212                        // Map DYN_SYSTEM_* to the correct field names
213                        let mapped_key = match k.as_str() {
214                            "HOST" => "system_host",
215                            "PORT" => "system_port",
216                            "ENABLED" => "system_enabled",
217                            "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
218                            "STARTING_HEALTH_STATUS" => "starting_health_status",
219                            "HEALTH_PATH" => "system_health_path",
220                            "LIVE_PATH" => "system_live_path",
221                            _ => k.as_str(),
222                        };
223                        Some(mapped_key.into())
224                    }
225                    _ => None,
226                }
227            }))
228            .merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
229                let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
230                // filters out empty environment variables
231                match std::env::var(&full_key) {
232                    Ok(v) if !v.is_empty() => {
233                        // Map DYN_HEALTH_CHECK_* to the correct field names
234                        let mapped_key = match k.as_str() {
235                            "ENABLED" => "health_check_enabled",
236                            "REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
237                            _ => k.as_str(),
238                        };
239                        Some(mapped_key.into())
240                    }
241                    _ => None,
242                }
243            }))
244            .merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
245                let full_key = format!("DYN_CANARY_{}", k.as_str());
246                // filters out empty environment variables
247                match std::env::var(&full_key) {
248                    Ok(v) if !v.is_empty() => {
249                        // Map DYN_CANARY_* to the correct field names
250                        let mapped_key = match k.as_str() {
251                            "WAIT_TIME" => "canary_wait_time_secs",
252                            _ => k.as_str(),
253                        };
254                        Some(mapped_key.into())
255                    }
256                    _ => None,
257                }
258            }))
259    }
260
261    /// Load the runtime configuration from the environment and configuration files
262    /// Configuration is priorities in the following order, where the last has the lowest priority:
263    /// 1. Environment variables (top priority)
264    ///    TO DO: Add documentation for configuration files. Paths should be configurable.
265    /// 2. /opt/dynamo/etc/runtime.toml
266    /// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
267    ///
268    /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
269    pub fn from_settings() -> Result<RuntimeConfig> {
270        // Check for deprecated environment variable
271        if std::env::var("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS").is_ok() {
272            tracing::warn!(
273                "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
274                System health is now determined by endpoints that register with health check payloads. \
275                Please update your configuration to register health check payloads directly on endpoints."
276            );
277        }
278
279        let config: RuntimeConfig = Self::figment().extract()?;
280        config.validate()?;
281        Ok(config)
282    }
283
284    /// Check if System server should be enabled
285    /// System server is disabled by default, but can be enabled by setting DYN_SYSTEM_ENABLED to true
286    pub fn system_server_enabled(&self) -> bool {
287        self.system_enabled
288    }
289
290    pub fn single_threaded() -> Self {
291        RuntimeConfig {
292            num_worker_threads: Some(1),
293            max_blocking_threads: 1,
294            system_host: DEFAULT_SYSTEM_HOST.to_string(),
295            system_port: DEFAULT_SYSTEM_PORT,
296            system_enabled: false,
297            starting_health_status: HealthStatus::NotReady,
298            use_endpoint_health_status: vec![],
299            system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
300            system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
301            health_check_enabled: false,
302            canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
303            health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
304        }
305    }
306
307    /// Create a new default runtime configuration
308    pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
309        tokio::runtime::Builder::new_multi_thread()
310            .worker_threads(
311                self.num_worker_threads
312                    .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
313            )
314            .max_blocking_threads(self.max_blocking_threads)
315            .enable_all()
316            .build()
317    }
318}
319
320impl Default for RuntimeConfig {
321    fn default() -> Self {
322        let num_cores = std::thread::available_parallelism().unwrap().get();
323        Self {
324            num_worker_threads: Some(num_cores),
325            max_blocking_threads: num_cores,
326            system_host: DEFAULT_SYSTEM_HOST.to_string(),
327            system_port: DEFAULT_SYSTEM_PORT,
328            system_enabled: false,
329            starting_health_status: HealthStatus::NotReady,
330            use_endpoint_health_status: vec![],
331            system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
332            system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
333            health_check_enabled: false,
334            canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
335            health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
336        }
337    }
338}
339
340impl RuntimeConfigBuilder {
341    /// Build and validate the runtime configuration
342    pub fn build(&self) -> Result<RuntimeConfig> {
343        let config = self.build_internal()?;
344        config.validate()?;
345        Ok(config)
346    }
347}
348
/// Returns `true` when `val` is one of the accepted "true" spellings.
///
/// Accepted values, compared case-insensitively: `1`, `true`, `on`, `yes`.
/// Intended for environment variables and other user-supplied settings that should be
/// interpreted as a boolean.
pub fn is_truthy(val: &str) -> bool {
    ["1", "true", "on", "yes"]
        .iter()
        .any(|accepted| val.eq_ignore_ascii_case(accepted))
}
356
/// Returns `true` when `val` is one of the accepted "false" spellings.
///
/// Accepted values, compared case-insensitively: `0`, `false`, `off`, `no`.
/// Intended for environment variables and other user-supplied settings that should be
/// interpreted as a boolean. A value that is neither truthy nor falsey yields `false` here.
pub fn is_falsey(val: &str) -> bool {
    ["0", "false", "off", "no"]
        .iter()
        .any(|accepted| val.eq_ignore_ascii_case(accepted))
}
364
365/// Check if an environment variable is truthy
366pub fn env_is_truthy(env: &str) -> bool {
367    match std::env::var(env) {
368        Ok(val) => is_truthy(val.as_str()),
369        Err(_) => false,
370    }
371}
372
373/// Check if an environment variable is falsey
374pub fn env_is_falsey(env: &str) -> bool {
375    match std::env::var(env) {
376        Ok(val) => is_falsey(val.as_str()),
377        Err(_) => false,
378    }
379}
380
381/// Check whether JSONL logging enabled
382/// Set the `DYN_LOGGING_JSONL` environment variable a [`is_truthy`] value
383pub fn jsonl_logging_enabled() -> bool {
384    env_is_truthy("DYN_LOGGING_JSONL")
385}
386
387/// Check whether logging with ANSI terminal escape codes and colors is disabled.
388/// Set the `DYN_SDK_DISABLE_ANSI_LOGGING` environment variable a [`is_truthy`] value
389pub fn disable_ansi_logging() -> bool {
390    env_is_truthy("DYN_SDK_DISABLE_ANSI_LOGGING")
391}
392
393/// Check whether to use local timezone for logging timestamps (default is UTC)
394/// Set the `DYN_LOG_USE_LOCAL_TZ` environment variable to a [`is_truthy`] value
395pub fn use_local_timezone() -> bool {
396    env_is_truthy("DYN_LOG_USE_LOCAL_TZ")
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// `DYN_RUNTIME_*` variables override the thread-count settings.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("24")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("32")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.num_worker_threads, Some(24));
                assert_eq!(config.max_blocking_threads, 32);
                Ok(())
            },
        )
    }

    /// Unset and empty variables both leave the built-in defaults in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", None::<&str>),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;

                let default_config = RuntimeConfig::default();
                assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
                assert_eq!(
                    config.max_blocking_threads,
                    default_config.max_blocking_threads
                );
                Ok(())
            },
        )
    }

    /// Zero thread counts fail validation, and both offending fields are reported.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("0")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("0")),
            ],
            || {
                let err = RuntimeConfig::from_settings()
                    .expect_err("zero thread counts must be rejected");
                let message = err.to_string();
                assert!(message.contains("num_worker_threads: Validation error"));
                assert!(message.contains("max_blocking_threads: Validation error"));
                Ok(())
            },
        )
    }

    /// `DYN_SYSTEM_HOST` / `DYN_SYSTEM_PORT` map onto the system server fields.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
                ("DYN_SYSTEM_PORT", Some("9090")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.system_host, "127.0.0.1");
                assert_eq!(config.system_port, 9090);
                Ok(())
            },
        )
    }

    /// Without `DYN_SYSTEM_ENABLED` the system server stays disabled.
    #[test]
    fn test_system_server_disabled_by_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_disabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_enabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(config.system_server_enabled());
        });
    }

    /// Setting only the port configures it but does not enable the system server.
    #[test]
    fn test_system_port_does_not_enable_system_server() {
        temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
            assert_eq!(config.system_port, 8080);
        });
    }

    #[test]
    fn test_system_server_starting_health_status_ready() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.starting_health_status, HealthStatus::Ready);
            },
        );
    }

    #[test]
    fn test_system_use_endpoint_health_status() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.use_endpoint_health_status, vec!["ready"]);
            },
        );
    }

    #[test]
    fn test_system_health_endpoint_path_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_health_path, DEFAULT_SYSTEM_HEALTH_PATH);
        });

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_live_path, DEFAULT_SYSTEM_LIVE_PATH);
        });
    }

    #[test]
    fn test_system_health_endpoint_path_custom() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.system_health_path, "/custom/health");
            },
        );

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_live_path, "/custom/live");
        });
    }

    #[test]
    fn test_is_truthy_and_falsey() {
        // Test truthy values
        assert!(is_truthy("1"));
        assert!(is_truthy("true"));
        assert!(is_truthy("TRUE"));
        assert!(is_truthy("on"));
        assert!(is_truthy("yes"));

        // Test falsey values
        assert!(is_falsey("0"));
        assert!(is_falsey("false"));
        assert!(is_falsey("FALSE"));
        assert!(is_falsey("off"));
        assert!(is_falsey("no"));

        // Test opposite behavior
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        // Test env functions
        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // Test missing env vars
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}