//! rns_server/supervisor.rs — supervision of rnsd and its sidecar processes.

1use std::collections::HashMap;
2use std::io::{self, BufRead, BufReader};
3use std::net::{SocketAddr, TcpStream};
4use std::os::unix::net::UnixStream;
5use std::os::unix::process::CommandExt;
6use std::path::PathBuf;
7use std::process::{Child, Command, ExitStatus, Stdio};
8use std::sync::mpsc;
9use std::thread;
10use std::time::Duration;
11
12use crate::logs::LogStore;
13use crate::self_exec::{resolve_self_exec, self_exec_display};
14use rns_ctl::state::{
15    bump_process_restart_count, mark_process_failed_spawn, mark_process_running,
16    mark_process_stopped, push_process_log, record_process_termination_observation,
17    set_process_log_path, set_process_readiness, ProcessControlCommand, SharedState,
18};
19use rns_net::{event::DrainStatus, HookInfo, RpcAddr, RpcClient};
20
21mod drain;
22mod process;
23mod readiness;
24
25use self::process::{
26    check_exits, exit_code, role_from_name, spawn_child, terminate_child, terminate_children,
27    ManagedChild,
28};
29use self::readiness::ready_file_path_for_role;
30pub use self::readiness::{ProcessReadiness, ReadinessTarget};
31
/// Identifies one of the supervised child processes.
///
/// The user-facing / state-store name for each variant is defined by
/// [`Role::display_name`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Role {
    /// The "rnsd" process.
    Rnsd,
    /// The "rns-sentineld" sidecar process.
    Sentineld,
    /// The "rns-statsd" sidecar process.
    Statsd,
}
38
39impl Role {
40    pub fn display_name(self) -> &'static str {
41        match self {
42            Role::Rnsd => "rnsd",
43            Role::Sentineld => "rns-sentineld",
44            Role::Statsd => "rns-statsd",
45        }
46    }
47}
48
/// Everything needed to launch one supervised child process.
#[derive(Debug, Clone)]
pub struct ProcessSpec {
    /// Which supervised role this spec describes.
    pub role: Role,
    /// How the binary is invoked: an external path or a re-exec of self.
    pub command: ProcessCommand,
    /// Arguments appended after the command (for self-invoke, after the
    /// `--internal-role <name>` pair).
    pub args: Vec<String>,
}
55
56impl ProcessSpec {
57    pub fn command_line(&self) -> String {
58        let mut parts = vec![self.command.display(self.role)];
59        parts.extend(self.args.iter().cloned());
60        parts.join(" ")
61    }
62}
63
/// How a child process binary is invoked.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessCommand {
    /// Run an external executable at the given path.
    External(PathBuf),
    /// Re-execute the current binary, passing `--internal-role <name>`.
    SelfInvoke,
}
69
70impl ProcessCommand {
71    pub fn display(&self, role: Role) -> String {
72        match self {
73            ProcessCommand::External(path) => path.display().to_string(),
74            ProcessCommand::SelfInvoke => {
75                format!(
76                    "{} --internal-role {}",
77                    self_exec_display(),
78                    role.display_name()
79                )
80            }
81        }
82    }
83}
84
/// Construction parameters for [`Supervisor`].
pub struct SupervisorConfig {
    /// Child processes to launch, in spawn order.
    pub specs: Vec<ProcessSpec>,
    /// Optional shared state (from `rns_ctl`) that process status is
    /// mirrored into; `None` disables all state reporting.
    pub shared_state: Option<SharedState>,
    /// Optional channel delivering start/stop/restart control commands.
    pub control_rx: Option<mpsc::Receiver<ProcessControlCommand>>,
    /// Readiness probes evaluated on every supervision tick.
    pub readiness: Vec<ProcessReadiness>,
    /// Directory for per-process log capture; `None` disables log capture.
    pub log_dir: Option<PathBuf>,
    /// Optional RPC configuration used to drain rnsd before stopping it.
    pub rnsd_drain: Option<RnsdDrainConfig>,
}
93
/// Supervises a set of child processes: spawns them, restarts them on
/// unexpected exits (with a bounded budget), services control commands, and
/// reflects readiness into the shared state.
pub struct Supervisor {
    specs: Vec<ProcessSpec>,
    shared_state: Option<SharedState>,
    control_rx: Option<mpsc::Receiver<ProcessControlCommand>>,
    readiness: Vec<ProcessReadiness>,
    // Built from `SupervisorConfig::log_dir`; `None` when capture is disabled.
    log_store: Option<LogStore>,
    rnsd_drain: Option<RnsdDrainConfig>,
}
102
103fn read_shared_state<'a>(
104    state: &'a SharedState,
105) -> std::sync::RwLockReadGuard<'a, rns_ctl::state::CtlState> {
106    match state.read() {
107        Ok(guard) => guard,
108        Err(poisoned) => {
109            log::error!("recovering from poisoned supervisor shared state read lock");
110            poisoned.into_inner()
111        }
112    }
113}
114
/// Settings for draining rnsd over RPC before it is stopped.
#[derive(Debug, Clone)]
pub struct RnsdDrainConfig {
    /// Address of rnsd's RPC endpoint.
    pub rpc_addr: RpcAddr,
    /// 32-byte key authenticating the RPC session.
    pub auth_key: [u8; 32],
    /// Drain deadline passed along with the drain request.
    pub timeout: Duration,
    /// How often drain status is polled while waiting.
    pub poll_interval: Duration,
}
122
123impl Supervisor {
124    pub fn new(config: SupervisorConfig) -> Self {
125        Self {
126            specs: config.specs,
127            shared_state: config.shared_state,
128            control_rx: config.control_rx,
129            readiness: config.readiness,
130            log_store: config.log_dir.map(LogStore::new),
131            rnsd_drain: config.rnsd_drain,
132        }
133    }
134
135    pub fn specs(&self) -> &[ProcessSpec] {
136        &self.specs
137    }
138
139    pub fn run(&self) -> Result<i32, String> {
140        self.run_with_started_hook(|| Ok(()))
141    }
142
143    pub fn run_with_started_hook<F>(&self, on_started: F) -> Result<i32, String>
144    where
145        F: FnOnce() -> Result<(), String>,
146    {
147        let mut children = self
148            .specs
149            .iter()
150            .map(|spec| spawn_child(spec, self.shared_state.as_ref(), self.log_store.clone()))
151            .collect::<Result<Vec<_>, _>>()?;
152        let mut unexpected_restart_counts = HashMap::new();
153
154        on_started()?;
155
156        let stop_rx = install_signal_handlers();
157
158        loop {
159            if stop_rx.try_recv().is_ok() {
160                log::info!("shutdown requested");
161                self.drain_rnsd(&children, "supervisor shutdown");
162                terminate_children(&mut children, self.shared_state.as_ref(), &self.readiness);
163                return Ok(0);
164            }
165
166            if let Some(command) = self.next_control_command() {
167                self.handle_control_command(command, &mut children)?;
168            }
169
170            self.refresh_readiness();
171
172            if let Some((role, status)) = check_exits(&mut children)? {
173                log::warn!("{} exited with status {}", role.display_name(), status);
174                if self.restart_unexpected_exit(
175                    role,
176                    status,
177                    &mut children,
178                    &mut unexpected_restart_counts,
179                )? {
180                    continue;
181                }
182                if let Some(state) = self.shared_state.as_ref() {
183                    mark_process_stopped(state, role.display_name(), status.code());
184                }
185                terminate_children(&mut children, self.shared_state.as_ref(), &self.readiness);
186                return Ok(exit_code(status));
187            }
188
189            std::thread::sleep(Duration::from_millis(200));
190        }
191    }
192}
193
impl Supervisor {
    /// Attempts a bounded automatic restart after `role` exited without being
    /// asked to.
    ///
    /// Returns `Ok(true)` when the child was respawned (the caller should
    /// keep supervising) and `Ok(false)` when no restart happened: the role
    /// has no running child, no matching spec, or its restart budget is
    /// exhausted. Respawn failures propagate as `Err`.
    fn restart_unexpected_exit(
        &self,
        role: Role,
        status: ExitStatus,
        children: &mut [ManagedChild],
        unexpected_restart_counts: &mut HashMap<Role, usize>,
    ) -> Result<bool, String> {
        // NOTE(review): the per-role attempt count is never reset after a
        // healthy stretch, so the budget applies to the whole supervisor
        // lifetime — confirm that is intended.
        const MAX_UNEXPECTED_RESTARTS: usize = 3;
        const UNEXPECTED_RESTART_BACKOFF: Duration = Duration::from_millis(200);

        let Some(index) = children.iter().position(|child| child.role == role) else {
            return Ok(false);
        };
        let Some(spec) = self.specs.iter().find(|spec| spec.role == role) else {
            return Ok(false);
        };

        let attempts = unexpected_restart_counts.entry(role).or_insert(0);
        if *attempts >= MAX_UNEXPECTED_RESTARTS {
            return Ok(false);
        }
        *attempts += 1;

        // Record the stop, bump the restart counter, and log the attempt
        // before the new child replaces the old slot.
        if let Some(state) = self.shared_state.as_ref() {
            mark_process_stopped(state, role.display_name(), status.code());
            bump_process_restart_count(state, role.display_name());
            push_process_log(
                state,
                role.display_name(),
                "supervisor",
                format!(
                    "unexpected exit with status {}; restarting ({}/{})",
                    exit_code(status),
                    *attempts,
                    MAX_UNEXPECTED_RESTARTS
                ),
            );
        }

        // Brief backoff so a crash-looping child does not spin the loop.
        thread::sleep(UNEXPECTED_RESTART_BACKOFF);
        children[index] = spawn_child(spec, self.shared_state.as_ref(), self.log_store.clone())?;
        Ok(true)
    }

    /// Non-blocking fetch of the next pending control command, if any.
    fn next_control_command(&self) -> Option<ProcessControlCommand> {
        self.control_rx.as_ref().and_then(|rx| rx.try_recv().ok())
    }

    /// Dispatches one control command to the matching handler.
    fn handle_control_command(
        &self,
        command: ProcessControlCommand,
        children: &mut Vec<ManagedChild>,
    ) -> Result<(), String> {
        match command {
            ProcessControlCommand::Restart(name) => self.restart_process(&name, children),
            ProcessControlCommand::Start(name) => self.start_process(&name, children),
            ProcessControlCommand::Stop(name) => self.stop_process(&name, children),
        }
    }

    /// Restarts the named process in place: drain (rnsd only, via
    /// `drain_role`), terminate, record the stop + restart count, respawn
    /// into the same slot. A no-op if the process is not currently running.
    fn restart_process(&self, name: &str, children: &mut Vec<ManagedChild>) -> Result<(), String> {
        let Some(role) = role_from_name(name) else {
            return Err(format!("unknown process '{}'", name));
        };
        let Some(spec) = self.specs.iter().find(|spec| spec.role == role) else {
            return Err(format!("missing process spec for '{}'", name));
        };

        if let Some(index) = children.iter().position(|child| child.role == role) {
            // Drain before terminating so in-flight work can finish.
            self.drain_role(role, children, "process restart");
            let ready_file = ready_file_path_for_role(role, &self.readiness);
            terminate_child(
                &mut children[index],
                self.shared_state.as_ref(),
                ready_file.as_ref(),
            )
            .map_err(|e| {
                format!(
                    "failed to terminate {} during restart: {}",
                    role.display_name(),
                    e
                )
            })?;
            if let Some(state) = self.shared_state.as_ref() {
                // Exit code is unknown here (termination was requested).
                mark_process_stopped(state, role.display_name(), None);
                bump_process_restart_count(state, role.display_name());
            }
            children[index] =
                spawn_child(spec, self.shared_state.as_ref(), self.log_store.clone())?;
        }

        Ok(())
    }

    /// Starts the named process if it is not already running (idempotent:
    /// returns `Ok` without action when a child for the role exists).
    fn start_process(&self, name: &str, children: &mut Vec<ManagedChild>) -> Result<(), String> {
        let Some(role) = role_from_name(name) else {
            return Err(format!("unknown process '{}'", name));
        };
        if children.iter().any(|child| child.role == role) {
            return Ok(());
        }
        let Some(spec) = self.specs.iter().find(|spec| spec.role == role) else {
            return Err(format!("missing process spec for '{}'", name));
        };
        children.push(spawn_child(
            spec,
            self.shared_state.as_ref(),
            self.log_store.clone(),
        )?);
        Ok(())
    }

    /// Stops the named process: drain, terminate, record the observed exit
    /// code (if already reaped), and drop it from the managed set. A no-op
    /// if the process is not currently running.
    fn stop_process(&self, name: &str, children: &mut Vec<ManagedChild>) -> Result<(), String> {
        let Some(role) = role_from_name(name) else {
            return Err(format!("unknown process '{}'", name));
        };
        let Some(index) = children.iter().position(|child| child.role == role) else {
            return Ok(());
        };
        self.drain_role(role, children, "process stop");
        let ready_file = ready_file_path_for_role(role, &self.readiness);
        terminate_child(
            &mut children[index],
            self.shared_state.as_ref(),
            ready_file.as_ref(),
        )
        .map_err(|e| {
            format!(
                "failed to terminate {} during stop: {}",
                role.display_name(),
                e
            )
        })?;
        if let Some(state) = self.shared_state.as_ref() {
            // Harvest the exit code non-blockingly; `None` if unavailable.
            let code = children[index]
                .child
                .try_wait()
                .ok()
                .flatten()
                .and_then(|status| status.code());
            mark_process_stopped(state, role.display_name(), code);
        }
        children.remove(index);
        Ok(())
    }

    /// Probes every configured readiness target and writes the result into
    /// the shared state. A no-op when no shared state is configured.
    fn refresh_readiness(&self) {
        let Some(state) = self.shared_state.as_ref() else {
            return;
        };

        for readiness in &self.readiness {
            let (ready, ready_state, detail) = readiness.probe(state);
            set_process_readiness(state, readiness.name(), ready, ready_state, detail);
        }
    }
}
352
// Sender half of the shutdown channel, shared with the C-ABI signal handler.
// Guarded by a Mutex so installation and handler access are serialized.
static STOP_TX: std::sync::Mutex<Option<mpsc::Sender<()>>> = std::sync::Mutex::new(None);
354
/// Signal handler for SIGINT/SIGTERM: forwards a unit message on the stop
/// channel if one has been installed; send failures are deliberately ignored.
///
/// NOTE(review): taking a `Mutex` and sending on an `mpsc` channel inside a
/// signal handler is not async-signal-safe; if the signal interrupts a thread
/// that holds `STOP_TX`, this can deadlock. An atomic flag or self-pipe would
/// be safer — verify before relying on this path under load.
extern "C" fn signal_handler(_sig: libc::c_int) {
    if let Ok(guard) = STOP_TX.lock() {
        if let Some(ref tx) = *guard {
            let _ = tx.send(());
        }
    }
}
362
/// Installs SIGINT/SIGTERM handlers (unix only) and returns the receiver
/// that fires when either signal is delivered.
///
/// On non-unix targets no handler is registered, so the returned receiver is
/// never signalled via this mechanism.
fn install_signal_handlers() -> mpsc::Receiver<()> {
    let (stop_tx, stop_rx) = mpsc::channel();
    // Publish the sender where the C-ABI handler can find it, recovering
    // from a poisoned lock instead of panicking during startup.
    match STOP_TX.lock() {
        Ok(mut guard) => {
            guard.replace(stop_tx);
        }
        Err(poisoned) => {
            log::error!("recovering from poisoned supervisor stop-channel lock");
            poisoned.into_inner().replace(stop_tx);
        }
    }
    #[cfg(unix)]
    // SAFETY: `signal_handler` is a valid `extern "C" fn(c_int)`; its address
    // is passed as the integral `sighandler_t` value that `libc::signal`
    // expects on these targets.
    unsafe {
        libc::signal(libc::SIGINT, signal_handler as *const () as usize);
        libc::signal(libc::SIGTERM, signal_handler as *const () as usize);
    }
    stop_rx
}
381
382#[cfg(test)]
383mod tests {
384    use super::drain::{
385        drain_complete_for_shutdown, format_drain_status_detail, log_rnsd_drain_progress,
386        reflect_rnsd_drain_status, request_rnsd_drain, wait_for_rnsd_drain,
387    };
388    use super::process::{command_for_spec, role_from_name, shutdown_priority};
389    use super::readiness::{
390        inspect_ready_file, missing_required_hooks, observe_sidecar_draining, probe_ready_file,
391        ready_file_path_for_role,
392    };
393    use super::{
394        ProcessCommand, ProcessReadiness, ProcessSpec, ReadinessTarget, RnsdDrainConfig, Role,
395        Supervisor, SupervisorConfig, STOP_TX,
396    };
397    use rns_ctl::state::{ensure_process, mark_process_running, CtlState, SharedState};
398    use rns_net::{
399        event::EventSender,
400        event::{DrainStatus, Event, LifecycleState, QueryRequest, QueryResponse},
401        HookInfo, RpcAddr, RpcServer,
402    };
403    use std::net::TcpListener;
404    use std::os::unix::fs::PermissionsExt;
405    use std::path::PathBuf;
406    use std::process::Command;
407    use std::sync::mpsc;
408    use std::sync::{Arc, RwLock};
409    use std::time::{Duration, SystemTime, UNIX_EPOCH};
410
    #[test]
    fn supervisor_holds_expected_specs() {
        // Builds the standard three-role spec set and checks order and args.
        let specs = vec![
            ProcessSpec {
                role: Role::Rnsd,
                command: ProcessCommand::External(PathBuf::from("rnsd")),
                args: vec!["--config".into(), "/tmp/rns".into()],
            },
            ProcessSpec {
                role: Role::Sentineld,
                command: ProcessCommand::External(PathBuf::from("rns-sentineld")),
                args: vec!["--config".into(), "/tmp/rns".into()],
            },
            ProcessSpec {
                role: Role::Statsd,
                command: ProcessCommand::External(PathBuf::from("rns-statsd")),
                args: vec![
                    "--config".into(),
                    "/tmp/rns".into(),
                    "--db".into(),
                    "/tmp/rns/stats.db".into(),
                ],
            },
        ];

        // NOTE: despite the binding name, this is the config struct, not a
        // constructed `Supervisor`.
        let supervisor = SupervisorConfig {
            specs,
            shared_state: None,
            control_rx: None,
            readiness: Vec::new(),
            log_dir: None,
            rnsd_drain: None,
        };

        assert_eq!(supervisor.specs.len(), 3);
        assert_eq!(supervisor.specs[0].role, Role::Rnsd);
        assert_eq!(supervisor.specs[1].role, Role::Sentineld);
        assert_eq!(supervisor.specs[2].role, Role::Statsd);
        assert!(supervisor.specs[2].args.iter().any(|arg| arg == "--db"));
    }
451
452    #[test]
453    fn process_spec_command_line() {
454        let spec = ProcessSpec {
455            role: Role::Rnsd,
456            command: ProcessCommand::External(PathBuf::from("rnsd")),
457            args: vec!["--config".into(), "/data".into()],
458        };
459        assert_eq!(spec.command_line(), "rnsd --config /data");
460    }
461
462    #[test]
463    fn self_invoke_command_line_uses_internal_role() {
464        let spec = ProcessSpec {
465            role: Role::Statsd,
466            command: ProcessCommand::SelfInvoke,
467            args: vec!["--config".into(), "/data".into()],
468        };
469        assert_eq!(
470            spec.command_line(),
471            "/proc/self/exe --internal-role rns-statsd --config /data"
472        );
473    }
474
    #[test]
    fn self_invoke_command_builder_includes_internal_role_args() {
        // The built Command must lead with the --internal-role pair before
        // the spec's own args.
        let spec = ProcessSpec {
            role: Role::Sentineld,
            command: ProcessCommand::SelfInvoke,
            args: vec!["--config".into(), "/data".into()],
        };
        let command = command_for_spec(&spec).unwrap();
        let program = command.get_program().to_string_lossy().to_string();
        let args: Vec<String> = command
            .get_args()
            .map(|arg| arg.to_string_lossy().to_string())
            .collect();
        // NOTE(review): this accepts any non-empty program string, so the
        // /proc/self/exe expectation is only loosely pinned.
        assert!(program == "/proc/self/exe" || !program.is_empty());
        assert_eq!(
            args,
            vec![
                "--internal-role".to_string(),
                "rns-sentineld".to_string(),
                "--config".to_string(),
                "/data".to_string()
            ]
        );
    }
499
500    #[test]
501    fn role_from_name_maps_known_processes() {
502        assert_eq!(role_from_name("rnsd"), Some(Role::Rnsd));
503        assert_eq!(role_from_name("rns-sentineld"), Some(Role::Sentineld));
504        assert_eq!(role_from_name("rns-statsd"), Some(Role::Statsd));
505        assert_eq!(role_from_name("unknown"), None);
506    }
507
508    #[test]
509    fn shutdown_priority_stops_sidecars_before_rnsd() {
510        let mut roles = vec![Role::Rnsd, Role::Sentineld, Role::Statsd];
511        roles.sort_by_key(|role| shutdown_priority(*role));
512        assert_eq!(roles, vec![Role::Statsd, Role::Sentineld, Role::Rnsd]);
513    }
514
    #[test]
    fn missing_required_hooks_requires_enabled_hook_match() {
        // hook-b is present but disabled; hook-c is absent entirely — both
        // must be reported missing, while enabled hook-a satisfies its entry.
        let hooks = vec![
            HookInfo {
                name: "hook-a".into(),
                hook_type: "wasm".into(),
                attach_point: "AttachA".into(),
                priority: 0,
                enabled: true,
                consecutive_traps: 0,
            },
            HookInfo {
                name: "hook-b".into(),
                hook_type: "wasm".into(),
                attach_point: "AttachB".into(),
                priority: 0,
                enabled: false,
                consecutive_traps: 0,
            },
        ];
        let required = vec![
            ("hook-a".to_string(), "AttachA".to_string()),
            ("hook-b".to_string(), "AttachB".to_string()),
            ("hook-c".to_string(), "AttachC".to_string()),
        ];

        let missing = missing_required_hooks(&hooks, &required);

        // Missing entries are reported as "name@attach_point".
        assert_eq!(
            missing,
            vec!["hook-b@AttachB".to_string(), "hook-c@AttachC".to_string()]
        );
    }
548
    #[test]
    fn ready_file_probe_requires_matching_process_and_pid() {
        // A ready file whose process name and pid match the recorded running
        // state is accepted and its detail surfaced.
        let path = unique_temp_path("rns-sentineld");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-sentineld");
        mark_process_running(&state, "rns-sentineld", 4242);
        std::fs::write(
            &path,
            "version=1\nstatus=ready\nprocess=rns-sentineld\npid=4242\ndetail=provider ready\n",
        )
        .unwrap();

        let detail = probe_ready_file(&path, "rns-sentineld", &state).unwrap();
        assert!(detail.contains("provider ready"));
        let _ = std::fs::remove_file(path);
    }
565
    #[test]
    fn ready_file_probe_rejects_stale_pid() {
        // The pid in the file (77) differs from the recorded pid (99), so
        // the probe must report the file as stale.
        let path = unique_temp_path("rns-statsd");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-statsd");
        mark_process_running(&state, "rns-statsd", 99);
        std::fs::write(
            &path,
            "version=1\nstatus=ready\nprocess=rns-statsd\npid=77\ndetail=stats ready\n",
        )
        .unwrap();

        let err = probe_ready_file(&path, "rns-statsd", &state).unwrap_err();
        assert!(err.contains("stale"));
        let _ = std::fs::remove_file(path);
    }
582
    #[test]
    fn inspect_ready_file_accepts_draining_status_for_matching_process_and_pid() {
        // status=draining with matching process/pid parses into a contract
        // exposing the raw status and detail strings.
        let path = unique_temp_path("rns-statsd");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-statsd");
        mark_process_running(&state, "rns-statsd", 77);
        std::fs::write(
            &path,
            "version=1\nstatus=draining\nprocess=rns-statsd\npid=77\ndetail=flushing stats\n",
        )
        .unwrap();

        let contract = inspect_ready_file(&path, "rns-statsd", &state).unwrap();
        assert_eq!(contract.status, "draining");
        assert_eq!(contract.detail, "flushing stats");
        let _ = std::fs::remove_file(path);
    }
600
601    #[test]
602    fn ready_file_target_is_constructible() {
603        let target = ReadinessTarget::ReadyFile(PathBuf::from("/tmp/rns.ready"));
604        match target {
605            ReadinessTarget::ReadyFile(path) => assert_eq!(path, PathBuf::from("/tmp/rns.ready")),
606            _ => panic!("unexpected target"),
607        }
608    }
609
    #[test]
    fn ready_file_path_for_role_selects_matching_ready_file_target() {
        // Only a ReadyFile target for the exact role yields a path; other
        // target kinds and unlisted roles return None.
        let path = PathBuf::from("/tmp/rns-sentineld.ready");
        let readiness = vec![
            ProcessReadiness {
                role: Role::Sentineld,
                target: ReadinessTarget::ReadyFile(path.clone()),
            },
            ProcessReadiness {
                role: Role::Rnsd,
                target: ReadinessTarget::ProcessAge(std::time::Duration::from_secs(1)),
            },
        ];

        assert_eq!(
            ready_file_path_for_role(Role::Sentineld, &readiness),
            Some(path)
        );
        assert_eq!(ready_file_path_for_role(Role::Statsd, &readiness), None);
    }
630
    #[test]
    fn observe_sidecar_draining_updates_process_state() {
        // A ready file reporting status=draining for a matching process/pid
        // must flip the process to not-ready with ready_state "draining".
        let path = unique_temp_path("rns-sentineld");
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-sentineld");
        mark_process_running(&state, "rns-sentineld", 4242);
        std::fs::write(
            &path,
            "version=1\nstatus=draining\nprocess=rns-sentineld\npid=4242\ndetail=draining queue\n",
        )
        .unwrap();

        // Requires a `sleep` binary on PATH to stand in for the sidecar.
        let managed = super::ManagedChild {
            role: Role::Sentineld,
            child: Command::new("sleep").arg("0").spawn().unwrap(),
        };

        assert!(observe_sidecar_draining(
            &managed,
            Some(&state),
            Some(&path)
        ));

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rns-sentineld").cloned().unwrap()
        };
        assert!(!snapshot.ready);
        assert_eq!(snapshot.ready_state, "draining");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("draining queue"));

        let _ = std::fs::remove_file(path);
    }
667
    #[test]
    fn format_drain_status_detail_includes_deadline_when_present() {
        // When a deadline remains, it is appended to the detail string.
        let status = DrainStatus {
            state: LifecycleState::Draining,
            drain_age_seconds: Some(1.2),
            deadline_remaining_seconds: Some(2.5),
            drain_complete: false,
            interface_writer_queued_frames: 0,
            provider_backlog_events: 0,
            provider_consumer_queued_events: 0,
            detail: Some("draining 2 links".into()),
        };

        assert_eq!(
            format_drain_status_detail(&status),
            "draining 2 links (deadline 2.5s remaining)"
        );
    }
686
    #[test]
    fn reflect_rnsd_drain_status_updates_process_readiness() {
        // A Draining status must mark rnsd not-ready with ready_state
        // "draining" and surface the drain detail.
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        mark_process_running(&state, "rnsd", 1234);

        reflect_rnsd_drain_status(
            Some(&state),
            &DrainStatus {
                state: LifecycleState::Draining,
                drain_age_seconds: Some(0.5),
                deadline_remaining_seconds: Some(1.0),
                drain_complete: false,
                interface_writer_queued_frames: 0,
                provider_backlog_events: 0,
                provider_consumer_queued_events: 0,
                detail: Some("1 link still active".into()),
            },
        );

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rnsd").cloned().unwrap()
        };
        assert!(!snapshot.ready);
        assert_eq!(snapshot.ready_state, "draining");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("1 link still active"));
    }
718
    #[test]
    fn log_rnsd_drain_progress_deduplicates_identical_updates() {
        // Three updates arrive but the second is identical to the first, so
        // only two log entries must be recorded.
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        let mut last_observed = None;
        let draining = DrainStatus {
            state: LifecycleState::Draining,
            drain_age_seconds: Some(0.5),
            deadline_remaining_seconds: Some(2.0),
            drain_complete: false,
            interface_writer_queued_frames: 0,
            provider_backlog_events: 0,
            provider_consumer_queued_events: 0,
            detail: Some("1 link still active".into()),
        };

        log_rnsd_drain_progress(Some(&state), &draining, &mut last_observed);
        log_rnsd_drain_progress(Some(&state), &draining, &mut last_observed);
        log_rnsd_drain_progress(
            Some(&state),
            &DrainStatus {
                state: LifecycleState::Stopping,
                drain_age_seconds: Some(1.5),
                deadline_remaining_seconds: Some(0.0),
                drain_complete: true,
                interface_writer_queued_frames: 0,
                provider_backlog_events: 0,
                provider_consumer_queued_events: 0,
                detail: Some("tearing down remaining work".into()),
            },
            &mut last_observed,
        );

        let log_count = {
            let s = state.read().unwrap();
            s.process_logs
                .get("rnsd")
                .map(|logs| logs.len())
                .unwrap_or(0)
        };
        assert_eq!(log_count, 2);
    }
761
    #[test]
    fn drain_complete_for_shutdown_accepts_expired_deadline() {
        // An expired deadline (0.0s remaining) counts as drained for
        // shutdown purposes even though drain_complete is false.
        let status = DrainStatus {
            state: LifecycleState::Draining,
            drain_age_seconds: Some(1.0),
            deadline_remaining_seconds: Some(0.0),
            drain_complete: false,
            interface_writer_queued_frames: 0,
            provider_backlog_events: 0,
            provider_consumer_queued_events: 0,
            detail: Some("1 link still active".into()),
        };

        assert!(drain_complete_for_shutdown(&status));
    }
777
    #[test]
    fn request_rnsd_drain_emits_begin_drain_over_rpc() {
        // Requesting a drain against a live test RPC server must emit a
        // BeginDrain event carrying the configured timeout.
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x42; 32];
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let config = RnsdDrainConfig {
            rpc_addr,
            auth_key,
            timeout: Duration::from_millis(250),
            poll_interval: Duration::from_millis(10),
        };

        request_rnsd_drain(&config, "test shutdown").unwrap();

        match event_rx.recv_timeout(Duration::from_secs(1)).unwrap() {
            Event::BeginDrain { timeout } => assert_eq!(timeout, config.timeout),
            other => panic!("expected BeginDrain event, got {:?}", other),
        }
    }
797
    #[test]
    fn wait_for_rnsd_drain_returns_true_after_live_rpc_completion() {
        // A driver thread answers DrainStatus queries: first poll reports
        // Draining, subsequent polls report Stopping/complete. The waiter
        // must return true and the final status must land in shared state.
        let (event_tx, event_rx) = rns_net::event::channel();
        let auth_key = [0x24; 32];
        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
        let config = RnsdDrainConfig {
            rpc_addr,
            auth_key,
            timeout: Duration::from_millis(250),
            poll_interval: Duration::from_millis(10),
        };
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rnsd");
        mark_process_running(&state, "rnsd", 1234);
        let (done_tx, done_rx) = mpsc::channel();

        let driver = std::thread::spawn(move || {
            let mut polls = 0usize;
            while let Ok(event) = event_rx.recv_timeout(Duration::from_secs(1)) {
                if let Event::Query(QueryRequest::DrainStatus, resp_tx) = event {
                    polls += 1;
                    let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
                        state: if polls == 1 {
                            LifecycleState::Draining
                        } else {
                            LifecycleState::Stopping
                        },
                        drain_age_seconds: Some((polls as f64) * 0.05),
                        deadline_remaining_seconds: Some(if polls == 1 { 0.2 } else { 0.0 }),
                        drain_complete: polls > 1,
                        interface_writer_queued_frames: if polls == 1 { 2 } else { 0 },
                        provider_backlog_events: 0,
                        provider_consumer_queued_events: 0,
                        detail: Some(if polls == 1 {
                            "waiting for queued interface writes".into()
                        } else {
                            "all work drained".into()
                        }),
                    }));
                    if polls > 1 {
                        break;
                    }
                }
            }
            let _ = done_tx.send(());
        });

        assert!(wait_for_rnsd_drain(&config, Some(&state)));
        done_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        driver.join().unwrap();

        let snapshot = {
            let s = state.read().unwrap();
            s.processes.get("rnsd").cloned().unwrap()
        };
        assert_eq!(snapshot.ready_state, "stopping");
        assert!(snapshot
            .status_detail
            .unwrap_or_default()
            .contains("all work drained"));
    }
859
860    #[test]
861    fn wait_for_rnsd_drain_times_out_when_live_rpc_never_completes() {
862        let (event_tx, event_rx) = rns_net::event::channel();
863        let auth_key = [0x11; 32];
864        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
865        let config = RnsdDrainConfig {
866            rpc_addr,
867            auth_key,
868            timeout: Duration::from_millis(120),
869            poll_interval: Duration::from_millis(10),
870        };
871        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
872        ensure_process(&state, "rnsd");
873        mark_process_running(&state, "rnsd", 777);
874        let (stop_tx, stop_rx) = mpsc::channel();
875
876        let driver = std::thread::spawn(move || loop {
877            match event_rx.recv_timeout(Duration::from_millis(250)) {
878                Ok(Event::Query(QueryRequest::DrainStatus, resp_tx)) => {
879                    let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
880                        state: LifecycleState::Draining,
881                        drain_age_seconds: Some(0.05),
882                        deadline_remaining_seconds: Some(0.05),
883                        drain_complete: false,
884                        interface_writer_queued_frames: 1,
885                        provider_backlog_events: 2,
886                        provider_consumer_queued_events: 3,
887                        detail: Some("still draining queued work".into()),
888                    }));
889                }
890                Ok(_) => {}
891                Err(_) => break,
892            }
893            if stop_rx.try_recv().is_ok() {
894                break;
895            }
896        });
897
898        assert!(!wait_for_rnsd_drain(&config, Some(&state)));
899        let _ = stop_tx.send(());
900        driver.join().unwrap();
901
902        let snapshot = {
903            let s = state.read().unwrap();
904            s.processes.get("rnsd").cloned().unwrap()
905        };
906        assert_eq!(snapshot.ready_state, "draining");
907        assert!(snapshot
908            .status_detail
909            .unwrap_or_default()
910            .contains("still draining queued work"));
911    }
912
913    #[test]
914    fn stop_process_requests_rnsd_drain_before_termination() {
915        let (event_tx, event_rx) = rns_net::event::channel();
916        let auth_key = [0x51; 32];
917        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
918        let supervisor = Supervisor::new(SupervisorConfig {
919            specs: vec![ProcessSpec {
920                role: Role::Rnsd,
921                command: ProcessCommand::External(PathBuf::from("sleep")),
922                args: vec!["60".into()],
923            }],
924            shared_state: None,
925            control_rx: None,
926            readiness: Vec::new(),
927            log_dir: None,
928            rnsd_drain: Some(RnsdDrainConfig {
929                rpc_addr,
930                auth_key,
931                timeout: Duration::from_millis(250),
932                poll_interval: Duration::from_millis(10),
933            }),
934        });
935        let mut children = vec![super::ManagedChild {
936            role: Role::Rnsd,
937            child: Command::new("sleep").arg("60").spawn().unwrap(),
938        }];
939        let (done_tx, done_rx) = mpsc::channel();
940
941        let driver = std::thread::spawn(move || {
942            let mut saw_begin_drain = false;
943            while let Ok(event) = event_rx.recv_timeout(Duration::from_secs(1)) {
944                match event {
945                    Event::BeginDrain { .. } => saw_begin_drain = true,
946                    Event::Query(QueryRequest::DrainStatus, resp_tx) => {
947                        let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
948                            state: LifecycleState::Stopping,
949                            drain_age_seconds: Some(0.05),
950                            deadline_remaining_seconds: Some(0.0),
951                            drain_complete: true,
952                            interface_writer_queued_frames: 0,
953                            provider_backlog_events: 0,
954                            provider_consumer_queued_events: 0,
955                            detail: Some("ready to stop".into()),
956                        }));
957                        let _ = done_tx.send(saw_begin_drain);
958                        break;
959                    }
960                    _ => {}
961                }
962            }
963        });
964
965        supervisor.stop_process("rnsd", &mut children).unwrap();
966        assert!(children.is_empty());
967        assert!(done_rx.recv_timeout(Duration::from_secs(1)).unwrap());
968        driver.join().unwrap();
969    }
970
971    #[test]
972    fn restart_process_requests_rnsd_drain_before_replacement() {
973        let (event_tx, event_rx) = rns_net::event::channel();
974        let auth_key = [0x61; 32];
975        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
976        ensure_process(&state, "rnsd");
977        let (rpc_addr, _server) = start_test_rpc_server(auth_key, event_tx);
978        let supervisor = Supervisor::new(SupervisorConfig {
979            specs: vec![ProcessSpec {
980                role: Role::Rnsd,
981                command: ProcessCommand::External(PathBuf::from("sleep")),
982                args: vec!["60".into()],
983            }],
984            shared_state: Some(state.clone()),
985            control_rx: None,
986            readiness: Vec::new(),
987            log_dir: None,
988            rnsd_drain: Some(RnsdDrainConfig {
989                rpc_addr: rpc_addr.clone(),
990                auth_key,
991                timeout: Duration::from_millis(250),
992                poll_interval: Duration::from_millis(10),
993            }),
994        });
995        let original = Command::new("sleep").arg("60").spawn().unwrap();
996        mark_process_running(&state, "rnsd", original.id());
997        let original_pid = original.id();
998        let mut children = vec![super::ManagedChild {
999            role: Role::Rnsd,
1000            child: original,
1001        }];
1002        let (done_tx, done_rx) = mpsc::channel();
1003
1004        let driver = std::thread::spawn(move || {
1005            let mut saw_begin_drain = false;
1006            while let Ok(event) = event_rx.recv_timeout(Duration::from_secs(1)) {
1007                match event {
1008                    Event::BeginDrain { .. } => saw_begin_drain = true,
1009                    Event::Query(QueryRequest::DrainStatus, resp_tx) => {
1010                        let _ = resp_tx.send(QueryResponse::DrainStatus(DrainStatus {
1011                            state: LifecycleState::Stopping,
1012                            drain_age_seconds: Some(0.05),
1013                            deadline_remaining_seconds: Some(0.0),
1014                            drain_complete: true,
1015                            interface_writer_queued_frames: 0,
1016                            provider_backlog_events: 0,
1017                            provider_consumer_queued_events: 0,
1018                            detail: Some("ready to restart".into()),
1019                        }));
1020                        let _ = done_tx.send(saw_begin_drain);
1021                        break;
1022                    }
1023                    _ => {}
1024                }
1025            }
1026        });
1027
1028        supervisor.restart_process("rnsd", &mut children).unwrap();
1029        assert_eq!(children.len(), 1);
1030        assert_eq!(children[0].role, Role::Rnsd);
1031        assert_ne!(children[0].child.id(), original_pid);
1032        assert!(done_rx.recv_timeout(Duration::from_secs(1)).unwrap());
1033        driver.join().unwrap();
1034
1035        let snapshot = {
1036            let s = state.read().unwrap();
1037            s.processes.get("rnsd").cloned().unwrap()
1038        };
1039        assert_eq!(snapshot.restart_count, 1);
1040
1041        let _ = children[0].child.kill();
1042        let _ = children[0].child.wait();
1043    }
1044
    #[test]
    fn supervisor_restarts_unexpectedly_exited_child_instead_of_exiting() {
        // Build a unique scratch directory for this test run (pid + nanosecond
        // timestamp) so parallel test invocations cannot collide on paths.
        let temp_root = std::env::temp_dir().join(format!(
            "rns-server-supervisor-test-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        ));
        std::fs::create_dir_all(&temp_root).unwrap();
        let count_path = temp_root.join("count");
        let script_path = temp_root.join("restart-once.sh");
        // Shell script that counts its own launches in `count_path`: the first
        // run exits with status 7 (simulating a crash); every later run sleeps
        // so the supervisor sees a healthy long-running child.
        std::fs::write(
            &script_path,
            format!(
                "#!/usr/bin/env bash\nset -eu\ncount_file=\"{}\"\ncount=$(cat \"$count_file\" 2>/dev/null || echo 0)\ncount=$((count+1))\nprintf '%s' \"$count\" > \"$count_file\"\nif [ \"$count\" -eq 1 ]; then\n  exit 7\nfi\nsleep 60\n",
                count_path.display()
            ),
        )
        .unwrap();
        // Mark the script executable (0o755) so the supervisor can spawn it.
        let mut perms = std::fs::metadata(&script_path).unwrap().permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&script_path, perms).unwrap();

        // Supervisor managing only the crash-once script as rns-sentineld.
        let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
        ensure_process(&state, "rns-sentineld");
        let supervisor = Supervisor::new(SupervisorConfig {
            specs: vec![ProcessSpec {
                role: Role::Sentineld,
                command: ProcessCommand::External(script_path.clone()),
                args: Vec::new(),
            }],
            shared_state: Some(state.clone()),
            control_rx: None,
            readiness: Vec::new(),
            log_dir: None,
            rnsd_drain: None,
        });
        let (result_tx, result_rx) = mpsc::channel();

        // Run the supervisor loop on its own thread; it should keep running
        // (restarting the crashed child) rather than returning.
        let handle = std::thread::spawn(move || {
            let result = supervisor.run();
            let _ = result_tx.send(result);
        });

        // Timing out here is the success path: the supervisor must still be
        // alive 2s after the child's simulated crash.
        match result_rx.recv_timeout(Duration::from_secs(2)) {
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Ok(result) => panic!("supervisor exited unexpectedly: {:?}", result),
            Err(err) => panic!("failed waiting for supervisor result: {}", err),
        }

        // After the window, the replacement child should be running and exactly
        // one restart recorded in shared state.
        // NOTE(review): assumes the restart (and the script's second run
        // writing "2") completes within the 2s window — timing-sensitive;
        // confirm against supervisor poll cadence.
        let restart_count = {
            let s = state.read().unwrap();
            let process = s.processes.get("rns-sentineld").cloned().unwrap();
            assert!(
                process.pid.is_some(),
                "expected restarted child to be running"
            );
            process.restart_count
        };
        assert_eq!(restart_count, 1);
        assert_eq!(std::fs::read_to_string(&count_path).unwrap(), "2");

        // STOP_TX is a module-global channel (defined elsewhere in this test
        // module — presumably wired into Supervisor::run's shutdown path) that
        // asks the supervisor loop to exit cleanly with status 0.
        STOP_TX.lock().unwrap().as_ref().unwrap().send(()).unwrap();
        let result = result_rx.recv_timeout(Duration::from_secs(2)).unwrap();
        assert_eq!(result.unwrap(), 0);
        handle.join().unwrap();
        let _ = std::fs::remove_dir_all(temp_root);
    }
1115
1116    fn unique_temp_path(prefix: &str) -> PathBuf {
1117        let now = SystemTime::now()
1118            .duration_since(UNIX_EPOCH)
1119            .unwrap_or_default()
1120            .as_nanos();
1121        std::env::temp_dir().join(format!("{prefix}-{}-{now}.ready", std::process::id()))
1122    }
1123
1124    fn start_test_rpc_server(auth_key: [u8; 32], event_tx: EventSender) -> (RpcAddr, RpcServer) {
1125        for _ in 0..16 {
1126            let listener = TcpListener::bind(("127.0.0.1", 0)).unwrap();
1127            let port = listener.local_addr().unwrap().port();
1128            drop(listener);
1129            let rpc_addr = RpcAddr::Tcp("127.0.0.1".into(), port);
1130            match RpcServer::start(&rpc_addr, auth_key, event_tx.clone()) {
1131                Ok(server) => return (rpc_addr, server),
1132                Err(err) if err.kind() == std::io::ErrorKind::AddrInUse => continue,
1133                Err(err) => panic!("failed to start rpc server for test: {err}"),
1134            }
1135        }
1136
1137        panic!("failed to allocate rpc server address for test");
1138    }
1139}