1use anyhow::Result;
5use derive_builder::Builder;
6use figment::{
7 Figment,
8 providers::{Env, Format, Serialized, Toml},
9};
10use serde::{Deserialize, Serialize};
11use std::fmt;
12use std::sync::OnceLock;
13use validator::Validate;
14
15pub mod environment_names;
16
/// Default value for `RuntimeConfig::system_host`.
const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";

/// Default value for `RuntimeConfig::system_port`.
/// Negative, so `RuntimeConfig::system_server_enabled` reports `false` by default.
const DEFAULT_SYSTEM_PORT: i16 = -1;

/// Default value for `RuntimeConfig::system_health_path`.
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
/// Default value for `RuntimeConfig::system_live_path`.
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";

/// Default value for `RuntimeConfig::canary_wait_time_secs`.
pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
/// Default value for `RuntimeConfig::health_check_request_timeout_secs`.
pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
32
/// Worker-level settings, loaded from defaults and `DYN_WORKER_`-prefixed
/// environment variables.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// How long a graceful shutdown may take.
    /// NOTE(review): the unit is not stated in this file — presumably seconds; confirm with callers.
    pub graceful_shutdown_timeout: u64,
}
38
39impl WorkerConfig {
40 pub fn from_settings() -> Self {
43 Figment::new()
45 .merge(Serialized::defaults(Self::default()))
46 .merge(Env::prefixed("DYN_WORKER_"))
47 .extract()
48 .unwrap() }
50}
51
52impl Default for WorkerConfig {
53 fn default() -> Self {
54 WorkerConfig {
55 graceful_shutdown_timeout: if cfg!(debug_assertions) {
56 1 } else {
58 30 },
60 }
61 }
62}
63
/// Health state of the system.
///
/// Variant names are (de)serialized in lowercase because of
/// `rename_all = "lowercase"` (e.g. `Ready` is written as `ready`).
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// The system reports itself as ready.
    Ready,
    /// The system reports itself as not ready.
    NotReady,
}
70
/// Runtime configuration.
///
/// Instances are produced by `RuntimeConfig::from_settings` (defaults, TOML
/// files and environment variables), by the generated `RuntimeConfigBuilder`,
/// or by `RuntimeConfig::single_threaded`. The generated builder's own build
/// function is private and named `build_internal`.
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
pub struct RuntimeConfig {
    /// Number of Tokio worker threads. When `None`, `create_runtime` falls
    /// back to the available parallelism. Must be at least 1 when set.
    #[validate(range(min = 1))]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub num_worker_threads: Option<usize>,

    /// Maximum number of Tokio blocking threads. Must be at least 1.
    /// The builder default is 512.
    #[validate(range(min = 1))]
    #[builder(default = "512")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub max_blocking_threads: usize,

    /// Host for the system server; `DYN_SYSTEM_HOST` overrides it.
    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_host: String,

    /// Port for the system server; `DYN_SYSTEM_PORT` overrides it.
    /// A negative value means the server is disabled (see `system_server_enabled`).
    /// NOTE(review): `i16` cannot represent ports above 32767 — confirm this is intended.
    #[builder(default = "DEFAULT_SYSTEM_PORT")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_port: i16,

    /// Deprecated switch fed by `DYN_SYSTEM_ENABLED`; use `system_port` instead.
    #[deprecated(
        note = "Use system_port instead. Set DYN_SYSTEM_PORT to enable the system metrics server."
    )]
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_enabled: bool,

    /// Health status to start with; `DYN_SYSTEM_STARTING_HEALTH_STATUS` overrides it.
    #[builder(default = "HealthStatus::NotReady")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub starting_health_status: HealthStatus,

    /// Fed by `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS`. `from_settings` warns
    /// that this variable is deprecated and no longer used.
    #[builder(default = "vec![]")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub use_endpoint_health_status: Vec<String>,

    /// Path of the health endpoint; `DYN_SYSTEM_HEALTH_PATH` overrides it.
    #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_health_path: String,
    /// Path of the liveness endpoint; `DYN_SYSTEM_LIVE_PATH` overrides it.
    #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_live_path: String,

    /// Number of compute threads; `DYN_COMPUTE_THREADS` overrides it.
    /// NOTE(review): the meaning of `None` is decided by the consumer of this
    /// field, which is not visible in this file.
    #[builder(default = "None")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub compute_threads: Option<usize>,

    /// Stack size for compute threads; `DYN_COMPUTE_STACK_SIZE` overrides it.
    /// Defaults to `2 * 1024 * 1024` (presumably bytes — confirm with the consumer).
    #[builder(default = "Some(2 * 1024 * 1024)")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub compute_stack_size: Option<usize>,

    /// Name prefix for compute threads; `DYN_COMPUTE_THREAD_PREFIX` overrides it.
    #[builder(default = "\"compute\".to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub compute_thread_prefix: String,

    /// Whether health checks are enabled; `DYN_HEALTH_CHECK_ENABLED` overrides it.
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub health_check_enabled: bool,

    /// Canary wait time in seconds; `DYN_CANARY_WAIT_TIME` overrides it.
    #[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub canary_wait_time_secs: u64,

    /// Health-check request timeout in seconds;
    /// `DYN_HEALTH_CHECK_REQUEST_TIMEOUT` overrides it.
    #[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub health_check_request_timeout_secs: u64,
}
181
182impl fmt::Display for RuntimeConfig {
183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184 match self.num_worker_threads {
186 Some(val) => write!(f, "num_worker_threads={val}, ")?,
187 None => write!(f, "num_worker_threads=default (num_cores), ")?,
188 }
189
190 write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
191 write!(f, "system_host={}, ", self.system_host)?;
192 write!(f, "system_port={}, ", self.system_port)?;
193 write!(
194 f,
195 "use_endpoint_health_status={:?}",
196 self.use_endpoint_health_status
197 )?;
198 write!(
199 f,
200 "starting_health_status={:?}",
201 self.starting_health_status
202 )?;
203 write!(f, ", system_health_path={}", self.system_health_path)?;
204 write!(f, ", system_live_path={}", self.system_live_path)?;
205 write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
206 write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
207 write!(
208 f,
209 ", health_check_request_timeout_secs={}",
210 self.health_check_request_timeout_secs
211 )?;
212
213 Ok(())
214 }
215}
216
217impl RuntimeConfig {
218 pub fn builder() -> RuntimeConfigBuilder {
219 RuntimeConfigBuilder::default()
220 }
221
222 pub(crate) fn figment() -> Figment {
223 Figment::new()
224 .merge(Serialized::defaults(RuntimeConfig::default()))
225 .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
226 .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
227 .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
228 let full_key = format!("DYN_RUNTIME_{}", k.as_str());
229 match std::env::var(&full_key) {
231 Ok(v) if !v.is_empty() => Some(k.into()),
232 _ => None,
233 }
234 }))
235 .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
236 let full_key = format!("DYN_SYSTEM_{}", k.as_str());
237 match std::env::var(&full_key) {
239 Ok(v) if !v.is_empty() => {
240 let mapped_key = match k.as_str() {
242 "HOST" => "system_host",
243 "PORT" => "system_port",
244 "ENABLED" => "system_enabled",
245 "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
246 "STARTING_HEALTH_STATUS" => "starting_health_status",
247 "HEALTH_PATH" => "system_health_path",
248 "LIVE_PATH" => "system_live_path",
249 _ => k.as_str(),
250 };
251 Some(mapped_key.into())
252 }
253 _ => None,
254 }
255 }))
256 .merge(Env::prefixed("DYN_COMPUTE_").filter_map(|k| {
257 let full_key = format!("DYN_COMPUTE_{}", k.as_str());
258 match std::env::var(&full_key) {
260 Ok(v) if !v.is_empty() => {
261 let mapped_key = match k.as_str() {
263 "THREADS" => "compute_threads",
264 "STACK_SIZE" => "compute_stack_size",
265 "THREAD_PREFIX" => "compute_thread_prefix",
266 _ => k.as_str(),
267 };
268 Some(mapped_key.into())
269 }
270 _ => None,
271 }
272 }))
273 .merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
274 let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
275 match std::env::var(&full_key) {
277 Ok(v) if !v.is_empty() => {
278 let mapped_key = match k.as_str() {
280 "ENABLED" => "health_check_enabled",
281 "REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
282 _ => k.as_str(),
283 };
284 Some(mapped_key.into())
285 }
286 _ => None,
287 }
288 }))
289 .merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
290 let full_key = format!("DYN_CANARY_{}", k.as_str());
291 match std::env::var(&full_key) {
293 Ok(v) if !v.is_empty() => {
294 let mapped_key = match k.as_str() {
296 "WAIT_TIME" => "canary_wait_time_secs",
297 _ => k.as_str(),
298 };
299 Some(mapped_key.into())
300 }
301 _ => None,
302 }
303 }))
304 }
305
306 pub fn from_settings() -> Result<RuntimeConfig> {
315 use environment_names::runtime::system as env_system;
316 if std::env::var(env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS).is_ok() {
318 tracing::warn!(
319 "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
320 System health is now determined by endpoints that register with health check payloads. \
321 Please update your configuration to register health check payloads directly on endpoints."
322 );
323 }
324
325 if std::env::var(env_system::DYN_SYSTEM_ENABLED).is_ok() {
326 tracing::warn!(
327 "DYN_SYSTEM_ENABLED is deprecated. \
328 System metrics server is now controlled solely by DYN_SYSTEM_PORT. \
329 Set DYN_SYSTEM_PORT to a positive value to enable the server, or set to -1 to disable (default)."
330 );
331 }
332
333 let config: RuntimeConfig = Self::figment().extract()?;
334 config.validate()?;
335 Ok(config)
336 }
337
338 pub fn system_server_enabled(&self) -> bool {
343 self.system_port >= 0
344 }
345
346 pub fn single_threaded() -> Self {
347 RuntimeConfig {
348 num_worker_threads: Some(1),
349 max_blocking_threads: 1,
350 system_host: DEFAULT_SYSTEM_HOST.to_string(),
351 system_port: DEFAULT_SYSTEM_PORT,
352 #[allow(deprecated)]
353 system_enabled: false,
354 starting_health_status: HealthStatus::NotReady,
355 use_endpoint_health_status: vec![],
356 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
357 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
358 compute_threads: Some(1),
359 compute_stack_size: Some(2 * 1024 * 1024),
360 compute_thread_prefix: "compute".to_string(),
361 health_check_enabled: false,
362 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
363 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
364 }
365 }
366
367 pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
369 let mut builder = tokio::runtime::Builder::new_multi_thread();
370 builder
371 .worker_threads(
372 self.num_worker_threads
373 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
374 )
375 .max_blocking_threads(self.max_blocking_threads)
376 .enable_all();
377 if env_is_truthy(environment_names::runtime::DYN_ENABLE_POLL_HISTOGRAM) {
378 tracing::info!(
379 "Tokio poll-time histogram enabled (DYN_ENABLE_POLL_HISTOGRAM); \
380 expect ~2× Instant::now() overhead per task poll"
381 );
382 builder.enable_metrics_poll_time_histogram();
383 }
384 builder.build()
385 }
386}
387
388impl Default for RuntimeConfig {
389 fn default() -> Self {
390 let num_cores = std::thread::available_parallelism().unwrap().get();
391 Self {
392 num_worker_threads: Some(num_cores),
393 max_blocking_threads: num_cores,
394 system_host: DEFAULT_SYSTEM_HOST.to_string(),
395 system_port: DEFAULT_SYSTEM_PORT,
396 #[allow(deprecated)]
397 system_enabled: false,
398 starting_health_status: HealthStatus::NotReady,
399 use_endpoint_health_status: vec![],
400 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
401 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
402 compute_threads: None,
403 compute_stack_size: Some(2 * 1024 * 1024),
404 compute_thread_prefix: "compute".to_string(),
405 health_check_enabled: false,
406 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
407 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
408 }
409 }
410}
411
412impl RuntimeConfigBuilder {
413 pub fn build(&self) -> Result<RuntimeConfig> {
415 let config = self.build_internal()?;
416 config.validate()?;
417 Ok(config)
418 }
419}
420
/// Returns `true` when `val`, compared case-insensitively, is one of
/// `1`, `true`, `on` or `yes`.
pub fn is_truthy(val: &str) -> bool {
    let lowered = val.to_lowercase();
    ["1", "true", "on", "yes"].contains(&lowered.as_str())
}
428
429pub fn parse_bool(val: &str) -> anyhow::Result<bool> {
430 if is_truthy(val) {
431 Ok(true)
432 } else if is_falsey(val) {
433 Ok(false)
434 } else {
435 anyhow::bail!(
436 "Invalid boolean value: '{}'. Expected one of: true/false, 1/0, on/off, yes/no",
437 val
438 )
439 }
440}
441
/// Returns `true` when `val`, compared case-insensitively, is one of
/// `0`, `false`, `off` or `no`.
pub fn is_falsey(val: &str) -> bool {
    let lowered = val.to_lowercase();
    ["0", "false", "off", "no"].contains(&lowered.as_str())
}
449
450pub fn env_is_truthy(env: &str) -> bool {
452 match std::env::var(env) {
453 Ok(val) => is_truthy(val.as_str()),
454 Err(_) => false,
455 }
456}
457
458pub fn env_is_falsey(env: &str) -> bool {
460 match std::env::var(env) {
461 Ok(val) => is_falsey(val.as_str()),
462 Err(_) => false,
463 }
464}
465
466pub fn jsonl_logging_enabled() -> bool {
469 env_is_truthy(environment_names::logging::DYN_LOGGING_JSONL)
470}
471
472pub fn disable_ansi_logging() -> bool {
475 env_is_truthy(environment_names::logging::DYN_SDK_DISABLE_ANSI_LOGGING)
476}
477
478pub fn use_local_timezone() -> bool {
481 env_is_truthy(environment_names::logging::DYN_LOG_USE_LOCAL_TZ)
482}
483
484pub fn span_events_enabled() -> bool {
486 env_is_truthy(environment_names::logging::DYN_LOGGING_SPAN_EVENTS)
487}
488
#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit `DYN_RUNTIME_*` values override the thread-count defaults.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        use environment_names::runtime;
        let overrides = vec![
            (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, Some("24")),
            (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("32")),
        ];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings()?;
            assert_eq!(loaded.num_worker_threads, Some(24));
            assert_eq!(loaded.max_blocking_threads, 32);
            Ok(())
        })
    }

    /// Unset and empty variables both leave the defaults in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        use environment_names::runtime;
        let overrides = vec![
            (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, None::<&str>),
            (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("")),
        ];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings()?;
            let expected = RuntimeConfig::default();

            assert_eq!(loaded.num_worker_threads, expected.num_worker_threads);
            assert_eq!(loaded.max_blocking_threads, expected.max_blocking_threads);
            Ok(())
        })
    }

    /// Zero thread counts are rejected by validation, and both fields are reported.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        use environment_names::runtime;
        let overrides = vec![
            (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, Some("0")),
            (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("0")),
        ];
        temp_env::with_vars(overrides, || {
            let outcome = RuntimeConfig::from_settings();
            assert!(outcome.is_err());
            if let Some(message) = outcome.err().map(|e| e.to_string()) {
                assert!(message.contains("num_worker_threads: Validation error"));
                assert!(message.contains("max_blocking_threads: Validation error"));
            }
            Ok(())
        })
    }

    /// `DYN_SYSTEM_HOST` and `DYN_SYSTEM_PORT` map onto the system server fields.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        use environment_names::runtime::system;
        let overrides = vec![
            (system::DYN_SYSTEM_HOST, Some("127.0.0.1")),
            (system::DYN_SYSTEM_PORT, Some("9090")),
        ];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings()?;
            assert_eq!(loaded.system_host, "127.0.0.1");
            assert_eq!(loaded.system_port, 9090);
            Ok(())
        })
    }

    /// Without a port the system server stays off.
    #[test]
    fn test_system_server_disabled_by_default() {
        use environment_names::runtime::system;
        let overrides = vec![(system::DYN_SYSTEM_PORT, None::<&str>)];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert!(!loaded.system_server_enabled());
            assert_eq!(loaded.system_port, -1);
        });
    }

    /// An explicit negative port keeps the system server off.
    #[test]
    fn test_system_server_disabled_with_negative_port() {
        use environment_names::runtime::system;
        let overrides = vec![(system::DYN_SYSTEM_PORT, Some("-1"))];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert!(!loaded.system_server_enabled());
            assert_eq!(loaded.system_port, -1);
        });
    }

    /// A positive port turns the system server on.
    #[test]
    fn test_system_server_enabled_with_port() {
        use environment_names::runtime::system;
        let overrides = vec![(system::DYN_SYSTEM_PORT, Some("9527"))];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert!(loaded.system_server_enabled());
            assert_eq!(loaded.system_port, 9527);
        });
    }

    /// The starting health status can be set to `ready` from the environment.
    #[test]
    fn test_system_server_starting_health_status_ready() {
        use environment_names::runtime::system;
        let overrides = vec![(system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready"))];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.starting_health_status, HealthStatus::Ready);
        });
    }

    /// The deprecated endpoint list is still parsed from the environment.
    #[test]
    fn test_system_use_endpoint_health_status() {
        use environment_names::runtime::system;
        let overrides = vec![(
            system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
            Some("[\"ready\"]"),
        )];
        temp_env::with_vars(overrides, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.use_endpoint_health_status, vec!["ready"]);
        });
    }

    /// Health and liveness paths fall back to their defaults when unset.
    #[test]
    fn test_system_health_endpoint_path_default() {
        use environment_names::runtime::system;

        let health_unset = vec![(system::DYN_SYSTEM_HEALTH_PATH, None::<&str>)];
        temp_env::with_vars(health_unset, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.system_health_path, DEFAULT_SYSTEM_HEALTH_PATH);
        });

        let live_unset = vec![(system::DYN_SYSTEM_LIVE_PATH, None::<&str>)];
        temp_env::with_vars(live_unset, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.system_live_path, DEFAULT_SYSTEM_LIVE_PATH);
        });
    }

    /// Health and liveness paths can be customised from the environment.
    #[test]
    fn test_system_health_endpoint_path_custom() {
        use environment_names::runtime::system;

        let health_custom = vec![(system::DYN_SYSTEM_HEALTH_PATH, Some("/custom/health"))];
        temp_env::with_vars(health_custom, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.system_health_path, "/custom/health");
        });

        let live_custom = vec![(system::DYN_SYSTEM_LIVE_PATH, Some("/custom/live"))];
        temp_env::with_vars(live_custom, || {
            let loaded = RuntimeConfig::from_settings().unwrap();
            assert_eq!(loaded.system_live_path, "/custom/live");
        });
    }

    /// Covers the string helpers and their environment-reading wrappers.
    #[test]
    fn test_is_truthy_and_falsey() {
        // Every accepted truthy spelling.
        for spelling in ["1", "true", "TRUE", "on", "yes"] {
            assert!(is_truthy(spelling));
        }

        // Every accepted falsey spelling.
        for spelling in ["0", "false", "FALSE", "off", "no"] {
            assert!(is_falsey(spelling));
        }

        // The two sets do not overlap.
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // A missing variable is neither truthy nor falsey.
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}