1use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use murk_arena::OwnedSnapshot;
35use murk_core::command::{Command, Receipt};
36use murk_core::error::ObsError;
37use murk_core::Coord;
38use murk_obs::{ObsMetadata, ObsPlan};
39use murk_space::Space;
40
41use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
42use crate::egress::{ObsResult, ObsTask};
43use crate::epoch::{EpochCounter, WorkerEpoch};
44use crate::ring::SnapshotRing;
45use crate::tick::TickEngine;
46use crate::tick_thread::{IngressBatch, TickThreadState};
47
/// Error returned by `RealtimeAsyncWorld::submit_commands`.
///
/// The enum carries no data, so it is `Copy` and can be compared directly
/// (for example `err == SubmitError::ChannelFull`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitError {
    /// The command channel no longer exists or is disconnected, or the reply
    /// channel was closed before any receipts arrived.
    Shutdown,
    /// The bounded command channel had no free slot for the batch.
    ChannelFull,
}

impl std::fmt::Display for SubmitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each variant maps to a fixed, human-readable message.
        let message = match self {
            Self::Shutdown => "tick thread has shut down",
            Self::ChannelFull => "command channel full",
        };
        f.write_str(message)
    }
}

impl std::error::Error for SubmitError {}
69
/// Timing and join statistics produced by `RealtimeAsyncWorld::shutdown`.
///
/// All durations are whole milliseconds measured from the start of the
/// `shutdown` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShutdownReport {
    /// Time from the start of `shutdown` until every thread join returned.
    pub total_ms: u64,
    /// Time spent waiting for the tick thread's "stopped" flag (or for the
    /// drain deadline to expire).
    pub drain_ms: u64,
    /// Time spent after the drain phase: cancelling workers, closing the
    /// channels and waiting for workers to unpin (or for the deadline).
    pub quiesce_ms: u64,
    /// `true` when the tick thread was joined without panicking, or when there
    /// was no tick thread left to join.
    pub tick_joined: bool,
    /// Number of egress worker threads whose join succeeded.
    pub workers_joined: usize,
}
86
/// Lifecycle phase of a `RealtimeAsyncWorld`, advanced by `shutdown` and
/// rewound to `Running` by `new` and `reset`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShutdownState {
    /// Threads are up; set by `new` and by a successful `reset`.
    Running,
    /// `shutdown` has begun and is waiting for the tick thread to stop.
    Draining,
    /// `shutdown` is cancelling workers and waiting for them to unpin.
    Quiescing,
    /// `shutdown` has passed the quiesce phase; set just before the joins.
    Dropped,
}
96
97pub struct RealtimeAsyncWorld {
105 ring: Arc<SnapshotRing>,
106 epoch_counter: Arc<EpochCounter>,
107 worker_epochs: Arc<[WorkerEpoch]>,
108 cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
109 obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
110 shutdown_flag: Arc<AtomicBool>,
111 tick_stopped: Arc<AtomicBool>,
112 tick_thread: Option<JoinHandle<TickEngine>>,
113 worker_threads: Vec<JoinHandle<()>>,
114 state: ShutdownState,
115 recovered_engine: Mutex<Option<TickEngine>>,
120 config: AsyncConfig,
121 backoff_config: BackoffConfig,
122 seed: u64,
123 tick_rate_hz: f64,
124 space: Arc<dyn Space>,
126}
127
128impl RealtimeAsyncWorld {
129 pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
135 let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
136 if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 {
137 return Err(ConfigError::InvalidTickRate {
138 value: tick_rate_hz,
139 });
140 }
141
142 let seed = config.seed;
143 let ring_size = config.ring_buffer_size;
144 let backoff_config = config.backoff.clone();
145 let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
146 let cancel_grace_ms = async_config.cancel_grace_ms;
147
148 let space: Arc<dyn Space> = Arc::from(config.space);
150
151 let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
155 let engine_config = WorldConfig {
156 space: engine_space,
157 fields: config.fields,
158 propagators: config.propagators,
159 dt: config.dt,
160 seed: config.seed,
161 ring_buffer_size: config.ring_buffer_size,
162 max_ingress_queue: config.max_ingress_queue,
163 tick_rate_hz: config.tick_rate_hz,
164 backoff: backoff_config.clone(),
165 };
166
167 let engine = TickEngine::new(engine_config)?;
168
169 let worker_count = async_config.resolved_worker_count();
170 let ring = Arc::new(SnapshotRing::new(ring_size));
171 let epoch_counter = Arc::new(EpochCounter::new());
172 let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
173 .map(WorkerEpoch::new)
174 .collect::<Vec<_>>()
175 .into();
176
177 let shutdown_flag = Arc::new(AtomicBool::new(false));
178 let tick_stopped = Arc::new(AtomicBool::new(false));
179
180 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
182
183 let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
185
186 let tick_ring = Arc::clone(&ring);
188 let tick_epoch = Arc::clone(&epoch_counter);
189 let tick_workers = Arc::clone(&worker_epochs);
190 let tick_shutdown = Arc::clone(&shutdown_flag);
191 let tick_stopped_flag = Arc::clone(&tick_stopped);
192 let stored_backoff = backoff_config.clone();
193 let tick_thread = thread::Builder::new()
194 .name("murk-tick".into())
195 .spawn(move || {
196 let state = TickThreadState::new(
197 engine,
198 tick_ring,
199 tick_epoch,
200 tick_workers,
201 cmd_rx,
202 tick_shutdown,
203 tick_stopped_flag,
204 tick_rate_hz,
205 max_epoch_hold_ms,
206 cancel_grace_ms,
207 &backoff_config,
208 );
209 state.run()
210 })
211 .expect("failed to spawn tick thread");
212
213 let worker_threads = Self::spawn_egress_workers(
215 worker_count,
216 &obs_rx,
217 &ring,
218 &epoch_counter,
219 &worker_epochs,
220 );
221
222 Ok(Self {
223 ring,
224 epoch_counter,
225 worker_epochs,
226 cmd_tx: Some(cmd_tx),
227 obs_tx: Some(obs_tx),
228 shutdown_flag,
229 tick_stopped,
230 tick_thread: Some(tick_thread),
231 worker_threads,
232 state: ShutdownState::Running,
233 recovered_engine: Mutex::new(None),
234 config: async_config,
235 backoff_config: stored_backoff,
236 seed,
237 tick_rate_hz,
238 space,
239 })
240 }
241
242 pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
247 let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
248
249 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
250 let batch = IngressBatch {
251 commands,
252 reply: reply_tx,
253 };
254
255 cmd_tx.try_send(batch).map_err(|e| match e {
256 crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
257 crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
258 })?;
259
260 reply_rx.recv().map_err(|_| SubmitError::Shutdown)
262 }
263
264 pub fn observe(
269 &self,
270 plan: &Arc<ObsPlan>,
271 output: &mut [f32],
272 mask: &mut [u8],
273 ) -> Result<ObsMetadata, ObsError> {
274 let obs_tx = self
275 .obs_tx
276 .as_ref()
277 .ok_or_else(|| ObsError::ExecutionFailed {
278 reason: "world is shut down".into(),
279 })?;
280
281 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
282 let task = ObsTask::Simple {
283 plan: Arc::clone(plan),
284 output_len: output.len(),
285 mask_len: mask.len(),
286 reply: reply_tx,
287 };
288
289 obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
290 reason: "egress pool shut down".into(),
291 })?;
292
293 let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
294 reason: "worker disconnected".into(),
295 })?;
296
297 match result {
298 ObsResult::Simple {
299 metadata,
300 output: buf,
301 mask: mbuf,
302 } => {
303 output[..buf.len()].copy_from_slice(&buf);
304 mask[..mbuf.len()].copy_from_slice(&mbuf);
305 Ok(metadata)
306 }
307 ObsResult::Error(e) => Err(e),
308 _ => Err(ObsError::ExecutionFailed {
309 reason: "unexpected result type".into(),
310 }),
311 }
312 }
313
314 pub fn observe_agents(
318 &self,
319 plan: &Arc<ObsPlan>,
320 space: &Arc<dyn Space>,
321 agent_centers: &[Coord],
322 output: &mut [f32],
323 mask: &mut [u8],
324 ) -> Result<Vec<ObsMetadata>, ObsError> {
325 let obs_tx = self
326 .obs_tx
327 .as_ref()
328 .ok_or_else(|| ObsError::ExecutionFailed {
329 reason: "world is shut down".into(),
330 })?;
331
332 let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
333 let n_agents = agent_centers.len();
334 let per_agent_output = if n_agents > 0 {
335 output.len() / n_agents
336 } else {
337 0
338 };
339 let per_agent_mask = if n_agents > 0 {
340 mask.len() / n_agents
341 } else {
342 0
343 };
344
345 let task = ObsTask::Agents {
346 plan: Arc::clone(plan),
347 space: Arc::clone(space),
348 agent_centers: agent_centers.to_vec(),
349 output_len: per_agent_output,
350 mask_len: per_agent_mask,
351 reply: reply_tx,
352 };
353
354 obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
355 reason: "egress pool shut down".into(),
356 })?;
357
358 let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
359 reason: "worker disconnected".into(),
360 })?;
361
362 match result {
363 ObsResult::Agents {
364 metadata,
365 output: buf,
366 mask: mbuf,
367 } => {
368 let copy_out = buf.len().min(output.len());
369 output[..copy_out].copy_from_slice(&buf[..copy_out]);
370 let copy_mask = mbuf.len().min(mask.len());
371 mask[..copy_mask].copy_from_slice(&mbuf[..copy_mask]);
372 Ok(metadata)
373 }
374 ObsResult::Error(e) => Err(e),
375 _ => Err(ObsError::ExecutionFailed {
376 reason: "unexpected result type".into(),
377 }),
378 }
379 }
380
381 fn spawn_egress_workers(
383 worker_count: usize,
384 obs_rx: &crossbeam_channel::Receiver<ObsTask>,
385 ring: &Arc<SnapshotRing>,
386 epoch_counter: &Arc<EpochCounter>,
387 worker_epochs: &Arc<[WorkerEpoch]>,
388 ) -> Vec<JoinHandle<()>> {
389 let mut worker_threads = Vec::with_capacity(worker_count);
390 for i in 0..worker_count {
391 let obs_rx = obs_rx.clone();
392 let ring = Arc::clone(ring);
393 let epoch = Arc::clone(epoch_counter);
394 let worker_epochs_ref = Arc::clone(worker_epochs);
395 let handle = thread::Builder::new()
396 .name(format!("murk-egress-{i}"))
397 .spawn(move || {
398 crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
399 })
400 .expect("failed to spawn egress worker");
401 worker_threads.push(handle);
402 }
403 worker_threads
404 }
405
406 pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
408 self.ring.latest()
409 }
410
411 pub fn current_epoch(&self) -> u64 {
413 self.epoch_counter.current()
414 }
415
416 pub fn shutdown(&mut self) -> ShutdownReport {
422 if self.state == ShutdownState::Dropped {
423 return ShutdownReport {
424 total_ms: 0,
425 drain_ms: 0,
426 quiesce_ms: 0,
427 tick_joined: true,
428 workers_joined: 0,
429 };
430 }
431
432 let start = Instant::now();
433
434 self.state = ShutdownState::Draining;
436 self.shutdown_flag.store(true, Ordering::Release);
437
438 let drain_deadline = Instant::now() + Duration::from_millis(33);
440 while !self.tick_stopped.load(Ordering::Acquire) {
441 if Instant::now() > drain_deadline {
442 break;
443 }
444 thread::yield_now();
445 }
446 let drain_ms = start.elapsed().as_millis() as u64;
447
448 self.state = ShutdownState::Quiescing;
450
451 for w in self.worker_epochs.iter() {
453 w.request_cancel();
454 }
455
456 self.cmd_tx.take();
458 self.obs_tx.take();
459
460 let quiesce_deadline = Instant::now() + Duration::from_millis(200);
462 loop {
463 let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
464 if all_unpinned || Instant::now() > quiesce_deadline {
465 break;
466 }
467 thread::yield_now();
468 }
469 let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;
470
471 self.state = ShutdownState::Dropped;
473
474 let tick_joined = if let Some(handle) = self.tick_thread.take() {
475 match handle.join() {
476 Ok(engine) => {
477 *self.recovered_engine.lock().unwrap() = Some(engine);
478 true
479 }
480 Err(_) => false,
481 }
482 } else {
483 true
484 };
485
486 let mut workers_joined = 0;
487 for handle in self.worker_threads.drain(..) {
488 if handle.join().is_ok() {
489 workers_joined += 1;
490 }
491 }
492
493 let total_ms = start.elapsed().as_millis() as u64;
494 ShutdownReport {
495 total_ms,
496 drain_ms,
497 quiesce_ms,
498 tick_joined,
499 workers_joined,
500 }
501 }
502
503 pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
509 if self.state != ShutdownState::Dropped {
511 self.shutdown();
512 }
513
514 self.seed = seed;
515
516 let mut engine = self
518 .recovered_engine
519 .lock()
520 .unwrap()
521 .take()
522 .ok_or(ConfigError::InvalidTickRate { value: 0.0 })?;
523
524 if let Err(e) = engine.reset() {
527 *self.recovered_engine.lock().unwrap() = Some(engine);
528 return Err(e);
529 }
530
531 let worker_count = self.config.resolved_worker_count();
533 self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
534 self.epoch_counter = Arc::new(EpochCounter::new());
535 self.worker_epochs = (0..worker_count as u32)
536 .map(WorkerEpoch::new)
537 .collect::<Vec<_>>()
538 .into();
539 self.shutdown_flag = Arc::new(AtomicBool::new(false));
540 self.tick_stopped = Arc::new(AtomicBool::new(false));
541
542 let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
544 let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
545 self.cmd_tx = Some(cmd_tx);
546 self.obs_tx = Some(obs_tx);
547
548 let tick_ring = Arc::clone(&self.ring);
550 let tick_epoch = Arc::clone(&self.epoch_counter);
551 let tick_workers = Arc::clone(&self.worker_epochs);
552 let tick_shutdown = Arc::clone(&self.shutdown_flag);
553 let tick_stopped_flag = Arc::clone(&self.tick_stopped);
554 let tick_rate_hz = self.tick_rate_hz;
555 let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
556 let cancel_grace_ms = self.config.cancel_grace_ms;
557 let backoff_config = self.backoff_config.clone();
558 self.tick_thread = Some(
559 thread::Builder::new()
560 .name("murk-tick".into())
561 .spawn(move || {
562 let state = TickThreadState::new(
563 engine,
564 tick_ring,
565 tick_epoch,
566 tick_workers,
567 cmd_rx,
568 tick_shutdown,
569 tick_stopped_flag,
570 tick_rate_hz,
571 max_epoch_hold_ms,
572 cancel_grace_ms,
573 &backoff_config,
574 );
575 state.run()
576 })
577 .expect("failed to spawn tick thread"),
578 );
579
580 self.worker_threads = Self::spawn_egress_workers(
582 worker_count,
583 &obs_rx,
584 &self.ring,
585 &self.epoch_counter,
586 &self.worker_epochs,
587 );
588
589 self.state = ShutdownState::Running;
590 Ok(())
591 }
592
593 pub fn space(&self) -> &dyn Space {
595 self.space.as_ref()
596 }
597}
598
599impl Drop for RealtimeAsyncWorld {
600 fn drop(&mut self) {
601 if self.state != ShutdownState::Dropped {
602 self.shutdown();
603 }
604 }
605}
606
607struct ArcSpaceWrapper(Arc<dyn Space>);
614
615impl murk_space::Space for ArcSpaceWrapper {
616 fn ndim(&self) -> usize {
617 self.0.ndim()
618 }
619
620 fn cell_count(&self) -> usize {
621 self.0.cell_count()
622 }
623
624 fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
625 self.0.neighbours(coord)
626 }
627
628 fn distance(&self, a: &Coord, b: &Coord) -> f64 {
629 self.0.distance(a, b)
630 }
631
632 fn compile_region(
633 &self,
634 spec: &murk_space::RegionSpec,
635 ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
636 self.0.compile_region(spec)
637 }
638
639 fn canonical_ordering(&self) -> Vec<Coord> {
640 self.0.canonical_ordering()
641 }
642
643 fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
644 self.0.canonical_rank(coord)
645 }
646
647 fn instance_id(&self) -> murk_core::SpaceInstanceId {
648 self.0.instance_id()
649 }
650}
651
652impl std::fmt::Debug for ArcSpaceWrapper {
654 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
655 f.debug_tuple("ArcSpaceWrapper").finish()
656 }
657}
658
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    /// Builds a per-tick scalar field definition with the given name.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    /// A 10-cell line world with one field that a propagator fills with 42.0.
    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Starts a world from `test_config` with default async settings.
    fn spawn_world() -> RealtimeAsyncWorld {
        RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap()
    }

    /// Polls `condition` every 10 ms and panics with `timeout_message` if it
    /// is still false after two seconds.
    fn wait_until(timeout_message: &str, mut condition: impl FnMut() -> bool) {
        let deadline = Instant::now() + Duration::from_secs(2);
        while !condition() {
            if Instant::now() > deadline {
                panic!("{}", timeout_message);
            }
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    /// Blocks until the tick thread has published its first snapshot.
    fn wait_for_first_snapshot(world: &RealtimeAsyncWorld) {
        wait_until("no snapshot produced within 2s", || {
            world.latest_snapshot().is_some()
        });
    }

    /// Compiles a plan that reads field 0 over the whole space and returns it
    /// together with the required output and mask lengths.
    fn compile_full_field_plan(space: &dyn Space) -> (Arc<ObsPlan>, usize, usize) {
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let compiled = ObsPlan::compile(&spec, space).unwrap();
        (
            Arc::new(compiled.plan),
            compiled.output_len,
            compiled.mask_len,
        )
    }

    #[test]
    fn lifecycle_start_and_shutdown() {
        let mut world = spawn_world();
        wait_for_first_snapshot(&world);

        let epoch = world.current_epoch();
        assert!(epoch > 0, "epoch should have advanced");

        let report = world.shutdown();
        assert!(report.tick_joined);
        assert!(report.workers_joined > 0);
    }

    #[test]
    fn observe_returns_data() {
        let mut world = spawn_world();
        wait_for_first_snapshot(&world);

        let (plan, output_len, mask_len) = compile_full_field_plan(world.space());
        let mut output = vec![0.0f32; output_len];
        let mut mask = vec![0u8; mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }

    #[test]
    fn concurrent_observe() {
        let world = spawn_world();
        wait_for_first_snapshot(&world);

        let (plan, out_len, mask_len) = compile_full_field_plan(world.space());

        // Share the world across four observer threads.
        let world = Arc::new(world);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let w = Arc::clone(&world);
                let p = Arc::clone(&plan);
                std::thread::spawn(move || {
                    let mut output = vec![0.0f32; out_len];
                    let mut mask = vec![0u8; mask_len];
                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
                    assert!(meta.tick_id.0 > 0);
                    assert!(output.iter().all(|&v| v == 42.0));
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // Dropping the last `Arc` runs the world's `Drop`, i.e. a shutdown.
        drop(world);
    }

    #[test]
    fn submit_commands_flow_through() {
        let mut world = spawn_world();
        wait_for_first_snapshot(&world);

        let cmd = Command {
            payload: murk_core::command::CommandPayload::SetParameter {
                key: murk_core::id::ParameterKey(0),
                value: 1.0,
            },
            expires_after_tick: murk_core::id::TickId(10000),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        };
        let receipts = world.submit_commands(vec![cmd]).unwrap();
        assert_eq!(receipts.len(), 1);
        assert!(receipts[0].accepted);

        world.shutdown();
    }

    #[test]
    fn drop_triggers_shutdown() {
        let world = spawn_world();
        std::thread::sleep(Duration::from_millis(50));
        drop(world);
    }

    #[test]
    fn shutdown_budget() {
        let mut world = spawn_world();
        std::thread::sleep(Duration::from_millis(100));

        let report = world.shutdown();
        assert!(
            report.total_ms < 500,
            "shutdown took too long: {}ms",
            report.total_ms
        );
    }

    #[test]
    fn reset_lifecycle() {
        let mut world = spawn_world();

        wait_until("epoch didn't reach 5 within 2s", || {
            world.current_epoch() >= 5
        });
        let epoch_before = world.current_epoch();
        assert!(epoch_before >= 5);

        world.reset(99).unwrap();

        // A reset installs a fresh epoch counter.
        assert_eq!(world.current_epoch(), 0);

        wait_until("no snapshot after reset within 2s", || {
            world.latest_snapshot().is_some()
        });
        assert!(world.current_epoch() > 0, "should be ticking after reset");

        world.shutdown();
    }
}