mm1_timer/
v1.rs

1use std::collections::BTreeSet;
2use std::hash::Hash;
3use std::time::Duration;
4
5use futures::future;
6use mm1_address::address::Address;
7use mm1_ask::{Ask, AskErrorKind, Reply};
8use mm1_common::errors::error_of::ErrorOf;
9use mm1_common::types::AnyError;
10use mm1_core::context::{Fork, ForkErrorKind, Messaging, Now, RecvErrorKind};
11use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
12use mm1_core::message::AnyMessage;
13use mm1_core::tracing::TraceId;
14use mm1_proto_ask::Request;
15use slotmap::SlotMap;
16use tokio::time::Instant;
17
18#[doc(hidden)]
19pub mod proto;
20
21const ASK_TIMEOUT: Duration = Duration::from_millis(100);
22
23slotmap::new_key_type! {
24    pub struct OneshotKey;
25}
26
27#[derive(Debug, thiserror::Error)]
28pub enum TimerError {
29    #[error("{}", _0)]
30    Fork(
31        #[source]
32        #[from]
33        ErrorOf<ForkErrorKind>,
34    ),
35
36    #[error("{}", _0)]
37    Ask(
38        #[source]
39        #[from]
40        ErrorOf<AskErrorKind>,
41    ),
42}
43
44#[derive(Debug, Clone)]
45pub struct OneshotTimer<Ctx> {
46    client_ctx:  Ctx,
47    server_addr: Address,
48}
49
50impl<Ctx> OneshotTimer<Ctx>
51where
52    Ctx: Ask + Fork + Messaging + Now<Instant = Instant>,
53{
54    pub async fn create(context: &mut Ctx) -> Result<Self, TimerError> {
55        let client_ctx = context.fork().await?;
56        let server_ctx = context.fork().await?;
57
58        let receiver_addr = context.address();
59        let server_addr = server_ctx.address();
60        server_ctx
61            .run(move |server_ctx| timer_run(server_ctx, receiver_addr))
62            .await;
63
64        Ok(Self {
65            client_ctx,
66            server_addr,
67        })
68    }
69
70    pub async fn schedule_once_at<M: Into<AnyMessage>>(
71        &mut self,
72        at: Instant,
73        message: M,
74    ) -> Result<OneshotKey, TimerError> {
75        let message = message.into();
76        let proto::ScheduledOneshot { key } = self
77            .client_ctx
78            .ask_nofork(
79                self.server_addr,
80                proto::ScheduleOneshotAt { at, message },
81                ASK_TIMEOUT,
82            )
83            .await?;
84        Ok(key)
85    }
86
87    pub async fn schedule_once_after<M: Into<AnyMessage>>(
88        &mut self,
89        after: Duration,
90        message: M,
91    ) -> Result<OneshotKey, TimerError> {
92        let at = self.client_ctx.now() + after;
93        self.schedule_once_at(at, message).await
94    }
95
96    pub async fn cancel(&mut self, key: OneshotKey) -> Result<Option<AnyMessage>, TimerError> {
97        let proto::CanceledOneshot { message } = self
98            .client_ctx
99            .ask_nofork(self.server_addr, proto::CancelOneshot { key }, ASK_TIMEOUT)
100            .await?;
101        Ok(message)
102    }
103}
104
105#[derive(Default)]
106struct TimerState {
107    #[allow(dead_code)]
108    entries: SlotMap<OneshotKey, Entry>,
109    ordered: BTreeSet<(Instant, OneshotKey)>,
110}
111
112#[allow(dead_code)]
113struct Entry {
114    at:      Instant,
115    message: AnyMessage,
116}
117
118async fn timer_run<Ctx>(mut ctx: Ctx, receiver_addr: Address) -> Result<(), AnyError>
119where
120    Ctx: Messaging + Reply,
121{
122    enum Event {
123        Time,
124        Schedule(Request<proto::ScheduleOneshotAt>),
125        Cancel(Request<proto::CancelOneshot>),
126        RecvError(ErrorOf<RecvErrorKind>),
127    }
128
129    let mut state: TimerState = Default::default();
130
131    loop {
132        let time = async {
133            if let Some((at, _)) = state.ordered.first().copied() {
134                tokio::time::sleep_until(at).await;
135                Event::Time
136            } else {
137                future::pending().await
138            }
139        };
140        let received = async {
141            match ctx.recv().await {
142                Ok(envelope) => {
143                    dispatch!(match envelope {
144                        msg @ Request::<proto::ScheduleOneshotAt> { .. } => Event::Schedule(msg),
145                        msg @ Request::<proto::CancelOneshot> { .. } => Event::Cancel(msg),
146                    })
147                },
148                Err(reason) => Event::RecvError(reason),
149            }
150        };
151
152        let event = tokio::select! {
153            e = time => e,
154            e = received => e,
155        };
156
157        match event {
158            Event::RecvError(reason) => return Err(reason.into()),
159            Event::Schedule(Request {
160                header,
161                payload: proto::ScheduleOneshotAt { at, message },
162            }) => {
163                let key = state.entries.insert(Entry { at, message });
164                state.ordered.insert((at, key));
165                ctx.reply(header, proto::ScheduledOneshot { key }).await?;
166            },
167            Event::Cancel(Request {
168                header,
169                payload: proto::CancelOneshot { key },
170            }) => {
171                let reply = if let Some(Entry { at, message }) = state.entries.remove(key) {
172                    let _ = state.ordered.remove(&(at, key));
173                    proto::CanceledOneshot {
174                        message: Some(message),
175                    }
176                } else {
177                    proto::CanceledOneshot { message: None }
178                };
179                ctx.reply(header, reply).await?;
180            },
181            Event::Time => {
182                let now = Instant::now();
183
184                while let Some((at, _)) = state.ordered.first().copied() {
185                    if at > now {
186                        break
187                    }
188                    let (_, key) = state.ordered.pop_first().expect("just checked");
189                    let Entry { at: _, message } = state.entries.remove(key).expect("should exist");
190                    let header =
191                        EnvelopeHeader::to_address(receiver_addr).with_trace_id(TraceId::random());
192                    let envelope = Envelope::new(header, message);
193                    ctx.send(envelope).await?;
194                }
195            },
196        }
197    }
198}