1use std::error::Error;
4use std::fmt;
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use pin_project_lite::pin_project;
14use recycle_box::{coerce_box, RecycleBox};
15
16use crate::channel::{ChannelId, Sender};
17use crate::executor::Executor;
18use crate::model::{InputFn, Model};
19use crate::time::{MonotonicTime, TearableAtomicTime};
20use crate::util::priority_queue::PriorityQueue;
21use crate::util::sync_cell::SyncCellReader;
22
/// Shorthand for the priority queue that stores scheduled events.
///
/// Entries are keyed by a `(MonotonicTime, ChannelId)` pair and hold boxed
/// [`ScheduledEvent`] trait objects.
pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, ChannelId), Box<dyn ScheduledEvent>>;
25
/// A value that can be resolved into a simulation timestamp.
///
/// In this file the trait is implemented for `Duration` (resolved as
/// `now + duration`) and for `MonotonicTime` (returned unchanged).
pub trait Deadline {
    /// Converts `self` into a timestamp, given the current time `now`.
    fn into_time(self, now: MonotonicTime) -> MonotonicTime;
}
35
36impl Deadline for Duration {
37 #[inline(always)]
38 fn into_time(self, now: MonotonicTime) -> MonotonicTime {
39 now + self
40 }
41}
42
43impl Deadline for MonotonicTime {
44 #[inline(always)]
45 fn into_time(self, _: MonotonicTime) -> MonotonicTime {
46 self
47 }
48}
49
/// Handle used to schedule events addressed to a model of type `M`.
///
/// It bundles a sender to the model, a shared handle to the scheduler queue
/// and a reader for the simulation time.
pub struct Scheduler<M: Model> {
    // Sender cloned into every event dispatcher created by this scheduler.
    sender: Sender<M>,
    // Shared queue into which scheduled events are inserted.
    scheduler_queue: Arc<Mutex<SchedulerQueue>>,
    // Reader used by `time()` to obtain the current simulation time.
    time: SyncCellReader<TearableAtomicTime>,
}
119
120impl<M: Model> Scheduler<M> {
121 pub(crate) fn new(
123 sender: Sender<M>,
124 scheduler_queue: Arc<Mutex<SchedulerQueue>>,
125 time: SyncCellReader<TearableAtomicTime>,
126 ) -> Self {
127 Self {
128 sender,
129 scheduler_queue,
130 time,
131 }
132 }
133
134 pub fn time(&self) -> MonotonicTime {
149 self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time")
150 }
151
152 pub fn schedule_event<F, T, S>(
185 &self,
186 deadline: impl Deadline,
187 func: F,
188 arg: T,
189 ) -> Result<(), SchedulingError>
190 where
191 F: for<'a> InputFn<'a, M, T, S>,
192 T: Send + Clone + 'static,
193 S: Send + 'static,
194 {
195 let now = self.time();
196 let time = deadline.into_time(now);
197 if now >= time {
198 return Err(SchedulingError::InvalidScheduledTime);
199 }
200 let sender = self.sender.clone();
201 schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
202
203 Ok(())
204 }
205
206 pub fn schedule_keyed_event<F, T, S>(
247 &self,
248 deadline: impl Deadline,
249 func: F,
250 arg: T,
251 ) -> Result<EventKey, SchedulingError>
252 where
253 F: for<'a> InputFn<'a, M, T, S>,
254 T: Send + Clone + 'static,
255 S: Send + 'static,
256 {
257 let now = self.time();
258 let time = deadline.into_time(now);
259 if now >= time {
260 return Err(SchedulingError::InvalidScheduledTime);
261 }
262 let sender = self.sender.clone();
263 let event_key =
264 schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
265
266 Ok(event_key)
267 }
268
269 pub fn schedule_periodic_event<F, T, S>(
307 &self,
308 deadline: impl Deadline,
309 period: Duration,
310 func: F,
311 arg: T,
312 ) -> Result<(), SchedulingError>
313 where
314 F: for<'a> InputFn<'a, M, T, S> + Clone,
315 T: Send + Clone + 'static,
316 S: Send + 'static,
317 {
318 let now = self.time();
319 let time = deadline.into_time(now);
320 if now >= time {
321 return Err(SchedulingError::InvalidScheduledTime);
322 }
323 if period.is_zero() {
324 return Err(SchedulingError::NullRepetitionPeriod);
325 }
326 let sender = self.sender.clone();
327 schedule_periodic_event_at_unchecked(
328 time,
329 period,
330 func,
331 arg,
332 sender,
333 &self.scheduler_queue,
334 );
335
336 Ok(())
337 }
338
339 pub fn schedule_keyed_periodic_event<F, T, S>(
389 &self,
390 deadline: impl Deadline,
391 period: Duration,
392 func: F,
393 arg: T,
394 ) -> Result<EventKey, SchedulingError>
395 where
396 F: for<'a> InputFn<'a, M, T, S> + Clone,
397 T: Send + Clone + 'static,
398 S: Send + 'static,
399 {
400 let now = self.time();
401 let time = deadline.into_time(now);
402 if now >= time {
403 return Err(SchedulingError::InvalidScheduledTime);
404 }
405 if period.is_zero() {
406 return Err(SchedulingError::NullRepetitionPeriod);
407 }
408 let sender = self.sender.clone();
409 let event_key = schedule_periodic_keyed_event_at_unchecked(
410 time,
411 period,
412 func,
413 arg,
414 sender,
415 &self.scheduler_queue,
416 );
417
418 Ok(event_key)
419 }
420}
421
422impl<M: Model> fmt::Debug for Scheduler<M> {
423 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424 f.debug_struct("Scheduler").finish_non_exhaustive()
425 }
426}
427
/// Handle to a scheduled event that can be used to cancel it.
///
/// All clones of a key share the same cancellation flag.
#[derive(Clone, Debug)]
#[must_use = "prefer unkeyed scheduling methods if the event is never cancelled"]
pub struct EventKey {
    is_cancelled: Arc<AtomicBool>,
}

impl EventKey {
    /// Creates a key whose cancellation flag is initially cleared.
    pub(crate) fn new() -> Self {
        // `AtomicBool::default()` is `false`.
        let is_cancelled = Arc::new(AtomicBool::default());

        Self { is_cancelled }
    }

    /// Reports whether `cancel` was called on this key or on one of its
    /// clones.
    pub(crate) fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.is_cancelled;

        flag.load(Ordering::Relaxed)
    }

    /// Consumes the key and raises the shared cancellation flag.
    pub fn cancel(self) {
        let Self { is_cancelled } = self;

        is_cancelled.store(true, Ordering::Relaxed);
    }
}
455
/// Error returned when an event cannot be scheduled.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SchedulingError {
    /// The requested time is not strictly later than the current simulation
    /// time.
    InvalidScheduledTime,
    /// The repetition period of a periodic event is zero.
    NullRepetitionPeriod,
}

impl fmt::Display for SchedulingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select the message first, then emit it with a single write.
        let message = match *self {
            Self::InvalidScheduledTime => {
                "the scheduled time should be in the future of the current simulation time"
            }
            Self::NullRepetitionPeriod => "the repetition period cannot be zero",
        };

        f.write_str(message)
    }
}

impl Error for SchedulingError {}
479
480pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
485 time: MonotonicTime,
486 func: F,
487 arg: T,
488 sender: Sender<M>,
489 scheduler_queue: &Mutex<SchedulerQueue>,
490) where
491 M: Model,
492 F: for<'a> InputFn<'a, M, T, S>,
493 T: Send + Clone + 'static,
494 S: Send + 'static,
495{
496 let channel_id = sender.channel_id();
497
498 let event_dispatcher = Box::new(new_event_dispatcher(func, arg, sender));
499
500 let mut scheduler_queue = scheduler_queue.lock().unwrap();
501 scheduler_queue.insert((time, channel_id), event_dispatcher);
502}
503
504pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
509 time: MonotonicTime,
510 func: F,
511 arg: T,
512 sender: Sender<M>,
513 scheduler_queue: &Mutex<SchedulerQueue>,
514) -> EventKey
515where
516 M: Model,
517 F: for<'a> InputFn<'a, M, T, S>,
518 T: Send + Clone + 'static,
519 S: Send + 'static,
520{
521 let event_key = EventKey::new();
522 let channel_id = sender.channel_id();
523 let event_dispatcher = Box::new(KeyedEventDispatcher::new(
524 event_key.clone(),
525 func,
526 arg,
527 sender,
528 ));
529
530 let mut scheduler_queue = scheduler_queue.lock().unwrap();
531 scheduler_queue.insert((time, channel_id), event_dispatcher);
532
533 event_key
534}
535
536pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
541 time: MonotonicTime,
542 period: Duration,
543 func: F,
544 arg: T,
545 sender: Sender<M>,
546 scheduler_queue: &Mutex<SchedulerQueue>,
547) where
548 M: Model,
549 F: for<'a> InputFn<'a, M, T, S> + Clone,
550 T: Send + Clone + 'static,
551 S: Send + 'static,
552{
553 let channel_id = sender.channel_id();
554
555 let event_dispatcher = Box::new(PeriodicEventDispatcher::new(func, arg, sender, period));
556
557 let mut scheduler_queue = scheduler_queue.lock().unwrap();
558 scheduler_queue.insert((time, channel_id), event_dispatcher);
559}
560
561pub(crate) fn schedule_periodic_keyed_event_at_unchecked<M, F, T, S>(
566 time: MonotonicTime,
567 period: Duration,
568 func: F,
569 arg: T,
570 sender: Sender<M>,
571 scheduler_queue: &Mutex<SchedulerQueue>,
572) -> EventKey
573where
574 M: Model,
575 F: for<'a> InputFn<'a, M, T, S> + Clone,
576 T: Send + Clone + 'static,
577 S: Send + 'static,
578{
579 let event_key = EventKey::new();
580 let channel_id = sender.channel_id();
581 let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new(
582 event_key.clone(),
583 func,
584 arg,
585 sender,
586 period,
587 ));
588
589 let mut scheduler_queue = scheduler_queue.lock().unwrap();
590 scheduler_queue.insert((time, channel_id), event_dispatcher);
591
592 event_key
593}
594
/// Object-safe interface of the events stored in the scheduler queue.
pub(crate) trait ScheduledEvent: Send {
    /// Reports whether the event has been cancelled.
    fn is_cancelled(&self) -> bool;

    /// Returns a follow-up event together with a duration, or `None` when
    /// the event has no follow-up.
    ///
    /// The periodic dispatchers in this file return a copy of themselves
    /// paired with their period; the one-shot dispatchers return `None`.
    fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)>;

    /// Consumes the event and returns it as a boxed, pinned future.
    fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>>;

    /// Consumes the event and hands the corresponding future over to
    /// `executor`.
    fn spawn_and_forget(self: Box<Self>, executor: &Executor);
}
615
pin_project! {
    /// Dispatcher for a one-shot event that cannot be cancelled.
    ///
    /// This type wraps a future and is itself a future, which lets its
    /// `ScheduledEvent::into_future` implementation return the existing box
    /// instead of allocating a new one.
    pub(crate) struct EventDispatcher<F> {
        // The wrapped future; structurally pinned so it can be polled in place.
        #[pin]
        fut: F,
    }
}
629
630fn new_event_dispatcher<M, F, T, S>(
635 func: F,
636 arg: T,
637 sender: Sender<M>,
638) -> EventDispatcher<impl Future<Output = ()>>
639where
640 M: Model,
641 F: for<'a> InputFn<'a, M, T, S>,
642 T: Send + Clone + 'static,
643{
644 let fut = dispatch_event(func, arg, sender);
645
646 EventDispatcher { fut }
647}
648
649impl<F> Future for EventDispatcher<F>
650where
651 F: Future,
652{
653 type Output = F::Output;
654
655 #[inline(always)]
656 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
657 self.project().fut.poll(cx)
658 }
659}
660
661impl<F> ScheduledEvent for EventDispatcher<F>
662where
663 F: Future<Output = ()> + Send + 'static,
664{
665 fn is_cancelled(&self) -> bool {
666 false
667 }
668 fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
669 None
670 }
671 fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
672 Box::into_pin(self)
674 }
675 fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
676 executor.spawn_and_forget(*self);
677 }
678}
679
/// Dispatcher for a periodic event that cannot be cancelled.
///
/// It stores the ingredients of the event rather than a future, so that it
/// can be duplicated for each new occurrence.
pub(crate) struct PeriodicEventDispatcher<M, F, T, S>
where
    M: Model,
{
    // Input function called on the model.
    func: F,
    // Argument passed to `func`; cloned for every occurrence.
    arg: T,
    // Sender used to reach the model.
    sender: Sender<M>,
    // Duration returned by `next` alongside the follow-up event.
    period: Duration,
    // Marker that lets the otherwise unused `S` parameter appear in the type.
    _input_kind: PhantomData<S>,
}
692
693impl<M, F, T, S> PeriodicEventDispatcher<M, F, T, S>
694where
695 M: Model,
696 F: for<'a> InputFn<'a, M, T, S>,
697 T: Send + Clone + 'static,
698{
699 fn new(func: F, arg: T, sender: Sender<M>, period: Duration) -> Self {
701 Self {
702 func,
703 arg,
704 sender,
705 period,
706 _input_kind: PhantomData,
707 }
708 }
709}
710
711impl<M, F, T, S> ScheduledEvent for PeriodicEventDispatcher<M, F, T, S>
712where
713 M: Model,
714 F: for<'a> InputFn<'a, M, T, S> + Clone,
715 T: Send + Clone + 'static,
716 S: Send + 'static,
717{
718 fn is_cancelled(&self) -> bool {
719 false
720 }
721 fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
722 let event = Box::new(Self::new(
723 self.func.clone(),
724 self.arg.clone(),
725 self.sender.clone(),
726 self.period,
727 ));
728
729 Some((event, self.period))
730 }
731 fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
732 let Self {
733 func, arg, sender, ..
734 } = *self;
735
736 Box::pin(dispatch_event(func, arg, sender))
737 }
738 fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
739 let Self {
740 func, arg, sender, ..
741 } = *self;
742
743 let fut = dispatch_event(func, arg, sender);
744 executor.spawn_and_forget(fut);
745 }
746}
747
748pub(crate) struct KeyedEventDispatcher<M, F, T, S>
750where
751 M: Model,
752 F: for<'a> InputFn<'a, M, T, S>,
753 T: Send + Clone + 'static,
754{
755 event_key: EventKey,
756 func: F,
757 arg: T,
758 sender: Sender<M>,
759 _input_kind: PhantomData<S>,
760}
761
762impl<M, F, T, S> KeyedEventDispatcher<M, F, T, S>
763where
764 M: Model,
765 F: for<'a> InputFn<'a, M, T, S>,
766 T: Send + Clone + 'static,
767{
768 fn new(event_key: EventKey, func: F, arg: T, sender: Sender<M>) -> Self {
770 Self {
771 event_key,
772 func,
773 arg,
774 sender,
775 _input_kind: PhantomData,
776 }
777 }
778}
779
780impl<M, F, T, S> ScheduledEvent for KeyedEventDispatcher<M, F, T, S>
781where
782 M: Model,
783 F: for<'a> InputFn<'a, M, T, S>,
784 T: Send + Clone + 'static,
785 S: Send + 'static,
786{
787 fn is_cancelled(&self) -> bool {
788 self.event_key.is_cancelled()
789 }
790 fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
791 None
792 }
793 fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
794 let Self {
795 event_key,
796 func,
797 arg,
798 sender,
799 ..
800 } = *self;
801
802 Box::pin(dispatch_keyed_event(event_key, func, arg, sender))
803 }
804 fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
805 let Self {
806 event_key,
807 func,
808 arg,
809 sender,
810 ..
811 } = *self;
812
813 let fut = dispatch_keyed_event(event_key, func, arg, sender);
814 executor.spawn_and_forget(fut);
815 }
816}
817
818pub(crate) struct PeriodicKeyedEventDispatcher<M, F, T, S>
820where
821 M: Model,
822 F: for<'a> InputFn<'a, M, T, S>,
823 T: Send + Clone + 'static,
824{
825 event_key: EventKey,
826 func: F,
827 arg: T,
828 sender: Sender<M>,
829 period: Duration,
830 _input_kind: PhantomData<S>,
831}
832
833impl<M, F, T, S> PeriodicKeyedEventDispatcher<M, F, T, S>
834where
835 M: Model,
836 F: for<'a> InputFn<'a, M, T, S>,
837 T: Send + Clone + 'static,
838{
839 fn new(event_key: EventKey, func: F, arg: T, sender: Sender<M>, period: Duration) -> Self {
841 Self {
842 event_key,
843 func,
844 arg,
845 sender,
846 period,
847 _input_kind: PhantomData,
848 }
849 }
850}
851
852impl<M, F, T, S> ScheduledEvent for PeriodicKeyedEventDispatcher<M, F, T, S>
853where
854 M: Model,
855 F: for<'a> InputFn<'a, M, T, S> + Clone,
856 T: Send + Clone + 'static,
857 S: Send + 'static,
858{
859 fn is_cancelled(&self) -> bool {
860 self.event_key.is_cancelled()
861 }
862 fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
863 let event = Box::new(Self::new(
864 self.event_key.clone(),
865 self.func.clone(),
866 self.arg.clone(),
867 self.sender.clone(),
868 self.period,
869 ));
870
871 Some((event, self.period))
872 }
873 fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
874 let Self {
875 event_key,
876 func,
877 arg,
878 sender,
879 ..
880 } = *self;
881
882 Box::pin(dispatch_keyed_event(event_key, func, arg, sender))
883 }
884 fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
885 let Self {
886 event_key,
887 func,
888 arg,
889 sender,
890 ..
891 } = *self;
892
893 let fut = dispatch_keyed_event(event_key, func, arg, sender);
894 executor.spawn_and_forget(fut);
895 }
896}
897
/// Sends to the model, through `sender`, a closure that calls `func` on the
/// model with `arg`, and waits for the send to complete.
///
/// The value returned by the send operation is discarded.
async fn dispatch_event<M, F, T, S>(func: F, arg: T, sender: Sender<M>)
where
    M: Model,
    F: for<'a> InputFn<'a, M, T, S>,
    T: Send + Clone + 'static,
{
    // The outcome of the send is deliberately ignored.
    let _ = sender
        .send(
            // NOTE(review): the closure is kept inline in the `send` call;
            // its signature is presumably inferred from the parameter type
            // expected by `send` — confirm before hoisting it into a binding.
            move |model: &mut M,
                  scheduler,
                  recycle_box: RecycleBox<()>|
                  -> RecycleBox<dyn Future<Output = ()> + Send + '_> {
                let fut = func.call(model, arg, scheduler);

                // Place the future into the box supplied by the caller and
                // erase its concrete type (presumably reusing the box's
                // allocation — confirm against `recycle_box` docs).
                coerce_box!(RecycleBox::recycle(recycle_box, fut))
            },
        )
        .await;
}
918
/// Sends to the model, through `sender`, a closure that calls `func` on the
/// model with `arg` unless `event_key` has been cancelled, and waits for the
/// send to complete.
///
/// The value returned by the send operation is discarded.
async fn dispatch_keyed_event<M, F, T, S>(event_key: EventKey, func: F, arg: T, sender: Sender<M>)
where
    M: Model,
    F: for<'a> InputFn<'a, M, T, S>,
    T: Send + Clone + 'static,
{
    // The outcome of the send is deliberately ignored.
    let _ = sender
        .send(
            // NOTE(review): the closure is kept inline in the `send` call;
            // its signature is presumably inferred from the parameter type
            // expected by `send` — confirm before hoisting it into a binding.
            move |model: &mut M,
                  scheduler,
                  recycle_box: RecycleBox<()>|
                  -> RecycleBox<dyn Future<Output = ()> + Send + '_> {
                let fut = async move {
                    // The key is checked when this future runs, not when the
                    // closure is created, so a cancellation issued in between
                    // is still honoured.
                    if !event_key.is_cancelled() {
                        func.call(model, arg, scheduler).await;
                    }
                };

                // Place the future into the box supplied by the caller and
                // erase its concrete type (presumably reusing the box's
                // allocation — confirm against `recycle_box` docs).
                coerce_box!(RecycleBox::recycle(recycle_box, fut))
            },
        )
        .await;
}