1use std::hash::Hash;
2use std::marker::PhantomData;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_common::errors::error_of::ErrorOf;
7use mm1_core::context::{Fork, ForkErrorKind, Messaging, Now, Quit, SendErrorKind, Tell};
8use mm1_core::prim::Message;
9use mm1_proto_timer as t;
10
11#[cfg(feature = "tokio-time")]
12pub async fn new_tokio_timer<K, M, Ctx>(
13 ctx: &mut Ctx,
14) -> Result<
15 crate::api::TimerApi<
16 K,
17 M,
18 Ctx,
19 impl mm1_proto_timer::Timer<
20 Key = K,
21 Message = M,
22 Instant = tokio::time::Instant,
23 Duration = std::time::Duration,
24 >,
25 >,
26 TimerError,
27>
28where
29 Ctx: Quit + Fork + Messaging + Now<Instant = tokio::time::Instant>,
30 crate::tokio_time::TokioTimer<K, M>: t::Timer<
31 Instant = tokio::time::Instant,
32 Duration = std::time::Duration,
33 Key = K,
34 Message = M,
35 >,
36 K: Hash + Ord + Clone + Send + Sync + 'static,
37 M: Message,
38 Ctx::Instant: Send,
39{
40 TimerApi::<K, M, Ctx, crate::tokio_time::TokioTimer<K, M>>::new(ctx).await
41}
42
43#[derive(Debug, thiserror::Error)]
44pub enum TimerError {
45 #[error("fork: {}", _0)]
46 Fork(ErrorOf<ForkErrorKind>),
47 #[error("send: {}", _0)]
48 Send(ErrorOf<SendErrorKind>),
49}
50
51pub struct TimerApi<Key, Msg, Ctx, T> {
52 api_context: Ctx,
53 timer_address: Address,
54 _pd: PhantomData<(Key, Msg, T)>,
55}
56
57impl<Ctx, Key, Msg, T> TimerApi<Key, Msg, Ctx, T>
58where
59 Ctx: Quit + Fork + Messaging + Now,
60 Key: Hash + Ord + Clone + Send + Sync + 'static,
61 Msg: Message,
62 T: t::Timer<Key = Key, Message = Msg, Instant = Ctx::Instant>,
63 Ctx::Instant: Send,
64{
65 pub async fn new(ctx: &mut Ctx) -> Result<Self, TimerError> {
66 let receiver = ctx.address();
67 let api_context = ctx.fork().await.map_err(TimerError::Fork)?;
68 let timer_context = ctx.fork().await.map_err(TimerError::Fork)?;
69 let timer = timer_context.address();
70
71 timer_context
72 .run(move |mut timer_context| {
73 async move {
74 crate::actor::timer_actor::<T, Ctx>(&mut timer_context, receiver)
75 .boxed()
76 .await
77 }
78 })
79 .await;
80
81 let timer_api = Self {
82 api_context,
83 timer_address: timer,
84 _pd: Default::default(),
85 };
86 Ok(timer_api)
87 }
88
89 pub async fn cancel(&mut self, key: T::Key) -> Result<(), TimerError> {
90 self.api_context
91 .tell(self.timer_address, t::Cancel::<T> { key })
92 .await
93 .map_err(TimerError::Send)?;
94 Ok(())
95 }
96
97 pub async fn schedule_once_at(
98 &mut self,
99 key: Key,
100 at: T::Instant,
101 msg: Msg,
102 ) -> Result<(), TimerError> {
103 self.api_context
104 .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
105 .await
106 .map_err(TimerError::Send)?;
107 Ok(())
108 }
109
110 pub async fn schedule_once_after(
111 &mut self,
112 key: Key,
113 after: T::Duration,
114 msg: Msg,
115 ) -> Result<(), TimerError> {
116 let now = self.api_context.now();
117 let at = T::instant_plus_duration(now, after);
118 self.api_context
119 .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
120 .await
121 .map_err(TimerError::Send)?;
122 Ok(())
123 }
124}