1use std::collections::BTreeSet;
2use std::hash::Hash;
3use std::time::Duration;
4
5use futures::future;
6use mm1_address::address::Address;
7use mm1_ask::{Ask, AskErrorKind, Reply};
8use mm1_common::errors::error_of::ErrorOf;
9use mm1_common::types::AnyError;
10use mm1_core::context::{Fork, ForkErrorKind, Messaging, Now, RecvErrorKind};
11use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
12use mm1_core::message::AnyMessage;
13use mm1_proto_ask::Request;
14use slotmap::SlotMap;
15use tokio::time::Instant;
16
17#[doc(hidden)]
18pub mod proto;
19
/// Timeout passed to every `ask` that [`OneshotTimer`] issues to its timer
/// server (scheduling and cancelling).
const ASK_TIMEOUT: Duration = Duration::from_millis(100);
21
22slotmap::new_key_type! {
23 pub struct OneshotKey;
24}
25
26#[derive(Debug, thiserror::Error)]
27pub enum TimerError {
28 #[error("{}", _0)]
29 Fork(
30 #[source]
31 #[from]
32 ErrorOf<ForkErrorKind>,
33 ),
34
35 #[error("{}", _0)]
36 Ask(
37 #[source]
38 #[from]
39 ErrorOf<AskErrorKind>,
40 ),
41}
42
43#[derive(Debug, Clone)]
44pub struct OneshotTimer<Ctx> {
45 client_ctx: Ctx,
46 server_addr: Address,
47}
48
49impl<Ctx> OneshotTimer<Ctx>
50where
51 Ctx: Ask + Fork + Messaging + Now<Instant = Instant>,
52{
53 pub async fn create(context: &mut Ctx) -> Result<Self, TimerError> {
54 let client_ctx = context.fork().await?;
55 let server_ctx = context.fork().await?;
56
57 let receiver_addr = context.address();
58 let server_addr = server_ctx.address();
59 server_ctx
60 .run(move |server_ctx| timer_run(server_ctx, receiver_addr))
61 .await;
62
63 Ok(Self {
64 client_ctx,
65 server_addr,
66 })
67 }
68
69 pub async fn schedule_once_at<M: Into<AnyMessage>>(
70 &mut self,
71 at: Instant,
72 message: M,
73 ) -> Result<OneshotKey, TimerError> {
74 let message = message.into();
75 let proto::ScheduledOneshot { key } = self
76 .client_ctx
77 .ask(
78 self.server_addr,
79 proto::ScheduleOneshotAt { at, message },
80 ASK_TIMEOUT,
81 )
82 .await?;
83 Ok(key)
84 }
85
86 pub async fn schedule_once_after<M: Into<AnyMessage>>(
87 &mut self,
88 after: Duration,
89 message: M,
90 ) -> Result<OneshotKey, TimerError> {
91 let at = self.client_ctx.now() + after;
92 self.schedule_once_at(at, message).await
93 }
94
95 pub async fn cancel(&mut self, key: OneshotKey) -> Result<Option<AnyMessage>, TimerError> {
96 let proto::CanceledOneshot { message } = self
97 .client_ctx
98 .ask(self.server_addr, proto::CancelOneshot { key }, ASK_TIMEOUT)
99 .await?;
100 Ok(message)
101 }
102}
103
104#[derive(Default)]
105struct TimerState {
106 #[allow(dead_code)]
107 entries: SlotMap<OneshotKey, Entry>,
108 ordered: BTreeSet<(Instant, OneshotKey)>,
109}
110
111#[allow(dead_code)]
112struct Entry {
113 at: Instant,
114 message: AnyMessage,
115}
116
117async fn timer_run<Ctx>(mut ctx: Ctx, receiver_addr: Address) -> Result<(), AnyError>
118where
119 Ctx: Messaging + Reply,
120{
121 enum Event {
122 Time,
123 Schedule(Request<proto::ScheduleOneshotAt>),
124 Cancel(Request<proto::CancelOneshot>),
125 RecvError(ErrorOf<RecvErrorKind>),
126 }
127
128 let mut state: TimerState = Default::default();
129
130 loop {
131 let time = async {
132 if let Some((at, _)) = state.ordered.first().copied() {
133 tokio::time::sleep_until(at).await;
134 Event::Time
135 } else {
136 future::pending().await
137 }
138 };
139 let received = async {
140 match ctx.recv().await {
141 Ok(envelope) => {
142 dispatch!(match envelope {
143 msg @ Request::<proto::ScheduleOneshotAt> { .. } => Event::Schedule(msg),
144 msg @ Request::<proto::CancelOneshot> { .. } => Event::Cancel(msg),
145 })
146 },
147 Err(reason) => Event::RecvError(reason),
148 }
149 };
150
151 let event = tokio::select! {
152 e = time => e,
153 e = received => e,
154 };
155
156 match event {
157 Event::RecvError(reason) => return Err(reason.into()),
158 Event::Schedule(Request {
159 header,
160 payload: proto::ScheduleOneshotAt { at, message },
161 }) => {
162 let key = state.entries.insert(Entry { at, message });
163 state.ordered.insert((at, key));
164 ctx.reply(header, proto::ScheduledOneshot { key }).await?;
165 },
166 Event::Cancel(Request {
167 header,
168 payload: proto::CancelOneshot { key },
169 }) => {
170 let reply = if let Some(Entry { at, message }) = state.entries.remove(key) {
171 let _ = state.ordered.remove(&(at, key));
172 proto::CanceledOneshot {
173 message: Some(message),
174 }
175 } else {
176 proto::CanceledOneshot { message: None }
177 };
178 ctx.reply(header, reply).await?;
179 },
180 Event::Time => {
181 let now = Instant::now();
182
183 loop {
184 let Some((at, _)) = state.ordered.first().copied() else {
185 break
186 };
187 if at > now {
188 break
189 }
190 let (_, key) = state.ordered.pop_first().expect("just checked");
191 let Entry { at: _, message } = state.entries.remove(key).expect("should exist");
192 let header = EnvelopeHeader::to_address(receiver_addr);
193 let envelope = Envelope::new(header, message);
194 ctx.send(envelope).await?;
195 }
196 },
197 }
198 }
199}