Skip to main content

rns_ctl/
state.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{mpsc, Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
/// Upper bound on the number of entries kept in each capped in-memory history
/// queue; once a queue holds this many entries the oldest one is dropped
/// before a new one is appended.
const MAX_RECORDS: usize = 1000;
13
/// Shared state accessible from HTTP handlers and callbacks.
pub type SharedState = Arc<RwLock<CtlState>>;
/// Shared, lock-protected handle to the control-plane configuration.
pub type ControlPlaneConfigHandle = Arc<RwLock<crate::config::CtlConfig>>;
/// Thread-safe callback that receives raw bytes and returns either a
/// validation snapshot or an error message.
pub type ServerConfigValidator =
    Arc<dyn Fn(&[u8]) -> Result<ServerConfigValidationSnapshot, String> + Send + Sync>;
/// Thread-safe callback that receives a mutation mode plus raw bytes and
/// returns either the mutation result or an error message.
pub type ServerConfigMutator = Arc<
    dyn Fn(ServerConfigMutationMode, &[u8]) -> Result<ServerConfigMutationResult, String>
        + Send
        + Sync,
>;

/// Registry of WebSocket broadcast senders.
pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
27
/// In-memory state of the control plane.
///
/// Shared between threads as [`SharedState`]; the free functions in this
/// module take the write lock and update individual fields.
pub struct CtlState {
    /// Moment this state was constructed; basis for [`CtlState::uptime_seconds`].
    pub started_at: Instant,
    /// Server mode label; `"standalone"` until replaced via `set_server_mode`.
    pub server_mode: String,
    /// Server config snapshot stored by `set_server_config`, if any.
    pub server_config: Option<ServerConfigSnapshot>,
    /// Server config schema stored by `set_server_config_schema`, if any.
    pub server_config_schema: Option<ServerConfigSchemaSnapshot>,
    /// Bookkeeping about the last save/apply and what is still pending.
    pub server_config_status: ServerConfigStatusState,
    /// Validation callback stored by `set_server_config_validator`, if any.
    pub server_config_validator: Option<ServerConfigValidator>,
    /// Mutation callback stored by `set_server_config_mutator`, if any.
    pub server_config_mutator: Option<ServerConfigMutator>,
    /// NOTE(review): presumably the hash of `identity` — not set in this file; confirm.
    pub identity_hash: Option<[u8; 16]>,
    /// NOTE(review): presumably the local node identity — not set in this file; confirm.
    pub identity: Option<Identity>,
    /// Announce history, capped at `MAX_RECORDS` by `push_announce`.
    pub announces: VecDeque<AnnounceRecord>,
    /// Packet history, capped at `MAX_RECORDS` by `push_packet`.
    pub packets: VecDeque<PacketRecord>,
    /// Proof history, capped at `MAX_RECORDS` by `push_proof`.
    pub proofs: VecDeque<ProofRecord>,
    /// Link event history, capped at `MAX_RECORDS` by `push_link_event`.
    pub link_events: VecDeque<LinkEventRecord>,
    /// Resource event history, capped at `MAX_RECORDS` by `push_resource_event`.
    pub resource_events: VecDeque<ResourceEventRecord>,
    /// Process lifecycle events across all processes, capped at `MAX_RECORDS`.
    pub process_events: VecDeque<ProcessEventRecord>,
    /// Captured log lines keyed by process name; each queue is capped at
    /// `MAX_RECORDS` by `push_process_log`.
    pub process_logs: HashMap<String, VecDeque<ProcessLogRecord>>,
    /// Registered destinations keyed by a 16-byte hash
    /// (presumably the destination hash — confirm against callers).
    pub destinations: HashMap<[u8; 16], DestinationEntry>,
    /// Managed process state keyed by process name.
    pub processes: HashMap<String, ManagedProcessState>,
    /// Sender for process control commands, stored by `set_control_tx`.
    pub control_tx: Option<mpsc::Sender<ProcessControlCommand>>,
    /// Control-plane config handle, stored by `set_control_plane_config`.
    pub control_plane_config: Option<ControlPlaneConfigHandle>,
    /// Shared handle to the (optional) running node.
    pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
}
51
/// A registered destination plus metadata for the API.
pub struct DestinationEntry {
    /// The destination object itself (type provided by `rns_net`).
    pub destination: Destination,
    /// Full name: "app_name.aspect1.aspect2"
    pub full_name: String,
}
58
59impl CtlState {
60    pub fn new() -> Self {
61        CtlState {
62            started_at: Instant::now(),
63            server_mode: "standalone".into(),
64            server_config: None,
65            server_config_schema: None,
66            server_config_status: ServerConfigStatusState::default(),
67            server_config_validator: None,
68            server_config_mutator: None,
69            identity_hash: None,
70            identity: None,
71            announces: VecDeque::new(),
72            packets: VecDeque::new(),
73            proofs: VecDeque::new(),
74            link_events: VecDeque::new(),
75            resource_events: VecDeque::new(),
76            process_events: VecDeque::new(),
77            process_logs: HashMap::new(),
78            destinations: HashMap::new(),
79            processes: HashMap::new(),
80            control_tx: None,
81            control_plane_config: None,
82            node_handle: None,
83        }
84    }
85
86    pub fn uptime_seconds(&self) -> f64 {
87        self.started_at.elapsed().as_secs_f64()
88    }
89}
90
91fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
92    if deque.len() >= MAX_RECORDS {
93        deque.pop_front();
94    }
95    deque.push_back(item);
96}
97
98pub(crate) fn read_state<'a>(state: &'a SharedState) -> RwLockReadGuard<'a, CtlState> {
99    match state.read() {
100        Ok(guard) => guard,
101        Err(poisoned) => {
102            log::error!("recovering from poisoned control-plane shared state read lock");
103            poisoned.into_inner()
104        }
105    }
106}
107
108pub(crate) fn write_state<'a>(state: &'a SharedState) -> RwLockWriteGuard<'a, CtlState> {
109    match state.write() {
110        Ok(guard) => guard,
111        Err(poisoned) => {
112            log::error!("recovering from poisoned control-plane shared state write lock");
113            poisoned.into_inner()
114        }
115    }
116}
117
118pub(crate) fn read_control_plane_config<'a>(
119    config: &'a ControlPlaneConfigHandle,
120) -> RwLockReadGuard<'a, crate::config::CtlConfig> {
121    match config.read() {
122        Ok(guard) => guard,
123        Err(poisoned) => {
124            log::error!("recovering from poisoned control-plane config read lock");
125            poisoned.into_inner()
126        }
127    }
128}
129
130pub(crate) fn lock_ws_broadcast<'a>(
131    ws: &'a WsBroadcast,
132) -> MutexGuard<'a, Vec<std::sync::mpsc::Sender<WsEvent>>> {
133    match ws.lock() {
134        Ok(guard) => guard,
135        Err(poisoned) => {
136            log::error!("recovering from poisoned WebSocket broadcast registry");
137            poisoned.into_inner()
138        }
139    }
140}
141
142pub(crate) fn lock_node_handle<'a>(
143    node: &'a Arc<Mutex<Option<RnsNode>>>,
144) -> MutexGuard<'a, Option<RnsNode>> {
145    match node.lock() {
146        Ok(guard) => guard,
147        Err(poisoned) => {
148            log::error!("recovering from poisoned node handle lock");
149            poisoned.into_inner()
150        }
151    }
152}
153
154pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
155    let mut s = write_state(state);
156    push_capped(&mut s.announces, record);
157}
158
159pub fn push_packet(state: &SharedState, record: PacketRecord) {
160    let mut s = write_state(state);
161    push_capped(&mut s.packets, record);
162}
163
164pub fn push_proof(state: &SharedState, record: ProofRecord) {
165    let mut s = write_state(state);
166    push_capped(&mut s.proofs, record);
167}
168
169pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
170    let mut s = write_state(state);
171    push_capped(&mut s.link_events, record);
172}
173
174pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
175    let mut s = write_state(state);
176    push_capped(&mut s.resource_events, record);
177}
178
179/// Broadcast a WsEvent to all connected WebSocket clients.
180pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
181    let mut senders = lock_ws_broadcast(ws);
182    senders.retain(|tx| tx.send(event.clone()).is_ok());
183}
184
185pub fn set_server_mode(state: &SharedState, mode: impl Into<String>) {
186    let mut s = write_state(state);
187    s.server_mode = mode.into();
188}
189
190pub fn set_server_config(state: &SharedState, config: ServerConfigSnapshot) {
191    let mut s = write_state(state);
192    s.server_config = Some(config);
193}
194
195pub fn set_server_config_schema(state: &SharedState, schema: ServerConfigSchemaSnapshot) {
196    let mut s = write_state(state);
197    s.server_config_schema = Some(schema);
198}
199
200pub fn note_server_config_saved(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
201    let mut s = write_state(state);
202    s.server_config_status.last_saved_at = Some(Instant::now());
203    s.server_config_status.last_action = Some("save".into());
204    s.server_config_status.last_action_at = Some(Instant::now());
205    s.server_config_status.pending_process_restarts.clear();
206    s.server_config_status.control_plane_reload_required = apply_plan.control_plane_reload_required;
207    s.server_config_status.control_plane_restart_required =
208        apply_plan.control_plane_restart_required;
209    s.server_config_status.runtime_differs_from_saved = !apply_plan.processes_to_restart.is_empty()
210        || apply_plan.control_plane_reload_required
211        || apply_plan.control_plane_restart_required;
212    s.server_config_status.last_apply_plan = Some(apply_plan.clone());
213}
214
215pub fn note_server_config_applied(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
216    let mut s = write_state(state);
217    let now = Instant::now();
218    s.server_config_status.last_saved_at = Some(now);
219    s.server_config_status.last_apply_at = Some(now);
220    s.server_config_status.last_action = Some("apply".into());
221    s.server_config_status.last_action_at = Some(now);
222    s.server_config_status.pending_process_restarts = apply_plan.processes_to_restart.clone();
223    s.server_config_status.control_plane_reload_required = false;
224    s.server_config_status.control_plane_restart_required =
225        apply_plan.control_plane_restart_required;
226    s.server_config_status.runtime_differs_from_saved =
227        !s.server_config_status.pending_process_restarts.is_empty()
228            || s.server_config_status.control_plane_restart_required;
229    s.server_config_status.last_apply_plan = Some(apply_plan.clone());
230}
231
232pub fn reconcile_config_status_for_process(
233    state: &SharedState,
234    name: &str,
235    ready: bool,
236    status: &str,
237) {
238    let mut s = write_state(state);
239    if ready {
240        s.server_config_status
241            .pending_process_restarts
242            .retain(|process| process != name);
243    }
244    if status == "failed" {
245        s.server_config_status.runtime_differs_from_saved = true;
246    } else if s.server_config_status.pending_process_restarts.is_empty()
247        && !s.server_config_status.control_plane_reload_required
248        && !s.server_config_status.control_plane_restart_required
249    {
250        s.server_config_status.runtime_differs_from_saved = false;
251    }
252}
253
254pub fn set_server_config_validator(state: &SharedState, validator: ServerConfigValidator) {
255    let mut s = write_state(state);
256    s.server_config_validator = Some(validator);
257}
258
259pub fn set_server_config_mutator(state: &SharedState, mutator: ServerConfigMutator) {
260    let mut s = write_state(state);
261    s.server_config_mutator = Some(mutator);
262}
263
264pub fn ensure_process(state: &SharedState, name: impl Into<String>) {
265    let mut s = write_state(state);
266    let name = name.into();
267    s.processes
268        .entry(name.clone())
269        .or_insert_with(|| ManagedProcessState::new(name.clone()));
270    s.process_logs.entry(name.clone()).or_default();
271    push_capped(
272        &mut s.process_events,
273        ProcessEventRecord::new(name, "registered", Some("process registered".into())),
274    );
275}
276
277pub fn push_process_log(state: &SharedState, name: &str, stream: &str, line: impl Into<String>) {
278    let mut s = write_state(state);
279    let recent_log_lines = {
280        let logs = s.process_logs.entry(name.to_string()).or_default();
281        if logs.len() >= MAX_RECORDS {
282            logs.pop_front();
283        }
284        logs.push_back(ProcessLogRecord {
285            process: name.to_string(),
286            stream: stream.to_string(),
287            line: line.into(),
288            recorded_at: Instant::now(),
289        });
290        logs.len()
291    };
292    let process = s
293        .processes
294        .entry(name.to_string())
295        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
296    process.last_log_at = Some(Instant::now());
297    process.recent_log_lines = recent_log_lines;
298}
299
300pub fn set_process_log_path(state: &SharedState, name: &str, path: impl Into<String>) {
301    let mut s = write_state(state);
302    let process = s
303        .processes
304        .entry(name.to_string())
305        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
306    process.durable_log_path = Some(path.into());
307}
308
309pub fn set_control_tx(state: &SharedState, tx: mpsc::Sender<ProcessControlCommand>) {
310    let mut s = write_state(state);
311    s.control_tx = Some(tx);
312}
313
314pub fn set_control_plane_config(state: &SharedState, config: ControlPlaneConfigHandle) {
315    let mut s = write_state(state);
316    s.control_plane_config = Some(config);
317}
318
319pub fn mark_process_running(state: &SharedState, name: &str, pid: u32) {
320    let mut s = write_state(state);
321    let process = s
322        .processes
323        .entry(name.to_string())
324        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
325    process.status = "running".into();
326    process.ready = false;
327    process.ready_state = "starting".into();
328    process.pid = Some(pid);
329    process.started_at = Some(Instant::now());
330    process.last_transition_at = Some(Instant::now());
331    process.last_error = None;
332    process.status_detail = Some("process spawned".into());
333    push_capped(
334        &mut s.process_events,
335        ProcessEventRecord::new(name.to_string(), "running", Some(format!("pid={}", pid))),
336    );
337    drop(s);
338    reconcile_config_status_for_process(state, name, false, "running");
339}
340
341pub fn bump_process_restart_count(state: &SharedState, name: &str) {
342    let mut s = write_state(state);
343    let restart_count = {
344        let process = s
345            .processes
346            .entry(name.to_string())
347            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
348        process.restart_count = process.restart_count.saturating_add(1);
349        process.restart_count
350    };
351    push_capped(
352        &mut s.process_events,
353        ProcessEventRecord::new(
354            name.to_string(),
355            "restart_requested",
356            Some(format!("restart_count={}", restart_count)),
357        ),
358    );
359}
360
361pub fn record_process_termination_observation(
362    state: &SharedState,
363    name: &str,
364    drain_acknowledged: bool,
365    forced_kill: bool,
366) {
367    let mut s = write_state(state);
368    let detail = {
369        let process = s
370            .processes
371            .entry(name.to_string())
372            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
373        if drain_acknowledged {
374            process.drain_ack_count = process.drain_ack_count.saturating_add(1);
375        }
376        if forced_kill {
377            process.forced_kill_count = process.forced_kill_count.saturating_add(1);
378        }
379
380        let mut parts = Vec::new();
381        if drain_acknowledged {
382            parts.push(format!("drain_ack_count={}", process.drain_ack_count));
383        }
384        if forced_kill {
385            parts.push(format!("forced_kill_count={}", process.forced_kill_count));
386        }
387        (!parts.is_empty()).then(|| parts.join(", "))
388    };
389
390    if let Some(detail) = detail {
391        push_capped(
392            &mut s.process_events,
393            ProcessEventRecord::new(name.to_string(), "termination_observed", Some(detail)),
394        );
395    }
396}
397
398pub fn mark_process_stopped(state: &SharedState, name: &str, exit_code: Option<i32>) {
399    let mut s = write_state(state);
400    let process = s
401        .processes
402        .entry(name.to_string())
403        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
404    process.status = "stopped".into();
405    process.ready = false;
406    process.ready_state = "stopped".into();
407    process.pid = None;
408    process.last_exit_code = exit_code;
409    process.started_at = None;
410    process.last_transition_at = Some(Instant::now());
411    process.status_detail = Some("process stopped".into());
412    push_capped(
413        &mut s.process_events,
414        ProcessEventRecord::new(
415            name.to_string(),
416            "stopped",
417            Some(format!(
418                "exit_code={}",
419                exit_code
420                    .map(|v| v.to_string())
421                    .unwrap_or_else(|| "none".into())
422            )),
423        ),
424    );
425    drop(s);
426    reconcile_config_status_for_process(state, name, false, "stopped");
427}
428
429pub fn mark_process_failed_spawn(state: &SharedState, name: &str, error: String) {
430    let mut s = write_state(state);
431    let detail = {
432        let process = s
433            .processes
434            .entry(name.to_string())
435            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
436        process.status = "failed".into();
437        process.ready = false;
438        process.ready_state = "failed".into();
439        process.pid = None;
440        process.last_error = Some(error);
441        process.started_at = None;
442        process.last_transition_at = Some(Instant::now());
443        process.status_detail = process.last_error.clone();
444        process.last_error.clone()
445    };
446    push_capped(
447        &mut s.process_events,
448        ProcessEventRecord::new(name.to_string(), "spawn_failed", detail),
449    );
450    drop(s);
451    reconcile_config_status_for_process(state, name, false, "failed");
452}
453
454pub fn set_process_readiness(
455    state: &SharedState,
456    name: &str,
457    ready: bool,
458    ready_state: &str,
459    status_detail: Option<String>,
460) {
461    let mut s = write_state(state);
462    let detail_clone = {
463        let process = s
464            .processes
465            .entry(name.to_string())
466            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
467        process.ready = ready;
468        process.ready_state = ready_state.to_string();
469        process.status_detail = status_detail;
470        process.status_detail.clone()
471    };
472    let should_record = match s.process_events.back() {
473        Some(last) => {
474            last.process != name || last.event != ready_state || last.detail != detail_clone
475        }
476        None => true,
477    };
478    if should_record {
479        push_capped(
480            &mut s.process_events,
481            ProcessEventRecord::new(name.to_string(), ready_state.to_string(), detail_clone),
482        );
483    }
484    drop(s);
485    reconcile_config_status_for_process(state, name, ready, ready_state);
486}
487
488// --- Record types ---
489
/// Serialisable record of a received announce, as built by
/// `make_announce_record`.
#[derive(Debug, Clone, Serialize)]
pub struct AnnounceRecord {
    /// Destination hash rendered with `to_hex`.
    pub dest_hash: String,
    /// Identity hash rendered with `to_hex`.
    pub identity_hash: String,
    /// Hop count copied from the announce.
    pub hops: u8,
    /// Application data encoded with `crate::encode::to_base64`, when present.
    pub app_data: Option<String>,
    /// Receive time copied from the announce (unit/epoch not visible here — confirm).
    pub received_at: f64,
}
498
/// Serialisable packet record, stored in `CtlState::packets` and used as the
/// payload of `WsEvent::packet`.
///
/// NOTE(review): nothing in this file populates the fields; the hints below
/// are inferred from the field names — confirm against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct PacketRecord {
    /// Presumably the destination hash as text.
    pub dest_hash: String,
    /// Presumably the packet hash as text.
    pub packet_hash: String,
    /// Presumably the packet payload, base64-encoded.
    pub data_base64: String,
    /// Presumably the receive time (unit/epoch not visible here).
    pub received_at: f64,
}
506
/// Serialisable proof record, stored in `CtlState::proofs` and used as the
/// payload of `WsEvent::proof`.
///
/// NOTE(review): nothing in this file populates the fields; the hints below
/// are inferred from the field names — confirm against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct ProofRecord {
    /// Presumably the destination hash as text.
    pub dest_hash: String,
    /// Presumably the hash of the proven packet as text.
    pub packet_hash: String,
    /// Presumably the measured round-trip time (unit not visible here).
    pub rtt: f64,
}
513
/// Serialisable link event, stored in `CtlState::link_events` and used as the
/// payload of `WsEvent::link`.
///
/// NOTE(review): nothing in this file populates the fields; which optional
/// fields accompany which `event_type` must be confirmed against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct LinkEventRecord {
    /// Identifier of the link, as text.
    pub link_id: String,
    /// Kind of event, as free-form text.
    pub event_type: String,
    /// Optional initiator flag.
    pub is_initiator: Option<bool>,
    /// Optional round-trip time (unit not visible here).
    pub rtt: Option<f64>,
    /// Optional identity hash, as text.
    pub identity_hash: Option<String>,
    /// Optional reason text.
    pub reason: Option<String>,
}
523
/// Serialisable resource event, stored in `CtlState::resource_events` and
/// used as the payload of `WsEvent::resource`.
///
/// NOTE(review): nothing in this file populates the fields; which optional
/// fields accompany which `event_type` must be confirmed against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceEventRecord {
    /// Identifier of the link, as text.
    pub link_id: String,
    /// Kind of event, as free-form text.
    pub event_type: String,
    /// Optional data, presumably base64-encoded.
    pub data_base64: Option<String>,
    /// Optional metadata, presumably base64-encoded.
    pub metadata_base64: Option<String>,
    /// Optional error text.
    pub error: Option<String>,
    /// Optional progress numerator (presumably amount received so far).
    pub received: Option<usize>,
    /// Optional progress denominator (presumably the expected total).
    pub total: Option<usize>,
}
534
/// One lifecycle event of a managed process, kept in
/// `CtlState::process_events`.
#[derive(Debug, Clone)]
pub struct ProcessEventRecord {
    /// Name of the process the event belongs to.
    pub process: String,
    /// Event name, e.g. `"registered"`, `"running"`, `"stopped"`,
    /// `"spawn_failed"`, `"restart_requested"`, `"termination_observed"`, or a
    /// readiness state passed to `set_process_readiness`.
    pub event: String,
    /// Optional free-form detail text.
    pub detail: Option<String>,
    /// Instant at which the record was constructed.
    pub recorded_at: Instant,
}
542
/// One captured log line of a managed process, kept in
/// `CtlState::process_logs`.
#[derive(Debug, Clone)]
pub struct ProcessLogRecord {
    /// Name of the process that produced the line.
    pub process: String,
    /// Stream label supplied by the caller of `push_process_log`
    /// (presumably "stdout"/"stderr" — confirm).
    pub stream: String,
    /// The log line text.
    pub line: String,
    /// Instant at which the line was recorded.
    pub recorded_at: Instant,
}
550
551impl ProcessEventRecord {
552    fn new(process: String, event: impl Into<String>, detail: Option<String>) -> Self {
553        Self {
554            process,
555            event: event.into(),
556            detail,
557            recorded_at: Instant::now(),
558        }
559    }
560}
561
/// Serialisable view of the server configuration.
///
/// NOTE(review): the snapshot is produced outside this file; field hints are
/// inferred from the names — confirm against the producer.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSnapshot {
    /// Optional config path (presumably as supplied by the user).
    pub config_path: Option<String>,
    /// Config directory after resolution.
    pub resolved_config_dir: String,
    /// Path of the server config file.
    pub server_config_file_path: String,
    /// Whether the server config file is present.
    pub server_config_file_present: bool,
    /// Server config file contents as JSON text.
    pub server_config_file_json: String,
    /// Path of the stats database.
    pub stats_db_path: String,
    /// Binary used for `rnsd`.
    pub rnsd_bin: String,
    /// Binary used for `sentineld`.
    pub sentineld_bin: String,
    /// Binary used for `statsd`.
    pub statsd_bin: String,
    /// Embedded HTTP settings.
    pub http: ServerHttpConfigSnapshot,
    /// Processes to launch, with their command lines.
    pub launch_plan: Vec<LaunchProcessSnapshot>,
}
576
/// Serialisable description of the server config format.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSchemaSnapshot {
    /// Name of the config format.
    pub format: String,
    /// Example configuration as JSON text.
    pub example_config_json: String,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Per-field schema entries.
    pub fields: Vec<ServerConfigFieldSchema>,
}
584
/// Schema entry describing a single server config field.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigFieldSchema {
    /// Field name.
    pub field: String,
    /// Field type, as text.
    pub field_type: String,
    /// Whether the field is required.
    pub required: bool,
    /// Default value, as text.
    pub default_value: String,
    /// Human-readable description.
    pub description: String,
    /// Text describing the effect of the field
    /// (presumably what changing it requires — confirm).
    pub effect: String,
}
594
/// Serialisable view of [`ServerConfigStatusState`], produced by
/// `ServerConfigStatusState::snapshot`.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigStatusSnapshot {
    /// Seconds since the last save, if any.
    pub last_saved_age_seconds: Option<f64>,
    /// Seconds since the last apply, if any.
    pub last_apply_age_seconds: Option<f64>,
    /// Name of the last action (`"save"` or `"apply"`), if any.
    pub last_action: Option<String>,
    /// Seconds since the last action, if any.
    pub last_action_age_seconds: Option<f64>,
    /// `overall_action` of the last apply plan while not converged.
    pub pending_action: Option<String>,
    /// Pending process restarts, plus `"embedded-http-auth"` when a reload is
    /// required and `"rns-server"` when a restart is required.
    pub pending_targets: Vec<String>,
    /// Human-readable explanation of what still blocks convergence, if anything.
    pub blocking_reason: Option<String>,
    /// Processes whose restart is still pending.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required.
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the runtime is marked as differing from the saved config.
    pub runtime_differs_from_saved: bool,
    /// True when no restarts are pending and neither control-plane flag is set.
    pub converged: bool,
    /// Human-readable one-line summary.
    pub summary: String,
    /// Copy of the last recorded apply plan, if any.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
612
/// Serialisable view of the embedded HTTP settings.
#[derive(Debug, Clone, Serialize)]
pub struct ServerHttpConfigSnapshot {
    /// Whether the HTTP server is enabled.
    pub enabled: bool,
    /// Host to bind to.
    pub host: String,
    /// Port to bind to.
    pub port: u16,
    /// Authentication mode, as text.
    pub auth_mode: String,
    /// Whether a token is configured (the token itself is not part of the snapshot).
    pub token_configured: bool,
    /// Daemon-mode flag (exact semantics not visible in this file).
    pub daemon_mode: bool,
}
622
/// Serialisable description of one process in the launch plan.
#[derive(Debug, Clone, Serialize)]
pub struct LaunchProcessSnapshot {
    /// Process name.
    pub name: String,
    /// Binary to execute.
    pub bin: String,
    /// Arguments passed to the binary.
    pub args: Vec<String>,
    /// Command line rendered as a single string.
    pub command_line: String,
}
630
/// Successful result of a [`ServerConfigValidator`] call.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigValidationSnapshot {
    /// Whether the candidate config is valid.
    pub valid: bool,
    /// Config snapshot associated with the validation
    /// (presumably the candidate as it would look once applied — confirm).
    pub config: ServerConfigSnapshot,
    /// Warnings produced during validation.
    pub warnings: Vec<String>,
}
637
/// Successful result of a [`ServerConfigMutator`] call.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigMutationResult {
    /// Name of the performed action, as text.
    pub action: String,
    /// Config snapshot associated with the mutation.
    pub config: ServerConfigSnapshot,
    /// Plan describing what must happen for the config to take effect.
    pub apply_plan: ServerConfigApplyPlan,
    /// Warnings produced during the mutation.
    pub warnings: Vec<String>,
}
645
/// Plan describing what is needed for a config change to take effect.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigApplyPlan {
    /// Overall action label; surfaced as `pending_action` in the status
    /// snapshot while not converged.
    pub overall_action: String,
    /// Names of the processes that need a restart.
    pub processes_to_restart: Vec<String>,
    /// Whether the control plane must reload settings.
    pub control_plane_reload_required: bool,
    /// Whether the control plane itself must be restarted.
    pub control_plane_restart_required: bool,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Individual field changes.
    pub changes: Vec<ServerConfigChange>,
}
655
/// A single changed config field within an apply plan.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigChange {
    /// Name of the changed field.
    pub field: String,
    /// Previous value, as text.
    pub before: String,
    /// New value, as text.
    pub after: String,
    /// Text describing the effect of the change.
    pub effect: String,
}
663
/// Mode passed to a [`ServerConfigMutator`].
#[derive(Debug, Clone, Copy)]
pub enum ServerConfigMutationMode {
    /// Save mode (see `note_server_config_saved` for the matching bookkeeping).
    Save,
    /// Apply mode (see `note_server_config_applied` for the matching bookkeeping).
    Apply,
}
669
/// Internal bookkeeping about config save/apply actions and what is still
/// pending; converted for the API with `snapshot`.
#[derive(Debug, Clone, Default)]
pub struct ServerConfigStatusState {
    /// When a config was last saved (also set on apply).
    pub last_saved_at: Option<Instant>,
    /// When a config was last applied.
    pub last_apply_at: Option<Instant>,
    /// `"save"` or `"apply"`, whichever happened last.
    pub last_action: Option<String>,
    /// When the last action happened.
    pub last_action_at: Option<Instant>,
    /// Processes restarted by the last apply that have not reported ready yet.
    pub pending_process_restarts: Vec<String>,
    /// Set from the plan on save; cleared on apply.
    pub control_plane_reload_required: bool,
    /// Copied from the plan on both save and apply.
    pub control_plane_restart_required: bool,
    /// Whether the runtime is considered to differ from the saved config.
    pub runtime_differs_from_saved: bool,
    /// Plan recorded by the last save or apply.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
682
683impl ServerConfigStatusState {
684    pub fn snapshot(&self) -> ServerConfigStatusSnapshot {
685        let converged = self.pending_process_restarts.is_empty()
686            && !self.control_plane_reload_required
687            && !self.control_plane_restart_required;
688        let pending_action = (!converged)
689            .then(|| {
690                self.last_apply_plan
691                    .as_ref()
692                    .map(|plan| plan.overall_action.clone())
693            })
694            .flatten();
695        let mut pending_targets = self.pending_process_restarts.clone();
696        if self.control_plane_reload_required {
697            pending_targets.push("embedded-http-auth".into());
698        }
699        if self.control_plane_restart_required {
700            pending_targets.push("rns-server".into());
701        }
702        let blocking_reason = if self.control_plane_restart_required {
703            Some("Restart rns-server to apply embedded HTTP bind or enablement changes.".into())
704        } else if self.control_plane_reload_required {
705            Some("Apply config to reload embedded HTTP auth settings into the running control plane.".into())
706        } else if !self.pending_process_restarts.is_empty() {
707            Some(format!(
708                "Waiting for restarted processes to become ready: {}.",
709                self.pending_process_restarts.join(", ")
710            ))
711        } else {
712            None
713        };
714        let summary = if self.runtime_differs_from_saved {
715            if self.control_plane_restart_required {
716                "Saved config is not fully active; rns-server restart is still required.".into()
717            } else if self.control_plane_reload_required {
718                "Saved config is not fully active; embedded HTTP auth reload is still required."
719                    .into()
720            } else if self.pending_process_restarts.is_empty() {
721                "Saved config differs from runtime state.".into()
722            } else {
723                format!(
724                    "Waiting for restarted processes to converge: {}.",
725                    self.pending_process_restarts.join(", ")
726                )
727            }
728        } else {
729            "Running state is converged with the saved config.".into()
730        };
731
732        ServerConfigStatusSnapshot {
733            last_saved_age_seconds: self
734                .last_saved_at
735                .map(|instant| instant.elapsed().as_secs_f64()),
736            last_apply_age_seconds: self
737                .last_apply_at
738                .map(|instant| instant.elapsed().as_secs_f64()),
739            last_action: self.last_action.clone(),
740            last_action_age_seconds: self
741                .last_action_at
742                .map(|instant| instant.elapsed().as_secs_f64()),
743            pending_action,
744            pending_targets,
745            blocking_reason,
746            pending_process_restarts: self.pending_process_restarts.clone(),
747            control_plane_reload_required: self.control_plane_reload_required,
748            control_plane_restart_required: self.control_plane_restart_required,
749            runtime_differs_from_saved: self.runtime_differs_from_saved,
750            converged,
751            summary,
752            last_apply_plan: self.last_apply_plan.clone(),
753        }
754    }
755}
756
/// Tracked state of one managed process, keyed by name in
/// `CtlState::processes`.
#[derive(Debug, Clone)]
pub struct ManagedProcessState {
    /// Process name.
    pub name: String,
    /// Lifecycle status: `"stopped"` initially, then `"running"`, `"stopped"`
    /// or `"failed"` as set by the `mark_process_*` functions.
    pub status: String,
    /// Whether the process is currently considered ready.
    pub ready: bool,
    /// Readiness label, e.g. `"stopped"`, `"starting"`, `"failed"`, or a value
    /// supplied to `set_process_readiness`.
    pub ready_state: String,
    /// Pid while running; `None` after a stop or failed spawn.
    pub pid: Option<u32>,
    /// Exit code stored by `mark_process_stopped`.
    pub last_exit_code: Option<i32>,
    /// Number of restart requests (saturating).
    pub restart_count: u32,
    /// Number of observed drain acknowledgements (saturating).
    pub drain_ack_count: u32,
    /// Number of observed forced kills (saturating).
    pub forced_kill_count: u32,
    /// Error stored by `mark_process_failed_spawn`; cleared when the process runs.
    pub last_error: Option<String>,
    /// Free-form detail accompanying the current status.
    pub status_detail: Option<String>,
    /// Path stored by `set_process_log_path`.
    pub durable_log_path: Option<String>,
    /// When the most recent log line was recorded.
    pub last_log_at: Option<Instant>,
    /// Number of log lines currently retained in memory for this process.
    pub recent_log_lines: usize,
    /// When the process was last marked running; `None` while not running.
    pub started_at: Option<Instant>,
    /// When the status last changed via one of the `mark_process_*` functions.
    pub last_transition_at: Option<Instant>,
}
776
/// Command sent over `CtlState::control_tx`; each variant carries a `String`
/// (presumably the target process name — confirm against the receiver).
#[derive(Debug, Clone)]
pub enum ProcessControlCommand {
    /// Request a restart.
    Restart(String),
    /// Request a start.
    Start(String),
    /// Request a stop.
    Stop(String),
}
783
784impl ManagedProcessState {
785    pub fn new(name: String) -> Self {
786        Self {
787            name,
788            status: "stopped".into(),
789            ready: false,
790            ready_state: "stopped".into(),
791            pid: None,
792            last_exit_code: None,
793            restart_count: 0,
794            drain_ack_count: 0,
795            forced_kill_count: 0,
796            last_error: None,
797            status_detail: None,
798            durable_log_path: None,
799            last_log_at: None,
800            recent_log_lines: 0,
801            started_at: None,
802            last_transition_at: None,
803        }
804    }
805
806    pub fn uptime_seconds(&self) -> Option<f64> {
807        self.started_at
808            .map(|started| started.elapsed().as_secs_f64())
809    }
810
811    pub fn last_transition_seconds(&self) -> Option<f64> {
812        self.last_transition_at
813            .map(|transition| transition.elapsed().as_secs_f64())
814    }
815
816    pub fn last_log_age_seconds(&self) -> Option<f64> {
817        self.last_log_at
818            .map(|logged| logged.elapsed().as_secs_f64())
819    }
820}
821
822// --- WebSocket events ---
823
/// Event delivered to WebSocket clients through [`WsBroadcast`].
#[derive(Debug, Clone)]
pub struct WsEvent {
    /// Topic name: one of `"announces"`, `"packets"`, `"proofs"`, `"links"`
    /// or `"resources"` when built through the constructors in this file.
    pub topic: &'static str,
    /// JSON payload of the event.
    pub payload: serde_json::Value,
}
829
830impl WsEvent {
831    pub fn announce(record: &AnnounceRecord) -> Self {
832        WsEvent {
833            topic: "announces",
834            payload: serde_json::to_value(record).unwrap_or_default(),
835        }
836    }
837
838    pub fn packet(record: &PacketRecord) -> Self {
839        WsEvent {
840            topic: "packets",
841            payload: serde_json::to_value(record).unwrap_or_default(),
842        }
843    }
844
845    pub fn proof(record: &ProofRecord) -> Self {
846        WsEvent {
847            topic: "proofs",
848            payload: serde_json::to_value(record).unwrap_or_default(),
849        }
850    }
851
852    pub fn link(record: &LinkEventRecord) -> Self {
853        WsEvent {
854            topic: "links",
855            payload: serde_json::to_value(record).unwrap_or_default(),
856        }
857    }
858
859    pub fn resource(record: &ResourceEventRecord) -> Self {
860        WsEvent {
861            topic: "resources",
862            payload: serde_json::to_value(record).unwrap_or_default(),
863        }
864    }
865
866    pub fn to_json(&self) -> String {
867        let obj = serde_json::json!({
868            "type": self.topic.trim_end_matches('s'),
869            "data": self.payload,
870        });
871        serde_json::to_string(&obj).unwrap_or_default()
872    }
873}
874
#[cfg(test)]
mod tests {
    use super::{
        mark_process_running, record_process_termination_observation, CtlState, SharedState,
    };
    use std::sync::{Arc, RwLock};

    /// One drain acknowledgement and one forced kill, reported in separate
    /// observations, must each bump exactly their own counter.
    #[test]
    fn termination_observation_tracks_drain_ack_and_forced_kill_counts() {
        let shared: SharedState = Arc::new(RwLock::new(CtlState::new()));
        mark_process_running(&shared, "rnsd", 1234);

        // First observation: drain acknowledged only.
        record_process_termination_observation(&shared, "rnsd", true, false);
        // Second observation: forced kill only.
        record_process_termination_observation(&shared, "rnsd", false, true);

        // Copy the process state out so the lock is released before asserting.
        let process = {
            let guard = shared.read().unwrap();
            guard.processes.get("rnsd").cloned().unwrap()
        };
        assert_eq!(process.drain_ack_count, 1);
        assert_eq!(process.forced_kill_count, 1);
    }
}
898
899/// Helper to create an AnnounceRecord from callback data.
900pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
901    AnnounceRecord {
902        dest_hash: to_hex(&announced.dest_hash.0),
903        identity_hash: to_hex(&announced.identity_hash.0),
904        hops: announced.hops,
905        app_data: announced
906            .app_data
907            .as_ref()
908            .map(|d| crate::encode::to_base64(d)),
909        received_at: announced.received_at,
910    }
911}