1use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use murk_arena::OwnedSnapshot;
35use murk_core::command::{Command, Receipt};
36use murk_core::error::ObsError;
37use murk_core::Coord;
38use murk_obs::{ObsMetadata, ObsPlan};
39use murk_space::Space;
40
41use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
42use crate::egress::{ObsResult, ObsTask};
43use crate::epoch::{EpochCounter, WorkerEpoch};
44use crate::ring::SnapshotRing;
45use crate::tick::TickEngine;
46use crate::tick_thread::{IngressBatch, TickThreadState};
47
/// Reasons a command batch can be rejected by [`RealtimeAsyncWorld::submit_commands`].
#[derive(Debug, PartialEq, Eq)]
pub enum SubmitError {
    /// The tick thread has shut down and no longer accepts commands.
    Shutdown,
    /// The bounded ingress channel is at capacity; the caller may retry later.
    ChannelFull,
}

impl std::fmt::Display for SubmitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Shutdown => "tick thread has shut down",
            Self::ChannelFull => "command channel full",
        };
        f.write_str(message)
    }
}

impl std::error::Error for SubmitError {}
69
/// Timing and join-status summary returned by [`RealtimeAsyncWorld::shutdown`].
#[derive(Debug)]
pub struct ShutdownReport {
    /// Total wall-clock time spent inside `shutdown`, in milliseconds.
    pub total_ms: u64,
    /// Milliseconds spent waiting for the tick thread to report it stopped.
    pub drain_ms: u64,
    /// Milliseconds spent (after drain) waiting for egress workers to unpin.
    pub quiesce_ms: u64,
    /// Whether the tick thread joined cleanly (`false` if its join failed).
    pub tick_joined: bool,
    /// Number of egress worker threads that joined without panicking.
    pub workers_joined: usize,
}
86
/// Lifecycle phase of the world, advanced by `shutdown`, `reset`, and `Drop`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// Tick thread and egress workers are live.
    Running,
    /// Shutdown flag raised; waiting for the tick thread to stop.
    Draining,
    /// Cancel requested on worker epochs; waiting for workers to unpin.
    Quiescing,
    /// Threads joined (or join attempted); a further shutdown is a no-op.
    Dropped,
}
96
/// Asynchronous world handle: a dedicated tick thread advances the
/// simulation while a pool of egress workers serves observation requests;
/// commands and observations flow over bounded crossbeam channels.
pub struct RealtimeAsyncWorld {
    /// Ring of snapshots shared between the tick thread and egress workers.
    ring: Arc<SnapshotRing>,
    /// Global epoch counter shared with the tick thread and workers.
    epoch_counter: Arc<EpochCounter>,
    /// Per-worker epoch pins; used to quiesce workers during shutdown.
    worker_epochs: Arc<[WorkerEpoch]>,
    /// Command ingress channel; `None` once shutdown has begun.
    cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
    /// Observation task channel; `None` once shutdown has begun.
    obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
    /// Raised to ask the tick thread to exit.
    shutdown_flag: Arc<AtomicBool>,
    /// Becomes `true` once the tick thread has actually stopped.
    tick_stopped: Arc<AtomicBool>,
    /// Tick thread handle; joining it yields the engine back for `reset`.
    tick_thread: Option<JoinHandle<TickEngine>>,
    /// Join handles for the egress worker threads.
    worker_threads: Vec<JoinHandle<()>>,
    /// Current lifecycle phase.
    state: ShutdownState,
    /// Engine recovered from a joined tick thread, consumed by `reset`.
    recovered_engine: Mutex<Option<TickEngine>>,
    /// Async-specific configuration retained so `reset` can respawn threads.
    config: AsyncConfig,
    /// Backoff configuration retained for the respawned tick thread.
    backoff_config: BackoffConfig,
    /// Current seed; updated by `reset`.
    seed: u64,
    /// Validated tick rate, reused when respawning in `reset`.
    tick_rate_hz: f64,
    /// Shared space topology, exposed via [`Self::space`].
    space: Arc<dyn Space>,
}
127
128impl RealtimeAsyncWorld {
    /// Builds the tick engine and spawns the tick thread plus the egress
    /// worker pool, returning a live world handle.
    ///
    /// # Errors
    /// * `ConfigError::InvalidTickRate` when `config.tick_rate_hz` is
    ///   non-finite or not positive (defaults to 60.0 when unset).
    /// * Any error propagated from `TickEngine::new`.
    ///
    /// # Panics
    /// Panics if the OS refuses to spawn the tick or egress worker threads.
    pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
        // Validate the tick rate up front, before spawning anything.
        let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
        if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 {
            return Err(ConfigError::InvalidTickRate {
                value: tick_rate_hz,
            });
        }

        let seed = config.seed;
        let ring_size = config.ring_buffer_size;
        let backoff_config = config.backoff.clone();
        let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
        let cancel_grace_ms = async_config.cancel_grace_ms;

        // Promote the boxed space to an Arc so this handle and the engine
        // (through ArcSpaceWrapper below) share a single instance.
        let space: Arc<dyn Space> = Arc::from(config.space);

        let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
        // Rebuild the engine's config with the wrapped space; every other
        // field is forwarded unchanged from the caller's config.
        let engine_config = WorldConfig {
            space: engine_space,
            fields: config.fields,
            propagators: config.propagators,
            dt: config.dt,
            seed: config.seed,
            ring_buffer_size: config.ring_buffer_size,
            max_ingress_queue: config.max_ingress_queue,
            tick_rate_hz: config.tick_rate_hz,
            backoff: backoff_config.clone(),
        };

        let engine = TickEngine::new(engine_config)?;

        let worker_count = async_config.resolved_worker_count();
        let ring = Arc::new(SnapshotRing::new(ring_size));
        let epoch_counter = Arc::new(EpochCounter::new());
        // One epoch slot per egress worker, indexed by worker id.
        let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();

        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let tick_stopped = Arc::new(AtomicBool::new(false));

        // Bounded ingress channel: submit_commands fails fast with
        // ChannelFull rather than blocking once 64 batches are queued.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);

        // Observation queue depth scales with the worker pool size.
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);

        // Clones destined for the tick thread; `backoff_config` itself moves
        // into the closure, so `stored_backoff` keeps a copy for `self`.
        let tick_ring = Arc::clone(&ring);
        let tick_epoch = Arc::clone(&epoch_counter);
        let tick_workers = Arc::clone(&worker_epochs);
        let tick_shutdown = Arc::clone(&shutdown_flag);
        let tick_stopped_flag = Arc::clone(&tick_stopped);
        let stored_backoff = backoff_config.clone();
        let tick_thread = thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            })
            .expect("failed to spawn tick thread");

        let worker_threads = Self::spawn_egress_workers(
            worker_count,
            &obs_rx,
            &ring,
            &epoch_counter,
            &worker_epochs,
        );

        Ok(Self {
            ring,
            epoch_counter,
            worker_epochs,
            cmd_tx: Some(cmd_tx),
            obs_tx: Some(obs_tx),
            shutdown_flag,
            tick_stopped,
            tick_thread: Some(tick_thread),
            worker_threads,
            state: ShutdownState::Running,
            recovered_engine: Mutex::new(None),
            config: async_config,
            backoff_config: stored_backoff,
            seed,
            tick_rate_hz,
            space,
        })
    }
241
242 pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
247 let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
248
249 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
250 let batch = IngressBatch {
251 commands,
252 reply: reply_tx,
253 };
254
255 cmd_tx.try_send(batch).map_err(|e| match e {
256 crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
257 crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
258 })?;
259
260 reply_rx.recv().map_err(|_| SubmitError::Shutdown)
262 }
263
    /// Executes an observation plan on an egress worker and copies the
    /// rendered output and mask into the caller's buffers.
    ///
    /// The task is shipped to the worker pool over `obs_tx`; this call blocks
    /// until a worker replies.
    ///
    /// # Errors
    /// `ObsError::ExecutionFailed` when the world is shut down, the pool
    /// disconnects, the reply exceeds the caller's buffers, or the worker
    /// returns an unexpected result variant; worker-side `ObsError`s are
    /// passed through unchanged.
    pub fn observe(
        &self,
        plan: &Arc<ObsPlan>,
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<ObsMetadata, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        // One-shot reply channel dedicated to this observation.
        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let task = ObsTask::Simple {
            plan: Arc::clone(plan),
            output_len: output.len(),
            mask_len: mask.len(),
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        // Block until a worker replies (or drops the reply sender).
        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Simple {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                // A shorter reply than the caller allocated is accepted;
                // only a reply larger than the buffers is an error.
                if buf.len() > output.len() || mbuf.len() > mask.len() {
                    return Err(ObsError::ExecutionFailed {
                        reason: format!(
                            "output buffer too small: need ({}, {}) got ({}, {})",
                            buf.len(),
                            mbuf.len(),
                            output.len(),
                            mask.len()
                        ),
                    });
                }
                output[..buf.len()].copy_from_slice(&buf);
                mask[..mbuf.len()].copy_from_slice(&mbuf);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            // Reaching here means a worker answered a Simple task with a
            // non-Simple result variant.
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }
324
    /// Executes an observation plan once per agent center on an egress
    /// worker, copying the concatenated per-agent output and mask into the
    /// caller's buffers and returning one metadata entry per agent.
    ///
    /// # Errors
    /// Same failure modes as [`Self::observe`]: shutdown, pool disconnect,
    /// oversized reply, or an unexpected result variant.
    pub fn observe_agents(
        &self,
        plan: &Arc<ObsPlan>,
        space: &Arc<dyn Space>,
        agent_centers: &[Coord],
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<Vec<ObsMetadata>, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        // Split the flat buffers evenly across agents. NOTE(review): integer
        // division truncates, so any remainder of `output.len()` is silently
        // unused — this assumes callers size buffers as exact multiples of
        // the agent count; confirm against the egress worker's expectations.
        let n_agents = agent_centers.len();
        let per_agent_output = if n_agents > 0 {
            output.len() / n_agents
        } else {
            0
        };
        let per_agent_mask = if n_agents > 0 {
            mask.len() / n_agents
        } else {
            0
        };

        let task = ObsTask::Agents {
            plan: Arc::clone(plan),
            space: Arc::clone(space),
            agent_centers: agent_centers.to_vec(),
            output_len: per_agent_output,
            mask_len: per_agent_mask,
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        // Block until a worker replies (or drops the reply sender).
        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Agents {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                // Only a reply larger than the caller's buffers is an error.
                if buf.len() > output.len() || mbuf.len() > mask.len() {
                    return Err(ObsError::ExecutionFailed {
                        reason: format!(
                            "output buffer too small: need ({}, {}) got ({}, {})",
                            buf.len(),
                            mbuf.len(),
                            output.len(),
                            mask.len()
                        ),
                    });
                }
                output[..buf.len()].copy_from_slice(&buf);
                mask[..mbuf.len()].copy_from_slice(&mbuf);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            // A worker answered an Agents task with a non-Agents variant.
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }
400
401 fn spawn_egress_workers(
403 worker_count: usize,
404 obs_rx: &crossbeam_channel::Receiver<ObsTask>,
405 ring: &Arc<SnapshotRing>,
406 epoch_counter: &Arc<EpochCounter>,
407 worker_epochs: &Arc<[WorkerEpoch]>,
408 ) -> Vec<JoinHandle<()>> {
409 let mut worker_threads = Vec::with_capacity(worker_count);
410 for i in 0..worker_count {
411 let obs_rx = obs_rx.clone();
412 let ring = Arc::clone(ring);
413 let epoch = Arc::clone(epoch_counter);
414 let worker_epochs_ref = Arc::clone(worker_epochs);
415 let handle = thread::Builder::new()
416 .name(format!("murk-egress-{i}"))
417 .spawn(move || {
418 crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
419 })
420 .expect("failed to spawn egress worker");
421 worker_threads.push(handle);
422 }
423 worker_threads
424 }
425
    /// Returns the most recent snapshot published to the ring, if any.
    pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
        self.ring.latest()
    }
430
    /// Returns the current value of the shared epoch counter.
    pub fn current_epoch(&self) -> u64 {
        self.epoch_counter.current()
    }
435
    /// Stops the tick thread and egress workers, returning timing and
    /// join-status statistics.
    ///
    /// Phases:
    /// 1. **Drain** — raise the shutdown flag, unpark the tick thread, and
    ///    spin (up to 33 ms) until it reports stopped via `tick_stopped`.
    /// 2. **Quiesce** — request cancel on every worker epoch, drop both
    ///    channel senders (disconnecting blocked receivers), and spin up to
    ///    200 ms until all workers have unpinned.
    /// 3. **Join** — join the tick thread, stashing its returned engine in
    ///    `recovered_engine` for a later `reset`, then join all workers.
    ///
    /// Idempotent: once `Dropped`, further calls return a zeroed report.
    pub fn shutdown(&mut self) -> ShutdownReport {
        if self.state == ShutdownState::Dropped {
            return ShutdownReport {
                total_ms: 0,
                drain_ms: 0,
                quiesce_ms: 0,
                tick_joined: true,
                workers_joined: 0,
            };
        }

        let start = Instant::now();

        self.state = ShutdownState::Draining;
        self.shutdown_flag.store(true, Ordering::Release);

        // Wake the tick thread if it is parked between ticks so it notices
        // the shutdown flag promptly.
        if let Some(handle) = &self.tick_thread {
            handle.thread().unpark();
        }

        // Bounded spin-wait for the tick thread to acknowledge the flag.
        let drain_deadline = Instant::now() + Duration::from_millis(33);
        while !self.tick_stopped.load(Ordering::Acquire) {
            if Instant::now() > drain_deadline {
                break;
            }
            thread::yield_now();
        }
        let drain_ms = start.elapsed().as_millis() as u64;

        self.state = ShutdownState::Quiescing;

        for w in self.worker_epochs.iter() {
            w.request_cancel();
        }

        // Dropping the senders disconnects the channels, so workers blocked
        // on recv wake up and can exit their loops.
        self.cmd_tx.take();
        self.obs_tx.take();

        // Bounded spin-wait for every worker epoch to unpin.
        let quiesce_deadline = Instant::now() + Duration::from_millis(200);
        loop {
            let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
            if all_unpinned || Instant::now() > quiesce_deadline {
                break;
            }
            thread::yield_now();
        }
        // Measured from `start`, so subtract the drain phase's share.
        let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;

        self.state = ShutdownState::Dropped;

        // Join the tick thread and recover its engine for `reset`; a failed
        // join (thread panic) leaves `recovered_engine` empty.
        let tick_joined = if let Some(handle) = self.tick_thread.take() {
            match handle.join() {
                Ok(engine) => {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                    true
                }
                Err(_) => false,
            }
        } else {
            true
        };

        let mut workers_joined = 0;
        for handle in self.worker_threads.drain(..) {
            if handle.join().is_ok() {
                workers_joined += 1;
            }
        }

        let total_ms = start.elapsed().as_millis() as u64;
        ShutdownReport {
            total_ms,
            drain_ms,
            quiesce_ms,
            tick_joined,
            workers_joined,
        }
    }
530
    /// Shuts the world down (if still running), resets the recovered engine,
    /// and restarts the tick thread and worker pool with fresh ring, epoch,
    /// flag, and channel state.
    ///
    /// NOTE(review): `seed` is stored on `self` but not visibly forwarded to
    /// `engine.reset()` here — confirm the engine picks the new seed up
    /// through some other path.
    ///
    /// # Errors
    /// * `ConfigError::EngineRecoveryFailed` if no engine was recovered from
    ///   the tick thread (e.g. it panicked during shutdown).
    /// * Any error from `engine.reset()`; in that case the engine is put
    ///   back into `recovered_engine` so a later retry is possible.
    pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
        if self.state != ShutdownState::Dropped {
            self.shutdown();
        }

        self.seed = seed;

        // Take exclusive ownership of the engine recovered at shutdown.
        let mut engine = self
            .recovered_engine
            .lock()
            .unwrap()
            .take()
            .ok_or(ConfigError::EngineRecoveryFailed)?;

        // On failure, return the engine so the world stays resettable.
        if let Err(e) = engine.reset() {
            *self.recovered_engine.lock().unwrap() = Some(engine);
            return Err(e);
        }

        // Rebuild all shared state from scratch (same ring capacity).
        let worker_count = self.config.resolved_worker_count();
        self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
        self.epoch_counter = Arc::new(EpochCounter::new());
        self.worker_epochs = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();
        self.shutdown_flag = Arc::new(AtomicBool::new(false));
        self.tick_stopped = Arc::new(AtomicBool::new(false));

        // Fresh channels with the same capacities as in `new`.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
        self.cmd_tx = Some(cmd_tx);
        self.obs_tx = Some(obs_tx);

        // Clones handed to the respawned tick thread.
        let tick_ring = Arc::clone(&self.ring);
        let tick_epoch = Arc::clone(&self.epoch_counter);
        let tick_workers = Arc::clone(&self.worker_epochs);
        let tick_shutdown = Arc::clone(&self.shutdown_flag);
        let tick_stopped_flag = Arc::clone(&self.tick_stopped);
        let tick_rate_hz = self.tick_rate_hz;
        let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
        let cancel_grace_ms = self.config.cancel_grace_ms;
        let backoff_config = self.backoff_config.clone();
        self.tick_thread = Some(
            thread::Builder::new()
                .name("murk-tick".into())
                .spawn(move || {
                    let state = TickThreadState::new(
                        engine,
                        tick_ring,
                        tick_epoch,
                        tick_workers,
                        cmd_rx,
                        tick_shutdown,
                        tick_stopped_flag,
                        tick_rate_hz,
                        max_epoch_hold_ms,
                        cancel_grace_ms,
                        &backoff_config,
                    );
                    state.run()
                })
                .expect("failed to spawn tick thread"),
        );

        self.worker_threads = Self::spawn_egress_workers(
            worker_count,
            &obs_rx,
            &self.ring,
            &self.epoch_counter,
            &self.worker_epochs,
        );

        self.state = ShutdownState::Running;
        Ok(())
    }
620
    /// Borrows the shared space topology this world was built with.
    pub fn space(&self) -> &dyn Space {
        self.space.as_ref()
    }
625}
626
627impl Drop for RealtimeAsyncWorld {
628 fn drop(&mut self) {
629 if self.state != ShutdownState::Dropped {
630 self.shutdown();
631 }
632 }
633}
634
/// Adapter that lets a shared `Arc<dyn Space>` be handed to the engine as a
/// `Box<dyn Space>` (see `RealtimeAsyncWorld::new`) while this handle keeps
/// a clone of the same instance.
struct ArcSpaceWrapper(Arc<dyn Space>);
642
/// Pure delegation: every `Space` method forwards to the wrapped
/// `Arc<dyn Space>`.
impl murk_space::Space for ArcSpaceWrapper {
    fn ndim(&self) -> usize {
        self.0.ndim()
    }

    fn cell_count(&self) -> usize {
        self.0.cell_count()
    }

    fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
        self.0.neighbours(coord)
    }

    fn distance(&self, a: &Coord, b: &Coord) -> f64 {
        self.0.distance(a, b)
    }

    fn compile_region(
        &self,
        spec: &murk_space::RegionSpec,
    ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
        self.0.compile_region(spec)
    }

    fn canonical_ordering(&self) -> Vec<Coord> {
        self.0.canonical_ordering()
    }

    fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
        self.0.canonical_rank(coord)
    }

    fn instance_id(&self) -> murk_core::SpaceInstanceId {
        self.0.instance_id()
    }

    fn topology_eq(&self, other: &dyn murk_space::Space) -> bool {
        // If `other` is itself an ArcSpaceWrapper, unwrap it so the inner
        // spaces are compared directly; otherwise compare against `other`
        // as-is. NOTE(review): the `as &dyn Any` upcast presumes the `Space`
        // trait supports it — confirm `Space`'s supertraits.
        if let Some(w) = (other as &dyn std::any::Any).downcast_ref::<ArcSpaceWrapper>() {
            self.0.topology_eq(&*w.0)
        } else {
            self.0.topology_eq(other)
        }
    }
}
689
690impl std::fmt::Debug for ArcSpaceWrapper {
692 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
693 f.debug_tuple("ArcSpaceWrapper").finish()
694 }
695}
696
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    // Minimal per-tick scalar field definition used by the test worlds.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    // 10-cell 1D world at 60 Hz whose propagator writes the constant 42.0.
    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    // World starts ticking, publishes a snapshot, and shuts down cleanly.
    #[test]
    fn lifecycle_start_and_shutdown() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Poll until the tick thread publishes its first snapshot.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let epoch = world.current_epoch();
        assert!(epoch > 0, "epoch should have advanced");

        let report = world.shutdown();
        assert!(report.tick_joined);
        assert!(report.workers_joined > 0);
    }

    // observe() round-trips through the egress pool and returns the
    // constant-propagator field values.
    #[test]
    fn observe_returns_data() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for the first snapshot so observations have data to read.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        let mut output = vec![0.0f32; plan_result.output_len];
        let mut mask = vec![0u8; plan_result.mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }

    // Multiple threads can observe the same world concurrently.
    #[test]
    fn concurrent_observe() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        // Share the world across four observer threads.
        let world = Arc::new(world);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let w = Arc::clone(&world);
                let p = Arc::clone(&plan);
                let out_len = plan_result.output_len;
                let mask_len = plan_result.mask_len;
                std::thread::spawn(move || {
                    let mut output = vec![0.0f32; out_len];
                    let mut mask = vec![0u8; mask_len];
                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
                    assert!(meta.tick_id.0 > 0);
                    assert!(output.iter().all(|&v| v == 42.0));
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // Dropping the last Arc runs Drop, which performs the shutdown.
        drop(world);
    }

    // A submitted command is accepted and acknowledged with a receipt.
    #[test]
    fn submit_commands_flow_through() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let cmd = Command {
            payload: murk_core::command::CommandPayload::SetParameter {
                key: murk_core::id::ParameterKey(0),
                value: 1.0,
            },
            // Far-future expiry so the command cannot lapse mid-test.
            expires_after_tick: murk_core::id::TickId(10000),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        };
        let receipts = world.submit_commands(vec![cmd]).unwrap();
        assert_eq!(receipts.len(), 1);
        assert!(receipts[0].accepted);

        world.shutdown();
    }

    // Dropping a running world must not hang or panic.
    #[test]
    fn drop_triggers_shutdown() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(50));
        drop(world);
    }

    // Shutdown completes well inside its drain/quiesce timeouts.
    #[test]
    fn shutdown_budget() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(100));

        let report = world.shutdown();
        assert!(
            report.total_ms < 2000,
            "shutdown took too long: {}ms",
            report.total_ms
        );
    }

    // Even at a 0.5 Hz tick rate (2 s between ticks), shutdown must not
    // wait out a full tick interval.
    #[test]
    fn shutdown_fast_with_slow_tick_rate() {
        let config = WorldConfig {
            tick_rate_hz: Some(0.5), ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(5);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Let the tick thread settle into its long inter-tick wait.
        std::thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        let report = world.shutdown();
        let wall_ms = start.elapsed().as_millis();

        assert!(
            wall_ms < 500,
            "shutdown took {wall_ms}ms with 0.5Hz tick rate \
             (report: total={}ms, drain={}ms, quiesce={}ms)",
            report.total_ms,
            report.drain_ms,
            report.quiesce_ms
        );
        assert!(report.tick_joined);
    }

    // reset() zeroes the epoch and the world resumes ticking afterwards.
    #[test]
    fn reset_lifecycle() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let deadline = Instant::now() + Duration::from_secs(5);
        while world.current_epoch() < 5 {
            if Instant::now() > deadline {
                panic!("epoch didn't reach 5 within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        let epoch_before = world.current_epoch();
        assert!(epoch_before >= 5);

        world.reset(99).unwrap();

        // NOTE(review): the respawned tick thread is already running here;
        // this assumes it has not completed its first tick yet (~16 ms at
        // 60 Hz) — potentially racy on a heavily loaded machine.
        assert_eq!(world.current_epoch(), 0);

        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot after reset within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert!(world.current_epoch() > 0, "should be ticking after reset");

        world.shutdown();
    }

    // topology_eq matches wrapper-vs-wrapper and wrapper-vs-bare spaces.
    #[test]
    fn arc_space_wrapper_topology_eq() {
        let a = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
        let b = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
        assert!(
            a.topology_eq(&b),
            "identical Line1D through ArcSpaceWrapper should be topology-equal"
        );

        let c = ArcSpaceWrapper(Arc::new(Line1D::new(20, EdgeBehavior::Absorb).unwrap()));
        assert!(
            !a.topology_eq(&c),
            "different Line1D sizes should not be topology-equal"
        );

        let bare = Line1D::new(10, EdgeBehavior::Absorb).unwrap();
        assert!(
            a.topology_eq(&bare),
            "ArcSpaceWrapper(Line1D) vs bare Line1D should be topology-equal"
        );
    }
}