mm1_timer/
api.rs

1use std::hash::Hash;
2use std::marker::PhantomData;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_common::errors::error_of::ErrorOf;
7use mm1_core::context::{Fork, ForkErrorKind, Messaging, Now, Quit, SendErrorKind, Tell};
8use mm1_core::prim::Message;
9use mm1_proto_timer as t;
10
/// Creates a [`TimerApi`] whose timer implementation is `crate::tokio_time::TokioTimer<K, M>`.
///
/// The concrete timer type is not exposed to the caller: the returned handle is
/// parameterised by an opaque `impl Timer` with `tokio::time::Instant` instants and
/// `std::time::Duration` durations.
///
/// Only compiled when the `tokio-time` feature is enabled.
///
/// # Errors
///
/// Returns whatever [`TimerApi::new`] returns for `ctx`.
#[cfg(feature = "tokio-time")]
pub async fn new_tokio_timer<K, M, Ctx>(
    ctx: &mut Ctx,
) -> Result<
    crate::api::TimerApi<
        K,
        M,
        Ctx,
        impl mm1_proto_timer::Timer<
            Key = K,
            Message = M,
            Instant = tokio::time::Instant,
            Duration = std::time::Duration,
        >,
    >,
    TimerError,
>
where
    Ctx: Quit + Fork + Messaging + Now<Instant = tokio::time::Instant>,
    crate::tokio_time::TokioTimer<K, M>: t::Timer<
            Instant = tokio::time::Instant,
            Duration = std::time::Duration,
            Key = K,
            Message = M,
        >,
    K: Hash + Ord + Clone + Send + Sync + 'static,
    M: Message,
    Ctx::Instant: Send,
{
    // This is the only place the concrete `TokioTimer` type is named; the
    // signature hides it behind `impl Timer`.
    let timer_api = TimerApi::<K, M, Ctx, crate::tokio_time::TokioTimer<K, M>>::new(ctx).await?;
    Ok(timer_api)
}
42
43#[derive(Debug, thiserror::Error)]
44pub enum TimerError {
45    #[error("fork: {}", _0)]
46    Fork(ErrorOf<ForkErrorKind>),
47    #[error("send: {}", _0)]
48    Send(ErrorOf<SendErrorKind>),
49}
50
51pub struct TimerApi<Key, Msg, Ctx, T> {
52    api_context:   Ctx,
53    timer_address: Address,
54    _pd:           PhantomData<(Key, Msg, T)>,
55}
56
57impl<Ctx, Key, Msg, T> TimerApi<Key, Msg, Ctx, T>
58where
59    Ctx: Quit + Fork + Messaging + Now,
60    Key: Hash + Ord + Clone + Send + Sync + 'static,
61    Msg: Message,
62    T: t::Timer<Key = Key, Message = Msg, Instant = Ctx::Instant>,
63    Ctx::Instant: Send,
64{
65    pub async fn new(ctx: &mut Ctx) -> Result<Self, TimerError> {
66        let receiver = ctx.address();
67        let api_context = ctx.fork().await.map_err(TimerError::Fork)?;
68        let timer_context = ctx.fork().await.map_err(TimerError::Fork)?;
69        let timer = timer_context.address();
70
71        timer_context
72            .run(move |mut timer_context| {
73                async move {
74                    crate::actor::timer_actor::<T, Ctx>(&mut timer_context, receiver)
75                        .boxed()
76                        .await
77                }
78            })
79            .await;
80
81        let timer_api = Self {
82            api_context,
83            timer_address: timer,
84            _pd: Default::default(),
85        };
86        Ok(timer_api)
87    }
88
89    pub async fn cancel(&mut self, key: T::Key) -> Result<(), TimerError> {
90        self.api_context
91            .tell(self.timer_address, t::Cancel::<T> { key })
92            .await
93            .map_err(TimerError::Send)?;
94        Ok(())
95    }
96
97    pub async fn schedule_once_at(
98        &mut self,
99        key: Key,
100        at: T::Instant,
101        msg: Msg,
102    ) -> Result<(), TimerError> {
103        self.api_context
104            .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
105            .await
106            .map_err(TimerError::Send)?;
107        Ok(())
108    }
109
110    pub async fn schedule_once_after(
111        &mut self,
112        key: Key,
113        after: T::Duration,
114        msg: Msg,
115    ) -> Result<(), TimerError> {
116        let now = self.api_context.now();
117        let at = T::instant_plus_duration(now, after);
118        self.api_context
119            .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
120            .await
121            .map_err(TimerError::Send)?;
122        Ok(())
123    }
124}