use std::collections::HashMap;
use std::io::{self, BufRead, BufReader};
use std::net::{SocketAddr, TcpStream};
use std::os::unix::net::UnixStream;
use std::os::unix::process::CommandExt;
use std::path::PathBuf;
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use crate::logs::LogStore;
use crate::self_exec::{resolve_self_exec, self_exec_display};
use rns_ctl::state::{
    bump_process_restart_count, mark_process_failed_spawn, mark_process_running,
    mark_process_stopped, push_process_log, record_process_termination_observation,
    set_process_log_path, set_process_readiness, ProcessControlCommand, SharedState,
};
use rns_net::{event::DrainStatus, HookInfo, RpcAddr, RpcClient};

mod drain;
mod process;
mod readiness;

use self::process::{
    check_exits, exit_code, role_from_name, spawn_child, terminate_child, terminate_children,
    ManagedChild,
};
use self::readiness::ready_file_path_for_role;
pub use self::readiness::{ProcessReadiness, ReadinessTarget};

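/// The managed daemon roles the supervisor knows how to spawn and track.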
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Role {
    Rnsd,
    Sentineld,
    Statsd,
}

impl Role {
    pub fn display_name(self) -> &'static str {
        match self {
            Role::Rnsd => "rnsd",
            Role::Sentineld => "rns-sentineld",
            Role::Statsd => "rns-statsd",
        }
    }
}

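/// Describes how to launch one managed process: its role, the command to
/// execute, and the arguments to pass.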
#[derive(Debug, Clone)]
pub struct ProcessSpec {
    pub role: Role,
    pub command: ProcessCommand,
    pub args: Vec<String>,
}

impl ProcessSpec {
    pub fn command_line(&self) -> String {
        let mut parts = vec![self.command.display(self.role)];
        parts.extend(self.args.iter().cloned());
        parts.join(" ")
    }
}

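/// How a managed process is invoked: either an external binary on disk or a
/// re-invocation of the current executable with `--internal-role`.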
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessCommand {
    External(PathBuf),
    SelfInvoke,
}

impl ProcessCommand {
    pub fn display(&self, role: Role) -> String {
        match self {
            ProcessCommand::External(path) => path.display().to_string(),
            ProcessCommand::SelfInvoke => {
                format!(
                    "{} --internal-role {}",
                    self_exec_display(),
                    role.display_name()
                )
            }
        }
    }
}

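/// Everything needed to build a [`Supervisor`]: the process specs plus
/// optional shared control state, a control-command receiver, readiness
/// probes, a log directory, and rnsd drain settings.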
pub struct SupervisorConfig {
    pub specs: Vec<ProcessSpec>,
    pub shared_state: Option<SharedState>,
    pub control_rx: Option<mpsc::Receiver<ProcessControlCommand>>,
    pub readiness: Vec<ProcessReadiness>,
    pub log_dir: Option<PathBuf>,
    pub rnsd_drain: Option<RnsdDrainConfig>,
}

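/// Runs the configured processes as supervised children: it spawns them,
/// restarts unexpected exits (up to a bounded number of attempts), applies
/// control commands, refreshes readiness probes, and coordinates an rnsd
/// drain during shutdown.
///
/// A minimal construction sketch; the binary path and arguments are
/// illustrative only:
///
/// ```ignore
/// let supervisor = Supervisor::new(SupervisorConfig {
///     specs: vec![ProcessSpec {
///         role: Role::Rnsd,
///         command: ProcessCommand::External(PathBuf::from("/usr/bin/rnsd")),
///         args: vec!["--config".into(), "/etc/rns".into()],
///     }],
///     shared_state: None,
///     control_rx: None,
///     readiness: Vec::new(),
///     log_dir: None,
///     rnsd_drain: None,
/// });
/// let exit_code = supervisor.run();
/// ```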
pub struct Supervisor {
    specs: Vec<ProcessSpec>,
    shared_state: Option<SharedState>,
    control_rx: Option<mpsc::Receiver<ProcessControlCommand>>,
    readiness: Vec<ProcessReadiness>,
    log_store: Option<LogStore>,
    rnsd_drain: Option<RnsdDrainConfig>,
}

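/// Acquires the shared-state read lock, recovering (with an error log) if a
/// previous holder panicked and poisoned it.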
fn read_shared_state<'a>(
    state: &'a SharedState,
) -> std::sync::RwLockReadGuard<'a, rns_ctl::state::CtlState> {
    match state.read() {
        Ok(guard) => guard,
        Err(poisoned) => {
            log::error!("recovering from poisoned supervisor shared state read lock");
            poisoned.into_inner()
        }
    }
}

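/// RPC connection details and timing used to ask rnsd to drain before it is
/// stopped or restarted.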
#[derive(Debug, Clone)]
pub struct RnsdDrainConfig {
    pub rpc_addr: RpcAddr,
    pub auth_key: [u8; 32],
    pub timeout: Duration,
    pub poll_interval: Duration,
}

impl Supervisor {
    pub fn new(config: SupervisorConfig) -> Self {
        Self {
            specs: config.specs,
            shared_state: config.shared_state,
            control_rx: config.control_rx,
            readiness: config.readiness,
            log_store: config.log_dir.map(LogStore::new),
            rnsd_drain: config.rnsd_drain,
        }
    }

    pub fn specs(&self) -> &[ProcessSpec] {
        &self.specs
    }

    pub fn run(&self) -> Result<i32, String> {
        self.run_with_started_hook(|| Ok(()))
    }

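    /// Spawns all configured children, invokes `on_started` once they are
    /// launched, then supervises them until a shutdown signal arrives or a
    /// child exit can no longer be absorbed by a restart. On shutdown it
    /// drains rnsd before terminating the remaining children, and it returns
    /// the exit code to report.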
    pub fn run_with_started_hook<F>(&self, on_started: F) -> Result<i32, String>
    where
        F: FnOnce() -> Result<(), String>,
    {
        let mut children = self
            .specs
            .iter()
            .map(|spec| spawn_child(spec, self.shared_state.as_ref(), self.log_store.clone()))
            .collect::<Result<Vec<_>, _>>()?;
        let mut unexpected_restart_counts = HashMap::new();

        on_started()?;

        let stop_rx = install_signal_handlers();

        loop {
            if stop_rx.try_recv().is_ok() {
                log::info!("shutdown requested");
                self.drain_rnsd(&children, "supervisor shutdown");
                terminate_children(&mut children, self.shared_state.as_ref(), &self.readiness);
                return Ok(0);
            }

            if let Some(command) = self.next_control_command() {
                self.handle_control_command(command, &mut children)?;
            }

            self.refresh_readiness();

            if let Some((role, status)) = check_exits(&mut children)? {
                log::warn!("{} exited with status {}", role.display_name(), status);
                if self.restart_unexpected_exit(
                    role,
                    status,
                    &mut children,
                    &mut unexpected_restart_counts,
                )? {
                    continue;
                }
                if let Some(state) = self.shared_state.as_ref() {
                    mark_process_stopped(state, role.display_name(), status.code());
                }
                terminate_children(&mut children, self.shared_state.as_ref(), &self.readiness);
                return Ok(exit_code(status));
            }

            std::thread::sleep(Duration::from_millis(200));
        }
    }
}

impl Supervisor {
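    /// Attempts to restart a child that exited without being asked to,
    /// backing off briefly and giving up after a fixed number of attempts.
    /// Returns `Ok(true)` when a replacement child was spawned.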
    fn restart_unexpected_exit(
        &self,
        role: Role,
        status: ExitStatus,
        children: &mut [ManagedChild],
        unexpected_restart_counts: &mut HashMap<Role, usize>,
    ) -> Result<bool, String> {
        const MAX_UNEXPECTED_RESTARTS: usize = 3;
        const UNEXPECTED_RESTART_BACKOFF: Duration = Duration::from_millis(200);

        let Some(index) = children.iter().position(|child| child.role == role) else {
            return Ok(false);
        };
        let Some(spec) = self.specs.iter().find(|spec| spec.role == role) else {
            return Ok(false);
        };

        let attempts = unexpected_restart_counts.entry(role).or_insert(0);
        if *attempts >= MAX_UNEXPECTED_RESTARTS {
            return Ok(false);
        }
        *attempts += 1;

        if let Some(state) = self.shared_state.as_ref() {
            mark_process_stopped(state, role.display_name(), status.code());
            bump_process_restart_count(state, role.display_name());
            push_process_log(
                state,
                role.display_name(),
                "supervisor",
                format!(
                    "unexpected exit with status {}; restarting ({}/{})",
                    exit_code(status),
                    *attempts,
                    MAX_UNEXPECTED_RESTARTS
                ),
            );
        }

        thread::sleep(UNEXPECTED_RESTART_BACKOFF);
        children[index] = spawn_child(spec, self.shared_state.as_ref(), self.log_store.clone())?;
        Ok(true)
    }

    fn next_control_command(&self) -> Option<ProcessControlCommand> {
        self.control_rx.as_ref().and_then(|rx| rx.try_recv().ok())
    }

    fn handle_control_command(
        &self,
        command: ProcessControlCommand,
        children: &mut Vec<ManagedChild>,
    ) -> Result<(), String> {
        match command {
            ProcessControlCommand::Restart(name) => self.restart_process(&name, children),
            ProcessControlCommand::Start(name) => self.start_process(&name, children),
            ProcessControlCommand::Stop(name) => self.stop_process(&name, children),
        }
    }

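    /// Restarts the named process: signals a drain for the role, terminates
    /// the current child, updates shared-state bookkeeping, and spawns a
    /// replacement from the same spec.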
    fn restart_process(&self, name: &str, children: &mut Vec<ManagedChild>) -> Result<(), String> {
        let Some(role) = role_from_name(name) else {
            return Err(format!("unknown process '{}'", name));
        };
        let Some(spec) = self.specs.iter().find(|spec| spec.role == role) else {
            return Err(format!("missing process spec for '{}'", name));
        };

        if let Some(index) = children.iter().position(|child| child.role == role) {
            self.drain_role(role, children, "process restart");
            let ready_file = ready_file_path_for_role(role, &self.readiness);
            terminate_child(
                &mut children[index],
                self.shared_state.as_ref(),
                ready_file.as_ref(),
            )
            .map_err(|e| {
                format!(
                    "failed to terminate {} during restart: {}",
                    role.display_name(),
                    e
                )
            })?;
            if let Some(state) = self.shared_state.as_ref() {
                mark_process_stopped(state, role.display_name(), None);
                bump_process_restart_count(state, role.display_name());
            }
            children[index] =
                spawn_child(spec, self.shared_state.as_ref(), self.log_store.clone())?;
        }

        Ok(())
    }

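    /// Starts the named process if it is not already running.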
    fn start_process(&self, name: &str, children: &mut Vec<ManagedChild>) -> Result<(), String> {
        let Some(role) = role_from_name(name) else {
            return Err(format!("unknown process '{}'", name));
        };
        if children.iter().any(|child| child.role == role) {
            return Ok(());
        }
        let Some(spec) = self.specs.iter().find(|spec| spec.role == role) else {
            return Err(format!("missing process spec for '{}'", name));
        };
        children.push(spawn_child(
            spec,
            self.shared_state.as_ref(),
            self.log_store.clone(),
        )?);
        Ok(())
    }

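    /// Stops the named process: signals a drain for the role, terminates the
    /// child, records the stop in shared state, and drops it from the
    /// supervised set.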
    fn stop_process(&self, name: &str, children: &mut Vec<ManagedChild>) -> Result<(), String> {
        let Some(role) = role_from_name(name) else {
            return Err(format!("unknown process '{}'", name));
        };
        let Some(index) = children.iter().position(|child| child.role == role) else {
            return Ok(());
        };
        self.drain_role(role, children, "process stop");
        let ready_file = ready_file_path_for_role(role, &self.readiness);
        terminate_child(
            &mut children[index],
            self.shared_state.as_ref(),
            ready_file.as_ref(),
        )
        .map_err(|e| {
            format!(
                "failed to terminate {} during stop: {}",
                role.display_name(),
                e
            )
        })?;
        if let Some(state) = self.shared_state.as_ref() {
            let code = children[index]
                .child
                .try_wait()
                .ok()
                .flatten()
                .and_then(|status| status.code());
            mark_process_stopped(state, role.display_name(), code);
        }
        children.remove(index);
        Ok(())
    }

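    /// Re-runs every configured readiness probe and publishes the results to
    /// the shared control state.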
    fn refresh_readiness(&self) {
        let Some(state) = self.shared_state.as_ref() else {
            return;
        };

        for readiness in &self.readiness {
            let (ready, ready_state, detail) = readiness.probe(state);
            set_process_readiness(state, readiness.name(), ready, ready_state, detail);
        }
    }
}

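// Shutdown signals are forwarded to the supervisor loop through a channel
// whose sender is stashed in this global so the C signal handler can reach it.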
static STOP_TX: std::sync::Mutex<Option<mpsc::Sender<()>>> = std::sync::Mutex::new(None);

extern "C" fn signal_handler(_sig: libc::c_int) {
    if let Ok(guard) = STOP_TX.lock() {
        if let Some(ref tx) = *guard {
            let _ = tx.send(());
        }
    }
}

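/// Installs SIGINT/SIGTERM handlers and returns the receiver the supervisor
/// loop polls for shutdown requests.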
fn install_signal_handlers() -> mpsc::Receiver<()> {
    let (stop_tx, stop_rx) = mpsc::channel();
    match STOP_TX.lock() {
        Ok(mut guard) => {
            guard.replace(stop_tx);
        }
        Err(poisoned) => {
            log::error!("recovering from poisoned supervisor stop-channel lock");
            poisoned.into_inner().replace(stop_tx);
        }
    }
    #[cfg(unix)]
    unsafe {
        libc::signal(libc::SIGINT, signal_handler as *const () as usize);
        libc::signal(libc::SIGTERM, signal_handler as *const () as usize);
    }
    stop_rx
}

#[cfg(test)]
mod tests {
    use super::drain::{
        drain_complete_for_shutdown, format_drain_status_detail, log_rnsd_drain_progress,
        reflect_rnsd_drain_status, request_rnsd_drain, wait_for_rnsd_drain,
    };
    use super::process::{command_for_spec, role_from_name, shutdown_priority};
    use super::readiness::{
        inspect_ready_file, missing_required_hooks, observe_sidecar_draining, probe_ready_file,
        ready_file_path_for_role,
    };
    use super::{
        ProcessCommand, ProcessReadiness, ProcessSpec, ReadinessTarget, RnsdDrainConfig, Role,
        Supervisor, SupervisorConfig, STOP_TX,
    };
    use rns_ctl::state::{ensure_process, mark_process_running, CtlState, SharedState};
    use rns_net::{
        event::EventSender,
        event::{DrainStatus, Event, LifecycleState, QueryRequest, QueryResponse},
        HookInfo, RpcAddr, RpcServer,
    };
    use std::net::TcpListener;
    use std::os::unix::fs::PermissionsExt;
    use std::path::PathBuf;
    use std::process::Command;
    use std::sync::mpsc;
    use std::sync::{Arc, RwLock};
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    #[test]
    fn supervisor_holds_expected_specs() {
        let specs = vec![
            ProcessSpec {
                role: Role::Rnsd,
                command: ProcessCommand::External(PathBuf::from("rnsd")),
                args: vec!["--config".into(), "/tmp/rns".into()],
            },
            ProcessSpec {
                role: Role::Sentineld,
                command: ProcessCommand::External(PathBuf::from("rns-sentineld")),
                args: vec!["--config".into(), "/tmp/rns".into()],
            },
            ProcessSpec {
                role: Role::Statsd,
                command: ProcessCommand::External(PathBuf::from("rns-statsd")),
                args: vec![
                    "--config".into(),
                    "/tmp/rns".into(),
                    "--db".into(),
                    "/tmp/rns/stats.db".into(),
                ],
            },
        ];

        let config = SupervisorConfig {
            specs,
            shared_state: None,
            control_rx: None,
            readiness: Vec::new(),
            log_dir: None,
            rnsd_drain: None,
        };

        assert_eq!(config.specs.len(), 3);
        assert_eq!(config.specs[0].role, Role::Rnsd);
        assert_eq!(config.specs[1].role, Role::Sentineld);
        assert_eq!(config.specs[2].role, Role::Statsd);
        assert!(config.specs[2].args.iter().any(|arg| arg == "--db"));
    }

    #[test]
    fn process_spec_command_line() {
        let spec = ProcessSpec {
            role: Role::Rnsd,
            command: ProcessCommand::External(PathBuf::from("rnsd")),
            args: vec!["--config".into(), "/data".into()],
        };
        assert_eq!(spec.command_line(), "rnsd --config /data");
    }

    #[test]
    fn self_invoke_command_line_uses_internal_role() {
        let spec = ProcessSpec {
            role: Role::Statsd,
            command: ProcessCommand::SelfInvoke,
            args: vec!["--config".into(), "/data".into()],
        };
        assert_eq!(
            spec.command_line(),
            "/proc/self/exe --internal-role rns-statsd --config /data"
        );
    }

    #[test]
    fn self_invoke_command_builder_includes_internal_role_args() {
        let spec = ProcessSpec {
            role: Role::Sentineld,
            command: ProcessCommand::SelfInvoke,
            args: vec!["--config".into(), "/data".into()],
        };
        let command = command_for_spec(&spec).unwrap();
        let program = command.get_program().to_string_lossy().to_string();
        let args: Vec<String> = command
            .get_args()
            .map(|arg| arg.to_string_lossy().to_string())
            .collect();
        assert!(program == "/proc/self/exe" || !program.is_empty());
        assert_eq!(
            args,
            vec![
                "--internal-role".to_string(),
                "rns-sentineld".to_string(),
                "--config".to_string(),
                "/data".to_string()
            ]
        );
    }

    #[test]
    fn role_from_name_maps_known_processes() {
        assert_eq!(role_from_name("rnsd"), Some(Role::Rnsd));
        assert_eq!(role_from_name("rns-sentineld"), Some(Role::Sentineld));
        assert_eq!(role_from_name("rns-statsd"), Some(Role::Statsd));
        assert_eq!(role_from_name("unknown"), None);
    }

    #[test]
    fn shutdown_priority_stops_sidecars_before_rnsd() {
        let mut roles = vec![Role::Rnsd, Role::Sentineld, Role::Statsd];
        roles.sort_by_key(|role| shutdown_priority(*role));
        assert_eq!(roles, vec![Role::Statsd, Role::Sentineld, Role::Rnsd]);
    }

    #[test]
    fn missing_required_hooks_requires_enabled_hook_match() {
        let hooks = vec![
            HookInfo {
                name: "hook-a".into(),
                hook_type: "wasm".into(),
                attach_point: "AttachA".into(),
                priority: 0,
                enabled: true,
                consecutive_traps: 0,
            },
            HookInfo {
                name: "hook-b".into(),
                hook_type: "wasm".into(),
                attach_point: "AttachB".into(),
                priority: 0,
                enabled: false,
                consecutive_traps: 0,
            },
        ];
        let required = vec![
            ("hook-a".to_string(), "AttachA".to_string()),
            ("hook-b".to_string(), "AttachB".to_string()),
            ("hook-c".to_string(), "AttachC".to_string()),
        ];

        let missing = missing_required_hooks(&hooks, &required);

        assert_eq!(
            missing,
            vec!["hook-b@AttachB".to_string(), "hook-c@AttachC".to_string()]
        );
    }

    #[test]
    fn ready_file_probe_requires_matching_process_and_pid() {
        let path = unique_temp_path("rns-sentineld");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-sentineld");
        mark_process_running(&state, "rns-sentineld", 4242);
        std::fs::write(
            &path,
            "version=1\nstatus=ready\nprocess=rns-sentineld\npid=4242\ndetail=provider ready\n",
        )
        .unwrap();

        let detail = probe_ready_file(&path, "rns-sentineld", &state).unwrap();
        assert!(detail.contains("provider ready"));
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn ready_file_probe_rejects_stale_pid() {
        let path = unique_temp_path("rns-statsd");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-statsd");
        mark_process_running(&state, "rns-statsd", 99);
        std::fs::write(
            &path,
            "version=1\nstatus=ready\nprocess=rns-statsd\npid=77\ndetail=stats ready\n",
        )
        .unwrap();

        let err = probe_ready_file(&path, "rns-statsd", &state).unwrap_err();
        assert!(err.contains("stale"));
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn inspect_ready_file_accepts_draining_status_for_matching_process_and_pid() {
        let path = unique_temp_path("rns-statsd");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-statsd");
        mark_process_running(&state, "rns-statsd", 77);
        std::fs::write(
            &path,
            "version=1\nstatus=draining\nprocess=rns-statsd\npid=77\ndetail=flushing stats\n",
        )
        .unwrap();

        let contract = inspect_ready_file(&path, "rns-statsd", &state).unwrap();
        assert_eq!(contract.status, "draining");
        assert_eq!(contract.detail, "flushing stats");
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn ready_file_target_is_constructible() {
        let target = ReadinessTarget::ReadyFile(PathBuf::from("/tmp/rns.ready"));
        match target {
            ReadinessTarget::ReadyFile(path) => assert_eq!(path, PathBuf::from("/tmp/rns.ready")),
            _ => panic!("unexpected target"),
        }
    }

    #[test]
    fn ready_file_path_for_role_selects_matching_ready_file_target() {
        let path = PathBuf::from("/tmp/rns-sentineld.ready");
        let readiness = vec![
            ProcessReadiness {
                role: Role::Sentineld,
                target: ReadinessTarget::ReadyFile(path.clone()),
            },
            ProcessReadiness {
                role: Role::Rnsd,
                target: ReadinessTarget::ProcessAge(std::time::Duration::from_secs(1)),
            },
        ];

        assert_eq!(
            ready_file_path_for_role(Role::Sentineld, &readiness),
            Some(path)
        );
        assert_eq!(ready_file_path_for_role(Role::Statsd, &readiness), None);
    }

    #[test]
    fn observe_sidecar_draining_updates_process_state() {
        let path = unique_temp_path("rns-sentineld");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-sentineld");
        mark_process_running(&state, "rns-sentineld", 4242);
        std::fs::write(
            &path,
            "version=1\nstatus=draining\nprocess=rns-sentineld\npid=4242\ndetail=draining queue\n",
        )
        .unwrap();

        let managed = super::ManagedChild {
            role: Role::Sentineld,
            child: Command::new("sleep").arg("0").spawn().unwrap(),
        };

        assert!(observe_sidecar_draining(
            &managed,
            Some(&state),
            Some(&path)
        ));

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rns-sentineld").cloned().unwrap()
        };
        assert!(!snapshot.ready);
        assert_eq!(snapshot.ready_state, "draining");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("draining queue"));

        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn format_drain_status_detail_includes_deadline_when_present() {
        let status = DrainStatus {
            state: LifecycleState::Draining,
            drain_age_seconds: Some(1.2),
            deadline_remaining_seconds: Some(2.5),
            drain_complete: false,
            interface_writer_queued_frames: 0,
            provider_backlog_events: 0,
            provider_consumer_queued_events: 0,
            detail: Some("draining 2 links".into()),
        };

        assert_eq!(
            format_drain_status_detail(&status),
            "draining 2 links (deadline 2.5s remaining)"
        );
    }

    #[test]
    fn reflect_rnsd_drain_status_updates_process_readiness() {
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        mark_process_running(&state, "rnsd", 1234);

        reflect_rnsd_drain_status(
            Some(&state),
            &DrainStatus {
                state: LifecycleState::Draining,
                drain_age_seconds: Some(0.5),
                deadline_remaining_seconds: Some(1.0),
                drain_complete: false,
                interface_writer_queued_frames: 0,
                provider_backlog_events: 0,
                provider_consumer_queued_events: 0,
                detail: Some("1 link still active".into()),
            },
        );

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rnsd").cloned().unwrap()
        };
        assert!(!snapshot.ready);
        assert_eq!(snapshot.ready_state, "draining");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("1 link still active"));
    }

    #[test]
    fn log_rnsd_drain_progress_deduplicates_identical_updates() {
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        let mut last_observed = None;
        let draining = DrainStatus {
            state: LifecycleState::Draining,
            drain_age_seconds: Some(0.5),
            deadline_remaining_seconds: Some(2.0),
            drain_complete: false,
            interface_writer_queued_frames: 0,
            provider_backlog_events: 0,
            provider_consumer_queued_events: 0,
            detail: Some("1 link still active".into()),
        };

        log_rnsd_drain_progress(Some(&state), &draining, &mut last_observed);
        log_rnsd_drain_progress(Some(&state), &draining, &mut last_observed);
        log_rnsd_drain_progress(
            Some(&state),
            &DrainStatus {
                state: LifecycleState::Stopping,
                drain_age_seconds: Some(1.5),
                deadline_remaining_seconds: Some(0.0),
                drain_complete: true,
                interface_writer_queued_frames: 0,
                provider_backlog_events: 0,
                provider_consumer_queued_events: 0,
                detail: Some("tearing down remaining work".into()),
            },
            &mut last_observed,
        );

        let log_count = {
            let s = state.read().unwrap();
            s.process_logs
                .get("rnsd")
                .map(|logs| logs.len())
                .unwrap_or(0)
        };
        assert_eq!(log_count, 2);
    }

    #[test]
    fn drain_complete_for_shutdown_accepts_expired_deadline() {
        let status = DrainStatus {
            state: LifecycleState::Draining,
            drain_age_seconds: Some(1.0),
            deadline_remaining_seconds: Some(0.0),
            drain_complete: false,
            interface_writer_queued_frames: 0,
            provider_backlog_events: 0,
            provider_consumer_queued_events: 0,
            detail: Some("1 link still active".into()),
        };

        assert!(drain_complete_for_shutdown(&status));
    }

    #[test]
    fn request_rnsd_drain_emits_begin_drain_over_rpc() {
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x42; 32];
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let config = RnsdDrainConfig {
            rpc_addr,
            auth_key,
            timeout: Duration::from_millis(250),
            poll_interval: Duration::from_millis(10),
        };

        request_rnsd_drain(&config, "test shutdown").unwrap();

        match event_rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::BeginDrain { timeout } => assert_eq!(timeout, config.timeout),
            other => panic!("expected BeginDrain event, got {:?}", other),
        }
    }

    #[test]
    fn wait_for_rnsd_drain_returns_true_after_live_rpc_completion() {
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x24; 32];
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let config = RnsdDrainConfig {
            rpc_addr,
            auth_key,
            timeout: Duration::from_millis(250),
            poll_interval: Duration::from_millis(10),
        };
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        mark_process_running(&state, "rnsd", 1234);
        let (done_tx, done_rx) = mpsc::channel();

        let driver = std::thread::spawn(move || {
            let mut polls = 0usize;
            while let Ok(event) = event_rx.recv_timeout(Duration::from_secs(1)) {
                if let Event::Query(QueryRequest::DrainStatus, resp_tx) = event {
                    polls += 1;
                    let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
                        state: if polls == 1 {
                            LifecycleState::Draining
                        } else {
                            LifecycleState::Stopping
                        },
                        drain_age_seconds: Some((polls as f64) * 0.05),
                        deadline_remaining_seconds: Some(if polls == 1 { 0.2 } else { 0.0 }),
                        drain_complete: polls > 1,
                        interface_writer_queued_frames: if polls == 1 { 2 } else { 0 },
                        provider_backlog_events: 0,
                        provider_consumer_queued_events: 0,
                        detail: Some(if polls == 1 {
                            "waiting for queued interface writes".into()
                        } else {
                            "all work drained".into()
                        }),
                    }));
                    if polls > 1 {
                        break;
                    }
                }
            }
            let _ = done_tx.send(());
        });

        assert!(wait_for_rnsd_drain(&config, Some(&state)));
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        driver.join().unwrap();

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rnsd").cloned().unwrap()
        };
        assert_eq!(snapshot.ready_state, "stopping");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("all work drained"));
    }

    #[test]
    fn wait_for_rnsd_drain_times_out_when_live_rpc_never_completes() {
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x11; 32];
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let config = RnsdDrainConfig {
            rpc_addr,
            auth_key,
            timeout: Duration::from_millis(120),
            poll_interval: Duration::from_millis(10),
        };
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        mark_process_running(&state, "rnsd", 777);
        let (stop_tx, stop_rx) = mpsc::channel();

        let driver = std::thread::spawn(move || loop {
            match event_rx.recv_timeout(Duration::from_millis(250)) {
                Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) => {
                    let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
                        state: LifecycleState::Draining,
                        drain_age_seconds: Some(0.05),
                        deadline_remaining_seconds: Some(0.05),
                        drain_complete: false,
                        interface_writer_queued_frames: 1,
                        provider_backlog_events: 2,
                        provider_consumer_queued_events: 3,
                        detail: Some("still draining queued work".into()),
                    }));
                }
                Ok(_) => {}
                Err(_) => break,
            }
            if stop_rx.try_recv().is_ok() {
                break;
            }
        });

        assert!(!wait_for_rnsd_drain(&config, Some(&state)));
        let _ = stop_tx.send(());
        driver.join().unwrap();

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rnsd").cloned().unwrap()
        };
        assert_eq!(snapshot.ready_state, "draining");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("still draining queued work"));
    }

    #[test]
    fn stop_process_requests_rnsd_drain_before_termination() {
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x51; 32];
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let supervisor = Supervisor::new(SupervisorConfig {
            specs: vec![ProcessSpec {
                role: Role::Rnsd,
                command: ProcessCommand::External(PathBuf::from("sleep")),
                args: vec!["60".into()],
            }],
            shared_state: None,
            control_rx: None,
            readiness: Vec::new(),
            log_dir: None,
            rnsd_drain: Some(RnsdDrainConfig {
                rpc_addr,
                auth_key,
                timeout: Duration::from_millis(250),
                poll_interval: Duration::from_millis(10),
            }),
        });
        let mut children = vec![super::ManagedChild {
            role: Role::Rnsd,
            child: Command::new("sleep").arg("60").spawn().unwrap(),
        }];
        let (done_tx, done_rx) = mpsc::channel();

        let driver = std::thread::spawn(move || {
            let mut saw_begin_drain = false;
            while let Ok(event) = event_rx.recv_timeout(Duration::from_secs(1)) {
                match event {
                    Event::BeginDrain { .. } => saw_begin_drain = true,
                    Event::Query(QueryRequest::DrainStatus, resp_tx) => {
                        let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
                            state: LifecycleState::Stopping,
                            drain_age_seconds: Some(0.05),
                            deadline_remaining_seconds: Some(0.0),
                            drain_complete: true,
                            interface_writer_queued_frames: 0,
                            provider_backlog_events: 0,
                            provider_consumer_queued_events: 0,
                            detail: Some("ready to stop".into()),
                        }));
                        let _ = done_tx.send(saw_begin_drain);
                        break;
                    }
                    _ => {}
                }
            }
        });

        supervisor.stop_process("rnsd", &mut children).unwrap();
        assert!(children.is_empty());
        assert!(done_rx.recv_timeout(Duration::from_secs(1)).unwrap());
        driver.join().unwrap();
    }

    #[test]
    fn restart_process_requests_rnsd_drain_before_replacement() {
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x61; 32];
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let supervisor = Supervisor::new(SupervisorConfig {
            specs: vec![ProcessSpec {
                role: Role::Rnsd,
                command: ProcessCommand::External(PathBuf::from("sleep")),
                args: vec!["60".into()],
            }],
            shared_state: Some(state.clone()),
            control_rx: None,
            readiness: Vec::new(),
            log_dir: None,
            rnsd_drain: Some(RnsdDrainConfig {
                rpc_addr: rpc_addr.clone(),
                auth_key,
                timeout: Duration::from_millis(250),
                poll_interval: Duration::from_millis(10),
            }),
        });
        let original = Command::new("sleep").arg("60").spawn().unwrap();
        mark_process_running(&state, "rnsd", original.id());
        let original_pid = original.id();
        let mut children = vec![super::ManagedChild {
            role: Role::Rnsd,
            child: original,
        }];
        let (done_tx, done_rx) = mpsc::channel();

        let driver = std::thread::spawn(move || {
            let mut saw_begin_drain = false;
            while let Ok(event) = event_rx.recv_timeout(Duration::from_secs(1)) {
                match event {
                    Event::BeginDrain { .. } => saw_begin_drain = true,
                    Event::Query(QueryRequest::DrainStatus, resp_tx) => {
                        let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
                            state: LifecycleState::Stopping,
                            drain_age_seconds: Some(0.05),
                            deadline_remaining_seconds: Some(0.0),
                            drain_complete: true,
                            interface_writer_queued_frames: 0,
                            provider_backlog_events: 0,
                            provider_consumer_queued_events: 0,
                            detail: Some("ready to restart".into()),
                        }));
                        let _ = done_tx.send(saw_begin_drain);
                        break;
                    }
                    _ => {}
                }
            }
        });

        supervisor.restart_process("rnsd", &mut children).unwrap();
        assert_eq!(children.len(), 1);
        assert_eq!(children[0].role, Role::Rnsd);
        assert_ne!(children[0].child.id(), original_pid);
        assert!(done_rx.recv_timeout(Duration::from_secs(1)).unwrap());
        driver.join().unwrap();

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rnsd").cloned().unwrap()
        };
        assert_eq!(snapshot.restart_count, 1);

        let _ = children[0].child.kill();
        let _ = children[0].child.wait();
    }

    #[test]
    fn supervisor_restarts_unexpectedly_exited_child_instead_of_exiting() {
        let temp_root = std::env::temp_dir().join(format!(
            "rns-server-supervisor-test-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        ));
        std::fs::create_dir_all(&temp_root).unwrap();
        let count_path = temp_root.join("count");
        let script_path = temp_root.join("restart-once.sh");
        std::fs::write(
            &script_path,
            format!(
                "#!/usr/bin/env bash\nset -eu\ncount_file=\"{}\"\ncount=$(cat \"$count_file\" 2>/dev/null || echo 0)\ncount=$((count+1))\nprintf '%s' \"$count\" > \"$count_file\"\nif [ \"$count\" -eq 1 ]; then\n exit 7\nfi\nsleep 60\n",
                count_path.display()
            ),
        )
        .unwrap();
        let mut perms = std::fs::metadata(&script_path).unwrap().permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&script_path, perms).unwrap();

        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-sentineld");
        let supervisor = Supervisor::new(SupervisorConfig {
            specs: vec![ProcessSpec {
                role: Role::Sentineld,
                command: ProcessCommand::External(script_path.clone()),
                args: Vec::new(),
            }],
            shared_state: Some(state.clone()),
            control_rx: None,
            readiness: Vec::new(),
            log_dir: None,
            rnsd_drain: None,
        });
        let (result_tx, result_rx) = mpsc::channel();

        let handle = std::thread::spawn(move || {
            let result = supervisor.run();
            let _ = result_tx.send(result);
        });

        match result_rx.recv_timeout(Duration::from_secs(2)) {
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Ok(result) => panic!("supervisor exited unexpectedly: {:?}", result),
            Err(err) => panic!("failed waiting for supervisor result: {}", err),
        }

        let restart_count = {
            let s = state.read().unwrap();
            let process = s.processes.get("rns-sentineld").cloned().unwrap();
            assert!(
                process.pid.is_some(),
                "expected restarted child to be running"
            );
            process.restart_count
        };
        assert_eq!(restart_count, 1);
        assert_eq!(std::fs::read_to_string(&count_path).unwrap(), "2");

        STOP_TX.lock().unwrap().as_ref().unwrap().send(()).unwrap();
        let result = result_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert_eq!(result.unwrap(), 0);
        handle.join().unwrap();
        let _ = std::fs::remove_dir_all(temp_root);
    }

    fn unique_temp_path(prefix: &str) -> PathBuf {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        std::env::temp_dir().join(format!("{prefix}-{}-{now}.ready", std::process::id()))
    }

    fn start_test_rpc_server(auth_key: [u8; 32], event_tx: EventSender) -> (RpcAddr, RpcServer) {
        for _ in 0..16 {
            let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap();
            let port = listener.local_addr().unwrap().port();
            drop(listener);
            let rpc_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
            match RpcServer::start(&rpc_addr, auth_key, event_tx.clone()) {
                Ok(server) => return (rpc_addr, server),
                Err(err) if err.kind() == std::io::ErrorKind::AddrInUse => continue,
                Err(err) => panic!("failed to start rpc server for test: {err}"),
            }
        }

        panic!("failed to allocate rpc server address for test");
    }
}