mm1_timer/
tokio_time.rs

1use std::hash::Hash;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::FutureExt;
6use mm1_address::address::Address;
7use mm1_core::context::{Fork, Messaging, Now, Quit, Tell};
8use mm1_proto::Message;
9use mm1_proto_timer as t;
10use mm1_proto_timer::Timer;
11
12use crate::api::{TimerApi, TimerError};
13
/// Type-level marker that selects tokio's clock as the [`Timer`] backend.
///
/// `K` is the timer key type and `M` is the message type. Neither is ever
/// stored: both fields are `PhantomData`, so a value of this type carries no
/// runtime data.
pub struct TokioTimer<K, M> {
    // Anchors the key type parameter `K`.
    _key: PhantomData<K>,
    // Anchors the message type parameter `M`.
    _msg: PhantomData<M>,
}
18
19impl<K, M> Timer for TokioTimer<K, M>
20where
21    K: Message + Hash + Ord + Clone + Send + 'static,
22    M: Message,
23{
24    type Duration = Duration;
25    type Instant = tokio::time::Instant;
26    type Key = K;
27    type Message = M;
28
29    async fn sleep(d: Self::Duration) {
30        tokio::time::sleep(d).await;
31    }
32}
33
/// Creates a timer API for `ctx` that is driven by tokio time.
///
/// Builds a `TimerApiImpl` parameterised with `TokioTimer<K, M>` and returns
/// it behind an opaque `impl TimerApi`, so callers only see the key, message,
/// instant and duration types.
///
/// Only compiled when the `tokio-time` feature is enabled.
///
/// # Errors
///
/// Returns whatever `TimerApiImpl::new` reports; in this file that is
/// `TimerError::Fork` when forking `ctx` fails.
#[cfg(feature = "tokio-time")]
pub async fn new_tokio_timer<K, M, Ctx>(
    ctx: &mut Ctx,
) -> Result<
    impl TimerApi<
        Key = K,
        Message = M,
        Instant = tokio::time::Instant,
        Duration = tokio::time::Duration,
    >,
    TimerError,
>
where
    Ctx: Quit + Fork + Messaging + Now<Instant = tokio::time::Instant>,
    crate::tokio_time::TokioTimer<K, M>: t::Timer<
            Instant = tokio::time::Instant,
            Duration = std::time::Duration,
            Key = K,
            Message = M,
        >,
    K: Message + Hash + Ord + Clone + Send + Sync + 'static,
    M: Message,
{
    let timer_api =
        TimerApiImpl::<K, M, Ctx, crate::tokio_time::TokioTimer<K, M>>::new(ctx).await?;
    Ok(timer_api)
}
59
60struct TimerApiImpl<Key, Msg, Ctx, T> {
61    api_context:   Ctx,
62    timer_address: Address,
63    _pd:           PhantomData<(Key, Msg, T)>,
64}
65
66impl<Ctx, Key, Msg, T> TimerApiImpl<Key, Msg, Ctx, T>
67where
68    Ctx: Quit + Fork + Messaging + Now,
69    Key: Hash + Ord + Clone + Send + Sync + 'static,
70    Msg: Message,
71    T: t::Timer<Key = Key, Message = Msg, Instant = Ctx::Instant>,
72    Ctx::Instant: Send,
73{
74    async fn new(ctx: &mut Ctx) -> Result<Self, TimerError> {
75        let receiver = ctx.address();
76        let api_context = ctx.fork().await.map_err(TimerError::Fork)?;
77        let timer_context = ctx.fork().await.map_err(TimerError::Fork)?;
78        let timer = timer_context.address();
79
80        timer_context
81            .run(move |mut timer_context| {
82                async move {
83                    crate::actor::timer_actor::<T, Ctx>(&mut timer_context, receiver)
84                        .boxed()
85                        .await
86                }
87            })
88            .await;
89
90        let timer_api = Self {
91            api_context,
92            timer_address: timer,
93            _pd: Default::default(),
94        };
95        Ok(timer_api)
96    }
97}
98
99impl<Ctx, Key, Msg, T> TimerApi for TimerApiImpl<Key, Msg, Ctx, T>
100where
101    Ctx: Quit + Fork + Messaging + Now,
102    Key: Message + Hash + Ord + Clone + Send + Sync + 'static,
103    Msg: Message,
104    T: t::Timer<Key = Key, Message = Msg, Instant = Ctx::Instant>,
105    Ctx::Instant: Send,
106{
107    type Duration = T::Duration;
108    type Instant = T::Instant;
109    type Key = T::Key;
110    type Message = T::Message;
111    type Timer = T;
112
113    async fn cancel(&mut self, key: Self::Key) -> Result<(), TimerError> {
114        self.api_context
115            .tell(self.timer_address, t::Cancel::<T> { key })
116            .await
117            .map_err(TimerError::Send)?;
118        Ok(())
119    }
120
121    async fn schedule_once_at(
122        &mut self,
123        key: Self::Key,
124        at: Self::Instant,
125        msg: Self::Message,
126    ) -> Result<(), TimerError> {
127        self.api_context
128            .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
129            .await
130            .map_err(TimerError::Send)?;
131        Ok(())
132    }
133
134    async fn schedule_once_after(
135        &mut self,
136        key: Self::Key,
137        after: Self::Duration,
138        msg: Self::Message,
139    ) -> Result<(), TimerError> {
140        let now = self.api_context.now();
141        let at = T::instant_plus_duration(now, after);
142        self.api_context
143            .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
144            .await
145            .map_err(TimerError::Send)?;
146        Ok(())
147    }
148}