mm1_timer/
v1.rs

1use std::collections::BTreeSet;
2use std::hash::Hash;
3use std::time::Duration;
4
5use futures::future;
6use mm1_address::address::Address;
7use mm1_ask::{Ask, AskErrorKind, Reply};
8use mm1_common::errors::error_of::ErrorOf;
9use mm1_common::types::AnyError;
10use mm1_core::context::{Fork, ForkErrorKind, Messaging, Now, RecvErrorKind};
11use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
12use mm1_core::message::AnyMessage;
13use mm1_proto_ask::Request;
14use slotmap::SlotMap;
15use tokio::time::Instant;
16
// Request/response message types exchanged between `OneshotTimer` (client side)
// and the `timer_run` server loop. Hidden from the generated documentation.
#[doc(hidden)]
pub mod proto;
19
/// Timeout passed to every `ask` request that [`OneshotTimer`] sends to the
/// timer server (both scheduling and cancelling).
const ASK_TIMEOUT: Duration = Duration::from_millis(100);
21
slotmap::new_key_type! {
    /// Key identifying a scheduled one-shot timer entry.
    ///
    /// Returned by the `schedule_once_*` methods of [`OneshotTimer`] and
    /// accepted by [`OneshotTimer::cancel`].
    pub struct OneshotKey;
}
25
26#[derive(Debug, thiserror::Error)]
27pub enum TimerError {
28    #[error("{}", _0)]
29    Fork(
30        #[source]
31        #[from]
32        ErrorOf<ForkErrorKind>,
33    ),
34
35    #[error("{}", _0)]
36    Ask(
37        #[source]
38        #[from]
39        ErrorOf<AskErrorKind>,
40    ),
41}
42
/// Client-side handle to a one-shot timer server.
///
/// Created with [`OneshotTimer::create`]; every operation is an `ask`
/// request sent from `client_ctx` to `server_addr`.
#[derive(Debug, Clone)]
pub struct OneshotTimer<Ctx> {
    /// Forked context used to send requests to the timer server.
    client_ctx:  Ctx,
    /// Address of the forked context that runs the timer server loop.
    server_addr: Address,
}
48
49impl<Ctx> OneshotTimer<Ctx>
50where
51    Ctx: Ask + Fork + Messaging + Now<Instant = Instant>,
52{
53    pub async fn create(context: &mut Ctx) -> Result<Self, TimerError> {
54        let client_ctx = context.fork().await?;
55        let server_ctx = context.fork().await?;
56
57        let receiver_addr = context.address();
58        let server_addr = server_ctx.address();
59        server_ctx
60            .run(move |server_ctx| timer_run(server_ctx, receiver_addr))
61            .await;
62
63        Ok(Self {
64            client_ctx,
65            server_addr,
66        })
67    }
68
69    pub async fn schedule_once_at<M: Into<AnyMessage>>(
70        &mut self,
71        at: Instant,
72        message: M,
73    ) -> Result<OneshotKey, TimerError> {
74        let message = message.into();
75        let proto::ScheduledOneshot { key } = self
76            .client_ctx
77            .ask(
78                self.server_addr,
79                proto::ScheduleOneshotAt { at, message },
80                ASK_TIMEOUT,
81            )
82            .await?;
83        Ok(key)
84    }
85
86    pub async fn schedule_once_after<M: Into<AnyMessage>>(
87        &mut self,
88        after: Duration,
89        message: M,
90    ) -> Result<OneshotKey, TimerError> {
91        let at = self.client_ctx.now() + after;
92        self.schedule_once_at(at, message).await
93    }
94
95    pub async fn cancel(&mut self, key: OneshotKey) -> Result<Option<AnyMessage>, TimerError> {
96        let proto::CanceledOneshot { message } = self
97            .client_ctx
98            .ask(self.server_addr, proto::CancelOneshot { key }, ASK_TIMEOUT)
99            .await?;
100        Ok(message)
101    }
102}
103
/// Bookkeeping of the timer server loop.
///
/// Every pending timer is stored in both collections; they are updated
/// together when an entry is scheduled, cancelled, or fired.
#[derive(Default)]
struct TimerState {
    /// Pending entries, addressed by the key handed out to the client.
    #[allow(dead_code)]
    entries: SlotMap<OneshotKey, Entry>,
    /// `(deadline, key)` pairs in ascending order, so the first element is
    /// the entry that is due next.
    ordered: BTreeSet<(Instant, OneshotKey)>,
}
110
/// A single pending one-shot timer.
#[allow(dead_code)]
struct Entry {
    /// Instant at which the message is due; also used to locate the matching
    /// `(at, key)` pair in `TimerState::ordered` when the entry is cancelled.
    at:      Instant,
    /// Message to send to the receiver when the timer fires, or to return to
    /// the client when the timer is cancelled.
    message: AnyMessage,
}
116
117async fn timer_run<Ctx>(mut ctx: Ctx, receiver_addr: Address) -> Result<(), AnyError>
118where
119    Ctx: Messaging + Reply,
120{
121    enum Event {
122        Time,
123        Schedule(Request<proto::ScheduleOneshotAt>),
124        Cancel(Request<proto::CancelOneshot>),
125        RecvError(ErrorOf<RecvErrorKind>),
126    }
127
128    let mut state: TimerState = Default::default();
129
130    loop {
131        let time = async {
132            if let Some((at, _)) = state.ordered.first().copied() {
133                tokio::time::sleep_until(at).await;
134                Event::Time
135            } else {
136                future::pending().await
137            }
138        };
139        let received = async {
140            match ctx.recv().await {
141                Ok(envelope) => {
142                    dispatch!(match envelope {
143                        msg @ Request::<proto::ScheduleOneshotAt> { .. } => Event::Schedule(msg),
144                        msg @ Request::<proto::CancelOneshot> { .. } => Event::Cancel(msg),
145                    })
146                },
147                Err(reason) => Event::RecvError(reason),
148            }
149        };
150
151        let event = tokio::select! {
152            e = time => e,
153            e = received => e,
154        };
155
156        match event {
157            Event::RecvError(reason) => return Err(reason.into()),
158            Event::Schedule(Request {
159                header,
160                payload: proto::ScheduleOneshotAt { at, message },
161            }) => {
162                let key = state.entries.insert(Entry { at, message });
163                state.ordered.insert((at, key));
164                ctx.reply(header, proto::ScheduledOneshot { key }).await?;
165            },
166            Event::Cancel(Request {
167                header,
168                payload: proto::CancelOneshot { key },
169            }) => {
170                let reply = if let Some(Entry { at, message }) = state.entries.remove(key) {
171                    let _ = state.ordered.remove(&(at, key));
172                    proto::CanceledOneshot {
173                        message: Some(message),
174                    }
175                } else {
176                    proto::CanceledOneshot { message: None }
177                };
178                ctx.reply(header, reply).await?;
179            },
180            Event::Time => {
181                let now = Instant::now();
182
183                loop {
184                    let Some((at, _)) = state.ordered.first().copied() else {
185                        break
186                    };
187                    if at > now {
188                        break
189                    }
190                    let (_, key) = state.ordered.pop_first().expect("just checked");
191                    let Entry { at: _, message } = state.entries.remove(key).expect("should exist");
192                    let header = EnvelopeHeader::to_address(receiver_addr);
193                    let envelope = Envelope::new(header, message);
194                    ctx.send(envelope).await?;
195                }
196            },
197        }
198    }
199}