actix_daemon_utils/
looper.rs1use crate::graceful_stop::{StopRequest, SystemTerminator};
2use actix::prelude::*;
3use std::sync::Arc;
4use std::time::Duration;
5
6#[allow(dead_code)]
8pub struct Looper {
9 task: Recipient<Task>,
10 system_terminator: Arc<SystemTerminator>,
11}
12
13#[derive(Message, Debug)]
14#[rtype(result = "()")]
15enum NextLoop {
16 Immediately,
17 Later(Duration),
18}
19
20#[derive(Message, Debug)]
22#[rtype(result = "Option<Duration>")]
23pub struct Task;
24
25impl Looper {
26 pub fn new(task: Recipient<Task>, system_terminator: Arc<SystemTerminator>) -> Self {
27 Self {
28 task: task,
29 system_terminator: system_terminator,
30 }
31 }
32}
33
34impl Actor for Looper {
35 type Context = Context<Self>;
36
37 fn started(&mut self, ctx: &mut Self::Context) {
38 ctx.notify(NextLoop::Immediately);
39 }
40}
41
42impl Handler<NextLoop> for Looper {
43 type Result = ();
44
45 fn handle(&mut self, msg: NextLoop, ctx: &mut Self::Context) {
46 if let NextLoop::Later(duration) = msg {
47 ctx.notify_later(NextLoop::Immediately, duration);
48 return;
49 }
50 self.task
51 .send(Task)
52 .into_actor(self)
53 .then(move |res, act, ctx2| {
54 ctx2.notify(match res {
55 Ok(Some(duration)) => NextLoop::Later(duration),
56 _ => NextLoop::Immediately,
57 });
58 async {}.into_actor(act)
59 })
60 .wait(ctx);
61 }
62}
63
64impl Handler<StopRequest> for Looper {
65 type Result = <StopRequest as Message>::Result;
66
67 fn handle(&mut self, _msg: StopRequest, ctx: &mut Self::Context) {
68 ctx.stop();
69 }
70}