signal_handler/handler/
impl_std.rs

1use core::time::Duration;
2use std::{
3    collections::HashMap,
4    panic,
5    sync::mpsc::{channel, sync_channel, RecvTimeoutError},
6    thread::spawn,
7};
8
9use crate::{
10    callback::{Callback, CallbackInfo, CallbackType},
11    handler::{builder::Builder, HandleError, Handler},
12    register::RegisterType,
13};
14
15//
16impl Handler {
17    pub fn handle(self) -> Result<(), HandleError> {
18        let Builder {
19            callbacks,
20            registers,
21        } = self.builder;
22
23        if callbacks.has_async() {
24            return Err(HandleError::AsyncRequired);
25        }
26
27        //
28        //
29        //
30        let (register_tx, register_rx) = sync_channel::<RegisterType>(6);
31
32        let _sig_id_map = registers
33            .register(register_tx)
34            .map_err(HandleError::RegisterFailed)?;
35
36        //
37        //
38        //
39        let mut initialized_cb = None;
40        let mut wait_for_stop_cb = None;
41
42        let mut callback_tx_map = HashMap::new();
43        let mut callback_join_handle_map = HashMap::new();
44
45        for (tp, cb) in callbacks.into_inner() {
46            match tp {
47                CallbackType::Initialized => {
48                    initialized_cb = Some(cb);
49                    continue;
50                }
51                CallbackType::ReloadConfig => {}
52                CallbackType::WaitForStop => {
53                    wait_for_stop_cb = Some(cb);
54                    continue;
55                }
56                CallbackType::PrintStats => {}
57            }
58
59            let (tx, rx) = channel::<CallbackInfo>();
60
61            let join_handle = spawn(move || {
62                let mut latest_finish_time = None;
63
64                loop {
65                    match rx.recv_timeout(Duration::from_secs(1)) {
66                        Ok(info) => {
67                            if let Some(latest_finish_time) = latest_finish_time {
68                                if latest_finish_time > *info.time() {
69                                    continue;
70                                }
71                            }
72
73                            match &cb {
74                                Callback::Sync(cb) => cb(info),
75                                Callback::Async(_) => unreachable!(),
76                            }
77
78                            latest_finish_time = Some(CallbackInfo::time_now());
79                        }
80                        Err(RecvTimeoutError::Timeout) => {
81                            continue;
82                        }
83                        Err(RecvTimeoutError::Disconnected) => {
84                            break;
85                        }
86                    }
87                }
88            });
89
90            callback_tx_map.insert(tp, tx);
91            callback_join_handle_map.insert(tp, join_handle);
92        }
93
94        //
95        //
96        //
97        if let Some(cb) = initialized_cb {
98            match &cb {
99                Callback::Sync(cb) => cb(CallbackInfo::new()),
100                Callback::Async(_) => unreachable!(),
101            }
102        }
103
104        //
105        //
106        //
107        loop {
108            match register_rx.recv_timeout(Duration::from_secs(1)) {
109                #[cfg(not(windows))]
110                Ok(RegisterType::ReloadConfig) => {
111                    if let Some(tx_callback) = callback_tx_map.get(&CallbackType::ReloadConfig) {
112                        #[allow(clippy::single_match)]
113                        match tx_callback.send(CallbackInfo::new()) {
114                            Ok(_) => {}
115                            Err(_) => {
116                                // Ignore, disconnected
117                            }
118                        }
119                    }
120                    continue;
121                }
122                Ok(RegisterType::WaitForStop) => {
123                    if let Some(cb) = wait_for_stop_cb {
124                        match &cb {
125                            Callback::Sync(cb) => cb(CallbackInfo::new()),
126                            Callback::Async(_) => unreachable!(),
127                        }
128                    }
129
130                    drop(register_rx);
131
132                    break;
133                }
134                #[cfg(not(windows))]
135                Ok(RegisterType::PrintStats) => {
136                    if let Some(tx_callback) = callback_tx_map.get(&CallbackType::PrintStats) {
137                        #[allow(clippy::single_match)]
138                        match tx_callback.send(CallbackInfo::new()) {
139                            Ok(_) => {}
140                            Err(_) => {
141                                // Ignore, disconnected
142                            }
143                        }
144                    }
145                    continue;
146                }
147                Err(RecvTimeoutError::Timeout) => {
148                    continue;
149                }
150                Err(RecvTimeoutError::Disconnected) => break,
151            }
152        }
153
154        //
155        //
156        //
157        for (_, tx) in callback_tx_map {
158            drop(tx);
159        }
160
161        for (_, join_handle) in callback_join_handle_map {
162            match join_handle.join() {
163                Ok(_) => {}
164                Err(err) => {
165                    panic::resume_unwind(err);
166                }
167            }
168        }
169
170        Ok(())
171    }
172}