1use std::error::Error;
9use std::fmt;
10
11use murk_arena::ArenaError;
12use murk_core::{FieldDef, FieldId, FieldSet};
13use murk_propagator::{validate_pipeline, PipelineError, Propagator};
14use murk_space::Space;
15
/// Tunable parameters for skew backoff.
///
/// NOTE(review): this file only stores and range-checks these values; what is
/// being backed off, and the unit skew is measured in, is decided by the
/// consumer of this struct — confirm there.
///
/// `WorldConfig::validate` accepts a value of this type only when
/// `initial_max_skew <= max_skew_cap`, `backoff_factor` is finite and `>= 1.0`,
/// `rejection_rate_threshold` lies in `[0.0, 1.0]`, and `decay_rate >= 1`.
#[derive(Debug, Clone)]
pub struct BackoffConfig {
    /// Starting skew allowance (default `2`). Must not exceed `max_skew_cap`.
    pub initial_max_skew: u64,
    /// Growth multiplier (default `1.5`). Must be finite and at least `1.0`.
    pub backoff_factor: f64,
    /// Upper bound for the skew allowance (default `10`).
    pub max_skew_cap: u64,
    /// Decay setting (default `60`). Must be at least `1`.
    pub decay_rate: u64,
    /// Rejection-rate threshold as a fraction in `[0.0, 1.0]` (default `0.20`).
    pub rejection_rate_threshold: f64,
}

impl Default for BackoffConfig {
    /// Returns the stock configuration: skew allowance `2` capped at `10`,
    /// factor `1.5`, decay rate `60`, rejection threshold `0.20`.
    fn default() -> Self {
        BackoffConfig {
            // Skew bounds.
            initial_max_skew: 2,
            max_skew_cap: 10,
            // Growth / decay behaviour.
            backoff_factor: 1.5,
            decay_rate: 60,
            // Trigger level.
            rejection_rate_threshold: 0.20,
        }
    }
}
48
/// Settings for asynchronous operation.
///
/// NOTE(review): only `worker_count` is interpreted in this file; the two
/// `*_ms` fields are stored here and consumed elsewhere. Their names suggest
/// millisecond durations — confirm against the code that reads them.
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Requested number of workers. `None` (the default) asks
    /// [`AsyncConfig::resolved_worker_count`] to derive a value from the
    /// machine's available parallelism.
    pub worker_count: Option<usize>,
    /// Maximum epoch hold (default `100`); presumably milliseconds.
    pub max_epoch_hold_ms: u64,
    /// Cancellation grace period (default `10`); presumably milliseconds.
    pub cancel_grace_ms: u64,
}

impl Default for AsyncConfig {
    /// Automatic worker count, `max_epoch_hold_ms = 100`, `cancel_grace_ms = 10`.
    fn default() -> Self {
        AsyncConfig {
            cancel_grace_ms: 10,
            max_epoch_hold_ms: 100,
            worker_count: None,
        }
    }
}

impl AsyncConfig {
    /// Resolves the effective worker count.
    ///
    /// * An explicit `worker_count` is clamped into `1..=64`.
    /// * Otherwise half of `std::thread::available_parallelism()` is used
    ///   (assuming 4 CPUs when the query fails), clamped into `2..=16`.
    pub fn resolved_worker_count(&self) -> usize {
        // Explicit request: honour it, but keep it within sane bounds.
        if let Some(requested) = self.worker_count {
            return requested.clamp(1, 64);
        }

        // Automatic: fall back to 4 CPUs if the platform cannot tell us.
        let detected = match std::thread::available_parallelism() {
            Ok(parallelism) => parallelism.get(),
            Err(_) => 4,
        };
        let half = detected / 2;
        half.clamp(2, 16)
    }
}
95
96#[derive(Debug, PartialEq)]
100pub enum ConfigError {
101 Pipeline(PipelineError),
103 Arena(ArenaError),
105 EmptySpace,
107 NoFields,
109 RingBufferTooSmall {
111 configured: usize,
113 },
114 IngressQueueZero,
116 InvalidTickRate {
118 value: f64,
120 },
121 InvalidBackoff {
123 reason: String,
125 },
126 CellCountOverflow {
128 value: usize,
130 },
131 InvalidField {
133 reason: String,
135 },
136 EngineRecoveryFailed,
138}
139
140impl fmt::Display for ConfigError {
141 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
142 match self {
143 Self::Pipeline(e) => write!(f, "pipeline: {e}"),
144 Self::Arena(e) => write!(f, "arena: {e}"),
145 Self::EmptySpace => write!(f, "space has zero cells"),
146 Self::NoFields => write!(f, "no fields registered"),
147 Self::RingBufferTooSmall { configured } => {
148 write!(f, "ring_buffer_size {configured} is below minimum of 2")
149 }
150 Self::IngressQueueZero => write!(f, "max_ingress_queue must be at least 1"),
151 Self::InvalidTickRate { value } => {
152 write!(f, "tick_rate_hz must be finite and positive, got {value}")
153 }
154 Self::InvalidBackoff { reason } => {
155 write!(f, "invalid backoff config: {reason}")
156 }
157 Self::CellCountOverflow { value } => {
158 write!(f, "cell count {value} exceeds u32::MAX")
159 }
160 Self::InvalidField { reason } => {
161 write!(f, "invalid field: {reason}")
162 }
163 Self::EngineRecoveryFailed => {
164 write!(f, "engine could not be recovered from tick thread")
165 }
166 }
167 }
168}
169
170impl Error for ConfigError {
171 fn source(&self) -> Option<&(dyn Error + 'static)> {
172 match self {
173 Self::Pipeline(e) => Some(e),
174 Self::Arena(e) => Some(e),
175 _ => None,
176 }
177 }
178}
179
180impl From<PipelineError> for ConfigError {
181 fn from(e: PipelineError) -> Self {
182 Self::Pipeline(e)
183 }
184}
185
186impl From<ArenaError> for ConfigError {
187 fn from(e: ArenaError) -> Self {
188 Self::Arena(e)
189 }
190}
191
192pub struct WorldConfig {
199 pub space: Box<dyn Space>,
201 pub fields: Vec<FieldDef>,
203 pub propagators: Vec<Box<dyn Propagator>>,
205 pub dt: f64,
207 pub seed: u64,
209 pub ring_buffer_size: usize,
211 pub max_ingress_queue: usize,
213 pub tick_rate_hz: Option<f64>,
215 pub backoff: BackoffConfig,
217}
218
219impl WorldConfig {
220 pub fn validate(&self) -> Result<(), ConfigError> {
226 if self.space.cell_count() == 0 {
228 return Err(ConfigError::EmptySpace);
229 }
230 if self.fields.is_empty() {
232 return Err(ConfigError::NoFields);
233 }
234 for field in &self.fields {
236 field
237 .validate()
238 .map_err(|reason| ConfigError::InvalidField { reason })?;
239 }
240 let cell_count = self.space.cell_count();
242 if u32::try_from(cell_count).is_err() {
243 return Err(ConfigError::CellCountOverflow { value: cell_count });
244 }
245 if u32::try_from(self.fields.len()).is_err() {
247 return Err(ConfigError::CellCountOverflow {
248 value: self.fields.len(),
249 });
250 }
251 if self.ring_buffer_size < 2 {
253 return Err(ConfigError::RingBufferTooSmall {
254 configured: self.ring_buffer_size,
255 });
256 }
257 if self.max_ingress_queue == 0 {
259 return Err(ConfigError::IngressQueueZero);
260 }
261 if let Some(hz) = self.tick_rate_hz {
263 if !hz.is_finite() || hz <= 0.0 {
264 return Err(ConfigError::InvalidTickRate { value: hz });
265 }
266 }
267 let b = &self.backoff;
269 if b.initial_max_skew > b.max_skew_cap {
270 return Err(ConfigError::InvalidBackoff {
271 reason: format!(
272 "initial_max_skew ({}) exceeds max_skew_cap ({})",
273 b.initial_max_skew, b.max_skew_cap,
274 ),
275 });
276 }
277 if !b.backoff_factor.is_finite() || b.backoff_factor < 1.0 {
278 return Err(ConfigError::InvalidBackoff {
279 reason: format!(
280 "backoff_factor must be finite and >= 1.0, got {}",
281 b.backoff_factor,
282 ),
283 });
284 }
285 if !b.rejection_rate_threshold.is_finite()
286 || b.rejection_rate_threshold < 0.0
287 || b.rejection_rate_threshold > 1.0
288 {
289 return Err(ConfigError::InvalidBackoff {
290 reason: format!(
291 "rejection_rate_threshold must be in [0.0, 1.0], got {}",
292 b.rejection_rate_threshold,
293 ),
294 });
295 }
296 if b.decay_rate == 0 {
297 return Err(ConfigError::InvalidBackoff {
298 reason: "decay_rate must be at least 1".to_string(),
299 });
300 }
301
302 let defined = self.defined_field_set();
306 let _ = validate_pipeline(&self.propagators, &defined, self.dt)?;
307
308 Ok(())
309 }
310
311 pub(crate) fn defined_field_set(&self) -> FieldSet {
318 (0..self.fields.len())
319 .map(|i| FieldId(u32::try_from(i).expect("field count validated")))
320 .collect()
321 }
322}
323
324impl fmt::Debug for WorldConfig {
325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326 f.debug_struct("WorldConfig")
327 .field("space_ndim", &self.space.ndim())
328 .field("space_cell_count", &self.space.cell_count())
329 .field("fields", &self.fields.len())
330 .field("propagators", &self.propagators.len())
331 .field("dt", &self.dt)
332 .field("seed", &self.seed)
333 .field("ring_buffer_size", &self.ring_buffer_size)
334 .field("max_ingress_queue", &self.max_ingress_queue)
335 .field("tick_rate_hz", &self.tick_rate_hz)
336 .field("backoff", &self.backoff)
337 .finish()
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::{BoundaryBehavior, FieldMutability, FieldType};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    /// Builds a minimal per-tick scalar field with the given name.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    /// Baseline configuration that passes `validate()`; each test mutates one
    /// aspect of it.
    fn valid_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        }
    }

    #[test]
    fn validate_valid_config_succeeds() {
        assert!(valid_config().validate().is_ok());
    }

    // --- Pipeline-level failures surfaced through ConfigError::Pipeline ---

    #[test]
    fn validate_empty_propagators_fails() {
        let mut cfg = valid_config();
        cfg.propagators.clear();
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::EmptyPipeline)) => {}
            other => panic!("expected Pipeline(EmptyPipeline), got {other:?}"),
        }
    }

    #[test]
    fn validate_invalid_dt_fails() {
        let mut cfg = valid_config();
        cfg.dt = f64::NAN;
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::InvalidDt { .. })) => {}
            other => panic!("expected Pipeline(InvalidDt), got {other:?}"),
        }
    }

    #[test]
    fn validate_write_conflict_fails() {
        let mut cfg = valid_config();
        // Second propagator writes the same field as the baseline one.
        cfg.propagators
            .push(Box::new(ConstPropagator::new("conflict", FieldId(0), 2.0)));
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::WriteConflict(_))) => {}
            other => panic!("expected Pipeline(WriteConflict), got {other:?}"),
        }
    }

    #[test]
    fn validate_dt_exceeds_max_dt_fails() {
        use murk_core::PropagatorError;
        use murk_propagator::context::StepContext;
        use murk_propagator::propagator::WriteMode;

        /// Propagator that declares a `max_dt` of 0.01.
        struct CflProp;
        impl Propagator for CflProp {
            fn name(&self) -> &str {
                "cfl"
            }
            fn reads(&self) -> FieldSet {
                FieldSet::empty()
            }
            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
                vec![(FieldId(0), WriteMode::Full)]
            }
            fn max_dt(&self) -> Option<f64> {
                Some(0.01)
            }
            fn step(&self, _ctx: &mut StepContext<'_>) -> Result<(), PropagatorError> {
                Ok(())
            }
        }

        let mut cfg = valid_config();
        cfg.propagators = vec![Box::new(CflProp)];
        cfg.dt = 0.1;
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::DtTooLarge { .. })) => {}
            other => panic!("expected Pipeline(DtTooLarge), got {other:?}"),
        }
    }

    // --- Structural failures ---

    #[test]
    fn validate_empty_space_fails() {
        use murk_space::error::SpaceError;

        /// Space stub that reports zero cells.
        struct EmptySpace(murk_core::SpaceInstanceId);
        impl Space for EmptySpace {
            fn ndim(&self) -> usize {
                1
            }
            fn cell_count(&self) -> usize {
                0
            }
            fn neighbours(
                &self,
                _: &murk_core::Coord,
            ) -> smallvec::SmallVec<[murk_core::Coord; 8]> {
                smallvec::smallvec![]
            }
            fn distance(&self, _: &murk_core::Coord, _: &murk_core::Coord) -> f64 {
                0.0
            }
            fn compile_region(
                &self,
                _: &murk_space::RegionSpec,
            ) -> Result<murk_space::RegionPlan, SpaceError> {
                Err(SpaceError::EmptySpace)
            }
            fn canonical_ordering(&self) -> Vec<murk_core::Coord> {
                vec![]
            }
            fn instance_id(&self) -> murk_core::SpaceInstanceId {
                self.0
            }
            fn topology_eq(&self, other: &dyn Space) -> bool {
                (other as &dyn std::any::Any)
                    .downcast_ref::<Self>()
                    .is_some()
            }
        }

        let mut cfg = valid_config();
        cfg.space = Box::new(EmptySpace(murk_core::SpaceInstanceId::next()));
        match cfg.validate() {
            Err(ConfigError::EmptySpace) => {}
            other => panic!("expected EmptySpace, got {other:?}"),
        }
    }

    #[test]
    fn validate_no_fields_fails() {
        let mut cfg = valid_config();
        cfg.fields.clear();
        match cfg.validate() {
            Err(ConfigError::NoFields) => {}
            other => panic!("expected NoFields, got {other:?}"),
        }
    }

    #[test]
    fn validate_ring_buffer_too_small_fails() {
        for size in [0usize, 1] {
            let mut cfg = valid_config();
            cfg.ring_buffer_size = size;
            match cfg.validate() {
                Err(ConfigError::RingBufferTooSmall { configured }) => {
                    assert_eq!(configured, size);
                }
                other => panic!("expected RingBufferTooSmall for {size}, got {other:?}"),
            }
        }
    }

    #[test]
    fn validate_minimum_ring_buffer_succeeds() {
        let mut cfg = valid_config();
        cfg.ring_buffer_size = 2;
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn validate_ingress_queue_zero_fails() {
        let mut cfg = valid_config();
        cfg.max_ingress_queue = 0;
        match cfg.validate() {
            Err(ConfigError::IngressQueueZero) => {}
            other => panic!("expected IngressQueueZero, got {other:?}"),
        }
    }

    #[test]
    fn validate_invalid_tick_rate_fails() {
        for hz in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY, 0.0, -60.0] {
            let mut cfg = valid_config();
            cfg.tick_rate_hz = Some(hz);
            match cfg.validate() {
                Err(ConfigError::InvalidTickRate { .. }) => {}
                other => panic!("expected InvalidTickRate for {hz}, got {other:?}"),
            }
        }
    }

    #[test]
    fn validate_positive_tick_rate_succeeds() {
        let mut cfg = valid_config();
        cfg.tick_rate_hz = Some(60.0);
        assert!(cfg.validate().is_ok());
    }

    // --- AsyncConfig ---

    #[test]
    fn async_config_resolved_worker_count_clamps_zero() {
        let cfg = AsyncConfig {
            worker_count: Some(0),
            ..AsyncConfig::default()
        };
        assert_eq!(cfg.resolved_worker_count(), 1);
    }

    #[test]
    fn async_config_resolved_worker_count_clamps_large() {
        let cfg = AsyncConfig {
            worker_count: Some(200),
            ..AsyncConfig::default()
        };
        assert_eq!(cfg.resolved_worker_count(), 64);
    }

    #[test]
    fn async_config_resolved_worker_count_auto() {
        let cfg = AsyncConfig::default();
        let count = cfg.resolved_worker_count();
        assert!(
            (2..=16).contains(&count),
            "auto count {count} out of [2,16]"
        );
    }

    // --- BackoffConfig validation ---

    #[test]
    fn validate_backoff_initial_exceeds_cap_fails() {
        let mut cfg = valid_config();
        cfg.backoff.initial_max_skew = 100;
        cfg.backoff.max_skew_cap = 5;
        match cfg.validate() {
            Err(ConfigError::InvalidBackoff { .. }) => {}
            other => panic!("expected InvalidBackoff, got {other:?}"),
        }
    }

    #[test]
    fn validate_backoff_nan_factor_fails() {
        let mut cfg = valid_config();
        cfg.backoff.backoff_factor = f64::NAN;
        match cfg.validate() {
            Err(ConfigError::InvalidBackoff { .. }) => {}
            other => panic!("expected InvalidBackoff, got {other:?}"),
        }
    }

    #[test]
    fn validate_backoff_factor_below_one_fails() {
        let mut cfg = valid_config();
        cfg.backoff.backoff_factor = 0.5;
        match cfg.validate() {
            Err(ConfigError::InvalidBackoff { .. }) => {}
            other => panic!("expected InvalidBackoff, got {other:?}"),
        }
    }

    #[test]
    fn validate_backoff_threshold_out_of_range_fails() {
        let mut cfg = valid_config();
        cfg.backoff.rejection_rate_threshold = 1.5;
        match cfg.validate() {
            Err(ConfigError::InvalidBackoff { .. }) => {}
            other => panic!("expected InvalidBackoff, got {other:?}"),
        }
    }

    #[test]
    fn validate_backoff_zero_decay_rate_fails() {
        let mut cfg = valid_config();
        cfg.backoff.decay_rate = 0;
        match cfg.validate() {
            Err(ConfigError::InvalidBackoff { .. }) => {}
            other => panic!("expected InvalidBackoff, got {other:?}"),
        }
    }

    #[test]
    fn validate_valid_backoff_succeeds() {
        let mut cfg = valid_config();
        cfg.backoff = BackoffConfig {
            initial_max_skew: 5,
            max_skew_cap: 10,
            backoff_factor: 1.5,
            decay_rate: 60,
            rejection_rate_threshold: 0.20,
        };
        assert!(cfg.validate().is_ok());
    }

    // --- ConfigError formatting and source chain ---

    #[test]
    fn config_error_display_and_source() {
        let plain = ConfigError::RingBufferTooSmall { configured: 1 };
        assert_eq!(
            plain.to_string(),
            "ring_buffer_size 1 is below minimum of 2"
        );
        assert!(plain.source().is_none());

        let wrapped = ConfigError::from(PipelineError::EmptyPipeline);
        assert!(wrapped.to_string().starts_with("pipeline: "));
        assert!(wrapped.source().is_some());
    }
}