1use anyhow::Result;
5use derive_builder::Builder;
6use figment::{
7 Figment,
8 providers::{Env, Format, Serialized, Toml},
9};
10use serde::{Deserialize, Serialize};
11use std::fmt;
12use std::sync::OnceLock;
13use validator::Validate;
14
15pub mod environment_names;
16
/// Default value for `RuntimeConfig::system_host`; `0.0.0.0` is the IPv4 wildcard address.
17const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";
19
/// Default value for `RuntimeConfig::system_port`. It is negative, and
/// `RuntimeConfig::system_server_enabled` reports negative ports as disabled.
20const DEFAULT_SYSTEM_PORT: i16 = -1;
22
/// Default value for `RuntimeConfig::system_health_path`.
23const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
/// Default value for `RuntimeConfig::system_live_path`.
25const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";
26
/// Default value for `RuntimeConfig::canary_wait_time_secs`, in seconds.
27pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
/// Default value for `RuntimeConfig::health_check_request_timeout_secs`, in seconds.
30pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
32
/// Worker-level settings. `WorkerConfig::from_settings` loads them from the built-in
/// defaults overlaid with `DYN_WORKER_`-prefixed environment variables.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct WorkerConfig {
 /// Graceful shutdown timeout. The built-in default is 1 in builds with debug
 /// assertions and 30 otherwise.
 /// NOTE(review): the unit is presumably seconds - confirm against the consumer.
35 pub graceful_shutdown_timeout: u64,
37}
38
39impl WorkerConfig {
40 pub fn from_settings() -> Self {
43 Figment::new()
45 .merge(Serialized::defaults(Self::default()))
46 .merge(Env::prefixed("DYN_WORKER_"))
47 .extract()
48 .unwrap() }
50}
51
52impl Default for WorkerConfig {
53 fn default() -> Self {
54 WorkerConfig {
55 graceful_shutdown_timeout: if cfg!(debug_assertions) {
56 1 } else {
58 30 },
60 }
61 }
62}
63
/// Health state stored in `RuntimeConfig::starting_health_status`.
///
/// Because of `rename_all = "lowercase"`, serde reads and writes the variants as
/// `"ready"` and `"notready"`.
64#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
65#[serde(rename_all = "lowercase")]
66pub enum HealthStatus {
67 Ready,
68 NotReady,
69}
70
71#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
74#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
75pub struct RuntimeConfig {
76 #[validate(range(min = 1))]
81 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
82 pub num_worker_threads: Option<usize>,
83
84 #[validate(range(min = 1))]
89 #[builder(default = "512")]
90 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
91 pub max_blocking_threads: usize,
92
93 #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
96 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
97 pub system_host: String,
98
99 #[builder(default = "DEFAULT_SYSTEM_PORT")]
105 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
106 pub system_port: i16,
107
108 #[deprecated(
112 note = "Use system_port instead. Set DYN_SYSTEM_PORT to enable the system metrics server."
113 )]
114 #[builder(default = "false")]
115 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
116 pub system_enabled: bool,
117
118 #[builder(default = "HealthStatus::NotReady")]
121 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
122 pub starting_health_status: HealthStatus,
123
124 #[builder(default = "vec![]")]
130 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
131 pub use_endpoint_health_status: Vec<String>,
132
133 #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
136 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
137 pub system_health_path: String,
138 #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
140 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
141 pub system_live_path: String,
142
143 #[builder(default = "None")]
147 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
148 pub compute_threads: Option<usize>,
149
150 #[builder(default = "Some(2 * 1024 * 1024)")]
154 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
155 pub compute_stack_size: Option<usize>,
156
157 #[builder(default = "\"compute\".to_string()")]
160 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
161 pub compute_thread_prefix: String,
162
163 #[builder(default = "false")]
166 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
167 pub health_check_enabled: bool,
168
169 #[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
172 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
173 pub canary_wait_time_secs: u64,
174
175 #[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
178 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
179 pub health_check_request_timeout_secs: u64,
180}
181
182impl fmt::Display for RuntimeConfig {
183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184 match self.num_worker_threads {
186 Some(val) => write!(f, "num_worker_threads={val}, ")?,
187 None => write!(f, "num_worker_threads=default (num_cores), ")?,
188 }
189
190 write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
191 write!(f, "system_host={}, ", self.system_host)?;
192 write!(f, "system_port={}, ", self.system_port)?;
193 write!(
194 f,
195 "use_endpoint_health_status={:?}",
196 self.use_endpoint_health_status
197 )?;
198 write!(
199 f,
200 "starting_health_status={:?}",
201 self.starting_health_status
202 )?;
203 write!(f, ", system_health_path={}", self.system_health_path)?;
204 write!(f, ", system_live_path={}", self.system_live_path)?;
205 write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
206 write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
207 write!(
208 f,
209 ", health_check_request_timeout_secs={}",
210 self.health_check_request_timeout_secs
211 )?;
212
213 Ok(())
214 }
215}
216
217impl RuntimeConfig {
218 pub fn builder() -> RuntimeConfigBuilder {
219 RuntimeConfigBuilder::default()
220 }
221
222 pub(crate) fn figment() -> Figment {
223 Figment::new()
224 .merge(Serialized::defaults(RuntimeConfig::default()))
225 .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
226 .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
227 .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
228 let full_key = format!("DYN_RUNTIME_{}", k.as_str());
229 match std::env::var(&full_key) {
231 Ok(v) if !v.is_empty() => Some(k.into()),
232 _ => None,
233 }
234 }))
235 .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
236 let full_key = format!("DYN_SYSTEM_{}", k.as_str());
237 match std::env::var(&full_key) {
239 Ok(v) if !v.is_empty() => {
240 let mapped_key = match k.as_str() {
242 "HOST" => "system_host",
243 "PORT" => "system_port",
244 "ENABLED" => "system_enabled",
245 "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
246 "STARTING_HEALTH_STATUS" => "starting_health_status",
247 "HEALTH_PATH" => "system_health_path",
248 "LIVE_PATH" => "system_live_path",
249 _ => k.as_str(),
250 };
251 Some(mapped_key.into())
252 }
253 _ => None,
254 }
255 }))
256 .merge(Env::prefixed("DYN_COMPUTE_").filter_map(|k| {
257 let full_key = format!("DYN_COMPUTE_{}", k.as_str());
258 match std::env::var(&full_key) {
260 Ok(v) if !v.is_empty() => {
261 let mapped_key = match k.as_str() {
263 "THREADS" => "compute_threads",
264 "STACK_SIZE" => "compute_stack_size",
265 "THREAD_PREFIX" => "compute_thread_prefix",
266 _ => k.as_str(),
267 };
268 Some(mapped_key.into())
269 }
270 _ => None,
271 }
272 }))
273 .merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
274 let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
275 match std::env::var(&full_key) {
277 Ok(v) if !v.is_empty() => {
278 let mapped_key = match k.as_str() {
280 "ENABLED" => "health_check_enabled",
281 "REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
282 _ => k.as_str(),
283 };
284 Some(mapped_key.into())
285 }
286 _ => None,
287 }
288 }))
289 .merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
290 let full_key = format!("DYN_CANARY_{}", k.as_str());
291 match std::env::var(&full_key) {
293 Ok(v) if !v.is_empty() => {
294 let mapped_key = match k.as_str() {
296 "WAIT_TIME" => "canary_wait_time_secs",
297 _ => k.as_str(),
298 };
299 Some(mapped_key.into())
300 }
301 _ => None,
302 }
303 }))
304 }
305
306 pub fn from_settings() -> Result<RuntimeConfig> {
315 use environment_names::runtime::system as env_system;
316 if std::env::var(env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS).is_ok() {
318 tracing::warn!(
319 "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
320 System health is now determined by endpoints that register with health check payloads. \
321 Please update your configuration to register health check payloads directly on endpoints."
322 );
323 }
324
325 if std::env::var(env_system::DYN_SYSTEM_ENABLED).is_ok() {
326 tracing::warn!(
327 "DYN_SYSTEM_ENABLED is deprecated. \
328 System metrics server is now controlled solely by DYN_SYSTEM_PORT. \
329 Set DYN_SYSTEM_PORT to a positive value to enable the server, or set to -1 to disable (default)."
330 );
331 }
332
333 let config: RuntimeConfig = Self::figment().extract()?;
334 config.validate()?;
335 Ok(config)
336 }
337
338 pub fn system_server_enabled(&self) -> bool {
343 self.system_port >= 0
344 }
345
346 pub fn single_threaded() -> Self {
347 RuntimeConfig {
348 num_worker_threads: Some(1),
349 max_blocking_threads: 1,
350 system_host: DEFAULT_SYSTEM_HOST.to_string(),
351 system_port: DEFAULT_SYSTEM_PORT,
352 #[allow(deprecated)]
353 system_enabled: false,
354 starting_health_status: HealthStatus::NotReady,
355 use_endpoint_health_status: vec![],
356 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
357 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
358 compute_threads: Some(1),
359 compute_stack_size: Some(2 * 1024 * 1024),
360 compute_thread_prefix: "compute".to_string(),
361 health_check_enabled: false,
362 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
363 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
364 }
365 }
366
367 pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
369 tokio::runtime::Builder::new_multi_thread()
370 .worker_threads(
371 self.num_worker_threads
372 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
373 )
374 .max_blocking_threads(self.max_blocking_threads)
375 .enable_all()
376 .build()
377 }
378}
379
380impl Default for RuntimeConfig {
381 fn default() -> Self {
382 let num_cores = std::thread::available_parallelism().unwrap().get();
383 Self {
384 num_worker_threads: Some(num_cores),
385 max_blocking_threads: num_cores,
386 system_host: DEFAULT_SYSTEM_HOST.to_string(),
387 system_port: DEFAULT_SYSTEM_PORT,
388 #[allow(deprecated)]
389 system_enabled: false,
390 starting_health_status: HealthStatus::NotReady,
391 use_endpoint_health_status: vec![],
392 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
393 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
394 compute_threads: None,
395 compute_stack_size: Some(2 * 1024 * 1024),
396 compute_thread_prefix: "compute".to_string(),
397 health_check_enabled: false,
398 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
399 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
400 }
401 }
402}
403
404impl RuntimeConfigBuilder {
405 pub fn build(&self) -> Result<RuntimeConfig> {
407 let config = self.build_internal()?;
408 config.validate()?;
409 Ok(config)
410 }
411}
412
/// Returns `true` when `val`, compared case-insensitively, is one of
/// `1`, `true`, `on` or `yes`.
///
/// The input is not trimmed, so surrounding whitespace makes the value non-truthy.
pub fn is_truthy(val: &str) -> bool {
    let normalized = val.to_lowercase();
    ["1", "true", "on", "yes"].contains(&normalized.as_str())
}
420
421pub fn parse_bool(val: &str) -> anyhow::Result<bool> {
422 if is_truthy(val) {
423 Ok(true)
424 } else if is_falsey(val) {
425 Ok(false)
426 } else {
427 anyhow::bail!(
428 "Invalid boolean value: '{}'. Expected one of: true/false, 1/0, on/off, yes/no",
429 val
430 )
431 }
432}
433
/// Returns `true` when `val`, compared case-insensitively, is one of
/// `0`, `false`, `off` or `no`.
///
/// The input is not trimmed, so surrounding whitespace makes the value non-falsey.
pub fn is_falsey(val: &str) -> bool {
    let normalized = val.to_lowercase();
    ["0", "false", "off", "no"].contains(&normalized.as_str())
}
441
442pub fn env_is_truthy(env: &str) -> bool {
444 match std::env::var(env) {
445 Ok(val) => is_truthy(val.as_str()),
446 Err(_) => false,
447 }
448}
449
450pub fn env_is_falsey(env: &str) -> bool {
452 match std::env::var(env) {
453 Ok(val) => is_falsey(val.as_str()),
454 Err(_) => false,
455 }
456}
457
458pub fn jsonl_logging_enabled() -> bool {
461 env_is_truthy(environment_names::logging::DYN_LOGGING_JSONL)
462}
463
464pub fn disable_ansi_logging() -> bool {
467 env_is_truthy(environment_names::logging::DYN_SDK_DISABLE_ANSI_LOGGING)
468}
469
470pub fn use_local_timezone() -> bool {
473 env_is_truthy(environment_names::logging::DYN_LOG_USE_LOCAL_TZ)
474}
475
476pub fn span_events_enabled() -> bool {
478 env_is_truthy(environment_names::logging::DYN_LOGGING_SPAN_EVENTS)
479}
480
#[cfg(test)]
mod tests {
    use super::*;

    /// `DYN_RUNTIME_*` thread-count variables override the defaults.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        use environment_names::runtime;
        let overrides = vec![
            (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, Some("24")),
            (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("32")),
        ];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings()?;
            assert_eq!(loaded.num_worker_threads, Some(24));
            assert_eq!(loaded.max_blocking_threads, 32);
            Ok(())
        })
    }

    /// Unset and empty variables both leave the defaults in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        use environment_names::runtime;
        let unset_or_empty = vec![
            (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, None::<&str>),
            (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("")),
        ];
        temp_env::with_vars(unset_or_empty, || {
            let loaded = RuntimeConfig::from_settings()?;
            let expected = RuntimeConfig::default();

            assert_eq!(loaded.num_worker_threads, expected.num_worker_threads);
            assert_eq!(loaded.max_blocking_threads, expected.max_blocking_threads);
            Ok(())
        })
    }

    /// Zero thread counts fail validation, and both fields are reported.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        use environment_names::runtime;
        let zeroed = vec![
            (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, Some("0")),
            (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("0")),
        ];
        temp_env::with_vars(zeroed, || {
            let message = RuntimeConfig::from_settings()
                .expect_err("zero thread counts must be rejected")
                .to_string();
            assert!(message.contains("num_worker_threads: Validation error"));
            assert!(message.contains("max_blocking_threads: Validation error"));
            Ok(())
        })
    }

    /// `DYN_SYSTEM_HOST` and `DYN_SYSTEM_PORT` map onto the `system_*` fields.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        use environment_names::runtime::system;
        let overrides = vec![
            (system::DYN_SYSTEM_HOST, Some("127.0.0.1")),
            (system::DYN_SYSTEM_PORT, Some("9090")),
        ];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings()?;
            assert_eq!(loaded.system_host, "127.0.0.1");
            assert_eq!(loaded.system_port, 9090);
            Ok(())
        })
    }

    /// Without `DYN_SYSTEM_PORT` the port stays at -1 and the server is disabled.
    #[test]
    fn test_system_server_disabled_by_default() {
        use environment_names::runtime::system;
        let port_unset = vec![(system::DYN_SYSTEM_PORT, None::<&str>)];
        temp_env::with_vars(port_unset, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert!(!loaded.system_server_enabled());
            assert_eq!(loaded.system_port, -1);
        });
    }

    /// An explicit -1 keeps the server disabled.
    #[test]
    fn test_system_server_disabled_with_negative_port() {
        use environment_names::runtime::system;
        let negative_port = vec![(system::DYN_SYSTEM_PORT, Some("-1"))];
        temp_env::with_vars(negative_port, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert!(!loaded.system_server_enabled());
            assert_eq!(loaded.system_port, -1);
        });
    }

    /// A positive port enables the server.
    #[test]
    fn test_system_server_enabled_with_port() {
        use environment_names::runtime::system;
        let explicit_port = vec![(system::DYN_SYSTEM_PORT, Some("9527"))];
        temp_env::with_vars(explicit_port, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert!(loaded.system_server_enabled());
            assert_eq!(loaded.system_port, 9527);
        });
    }

    /// `DYN_SYSTEM_STARTING_HEALTH_STATUS=ready` is parsed into `HealthStatus::Ready`.
    #[test]
    fn test_system_server_starting_health_status_ready() {
        use environment_names::runtime::system;
        let ready = vec![(system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready"))];
        temp_env::with_vars(ready, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.starting_health_status, HealthStatus::Ready);
        });
    }

    /// A bracketed list in `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS` becomes a `Vec<String>`.
    #[test]
    fn test_system_use_endpoint_health_status() {
        use environment_names::runtime::system;
        let endpoints = vec![(
            system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
            Some("[\"ready\"]"),
        )];
        temp_env::with_vars(endpoints, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.use_endpoint_health_status, vec!["ready"]);
        });
    }

    /// Health and liveness paths fall back to their defaults when unset.
    #[test]
    fn test_system_health_endpoint_path_default() {
        use environment_names::runtime::system;

        let health_unset = vec![(system::DYN_SYSTEM_HEALTH_PATH, None::<&str>)];
        temp_env::with_vars(health_unset, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(
                loaded.system_health_path,
                DEFAULT_SYSTEM_HEALTH_PATH.to_string()
            );
        });

        let live_unset = vec![(system::DYN_SYSTEM_LIVE_PATH, None::<&str>)];
        temp_env::with_vars(live_unset, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(
                loaded.system_live_path,
                DEFAULT_SYSTEM_LIVE_PATH.to_string()
            );
        });
    }

    /// Health and liveness paths can be overridden independently.
    #[test]
    fn test_system_health_endpoint_path_custom() {
        use environment_names::runtime::system;

        let custom_health = vec![(system::DYN_SYSTEM_HEALTH_PATH, Some("/custom/health"))];
        temp_env::with_vars(custom_health, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.system_health_path, "/custom/health");
        });

        let custom_live = vec![(system::DYN_SYSTEM_LIVE_PATH, Some("/custom/live"))];
        temp_env::with_vars(custom_live, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.system_live_path, "/custom/live");
        });
    }

    /// Covers the string helpers and their environment-reading wrappers.
    #[test]
    fn test_is_truthy_and_falsey() {
        for truthy in ["1", "true", "TRUE", "on", "yes"] {
            assert!(is_truthy(truthy));
        }

        for falsey in ["0", "false", "FALSE", "off", "no"] {
            assert!(is_falsey(falsey));
        }

        // The two sets must not overlap.
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        let truthy_var = vec![("TEST_TRUTHY", Some("true"))];
        temp_env::with_vars(truthy_var, || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        let falsey_var = vec![("TEST_FALSEY", Some("false"))];
        temp_env::with_vars(falsey_var, || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // A missing variable is neither truthy nor falsey.
        let missing_var = vec![("TEST_MISSING", None::<&str>)];
        temp_env::with_vars(missing_var, || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}