// murk_engine/realtime.rs
1//! User-facing `RealtimeAsyncWorld` API and shutdown state machine.
2//!
3//! This is the primary mode for RL training: the tick engine runs on a
4//! dedicated background thread at a configurable rate (default 60 Hz),
5//! while an egress thread pool serves observation requests concurrently.
6//!
7//! # Architecture
8//!
9//! ```text
10//! User Thread(s)              Tick Thread              Egress Workers (N)
11//!     |                           |                         |
12//!     |--submit_commands()------->| cmd_rx.try_recv()       |
13//!     |   [cmd_tx: bounded(64)]   | engine.submit_commands()|
14//!     |<--receipts via reply_tx---| engine.execute_tick()   |
15//!     |                           | ring.push(snap)         |
16//!     |                           | epoch_counter.advance() |
17//!     |                           | check_stalled_workers() |
18//!     |                           | sleep(budget - elapsed) |
19//!     |                           |                         |
20//!     |--observe()------------------------------------->    |
21//!     |   [obs_tx: bounded(N*4)]               task_rx.recv()
22//!     |   blocks on reply_rx                   ring.latest()
23//!     |                                        pin epoch
24//!     |                                        execute ObsPlan
25//!     |                                        unpin epoch
26//!     |<--result via reply_tx--------------------------    |
27//! ```
28
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::thread::{self, JoinHandle};
32use std::time::{Duration, Instant};
33
34use murk_arena::OwnedSnapshot;
35use murk_core::command::{Command, Receipt};
36use murk_core::error::ObsError;
37use murk_core::Coord;
38use murk_obs::{ObsMetadata, ObsPlan};
39use murk_space::Space;
40
41use crate::config::{AsyncConfig, BackoffConfig, ConfigError, WorldConfig};
42use crate::egress::{ObsResult, ObsTask};
43use crate::epoch::{EpochCounter, WorkerEpoch};
44use crate::ring::SnapshotRing;
45use crate::tick::TickEngine;
46use crate::tick_thread::{IngressBatch, TickThreadState};
47
48// ── Error types ──────────────────────────────────────────────────
49
/// Error submitting commands to the tick thread.
#[derive(Debug, PartialEq, Eq)]
pub enum SubmitError {
    /// The tick thread has shut down.
    Shutdown,
    /// The command channel is full (back-pressure).
    ChannelFull,
}

impl std::fmt::Display for SubmitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Map each variant to its fixed message, then emit it once.
        let msg = match self {
            Self::Shutdown => "tick thread has shut down",
            Self::ChannelFull => "command channel full",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for SubmitError {}
69
70// ── ShutdownReport ───────────────────────────────────────────────
71
/// Report from the shutdown state machine.
///
/// Returned by `RealtimeAsyncWorld::shutdown`. All durations are
/// wall-clock milliseconds measured from the start of the sequence.
#[derive(Debug)]
pub struct ShutdownReport {
    /// Total time spent in the shutdown sequence.
    pub total_ms: u64,
    /// Time spent draining the tick thread.
    pub drain_ms: u64,
    /// Time spent quiescing workers.
    pub quiesce_ms: u64,
    /// Whether the tick thread was joined successfully.
    pub tick_joined: bool,
    /// Number of worker threads joined.
    pub workers_joined: usize,
}
86
/// Non-blocking visibility snapshot for realtime health checks.
///
/// Produced by `RealtimeAsyncWorld::preflight`. Each field is sampled
/// independently (no global lock), so values may be mutually
/// inconsistent while the tick thread is concurrently running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RealtimePreflight {
    /// Pending command batches in the ingress channel.
    pub command_queue_depth: usize,
    /// Ingress channel capacity (0 if unavailable/shut down).
    pub command_queue_capacity: usize,
    /// Pending observation tasks in the egress task channel.
    pub observe_queue_depth: usize,
    /// Egress task channel capacity (0 if unavailable/shut down).
    pub observe_queue_capacity: usize,
    /// Whether a snapshot is currently available to serve observes.
    pub has_snapshot: bool,
    /// Tick ID of the latest snapshot (0 when unavailable).
    pub latest_snapshot_tick_id: u64,
    /// Snapshot age in ticks relative to the current epoch.
    pub snapshot_age_ticks: u64,
    /// Ring retention capacity (maximum number of retained snapshots).
    pub ring_capacity: usize,
    /// Current retained snapshot count.
    pub ring_len: usize,
    /// Current monotonic ring write position.
    pub ring_write_pos: u64,
    /// Oldest retained write position (None when ring is empty).
    pub ring_oldest_retained_pos: Option<u64>,
    /// Cumulative count of evictions from ring overwrite.
    pub ring_eviction_events: u64,
    /// Cumulative count of stale/not-yet-written position reads.
    pub ring_stale_read_events: u64,
    /// Cumulative count of overwrite-skew retry events.
    pub ring_skew_retry_events: u64,
    /// Whether the tick thread has already stopped.
    pub tick_thread_stopped: bool,
}
121
122// ── ShutdownState ────────────────────────────────────────────────
123
/// Internal lifecycle state of the world's shutdown sequence.
///
/// Transitions only move forward:
/// Running → Draining → Quiescing → Dropped (see `shutdown()`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownState {
    /// Threads are live and serving requests.
    Running,
    /// Shutdown flag set; waiting for the tick thread to stop.
    Draining,
    /// Workers cancelled, channels dropped; waiting for epoch unpin.
    Quiescing,
    /// All threads joined; terminal state.
    Dropped,
}
131
132// ── RealtimeAsyncWorld ───────────────────────────────────────────
133
/// Realtime asynchronous simulation world.
///
/// Runs the tick engine on a background thread and serves observations
/// from a pool of egress workers. This is the primary API for RL
/// training environments.
pub struct RealtimeAsyncWorld {
    /// Snapshot ring shared with the tick thread and egress workers.
    ring: Arc<SnapshotRing>,
    /// Global epoch counter, advanced by the tick thread each tick.
    epoch_counter: Arc<EpochCounter>,
    /// Per-worker pin/cancel state, indexed by worker id.
    worker_epochs: Arc<[WorkerEpoch]>,
    /// Ingress sender; `None` after shutdown (dropping it disconnects).
    cmd_tx: Option<crossbeam_channel::Sender<IngressBatch>>,
    /// Egress task sender; `None` after shutdown.
    obs_tx: Option<crossbeam_channel::Sender<ObsTask>>,
    /// Cooperative stop signal observed by the tick thread.
    shutdown_flag: Arc<AtomicBool>,
    /// Set by the tick thread once it has stopped ticking.
    tick_stopped: Arc<AtomicBool>,
    /// Tick thread handle; joining it returns the `TickEngine`.
    tick_thread: Option<JoinHandle<TickEngine>>,
    /// Egress worker handles, joined during shutdown.
    worker_threads: Vec<JoinHandle<()>>,
    /// Current position in the shutdown state machine.
    state: ShutdownState,
    /// Recovered from tick thread on shutdown, used for `reset()`.
    /// Wrapped in Mutex so RealtimeAsyncWorld is Sync (TickEngine
    /// contains `Vec<Box<dyn Propagator>>` which is Send but not Sync).
    /// Never contended: only accessed during reset() which takes &mut self.
    recovered_engine: Mutex<Option<TickEngine>>,
    /// Async (threading) configuration, reused by `reset()`.
    config: AsyncConfig,
    /// Backoff configuration, reused when respawning the tick thread.
    backoff_config: BackoffConfig,
    /// Last seed supplied (updated by `reset()`).
    seed: u64,
    /// Validated tick rate in Hz (finite, positive).
    tick_rate_hz: f64,
    /// Shared space for agent-relative observations and engine reconstruction.
    space: Arc<dyn Space>,
}
162
163impl RealtimeAsyncWorld {
    /// Create a new realtime async world and spawn all threads.
    ///
    /// The `WorldConfig` is consumed: the `TickEngine` is moved into
    /// the tick thread. The `space` is shared via `Arc` for egress
    /// workers that need it for agent-relative observations.
    ///
    /// # Errors
    ///
    /// - `ConfigError::InvalidTickRate` if the rate (default 60 Hz) is
    ///   non-finite, non-positive, or has a non-finite period.
    /// - `ConfigError::ThreadSpawnFailed` if the tick thread or an
    ///   egress worker cannot be spawned; any threads spawned so far
    ///   are shut down and joined before the error is returned.
    pub fn new(config: WorldConfig, async_config: AsyncConfig) -> Result<Self, ConfigError> {
        let tick_rate_hz = config.tick_rate_hz.unwrap_or(60.0);
        // Reject rates whose reciprocal (the tick period) is non-finite
        // as well as non-finite or non-positive rates.
        if !tick_rate_hz.is_finite() || tick_rate_hz <= 0.0 || !(1.0 / tick_rate_hz).is_finite() {
            return Err(ConfigError::InvalidTickRate {
                value: tick_rate_hz,
            });
        }

        let seed = config.seed;
        let ring_size = config.ring_buffer_size;
        let backoff_config = config.backoff.clone();
        let max_epoch_hold_ms = async_config.max_epoch_hold_ms;
        let cancel_grace_ms = async_config.cancel_grace_ms;

        // Share the space for agent-relative observations.
        let space: Arc<dyn Space> = Arc::from(config.space);

        // Reconstruct WorldConfig with the Arc'd space (Box from Arc).
        // We clone the space Arc for our own reference, and give
        // TickEngine a Box wrapper.
        let engine_space: Box<dyn Space> = Box::new(ArcSpaceWrapper(Arc::clone(&space)));
        let engine_config = WorldConfig {
            space: engine_space,
            fields: config.fields,
            propagators: config.propagators,
            dt: config.dt,
            seed: config.seed,
            ring_buffer_size: config.ring_buffer_size,
            max_ingress_queue: config.max_ingress_queue,
            tick_rate_hz: config.tick_rate_hz,
            backoff: backoff_config.clone(),
        };

        let engine = TickEngine::new(engine_config)?;

        let worker_count = async_config.resolved_worker_count();
        let ring = Arc::new(SnapshotRing::new(ring_size));
        let epoch_counter = Arc::new(EpochCounter::new());
        let worker_epochs: Arc<[WorkerEpoch]> = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();

        let shutdown_flag = Arc::new(AtomicBool::new(false));
        let tick_stopped = Arc::new(AtomicBool::new(false));

        // Command channel: bounded(64) — tick thread drains each tick.
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);

        // Task channel for egress workers: bounded(worker_count * 4).
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);

        // Spawn tick thread — returns TickEngine on exit for reset().
        let tick_ring = Arc::clone(&ring);
        let tick_epoch = Arc::clone(&epoch_counter);
        let tick_workers = Arc::clone(&worker_epochs);
        let tick_shutdown = Arc::clone(&shutdown_flag);
        let tick_stopped_flag = Arc::clone(&tick_stopped);
        // Keep a copy for `self`; `backoff_config` moves into the closure.
        let stored_backoff = backoff_config.clone();
        let tick_thread = thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            })
            .map_err(|e| ConfigError::ThreadSpawnFailed {
                reason: format!("tick thread: {e}"),
            })?;

        // Spawn egress worker threads. On partial failure, shut down
        // the tick thread before propagating the error.
        let (obs_tx, worker_threads) = match Self::spawn_egress_workers(
            worker_count,
            obs_tx,
            &obs_rx,
            &ring,
            &epoch_counter,
            &worker_epochs,
        ) {
            Ok(result) => result,
            Err(e) => {
                shutdown_flag.store(true, Ordering::Release);
                // Unpark so the tick thread notices the flag without
                // waiting out a full budget sleep.
                tick_thread.thread().unpark();
                let _ = tick_thread.join();
                return Err(e);
            }
        };

        Ok(Self {
            ring,
            epoch_counter,
            worker_epochs,
            cmd_tx: Some(cmd_tx),
            obs_tx: Some(obs_tx),
            shutdown_flag,
            tick_stopped,
            tick_thread: Some(tick_thread),
            worker_threads,
            state: ShutdownState::Running,
            recovered_engine: Mutex::new(None),
            config: async_config,
            backoff_config: stored_backoff,
            seed,
            tick_rate_hz,
            space,
        })
    }
288
289    /// Submit commands to be processed in the next tick.
290    ///
291    /// Non-blocking: sends the batch via channel and blocks only for
292    /// the receipt reply (which arrives within one tick period).
293    pub fn submit_commands(&self, commands: Vec<Command>) -> Result<Vec<Receipt>, SubmitError> {
294        let cmd_tx = self.cmd_tx.as_ref().ok_or(SubmitError::Shutdown)?;
295
296        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
297        let batch = IngressBatch {
298            commands,
299            reply: reply_tx,
300        };
301
302        cmd_tx.try_send(batch).map_err(|e| match e {
303            crossbeam_channel::TrySendError::Full(_) => SubmitError::ChannelFull,
304            crossbeam_channel::TrySendError::Disconnected(_) => SubmitError::Shutdown,
305        })?;
306
307        // Wait for receipts (blocks until tick thread processes the batch).
308        reply_rx.recv().map_err(|_| SubmitError::Shutdown)
309    }
310
311    /// Extract an observation from the latest snapshot.
312    ///
313    /// Blocking: dispatches to an egress worker and waits for the result.
314    /// The output and mask buffers must be pre-allocated.
315    pub fn observe(
316        &self,
317        plan: &Arc<ObsPlan>,
318        output: &mut [f32],
319        mask: &mut [u8],
320    ) -> Result<ObsMetadata, ObsError> {
321        let obs_tx = self
322            .obs_tx
323            .as_ref()
324            .ok_or_else(|| ObsError::ExecutionFailed {
325                reason: "world is shut down".into(),
326            })?;
327
328        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
329        let task = ObsTask::Simple {
330            plan: Arc::clone(plan),
331            output_len: output.len(),
332            mask_len: mask.len(),
333            reply: reply_tx,
334        };
335
336        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
337            reason: "egress pool shut down".into(),
338        })?;
339
340        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
341            reason: "worker disconnected".into(),
342        })?;
343
344        match result {
345            ObsResult::Simple {
346                metadata,
347                output: buf,
348                mask: mbuf,
349            } => {
350                if buf.len() > output.len() || mbuf.len() > mask.len() {
351                    return Err(ObsError::ExecutionFailed {
352                        reason: format!(
353                            "output buffer too small: need ({}, {}) got ({}, {})",
354                            buf.len(),
355                            mbuf.len(),
356                            output.len(),
357                            mask.len()
358                        ),
359                    });
360                }
361                output[..buf.len()].copy_from_slice(&buf);
362                mask[..mbuf.len()].copy_from_slice(&mbuf);
363                Ok(metadata)
364            }
365            ObsResult::Error(e) => Err(e),
366            _ => Err(ObsError::ExecutionFailed {
367                reason: "unexpected result type".into(),
368            }),
369        }
370    }
371
372    /// Extract agent-relative observations from the latest snapshot.
373    ///
374    /// Each agent gets `output_len / n_agents` elements in the output buffer.
375    pub fn observe_agents(
376        &self,
377        plan: &Arc<ObsPlan>,
378        space: &Arc<dyn Space>,
379        agent_centers: &[Coord],
380        output: &mut [f32],
381        mask: &mut [u8],
382    ) -> Result<Vec<ObsMetadata>, ObsError> {
383        let obs_tx = self
384            .obs_tx
385            .as_ref()
386            .ok_or_else(|| ObsError::ExecutionFailed {
387                reason: "world is shut down".into(),
388            })?;
389
390        let (reply_tx, reply_rx) = crossbeam_channel::bounded(1);
391        let n_agents = agent_centers.len();
392        let per_agent_output = if n_agents > 0 {
393            output.len() / n_agents
394        } else {
395            0
396        };
397        let per_agent_mask = if n_agents > 0 {
398            mask.len() / n_agents
399        } else {
400            0
401        };
402
403        let task = ObsTask::Agents {
404            plan: Arc::clone(plan),
405            space: Arc::clone(space),
406            agent_centers: agent_centers.to_vec(),
407            output_len: per_agent_output,
408            mask_len: per_agent_mask,
409            reply: reply_tx,
410        };
411
412        obs_tx.send(task).map_err(|_| ObsError::ExecutionFailed {
413            reason: "egress pool shut down".into(),
414        })?;
415
416        let result = reply_rx.recv().map_err(|_| ObsError::ExecutionFailed {
417            reason: "worker disconnected".into(),
418        })?;
419
420        match result {
421            ObsResult::Agents {
422                metadata,
423                output: buf,
424                mask: mbuf,
425            } => {
426                if buf.len() > output.len() || mbuf.len() > mask.len() {
427                    return Err(ObsError::ExecutionFailed {
428                        reason: format!(
429                            "output buffer too small: need ({}, {}) got ({}, {})",
430                            buf.len(),
431                            mbuf.len(),
432                            output.len(),
433                            mask.len()
434                        ),
435                    });
436                }
437                output[..buf.len()].copy_from_slice(&buf);
438                mask[..mbuf.len()].copy_from_slice(&mbuf);
439                Ok(metadata)
440            }
441            ObsResult::Error(e) => Err(e),
442            _ => Err(ObsError::ExecutionFailed {
443                reason: "unexpected result type".into(),
444            }),
445        }
446    }
447
    /// Spawn egress worker threads (shared between `new` and `reset`).
    ///
    /// Takes ownership of `obs_tx` so that on partial failure (some
    /// workers spawned, one fails), the sender can be dropped to
    /// disconnect the channel — workers' `recv()` then returns `Err`
    /// and they exit cleanly. Returns the sender alongside the handles
    /// on success.
    fn spawn_egress_workers(
        worker_count: usize,
        obs_tx: crossbeam_channel::Sender<ObsTask>,
        obs_rx: &crossbeam_channel::Receiver<ObsTask>,
        ring: &Arc<SnapshotRing>,
        epoch_counter: &Arc<EpochCounter>,
        worker_epochs: &Arc<[WorkerEpoch]>,
    ) -> Result<(crossbeam_channel::Sender<ObsTask>, Vec<JoinHandle<()>>), ConfigError> {
        let mut worker_threads = Vec::with_capacity(worker_count);
        for i in 0..worker_count {
            // Each worker gets its own clone of the shared state plus a
            // stable index `i` into `worker_epochs`.
            let obs_rx = obs_rx.clone();
            let ring = Arc::clone(ring);
            let epoch = Arc::clone(epoch_counter);
            let worker_epochs_ref = Arc::clone(worker_epochs);
            match thread::Builder::new()
                .name(format!("murk-egress-{i}"))
                .spawn(move || {
                    crate::egress::worker_loop_indexed(obs_rx, ring, epoch, worker_epochs_ref, i);
                }) {
                Ok(handle) => worker_threads.push(handle),
                Err(e) => {
                    // Drop the sender so workers' recv() returns Err
                    // and they exit cleanly, then join them.
                    drop(obs_tx);
                    for handle in worker_threads {
                        let _ = handle.join();
                    }
                    return Err(ConfigError::ThreadSpawnFailed {
                        reason: format!("egress worker {i}: {e}"),
                    });
                }
            }
        }
        Ok((obs_tx, worker_threads))
    }
490
    /// Get the latest snapshot directly from the ring.
    ///
    /// Returns `None` when the ring holds no snapshot yet.
    pub fn latest_snapshot(&self) -> Option<Arc<OwnedSnapshot>> {
        self.ring.latest()
    }
495
    /// Current epoch (lock-free read).
    ///
    /// Advanced by the tick thread once per tick (see module diagram).
    pub fn current_epoch(&self) -> u64 {
        self.epoch_counter.current()
    }
500
501    /// Non-blocking queue/ring visibility for readiness checks.
502    ///
503    /// This API is intended for callers that need to detect overload risk
504    /// before making blocking calls like `observe`.
505    pub fn preflight(&self) -> RealtimePreflight {
506        let (command_queue_depth, command_queue_capacity) = self
507            .cmd_tx
508            .as_ref()
509            .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
510            .unwrap_or((0, 0));
511        let (observe_queue_depth, observe_queue_capacity) = self
512            .obs_tx
513            .as_ref()
514            .map(|tx| (tx.len(), tx.capacity().unwrap_or(0)))
515            .unwrap_or((0, 0));
516        let ring = crate::egress::ring_preflight(&self.ring, &self.epoch_counter);
517
518        RealtimePreflight {
519            command_queue_depth,
520            command_queue_capacity,
521            observe_queue_depth,
522            observe_queue_capacity,
523            has_snapshot: ring.has_snapshot,
524            latest_snapshot_tick_id: ring.latest_tick_id,
525            snapshot_age_ticks: ring.age_ticks,
526            ring_capacity: ring.ring_capacity,
527            ring_len: ring.ring_len,
528            ring_write_pos: ring.ring_write_pos,
529            ring_oldest_retained_pos: ring.ring_oldest_retained_pos,
530            ring_eviction_events: ring.ring_eviction_events,
531            ring_stale_read_events: ring.ring_stale_read_events,
532            ring_skew_retry_events: ring.ring_skew_retry_events,
533            tick_thread_stopped: self.tick_stopped.load(Ordering::Acquire),
534        }
535    }
536
    /// Shutdown the world with the 4-state machine.
    ///
    /// 1. **Running → Draining (≤33ms):** Set shutdown flag, unpark tick
    ///    thread (wakes it from budget sleep immediately), wait for tick stop.
    /// 2. **Draining → Quiescing (≤200ms):** Cancel workers, drop obs channel.
    /// 3. **Quiescing → Dropped (≤10ms):** Join all threads.
    ///
    /// Idempotent: calling again after completion returns a zeroed
    /// report immediately.
    pub fn shutdown(&mut self) -> ShutdownReport {
        if self.state == ShutdownState::Dropped {
            return ShutdownReport {
                total_ms: 0,
                drain_ms: 0,
                quiesce_ms: 0,
                tick_joined: true,
                workers_joined: 0,
            };
        }

        let start = Instant::now();

        // Phase 1: Running → Draining
        self.state = ShutdownState::Draining;
        self.shutdown_flag.store(true, Ordering::Release);

        // Wake the tick thread if it's parked in a budget sleep.
        // park_timeout is used instead of thread::sleep, so unpark()
        // provides immediate wakeup regardless of tick_rate_hz.
        if let Some(handle) = &self.tick_thread {
            handle.thread().unpark();
        }

        // Wait for tick thread to acknowledge (≤33ms budget).
        let drain_deadline = Instant::now() + Duration::from_millis(33);
        while !self.tick_stopped.load(Ordering::Acquire) {
            if Instant::now() > drain_deadline {
                break;
            }
            thread::yield_now();
        }
        let drain_ms = start.elapsed().as_millis() as u64;

        // Phase 2: Draining → Quiescing
        self.state = ShutdownState::Quiescing;

        // Cancel all workers.
        for w in self.worker_epochs.iter() {
            w.request_cancel();
        }

        // Drop the command and observation channels to unblock workers.
        self.cmd_tx.take();
        self.obs_tx.take();

        // Wait for all workers to unpin (≤200ms budget).
        let quiesce_deadline = Instant::now() + Duration::from_millis(200);
        loop {
            let all_unpinned = self.worker_epochs.iter().all(|w| !w.is_pinned());
            if all_unpinned || Instant::now() > quiesce_deadline {
                break;
            }
            thread::yield_now();
        }
        // Monotonic clock: elapsed here ≥ drain_ms, so no underflow.
        let quiesce_ms = start.elapsed().as_millis() as u64 - drain_ms;

        // Phase 3: Quiescing → Dropped
        self.state = ShutdownState::Dropped;

        // Joining the tick thread recovers the TickEngine for reset().
        let tick_joined = if let Some(handle) = self.tick_thread.take() {
            match handle.join() {
                Ok(engine) => {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                    true
                }
                Err(_) => false,
            }
        } else {
            true
        };

        let mut workers_joined = 0;
        for handle in self.worker_threads.drain(..) {
            if handle.join().is_ok() {
                workers_joined += 1;
            }
        }

        let total_ms = start.elapsed().as_millis() as u64;
        ShutdownReport {
            total_ms,
            drain_ms,
            quiesce_ms,
            tick_joined,
            workers_joined,
        }
    }
631
    /// Reset the world: stop all threads, reset engine state, restart.
    ///
    /// This is the RL episode-boundary operation. The engine is recovered
    /// from the tick thread, reset in-place, then respawned with fresh
    /// channels and worker threads.
    ///
    /// # Errors
    ///
    /// On any failure the recovered engine is restored when possible,
    /// so a subsequent `reset()` can retry.
    pub fn reset(&mut self, seed: u64) -> Result<(), ConfigError> {
        // Shutdown if still running (recovers engine via JoinHandle).
        if self.state != ShutdownState::Dropped {
            self.shutdown();
        }

        self.seed = seed;

        // Recover the engine from the previous tick thread.
        let mut engine = self
            .recovered_engine
            .lock()
            .unwrap()
            .take()
            .ok_or(ConfigError::EngineRecoveryFailed)?;

        // Reset the engine (clears arena, ingress, tick counter).
        // If reset fails, restore the engine so a subsequent reset can retry.
        if let Err(e) = engine.reset() {
            *self.recovered_engine.lock().unwrap() = Some(engine);
            return Err(e);
        }

        // Fresh shared state.
        let worker_count = self.config.resolved_worker_count();
        self.ring = Arc::new(SnapshotRing::new(self.ring.capacity()));
        self.epoch_counter = Arc::new(EpochCounter::new());
        self.worker_epochs = (0..worker_count as u32)
            .map(WorkerEpoch::new)
            .collect::<Vec<_>>()
            .into();
        self.shutdown_flag = Arc::new(AtomicBool::new(false));
        self.tick_stopped = Arc::new(AtomicBool::new(false));

        // Fresh channels (same capacities as in `new`).
        let (cmd_tx, cmd_rx) = crossbeam_channel::bounded(64);
        let (obs_tx, obs_rx) = crossbeam_channel::bounded(worker_count * 4);
        self.cmd_tx = Some(cmd_tx);

        // Respawn tick thread. The engine is sent via a rendezvous
        // channel *after* spawn succeeds so that a spawn failure doesn't
        // consume it (the closure would be dropped, losing the engine
        // permanently and bricking subsequent reset() calls).
        let (engine_tx, engine_rx) = crossbeam_channel::bounded(0);
        let tick_ring = Arc::clone(&self.ring);
        let tick_epoch = Arc::clone(&self.epoch_counter);
        let tick_workers = Arc::clone(&self.worker_epochs);
        let tick_shutdown = Arc::clone(&self.shutdown_flag);
        let tick_stopped_flag = Arc::clone(&self.tick_stopped);
        let tick_rate_hz = self.tick_rate_hz;
        let max_epoch_hold_ms = self.config.max_epoch_hold_ms;
        let cancel_grace_ms = self.config.cancel_grace_ms;
        let backoff_config = self.backoff_config.clone();
        let tick_thread = match thread::Builder::new()
            .name("murk-tick".into())
            .spawn(move || {
                let engine = engine_rx.recv().expect("engine channel closed before send");
                let state = TickThreadState::new(
                    engine,
                    tick_ring,
                    tick_epoch,
                    tick_workers,
                    cmd_rx,
                    tick_shutdown,
                    tick_stopped_flag,
                    tick_rate_hz,
                    max_epoch_hold_ms,
                    cancel_grace_ms,
                    &backoff_config,
                );
                state.run()
            }) {
            Ok(handle) => handle,
            Err(e) => {
                // Spawn failed — engine was never sent, restore for retry.
                *self.recovered_engine.lock().unwrap() = Some(engine);
                return Err(ConfigError::ThreadSpawnFailed {
                    reason: format!("tick thread: {e}"),
                });
            }
        };

        // Spawn succeeded — send engine to tick thread.
        engine_tx
            .send(engine)
            .expect("tick thread died before receiving engine");

        // Respawn egress workers. On failure, shut down the tick thread
        // and recover the engine so the world remains usable for retry.
        let (obs_tx, worker_threads) = match Self::spawn_egress_workers(
            worker_count,
            obs_tx,
            &obs_rx,
            &self.ring,
            &self.epoch_counter,
            &self.worker_epochs,
        ) {
            Ok(result) => result,
            Err(e) => {
                self.shutdown_flag.store(true, Ordering::Release);
                tick_thread.thread().unpark();
                if let Ok(engine) = tick_thread.join() {
                    *self.recovered_engine.lock().unwrap() = Some(engine);
                }
                return Err(e);
            }
        };

        self.obs_tx = Some(obs_tx);
        self.tick_thread = Some(tick_thread);
        self.worker_threads = worker_threads;

        self.state = ShutdownState::Running;
        Ok(())
    }
752
    /// The shared space used for agent-relative observations.
    ///
    /// Borrowed from the internal `Arc<dyn Space>` that is also shared
    /// with the tick engine (via `ArcSpaceWrapper`).
    pub fn space(&self) -> &dyn Space {
        self.space.as_ref()
    }
757}
758
759impl Drop for RealtimeAsyncWorld {
760    fn drop(&mut self) {
761        if self.state != ShutdownState::Dropped {
762            self.shutdown();
763        }
764    }
765}
766
767// ── ArcSpaceWrapper ──────────────────────────────────────────────
768
/// Wrapper that implements `Space` by delegating to an `Arc<dyn Space>`.
///
/// This allows `TickEngine` to take a `Box<dyn Space>` while the
/// `RealtimeAsyncWorld` retains a shared `Arc` reference for egress.
/// Holds no state other than the shared `Arc`.
struct ArcSpaceWrapper(Arc<dyn Space>);
774
// Every method below is pure delegation to the wrapped `Arc<dyn Space>`;
// only `topology_eq` adds logic (unwrapping a possible wrapper on the
// other side before comparing).
impl murk_space::Space for ArcSpaceWrapper {
    fn ndim(&self) -> usize {
        self.0.ndim()
    }

    fn cell_count(&self) -> usize {
        self.0.cell_count()
    }

    fn neighbours(&self, coord: &Coord) -> smallvec::SmallVec<[Coord; 8]> {
        self.0.neighbours(coord)
    }

    fn distance(&self, a: &Coord, b: &Coord) -> f64 {
        self.0.distance(a, b)
    }

    fn compile_region(
        &self,
        spec: &murk_space::RegionSpec,
    ) -> Result<murk_space::RegionPlan, murk_space::error::SpaceError> {
        self.0.compile_region(spec)
    }

    fn canonical_ordering(&self) -> Vec<Coord> {
        self.0.canonical_ordering()
    }

    fn canonical_rank(&self, coord: &Coord) -> Option<usize> {
        self.0.canonical_rank(coord)
    }

    fn instance_id(&self) -> murk_core::SpaceInstanceId {
        self.0.instance_id()
    }

    fn topology_eq(&self, other: &dyn murk_space::Space) -> bool {
        // Unwrap ArcSpaceWrapper so the inner space's downcast-based
        // comparison sees the concrete type, not this wrapper.
        if let Some(w) = (other as &dyn std::any::Any).downcast_ref::<ArcSpaceWrapper>() {
            self.0.topology_eq(&*w.0)
        } else {
            self.0.topology_eq(other)
        }
    }
}
821
822// Delegate optional Space methods if they exist.
823impl std::fmt::Debug for ArcSpaceWrapper {
824    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
825        f.debug_tuple("ArcSpaceWrapper").finish()
826    }
827}
828
#[cfg(test)]
mod tests {
    //! Integration-style tests for `RealtimeAsyncWorld`: lifecycle,
    //! observation, command ingress, shutdown timing, preflight
    //! diagnostics, reset, and `ArcSpaceWrapper` topology equality.
    //!
    //! Most tests poll `latest_snapshot()` against a generous deadline
    //! instead of sleeping a fixed time, so they stay robust on slow or
    //! instrumented CI runners (e.g. tarpaulin).

    use super::*;
    use murk_core::id::FieldId;
    use murk_core::{BoundaryBehavior, FieldDef, FieldMutability, FieldType};
    use murk_obs::spec::ObsRegion;
    use murk_obs::{ObsEntry, ObsSpec};
    use murk_space::{EdgeBehavior, Line1D};
    use murk_test_utils::ConstPropagator;

    /// Builds a per-tick scalar `FieldDef` with the given name,
    /// clamping boundary behavior, and no units/bounds.
    fn scalar_field(name: &str) -> FieldDef {
        FieldDef {
            name: name.to_string(),
            field_type: FieldType::Scalar,
            mutability: FieldMutability::PerTick,
            units: None,
            bounds: None,
            boundary_behavior: BoundaryBehavior::Clamp,
        }
    }

    /// Baseline world config: a 10-cell absorbing `Line1D` with a single
    /// "energy" field driven to a constant 42.0 by `ConstPropagator`,
    /// ticking at 60 Hz with an 8-slot snapshot ring.
    fn test_config() -> WorldConfig {
        WorldConfig {
            space: Box::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()),
            fields: vec![scalar_field("energy")],
            propagators: vec![Box::new(ConstPropagator::new("const", FieldId(0), 42.0))],
            dt: 0.1,
            seed: 42,
            ring_buffer_size: 8,
            max_ingress_queue: 1024,
            tick_rate_hz: Some(60.0),
            backoff: crate::config::BackoffConfig::default(),
        }
    }

    /// Happy path: the world starts ticking, the epoch advances, and
    /// shutdown reports that the tick thread and egress workers joined.
    #[test]
    fn lifecycle_start_and_shutdown() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot (polling with timeout).
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let epoch = world.current_epoch();
        assert!(epoch > 0, "epoch should have advanced");

        let report = world.shutdown();
        assert!(report.tick_joined);
        assert!(report.workers_joined > 0);
    }

    /// `observe()` against a compiled plan returns the propagator's
    /// constant value (42.0) for every one of the 10 cells.
    #[test]
    fn observe_returns_data() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Compile an obs plan.
        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        let mut output = vec![0.0f32; plan_result.output_len];
        let mut mask = vec![0u8; plan_result.mask_len];

        let meta = world.observe(&plan, &mut output, &mut mask).unwrap();
        assert!(meta.tick_id.0 > 0);
        assert_eq!(output.len(), 10);
        assert!(output.iter().all(|&v| v == 42.0));

        world.shutdown();
    }

    /// Four threads calling `observe()` concurrently (sharing the world
    /// and plan via `Arc`) must all succeed with correct data.
    #[test]
    fn concurrent_observe() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for at least one snapshot.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let space = world.space();
        let spec = ObsSpec {
            entries: vec![ObsEntry {
                field_id: FieldId(0),
                region: ObsRegion::Fixed(murk_space::RegionSpec::All),
                pool: None,
                transform: murk_obs::spec::ObsTransform::Identity,
                dtype: murk_obs::spec::ObsDtype::F32,
            }],
        };
        let plan_result = ObsPlan::compile(&spec, space).unwrap();
        let plan = Arc::new(plan_result.plan);

        // Spawn 4 threads all calling observe concurrently.
        let world = Arc::new(world);
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let w = Arc::clone(&world);
                let p = Arc::clone(&plan);
                let out_len = plan_result.output_len;
                let mask_len = plan_result.mask_len;
                std::thread::spawn(move || {
                    let mut output = vec![0.0f32; out_len];
                    let mut mask = vec![0u8; mask_len];
                    let meta = w.observe(&p, &mut output, &mut mask).unwrap();
                    assert!(meta.tick_id.0 > 0);
                    assert!(output.iter().all(|&v| v == 42.0));
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // Shutdown via Arc — need to unwrap.
        // Since we can't call shutdown on Arc, Drop will handle it.
        drop(world);
    }

    /// A submitted `SetParameter` command flows through the ingress
    /// channel and comes back with an accepting receipt.
    #[test]
    fn submit_commands_flow_through() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for first tick.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Submit a command.
        let cmd = Command {
            payload: murk_core::command::CommandPayload::SetParameter {
                key: murk_core::id::ParameterKey(0),
                value: 1.0,
            },
            expires_after_tick: murk_core::id::TickId(10000),
            source_id: None,
            source_seq: None,
            priority_class: 1,
            arrival_seq: 0,
        };
        let receipts = world.submit_commands(vec![cmd]).unwrap();
        assert_eq!(receipts.len(), 1);
        assert!(receipts[0].accepted);

        world.shutdown();
    }

    /// Dropping the world without an explicit `shutdown()` must still
    /// stop the background threads — the test would hang otherwise.
    #[test]
    fn drop_triggers_shutdown() {
        let world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(50));
        drop(world);
        // If this doesn't hang, shutdown worked.
    }

    /// Shutdown of a running world completes within the documented
    /// budget (2s used here as a generous ceiling for CI).
    #[test]
    fn shutdown_budget() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();
        std::thread::sleep(Duration::from_millis(100));

        let report = world.shutdown();
        // Shutdown should complete well within 2s (generous for slow CI runners).
        assert!(
            report.total_ms < 2000,
            "shutdown took too long: {}ms",
            report.total_ms
        );
    }

    /// Regression test: with a very slow tick rate, shutdown must still
    /// complete within the documented budget. Before the fix, the tick
    /// thread used `thread::sleep` which was uninterruptible by the
    /// shutdown flag, causing shutdown to block for the full tick budget.
    #[test]
    fn shutdown_fast_with_slow_tick_rate() {
        let config = WorldConfig {
            tick_rate_hz: Some(0.5), // 2-second tick budget
            ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();

        // Wait for at least one tick to complete so the ring is non-empty
        // and the tick thread enters its budget sleep.
        let deadline = Instant::now() + Duration::from_secs(5);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot produced within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        // Give the tick thread time to enter its budget sleep.
        std::thread::sleep(Duration::from_millis(50));

        let start = Instant::now();
        let report = world.shutdown();
        let wall_ms = start.elapsed().as_millis();

        // Shutdown should complete well under the 2-second tick budget.
        // Allow 500ms for CI overhead; the point is it shouldn't take 2s.
        assert!(
            wall_ms < 500,
            "shutdown took {wall_ms}ms with 0.5Hz tick rate \
             (report: total={}ms, drain={}ms, quiesce={}ms)",
            report.total_ms,
            report.drain_ms,
            report.quiesce_ms
        );
        assert!(report.tick_joined);
    }

    /// `preflight()` reports queue capacities, ring occupancy, and
    /// snapshot readiness while running, and zeroed capacities plus a
    /// stopped tick thread after shutdown.
    #[test]
    fn preflight_reports_queue_capacities_and_snapshot_readiness() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        let initial = world.preflight();
        assert_eq!(initial.command_queue_capacity, 64);
        assert!(initial.observe_queue_capacity > 0);
        assert_eq!(initial.ring_capacity, 8);
        assert!(initial.ring_len <= initial.ring_capacity);
        assert!(initial.ring_write_pos >= initial.ring_len as u64);
        // The first snapshot may or may not have landed yet; either way
        // the ring counters must be internally consistent.
        if initial.has_snapshot {
            assert!(initial.ring_len > 0);
            assert!(initial.latest_snapshot_tick_id > 0);
            assert!(initial.ring_oldest_retained_pos.is_some());
        } else {
            assert_eq!(initial.ring_len, 0);
            assert_eq!(initial.ring_write_pos, 0);
            assert_eq!(initial.ring_oldest_retained_pos, None);
        }
        assert!(!initial.tick_thread_stopped);

        let deadline = Instant::now() + Duration::from_secs(2);
        let mut ready = initial;
        while !ready.has_snapshot {
            if Instant::now() > deadline {
                panic!("preflight never reported available snapshot");
            }
            std::thread::sleep(Duration::from_millis(10));
            ready = world.preflight();
        }
        assert!(ready.latest_snapshot_tick_id > 0);
        assert!(ready.ring_len > 0);
        assert!(ready.ring_write_pos >= ready.ring_len as u64);
        assert!(ready.ring_oldest_retained_pos.is_some());

        world.shutdown();
        let stopped = world.preflight();
        assert_eq!(stopped.command_queue_capacity, 0);
        assert_eq!(stopped.observe_queue_capacity, 0);
        assert!(stopped.tick_thread_stopped);
    }

    /// With a 0.5 Hz tick rate, batches pushed directly into `cmd_tx`
    /// stay queued long enough for `preflight()` to observe a non-zero
    /// command queue depth.
    #[test]
    fn preflight_observes_ingress_backlog() {
        let config = WorldConfig {
            tick_rate_hz: Some(0.5),
            ..test_config()
        };
        let mut world = RealtimeAsyncWorld::new(config, AsyncConfig::default()).unwrap();

        // Wait for the first publish so the tick thread enters budget sleep.
        // Enqueueing after this point makes backlog visibility deterministic
        // in slow/instrumented runners (e.g., tarpaulin).
        let ready_deadline = Instant::now() + Duration::from_secs(5);
        while world.latest_snapshot().is_none() {
            if Instant::now() > ready_deadline {
                panic!("no snapshot produced within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }

        let cmd_tx = world
            .cmd_tx
            .as_ref()
            .expect("cmd channel must exist")
            .clone();

        // Push empty batches straight into the ingress channel; each
        // reply receiver is dropped immediately since we only care about
        // queue depth, not receipts.
        for _ in 0..8 {
            let (reply_tx, _reply_rx) = crossbeam_channel::bounded(1);
            cmd_tx
                .try_send(IngressBatch {
                    commands: Vec::new(),
                    reply: reply_tx,
                })
                .expect("expected ingress queue to accept batch");
        }

        let deadline = Instant::now() + Duration::from_millis(500);
        let mut preflight = world.preflight();
        while preflight.command_queue_depth == 0 {
            if Instant::now() > deadline {
                break;
            }
            std::thread::sleep(Duration::from_millis(5));
            preflight = world.preflight();
        }
        assert!(preflight.command_queue_depth > 0);
        assert!(preflight.command_queue_depth <= preflight.command_queue_capacity);

        world.shutdown();
    }

    /// `reset()` restarts the epoch counter at 0 and the world resumes
    /// producing snapshots under the new seed.
    #[test]
    fn reset_lifecycle() {
        let mut world = RealtimeAsyncWorld::new(test_config(), AsyncConfig::default()).unwrap();

        // Wait for some ticks (generous timeout for slow CI runners).
        let deadline = Instant::now() + Duration::from_secs(5);
        while world.current_epoch() < 5 {
            if Instant::now() > deadline {
                panic!("epoch didn't reach 5 within 5s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        let epoch_before = world.current_epoch();
        assert!(epoch_before >= 5);

        // Reset with a new seed.
        world.reset(99).unwrap();

        // After reset, epoch should restart from 0.
        assert_eq!(world.current_epoch(), 0);

        // The world should produce new snapshots.
        let deadline = Instant::now() + Duration::from_secs(2);
        while world.latest_snapshot().is_none() {
            if Instant::now() > deadline {
                panic!("no snapshot after reset within 2s");
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        assert!(world.current_epoch() > 0, "should be ticking after reset");

        world.shutdown();
    }

    #[test]
    fn arc_space_wrapper_topology_eq() {
        // Two ArcSpaceWrappers around identical Line1D spaces must compare
        // as topologically equal. Before the fix, the inner downcast saw
        // ArcSpaceWrapper instead of Line1D and returned false.
        let a = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
        let b = ArcSpaceWrapper(Arc::new(Line1D::new(10, EdgeBehavior::Absorb).unwrap()));
        assert!(
            a.topology_eq(&b),
            "identical Line1D through ArcSpaceWrapper should be topology-equal"
        );

        // Different sizes must not match.
        let c = ArcSpaceWrapper(Arc::new(Line1D::new(20, EdgeBehavior::Absorb).unwrap()));
        assert!(
            !a.topology_eq(&c),
            "different Line1D sizes should not be topology-equal"
        );

        // Comparing ArcSpaceWrapper with a bare Line1D should also work.
        let bare = Line1D::new(10, EdgeBehavior::Absorb).unwrap();
        assert!(
            a.topology_eq(&bare),
            "ArcSpaceWrapper(Line1D) vs bare Line1D should be topology-equal"
        );
    }
}