1use std::error::Error;
9use std::fmt;
10
11use murk_arena::ArenaError;
12use murk_core::{FieldDef, FieldId, FieldSet};
13use murk_propagator::{validate_pipeline, PipelineError, Propagator};
14use murk_space::Space;
15
/// Tuning knobs for the skew/backoff behaviour of the engine.
///
/// Only the field types and their default values are established by this
/// file; the per-field notes on meaning are marked as assumptions.
#[derive(Clone, Debug)]
pub struct BackoffConfig {
    /// Starting value for the maximum tolerated skew (default: `2`).
    // NOTE(review): unit is presumably ticks — confirm against the consumer.
    pub initial_max_skew: u64,
    /// Multiplicative growth factor applied when backing off (default: `1.5`).
    // NOTE(review): presumably expected to be >= 1.0; nothing here enforces it.
    pub backoff_factor: f64,
    /// Upper bound for the maximum tolerated skew (default: `10`).
    pub max_skew_cap: u64,
    /// Decay rate for the backoff state (default: `60`).
    // NOTE(review): unit (ticks between decay steps?) is not visible here.
    pub decay_rate: u64,
    /// Rejection-rate threshold (default: `0.20`).
    // NOTE(review): presumably a fraction in [0, 1] — confirm.
    pub rejection_rate_threshold: f64,
}

impl Default for BackoffConfig {
    /// Returns the stock settings: skew `2` capped at `10`, factor `1.5`,
    /// decay rate `60`, rejection threshold `0.20`.
    fn default() -> Self {
        BackoffConfig {
            initial_max_skew: 2,
            backoff_factor: 1.5,
            max_skew_cap: 10,
            decay_rate: 60,
            rejection_rate_threshold: 0.20,
        }
    }
}
48
/// Settings for the asynchronous (worker-pool) execution mode.
#[derive(Clone, Debug)]
pub struct AsyncConfig {
    /// Requested number of workers.
    ///
    /// `None` (the default) lets [`AsyncConfig::resolved_worker_count`] derive
    /// a value from the machine's available parallelism.
    pub worker_count: Option<usize>,
    /// Maximum epoch hold time in milliseconds (default: `100`).
    // NOTE(review): exact enforcement semantics live outside this file.
    pub max_epoch_hold_ms: u64,
    /// Cancellation grace period in milliseconds (default: `10`).
    // NOTE(review): exact enforcement semantics live outside this file.
    pub cancel_grace_ms: u64,
}

impl Default for AsyncConfig {
    /// Auto-detected worker count, 100 ms epoch hold, 10 ms cancel grace.
    fn default() -> Self {
        AsyncConfig {
            worker_count: None,
            max_epoch_hold_ms: 100,
            cancel_grace_ms: 10,
        }
    }
}

impl AsyncConfig {
    /// Returns the number of workers that should actually be started.
    ///
    /// * An explicit `worker_count` is clamped into `1..=64`.
    /// * Otherwise half of the detected parallelism is used, clamped into
    ///   `2..=16`. If detection fails, a parallelism of 4 is assumed.
    pub fn resolved_worker_count(&self) -> usize {
        if let Some(requested) = self.worker_count {
            return requested.clamp(1, 64);
        }

        // Fall back to 4 logical CPUs when the platform cannot report a value.
        let detected =
            std::thread::available_parallelism().map_or(4, std::num::NonZeroUsize::get);
        (detected / 2).clamp(2, 16)
    }
}
95
96#[derive(Debug)]
100pub enum ConfigError {
101 Pipeline(PipelineError),
103 Arena(ArenaError),
105 EmptySpace,
107 NoFields,
109 RingBufferTooSmall {
111 configured: usize,
113 },
114 IngressQueueZero,
116 InvalidTickRate {
118 value: f64,
120 },
121}
122
123impl fmt::Display for ConfigError {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 match self {
126 Self::Pipeline(e) => write!(f, "pipeline: {e}"),
127 Self::Arena(e) => write!(f, "arena: {e}"),
128 Self::EmptySpace => write!(f, "space has zero cells"),
129 Self::NoFields => write!(f, "no fields registered"),
130 Self::RingBufferTooSmall { configured } => {
131 write!(f, "ring_buffer_size {configured} is below minimum of 2")
132 }
133 Self::IngressQueueZero => write!(f, "max_ingress_queue must be at least 1"),
134 Self::InvalidTickRate { value } => {
135 write!(f, "tick_rate_hz must be finite and positive, got {value}")
136 }
137 }
138 }
139}
140
141impl Error for ConfigError {
142 fn source(&self) -> Option<&(dyn Error + 'static)> {
143 match self {
144 Self::Pipeline(e) => Some(e),
145 Self::Arena(e) => Some(e),
146 _ => None,
147 }
148 }
149}
150
151impl From<PipelineError> for ConfigError {
152 fn from(e: PipelineError) -> Self {
153 Self::Pipeline(e)
154 }
155}
156
157impl From<ArenaError> for ConfigError {
158 fn from(e: ArenaError) -> Self {
159 Self::Arena(e)
160 }
161}
162
163pub struct WorldConfig {
170 pub space: Box<dyn Space>,
172 pub fields: Vec<FieldDef>,
174 pub propagators: Vec<Box<dyn Propagator>>,
176 pub dt: f64,
178 pub seed: u64,
180 pub ring_buffer_size: usize,
182 pub max_ingress_queue: usize,
184 pub tick_rate_hz: Option<f64>,
186 pub backoff: BackoffConfig,
188}
189
190impl WorldConfig {
191 pub fn validate(&self) -> Result<(), ConfigError> {
197 if self.space.cell_count() == 0 {
199 return Err(ConfigError::EmptySpace);
200 }
201 if self.fields.is_empty() {
203 return Err(ConfigError::NoFields);
204 }
205 if self.ring_buffer_size < 2 {
207 return Err(ConfigError::RingBufferTooSmall {
208 configured: self.ring_buffer_size,
209 });
210 }
211 if self.max_ingress_queue == 0 {
213 return Err(ConfigError::IngressQueueZero);
214 }
215 if let Some(hz) = self.tick_rate_hz {
217 if !hz.is_finite() || hz <= 0.0 {
218 return Err(ConfigError::InvalidTickRate { value: hz });
219 }
220 }
221 let defined = self.defined_field_set();
223 validate_pipeline(&self.propagators, &defined, self.dt)?;
224
225 Ok(())
226 }
227
228 pub(crate) fn defined_field_set(&self) -> FieldSet {
230 (0..self.fields.len()).map(|i| FieldId(i as u32)).collect()
231 }
232}
233
234impl fmt::Debug for WorldConfig {
235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236 f.debug_struct("WorldConfig")
237 .field("space_ndim", &self.space.ndim())
238 .field("space_cell_count", &self.space.cell_count())
239 .field("fields", &self.fields.len())
240 .field("propagators", &self.propagators.len())
241 .field("dt", &self.dt)
242 .field("seed", &self.seed)
243 .field("ring_buffer_size", &self.ring_buffer_size)
244 .field("max_ingress_queue", &self.max_ingress_queue)
245 .field("tick_rate_hz", &self.tick_rate_hz)
246 .field("backoff", &self.backoff)
247 .finish()
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::{BoundaryBehavior, FieldMutability, FieldType};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    /// Builds a minimal scalar, per-tick field with the given name.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    /// A configuration that passes `validate`; tests break one setting at a time.
    fn valid_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 1.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: None,
            backoff: BackoffConfig::default(),
        }
    }

    #[test]
    fn validate_valid_config_succeeds() {
        assert!(valid_config().validate().is_ok());
    }

    #[test]
    fn validate_empty_propagators_fails() {
        let mut cfg = valid_config();
        cfg.propagators.clear();
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::EmptyPipeline)) => {}
            other => panic!("expected Pipeline(EmptyPipeline), got {other:?}"),
        }
    }

    #[test]
    fn validate_invalid_dt_fails() {
        let mut cfg = valid_config();
        cfg.dt = f64::NAN;
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::InvalidDt { .. })) => {}
            other => panic!("expected Pipeline(InvalidDt), got {other:?}"),
        }
    }

    #[test]
    fn validate_write_conflict_fails() {
        let mut cfg = valid_config();
        // Second propagator writes the same field as the first.
        cfg.propagators
            .push(Box::new(ConstPropagator::new("conflict", FieldId(0), 2.0)));
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::WriteConflict(_))) => {}
            other => panic!("expected Pipeline(WriteConflict), got {other:?}"),
        }
    }

    #[test]
    fn validate_dt_exceeds_max_dt_fails() {
        use murk_core::PropagatorError;
        use murk_propagator::context::StepContext;
        use murk_propagator::propagator::WriteMode;

        /// Propagator that declares a maximum stable time step of 0.01.
        struct CflProp;
        impl Propagator for CflProp {
            fn name(&self) -> &str {
                "cfl"
            }
            fn reads(&self) -> FieldSet {
                FieldSet::empty()
            }
            fn writes(&self) -> Vec<(FieldId, WriteMode)> {
                vec![(FieldId(0), WriteMode::Full)]
            }
            fn max_dt(&self) -> Option<f64> {
                Some(0.01)
            }
            fn step(&self, _ctx: &mut StepContext<'_>) -> Result<(), PropagatorError> {
                Ok(())
            }
        }

        let mut cfg = valid_config();
        cfg.propagators = vec![Box::new(CflProp)];
        cfg.dt = 0.1;
        match cfg.validate() {
            Err(ConfigError::Pipeline(PipelineError::DtTooLarge { .. })) => {}
            other => panic!("expected Pipeline(DtTooLarge), got {other:?}"),
        }
    }

    #[test]
    fn validate_empty_space_fails() {
        use murk_space::error::SpaceError;

        /// Space stub that reports zero cells.
        struct EmptySpace(murk_core::SpaceInstanceId);

        impl Space for EmptySpace {
            fn ndim(&self) -> usize {
                1
            }
            fn cell_count(&self) -> usize {
                0
            }
            fn neighbours(
                &self,
                _: &murk_core::Coord,
            ) -> smallvec::SmallVec<[murk_core::Coord; 8]> {
                smallvec::smallvec![]
            }
            fn distance(&self, _: &murk_core::Coord, _: &murk_core::Coord) -> f64 {
                0.0
            }
            fn compile_region(
                &self,
                _: &murk_space::RegionSpec,
            ) -> Result<murk_space::RegionPlan, SpaceError> {
                Err(SpaceError::EmptySpace)
            }
            fn canonical_ordering(&self) -> Vec<murk_core::Coord> {
                vec![]
            }
            fn instance_id(&self) -> murk_core::SpaceInstanceId {
                self.0
            }
        }

        let mut cfg = valid_config();
        cfg.space = Box::new(EmptySpace(murk_core::SpaceInstanceId::next()));
        match cfg.validate() {
            Err(ConfigError::EmptySpace) => {}
            other => panic!("expected EmptySpace, got {other:?}"),
        }
    }

    #[test]
    fn validate_no_fields_fails() {
        let mut cfg = valid_config();
        cfg.fields.clear();
        match cfg.validate() {
            Err(ConfigError::NoFields) => {}
            other => panic!("expected NoFields, got {other:?}"),
        }
    }

    #[test]
    fn validate_ring_buffer_too_small_fails() {
        // Both values below the minimum of 2 must be rejected and echoed back.
        for size in [0usize, 1] {
            let mut cfg = valid_config();
            cfg.ring_buffer_size = size;
            match cfg.validate() {
                Err(ConfigError::RingBufferTooSmall { configured }) => {
                    assert_eq!(configured, size);
                }
                other => panic!("expected RingBufferTooSmall for {size}, got {other:?}"),
            }
        }
    }

    #[test]
    fn validate_ring_buffer_minimum_succeeds() {
        let mut cfg = valid_config();
        cfg.ring_buffer_size = 2;
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn validate_zero_ingress_queue_fails() {
        let mut cfg = valid_config();
        cfg.max_ingress_queue = 0;
        match cfg.validate() {
            Err(ConfigError::IngressQueueZero) => {}
            other => panic!("expected IngressQueueZero, got {other:?}"),
        }
    }

    #[test]
    fn validate_invalid_tick_rate_fails() {
        for hz in [0.0, -1.0, f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let mut cfg = valid_config();
            cfg.tick_rate_hz = Some(hz);
            match cfg.validate() {
                Err(ConfigError::InvalidTickRate { value }) => {
                    // NaN never compares equal, so check it separately.
                    assert!(
                        value == hz || (value.is_nan() && hz.is_nan()),
                        "reported {value}, configured {hz}"
                    );
                }
                other => panic!("expected InvalidTickRate for {hz}, got {other:?}"),
            }
        }
    }

    #[test]
    fn validate_positive_tick_rate_succeeds() {
        let mut cfg = valid_config();
        cfg.tick_rate_hz = Some(60.0);
        assert!(cfg.validate().is_ok());
    }

    #[test]
    fn config_error_display_and_source() {
        assert_eq!(ConfigError::EmptySpace.to_string(), "space has zero cells");
        assert_eq!(ConfigError::NoFields.to_string(), "no fields registered");
        assert_eq!(
            ConfigError::RingBufferTooSmall { configured: 1 }.to_string(),
            "ring_buffer_size 1 is below minimum of 2"
        );
        assert_eq!(
            ConfigError::IngressQueueZero.to_string(),
            "max_ingress_queue must be at least 1"
        );
        assert_eq!(
            ConfigError::InvalidTickRate { value: -1.0 }.to_string(),
            "tick_rate_hz must be finite and positive, got -1"
        );
        // Variants that wrap nothing have no underlying source.
        assert!(ConfigError::NoFields.source().is_none());
        assert!(ConfigError::IngressQueueZero.source().is_none());
    }

    #[test]
    fn async_config_resolved_worker_count_clamps_zero() {
        let cfg = AsyncConfig {
            worker_count: Some(0),
            ..AsyncConfig::default()
        };
        assert_eq!(cfg.resolved_worker_count(), 1);
    }

    #[test]
    fn async_config_resolved_worker_count_clamps_large() {
        let cfg = AsyncConfig {
            worker_count: Some(200),
            ..AsyncConfig::default()
        };
        assert_eq!(cfg.resolved_worker_count(), 64);
    }

    #[test]
    fn async_config_resolved_worker_count_passes_through_in_range() {
        for requested in [1usize, 8, 64] {
            let cfg = AsyncConfig {
                worker_count: Some(requested),
                ..AsyncConfig::default()
            };
            assert_eq!(cfg.resolved_worker_count(), requested);
        }
    }

    #[test]
    fn async_config_resolved_worker_count_auto() {
        let cfg = AsyncConfig::default();
        let count = cfg.resolved_worker_count();
        assert!(
            (2..=16).contains(&count),
            "auto count {count} out of [2,16]"
        );
    }
}