ockam_node/
delayed.rs

1use crate::Context;
2use core::time::Duration;
3use futures::future::{AbortHandle, Abortable};
4use ockam_core::compat::sync::Arc;
5use ockam_core::{Address, AllowOnwardAddress, DenyAll, Mailboxes, Message, Result};
6
7/// Allow to send message to destination address periodically after some delay
8/// Only one scheduled heartbeat allowed at a time
9/// Dropping this handle cancels scheduled heartbeat
10pub struct DelayedEvent<M: Message + Clone> {
11    ctx: Arc<Context>,
12    destination_addr: Address,
13    msg: M,
14    abort_handle: Option<AbortHandle>,
15}
16
17impl<M: Message + Clone> Drop for DelayedEvent<M> {
18    fn drop(&mut self) {
19        self.cancel()
20    }
21}
22
23impl<M: Message + Clone> DelayedEvent<M> {
24    /// Create a heartbeat
25    pub fn create(ctx: &Context, destination_addr: impl Into<Address>, msg: M) -> Result<Self> {
26        let destination_addr = destination_addr.into();
27        let mailboxes = Mailboxes::primary(
28            Address::random_tagged("DelayedEvent.create"),
29            Arc::new(DenyAll),
30            Arc::new(AllowOnwardAddress(destination_addr.clone())),
31        );
32        let child_ctx = ctx.new_detached_with_mailboxes(mailboxes)?;
33
34        let heartbeat = Self {
35            ctx: Arc::new(child_ctx),
36            destination_addr,
37            abort_handle: None,
38            msg,
39        };
40
41        Ok(heartbeat)
42    }
43
44    /// Address used to send messages to destination address
45    pub fn address(&self) -> &Address {
46        self.ctx.primary_address()
47    }
48}
49
50impl<M: Message + Clone> DelayedEvent<M> {
51    /// Cancel heartbeat
52    pub fn cancel(&mut self) {
53        if let Some(handle) = self.abort_handle.take() {
54            handle.abort()
55        }
56    }
57
58    /// Schedule heartbeat. Cancels already scheduled heartbeat if there is such heartbeat
59    pub fn schedule(&mut self, duration: Duration) -> Result<()> {
60        self.cancel();
61
62        let destination_addr = self.destination_addr.clone();
63        let msg = self.msg.clone();
64
65        let ctx_clone = self.ctx.clone();
66        let (handle, reg) = AbortHandle::new_pair();
67        let future = Abortable::new(
68            async move {
69                ctx_clone.sleep(duration).await;
70
71                let res = ctx_clone.send(destination_addr.clone(), msg).await;
72
73                if res.is_err() {
74                    warn!("Error sending heartbeat message to {}", destination_addr);
75                } else {
76                    debug!("Sent heartbeat message to {}", destination_addr);
77                }
78            },
79            reg,
80        );
81
82        self.abort_handle = Some(handle);
83        self.ctx.runtime().spawn(future);
84
85        Ok(())
86    }
87}
88
89#[cfg(test)]
90mod tests {
91    use crate::{Context, DelayedEvent};
92    use core::sync::atomic::Ordering;
93    use core::time::Duration;
94    use ockam_core::compat::{boxed::Box, string::ToString, sync::Arc};
95    use ockam_core::{async_trait, Any};
96    use ockam_core::{Result, Routed, Worker};
97    use std::sync::atomic::AtomicI8;
98    use tokio::time::sleep;
99
100    struct CountingWorker {
101        msgs_count: Arc<AtomicI8>,
102    }
103
104    #[async_trait]
105    impl Worker for CountingWorker {
106        type Context = Context;
107        type Message = Any;
108
109        async fn handle_message(
110            &mut self,
111            _context: &mut Self::Context,
112            _msg: Routed<Self::Message>,
113        ) -> Result<()> {
114            let _ = self.msgs_count.fetch_add(1, Ordering::Relaxed);
115
116            Ok(())
117        }
118    }
119
120    #[allow(non_snake_case)]
121    #[ockam_macros::test(crate = "crate")]
122    async fn scheduled_3_times__counting_worker__messages_count_matches(
123        ctx: &mut Context,
124    ) -> Result<()> {
125        let msgs_count = Arc::new(AtomicI8::new(0));
126        let mut heartbeat = DelayedEvent::create(ctx, "counting_worker", "Hello".to_string())?;
127
128        let worker = CountingWorker {
129            msgs_count: msgs_count.clone(),
130        };
131
132        ctx.start_worker("counting_worker", worker)?;
133
134        heartbeat.schedule(Duration::from_millis(100))?;
135        sleep(Duration::from_millis(150)).await;
136        heartbeat.schedule(Duration::from_millis(100))?;
137        sleep(Duration::from_millis(150)).await;
138        heartbeat.schedule(Duration::from_millis(100))?;
139        sleep(Duration::from_millis(150)).await;
140
141        assert_eq!(3, msgs_count.load(Ordering::Relaxed));
142        Ok(())
143    }
144
145    #[allow(non_snake_case)]
146    #[ockam_macros::test(crate = "crate")]
147    async fn rescheduling__counting_worker__aborts_existing(ctx: &mut Context) -> Result<()> {
148        let msgs_count = Arc::new(AtomicI8::new(0));
149        let mut heartbeat = DelayedEvent::create(ctx, "counting_worker", "Hello".to_string())?;
150
151        let worker = CountingWorker {
152            msgs_count: msgs_count.clone(),
153        };
154
155        ctx.start_worker("counting_worker", worker)?;
156
157        heartbeat.schedule(Duration::from_millis(100))?;
158        heartbeat.schedule(Duration::from_millis(100))?;
159        heartbeat.schedule(Duration::from_millis(100))?;
160        sleep(Duration::from_millis(150)).await;
161
162        assert_eq!(1, msgs_count.load(Ordering::Relaxed));
163        Ok(())
164    }
165
166    #[allow(non_snake_case)]
167    #[ockam_macros::test(crate = "crate")]
168    async fn cancel__counting_worker__aborts_existing(ctx: &mut Context) -> Result<()> {
169        let msgs_count = Arc::new(AtomicI8::new(0));
170        let mut heartbeat = DelayedEvent::create(ctx, "counting_worker", "Hello".to_string())?;
171
172        let worker = CountingWorker {
173            msgs_count: msgs_count.clone(),
174        };
175
176        ctx.start_worker("counting_worker", worker)?;
177
178        heartbeat.schedule(Duration::from_millis(100))?;
179        sleep(Duration::from_millis(150)).await;
180        heartbeat.schedule(Duration::from_millis(200))?;
181        sleep(Duration::from_millis(100)).await;
182        heartbeat.cancel();
183        sleep(Duration::from_millis(300)).await;
184
185        assert_eq!(1, msgs_count.load(Ordering::Relaxed));
186        Ok(())
187    }
188
189    #[allow(non_snake_case)]
190    #[ockam_macros::test(crate = "crate")]
191    async fn drop__counting_worker__aborts_existing(ctx: &mut Context) -> Result<()> {
192        let msgs_count = Arc::new(AtomicI8::new(0));
193        let mut heartbeat = DelayedEvent::create(ctx, "counting_worker", "Hello".to_string())?;
194
195        let worker = CountingWorker {
196            msgs_count: msgs_count.clone(),
197        };
198
199        ctx.start_worker("counting_worker", worker)?;
200
201        heartbeat.schedule(Duration::from_millis(100))?;
202        sleep(Duration::from_millis(150)).await;
203        heartbeat.schedule(Duration::from_millis(200))?;
204        sleep(Duration::from_millis(100)).await;
205        drop(heartbeat);
206        sleep(Duration::from_millis(300)).await;
207
208        assert_eq!(1, msgs_count.load(Ordering::Relaxed));
209        Ok(())
210    }
211}