Skip to main content

rustsim_core/
event_queue.rs

1//! Continuous-time event-queue model.
2//!
3//! [`EventQueueModel`] mirrors Julia Agents.jl `EventQueueABM`. Instead of
4//! fixed-tick stepping, agents schedule events at arbitrary future times.
5//! A deterministic priority queue processes events in chronological order,
6//! with tie-breaking by insertion sequence number.
7//!
8//! # Event processing
9//!
10//! 1. Pop the earliest event from the queue.
11//! 2. Advance model time to the event's timestamp.
12//! 3. Look up the target agent; skip if removed.
13//! 4. Call the action function indexed by `event_idx`.
14//! 5. Apply any deferred add/remove actions.
15//!
16//! Events for removed agents are silently skipped (not an error).
17//!
18//! # Determinism
19//!
20//! Events with identical timestamps are ordered by a monotonic sequence
21//! number assigned at insertion time.
22//!
23//! Replay is deterministic for a fixed seed when all of the following are held
24//! constant:
25//! - the same initial event insertion order
26//! - the same action functions
27//! - the same event rescheduling behavior inside actions
28//! - the same agent add/remove behavior
29//!
30//! Equal-time events are therefore reproducible by construction, but changing
31//! the order in which they are inserted will intentionally change execution
32//! order.
33
34#![allow(clippy::type_complexity)]
35
36use crate::{
37    agent::Agent,
38    interaction::{InteractionError, PositionedAgent, SpaceInteraction},
39    model::Model,
40    space::Space,
41    step_context::DeferredAction,
42    store::AgentStore,
43    types::{AgentId, Time},
44};
45use rand::RngCore;
46use std::cell::{Ref, RefMut};
47use std::cmp::Ordering;
48use std::collections::BinaryHeap;
49use tracing::{debug, trace};
50
51/// A scheduled event in the queue.
52#[derive(Debug, Clone)]
53pub struct Event {
54    /// Absolute time at which this event fires.
55    pub time: f64,
56    /// Target agent for this event.
57    pub agent_id: AgentId,
58    /// Index into the model's `actions` vector selecting which function to call.
59    pub event_idx: usize,
60    /// Monotonic sequence number for deterministic tie-breaking.
61    sequence: u64,
62}
63
64impl<S, A, Store, Props, R> EventQueueModel<S, A, Store, Props, R>
65where
66    A: PositionedAgent,
67    S: SpaceInteraction<A>,
68    Store: AgentStore<A>,
69    R: RngCore,
70{
71    /// Insert a positioned agent into both the store and the space atomically.
72    pub fn insert_positioned_agent(&mut self, agent: A) -> Result<(), InteractionError<S::Error>> {
73        let id = agent.id();
74        if self.agents.contains(id) {
75            return Err(InteractionError::DuplicateId(id));
76        }
77        self.space
78            .add_agent(&agent)
79            .map_err(InteractionError::Space)?;
80        self.agents.insert(agent);
81        if id > self.max_id {
82            self.max_id = id;
83        }
84        Ok(())
85    }
86
87    /// Remove a positioned agent from both the store and the space atomically.
88    pub fn remove_positioned_agent(
89        &mut self,
90        id: AgentId,
91    ) -> Result<Option<A>, InteractionError<S::Error>> {
92        let Some(agent_ref) = self.agents.get(id) else {
93            return Ok(None);
94        };
95        self.space
96            .remove_agent(&*agent_ref)
97            .map_err(InteractionError::Space)?;
98        drop(agent_ref);
99        Ok(self.agents.remove(id))
100    }
101
102    /// Move a positioned agent, updating both the agent value and spatial index.
103    pub fn move_positioned_agent(
104        &mut self,
105        id: AgentId,
106        new_position: A::Position,
107    ) -> Result<(), InteractionError<S::Error>> {
108        let mut agent_ref = self
109            .agents
110            .get_mut(id)
111            .ok_or(InteractionError::AgentNotFound(id))?;
112        let old_position = agent_ref.position().clone();
113
114        self.space
115            .remove_agent(&*agent_ref)
116            .map_err(InteractionError::Space)?;
117        agent_ref.set_position(new_position);
118
119        if let Err(source) = self.space.add_agent(&*agent_ref) {
120            agent_ref.set_position(old_position);
121            if let Err(rollback) = self.space.add_agent(&*agent_ref) {
122                return Err(InteractionError::RollbackFailed {
123                    operation: "move_positioned_agent",
124                    source,
125                    rollback,
126                });
127            }
128            return Err(InteractionError::Space(source));
129        }
130        Ok(())
131    }
132
133    /// Validate that all stored agents are represented by the spatial index.
134    pub fn validate_space_index(&self) -> Result<(), InteractionError<S::Error>> {
135        for id in self.agents.iter_ids() {
136            let Some(agent) = self.agents.get(id) else {
137                continue;
138            };
139            let matches = self
140                .space
141                .nearby_ids(agent.position(), 0)
142                .into_iter()
143                .filter(|candidate| *candidate == id)
144                .count();
145            match matches {
146                0 => return Err(InteractionError::SpaceIndexMissing(id)),
147                1 => {}
148                _ => return Err(InteractionError::SpaceIndexDuplicate(id)),
149            }
150        }
151        Ok(())
152    }
153
154    /// Process the next event and apply deferred add/remove actions to both the
155    /// agent store and spatial index.
156    pub fn step_event_spatial(&mut self) -> Result<bool, InteractionError<S::Error>> {
157        let timed = match self.queue.pop() {
158            Some(te) => te,
159            None => {
160                trace!("step_event_spatial: queue empty");
161                return Ok(false);
162            }
163        };
164
165        let event = timed.0;
166        self.time = event.time;
167
168        if !self.agents.contains(event.agent_id) {
169            trace!(
170                agent_id = event.agent_id,
171                time = event.time,
172                "skipping event for removed agent"
173            );
174            return Ok(true);
175        }
176
177        if event.event_idx < self.actions.len() {
178            let action = self.actions[event.event_idx];
179
180            let Some(mut agent_ref) = self.agents.get_mut(event.agent_id) else {
181                return Ok(true);
182            };
183
184            let mut rng = self.rng.borrow_mut();
185            let mut deferred: Vec<DeferredAction<A>> = Vec::new();
186
187            {
188                let mut ctx = EventContext {
189                    space: &mut self.space,
190                    properties: &mut self.properties,
191                    rng: &mut *rng,
192                    queue: &mut self.queue,
193                    sequence: &mut self.sequence,
194                    time: self.time,
195                    deferred: &mut deferred,
196                };
197
198                action(&mut *agent_ref, &mut ctx);
199            }
200
201            drop(agent_ref);
202            drop(rng);
203
204            self.apply_deferred_actions_spatial(deferred)?;
205        }
206
207        Ok(true)
208    }
209
210    /// Process all events up to and including time `t_end` with spatially
211    /// consistent deferred lifecycle updates.
212    pub fn step_until_spatial(&mut self, t_end: f64) -> Result<(), InteractionError<S::Error>> {
213        loop {
214            match self.queue.peek() {
215                Some(te) if te.0.time <= t_end => {}
216                _ => {
217                    self.time = t_end;
218                    debug!(
219                        time = t_end,
220                        queue_len = self.queue.len(),
221                        "step_until_spatial reached boundary"
222                    );
223                    return Ok(());
224                }
225            }
226            self.step_event_spatial()?;
227        }
228    }
229
230    /// Process up to `n` events with spatially consistent lifecycle updates.
231    pub fn run_events_spatial(&mut self, n: usize) -> Result<(), InteractionError<S::Error>> {
232        for _ in 0..n {
233            if !self.step_event_spatial()? {
234                break;
235            }
236        }
237        Ok(())
238    }
239
240    fn apply_deferred_actions_spatial(
241        &mut self,
242        deferred: Vec<DeferredAction<A>>,
243    ) -> Result<(), InteractionError<S::Error>> {
244        for action in deferred {
245            match action {
246                DeferredAction::RemoveAgent(id) => {
247                    self.remove_positioned_agent(id)?;
248                }
249                DeferredAction::InsertAgent(agent) => {
250                    self.insert_positioned_agent(agent)?;
251                }
252            }
253        }
254        Ok(())
255    }
256}
257
258/// Internal wrapper that implements `Ord` for the min-heap.
259#[derive(Debug, Clone)]
260pub(crate) struct TimedEvent(Event);
261
262impl PartialEq for TimedEvent {
263    fn eq(&self, other: &Self) -> bool {
264        self.0.time == other.0.time && self.0.sequence == other.0.sequence
265    }
266}
267
268impl Eq for TimedEvent {}
269
270impl PartialOrd for TimedEvent {
271    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
272        Some(self.cmp(other))
273    }
274}
275
276impl Ord for TimedEvent {
277    fn cmp(&self, other: &Self) -> Ordering {
278        other
279            .0
280            .time
281            .partial_cmp(&self.0.time)
282            .unwrap_or(Ordering::Equal)
283            .then_with(|| other.0.sequence.cmp(&self.0.sequence))
284    }
285}
286
287/// Context passed to event-queue action callbacks during [`EventQueueModel::step_event`].
288///
289/// Provides safe mutable access to space, properties, RNG, and the event queue
290/// without aliasing the currently-borrowed agent.
291pub struct EventContext<'a, S, A, Props, R>
292where
293    A: Agent,
294{
295    pub(crate) space: &'a mut S,
296    pub(crate) properties: &'a mut Props,
297    pub(crate) rng: &'a mut R,
298    pub(crate) queue: &'a mut BinaryHeap<TimedEvent>,
299    pub(crate) sequence: &'a mut u64,
300    pub(crate) time: f64,
301    pub(crate) deferred: &'a mut Vec<DeferredAction<A>>,
302}
303
304impl<'a, S, A, Props, R> EventContext<'a, S, A, Props, R>
305where
306    A: Agent,
307{
308    /// Immutable reference to the simulation space.
309    pub fn space(&self) -> &S {
310        self.space
311    }
312
313    /// Mutable reference to the simulation space.
314    pub fn space_mut(&mut self) -> &mut S {
315        self.space
316    }
317
318    /// Immutable reference to user-defined model properties.
319    pub fn properties(&self) -> &Props {
320        self.properties
321    }
322
323    /// Mutable reference to user-defined model properties.
324    pub fn properties_mut(&mut self) -> &mut Props {
325        self.properties
326    }
327
328    /// Mutable reference to the model's RNG.
329    pub fn rng(&mut self) -> &mut R {
330        self.rng
331    }
332
333    /// Current model time (the timestamp of the event being processed).
334    pub fn time(&self) -> f64 {
335        self.time
336    }
337
338    /// Schedule a new event relative to the current time.
339    ///
340    /// # Panics
341    ///
342    /// Panics if `dt` is negative, NaN, or infinite.
343    pub fn add_event(&mut self, agent_id: AgentId, event_idx: usize, dt: f64) {
344        assert!(
345            dt.is_finite() && dt >= 0.0,
346            "event dt must be finite and non-negative, got {dt}"
347        );
348        *self.sequence += 1;
349        let event = Event {
350            time: self.time + dt,
351            agent_id,
352            event_idx,
353            sequence: *self.sequence,
354        };
355        self.queue.push(TimedEvent(event));
356    }
357
358    /// Schedule an agent for removal after the current event completes.
359    pub fn defer_remove_agent(&mut self, id: AgentId) {
360        self.deferred.push(DeferredAction::RemoveAgent(id));
361    }
362
363    /// Schedule an agent for insertion after the current event completes.
364    pub fn defer_insert_agent(&mut self, agent: A) {
365        self.deferred.push(DeferredAction::InsertAgent(agent));
366    }
367}
368
369/// Continuous-time agent-based model driven by an event queue.
370///
371/// # Type Parameters
372///
373/// - `S` - space type
374/// - `A` - agent type implementing [`Agent`]
375/// - `Store` - agent container implementing [`AgentStore<A>`]
376/// - `Props` - user-defined model properties (use `()` if unused)
377/// - `R` - RNG type
378///
379/// # Example
380///
381/// ```ignore
382/// let actions: Vec<fn(&mut MyAgent, &mut EventContext<...>)> = vec![tick_action];
383/// let mut model = EventQueueModel::new(store, space, (), rng, actions);
384/// model.add_event(agent_id, 0, 1.0); // fire action 0 at t+1.0
385/// model.step_until(100.0);
386/// ```
387///
388/// [`Agent`]: crate::agent::Agent
389/// [`AgentStore`]: crate::store::AgentStore
390pub struct EventQueueModel<S, A, Store, Props, R>
391where
392    A: Agent,
393    S: Space,
394    Store: AgentStore<A>,
395    R: RngCore,
396{
397    pub(crate) agents: Store,
398    pub(crate) space: S,
399    pub(crate) properties: Props,
400    pub(crate) rng: std::cell::RefCell<R>,
401    pub(crate) time: f64,
402    pub(crate) max_id: AgentId,
403    pub(crate) queue: BinaryHeap<TimedEvent>,
404    pub(crate) sequence: u64,
405    pub(crate) actions: Vec<fn(&mut A, &mut EventContext<'_, S, A, Props, R>)>,
406    pub(crate) _agent: std::marker::PhantomData<A>,
407}
408
409impl<S, A, Store, Props, R> EventQueueModel<S, A, Store, Props, R>
410where
411    A: Agent,
412    S: Space,
413    Store: AgentStore<A>,
414    R: RngCore,
415{
416    /// Create a new `EventQueueModel`.
417    ///
418    /// # Arguments
419    ///
420    /// - `agents` - pre-populated agent store.
421    /// - `space` - the simulation space.
422    /// - `properties` - user-defined model properties.
423    /// - `rng` - seeded random number generator.
424    /// - `actions` - vector of event-action functions, indexed by `event_idx`.
425    pub fn new(
426        agents: Store,
427        space: S,
428        properties: Props,
429        rng: R,
430        actions: Vec<fn(&mut A, &mut EventContext<'_, S, A, Props, R>)>,
431    ) -> Self {
432        let max_id = agents.iter_ids().into_iter().max().unwrap_or(0);
433        Self {
434            agents,
435            space,
436            properties,
437            rng: std::cell::RefCell::new(rng),
438            time: 0.0,
439            max_id,
440            queue: BinaryHeap::new(),
441            sequence: 0,
442            actions,
443            _agent: std::marker::PhantomData,
444        }
445    }
446
447    /// Current model time as `f64`.
448    pub fn time_f64(&self) -> f64 {
449        self.time
450    }
451
452    /// Mutable access to the model's RNG (via `RefCell`).
453    pub fn rng_mut(&self) -> std::cell::RefMut<'_, R> {
454        self.rng.borrow_mut()
455    }
456
457    /// Immutable reference to the simulation space.
458    pub fn space(&self) -> &S {
459        &self.space
460    }
461
462    /// Mutable reference to the simulation space.
463    pub fn space_mut(&mut self) -> &mut S {
464        &mut self.space
465    }
466
467    /// Immutable reference to user-defined properties.
468    pub fn properties(&self) -> &Props {
469        &self.properties
470    }
471
472    /// Mutable reference to user-defined properties.
473    pub fn properties_mut(&mut self) -> &mut Props {
474        &mut self.properties
475    }
476
477    /// Borrow an agent immutably by ID.
478    pub fn agent(&self, id: AgentId) -> Option<Ref<'_, A>> {
479        self.agents.get(id)
480    }
481
482    /// Borrow an agent mutably by ID.
483    pub fn agent_mut(&self, id: AgentId) -> Option<RefMut<'_, A>> {
484        self.agents.get_mut(id)
485    }
486
487    /// Insert an agent into the store.
488    ///
489    /// Returns `Err(agent)` if an agent with the same ID already exists.
490    pub fn insert_agent(&mut self, agent: A) -> Result<(), A> {
491        let id = agent.id();
492        if self.agents.get(id).is_some() {
493            return Err(agent);
494        }
495        self.agents.insert(agent);
496        if id > self.max_id {
497            self.max_id = id;
498        }
499        Ok(())
500    }
501
502    /// Remove an agent by ID, returning it if found.
503    ///
504    /// Events targeting this agent will be silently skipped when they fire.
505    pub fn remove_agent(&mut self, id: AgentId) -> Option<A> {
506        self.agents.remove(id)
507    }
508
509    /// Generate the next unused agent ID (monotonically increasing).
510    pub fn next_id(&mut self) -> AgentId {
511        self.max_id += 1;
512        self.max_id
513    }
514
515    /// Schedule a new event at `self.time + dt`.
516    ///
517    /// # Panics
518    ///
519    /// Panics if `dt` is negative, NaN, or infinite.
520    pub fn add_event(&mut self, agent_id: AgentId, event_idx: usize, dt: f64) {
521        assert!(
522            dt.is_finite() && dt >= 0.0,
523            "event dt must be finite and non-negative, got {dt}"
524        );
525        self.sequence += 1;
526        let event = Event {
527            time: self.time + dt,
528            agent_id,
529            event_idx,
530            sequence: self.sequence,
531        };
532        self.queue.push(TimedEvent(event));
533    }
534
535    /// Number of events currently in the queue.
536    pub fn queue_len(&self) -> usize {
537        self.queue.len()
538    }
539
540    /// Returns `true` if the event queue is empty.
541    pub fn queue_is_empty(&self) -> bool {
542        self.queue.is_empty()
543    }
544
545    /// Peek at the timestamp of the next (earliest) event, if any.
546    pub fn peek_time(&self) -> Option<f64> {
547        self.queue.peek().map(|te| te.0.time)
548    }
549
550    /// Process the next event in the queue.
551    ///
552    /// Returns `true` if an event was popped (even if the target agent was
553    /// removed), `false` if the queue was empty.
554    pub fn step_event(&mut self) -> bool {
555        let timed = match self.queue.pop() {
556            Some(te) => te,
557            None => {
558                trace!("step_event: queue empty");
559                return false;
560            }
561        };
562
563        let event = timed.0;
564        self.time = event.time;
565
566        if !self.agents.contains(event.agent_id) {
567            trace!(
568                agent_id = event.agent_id,
569                time = event.time,
570                "skipping event for removed agent"
571            );
572            return true;
573        }
574
575        if event.event_idx < self.actions.len() {
576            let action = self.actions[event.event_idx];
577
578            let Some(mut agent_ref) = self.agents.get_mut(event.agent_id) else {
579                return true;
580            };
581
582            let mut rng = self.rng.borrow_mut();
583            let mut deferred: Vec<DeferredAction<A>> = Vec::new();
584
585            {
586                let mut ctx = EventContext {
587                    space: &mut self.space,
588                    properties: &mut self.properties,
589                    rng: &mut *rng,
590                    queue: &mut self.queue,
591                    sequence: &mut self.sequence,
592                    time: self.time,
593                    deferred: &mut deferred,
594                };
595
596                action(&mut *agent_ref, &mut ctx);
597            }
598
599            drop(agent_ref);
600            drop(rng);
601
602            for action in deferred {
603                match action {
604                    DeferredAction::RemoveAgent(id) => {
605                        self.remove_agent(id);
606                    }
607                    DeferredAction::InsertAgent(agent) => {
608                        let _ = self.insert_agent(agent);
609                    }
610                }
611            }
612        }
613
614        true
615    }
616
617    /// Process all events up to and including time `t_end`.
618    ///
619    /// After this call, `self.time_f64() == t_end`. Events scheduled after
620    /// `t_end` remain in the queue.
621    pub fn step_until(&mut self, t_end: f64) {
622        loop {
623            match self.queue.peek() {
624                Some(te) if te.0.time <= t_end => {}
625                _ => {
626                    self.time = t_end;
627                    debug!(
628                        time = t_end,
629                        queue_len = self.queue.len(),
630                        "step_until reached boundary"
631                    );
632                    return;
633                }
634            }
635            self.step_event();
636        }
637    }
638
639    /// Process up to `n` events from the queue.
640    ///
641    /// Stops early if the queue is exhausted.
642    pub fn run_events(&mut self, n: usize) {
643        for _ in 0..n {
644            if !self.step_event() {
645                break;
646            }
647        }
648    }
649}
650
651impl<S, A, Store, Props, R> Model for EventQueueModel<S, A, Store, Props, R>
652where
653    A: Agent,
654    S: Space,
655    Store: AgentStore<A>,
656    R: RngCore,
657{
658    type Agent = A;
659    type Space = S;
660    type Properties = Props;
661    type Rng = R;
662
663    // GATs implementation
664    type AgentRef<'a>
665        = Ref<'a, A>
666    where
667        Self: 'a;
668    type AgentRefMut<'a>
669        = RefMut<'a, A>
670    where
671        Self: 'a;
672
673    fn time(&self) -> Time {
674        Time::Continuous(self.time)
675    }
676
677    fn rng_mut(&self) -> impl std::ops::DerefMut<Target = Self::Rng> + '_ {
678        self.rng.borrow_mut()
679    }
680
681    fn space(&self) -> &Self::Space {
682        &self.space
683    }
684
685    fn properties(&self) -> &Self::Properties {
686        &self.properties
687    }
688
689    fn properties_mut(&mut self) -> &mut Self::Properties {
690        &mut self.properties
691    }
692
693    fn agent(&self, id: AgentId) -> Option<Self::AgentRef<'_>> {
694        self.agents.get(id)
695    }
696
697    fn agent_mut(&self, id: AgentId) -> Option<Self::AgentRefMut<'_>> {
698        self.agents.get_mut(id)
699    }
700}