Skip to main content

rns_ctl/
state.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{mpsc, Arc, Mutex, RwLock};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
/// Maximum number of entries kept in each capped history buffer; when a buffer
/// is at this length the oldest entry is evicted before a new one is appended.
const MAX_RECORDS: usize = 1000;
13
/// Shared state accessible from HTTP handlers and callbacks.
///
/// Readers and writers go through the `RwLock`; the `Arc` lets the handle be cloned freely.
pub type SharedState = Arc<RwLock<CtlState>>;
/// Shared, lock-protected handle to the control-plane configuration (`CtlConfig`).
pub type ControlPlaneConfigHandle = Arc<RwLock<crate::config::CtlConfig>>;
/// Thread-safe callback that receives a byte slice (presumably the serialized
/// server config — confirm against the installer of this callback) and returns
/// either a validation snapshot or an error message.
pub type ServerConfigValidator =
    Arc<dyn Fn(&[u8]) -> Result<ServerConfigValidationSnapshot, String> + Send + Sync>;
/// Thread-safe callback that receives a mutation mode (save or apply) together
/// with a byte slice (presumably the serialized server config — confirm) and
/// returns either the mutation result or an error message.
pub type ServerConfigMutator = Arc<
    dyn Fn(ServerConfigMutationMode, &[u8]) -> Result<ServerConfigMutationResult, String>
        + Send
        + Sync,
>;
24
/// Registry of WebSocket broadcast senders.
///
/// Each entry is the sending half of an `mpsc` channel carrying [`WsEvent`]s;
/// [`broadcast`] drops entries whose `send` fails.
pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
27
/// In-memory state of the control plane, normally shared as a [`SharedState`].
pub struct CtlState {
    /// Instant captured when this state was constructed; basis for `uptime_seconds`.
    pub started_at: Instant,
    /// Server mode label; initialised to `"standalone"` and replaced by `set_server_mode`.
    pub server_mode: String,
    /// Most recent server config snapshot, if one has been stored via `set_server_config`.
    pub server_config: Option<ServerConfigSnapshot>,
    /// Server config schema description, if one has been stored via `set_server_config_schema`.
    pub server_config_schema: Option<ServerConfigSchemaSnapshot>,
    /// Save/apply bookkeeping maintained by the `note_server_config_*` and reconcile helpers.
    pub server_config_status: ServerConfigStatusState,
    /// Optional validation callback installed via `set_server_config_validator`.
    pub server_config_validator: Option<ServerConfigValidator>,
    /// Optional mutation callback installed via `set_server_config_mutator`.
    pub server_config_mutator: Option<ServerConfigMutator>,
    /// 16-byte identity hash, when known (not populated anywhere in this file).
    pub identity_hash: Option<[u8; 16]>,
    /// Identity object, when known (not populated anywhere in this file).
    pub identity: Option<Identity>,
    /// History of announce records, appended by `push_announce` (capped at `MAX_RECORDS`).
    pub announces: VecDeque<AnnounceRecord>,
    /// History of packet records, appended by `push_packet` (capped at `MAX_RECORDS`).
    pub packets: VecDeque<PacketRecord>,
    /// History of proof records, appended by `push_proof` (capped at `MAX_RECORDS`).
    pub proofs: VecDeque<ProofRecord>,
    /// History of link events, appended by `push_link_event` (capped at `MAX_RECORDS`).
    pub link_events: VecDeque<LinkEventRecord>,
    /// History of resource events, appended by `push_resource_event` (capped at `MAX_RECORDS`).
    pub resource_events: VecDeque<ResourceEventRecord>,
    /// History of managed-process lifecycle events (capped at `MAX_RECORDS`).
    pub process_events: VecDeque<ProcessEventRecord>,
    /// Buffered log lines keyed by process name; each buffer is capped at `MAX_RECORDS`.
    pub process_logs: HashMap<String, VecDeque<ProcessLogRecord>>,
    /// Registered destinations keyed by a 16-byte array (presumably the destination hash — confirm).
    pub destinations: HashMap<[u8; 16], DestinationEntry>,
    /// Per-process supervision state keyed by process name.
    pub processes: HashMap<String, ManagedProcessState>,
    /// Sender for process control commands, once installed via `set_control_tx`.
    pub control_tx: Option<mpsc::Sender<ProcessControlCommand>>,
    /// Handle to the control-plane config, once installed via `set_control_plane_config`.
    pub control_plane_config: Option<ControlPlaneConfigHandle>,
    /// Optional shared handle to an `RnsNode` (not populated anywhere in this file).
    pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
}
51
/// A registered destination plus metadata for the API.
pub struct DestinationEntry {
    /// The underlying `rns_net` destination object.
    pub destination: Destination,
    /// Full name: "app_name.aspect1.aspect2"
    pub full_name: String,
}
58
59impl CtlState {
60    pub fn new() -> Self {
61        CtlState {
62            started_at: Instant::now(),
63            server_mode: "standalone".into(),
64            server_config: None,
65            server_config_schema: None,
66            server_config_status: ServerConfigStatusState::default(),
67            server_config_validator: None,
68            server_config_mutator: None,
69            identity_hash: None,
70            identity: None,
71            announces: VecDeque::new(),
72            packets: VecDeque::new(),
73            proofs: VecDeque::new(),
74            link_events: VecDeque::new(),
75            resource_events: VecDeque::new(),
76            process_events: VecDeque::new(),
77            process_logs: HashMap::new(),
78            destinations: HashMap::new(),
79            processes: HashMap::new(),
80            control_tx: None,
81            control_plane_config: None,
82            node_handle: None,
83        }
84    }
85
86    pub fn uptime_seconds(&self) -> f64 {
87        self.started_at.elapsed().as_secs_f64()
88    }
89}
90
91fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
92    if deque.len() >= MAX_RECORDS {
93        deque.pop_front();
94    }
95    deque.push_back(item);
96}
97
98pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
99    let mut s = state.write().unwrap();
100    push_capped(&mut s.announces, record);
101}
102
103pub fn push_packet(state: &SharedState, record: PacketRecord) {
104    let mut s = state.write().unwrap();
105    push_capped(&mut s.packets, record);
106}
107
108pub fn push_proof(state: &SharedState, record: ProofRecord) {
109    let mut s = state.write().unwrap();
110    push_capped(&mut s.proofs, record);
111}
112
113pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
114    let mut s = state.write().unwrap();
115    push_capped(&mut s.link_events, record);
116}
117
118pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
119    let mut s = state.write().unwrap();
120    push_capped(&mut s.resource_events, record);
121}
122
123/// Broadcast a WsEvent to all connected WebSocket clients.
124pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
125    let mut senders = ws.lock().unwrap();
126    senders.retain(|tx| tx.send(event.clone()).is_ok());
127}
128
129pub fn set_server_mode(state: &SharedState, mode: impl Into<String>) {
130    let mut s = state.write().unwrap();
131    s.server_mode = mode.into();
132}
133
134pub fn set_server_config(state: &SharedState, config: ServerConfigSnapshot) {
135    let mut s = state.write().unwrap();
136    s.server_config = Some(config);
137}
138
139pub fn set_server_config_schema(state: &SharedState, schema: ServerConfigSchemaSnapshot) {
140    let mut s = state.write().unwrap();
141    s.server_config_schema = Some(schema);
142}
143
144pub fn note_server_config_saved(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
145    let mut s = state.write().unwrap();
146    s.server_config_status.last_saved_at = Some(Instant::now());
147    s.server_config_status.last_action = Some("save".into());
148    s.server_config_status.last_action_at = Some(Instant::now());
149    s.server_config_status.pending_process_restarts.clear();
150    s.server_config_status.control_plane_reload_required = apply_plan.control_plane_reload_required;
151    s.server_config_status.control_plane_restart_required =
152        apply_plan.control_plane_restart_required;
153    s.server_config_status.runtime_differs_from_saved = !apply_plan.processes_to_restart.is_empty()
154        || apply_plan.control_plane_reload_required
155        || apply_plan.control_plane_restart_required;
156    s.server_config_status.last_apply_plan = Some(apply_plan.clone());
157}
158
159pub fn note_server_config_applied(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
160    let mut s = state.write().unwrap();
161    let now = Instant::now();
162    s.server_config_status.last_saved_at = Some(now);
163    s.server_config_status.last_apply_at = Some(now);
164    s.server_config_status.last_action = Some("apply".into());
165    s.server_config_status.last_action_at = Some(now);
166    s.server_config_status.pending_process_restarts = apply_plan.processes_to_restart.clone();
167    s.server_config_status.control_plane_reload_required = false;
168    s.server_config_status.control_plane_restart_required =
169        apply_plan.control_plane_restart_required;
170    s.server_config_status.runtime_differs_from_saved =
171        !s.server_config_status.pending_process_restarts.is_empty()
172            || s.server_config_status.control_plane_restart_required;
173    s.server_config_status.last_apply_plan = Some(apply_plan.clone());
174}
175
176pub fn reconcile_config_status_for_process(
177    state: &SharedState,
178    name: &str,
179    ready: bool,
180    status: &str,
181) {
182    let mut s = state.write().unwrap();
183    if ready {
184        s.server_config_status
185            .pending_process_restarts
186            .retain(|process| process != name);
187    }
188    if status == "failed" {
189        s.server_config_status.runtime_differs_from_saved = true;
190    } else if s.server_config_status.pending_process_restarts.is_empty()
191        && !s.server_config_status.control_plane_reload_required
192        && !s.server_config_status.control_plane_restart_required
193    {
194        s.server_config_status.runtime_differs_from_saved = false;
195    }
196}
197
198pub fn set_server_config_validator(state: &SharedState, validator: ServerConfigValidator) {
199    let mut s = state.write().unwrap();
200    s.server_config_validator = Some(validator);
201}
202
203pub fn set_server_config_mutator(state: &SharedState, mutator: ServerConfigMutator) {
204    let mut s = state.write().unwrap();
205    s.server_config_mutator = Some(mutator);
206}
207
208pub fn ensure_process(state: &SharedState, name: impl Into<String>) {
209    let mut s = state.write().unwrap();
210    let name = name.into();
211    s.processes
212        .entry(name.clone())
213        .or_insert_with(|| ManagedProcessState::new(name.clone()));
214    s.process_logs.entry(name.clone()).or_default();
215    push_capped(
216        &mut s.process_events,
217        ProcessEventRecord::new(name, "registered", Some("process registered".into())),
218    );
219}
220
221pub fn push_process_log(state: &SharedState, name: &str, stream: &str, line: impl Into<String>) {
222    let mut s = state.write().unwrap();
223    let recent_log_lines = {
224        let logs = s.process_logs.entry(name.to_string()).or_default();
225        if logs.len() >= MAX_RECORDS {
226            logs.pop_front();
227        }
228        logs.push_back(ProcessLogRecord {
229            process: name.to_string(),
230            stream: stream.to_string(),
231            line: line.into(),
232            recorded_at: Instant::now(),
233        });
234        logs.len()
235    };
236    let process = s
237        .processes
238        .entry(name.to_string())
239        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
240    process.last_log_at = Some(Instant::now());
241    process.recent_log_lines = recent_log_lines;
242}
243
244pub fn set_process_log_path(state: &SharedState, name: &str, path: impl Into<String>) {
245    let mut s = state.write().unwrap();
246    let process = s
247        .processes
248        .entry(name.to_string())
249        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
250    process.durable_log_path = Some(path.into());
251}
252
253pub fn set_control_tx(state: &SharedState, tx: mpsc::Sender<ProcessControlCommand>) {
254    let mut s = state.write().unwrap();
255    s.control_tx = Some(tx);
256}
257
258pub fn set_control_plane_config(state: &SharedState, config: ControlPlaneConfigHandle) {
259    let mut s = state.write().unwrap();
260    s.control_plane_config = Some(config);
261}
262
263pub fn mark_process_running(state: &SharedState, name: &str, pid: u32) {
264    let mut s = state.write().unwrap();
265    let process = s
266        .processes
267        .entry(name.to_string())
268        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
269    process.status = "running".into();
270    process.ready = false;
271    process.ready_state = "starting".into();
272    process.pid = Some(pid);
273    process.started_at = Some(Instant::now());
274    process.last_transition_at = Some(Instant::now());
275    process.last_error = None;
276    process.status_detail = Some("process spawned".into());
277    push_capped(
278        &mut s.process_events,
279        ProcessEventRecord::new(name.to_string(), "running", Some(format!("pid={}", pid))),
280    );
281    drop(s);
282    reconcile_config_status_for_process(state, name, false, "running");
283}
284
285pub fn bump_process_restart_count(state: &SharedState, name: &str) {
286    let mut s = state.write().unwrap();
287    let restart_count = {
288        let process = s
289            .processes
290            .entry(name.to_string())
291            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
292        process.restart_count = process.restart_count.saturating_add(1);
293        process.restart_count
294    };
295    push_capped(
296        &mut s.process_events,
297        ProcessEventRecord::new(
298            name.to_string(),
299            "restart_requested",
300            Some(format!("restart_count={}", restart_count)),
301        ),
302    );
303}
304
305pub fn record_process_termination_observation(
306    state: &SharedState,
307    name: &str,
308    drain_acknowledged: bool,
309    forced_kill: bool,
310) {
311    let mut s = state.write().unwrap();
312    let detail = {
313        let process = s
314            .processes
315            .entry(name.to_string())
316            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
317        if drain_acknowledged {
318            process.drain_ack_count = process.drain_ack_count.saturating_add(1);
319        }
320        if forced_kill {
321            process.forced_kill_count = process.forced_kill_count.saturating_add(1);
322        }
323
324        let mut parts = Vec::new();
325        if drain_acknowledged {
326            parts.push(format!("drain_ack_count={}", process.drain_ack_count));
327        }
328        if forced_kill {
329            parts.push(format!("forced_kill_count={}", process.forced_kill_count));
330        }
331        (!parts.is_empty()).then(|| parts.join(", "))
332    };
333
334    if let Some(detail) = detail {
335        push_capped(
336            &mut s.process_events,
337            ProcessEventRecord::new(name.to_string(), "termination_observed", Some(detail)),
338        );
339    }
340}
341
342pub fn mark_process_stopped(state: &SharedState, name: &str, exit_code: Option<i32>) {
343    let mut s = state.write().unwrap();
344    let process = s
345        .processes
346        .entry(name.to_string())
347        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
348    process.status = "stopped".into();
349    process.ready = false;
350    process.ready_state = "stopped".into();
351    process.pid = None;
352    process.last_exit_code = exit_code;
353    process.started_at = None;
354    process.last_transition_at = Some(Instant::now());
355    process.status_detail = Some("process stopped".into());
356    push_capped(
357        &mut s.process_events,
358        ProcessEventRecord::new(
359            name.to_string(),
360            "stopped",
361            Some(format!(
362                "exit_code={}",
363                exit_code
364                    .map(|v| v.to_string())
365                    .unwrap_or_else(|| "none".into())
366            )),
367        ),
368    );
369    drop(s);
370    reconcile_config_status_for_process(state, name, false, "stopped");
371}
372
373pub fn mark_process_failed_spawn(state: &SharedState, name: &str, error: String) {
374    let mut s = state.write().unwrap();
375    let detail = {
376        let process = s
377            .processes
378            .entry(name.to_string())
379            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
380        process.status = "failed".into();
381        process.ready = false;
382        process.ready_state = "failed".into();
383        process.pid = None;
384        process.last_error = Some(error);
385        process.started_at = None;
386        process.last_transition_at = Some(Instant::now());
387        process.status_detail = process.last_error.clone();
388        process.last_error.clone()
389    };
390    push_capped(
391        &mut s.process_events,
392        ProcessEventRecord::new(name.to_string(), "spawn_failed", detail),
393    );
394    drop(s);
395    reconcile_config_status_for_process(state, name, false, "failed");
396}
397
398pub fn set_process_readiness(
399    state: &SharedState,
400    name: &str,
401    ready: bool,
402    ready_state: &str,
403    status_detail: Option<String>,
404) {
405    let mut s = state.write().unwrap();
406    let detail_clone = {
407        let process = s
408            .processes
409            .entry(name.to_string())
410            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
411        process.ready = ready;
412        process.ready_state = ready_state.to_string();
413        process.status_detail = status_detail;
414        process.status_detail.clone()
415    };
416    let should_record = match s.process_events.back() {
417        Some(last) => {
418            last.process != name || last.event != ready_state || last.detail != detail_clone
419        }
420        None => true,
421    };
422    if should_record {
423        push_capped(
424            &mut s.process_events,
425            ProcessEventRecord::new(name.to_string(), ready_state.to_string(), detail_clone),
426        );
427    }
428    drop(s);
429    reconcile_config_status_for_process(state, name, ready, ready_state);
430}
431
432// --- Record types ---
433
/// Serializable record of a received announce, as built by `make_announce_record`.
#[derive(Debug, Clone, Serialize)]
pub struct AnnounceRecord {
    /// Destination hash, hex-encoded.
    pub dest_hash: String,
    /// Hash of the announcing identity, hex-encoded.
    pub identity_hash: String,
    /// Hop count reported with the announce.
    pub hops: u8,
    /// Optional application data, base64-encoded.
    pub app_data: Option<String>,
    /// Receive timestamp copied from the announce; the unit is not visible
    /// here (presumably seconds — confirm against `rns_net`).
    pub received_at: f64,
}
442
/// Serializable record of a received packet.
#[derive(Debug, Clone, Serialize)]
pub struct PacketRecord {
    /// Destination hash as text (presumably hex, like `AnnounceRecord` — confirm with the producer).
    pub dest_hash: String,
    /// Packet hash as text (presumably hex — confirm with the producer).
    pub packet_hash: String,
    /// Packet payload; base64-encoded, going by the field name.
    pub data_base64: String,
    /// Receive timestamp; the unit is not visible here (presumably seconds — confirm).
    pub received_at: f64,
}
450
/// Serializable record of a received proof.
#[derive(Debug, Clone, Serialize)]
pub struct ProofRecord {
    /// Destination hash as text (presumably hex — confirm with the producer).
    pub dest_hash: String,
    /// Hash of the packet the proof refers to, as text (presumably hex — confirm).
    pub packet_hash: String,
    /// Round-trip time; the unit is not visible here (presumably seconds — confirm).
    pub rtt: f64,
}
457
/// Serializable record of a link event. Which optional fields are populated
/// depends on the event type; the producers live outside this file.
#[derive(Debug, Clone, Serialize)]
pub struct LinkEventRecord {
    /// Identifier of the link, as text.
    pub link_id: String,
    /// Event type label chosen by the producer.
    pub event_type: String,
    /// Whether the local side initiated the link, when known.
    pub is_initiator: Option<bool>,
    /// Round-trip time, when known (unit not visible here — confirm).
    pub rtt: Option<f64>,
    /// Identity hash associated with the event, when known.
    pub identity_hash: Option<String>,
    /// Reason text, when the producer supplies one.
    pub reason: Option<String>,
}
467
/// Serializable record of a resource-transfer event. Which optional fields
/// are populated depends on the event type; the producers live outside this file.
#[derive(Debug, Clone, Serialize)]
pub struct ResourceEventRecord {
    /// Identifier of the link the resource belongs to, as text.
    pub link_id: String,
    /// Event type label chosen by the producer.
    pub event_type: String,
    /// Resource data, when present; base64-encoded, going by the field name.
    pub data_base64: Option<String>,
    /// Resource metadata, when present; base64-encoded, going by the field name.
    pub metadata_base64: Option<String>,
    /// Error text, when present.
    pub error: Option<String>,
    /// Received amount, when present (presumably a progress counter — confirm the unit).
    pub received: Option<usize>,
    /// Total amount, when present (presumably the counterpart of `received` — confirm the unit).
    pub total: Option<usize>,
}
478
/// One lifecycle event of a managed process. Not `Serialize`: it carries an `Instant`.
#[derive(Debug, Clone)]
pub struct ProcessEventRecord {
    /// Name of the process the event belongs to.
    pub process: String,
    /// Event name, e.g. `"registered"`, `"running"`, `"stopped"`,
    /// `"spawn_failed"`, `"restart_requested"`, `"termination_observed"`,
    /// or a readiness state string.
    pub event: String,
    /// Optional free-form detail, e.g. `pid=…` or `exit_code=…`.
    pub detail: Option<String>,
    /// Instant at which the record was created.
    pub recorded_at: Instant,
}
486
/// One buffered log line of a managed process. Not `Serialize`: it carries an `Instant`.
#[derive(Debug, Clone)]
pub struct ProcessLogRecord {
    /// Name of the process that produced the line.
    pub process: String,
    /// Stream label supplied by the caller of `push_process_log`.
    pub stream: String,
    /// The log text.
    pub line: String,
    /// Instant at which the line was recorded.
    pub recorded_at: Instant,
}
494
495impl ProcessEventRecord {
496    fn new(process: String, event: impl Into<String>, detail: Option<String>) -> Self {
497        Self {
498            process,
499            event: event.into(),
500            detail,
501            recorded_at: Instant::now(),
502        }
503    }
504}
505
/// Serializable snapshot of the server configuration.
///
/// NOTE(review): this struct is populated outside this file; the field notes
/// below are derived from the field names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSnapshot {
    /// Configured config path, if one was given.
    pub config_path: Option<String>,
    /// Config directory after resolution.
    pub resolved_config_dir: String,
    /// Path of the server config file.
    pub server_config_file_path: String,
    /// Whether the server config file is present.
    pub server_config_file_present: bool,
    /// Server config file contents as JSON text.
    pub server_config_file_json: String,
    /// Path of the stats database.
    pub stats_db_path: String,
    /// Binary used for `rnsd`.
    pub rnsd_bin: String,
    /// Binary used for `sentineld`.
    pub sentineld_bin: String,
    /// Binary used for `statsd`.
    pub statsd_bin: String,
    /// Embedded HTTP settings.
    pub http: ServerHttpConfigSnapshot,
    /// Processes that make up the launch plan.
    pub launch_plan: Vec<LaunchProcessSnapshot>,
}
520
/// Serializable description of the server config schema.
///
/// NOTE(review): populated outside this file; field notes follow the names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSchemaSnapshot {
    /// Name of the config format.
    pub format: String,
    /// Example configuration as JSON text.
    pub example_config_json: String,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Per-field schema entries.
    pub fields: Vec<ServerConfigFieldSchema>,
}
528
/// Serializable schema entry for a single server config field.
///
/// NOTE(review): populated outside this file; field notes follow the names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigFieldSchema {
    /// Field name.
    pub field: String,
    /// Field type, as text.
    pub field_type: String,
    /// Whether the field is required.
    pub required: bool,
    /// Default value, as text.
    pub default_value: String,
    /// Human-readable description.
    pub description: String,
    /// Text describing the effect of changing the field.
    pub effect: String,
}
538
/// Serializable view of [`ServerConfigStatusState`], produced by its `snapshot` method.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigStatusSnapshot {
    /// Seconds since the config was last saved, if it ever was.
    pub last_saved_age_seconds: Option<f64>,
    /// Seconds since the config was last applied, if it ever was.
    pub last_apply_age_seconds: Option<f64>,
    /// Name of the last action (`"save"` or `"apply"`), if any.
    pub last_action: Option<String>,
    /// Seconds since the last action, if any.
    pub last_action_age_seconds: Option<f64>,
    /// Overall action of the last apply plan, present only while not converged.
    pub pending_action: Option<String>,
    /// Pending process restarts, plus `"embedded-http-auth"` when a control-plane
    /// reload is required and `"rns-server"` when a control-plane restart is required.
    pub pending_targets: Vec<String>,
    /// Human-readable explanation of what is still outstanding, if anything.
    pub blocking_reason: Option<String>,
    /// Names of processes whose restart has not yet been confirmed ready.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required.
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the running state is flagged as differing from the saved config.
    pub runtime_differs_from_saved: bool,
    /// True when no process restart, reload or restart is pending.
    pub converged: bool,
    /// One-sentence human-readable summary of the convergence state.
    pub summary: String,
    /// Copy of the most recently noted apply plan, if any.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
556
/// Serializable snapshot of the embedded HTTP settings.
///
/// NOTE(review): populated outside this file; field notes follow the names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerHttpConfigSnapshot {
    /// Whether the HTTP server is enabled.
    pub enabled: bool,
    /// Host to bind to.
    pub host: String,
    /// Port to bind to.
    pub port: u16,
    /// Authentication mode, as text.
    pub auth_mode: String,
    /// Whether a token is configured (the token itself is not part of the snapshot).
    pub token_configured: bool,
    /// Daemon-mode flag.
    pub daemon_mode: bool,
}
566
/// Serializable description of one process in the launch plan.
///
/// NOTE(review): populated outside this file; field notes follow the names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct LaunchProcessSnapshot {
    /// Process name.
    pub name: String,
    /// Binary to run.
    pub bin: String,
    /// Arguments passed to the binary.
    pub args: Vec<String>,
    /// Command line as a single string.
    pub command_line: String,
}
574
/// Successful result type of a [`ServerConfigValidator`] callback.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigValidationSnapshot {
    /// Validity flag reported by the validator.
    pub valid: bool,
    /// Config snapshot returned by the validator.
    pub config: ServerConfigSnapshot,
    /// Warnings reported by the validator.
    pub warnings: Vec<String>,
}
581
/// Successful result type of a [`ServerConfigMutator`] callback.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigMutationResult {
    /// Action label reported by the mutator.
    pub action: String,
    /// Config snapshot returned by the mutator.
    pub config: ServerConfigSnapshot,
    /// Plan describing what has to happen for the config to take effect.
    pub apply_plan: ServerConfigApplyPlan,
    /// Warnings reported by the mutator.
    pub warnings: Vec<String>,
}
589
/// Plan describing what must happen for a config change to take effect.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigApplyPlan {
    /// Overall action label; reported as `pending_action` in status snapshots while not converged.
    pub overall_action: String,
    /// Processes that must be restarted; copied into the pending-restart list on apply.
    pub processes_to_restart: Vec<String>,
    /// Whether the control plane must reload.
    pub control_plane_reload_required: bool,
    /// Whether the control plane must restart.
    pub control_plane_restart_required: bool,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Individual field changes covered by the plan.
    pub changes: Vec<ServerConfigChange>,
}
599
/// One field-level change within a [`ServerConfigApplyPlan`].
///
/// NOTE(review): populated outside this file; field notes follow the names and types only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigChange {
    /// Name of the changed field.
    pub field: String,
    /// Value before the change, as text.
    pub before: String,
    /// Value after the change, as text.
    pub after: String,
    /// Text describing the effect of the change.
    pub effect: String,
}
607
/// Mode passed to a [`ServerConfigMutator`] callback.
#[derive(Debug, Clone, Copy)]
pub enum ServerConfigMutationMode {
    /// Save the config.
    Save,
    /// Apply the config.
    Apply,
}
613
/// Internal bookkeeping about config save/apply progress; `snapshot` turns it
/// into the serializable [`ServerConfigStatusSnapshot`].
#[derive(Debug, Clone, Default)]
pub struct ServerConfigStatusState {
    /// When the config was last saved (also set on apply).
    pub last_saved_at: Option<Instant>,
    /// When the config was last applied.
    pub last_apply_at: Option<Instant>,
    /// Last action: `"save"` or `"apply"`.
    pub last_action: Option<String>,
    /// When the last action happened.
    pub last_action_at: Option<Instant>,
    /// Processes restarted by an apply that have not yet reported ready.
    /// Cleared on save; entries are removed as processes become ready.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required (cleared on apply).
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the running state is considered to differ from the saved config.
    pub runtime_differs_from_saved: bool,
    /// The plan passed to the most recent save/apply notification.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
626
627impl ServerConfigStatusState {
628    pub fn snapshot(&self) -> ServerConfigStatusSnapshot {
629        let converged = self.pending_process_restarts.is_empty()
630            && !self.control_plane_reload_required
631            && !self.control_plane_restart_required;
632        let pending_action = (!converged)
633            .then(|| {
634                self.last_apply_plan
635                    .as_ref()
636                    .map(|plan| plan.overall_action.clone())
637            })
638            .flatten();
639        let mut pending_targets = self.pending_process_restarts.clone();
640        if self.control_plane_reload_required {
641            pending_targets.push("embedded-http-auth".into());
642        }
643        if self.control_plane_restart_required {
644            pending_targets.push("rns-server".into());
645        }
646        let blocking_reason = if self.control_plane_restart_required {
647            Some("Restart rns-server to apply embedded HTTP bind or enablement changes.".into())
648        } else if self.control_plane_reload_required {
649            Some("Apply config to reload embedded HTTP auth settings into the running control plane.".into())
650        } else if !self.pending_process_restarts.is_empty() {
651            Some(format!(
652                "Waiting for restarted processes to become ready: {}.",
653                self.pending_process_restarts.join(", ")
654            ))
655        } else {
656            None
657        };
658        let summary = if self.runtime_differs_from_saved {
659            if self.control_plane_restart_required {
660                "Saved config is not fully active; rns-server restart is still required.".into()
661            } else if self.control_plane_reload_required {
662                "Saved config is not fully active; embedded HTTP auth reload is still required."
663                    .into()
664            } else if self.pending_process_restarts.is_empty() {
665                "Saved config differs from runtime state.".into()
666            } else {
667                format!(
668                    "Waiting for restarted processes to converge: {}.",
669                    self.pending_process_restarts.join(", ")
670                )
671            }
672        } else {
673            "Running state is converged with the saved config.".into()
674        };
675
676        ServerConfigStatusSnapshot {
677            last_saved_age_seconds: self
678                .last_saved_at
679                .map(|instant| instant.elapsed().as_secs_f64()),
680            last_apply_age_seconds: self
681                .last_apply_at
682                .map(|instant| instant.elapsed().as_secs_f64()),
683            last_action: self.last_action.clone(),
684            last_action_age_seconds: self
685                .last_action_at
686                .map(|instant| instant.elapsed().as_secs_f64()),
687            pending_action,
688            pending_targets,
689            blocking_reason,
690            pending_process_restarts: self.pending_process_restarts.clone(),
691            control_plane_reload_required: self.control_plane_reload_required,
692            control_plane_restart_required: self.control_plane_restart_required,
693            runtime_differs_from_saved: self.runtime_differs_from_saved,
694            converged,
695            summary,
696            last_apply_plan: self.last_apply_plan.clone(),
697        }
698    }
699}
700
/// Supervision state tracked for one managed process.
#[derive(Debug, Clone)]
pub struct ManagedProcessState {
    /// Process name.
    pub name: String,
    /// Lifecycle status: starts as `"stopped"`; set to `"running"`, `"stopped"` or `"failed"`.
    pub status: String,
    /// Whether the process is currently considered ready; reset on every spawn, stop or failure.
    pub ready: bool,
    /// Readiness label: starts as `"stopped"`, becomes `"starting"` on spawn,
    /// `"failed"` on spawn failure, or whatever `set_process_readiness` supplies.
    pub ready_state: String,
    /// Process id while running; cleared on stop or failure.
    pub pid: Option<u32>,
    /// Exit code passed to the most recent `mark_process_stopped` call.
    pub last_exit_code: Option<i32>,
    /// Number of restarts requested (saturating).
    pub restart_count: u32,
    /// Number of observed drain acknowledgements (saturating).
    pub drain_ack_count: u32,
    /// Number of observed forced kills (saturating).
    pub forced_kill_count: u32,
    /// Last spawn error; cleared when the process is marked running.
    pub last_error: Option<String>,
    /// Free-form detail about the current status.
    pub status_detail: Option<String>,
    /// Durable log path, once set via `set_process_log_path`.
    pub durable_log_path: Option<String>,
    /// When the most recent log line was recorded.
    pub last_log_at: Option<Instant>,
    /// Number of log lines currently buffered for this process.
    pub recent_log_lines: usize,
    /// When the process was last spawned; cleared on stop or failure.
    pub started_at: Option<Instant>,
    /// When the status last changed (spawn, stop or spawn failure).
    pub last_transition_at: Option<Instant>,
}
720
/// Command sent over the control channel stored in `CtlState::control_tx`.
/// Each variant carries a `String` (presumably the target process name —
/// confirm with the receiver of the channel).
#[derive(Debug, Clone)]
pub enum ProcessControlCommand {
    /// Request a restart.
    Restart(String),
    /// Request a start.
    Start(String),
    /// Request a stop.
    Stop(String),
}
727
728impl ManagedProcessState {
729    pub fn new(name: String) -> Self {
730        Self {
731            name,
732            status: "stopped".into(),
733            ready: false,
734            ready_state: "stopped".into(),
735            pid: None,
736            last_exit_code: None,
737            restart_count: 0,
738            drain_ack_count: 0,
739            forced_kill_count: 0,
740            last_error: None,
741            status_detail: None,
742            durable_log_path: None,
743            last_log_at: None,
744            recent_log_lines: 0,
745            started_at: None,
746            last_transition_at: None,
747        }
748    }
749
750    pub fn uptime_seconds(&self) -> Option<f64> {
751        self.started_at
752            .map(|started| started.elapsed().as_secs_f64())
753    }
754
755    pub fn last_transition_seconds(&self) -> Option<f64> {
756        self.last_transition_at
757            .map(|transition| transition.elapsed().as_secs_f64())
758    }
759
760    pub fn last_log_age_seconds(&self) -> Option<f64> {
761        self.last_log_at
762            .map(|logged| logged.elapsed().as_secs_f64())
763    }
764}
765
766// --- WebSocket events ---
767
/// Event delivered to WebSocket clients through the broadcast registry.
#[derive(Debug, Clone)]
pub struct WsEvent {
    /// Topic name; the constructors in this file use `"announces"`,
    /// `"packets"`, `"proofs"`, `"links"` and `"resources"`.
    pub topic: &'static str,
    /// Event payload as a JSON value.
    pub payload: serde_json::Value,
}
773
774impl WsEvent {
775    pub fn announce(record: &AnnounceRecord) -> Self {
776        WsEvent {
777            topic: "announces",
778            payload: serde_json::to_value(record).unwrap_or_default(),
779        }
780    }
781
782    pub fn packet(record: &PacketRecord) -> Self {
783        WsEvent {
784            topic: "packets",
785            payload: serde_json::to_value(record).unwrap_or_default(),
786        }
787    }
788
789    pub fn proof(record: &ProofRecord) -> Self {
790        WsEvent {
791            topic: "proofs",
792            payload: serde_json::to_value(record).unwrap_or_default(),
793        }
794    }
795
796    pub fn link(record: &LinkEventRecord) -> Self {
797        WsEvent {
798            topic: "links",
799            payload: serde_json::to_value(record).unwrap_or_default(),
800        }
801    }
802
803    pub fn resource(record: &ResourceEventRecord) -> Self {
804        WsEvent {
805            topic: "resources",
806            payload: serde_json::to_value(record).unwrap_or_default(),
807        }
808    }
809
810    pub fn to_json(&self) -> String {
811        let obj = serde_json::json!({
812            "type": self.topic.trim_end_matches('s'),
813            "data": self.payload,
814        });
815        serde_json::to_string(&obj).unwrap_or_default()
816    }
817}
818
#[cfg(test)]
mod tests {
    use super::{
        mark_process_running, record_process_termination_observation, CtlState, SharedState,
    };
    use std::sync::{Arc, RwLock};

    /// A drain acknowledgement and a forced kill, observed in separate
    /// calls, must each bump only their own counter.
    #[test]
    fn termination_observation_tracks_drain_ack_and_forced_kill_counts() {
        let shared: SharedState = Arc::new(RwLock::new(CtlState::new()));
        mark_process_running(&shared, "rnsd", 1234);

        // First observation: drain acknowledged only. Second: forced kill only.
        record_process_termination_observation(&shared, "rnsd", true, false);
        record_process_termination_observation(&shared, "rnsd", false, true);

        let (drain_acks, forced_kills) = {
            let guard = shared.read().unwrap();
            let process = guard.processes.get("rnsd").unwrap();
            (process.drain_ack_count, process.forced_kill_count)
        };
        assert_eq!(drain_acks, 1);
        assert_eq!(forced_kills, 1);
    }
}
842
843/// Helper to create an AnnounceRecord from callback data.
844pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
845    AnnounceRecord {
846        dest_hash: to_hex(&announced.dest_hash.0),
847        identity_hash: to_hex(&announced.identity_hash.0),
848        hops: announced.hops,
849        app_data: announced
850            .app_data
851            .as_ref()
852            .map(|d| crate::encode::to_base64(d)),
853        received_at: announced.received_at,
854    }
855}