actix_daemon_utils/
delayer.rs

1use crate::graceful_stop::{StopRequest, SystemTerminator};
2use actix::prelude::*;
3use std::sync::Arc;
4use std::time::Duration;
5
6/// This is a delay actor.
7#[allow(dead_code)]
8pub struct Delayer {
9    task: Recipient<Task>,
10    system_terminator: Arc<SystemTerminator>,
11    error_duration: Duration,
12}
13
14#[derive(Message, Debug)]
15#[rtype(result = "()")]
16pub enum Timing {
17    Immediately,
18    Later(Duration),
19}
20
21/// This is a task call by Looper after specified seconds spend.
22#[derive(Message)]
23#[rtype(result = "()")]
24pub struct Task(pub Addr<Delayer>);
25
26impl Delayer {
27    pub fn new(
28        task: Recipient<Task>,
29        system_terminator: Arc<SystemTerminator>,
30        error_duration: Duration,
31    ) -> Self {
32        Self {
33            task: task,
34            system_terminator: system_terminator,
35            error_duration: error_duration,
36        }
37    }
38}
39
40impl Actor for Delayer {
41    type Context = Context<Self>;
42
43    fn started(&mut self, ctx: &mut Self::Context) {
44        ctx.notify(Timing::Immediately);
45    }
46}
47
48impl Handler<Timing> for Delayer {
49    type Result = ();
50
51    fn handle(&mut self, msg: Timing, ctx: &mut Self::Context) {
52        if let Timing::Later(duration) = msg {
53            ctx.notify_later(Timing::Immediately, duration);
54            return;
55        }
56        match self.task.try_send(Task(ctx.address())) {
57            Err(SendError::Full(_)) => {
58                ctx.notify(Timing::Later(self.error_duration));
59            }
60            _ => {}
61        }
62    }
63}
64
65impl Handler<StopRequest> for Delayer {
66    type Result = <StopRequest as Message>::Result;
67
68    fn handle(&mut self, _msg: StopRequest, ctx: &mut Self::Context) {
69        ctx.stop();
70    }
71}