actix_daemon_utils/
graceful_stop.rs1use crate::delayer::{Delayer, Task};
2use actix::prelude::*;
3use actix_rt::signal::unix::{signal, SignalKind};
4use futures_util::stream::once;
5use std::{
6 sync::{mpsc::Sender, Arc},
7 time::Duration,
8};
9
10pub struct GracefulStop {
12 stop_request_recipients: Vec<Recipient<StopRequest>>,
13 system_terminator: Arc<SystemTerminator>,
14}
15
16impl GracefulStop {
17 pub fn new() -> GracefulStop {
18 GracefulStop {
19 stop_request_recipients: Vec::new(),
20 system_terminator: Arc::new(SystemTerminator { sender: None }),
21 }
22 }
23
24 pub fn new_with_sender(sender: Sender<()>) -> GracefulStop {
25 GracefulStop {
26 stop_request_recipients: Vec::new(),
27 system_terminator: Arc::new(SystemTerminator {
28 sender: Some(sender),
29 }),
30 }
31 }
32
33 pub fn subscribe(mut self, recipient: Recipient<StopRequest>) -> Self {
34 self.stop_request_recipients.push(recipient);
35 self
36 }
37
38 pub fn clone_system_terminator(&self) -> Arc<SystemTerminator> {
39 Arc::clone(&self.system_terminator)
40 }
41
42 pub fn subscribe_ref(&mut self, recipient: Recipient<StopRequest>) {
43 self.stop_request_recipients.push(recipient);
44 }
45
46 pub fn start_with_delayers(receipts: Vec<(Recipient<Task>, Duration)>) -> Addr<Self> {
47 let mut graceful_stop = GracefulStop::new();
48 for receipt in receipts {
49 let delayer = Delayer::new(
50 receipt.0,
51 graceful_stop.clone_system_terminator(),
52 receipt.1,
53 )
54 .start();
55 graceful_stop.subscribe_ref(delayer.recipient());
56 }
57 graceful_stop.start()
58 }
59}
60
61impl Actor for GracefulStop {
62 type Context = Context<Self>;
63
64 fn started(&mut self, ctx: &mut Self::Context) {
65 let signals = vec![
66 Box::new(SignalKind::hangup()),
67 Box::new(SignalKind::interrupt()),
68 Box::new(SignalKind::quit()),
69 Box::new(SignalKind::terminate()),
70 ];
71 for signal_kind in signals.into_iter() {
72 let mut s = signal(*signal_kind).unwrap();
73 ctx.add_message_stream(once(async move {
74 s.recv().await;
75 StopEvent
76 }));
77 }
78 }
79}
80
81#[derive(Message)]
83#[rtype(result = "()")]
84pub struct StopRequest;
85
86#[derive(Message)]
87#[rtype(result = "()")]
88pub struct StopEvent;
89
90impl Handler<StopEvent> for GracefulStop {
91 type Result = ();
92
93 fn handle(&mut self, _signal_event: StopEvent, ctx: &mut Self::Context) {
94 for recipient in self.stop_request_recipients.drain(..) {
95 let _ = recipient.do_send(StopRequest);
96 }
97 ctx.stop();
98 }
99}
100
101pub struct SystemTerminator {
103 sender: Option<Sender<()>>,
104}
105
106impl Drop for SystemTerminator {
107 fn drop(&mut self) {
108 match self.sender {
109 Some(ref sender) => {
110 sender.send(()).unwrap();
111 }
112 None => {
113 actix::System::current().stop();
114 }
115 }
116 }
117}