1use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use murk_arena::OwnedSnapshot;
35use murk_core::command::{Command, Receipt};
36use murk_core::error::ObsError;
37use murk_core::Coord;
38use murk_obs::{ObsMetadata, ObsPlan};
39use murk_space::Space;
40
41use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
42use crate::egress::{ObsResult, ObsTask};
43use crate::epoch::{EpochCounter, WorkerEpoch};
44use crate::ring::SnapshotRing;
45use crate::tick::TickEngine;
46use crate::tick_thread::{IngressBatch, TickThreadState};
47
/// Failure modes of [`RealtimeAsyncWorld::submit_commands`].
#[derive(Debug, PartialEq, Eq)]
pub enum SubmitError {
    /// The command sender is gone, the channel is disconnected, or no reply
    /// was received: the tick thread is no longer serving commands.
    Shutdown,
    /// The bounded command channel had no free slot for the batch.
    ChannelFull,
}
58
59impl std::fmt::Display for SubmitError {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 Self::Shutdown => write!(f, "tick thread has shut down"),
63 Self::ChannelFull => write!(f, "command channel full"),
64 }
65 }
66}
67
68impl std::error::Error for SubmitError {}
69
/// Timing and join results produced by [`RealtimeAsyncWorld::shutdown`].
///
/// All durations are whole milliseconds measured from the start of the
/// shutdown call.
#[derive(Debug)]
pub struct ShutdownReport {
    /// Elapsed time for the whole shutdown call, including thread joins.
    pub total_ms: u64,
    /// Elapsed time when the wait for the tick thread's stopped flag ended
    /// (either the flag was set or the drain budget ran out).
    pub drain_ms: u64,
    /// Additional time spent, after the drain phase, waiting for egress
    /// workers to unpin.
    pub quiesce_ms: u64,
    /// `true` when the tick thread was joined without a panic, or when there
    /// was no tick thread handle left to join.
    pub tick_joined: bool,
    /// Number of egress worker threads whose join succeeded.
    pub workers_joined: usize,
}
86
/// Point-in-time health snapshot returned by [`RealtimeAsyncWorld::preflight`].
///
/// Queue figures come from the command and observation channel senders; both
/// depth and capacity read as `0` once the corresponding sender has been
/// dropped by shutdown. The snapshot and `ring_*` fields are copied verbatim
/// from the report produced by `crate::egress::ring_preflight`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RealtimePreflight {
    /// Number of command batches currently queued for the tick thread.
    pub command_queue_depth: usize,
    /// Capacity of the command channel (`0` if unavailable).
    pub command_queue_capacity: usize,
    /// Number of observation tasks currently queued for egress workers.
    pub observe_queue_depth: usize,
    /// Capacity of the observation channel (`0` if unavailable).
    pub observe_queue_capacity: usize,
    /// Ring report: whether a snapshot is available.
    pub has_snapshot: bool,
    /// Ring report: tick id of the latest snapshot.
    pub latest_snapshot_tick_id: u64,
    /// Ring report: age of the latest snapshot, in ticks.
    pub snapshot_age_ticks: u64,
    /// Ring report: snapshot ring capacity.
    pub ring_capacity: usize,
    /// Ring report: number of snapshots currently held.
    pub ring_len: usize,
    /// Ring report: ring write position.
    pub ring_write_pos: u64,
    /// Ring report: oldest retained position, if any.
    pub ring_oldest_retained_pos: Option<u64>,
    /// Ring report: eviction event counter.
    pub ring_eviction_events: u64,
    /// Ring report: stale-read event counter.
    pub ring_stale_read_events: u64,
    /// Ring report: skew-retry event counter.
    pub ring_skew_retry_events: u64,
    /// Value of the tick thread's "stopped" flag at the time of the call.
    pub tick_thread_stopped: bool,
}
121
/// Lifecycle phase of a [`RealtimeAsyncWorld`].
///
/// `shutdown` walks the phases in declaration order; a successful `reset`
/// returns the world to `Running`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// Threads have been started and the world is serving requests.
    Running,
    /// Shutdown was requested; waiting for the tick thread to report stopped.
    Draining,
    /// Waiting for egress workers to unpin.
    Quiescing,
    /// Shutdown has passed the quiesce phase; further `shutdown` calls are
    /// no-ops.
    Dropped,
}
131
132pub struct RealtimeAsyncWorld {
140 ring: Arc<SnapshotRing>,
141 epoch_counter: Arc<EpochCounter>,
142 worker_epochs: Arc<[WorkerEpoch]>,
143 cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
144 obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
145 shutdown_flag: Arc<AtomicBool>,
146 tick_stopped: Arc<AtomicBool>,
147 tick_thread: Option<JoinHandle<TickEngine>>,
148 worker_threads: Vec<JoinHandle<()>>,
149 state: ShutdownState,
150 recovered_engine: Mutex<Option<TickEngine>>,
155 config: AsyncConfig,
156 backoff_config: BackoffConfig,
157 seed: u64,
158 tick_rate_hz: f64,
159 space: Arc<dyn Space>,
161}
162
163impl RealtimeAsyncWorld {
164 pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
170 let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
171 if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 {
172 return Err(ConfigError::InvalidTickRate {
173 value: tick_rate_hz,
174 });
175 }
176
177 let seed = config.seed;
178 let ring_size = config.ring_buffer_size;
179 let backoff_config = config.backoff.clone();
180 let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
181 let cancel_grace_ms = async_config.cancel_grace_ms;
182
183 let space: Arc<dyn Space> = Arc::from(config.space);
185
186 let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
190 let engine_config = WorldConfig {
191 space: engine_space,
192 fields: config.fields,
193 propagators: config.propagators,
194 dt: config.dt,
195 seed: config.seed,
196 ring_buffer_size: config.ring_buffer_size,
197 max_ingress_queue: config.max_ingress_queue,
198 tick_rate_hz: config.tick_rate_hz,
199 backoff: backoff_config.clone(),
200 };
201
202 let engine = TickEngine::new(engine_config)?;
203
204 let worker_count = async_config.resolved_worker_count();
205 let ring = Arc::new(SnapshotRing::new(ring_size));
206 let epoch_counter = Arc::new(EpochCounter::new());
207 let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
208 .map(WorkerEpoch::new)
209 .collect::<Vec<_>>()
210 .into();
211
212 let shutdown_flag = Arc::new(AtomicBool::new(false));
213 let tick_stopped = Arc::new(AtomicBool::new(false));
214
215 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
217
218 let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
220
221 let tick_ring = Arc::clone(&ring);
223 let tick_epoch = Arc::clone(&epoch_counter);
224 let tick_workers = Arc::clone(&worker_epochs);
225 let tick_shutdown = Arc::clone(&shutdown_flag);
226 let tick_stopped_flag = Arc::clone(&tick_stopped);
227 let stored_backoff = backoff_config.clone();
228 let tick_thread = thread::Builder::new()
229 .name("murk-tick".into())
230 .spawn(move || {
231 let state = TickThreadState::new(
232 engine,
233 tick_ring,
234 tick_epoch,
235 tick_workers,
236 cmd_rx,
237 tick_shutdown,
238 tick_stopped_flag,
239 tick_rate_hz,
240 max_epoch_hold_ms,
241 cancel_grace_ms,
242 &backoff_config,
243 );
244 state.run()
245 })
246 .expect("failed to spawn tick thread");
247
248 let worker_threads = Self::spawn_egress_workers(
250 worker_count,
251 &obs_rx,
252 &ring,
253 &epoch_counter,
254 &worker_epochs,
255 );
256
257 Ok(Self {
258 ring,
259 epoch_counter,
260 worker_epochs,
261 cmd_tx: Some(cmd_tx),
262 obs_tx: Some(obs_tx),
263 shutdown_flag,
264 tick_stopped,
265 tick_thread: Some(tick_thread),
266 worker_threads,
267 state: ShutdownState::Running,
268 recovered_engine: Mutex::new(None),
269 config: async_config,
270 backoff_config: stored_backoff,
271 seed,
272 tick_rate_hz,
273 space,
274 })
275 }
276
277 pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
282 let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
283
284 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
285 let batch = IngressBatch {
286 commands,
287 reply: reply_tx,
288 };
289
290 cmd_tx.try_send(batch).map_err(|e| match e {
291 crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
292 crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
293 })?;
294
295 reply_rx.recv().map_err(|_| SubmitError::Shutdown)
297 }
298
299 pub fn observe(
304 &self,
305 plan: &Arc<ObsPlan>,
306 output: &mut [f32],
307 mask: &mut [u8],
308 ) -> Result<ObsMetadata, ObsError> {
309 let obs_tx = self
310 .obs_tx
311 .as_ref()
312 .ok_or_else(|| ObsError::ExecutionFailed {
313 reason: "world is shut down".into(),
314 })?;
315
316 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
317 let task = ObsTask::Simple {
318 plan: Arc::clone(plan),
319 output_len: output.len(),
320 mask_len: mask.len(),
321 reply: reply_tx,
322 };
323
324 obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
325 reason: "egress pool shut down".into(),
326 })?;
327
328 let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
329 reason: "worker disconnected".into(),
330 })?;
331
332 match result {
333 ObsResult::Simple {
334 metadata,
335 output: buf,
336 mask: mbuf,
337 } => {
338 if buf.len() > output.len() || mbuf.len() > mask.len() {
339 return Err(ObsError::ExecutionFailed {
340 reason: format!(
341 "output buffer too small: need ({}, {}) got ({}, {})",
342 buf.len(),
343 mbuf.len(),
344 output.len(),
345 mask.len()
346 ),
347 });
348 }
349 output[..buf.len()].copy_from_slice(&buf);
350 mask[..mbuf.len()].copy_from_slice(&mbuf);
351 Ok(metadata)
352 }
353 ObsResult::Error(e) => Err(e),
354 _ => Err(ObsError::ExecutionFailed {
355 reason: "unexpected result type".into(),
356 }),
357 }
358 }
359
360 pub fn observe_agents(
364 &self,
365 plan: &Arc<ObsPlan>,
366 space: &Arc<dyn Space>,
367 agent_centers: &[Coord],
368 output: &mut [f32],
369 mask: &mut [u8],
370 ) -> Result<Vec<ObsMetadata>, ObsError> {
371 let obs_tx = self
372 .obs_tx
373 .as_ref()
374 .ok_or_else(|| ObsError::ExecutionFailed {
375 reason: "world is shut down".into(),
376 })?;
377
378 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
379 let n_agents = agent_centers.len();
380 let per_agent_output = if n_agents > 0 {
381 output.len() / n_agents
382 } else {
383 0
384 };
385 let per_agent_mask = if n_agents > 0 {
386 mask.len() / n_agents
387 } else {
388 0
389 };
390
391 let task = ObsTask::Agents {
392 plan: Arc::clone(plan),
393 space: Arc::clone(space),
394 agent_centers: agent_centers.to_vec(),
395 output_len: per_agent_output,
396 mask_len: per_agent_mask,
397 reply: reply_tx,
398 };
399
400 obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
401 reason: "egress pool shut down".into(),
402 })?;
403
404 let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
405 reason: "worker disconnected".into(),
406 })?;
407
408 match result {
409 ObsResult::Agents {
410 metadata,
411 output: buf,
412 mask: mbuf,
413 } => {
414 if buf.len() > output.len() || mbuf.len() > mask.len() {
415 return Err(ObsError::ExecutionFailed {
416 reason: format!(
417 "output buffer too small: need ({}, {}) got ({}, {})",
418 buf.len(),
419 mbuf.len(),
420 output.len(),
421 mask.len()
422 ),
423 });
424 }
425 output[..buf.len()].copy_from_slice(&buf);
426 mask[..mbuf.len()].copy_from_slice(&mbuf);
427 Ok(metadata)
428 }
429 ObsResult::Error(e) => Err(e),
430 _ => Err(ObsError::ExecutionFailed {
431 reason: "unexpected result type".into(),
432 }),
433 }
434 }
435
436 fn spawn_egress_workers(
438 worker_count: usize,
439 obs_rx: &crossbeam_channel::Receiver<ObsTask>,
440 ring: &Arc<SnapshotRing>,
441 epoch_counter: &Arc<EpochCounter>,
442 worker_epochs: &Arc<[WorkerEpoch]>,
443 ) -> Vec<JoinHandle<()>> {
444 let mut worker_threads = Vec::with_capacity(worker_count);
445 for i in 0..worker_count {
446 let obs_rx = obs_rx.clone();
447 let ring = Arc::clone(ring);
448 let epoch = Arc::clone(epoch_counter);
449 let worker_epochs_ref = Arc::clone(worker_epochs);
450 let handle = thread::Builder::new()
451 .name(format!("murk-egress-{i}"))
452 .spawn(move || {
453 crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
454 })
455 .expect("failed to spawn egress worker");
456 worker_threads.push(handle);
457 }
458 worker_threads
459 }
460
461 pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
463 self.ring.latest()
464 }
465
466 pub fn current_epoch(&self) -> u64 {
468 self.epoch_counter.current()
469 }
470
471 pub fn preflight(&self) -> RealtimePreflight {
476 let (command_queue_depth, command_queue_capacity) = self
477 .cmd_tx
478 .as_ref()
479 .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
480 .unwrap_or((0, 0));
481 let (observe_queue_depth, observe_queue_capacity) = self
482 .obs_tx
483 .as_ref()
484 .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
485 .unwrap_or((0, 0));
486 let ring = crate::egress::ring_preflight(&self.ring, &self.epoch_counter);
487
488 RealtimePreflight {
489 command_queue_depth,
490 command_queue_capacity,
491 observe_queue_depth,
492 observe_queue_capacity,
493 has_snapshot: ring.has_snapshot,
494 latest_snapshot_tick_id: ring.latest_tick_id,
495 snapshot_age_ticks: ring.age_ticks,
496 ring_capacity: ring.ring_capacity,
497 ring_len: ring.ring_len,
498 ring_write_pos: ring.ring_write_pos,
499 ring_oldest_retained_pos: ring.ring_oldest_retained_pos,
500 ring_eviction_events: ring.ring_eviction_events,
501 ring_stale_read_events: ring.ring_stale_read_events,
502 ring_skew_retry_events: ring.ring_skew_retry_events,
503 tick_thread_stopped: self.tick_stopped.load(Ordering::Acquire),
504 }
505 }
506
507 pub fn shutdown(&mut self) -> ShutdownReport {
514 if self.state == ShutdownState::Dropped {
515 return ShutdownReport {
516 total_ms: 0,
517 drain_ms: 0,
518 quiesce_ms: 0,
519 tick_joined: true,
520 workers_joined: 0,
521 };
522 }
523
524 let start = Instant::now();
525
526 self.state = ShutdownState::Draining;
528 self.shutdown_flag.store(true, Ordering::Release);
529
530 if let Some(handle) = &self.tick_thread {
534 handle.thread().unpark();
535 }
536
537 let drain_deadline = Instant::now() + Duration::from_millis(33);
539 while !self.tick_stopped.load(Ordering::Acquire) {
540 if Instant::now() > drain_deadline {
541 break;
542 }
543 thread::yield_now();
544 }
545 let drain_ms = start.elapsed().as_millis() as u64;
546
547 self.state = ShutdownState::Quiescing;
549
550 for w in self.worker_epochs.iter() {
552 w.request_cancel();
553 }
554
555 self.cmd_tx.take();
557 self.obs_tx.take();
558
559 let quiesce_deadline = Instant::now() + Duration::from_millis(200);
561 loop {
562 let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
563 if all_unpinned || Instant::now() > quiesce_deadline {
564 break;
565 }
566 thread::yield_now();
567 }
568 let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;
569
570 self.state = ShutdownState::Dropped;
572
573 let tick_joined = if let Some(handle) = self.tick_thread.take() {
574 match handle.join() {
575 Ok(engine) => {
576 *self.recovered_engine.lock().unwrap() = Some(engine);
577 true
578 }
579 Err(_) => false,
580 }
581 } else {
582 true
583 };
584
585 let mut workers_joined = 0;
586 for handle in self.worker_threads.drain(..) {
587 if handle.join().is_ok() {
588 workers_joined += 1;
589 }
590 }
591
592 let total_ms = start.elapsed().as_millis() as u64;
593 ShutdownReport {
594 total_ms,
595 drain_ms,
596 quiesce_ms,
597 tick_joined,
598 workers_joined,
599 }
600 }
601
602 pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
608 if self.state != ShutdownState::Dropped {
610 self.shutdown();
611 }
612
613 self.seed = seed;
614
615 let mut engine = self
617 .recovered_engine
618 .lock()
619 .unwrap()
620 .take()
621 .ok_or(ConfigError::EngineRecoveryFailed)?;
622
623 if let Err(e) = engine.reset() {
626 *self.recovered_engine.lock().unwrap() = Some(engine);
627 return Err(e);
628 }
629
630 let worker_count = self.config.resolved_worker_count();
632 self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
633 self.epoch_counter = Arc::new(EpochCounter::new());
634 self.worker_epochs = (0..worker_count as u32)
635 .map(WorkerEpoch::new)
636 .collect::<Vec<_>>()
637 .into();
638 self.shutdown_flag = Arc::new(AtomicBool::new(false));
639 self.tick_stopped = Arc::new(AtomicBool::new(false));
640
641 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
643 let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
644 self.cmd_tx = Some(cmd_tx);
645 self.obs_tx = Some(obs_tx);
646
647 let tick_ring = Arc::clone(&self.ring);
649 let tick_epoch = Arc::clone(&self.epoch_counter);
650 let tick_workers = Arc::clone(&self.worker_epochs);
651 let tick_shutdown = Arc::clone(&self.shutdown_flag);
652 let tick_stopped_flag = Arc::clone(&self.tick_stopped);
653 let tick_rate_hz = self.tick_rate_hz;
654 let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
655 let cancel_grace_ms = self.config.cancel_grace_ms;
656 let backoff_config = self.backoff_config.clone();
657 self.tick_thread = Some(
658 thread::Builder::new()
659 .name("murk-tick".into())
660 .spawn(move || {
661 let state = TickThreadState::new(
662 engine,
663 tick_ring,
664 tick_epoch,
665 tick_workers,
666 cmd_rx,
667 tick_shutdown,
668 tick_stopped_flag,
669 tick_rate_hz,
670 max_epoch_hold_ms,
671 cancel_grace_ms,
672 &backoff_config,
673 );
674 state.run()
675 })
676 .expect("failed to spawn tick thread"),
677 );
678
679 self.worker_threads = Self::spawn_egress_workers(
681 worker_count,
682 &obs_rx,
683 &self.ring,
684 &self.epoch_counter,
685 &self.worker_epochs,
686 );
687
688 self.state = ShutdownState::Running;
689 Ok(())
690 }
691
692 pub fn space(&self) -> &dyn Space {
694 self.space.as_ref()
695 }
696}
697
698impl Drop for RealtimeAsyncWorld {
699 fn drop(&mut self) {
700 if self.state != ShutdownState::Dropped {
701 self.shutdown();
702 }
703 }
704}
705
706struct ArcSpaceWrapper(Arc<dyn Space>);
713
714impl murk_space::Space for ArcSpaceWrapper {
715 fn ndim(&self) -> usize {
716 self.0.ndim()
717 }
718
719 fn cell_count(&self) -> usize {
720 self.0.cell_count()
721 }
722
723 fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
724 self.0.neighbours(coord)
725 }
726
727 fn distance(&self, a: &Coord, b: &Coord) -> f64 {
728 self.0.distance(a, b)
729 }
730
731 fn compile_region(
732 &self,
733 spec: &murk_space::RegionSpec,
734 ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
735 self.0.compile_region(spec)
736 }
737
738 fn canonical_ordering(&self) -> Vec<Coord> {
739 self.0.canonical_ordering()
740 }
741
742 fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
743 self.0.canonical_rank(coord)
744 }
745
746 fn instance_id(&self) -> murk_core::SpaceInstanceId {
747 self.0.instance_id()
748 }
749
750 fn topology_eq(&self, other: &dyn murk_space::Space) -> bool {
751 if let Some(w) = (other as &dyn std::any::Any).downcast_ref::<ArcSpaceWrapper>() {
754 self.0.topology_eq(&*w.0)
755 } else {
756 self.0.topology_eq(other)
757 }
758 }
759}
760
761impl std::fmt::Debug for ArcSpaceWrapper {
763 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
764 f.debug_tuple("ArcSpaceWrapper").finish()
765 }
766}
767
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    /// Sleep between polls while waiting for the world to make progress.
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Per-tick scalar field definition with clamped boundaries.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_owned(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    /// 10-cell line world with one field held at 42.0, ticking at 60 Hz.
    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Polls until the world has published a snapshot; panics with
    /// `timeout_msg` once `timeout` has elapsed.
    fn await_snapshot(world: &RealtimeAsyncWorld, timeout: Duration, timeout_msg: &str) {
        let deadline = Instant::now() + timeout;
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("{timeout_msg}");
            }
            std::thread::sleep(POLL_INTERVAL);
        }
    }

    /// Compiles a plan observing field 0 over the whole space and returns
    /// `(plan, output_len, mask_len)`.
    fn compile_full_field_plan(space: &dyn Space) -> (Arc<ObsPlan>, usize, usize) {
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let compiled = ObsPlan::compile(&spec, space).unwrap();
        let (output_len, mask_len) = (compiled.output_len, compiled.mask_len);
        (Arc::new(compiled.plan), output_len, mask_len)
    }

    #[test]
    fn lifecycle_start_and_shutdown() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        await_snapshot(
            &world,
            Duration::from_secs(2),
            "no snapshot produced within 2s",
        );

        let epoch = world.current_epoch();
        assert!(epoch > 0, "epoch should have advanced");

        let report = world.shutdown();
        assert!(report.tick_joined);
        assert!(report.workers_joined > 0);
    }

    #[test]
    fn observe_returns_data() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        await_snapshot(
            &world,
            Duration::from_secs(2),
            "no snapshot produced within 2s",
        );

        let (plan, output_len, mask_len) = compile_full_field_plan(world.space());
        let mut output = vec![0.0f32; output_len];
        let mut mask = vec![0u8; mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }

    #[test]
    fn concurrent_observe() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        await_snapshot(
            &world,
            Duration::from_secs(2),
            "no snapshot produced within 2s",
        );

        let (plan, out_len, mask_len) = compile_full_field_plan(world.space());

        // Share the world across four observer threads.
        let world = Arc::new(world);
        let mut observers = Vec::with_capacity(4);
        for _ in 0..4 {
            let w = Arc::clone(&world);
            let p = Arc::clone(&plan);
            observers.push(std::thread::spawn(move || {
                let mut output = vec![0.0f32; out_len];
                let mut mask = vec![0u8; mask_len];
                let meta = w.observe(&p, &mut output, &mut mask).unwrap();
                assert!(meta.tick_id.0 > 0);
                assert!(output.iter().all(|&v| v == 42.0));
            }));
        }

        for observer in observers {
            observer.join().unwrap();
        }

        // Last reference: dropping it runs the world's shutdown.
        drop(world);
    }

    #[test]
    fn submit_commands_flow_through() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        await_snapshot(
            &world,
            Duration::from_secs(2),
            "no snapshot produced within 2s",
        );

        let cmd = Command {
            payload: murk_core::command::CommandPayload::SetParameter {
                key: murk_core::id::ParameterKey(0),
                value: 1.0,
            },
            expires_after_tick: murk_core::id::TickId(10000),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        };
        let receipts = world.submit_commands(vec![cmd]).unwrap();
        assert_eq!(receipts.len(), 1);
        assert!(receipts[0].accepted);

        world.shutdown();
    }

    #[test]
    fn drop_triggers_shutdown() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(50));
        // Must return (not hang) without an explicit shutdown call.
        drop(world);
    }

    #[test]
    fn shutdown_budget() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(100));

        let report = world.shutdown();
        assert!(
            report.total_ms < 2000,
            "shutdown took too long: {}ms",
            report.total_ms
        );
    }

    #[test]
    fn shutdown_fast_with_slow_tick_rate() {
        // One tick every two seconds: shutdown must not wait out a tick period.
        let config = WorldConfig {
            tick_rate_hz: Some(0.5),
            ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();
        await_snapshot(
            &world,
            Duration::from_secs(5),
            "no snapshot produced within 5s",
        );

        std::thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        let report = world.shutdown();
        let wall_ms = start.elapsed().as_millis();

        assert!(
            wall_ms < 500,
            "shutdown took {wall_ms}ms with 0.5Hz tick rate \
             (report: total={}ms, drain={}ms, quiesce={}ms)",
            report.total_ms,
            report.drain_ms,
            report.quiesce_ms
        );
        assert!(report.tick_joined);
    }

    #[test]
    fn preflight_reports_queue_capacities_and_snapshot_readiness() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let initial = world.preflight();
        assert_eq!(initial.command_queue_capacity, 64);
        assert!(initial.observe_queue_capacity > 0);
        assert_eq!(initial.ring_capacity, 8);
        assert!(initial.ring_len <= initial.ring_capacity);
        assert!(initial.ring_write_pos >= initial.ring_len as u64);
        // The tick thread may or may not have published a snapshot yet.
        if initial.has_snapshot {
            assert!(initial.ring_len > 0);
            assert!(initial.latest_snapshot_tick_id > 0);
            assert!(initial.ring_oldest_retained_pos.is_some());
        } else {
            assert_eq!(initial.ring_len, 0);
            assert_eq!(initial.ring_write_pos, 0);
            assert_eq!(initial.ring_oldest_retained_pos, None);
        }
        assert!(!initial.tick_thread_stopped);

        let deadline = Instant::now() + Duration::from_secs(2);
        let mut ready = initial;
        while !ready.has_snapshot {
            if Instant::now() > deadline {
                panic!("preflight never reported available snapshot");
            }
            std::thread::sleep(POLL_INTERVAL);
            ready = world.preflight();
        }
        assert!(ready.latest_snapshot_tick_id > 0);
        assert!(ready.ring_len > 0);
        assert!(ready.ring_write_pos >= ready.ring_len as u64);
        assert!(ready.ring_oldest_retained_pos.is_some());

        world.shutdown();
        let stopped = world.preflight();
        assert_eq!(stopped.command_queue_capacity, 0);
        assert_eq!(stopped.observe_queue_capacity, 0);
        assert!(stopped.tick_thread_stopped);
    }

    #[test]
    fn preflight_observes_ingress_backlog() {
        // Slow tick rate so queued batches stay visible between ticks.
        let config = WorldConfig {
            tick_rate_hz: Some(0.5),
            ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();
        await_snapshot(
            &world,
            Duration::from_secs(5),
            "no snapshot produced within 5s",
        );

        let cmd_tx = world
            .cmd_tx
            .as_ref()
            .expect("cmd channel must exist")
            .clone();

        // Enqueue empty batches directly, bypassing `submit_commands` so the
        // test does not block waiting for receipts.
        for _ in 0..8 {
            let (reply_tx, _reply_rx) = crossbeam_channel::bounded(1);
            let batch = IngressBatch {
                commands: Vec::new(),
                reply: reply_tx,
            };
            cmd_tx
                .try_send(batch)
                .expect("expected ingress queue to accept batch");
        }

        let deadline = Instant::now() + Duration::from_millis(500);
        let mut preflight = world.preflight();
        while preflight.command_queue_depth == 0 && Instant::now() <= deadline {
            std::thread::sleep(Duration::from_millis(5));
            preflight = world.preflight();
        }
        assert!(preflight.command_queue_depth > 0);
        assert!(preflight.command_queue_depth <= preflight.command_queue_capacity);

        world.shutdown();
    }

    #[test]
    fn reset_lifecycle() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(5);
        while world.current_epoch() < 5 {
            if Instant::now() > deadline {
                panic!("epoch didn't reach 5 within 5s");
            }
            std::thread::sleep(POLL_INTERVAL);
        }
        let epoch_before = world.current_epoch();
        assert!(epoch_before >= 5);

        world.reset(99).unwrap();

        // Checked immediately after reset, which installs a fresh counter.
        assert_eq!(world.current_epoch(), 0);

        await_snapshot(
            &world,
            Duration::from_secs(2),
            "no snapshot after reset within 2s",
        );
        assert!(world.current_epoch() > 0, "should be ticking after reset");

        world.shutdown();
    }

    #[test]
    fn arc_space_wrapper_topology_eq() {
        let wrapped_line = |cells| {
            ArcSpaceWrapper(Arc::new(Line1D::new(cells, EdgeBehavior::Absorb).unwrap()))
        };

        let a = wrapped_line(10);
        let b = wrapped_line(10);
        assert!(
            a.topology_eq(&b),
            "identical Line1D through ArcSpaceWrapper should be topology-equal"
        );

        let c = wrapped_line(20);
        assert!(
            !a.topology_eq(&c),
            "different Line1D sizes should not be topology-equal"
        );

        let bare = Line1D::new(10, EdgeBehavior::Absorb).unwrap();
        assert!(
            a.topology_eq(&bare),
            "ArcSpaceWrapper(Line1D) vs bare Line1D should be topology-equal"
        );
    }
}