// murk_engine/realtime.rs
//! User-facing `RealtimeAsyncWorld` API and shutdown state machine.
//!
//! This is the primary mode for RL training: the tick engine runs on a
//! dedicated background thread at a configurable rate (default 60 Hz),
//! while an egress thread pool serves observation requests concurrently.
//!
//! # Architecture
//!
//! ```text
//! User Thread(s)              Tick Thread              Egress Workers (N)
//!     |                           |                         |
//!     |--submit_commands()------->| cmd_rx.try_recv()       |
//!     |   [cmd_tx: bounded(64)]   | engine.submit_commands()|
//!     |<--receipts via reply_tx---| engine.execute_tick()   |
//!     |                           | ring.push(snap)         |
//!     |                           | epoch_counter.advance() |
//!     |                           | check_stalled_workers() |
//!     |                           | sleep(budget - elapsed) |
//!     |                           |                         |
//!     |--observe()------------------------------------->    |
//!     |   [obs_tx: bounded(N*4)]               task_rx.recv()
//!     |   blocks on reply_rx                   ring.latest()
//!     |                                        pin epoch
//!     |                                        execute ObsPlan
//!     |                                        unpin epoch
//!     |<--result via reply_tx--------------------------    |
//! ```
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use murk_arena::OwnedSnapshot;
35use murk_core::command::{Command, Receipt};
36use murk_core::error::ObsError;
37use murk_core::Coord;
38use murk_obs::{ObsMetadata, ObsPlan};
39use murk_space::Space;
40
41use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
42use crate::egress::{ObsResult, ObsTask};
43use crate::epoch::{EpochCounter, WorkerEpoch};
44use crate::ring::SnapshotRing;
45use crate::tick::TickEngine;
46use crate::tick_thread::{IngressBatch, TickThreadState};
47
48// ── Error types ──────────────────────────────────────────────────
49
/// Error submitting commands to the tick thread.
#[derive(Debug, PartialEq, Eq)]
pub enum SubmitError {
    /// The tick thread has shut down.
    Shutdown,
    /// The command channel is full (back-pressure).
    ChannelFull,
}

impl std::fmt::Display for SubmitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve each variant to its static message, then emit once.
        let msg = match self {
            Self::Shutdown => "tick thread has shut down",
            Self::ChannelFull => "command channel full",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for SubmitError {}
69
70// ── ShutdownReport ───────────────────────────────────────────────
71
/// Report from the shutdown state machine.
///
/// All durations are wall-clock milliseconds measured from the start of
/// `shutdown()`; `total_ms` covers the entire sequence including joins.
#[derive(Debug)]
pub struct ShutdownReport {
    /// Total time spent in the shutdown sequence.
    pub total_ms: u64,
    /// Time spent draining the tick thread.
    pub drain_ms: u64,
    /// Time spent quiescing workers.
    pub quiesce_ms: u64,
    /// Whether the tick thread was joined successfully.
    pub tick_joined: bool,
    /// Number of worker threads joined.
    pub workers_joined: usize,
}
86
/// Non-blocking visibility snapshot for realtime health checks.
///
/// Produced by `RealtimeAsyncWorld::preflight`. All values are
/// point-in-time reads and may be stale by the time the caller acts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RealtimePreflight {
    /// Pending command batches in the ingress channel.
    pub command_queue_depth: usize,
    /// Ingress channel capacity (0 if unavailable/shut down).
    pub command_queue_capacity: usize,
    /// Pending observation tasks in the egress task channel.
    pub observe_queue_depth: usize,
    /// Egress task channel capacity (0 if unavailable/shut down).
    pub observe_queue_capacity: usize,
    /// Whether a snapshot is currently available to serve observes.
    pub has_snapshot: bool,
    /// Tick ID of the latest snapshot (0 when unavailable).
    pub latest_snapshot_tick_id: u64,
    /// Snapshot age in ticks relative to the current epoch.
    pub snapshot_age_ticks: u64,
    /// Ring retention capacity (maximum number of retained snapshots).
    pub ring_capacity: usize,
    /// Current retained snapshot count.
    pub ring_len: usize,
    /// Current monotonic ring write position.
    pub ring_write_pos: u64,
    /// Oldest retained write position (None when ring is empty).
    pub ring_oldest_retained_pos: Option<u64>,
    /// Cumulative count of evictions from ring overwrite.
    pub ring_eviction_events: u64,
    /// Cumulative count of stale/not-yet-written position reads.
    pub ring_stale_read_events: u64,
    /// Cumulative count of overwrite-skew retry events.
    pub ring_skew_retry_events: u64,
    /// Whether the tick thread has already stopped.
    pub tick_thread_stopped: bool,
}
121
122// ── ShutdownState ────────────────────────────────────────────────
123
/// Lifecycle phase of the world's thread ensemble.
///
/// `shutdown()` drives the forward transitions Running → Draining →
/// Quiescing → Dropped; `reset()` respawns threads and returns the
/// world to Running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// Tick thread and egress workers are live.
    Running,
    /// Shutdown flag set; waiting for the tick thread to stop.
    Draining,
    /// Workers cancelled and channels dropped; waiting for epoch unpins.
    Quiescing,
    /// All threads joined (or abandoned after their budget elapsed).
    Dropped,
}
131
132// ── RealtimeAsyncWorld ───────────────────────────────────────────
133
/// Realtime asynchronous simulation world.
///
/// Runs the tick engine on a background thread and serves observations
/// from a pool of egress workers. This is the primary API for RL
/// training environments.
pub struct RealtimeAsyncWorld {
    /// Snapshot ring shared between the tick thread (writer) and the
    /// egress workers (readers).
    ring: Arc<SnapshotRing>,
    /// Epoch counter shared with the tick thread; read lock-free here.
    epoch_counter: Arc<EpochCounter>,
    /// Per-worker epoch pin/cancel state, one slot per egress worker.
    worker_epochs: Arc<[WorkerEpoch]>,
    /// Ingress channel to the tick thread; `None` once shut down.
    cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
    /// Task channel to the egress pool; `None` once shut down.
    obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
    /// Set by `shutdown()` to request that the tick thread exit.
    shutdown_flag: Arc<AtomicBool>,
    /// Observed by `shutdown()`/`preflight()` to detect tick-thread exit.
    tick_stopped: Arc<AtomicBool>,
    /// Join handle for the tick thread; joining yields the `TickEngine`.
    tick_thread: Option<JoinHandle<TickEngine>>,
    /// Join handles for the egress workers.
    worker_threads: Vec<JoinHandle<()>>,
    /// Current phase of the shutdown state machine.
    state: ShutdownState,
    /// Recovered from tick thread on shutdown, used for `reset()`.
    /// Wrapped in Mutex so RealtimeAsyncWorld is Sync (TickEngine
    /// contains `Vec<Box<dyn Propagator>>` which is Send but not Sync).
    /// Never contended: only accessed during reset() which takes &mut self.
    recovered_engine: Mutex<Option<TickEngine>>,
    /// Async-side settings (worker count, epoch-hold/cancel budgets).
    config: AsyncConfig,
    /// Backoff settings, kept so `reset()` can respawn the tick thread.
    backoff_config: BackoffConfig,
    /// Seed recorded at construction and updated on `reset()`.
    seed: u64,
    /// Target tick rate; validated finite and positive in `new()`.
    tick_rate_hz: f64,
    /// Shared space for agent-relative observations and engine reconstruction.
    space: Arc<dyn Space>,
}
162
163impl RealtimeAsyncWorld {
    /// Create a new realtime async world and spawn all threads.
    ///
    /// The `WorldConfig` is consumed: the `TickEngine` is moved into
    /// the tick thread. The `space` is shared via `Arc` for egress
    /// workers that need it for agent-relative observations.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::InvalidTickRate` when the configured rate
    /// is non-finite or non-positive; propagates any error from
    /// `TickEngine::new`.
    ///
    /// # Panics
    ///
    /// Panics if the OS fails to spawn the tick thread or an egress
    /// worker thread.
    pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
        // Default tick rate is 60 Hz when unspecified.
        let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
        if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 {
            return Err(ConfigError::InvalidTickRate {
                value: tick_rate_hz,
            });
        }

        let seed = config.seed;
        let ring_size = config.ring_buffer_size;
        let backoff_config = config.backoff.clone();
        let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
        let cancel_grace_ms = async_config.cancel_grace_ms;

        // Share the space for agent-relative observations.
        let space: Arc<dyn Space> = Arc::from(config.space);

        // Reconstruct WorldConfig with the Arc'd space (Box from Arc).
        // We clone the space Arc for our own reference, and give
        // TickEngine a Box wrapper.
        let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
        let engine_config = WorldConfig {
            space: engine_space,
            fields: config.fields,
            propagators: config.propagators,
            dt: config.dt,
            seed: config.seed,
            ring_buffer_size: config.ring_buffer_size,
            max_ingress_queue: config.max_ingress_queue,
            tick_rate_hz: config.tick_rate_hz,
            backoff: backoff_config.clone(),
        };

        let engine = TickEngine::new(engine_config)?;

        let worker_count = async_config.resolved_worker_count();
        let ring = Arc::new(SnapshotRing::new(ring_size));
        let epoch_counter = Arc::new(EpochCounter::new());
        let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();

        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let tick_stopped = Arc::new(AtomicBool::new(false));

        // Command channel: bounded(64) — tick thread drains each tick.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);

        // Task channel for egress workers: bounded(worker_count * 4).
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);

        // Spawn tick thread — returns TickEngine on exit for reset().
        let tick_ring = Arc::clone(&ring);
        let tick_epoch = Arc::clone(&epoch_counter);
        let tick_workers = Arc::clone(&worker_epochs);
        let tick_shutdown = Arc::clone(&shutdown_flag);
        let tick_stopped_flag = Arc::clone(&tick_stopped);
        // Keep a copy for `Self`: `backoff_config` moves into the closure.
        let stored_backoff = backoff_config.clone();
        let tick_thread = thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            })
            .expect("failed to spawn tick thread");

        // Spawn egress worker threads.
        let worker_threads = Self::spawn_egress_workers(
            worker_count,
            &obs_rx,
            &ring,
            &epoch_counter,
            &worker_epochs,
        );

        Ok(Self {
            ring,
            epoch_counter,
            worker_epochs,
            cmd_tx: Some(cmd_tx),
            obs_tx: Some(obs_tx),
            shutdown_flag,
            tick_stopped,
            tick_thread: Some(tick_thread),
            worker_threads,
            state: ShutdownState::Running,
            recovered_engine: Mutex::new(None),
            config: async_config,
            backoff_config: stored_backoff,
            seed,
            tick_rate_hz,
            space,
        })
    }
276
277    /// Submit commands to be processed in the next tick.
278    ///
279    /// Non-blocking: sends the batch via channel and blocks only for
280    /// the receipt reply (which arrives within one tick period).
281    pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
282        let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
283
284        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
285        let batch = IngressBatch {
286            commands,
287            reply: reply_tx,
288        };
289
290        cmd_tx.try_send(batch).map_err(|e| match e {
291            crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
292            crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
293        })?;
294
295        // Wait for receipts (blocks until tick thread processes the batch).
296        reply_rx.recv().map_err(|_| SubmitError::Shutdown)
297    }
298
    /// Extract an observation from the latest snapshot.
    ///
    /// Blocking: dispatches to an egress worker and waits for the result.
    /// The output and mask buffers must be pre-allocated.
    ///
    /// # Errors
    ///
    /// Returns `ObsError::ExecutionFailed` when the world is shut down,
    /// the egress pool or worker has disconnected, the provided buffers
    /// are smaller than the worker's result, or the worker replies with
    /// an unexpected result variant; worker-side errors pass through.
    pub fn observe(
        &self,
        plan: &Arc<ObsPlan>,
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<ObsMetadata, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        // One-shot reply channel for this request.
        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let task = ObsTask::Simple {
            plan: Arc::clone(plan),
            output_len: output.len(),
            mask_len: mask.len(),
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        // Blocks until a worker executes the plan and replies.
        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Simple {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                // Guard before copying: the worker may have produced
                // more data than the caller allocated.
                if buf.len() > output.len() || mbuf.len() > mask.len() {
                    return Err(ObsError::ExecutionFailed {
                        reason: format!(
                            "output buffer too small: need ({}, {}) got ({}, {})",
                            buf.len(),
                            mbuf.len(),
                            output.len(),
                            mask.len()
                        ),
                    });
                }
                output[..buf.len()].copy_from_slice(&buf);
                mask[..mbuf.len()].copy_from_slice(&mbuf);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }
359
    /// Extract agent-relative observations from the latest snapshot.
    ///
    /// Each agent gets `output_len / n_agents` elements in the output buffer.
    ///
    /// # Errors
    ///
    /// Same failure modes as [`Self::observe`]: shut-down world,
    /// disconnected egress pool/worker, undersized buffers, unexpected
    /// result variant, or a worker-side `ObsError`.
    pub fn observe_agents(
        &self,
        plan: &Arc<ObsPlan>,
        space: &Arc<dyn Space>,
        agent_centers: &[Coord],
        output: &mut [f32],
        mask: &mut [u8],
    ) -> Result<Vec<ObsMetadata>, ObsError> {
        let obs_tx = self
            .obs_tx
            .as_ref()
            .ok_or_else(|| ObsError::ExecutionFailed {
                reason: "world is shut down".into(),
            })?;

        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
        let n_agents = agent_centers.len();
        // Integer division: any buffer remainder beyond
        // n_agents * per_agent length is never written below.
        let per_agent_output = if n_agents > 0 {
            output.len() / n_agents
        } else {
            0
        };
        let per_agent_mask = if n_agents > 0 {
            mask.len() / n_agents
        } else {
            0
        };

        let task = ObsTask::Agents {
            plan: Arc::clone(plan),
            space: Arc::clone(space),
            agent_centers: agent_centers.to_vec(),
            output_len: per_agent_output,
            mask_len: per_agent_mask,
            reply: reply_tx,
        };

        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
            reason: "egress pool shut down".into(),
        })?;

        // Blocks until a worker executes the plan for all agents.
        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
            reason: "worker disconnected".into(),
        })?;

        match result {
            ObsResult::Agents {
                metadata,
                output: buf,
                mask: mbuf,
            } => {
                // Guard before copying: the worker may have produced
                // more data than the caller allocated.
                if buf.len() > output.len() || mbuf.len() > mask.len() {
                    return Err(ObsError::ExecutionFailed {
                        reason: format!(
                            "output buffer too small: need ({}, {}) got ({}, {})",
                            buf.len(),
                            mbuf.len(),
                            output.len(),
                            mask.len()
                        ),
                    });
                }
                output[..buf.len()].copy_from_slice(&buf);
                mask[..mbuf.len()].copy_from_slice(&mbuf);
                Ok(metadata)
            }
            ObsResult::Error(e) => Err(e),
            _ => Err(ObsError::ExecutionFailed {
                reason: "unexpected result type".into(),
            }),
        }
    }
435
436    /// Spawn egress worker threads (shared between `new` and `reset`).
437    fn spawn_egress_workers(
438        worker_count: usize,
439        obs_rx: &crossbeam_channel::Receiver<ObsTask>,
440        ring: &Arc<SnapshotRing>,
441        epoch_counter: &Arc<EpochCounter>,
442        worker_epochs: &Arc<[WorkerEpoch]>,
443    ) -> Vec<JoinHandle<()>> {
444        let mut worker_threads = Vec::with_capacity(worker_count);
445        for i in 0..worker_count {
446            let obs_rx = obs_rx.clone();
447            let ring = Arc::clone(ring);
448            let epoch = Arc::clone(epoch_counter);
449            let worker_epochs_ref = Arc::clone(worker_epochs);
450            let handle = thread::Builder::new()
451                .name(format!("murk-egress-{i}"))
452                .spawn(move || {
453                    crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
454                })
455                .expect("failed to spawn egress worker");
456            worker_threads.push(handle);
457        }
458        worker_threads
459    }
460
    /// Get the latest snapshot directly from the ring.
    ///
    /// Returns `None` while the ring holds no snapshot yet (e.g. before
    /// the tick thread publishes its first one).
    pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
        self.ring.latest()
    }
465
    /// Current epoch (lock-free read).
    ///
    /// Delegates to the shared `EpochCounter` also advanced by the tick
    /// thread.
    pub fn current_epoch(&self) -> u64 {
        self.epoch_counter.current()
    }
470
471    /// Non-blocking queue/ring visibility for readiness checks.
472    ///
473    /// This API is intended for callers that need to detect overload risk
474    /// before making blocking calls like `observe`.
475    pub fn preflight(&self) -> RealtimePreflight {
476        let (command_queue_depth, command_queue_capacity) = self
477            .cmd_tx
478            .as_ref()
479            .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
480            .unwrap_or((0, 0));
481        let (observe_queue_depth, observe_queue_capacity) = self
482            .obs_tx
483            .as_ref()
484            .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
485            .unwrap_or((0, 0));
486        let ring = crate::egress::ring_preflight(&self.ring, &self.epoch_counter);
487
488        RealtimePreflight {
489            command_queue_depth,
490            command_queue_capacity,
491            observe_queue_depth,
492            observe_queue_capacity,
493            has_snapshot: ring.has_snapshot,
494            latest_snapshot_tick_id: ring.latest_tick_id,
495            snapshot_age_ticks: ring.age_ticks,
496            ring_capacity: ring.ring_capacity,
497            ring_len: ring.ring_len,
498            ring_write_pos: ring.ring_write_pos,
499            ring_oldest_retained_pos: ring.ring_oldest_retained_pos,
500            ring_eviction_events: ring.ring_eviction_events,
501            ring_stale_read_events: ring.ring_stale_read_events,
502            ring_skew_retry_events: ring.ring_skew_retry_events,
503            tick_thread_stopped: self.tick_stopped.load(Ordering::Acquire),
504        }
505    }
506
    /// Shutdown the world with the 4-state machine.
    ///
    /// 1. **Running → Draining (≤33ms):** Set shutdown flag, unpark tick
    ///    thread (wakes it from budget sleep immediately), wait for tick stop.
    /// 2. **Draining → Quiescing (≤200ms):** Cancel workers, drop obs channel.
    /// 3. **Quiescing → Dropped (≤10ms):** Join all threads.
    ///
    /// Idempotent: calling again after completion returns a zeroed
    /// report. The per-phase budgets are best-effort — each wait loop
    /// gives up at its deadline and proceeds, while the final joins are
    /// unbounded.
    pub fn shutdown(&mut self) -> ShutdownReport {
        if self.state == ShutdownState::Dropped {
            return ShutdownReport {
                total_ms: 0,
                drain_ms: 0,
                quiesce_ms: 0,
                tick_joined: true,
                workers_joined: 0,
            };
        }

        let start = Instant::now();

        // Phase 1: Running → Draining
        self.state = ShutdownState::Draining;
        self.shutdown_flag.store(true, Ordering::Release);

        // Wake the tick thread if it's parked in a budget sleep.
        // park_timeout is used instead of thread::sleep, so unpark()
        // provides immediate wakeup regardless of tick_rate_hz.
        if let Some(handle) = &self.tick_thread {
            handle.thread().unpark();
        }

        // Wait for tick thread to acknowledge (≤33ms budget).
        let drain_deadline = Instant::now() + Duration::from_millis(33);
        while !self.tick_stopped.load(Ordering::Acquire) {
            if Instant::now() > drain_deadline {
                break;
            }
            thread::yield_now();
        }
        let drain_ms = start.elapsed().as_millis() as u64;

        // Phase 2: Draining → Quiescing
        self.state = ShutdownState::Quiescing;

        // Cancel all workers.
        for w in self.worker_epochs.iter() {
            w.request_cancel();
        }

        // Drop the command and observation channels to unblock workers.
        self.cmd_tx.take();
        self.obs_tx.take();

        // Wait for all workers to unpin (≤200ms budget).
        let quiesce_deadline = Instant::now() + Duration::from_millis(200);
        loop {
            let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
            if all_unpinned || Instant::now() > quiesce_deadline {
                break;
            }
            thread::yield_now();
        }
        let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;

        // Phase 3: Quiescing → Dropped
        self.state = ShutdownState::Dropped;

        // Joining the tick thread returns the TickEngine, which is
        // stashed so reset() can reuse it.
        let tick_joined = if let Some(handle) = self.tick_thread.take() {
            match handle.join() {
                Ok(engine) => {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                    true
                }
                Err(_) => false,
            }
        } else {
            true
        };

        let mut workers_joined = 0;
        for handle in self.worker_threads.drain(..) {
            if handle.join().is_ok() {
                workers_joined += 1;
            }
        }

        let total_ms = start.elapsed().as_millis() as u64;
        ShutdownReport {
            total_ms,
            drain_ms,
            quiesce_ms,
            tick_joined,
            workers_joined,
        }
    }
601
    /// Reset the world: stop all threads, reset engine state, restart.
    ///
    /// This is the RL episode-boundary operation. The engine is recovered
    /// from the tick thread, reset in-place, then respawned with fresh
    /// channels and worker threads.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::EngineRecoveryFailed` when no engine was
    /// recovered (e.g. the previous tick thread could not be joined).
    /// Errors from `TickEngine::reset` are propagated; in that case the
    /// engine is stashed back so a subsequent `reset` can retry.
    ///
    /// # Panics
    ///
    /// Panics if the OS fails to spawn the replacement threads.
    pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
        // Shutdown if still running (recovers engine via JoinHandle).
        if self.state != ShutdownState::Dropped {
            self.shutdown();
        }

        self.seed = seed;

        // Recover the engine from the previous tick thread.
        let mut engine = self
            .recovered_engine
            .lock()
            .unwrap()
            .take()
            .ok_or(ConfigError::EngineRecoveryFailed)?;

        // Reset the engine (clears arena, ingress, tick counter).
        // If reset fails, restore the engine so a subsequent reset can retry.
        if let Err(e) = engine.reset() {
            *self.recovered_engine.lock().unwrap() = Some(engine);
            return Err(e);
        }

        // Fresh shared state.
        let worker_count = self.config.resolved_worker_count();
        self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
        self.epoch_counter = Arc::new(EpochCounter::new());
        self.worker_epochs = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();
        self.shutdown_flag = Arc::new(AtomicBool::new(false));
        self.tick_stopped = Arc::new(AtomicBool::new(false));

        // Fresh channels.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
        self.cmd_tx = Some(cmd_tx);
        self.obs_tx = Some(obs_tx);

        // Respawn tick thread.
        let tick_ring = Arc::clone(&self.ring);
        let tick_epoch = Arc::clone(&self.epoch_counter);
        let tick_workers = Arc::clone(&self.worker_epochs);
        let tick_shutdown = Arc::clone(&self.shutdown_flag);
        let tick_stopped_flag = Arc::clone(&self.tick_stopped);
        let tick_rate_hz = self.tick_rate_hz;
        let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
        let cancel_grace_ms = self.config.cancel_grace_ms;
        let backoff_config = self.backoff_config.clone();
        self.tick_thread = Some(
            thread::Builder::new()
                .name("murk-tick".into())
                .spawn(move || {
                    let state = TickThreadState::new(
                        engine,
                        tick_ring,
                        tick_epoch,
                        tick_workers,
                        cmd_rx,
                        tick_shutdown,
                        tick_stopped_flag,
                        tick_rate_hz,
                        max_epoch_hold_ms,
                        cancel_grace_ms,
                        &backoff_config,
                    );
                    state.run()
                })
                .expect("failed to spawn tick thread"),
        );

        // Respawn egress workers.
        self.worker_threads = Self::spawn_egress_workers(
            worker_count,
            &obs_rx,
            &self.ring,
            &self.epoch_counter,
            &self.worker_epochs,
        );

        self.state = ShutdownState::Running;
        Ok(())
    }
691
    /// The shared space used for agent-relative observations.
    ///
    /// Borrows the same `Arc<dyn Space>` that the tick engine wraps via
    /// `ArcSpaceWrapper`.
    pub fn space(&self) -> &dyn Space {
        self.space.as_ref()
    }
696}
697
698impl Drop for RealtimeAsyncWorld {
699    fn drop(&mut self) {
700        if self.state != ShutdownState::Dropped {
701            self.shutdown();
702        }
703    }
704}
705
706// ── ArcSpaceWrapper ──────────────────────────────────────────────
707
/// Wrapper that implements `Space` by delegating to an `Arc<dyn Space>`.
///
/// This allows `TickEngine` to take a `Box<dyn Space>` while the
/// `RealtimeAsyncWorld` retains a shared `Arc` reference for egress.
struct ArcSpaceWrapper(Arc<dyn Space>);
713
// Pure delegation: every method forwards to the inner `Arc<dyn Space>`.
impl murk_space::Space for ArcSpaceWrapper {
    fn ndim(&self) -> usize {
        self.0.ndim()
    }

    fn cell_count(&self) -> usize {
        self.0.cell_count()
    }

    fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
        self.0.neighbours(coord)
    }

    fn distance(&self, a: &Coord, b: &Coord) -> f64 {
        self.0.distance(a, b)
    }

    fn compile_region(
        &self,
        spec: &murk_space::RegionSpec,
    ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
        self.0.compile_region(spec)
    }

    fn canonical_ordering(&self) -> Vec<Coord> {
        self.0.canonical_ordering()
    }

    fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
        self.0.canonical_rank(coord)
    }

    fn instance_id(&self) -> murk_core::SpaceInstanceId {
        self.0.instance_id()
    }

    fn topology_eq(&self, other: &dyn murk_space::Space) -> bool {
        // Unwrap ArcSpaceWrapper so the inner space's downcast-based
        // comparison sees the concrete type, not this wrapper.
        // NOTE(review): the cast below upcasts `&dyn Space` to
        // `&dyn Any`, which requires the `Space` trait to have an
        // `Any`-style supertrait — confirm against the trait definition.
        if let Some(w) = (other as &dyn std::any::Any).downcast_ref::<ArcSpaceWrapper>() {
            self.0.topology_eq(&*w.0)
        } else {
            self.0.topology_eq(other)
        }
    }
}
760
761// Delegate optional Space methods if they exist.
762impl std::fmt::Debug for ArcSpaceWrapper {
763    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
764        f.debug_tuple("ArcSpaceWrapper").finish()
765    }
766}
767
768#[cfg(test)]
769mod tests {
770    use super::*;
771    use murk_core::id::FieldId;
772    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
773    use murk_obs::spec::ObsRegion;
774    use murk_obs::{ObsEntry, ObsSpec};
775    use murk_space::{EdgeBehavior, Line1D};
776    use murk_test_utils::ConstPropagator;
777
    /// Build a per-tick scalar `FieldDef` named `name`, with clamping
    /// boundary behavior and no units or bounds.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }
788
    /// Minimal world: a 10-cell 1-D line, one scalar field, and a
    /// propagator that writes the constant 42.0 each tick at 60 Hz.
    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }
802
803    #[test]
804    fn lifecycle_start_and_shutdown() {
805        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
806
807        // Wait for at least one snapshot (polling with timeout).
808        let deadline = Instant::now() + Duration::from_secs(2);
809        while world.latest_snapshot().is_none() {
810            if Instant::now() > deadline {
811                panic!("no snapshot produced within 2s");
812            }
813            std::thread::sleep(Duration::from_millis(10));
814        }
815
816        let epoch = world.current_epoch();
817        assert!(epoch > 0, "epoch should have advanced");
818
819        let report = world.shutdown();
820        assert!(report.tick_joined);
821        assert!(report.workers_joined > 0);
822    }
823
    #[test]
    fn observe_returns_data() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Compile an obs plan covering the whole space for field 0.
        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        // Buffers sized exactly as the compiled plan requires.
        let mut output = vec![0.0f32; plan_result.output_len];
        let mut mask = vec![0u8; plan_result.mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        // ConstPropagator writes 42.0 into every cell each tick.
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }
861
862    #[test]
863    fn concurrent_observe() {
864        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
865
866        // Wait for at least one snapshot.
867        let deadline = Instant::now() + Duration::from_secs(2);
868        while world.latest_snapshot().is_none() {
869            if Instant::now() > deadline {
870                panic!("no snapshot produced within 2s");
871            }
872            std::thread::sleep(Duration::from_millis(10));
873        }
874
875        let space = world.space();
876        let spec = ObsSpec {
877            entries: vec![ObsEntry {
878                field_id: FieldId(0),
879                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
880                pool: None,
881                transform: murk_obs::spec::ObsTransform::Identity,
882                dtype: murk_obs::spec::ObsDtype::F32,
883            }],
884        };
885        let plan_result = ObsPlan::compile(&spec, space).unwrap();
886        let plan = Arc::new(plan_result.plan);
887
888        // Spawn 4 threads all calling observe concurrently.
889        let world = Arc::new(world);
890        let handles: Vec<_> = (0..4)
891            .map(|_| {
892                let w = Arc::clone(&world);
893                let p = Arc::clone(&plan);
894                let out_len = plan_result.output_len;
895                let mask_len = plan_result.mask_len;
896                std::thread::spawn(move || {
897                    let mut output = vec![0.0f32; out_len];
898                    let mut mask = vec![0u8; mask_len];
899                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
900                    assert!(meta.tick_id.0 > 0);
901                    assert!(output.iter().all(|&v| v == 42.0));
902                })
903            })
904            .collect();
905
906        for h in handles {
907            h.join().unwrap();
908        }
909
910        // Shutdown via Arc — need to unwrap.
911        // Since we can't call shutdown on Arc, Drop will handle it.
912        drop(world);
913    }
914
915    #[test]
916    fn submit_commands_flow_through() {
917        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
918
919        // Wait for first tick.
920        let deadline = Instant::now() + Duration::from_secs(2);
921        while world.latest_snapshot().is_none() {
922            if Instant::now() > deadline {
923                panic!("no snapshot produced within 2s");
924            }
925            std::thread::sleep(Duration::from_millis(10));
926        }
927
928        // Submit a command.
929        let cmd = Command {
930            payload: murk_core::command::CommandPayload::SetParameter {
931                key: murk_core::id::ParameterKey(0),
932                value: 1.0,
933            },
934            expires_after_tick: murk_core::id::TickId(10000),
935            source_id: None,
936            source_seq: None,
937            priority_class: 1,
938            arrival_seq: 0,
939        };
940        let receipts = world.submit_commands(vec![cmd]).unwrap();
941        assert_eq!(receipts.len(), 1);
942        assert!(receipts[0].accepted);
943
944        world.shutdown();
945    }
946
947    #[test]
948    fn drop_triggers_shutdown() {
949        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
950        std::thread::sleep(Duration::from_millis(50));
951        drop(world);
952        // If this doesn't hang, shutdown worked.
953    }
954
955    #[test]
956    fn shutdown_budget() {
957        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
958        std::thread::sleep(Duration::from_millis(100));
959
960        let report = world.shutdown();
961        // Shutdown should complete well within 2s (generous for slow CI runners).
962        assert!(
963            report.total_ms < 2000,
964            "shutdown took too long: {}ms",
965            report.total_ms
966        );
967    }
968
969    /// Regression test: with a very slow tick rate, shutdown must still
970    /// complete within the documented budget. Before the fix, the tick
971    /// thread used `thread::sleep` which was uninterruptible by the
972    /// shutdown flag, causing shutdown to block for the full tick budget.
973    #[test]
974    fn shutdown_fast_with_slow_tick_rate() {
975        let config = WorldConfig {
976            tick_rate_hz: Some(0.5), // 2-second tick budget
977            ..test_config()
978        };
979        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();
980
981        // Wait for at least one tick to complete so the ring is non-empty
982        // and the tick thread enters its budget sleep.
983        let deadline = Instant::now() + Duration::from_secs(5);
984        while world.latest_snapshot().is_none() {
985            if Instant::now() > deadline {
986                panic!("no snapshot produced within 5s");
987            }
988            std::thread::sleep(Duration::from_millis(10));
989        }
990
991        // Give the tick thread time to enter its budget sleep.
992        std::thread::sleep(Duration::from_millis(50));
993
994        let start = Instant::now();
995        let report = world.shutdown();
996        let wall_ms = start.elapsed().as_millis();
997
998        // Shutdown should complete well under the 2-second tick budget.
999        // Allow 500ms for CI overhead; the point is it shouldn't take 2s.
1000        assert!(
1001            wall_ms < 500,
1002            "shutdown took {wall_ms}ms with 0.5Hz tick rate \
1003             (report: total={}ms, drain={}ms, quiesce={}ms)",
1004            report.total_ms,
1005            report.drain_ms,
1006            report.quiesce_ms
1007        );
1008        assert!(report.tick_joined);
1009    }
1010
    #[test]
    fn preflight_reports_queue_capacities_and_snapshot_readiness() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Preflight right after construction. Capacities come from the
        // configuration: 64 is the bounded command channel size (see the
        // module-level architecture diagram) and 8 is test_config()'s
        // ring_buffer_size. Snapshot-related fields must be self-consistent
        // whether or not the tick thread has published yet.
        let initial = world.preflight();
        assert_eq!(initial.command_queue_capacity, 64);
        assert!(initial.observe_queue_capacity > 0);
        assert_eq!(initial.ring_capacity, 8);
        assert!(initial.ring_len <= initial.ring_capacity);
        assert!(initial.ring_write_pos >= initial.ring_len as u64);
        if initial.has_snapshot {
            // At least one publish already happened: the ring must be
            // non-empty, the latest tick id advanced, and an oldest retained
            // position reported.
            assert!(initial.ring_len > 0);
            assert!(initial.latest_snapshot_tick_id > 0);
            assert!(initial.ring_oldest_retained_pos.is_some());
        } else {
            // Nothing published yet: all ring accounting must still be zero.
            assert_eq!(initial.ring_len, 0);
            assert_eq!(initial.ring_write_pos, 0);
            assert_eq!(initial.ring_oldest_retained_pos, None);
        }
        assert!(!initial.tick_thread_stopped);

        // Poll preflight itself (not latest_snapshot) until it reports a
        // snapshot, with a 2s timeout for slow runners.
        let deadline = Instant::now() + Duration::from_secs(2);
        let mut ready = initial;
        while !ready.has_snapshot {
            if Instant::now() > deadline {
                panic!("preflight never reported available snapshot");
            }
            std::thread::sleep(Duration::from_millis(10));
            ready = world.preflight();
        }
        assert!(ready.latest_snapshot_tick_id > 0);
        assert!(ready.ring_len > 0);
        assert!(ready.ring_write_pos >= ready.ring_len as u64);
        assert!(ready.ring_oldest_retained_pos.is_some());

        // After shutdown, both queues report zero capacity and the tick
        // thread is flagged as stopped.
        world.shutdown();
        let stopped = world.preflight();
        assert_eq!(stopped.command_queue_capacity, 0);
        assert_eq!(stopped.observe_queue_capacity, 0);
        assert!(stopped.tick_thread_stopped);
    }
1052
    #[test]
    fn preflight_observes_ingress_backlog() {
        // A 0.5 Hz tick rate gives the tick thread a 2-second budget sleep,
        // leaving a long window during which enqueued batches sit un-drained
        // and are therefore visible as command_queue_depth.
        let config = WorldConfig {
            tick_rate_hz: Some(0.5),
            ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();

        // Wait for the first publish so the tick thread enters budget sleep.
        // Enqueueing after this point makes backlog visibility deterministic
        // in slow/instrumented runners (e.g., tarpaulin).
        let ready_deadline = Instant::now() + Duration::from_secs(5);
        while world.latest_snapshot().is_none() {
            if Instant::now() > ready_deadline {
                panic!("no snapshot produced within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Reach into the private cmd_tx field directly (same-crate test
        // access) so batches can be queued without submit_commands(), which
        // waits for receipts from the tick thread.
        let cmd_tx = world
            .cmd_tx
            .as_ref()
            .expect("cmd channel must exist")
            .clone();

        // Queue several empty batches. Each reply receiver is dropped
        // immediately — only the queue depth matters here, not the receipts.
        for _ in 0..8 {
            let (reply_tx, _reply_rx) = crossbeam_channel::bounded(1);
            cmd_tx
                .try_send(IngressBatch {
                    commands: Vec::new(),
                    reply: reply_tx,
                })
                .expect("expected ingress queue to accept batch");
        }

        // Poll preflight until the backlog shows up (bounded 500ms wait;
        // the loop may exit via the break, so the asserts below re-check).
        let deadline = Instant::now() + Duration::from_millis(500);
        let mut preflight = world.preflight();
        while preflight.command_queue_depth == 0 {
            if Instant::now() > deadline {
                break;
            }
            std::thread::sleep(Duration::from_millis(5));
            preflight = world.preflight();
        }
        assert!(preflight.command_queue_depth > 0);
        assert!(preflight.command_queue_depth <= preflight.command_queue_capacity);

        world.shutdown();
    }
1102
1103    #[test]
1104    fn reset_lifecycle() {
1105        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
1106
1107        // Wait for some ticks (generous timeout for slow CI runners).
1108        let deadline = Instant::now() + Duration::from_secs(5);
1109        while world.current_epoch() < 5 {
1110            if Instant::now() > deadline {
1111                panic!("epoch didn't reach 5 within 5s");
1112            }
1113            std::thread::sleep(Duration::from_millis(10));
1114        }
1115        let epoch_before = world.current_epoch();
1116        assert!(epoch_before >= 5);
1117
1118        // Reset with a new seed.
1119        world.reset(99).unwrap();
1120
1121        // After reset, epoch should restart from 0.
1122        assert_eq!(world.current_epoch(), 0);
1123
1124        // The world should produce new snapshots.
1125        let deadline = Instant::now() + Duration::from_secs(2);
1126        while world.latest_snapshot().is_none() {
1127            if Instant::now() > deadline {
1128                panic!("no snapshot after reset within 2s");
1129            }
1130            std::thread::sleep(Duration::from_millis(10));
1131        }
1132        assert!(world.current_epoch() > 0, "should be ticking after reset");
1133
1134        world.shutdown();
1135    }
1136
1137    #[test]
1138    fn arc_space_wrapper_topology_eq() {
1139        // Two ArcSpaceWrappers around identical Line1D spaces must compare
1140        // as topologically equal. Before the fix, the inner downcast saw
1141        // ArcSpaceWrapper instead of Line1D and returned false.
1142        let a = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
1143        let b = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
1144        assert!(
1145            a.topology_eq(&b),
1146            "identical Line1D through ArcSpaceWrapper should be topology-equal"
1147        );
1148
1149        // Different sizes must not match.
1150        let c = ArcSpaceWrapper(Arc::new(Line1D::new(20, EdgeBehavior::Absorb).unwrap()));
1151        assert!(
1152            !a.topology_eq(&c),
1153            "different Line1D sizes should not be topology-equal"
1154        );
1155
1156        // Comparing ArcSpaceWrapper with a bare Line1D should also work.
1157        let bare = Line1D::new(10, EdgeBehavior::Absorb).unwrap();
1158        assert!(
1159            a.topology_eq(&bare),
1160            "ArcSpaceWrapper(Line1D) vs bare Line1D should be topology-equal"
1161        );
1162    }
1163}