1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::{collections::HashSet, process};

use crate::error::Result;
use log::{debug, info, warn};
use nix::{
    sys::{
        signal::{kill, Signal},
        wait::{waitpid, WaitPidFlag, WaitStatus},
    },
    unistd::{fork, ForkResult, Pid},
};
use signal_hook::{
    consts::{SIGCHLD, SIGINT, SIGQUIT, SIGTERM},
    iterator::Signals,
};

/// Supervisor that forks a fixed number of child processes and hands each one
/// the shared resource.
///
/// `T` is the resource type passed to every child; `FChild` is the callback
/// run inside each child. The `Fn(u32, T)` requirement on `FChild` is stated on
/// the `impl` block that actually calls it rather than on the struct, so code
/// that merely names `Server<T, FChild>` does not have to repeat the bound.
pub struct Server<T, FChild> {
    /// Number of child processes to fork.
    num_processes: u32,
    /// Resource handed to the child callback. Stored as an `Option` so a
    /// child can move it out of `self` with `take()`.
    resource: Option<T>,
    /// Callback invoked in each child with the child's index and the resource.
    child_init: FChild,
}

impl<T, FChild> Server<T, FChild>
where
    FChild: Fn(u32, T),
{
    pub fn from_resource(resource: T, child_init: FChild, num_processes: u32) -> Self {
        Self {
            num_processes,
            resource: Some(resource),
            child_init,
        }
    }

    pub fn fork(mut self) -> Result<bool> {
        let mut pids = HashSet::new();
        let mut is_parent = true;
        for child_num in 0..self.num_processes {
            let pid = self.fork_child(child_num)?;
            if let Some(pid) = pid {
                pids.insert(pid);
            } else {
                is_parent = false;
                break;
            }
        }
        if is_parent && !pids.is_empty() {
            self.run_parent(pids)?;
        }
        Ok(is_parent)
    }

    fn fork_child(&mut self, child_num: u32) -> Result<Option<Pid>> {
        match unsafe { fork()? } {
            ForkResult::Parent { child } => Ok(Some(child)),
            ForkResult::Child => {
                if let Some(resource) = self.resource.take() {
                    (self.child_init)(child_num, resource);
                } else {
                    unreachable!("fork resource is empty");
                }
                Ok(None)
            }
        }
    }

    fn run_parent(&self, mut pids: HashSet<Pid>) -> Result<()> {
        let parent_pid = process::id();
        debug!("Handling signals in parent {parent_pid}");
        let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT, SIGCHLD])?;
        for signal in signals.forever() {
            match signal {
                SIGCHLD => {
                    wait_process(&mut pids);
                    if pids.is_empty() {
                        break;
                    }
                }
                _ => break,
            }
        }
        for child in pids {
            if let Err(err) = kill(child, Signal::SIGKILL) {
                warn!("Kill failed: {err}");
            } else {
                debug!("Reaped {child}")
            }
        }
        Ok(())
    }
}

fn wait_process(pids: &mut HashSet<Pid>) {
    while let Ok(status) = waitpid(None, Some(WaitPidFlag::WNOHANG)) {
        match status {
            WaitStatus::Exited(pid, exit_code) => {
                info!("Child {pid} exited with status {exit_code}");
                pids.remove(&pid);
            }
            WaitStatus::Signaled(pid, signal, ..) => {
                info!("Child {pid} killed by signal {signal}");
                pids.remove(&pid);
            }
            WaitStatus::StillAlive => break,
            _ => {
                info!("waitpid got status {status:?}")
            }
        }
    }
}