1use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use murk_arena::OwnedSnapshot;
35use murk_core::command::{Command, Receipt};
36use murk_core::error::ObsError;
37use murk_core::Coord;
38use murk_obs::{ObsMetadata, ObsPlan};
39use murk_space::Space;
40
41use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
42use crate::egress::{ObsResult, ObsTask};
43use crate::epoch::{EpochCounter, WorkerEpoch};
44use crate::ring::SnapshotRing;
45use crate::tick::TickEngine;
46use crate::tick_thread::{IngressBatch, TickThreadState};
47
/// Error returned by `RealtimeAsyncWorld::submit_commands` when a command
/// batch cannot be delivered to the tick thread.
///
/// `Clone`/`Copy` are derived because the enum is a plain discriminant with
/// no payload; callers can store or re-check the error by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitError {
    /// The tick thread has shut down (or shutdown has started), so the
    /// ingress channel no longer has a receiver.
    Shutdown,
    /// The bounded ingress channel is at capacity; treat as backpressure
    /// and retry later.
    ChannelFull,
}
58
59impl std::fmt::Display for SubmitError {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 Self::Shutdown => write!(f, "tick thread has shut down"),
63 Self::ChannelFull => write!(f, "command channel full"),
64 }
65 }
66}
67
68impl std::error::Error for SubmitError {}
69
/// Timing and join statistics produced by `RealtimeAsyncWorld::shutdown`.
///
/// All fields are plain scalars, so `Clone`/`Copy`/`PartialEq`/`Eq` are
/// derived to let callers store and compare reports freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ShutdownReport {
    /// Total wall-clock duration of the shutdown sequence, in milliseconds.
    pub total_ms: u64,
    /// Time spent in the drain phase (waiting for the tick thread to
    /// observe the shutdown flag and stop), in milliseconds.
    pub drain_ms: u64,
    /// Time spent in the quiesce phase (waiting for egress workers to
    /// unpin their epochs), in milliseconds.
    pub quiesce_ms: u64,
    /// Whether the tick thread joined cleanly (`false` if it panicked).
    pub tick_joined: bool,
    /// Number of egress worker threads that joined cleanly.
    pub workers_joined: usize,
}
86
/// Point-in-time health snapshot of the realtime world, suitable for
/// readiness checks before submitting commands or observations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RealtimePreflight {
    /// Batches currently queued on the command (ingress) channel.
    pub command_queue_depth: usize,
    /// Capacity of the command channel; 0 after shutdown drops the sender.
    pub command_queue_capacity: usize,
    /// Tasks currently queued on the observation channel.
    pub observe_queue_depth: usize,
    /// Capacity of the observation channel; 0 after shutdown.
    pub observe_queue_capacity: usize,
    /// Whether the ring holds at least one published snapshot.
    pub has_snapshot: bool,
    /// Tick id of the most recently published snapshot (see
    /// `crate::egress::ring_preflight` for its value when none exists).
    pub latest_snapshot_tick_id: u64,
    /// Age of the latest snapshot measured in ticks.
    pub snapshot_age_ticks: u64,
    /// Fixed capacity of the snapshot ring.
    pub ring_capacity: usize,
    /// Number of snapshots currently retained in the ring.
    pub ring_len: usize,
    /// Monotonic write position of the ring.
    pub ring_write_pos: u64,
    /// Oldest write position still retained, or `None` when the ring is empty.
    pub ring_oldest_retained_pos: Option<u64>,
    /// Cumulative count of snapshots evicted from the ring.
    pub ring_eviction_events: u64,
    /// Cumulative count of reads that found their slot already evicted.
    pub ring_stale_read_events: u64,
    /// Cumulative count of reads retried due to writer/reader skew.
    pub ring_skew_retry_events: u64,
    /// Whether the tick thread has stopped (set on shutdown or tick failure).
    pub tick_thread_stopped: bool,
}
121
/// Internal lifecycle state machine for shutdown sequencing.
/// Transitions: Running -> Draining -> Quiescing -> Dropped (one-way,
/// except `reset` which rebuilds the world and returns to Running).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// Tick thread and workers are live; commands/observations accepted.
    Running,
    /// Shutdown flag set; waiting for the tick thread to stop.
    Draining,
    /// Channels closed; waiting for egress workers to unpin epochs.
    Quiescing,
    /// All threads joined (or abandoned); terminal state.
    Dropped,
}
131
/// Asynchronous world driver: a dedicated tick thread advances the
/// simulation at a fixed rate while a pool of egress workers serves
/// observation requests from published snapshots.
pub struct RealtimeAsyncWorld {
    // Ring of published snapshots shared by the tick thread (writer) and
    // egress workers (readers).
    ring: Arc<SnapshotRing>,
    // Global epoch counter advanced by the tick thread.
    epoch_counter: Arc<EpochCounter>,
    // Per-worker epoch pins used for quiescence during shutdown.
    worker_epochs: Arc<[WorkerEpoch]>,
    // Ingress channel to the tick thread; `None` once shutdown has begun
    // (dropping the sender signals disconnect).
    cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
    // Observation task channel to the egress pool; `None` after shutdown.
    obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
    // Set to request tick-thread termination.
    shutdown_flag: Arc<AtomicBool>,
    // Set by the tick thread once it has actually stopped.
    tick_stopped: Arc<AtomicBool>,
    // Tick thread handle; joining returns the engine for reuse in `reset`.
    tick_thread: Option<JoinHandle<TickEngine>>,
    worker_threads: Vec<JoinHandle<()>>,
    state: ShutdownState,
    // Engine recovered from a joined tick thread, consumed by `reset`.
    recovered_engine: Mutex<Option<TickEngine>>,
    config: AsyncConfig,
    backoff_config: BackoffConfig,
    seed: u64,
    tick_rate_hz: f64,
    // Space shared with the engine via `ArcSpaceWrapper`.
    space: Arc<dyn Space>,
}
162
163impl RealtimeAsyncWorld {
    /// Builds the world: validates the tick rate, constructs the engine,
    /// spawns the tick thread and the egress worker pool.
    ///
    /// # Errors
    /// Returns `ConfigError::InvalidTickRate` for non-positive/non-finite
    /// rates, engine construction errors, or `ThreadSpawnFailed` if any
    /// thread cannot be spawned (with already-spawned threads cleaned up).
    pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
        // Default to 60 Hz; also reject rates whose period (1/rate) would be
        // non-finite, since the tick loop sleeps on that period.
        let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
        if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 || !(1.0 / tick_rate_hz).is_finite() {
            return Err(ConfigError::InvalidTickRate {
                value: tick_rate_hz,
            });
        }

        let seed = config.seed;
        let ring_size = config.ring_buffer_size;
        let backoff_config = config.backoff.clone();
        let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
        let cancel_grace_ms = async_config.cancel_grace_ms;

        // Keep one shared handle to the space; the engine gets a boxed
        // wrapper around the same Arc so both sides see one instance.
        let space: Arc<dyn Space> = Arc::from(config.space);

        let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
        let engine_config = WorldConfig {
            space: engine_space,
            fields: config.fields,
            propagators: config.propagators,
            dt: config.dt,
            seed: config.seed,
            ring_buffer_size: config.ring_buffer_size,
            max_ingress_queue: config.max_ingress_queue,
            tick_rate_hz: config.tick_rate_hz,
            backoff: backoff_config.clone(),
        };

        let engine = TickEngine::new(engine_config)?;

        let worker_count = async_config.resolved_worker_count();
        let ring = Arc::new(SnapshotRing::new(ring_size));
        let epoch_counter = Arc::new(EpochCounter::new());
        let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();

        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let tick_stopped = Arc::new(AtomicBool::new(false));

        // Bounded ingress so `submit_commands` can report backpressure.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);

        // Observation queue sized relative to the worker pool.
        // NOTE(review): assumes resolved_worker_count() >= 1; a count of 0
        // would make this a rendezvous channel — confirm upstream invariant.
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);

        let tick_ring = Arc::clone(&ring);
        let tick_epoch = Arc::clone(&epoch_counter);
        let tick_workers = Arc::clone(&worker_epochs);
        let tick_shutdown = Arc::clone(&shutdown_flag);
        let tick_stopped_flag = Arc::clone(&tick_stopped);
        // `backoff_config` is moved into the closure below; keep a clone for
        // `self` so `reset` can rebuild the thread later.
        let stored_backoff = backoff_config.clone();
        let tick_thread = thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            })
            .map_err(|e| ConfigError::ThreadSpawnFailed {
                reason: format!("tick thread: {e}"),
            })?;

        let (obs_tx, worker_threads) = match Self::spawn_egress_workers(
            worker_count,
            obs_tx,
            &obs_rx,
            &ring,
            &epoch_counter,
            &worker_epochs,
        ) {
            Ok(result) => result,
            Err(e) => {
                // Worker spawn failed: stop and join the tick thread before
                // surfacing the error so no thread leaks.
                shutdown_flag.store(true, Ordering::Release);
                tick_thread.thread().unpark();
                let _ = tick_thread.join();
                return Err(e);
            }
        };

        Ok(Self {
            ring,
            epoch_counter,
            worker_epochs,
            cmd_tx: Some(cmd_tx),
            obs_tx: Some(obs_tx),
            shutdown_flag,
            tick_stopped,
            tick_thread: Some(tick_thread),
            worker_threads,
            state: ShutdownState::Running,
            recovered_engine: Mutex::new(None),
            config: async_config,
            backoff_config: stored_backoff,
            seed,
            tick_rate_hz,
            space,
        })
    }
288
289 pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
294 let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
295
296 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
297 let batch = IngressBatch {
298 commands,
299 reply: reply_tx,
300 };
301
302 cmd_tx.try_send(batch).map_err(|e| match e {
303 crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
304 crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
305 })?;
306
307 reply_rx.recv().map_err(|_| SubmitError::Shutdown)
309 }
310
311 pub fn observe(
316 &self,
317 plan: &Arc<ObsPlan>,
318 output: &mut [f32],
319 mask: &mut [u8],
320 ) -> Result<ObsMetadata, ObsError> {
321 let obs_tx = self
322 .obs_tx
323 .as_ref()
324 .ok_or_else(|| ObsError::ExecutionFailed {
325 reason: "world is shut down".into(),
326 })?;
327
328 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
329 let task = ObsTask::Simple {
330 plan: Arc::clone(plan),
331 output_len: output.len(),
332 mask_len: mask.len(),
333 reply: reply_tx,
334 };
335
336 obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
337 reason: "egress pool shut down".into(),
338 })?;
339
340 let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
341 reason: "worker disconnected".into(),
342 })?;
343
344 match result {
345 ObsResult::Simple {
346 metadata,
347 output: buf,
348 mask: mbuf,
349 } => {
350 if buf.len() > output.len() || mbuf.len() > mask.len() {
351 return Err(ObsError::ExecutionFailed {
352 reason: format!(
353 "output buffer too small: need ({}, {}) got ({}, {})",
354 buf.len(),
355 mbuf.len(),
356 output.len(),
357 mask.len()
358 ),
359 });
360 }
361 output[..buf.len()].copy_from_slice(&buf);
362 mask[..mbuf.len()].copy_from_slice(&mbuf);
363 Ok(metadata)
364 }
365 ObsResult::Error(e) => Err(e),
366 _ => Err(ObsError::ExecutionFailed {
367 reason: "unexpected result type".into(),
368 }),
369 }
370 }
371
    /// Executes an observation plan once per agent center, partitioning the
    /// caller's `output`/`mask` buffers evenly across agents.
    ///
    /// # Errors
    /// `ObsError::ExecutionFailed` if the world is shut down, the pool is
    /// unavailable, the buffers are too small, or the result type is
    /// unexpected; worker-side errors are forwarded.
    pub fn observe_agents(
        &self,
        plan: &Arc<ObsPlan>,
        space: &Arc<dyn Space>,
        agent_centers: &[Coord],
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<Vec<ObsMetadata>, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let n_agents = agent_centers.len();
        // Per-agent slice sizes via integer division; any remainder is
        // silently unused. NOTE(review): assumes callers size buffers as an
        // exact multiple of n_agents — confirm against worker-side layout.
        let per_agent_output = if n_agents > 0 {
            output.len() / n_agents
        } else {
            0
        };
        let per_agent_mask = if n_agents > 0 {
            mask.len() / n_agents
        } else {
            0
        };

        let task = ObsTask::Agents {
            plan: Arc::clone(plan),
            space: Arc::clone(space),
            agent_centers: agent_centers.to_vec(),
            output_len: per_agent_output,
            mask_len: per_agent_mask,
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Agents {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                // The returned buffers hold all agents concatenated; check
                // total sizes before copying prefixes into place.
                if buf.len() > output.len() || mbuf.len() > mask.len() {
                    return Err(ObsError::ExecutionFailed {
                        reason: format!(
                            "output buffer too small: need ({}, {}) got ({}, {})",
                            buf.len(),
                            mbuf.len(),
                            output.len(),
                            mask.len()
                        ),
                    });
                }
                output[..buf.len()].copy_from_slice(&buf);
                mask[..mbuf.len()].copy_from_slice(&mbuf);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }
447
    /// Spawns `worker_count` egress worker threads, each running
    /// `crate::egress::worker_loop_indexed` on a clone of the task receiver.
    ///
    /// On success returns the (moved-through) sender plus the join handles.
    /// On spawn failure, already-spawned workers are shut down (by dropping
    /// the sender so their receivers disconnect) and joined before the
    /// error is returned, so no thread leaks.
    fn spawn_egress_workers(
        worker_count: usize,
        obs_tx: crossbeam_channel::Sender<ObsTask>,
        obs_rx: &crossbeam_channel::Receiver<ObsTask>,
        ring: &Arc<SnapshotRing>,
        epoch_counter: &Arc<EpochCounter>,
        worker_epochs: &Arc<[WorkerEpoch]>,
    ) -> Result<(crossbeam_channel::Sender<ObsTask>, Vec<JoinHandle<()>>), ConfigError> {
        let mut worker_threads = Vec::with_capacity(worker_count);
        for i in 0..worker_count {
            let obs_rx = obs_rx.clone();
            let ring = Arc::clone(ring);
            let epoch = Arc::clone(epoch_counter);
            let worker_epochs_ref = Arc::clone(worker_epochs);
            match thread::Builder::new()
                .name(format!("murk-egress-{i}"))
                .spawn(move || {
                    crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
                }) {
                Ok(handle) => worker_threads.push(handle),
                Err(e) => {
                    // Drop the sender first so workers' recv() disconnects
                    // (the caller's `obs_rx` clone keeps the channel alive
                    // only until it, too, is dropped on the error path).
                    drop(obs_tx);
                    for handle in worker_threads {
                        let _ = handle.join();
                    }
                    return Err(ConfigError::ThreadSpawnFailed {
                        reason: format!("egress worker {i}: {e}"),
                    });
                }
            }
        }
        Ok((obs_tx, worker_threads))
    }
490
    /// Returns the most recently published snapshot, or `None` if the tick
    /// thread has not published one yet.
    pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
        self.ring.latest()
    }
495
    /// Returns the current global epoch (advanced by the tick thread;
    /// 0 until the first tick completes).
    pub fn current_epoch(&self) -> u64 {
        self.epoch_counter.current()
    }
500
501 pub fn preflight(&self) -> RealtimePreflight {
506 let (command_queue_depth, command_queue_capacity) = self
507 .cmd_tx
508 .as_ref()
509 .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
510 .unwrap_or((0, 0));
511 let (observe_queue_depth, observe_queue_capacity) = self
512 .obs_tx
513 .as_ref()
514 .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
515 .unwrap_or((0, 0));
516 let ring = crate::egress::ring_preflight(&self.ring, &self.epoch_counter);
517
518 RealtimePreflight {
519 command_queue_depth,
520 command_queue_capacity,
521 observe_queue_depth,
522 observe_queue_capacity,
523 has_snapshot: ring.has_snapshot,
524 latest_snapshot_tick_id: ring.latest_tick_id,
525 snapshot_age_ticks: ring.age_ticks,
526 ring_capacity: ring.ring_capacity,
527 ring_len: ring.ring_len,
528 ring_write_pos: ring.ring_write_pos,
529 ring_oldest_retained_pos: ring.ring_oldest_retained_pos,
530 ring_eviction_events: ring.ring_eviction_events,
531 ring_stale_read_events: ring.ring_stale_read_events,
532 ring_skew_retry_events: ring.ring_skew_retry_events,
533 tick_thread_stopped: self.tick_stopped.load(Ordering::Acquire),
534 }
535 }
536
    /// Performs an ordered shutdown: drain (stop the tick thread), quiesce
    /// (cancel and unpin egress workers), then join all threads.
    ///
    /// Idempotent: a second call returns a zeroed report immediately. The
    /// joined tick thread's engine is stashed for a later `reset`.
    pub fn shutdown(&mut self) -> ShutdownReport {
        if self.state == ShutdownState::Dropped {
            return ShutdownReport {
                total_ms: 0,
                drain_ms: 0,
                quiesce_ms: 0,
                tick_joined: true,
                workers_joined: 0,
            };
        }

        let start = Instant::now();

        // Phase 1: drain. Signal the tick thread and wake it if parked.
        self.state = ShutdownState::Draining;
        self.shutdown_flag.store(true, Ordering::Release);

        if let Some(handle) = &self.tick_thread {
            handle.thread().unpark();
        }

        // Bounded wait (~2 frames at 60 Hz) for the tick thread to stop;
        // proceed anyway on timeout — join below will still wait.
        let drain_deadline = Instant::now() + Duration::from_millis(33);
        while !self.tick_stopped.load(Ordering::Acquire) {
            if Instant::now() > drain_deadline {
                break;
            }
            thread::yield_now();
        }
        let drain_ms = start.elapsed().as_millis() as u64;

        // Phase 2: quiesce. Cancel in-flight observations...
        self.state = ShutdownState::Quiescing;

        for w in self.worker_epochs.iter() {
            w.request_cancel();
        }

        // ...and drop both senders so tick/worker receive loops disconnect.
        self.cmd_tx.take();
        self.obs_tx.take();

        // Bounded wait for workers to unpin their epochs.
        let quiesce_deadline = Instant::now() + Duration::from_millis(200);
        loop {
            let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
            if all_unpinned || Instant::now() > quiesce_deadline {
                break;
            }
            thread::yield_now();
        }
        // No underflow: elapsed() is monotonic, so total >= drain_ms here.
        let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;

        // Phase 3: join. Recover the engine from the tick thread for reuse.
        self.state = ShutdownState::Dropped;

        let tick_joined = if let Some(handle) = self.tick_thread.take() {
            match handle.join() {
                Ok(engine) => {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                    true
                }
                // Join error means the tick thread panicked.
                Err(_) => false,
            }
        } else {
            true
        };

        let mut workers_joined = 0;
        for handle in self.worker_threads.drain(..) {
            if handle.join().is_ok() {
                workers_joined += 1;
            }
        }

        let total_ms = start.elapsed().as_millis() as u64;
        ShutdownReport {
            total_ms,
            drain_ms,
            quiesce_ms,
            tick_joined,
            workers_joined,
        }
    }
631
    /// Shuts the world down (if still running), resets the recovered engine
    /// with a new seed, and rebuilds the ring, epochs, channels, tick thread
    /// and egress pool, returning the world to `Running`.
    ///
    /// # Errors
    /// `EngineRecoveryFailed` if the tick thread panicked (no engine to
    /// reuse), engine reset errors, or `ThreadSpawnFailed`. On every error
    /// path the engine is put back so `reset` can be retried.
    pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
        if self.state != ShutdownState::Dropped {
            self.shutdown();
        }

        self.seed = seed;

        // The engine was stashed by `shutdown` when the tick thread joined;
        // absence means the thread panicked and recovery is impossible.
        let mut engine = self
            .recovered_engine
            .lock()
            .unwrap()
            .take()
            .ok_or(ConfigError::EngineRecoveryFailed)?;

        if let Err(e) = engine.reset() {
            // Put the engine back so a subsequent reset can retry.
            *self.recovered_engine.lock().unwrap() = Some(engine);
            return Err(e);
        }

        // Rebuild all shared state from scratch (fresh epochs and flags).
        let worker_count = self.config.resolved_worker_count();
        self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
        self.epoch_counter = Arc::new(EpochCounter::new());
        self.worker_epochs = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();
        self.shutdown_flag = Arc::new(AtomicBool::new(false));
        self.tick_stopped = Arc::new(AtomicBool::new(false));

        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
        self.cmd_tx = Some(cmd_tx);

        // Rendezvous channel so the engine is handed over only AFTER the
        // spawn succeeds; if spawn fails we still own the engine and can
        // stash it back instead of losing it inside a dead closure.
        let (engine_tx, engine_rx) = crossbeam_channel::bounded(0);
        let tick_ring = Arc::clone(&self.ring);
        let tick_epoch = Arc::clone(&self.epoch_counter);
        let tick_workers = Arc::clone(&self.worker_epochs);
        let tick_shutdown = Arc::clone(&self.shutdown_flag);
        let tick_stopped_flag = Arc::clone(&self.tick_stopped);
        let tick_rate_hz = self.tick_rate_hz;
        let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
        let cancel_grace_ms = self.config.cancel_grace_ms;
        let backoff_config = self.backoff_config.clone();
        let tick_thread = match thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let engine = engine_rx.recv().expect("engine channel closed before send");
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            }) {
            Ok(handle) => handle,
            Err(e) => {
                *self.recovered_engine.lock().unwrap() = Some(engine);
                return Err(ConfigError::ThreadSpawnFailed {
                    reason: format!("tick thread: {e}"),
                });
            }
        };

        // Blocking send on the rendezvous channel; the freshly spawned
        // thread recv()s first, so this only fails if it died immediately.
        engine_tx
            .send(engine)
            .expect("tick thread died before receiving engine");

        let (obs_tx, worker_threads) = match Self::spawn_egress_workers(
            worker_count,
            obs_tx,
            &obs_rx,
            &self.ring,
            &self.epoch_counter,
            &self.worker_epochs,
        ) {
            Ok(result) => result,
            Err(e) => {
                // Tear down the tick thread and try to recover the engine
                // so a later reset can still succeed.
                self.shutdown_flag.store(true, Ordering::Release);
                tick_thread.thread().unpark();
                if let Ok(engine) = tick_thread.join() {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                }
                return Err(e);
            }
        };

        self.obs_tx = Some(obs_tx);
        self.tick_thread = Some(tick_thread);
        self.worker_threads = worker_threads;

        self.state = ShutdownState::Running;
        Ok(())
    }
752
    /// Borrows the space shared with the engine (useful for compiling
    /// observation plans against the same topology).
    pub fn space(&self) -> &dyn Space {
        self.space.as_ref()
    }
757}
758
759impl Drop for RealtimeAsyncWorld {
760 fn drop(&mut self) {
761 if self.state != ShutdownState::Dropped {
762 self.shutdown();
763 }
764 }
765}
766
767struct ArcSpaceWrapper(Arc<dyn Space>);
774
// Pure delegation: every method forwards to the wrapped `Arc<dyn Space>`,
// except `topology_eq`, which first unwraps another wrapper if present.
impl murk_space::Space for ArcSpaceWrapper {
    fn ndim(&self) -> usize {
        self.0.ndim()
    }

    fn cell_count(&self) -> usize {
        self.0.cell_count()
    }

    fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
        self.0.neighbours(coord)
    }

    fn distance(&self, a: &Coord, b: &Coord) -> f64 {
        self.0.distance(a, b)
    }

    fn compile_region(
        &self,
        spec: &murk_space::RegionSpec,
    ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
        self.0.compile_region(spec)
    }

    fn canonical_ordering(&self) -> Vec<Coord> {
        self.0.canonical_ordering()
    }

    fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
        self.0.canonical_rank(coord)
    }

    fn instance_id(&self) -> murk_core::SpaceInstanceId {
        self.0.instance_id()
    }

    fn topology_eq(&self, other: &dyn murk_space::Space) -> bool {
        // Unwrap a peer wrapper so two wrapped spaces compare their inner
        // topologies rather than the wrapper types.
        // NOTE(review): the `as &dyn Any` upcast presumably relies on `Space`
        // having `Any` as a supertrait — confirm in murk_space.
        if let Some(w) = (other as &dyn std::any::Any).downcast_ref::<ArcSpaceWrapper>() {
            self.0.topology_eq(&*w.0)
        } else {
            self.0.topology_eq(other)
        }
    }
}
821
822impl std::fmt::Debug for ArcSpaceWrapper {
824 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825 f.debug_tuple("ArcSpaceWrapper").finish()
826 }
827}
828
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    // Minimal per-tick scalar field definition used by all test worlds.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    // 10-cell Line1D world at 60 Hz with a single constant-value propagator.
    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    // World starts ticking (snapshot appears, epoch advances) and shuts
    // down with all threads joined.
    #[test]
    fn lifecycle_start_and_shutdown() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let epoch = world.current_epoch();
        assert!(epoch > 0, "epoch should have advanced");

        let report = world.shutdown();
        assert!(report.tick_joined);
        assert!(report.workers_joined > 0);
    }

    // A compiled whole-space plan returns the propagator's constant value.
    #[test]
    fn observe_returns_data() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        let mut output = vec![0.0f32; plan_result.output_len];
        let mut mask = vec![0u8; plan_result.mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }

    // Four threads observing concurrently all succeed with correct data.
    #[test]
    fn concurrent_observe() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        let world = Arc::new(world);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let w = Arc::clone(&world);
                let p = Arc::clone(&plan);
                let out_len = plan_result.output_len;
                let mask_len = plan_result.mask_len;
                std::thread::spawn(move || {
                    let mut output = vec![0.0f32; out_len];
                    let mut mask = vec![0u8; mask_len];
                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
                    assert!(meta.tick_id.0 > 0);
                    assert!(output.iter().all(|&v| v == 42.0));
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // Last Arc dropped here triggers Drop-driven shutdown.
        drop(world);
    }

    // Submitted commands are accepted and produce receipts.
    #[test]
    fn submit_commands_flow_through() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if (Instant::now()) > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let cmd = Command {
            payload: murk_core::command::CommandPayload::SetParameter {
                key: murk_core::id::ParameterKey(0),
                value: 1.0,
            },
            expires_after_tick: murk_core::id::TickId(10000),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        };
        let receipts = world.submit_commands(vec![cmd]).unwrap();
        assert_eq!(receipts.len(), 1);
        assert!(receipts[0].accepted);

        world.shutdown();
    }

    // Dropping without an explicit shutdown must not hang or panic.
    #[test]
    fn drop_triggers_shutdown() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(50));
        drop(world);
    }

    // Shutdown completes well within its time budget.
    #[test]
    fn shutdown_budget() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(100));

        let report = world.shutdown();
        assert!(
            report.total_ms < 2000,
            "shutdown took too long: {}ms",
            report.total_ms
        );
    }

    // A 0.5 Hz tick period (2s) must not stall shutdown: the unpark wakes
    // the tick thread immediately.
    #[test]
    fn shutdown_fast_with_slow_tick_rate() {
        let config = WorldConfig {
            tick_rate_hz: Some(0.5), ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(5);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        std::thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        let report = world.shutdown();
        let wall_ms = start.elapsed().as_millis();

        assert!(
            wall_ms < 500,
            "shutdown took {wall_ms}ms with 0.5Hz tick rate \
             (report: total={}ms, drain={}ms, quiesce={}ms)",
            report.total_ms,
            report.drain_ms,
            report.quiesce_ms
        );
        assert!(report.tick_joined);
    }

    // Preflight reports consistent queue/ring invariants before a snapshot,
    // once one appears, and after shutdown (capacities collapse to 0).
    #[test]
    fn preflight_reports_queue_capacities_and_snapshot_readiness() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let initial = world.preflight();
        assert_eq!(initial.command_queue_capacity, 64);
        assert!(initial.observe_queue_capacity > 0);
        assert_eq!(initial.ring_capacity, 8);
        assert!(initial.ring_len <= initial.ring_capacity);
        assert!(initial.ring_write_pos >= initial.ring_len as u64);
        if initial.has_snapshot {
            assert!(initial.ring_len > 0);
            assert!(initial.latest_snapshot_tick_id > 0);
            assert!(initial.ring_oldest_retained_pos.is_some());
        } else {
            assert_eq!(initial.ring_len, 0);
            assert_eq!(initial.ring_write_pos, 0);
            assert_eq!(initial.ring_oldest_retained_pos, None);
        }
        assert!(!initial.tick_thread_stopped);

        let deadline = Instant::now() + Duration::from_secs(2);
        let mut ready = initial;
        while !ready.has_snapshot {
            if Instant::now() > deadline {
                panic!("preflight never reported available snapshot");
            }
            std::thread::sleep(Duration::from_millis(10));
            ready = world.preflight();
        }
        assert!(ready.latest_snapshot_tick_id > 0);
        assert!(ready.ring_len > 0);
        assert!(ready.ring_write_pos >= ready.ring_len as u64);
        assert!(ready.ring_oldest_retained_pos.is_some());

        world.shutdown();
        let stopped = world.preflight();
        assert_eq!(stopped.command_queue_capacity, 0);
        assert_eq!(stopped.observe_queue_capacity, 0);
        assert!(stopped.tick_thread_stopped);
    }

    // With a slow tick rate, manually enqueued batches show up as ingress
    // backlog in preflight before the tick thread drains them.
    #[test]
    fn preflight_observes_ingress_backlog() {
        let config = WorldConfig {
            tick_rate_hz: Some(0.5),
            ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();

        let ready_deadline = Instant::now() + Duration::from_secs(5);
        while world.latest_snapshot().is_none() {
            if Instant::now() > ready_deadline {
                panic!("no snapshot produced within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let cmd_tx = world
            .cmd_tx
            .as_ref()
            .expect("cmd channel must exist")
            .clone();

        for _ in 0..8 {
            let (reply_tx, _reply_rx) = crossbeam_channel::bounded(1);
            cmd_tx
                .try_send(IngressBatch {
                    commands: Vec::new(),
                    reply: reply_tx,
                })
                .expect("expected ingress queue to accept batch");
        }

        let deadline = Instant::now() + Duration::from_millis(500);
        let mut preflight = world.preflight();
        while preflight.command_queue_depth == 0 {
            if Instant::now() > deadline {
                break;
            }
            std::thread::sleep(Duration::from_millis(5));
            preflight = world.preflight();
        }
        assert!(preflight.command_queue_depth > 0);
        assert!(preflight.command_queue_depth <= preflight.command_queue_capacity);

        world.shutdown();
    }

    // reset() rewinds the epoch to 0 and the world resumes ticking.
    #[test]
    fn reset_lifecycle() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(5);
        while world.current_epoch() < 5 {
            if Instant::now() > deadline {
                panic!("epoch didn't reach 5 within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        let epoch_before = world.current_epoch();
        assert!(epoch_before >= 5);

        world.reset(99).unwrap();

        assert_eq!(world.current_epoch(), 0);

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot after reset within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert!(world.current_epoch() > 0, "should be ticking after reset");

        world.shutdown();
    }

    // topology_eq unwraps peer wrappers and also compares against bare spaces.
    #[test]
    fn arc_space_wrapper_topology_eq() {
        let a = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
        let b = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
        assert!(
            a.topology_eq(&b),
            "identical Line1D through ArcSpaceWrapper should be topology-equal"
        );

        let c = ArcSpaceWrapper(Arc::new(Line1D::new(20, EdgeBehavior::Absorb).unwrap()));
        assert!(
            !a.topology_eq(&c),
            "different Line1D sizes should not be topology-equal"
        );

        let bare = Line1D::new(10, EdgeBehavior::Absorb).unwrap();
        assert!(
            a.topology_eq(&bare),
            "ArcSpaceWrapper(Line1D) vs bare Line1D should be topology-equal"
        );
    }
}