1#![allow(clippy::type_complexity)]
35
36use crate::{
37 agent::Agent,
38 interaction::{InteractionError, PositionedAgent, SpaceInteraction},
39 model::Model,
40 space::Space,
41 step_context::DeferredAction,
42 store::AgentStore,
43 types::{AgentId, Time},
44};
45use rand::RngCore;
46use std::cell::{Ref, RefMut};
47use std::cmp::Ordering;
48use std::collections::BinaryHeap;
49use tracing::{debug, trace};
50
/// A single scheduled entry of the event queue.
///
/// An event names the agent it is addressed to and the action that should run
/// for that agent once the simulation clock reaches `time`.
#[derive(Debug, Clone)]
pub struct Event {
    /// Simulation time at which the event fires; the model clock is set to this
    /// value when the event is popped.
    pub time: f64,
    /// Identifier of the agent the event is addressed to.
    pub agent_id: AgentId,
    /// Index into the model's `actions` table selecting the callback to run.
    pub event_idx: usize,
    /// Scheduling counter value assigned when the event was created; used as
    /// the tie-breaker between events that share the same `time`.
    sequence: u64,
}
63
64impl<S, A, Store, Props, R> EventQueueModel<S, A, Store, Props, R>
65where
66 A: PositionedAgent,
67 S: SpaceInteraction<A>,
68 Store: AgentStore<A>,
69 R: RngCore,
70{
71 pub fn insert_positioned_agent(&mut self, agent: A) -> Result<(), InteractionError<S::Error>> {
73 let id = agent.id();
74 if self.agents.contains(id) {
75 return Err(InteractionError::DuplicateId(id));
76 }
77 self.space
78 .add_agent(&agent)
79 .map_err(InteractionError::Space)?;
80 self.agents.insert(agent);
81 if id > self.max_id {
82 self.max_id = id;
83 }
84 Ok(())
85 }
86
87 pub fn remove_positioned_agent(
89 &mut self,
90 id: AgentId,
91 ) -> Result<Option<A>, InteractionError<S::Error>> {
92 let Some(agent_ref) = self.agents.get(id) else {
93 return Ok(None);
94 };
95 self.space
96 .remove_agent(&*agent_ref)
97 .map_err(InteractionError::Space)?;
98 drop(agent_ref);
99 Ok(self.agents.remove(id))
100 }
101
102 pub fn move_positioned_agent(
104 &mut self,
105 id: AgentId,
106 new_position: A::Position,
107 ) -> Result<(), InteractionError<S::Error>> {
108 let mut agent_ref = self
109 .agents
110 .get_mut(id)
111 .ok_or(InteractionError::AgentNotFound(id))?;
112 let old_position = agent_ref.position().clone();
113
114 self.space
115 .remove_agent(&*agent_ref)
116 .map_err(InteractionError::Space)?;
117 agent_ref.set_position(new_position);
118
119 if let Err(source) = self.space.add_agent(&*agent_ref) {
120 agent_ref.set_position(old_position);
121 if let Err(rollback) = self.space.add_agent(&*agent_ref) {
122 return Err(InteractionError::RollbackFailed {
123 operation: "move_positioned_agent",
124 source,
125 rollback,
126 });
127 }
128 return Err(InteractionError::Space(source));
129 }
130 Ok(())
131 }
132
133 pub fn validate_space_index(&self) -> Result<(), InteractionError<S::Error>> {
135 for id in self.agents.iter_ids() {
136 let Some(agent) = self.agents.get(id) else {
137 continue;
138 };
139 let matches = self
140 .space
141 .nearby_ids(agent.position(), 0)
142 .into_iter()
143 .filter(|candidate| *candidate == id)
144 .count();
145 match matches {
146 0 => return Err(InteractionError::SpaceIndexMissing(id)),
147 1 => {}
148 _ => return Err(InteractionError::SpaceIndexDuplicate(id)),
149 }
150 }
151 Ok(())
152 }
153
154 pub fn step_event_spatial(&mut self) -> Result<bool, InteractionError<S::Error>> {
157 let timed = match self.queue.pop() {
158 Some(te) => te,
159 None => {
160 trace!("step_event_spatial: queue empty");
161 return Ok(false);
162 }
163 };
164
165 let event = timed.0;
166 self.time = event.time;
167
168 if !self.agents.contains(event.agent_id) {
169 trace!(
170 agent_id = event.agent_id,
171 time = event.time,
172 "skipping event for removed agent"
173 );
174 return Ok(true);
175 }
176
177 if event.event_idx < self.actions.len() {
178 let action = self.actions[event.event_idx];
179
180 let Some(mut agent_ref) = self.agents.get_mut(event.agent_id) else {
181 return Ok(true);
182 };
183
184 let mut rng = self.rng.borrow_mut();
185 let mut deferred: Vec<DeferredAction<A>> = Vec::new();
186
187 {
188 let mut ctx = EventContext {
189 space: &mut self.space,
190 properties: &mut self.properties,
191 rng: &mut *rng,
192 queue: &mut self.queue,
193 sequence: &mut self.sequence,
194 time: self.time,
195 deferred: &mut deferred,
196 };
197
198 action(&mut *agent_ref, &mut ctx);
199 }
200
201 drop(agent_ref);
202 drop(rng);
203
204 self.apply_deferred_actions_spatial(deferred)?;
205 }
206
207 Ok(true)
208 }
209
210 pub fn step_until_spatial(&mut self, t_end: f64) -> Result<(), InteractionError<S::Error>> {
213 loop {
214 match self.queue.peek() {
215 Some(te) if te.0.time <= t_end => {}
216 _ => {
217 self.time = t_end;
218 debug!(
219 time = t_end,
220 queue_len = self.queue.len(),
221 "step_until_spatial reached boundary"
222 );
223 return Ok(());
224 }
225 }
226 self.step_event_spatial()?;
227 }
228 }
229
230 pub fn run_events_spatial(&mut self, n: usize) -> Result<(), InteractionError<S::Error>> {
232 for _ in 0..n {
233 if !self.step_event_spatial()? {
234 break;
235 }
236 }
237 Ok(())
238 }
239
240 fn apply_deferred_actions_spatial(
241 &mut self,
242 deferred: Vec<DeferredAction<A>>,
243 ) -> Result<(), InteractionError<S::Error>> {
244 for action in deferred {
245 match action {
246 DeferredAction::RemoveAgent(id) => {
247 self.remove_positioned_agent(id)?;
248 }
249 DeferredAction::InsertAgent(agent) => {
250 self.insert_positioned_agent(agent)?;
251 }
252 }
253 }
254 Ok(())
255 }
256}
257
258#[derive(Debug, Clone)]
260pub(crate) struct TimedEvent(Event);
261
262impl PartialEq for TimedEvent {
263 fn eq(&self, other: &Self) -> bool {
264 self.0.time == other.0.time && self.0.sequence == other.0.sequence
265 }
266}
267
268impl Eq for TimedEvent {}
269
270impl PartialOrd for TimedEvent {
271 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
272 Some(self.cmp(other))
273 }
274}
275
276impl Ord for TimedEvent {
277 fn cmp(&self, other: &Self) -> Ordering {
278 other
279 .0
280 .time
281 .partial_cmp(&self.0.time)
282 .unwrap_or(Ordering::Equal)
283 .then_with(|| other.0.sequence.cmp(&self.0.sequence))
284 }
285}
286
/// Handle passed to an event callback while it runs.
///
/// It bundles mutable access to the parts of the model a callback may touch
/// (space, properties, RNG, event queue) together with the current time and a
/// buffer of agent insertions/removals to be applied after the callback returns.
pub struct EventContext<'a, S, A, Props, R>
where
    A: Agent,
{
    /// The model's space.
    pub(crate) space: &'a mut S,
    /// The model-wide properties.
    pub(crate) properties: &'a mut Props,
    /// The model's random number generator.
    pub(crate) rng: &'a mut R,
    /// The pending-event queue; new events are pushed here by `add_event`.
    pub(crate) queue: &'a mut BinaryHeap<TimedEvent>,
    /// The model's scheduling counter, incremented for every event added.
    pub(crate) sequence: &'a mut u64,
    /// Time of the event currently being processed.
    pub(crate) time: f64,
    /// Agent insertions/removals requested by the callback; the model applies
    /// them once the callback has returned.
    pub(crate) deferred: &'a mut Vec<DeferredAction<A>>,
}
303
304impl<'a, S, A, Props, R> EventContext<'a, S, A, Props, R>
305where
306 A: Agent,
307{
308 pub fn space(&self) -> &S {
310 self.space
311 }
312
313 pub fn space_mut(&mut self) -> &mut S {
315 self.space
316 }
317
318 pub fn properties(&self) -> &Props {
320 self.properties
321 }
322
323 pub fn properties_mut(&mut self) -> &mut Props {
325 self.properties
326 }
327
328 pub fn rng(&mut self) -> &mut R {
330 self.rng
331 }
332
333 pub fn time(&self) -> f64 {
335 self.time
336 }
337
338 pub fn add_event(&mut self, agent_id: AgentId, event_idx: usize, dt: f64) {
344 assert!(
345 dt.is_finite() && dt >= 0.0,
346 "event dt must be finite and non-negative, got {dt}"
347 );
348 *self.sequence += 1;
349 let event = Event {
350 time: self.time + dt,
351 agent_id,
352 event_idx,
353 sequence: *self.sequence,
354 };
355 self.queue.push(TimedEvent(event));
356 }
357
358 pub fn defer_remove_agent(&mut self, id: AgentId) {
360 self.deferred.push(DeferredAction::RemoveAgent(id));
361 }
362
363 pub fn defer_insert_agent(&mut self, agent: A) {
365 self.deferred.push(DeferredAction::InsertAgent(agent));
366 }
367}
368
/// A continuous-time model driven by a priority queue of timed events.
///
/// Events are popped in time order; each one runs the callback registered at
/// its `event_idx` against the agent it is addressed to.
pub struct EventQueueModel<S, A, Store, Props, R>
where
    A: Agent,
    S: Space,
    Store: AgentStore<A>,
    R: RngCore,
{
    /// Storage for all agents of the model.
    pub(crate) agents: Store,
    /// The space of the model.
    pub(crate) space: S,
    /// Model-wide properties.
    pub(crate) properties: Props,
    /// Random number generator, wrapped in a `RefCell` so it can be borrowed
    /// mutably through a shared reference to the model.
    pub(crate) rng: std::cell::RefCell<R>,
    /// Current simulation time.
    pub(crate) time: f64,
    /// Highest agent id seen so far; the basis for `next_id`.
    pub(crate) max_id: AgentId,
    /// Pending events.
    pub(crate) queue: BinaryHeap<TimedEvent>,
    /// Counter incremented for every scheduled event; stored in the event as
    /// its tie-breaker.
    pub(crate) sequence: u64,
    /// Callback table indexed by `Event::event_idx`.
    pub(crate) actions: Vec<fn(&mut A, &mut EventContext<'_, S, A, Props, R>)>,
    /// Marker tying the agent type `A` to the model.
    pub(crate) _agent: std::marker::PhantomData<A>,
}
408
409impl<S, A, Store, Props, R> EventQueueModel<S, A, Store, Props, R>
410where
411 A: Agent,
412 S: Space,
413 Store: AgentStore<A>,
414 R: RngCore,
415{
416 pub fn new(
426 agents: Store,
427 space: S,
428 properties: Props,
429 rng: R,
430 actions: Vec<fn(&mut A, &mut EventContext<'_, S, A, Props, R>)>,
431 ) -> Self {
432 let max_id = agents.iter_ids().into_iter().max().unwrap_or(0);
433 Self {
434 agents,
435 space,
436 properties,
437 rng: std::cell::RefCell::new(rng),
438 time: 0.0,
439 max_id,
440 queue: BinaryHeap::new(),
441 sequence: 0,
442 actions,
443 _agent: std::marker::PhantomData,
444 }
445 }
446
    /// Current simulation time as a plain `f64`.
    pub fn time_f64(&self) -> f64 {
        self.time
    }

    /// Mutably borrows the random number generator.
    ///
    /// # Panics
    ///
    /// Panics if the generator is already borrowed (`RefCell::borrow_mut`).
    pub fn rng_mut(&self) -> std::cell::RefMut<'_, R> {
        self.rng.borrow_mut()
    }

    /// Shared access to the space.
    pub fn space(&self) -> &S {
        &self.space
    }

    /// Mutable access to the space.
    pub fn space_mut(&mut self) -> &mut S {
        &mut self.space
    }

    /// Shared access to the model properties.
    pub fn properties(&self) -> &Props {
        &self.properties
    }

    /// Mutable access to the model properties.
    pub fn properties_mut(&mut self) -> &mut Props {
        &mut self.properties
    }

    /// Looks up agent `id` in the store and returns a shared borrow of it, if any.
    pub fn agent(&self, id: AgentId) -> Option<Ref<'_, A>> {
        self.agents.get(id)
    }

    /// Looks up agent `id` in the store and returns a mutable borrow of it, if any.
    pub fn agent_mut(&self, id: AgentId) -> Option<RefMut<'_, A>> {
        self.agents.get_mut(id)
    }
486
487 pub fn insert_agent(&mut self, agent: A) -> Result<(), A> {
491 let id = agent.id();
492 if self.agents.get(id).is_some() {
493 return Err(agent);
494 }
495 self.agents.insert(agent);
496 if id > self.max_id {
497 self.max_id = id;
498 }
499 Ok(())
500 }
501
    /// Removes agent `id` from the store and returns whatever the store's
    /// `remove` yields.
    ///
    /// Only the store is modified here; the space is not updated.
    pub fn remove_agent(&mut self, id: AgentId) -> Option<A> {
        self.agents.remove(id)
    }
508
509 pub fn next_id(&mut self) -> AgentId {
511 self.max_id += 1;
512 self.max_id
513 }
514
515 pub fn add_event(&mut self, agent_id: AgentId, event_idx: usize, dt: f64) {
521 assert!(
522 dt.is_finite() && dt >= 0.0,
523 "event dt must be finite and non-negative, got {dt}"
524 );
525 self.sequence += 1;
526 let event = Event {
527 time: self.time + dt,
528 agent_id,
529 event_idx,
530 sequence: self.sequence,
531 };
532 self.queue.push(TimedEvent(event));
533 }
534
    /// Number of events currently waiting in the queue.
    pub fn queue_len(&self) -> usize {
        self.queue.len()
    }

    /// `true` when no events are waiting in the queue.
    pub fn queue_is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Time of the event at the top of the queue (the next one to be popped),
    /// or `None` when the queue is empty.
    pub fn peek_time(&self) -> Option<f64> {
        self.queue.peek().map(|te| te.0.time)
    }
549
550 pub fn step_event(&mut self) -> bool {
555 let timed = match self.queue.pop() {
556 Some(te) => te,
557 None => {
558 trace!("step_event: queue empty");
559 return false;
560 }
561 };
562
563 let event = timed.0;
564 self.time = event.time;
565
566 if !self.agents.contains(event.agent_id) {
567 trace!(
568 agent_id = event.agent_id,
569 time = event.time,
570 "skipping event for removed agent"
571 );
572 return true;
573 }
574
575 if event.event_idx < self.actions.len() {
576 let action = self.actions[event.event_idx];
577
578 let Some(mut agent_ref) = self.agents.get_mut(event.agent_id) else {
579 return true;
580 };
581
582 let mut rng = self.rng.borrow_mut();
583 let mut deferred: Vec<DeferredAction<A>> = Vec::new();
584
585 {
586 let mut ctx = EventContext {
587 space: &mut self.space,
588 properties: &mut self.properties,
589 rng: &mut *rng,
590 queue: &mut self.queue,
591 sequence: &mut self.sequence,
592 time: self.time,
593 deferred: &mut deferred,
594 };
595
596 action(&mut *agent_ref, &mut ctx);
597 }
598
599 drop(agent_ref);
600 drop(rng);
601
602 for action in deferred {
603 match action {
604 DeferredAction::RemoveAgent(id) => {
605 self.remove_agent(id);
606 }
607 DeferredAction::InsertAgent(agent) => {
608 let _ = self.insert_agent(agent);
609 }
610 }
611 }
612 }
613
614 true
615 }
616
617 pub fn step_until(&mut self, t_end: f64) {
622 loop {
623 match self.queue.peek() {
624 Some(te) if te.0.time <= t_end => {}
625 _ => {
626 self.time = t_end;
627 debug!(
628 time = t_end,
629 queue_len = self.queue.len(),
630 "step_until reached boundary"
631 );
632 return;
633 }
634 }
635 self.step_event();
636 }
637 }
638
639 pub fn run_events(&mut self, n: usize) {
643 for _ in 0..n {
644 if !self.step_event() {
645 break;
646 }
647 }
648 }
649}
650
/// [`Model`] implementation exposing the event-queue model through the
/// generic model interface.
impl<S, A, Store, Props, R> Model for EventQueueModel<S, A, Store, Props, R>
where
    A: Agent,
    S: Space,
    Store: AgentStore<A>,
    R: RngCore,
{
    type Agent = A;
    type Space = S;
    type Properties = Props;
    type Rng = R;

    /// Shared agent handle: a `RefCell` read guard.
    type AgentRef<'a>
        = Ref<'a, A>
    where
        Self: 'a;
    /// Mutable agent handle: a `RefCell` write guard.
    type AgentRefMut<'a>
        = RefMut<'a, A>
    where
        Self: 'a;

    /// Current simulation time, reported as continuous time.
    fn time(&self) -> Time {
        Time::Continuous(self.time)
    }

    /// Mutably borrows the random number generator; panics if it is already
    /// borrowed (`RefCell::borrow_mut`).
    fn rng_mut(&self) -> impl std::ops::DerefMut<Target = Self::Rng> + '_ {
        self.rng.borrow_mut()
    }

    /// Shared access to the space.
    fn space(&self) -> &Self::Space {
        &self.space
    }

    /// Shared access to the model properties.
    fn properties(&self) -> &Self::Properties {
        &self.properties
    }

    /// Mutable access to the model properties.
    fn properties_mut(&mut self) -> &mut Self::Properties {
        &mut self.properties
    }

    /// Looks up agent `id` in the store and returns a shared borrow of it, if any.
    fn agent(&self, id: AgentId) -> Option<Self::AgentRef<'_>> {
        self.agents.get(id)
    }

    /// Looks up agent `id` in the store and returns a mutable borrow of it, if any.
    fn agent_mut(&self, id: AgentId) -> Option<Self::AgentRefMut<'_>> {
        self.agents.get_mut(id)
    }
}