1use super::Result;
5use derive_builder::Builder;
6use figment::{
7 Figment,
8 providers::{Env, Format, Serialized, Toml},
9};
10use serde::{Deserialize, Serialize};
11use std::fmt;
12use validator::Validate;
13
/// Default value for [`RuntimeConfig::system_host`].
const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";

/// Default value for [`RuntimeConfig::system_port`].
const DEFAULT_SYSTEM_PORT: u16 = 9090;

/// Default value for [`RuntimeConfig::system_health_path`].
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
/// Default value for [`RuntimeConfig::system_live_path`].
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";

/// Default value for [`RuntimeConfig::canary_wait_time_secs`].
pub const DEFAULT_CANARY_WAIT_TIME_SECS: u64 = 10;
/// Default value for [`RuntimeConfig::health_check_request_timeout_secs`].
pub const DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS: u64 = 3;
29
/// Worker-level settings, loaded from defaults and `DYN_WORKER_`-prefixed
/// environment variables by `WorkerConfig::from_settings`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Graceful shutdown timeout. The `Default` impl uses 1 in debug builds and
    /// 30 otherwise.
    // NOTE(review): the unit is presumably seconds — confirm against the consumer.
    pub graceful_shutdown_timeout: u64,
}
35
36impl WorkerConfig {
37 pub fn from_settings() -> Self {
40 Figment::new()
42 .merge(Serialized::defaults(Self::default()))
43 .merge(Env::prefixed("DYN_WORKER_"))
44 .extract()
45 .unwrap() }
47}
48
49impl Default for WorkerConfig {
50 fn default() -> Self {
51 WorkerConfig {
52 graceful_shutdown_timeout: if cfg!(debug_assertions) {
53 1 } else {
55 30 },
57 }
58 }
59}
60
/// Health state used for [`RuntimeConfig::starting_health_status`].
///
/// The serde `rename_all = "lowercase"` attribute lowercases the variant names
/// for (de)serialization, e.g. `Ready` is read from `"ready"`.
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// The ready state.
    Ready,
    /// The not-ready state; this is the default starting status.
    NotReady,
}
67
/// Runtime configuration.
///
/// Instances come from `RuntimeConfig::from_settings` (defaults, TOML files and
/// environment variables), from `RuntimeConfig::builder`, or from
/// `RuntimeConfig::single_threaded`.
///
/// The generated builder's own `build_internal` is private; the public
/// `RuntimeConfigBuilder::build` wraps it and validates the result. The builder
/// derives `Serialize`, and each of its fields is skipped during serialization
/// while it is unset.
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
pub struct RuntimeConfig {
    /// Number of async worker threads; must be at least 1 when set.
    /// When `None`, `create_runtime` falls back to the available parallelism.
    /// Environment variable: `DYN_RUNTIME_NUM_WORKER_THREADS`.
    #[validate(range(min = 1))]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub num_worker_threads: Option<usize>,

    /// Maximum number of blocking threads; must be at least 1.
    /// Builder default: 512.
    /// Environment variable: `DYN_RUNTIME_MAX_BLOCKING_THREADS`.
    #[validate(range(min = 1))]
    #[builder(default = "512")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub max_blocking_threads: usize,

    /// System host. Defaults to `DEFAULT_SYSTEM_HOST`.
    /// Environment variable: `DYN_SYSTEM_HOST`.
    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_host: String,

    /// System port. Defaults to `DEFAULT_SYSTEM_PORT`.
    /// Environment variable: `DYN_SYSTEM_PORT`.
    #[builder(default = "DEFAULT_SYSTEM_PORT")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_port: u16,

    /// Whether the system server is enabled (see `system_server_enabled`).
    /// Defaults to `false`.
    /// Environment variable: `DYN_SYSTEM_ENABLED`.
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_enabled: bool,

    /// Health status to start with. Defaults to `HealthStatus::NotReady`.
    /// Environment variable: `DYN_SYSTEM_STARTING_HEALTH_STATUS`.
    #[builder(default = "HealthStatus::NotReady")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub starting_health_status: HealthStatus,

    /// Deprecated: `from_settings` logs a warning when
    /// `DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS` is set, stating the value is no
    /// longer used. The field is still parsed. Defaults to an empty list.
    #[builder(default = "vec![]")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub use_endpoint_health_status: Vec<String>,

    /// Health endpoint path. Defaults to `DEFAULT_SYSTEM_HEALTH_PATH`.
    /// Environment variable: `DYN_SYSTEM_HEALTH_PATH`.
    #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_health_path: String,
    /// Liveness endpoint path. Defaults to `DEFAULT_SYSTEM_LIVE_PATH`.
    /// Environment variable: `DYN_SYSTEM_LIVE_PATH`.
    #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_live_path: String,

    /// Number of compute threads. Defaults to `None`.
    /// Environment variable: `DYN_COMPUTE_THREADS`.
    // NOTE(review): the meaning of `None` is decided by the consumer, which is
    // not visible in this file — confirm before relying on it.
    #[builder(default = "None")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub compute_threads: Option<usize>,

    /// Stack size for compute threads. Defaults to `2 * 1024 * 1024`.
    /// Environment variable: `DYN_COMPUTE_STACK_SIZE`.
    // NOTE(review): the unit is presumably bytes (2 MiB) — confirm.
    #[builder(default = "Some(2 * 1024 * 1024)")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub compute_stack_size: Option<usize>,

    /// Name prefix for compute threads. Defaults to `"compute"`.
    /// Environment variable: `DYN_COMPUTE_THREAD_PREFIX`.
    #[builder(default = "\"compute\".to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub compute_thread_prefix: String,

    /// Whether health checks are enabled. Defaults to `false`.
    /// Environment variable: `DYN_HEALTH_CHECK_ENABLED`.
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub health_check_enabled: bool,

    /// Canary wait time in seconds. Defaults to `DEFAULT_CANARY_WAIT_TIME_SECS`.
    /// Environment variable: `DYN_CANARY_WAIT_TIME`.
    #[builder(default = "DEFAULT_CANARY_WAIT_TIME_SECS")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub canary_wait_time_secs: u64,

    /// Health check request timeout in seconds. Defaults to
    /// `DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS`.
    /// Environment variable: `DYN_HEALTH_CHECK_REQUEST_TIMEOUT`.
    #[builder(default = "DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub health_check_request_timeout_secs: u64,
}
172
173impl fmt::Display for RuntimeConfig {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 match self.num_worker_threads {
177 Some(val) => write!(f, "num_worker_threads={val}, ")?,
178 None => write!(f, "num_worker_threads=default (num_cores), ")?,
179 }
180
181 write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
182 write!(f, "system_host={}, ", self.system_host)?;
183 write!(f, "system_port={}, ", self.system_port)?;
184 write!(f, "system_enabled={}", self.system_enabled)?;
185 write!(
186 f,
187 "use_endpoint_health_status={:?}",
188 self.use_endpoint_health_status
189 )?;
190 write!(
191 f,
192 "starting_health_status={:?}",
193 self.starting_health_status
194 )?;
195 write!(f, ", system_health_path={}", self.system_health_path)?;
196 write!(f, ", system_live_path={}", self.system_live_path)?;
197 write!(f, ", health_check_enabled={}", self.health_check_enabled)?;
198 write!(f, ", canary_wait_time_secs={}", self.canary_wait_time_secs)?;
199 write!(
200 f,
201 ", health_check_request_timeout_secs={}",
202 self.health_check_request_timeout_secs
203 )?;
204
205 Ok(())
206 }
207}
208
209impl RuntimeConfig {
210 pub fn builder() -> RuntimeConfigBuilder {
211 RuntimeConfigBuilder::default()
212 }
213
214 pub(crate) fn figment() -> Figment {
215 Figment::new()
216 .merge(Serialized::defaults(RuntimeConfig::default()))
217 .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
218 .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
219 .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
220 let full_key = format!("DYN_RUNTIME_{}", k.as_str());
221 match std::env::var(&full_key) {
223 Ok(v) if !v.is_empty() => Some(k.into()),
224 _ => None,
225 }
226 }))
227 .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
228 let full_key = format!("DYN_SYSTEM_{}", k.as_str());
229 match std::env::var(&full_key) {
231 Ok(v) if !v.is_empty() => {
232 let mapped_key = match k.as_str() {
234 "HOST" => "system_host",
235 "PORT" => "system_port",
236 "ENABLED" => "system_enabled",
237 "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
238 "STARTING_HEALTH_STATUS" => "starting_health_status",
239 "HEALTH_PATH" => "system_health_path",
240 "LIVE_PATH" => "system_live_path",
241 _ => k.as_str(),
242 };
243 Some(mapped_key.into())
244 }
245 _ => None,
246 }
247 }))
248 .merge(Env::prefixed("DYN_COMPUTE_").filter_map(|k| {
249 let full_key = format!("DYN_COMPUTE_{}", k.as_str());
250 match std::env::var(&full_key) {
252 Ok(v) if !v.is_empty() => {
253 let mapped_key = match k.as_str() {
255 "THREADS" => "compute_threads",
256 "STACK_SIZE" => "compute_stack_size",
257 "THREAD_PREFIX" => "compute_thread_prefix",
258 _ => k.as_str(),
259 };
260 Some(mapped_key.into())
261 }
262 _ => None,
263 }
264 }))
265 .merge(Env::prefixed("DYN_HEALTH_CHECK_").filter_map(|k| {
266 let full_key = format!("DYN_HEALTH_CHECK_{}", k.as_str());
267 match std::env::var(&full_key) {
269 Ok(v) if !v.is_empty() => {
270 let mapped_key = match k.as_str() {
272 "ENABLED" => "health_check_enabled",
273 "REQUEST_TIMEOUT" => "health_check_request_timeout_secs",
274 _ => k.as_str(),
275 };
276 Some(mapped_key.into())
277 }
278 _ => None,
279 }
280 }))
281 .merge(Env::prefixed("DYN_CANARY_").filter_map(|k| {
282 let full_key = format!("DYN_CANARY_{}", k.as_str());
283 match std::env::var(&full_key) {
285 Ok(v) if !v.is_empty() => {
286 let mapped_key = match k.as_str() {
288 "WAIT_TIME" => "canary_wait_time_secs",
289 _ => k.as_str(),
290 };
291 Some(mapped_key.into())
292 }
293 _ => None,
294 }
295 }))
296 }
297
298 pub fn from_settings() -> Result<RuntimeConfig> {
307 if std::env::var("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS").is_ok() {
309 tracing::warn!(
310 "DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS is deprecated and no longer used. \
311 System health is now determined by endpoints that register with health check payloads. \
312 Please update your configuration to register health check payloads directly on endpoints."
313 );
314 }
315
316 let config: RuntimeConfig = Self::figment().extract()?;
317 config.validate()?;
318 Ok(config)
319 }
320
321 pub fn system_server_enabled(&self) -> bool {
324 self.system_enabled
325 }
326
327 pub fn single_threaded() -> Self {
328 RuntimeConfig {
329 num_worker_threads: Some(1),
330 max_blocking_threads: 1,
331 system_host: DEFAULT_SYSTEM_HOST.to_string(),
332 system_port: DEFAULT_SYSTEM_PORT,
333 system_enabled: false,
334 starting_health_status: HealthStatus::NotReady,
335 use_endpoint_health_status: vec![],
336 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
337 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
338 compute_threads: Some(1),
339 compute_stack_size: Some(2 * 1024 * 1024),
340 compute_thread_prefix: "compute".to_string(),
341 health_check_enabled: false,
342 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
343 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
344 }
345 }
346
347 pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
349 tokio::runtime::Builder::new_multi_thread()
350 .worker_threads(
351 self.num_worker_threads
352 .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
353 )
354 .max_blocking_threads(self.max_blocking_threads)
355 .enable_all()
356 .build()
357 }
358}
359
360impl Default for RuntimeConfig {
361 fn default() -> Self {
362 let num_cores = std::thread::available_parallelism().unwrap().get();
363 Self {
364 num_worker_threads: Some(num_cores),
365 max_blocking_threads: num_cores,
366 system_host: DEFAULT_SYSTEM_HOST.to_string(),
367 system_port: DEFAULT_SYSTEM_PORT,
368 system_enabled: false,
369 starting_health_status: HealthStatus::NotReady,
370 use_endpoint_health_status: vec![],
371 system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
372 system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
373 compute_threads: None,
374 compute_stack_size: Some(2 * 1024 * 1024),
375 compute_thread_prefix: "compute".to_string(),
376 health_check_enabled: false,
377 canary_wait_time_secs: DEFAULT_CANARY_WAIT_TIME_SECS,
378 health_check_request_timeout_secs: DEFAULT_HEALTH_CHECK_REQUEST_TIMEOUT_SECS,
379 }
380 }
381}
382
383impl RuntimeConfigBuilder {
384 pub fn build(&self) -> Result<RuntimeConfig> {
386 let config = self.build_internal()?;
387 config.validate()?;
388 Ok(config)
389 }
390}
391
/// Returns `true` when `val`, compared case-insensitively, is one of
/// `1`, `true`, `on` or `yes`.
///
/// The value is not trimmed: surrounding whitespace makes it non-truthy.
pub fn is_truthy(val: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];

    let normalized = val.to_lowercase();
    TRUTHY.contains(&normalized.as_str())
}
399
/// Returns `true` when `val`, compared case-insensitively, is one of
/// `0`, `false`, `off` or `no`.
///
/// The value is not trimmed: surrounding whitespace makes it non-falsey.
pub fn is_falsey(val: &str) -> bool {
    const FALSEY: [&str; 4] = ["0", "false", "off", "no"];

    let normalized = val.to_lowercase();
    FALSEY.contains(&normalized.as_str())
}
407
408pub fn env_is_truthy(env: &str) -> bool {
410 match std::env::var(env) {
411 Ok(val) => is_truthy(val.as_str()),
412 Err(_) => false,
413 }
414}
415
416pub fn env_is_falsey(env: &str) -> bool {
418 match std::env::var(env) {
419 Ok(val) => is_falsey(val.as_str()),
420 Err(_) => false,
421 }
422}
423
/// Returns `true` when the `DYN_LOGGING_JSONL` environment variable holds a
/// truthy value (see `is_truthy`).
pub fn jsonl_logging_enabled() -> bool {
    env_is_truthy("DYN_LOGGING_JSONL")
}
429
/// Returns `true` when the `DYN_SDK_DISABLE_ANSI_LOGGING` environment variable
/// holds a truthy value (see `is_truthy`).
pub fn disable_ansi_logging() -> bool {
    env_is_truthy("DYN_SDK_DISABLE_ANSI_LOGGING")
}
435
/// Returns `true` when the `DYN_LOG_USE_LOCAL_TZ` environment variable holds a
/// truthy value (see `is_truthy`).
pub fn use_local_timezone() -> bool {
    env_is_truthy("DYN_LOG_USE_LOCAL_TZ")
}
441
// Unit tests for configuration loading and the truthy/falsey helpers.
// Every environment-dependent assertion runs inside a `temp_env::with_vars`
// closure, which receives the variables to set (`Some`) or unset (`None`).
#[cfg(test)]
mod tests {
    use super::*;

    // Values supplied through `DYN_RUNTIME_*` variables end up in the config.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("24")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("32")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.num_worker_threads, Some(24));
                assert_eq!(config.max_blocking_threads, 32);
                Ok(())
            },
        )
    }

    // An unset variable and a set-but-empty variable both leave the default
    // value in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", None::<&str>),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;

                let default_config = RuntimeConfig::default();
                assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
                assert_eq!(
                    config.max_blocking_threads,
                    default_config.max_blocking_threads
                );
                Ok(())
            },
        )
    }

    // A thread count of 0 violates the `min = 1` range and must be reported for
    // both fields.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("0")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("0")),
            ],
            || {
                let result = RuntimeConfig::from_settings();
                assert!(result.is_err());
                if let Err(e) = result {
                    assert!(
                        e.to_string()
                            .contains("num_worker_threads: Validation error")
                    );
                    assert!(
                        e.to_string()
                            .contains("max_blocking_threads: Validation error")
                    );
                }
                Ok(())
            },
        )
    }

    // `DYN_SYSTEM_HOST` / `DYN_SYSTEM_PORT` map onto `system_host` / `system_port`.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
                ("DYN_SYSTEM_PORT", Some("9090")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.system_host, "127.0.0.1");
                assert_eq!(config.system_port, 9090);
                Ok(())
            },
        )
    }

    // NOTE: despite the test name, this asserts that the system server is
    // DISABLED when `DYN_SYSTEM_ENABLED` is unset.
    #[test]
    fn test_system_server_enabled_by_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    // `DYN_SYSTEM_ENABLED=false` keeps the system server disabled.
    #[test]
    fn test_system_server_disabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    // `DYN_SYSTEM_ENABLED=true` enables the system server.
    #[test]
    fn test_system_server_enabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(config.system_server_enabled());
        });
    }

    // NOTE: despite the test name, setting only the port does NOT enable the
    // system server; the port value itself is still applied.
    #[test]
    fn test_system_server_enabled_by_port() {
        temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
            assert_eq!(config.system_port, 8080);
        });
    }

    // The lowercase value "ready" deserializes to `HealthStatus::Ready`.
    #[test]
    fn test_system_server_starting_health_status_ready() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert!(config.starting_health_status == HealthStatus::Ready);
            },
        );
    }

    // The deprecated variable is still parsed into the list field.
    #[test]
    fn test_system_use_endpoint_health_status() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert!(config.use_endpoint_health_status == vec!["ready"]);
            },
        );
    }

    // With the path variables unset, both paths keep their default constants.
    #[test]
    fn test_system_health_endpoint_path_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(
                config.system_health_path,
                DEFAULT_SYSTEM_HEALTH_PATH.to_string()
            );
        });

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(
                config.system_live_path,
                DEFAULT_SYSTEM_LIVE_PATH.to_string()
            );
        });
    }

    // Custom health and liveness paths are taken from the environment.
    #[test]
    fn test_system_health_endpoint_path_custom() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.system_health_path, "/custom/health");
            },
        );

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_live_path, "/custom/live");
        });
    }

    // Covers the string helpers directly and the environment-backed variants
    // for a truthy, a falsey and a missing variable.
    #[test]
    fn test_is_truthy_and_falsey() {
        // Accepted truthy spellings, including an upper-case one.
        assert!(is_truthy("1"));
        assert!(is_truthy("true"));
        assert!(is_truthy("TRUE"));
        assert!(is_truthy("on"));
        assert!(is_truthy("yes"));

        // Accepted falsey spellings, including an upper-case one.
        assert!(is_falsey("0"));
        assert!(is_falsey("false"));
        assert!(is_falsey("FALSE"));
        assert!(is_falsey("off"));
        assert!(is_falsey("no"));

        // The two sets do not overlap.
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // A missing variable is neither truthy nor falsey.
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}