Skip to main content

rns_server/supervisor/
readiness.rs

1use super::*;
2
/// The kind of check used to decide whether a supervised process is ready.
///
/// Each variant is evaluated by `ProcessReadiness::probe`.
#[derive(Clone)]
pub enum ReadinessTarget {
    /// Ready once a TCP connection to this address succeeds.
    Tcp(SocketAddr),
    /// Ready once a Unix-domain socket connection to this path succeeds.
    UnixSocket(PathBuf),
    /// Ready once the readiness file at this path parses, belongs to the
    /// probed process, and reports `status=ready`.
    ReadyFile(PathBuf),
    /// Ready once every required hook is reported as loaded and enabled
    /// over RPC.
    HookSet {
        /// Address of the RPC endpoint that is asked for the hook list.
        rpc_addr: RpcAddr,
        /// Key handed to `RpcClient::connect` together with `rpc_addr`.
        auth_key: [u8; 32],
        /// `(name, attach_point)` pairs that must all be present and enabled.
        required_hooks: Vec<(String, String)>,
    },
    /// Ready once the process has been running for at least this long.
    ProcessAge(Duration),
}
15
16#[derive(Clone)]
17pub struct ProcessReadiness {
18    pub role: Role,
19    pub target: ReadinessTarget,
20}
21
22impl ProcessReadiness {
23    pub fn name(&self) -> &'static str {
24        self.role.display_name()
25    }
26
27    pub(crate) fn probe(&self, state: &SharedState) -> (bool, &'static str, Option<String>) {
28        match &self.target {
29            ReadinessTarget::Tcp(addr) => {
30                match TcpStream::connect_timeout(addr, Duration::from_millis(150)) {
31                    Ok(_) => (true, "ready", Some(format!("listening on {}", addr))),
32                    Err(err) => (false, "waiting", Some(format!("waiting for {}", err))),
33                }
34            }
35            ReadinessTarget::UnixSocket(path) => match UnixStream::connect(path) {
36                Ok(_) => (
37                    true,
38                    "ready",
39                    Some(format!("socket available at {}", path.display())),
40                ),
41                Err(err) => (
42                    false,
43                    "waiting",
44                    Some(format!("waiting for socket {}: {}", path.display(), err)),
45                ),
46            },
47            ReadinessTarget::ReadyFile(path) => match probe_ready_file(path, self.name(), state) {
48                Ok(detail) => (true, "ready", Some(detail)),
49                Err(err) => (false, "waiting", Some(err)),
50            },
51            ReadinessTarget::HookSet {
52                rpc_addr,
53                auth_key,
54                required_hooks,
55            } => match probe_hook_set(rpc_addr, auth_key, required_hooks) {
56                Ok((true, detail)) => (true, "ready", Some(detail)),
57                Ok((false, detail)) => (false, "warming", Some(detail)),
58                Err(err) => (
59                    false,
60                    "waiting",
61                    Some(format!("waiting for hook load: {}", err)),
62                ),
63            },
64            ReadinessTarget::ProcessAge(min_age) => {
65                let started_at = {
66                    let s = read_shared_state(state);
67                    s.processes
68                        .get(self.name())
69                        .and_then(|process| process.started_at)
70                };
71                match started_at {
72                    Some(started_at) if started_at.elapsed() >= *min_age => (
73                        true,
74                        "ready",
75                        Some("process has stayed up past startup window".into()),
76                    ),
77                    Some(started_at) => (
78                        false,
79                        "warming",
80                        Some(format!(
81                            "startup grace period {:.1}s remaining",
82                            (min_age.as_secs_f64() - started_at.elapsed().as_secs_f64()).max(0.0)
83                        )),
84                    ),
85                    None => (false, "stopped", Some("process is not running".into())),
86                }
87            }
88        }
89    }
90}
91
92pub(crate) fn probe_ready_file(
93    path: &PathBuf,
94    process_name: &str,
95    state: &SharedState,
96) -> Result<String, String> {
97    let contract = inspect_ready_file(path, process_name, state)?;
98
99    if contract.status != "ready" {
100        return Err(format!(
101            "readiness file {} reports status={}",
102            path.display(),
103            contract.status
104        ));
105    }
106
107    Ok(format!(
108        "{} (pid {}, file {})",
109        contract.detail,
110        contract.pid,
111        path.display()
112    ))
113}
114
115pub(crate) fn inspect_ready_file(
116    path: &PathBuf,
117    process_name: &str,
118    state: &SharedState,
119) -> Result<ReadyFileContract, String> {
120    let body = std::fs::read_to_string(path)
121        .map_err(|err| format!("waiting for readiness file {}: {}", path.display(), err))?;
122    let contract = ReadyFileContract::parse(&body)?;
123
124    if contract.process != process_name {
125        return Err(format!(
126            "readiness file {} belongs to {}",
127            path.display(),
128            contract.process
129        ));
130    }
131
132    let expected_pid = {
133        let s = read_shared_state(state);
134        s.processes
135            .get(process_name)
136            .and_then(|process| process.pid)
137    };
138    if let Some(expected_pid) = expected_pid {
139        if contract.pid != expected_pid {
140            return Err(format!(
141                "readiness file {} is stale for pid {} (expected {})",
142                path.display(),
143                contract.pid,
144                expected_pid
145            ));
146        }
147    }
148
149    Ok(contract)
150}
151
152pub(crate) struct ReadyFileContract {
153    pub(crate) status: String,
154    pub(crate) process: String,
155    pub(crate) pid: u32,
156    pub(crate) detail: String,
157}
158
159impl ReadyFileContract {
160    fn parse(body: &str) -> Result<Self, String> {
161        let mut status = None;
162        let mut process = None;
163        let mut pid = None;
164        let mut detail = None;
165
166        for line in body.lines() {
167            let Some((key, value)) = line.split_once('=') else {
168                continue;
169            };
170            let value = unescape_ready_value(value);
171            match key {
172                "status" => status = Some(value),
173                "process" => process = Some(value),
174                "pid" => {
175                    pid = Some(
176                        value
177                            .parse::<u32>()
178                            .map_err(|err| format!("invalid readiness pid '{}': {}", value, err))?,
179                    )
180                }
181                "detail" => detail = Some(value),
182                _ => {}
183            }
184        }
185
186        Ok(Self {
187            status: status.ok_or_else(|| "readiness file missing status".to_string())?,
188            process: process.ok_or_else(|| "readiness file missing process".to_string())?,
189            pid: pid.ok_or_else(|| "readiness file missing pid".to_string())?,
190            detail: detail.unwrap_or_else(|| "ready".into()),
191        })
192    }
193}
194
/// Decodes the backslash escapes used in readiness-file values.
///
/// `\n` becomes a newline and `\\` becomes a single backslash. Any other
/// escape, and a backslash at the very end of the input, is kept verbatim.
fn unescape_ready_value(value: &str) -> String {
    let mut decoded = String::new();
    // True while the previous character was an unconsumed backslash.
    let mut pending_escape = false;

    for ch in value.chars() {
        if pending_escape {
            pending_escape = false;
            match ch {
                'n' => decoded.push('\n'),
                '\\' => decoded.push('\\'),
                other => {
                    decoded.push('\\');
                    decoded.push(other);
                }
            }
        } else if ch == '\\' {
            pending_escape = true;
        } else {
            decoded.push(ch);
        }
    }

    // A backslash with nothing after it is emitted as-is.
    if pending_escape {
        decoded.push('\\');
    }
    decoded
}
215
216fn probe_hook_set(
217    rpc_addr: &RpcAddr,
218    auth_key: &[u8; 32],
219    required_hooks: &[(String, String)],
220) -> Result<(bool, String), String> {
221    let mut client = RpcClient::connect(rpc_addr, auth_key)
222        .map_err(|err| format!("rpc connect failed: {}", err))?;
223    let hooks = client
224        .list_hooks()
225        .map_err(|err| format!("list_hooks failed: {}", err))?;
226
227    let missing = missing_required_hooks(&hooks, required_hooks);
228
229    if missing.is_empty() {
230        Ok((
231            true,
232            format!("all {} required hooks loaded", required_hooks.len()),
233        ))
234    } else {
235        Ok((false, format!("missing hooks: {}", missing.join(", "))))
236    }
237}
238
239pub(crate) fn missing_required_hooks(
240    hooks: &[HookInfo],
241    required_hooks: &[(String, String)],
242) -> Vec<String> {
243    required_hooks
244        .iter()
245        .filter(|(name, attach_point)| {
246            !hooks.iter().any(|hook| {
247                hook.name == *name && hook.attach_point == *attach_point && hook.enabled
248            })
249        })
250        .map(|(name, attach_point)| format!("{name}@{attach_point}"))
251        .collect()
252}
253
254pub(crate) fn observe_sidecar_draining(
255    managed: &ManagedChild,
256    shared_state: Option<&SharedState>,
257    ready_file: Option<&PathBuf>,
258) -> bool {
259    let Some(state) = shared_state else {
260        return false;
261    };
262    let Some(path) = ready_file else {
263        return false;
264    };
265    let Ok(contract) = inspect_ready_file(path, managed.role.display_name(), state) else {
266        return false;
267    };
268    if contract.status != "draining" {
269        return false;
270    }
271
272    set_process_readiness(
273        state,
274        managed.role.display_name(),
275        false,
276        "draining",
277        Some(format!(
278            "{} (pid {}, file {})",
279            contract.detail,
280            contract.pid,
281            path.display()
282        )),
283    );
284    true
285}
286
287pub(crate) fn ready_file_path_for_role(
288    role: Role,
289    readiness: &[ProcessReadiness],
290) -> Option<PathBuf> {
291    readiness.iter().find_map(|probe| {
292        if probe.role != role {
293            return None;
294        }
295        match &probe.target {
296            ReadinessTarget::ReadyFile(path) => Some(path.clone()),
297            _ => None,
298        }
299    })
300}