1use crate::Context;
2use core::time::Duration;
3use futures::future::{AbortHandle, Abortable};
4use ockam_core::compat::sync::Arc;
5use ockam_core::{Address, AllowOnwardAddress, DenyAll, Mailboxes, Message, Result};
6
7pub struct DelayedEvent<M: Message + Clone> {
11 ctx: Arc<Context>,
12 destination_addr: Address,
13 msg: M,
14 abort_handle: Option<AbortHandle>,
15}
16
17impl<M: Message + Clone> Drop for DelayedEvent<M> {
18 fn drop(&mut self) {
19 self.cancel()
20 }
21}
22
23impl<M: Message + Clone> DelayedEvent<M> {
24 pub fn create(ctx: &Context, destination_addr: impl Into<Address>, msg: M) -> Result<Self> {
26 let destination_addr = destination_addr.into();
27 let mailboxes = Mailboxes::primary(
28 Address::random_tagged("DelayedEvent.create"),
29 Arc::new(DenyAll),
30 Arc::new(AllowOnwardAddress(destination_addr.clone())),
31 );
32 let child_ctx = ctx.new_detached_with_mailboxes(mailboxes)?;
33
34 let heartbeat = Self {
35 ctx: Arc::new(child_ctx),
36 destination_addr,
37 abort_handle: None,
38 msg,
39 };
40
41 Ok(heartbeat)
42 }
43
44 pub fn address(&self) -> &Address {
46 self.ctx.primary_address()
47 }
48}
49
50impl<M: Message + Clone> DelayedEvent<M> {
51 pub fn cancel(&mut self) {
53 if let Some(handle) = self.abort_handle.take() {
54 handle.abort()
55 }
56 }
57
58 pub fn schedule(&mut self, duration: Duration) -> Result<()> {
60 self.cancel();
61
62 let destination_addr = self.destination_addr.clone();
63 let msg = self.msg.clone();
64
65 let ctx_clone = self.ctx.clone();
66 let (handle, reg) = AbortHandle::new_pair();
67 let future = Abortable::new(
68 async move {
69 ctx_clone.sleep(duration).await;
70
71 let res = ctx_clone.send(destination_addr.clone(), msg).await;
72
73 if res.is_err() {
74 warn!("Error sending heartbeat message to {}", destination_addr);
75 } else {
76 debug!("Sent heartbeat message to {}", destination_addr);
77 }
78 },
79 reg,
80 );
81
82 self.abort_handle = Some(handle);
83 self.ctx.runtime().spawn(future);
84
85 Ok(())
86 }
87}
88
#[cfg(test)]
mod tests {
    use crate::{Context, DelayedEvent};
    use core::sync::atomic::{AtomicI8, Ordering};
    use core::time::Duration;
    use ockam_core::compat::{boxed::Box, string::ToString, sync::Arc};
    use ockam_core::{async_trait, Any, Result, Routed, Worker};
    use tokio::time::sleep;

    /// Address used both for the counting worker and as the event destination.
    const COUNTING_WORKER: &str = "counting_worker";

    /// Worker that counts every message it handles, regardless of payload.
    struct CountingWorker {
        msgs_count: Arc<AtomicI8>,
    }

    #[async_trait]
    impl Worker for CountingWorker {
        type Context = Context;
        type Message = Any;

        async fn handle_message(
            &mut self,
            _context: &mut Self::Context,
            _msg: Routed<Self::Message>,
        ) -> Result<()> {
            let _ = self.msgs_count.fetch_add(1, Ordering::Relaxed);

            Ok(())
        }
    }

    /// Starts a `CountingWorker` at `COUNTING_WORKER` and returns a handle to
    /// its message counter, which starts at zero.
    fn start_counting_worker(ctx: &mut Context) -> Result<Arc<AtomicI8>> {
        let msgs_count = Arc::new(AtomicI8::new(0));
        let worker = CountingWorker {
            msgs_count: msgs_count.clone(),
        };

        ctx.start_worker(COUNTING_WORKER, worker)?;

        Ok(msgs_count)
    }

    // Each schedule is given time to fire before the next one, so all three arrive.
    #[allow(non_snake_case)]
    #[ockam_macros::test(crate = "crate")]
    async fn scheduled_3_times__counting_worker__messages_count_matches(
        ctx: &mut Context,
    ) -> Result<()> {
        let mut event = DelayedEvent::create(ctx, COUNTING_WORKER, "Hello".to_string())?;
        let msgs_count = start_counting_worker(ctx)?;

        event.schedule(Duration::from_millis(100))?;
        sleep(Duration::from_millis(150)).await;
        event.schedule(Duration::from_millis(100))?;
        sleep(Duration::from_millis(150)).await;
        event.schedule(Duration::from_millis(100))?;
        sleep(Duration::from_millis(150)).await;

        assert_eq!(3, msgs_count.load(Ordering::Relaxed));
        Ok(())
    }

    // Back-to-back schedules replace each other, so only the last one fires.
    #[allow(non_snake_case)]
    #[ockam_macros::test(crate = "crate")]
    async fn rescheduling__counting_worker__aborts_existing(ctx: &mut Context) -> Result<()> {
        let mut event = DelayedEvent::create(ctx, COUNTING_WORKER, "Hello".to_string())?;
        let msgs_count = start_counting_worker(ctx)?;

        event.schedule(Duration::from_millis(100))?;
        event.schedule(Duration::from_millis(100))?;
        event.schedule(Duration::from_millis(100))?;
        sleep(Duration::from_millis(150)).await;

        assert_eq!(1, msgs_count.load(Ordering::Relaxed));
        Ok(())
    }

    // The first delivery fires; the second is cancelled before its delay elapses.
    #[allow(non_snake_case)]
    #[ockam_macros::test(crate = "crate")]
    async fn cancel__counting_worker__aborts_existing(ctx: &mut Context) -> Result<()> {
        let mut event = DelayedEvent::create(ctx, COUNTING_WORKER, "Hello".to_string())?;
        let msgs_count = start_counting_worker(ctx)?;

        event.schedule(Duration::from_millis(100))?;
        sleep(Duration::from_millis(150)).await;
        event.schedule(Duration::from_millis(200))?;
        sleep(Duration::from_millis(100)).await;
        event.cancel();
        sleep(Duration::from_millis(300)).await;

        assert_eq!(1, msgs_count.load(Ordering::Relaxed));
        Ok(())
    }

    // The first delivery fires; dropping the event aborts the pending second one.
    #[allow(non_snake_case)]
    #[ockam_macros::test(crate = "crate")]
    async fn drop__counting_worker__aborts_existing(ctx: &mut Context) -> Result<()> {
        let mut event = DelayedEvent::create(ctx, COUNTING_WORKER, "Hello".to_string())?;
        let msgs_count = start_counting_worker(ctx)?;

        event.schedule(Duration::from_millis(100))?;
        sleep(Duration::from_millis(150)).await;
        event.schedule(Duration::from_millis(200))?;
        sleep(Duration::from_millis(100)).await;
        drop(event);
        sleep(Duration::from_millis(300)).await;

        assert_eq!(1, msgs_count.load(Ordering::Relaxed));
        Ok(())
    }
}