1use std::collections::{HashMap, VecDeque};
2use std::sync::{mpsc, Arc, Mutex, RwLock};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
/// Upper bound on the number of entries kept in each in-memory history queue.
/// Once a queue holds this many entries, the oldest one is dropped before a
/// new one is appended.
const MAX_RECORDS: usize = 1000;
13
/// Reference-counted, read/write-locked handle to the control-plane state.
pub type SharedState = Arc<RwLock<CtlState>>;
/// Reference-counted, read/write-locked handle to the control-plane config.
pub type ControlPlaneConfigHandle = Arc<RwLock<crate::config::CtlConfig>>;
/// Thread-safe callback that takes a byte slice and returns either a
/// validation snapshot or an error message.
/// NOTE(review): the bytes are presumably a serialized server config — confirm
/// against whoever installs the callback.
pub type ServerConfigValidator =
    Arc<dyn Fn(&[u8]) -> Result<ServerConfigValidationSnapshot, String> + Send + Sync>;
/// Thread-safe callback that takes a mutation mode plus a byte slice and
/// returns either a mutation result or an error message.
/// NOTE(review): the bytes are presumably a serialized server config — confirm
/// against whoever installs the callback.
pub type ServerConfigMutator = Arc<
    dyn Fn(ServerConfigMutationMode, &[u8]) -> Result<ServerConfigMutationResult, String>
        + Send
        + Sync,
>;

/// Mutex-guarded list of channel senders that receive every broadcast
/// [`WsEvent`].
pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
27
/// In-memory control-plane state: server config bookkeeping, bounded event
/// histories, and per-process supervision data.
pub struct CtlState {
    /// Instant captured when the state was constructed; `uptime_seconds`
    /// measures from it.
    pub started_at: Instant,
    /// Server mode label; `new` initialises it to `"standalone"`.
    pub server_mode: String,
    /// Most recently stored server config snapshot, if any.
    pub server_config: Option<ServerConfigSnapshot>,
    /// Most recently stored server config schema, if any.
    pub server_config_schema: Option<ServerConfigSchemaSnapshot>,
    /// Save/apply bookkeeping for the server config.
    pub server_config_status: ServerConfigStatusState,
    /// Optional installed config validation callback.
    pub server_config_validator: Option<ServerConfigValidator>,
    /// Optional installed config mutation callback.
    pub server_config_mutator: Option<ServerConfigMutator>,
    /// NOTE(review): presumably the hash of `identity` — confirm with the code
    /// that sets it (not visible in this file).
    pub identity_hash: Option<[u8; 16]>,
    /// Optional identity; set outside this file.
    pub identity: Option<Identity>,
    /// Bounded history of announce records (oldest at the front).
    pub announces: VecDeque<AnnounceRecord>,
    /// Bounded history of packet records (oldest at the front).
    pub packets: VecDeque<PacketRecord>,
    /// Bounded history of proof records (oldest at the front).
    pub proofs: VecDeque<ProofRecord>,
    /// Bounded history of link event records (oldest at the front).
    pub link_events: VecDeque<LinkEventRecord>,
    /// Bounded history of resource event records (oldest at the front).
    pub resource_events: VecDeque<ResourceEventRecord>,
    /// Bounded history of process lifecycle events (oldest at the front).
    pub process_events: VecDeque<ProcessEventRecord>,
    /// Bounded per-process log line history, keyed by process name.
    pub process_logs: HashMap<String, VecDeque<ProcessLogRecord>>,
    /// Destination entries keyed by a 16-byte key.
    /// NOTE(review): presumably the destination hash — confirm with callers.
    pub destinations: HashMap<[u8; 16], DestinationEntry>,
    /// Supervision state per managed process, keyed by process name.
    pub processes: HashMap<String, ManagedProcessState>,
    /// Optional sender for process control commands.
    pub control_tx: Option<mpsc::Sender<ProcessControlCommand>>,
    /// Optional shared handle to the control-plane config.
    pub control_plane_config: Option<ControlPlaneConfigHandle>,
    /// Optional shared handle to the node; the inner `Option` allows the node
    /// slot itself to be empty.
    pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
}
51
/// A destination together with a name string, stored in
/// `CtlState::destinations`.
pub struct DestinationEntry {
    /// The destination object itself.
    pub destination: Destination,
    /// NOTE(review): presumably the destination's full dotted name — confirm
    /// with the code that builds entries (not visible in this file).
    pub full_name: String,
}
58
59impl CtlState {
60 pub fn new() -> Self {
61 CtlState {
62 started_at: Instant::now(),
63 server_mode: "standalone".into(),
64 server_config: None,
65 server_config_schema: None,
66 server_config_status: ServerConfigStatusState::default(),
67 server_config_validator: None,
68 server_config_mutator: None,
69 identity_hash: None,
70 identity: None,
71 announces: VecDeque::new(),
72 packets: VecDeque::new(),
73 proofs: VecDeque::new(),
74 link_events: VecDeque::new(),
75 resource_events: VecDeque::new(),
76 process_events: VecDeque::new(),
77 process_logs: HashMap::new(),
78 destinations: HashMap::new(),
79 processes: HashMap::new(),
80 control_tx: None,
81 control_plane_config: None,
82 node_handle: None,
83 }
84 }
85
86 pub fn uptime_seconds(&self) -> f64 {
87 self.started_at.elapsed().as_secs_f64()
88 }
89}
90
91fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
92 if deque.len() >= MAX_RECORDS {
93 deque.pop_front();
94 }
95 deque.push_back(item);
96}
97
98pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
99 let mut s = state.write().unwrap();
100 push_capped(&mut s.announces, record);
101}
102
103pub fn push_packet(state: &SharedState, record: PacketRecord) {
104 let mut s = state.write().unwrap();
105 push_capped(&mut s.packets, record);
106}
107
108pub fn push_proof(state: &SharedState, record: ProofRecord) {
109 let mut s = state.write().unwrap();
110 push_capped(&mut s.proofs, record);
111}
112
113pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
114 let mut s = state.write().unwrap();
115 push_capped(&mut s.link_events, record);
116}
117
118pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
119 let mut s = state.write().unwrap();
120 push_capped(&mut s.resource_events, record);
121}
122
123pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
125 let mut senders = ws.lock().unwrap();
126 senders.retain(|tx| tx.send(event.clone()).is_ok());
127}
128
129pub fn set_server_mode(state: &SharedState, mode: impl Into<String>) {
130 let mut s = state.write().unwrap();
131 s.server_mode = mode.into();
132}
133
134pub fn set_server_config(state: &SharedState, config: ServerConfigSnapshot) {
135 let mut s = state.write().unwrap();
136 s.server_config = Some(config);
137}
138
139pub fn set_server_config_schema(state: &SharedState, schema: ServerConfigSchemaSnapshot) {
140 let mut s = state.write().unwrap();
141 s.server_config_schema = Some(schema);
142}
143
144pub fn note_server_config_saved(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
145 let mut s = state.write().unwrap();
146 s.server_config_status.last_saved_at = Some(Instant::now());
147 s.server_config_status.last_action = Some("save".into());
148 s.server_config_status.last_action_at = Some(Instant::now());
149 s.server_config_status.pending_process_restarts.clear();
150 s.server_config_status.control_plane_reload_required = apply_plan.control_plane_reload_required;
151 s.server_config_status.control_plane_restart_required =
152 apply_plan.control_plane_restart_required;
153 s.server_config_status.runtime_differs_from_saved = !apply_plan.processes_to_restart.is_empty()
154 || apply_plan.control_plane_reload_required
155 || apply_plan.control_plane_restart_required;
156 s.server_config_status.last_apply_plan = Some(apply_plan.clone());
157}
158
159pub fn note_server_config_applied(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
160 let mut s = state.write().unwrap();
161 let now = Instant::now();
162 s.server_config_status.last_saved_at = Some(now);
163 s.server_config_status.last_apply_at = Some(now);
164 s.server_config_status.last_action = Some("apply".into());
165 s.server_config_status.last_action_at = Some(now);
166 s.server_config_status.pending_process_restarts = apply_plan.processes_to_restart.clone();
167 s.server_config_status.control_plane_reload_required = false;
168 s.server_config_status.control_plane_restart_required =
169 apply_plan.control_plane_restart_required;
170 s.server_config_status.runtime_differs_from_saved =
171 !s.server_config_status.pending_process_restarts.is_empty()
172 || s.server_config_status.control_plane_restart_required;
173 s.server_config_status.last_apply_plan = Some(apply_plan.clone());
174}
175
176pub fn reconcile_config_status_for_process(
177 state: &SharedState,
178 name: &str,
179 ready: bool,
180 status: &str,
181) {
182 let mut s = state.write().unwrap();
183 if ready {
184 s.server_config_status
185 .pending_process_restarts
186 .retain(|process| process != name);
187 }
188 if status == "failed" {
189 s.server_config_status.runtime_differs_from_saved = true;
190 } else if s.server_config_status.pending_process_restarts.is_empty()
191 && !s.server_config_status.control_plane_reload_required
192 && !s.server_config_status.control_plane_restart_required
193 {
194 s.server_config_status.runtime_differs_from_saved = false;
195 }
196}
197
198pub fn set_server_config_validator(state: &SharedState, validator: ServerConfigValidator) {
199 let mut s = state.write().unwrap();
200 s.server_config_validator = Some(validator);
201}
202
203pub fn set_server_config_mutator(state: &SharedState, mutator: ServerConfigMutator) {
204 let mut s = state.write().unwrap();
205 s.server_config_mutator = Some(mutator);
206}
207
208pub fn ensure_process(state: &SharedState, name: impl Into<String>) {
209 let mut s = state.write().unwrap();
210 let name = name.into();
211 s.processes
212 .entry(name.clone())
213 .or_insert_with(|| ManagedProcessState::new(name.clone()));
214 s.process_logs.entry(name.clone()).or_default();
215 push_capped(
216 &mut s.process_events,
217 ProcessEventRecord::new(name, "registered", Some("process registered".into())),
218 );
219}
220
221pub fn push_process_log(state: &SharedState, name: &str, stream: &str, line: impl Into<String>) {
222 let mut s = state.write().unwrap();
223 let recent_log_lines = {
224 let logs = s.process_logs.entry(name.to_string()).or_default();
225 if logs.len() >= MAX_RECORDS {
226 logs.pop_front();
227 }
228 logs.push_back(ProcessLogRecord {
229 process: name.to_string(),
230 stream: stream.to_string(),
231 line: line.into(),
232 recorded_at: Instant::now(),
233 });
234 logs.len()
235 };
236 let process = s
237 .processes
238 .entry(name.to_string())
239 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
240 process.last_log_at = Some(Instant::now());
241 process.recent_log_lines = recent_log_lines;
242}
243
244pub fn set_process_log_path(state: &SharedState, name: &str, path: impl Into<String>) {
245 let mut s = state.write().unwrap();
246 let process = s
247 .processes
248 .entry(name.to_string())
249 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
250 process.durable_log_path = Some(path.into());
251}
252
253pub fn set_control_tx(state: &SharedState, tx: mpsc::Sender<ProcessControlCommand>) {
254 let mut s = state.write().unwrap();
255 s.control_tx = Some(tx);
256}
257
258pub fn set_control_plane_config(state: &SharedState, config: ControlPlaneConfigHandle) {
259 let mut s = state.write().unwrap();
260 s.control_plane_config = Some(config);
261}
262
263pub fn mark_process_running(state: &SharedState, name: &str, pid: u32) {
264 let mut s = state.write().unwrap();
265 let process = s
266 .processes
267 .entry(name.to_string())
268 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
269 process.status = "running".into();
270 process.ready = false;
271 process.ready_state = "starting".into();
272 process.pid = Some(pid);
273 process.started_at = Some(Instant::now());
274 process.last_transition_at = Some(Instant::now());
275 process.last_error = None;
276 process.status_detail = Some("process spawned".into());
277 push_capped(
278 &mut s.process_events,
279 ProcessEventRecord::new(name.to_string(), "running", Some(format!("pid={}", pid))),
280 );
281 drop(s);
282 reconcile_config_status_for_process(state, name, false, "running");
283}
284
285pub fn bump_process_restart_count(state: &SharedState, name: &str) {
286 let mut s = state.write().unwrap();
287 let restart_count = {
288 let process = s
289 .processes
290 .entry(name.to_string())
291 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
292 process.restart_count = process.restart_count.saturating_add(1);
293 process.restart_count
294 };
295 push_capped(
296 &mut s.process_events,
297 ProcessEventRecord::new(
298 name.to_string(),
299 "restart_requested",
300 Some(format!("restart_count={}", restart_count)),
301 ),
302 );
303}
304
305pub fn record_process_termination_observation(
306 state: &SharedState,
307 name: &str,
308 drain_acknowledged: bool,
309 forced_kill: bool,
310) {
311 let mut s = state.write().unwrap();
312 let detail = {
313 let process = s
314 .processes
315 .entry(name.to_string())
316 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
317 if drain_acknowledged {
318 process.drain_ack_count = process.drain_ack_count.saturating_add(1);
319 }
320 if forced_kill {
321 process.forced_kill_count = process.forced_kill_count.saturating_add(1);
322 }
323
324 let mut parts = Vec::new();
325 if drain_acknowledged {
326 parts.push(format!("drain_ack_count={}", process.drain_ack_count));
327 }
328 if forced_kill {
329 parts.push(format!("forced_kill_count={}", process.forced_kill_count));
330 }
331 (!parts.is_empty()).then(|| parts.join(", "))
332 };
333
334 if let Some(detail) = detail {
335 push_capped(
336 &mut s.process_events,
337 ProcessEventRecord::new(name.to_string(), "termination_observed", Some(detail)),
338 );
339 }
340}
341
342pub fn mark_process_stopped(state: &SharedState, name: &str, exit_code: Option<i32>) {
343 let mut s = state.write().unwrap();
344 let process = s
345 .processes
346 .entry(name.to_string())
347 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
348 process.status = "stopped".into();
349 process.ready = false;
350 process.ready_state = "stopped".into();
351 process.pid = None;
352 process.last_exit_code = exit_code;
353 process.started_at = None;
354 process.last_transition_at = Some(Instant::now());
355 process.status_detail = Some("process stopped".into());
356 push_capped(
357 &mut s.process_events,
358 ProcessEventRecord::new(
359 name.to_string(),
360 "stopped",
361 Some(format!(
362 "exit_code={}",
363 exit_code
364 .map(|v| v.to_string())
365 .unwrap_or_else(|| "none".into())
366 )),
367 ),
368 );
369 drop(s);
370 reconcile_config_status_for_process(state, name, false, "stopped");
371}
372
373pub fn mark_process_failed_spawn(state: &SharedState, name: &str, error: String) {
374 let mut s = state.write().unwrap();
375 let detail = {
376 let process = s
377 .processes
378 .entry(name.to_string())
379 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
380 process.status = "failed".into();
381 process.ready = false;
382 process.ready_state = "failed".into();
383 process.pid = None;
384 process.last_error = Some(error);
385 process.started_at = None;
386 process.last_transition_at = Some(Instant::now());
387 process.status_detail = process.last_error.clone();
388 process.last_error.clone()
389 };
390 push_capped(
391 &mut s.process_events,
392 ProcessEventRecord::new(name.to_string(), "spawn_failed", detail),
393 );
394 drop(s);
395 reconcile_config_status_for_process(state, name, false, "failed");
396}
397
398pub fn set_process_readiness(
399 state: &SharedState,
400 name: &str,
401 ready: bool,
402 ready_state: &str,
403 status_detail: Option<String>,
404) {
405 let mut s = state.write().unwrap();
406 let detail_clone = {
407 let process = s
408 .processes
409 .entry(name.to_string())
410 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
411 process.ready = ready;
412 process.ready_state = ready_state.to_string();
413 process.status_detail = status_detail;
414 process.status_detail.clone()
415 };
416 let should_record = match s.process_events.back() {
417 Some(last) => {
418 last.process != name || last.event != ready_state || last.detail != detail_clone
419 }
420 None => true,
421 };
422 if should_record {
423 push_capped(
424 &mut s.process_events,
425 ProcessEventRecord::new(name.to_string(), ready_state.to_string(), detail_clone),
426 );
427 }
428 drop(s);
429 reconcile_config_status_for_process(state, name, ready, ready_state);
430}
431
/// Serializable summary of a received announce.
#[derive(Debug, Clone, Serialize)]
pub struct AnnounceRecord {
    /// Destination hash as text (hex when built by `make_announce_record`).
    pub dest_hash: String,
    /// Identity hash as text (hex when built by `make_announce_record`).
    pub identity_hash: String,
    /// Hop count copied from the announce.
    pub hops: u8,
    /// Optional application data as text (base64 when built by
    /// `make_announce_record`).
    pub app_data: Option<String>,
    /// Receive time copied from the announce.
    /// NOTE(review): units/epoch are not visible here — confirm upstream.
    pub received_at: f64,
}
442
/// Serializable summary of a received packet.
#[derive(Debug, Clone, Serialize)]
pub struct PacketRecord {
    /// Destination hash as text.
    pub dest_hash: String,
    /// Packet hash as text.
    pub packet_hash: String,
    /// Packet payload as text.
    /// NOTE(review): presumably base64-encoded, per the field name — confirm
    /// with the code that builds these records.
    pub data_base64: String,
    /// Receive time.
    /// NOTE(review): units/epoch are not visible here — confirm upstream.
    pub received_at: f64,
}
450
/// Serializable summary of a proof.
#[derive(Debug, Clone, Serialize)]
pub struct ProofRecord {
    /// Destination hash as text.
    pub dest_hash: String,
    /// Packet hash as text.
    pub packet_hash: String,
    /// Round-trip time.
    /// NOTE(review): presumably seconds — confirm with the producer.
    pub rtt: f64,
}
457
/// Serializable summary of a link event; which optional fields are set
/// depends on the event type (decided by the producer, not visible here).
#[derive(Debug, Clone, Serialize)]
pub struct LinkEventRecord {
    /// Link identifier as text.
    pub link_id: String,
    /// Event type label.
    pub event_type: String,
    /// Whether the local side initiated the link, when known.
    /// NOTE(review): meaning inferred from the field name — confirm.
    pub is_initiator: Option<bool>,
    /// Round-trip time, when available.
    /// NOTE(review): presumably seconds — confirm with the producer.
    pub rtt: Option<f64>,
    /// Identity hash as text, when available.
    pub identity_hash: Option<String>,
    /// Reason text, when available.
    pub reason: Option<String>,
}
467
/// Serializable summary of a resource event; which optional fields are set
/// depends on the event type (decided by the producer, not visible here).
#[derive(Debug, Clone, Serialize)]
pub struct ResourceEventRecord {
    /// Link identifier as text.
    pub link_id: String,
    /// Event type label.
    pub event_type: String,
    /// Resource data as text, when available.
    /// NOTE(review): presumably base64-encoded, per the field name — confirm.
    pub data_base64: Option<String>,
    /// Resource metadata as text, when available.
    /// NOTE(review): presumably base64-encoded, per the field name — confirm.
    pub metadata_base64: Option<String>,
    /// Error text, when available.
    pub error: Option<String>,
    /// Amount received so far, when available.
    /// NOTE(review): unit (bytes vs. parts) is not visible here — confirm.
    pub received: Option<usize>,
    /// Total amount expected, when available (same unit as `received`).
    pub total: Option<usize>,
}
478
/// One entry in the process lifecycle event history.
#[derive(Debug, Clone)]
pub struct ProcessEventRecord {
    /// Name of the process the event belongs to.
    pub process: String,
    /// Event label, e.g. `"registered"`, `"running"`, `"stopped"`,
    /// `"spawn_failed"`, `"restart_requested"`, `"termination_observed"`, or
    /// a readiness state string.
    pub event: String,
    /// Optional human-readable detail.
    pub detail: Option<String>,
    /// Instant at which the record was created.
    pub recorded_at: Instant,
}
486
/// One captured log line of a managed process.
#[derive(Debug, Clone)]
pub struct ProcessLogRecord {
    /// Name of the process that produced the line.
    pub process: String,
    /// Stream label supplied by the caller.
    /// NOTE(review): presumably "stdout"/"stderr" — confirm with callers.
    pub stream: String,
    /// The log line text.
    pub line: String,
    /// Instant at which the line was recorded.
    pub recorded_at: Instant,
}
494
495impl ProcessEventRecord {
496 fn new(process: String, event: impl Into<String>, detail: Option<String>) -> Self {
497 Self {
498 process,
499 event: event.into(),
500 detail,
501 recorded_at: Instant::now(),
502 }
503 }
504}
505
/// Serializable view of the server configuration. The values are filled in by
/// code outside this file; the notes below are based on the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSnapshot {
    /// Config path, if one was given.
    pub config_path: Option<String>,
    /// Resolved config directory.
    pub resolved_config_dir: String,
    /// Path of the server config file.
    pub server_config_file_path: String,
    /// Whether the server config file is present.
    pub server_config_file_present: bool,
    /// Server config file contents as a JSON string.
    pub server_config_file_json: String,
    /// Path of the stats database.
    pub stats_db_path: String,
    /// Binary used for `rnsd`.
    pub rnsd_bin: String,
    /// Binary used for `sentineld`.
    pub sentineld_bin: String,
    /// Binary used for `statsd`.
    pub statsd_bin: String,
    /// Embedded HTTP settings.
    pub http: ServerHttpConfigSnapshot,
    /// Processes to launch, with their command lines.
    pub launch_plan: Vec<LaunchProcessSnapshot>,
}
520
/// Serializable description of the server config format. Populated outside
/// this file; the notes below are based on the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigSchemaSnapshot {
    /// Name of the config format.
    pub format: String,
    /// Example configuration as a JSON string.
    pub example_config_json: String,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Per-field schema entries.
    pub fields: Vec<ServerConfigFieldSchema>,
}
528
/// Serializable description of a single server config field. Populated
/// outside this file; the notes below are based on the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigFieldSchema {
    /// Field name.
    pub field: String,
    /// Field type, as text.
    pub field_type: String,
    /// Whether the field is required.
    pub required: bool,
    /// Default value, as text.
    pub default_value: String,
    /// Human-readable description.
    pub description: String,
    /// NOTE(review): presumably what changing the field triggers (restart,
    /// reload, ...) — confirm with the schema producer.
    pub effect: String,
}
538
/// Serializable view of the config save/apply status, produced by
/// `ServerConfigStatusState::snapshot`.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigStatusSnapshot {
    /// Seconds since the last save, if any.
    pub last_saved_age_seconds: Option<f64>,
    /// Seconds since the last apply, if any.
    pub last_apply_age_seconds: Option<f64>,
    /// Label of the last action (`"save"` or `"apply"`), if any.
    pub last_action: Option<String>,
    /// Seconds since the last action, if any.
    pub last_action_age_seconds: Option<f64>,
    /// Overall action of the last apply plan while not converged; `None` once
    /// converged or when no plan is stored.
    pub pending_action: Option<String>,
    /// Pending process restarts, plus `"embedded-http-auth"` when a
    /// control-plane reload is required and `"rns-server"` when a
    /// control-plane restart is required.
    pub pending_targets: Vec<String>,
    /// Human-readable explanation of what still blocks convergence, if
    /// anything.
    pub blocking_reason: Option<String>,
    /// Names of processes whose restart has not completed yet.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required.
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the runtime is flagged as differing from the saved config.
    pub runtime_differs_from_saved: bool,
    /// True when no restarts are pending and neither a control-plane reload
    /// nor a restart is required.
    pub converged: bool,
    /// One-sentence human-readable status summary.
    pub summary: String,
    /// Copy of the most recent apply plan, if any.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
556
/// Serializable view of the embedded HTTP settings. Populated outside this
/// file; the notes below are based on the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerHttpConfigSnapshot {
    /// Whether the HTTP server is enabled.
    pub enabled: bool,
    /// Bind host.
    pub host: String,
    /// Bind port.
    pub port: u16,
    /// Authentication mode label.
    pub auth_mode: String,
    /// Whether a token is configured (the token itself is not included).
    pub token_configured: bool,
    /// Daemon mode flag.
    pub daemon_mode: bool,
}
566
/// Serializable description of one process in the launch plan. Populated
/// outside this file; the notes below are based on the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct LaunchProcessSnapshot {
    /// Process name.
    pub name: String,
    /// Binary to run.
    pub bin: String,
    /// Arguments passed to the binary.
    pub args: Vec<String>,
    /// Command line as a single string.
    pub command_line: String,
}
574
/// Serializable result returned by a `ServerConfigValidator` callback.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigValidationSnapshot {
    /// Whether the candidate config is valid.
    pub valid: bool,
    /// Config snapshot associated with the validation.
    pub config: ServerConfigSnapshot,
    /// Warnings produced during validation.
    pub warnings: Vec<String>,
}
581
/// Serializable result returned by a `ServerConfigMutator` callback.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigMutationResult {
    /// Label of the action that was performed.
    pub action: String,
    /// Config snapshot after the mutation.
    pub config: ServerConfigSnapshot,
    /// Plan describing what is needed to make the change effective.
    pub apply_plan: ServerConfigApplyPlan,
    /// Warnings produced during the mutation.
    pub warnings: Vec<String>,
}
589
/// Serializable plan describing what is needed to make a config change
/// effective.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigApplyPlan {
    /// Overall action label; surfaced as `pending_action` in the status
    /// snapshot while the config has not converged.
    pub overall_action: String,
    /// Names of processes that must be restarted.
    pub processes_to_restart: Vec<String>,
    /// Whether the control plane must reload its settings.
    pub control_plane_reload_required: bool,
    /// Whether the control plane itself must be restarted.
    pub control_plane_restart_required: bool,
    /// Free-form notes.
    pub notes: Vec<String>,
    /// Individual field changes covered by the plan.
    pub changes: Vec<ServerConfigChange>,
}
599
/// Serializable description of one changed config field. Populated outside
/// this file; the notes below are based on the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct ServerConfigChange {
    /// Name of the changed field.
    pub field: String,
    /// Value before the change, as text.
    pub before: String,
    /// Value after the change, as text.
    pub after: String,
    /// NOTE(review): presumably what the change triggers (restart, reload,
    /// ...) — confirm with the plan producer.
    pub effect: String,
}
607
/// Mode passed to a `ServerConfigMutator` callback.
#[derive(Debug, Clone, Copy)]
pub enum ServerConfigMutationMode {
    /// NOTE(review): presumably persist the config without applying it —
    /// confirm with the mutator implementation.
    Save,
    /// NOTE(review): presumably persist the config and apply it to the
    /// running system — confirm with the mutator implementation.
    Apply,
}
613
/// Internal bookkeeping of config save/apply activity. `snapshot` converts
/// it into the serializable `ServerConfigStatusSnapshot`.
#[derive(Debug, Clone, Default)]
pub struct ServerConfigStatusState {
    /// When the config was last saved (also stamped on apply).
    pub last_saved_at: Option<Instant>,
    /// When the config was last applied.
    pub last_apply_at: Option<Instant>,
    /// Label of the last action: `"save"` or `"apply"`.
    pub last_action: Option<String>,
    /// When the last action happened.
    pub last_action_at: Option<Instant>,
    /// Names of processes whose restart has not been confirmed ready yet.
    pub pending_process_restarts: Vec<String>,
    /// Whether a control-plane reload is still required.
    pub control_plane_reload_required: bool,
    /// Whether a control-plane restart is still required.
    pub control_plane_restart_required: bool,
    /// Whether the runtime is flagged as differing from the saved config.
    pub runtime_differs_from_saved: bool,
    /// Copy of the most recent save/apply plan.
    pub last_apply_plan: Option<ServerConfigApplyPlan>,
}
626
627impl ServerConfigStatusState {
628 pub fn snapshot(&self) -> ServerConfigStatusSnapshot {
629 let converged = self.pending_process_restarts.is_empty()
630 && !self.control_plane_reload_required
631 && !self.control_plane_restart_required;
632 let pending_action = (!converged)
633 .then(|| {
634 self.last_apply_plan
635 .as_ref()
636 .map(|plan| plan.overall_action.clone())
637 })
638 .flatten();
639 let mut pending_targets = self.pending_process_restarts.clone();
640 if self.control_plane_reload_required {
641 pending_targets.push("embedded-http-auth".into());
642 }
643 if self.control_plane_restart_required {
644 pending_targets.push("rns-server".into());
645 }
646 let blocking_reason = if self.control_plane_restart_required {
647 Some("Restart rns-server to apply embedded HTTP bind or enablement changes.".into())
648 } else if self.control_plane_reload_required {
649 Some("Apply config to reload embedded HTTP auth settings into the running control plane.".into())
650 } else if !self.pending_process_restarts.is_empty() {
651 Some(format!(
652 "Waiting for restarted processes to become ready: {}.",
653 self.pending_process_restarts.join(", ")
654 ))
655 } else {
656 None
657 };
658 let summary = if self.runtime_differs_from_saved {
659 if self.control_plane_restart_required {
660 "Saved config is not fully active; rns-server restart is still required.".into()
661 } else if self.control_plane_reload_required {
662 "Saved config is not fully active; embedded HTTP auth reload is still required."
663 .into()
664 } else if self.pending_process_restarts.is_empty() {
665 "Saved config differs from runtime state.".into()
666 } else {
667 format!(
668 "Waiting for restarted processes to converge: {}.",
669 self.pending_process_restarts.join(", ")
670 )
671 }
672 } else {
673 "Running state is converged with the saved config.".into()
674 };
675
676 ServerConfigStatusSnapshot {
677 last_saved_age_seconds: self
678 .last_saved_at
679 .map(|instant| instant.elapsed().as_secs_f64()),
680 last_apply_age_seconds: self
681 .last_apply_at
682 .map(|instant| instant.elapsed().as_secs_f64()),
683 last_action: self.last_action.clone(),
684 last_action_age_seconds: self
685 .last_action_at
686 .map(|instant| instant.elapsed().as_secs_f64()),
687 pending_action,
688 pending_targets,
689 blocking_reason,
690 pending_process_restarts: self.pending_process_restarts.clone(),
691 control_plane_reload_required: self.control_plane_reload_required,
692 control_plane_restart_required: self.control_plane_restart_required,
693 runtime_differs_from_saved: self.runtime_differs_from_saved,
694 converged,
695 summary,
696 last_apply_plan: self.last_apply_plan.clone(),
697 }
698 }
699}
700
/// Supervision state tracked for one managed process.
#[derive(Debug, Clone)]
pub struct ManagedProcessState {
    /// Process name (also the key in `CtlState::processes`).
    pub name: String,
    /// Lifecycle status; this file sets `"stopped"`, `"running"` and
    /// `"failed"`.
    pub status: String,
    /// Whether the process is currently considered ready.
    pub ready: bool,
    /// Readiness label; this file sets `"stopped"`, `"starting"` and
    /// `"failed"`, and callers may supply others.
    pub ready_state: String,
    /// Pid while running; cleared on stop and on spawn failure.
    pub pid: Option<u32>,
    /// Exit code recorded by the most recent stop, if any.
    pub last_exit_code: Option<i32>,
    /// Number of restart requests (saturating).
    pub restart_count: u32,
    /// Number of observed drain acknowledgements (saturating).
    pub drain_ack_count: u32,
    /// Number of observed forced kills (saturating).
    pub forced_kill_count: u32,
    /// Most recent spawn error; cleared when the process is marked running.
    pub last_error: Option<String>,
    /// Human-readable detail about the current status.
    pub status_detail: Option<String>,
    /// Path of the durable log, if one was registered.
    pub durable_log_path: Option<String>,
    /// When the most recent log line was recorded.
    pub last_log_at: Option<Instant>,
    /// Length of the in-memory log queue after the most recent log push.
    pub recent_log_lines: usize,
    /// When the process was last marked running; cleared on stop and on
    /// spawn failure.
    pub started_at: Option<Instant>,
    /// When the status last changed to running, stopped or failed.
    pub last_transition_at: Option<Instant>,
}
720
/// Command sent over the control channel stored in `CtlState::control_tx`.
/// NOTE(review): the `String` payload is presumably the target process name —
/// confirm with the receiver (not visible in this file).
#[derive(Debug, Clone)]
pub enum ProcessControlCommand {
    /// Restart request.
    Restart(String),
    /// Start request.
    Start(String),
    /// Stop request.
    Stop(String),
}
727
728impl ManagedProcessState {
729 pub fn new(name: String) -> Self {
730 Self {
731 name,
732 status: "stopped".into(),
733 ready: false,
734 ready_state: "stopped".into(),
735 pid: None,
736 last_exit_code: None,
737 restart_count: 0,
738 drain_ack_count: 0,
739 forced_kill_count: 0,
740 last_error: None,
741 status_detail: None,
742 durable_log_path: None,
743 last_log_at: None,
744 recent_log_lines: 0,
745 started_at: None,
746 last_transition_at: None,
747 }
748 }
749
750 pub fn uptime_seconds(&self) -> Option<f64> {
751 self.started_at
752 .map(|started| started.elapsed().as_secs_f64())
753 }
754
755 pub fn last_transition_seconds(&self) -> Option<f64> {
756 self.last_transition_at
757 .map(|transition| transition.elapsed().as_secs_f64())
758 }
759
760 pub fn last_log_age_seconds(&self) -> Option<f64> {
761 self.last_log_at
762 .map(|logged| logged.elapsed().as_secs_f64())
763 }
764}
765
/// Event handed to `broadcast` for delivery to every registered sender.
#[derive(Debug, Clone)]
pub struct WsEvent {
    /// Topic name; the constructors in this file use `"announces"`,
    /// `"packets"`, `"proofs"`, `"links"` and `"resources"`.
    pub topic: &'static str,
    /// JSON payload (the serialized record for the constructors in this
    /// file).
    pub payload: serde_json::Value,
}
773
774impl WsEvent {
775 pub fn announce(record: &AnnounceRecord) -> Self {
776 WsEvent {
777 topic: "announces",
778 payload: serde_json::to_value(record).unwrap_or_default(),
779 }
780 }
781
782 pub fn packet(record: &PacketRecord) -> Self {
783 WsEvent {
784 topic: "packets",
785 payload: serde_json::to_value(record).unwrap_or_default(),
786 }
787 }
788
789 pub fn proof(record: &ProofRecord) -> Self {
790 WsEvent {
791 topic: "proofs",
792 payload: serde_json::to_value(record).unwrap_or_default(),
793 }
794 }
795
796 pub fn link(record: &LinkEventRecord) -> Self {
797 WsEvent {
798 topic: "links",
799 payload: serde_json::to_value(record).unwrap_or_default(),
800 }
801 }
802
803 pub fn resource(record: &ResourceEventRecord) -> Self {
804 WsEvent {
805 topic: "resources",
806 payload: serde_json::to_value(record).unwrap_or_default(),
807 }
808 }
809
810 pub fn to_json(&self) -> String {
811 let obj = serde_json::json!({
812 "type": self.topic.trim_end_matches('s'),
813 "data": self.payload,
814 });
815 serde_json::to_string(&obj).unwrap_or_default()
816 }
817}
818
#[cfg(test)]
mod tests {
    use super::{
        mark_process_running, record_process_termination_observation, CtlState, SharedState,
    };
    use std::sync::{Arc, RwLock};

    /// One drain acknowledgement and one forced kill, reported in separate
    /// calls, must each bump only their own counter.
    #[test]
    fn termination_observation_tracks_drain_ack_and_forced_kill_counts() {
        let shared: SharedState = Arc::new(RwLock::new(CtlState::new()));
        let process_name = "rnsd";
        mark_process_running(&shared, process_name, 1234);

        // First observation: drain acknowledged only.
        record_process_termination_observation(&shared, process_name, true, false);
        // Second observation: forced kill only.
        record_process_termination_observation(&shared, process_name, false, true);

        let (drain_acks, forced_kills) = {
            let guard = shared.read().unwrap();
            let process = guard.processes.get(process_name).cloned().unwrap();
            (process.drain_ack_count, process.forced_kill_count)
        };
        assert_eq!(drain_acks, 1);
        assert_eq!(forced_kills, 1);
    }
}
842
843pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
845 AnnounceRecord {
846 dest_hash: to_hex(&announced.dest_hash.0),
847 identity_hash: to_hex(&announced.identity_hash.0),
848 hops: announced.hops,
849 app_data: announced
850 .app_data
851 .as_ref()
852 .map(|d| crate::encode::to_base64(d)),
853 received_at: announced.received_at,
854 }
855}