1use std::collections::{HashMap, VecDeque};
2use std::sync::{mpsc, Arc, Mutex, RwLock};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
12const MAX_RECORDS: usize = 1000;
13
14pub type SharedState = Arc<RwLock<CtlState>>;
16pub type ControlPlaneConfigHandle = Arc<RwLock<crate::config::CtlConfig>>;
17pub type ServerConfigValidator =
18 Arc<dyn Fn(&[u8]) -> Result<ServerConfigValidationSnapshot, String> + Send + Sync>;
19pub type ServerConfigMutator = Arc<
20 dyn Fn(ServerConfigMutationMode, &[u8]) -> Result<ServerConfigMutationResult, String>
21 + Send
22 + Sync,
23>;
24
25pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
27
28pub struct CtlState {
29 pub started_at: Instant,
30 pub server_mode: String,
31 pub server_config: Option<ServerConfigSnapshot>,
32 pub server_config_schema: Option<ServerConfigSchemaSnapshot>,
33 pub server_config_status: ServerConfigStatusState,
34 pub server_config_validator: Option<ServerConfigValidator>,
35 pub server_config_mutator: Option<ServerConfigMutator>,
36 pub identity_hash: Option<[u8; 16]>,
37 pub identity: Option<Identity>,
38 pub announces: VecDeque<AnnounceRecord>,
39 pub packets: VecDeque<PacketRecord>,
40 pub proofs: VecDeque<ProofRecord>,
41 pub link_events: VecDeque<LinkEventRecord>,
42 pub resource_events: VecDeque<ResourceEventRecord>,
43 pub process_events: VecDeque<ProcessEventRecord>,
44 pub process_logs: HashMap<String, VecDeque<ProcessLogRecord>>,
45 pub destinations: HashMap<[u8; 16], DestinationEntry>,
46 pub processes: HashMap<String, ManagedProcessState>,
47 pub control_tx: Option<mpsc::Sender<ProcessControlCommand>>,
48 pub control_plane_config: Option<ControlPlaneConfigHandle>,
49 pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
50}
51
52pub struct DestinationEntry {
54 pub destination: Destination,
55 pub full_name: String,
57}
58
59impl CtlState {
60 pub fn new() -> Self {
61 CtlState {
62 started_at: Instant::now(),
63 server_mode: "standalone".into(),
64 server_config: None,
65 server_config_schema: None,
66 server_config_status: ServerConfigStatusState::default(),
67 server_config_validator: None,
68 server_config_mutator: None,
69 identity_hash: None,
70 identity: None,
71 announces: VecDeque::new(),
72 packets: VecDeque::new(),
73 proofs: VecDeque::new(),
74 link_events: VecDeque::new(),
75 resource_events: VecDeque::new(),
76 process_events: VecDeque::new(),
77 process_logs: HashMap::new(),
78 destinations: HashMap::new(),
79 processes: HashMap::new(),
80 control_tx: None,
81 control_plane_config: None,
82 node_handle: None,
83 }
84 }
85
86 pub fn uptime_seconds(&self) -> f64 {
87 self.started_at.elapsed().as_secs_f64()
88 }
89}
90
91fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
92 if deque.len() >= MAX_RECORDS {
93 deque.pop_front();
94 }
95 deque.push_back(item);
96}
97
98pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
99 let mut s = state.write().unwrap();
100 push_capped(&mut s.announces, record);
101}
102
103pub fn push_packet(state: &SharedState, record: PacketRecord) {
104 let mut s = state.write().unwrap();
105 push_capped(&mut s.packets, record);
106}
107
108pub fn push_proof(state: &SharedState, record: ProofRecord) {
109 let mut s = state.write().unwrap();
110 push_capped(&mut s.proofs, record);
111}
112
113pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
114 let mut s = state.write().unwrap();
115 push_capped(&mut s.link_events, record);
116}
117
118pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
119 let mut s = state.write().unwrap();
120 push_capped(&mut s.resource_events, record);
121}
122
123pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
125 let mut senders = ws.lock().unwrap();
126 senders.retain(|tx| tx.send(event.clone()).is_ok());
127}
128
129pub fn set_server_mode(state: &SharedState, mode: impl Into<String>) {
130 let mut s = state.write().unwrap();
131 s.server_mode = mode.into();
132}
133
134pub fn set_server_config(state: &SharedState, config: ServerConfigSnapshot) {
135 let mut s = state.write().unwrap();
136 s.server_config = Some(config);
137}
138
139pub fn set_server_config_schema(state: &SharedState, schema: ServerConfigSchemaSnapshot) {
140 let mut s = state.write().unwrap();
141 s.server_config_schema = Some(schema);
142}
143
144pub fn note_server_config_saved(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
145 let mut s = state.write().unwrap();
146 s.server_config_status.last_saved_at = Some(Instant::now());
147 s.server_config_status.last_action = Some("save".into());
148 s.server_config_status.last_action_at = Some(Instant::now());
149 s.server_config_status.pending_process_restarts.clear();
150 s.server_config_status.control_plane_reload_required = apply_plan.control_plane_reload_required;
151 s.server_config_status.control_plane_restart_required =
152 apply_plan.control_plane_restart_required;
153 s.server_config_status.runtime_differs_from_saved = !apply_plan.processes_to_restart.is_empty()
154 || apply_plan.control_plane_reload_required
155 || apply_plan.control_plane_restart_required;
156 s.server_config_status.last_apply_plan = Some(apply_plan.clone());
157}
158
159pub fn note_server_config_applied(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
160 let mut s = state.write().unwrap();
161 let now = Instant::now();
162 s.server_config_status.last_saved_at = Some(now);
163 s.server_config_status.last_apply_at = Some(now);
164 s.server_config_status.last_action = Some("apply".into());
165 s.server_config_status.last_action_at = Some(now);
166 s.server_config_status.pending_process_restarts = apply_plan.processes_to_restart.clone();
167 s.server_config_status.control_plane_reload_required = false;
168 s.server_config_status.control_plane_restart_required =
169 apply_plan.control_plane_restart_required;
170 s.server_config_status.runtime_differs_from_saved =
171 !s.server_config_status.pending_process_restarts.is_empty()
172 || s.server_config_status.control_plane_restart_required;
173 s.server_config_status.last_apply_plan = Some(apply_plan.clone());
174}
175
176pub fn reconcile_config_status_for_process(
177 state: &SharedState,
178 name: &str,
179 ready: bool,
180 status: &str,
181) {
182 let mut s = state.write().unwrap();
183 if ready {
184 s.server_config_status
185 .pending_process_restarts
186 .retain(|process| process != name);
187 }
188 if status == "failed" {
189 s.server_config_status.runtime_differs_from_saved = true;
190 } else if s.server_config_status.pending_process_restarts.is_empty()
191 && !s.server_config_status.control_plane_reload_required
192 && !s.server_config_status.control_plane_restart_required
193 {
194 s.server_config_status.runtime_differs_from_saved = false;
195 }
196}
197
198pub fn set_server_config_validator(state: &SharedState, validator: ServerConfigValidator) {
199 let mut s = state.write().unwrap();
200 s.server_config_validator = Some(validator);
201}
202
203pub fn set_server_config_mutator(state: &SharedState, mutator: ServerConfigMutator) {
204 let mut s = state.write().unwrap();
205 s.server_config_mutator = Some(mutator);
206}
207
208pub fn ensure_process(state: &SharedState, name: impl Into<String>) {
209 let mut s = state.write().unwrap();
210 let name = name.into();
211 s.processes
212 .entry(name.clone())
213 .or_insert_with(|| ManagedProcessState::new(name.clone()));
214 s.process_logs.entry(name.clone()).or_default();
215 push_capped(
216 &mut s.process_events,
217 ProcessEventRecord::new(name, "registered", Some("process registered".into())),
218 );
219}
220
221pub fn push_process_log(state: &SharedState, name: &str, stream: &str, line: impl Into<String>) {
222 let mut s = state.write().unwrap();
223 let recent_log_lines = {
224 let logs = s.process_logs.entry(name.to_string()).or_default();
225 if logs.len() >= MAX_RECORDS {
226 logs.pop_front();
227 }
228 logs.push_back(ProcessLogRecord {
229 process: name.to_string(),
230 stream: stream.to_string(),
231 line: line.into(),
232 recorded_at: Instant::now(),
233 });
234 logs.len()
235 };
236 let process = s
237 .processes
238 .entry(name.to_string())
239 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
240 process.last_log_at = Some(Instant::now());
241 process.recent_log_lines = recent_log_lines;
242}
243
244pub fn set_process_log_path(state: &SharedState, name: &str, path: impl Into<String>) {
245 let mut s = state.write().unwrap();
246 let process = s
247 .processes
248 .entry(name.to_string())
249 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
250 process.durable_log_path = Some(path.into());
251}
252
253pub fn set_control_tx(state: &SharedState, tx: mpsc::Sender<ProcessControlCommand>) {
254 let mut s = state.write().unwrap();
255 s.control_tx = Some(tx);
256}
257
258pub fn set_control_plane_config(state: &SharedState, config: ControlPlaneConfigHandle) {
259 let mut s = state.write().unwrap();
260 s.control_plane_config = Some(config);
261}
262
263pub fn mark_process_running(state: &SharedState, name: &str, pid: u32) {
264 let mut s = state.write().unwrap();
265 let process = s
266 .processes
267 .entry(name.to_string())
268 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
269 process.status = "running".into();
270 process.ready = false;
271 process.ready_state = "starting".into();
272 process.pid = Some(pid);
273 process.started_at = Some(Instant::now());
274 process.last_transition_at = Some(Instant::now());
275 process.last_error = None;
276 process.status_detail = Some("process spawned".into());
277 push_capped(
278 &mut s.process_events,
279 ProcessEventRecord::new(name.to_string(), "running", Some(format!("pid={}", pid))),
280 );
281 drop(s);
282 reconcile_config_status_for_process(state, name, false, "running");
283}
284
285pub fn bump_process_restart_count(state: &SharedState, name: &str) {
286 let mut s = state.write().unwrap();
287 let restart_count = {
288 let process = s
289 .processes
290 .entry(name.to_string())
291 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
292 process.restart_count = process.restart_count.saturating_add(1);
293 process.restart_count
294 };
295 push_capped(
296 &mut s.process_events,
297 ProcessEventRecord::new(
298 name.to_string(),
299 "restart_requested",
300 Some(format!("restart_count={}", restart_count)),
301 ),
302 );
303}
304
305pub fn mark_process_stopped(state: &SharedState, name: &str, exit_code: Option<i32>) {
306 let mut s = state.write().unwrap();
307 let process = s
308 .processes
309 .entry(name.to_string())
310 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
311 process.status = "stopped".into();
312 process.ready = false;
313 process.ready_state = "stopped".into();
314 process.pid = None;
315 process.last_exit_code = exit_code;
316 process.started_at = None;
317 process.last_transition_at = Some(Instant::now());
318 process.status_detail = Some("process stopped".into());
319 push_capped(
320 &mut s.process_events,
321 ProcessEventRecord::new(
322 name.to_string(),
323 "stopped",
324 Some(format!(
325 "exit_code={}",
326 exit_code
327 .map(|v| v.to_string())
328 .unwrap_or_else(|| "none".into())
329 )),
330 ),
331 );
332 drop(s);
333 reconcile_config_status_for_process(state, name, false, "stopped");
334}
335
336pub fn mark_process_failed_spawn(state: &SharedState, name: &str, error: String) {
337 let mut s = state.write().unwrap();
338 let detail = {
339 let process = s
340 .processes
341 .entry(name.to_string())
342 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
343 process.status = "failed".into();
344 process.ready = false;
345 process.ready_state = "failed".into();
346 process.pid = None;
347 process.last_error = Some(error);
348 process.started_at = None;
349 process.last_transition_at = Some(Instant::now());
350 process.status_detail = process.last_error.clone();
351 process.last_error.clone()
352 };
353 push_capped(
354 &mut s.process_events,
355 ProcessEventRecord::new(name.to_string(), "spawn_failed", detail),
356 );
357 drop(s);
358 reconcile_config_status_for_process(state, name, false, "failed");
359}
360
361pub fn set_process_readiness(
362 state: &SharedState,
363 name: &str,
364 ready: bool,
365 ready_state: &str,
366 status_detail: Option<String>,
367) {
368 let mut s = state.write().unwrap();
369 let detail_clone = {
370 let process = s
371 .processes
372 .entry(name.to_string())
373 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
374 process.ready = ready;
375 process.ready_state = ready_state.to_string();
376 process.status_detail = status_detail;
377 process.status_detail.clone()
378 };
379 let should_record = match s.process_events.back() {
380 Some(last) => {
381 last.process != name || last.event != ready_state || last.detail != detail_clone
382 }
383 None => true,
384 };
385 if should_record {
386 push_capped(
387 &mut s.process_events,
388 ProcessEventRecord::new(name.to_string(), ready_state.to_string(), detail_clone),
389 );
390 }
391 drop(s);
392 reconcile_config_status_for_process(state, name, ready, ready_state);
393}
394
395#[derive(Debug, Clone, Serialize)]
398pub struct AnnounceRecord {
399 pub dest_hash: String,
400 pub identity_hash: String,
401 pub hops: u8,
402 pub app_data: Option<String>,
403 pub received_at: f64,
404}
405
406#[derive(Debug, Clone, Serialize)]
407pub struct PacketRecord {
408 pub dest_hash: String,
409 pub packet_hash: String,
410 pub data_base64: String,
411 pub received_at: f64,
412}
413
414#[derive(Debug, Clone, Serialize)]
415pub struct ProofRecord {
416 pub dest_hash: String,
417 pub packet_hash: String,
418 pub rtt: f64,
419}
420
421#[derive(Debug, Clone, Serialize)]
422pub struct LinkEventRecord {
423 pub link_id: String,
424 pub event_type: String,
425 pub is_initiator: Option<bool>,
426 pub rtt: Option<f64>,
427 pub identity_hash: Option<String>,
428 pub reason: Option<String>,
429}
430
431#[derive(Debug, Clone, Serialize)]
432pub struct ResourceEventRecord {
433 pub link_id: String,
434 pub event_type: String,
435 pub data_base64: Option<String>,
436 pub metadata_base64: Option<String>,
437 pub error: Option<String>,
438 pub received: Option<usize>,
439 pub total: Option<usize>,
440}
441
442#[derive(Debug, Clone)]
443pub struct ProcessEventRecord {
444 pub process: String,
445 pub event: String,
446 pub detail: Option<String>,
447 pub recorded_at: Instant,
448}
449
450#[derive(Debug, Clone)]
451pub struct ProcessLogRecord {
452 pub process: String,
453 pub stream: String,
454 pub line: String,
455 pub recorded_at: Instant,
456}
457
458impl ProcessEventRecord {
459 fn new(process: String, event: impl Into<String>, detail: Option<String>) -> Self {
460 Self {
461 process,
462 event: event.into(),
463 detail,
464 recorded_at: Instant::now(),
465 }
466 }
467}
468
469#[derive(Debug, Clone, Serialize)]
470pub struct ServerConfigSnapshot {
471 pub config_path: Option<String>,
472 pub resolved_config_dir: String,
473 pub server_config_file_path: String,
474 pub server_config_file_present: bool,
475 pub server_config_file_json: String,
476 pub stats_db_path: String,
477 pub rnsd_bin: String,
478 pub sentineld_bin: String,
479 pub statsd_bin: String,
480 pub http: ServerHttpConfigSnapshot,
481 pub launch_plan: Vec<LaunchProcessSnapshot>,
482}
483
484#[derive(Debug, Clone, Serialize)]
485pub struct ServerConfigSchemaSnapshot {
486 pub format: String,
487 pub example_config_json: String,
488 pub notes: Vec<String>,
489 pub fields: Vec<ServerConfigFieldSchema>,
490}
491
492#[derive(Debug, Clone, Serialize)]
493pub struct ServerConfigFieldSchema {
494 pub field: String,
495 pub field_type: String,
496 pub required: bool,
497 pub default_value: String,
498 pub description: String,
499 pub effect: String,
500}
501
502#[derive(Debug, Clone, Serialize)]
503pub struct ServerConfigStatusSnapshot {
504 pub last_saved_age_seconds: Option<f64>,
505 pub last_apply_age_seconds: Option<f64>,
506 pub last_action: Option<String>,
507 pub last_action_age_seconds: Option<f64>,
508 pub pending_action: Option<String>,
509 pub pending_targets: Vec<String>,
510 pub blocking_reason: Option<String>,
511 pub pending_process_restarts: Vec<String>,
512 pub control_plane_reload_required: bool,
513 pub control_plane_restart_required: bool,
514 pub runtime_differs_from_saved: bool,
515 pub converged: bool,
516 pub summary: String,
517 pub last_apply_plan: Option<ServerConfigApplyPlan>,
518}
519
520#[derive(Debug, Clone, Serialize)]
521pub struct ServerHttpConfigSnapshot {
522 pub enabled: bool,
523 pub host: String,
524 pub port: u16,
525 pub auth_mode: String,
526 pub token_configured: bool,
527 pub daemon_mode: bool,
528}
529
530#[derive(Debug, Clone, Serialize)]
531pub struct LaunchProcessSnapshot {
532 pub name: String,
533 pub bin: String,
534 pub args: Vec<String>,
535 pub command_line: String,
536}
537
538#[derive(Debug, Clone, Serialize)]
539pub struct ServerConfigValidationSnapshot {
540 pub valid: bool,
541 pub config: ServerConfigSnapshot,
542 pub warnings: Vec<String>,
543}
544
545#[derive(Debug, Clone, Serialize)]
546pub struct ServerConfigMutationResult {
547 pub action: String,
548 pub config: ServerConfigSnapshot,
549 pub apply_plan: ServerConfigApplyPlan,
550 pub warnings: Vec<String>,
551}
552
553#[derive(Debug, Clone, Serialize)]
554pub struct ServerConfigApplyPlan {
555 pub overall_action: String,
556 pub processes_to_restart: Vec<String>,
557 pub control_plane_reload_required: bool,
558 pub control_plane_restart_required: bool,
559 pub notes: Vec<String>,
560 pub changes: Vec<ServerConfigChange>,
561}
562
563#[derive(Debug, Clone, Serialize)]
564pub struct ServerConfigChange {
565 pub field: String,
566 pub before: String,
567 pub after: String,
568 pub effect: String,
569}
570
571#[derive(Debug, Clone, Copy)]
572pub enum ServerConfigMutationMode {
573 Save,
574 Apply,
575}
576
577#[derive(Debug, Clone, Default)]
578pub struct ServerConfigStatusState {
579 pub last_saved_at: Option<Instant>,
580 pub last_apply_at: Option<Instant>,
581 pub last_action: Option<String>,
582 pub last_action_at: Option<Instant>,
583 pub pending_process_restarts: Vec<String>,
584 pub control_plane_reload_required: bool,
585 pub control_plane_restart_required: bool,
586 pub runtime_differs_from_saved: bool,
587 pub last_apply_plan: Option<ServerConfigApplyPlan>,
588}
589
590impl ServerConfigStatusState {
591 pub fn snapshot(&self) -> ServerConfigStatusSnapshot {
592 let converged = self.pending_process_restarts.is_empty()
593 && !self.control_plane_reload_required
594 && !self.control_plane_restart_required;
595 let pending_action = (!converged)
596 .then(|| {
597 self.last_apply_plan
598 .as_ref()
599 .map(|plan| plan.overall_action.clone())
600 })
601 .flatten();
602 let mut pending_targets = self.pending_process_restarts.clone();
603 if self.control_plane_reload_required {
604 pending_targets.push("embedded-http-auth".into());
605 }
606 if self.control_plane_restart_required {
607 pending_targets.push("rns-server".into());
608 }
609 let blocking_reason = if self.control_plane_restart_required {
610 Some("Restart rns-server to apply embedded HTTP bind or enablement changes.".into())
611 } else if self.control_plane_reload_required {
612 Some("Apply config to reload embedded HTTP auth settings into the running control plane.".into())
613 } else if !self.pending_process_restarts.is_empty() {
614 Some(format!(
615 "Waiting for restarted processes to become ready: {}.",
616 self.pending_process_restarts.join(", ")
617 ))
618 } else {
619 None
620 };
621 let summary = if self.runtime_differs_from_saved {
622 if self.control_plane_restart_required {
623 "Saved config is not fully active; rns-server restart is still required.".into()
624 } else if self.control_plane_reload_required {
625 "Saved config is not fully active; embedded HTTP auth reload is still required."
626 .into()
627 } else if self.pending_process_restarts.is_empty() {
628 "Saved config differs from runtime state.".into()
629 } else {
630 format!(
631 "Waiting for restarted processes to converge: {}.",
632 self.pending_process_restarts.join(", ")
633 )
634 }
635 } else {
636 "Running state is converged with the saved config.".into()
637 };
638
639 ServerConfigStatusSnapshot {
640 last_saved_age_seconds: self
641 .last_saved_at
642 .map(|instant| instant.elapsed().as_secs_f64()),
643 last_apply_age_seconds: self
644 .last_apply_at
645 .map(|instant| instant.elapsed().as_secs_f64()),
646 last_action: self.last_action.clone(),
647 last_action_age_seconds: self
648 .last_action_at
649 .map(|instant| instant.elapsed().as_secs_f64()),
650 pending_action,
651 pending_targets,
652 blocking_reason,
653 pending_process_restarts: self.pending_process_restarts.clone(),
654 control_plane_reload_required: self.control_plane_reload_required,
655 control_plane_restart_required: self.control_plane_restart_required,
656 runtime_differs_from_saved: self.runtime_differs_from_saved,
657 converged,
658 summary,
659 last_apply_plan: self.last_apply_plan.clone(),
660 }
661 }
662}
663
664#[derive(Debug, Clone)]
665pub struct ManagedProcessState {
666 pub name: String,
667 pub status: String,
668 pub ready: bool,
669 pub ready_state: String,
670 pub pid: Option<u32>,
671 pub last_exit_code: Option<i32>,
672 pub restart_count: u32,
673 pub last_error: Option<String>,
674 pub status_detail: Option<String>,
675 pub durable_log_path: Option<String>,
676 pub last_log_at: Option<Instant>,
677 pub recent_log_lines: usize,
678 pub started_at: Option<Instant>,
679 pub last_transition_at: Option<Instant>,
680}
681
682#[derive(Debug, Clone)]
683pub enum ProcessControlCommand {
684 Restart(String),
685 Start(String),
686 Stop(String),
687}
688
689impl ManagedProcessState {
690 pub fn new(name: String) -> Self {
691 Self {
692 name,
693 status: "stopped".into(),
694 ready: false,
695 ready_state: "stopped".into(),
696 pid: None,
697 last_exit_code: None,
698 restart_count: 0,
699 last_error: None,
700 status_detail: None,
701 durable_log_path: None,
702 last_log_at: None,
703 recent_log_lines: 0,
704 started_at: None,
705 last_transition_at: None,
706 }
707 }
708
709 pub fn uptime_seconds(&self) -> Option<f64> {
710 self.started_at
711 .map(|started| started.elapsed().as_secs_f64())
712 }
713
714 pub fn last_transition_seconds(&self) -> Option<f64> {
715 self.last_transition_at
716 .map(|transition| transition.elapsed().as_secs_f64())
717 }
718
719 pub fn last_log_age_seconds(&self) -> Option<f64> {
720 self.last_log_at
721 .map(|logged| logged.elapsed().as_secs_f64())
722 }
723}
724
725#[derive(Debug, Clone)]
728pub struct WsEvent {
729 pub topic: &'static str,
730 pub payload: serde_json::Value,
731}
732
733impl WsEvent {
734 pub fn announce(record: &AnnounceRecord) -> Self {
735 WsEvent {
736 topic: "announces",
737 payload: serde_json::to_value(record).unwrap_or_default(),
738 }
739 }
740
741 pub fn packet(record: &PacketRecord) -> Self {
742 WsEvent {
743 topic: "packets",
744 payload: serde_json::to_value(record).unwrap_or_default(),
745 }
746 }
747
748 pub fn proof(record: &ProofRecord) -> Self {
749 WsEvent {
750 topic: "proofs",
751 payload: serde_json::to_value(record).unwrap_or_default(),
752 }
753 }
754
755 pub fn link(record: &LinkEventRecord) -> Self {
756 WsEvent {
757 topic: "links",
758 payload: serde_json::to_value(record).unwrap_or_default(),
759 }
760 }
761
762 pub fn resource(record: &ResourceEventRecord) -> Self {
763 WsEvent {
764 topic: "resources",
765 payload: serde_json::to_value(record).unwrap_or_default(),
766 }
767 }
768
769 pub fn to_json(&self) -> String {
770 let obj = serde_json::json!({
771 "type": self.topic.trim_end_matches('s'),
772 "data": self.payload,
773 });
774 serde_json::to_string(&obj).unwrap_or_default()
775 }
776}
777
778pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
780 AnnounceRecord {
781 dest_hash: to_hex(&announced.dest_hash.0),
782 identity_hash: to_hex(&announced.identity_hash.0),
783 hops: announced.hops,
784 app_data: announced
785 .app_data
786 .as_ref()
787 .map(|d| crate::encode::to_base64(d)),
788 received_at: announced.received_at,
789 }
790}