Skip to main content

rns_ctl/
state.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::{mpsc, Arc, Mutex, RwLock};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
/// Maximum number of entries kept in each capped record queue; once a queue is full the oldest
/// entry is dropped before a new one is appended.
const MAX_RECORDS: usize = 1000;
13
/// Shared state accessible from HTTP handlers and Callbacks.
pub type SharedState = Arc<RwLock<CtlState>>;
/// Shared, lock-protected handle to the control-plane configuration.
pub type ControlPlaneConfigHandle = Arc<RwLock<crate::config::CtlConfig>>;
/// Thread-safe callback that takes a byte slice (presumably a serialized server config; TODO
/// confirm) and returns either a validation snapshot or an error message.
pub type ServerConfigValidator =
    Arc<dyn Fn(&[u8]) -> Result<ServerConfigValidationSnapshot, String> + Send + Sync>;
/// Thread-safe callback that takes a mutation mode plus a byte slice (presumably a serialized
/// server config; TODO confirm) and returns either the mutation result or an error message.
pub type ServerConfigMutator = Arc<
    dyn Fn(ServerConfigMutationMode, &[u8]) -> Result<ServerConfigMutationResult, String>
        + Send
        + Sync,
>;

/// Registry of WebSocket broadcast senders.
pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
27
/// Mutable state of the control daemon, normally wrapped in a [`SharedState`].
pub struct CtlState {
    /// Instant captured by `CtlState::new`; `uptime_seconds` measures elapsed time from it.
    pub started_at: Instant,
    /// Server mode label; `new` initialises it to `"standalone"`.
    pub server_mode: String,
    /// Most recent server config snapshot, stored by `set_server_config`.
    pub server_config: Option<ServerConfigSnapshot>,
    /// Server config schema description, stored by `set_server_config_schema`.
    pub server_config_schema: Option<ServerConfigSchemaSnapshot>,
    /// Save/apply bookkeeping maintained by the `note_server_config_*` functions and
    /// `reconcile_config_status_for_process`.
    pub server_config_status: ServerConfigStatusState,
    /// Validation callback installed by `set_server_config_validator`.
    pub server_config_validator: Option<ServerConfigValidator>,
    /// Mutation callback installed by `set_server_config_mutator`.
    pub server_config_mutator: Option<ServerConfigMutator>,
    /// 16-byte identity hash, if known (presumably of the local identity; TODO confirm).
    pub identity_hash: Option<[u8; 16]>,
    /// Identity object, if known (presumably the local identity; TODO confirm).
    pub identity: Option<Identity>,
    /// Announce records, appended by `push_announce` and capped at `MAX_RECORDS`.
    pub announces: VecDeque<AnnounceRecord>,
    /// Packet records, appended by `push_packet` and capped at `MAX_RECORDS`.
    pub packets: VecDeque<PacketRecord>,
    /// Proof records, appended by `push_proof` and capped at `MAX_RECORDS`.
    pub proofs: VecDeque<ProofRecord>,
    /// Link event records, appended by `push_link_event` and capped at `MAX_RECORDS`.
    pub link_events: VecDeque<LinkEventRecord>,
    /// Resource event records, appended by `push_resource_event` and capped at `MAX_RECORDS`.
    pub resource_events: VecDeque<ResourceEventRecord>,
    /// Process lifecycle events, capped at `MAX_RECORDS`.
    pub process_events: VecDeque<ProcessEventRecord>,
    /// Log lines keyed by process name; each per-process queue is capped at `MAX_RECORDS`.
    pub process_logs: HashMap<String, VecDeque<ProcessLogRecord>>,
    /// Registered destinations keyed by a 16-byte hash (presumably the destination hash; TODO
    /// confirm).
    pub destinations: HashMap<[u8; 16], DestinationEntry>,
    /// Managed process state keyed by process name.
    pub processes: HashMap<String, ManagedProcessState>,
    /// Sender for process control commands, installed by `set_control_tx`.
    pub control_tx: Option<mpsc::Sender<ProcessControlCommand>>,
    /// Control-plane config handle, installed by `set_control_plane_config`.
    pub control_plane_config: Option<ControlPlaneConfigHandle>,
    /// Optional shared handle around an `RnsNode`; the inner `Option` allows the node to be absent.
    pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
}
51
/// A registered destination plus metadata for the API.
pub struct DestinationEntry {
    /// The underlying `rns_net` destination object.
    pub destination: Destination,
    /// Full name: "app_name.aspect1.aspect2"
    pub full_name: String,
}
58
59impl CtlState {
60    pub fn new() -> Self {
61        CtlState {
62            started_at: Instant::now(),
63            server_mode: "standalone".into(),
64            server_config: None,
65            server_config_schema: None,
66            server_config_status: ServerConfigStatusState::default(),
67            server_config_validator: None,
68            server_config_mutator: None,
69            identity_hash: None,
70            identity: None,
71            announces: VecDeque::new(),
72            packets: VecDeque::new(),
73            proofs: VecDeque::new(),
74            link_events: VecDeque::new(),
75            resource_events: VecDeque::new(),
76            process_events: VecDeque::new(),
77            process_logs: HashMap::new(),
78            destinations: HashMap::new(),
79            processes: HashMap::new(),
80            control_tx: None,
81            control_plane_config: None,
82            node_handle: None,
83        }
84    }
85
86    pub fn uptime_seconds(&self) -> f64 {
87        self.started_at.elapsed().as_secs_f64()
88    }
89}
90
91fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
92    if deque.len() >= MAX_RECORDS {
93        deque.pop_front();
94    }
95    deque.push_back(item);
96}
97
98pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
99    let mut s = state.write().unwrap();
100    push_capped(&mut s.announces, record);
101}
102
103pub fn push_packet(state: &SharedState, record: PacketRecord) {
104    let mut s = state.write().unwrap();
105    push_capped(&mut s.packets, record);
106}
107
108pub fn push_proof(state: &SharedState, record: ProofRecord) {
109    let mut s = state.write().unwrap();
110    push_capped(&mut s.proofs, record);
111}
112
113pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
114    let mut s = state.write().unwrap();
115    push_capped(&mut s.link_events, record);
116}
117
118pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
119    let mut s = state.write().unwrap();
120    push_capped(&mut s.resource_events, record);
121}
122
123/// Broadcast a WsEvent to all connected WebSocket clients.
124pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
125    let mut senders = ws.lock().unwrap();
126    senders.retain(|tx| tx.send(event.clone()).is_ok());
127}
128
129pub fn set_server_mode(state: &SharedState, mode: impl Into<String>) {
130    let mut s = state.write().unwrap();
131    s.server_mode = mode.into();
132}
133
134pub fn set_server_config(state: &SharedState, config: ServerConfigSnapshot) {
135    let mut s = state.write().unwrap();
136    s.server_config = Some(config);
137}
138
139pub fn set_server_config_schema(state: &SharedState, schema: ServerConfigSchemaSnapshot) {
140    let mut s = state.write().unwrap();
141    s.server_config_schema = Some(schema);
142}
143
144pub fn note_server_config_saved(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
145    let mut s = state.write().unwrap();
146    s.server_config_status.last_saved_at = Some(Instant::now());
147    s.server_config_status.last_action = Some("save".into());
148    s.server_config_status.last_action_at = Some(Instant::now());
149    s.server_config_status.pending_process_restarts.clear();
150    s.server_config_status.control_plane_reload_required = apply_plan.control_plane_reload_required;
151    s.server_config_status.control_plane_restart_required =
152        apply_plan.control_plane_restart_required;
153    s.server_config_status.runtime_differs_from_saved = !apply_plan.processes_to_restart.is_empty()
154        || apply_plan.control_plane_reload_required
155        || apply_plan.control_plane_restart_required;
156    s.server_config_status.last_apply_plan = Some(apply_plan.clone());
157}
158
159pub fn note_server_config_applied(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
160    let mut s = state.write().unwrap();
161    let now = Instant::now();
162    s.server_config_status.last_saved_at = Some(now);
163    s.server_config_status.last_apply_at = Some(now);
164    s.server_config_status.last_action = Some("apply".into());
165    s.server_config_status.last_action_at = Some(now);
166    s.server_config_status.pending_process_restarts = apply_plan.processes_to_restart.clone();
167    s.server_config_status.control_plane_reload_required = false;
168    s.server_config_status.control_plane_restart_required =
169        apply_plan.control_plane_restart_required;
170    s.server_config_status.runtime_differs_from_saved =
171        !s.server_config_status.pending_process_restarts.is_empty()
172            || s.server_config_status.control_plane_restart_required;
173    s.server_config_status.last_apply_plan = Some(apply_plan.clone());
174}
175
176pub fn reconcile_config_status_for_process(
177    state: &SharedState,
178    name: &str,
179    ready: bool,
180    status: &str,
181) {
182    let mut s = state.write().unwrap();
183    if ready {
184        s.server_config_status
185            .pending_process_restarts
186            .retain(|process| process != name);
187    }
188    if status == "failed" {
189        s.server_config_status.runtime_differs_from_saved = true;
190    } else if s.server_config_status.pending_process_restarts.is_empty()
191        && !s.server_config_status.control_plane_reload_required
192        && !s.server_config_status.control_plane_restart_required
193    {
194        s.server_config_status.runtime_differs_from_saved = false;
195    }
196}
197
198pub fn set_server_config_validator(state: &SharedState, validator: ServerConfigValidator) {
199    let mut s = state.write().unwrap();
200    s.server_config_validator = Some(validator);
201}
202
203pub fn set_server_config_mutator(state: &SharedState, mutator: ServerConfigMutator) {
204    let mut s = state.write().unwrap();
205    s.server_config_mutator = Some(mutator);
206}
207
208pub fn ensure_process(state: &SharedState, name: impl Into<String>) {
209    let mut s = state.write().unwrap();
210    let name = name.into();
211    s.processes
212        .entry(name.clone())
213        .or_insert_with(|| ManagedProcessState::new(name.clone()));
214    s.process_logs.entry(name.clone()).or_default();
215    push_capped(
216        &mut s.process_events,
217        ProcessEventRecord::new(name, "registered", Some("process registered".into())),
218    );
219}
220
221pub fn push_process_log(state: &SharedState, name: &str, stream: &str, line: impl Into<String>) {
222    let mut s = state.write().unwrap();
223    let recent_log_lines = {
224        let logs = s.process_logs.entry(name.to_string()).or_default();
225        if logs.len() >= MAX_RECORDS {
226            logs.pop_front();
227        }
228        logs.push_back(ProcessLogRecord {
229            process: name.to_string(),
230            stream: stream.to_string(),
231            line: line.into(),
232            recorded_at: Instant::now(),
233        });
234        logs.len()
235    };
236    let process = s
237        .processes
238        .entry(name.to_string())
239        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
240    process.last_log_at = Some(Instant::now());
241    process.recent_log_lines = recent_log_lines;
242}
243
244pub fn set_process_log_path(state: &SharedState, name: &str, path: impl Into<String>) {
245    let mut s = state.write().unwrap();
246    let process = s
247        .processes
248        .entry(name.to_string())
249        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
250    process.durable_log_path = Some(path.into());
251}
252
253pub fn set_control_tx(state: &SharedState, tx: mpsc::Sender<ProcessControlCommand>) {
254    let mut s = state.write().unwrap();
255    s.control_tx = Some(tx);
256}
257
258pub fn set_control_plane_config(state: &SharedState, config: ControlPlaneConfigHandle) {
259    let mut s = state.write().unwrap();
260    s.control_plane_config = Some(config);
261}
262
263pub fn mark_process_running(state: &SharedState, name: &str, pid: u32) {
264    let mut s = state.write().unwrap();
265    let process = s
266        .processes
267        .entry(name.to_string())
268        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
269    process.status = "running".into();
270    process.ready = false;
271    process.ready_state = "starting".into();
272    process.pid = Some(pid);
273    process.started_at = Some(Instant::now());
274    process.last_transition_at = Some(Instant::now());
275    process.last_error = None;
276    process.status_detail = Some("process spawned".into());
277    push_capped(
278        &mut s.process_events,
279        ProcessEventRecord::new(name.to_string(), "running", Some(format!("pid={}", pid))),
280    );
281    drop(s);
282    reconcile_config_status_for_process(state, name, false, "running");
283}
284
285pub fn bump_process_restart_count(state: &SharedState, name: &str) {
286    let mut s = state.write().unwrap();
287    let restart_count = {
288        let process = s
289            .processes
290            .entry(name.to_string())
291            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
292        process.restart_count = process.restart_count.saturating_add(1);
293        process.restart_count
294    };
295    push_capped(
296        &mut s.process_events,
297        ProcessEventRecord::new(
298            name.to_string(),
299            "restart_requested",
300            Some(format!("restart_count={}", restart_count)),
301        ),
302    );
303}
304
305pub fn mark_process_stopped(state: &SharedState, name: &str, exit_code: Option<i32>) {
306    let mut s = state.write().unwrap();
307    let process = s
308        .processes
309        .entry(name.to_string())
310        .or_insert_with(|| ManagedProcessState::new(name.to_string()));
311    process.status = "stopped".into();
312    process.ready = false;
313    process.ready_state = "stopped".into();
314    process.pid = None;
315    process.last_exit_code = exit_code;
316    process.started_at = None;
317    process.last_transition_at = Some(Instant::now());
318    process.status_detail = Some("process stopped".into());
319    push_capped(
320        &mut s.process_events,
321        ProcessEventRecord::new(
322            name.to_string(),
323            "stopped",
324            Some(format!(
325                "exit_code={}",
326                exit_code
327                    .map(|v| v.to_string())
328                    .unwrap_or_else(|| "none".into())
329            )),
330        ),
331    );
332    drop(s);
333    reconcile_config_status_for_process(state, name, false, "stopped");
334}
335
336pub fn mark_process_failed_spawn(state: &SharedState, name: &str, error: String) {
337    let mut s = state.write().unwrap();
338    let detail = {
339        let process = s
340            .processes
341            .entry(name.to_string())
342            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
343        process.status = "failed".into();
344        process.ready = false;
345        process.ready_state = "failed".into();
346        process.pid = None;
347        process.last_error = Some(error);
348        process.started_at = None;
349        process.last_transition_at = Some(Instant::now());
350        process.status_detail = process.last_error.clone();
351        process.last_error.clone()
352    };
353    push_capped(
354        &mut s.process_events,
355        ProcessEventRecord::new(name.to_string(), "spawn_failed", detail),
356    );
357    drop(s);
358    reconcile_config_status_for_process(state, name, false, "failed");
359}
360
361pub fn set_process_readiness(
362    state: &SharedState,
363    name: &str,
364    ready: bool,
365    ready_state: &str,
366    status_detail: Option<String>,
367) {
368    let mut s = state.write().unwrap();
369    let detail_clone = {
370        let process = s
371            .processes
372            .entry(name.to_string())
373            .or_insert_with(|| ManagedProcessState::new(name.to_string()));
374        process.ready = ready;
375        process.ready_state = ready_state.to_string();
376        process.status_detail = status_detail;
377        process.status_detail.clone()
378    };
379    let should_record = match s.process_events.back() {
380        Some(last) => {
381            last.process != name || last.event != ready_state || last.detail != detail_clone
382        }
383        None => true,
384    };
385    if should_record {
386        push_capped(
387            &mut s.process_events,
388            ProcessEventRecord::new(name.to_string(), ready_state.to_string(), detail_clone),
389        );
390    }
391    drop(s);
392    reconcile_config_status_for_process(state, name, ready, ready_state);
393}
394
395// --- Record types ---
396
/// Serializable record of a received announce, as built by `make_announce_record`.
#[derive(Debug, Clone, Serialize)]
pub struct AnnounceRecord {
    /// Destination hash, hex-encoded.
    pub dest_hash: String,
    /// Identity hash, hex-encoded.
    pub identity_hash: String,
    /// Hop count copied from the announce.
    pub hops: u8,
    /// Application data, base64-encoded, when the announce carried any.
    pub app_data: Option<String>,
    /// Receive timestamp copied from the announce (units not visible here; presumably seconds —
    /// TODO confirm).
    pub received_at: f64,
}
405
/// Serializable record of a received packet.
#[derive(Debug, Clone, Serialize)]
pub struct PacketRecord {
    /// Destination hash as a string (presumably hex like `AnnounceRecord::dest_hash`; TODO confirm).
    pub dest_hash: String,
    /// Packet hash as a string (presumably hex; TODO confirm).
    pub packet_hash: String,
    /// Packet payload (base64-encoded, judging by the field name; TODO confirm).
    pub data_base64: String,
    /// Receive timestamp (units not visible here; TODO confirm).
    pub received_at: f64,
}
413
/// Serializable record of a received proof.
#[derive(Debug, Clone, Serialize)]
pub struct ProofRecord {
    /// Destination hash as a string (presumably hex; TODO confirm).
    pub dest_hash: String,
    /// Hash of the packet the proof refers to, as a string (presumably hex; TODO confirm).
    pub packet_hash: String,
    /// Round-trip time (units not visible here; presumably seconds — TODO confirm).
    pub rtt: f64,
}
420
/// Serializable record of a link event. Apart from the link id and the event type, every field
/// is optional and is presumably filled in only for the event types it applies to (TODO confirm).
#[derive(Debug, Clone, Serialize)]
pub struct LinkEventRecord {
    /// Identifier of the link, as a string.
    pub link_id: String,
    /// Name of the event type.
    pub event_type: String,
    /// Whether the local side initiated the link, when known (TODO confirm semantics).
    pub is_initiator: Option<bool>,
    /// Round-trip time, when available (units not visible here).
    pub rtt: Option<f64>,
    /// Identity hash associated with the event, when available.
    pub identity_hash: Option<String>,
    /// Reason text, when available.
    pub reason: Option<String>,
}
430
/// Serializable record of a resource event on a link. Apart from the link id and the event
/// type, every field is optional and is presumably filled in only for the event types it
/// applies to (TODO confirm).
#[derive(Debug, Clone, Serialize)]
pub struct ResourceEventRecord {
    /// Identifier of the link, as a string.
    pub link_id: String,
    /// Name of the event type.
    pub event_type: String,
    /// Resource data (base64-encoded, judging by the field name; TODO confirm).
    pub data_base64: Option<String>,
    /// Resource metadata (base64-encoded, judging by the field name; TODO confirm).
    pub metadata_base64: Option<String>,
    /// Error text, when available.
    pub error: Option<String>,
    /// Amount received so far (presumably for progress events; units not visible here).
    pub received: Option<usize>,
    /// Total amount expected (presumably for progress events; units not visible here).
    pub total: Option<usize>,
}
441
/// A lifecycle event recorded for a managed process.
#[derive(Debug, Clone)]
pub struct ProcessEventRecord {
    /// Name of the process the event belongs to.
    pub process: String,
    /// Event name, e.g. `"registered"`, `"running"`, `"stopped"`.
    pub event: String,
    /// Optional free-form detail attached to the event.
    pub detail: Option<String>,
    /// Instant at which the record was created.
    pub recorded_at: Instant,
}

/// A single log line captured from a managed process.
#[derive(Debug, Clone)]
pub struct ProcessLogRecord {
    /// Name of the process that produced the line.
    pub process: String,
    /// Label of the stream the line came from.
    pub stream: String,
    /// The log text.
    pub line: String,
    /// Instant at which the line was recorded.
    pub recorded_at: Instant,
}

impl ProcessEventRecord {
    /// Builds an event record for `process`, stamped with the current instant.
    fn new(process: String, event: impl Into<String>, detail: Option<String>) -> Self {
        let event: String = event.into();
        let recorded_at = Instant::now();
        ProcessEventRecord {
            process,
            event,
            detail,
            recorded_at,
        }
    }
}
468
/// Serializable snapshot of the server configuration. The field notes below are derived from
/// the field names only — confirm against the code that builds the snapshot.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSnapshot {
    /// Config path, when one was given (TODO confirm).
    pub config_path: Option<String>,
    /// Resolved configuration directory.
    pub resolved_config_dir: String,
    /// Path of the server config file.
    pub server_config_file_path: String,
    /// Whether the server config file is present.
    pub server_config_file_present: bool,
    /// JSON text of the server config file.
    pub server_config_file_json: String,
    /// Path of the stats database.
    pub stats_db_path: String,
    /// `rnsd` binary to launch.
    pub rnsd_bin: String,
    /// `sentineld` binary to launch.
    pub sentineld_bin: String,
    /// `statsd` binary to launch.
    pub statsd_bin: String,
    /// Embedded HTTP settings.
    pub http: ServerHttpConfigSnapshot,
    /// Processes to launch, one entry per process.
    pub launch_plan: Vec<LaunchProcessSnapshot>,
}
483
/// Serializable description of the server config schema.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSchemaSnapshot {
    /// Name of the config format (exact values are not visible here).
    pub format: String,
    /// Example configuration as JSON text.
    pub example_config_json: String,
    /// Free-form notes about the schema.
    pub notes: Vec<String>,
    /// Per-field schema descriptions.
    pub fields: Vec<ServerConfigFieldSchema>,
}
491
/// Serializable description of a single server config field.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigFieldSchema {
    /// Field name.
    pub field: String,
    /// Field type, as text.
    pub field_type: String,
    /// Whether the field is required.
    pub required: bool,
    /// Default value, as text.
    pub default_value: String,
    /// Human-readable description of the field.
    pub description: String,
    /// Effect of changing the field, as text (presumably what must be restarted or reloaded;
    /// TODO confirm).
    pub effect: String,
}
501
/// Serializable view of [`ServerConfigStatusState`], produced by
/// `ServerConfigStatusState::snapshot`. Instants are reported as ages in seconds.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigStatusSnapshot {
    /// Seconds since the config was last saved, if it ever was.
    pub last_saved_age_seconds: Option<f64>,
    /// Seconds since the config was last applied, if it ever was.
    pub last_apply_age_seconds: Option<f64>,
    /// Name of the last action (`"save"` or `"apply"`), if any.
    pub last_action: Option<String>,
    /// Seconds since the last action, if any.
    pub last_action_age_seconds: Option<f64>,
    /// `overall_action` of the last apply plan while the state is not converged; `None` otherwise.
    pub pending_action: Option<String>,
    /// Pending process restarts, plus `"embedded-http-auth"` when a control-plane reload is
    /// required and `"rns-server"` when a control-plane restart is required.
    pub pending_targets: Vec<String>,
    /// Human-readable explanation of what is still outstanding; `None` when nothing is.
    pub blocking_reason: Option<String>,
    /// Names of processes whose restart is still pending.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required.
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the runtime is flagged as differing from the saved config.
    pub runtime_differs_from_saved: bool,
    /// True when no restarts are pending and no control-plane reload or restart is required.
    pub converged: bool,
    /// One-sentence human-readable summary of the convergence state.
    pub summary: String,
    /// Copy of the last apply plan, if any.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
519
/// Serializable snapshot of the embedded HTTP settings. The field notes below are derived from
/// the field names only — confirm against the code that builds the snapshot.
#[derive(Debug, Clone, Serialize)]
pub struct ServerHttpConfigSnapshot {
    /// Whether the embedded HTTP server is enabled.
    pub enabled: bool,
    /// Host to bind to.
    pub host: String,
    /// Port to bind to.
    pub port: u16,
    /// Authentication mode, as text (exact values are not visible here).
    pub auth_mode: String,
    /// Whether an auth token is configured (the token itself is not part of the snapshot).
    pub token_configured: bool,
    /// Daemon mode flag (TODO confirm semantics).
    pub daemon_mode: bool,
}
529
/// Serializable description of how one managed process is launched.
#[derive(Debug, Clone, Serialize)]
pub struct LaunchProcessSnapshot {
    /// Process name.
    pub name: String,
    /// Binary to execute.
    pub bin: String,
    /// Arguments passed to the binary.
    pub args: Vec<String>,
    /// Command line as a single string (presumably `bin` plus `args`, for display; TODO confirm).
    pub command_line: String,
}
537
/// Serializable result returned by a [`ServerConfigValidator`].
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigValidationSnapshot {
    /// Whether the candidate config is valid.
    pub valid: bool,
    /// Config snapshot produced by validation (presumably the candidate config; TODO confirm).
    pub config: ServerConfigSnapshot,
    /// Warnings raised during validation.
    pub warnings: Vec<String>,
}
544
/// Serializable result returned by a [`ServerConfigMutator`].
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigMutationResult {
    /// Name of the action that was performed (presumably `"save"` or `"apply"`; TODO confirm).
    pub action: String,
    /// Config snapshot after the mutation.
    pub config: ServerConfigSnapshot,
    /// Plan describing what has to happen for the mutation to take effect.
    pub apply_plan: ServerConfigApplyPlan,
    /// Warnings raised during the mutation.
    pub warnings: Vec<String>,
}
552
/// Serializable plan describing what must happen for a config change to take effect.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigApplyPlan {
    /// Overall action label; reported as `pending_action` in the status snapshot while the
    /// state is not converged.
    pub overall_action: String,
    /// Names of processes to restart; copied into the pending restart list when the config
    /// is applied.
    pub processes_to_restart: Vec<String>,
    /// Whether the control plane must reload for the change to take effect.
    pub control_plane_reload_required: bool,
    /// Whether the control plane must restart for the change to take effect.
    pub control_plane_restart_required: bool,
    /// Free-form notes attached to the plan.
    pub notes: Vec<String>,
    /// Individual field changes covered by the plan.
    pub changes: Vec<ServerConfigChange>,
}
562
/// Serializable description of one changed config field.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigChange {
    /// Name of the changed field.
    pub field: String,
    /// Value before the change, as text.
    pub before: String,
    /// Value after the change, as text.
    pub after: String,
    /// Effect of the change, as text (presumably what must be restarted or reloaded; TODO
    /// confirm).
    pub effect: String,
}
570
/// Mode passed to a [`ServerConfigMutator`].
#[derive(Debug, Clone, Copy)]
pub enum ServerConfigMutationMode {
    /// Save the config (presumably without applying it to the running system; TODO confirm).
    Save,
    /// Apply the config (presumably saving it as well; TODO confirm).
    Apply,
}
576
/// Internal bookkeeping about config saves/applies and whether the runtime has converged on
/// the saved config. `snapshot` turns it into the serializable [`ServerConfigStatusSnapshot`].
#[derive(Debug, Clone, Default)]
pub struct ServerConfigStatusState {
    /// When the config was last saved (set on both save and apply).
    pub last_saved_at: Option<Instant>,
    /// When the config was last applied.
    pub last_apply_at: Option<Instant>,
    /// Name of the last action: `"save"` or `"apply"`.
    pub last_action: Option<String>,
    /// When the last action happened.
    pub last_action_at: Option<Instant>,
    /// Processes named by the last applied plan that have not reported ready yet.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required.
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the runtime is flagged as differing from the saved config.
    pub runtime_differs_from_saved: bool,
    /// The plan recorded by the most recent save or apply.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
589
590impl ServerConfigStatusState {
591    pub fn snapshot(&self) -> ServerConfigStatusSnapshot {
592        let converged = self.pending_process_restarts.is_empty()
593            && !self.control_plane_reload_required
594            && !self.control_plane_restart_required;
595        let pending_action = (!converged)
596            .then(|| {
597                self.last_apply_plan
598                    .as_ref()
599                    .map(|plan| plan.overall_action.clone())
600            })
601            .flatten();
602        let mut pending_targets = self.pending_process_restarts.clone();
603        if self.control_plane_reload_required {
604            pending_targets.push("embedded-http-auth".into());
605        }
606        if self.control_plane_restart_required {
607            pending_targets.push("rns-server".into());
608        }
609        let blocking_reason = if self.control_plane_restart_required {
610            Some("Restart rns-server to apply embedded HTTP bind or enablement changes.".into())
611        } else if self.control_plane_reload_required {
612            Some("Apply config to reload embedded HTTP auth settings into the running control plane.".into())
613        } else if !self.pending_process_restarts.is_empty() {
614            Some(format!(
615                "Waiting for restarted processes to become ready: {}.",
616                self.pending_process_restarts.join(", ")
617            ))
618        } else {
619            None
620        };
621        let summary = if self.runtime_differs_from_saved {
622            if self.control_plane_restart_required {
623                "Saved config is not fully active; rns-server restart is still required.".into()
624            } else if self.control_plane_reload_required {
625                "Saved config is not fully active; embedded HTTP auth reload is still required."
626                    .into()
627            } else if self.pending_process_restarts.is_empty() {
628                "Saved config differs from runtime state.".into()
629            } else {
630                format!(
631                    "Waiting for restarted processes to converge: {}.",
632                    self.pending_process_restarts.join(", ")
633                )
634            }
635        } else {
636            "Running state is converged with the saved config.".into()
637        };
638
639        ServerConfigStatusSnapshot {
640            last_saved_age_seconds: self
641                .last_saved_at
642                .map(|instant| instant.elapsed().as_secs_f64()),
643            last_apply_age_seconds: self
644                .last_apply_at
645                .map(|instant| instant.elapsed().as_secs_f64()),
646            last_action: self.last_action.clone(),
647            last_action_age_seconds: self
648                .last_action_at
649                .map(|instant| instant.elapsed().as_secs_f64()),
650            pending_action,
651            pending_targets,
652            blocking_reason,
653            pending_process_restarts: self.pending_process_restarts.clone(),
654            control_plane_reload_required: self.control_plane_reload_required,
655            control_plane_restart_required: self.control_plane_restart_required,
656            runtime_differs_from_saved: self.runtime_differs_from_saved,
657            converged,
658            summary,
659            last_apply_plan: self.last_apply_plan.clone(),
660        }
661    }
662}
663
/// Tracked state of one managed process.
#[derive(Debug, Clone)]
pub struct ManagedProcessState {
    /// Process name.
    pub name: String,
    /// Lifecycle status label, e.g. `"stopped"`, `"running"`, `"failed"`.
    pub status: String,
    /// Whether the process is currently considered ready.
    pub ready: bool,
    /// Readiness label, e.g. `"stopped"`, `"starting"`, `"failed"`.
    pub ready_state: String,
    /// Pid while the process is running.
    pub pid: Option<u32>,
    /// Exit code recorded when the process last stopped, if any.
    pub last_exit_code: Option<i32>,
    /// Number of restarts requested so far.
    pub restart_count: u32,
    /// Most recent error text, if any.
    pub last_error: Option<String>,
    /// Free-form detail about the current status.
    pub status_detail: Option<String>,
    /// Path of the durable log, if one was set.
    pub durable_log_path: Option<String>,
    /// When the most recent log line was recorded.
    pub last_log_at: Option<Instant>,
    /// Number of log lines currently retained for the process.
    pub recent_log_lines: usize,
    /// When the process was started; `None` while it is not running.
    pub started_at: Option<Instant>,
    /// When the status last changed.
    pub last_transition_at: Option<Instant>,
}

/// Command addressed to a managed process; the payload is the process name.
#[derive(Debug, Clone)]
pub enum ProcessControlCommand {
    /// Restart the named process.
    Restart(String),
    /// Start the named process.
    Start(String),
    /// Stop the named process.
    Stop(String),
}

impl ManagedProcessState {
    /// Creates the state for a process that has not run yet: status and readiness are
    /// `"stopped"`, counters are zero, and every optional field is `None`.
    pub fn new(name: String) -> Self {
        let stopped = || String::from("stopped");
        ManagedProcessState {
            name,
            status: stopped(),
            ready: false,
            ready_state: stopped(),
            pid: None,
            last_exit_code: None,
            restart_count: 0,
            last_error: None,
            status_detail: None,
            durable_log_path: None,
            last_log_at: None,
            recent_log_lines: 0,
            started_at: None,
            last_transition_at: None,
        }
    }

    /// Seconds since the process was started, or `None` when no start time is recorded.
    pub fn uptime_seconds(&self) -> Option<f64> {
        let started = self.started_at?;
        Some(started.elapsed().as_secs_f64())
    }

    /// Seconds since the last status transition, or `None` when none is recorded.
    pub fn last_transition_seconds(&self) -> Option<f64> {
        let transition = self.last_transition_at?;
        Some(transition.elapsed().as_secs_f64())
    }

    /// Seconds since the last log line, or `None` when none is recorded.
    pub fn last_log_age_seconds(&self) -> Option<f64> {
        let logged = self.last_log_at?;
        Some(logged.elapsed().as_secs_f64())
    }
}
724
725// --- WebSocket events ---
726
727#[derive(Debug, Clone)]
728pub struct WsEvent {
729    pub topic: &'static str,
730    pub payload: serde_json::Value,
731}
732
733impl WsEvent {
734    pub fn announce(record: &AnnounceRecord) -> Self {
735        WsEvent {
736            topic: "announces",
737            payload: serde_json::to_value(record).unwrap_or_default(),
738        }
739    }
740
741    pub fn packet(record: &PacketRecord) -> Self {
742        WsEvent {
743            topic: "packets",
744            payload: serde_json::to_value(record).unwrap_or_default(),
745        }
746    }
747
748    pub fn proof(record: &ProofRecord) -> Self {
749        WsEvent {
750            topic: "proofs",
751            payload: serde_json::to_value(record).unwrap_or_default(),
752        }
753    }
754
755    pub fn link(record: &LinkEventRecord) -> Self {
756        WsEvent {
757            topic: "links",
758            payload: serde_json::to_value(record).unwrap_or_default(),
759        }
760    }
761
762    pub fn resource(record: &ResourceEventRecord) -> Self {
763        WsEvent {
764            topic: "resources",
765            payload: serde_json::to_value(record).unwrap_or_default(),
766        }
767    }
768
769    pub fn to_json(&self) -> String {
770        let obj = serde_json::json!({
771            "type": self.topic.trim_end_matches('s'),
772            "data": self.payload,
773        });
774        serde_json::to_string(&obj).unwrap_or_default()
775    }
776}
777
778/// Helper to create an AnnounceRecord from callback data.
779pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
780    AnnounceRecord {
781        dest_hash: to_hex(&announced.dest_hash.0),
782        identity_hash: to_hex(&announced.identity_hash.0),
783        hops: announced.hops,
784        app_data: announced
785            .app_data
786            .as_ref()
787            .map(|d| crate::encode::to_base64(d)),
788        received_at: announced.received_at,
789    }
790}