signal_handler/handler/
impl_std.rs1use core::time::Duration;
2use std::{
3 collections::HashMap,
4 panic,
5 sync::mpsc::{channel, sync_channel, RecvTimeoutError},
6 thread::spawn,
7};
8
9use crate::{
10 callback::{Callback, CallbackInfo, CallbackType},
11 handler::{builder::Builder, HandleError, Handler},
12 register::RegisterType,
13};
14
15impl Handler {
17 pub fn handle(self) -> Result<(), HandleError> {
18 let Builder {
19 callbacks,
20 registers,
21 } = self.builder;
22
23 if callbacks.has_async() {
24 return Err(HandleError::AsyncRequired);
25 }
26
27 let (register_tx, register_rx) = sync_channel::<RegisterType>(6);
31
32 let _sig_id_map = registers
33 .register(register_tx)
34 .map_err(HandleError::RegisterFailed)?;
35
36 let mut initialized_cb = None;
40 let mut wait_for_stop_cb = None;
41
42 let mut callback_tx_map = HashMap::new();
43 let mut callback_join_handle_map = HashMap::new();
44
45 for (tp, cb) in callbacks.into_inner() {
46 match tp {
47 CallbackType::Initialized => {
48 initialized_cb = Some(cb);
49 continue;
50 }
51 CallbackType::ReloadConfig => {}
52 CallbackType::WaitForStop => {
53 wait_for_stop_cb = Some(cb);
54 continue;
55 }
56 CallbackType::PrintStats => {}
57 }
58
59 let (tx, rx) = channel::<CallbackInfo>();
60
61 let join_handle = spawn(move || {
62 let mut latest_finish_time = None;
63
64 loop {
65 match rx.recv_timeout(Duration::from_secs(1)) {
66 Ok(info) => {
67 if let Some(latest_finish_time) = latest_finish_time {
68 if latest_finish_time > *info.time() {
69 continue;
70 }
71 }
72
73 match &cb {
74 Callback::Sync(cb) => cb(info),
75 Callback::Async(_) => unreachable!(),
76 }
77
78 latest_finish_time = Some(CallbackInfo::time_now());
79 }
80 Err(RecvTimeoutError::Timeout) => {
81 continue;
82 }
83 Err(RecvTimeoutError::Disconnected) => {
84 break;
85 }
86 }
87 }
88 });
89
90 callback_tx_map.insert(tp, tx);
91 callback_join_handle_map.insert(tp, join_handle);
92 }
93
94 if let Some(cb) = initialized_cb {
98 match &cb {
99 Callback::Sync(cb) => cb(CallbackInfo::new()),
100 Callback::Async(_) => unreachable!(),
101 }
102 }
103
104 loop {
108 match register_rx.recv_timeout(Duration::from_secs(1)) {
109 #[cfg(not(windows))]
110 Ok(RegisterType::ReloadConfig) => {
111 if let Some(tx_callback) = callback_tx_map.get(&CallbackType::ReloadConfig) {
112 #[allow(clippy::single_match)]
113 match tx_callback.send(CallbackInfo::new()) {
114 Ok(_) => {}
115 Err(_) => {
116 }
118 }
119 }
120 continue;
121 }
122 Ok(RegisterType::WaitForStop) => {
123 if let Some(cb) = wait_for_stop_cb {
124 match &cb {
125 Callback::Sync(cb) => cb(CallbackInfo::new()),
126 Callback::Async(_) => unreachable!(),
127 }
128 }
129
130 drop(register_rx);
131
132 break;
133 }
134 #[cfg(not(windows))]
135 Ok(RegisterType::PrintStats) => {
136 if let Some(tx_callback) = callback_tx_map.get(&CallbackType::PrintStats) {
137 #[allow(clippy::single_match)]
138 match tx_callback.send(CallbackInfo::new()) {
139 Ok(_) => {}
140 Err(_) => {
141 }
143 }
144 }
145 continue;
146 }
147 Err(RecvTimeoutError::Timeout) => {
148 continue;
149 }
150 Err(RecvTimeoutError::Disconnected) => break,
151 }
152 }
153
154 for (_, tx) in callback_tx_map {
158 drop(tx);
159 }
160
161 for (_, join_handle) in callback_join_handle_map {
162 match join_handle.join() {
163 Ok(_) => {}
164 Err(err) => {
165 panic::resume_unwind(err);
166 }
167 }
168 }
169
170 Ok(())
171 }
172}