1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::{collections::HashSet, process};
use crate::error::Result;
use log::{debug, info, warn};
use nix::{
sys::{
signal::{kill, Signal},
wait::{waitpid, WaitPidFlag, WaitStatus},
},
unistd::{fork, ForkResult, Pid},
};
use signal_hook::{
consts::{SIGCHLD, SIGINT, SIGQUIT, SIGTERM},
iterator::Signals,
};
/// A pre-fork server: owns a resource and a per-child entry point, and forks
/// a fixed number of child processes that each receive the resource.
///
/// `T` is the resource type and `FChild` is the callable run inside every
/// child with the child's index and the resource.
pub struct Server<T, FChild: Fn(u32, T)> {
    /// How many child processes to fork.
    num_processes: u32,
    /// The resource passed to `child_init` in a child; stored as an `Option`
    /// so it can be moved out of the server exactly once.
    resource: Option<T>,
    /// Entry point invoked in each child as `child_init(child_num, resource)`.
    child_init: FChild,
}
impl<T, FChild> Server<T, FChild>
where
FChild: Fn(u32, T),
{
pub fn from_resource(resource: T, child_init: FChild, num_processes: u32) -> Self {
Self {
num_processes,
resource: Some(resource),
child_init,
}
}
pub fn fork(mut self) -> Result<bool> {
let mut pids = HashSet::new();
let mut is_parent = true;
for child_num in 0..self.num_processes {
let pid = self.fork_child(child_num)?;
if let Some(pid) = pid {
pids.insert(pid);
} else {
is_parent = false;
break;
}
}
if is_parent && !pids.is_empty() {
self.run_parent(pids)?;
}
Ok(is_parent)
}
fn fork_child(&mut self, child_num: u32) -> Result<Option<Pid>> {
match unsafe { fork()? } {
ForkResult::Parent { child } => Ok(Some(child)),
ForkResult::Child => {
if let Some(resource) = self.resource.take() {
(self.child_init)(child_num, resource);
} else {
unreachable!("fork resource is empty");
}
Ok(None)
}
}
}
fn run_parent(&self, mut pids: HashSet<Pid>) -> Result<()> {
let parent_pid = process::id();
debug!("Handling signals in parent {parent_pid}");
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT, SIGCHLD])?;
for signal in signals.forever() {
match signal {
SIGCHLD => {
wait_process(&mut pids);
if pids.is_empty() {
break;
}
}
_ => break,
}
}
for child in pids {
if let Err(err) = kill(child, Signal::SIGKILL) {
warn!("Kill failed: {err}");
} else {
debug!("Reaped {child}")
}
}
Ok(())
}
}
/// Reaps every child whose state change is currently available, without
/// blocking, and removes each terminated child's PID from `pids`.
///
/// Stops as soon as `waitpid` reports that the remaining children are still
/// alive, or returns an error.
fn wait_process(pids: &mut HashSet<Pid>) {
    loop {
        match waitpid(None, Some(WaitPidFlag::WNOHANG)) {
            // Nothing more to collect right now (or waiting failed): done.
            Err(_) | Ok(WaitStatus::StillAlive) => return,
            Ok(WaitStatus::Exited(pid, exit_code)) => {
                info!("Child {pid} exited with status {exit_code}");
                pids.remove(&pid);
            }
            Ok(WaitStatus::Signaled(pid, signal, ..)) => {
                info!("Child {pid} killed by signal {signal}");
                pids.remove(&pid);
            }
            // Any other state change is only logged; the child stays tracked.
            Ok(status) => {
                info!("waitpid got status {status:?}")
            }
        }
    }
}