1use std::collections::BTreeSet;
2use std::hash::Hash;
3use std::time::Duration;
4
5use futures::future;
6use mm1_address::address::Address;
7use mm1_ask::{Ask, AskErrorKind, Reply};
8use mm1_common::errors::error_of::ErrorOf;
9use mm1_common::types::AnyError;
10use mm1_core::context::{Fork, ForkErrorKind, Messaging, Now, RecvErrorKind};
11use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
12use mm1_core::message::AnyMessage;
13use mm1_core::tracing::TraceId;
14use mm1_proto_ask::Request;
15use slotmap::SlotMap;
16use tokio::time::Instant;
17
18#[doc(hidden)]
19pub mod proto;
20
/// Upper bound passed to every `ask` call that [`OneshotTimer`] issues towards
/// its timer server (both scheduling and cancelling).
const ASK_TIMEOUT: Duration = Duration::from_millis(100);
22
23slotmap::new_key_type! {
24 pub struct OneshotKey;
25}
26
27#[derive(Debug, thiserror::Error)]
28pub enum TimerError {
29 #[error("{}", _0)]
30 Fork(
31 #[source]
32 #[from]
33 ErrorOf<ForkErrorKind>,
34 ),
35
36 #[error("{}", _0)]
37 Ask(
38 #[source]
39 #[from]
40 ErrorOf<AskErrorKind>,
41 ),
42}
43
44#[derive(Debug, Clone)]
45pub struct OneshotTimer<Ctx> {
46 client_ctx: Ctx,
47 server_addr: Address,
48}
49
50impl<Ctx> OneshotTimer<Ctx>
51where
52 Ctx: Ask + Fork + Messaging + Now<Instant = Instant>,
53{
54 pub async fn create(context: &mut Ctx) -> Result<Self, TimerError> {
55 let client_ctx = context.fork().await?;
56 let server_ctx = context.fork().await?;
57
58 let receiver_addr = context.address();
59 let server_addr = server_ctx.address();
60 server_ctx
61 .run(move |server_ctx| timer_run(server_ctx, receiver_addr))
62 .await;
63
64 Ok(Self {
65 client_ctx,
66 server_addr,
67 })
68 }
69
70 pub async fn schedule_once_at<M: Into<AnyMessage>>(
71 &mut self,
72 at: Instant,
73 message: M,
74 ) -> Result<OneshotKey, TimerError> {
75 let message = message.into();
76 let proto::ScheduledOneshot { key } = self
77 .client_ctx
78 .ask(
79 self.server_addr,
80 proto::ScheduleOneshotAt { at, message },
81 ASK_TIMEOUT,
82 )
83 .await?;
84 Ok(key)
85 }
86
87 pub async fn schedule_once_after<M: Into<AnyMessage>>(
88 &mut self,
89 after: Duration,
90 message: M,
91 ) -> Result<OneshotKey, TimerError> {
92 let at = self.client_ctx.now() + after;
93 self.schedule_once_at(at, message).await
94 }
95
96 pub async fn cancel(&mut self, key: OneshotKey) -> Result<Option<AnyMessage>, TimerError> {
97 let proto::CanceledOneshot { message } = self
98 .client_ctx
99 .ask(self.server_addr, proto::CancelOneshot { key }, ASK_TIMEOUT)
100 .await?;
101 Ok(message)
102 }
103}
104
105#[derive(Default)]
106struct TimerState {
107 #[allow(dead_code)]
108 entries: SlotMap<OneshotKey, Entry>,
109 ordered: BTreeSet<(Instant, OneshotKey)>,
110}
111
112#[allow(dead_code)]
113struct Entry {
114 at: Instant,
115 message: AnyMessage,
116}
117
118async fn timer_run<Ctx>(mut ctx: Ctx, receiver_addr: Address) -> Result<(), AnyError>
119where
120 Ctx: Messaging + Reply,
121{
122 enum Event {
123 Time,
124 Schedule(Request<proto::ScheduleOneshotAt>),
125 Cancel(Request<proto::CancelOneshot>),
126 RecvError(ErrorOf<RecvErrorKind>),
127 }
128
129 let mut state: TimerState = Default::default();
130
131 loop {
132 let time = async {
133 if let Some((at, _)) = state.ordered.first().copied() {
134 tokio::time::sleep_until(at).await;
135 Event::Time
136 } else {
137 future::pending().await
138 }
139 };
140 let received = async {
141 match ctx.recv().await {
142 Ok(envelope) => {
143 dispatch!(match envelope {
144 msg @ Request::<proto::ScheduleOneshotAt> { .. } => Event::Schedule(msg),
145 msg @ Request::<proto::CancelOneshot> { .. } => Event::Cancel(msg),
146 })
147 },
148 Err(reason) => Event::RecvError(reason),
149 }
150 };
151
152 let event = tokio::select! {
153 e = time => e,
154 e = received => e,
155 };
156
157 match event {
158 Event::RecvError(reason) => return Err(reason.into()),
159 Event::Schedule(Request {
160 header,
161 payload: proto::ScheduleOneshotAt { at, message },
162 }) => {
163 let key = state.entries.insert(Entry { at, message });
164 state.ordered.insert((at, key));
165 ctx.reply(header, proto::ScheduledOneshot { key }).await?;
166 },
167 Event::Cancel(Request {
168 header,
169 payload: proto::CancelOneshot { key },
170 }) => {
171 let reply = if let Some(Entry { at, message }) = state.entries.remove(key) {
172 let _ = state.ordered.remove(&(at, key));
173 proto::CanceledOneshot {
174 message: Some(message),
175 }
176 } else {
177 proto::CanceledOneshot { message: None }
178 };
179 ctx.reply(header, reply).await?;
180 },
181 Event::Time => {
182 let now = Instant::now();
183
184 loop {
185 let Some((at, _)) = state.ordered.first().copied() else {
186 break
187 };
188 if at > now {
189 break
190 }
191 let (_, key) = state.ordered.pop_first().expect("just checked");
192 let Entry { at: _, message } = state.entries.remove(key).expect("should exist");
193 let header =
194 EnvelopeHeader::to_address(receiver_addr).with_trace_id(TraceId::random());
195 let envelope = Envelope::new(header, message);
196 ctx.send(envelope).await?;
197 }
198 },
199 }
200 }
201}