1use super::Result;
5use derive_builder::Builder;
6use figment::{
7    Figment,
8    providers::{Env, Format, Serialized, Toml},
9};
10use serde::{Deserialize, Serialize};
11use std::fmt;
12use validator::Validate;
13
/// Default value of `RuntimeConfig::system_host`; `0.0.0.0` is the IPv4 wildcard address.
14const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";
16
/// Default value of `RuntimeConfig::system_port`.
17const DEFAULT_SYSTEM_PORT: u16 = 9090;
19
/// Default value of `RuntimeConfig::system_health_path`.
20const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
/// Default value of `RuntimeConfig::system_live_path`.
22const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";
23
/// Default value of `RuntimeConfig::canary_wait_time_secs` (seconds, per the name).
24pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
/// Default value of `RuntimeConfig::health_check_request_timeout_secs` (seconds, per the name).
27pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
29
/// Worker-level settings.
///
/// `WorkerConfig::from_settings` loads it from the `Default` values overlaid with
/// `DYN_WORKER_`-prefixed environment variables.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct WorkerConfig {
    /// Graceful shutdown timeout. Defaults to 1 when compiled with debug assertions
    /// and 30 otherwise.
    /// NOTE(review): the unit is presumably seconds — confirm against the code that
    /// consumes this value.
32    pub graceful_shutdown_timeout: u64,
34}
35
36impl WorkerConfig {
37    pub fn from_settings() -> Self {
40        Figment::new()
42            .merge(Serialized::defaults(Self::default()))
43            .merge(Env::prefixed("DYN_WORKER_"))
44            .extract()
45            .unwrap() }
47}
48
49impl Default for WorkerConfig {
50    fn default() -> Self {
51        WorkerConfig {
52            graceful_shutdown_timeout: if cfg!(debug_assertions) {
53                1 } else {
55                30 },
57        }
58    }
59}
60
/// Health state used for `RuntimeConfig::starting_health_status`.
///
/// Because of `rename_all = "lowercase"`, the variants are (de)serialized as
/// the lowercase strings noted on each variant.
61#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
62#[serde(rename_all = "lowercase")]
63pub enum HealthStatus {
    /// (De)serialized as `"ready"`.
64    Ready,
    /// (De)serialized as `"notready"`.
65    NotReady,
66}
67
/// Runtime configuration.
///
/// Instances are either extracted from layered settings by
/// `RuntimeConfig::from_settings` or assembled through the derived
/// `RuntimeConfigBuilder`. The derived build function is private and named
/// `build_internal`; the public `RuntimeConfigBuilder::build` wraps it and
/// additionally runs validation.
68#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
71#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
72pub struct RuntimeConfig {
    /// Number of async worker threads; must be at least 1 when set.
    /// When `None`, `create_runtime` falls back to the available parallelism.
    /// Overridable with `DYN_RUNTIME_NUM_WORKER_THREADS`.
73    #[validate(range(min = 1))]
78    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
79    pub num_worker_threads: Option<usize>,
80
    /// Upper bound on blocking threads handed to the tokio runtime builder;
    /// must be at least 1. The builder default is 512.
    /// Overridable with `DYN_RUNTIME_MAX_BLOCKING_THREADS`.
81    #[validate(range(min = 1))]
86    #[builder(default = "512")]
87    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
88    pub max_blocking_threads: usize,
89
    /// System host. Defaults to `DEFAULT_SYSTEM_HOST`.
    /// Overridable with `DYN_SYSTEM_HOST`.
90    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
93    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
94    pub system_host: String,
95
    /// System port. Defaults to `DEFAULT_SYSTEM_PORT`.
    /// Overridable with `DYN_SYSTEM_PORT`.
96    #[builder(default = "DEFAULT_SYSTEM_PORT")]
100    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
101    pub system_port: u16,
102
    /// Flag reported by `system_server_enabled()`. Defaults to `false`.
    /// Overridable with `DYN_SYSTEM_ENABLED`.
103    #[builder(default = "false")]
106    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
107    pub system_enabled: bool,
108
    /// Initial health status. Defaults to `HealthStatus::NotReady`.
    /// Overridable with `DYN_SYSTEM_STARTING_HEALTH_STATUS`.
109    #[builder(default = "HealthStatus::NotReady")]
112    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
113    pub starting_health_status: HealthStatus,
114
    /// Populated from `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS`. Defaults to empty.
    /// `from_settings` logs a warning that this variable is deprecated and no
    /// longer used.
115    #[builder(default = "vec![]")]
121    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
122    pub use_endpoint_health_status: Vec<String>,
123
    /// Health endpoint path. Defaults to `DEFAULT_SYSTEM_HEALTH_PATH`.
    /// Overridable with `DYN_SYSTEM_HEALTH_PATH`.
124    #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
127    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
128    pub system_health_path: String,
    /// Liveness endpoint path. Defaults to `DEFAULT_SYSTEM_LIVE_PATH`.
    /// Overridable with `DYN_SYSTEM_LIVE_PATH`.
129    #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
131    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
132    pub system_live_path: String,
133
    /// Health-check toggle. Defaults to `false`.
    /// Overridable with `DYN_HEALTH_CHECK_ENABLED`.
134    #[builder(default = "false")]
137    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
138    pub health_check_enabled: bool,
139
    /// Canary wait time in seconds (per the field name). Defaults to
    /// `DEFAULT_CANARY_WAIT_TIME_SECS`. Overridable with `DYN_CANARY_WAIT_TIME`.
140    #[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
143    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
144    pub canary_wait_time_secs: u64,
145
    /// Health-check request timeout in seconds (per the field name). Defaults to
    /// `DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS`.
    /// Overridable with `DYN_HEALTH_CHECK_REQUEST_TIMEOUT`.
146    #[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
149    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
150    pub health_check_request_timeout_secs: u64,
151}
152
153impl fmt::Display for RuntimeConfig {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        match self.num_worker_threads {
157            Some(val) => write!(f, "num_worker_threads={val}, ")?,
158            None => write!(f, "num_worker_threads=default (num_cores), ")?,
159        }
160
161        write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
162        write!(f, "system_host={}, ", self.system_host)?;
163        write!(f, "system_port={}, ", self.system_port)?;
164        write!(f, "system_enabled={}", self.system_enabled)?;
165        write!(
166            f,
167            "use_endpoint_health_status={:?}",
168            self.use_endpoint_health_status
169        )?;
170        write!(
171            f,
172            "starting_health_status={:?}",
173            self.starting_health_status
174        )?;
175        write!(f, ", system_health_path={}", self.system_health_path)?;
176        write!(f, ", system_live_path={}", self.system_live_path)?;
177        write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
178        write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
179        write!(
180            f,
181            ", health_check_request_timeout_secs={}",
182            self.health_check_request_timeout_secs
183        )?;
184
185        Ok(())
186    }
187}
188
189impl RuntimeConfig {
190    pub fn builder() -> RuntimeConfigBuilder {
191        RuntimeConfigBuilder::default()
192    }
193
194    pub(crate) fn figment() -> Figment {
195        Figment::new()
196            .merge(Serialized::defaults(RuntimeConfig::default()))
197            .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
198            .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
199            .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
200                let full_key = format!("DYN_RUNTIME_{}", k.as_str());
201                match std::env::var(&full_key) {
203                    Ok(v) if !v.is_empty() => Some(k.into()),
204                    _ => None,
205                }
206            }))
207            .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
208                let full_key = format!("DYN_SYSTEM_{}", k.as_str());
209                match std::env::var(&full_key) {
211                    Ok(v) if !v.is_empty() => {
212                        let mapped_key = match k.as_str() {
214                            "HOST" => "system_host",
215                            "PORT" => "system_port",
216                            "ENABLED" => "system_enabled",
217                            "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
218                            "STARTING_HEALTH_STATUS" => "starting_health_status",
219                            "HEALTH_PATH" => "system_health_path",
220                            "LIVE_PATH" => "system_live_path",
221                            _ => k.as_str(),
222                        };
223                        Some(mapped_key.into())
224                    }
225                    _ => None,
226                }
227            }))
228            .merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
229                let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
230                match std::env::var(&full_key) {
232                    Ok(v) if !v.is_empty() => {
233                        let mapped_key = match k.as_str() {
235                            "ENABLED" => "health_check_enabled",
236                            "REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
237                            _ => k.as_str(),
238                        };
239                        Some(mapped_key.into())
240                    }
241                    _ => None,
242                }
243            }))
244            .merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
245                let full_key = format!("DYN_CANARY_{}", k.as_str());
246                match std::env::var(&full_key) {
248                    Ok(v) if !v.is_empty() => {
249                        let mapped_key = match k.as_str() {
251                            "WAIT_TIME" => "canary_wait_time_secs",
252                            _ => k.as_str(),
253                        };
254                        Some(mapped_key.into())
255                    }
256                    _ => None,
257                }
258            }))
259    }
260
261    pub fn from_settings() -> Result<RuntimeConfig> {
270        if std::env::var("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS").is_ok() {
272            tracing::warn!(
273                "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
274                System health is now determined by endpoints that register with health check payloads. \
275                Please update your configuration to register health check payloads directly on endpoints."
276            );
277        }
278
279        let config: RuntimeConfig = Self::figment().extract()?;
280        config.validate()?;
281        Ok(config)
282    }
283
284    pub fn system_server_enabled(&self) -> bool {
287        self.system_enabled
288    }
289
290    pub fn single_threaded() -> Self {
291        RuntimeConfig {
292            num_worker_threads: Some(1),
293            max_blocking_threads: 1,
294            system_host: DEFAULT_SYSTEM_HOST.to_string(),
295            system_port: DEFAULT_SYSTEM_PORT,
296            system_enabled: false,
297            starting_health_status: HealthStatus::NotReady,
298            use_endpoint_health_status: vec![],
299            system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
300            system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
301            health_check_enabled: false,
302            canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
303            health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
304        }
305    }
306
307    pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
309        tokio::runtime::Builder::new_multi_thread()
310            .worker_threads(
311                self.num_worker_threads
312                    .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
313            )
314            .max_blocking_threads(self.max_blocking_threads)
315            .enable_all()
316            .build()
317    }
318}
319
320impl Default for RuntimeConfig {
321    fn default() -> Self {
322        let num_cores = std::thread::available_parallelism().unwrap().get();
323        Self {
324            num_worker_threads: Some(num_cores),
325            max_blocking_threads: num_cores,
326            system_host: DEFAULT_SYSTEM_HOST.to_string(),
327            system_port: DEFAULT_SYSTEM_PORT,
328            system_enabled: false,
329            starting_health_status: HealthStatus::NotReady,
330            use_endpoint_health_status: vec![],
331            system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
332            system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
333            health_check_enabled: false,
334            canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
335            health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
336        }
337    }
338}
339
340impl RuntimeConfigBuilder {
341    pub fn build(&self) -> Result<RuntimeConfig> {
343        let config = self.build_internal()?;
344        config.validate()?;
345        Ok(config)
346    }
347}
348
/// Returns `true` when `val` is one of `1`, `true`, `on` or `yes`,
/// compared case-insensitively. The value is not trimmed.
pub fn is_truthy(val: &str) -> bool {
    ["1", "true", "on", "yes"]
        .iter()
        .any(|accepted| val.eq_ignore_ascii_case(accepted))
}
356
/// Returns `true` when `val` is one of `0`, `false`, `off` or `no`,
/// compared case-insensitively. The value is not trimmed.
pub fn is_falsey(val: &str) -> bool {
    ["0", "false", "off", "no"]
        .iter()
        .any(|accepted| val.eq_ignore_ascii_case(accepted))
}
364
365pub fn env_is_truthy(env: &str) -> bool {
367    match std::env::var(env) {
368        Ok(val) => is_truthy(val.as_str()),
369        Err(_) => false,
370    }
371}
372
373pub fn env_is_falsey(env: &str) -> bool {
375    match std::env::var(env) {
376        Ok(val) => is_falsey(val.as_str()),
377        Err(_) => false,
378    }
379}
380
381pub fn jsonl_logging_enabled() -> bool {
384    env_is_truthy("DYN_LOGGING_JSONL")
385}
386
387pub fn disable_ansi_logging() -> bool {
390    env_is_truthy("DYN_SDK_DISABLE_ANSI_LOGGING")
391}
392
393pub fn use_local_timezone() -> bool {
396    env_is_truthy("DYN_LOG_USE_LOCAL_TZ")
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// `DYN_RUNTIME_*` variables override the thread-count defaults.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("24")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("32")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.num_worker_threads, Some(24));
                assert_eq!(config.max_blocking_threads, 32);
                Ok(())
            },
        )
    }

    /// Unset and empty variables both leave the defaults in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", None::<&str>),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;

                let default_config = RuntimeConfig::default();
                assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
                assert_eq!(
                    config.max_blocking_threads,
                    default_config.max_blocking_threads
                );
                Ok(())
            },
        )
    }

    /// A thread count of zero fails validation for both thread settings.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("0")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("0")),
            ],
            || {
                let message = RuntimeConfig::from_settings()
                    .expect_err("zero thread counts must be rejected")
                    .to_string();
                assert!(message.contains("num_worker_threads: Validation error"));
                assert!(message.contains("max_blocking_threads: Validation error"));
                Ok(())
            },
        )
    }

    /// `DYN_SYSTEM_HOST` / `DYN_SYSTEM_PORT` map onto `system_host` / `system_port`.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
                ("DYN_SYSTEM_PORT", Some("9090")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.system_host, "127.0.0.1");
                assert_eq!(config.system_port, 9090);
                Ok(())
            },
        )
    }

    /// Without `DYN_SYSTEM_ENABLED` the system server stays disabled.
    #[test]
    fn test_system_server_disabled_by_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_disabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_enabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(config.system_server_enabled());
        });
    }

    /// Setting only the port changes the port but does not enable the server.
    #[test]
    fn test_system_port_alone_does_not_enable_system_server() {
        temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
            assert_eq!(config.system_port, 8080);
        });
    }

    #[test]
    fn test_system_server_starting_health_status_ready() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.starting_health_status, HealthStatus::Ready);
            },
        );
    }

    /// The deprecated variable is still parsed into its field.
    #[test]
    fn test_system_use_endpoint_health_status() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.use_endpoint_health_status, vec!["ready"]);
            },
        );
    }

    #[test]
    fn test_system_health_endpoint_path_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_health_path, DEFAULT_SYSTEM_HEALTH_PATH);
        });

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_live_path, DEFAULT_SYSTEM_LIVE_PATH);
        });
    }

    #[test]
    fn test_system_health_endpoint_path_custom() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.system_health_path, "/custom/health");
            },
        );

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_live_path, "/custom/live");
        });
    }

    #[test]
    fn test_is_truthy_and_falsey() {
        // Truthy values, including a differently-cased one.
        assert!(is_truthy("1"));
        assert!(is_truthy("true"));
        assert!(is_truthy("TRUE"));
        assert!(is_truthy("on"));
        assert!(is_truthy("yes"));

        // Falsey values, including a differently-cased one.
        assert!(is_falsey("0"));
        assert!(is_falsey("false"));
        assert!(is_falsey("FALSE"));
        assert!(is_falsey("off"));
        assert!(is_falsey("no"));

        // The two sets do not overlap.
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        // Environment-backed helpers.
        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // A missing variable is neither truthy nor falsey.
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}