1use std::hash::Hash;
2use std::marker::PhantomData;
3use std::time::Duration;
4
5use futures::FutureExt;
6use mm1_address::address::Address;
7use mm1_core::context::{Fork, Messaging, Now, Quit, Tell};
8use mm1_proto::Message;
9use mm1_proto_timer as t;
10use mm1_proto_timer::Timer;
11
12use crate::api::{TimerApi, TimerError};
13
/// Marker type that selects the Tokio-backed [`Timer`] implementation.
///
/// The struct stores no data: `K` and `M` are recorded only through
/// [`PhantomData`] markers so the `Timer` impl can expose them as its key
/// and message types.
pub struct TokioTimer<K, M> {
    /// Zero-sized marker for the timer-key type `K`.
    _key: PhantomData<K>,
    /// Zero-sized marker for the message type `M`.
    _msg: PhantomData<M>,
}
18
19impl<K, M> Timer for TokioTimer<K, M>
20where
21 K: Message + Hash + Ord + Clone + Send + 'static,
22 M: Message,
23{
24 type Duration = Duration;
25 type Instant = tokio::time::Instant;
26 type Key = K;
27 type Message = M;
28
29 async fn sleep(d: Self::Duration) {
30 tokio::time::sleep(d).await;
31 }
32}
33
/// Creates a timer API handle backed by [`crate::tokio_time::TokioTimer`].
///
/// All of the work is delegated to `TimerApiImpl::new`, which forks `ctx`
/// twice: one fork is kept inside the returned handle, the other is used to
/// run the timer actor.
///
/// # Errors
///
/// Returns [`TimerError::Fork`] when forking `ctx` fails.
#[cfg(feature = "tokio-time")]
pub async fn new_tokio_timer<K, M, Ctx>(
    ctx: &mut Ctx,
) -> Result<
    impl TimerApi<
        Key = K,
        Message = M,
        Instant = tokio::time::Instant,
        Duration = tokio::time::Duration,
    >,
    TimerError,
>
where
    Ctx: Quit + Fork + Messaging + Now<Instant = tokio::time::Instant>,
    crate::tokio_time::TokioTimer<K, M>: t::Timer<
        Instant = tokio::time::Instant,
        Duration = std::time::Duration,
        Key = K,
        Message = M,
    >,
    K: Message + Hash + Ord + Clone + Send + Sync + 'static,
    M: Message,
{
    // Bind the concrete handle first; the signature hides its type behind
    // `impl TimerApi`.
    let timer_api =
        TimerApiImpl::<K, M, Ctx, crate::tokio_time::TokioTimer<K, M>>::new(ctx).await?;
    Ok(timer_api)
}
59
60struct TimerApiImpl<Key, Msg, Ctx, T> {
61 api_context: Ctx,
62 timer_address: Address,
63 _pd: PhantomData<(Key, Msg, T)>,
64}
65
66impl<Ctx, Key, Msg, T> TimerApiImpl<Key, Msg, Ctx, T>
67where
68 Ctx: Quit + Fork + Messaging + Now,
69 Key: Hash + Ord + Clone + Send + Sync + 'static,
70 Msg: Message,
71 T: t::Timer<Key = Key, Message = Msg, Instant = Ctx::Instant>,
72 Ctx::Instant: Send,
73{
74 async fn new(ctx: &mut Ctx) -> Result<Self, TimerError> {
75 let receiver = ctx.address();
76 let api_context = ctx.fork().await.map_err(TimerError::Fork)?;
77 let timer_context = ctx.fork().await.map_err(TimerError::Fork)?;
78 let timer = timer_context.address();
79
80 timer_context
81 .run(move |mut timer_context| {
82 async move {
83 crate::actor::timer_actor::<T, Ctx>(&mut timer_context, receiver)
84 .boxed()
85 .await
86 }
87 })
88 .await;
89
90 let timer_api = Self {
91 api_context,
92 timer_address: timer,
93 _pd: Default::default(),
94 };
95 Ok(timer_api)
96 }
97}
98
99impl<Ctx, Key, Msg, T> TimerApi for TimerApiImpl<Key, Msg, Ctx, T>
100where
101 Ctx: Quit + Fork + Messaging + Now,
102 Key: Message + Hash + Ord + Clone + Send + Sync + 'static,
103 Msg: Message,
104 T: t::Timer<Key = Key, Message = Msg, Instant = Ctx::Instant>,
105 Ctx::Instant: Send,
106{
107 type Duration = T::Duration;
108 type Instant = T::Instant;
109 type Key = T::Key;
110 type Message = T::Message;
111 type Timer = T;
112
113 async fn cancel(&mut self, key: Self::Key) -> Result<(), TimerError> {
114 self.api_context
115 .tell(self.timer_address, t::Cancel::<T> { key })
116 .await
117 .map_err(TimerError::Send)?;
118 Ok(())
119 }
120
121 async fn schedule_once_at(
122 &mut self,
123 key: Self::Key,
124 at: Self::Instant,
125 msg: Self::Message,
126 ) -> Result<(), TimerError> {
127 self.api_context
128 .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
129 .await
130 .map_err(TimerError::Send)?;
131 Ok(())
132 }
133
134 async fn schedule_once_after(
135 &mut self,
136 key: Self::Key,
137 after: Self::Duration,
138 msg: Self::Message,
139 ) -> Result<(), TimerError> {
140 let now = self.api_context.now();
141 let at = T::instant_plus_duration(now, after);
142 self.api_context
143 .tell(self.timer_address, t::ScheduleOnce::<T> { key, at, msg })
144 .await
145 .map_err(TimerError::Send)?;
146 Ok(())
147 }
148}