asynchronix/time/
scheduler.rs

1//! Scheduling functions and types.
2
3use std::error::Error;
4use std::fmt;
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use pin_project_lite::pin_project;
14use recycle_box::{coerce_box, RecycleBox};
15
16use crate::channel::{ChannelId, Sender};
17use crate::executor::Executor;
18use crate::model::{InputFn, Model};
19use crate::time::{MonotonicTime, TearableAtomicTime};
20use crate::util::priority_queue::PriorityQueue;
21use crate::util::sync_cell::SyncCellReader;
22
23/// Shorthand for the scheduler queue type.
24pub(crate) type SchedulerQueue = PriorityQueue<(MonotonicTime, ChannelId), Box<dyn ScheduledEvent>>;
25
26/// Trait abstracting over time-absolute and time-relative deadlines.
27///
28/// This trait is implemented by [`std::time::Duration`] and
29/// [`MonotonicTime`].
30pub trait Deadline {
31    /// Make this deadline into an absolute timestamp, using the provided
32    /// current time as a reference.
33    fn into_time(self, now: MonotonicTime) -> MonotonicTime;
34}
35
36impl Deadline for Duration {
37    #[inline(always)]
38    fn into_time(self, now: MonotonicTime) -> MonotonicTime {
39        now + self
40    }
41}
42
43impl Deadline for MonotonicTime {
44    #[inline(always)]
45    fn into_time(self, _: MonotonicTime) -> MonotonicTime {
46        self
47    }
48}
49
50/// A local scheduler for models.
51///
52/// A `Scheduler` is a handle to the global scheduler associated to a model
53/// instance. It can be used by the model to retrieve the simulation time or
54/// schedule delayed actions on itself.
55///
56/// ### Caveat: self-scheduling `async` methods
57///
58/// Due to a current rustc issue, `async` methods that schedule themselves will
59/// not compile unless an explicit `Send` bound is added to the returned future.
60/// This can be done by replacing the `async` signature with a partially
61/// desugared signature such as:
62///
63/// ```ignore
64/// fn self_scheduling_method<'a>(
65///     &'a mut self,
66///     arg: MyEventType,
67///     scheduler: &'a Scheduler<Self>
68/// ) -> impl Future<Output=()> + Send + 'a {
69///     async move {
70///         /* implementation */
71///     }
72/// }
73/// ```
74///
75/// Self-scheduling methods which are not `async` are not affected by this
76/// issue.
77///
78/// # Examples
79///
80/// A model that sends a greeting after some delay.
81///
82/// ```
83/// use std::time::Duration;
84/// use asynchronix::model::{Model, Output}; use asynchronix::time::Scheduler;
85///
86/// #[derive(Default)]
87/// pub struct DelayedGreeter {
88///     msg_out: Output<String>,
89/// }
90///
91/// impl DelayedGreeter {
92///     // Triggers a greeting on the output port after some delay [input port].
93///     pub async fn greet_with_delay(&mut self, delay: Duration, scheduler: &Scheduler<Self>) {
94///         let time = scheduler.time();
95///         let greeting = format!("Hello, this message was scheduled at: {:?}.", time);
96///         
97///         if delay.is_zero() {
98///             self.msg_out.send(greeting).await;
99///         } else {
100///             scheduler.schedule_event(delay, Self::send_msg, greeting).unwrap();
101///         }
102///     }
103///
104///     // Sends a message to the output [private input port].
105///     async fn send_msg(&mut self, msg: String) {
106///         self.msg_out.send(msg).await;
107///     }
108/// }
109/// impl Model for DelayedGreeter {}
110/// ```
111
112// The self-scheduling caveat seems related to this issue:
113// https://github.com/rust-lang/rust/issues/78649
114pub struct Scheduler<M: Model> {
115    sender: Sender<M>,
116    scheduler_queue: Arc<Mutex<SchedulerQueue>>,
117    time: SyncCellReader<TearableAtomicTime>,
118}
119
120impl<M: Model> Scheduler<M> {
121    /// Creates a new local scheduler.
122    pub(crate) fn new(
123        sender: Sender<M>,
124        scheduler_queue: Arc<Mutex<SchedulerQueue>>,
125        time: SyncCellReader<TearableAtomicTime>,
126    ) -> Self {
127        Self {
128            sender,
129            scheduler_queue,
130            time,
131        }
132    }
133
134    /// Returns the current simulation time.
135    ///
136    /// # Examples
137    ///
138    /// ```
139    /// use asynchronix::model::Model;
140    /// use asynchronix::time::{MonotonicTime, Scheduler};
141    ///
142    /// fn is_third_millenium<M: Model>(scheduler: &Scheduler<M>) -> bool {
143    ///     let time = scheduler.time();
144    ///
145    ///     time >= MonotonicTime::new(978307200, 0) && time < MonotonicTime::new(32535216000, 0)
146    /// }
147    /// ```
148    pub fn time(&self) -> MonotonicTime {
149        self.time.try_read().expect("internal simulation error: could not perform a synchronized read of the simulation time")
150    }
151
152    /// Schedules an event at a future time.
153    ///
154    /// An error is returned if the specified deadline is not in the future of
155    /// the current simulation time.
156    ///
157    /// # Examples
158    ///
159    /// ```
160    /// use std::time::Duration;
161    ///
162    /// use asynchronix::model::Model;
163    /// use asynchronix::time::Scheduler;
164    ///
165    /// // A timer.
166    /// pub struct Timer {}
167    ///
168    /// impl Timer {
169    ///     // Sets an alarm [input port].
170    ///     pub fn set(&mut self, setting: Duration, scheduler: &Scheduler<Self>) {
171    ///         if scheduler.schedule_event(setting, Self::ring, ()).is_err() {
172    ///             println!("The alarm clock can only be set for a future time");
173    ///         }
174    ///     }
175    ///
176    ///     // Rings [private input port].
177    ///     fn ring(&mut self) {
178    ///         println!("Brringggg");
179    ///     }
180    /// }
181    ///
182    /// impl Model for Timer {}
183    /// ```
184    pub fn schedule_event<F, T, S>(
185        &self,
186        deadline: impl Deadline,
187        func: F,
188        arg: T,
189    ) -> Result<(), SchedulingError>
190    where
191        F: for<'a> InputFn<'a, M, T, S>,
192        T: Send + Clone + 'static,
193        S: Send + 'static,
194    {
195        let now = self.time();
196        let time = deadline.into_time(now);
197        if now >= time {
198            return Err(SchedulingError::InvalidScheduledTime);
199        }
200        let sender = self.sender.clone();
201        schedule_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
202
203        Ok(())
204    }
205
206    /// Schedules a cancellable event at a future time and returns an event key.
207    ///
208    /// An error is returned if the specified deadline is not in the future of
209    /// the current simulation time.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// use asynchronix::model::Model;
215    /// use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
216    ///
217    /// // An alarm clock that can be cancelled.
218    /// #[derive(Default)]
219    /// pub struct CancellableAlarmClock {
220    ///     event_key: Option<EventKey>,
221    /// }
222    ///
223    /// impl CancellableAlarmClock {
224    ///     // Sets an alarm [input port].
225    ///     pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
226    ///         self.cancel();
227    ///         match scheduler.schedule_keyed_event(setting, Self::ring, ()) {
228    ///             Ok(event_key) => self.event_key = Some(event_key),
229    ///             Err(_) => println!("The alarm clock can only be set for a future time"),
230    ///         };
231    ///     }
232    ///
233    ///     // Cancels the current alarm, if any [input port].
234    ///     pub fn cancel(&mut self) {
235    ///         self.event_key.take().map(|k| k.cancel());
236    ///     }
237    ///
238    ///     // Rings the alarm [private input port].
239    ///     fn ring(&mut self) {
240    ///         println!("Brringggg!");
241    ///     }
242    /// }
243    ///
244    /// impl Model for CancellableAlarmClock {}
245    /// ```
246    pub fn schedule_keyed_event<F, T, S>(
247        &self,
248        deadline: impl Deadline,
249        func: F,
250        arg: T,
251    ) -> Result<EventKey, SchedulingError>
252    where
253        F: for<'a> InputFn<'a, M, T, S>,
254        T: Send + Clone + 'static,
255        S: Send + 'static,
256    {
257        let now = self.time();
258        let time = deadline.into_time(now);
259        if now >= time {
260            return Err(SchedulingError::InvalidScheduledTime);
261        }
262        let sender = self.sender.clone();
263        let event_key =
264            schedule_keyed_event_at_unchecked(time, func, arg, sender, &self.scheduler_queue);
265
266        Ok(event_key)
267    }
268
269    /// Schedules a periodically recurring event at a future time.
270    ///
271    /// An error is returned if the specified deadline is not in the future of
272    /// the current simulation time or if the specified period is null.
273    ///
274    /// # Examples
275    ///
276    /// ```
277    /// use std::time::Duration;
278    ///
279    /// use asynchronix::model::Model;
280    /// use asynchronix::time::{MonotonicTime, Scheduler};
281    ///
282    /// // An alarm clock beeping at 1Hz.
283    /// pub struct BeepingAlarmClock {}
284    ///
285    /// impl BeepingAlarmClock {
286    ///     // Sets an alarm [input port].
287    ///     pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
288    ///         if scheduler.schedule_periodic_event(
289    ///             setting,
290    ///             Duration::from_secs(1), // 1Hz = 1/1s
291    ///             Self::beep,
292    ///             ()
293    ///         ).is_err() {
294    ///             println!("The alarm clock can only be set for a future time");
295    ///         }
296    ///     }
297    ///
298    ///     // Emits a single beep [private input port].
299    ///     fn beep(&mut self) {
300    ///         println!("Beep!");
301    ///     }
302    /// }
303    ///
304    /// impl Model for BeepingAlarmClock {}
305    /// ```
306    pub fn schedule_periodic_event<F, T, S>(
307        &self,
308        deadline: impl Deadline,
309        period: Duration,
310        func: F,
311        arg: T,
312    ) -> Result<(), SchedulingError>
313    where
314        F: for<'a> InputFn<'a, M, T, S> + Clone,
315        T: Send + Clone + 'static,
316        S: Send + 'static,
317    {
318        let now = self.time();
319        let time = deadline.into_time(now);
320        if now >= time {
321            return Err(SchedulingError::InvalidScheduledTime);
322        }
323        if period.is_zero() {
324            return Err(SchedulingError::NullRepetitionPeriod);
325        }
326        let sender = self.sender.clone();
327        schedule_periodic_event_at_unchecked(
328            time,
329            period,
330            func,
331            arg,
332            sender,
333            &self.scheduler_queue,
334        );
335
336        Ok(())
337    }
338
339    /// Schedules a cancellable, periodically recurring event at a future time
340    /// and returns an event key.
341    ///
342    /// An error is returned if the specified deadline is not in the future of
343    /// the current simulation time or if the specified period is null.
344    ///
345    /// # Examples
346    ///
347    /// ```
348    /// use std::time::Duration;
349    ///
350    /// use asynchronix::model::Model;
351    /// use asynchronix::time::{EventKey, MonotonicTime, Scheduler};
352    ///
353    /// // An alarm clock beeping at 1Hz that can be cancelled before it sets off, or
354    /// // stopped after it sets off.
355    /// #[derive(Default)]
356    /// pub struct CancellableBeepingAlarmClock {
357    ///     event_key: Option<EventKey>,
358    /// }
359    ///
360    /// impl CancellableBeepingAlarmClock {
361    ///     // Sets an alarm [input port].
362    ///     pub fn set(&mut self, setting: MonotonicTime, scheduler: &Scheduler<Self>) {
363    ///         self.cancel();
364    ///         match scheduler.schedule_keyed_periodic_event(
365    ///             setting,
366    ///             Duration::from_secs(1), // 1Hz = 1/1s
367    ///             Self::beep,
368    ///             ()
369    ///         ) {
370    ///             Ok(event_key) => self.event_key = Some(event_key),
371    ///             Err(_) => println!("The alarm clock can only be set for a future time"),
372    ///         };
373    ///     }
374    ///
375    ///     // Cancels or stops the alarm [input port].
376    ///     pub fn cancel(&mut self) {
377    ///         self.event_key.take().map(|k| k.cancel());
378    ///     }
379    ///
380    ///     // Emits a single beep [private input port].
381    ///     fn beep(&mut self) {
382    ///         println!("Beep!");
383    ///     }
384    /// }
385    ///
386    /// impl Model for CancellableBeepingAlarmClock {}
387    /// ```
388    pub fn schedule_keyed_periodic_event<F, T, S>(
389        &self,
390        deadline: impl Deadline,
391        period: Duration,
392        func: F,
393        arg: T,
394    ) -> Result<EventKey, SchedulingError>
395    where
396        F: for<'a> InputFn<'a, M, T, S> + Clone,
397        T: Send + Clone + 'static,
398        S: Send + 'static,
399    {
400        let now = self.time();
401        let time = deadline.into_time(now);
402        if now >= time {
403            return Err(SchedulingError::InvalidScheduledTime);
404        }
405        if period.is_zero() {
406            return Err(SchedulingError::NullRepetitionPeriod);
407        }
408        let sender = self.sender.clone();
409        let event_key = schedule_periodic_keyed_event_at_unchecked(
410            time,
411            period,
412            func,
413            arg,
414            sender,
415            &self.scheduler_queue,
416        );
417
418        Ok(event_key)
419    }
420}
421
422impl<M: Model> fmt::Debug for Scheduler<M> {
423    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424        f.debug_struct("Scheduler").finish_non_exhaustive()
425    }
426}
427
428/// Handle to a scheduled event.
429///
430/// An `EventKey` can be used to cancel a future event.
431#[derive(Clone, Debug)]
432#[must_use = "prefer unkeyed scheduling methods if the event is never cancelled"]
433pub struct EventKey {
434    is_cancelled: Arc<AtomicBool>,
435}
436
437impl EventKey {
438    /// Creates a key for a pending event.
439    pub(crate) fn new() -> Self {
440        Self {
441            is_cancelled: Arc::new(AtomicBool::new(false)),
442        }
443    }
444
445    /// Checks whether the event was cancelled.
446    pub(crate) fn is_cancelled(&self) -> bool {
447        self.is_cancelled.load(Ordering::Relaxed)
448    }
449
450    /// Cancels the associated event.
451    pub fn cancel(self) {
452        self.is_cancelled.store(true, Ordering::Relaxed);
453    }
454}
455
456/// Error returned when the scheduled time or the repetition period are invalid.
457#[derive(Debug, PartialEq, Eq, Clone, Copy)]
458pub enum SchedulingError {
459    /// The scheduled time does not lie in the future of the current simulation
460    /// time.
461    InvalidScheduledTime,
462    /// The repetition period is zero.
463    NullRepetitionPeriod,
464}
465
466impl fmt::Display for SchedulingError {
467    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
468        match self {
469            Self::InvalidScheduledTime => write!(
470                fmt,
471                "the scheduled time should be in the future of the current simulation time"
472            ),
473            Self::NullRepetitionPeriod => write!(fmt, "the repetition period cannot be zero"),
474        }
475    }
476}
477
478impl Error for SchedulingError {}
479
480/// Schedules an event at a future time.
481///
482/// This method does not check whether the specified time lies in the future
483/// of the current simulation time.
484pub(crate) fn schedule_event_at_unchecked<M, F, T, S>(
485    time: MonotonicTime,
486    func: F,
487    arg: T,
488    sender: Sender<M>,
489    scheduler_queue: &Mutex<SchedulerQueue>,
490) where
491    M: Model,
492    F: for<'a> InputFn<'a, M, T, S>,
493    T: Send + Clone + 'static,
494    S: Send + 'static,
495{
496    let channel_id = sender.channel_id();
497
498    let event_dispatcher = Box::new(new_event_dispatcher(func, arg, sender));
499
500    let mut scheduler_queue = scheduler_queue.lock().unwrap();
501    scheduler_queue.insert((time, channel_id), event_dispatcher);
502}
503
504/// Schedules an event at a future time, returning an event key.
505///
506/// This method does not check whether the specified time lies in the future
507/// of the current simulation time.
508pub(crate) fn schedule_keyed_event_at_unchecked<M, F, T, S>(
509    time: MonotonicTime,
510    func: F,
511    arg: T,
512    sender: Sender<M>,
513    scheduler_queue: &Mutex<SchedulerQueue>,
514) -> EventKey
515where
516    M: Model,
517    F: for<'a> InputFn<'a, M, T, S>,
518    T: Send + Clone + 'static,
519    S: Send + 'static,
520{
521    let event_key = EventKey::new();
522    let channel_id = sender.channel_id();
523    let event_dispatcher = Box::new(KeyedEventDispatcher::new(
524        event_key.clone(),
525        func,
526        arg,
527        sender,
528    ));
529
530    let mut scheduler_queue = scheduler_queue.lock().unwrap();
531    scheduler_queue.insert((time, channel_id), event_dispatcher);
532
533    event_key
534}
535
536/// Schedules a periodic event at a future time.
537///
538/// This method does not check whether the specified time lies in the future
539/// of the current simulation time.
540pub(crate) fn schedule_periodic_event_at_unchecked<M, F, T, S>(
541    time: MonotonicTime,
542    period: Duration,
543    func: F,
544    arg: T,
545    sender: Sender<M>,
546    scheduler_queue: &Mutex<SchedulerQueue>,
547) where
548    M: Model,
549    F: for<'a> InputFn<'a, M, T, S> + Clone,
550    T: Send + Clone + 'static,
551    S: Send + 'static,
552{
553    let channel_id = sender.channel_id();
554
555    let event_dispatcher = Box::new(PeriodicEventDispatcher::new(func, arg, sender, period));
556
557    let mut scheduler_queue = scheduler_queue.lock().unwrap();
558    scheduler_queue.insert((time, channel_id), event_dispatcher);
559}
560
561/// Schedules an event at a future time, returning an event key.
562///
563/// This method does not check whether the specified time lies in the future
564/// of the current simulation time.
565pub(crate) fn schedule_periodic_keyed_event_at_unchecked<M, F, T, S>(
566    time: MonotonicTime,
567    period: Duration,
568    func: F,
569    arg: T,
570    sender: Sender<M>,
571    scheduler_queue: &Mutex<SchedulerQueue>,
572) -> EventKey
573where
574    M: Model,
575    F: for<'a> InputFn<'a, M, T, S> + Clone,
576    T: Send + Clone + 'static,
577    S: Send + 'static,
578{
579    let event_key = EventKey::new();
580    let channel_id = sender.channel_id();
581    let event_dispatcher = Box::new(PeriodicKeyedEventDispatcher::new(
582        event_key.clone(),
583        func,
584        arg,
585        sender,
586        period,
587    ));
588
589    let mut scheduler_queue = scheduler_queue.lock().unwrap();
590    scheduler_queue.insert((time, channel_id), event_dispatcher);
591
592    event_key
593}
594
595/// Trait for objects that can be converted to a future dispatching a scheduled
596/// event.
597pub(crate) trait ScheduledEvent: Send {
598    /// Reports whether the associated event was cancelled.
599    fn is_cancelled(&self) -> bool;
600
601    /// Returns a boxed clone of this event and the repetition period if this is
602    /// a periodic even, otherwise returns `None`.
603    fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)>;
604
605    /// Returns a boxed future dispatching the associated event.
606    fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>>;
607
608    /// Spawns the future that dispatches the associated event onto the provided
609    /// executor.
610    ///
611    /// This method is typically more efficient that spawning the boxed future
612    /// from `into_future` since it can directly spawn the unboxed future.
613    fn spawn_and_forget(self: Box<Self>, executor: &Executor);
614}
615
616pin_project! {
617    /// Object that can be converted to a future dispatching a non-cancellable
618    /// event.
619    ///
620    /// Note that this particular event dispatcher is in fact already a future:
621    /// since the future cannot be cancelled and the dispatcher does not need to
622    /// be cloned, there is no need to defer the construction of the future.
623    /// This makes `into_future` a trivial cast, which saves a boxing operation.
624    pub(crate) struct EventDispatcher<F> {
625        #[pin]
626        fut: F,
627    }
628}
629
630/// Constructs a new `EventDispatcher`.
631///
632/// Due to some limitations of type inference or of my understanding of it, the
633/// constructor for this event dispatchers is a freestanding function.
634fn new_event_dispatcher<M, F, T, S>(
635    func: F,
636    arg: T,
637    sender: Sender<M>,
638) -> EventDispatcher<impl Future<Output = ()>>
639where
640    M: Model,
641    F: for<'a> InputFn<'a, M, T, S>,
642    T: Send + Clone + 'static,
643{
644    let fut = dispatch_event(func, arg, sender);
645
646    EventDispatcher { fut }
647}
648
649impl<F> Future for EventDispatcher<F>
650where
651    F: Future,
652{
653    type Output = F::Output;
654
655    #[inline(always)]
656    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
657        self.project().fut.poll(cx)
658    }
659}
660
661impl<F> ScheduledEvent for EventDispatcher<F>
662where
663    F: Future<Output = ()> + Send + 'static,
664{
665    fn is_cancelled(&self) -> bool {
666        false
667    }
668    fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
669        None
670    }
671    fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
672        // No need for boxing, type coercion is enough here.
673        Box::into_pin(self)
674    }
675    fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
676        executor.spawn_and_forget(*self);
677    }
678}
679
680/// Object that can be converted to a future dispatching a non-cancellable periodic
681/// event.
682pub(crate) struct PeriodicEventDispatcher<M, F, T, S>
683where
684    M: Model,
685{
686    func: F,
687    arg: T,
688    sender: Sender<M>,
689    period: Duration,
690    _input_kind: PhantomData<S>,
691}
692
693impl<M, F, T, S> PeriodicEventDispatcher<M, F, T, S>
694where
695    M: Model,
696    F: for<'a> InputFn<'a, M, T, S>,
697    T: Send + Clone + 'static,
698{
699    /// Constructs a new `PeriodicEventDispatcher`.
700    fn new(func: F, arg: T, sender: Sender<M>, period: Duration) -> Self {
701        Self {
702            func,
703            arg,
704            sender,
705            period,
706            _input_kind: PhantomData,
707        }
708    }
709}
710
711impl<M, F, T, S> ScheduledEvent for PeriodicEventDispatcher<M, F, T, S>
712where
713    M: Model,
714    F: for<'a> InputFn<'a, M, T, S> + Clone,
715    T: Send + Clone + 'static,
716    S: Send + 'static,
717{
718    fn is_cancelled(&self) -> bool {
719        false
720    }
721    fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
722        let event = Box::new(Self::new(
723            self.func.clone(),
724            self.arg.clone(),
725            self.sender.clone(),
726            self.period,
727        ));
728
729        Some((event, self.period))
730    }
731    fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
732        let Self {
733            func, arg, sender, ..
734        } = *self;
735
736        Box::pin(dispatch_event(func, arg, sender))
737    }
738    fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
739        let Self {
740            func, arg, sender, ..
741        } = *self;
742
743        let fut = dispatch_event(func, arg, sender);
744        executor.spawn_and_forget(fut);
745    }
746}
747
748/// Object that can be converted to a future dispatching a cancellable event.
749pub(crate) struct KeyedEventDispatcher<M, F, T, S>
750where
751    M: Model,
752    F: for<'a> InputFn<'a, M, T, S>,
753    T: Send + Clone + 'static,
754{
755    event_key: EventKey,
756    func: F,
757    arg: T,
758    sender: Sender<M>,
759    _input_kind: PhantomData<S>,
760}
761
762impl<M, F, T, S> KeyedEventDispatcher<M, F, T, S>
763where
764    M: Model,
765    F: for<'a> InputFn<'a, M, T, S>,
766    T: Send + Clone + 'static,
767{
768    /// Constructs a new `KeyedEventDispatcher`.
769    fn new(event_key: EventKey, func: F, arg: T, sender: Sender<M>) -> Self {
770        Self {
771            event_key,
772            func,
773            arg,
774            sender,
775            _input_kind: PhantomData,
776        }
777    }
778}
779
780impl<M, F, T, S> ScheduledEvent for KeyedEventDispatcher<M, F, T, S>
781where
782    M: Model,
783    F: for<'a> InputFn<'a, M, T, S>,
784    T: Send + Clone + 'static,
785    S: Send + 'static,
786{
787    fn is_cancelled(&self) -> bool {
788        self.event_key.is_cancelled()
789    }
790    fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
791        None
792    }
793    fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
794        let Self {
795            event_key,
796            func,
797            arg,
798            sender,
799            ..
800        } = *self;
801
802        Box::pin(dispatch_keyed_event(event_key, func, arg, sender))
803    }
804    fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
805        let Self {
806            event_key,
807            func,
808            arg,
809            sender,
810            ..
811        } = *self;
812
813        let fut = dispatch_keyed_event(event_key, func, arg, sender);
814        executor.spawn_and_forget(fut);
815    }
816}
817
818/// Object that can be converted to a future dispatching a cancellable event.
819pub(crate) struct PeriodicKeyedEventDispatcher<M, F, T, S>
820where
821    M: Model,
822    F: for<'a> InputFn<'a, M, T, S>,
823    T: Send + Clone + 'static,
824{
825    event_key: EventKey,
826    func: F,
827    arg: T,
828    sender: Sender<M>,
829    period: Duration,
830    _input_kind: PhantomData<S>,
831}
832
833impl<M, F, T, S> PeriodicKeyedEventDispatcher<M, F, T, S>
834where
835    M: Model,
836    F: for<'a> InputFn<'a, M, T, S>,
837    T: Send + Clone + 'static,
838{
839    /// Constructs a new `KeyedEventDispatcher`.
840    fn new(event_key: EventKey, func: F, arg: T, sender: Sender<M>, period: Duration) -> Self {
841        Self {
842            event_key,
843            func,
844            arg,
845            sender,
846            period,
847            _input_kind: PhantomData,
848        }
849    }
850}
851
852impl<M, F, T, S> ScheduledEvent for PeriodicKeyedEventDispatcher<M, F, T, S>
853where
854    M: Model,
855    F: for<'a> InputFn<'a, M, T, S> + Clone,
856    T: Send + Clone + 'static,
857    S: Send + 'static,
858{
859    fn is_cancelled(&self) -> bool {
860        self.event_key.is_cancelled()
861    }
862    fn next(&self) -> Option<(Box<dyn ScheduledEvent>, Duration)> {
863        let event = Box::new(Self::new(
864            self.event_key.clone(),
865            self.func.clone(),
866            self.arg.clone(),
867            self.sender.clone(),
868            self.period,
869        ));
870
871        Some((event, self.period))
872    }
873    fn into_future(self: Box<Self>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
874        let Self {
875            event_key,
876            func,
877            arg,
878            sender,
879            ..
880        } = *self;
881
882        Box::pin(dispatch_keyed_event(event_key, func, arg, sender))
883    }
884    fn spawn_and_forget(self: Box<Self>, executor: &Executor) {
885        let Self {
886            event_key,
887            func,
888            arg,
889            sender,
890            ..
891        } = *self;
892
893        let fut = dispatch_keyed_event(event_key, func, arg, sender);
894        executor.spawn_and_forget(fut);
895    }
896}
897
898/// Asynchronously dispatch a regular, non-cancellable event.
899async fn dispatch_event<M, F, T, S>(func: F, arg: T, sender: Sender<M>)
900where
901    M: Model,
902    F: for<'a> InputFn<'a, M, T, S>,
903    T: Send + Clone + 'static,
904{
905    let _ = sender
906        .send(
907            move |model: &mut M,
908                  scheduler,
909                  recycle_box: RecycleBox<()>|
910                  -> RecycleBox<dyn Future<Output = ()> + Send + '_> {
911                let fut = func.call(model, arg, scheduler);
912
913                coerce_box!(RecycleBox::recycle(recycle_box, fut))
914            },
915        )
916        .await;
917}
918
919/// Asynchronously dispatch a cancellable event.
920async fn dispatch_keyed_event<M, F, T, S>(event_key: EventKey, func: F, arg: T, sender: Sender<M>)
921where
922    M: Model,
923    F: for<'a> InputFn<'a, M, T, S>,
924    T: Send + Clone + 'static,
925{
926    let _ = sender
927        .send(
928            move |model: &mut M,
929                  scheduler,
930                  recycle_box: RecycleBox<()>|
931                  -> RecycleBox<dyn Future<Output = ()> + Send + '_> {
932                let fut = async move {
933                    // Only perform the call if the event wasn't cancelled.
934                    if !event_key.is_cancelled() {
935                        func.call(model, arg, scheduler).await;
936                    }
937                };
938
939                coerce_box!(RecycleBox::recycle(recycle_box, fut))
940            },
941        )
942        .await;
943}