1use super::*;
2
/// What a readiness probe checks to decide whether a process is ready.
#[derive(Clone)]
pub enum ReadinessTarget {
    /// Ready once a TCP connection to this address can be established.
    Tcp(SocketAddr),
    /// Ready once a connection to the Unix domain socket at this path succeeds.
    UnixSocket(PathBuf),
    /// Ready once this file parses as a readiness contract for the process
    /// and reports `status=ready`.
    ReadyFile(PathBuf),
    /// Ready once every required hook is reported loaded and enabled by the
    /// RPC endpoint.
    HookSet {
        /// Address of the RPC endpoint that is asked for its hook list.
        rpc_addr: RpcAddr,
        /// Key handed to the RPC client when connecting.
        auth_key: [u8; 32],
        /// `(name, attach_point)` pairs that must all be present and enabled.
        required_hooks: Vec<(String, String)>,
    },
    /// Ready once the process has been running for at least this long.
    ProcessAge(Duration),
}
15
/// A readiness check bound to the role of the process it describes.
#[derive(Clone)]
pub struct ProcessReadiness {
    /// Role of the process; its display name is the key used to look the
    /// process up in shared state.
    pub role: Role,
    /// What to probe to decide readiness.
    pub target: ReadinessTarget,
}
21
22impl ProcessReadiness {
23 pub fn name(&self) -> &'static str {
24 self.role.display_name()
25 }
26
27 pub(crate) fn probe(&self, state: &SharedState) -> (bool, &'static str, Option<String>) {
28 match &self.target {
29 ReadinessTarget::Tcp(addr) => {
30 match TcpStream::connect_timeout(addr, Duration::from_millis(150)) {
31 Ok(_) => (true, "ready", Some(format!("listening on {}", addr))),
32 Err(err) => (false, "waiting", Some(format!("waiting for {}", err))),
33 }
34 }
35 ReadinessTarget::UnixSocket(path) => match UnixStream::connect(path) {
36 Ok(_) => (
37 true,
38 "ready",
39 Some(format!("socket available at {}", path.display())),
40 ),
41 Err(err) => (
42 false,
43 "waiting",
44 Some(format!("waiting for socket {}: {}", path.display(), err)),
45 ),
46 },
47 ReadinessTarget::ReadyFile(path) => match probe_ready_file(path, self.name(), state) {
48 Ok(detail) => (true, "ready", Some(detail)),
49 Err(err) => (false, "waiting", Some(err)),
50 },
51 ReadinessTarget::HookSet {
52 rpc_addr,
53 auth_key,
54 required_hooks,
55 } => match probe_hook_set(rpc_addr, auth_key, required_hooks) {
56 Ok((true, detail)) => (true, "ready", Some(detail)),
57 Ok((false, detail)) => (false, "warming", Some(detail)),
58 Err(err) => (
59 false,
60 "waiting",
61 Some(format!("waiting for hook load: {}", err)),
62 ),
63 },
64 ReadinessTarget::ProcessAge(min_age) => {
65 let started_at = {
66 let s = read_shared_state(state);
67 s.processes
68 .get(self.name())
69 .and_then(|process| process.started_at)
70 };
71 match started_at {
72 Some(started_at) if started_at.elapsed() >= *min_age => (
73 true,
74 "ready",
75 Some("process has stayed up past startup window".into()),
76 ),
77 Some(started_at) => (
78 false,
79 "warming",
80 Some(format!(
81 "startup grace period {:.1}s remaining",
82 (min_age.as_secs_f64() - started_at.elapsed().as_secs_f64()).max(0.0)
83 )),
84 ),
85 None => (false, "stopped", Some("process is not running".into())),
86 }
87 }
88 }
89 }
90}
91
92pub(crate) fn probe_ready_file(
93 path: &PathBuf,
94 process_name: &str,
95 state: &SharedState,
96) -> Result<String, String> {
97 let contract = inspect_ready_file(path, process_name, state)?;
98
99 if contract.status != "ready" {
100 return Err(format!(
101 "readiness file {} reports status={}",
102 path.display(),
103 contract.status
104 ));
105 }
106
107 Ok(format!(
108 "{} (pid {}, file {})",
109 contract.detail,
110 contract.pid,
111 path.display()
112 ))
113}
114
115pub(crate) fn inspect_ready_file(
116 path: &PathBuf,
117 process_name: &str,
118 state: &SharedState,
119) -> Result<ReadyFileContract, String> {
120 let body = std::fs::read_to_string(path)
121 .map_err(|err| format!("waiting for readiness file {}: {}", path.display(), err))?;
122 let contract = ReadyFileContract::parse(&body)?;
123
124 if contract.process != process_name {
125 return Err(format!(
126 "readiness file {} belongs to {}",
127 path.display(),
128 contract.process
129 ));
130 }
131
132 let expected_pid = {
133 let s = read_shared_state(state);
134 s.processes
135 .get(process_name)
136 .and_then(|process| process.pid)
137 };
138 if let Some(expected_pid) = expected_pid {
139 if contract.pid != expected_pid {
140 return Err(format!(
141 "readiness file {} is stale for pid {} (expected {})",
142 path.display(),
143 contract.pid,
144 expected_pid
145 ));
146 }
147 }
148
149 Ok(contract)
150}
151
152pub(crate) struct ReadyFileContract {
153 pub(crate) status: String,
154 pub(crate) process: String,
155 pub(crate) pid: u32,
156 pub(crate) detail: String,
157}
158
159impl ReadyFileContract {
160 fn parse(body: &str) -> Result<Self, String> {
161 let mut status = None;
162 let mut process = None;
163 let mut pid = None;
164 let mut detail = None;
165
166 for line in body.lines() {
167 let Some((key, value)) = line.split_once('=') else {
168 continue;
169 };
170 let value = unescape_ready_value(value);
171 match key {
172 "status" => status = Some(value),
173 "process" => process = Some(value),
174 "pid" => {
175 pid = Some(
176 value
177 .parse::<u32>()
178 .map_err(|err| format!("invalid readiness pid '{}': {}", value, err))?,
179 )
180 }
181 "detail" => detail = Some(value),
182 _ => {}
183 }
184 }
185
186 Ok(Self {
187 status: status.ok_or_else(|| "readiness file missing status".to_string())?,
188 process: process.ok_or_else(|| "readiness file missing process".to_string())?,
189 pid: pid.ok_or_else(|| "readiness file missing pid".to_string())?,
190 detail: detail.unwrap_or_else(|| "ready".into()),
191 })
192 }
193}
194
/// Decodes the escape sequences used in readiness-file values.
///
/// `\n` becomes a newline and `\\` becomes a single backslash. Any other
/// backslash sequence, including a trailing lone backslash, is kept
/// verbatim.
fn unescape_ready_value(value: &str) -> String {
    let mut decoded = String::with_capacity(value.len());
    let mut rest = value.chars();

    loop {
        let Some(current) = rest.next() else {
            break;
        };
        if current != '\\' {
            decoded.push(current);
            continue;
        }
        // `current` is a backslash: look at what it escapes.
        match rest.next() {
            Some('n') => decoded.push('\n'),
            Some('\\') => decoded.push('\\'),
            Some(unknown) => {
                decoded.push('\\');
                decoded.push(unknown);
            }
            None => decoded.push('\\'),
        }
    }

    decoded
}
215
216fn probe_hook_set(
217 rpc_addr: &RpcAddr,
218 auth_key: &[u8; 32],
219 required_hooks: &[(String, String)],
220) -> Result<(bool, String), String> {
221 let mut client = RpcClient::connect(rpc_addr, auth_key)
222 .map_err(|err| format!("rpc connect failed: {}", err))?;
223 let hooks = client
224 .list_hooks()
225 .map_err(|err| format!("list_hooks failed: {}", err))?;
226
227 let missing = missing_required_hooks(&hooks, required_hooks);
228
229 if missing.is_empty() {
230 Ok((
231 true,
232 format!("all {} required hooks loaded", required_hooks.len()),
233 ))
234 } else {
235 Ok((false, format!("missing hooks: {}", missing.join(", "))))
236 }
237}
238
239pub(crate) fn missing_required_hooks(
240 hooks: &[HookInfo],
241 required_hooks: &[(String, String)],
242) -> Vec<String> {
243 required_hooks
244 .iter()
245 .filter(|(name, attach_point)| {
246 !hooks.iter().any(|hook| {
247 hook.name == *name && hook.attach_point == *attach_point && hook.enabled
248 })
249 })
250 .map(|(name, attach_point)| format!("{name}@{attach_point}"))
251 .collect()
252}
253
254pub(crate) fn observe_sidecar_draining(
255 managed: &ManagedChild,
256 shared_state: Option<&SharedState>,
257 ready_file: Option<&PathBuf>,
258) -> bool {
259 let Some(state) = shared_state else {
260 return false;
261 };
262 let Some(path) = ready_file else {
263 return false;
264 };
265 let Ok(contract) = inspect_ready_file(path, managed.role.display_name(), state) else {
266 return false;
267 };
268 if contract.status != "draining" {
269 return false;
270 }
271
272 set_process_readiness(
273 state,
274 managed.role.display_name(),
275 false,
276 "draining",
277 Some(format!(
278 "{} (pid {}, file {})",
279 contract.detail,
280 contract.pid,
281 path.display()
282 )),
283 );
284 true
285}
286
287pub(crate) fn ready_file_path_for_role(
288 role: Role,
289 readiness: &[ProcessReadiness],
290) -> Option<PathBuf> {
291 readiness.iter().find_map(|probe| {
292 if probe.role != role {
293 return None;
294 }
295 match &probe.target {
296 ReadinessTarget::ReadyFile(path) => Some(path.clone()),
297 _ => None,
298 }
299 })
300}