murk_engine/realtime.rs

//! User-facing `RealtimeAsyncWorld` API and shutdown state machine.
//!
//! This is the primary mode for RL training: the tick engine runs on a
//! dedicated background thread at a configurable rate (default 60 Hz),
//! while an egress thread pool serves observation requests concurrently.
//!
//! # Architecture
//!
//! ```text
//! User Thread(s)              Tick Thread              Egress Workers (N)
//!     |                           |                         |
//!     |--submit_commands()------->| cmd_rx.try_recv()       |
//!     |   [cmd_tx: bounded(64)]   | engine.submit_commands()|
//!     |<--receipts via reply_tx---| engine.execute_tick()   |
//!     |                           | ring.push(snap)         |
//!     |                           | epoch_counter.advance() |
//!     |                           | check_stalled_workers() |
//!     |                           | sleep(budget - elapsed) |
//!     |                           |                         |
//!     |--observe()------------------------------------->    |
//!     |   [obs_tx: bounded(N*4)]               task_rx.recv()
//!     |   blocks on reply_rx                   ring.latest()
//!     |                                        pin epoch
//!     |                                        execute ObsPlan
//!     |                                        unpin epoch
//!     |<--result via reply_tx--------------------------    |
//! ```
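//!
//! # Example
//!
//! A minimal lifecycle sketch. `world_config`, `command`, `plan`, and the
//! output buffers are assumed to be set up by the caller (see the tests at
//! the bottom of this file for a concrete configuration); the block is
//! marked `ignore` because it is illustrative rather than a doctest.
//!
//! ```ignore
//! let mut world = RealtimeAsyncWorld::new(world_config, AsyncConfig::default()).unwrap();
//!
//! // Commands land in the next tick; receipts come back one per command.
//! let receipts = world.submit_commands(vec![command]).unwrap();
//!
//! // Observations are served from the latest snapshot by the egress pool.
//! let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
//!
//! // Explicit shutdown returns timing diagnostics; Drop also shuts down.
//! let report = world.shutdown();
//! assert!(report.tick_joined);
//! ```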

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use murk_arena::OwnedSnapshot;
use murk_core::command::{Command, Receipt};
use murk_core::error::ObsError;
use murk_core::Coord;
use murk_obs::{ObsMetadata, ObsPlan};
use murk_space::Space;

use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
use crate::egress::{ObsResult, ObsTask};
use crate::epoch::{EpochCounter, WorkerEpoch};
use crate::ring::SnapshotRing;
use crate::tick::TickEngine;
use crate::tick_thread::{IngressBatch, TickThreadState};

// ── Error types ──────────────────────────────────────────────────

/// Error submitting commands to the tick thread.
#[derive(Debug)]
pub enum SubmitError {
    /// The tick thread has shut down.
    Shutdown,
    /// The command channel is full (back-pressure).
    ChannelFull,
}

impl std::fmt::Display for SubmitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Shutdown => write!(f, "tick thread has shut down"),
            Self::ChannelFull => write!(f, "command channel full"),
        }
    }
}

impl std::error::Error for SubmitError {}

// ── ShutdownReport ───────────────────────────────────────────────

/// Report from the shutdown state machine.
#[derive(Debug)]
pub struct ShutdownReport {
    /// Total time spent in the shutdown sequence.
    pub total_ms: u64,
    /// Time spent draining the tick thread.
    pub drain_ms: u64,
    /// Time spent quiescing workers.
    pub quiesce_ms: u64,
    /// Whether the tick thread was joined successfully.
    pub tick_joined: bool,
    /// Number of worker threads joined.
    pub workers_joined: usize,
}

// ── ShutdownState ────────────────────────────────────────────────

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    Running,
    Draining,
    Quiescing,
    Dropped,
}

// ── RealtimeAsyncWorld ───────────────────────────────────────────

/// Realtime asynchronous simulation world.
///
/// Runs the tick engine on a background thread and serves observations
/// from a pool of egress workers. This is the primary API for RL
/// training environments.
pub struct RealtimeAsyncWorld {
    ring: Arc<SnapshotRing>,
    epoch_counter: Arc<EpochCounter>,
    worker_epochs: Arc<[WorkerEpoch]>,
    cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
    obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
    shutdown_flag: Arc<AtomicBool>,
    tick_stopped: Arc<AtomicBool>,
    tick_thread: Option<JoinHandle<TickEngine>>,
    worker_threads: Vec<JoinHandle<()>>,
    state: ShutdownState,
    /// Recovered from tick thread on shutdown, used for `reset()`.
    /// Wrapped in Mutex so RealtimeAsyncWorld is Sync (TickEngine
    /// contains `Vec<Box<dyn Propagator>>` which is Send but not Sync).
    /// Never contended: only accessed during reset() which takes &mut self.
    recovered_engine: Mutex<Option<TickEngine>>,
    config: AsyncConfig,
    backoff_config: BackoffConfig,
    seed: u64,
    tick_rate_hz: f64,
    /// Shared space for agent-relative observations and engine reconstruction.
    space: Arc<dyn Space>,
}

impl RealtimeAsyncWorld {
    /// Create a new realtime async world and spawn all threads.
    ///
    /// The `WorldConfig` is consumed: the `TickEngine` is moved into
    /// the tick thread. The `space` is shared via `Arc` for egress
    /// workers that need it for agent-relative observations.
    pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
        let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
        if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 {
            return Err(ConfigError::InvalidTickRate {
                value: tick_rate_hz,
            });
        }

        let seed = config.seed;
        let ring_size = config.ring_buffer_size;
        let backoff_config = config.backoff.clone();
        let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
        let cancel_grace_ms = async_config.cancel_grace_ms;

        // Share the space for agent-relative observations.
        let space: Arc<dyn Space> = Arc::from(config.space);

        // Rebuild the WorldConfig with the Arc'd space: we keep one Arc
        // for our own reference and hand TickEngine a Box wrapper around
        // a clone of it.
        let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
        let engine_config = WorldConfig {
            space: engine_space,
            fields: config.fields,
            propagators: config.propagators,
            dt: config.dt,
            seed: config.seed,
            ring_buffer_size: config.ring_buffer_size,
            max_ingress_queue: config.max_ingress_queue,
            tick_rate_hz: config.tick_rate_hz,
            backoff: backoff_config.clone(),
        };

        let engine = TickEngine::new(engine_config)?;

        let worker_count = async_config.resolved_worker_count();
        let ring = Arc::new(SnapshotRing::new(ring_size));
        let epoch_counter = Arc::new(EpochCounter::new());
        let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();

        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let tick_stopped = Arc::new(AtomicBool::new(false));

        // Command channel: bounded(64); the tick thread drains it each tick.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);

        // Task channel for egress workers: bounded(worker_count * 4).
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);

        // Spawn the tick thread; it returns the TickEngine on exit for reset().
        let tick_ring = Arc::clone(&ring);
        let tick_epoch = Arc::clone(&epoch_counter);
        let tick_workers = Arc::clone(&worker_epochs);
        let tick_shutdown = Arc::clone(&shutdown_flag);
        let tick_stopped_flag = Arc::clone(&tick_stopped);
        let stored_backoff = backoff_config.clone();
        let tick_thread = thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            })
            .expect("failed to spawn tick thread");

        // Spawn egress worker threads.
        let worker_threads = Self::spawn_egress_workers(
            worker_count,
            &obs_rx,
            &ring,
            &epoch_counter,
            &worker_epochs,
        );

        Ok(Self {
            ring,
            epoch_counter,
            worker_epochs,
            cmd_tx: Some(cmd_tx),
            obs_tx: Some(obs_tx),
            shutdown_flag,
            tick_stopped,
            tick_thread: Some(tick_thread),
            worker_threads,
            state: ShutdownState::Running,
            recovered_engine: Mutex::new(None),
            config: async_config,
            backoff_config: stored_backoff,
            seed,
            tick_rate_hz,
            space,
        })
    }

    /// Submit commands to be processed in the next tick.
    ///
    /// The send itself is non-blocking (`try_send`); the call then blocks
    /// only while waiting for the receipt reply, which normally arrives
    /// within one tick period.
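    ///
    /// A minimal back-pressure sketch (illustrative, not a doctest; the
    /// retry-on-full policy is the caller's choice, not part of this API):
    ///
    /// ```ignore
    /// let receipts = loop {
    ///     match world.submit_commands(commands.clone()) {
    ///         Ok(receipts) => break receipts,
    ///         Err(SubmitError::ChannelFull) => std::thread::yield_now(),
    ///         Err(SubmitError::Shutdown) => panic!("world shut down"),
    ///     }
    /// };
    /// ```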
    pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
        let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;

        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let batch = IngressBatch {
            commands,
            reply: reply_tx,
        };

        cmd_tx.try_send(batch).map_err(|e| match e {
            crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
            crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
        })?;

        // Wait for receipts (blocks until tick thread processes the batch).
        reply_rx.recv().map_err(|_| SubmitError::Shutdown)
    }

    /// Extract an observation from the latest snapshot.
    ///
    /// Blocking: dispatches to an egress worker and waits for the result.
    /// The output and mask buffers must be pre-allocated.
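    ///
    /// Buffer-sizing sketch (illustrative; it assumes the compiled plan
    /// reports its required lengths, as `plan_result.output_len` and
    /// `plan_result.mask_len` do in the tests below):
    ///
    /// ```ignore
    /// let mut output = vec![0.0f32; plan_result.output_len];
    /// let mut mask = vec![0u8; plan_result.mask_len];
    /// let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
    /// ```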
    pub fn observe(
        &self,
        plan: &Arc<ObsPlan>,
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<ObsMetadata, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let task = ObsTask::Simple {
            plan: Arc::clone(plan),
            output_len: output.len(),
            mask_len: mask.len(),
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Simple {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                output[..buf.len()].copy_from_slice(&buf);
                mask[..mbuf.len()].copy_from_slice(&mbuf);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }

    /// Extract agent-relative observations from the latest snapshot.
    ///
    /// Each agent gets `output_len / n_agents` elements in the output buffer.
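    ///
    /// Layout sketch (illustrative only): agent `i`'s view of the flat
    /// output buffer is the `i`-th contiguous slice.
    ///
    /// ```ignore
    /// let per_agent = output.len() / agent_centers.len();
    /// for (i, _center) in agent_centers.iter().enumerate() {
    ///     let agent_view = &output[i * per_agent..(i + 1) * per_agent];
    ///     // ... feed agent_view to agent i's policy ...
    /// }
    /// ```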
    pub fn observe_agents(
        &self,
        plan: &Arc<ObsPlan>,
        space: &Arc<dyn Space>,
        agent_centers: &[Coord],
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<Vec<ObsMetadata>, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let n_agents = agent_centers.len();
        let per_agent_output = if n_agents > 0 {
            output.len() / n_agents
        } else {
            0
        };
        let per_agent_mask = if n_agents > 0 {
            mask.len() / n_agents
        } else {
            0
        };

        let task = ObsTask::Agents {
            plan: Arc::clone(plan),
            space: Arc::clone(space),
            agent_centers: agent_centers.to_vec(),
            output_len: per_agent_output,
            mask_len: per_agent_mask,
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Agents {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                let copy_out = buf.len().min(output.len());
                output[..copy_out].copy_from_slice(&buf[..copy_out]);
                let copy_mask = mbuf.len().min(mask.len());
                mask[..copy_mask].copy_from_slice(&mbuf[..copy_mask]);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }

    /// Spawn egress worker threads (shared between `new` and `reset`).
    fn spawn_egress_workers(
        worker_count: usize,
        obs_rx: &crossbeam_channel::Receiver<ObsTask>,
        ring: &Arc<SnapshotRing>,
        epoch_counter: &Arc<EpochCounter>,
        worker_epochs: &Arc<[WorkerEpoch]>,
    ) -> Vec<JoinHandle<()>> {
        let mut worker_threads = Vec::with_capacity(worker_count);
        for i in 0..worker_count {
            let obs_rx = obs_rx.clone();
            let ring = Arc::clone(ring);
            let epoch = Arc::clone(epoch_counter);
            let worker_epochs_ref = Arc::clone(worker_epochs);
            let handle = thread::Builder::new()
                .name(format!("murk-egress-{i}"))
                .spawn(move || {
                    crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
                })
                .expect("failed to spawn egress worker");
            worker_threads.push(handle);
        }
        worker_threads
    }

    /// Get the latest snapshot directly from the ring.
    pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
        self.ring.latest()
    }

    /// Current epoch (lock-free read).
    pub fn current_epoch(&self) -> u64 {
        self.epoch_counter.current()
    }

    /// Shut down the world via the four-state machine.
    ///
    /// 1. **Running → Draining (≤33ms):** Set the shutdown flag, wait for the tick thread to stop.
    /// 2. **Draining → Quiescing (≤200ms):** Cancel workers, drop the command/observation channels.
    /// 3. **Quiescing → Dropped (≤10ms):** Join all threads.
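    ///
    /// Idempotent: once the state machine reaches `Dropped`, another call
    /// returns immediately with a zeroed report (illustrative sketch):
    ///
    /// ```ignore
    /// let first = world.shutdown();
    /// let second = world.shutdown(); // already Dropped
    /// assert_eq!(second.total_ms, 0);
    /// ```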
    pub fn shutdown(&mut self) -> ShutdownReport {
        if self.state == ShutdownState::Dropped {
            return ShutdownReport {
                total_ms: 0,
                drain_ms: 0,
                quiesce_ms: 0,
                tick_joined: true,
                workers_joined: 0,
            };
        }

        let start = Instant::now();

        // Phase 1: Running → Draining
        self.state = ShutdownState::Draining;
        self.shutdown_flag.store(true, Ordering::Release);

        // Wait for tick thread to acknowledge (≤33ms budget).
        let drain_deadline = Instant::now() + Duration::from_millis(33);
        while !self.tick_stopped.load(Ordering::Acquire) {
            if Instant::now() > drain_deadline {
                break;
            }
            thread::yield_now();
        }
        let drain_ms = start.elapsed().as_millis() as u64;

        // Phase 2: Draining → Quiescing
        self.state = ShutdownState::Quiescing;

        // Cancel all workers.
        for w in self.worker_epochs.iter() {
            w.request_cancel();
        }

        // Drop the command and observation channels to unblock workers.
        self.cmd_tx.take();
        self.obs_tx.take();

        // Wait for all workers to unpin (≤200ms budget).
        let quiesce_deadline = Instant::now() + Duration::from_millis(200);
        loop {
            let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
            if all_unpinned || Instant::now() > quiesce_deadline {
                break;
            }
            thread::yield_now();
        }
        let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;

        // Phase 3: Quiescing → Dropped
        self.state = ShutdownState::Dropped;

        let tick_joined = if let Some(handle) = self.tick_thread.take() {
            match handle.join() {
                Ok(engine) => {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                    true
                }
                Err(_) => false,
            }
        } else {
            true
        };

        let mut workers_joined = 0;
        for handle in self.worker_threads.drain(..) {
            if handle.join().is_ok() {
                workers_joined += 1;
            }
        }

        let total_ms = start.elapsed().as_millis() as u64;
        ShutdownReport {
            total_ms,
            drain_ms,
            quiesce_ms,
            tick_joined,
            workers_joined,
        }
    }

    /// Reset the world: stop all threads, reset engine state, restart.
    ///
    /// This is the RL episode-boundary operation. The engine is recovered
    /// from the tick thread, reset in place, then respawned with fresh
    /// channels and worker threads.
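    ///
    /// Episode-boundary sketch (illustrative; `run_episode`, `base_seed`,
    /// and `num_episodes` are placeholders, not part of this API):
    ///
    /// ```ignore
    /// for episode in 0..num_episodes {
    ///     world.reset(base_seed + episode).unwrap();
    ///     run_episode(&world);
    /// }
    /// ```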
    pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
        // Shut down if still running (recovers the engine via the JoinHandle).
        if self.state != ShutdownState::Dropped {
            self.shutdown();
        }

        self.seed = seed;

        // Recover the engine from the previous tick thread. If the tick
        // thread panicked, nothing was recovered and reset cannot proceed
        // (`InvalidTickRate { value: 0.0 }` doubles as the error here).
        let mut engine = self
            .recovered_engine
            .lock()
            .unwrap()
            .take()
            .ok_or(ConfigError::InvalidTickRate { value: 0.0 })?;

        // Reset the engine (clears arena, ingress, tick counter).
        // If reset fails, restore the engine so a subsequent reset can retry.
        if let Err(e) = engine.reset() {
            *self.recovered_engine.lock().unwrap() = Some(engine);
            return Err(e);
        }

        // Fresh shared state.
        let worker_count = self.config.resolved_worker_count();
        self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
        self.epoch_counter = Arc::new(EpochCounter::new());
        self.worker_epochs = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();
        self.shutdown_flag = Arc::new(AtomicBool::new(false));
        self.tick_stopped = Arc::new(AtomicBool::new(false));

        // Fresh channels.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
        self.cmd_tx = Some(cmd_tx);
        self.obs_tx = Some(obs_tx);

        // Respawn tick thread.
        let tick_ring = Arc::clone(&self.ring);
        let tick_epoch = Arc::clone(&self.epoch_counter);
        let tick_workers = Arc::clone(&self.worker_epochs);
        let tick_shutdown = Arc::clone(&self.shutdown_flag);
        let tick_stopped_flag = Arc::clone(&self.tick_stopped);
        let tick_rate_hz = self.tick_rate_hz;
        let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
        let cancel_grace_ms = self.config.cancel_grace_ms;
        let backoff_config = self.backoff_config.clone();
        self.tick_thread = Some(
            thread::Builder::new()
                .name("murk-tick".into())
                .spawn(move || {
                    let state = TickThreadState::new(
                        engine,
                        tick_ring,
                        tick_epoch,
                        tick_workers,
                        cmd_rx,
                        tick_shutdown,
                        tick_stopped_flag,
                        tick_rate_hz,
                        max_epoch_hold_ms,
                        cancel_grace_ms,
                        &backoff_config,
                    );
                    state.run()
                })
                .expect("failed to spawn tick thread"),
        );

        // Respawn egress workers.
        self.worker_threads = Self::spawn_egress_workers(
            worker_count,
            &obs_rx,
            &self.ring,
            &self.epoch_counter,
            &self.worker_epochs,
        );

        self.state = ShutdownState::Running;
        Ok(())
    }

    /// The shared space used for agent-relative observations.
    pub fn space(&self) -> &dyn Space {
        self.space.as_ref()
    }
}

impl Drop for RealtimeAsyncWorld {
    fn drop(&mut self) {
        if self.state != ShutdownState::Dropped {
            self.shutdown();
        }
    }
}

// ── ArcSpaceWrapper ──────────────────────────────────────────────

/// Wrapper that implements `Space` by delegating to an `Arc<dyn Space>`.
///
/// This allows `TickEngine` to take a `Box<dyn Space>` while the
/// `RealtimeAsyncWorld` retains a shared `Arc` reference for egress.
struct ArcSpaceWrapper(Arc<dyn Space>);

impl murk_space::Space for ArcSpaceWrapper {
    fn ndim(&self) -> usize {
        self.0.ndim()
    }

    fn cell_count(&self) -> usize {
        self.0.cell_count()
    }

    fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
        self.0.neighbours(coord)
    }

    fn distance(&self, a: &Coord, b: &Coord) -> f64 {
        self.0.distance(a, b)
    }

    fn compile_region(
        &self,
        spec: &murk_space::RegionSpec,
    ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
        self.0.compile_region(spec)
    }

    fn canonical_ordering(&self) -> Vec<Coord> {
        self.0.canonical_ordering()
    }

    fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
        self.0.canonical_rank(coord)
    }

    fn instance_id(&self) -> murk_core::SpaceInstanceId {
        self.0.instance_id()
    }
}

// Manual Debug impl: `dyn Space` carries no Debug bound, so derive won't work.
impl std::fmt::Debug for ArcSpaceWrapper {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("ArcSpaceWrapper").finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    #[test]
    fn lifecycle_start_and_shutdown() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot (polling with timeout).
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let epoch = world.current_epoch();
        assert!(epoch > 0, "epoch should have advanced");

        let report = world.shutdown();
        assert!(report.tick_joined);
        assert!(report.workers_joined > 0);
    }

    #[test]
    fn observe_returns_data() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Compile an obs plan.
        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        let mut output = vec![0.0f32; plan_result.output_len];
        let mut mask = vec![0u8; plan_result.mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }

    #[test]
    fn concurrent_observe() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        // Spawn 4 threads all calling observe concurrently.
        let world = Arc::new(world);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let w = Arc::clone(&world);
                let p = Arc::clone(&plan);
                let out_len = plan_result.output_len;
                let mask_len = plan_result.mask_len;
                std::thread::spawn(move || {
                    let mut output = vec![0.0f32; out_len];
                    let mut mask = vec![0u8; mask_len];
                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
                    assert!(meta.tick_id.0 > 0);
                    assert!(output.iter().all(|&v| v == 42.0));
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // `shutdown()` needs `&mut self`, which the `Arc` cannot provide;
        // dropping the last reference triggers shutdown via `Drop`.
        drop(world);
    }

    #[test]
    fn submit_commands_flow_through() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for the first tick.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Submit a command.
        let cmd = Command {
            payload: murk_core::command::CommandPayload::SetParameter {
                key: murk_core::id::ParameterKey(0),
                value: 1.0,
            },
            expires_after_tick: murk_core::id::TickId(10000),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        };
        let receipts = world.submit_commands(vec![cmd]).unwrap();
        assert_eq!(receipts.len(), 1);
        assert!(receipts[0].accepted);

        world.shutdown();
    }

    #[test]
    fn drop_triggers_shutdown() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(50));
        drop(world);
        // If this doesn't hang, shutdown worked.
    }

    #[test]
    fn shutdown_budget() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(100));

        let report = world.shutdown();
        // Shutdown should complete well within 500ms.
        assert!(
            report.total_ms < 500,
            "shutdown took too long: {}ms",
            report.total_ms
        );
    }

    #[test]
    fn reset_lifecycle() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for some ticks.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.current_epoch() < 5 {
            if Instant::now() > deadline {
                panic!("epoch didn't reach 5 within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        let epoch_before = world.current_epoch();
        assert!(epoch_before >= 5);

        // Reset with a new seed.
        world.reset(99).unwrap();

        // After reset the epoch counter restarts from 0, but the respawned
        // tick thread may already have advanced it; compare against the
        // pre-reset value instead of asserting exactly 0.
        assert!(world.current_epoch() < epoch_before);

        // The world should produce new snapshots.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot after reset within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert!(world.current_epoch() > 0, "should be ticking after reset");

        world.shutdown();
    }
}