1use std::error::Error;
9use std::fmt;
10
11use murk_arena::ArenaError;
12use murk_core::{FieldDef, FieldId, FieldSet};
13use murk_propagator::{validate_pipeline, PipelineError, Propagator};
14use murk_space::Space;
15
16#[derive(Clone, Debug)]
24pub struct BackoffConfig {
25 pub initial_max_skew: u64,
27 pub backoff_factor: f64,
29 pub max_skew_cap: u64,
31 pub decay_rate: u64,
33 pub rejection_rate_threshold: f64,
35}
36
37impl Default for BackoffConfig {
38 fn default() -> Self {
39 Self {
40 initial_max_skew: 2,
41 backoff_factor: 1.5,
42 max_skew_cap: 10,
43 decay_rate: 60,
44 rejection_rate_threshold: 0.20,
45 }
46 }
47}
48
49#[derive(Clone, Debug)]
56pub struct AsyncConfig {
57 pub worker_count: Option<usize>,
60 pub max_epoch_hold_ms: u64,
63 pub cancel_grace_ms: u64,
66}
67
68impl Default for AsyncConfig {
69 fn default() -> Self {
70 Self {
71 worker_count: None,
72 max_epoch_hold_ms: 100,
73 cancel_grace_ms: 10,
74 }
75 }
76}
77
78impl AsyncConfig {
79 pub fn resolved_worker_count(&self) -> usize {
84 match self.worker_count {
85 Some(n) => n.clamp(1, 64),
86 None => {
87 let cpus = std::thread::available_parallelism()
88 .map(|n| n.get())
89 .unwrap_or(4);
90 (cpus / 2).clamp(2, 16)
91 }
92 }
93 }
94}
95
96#[derive(Debug, PartialEq)]
100pub enum ConfigError {
101 Pipeline(PipelineError),
103 Arena(ArenaError),
105 EmptySpace,
107 NoFields,
109 RingBufferTooSmall {
111 configured: usize,
113 },
114 IngressQueueZero,
116 InvalidTickRate {
118 value: f64,
120 },
121 BackoffSkewExceedsCap {
123 initial: u64,
125 cap: u64,
127 },
128 BackoffInvalidFactor {
130 value: f64,
132 },
133 BackoffInvalidThreshold {
135 value: f64,
137 },
138 BackoffZeroDecayRate,
140 CellCountOverflow {
142 value: usize,
144 },
145 FieldCountOverflow {
147 value: usize,
149 },
150 InvalidField {
152 reason: String,
154 },
155 EngineRecoveryFailed,
157 ThreadSpawnFailed {
159 reason: String,
161 },
162}
163
164impl fmt::Display for ConfigError {
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 match self {
167 Self::Pipeline(e) => write!(f, "pipeline: {e}"),
168 Self::Arena(e) => write!(f, "arena: {e}"),
169 Self::EmptySpace => write!(f, "space has zero cells"),
170 Self::NoFields => write!(f, "no fields registered"),
171 Self::RingBufferTooSmall { configured } => {
172 write!(f, "ring_buffer_size {configured} is below minimum of 2")
173 }
174 Self::IngressQueueZero => write!(f, "max_ingress_queue must be at least 1"),
175 Self::InvalidTickRate { value } => {
176 write!(f, "tick_rate_hz must be finite and positive, got {value}")
177 }
178 Self::BackoffSkewExceedsCap { initial, cap } => {
179 write!(f, "invalid backoff config: initial_max_skew ({initial}) exceeds max_skew_cap ({cap})")
180 }
181 Self::BackoffInvalidFactor { value } => {
182 write!(
183 f,
184 "invalid backoff config: backoff_factor must be finite and >= 1.0, got {value}"
185 )
186 }
187 Self::BackoffInvalidThreshold { value } => {
188 write!(f, "invalid backoff config: rejection_rate_threshold must be in [0.0, 1.0], got {value}")
189 }
190 Self::BackoffZeroDecayRate => {
191 write!(f, "invalid backoff config: decay_rate must be at least 1")
192 }
193 Self::CellCountOverflow { value } => {
194 write!(f, "cell count {value} exceeds u32::MAX")
195 }
196 Self::FieldCountOverflow { value } => {
197 write!(f, "field count {value} exceeds u32::MAX")
198 }
199 Self::InvalidField { reason } => {
200 write!(f, "invalid field: {reason}")
201 }
202 Self::EngineRecoveryFailed => {
203 write!(f, "engine could not be recovered from tick thread")
204 }
205 Self::ThreadSpawnFailed { reason } => {
206 write!(f, "thread spawn failed: {reason}")
207 }
208 }
209 }
210}
211
212impl Error for ConfigError {
213 fn source(&self) -> Option<&(dyn Error + 'static)> {
214 match self {
215 Self::Pipeline(e) => Some(e),
216 Self::Arena(e) => Some(e),
217 _ => None,
218 }
219 }
220}
221
222impl From<PipelineError> for ConfigError {
223 fn from(e: PipelineError) -> Self {
224 Self::Pipeline(e)
225 }
226}
227
228impl From<ArenaError> for ConfigError {
229 fn from(e: ArenaError) -> Self {
230 Self::Arena(e)
231 }
232}
233
234pub struct WorldConfig {
241 pub space: Box<dyn Space>,
243 pub fields: Vec<FieldDef>,
245 pub propagators: Vec<Box<dyn Propagator>>,
247 pub dt: f64,
249 pub seed: u64,
251 pub ring_buffer_size: usize,
253 pub max_ingress_queue: usize,
255 pub tick_rate_hz: Option<f64>,
257 pub backoff: BackoffConfig,
259}
260
261impl WorldConfig {
262 pub fn validate(&self) -> Result<(), ConfigError> {
268 if self.space.cell_count() == 0 {
270 return Err(ConfigError::EmptySpace);
271 }
272 if self.fields.is_empty() {
274 return Err(ConfigError::NoFields);
275 }
276 for field in &self.fields {
278 field
279 .validate()
280 .map_err(|reason| ConfigError::InvalidField { reason })?;
281 }
282 let cell_count = self.space.cell_count();
284 if u32::try_from(cell_count).is_err() {
285 return Err(ConfigError::CellCountOverflow { value: cell_count });
286 }
287 if u32::try_from(self.fields.len()).is_err() {
289 return Err(ConfigError::FieldCountOverflow {
290 value: self.fields.len(),
291 });
292 }
293 if self.ring_buffer_size < 2 {
295 return Err(ConfigError::RingBufferTooSmall {
296 configured: self.ring_buffer_size,
297 });
298 }
299 if self.max_ingress_queue == 0 {
301 return Err(ConfigError::IngressQueueZero);
302 }
303 if let Some(hz) = self.tick_rate_hz {
307 if !hz.is_finite() || hz <= 0.0 || !(1.0 / hz).is_finite() {
308 return Err(ConfigError::InvalidTickRate { value: hz });
309 }
310 }
311 let b = &self.backoff;
313 if b.initial_max_skew > b.max_skew_cap {
314 return Err(ConfigError::BackoffSkewExceedsCap {
315 initial: b.initial_max_skew,
316 cap: b.max_skew_cap,
317 });
318 }
319 if !b.backoff_factor.is_finite() || b.backoff_factor < 1.0 {
320 return Err(ConfigError::BackoffInvalidFactor {
321 value: b.backoff_factor,
322 });
323 }
324 if !b.rejection_rate_threshold.is_finite()
325 || b.rejection_rate_threshold < 0.0
326 || b.rejection_rate_threshold > 1.0
327 {
328 return Err(ConfigError::BackoffInvalidThreshold {
329 value: b.rejection_rate_threshold,
330 });
331 }
332 if b.decay_rate == 0 {
333 return Err(ConfigError::BackoffZeroDecayRate);
334 }
335
336 let defined = self.defined_field_set()?;
340 let _ = validate_pipeline(&self.propagators, &defined, self.dt, &*self.space)?;
341
342 Ok(())
343 }
344
345 pub(crate) fn defined_field_set(&self) -> Result<FieldSet, ConfigError> {
350 (0..self.fields.len())
351 .map(|i| {
352 u32::try_from(i)
353 .map(FieldId)
354 .map_err(|_| ConfigError::FieldCountOverflow {
355 value: self.fields.len(),
356 })
357 })
358 .collect()
359 }
360}
361
362impl fmt::Debug for WorldConfig {
363 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
364 f.debug_struct("WorldConfig")
365 .field("space_ndim", &self.space.ndim())
366 .field("space_cell_count", &self.space.cell_count())
367 .field("fields", &self.fields.len())
368 .field("propagators", &self.propagators.len())
369 .field("dt", &self.dt)
370 .field("seed", &self.seed)
371 .field("ring_buffer_size", &self.ring_buffer_size)
372 .field("max_ingress_queue", &self.max_ingress_queue)
373 .field("tick_rate_hz", &self.tick_rate_hz)
374 .field("backoff", &self.backoff)
375 .finish()
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use super::*;
382 use murk_core::{BoundaryBehavior, FieldMutability, FieldType};
383 use murk_space::{EdgeBehavior, Line1D};
384 use murk_test_utils::ConstPropagator;
385
386 fn scalar_field(name: &str) -> FieldDef {
387 FieldDef {
388 name: name.to_string(),
389 field_type: FieldType::Scalar,
390 mutability: FieldMutability::PerTick,
391 units: None,
392 bounds: None,
393 boundary_behavior: BoundaryBehavior::Clamp,
394 }
395 }
396
397 fn valid_config() -> WorldConfig {
398 WorldConfig {
399 space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
400 fields: vec![scalar_field("energy")],
401 propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
402 dt: 0.1,
403 seed: 42,
404 ring_buffer_size: 8,
405 max_ingress_queue: 1024,
406 tick_rate_hz: None,
407 backoff: BackoffConfig::default(),
408 }
409 }
410
411 #[test]
412 fn validate_valid_config_succeeds() {
413 assert!(valid_config().validate().is_ok());
414 }
415
416 #[test]
417 fn validate_empty_propagators_fails() {
418 let mut cfg = valid_config();
419 cfg.propagators.clear();
420 match cfg.validate() {
421 Err(ConfigError::Pipeline(PipelineError::EmptyPipeline)) => {}
422 other => panic!("expected Pipeline(EmptyPipeline), got {other:?}"),
423 }
424 }
425
426 #[test]
427 fn validate_invalid_dt_fails() {
428 let mut cfg = valid_config();
429 cfg.dt = f64::NAN;
430 match cfg.validate() {
431 Err(ConfigError::Pipeline(PipelineError::InvalidDt { .. })) => {}
432 other => panic!("expected Pipeline(InvalidDt), got {other:?}"),
433 }
434 }
435
436 #[test]
437 fn validate_write_conflict_fails() {
438 let mut cfg = valid_config();
439 cfg.propagators
441 .push(Box::new(ConstPropagator::new("conflict", FieldId(0), 2.0)));
442 match cfg.validate() {
443 Err(ConfigError::Pipeline(PipelineError::WriteConflict(_))) => {}
444 other => panic!("expected Pipeline(WriteConflict), got {other:?}"),
445 }
446 }
447
448 #[test]
449 fn validate_dt_exceeds_max_dt_fails() {
450 use murk_core::PropagatorError;
451 use murk_propagator::context::StepContext;
452 use murk_propagator::propagator::WriteMode;
453
454 struct CflProp;
455 impl Propagator for CflProp {
456 fn name(&self) -> &str {
457 "cfl"
458 }
459 fn reads(&self) -> FieldSet {
460 FieldSet::empty()
461 }
462 fn writes(&self) -> Vec<(FieldId, WriteMode)> {
463 vec![(FieldId(0), WriteMode::Full)]
464 }
465 fn max_dt(&self, _space: &dyn murk_space::Space) -> Option<f64> {
466 Some(0.01)
467 }
468 fn step(&self, _ctx: &mut StepContext<'_>) -> Result<(), PropagatorError> {
469 Ok(())
470 }
471 }
472
473 let mut cfg = valid_config();
474 cfg.propagators = vec![Box::new(CflProp)];
475 cfg.dt = 0.1;
476 match cfg.validate() {
477 Err(ConfigError::Pipeline(PipelineError::DtTooLarge { .. })) => {}
478 other => panic!("expected Pipeline(DtTooLarge), got {other:?}"),
479 }
480 }
481
482 #[test]
483 fn validate_empty_space_fails() {
484 use murk_space::error::SpaceError;
485 struct EmptySpace(murk_core::SpaceInstanceId);
487 impl Space for EmptySpace {
488 fn ndim(&self) -> usize {
489 1
490 }
491 fn cell_count(&self) -> usize {
492 0
493 }
494 fn neighbours(
495 &self,
496 _: &murk_core::Coord,
497 ) -> smallvec::SmallVec<[murk_core::Coord; 8]> {
498 smallvec::smallvec![]
499 }
500 fn distance(&self, _: &murk_core::Coord, _: &murk_core::Coord) -> f64 {
501 0.0
502 }
503 fn compile_region(
504 &self,
505 _: &murk_space::RegionSpec,
506 ) -> Result<murk_space::RegionPlan, SpaceError> {
507 Err(SpaceError::EmptySpace)
508 }
509 fn canonical_ordering(&self) -> Vec<murk_core::Coord> {
510 vec![]
511 }
512 fn instance_id(&self) -> murk_core::SpaceInstanceId {
513 self.0
514 }
515 fn topology_eq(&self, other: &dyn Space) -> bool {
516 (other as &dyn std::any::Any)
517 .downcast_ref::<Self>()
518 .is_some()
519 }
520 }
521
522 let mut cfg = valid_config();
523 cfg.space = Box::new(EmptySpace(murk_core::SpaceInstanceId::next()));
524 match cfg.validate() {
525 Err(ConfigError::EmptySpace) => {}
526 other => panic!("expected EmptySpace, got {other:?}"),
527 }
528 }
529
530 #[test]
531 fn validate_no_fields_fails() {
532 let mut cfg = valid_config();
533 cfg.fields.clear();
534 match cfg.validate() {
535 Err(ConfigError::NoFields) => {}
536 other => panic!("expected NoFields, got {other:?}"),
537 }
538 }
539
540 #[test]
541 fn cell_count_overflow_display_says_cell_count() {
542 let err = ConfigError::CellCountOverflow {
543 value: u32::MAX as usize + 1,
544 };
545 let msg = format!("{err}");
546 assert!(
547 msg.contains("cell count"),
548 "CellCountOverflow Display should say 'cell count', got: {msg}"
549 );
550 }
551
552 #[test]
553 fn field_count_overflow_display_says_field_count() {
554 let err = ConfigError::FieldCountOverflow {
555 value: u32::MAX as usize + 1,
556 };
557 let msg = format!("{err}");
558 assert!(
559 msg.contains("field count"),
560 "FieldCountOverflow Display should say 'field count', got: {msg}"
561 );
562 }
563
564 #[test]
565 fn async_config_resolved_worker_count_clamps_zero() {
566 let cfg = AsyncConfig {
567 worker_count: Some(0),
568 ..AsyncConfig::default()
569 };
570 assert_eq!(cfg.resolved_worker_count(), 1);
571 }
572
573 #[test]
574 fn async_config_resolved_worker_count_clamps_large() {
575 let cfg = AsyncConfig {
576 worker_count: Some(200),
577 ..AsyncConfig::default()
578 };
579 assert_eq!(cfg.resolved_worker_count(), 64);
580 }
581
582 #[test]
583 fn async_config_resolved_worker_count_auto() {
584 let cfg = AsyncConfig::default();
585 let count = cfg.resolved_worker_count();
586 assert!(
587 (2..=16).contains(&count),
588 "auto count {count} out of [2,16]"
589 );
590 }
591
592 #[test]
595 fn validate_backoff_initial_exceeds_cap_fails() {
596 let mut cfg = valid_config();
597 cfg.backoff.initial_max_skew = 100;
598 cfg.backoff.max_skew_cap = 5;
599 match cfg.validate() {
600 Err(ConfigError::BackoffSkewExceedsCap {
601 initial: 100,
602 cap: 5,
603 }) => {}
604 other => panic!("expected BackoffSkewExceedsCap, got {other:?}"),
605 }
606 }
607
608 #[test]
609 fn validate_backoff_nan_factor_fails() {
610 let mut cfg = valid_config();
611 cfg.backoff.backoff_factor = f64::NAN;
612 match cfg.validate() {
613 Err(ConfigError::BackoffInvalidFactor { .. }) => {}
614 other => panic!("expected BackoffInvalidFactor, got {other:?}"),
615 }
616 }
617
618 #[test]
619 fn validate_backoff_factor_below_one_fails() {
620 let mut cfg = valid_config();
621 cfg.backoff.backoff_factor = 0.5;
622 match cfg.validate() {
623 Err(ConfigError::BackoffInvalidFactor { value: 0.5 }) => {}
624 other => panic!("expected BackoffInvalidFactor(0.5), got {other:?}"),
625 }
626 }
627
628 #[test]
629 fn validate_backoff_threshold_out_of_range_fails() {
630 let mut cfg = valid_config();
631 cfg.backoff.rejection_rate_threshold = 1.5;
632 match cfg.validate() {
633 Err(ConfigError::BackoffInvalidThreshold { value: 1.5 }) => {}
634 other => panic!("expected BackoffInvalidThreshold(1.5), got {other:?}"),
635 }
636 }
637
638 #[test]
639 fn validate_backoff_zero_decay_rate_fails() {
640 let mut cfg = valid_config();
641 cfg.backoff.decay_rate = 0;
642 match cfg.validate() {
643 Err(ConfigError::BackoffZeroDecayRate) => {}
644 other => panic!("expected BackoffZeroDecayRate, got {other:?}"),
645 }
646 }
647
648 #[test]
650 fn thread_spawn_failed_error_display() {
651 let err = ConfigError::ThreadSpawnFailed {
652 reason: "tick thread: resource limit".to_string(),
653 };
654 let msg = format!("{err}");
655 assert!(msg.contains("thread spawn failed"));
656 assert!(msg.contains("tick thread"));
657 }
658
659 #[test]
662 fn validate_subnormal_tick_rate_hz_rejected() {
663 let mut cfg = valid_config();
664 cfg.tick_rate_hz = Some(f64::from_bits(1)); match cfg.validate() {
666 Err(ConfigError::InvalidTickRate { .. }) => {}
667 other => panic!("expected InvalidTickRate, got {other:?}"),
668 }
669 }
670
671 #[test]
672 fn validate_valid_backoff_succeeds() {
673 let mut cfg = valid_config();
674 cfg.backoff = BackoffConfig {
675 initial_max_skew: 5,
676 max_skew_cap: 10,
677 backoff_factor: 1.5,
678 decay_rate: 60,
679 rejection_rate_threshold: 0.20,
680 };
681 assert!(cfg.validate().is_ok());
682 }
683
684 #[test]
685 fn thread_spawn_failed_error_source_is_none() {
686 use std::error::Error;
687 let err = ConfigError::ThreadSpawnFailed {
688 reason: "egress worker 2: resource limit".into(),
689 };
690 assert!(err.source().is_none());
691 }
692
693 #[test]
694 fn thread_spawn_failed_reason_preserved() {
695 let err = ConfigError::ThreadSpawnFailed {
696 reason: "egress worker 2: os error 11".into(),
697 };
698 match &err {
699 ConfigError::ThreadSpawnFailed { reason } => {
700 assert_eq!(reason, "egress worker 2: os error 11");
701 }
702 _ => panic!("wrong variant"),
703 }
704 }
705
706 #[test]
707 fn thread_spawn_failed_debug_contains_reason() {
708 let err = ConfigError::ThreadSpawnFailed {
709 reason: "tick thread: RLIMIT_NPROC".into(),
710 };
711 let dbg = format!("{err:?}");
712 assert!(dbg.contains("RLIMIT_NPROC"), "Debug output: {dbg}");
713 }
714}