// murk_engine/realtime.rs
//! User-facing `RealtimeAsyncWorld` API and shutdown state machine.
//!
//! This is the primary mode for RL training: the tick engine runs on a
//! dedicated background thread at a configurable rate (default 60 Hz),
//! while an egress thread pool serves observation requests concurrently.
//!
//! # Architecture
//!
//! ```text
//! User Thread(s)              Tick Thread              Egress Workers (N)
//!     |                           |                         |
//!     |--submit_commands()------->| cmd_rx.try_recv()       |
//!     |   [cmd_tx: bounded(64)]   | engine.submit_commands()|
//!     |<--receipts via reply_tx---| engine.execute_tick()   |
//!     |                           | ring.push(snap)         |
//!     |                           | epoch_counter.advance() |
//!     |                           | check_stalled_workers() |
//!     |                           | sleep(budget - elapsed) |
//!     |                           |                         |
//!     |--observe()------------------------------------->    |
//!     |   [obs_tx: bounded(N*4)]               task_rx.recv()
//!     |   blocks on reply_rx                   ring.latest()
//!     |                                        pin epoch
//!     |                                        execute ObsPlan
//!     |                                        unpin epoch
//!     |<--result via reply_tx--------------------------    |
//! ```

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use murk_arena::OwnedSnapshot;
use murk_core::command::{Command, Receipt};
use murk_core::error::ObsError;
use murk_core::Coord;
use murk_obs::{ObsMetadata, ObsPlan};
use murk_space::Space;

use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
use crate::egress::{ObsResult, ObsTask};
use crate::epoch::{EpochCounter, WorkerEpoch};
use crate::ring::SnapshotRing;
use crate::tick::TickEngine;
use crate::tick_thread::{IngressBatch, TickThreadState};

// ── Error types ──────────────────────────────────────────────────

/// Error submitting commands to the tick thread.
///
/// Fieldless, so it is cheap to derive `Clone`/`Copy`, letting callers
/// store or re-report the error without ownership juggling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubmitError {
    /// The tick thread has shut down.
    Shutdown,
    /// The command channel is full (back-pressure).
    ChannelFull,
}
59impl std::fmt::Display for SubmitError {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            Self::Shutdown => write!(f, "tick thread has shut down"),
63            Self::ChannelFull => write!(f, "command channel full"),
64        }
65    }
66}
// Marker impl: `SubmitError` already provides `Debug` + `Display`, which
// is all `std::error::Error` requires; no methods are overridden.
impl std::error::Error for SubmitError {}

// ── ShutdownReport ───────────────────────────────────────────────

/// Report from the shutdown state machine.
///
/// All fields are plain counters/flags, so the report derives
/// `Clone`/`Copy` and can be logged or asserted on freely after
/// `shutdown()` returns.
#[derive(Debug, Clone, Copy)]
pub struct ShutdownReport {
    /// Total time spent in the shutdown sequence.
    pub total_ms: u64,
    /// Time spent draining the tick thread.
    pub drain_ms: u64,
    /// Time spent quiescing workers.
    pub quiesce_ms: u64,
    /// Whether the tick thread was joined successfully.
    pub tick_joined: bool,
    /// Number of worker threads joined.
    pub workers_joined: usize,
}

// ── ShutdownState ────────────────────────────────────────────────

/// Internal lifecycle state for the shutdown sequence (see `shutdown()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// All threads running normally.
    Running,
    /// Shutdown flag set; waiting for the tick thread to stop.
    Draining,
    /// Workers cancelled, channels dropped; waiting for epoch pins to clear.
    Quiescing,
    /// All threads joined; terminal state.
    Dropped,
}

// ── RealtimeAsyncWorld ───────────────────────────────────────────

/// Realtime asynchronous simulation world.
///
/// Runs the tick engine on a background thread and serves observations
/// from a pool of egress workers. This is the primary API for RL
/// training environments.
pub struct RealtimeAsyncWorld {
    /// Ring of recent snapshots; the tick thread pushes, egress reads.
    ring: Arc<SnapshotRing>,
    /// Global epoch counter, advanced by the tick thread each tick.
    epoch_counter: Arc<EpochCounter>,
    /// Per-worker epoch pins; workers pin while reading a snapshot,
    /// and `shutdown()` cancels/waits on them.
    worker_epochs: Arc<[WorkerEpoch]>,
    /// Command ingress channel; taken (set to `None`) during shutdown.
    cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
    /// Observation task channel; taken (set to `None`) during shutdown.
    obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
    /// Set to request tick-thread shutdown.
    shutdown_flag: Arc<AtomicBool>,
    /// Set by the tick thread once it has stopped ticking.
    tick_stopped: Arc<AtomicBool>,
    /// Handle for the tick thread; joining yields the `TickEngine` back.
    tick_thread: Option<JoinHandle<TickEngine>>,
    /// Handles for the egress worker pool.
    worker_threads: Vec<JoinHandle<()>>,
    /// Current position in the shutdown state machine.
    state: ShutdownState,
    /// Recovered from tick thread on shutdown, used for `reset()`.
    /// Wrapped in Mutex so RealtimeAsyncWorld is Sync (TickEngine
    /// contains `Vec<Box<dyn Propagator>>` which is Send but not Sync).
    /// Never contended: only accessed during reset() which takes &mut self.
    recovered_engine: Mutex<Option<TickEngine>>,
    /// Async (worker pool / epoch) configuration, kept for `reset()`.
    config: AsyncConfig,
    /// Back-pressure configuration, kept for respawning the tick thread.
    backoff_config: BackoffConfig,
    /// Seed recorded at construction; replaced by `reset(seed)`.
    seed: u64,
    /// Target tick rate, validated in `new()` to be finite and positive.
    tick_rate_hz: f64,
    /// Shared space for agent-relative observations and engine reconstruction.
    space: Arc<dyn Space>,
}
128impl RealtimeAsyncWorld {
129    /// Create a new realtime async world and spawn all threads.
130    ///
131    /// The `WorldConfig` is consumed: the `TickEngine` is moved into
132    /// the tick thread. The `space` is shared via `Arc` for egress
133    /// workers that need it for agent-relative observations.
134    pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
135        let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
136        if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 {
137            return Err(ConfigError::InvalidTickRate {
138                value: tick_rate_hz,
139            });
140        }
141
142        let seed = config.seed;
143        let ring_size = config.ring_buffer_size;
144        let backoff_config = config.backoff.clone();
145        let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
146        let cancel_grace_ms = async_config.cancel_grace_ms;
147
148        // Share the space for agent-relative observations.
149        let space: Arc<dyn Space> = Arc::from(config.space);
150
151        // Reconstruct WorldConfig with the Arc'd space (Box from Arc).
152        // We clone the space Arc for our own reference, and give
153        // TickEngine a Box wrapper.
154        let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
155        let engine_config = WorldConfig {
156            space: engine_space,
157            fields: config.fields,
158            propagators: config.propagators,
159            dt: config.dt,
160            seed: config.seed,
161            ring_buffer_size: config.ring_buffer_size,
162            max_ingress_queue: config.max_ingress_queue,
163            tick_rate_hz: config.tick_rate_hz,
164            backoff: backoff_config.clone(),
165        };
166
167        let engine = TickEngine::new(engine_config)?;
168
169        let worker_count = async_config.resolved_worker_count();
170        let ring = Arc::new(SnapshotRing::new(ring_size));
171        let epoch_counter = Arc::new(EpochCounter::new());
172        let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
173            .map(WorkerEpoch::new)
174            .collect::<Vec<_>>()
175            .into();
176
177        let shutdown_flag = Arc::new(AtomicBool::new(false));
178        let tick_stopped = Arc::new(AtomicBool::new(false));
179
180        // Command channel: bounded(64) — tick thread drains each tick.
181        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
182
183        // Task channel for egress workers: bounded(worker_count * 4).
184        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
185
186        // Spawn tick thread — returns TickEngine on exit for reset().
187        let tick_ring = Arc::clone(&ring);
188        let tick_epoch = Arc::clone(&epoch_counter);
189        let tick_workers = Arc::clone(&worker_epochs);
190        let tick_shutdown = Arc::clone(&shutdown_flag);
191        let tick_stopped_flag = Arc::clone(&tick_stopped);
192        let stored_backoff = backoff_config.clone();
193        let tick_thread = thread::Builder::new()
194            .name("murk-tick".into())
195            .spawn(move || {
196                let state = TickThreadState::new(
197                    engine,
198                    tick_ring,
199                    tick_epoch,
200                    tick_workers,
201                    cmd_rx,
202                    tick_shutdown,
203                    tick_stopped_flag,
204                    tick_rate_hz,
205                    max_epoch_hold_ms,
206                    cancel_grace_ms,
207                    &backoff_config,
208                );
209                state.run()
210            })
211            .expect("failed to spawn tick thread");
212
213        // Spawn egress worker threads.
214        let worker_threads = Self::spawn_egress_workers(
215            worker_count,
216            &obs_rx,
217            &ring,
218            &epoch_counter,
219            &worker_epochs,
220        );
221
222        Ok(Self {
223            ring,
224            epoch_counter,
225            worker_epochs,
226            cmd_tx: Some(cmd_tx),
227            obs_tx: Some(obs_tx),
228            shutdown_flag,
229            tick_stopped,
230            tick_thread: Some(tick_thread),
231            worker_threads,
232            state: ShutdownState::Running,
233            recovered_engine: Mutex::new(None),
234            config: async_config,
235            backoff_config: stored_backoff,
236            seed,
237            tick_rate_hz,
238            space,
239        })
240    }
241
242    /// Submit commands to be processed in the next tick.
243    ///
244    /// Non-blocking: sends the batch via channel and blocks only for
245    /// the receipt reply (which arrives within one tick period).
246    pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
247        let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
248
249        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
250        let batch = IngressBatch {
251            commands,
252            reply: reply_tx,
253        };
254
255        cmd_tx.try_send(batch).map_err(|e| match e {
256            crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
257            crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
258        })?;
259
260        // Wait for receipts (blocks until tick thread processes the batch).
261        reply_rx.recv().map_err(|_| SubmitError::Shutdown)
262    }
263
264    /// Extract an observation from the latest snapshot.
265    ///
266    /// Blocking: dispatches to an egress worker and waits for the result.
267    /// The output and mask buffers must be pre-allocated.
268    pub fn observe(
269        &self,
270        plan: &Arc<ObsPlan>,
271        output: &mut [f32],
272        mask: &mut [u8],
273    ) -> Result<ObsMetadata, ObsError> {
274        let obs_tx = self
275            .obs_tx
276            .as_ref()
277            .ok_or_else(|| ObsError::ExecutionFailed {
278                reason: "world is shut down".into(),
279            })?;
280
281        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
282        let task = ObsTask::Simple {
283            plan: Arc::clone(plan),
284            output_len: output.len(),
285            mask_len: mask.len(),
286            reply: reply_tx,
287        };
288
289        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
290            reason: "egress pool shut down".into(),
291        })?;
292
293        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
294            reason: "worker disconnected".into(),
295        })?;
296
297        match result {
298            ObsResult::Simple {
299                metadata,
300                output: buf,
301                mask: mbuf,
302            } => {
303                if buf.len() > output.len() || mbuf.len() > mask.len() {
304                    return Err(ObsError::ExecutionFailed {
305                        reason: format!(
306                            "output buffer too small: need ({}, {}) got ({}, {})",
307                            buf.len(),
308                            mbuf.len(),
309                            output.len(),
310                            mask.len()
311                        ),
312                    });
313                }
314                output[..buf.len()].copy_from_slice(&buf);
315                mask[..mbuf.len()].copy_from_slice(&mbuf);
316                Ok(metadata)
317            }
318            ObsResult::Error(e) => Err(e),
319            _ => Err(ObsError::ExecutionFailed {
320                reason: "unexpected result type".into(),
321            }),
322        }
323    }
324
325    /// Extract agent-relative observations from the latest snapshot.
326    ///
327    /// Each agent gets `output_len / n_agents` elements in the output buffer.
328    pub fn observe_agents(
329        &self,
330        plan: &Arc<ObsPlan>,
331        space: &Arc<dyn Space>,
332        agent_centers: &[Coord],
333        output: &mut [f32],
334        mask: &mut [u8],
335    ) -> Result<Vec<ObsMetadata>, ObsError> {
336        let obs_tx = self
337            .obs_tx
338            .as_ref()
339            .ok_or_else(|| ObsError::ExecutionFailed {
340                reason: "world is shut down".into(),
341            })?;
342
343        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
344        let n_agents = agent_centers.len();
345        let per_agent_output = if n_agents > 0 {
346            output.len() / n_agents
347        } else {
348            0
349        };
350        let per_agent_mask = if n_agents > 0 {
351            mask.len() / n_agents
352        } else {
353            0
354        };
355
356        let task = ObsTask::Agents {
357            plan: Arc::clone(plan),
358            space: Arc::clone(space),
359            agent_centers: agent_centers.to_vec(),
360            output_len: per_agent_output,
361            mask_len: per_agent_mask,
362            reply: reply_tx,
363        };
364
365        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
366            reason: "egress pool shut down".into(),
367        })?;
368
369        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
370            reason: "worker disconnected".into(),
371        })?;
372
373        match result {
374            ObsResult::Agents {
375                metadata,
376                output: buf,
377                mask: mbuf,
378            } => {
379                if buf.len() > output.len() || mbuf.len() > mask.len() {
380                    return Err(ObsError::ExecutionFailed {
381                        reason: format!(
382                            "output buffer too small: need ({}, {}) got ({}, {})",
383                            buf.len(),
384                            mbuf.len(),
385                            output.len(),
386                            mask.len()
387                        ),
388                    });
389                }
390                output[..buf.len()].copy_from_slice(&buf);
391                mask[..mbuf.len()].copy_from_slice(&mbuf);
392                Ok(metadata)
393            }
394            ObsResult::Error(e) => Err(e),
395            _ => Err(ObsError::ExecutionFailed {
396                reason: "unexpected result type".into(),
397            }),
398        }
399    }
400
401    /// Spawn egress worker threads (shared between `new` and `reset`).
402    fn spawn_egress_workers(
403        worker_count: usize,
404        obs_rx: &crossbeam_channel::Receiver<ObsTask>,
405        ring: &Arc<SnapshotRing>,
406        epoch_counter: &Arc<EpochCounter>,
407        worker_epochs: &Arc<[WorkerEpoch]>,
408    ) -> Vec<JoinHandle<()>> {
409        let mut worker_threads = Vec::with_capacity(worker_count);
410        for i in 0..worker_count {
411            let obs_rx = obs_rx.clone();
412            let ring = Arc::clone(ring);
413            let epoch = Arc::clone(epoch_counter);
414            let worker_epochs_ref = Arc::clone(worker_epochs);
415            let handle = thread::Builder::new()
416                .name(format!("murk-egress-{i}"))
417                .spawn(move || {
418                    crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
419                })
420                .expect("failed to spawn egress worker");
421            worker_threads.push(handle);
422        }
423        worker_threads
424    }
425
426    /// Get the latest snapshot directly from the ring.
427    pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
428        self.ring.latest()
429    }
430
431    /// Current epoch (lock-free read).
432    pub fn current_epoch(&self) -> u64 {
433        self.epoch_counter.current()
434    }
435
436    /// Shutdown the world with the 4-state machine.
437    ///
438    /// 1. **Running → Draining (≤33ms):** Set shutdown flag, unpark tick
439    ///    thread (wakes it from budget sleep immediately), wait for tick stop.
440    /// 2. **Draining → Quiescing (≤200ms):** Cancel workers, drop obs channel.
441    /// 3. **Quiescing → Dropped (≤10ms):** Join all threads.
442    pub fn shutdown(&mut self) -> ShutdownReport {
443        if self.state == ShutdownState::Dropped {
444            return ShutdownReport {
445                total_ms: 0,
446                drain_ms: 0,
447                quiesce_ms: 0,
448                tick_joined: true,
449                workers_joined: 0,
450            };
451        }
452
453        let start = Instant::now();
454
455        // Phase 1: Running → Draining
456        self.state = ShutdownState::Draining;
457        self.shutdown_flag.store(true, Ordering::Release);
458
459        // Wake the tick thread if it's parked in a budget sleep.
460        // park_timeout is used instead of thread::sleep, so unpark()
461        // provides immediate wakeup regardless of tick_rate_hz.
462        if let Some(handle) = &self.tick_thread {
463            handle.thread().unpark();
464        }
465
466        // Wait for tick thread to acknowledge (≤33ms budget).
467        let drain_deadline = Instant::now() + Duration::from_millis(33);
468        while !self.tick_stopped.load(Ordering::Acquire) {
469            if Instant::now() > drain_deadline {
470                break;
471            }
472            thread::yield_now();
473        }
474        let drain_ms = start.elapsed().as_millis() as u64;
475
476        // Phase 2: Draining → Quiescing
477        self.state = ShutdownState::Quiescing;
478
479        // Cancel all workers.
480        for w in self.worker_epochs.iter() {
481            w.request_cancel();
482        }
483
484        // Drop the command and observation channels to unblock workers.
485        self.cmd_tx.take();
486        self.obs_tx.take();
487
488        // Wait for all workers to unpin (≤200ms budget).
489        let quiesce_deadline = Instant::now() + Duration::from_millis(200);
490        loop {
491            let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
492            if all_unpinned || Instant::now() > quiesce_deadline {
493                break;
494            }
495            thread::yield_now();
496        }
497        let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;
498
499        // Phase 3: Quiescing → Dropped
500        self.state = ShutdownState::Dropped;
501
502        let tick_joined = if let Some(handle) = self.tick_thread.take() {
503            match handle.join() {
504                Ok(engine) => {
505                    *self.recovered_engine.lock().unwrap() = Some(engine);
506                    true
507                }
508                Err(_) => false,
509            }
510        } else {
511            true
512        };
513
514        let mut workers_joined = 0;
515        for handle in self.worker_threads.drain(..) {
516            if handle.join().is_ok() {
517                workers_joined += 1;
518            }
519        }
520
521        let total_ms = start.elapsed().as_millis() as u64;
522        ShutdownReport {
523            total_ms,
524            drain_ms,
525            quiesce_ms,
526            tick_joined,
527            workers_joined,
528        }
529    }
530
531    /// Reset the world: stop all threads, reset engine state, restart.
532    ///
533    /// This is the RL episode-boundary operation. The engine is recovered
534    /// from the tick thread, reset in-place, then respawned with fresh
535    /// channels and worker threads.
536    pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
537        // Shutdown if still running (recovers engine via JoinHandle).
538        if self.state != ShutdownState::Dropped {
539            self.shutdown();
540        }
541
542        self.seed = seed;
543
544        // Recover the engine from the previous tick thread.
545        let mut engine = self
546            .recovered_engine
547            .lock()
548            .unwrap()
549            .take()
550            .ok_or(ConfigError::EngineRecoveryFailed)?;
551
552        // Reset the engine (clears arena, ingress, tick counter).
553        // If reset fails, restore the engine so a subsequent reset can retry.
554        if let Err(e) = engine.reset() {
555            *self.recovered_engine.lock().unwrap() = Some(engine);
556            return Err(e);
557        }
558
559        // Fresh shared state.
560        let worker_count = self.config.resolved_worker_count();
561        self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
562        self.epoch_counter = Arc::new(EpochCounter::new());
563        self.worker_epochs = (0..worker_count as u32)
564            .map(WorkerEpoch::new)
565            .collect::<Vec<_>>()
566            .into();
567        self.shutdown_flag = Arc::new(AtomicBool::new(false));
568        self.tick_stopped = Arc::new(AtomicBool::new(false));
569
570        // Fresh channels.
571        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
572        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
573        self.cmd_tx = Some(cmd_tx);
574        self.obs_tx = Some(obs_tx);
575
576        // Respawn tick thread.
577        let tick_ring = Arc::clone(&self.ring);
578        let tick_epoch = Arc::clone(&self.epoch_counter);
579        let tick_workers = Arc::clone(&self.worker_epochs);
580        let tick_shutdown = Arc::clone(&self.shutdown_flag);
581        let tick_stopped_flag = Arc::clone(&self.tick_stopped);
582        let tick_rate_hz = self.tick_rate_hz;
583        let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
584        let cancel_grace_ms = self.config.cancel_grace_ms;
585        let backoff_config = self.backoff_config.clone();
586        self.tick_thread = Some(
587            thread::Builder::new()
588                .name("murk-tick".into())
589                .spawn(move || {
590                    let state = TickThreadState::new(
591                        engine,
592                        tick_ring,
593                        tick_epoch,
594                        tick_workers,
595                        cmd_rx,
596                        tick_shutdown,
597                        tick_stopped_flag,
598                        tick_rate_hz,
599                        max_epoch_hold_ms,
600                        cancel_grace_ms,
601                        &backoff_config,
602                    );
603                    state.run()
604                })
605                .expect("failed to spawn tick thread"),
606        );
607
608        // Respawn egress workers.
609        self.worker_threads = Self::spawn_egress_workers(
610            worker_count,
611            &obs_rx,
612            &self.ring,
613            &self.epoch_counter,
614            &self.worker_epochs,
615        );
616
617        self.state = ShutdownState::Running;
618        Ok(())
619    }
620
621    /// The shared space used for agent-relative observations.
622    pub fn space(&self) -> &dyn Space {
623        self.space.as_ref()
624    }
625}
627impl Drop for RealtimeAsyncWorld {
628    fn drop(&mut self) {
629        if self.state != ShutdownState::Dropped {
630            self.shutdown();
631        }
632    }
633}

// ── ArcSpaceWrapper ──────────────────────────────────────────────

/// Wrapper that implements `Space` by delegating to an `Arc<dyn Space>`.
///
/// This allows `TickEngine` to take a `Box<dyn Space>` while the
/// `RealtimeAsyncWorld` retains a shared `Arc` reference for egress.
struct ArcSpaceWrapper(Arc<dyn Space>);
// Every method below delegates straight to the wrapped `Arc<dyn Space>`;
// only `topology_eq` adds logic (unwrapping a wrapper on the other side).
impl murk_space::Space for ArcSpaceWrapper {
    fn ndim(&self) -> usize {
        self.0.ndim()
    }

    fn cell_count(&self) -> usize {
        self.0.cell_count()
    }

    fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
        self.0.neighbours(coord)
    }

    fn distance(&self, a: &Coord, b: &Coord) -> f64 {
        self.0.distance(a, b)
    }

    fn compile_region(
        &self,
        spec: &murk_space::RegionSpec,
    ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
        self.0.compile_region(spec)
    }

    fn canonical_ordering(&self) -> Vec<Coord> {
        self.0.canonical_ordering()
    }

    fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
        self.0.canonical_rank(coord)
    }

    fn instance_id(&self) -> murk_core::SpaceInstanceId {
        self.0.instance_id()
    }

    fn topology_eq(&self, other: &dyn murk_space::Space) -> bool {
        // Unwrap ArcSpaceWrapper so the inner space's downcast-based
        // comparison sees the concrete type, not this wrapper.
        // NOTE(review): the `&dyn Space -> &dyn Any` cast relies on trait
        // upcasting, i.e. `Any` being a supertrait of `Space` — confirm in
        // murk_space.
        if let Some(w) = (other as &dyn std::any::Any).downcast_ref::<ArcSpaceWrapper>() {
            self.0.topology_eq(&*w.0)
        } else {
            self.0.topology_eq(other)
        }
    }
}

// Manual `Debug` impl — the wrapped `dyn Space` has no `Debug` bound.
impl std::fmt::Debug for ArcSpaceWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // No fields printed: `dyn Space` is not guaranteed to be `Debug`.
        f.debug_tuple("ArcSpaceWrapper").finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

707    fn scalar_field(name: &str) -> FieldDef {
708        FieldDef {
709            name: name.to_string(),
710            field_type: FieldType::Scalar,
711            mutability: FieldMutability::PerTick,
712            units: None,
713            bounds: None,
714            boundary_behavior: BoundaryBehavior::Clamp,
715        }
716    }
718    fn test_config() -> WorldConfig {
719        WorldConfig {
720            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
721            fields: vec![scalar_field("energy")],
722            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
723            dt: 0.1,
724            seed: 42,
725            ring_buffer_size: 8,
726            max_ingress_queue: 1024,
727            tick_rate_hz: Some(60.0),
728            backoff: crate::config::BackoffConfig::default(),
729        }
730    }
732    #[test]
733    fn lifecycle_start_and_shutdown() {
734        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
735
736        // Wait for at least one snapshot (polling with timeout).
737        let deadline = Instant::now() + Duration::from_secs(2);
738        while world.latest_snapshot().is_none() {
739            if Instant::now() > deadline {
740                panic!("no snapshot produced within 2s");
741            }
742            std::thread::sleep(Duration::from_millis(10));
743        }
744
745        let epoch = world.current_epoch();
746        assert!(epoch > 0, "epoch should have advanced");
747
748        let report = world.shutdown();
749        assert!(report.tick_joined);
750        assert!(report.workers_joined > 0);
751    }
753    #[test]
754    fn observe_returns_data() {
755        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
756
757        // Wait for at least one snapshot.
758        let deadline = Instant::now() + Duration::from_secs(2);
759        while world.latest_snapshot().is_none() {
760            if Instant::now() > deadline {
761                panic!("no snapshot produced within 2s");
762            }
763            std::thread::sleep(Duration::from_millis(10));
764        }
765
766        // Compile an obs plan.
767        let space = world.space();
768        let spec = ObsSpec {
769            entries: vec![ObsEntry {
770                field_id: FieldId(0),
771                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
772                pool: None,
773                transform: murk_obs::spec::ObsTransform::Identity,
774                dtype: murk_obs::spec::ObsDtype::F32,
775            }],
776        };
777        let plan_result = ObsPlan::compile(&spec, space).unwrap();
778        let plan = Arc::new(plan_result.plan);
779
780        let mut output = vec![0.0f32; plan_result.output_len];
781        let mut mask = vec![0u8; plan_result.mask_len];
782
783        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
784        assert!(meta.tick_id.0 > 0);
785        assert_eq!(output.len(), 10);
786        assert!(output.iter().all(|&v| v == 42.0));
787
788        world.shutdown();
789    }
791    #[test]
792    fn concurrent_observe() {
793        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
794
795        // Wait for at least one snapshot.
796        let deadline = Instant::now() + Duration::from_secs(2);
797        while world.latest_snapshot().is_none() {
798            if Instant::now() > deadline {
799                panic!("no snapshot produced within 2s");
800            }
801            std::thread::sleep(Duration::from_millis(10));
802        }
803
804        let space = world.space();
805        let spec = ObsSpec {
806            entries: vec![ObsEntry {
807                field_id: FieldId(0),
808                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
809                pool: None,
810                transform: murk_obs::spec::ObsTransform::Identity,
811                dtype: murk_obs::spec::ObsDtype::F32,
812            }],
813        };
814        let plan_result = ObsPlan::compile(&spec, space).unwrap();
815        let plan = Arc::new(plan_result.plan);
816
817        // Spawn 4 threads all calling observe concurrently.
818        let world = Arc::new(world);
819        let handles: Vec<_> = (0..4)
820            .map(|_| {
821                let w = Arc::clone(&world);
822                let p = Arc::clone(&plan);
823                let out_len = plan_result.output_len;
824                let mask_len = plan_result.mask_len;
825                std::thread::spawn(move || {
826                    let mut output = vec![0.0f32; out_len];
827                    let mut mask = vec![0u8; mask_len];
828                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
829                    assert!(meta.tick_id.0 > 0);
830                    assert!(output.iter().all(|&v| v == 42.0));
831                })
832            })
833            .collect();
834
835        for h in handles {
836            h.join().unwrap();
837        }
838
839        // Shutdown via Arc — need to unwrap.
840        // Since we can't call shutdown on Arc, Drop will handle it.
841        drop(world);
842    }
843
844    #[test]
845    fn submit_commands_flow_through() {
846        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
847
848        // Wait for first tick.
849        let deadline = Instant::now() + Duration::from_secs(2);
850        while world.latest_snapshot().is_none() {
851            if Instant::now() > deadline {
852                panic!("no snapshot produced within 2s");
853            }
854            std::thread::sleep(Duration::from_millis(10));
855        }
856
857        // Submit a command.
858        let cmd = Command {
859            payload: murk_core::command::CommandPayload::SetParameter {
860                key: murk_core::id::ParameterKey(0),
861                value: 1.0,
862            },
863            expires_after_tick: murk_core::id::TickId(10000),
864            source_id: None,
865            source_seq: None,
866            priority_class: 1,
867            arrival_seq: 0,
868        };
869        let receipts = world.submit_commands(vec![cmd]).unwrap();
870        assert_eq!(receipts.len(), 1);
871        assert!(receipts[0].accepted);
872
873        world.shutdown();
874    }
875
876    #[test]
877    fn drop_triggers_shutdown() {
878        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
879        std::thread::sleep(Duration::from_millis(50));
880        drop(world);
881        // If this doesn't hang, shutdown worked.
882    }
883
884    #[test]
885    fn shutdown_budget() {
886        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
887        std::thread::sleep(Duration::from_millis(100));
888
889        let report = world.shutdown();
890        // Shutdown should complete well within 2s (generous for slow CI runners).
891        assert!(
892            report.total_ms < 2000,
893            "shutdown took too long: {}ms",
894            report.total_ms
895        );
896    }
897
898    /// Regression test: with a very slow tick rate, shutdown must still
899    /// complete within the documented budget. Before the fix, the tick
900    /// thread used `thread::sleep` which was uninterruptible by the
901    /// shutdown flag, causing shutdown to block for the full tick budget.
902    #[test]
903    fn shutdown_fast_with_slow_tick_rate() {
904        let config = WorldConfig {
905            tick_rate_hz: Some(0.5), // 2-second tick budget
906            ..test_config()
907        };
908        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();
909
910        // Wait for at least one tick to complete so the ring is non-empty
911        // and the tick thread enters its budget sleep.
912        let deadline = Instant::now() + Duration::from_secs(5);
913        while world.latest_snapshot().is_none() {
914            if Instant::now() > deadline {
915                panic!("no snapshot produced within 5s");
916            }
917            std::thread::sleep(Duration::from_millis(10));
918        }
919
920        // Give the tick thread time to enter its budget sleep.
921        std::thread::sleep(Duration::from_millis(50));
922
923        let start = Instant::now();
924        let report = world.shutdown();
925        let wall_ms = start.elapsed().as_millis();
926
927        // Shutdown should complete well under the 2-second tick budget.
928        // Allow 500ms for CI overhead; the point is it shouldn't take 2s.
929        assert!(
930            wall_ms < 500,
931            "shutdown took {wall_ms}ms with 0.5Hz tick rate \
932             (report: total={}ms, drain={}ms, quiesce={}ms)",
933            report.total_ms,
934            report.drain_ms,
935            report.quiesce_ms
936        );
937        assert!(report.tick_joined);
938    }
939
940    #[test]
941    fn reset_lifecycle() {
942        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
943
944        // Wait for some ticks (generous timeout for slow CI runners).
945        let deadline = Instant::now() + Duration::from_secs(5);
946        while world.current_epoch() < 5 {
947            if Instant::now() > deadline {
948                panic!("epoch didn't reach 5 within 5s");
949            }
950            std::thread::sleep(Duration::from_millis(10));
951        }
952        let epoch_before = world.current_epoch();
953        assert!(epoch_before >= 5);
954
955        // Reset with a new seed.
956        world.reset(99).unwrap();
957
958        // After reset, epoch should restart from 0.
959        assert_eq!(world.current_epoch(), 0);
960
961        // The world should produce new snapshots.
962        let deadline = Instant::now() + Duration::from_secs(2);
963        while world.latest_snapshot().is_none() {
964            if Instant::now() > deadline {
965                panic!("no snapshot after reset within 2s");
966            }
967            std::thread::sleep(Duration::from_millis(10));
968        }
969        assert!(world.current_epoch() > 0, "should be ticking after reset");
970
971        world.shutdown();
972    }
973
974    #[test]
975    fn arc_space_wrapper_topology_eq() {
976        // Two ArcSpaceWrappers around identical Line1D spaces must compare
977        // as topologically equal. Before the fix, the inner downcast saw
978        // ArcSpaceWrapper instead of Line1D and returned false.
979        let a = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
980        let b = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
981        assert!(
982            a.topology_eq(&b),
983            "identical Line1D through ArcSpaceWrapper should be topology-equal"
984        );
985
986        // Different sizes must not match.
987        let c = ArcSpaceWrapper(Arc::new(Line1D::new(20, EdgeBehavior::Absorb).unwrap()));
988        assert!(
989            !a.topology_eq(&c),
990            "different Line1D sizes should not be topology-equal"
991        );
992
993        // Comparing ArcSpaceWrapper with a bare Line1D should also work.
994        let bare = Line1D::new(10, EdgeBehavior::Absorb).unwrap();
995        assert!(
996            a.topology_eq(&bare),
997            "ArcSpaceWrapper(Line1D) vs bare Line1D should be topology-equal"
998        );
999    }
1000}