1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use signal_hook::{consts::signal::*, low_level::emulate_default_handler};
use signal_hook_tokio::{Handle, Signals};
use std::{
    env,
    net::{IpAddr, SocketAddr},
    path::Path,
};
use tokio::{fs, process::Command, time, time::Duration};
use tokio_stream::StreamExt;

use crate::Result;

/// Wrapper around an `oomagent` process started by [`ServerWrapper::new`].
///
/// It stores the socket address read back for the spawned agent together with
/// the handle of the signal stream created while spawning it.
///
/// NOTE(review): this type derives `Clone`, while its `Drop` impl closes
/// `handle`. Presumably cloned handles refer to the same signal stream, so
/// dropping any clone would close it for all of them — confirm this is intended.
#[derive(Debug, Clone)]
pub struct ServerWrapper {
    /// Address obtained from `get_agent_address` for the spawned process.
    addr:   SocketAddr,
    /// Handle of the `Signals` stream registered in `new`; closed on drop.
    handle: Handle,
}

// TODO: Add a Builder to create the server wrapper
impl ServerWrapper {
    /// Spawns an `oomagent` process and waits until its address can be read.
    ///
    /// # Arguments
    ///
    /// * `bin_path` - program to execute; `"oomagent"` is used when `None`.
    /// * `cfg_path` - when `Some`, passed to the agent as `--config <path>`.
    /// * `port` - passed to the agent as `--port <port>`; `0` is used when `None`.
    ///
    /// The child is configured with `kill_on_drop(true)` and is moved into a
    /// background task that waits for `SIGHUP`, `SIGTERM`, `SIGINT` or `SIGQUIT`.
    /// On such a signal the task kills the child and then emulates the default
    /// handler of the received signal.
    ///
    /// # Errors
    ///
    /// Returns an error when the signal stream cannot be registered, the process
    /// cannot be spawned, the spawned process reports no pid, or
    /// `get_agent_address` fails to obtain the agent address. In the last case
    /// the signal handle is closed before returning so that the background task
    /// (and with it the child process) is not left behind.
    pub async fn new<P1, P2>(bin_path: Option<P1>, cfg_path: Option<P2>, port: Option<u16>) -> Result<Self>
    where
        P1: AsRef<Path>,
        P2: AsRef<Path>,
    {
        // Convert to owned paths up front so the generic arguments are consumed here.
        let bin_path = bin_path
            .map(|p| p.as_ref().to_owned())
            .unwrap_or_else(|| "oomagent".into());
        let cfg_path = cfg_path.map(|p| p.as_ref().to_owned());
        let port = port.unwrap_or(0);

        let mut signals = Signals::new(&[SIGHUP, SIGTERM, SIGINT, SIGQUIT])?;
        let handle = signals.handle();

        let mut oomagent = Command::new(bin_path);
        oomagent.arg("--port").arg(port.to_string());
        if let Some(cfg_path) = cfg_path {
            oomagent.arg("--config").arg(cfg_path);
        }
        oomagent.kill_on_drop(true);

        let mut child = oomagent.spawn()?;
        // Read the pid before `child` is moved into the signal task. Report a
        // missing pid as an error instead of panicking.
        let pid = child.id().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::Other,
                "spawned oomagent process has no pid",
            )
        })?;

        tokio::spawn(async move {
            while let Some(signal) = signals.next().await {
                // Best effort: a failed kill (e.g. the child is already gone) must
                // not panic here, otherwise the default handler below would never
                // run and the signal would effectively be swallowed.
                let _ = child.kill().await;
                emulate_default_handler(signal).unwrap();
            }
        });

        match get_agent_address(pid).await {
            Ok(addr) => Ok(Self { handle, addr }),
            Err(err) => {
                // No `ServerWrapper` will exist to close the handle on drop, so
                // close it here; this lets the signal task finish and drop the
                // child, which is killed because of `kill_on_drop(true)`.
                handle.close();
                Err(err)
            }
        }
    }

    /// Spawns the agent with all defaults: no explicit binary path, no config
    /// file and no explicit port (equivalent to `new(None, None, None)`).
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`ServerWrapper::new`].
    pub async fn default() -> Result<Self> {
        Self::new(None::<String>, None::<String>, None).await
    }

    /// Returns the IP part of the stored agent address.
    pub fn ip(&self) -> IpAddr {
        self.addr.ip()
    }

    /// Returns the port part of the stored agent address.
    pub fn port(&self) -> u16 {
        self.addr.port()
    }

    /// Returns the stored agent socket address.
    pub fn address(&self) -> SocketAddr {
        self.addr
    }
}

impl Drop for ServerWrapper {
    /// Closes the signal handle when the wrapper is dropped.
    ///
    /// NOTE(review): closing the handle presumably ends the signal stream polled
    /// by the task spawned in `new`, letting that task finish and drop the child
    /// process — confirm against the signal-hook documentation.
    fn drop(&mut self) {
        // Borrow only the field that takes part in the cleanup.
        let Self { handle, .. } = self;
        handle.close();
    }
}

async fn get_agent_address(pid: u32) -> Result<SocketAddr> {
    let mut path = env::temp_dir();
    path.push("oomagent");
    path.push(pid.to_string());
    path.push("address");
    let time = time::Instant::now();

    loop {
        let result = fs::read_to_string(&path)
            .await
            .map_err(|e| e.into())
            .and_then(|addr| Ok(addr.parse()?));
        if result.is_ok() || time.elapsed() > Duration::from_millis(3000) {
            return result;
        }
        time::sleep(Duration::from_millis(200)).await;
    }
}