1use anyhow::Result;
5use derive_builder::Builder;
6use figment::{
7 Figment,
8 providers::{Env, Format, Serialized, Toml},
9};
10use serde::{Deserialize, Serialize};
11use std::fmt;
12use std::sync::OnceLock;
13use validator::Validate;
14
15pub mod environment_names;
16
17const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";
19
20const DEFAULT_SYSTEM_PORT: i16 = -1;
22
23const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
25const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";
26
27pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
30pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct WorkerConfig {
35 pub graceful_shutdown_timeout: u64,
37}
38
39impl WorkerConfig {
40 pub fn from_settings() -> Self {
43 Figment::new()
45 .merge(Serialized::defaults(Self::default()))
46 .merge(Env::prefixed("DYN_WORKER_"))
47 .extract()
48 .unwrap() }
50}
51
52impl Default for WorkerConfig {
53 fn default() -> Self {
54 WorkerConfig {
55 graceful_shutdown_timeout: if cfg!(debug_assertions) {
56 1 } else {
58 30 },
60 }
61 }
62}
63
64#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
65#[serde(rename_all = "lowercase")]
66pub enum HealthStatus {
67 Ready,
68 NotReady,
69}
70
71#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
74#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
75pub struct RuntimeConfig {
76 #[validate(range(min = 1))]
81 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
82 pub num_worker_threads: Option<usize>,
83
84 #[validate(range(min = 1))]
89 #[builder(default = "512")]
90 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
91 pub max_blocking_threads: usize,
92
93 #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
96 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
97 pub system_host: String,
98
99 #[builder(default = "DEFAULT_SYSTEM_PORT")]
105 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
106 pub system_port: i16,
107
108 #[deprecated(
112 note = "Use system_port instead. Set DYN_SYSTEM_PORT to enable the system metrics server."
113 )]
114 #[builder(default = "false")]
115 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
116 pub system_enabled: bool,
117
118 #[builder(default = "HealthStatus::NotReady")]
121 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
122 pub starting_health_status: HealthStatus,
123
124 #[builder(default = "vec![]")]
130 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
131 pub use_endpoint_health_status: Vec<String>,
132
133 #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
136 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
137 pub system_health_path: String,
138 #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
140 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
141 pub system_live_path: String,
142
143 #[builder(default = "None")]
147 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
148 pub compute_threads: Option<usize>,
149
150 #[builder(default = "Some(2 * 1024 * 1024)")]
154 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
155 pub compute_stack_size: Option<usize>,
156
157 #[builder(default = "\"compute\".to_string()")]
160 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
161 pub compute_thread_prefix: String,
162
163 #[builder(default = "false")]
166 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
167 pub health_check_enabled: bool,
168
169 #[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
172 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
173 pub canary_wait_time_secs: u64,
174
175 #[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
178 #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
179 pub health_check_request_timeout_secs: u64,
180}
181
182impl fmt::Display for RuntimeConfig {
183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
184 match self.num_worker_threads {
186 Some(val) => write!(f, "num_worker_threads={val}, ")?,
187 None => write!(f, "num_worker_threads=default (num_cores), ")?,
188 }
189
190 write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
191 write!(f, "system_host={}, ", self.system_host)?;
192 write!(f, "system_port={}, ", self.system_port)?;
193 write!(
194 f,
195 "use_endpoint_health_status={:?}",
196 self.use_endpoint_health_status
197 )?;
198 write!(
199 f,
200 "starting_health_status={:?}",
201 self.starting_health_status
202 )?;
203 write!(f, ", system_health_path={}", self.system_health_path)?;
204 write!(f, ", system_live_path={}", self.system_live_path)?;
205 write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
206 write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
207 write!(
208 f,
209 ", health_check_request_timeout_secs={}",
210 self.health_check_request_timeout_secs
211 )?;
212
213 Ok(())
214 }
215}
216
217impl RuntimeConfig {
218 pub fn builder() -> RuntimeConfigBuilder {
219 RuntimeConfigBuilder::default()
220 }
221
222 pub(crate) fn figment() -> Figment {
223 Figment::new()
224 .merge(Serialized::defaults(RuntimeConfig::default()))
225 .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
226 .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
227 .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
228 let full_key = format!("DYN_RUNTIME_{}", k.as_str());
229 match std::env::var(&full_key) {
231 Ok(v) if !v.is_empty() => Some(k.into()),
232 _ => None,
233 }
234 }))
235 .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
236 let full_key = format!("DYN_SYSTEM_{}", k.as_str());
237 match std::env::var(&full_key) {
239 Ok(v) if !v.is_empty() => {
240 let mapped_key = match k.as_str() {
242 "HOST" => "system_host",
243 "PORT" => "system_port",
244 "ENABLED" => "system_enabled",
245 "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
246 "STARTING_HEALTH_STATUS" => "starting_health_status",
247 "HEALTH_PATH" => "system_health_path",
248 "LIVE_PATH" => "system_live_path",
249 _ => k.as_str(),
250 };
251 Some(mapped_key.into())
252 }
253 _ => None,
254 }
255 }))
256 .merge(Env::prefixed("DYN_COMPUTE_").filter_map(|k| {
257 let full_key = format!("DYN_COMPUTE_{}", k.as_str());
258 match std::env::var(&full_key) {
260 Ok(v) if !v.is_empty() => {
261 let mapped_key = match k.as_str() {
263 "THREADS" => "compute_threads",
264 "STACK_SIZE" => "compute_stack_size",
265 "THREAD_PREFIX" => "compute_thread_prefix",
266 _ => k.as_str(),
267 };
268 Some(mapped_key.into())
269 }
270 _ => None,
271 }
272 }))
273 .merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
274 let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
275 match std::env::var(&full_key) {
277 Ok(v) if !v.is_empty() => {
278 let mapped_key = match k.as_str() {
280 "ENABLED" => "health_check_enabled",
281 "REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
282 _ => k.as_str(),
283 };
284 Some(mapped_key.into())
285 }
286 _ => None,
287 }
288 }))
289 .merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
290 let full_key = format!("DYN_CANARY_{}", k.as_str());
291 match std::env::var(&full_key) {
293 Ok(v) if !v.is_empty() => {
294 let mapped_key = match k.as_str() {
296 "WAIT_TIME" => "canary_wait_time_secs",
297 _ => k.as_str(),
298 };
299 Some(mapped_key.into())
300 }
301 _ => None,
302 }
303 }))
304 }
305
306 pub fn from_settings() -> Result<RuntimeConfig> {
315 use environment_names::runtime::system as env_system;
316 if std::env::var(env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS).is_ok() {
318 tracing::warn!(
319 "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
320 System health is now determined by endpoints that register with health check payloads. \
321 Please update your configuration to register health check payloads directly on endpoints."
322 );
323 }
324
325 if std::env::var(env_system::DYN_SYSTEM_ENABLED).is_ok() {
326 tracing::warn!(
327 "DYN_SYSTEM_ENABLED is deprecated. \
328 System metrics server is now controlled solely by DYN_SYSTEM_PORT. \
329 Set DYN_SYSTEM_PORT to a positive value to enable the server, or set to -1 to disable (default)."
330 );
331 }
332
333 let config: RuntimeConfig = Self::figment().extract()?;
334 config.validate()?;
335 Ok(config)
336 }
337
338 pub fn system_server_enabled(&self) -> bool {
343 self.system_port >= 0
344 }
345
346 pub fn single_threaded() -> Self {
347 RuntimeConfig {
348 num_worker_threads: Some(1),
349 max_blocking_threads: 1,
350 system_host: DEFAULT_SYSTEM_HOST.to_string(),
351 system_port: DEFAULT_SYSTEM_PORT,
352 #[allow(deprecated)]
353 system_enabled: false,
354 starting_health_status: HealthStatus::NotReady,
355 use_endpoint_health_status: vec![],
356 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
357 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
358 compute_threads: Some(1),
359 compute_stack_size: Some(2 * 1024 * 1024),
360 compute_thread_prefix: "compute".to_string(),
361 health_check_enabled: false,
362 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
363 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
364 }
365 }
366
367 pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
369 tokio::runtime::Builder::new_multi_thread()
370 .worker_threads(
371 self.num_worker_threads
372 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
373 )
374 .max_blocking_threads(self.max_blocking_threads)
375 .enable_all()
376 .build()
377 }
378}
379
380impl Default for RuntimeConfig {
381 fn default() -> Self {
382 let num_cores = std::thread::available_parallelism().unwrap().get();
383 Self {
384 num_worker_threads: Some(num_cores),
385 max_blocking_threads: num_cores,
386 system_host: DEFAULT_SYSTEM_HOST.to_string(),
387 system_port: DEFAULT_SYSTEM_PORT,
388 #[allow(deprecated)]
389 system_enabled: false,
390 starting_health_status: HealthStatus::NotReady,
391 use_endpoint_health_status: vec![],
392 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
393 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
394 compute_threads: None,
395 compute_stack_size: Some(2 * 1024 * 1024),
396 compute_thread_prefix: "compute".to_string(),
397 health_check_enabled: false,
398 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
399 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
400 }
401 }
402}
403
404impl RuntimeConfigBuilder {
405 pub fn build(&self) -> Result<RuntimeConfig> {
407 let config = self.build_internal()?;
408 config.validate()?;
409 Ok(config)
410 }
411}
412
413pub fn is_truthy(val: &str) -> bool {
418 matches!(val.to_lowercase().as_str(), "1" | "true" | "on" | "yes")
419}
420
421pub fn parse_bool(val: &str) -> anyhow::Result<bool> {
422 if is_truthy(val) {
423 Ok(true)
424 } else if is_falsey(val) {
425 Ok(false)
426 } else {
427 anyhow::bail!(
428 "Invalid boolean value: '{}'. Expected one of: true/false, 1/0, on/off, yes/no",
429 val
430 )
431 }
432}
433
434pub fn is_falsey(val: &str) -> bool {
439 matches!(val.to_lowercase().as_str(), "0" | "false" | "off" | "no")
440}
441
442pub fn env_is_truthy(env: &str) -> bool {
444 match std::env::var(env) {
445 Ok(val) => is_truthy(val.as_str()),
446 Err(_) => false,
447 }
448}
449
450pub fn env_is_falsey(env: &str) -> bool {
452 match std::env::var(env) {
453 Ok(val) => is_falsey(val.as_str()),
454 Err(_) => false,
455 }
456}
457
458pub fn jsonl_logging_enabled() -> bool {
461 env_is_truthy(environment_names::logging::DYN_LOGGING_JSONL)
462}
463
464pub fn disable_ansi_logging() -> bool {
467 env_is_truthy(environment_names::logging::DYN_SDK_DISABLE_ANSI_LOGGING)
468}
469
470pub fn use_local_timezone() -> bool {
473 env_is_truthy(environment_names::logging::DYN_LOG_USE_LOCAL_TZ)
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479
480 #[test]
481 fn test_runtime_config_with_env_vars() -> Result<()> {
482 use environment_names::runtime;
483 temp_env::with_vars(
484 vec![
485 (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, Some("24")),
486 (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("32")),
487 ],
488 || {
489 let config = RuntimeConfig::from_settings()?;
490 assert_eq!(config.num_worker_threads, Some(24));
491 assert_eq!(config.max_blocking_threads, 32);
492 Ok(())
493 },
494 )
495 }
496
497 #[test]
498 fn test_runtime_config_defaults() -> Result<()> {
499 use environment_names::runtime;
500 temp_env::with_vars(
501 vec![
502 (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, None::<&str>),
503 (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("")),
504 ],
505 || {
506 let config = RuntimeConfig::from_settings()?;
507
508 let default_config = RuntimeConfig::default();
509 assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
510 assert_eq!(
511 config.max_blocking_threads,
512 default_config.max_blocking_threads
513 );
514 Ok(())
515 },
516 )
517 }
518
519 #[test]
520 fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
521 use environment_names::runtime;
522 temp_env::with_vars(
523 vec![
524 (runtime::DYN_RUNTIME_NUM_WORKER_THREADS, Some("0")),
525 (runtime::DYN_RUNTIME_MAX_BLOCKING_THREADS, Some("0")),
526 ],
527 || {
528 let result = RuntimeConfig::from_settings();
529 assert!(result.is_err());
530 if let Err(e) = result {
531 assert!(
532 e.to_string()
533 .contains("num_worker_threads: Validation error")
534 );
535 assert!(
536 e.to_string()
537 .contains("max_blocking_threads: Validation error")
538 );
539 }
540 Ok(())
541 },
542 )
543 }
544
545 #[test]
546 fn test_runtime_config_system_server_env_vars() -> Result<()> {
547 use environment_names::runtime::system;
548 temp_env::with_vars(
549 vec![
550 (system::DYN_SYSTEM_HOST, Some("127.0.0.1")),
551 (system::DYN_SYSTEM_PORT, Some("9090")),
552 ],
553 || {
554 let config = RuntimeConfig::from_settings()?;
555 assert_eq!(config.system_host, "127.0.0.1");
556 assert_eq!(config.system_port, 9090);
557 Ok(())
558 },
559 )
560 }
561
562 #[test]
563 fn test_system_server_disabled_by_default() {
564 use environment_names::runtime::system;
565 temp_env::with_vars(vec![(system::DYN_SYSTEM_PORT, None::<&str>)], || {
566 let config = RuntimeConfig::from_settings().unwrap();
567 assert!(!config.system_server_enabled());
568 assert_eq!(config.system_port, -1);
569 });
570 }
571
572 #[test]
573 fn test_system_server_disabled_with_negative_port() {
574 use environment_names::runtime::system;
575 temp_env::with_vars(vec![(system::DYN_SYSTEM_PORT, Some("-1"))], || {
576 let config = RuntimeConfig::from_settings().unwrap();
577 assert!(!config.system_server_enabled());
578 assert_eq!(config.system_port, -1);
579 });
580 }
581
582 #[test]
583 fn test_system_server_enabled_with_port() {
584 use environment_names::runtime::system;
585 temp_env::with_vars(vec![(system::DYN_SYSTEM_PORT, Some("9527"))], || {
586 let config = RuntimeConfig::from_settings().unwrap();
587 assert!(config.system_server_enabled());
588 assert_eq!(config.system_port, 9527);
589 });
590 }
591
592 #[test]
593 fn test_system_server_starting_health_status_ready() {
594 use environment_names::runtime::system;
595 temp_env::with_vars(
596 vec![(system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready"))],
597 || {
598 let config = RuntimeConfig::from_settings().unwrap();
599 assert!(config.starting_health_status == HealthStatus::Ready);
600 },
601 );
602 }
603
604 #[test]
605 fn test_system_use_endpoint_health_status() {
606 use environment_names::runtime::system;
607 temp_env::with_vars(
608 vec![(
609 system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
610 Some("[\"ready\"]"),
611 )],
612 || {
613 let config = RuntimeConfig::from_settings().unwrap();
614 assert!(config.use_endpoint_health_status == vec!["ready"]);
615 },
616 );
617 }
618
619 #[test]
620 fn test_system_health_endpoint_path_default() {
621 use environment_names::runtime::system;
622 temp_env::with_vars(vec![(system::DYN_SYSTEM_HEALTH_PATH, None::<&str>)], || {
623 let config = RuntimeConfig::from_settings().unwrap();
624 assert_eq!(
625 config.system_health_path,
626 DEFAULT_SYSTEM_HEALTH_PATH.to_string()
627 );
628 });
629
630 temp_env::with_vars(vec![(system::DYN_SYSTEM_LIVE_PATH, None::<&str>)], || {
631 let config = RuntimeConfig::from_settings().unwrap();
632 assert_eq!(
633 config.system_live_path,
634 DEFAULT_SYSTEM_LIVE_PATH.to_string()
635 );
636 });
637 }
638
639 #[test]
640 fn test_system_health_endpoint_path_custom() {
641 use environment_names::runtime::system;
642 temp_env::with_vars(
643 vec![(system::DYN_SYSTEM_HEALTH_PATH, Some("/custom/health"))],
644 || {
645 let config = RuntimeConfig::from_settings().unwrap();
646 assert_eq!(config.system_health_path, "/custom/health");
647 },
648 );
649
650 temp_env::with_vars(
651 vec![(system::DYN_SYSTEM_LIVE_PATH, Some("/custom/live"))],
652 || {
653 let config = RuntimeConfig::from_settings().unwrap();
654 assert_eq!(config.system_live_path, "/custom/live");
655 },
656 );
657 }
658
659 #[test]
660 fn test_is_truthy_and_falsey() {
661 assert!(is_truthy("1"));
663 assert!(is_truthy("true"));
664 assert!(is_truthy("TRUE"));
665 assert!(is_truthy("on"));
666 assert!(is_truthy("yes"));
667
668 assert!(is_falsey("0"));
670 assert!(is_falsey("false"));
671 assert!(is_falsey("FALSE"));
672 assert!(is_falsey("off"));
673 assert!(is_falsey("no"));
674
675 assert!(!is_truthy("0"));
677 assert!(!is_falsey("1"));
678
679 temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
681 assert!(env_is_truthy("TEST_TRUTHY"));
682 assert!(!env_is_falsey("TEST_TRUTHY"));
683 });
684
685 temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
686 assert!(!env_is_truthy("TEST_FALSEY"));
687 assert!(env_is_falsey("TEST_FALSEY"));
688 });
689
690 temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
692 assert!(!env_is_truthy("TEST_MISSING"));
693 assert!(!env_is_falsey("TEST_MISSING"));
694 });
695 }
696}