1use std::collections::{HashMap, VecDeque};
2use std::sync::{mpsc, Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
3use std::time::Instant;
4
5use serde::Serialize;
6
7use rns_crypto::identity::Identity;
8use rns_net::{Destination, RnsNode};
9
10use crate::encode::to_hex;
11
12const MAX_RECORDS: usize = 1000;
13
14pub type SharedState = Arc<RwLock<CtlState>>;
16pub type ControlPlaneConfigHandle = Arc<RwLock<crate::config::CtlConfig>>;
17pub type ServerConfigValidator =
18 Arc<dyn Fn(&[u8]) -> Result<ServerConfigValidationSnapshot, String> + Send + Sync>;
19pub type ServerConfigMutator = Arc<
20 dyn Fn(ServerConfigMutationMode, &[u8]) -> Result<ServerConfigMutationResult, String>
21 + Send
22 + Sync,
23>;
24
25pub type WsBroadcast = Arc<Mutex<Vec<std::sync::mpsc::Sender<WsEvent>>>>;
27
28pub struct CtlState {
29 pub started_at: Instant,
30 pub server_mode: String,
31 pub server_config: Option<ServerConfigSnapshot>,
32 pub server_config_schema: Option<ServerConfigSchemaSnapshot>,
33 pub server_config_status: ServerConfigStatusState,
34 pub server_config_validator: Option<ServerConfigValidator>,
35 pub server_config_mutator: Option<ServerConfigMutator>,
36 pub identity_hash: Option<[u8; 16]>,
37 pub identity: Option<Identity>,
38 pub announces: VecDeque<AnnounceRecord>,
39 pub packets: VecDeque<PacketRecord>,
40 pub proofs: VecDeque<ProofRecord>,
41 pub link_events: VecDeque<LinkEventRecord>,
42 pub resource_events: VecDeque<ResourceEventRecord>,
43 pub process_events: VecDeque<ProcessEventRecord>,
44 pub process_logs: HashMap<String, VecDeque<ProcessLogRecord>>,
45 pub destinations: HashMap<[u8; 16], DestinationEntry>,
46 pub processes: HashMap<String, ManagedProcessState>,
47 pub control_tx: Option<mpsc::Sender<ProcessControlCommand>>,
48 pub control_plane_config: Option<ControlPlaneConfigHandle>,
49 pub node_handle: Option<Arc<Mutex<Option<RnsNode>>>>,
50}
51
52pub struct DestinationEntry {
54 pub destination: Destination,
55 pub full_name: String,
57}
58
59impl CtlState {
60 pub fn new() -> Self {
61 CtlState {
62 started_at: Instant::now(),
63 server_mode: "standalone".into(),
64 server_config: None,
65 server_config_schema: None,
66 server_config_status: ServerConfigStatusState::default(),
67 server_config_validator: None,
68 server_config_mutator: None,
69 identity_hash: None,
70 identity: None,
71 announces: VecDeque::new(),
72 packets: VecDeque::new(),
73 proofs: VecDeque::new(),
74 link_events: VecDeque::new(),
75 resource_events: VecDeque::new(),
76 process_events: VecDeque::new(),
77 process_logs: HashMap::new(),
78 destinations: HashMap::new(),
79 processes: HashMap::new(),
80 control_tx: None,
81 control_plane_config: None,
82 node_handle: None,
83 }
84 }
85
86 pub fn uptime_seconds(&self) -> f64 {
87 self.started_at.elapsed().as_secs_f64()
88 }
89}
90
91fn push_capped<T>(deque: &mut VecDeque<T>, item: T) {
92 if deque.len() >= MAX_RECORDS {
93 deque.pop_front();
94 }
95 deque.push_back(item);
96}
97
98pub(crate) fn read_state<'a>(state: &'a SharedState) -> RwLockReadGuard<'a, CtlState> {
99 match state.read() {
100 Ok(guard) => guard,
101 Err(poisoned) => {
102 log::error!("recovering from poisoned control-plane shared state read lock");
103 poisoned.into_inner()
104 }
105 }
106}
107
108pub(crate) fn write_state<'a>(state: &'a SharedState) -> RwLockWriteGuard<'a, CtlState> {
109 match state.write() {
110 Ok(guard) => guard,
111 Err(poisoned) => {
112 log::error!("recovering from poisoned control-plane shared state write lock");
113 poisoned.into_inner()
114 }
115 }
116}
117
118pub(crate) fn read_control_plane_config<'a>(
119 config: &'a ControlPlaneConfigHandle,
120) -> RwLockReadGuard<'a, crate::config::CtlConfig> {
121 match config.read() {
122 Ok(guard) => guard,
123 Err(poisoned) => {
124 log::error!("recovering from poisoned control-plane config read lock");
125 poisoned.into_inner()
126 }
127 }
128}
129
130pub(crate) fn lock_ws_broadcast<'a>(
131 ws: &'a WsBroadcast,
132) -> MutexGuard<'a, Vec<std::sync::mpsc::Sender<WsEvent>>> {
133 match ws.lock() {
134 Ok(guard) => guard,
135 Err(poisoned) => {
136 log::error!("recovering from poisoned WebSocket broadcast registry");
137 poisoned.into_inner()
138 }
139 }
140}
141
142pub(crate) fn lock_node_handle<'a>(
143 node: &'a Arc<Mutex<Option<RnsNode>>>,
144) -> MutexGuard<'a, Option<RnsNode>> {
145 match node.lock() {
146 Ok(guard) => guard,
147 Err(poisoned) => {
148 log::error!("recovering from poisoned node handle lock");
149 poisoned.into_inner()
150 }
151 }
152}
153
154pub fn push_announce(state: &SharedState, record: AnnounceRecord) {
155 let mut s = write_state(state);
156 push_capped(&mut s.announces, record);
157}
158
159pub fn push_packet(state: &SharedState, record: PacketRecord) {
160 let mut s = write_state(state);
161 push_capped(&mut s.packets, record);
162}
163
164pub fn push_proof(state: &SharedState, record: ProofRecord) {
165 let mut s = write_state(state);
166 push_capped(&mut s.proofs, record);
167}
168
169pub fn push_link_event(state: &SharedState, record: LinkEventRecord) {
170 let mut s = write_state(state);
171 push_capped(&mut s.link_events, record);
172}
173
174pub fn push_resource_event(state: &SharedState, record: ResourceEventRecord) {
175 let mut s = write_state(state);
176 push_capped(&mut s.resource_events, record);
177}
178
179pub fn broadcast(ws: &WsBroadcast, event: WsEvent) {
181 let mut senders = lock_ws_broadcast(ws);
182 senders.retain(|tx| tx.send(event.clone()).is_ok());
183}
184
185pub fn set_server_mode(state: &SharedState, mode: impl Into<String>) {
186 let mut s = write_state(state);
187 s.server_mode = mode.into();
188}
189
190pub fn set_server_config(state: &SharedState, config: ServerConfigSnapshot) {
191 let mut s = write_state(state);
192 s.server_config = Some(config);
193}
194
195pub fn set_server_config_schema(state: &SharedState, schema: ServerConfigSchemaSnapshot) {
196 let mut s = write_state(state);
197 s.server_config_schema = Some(schema);
198}
199
200pub fn note_server_config_saved(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
201 let mut s = write_state(state);
202 s.server_config_status.last_saved_at = Some(Instant::now());
203 s.server_config_status.last_action = Some("save".into());
204 s.server_config_status.last_action_at = Some(Instant::now());
205 s.server_config_status.pending_process_restarts.clear();
206 s.server_config_status.control_plane_reload_required = apply_plan.control_plane_reload_required;
207 s.server_config_status.control_plane_restart_required =
208 apply_plan.control_plane_restart_required;
209 s.server_config_status.runtime_differs_from_saved = !apply_plan.processes_to_restart.is_empty()
210 || apply_plan.control_plane_reload_required
211 || apply_plan.control_plane_restart_required;
212 s.server_config_status.last_apply_plan = Some(apply_plan.clone());
213}
214
215pub fn note_server_config_applied(state: &SharedState, apply_plan: &ServerConfigApplyPlan) {
216 let mut s = write_state(state);
217 let now = Instant::now();
218 s.server_config_status.last_saved_at = Some(now);
219 s.server_config_status.last_apply_at = Some(now);
220 s.server_config_status.last_action = Some("apply".into());
221 s.server_config_status.last_action_at = Some(now);
222 s.server_config_status.pending_process_restarts = apply_plan.processes_to_restart.clone();
223 s.server_config_status.control_plane_reload_required = false;
224 s.server_config_status.control_plane_restart_required =
225 apply_plan.control_plane_restart_required;
226 s.server_config_status.runtime_differs_from_saved =
227 !s.server_config_status.pending_process_restarts.is_empty()
228 || s.server_config_status.control_plane_restart_required;
229 s.server_config_status.last_apply_plan = Some(apply_plan.clone());
230}
231
232pub fn reconcile_config_status_for_process(
233 state: &SharedState,
234 name: &str,
235 ready: bool,
236 status: &str,
237) {
238 let mut s = write_state(state);
239 if ready {
240 s.server_config_status
241 .pending_process_restarts
242 .retain(|process| process != name);
243 }
244 if status == "failed" {
245 s.server_config_status.runtime_differs_from_saved = true;
246 } else if s.server_config_status.pending_process_restarts.is_empty()
247 && !s.server_config_status.control_plane_reload_required
248 && !s.server_config_status.control_plane_restart_required
249 {
250 s.server_config_status.runtime_differs_from_saved = false;
251 }
252}
253
254pub fn set_server_config_validator(state: &SharedState, validator: ServerConfigValidator) {
255 let mut s = write_state(state);
256 s.server_config_validator = Some(validator);
257}
258
259pub fn set_server_config_mutator(state: &SharedState, mutator: ServerConfigMutator) {
260 let mut s = write_state(state);
261 s.server_config_mutator = Some(mutator);
262}
263
264pub fn ensure_process(state: &SharedState, name: impl Into<String>) {
265 let mut s = write_state(state);
266 let name = name.into();
267 s.processes
268 .entry(name.clone())
269 .or_insert_with(|| ManagedProcessState::new(name.clone()));
270 s.process_logs.entry(name.clone()).or_default();
271 push_capped(
272 &mut s.process_events,
273 ProcessEventRecord::new(name, "registered", Some("process registered".into())),
274 );
275}
276
277pub fn push_process_log(state: &SharedState, name: &str, stream: &str, line: impl Into<String>) {
278 let mut s = write_state(state);
279 let recent_log_lines = {
280 let logs = s.process_logs.entry(name.to_string()).or_default();
281 if logs.len() >= MAX_RECORDS {
282 logs.pop_front();
283 }
284 logs.push_back(ProcessLogRecord {
285 process: name.to_string(),
286 stream: stream.to_string(),
287 line: line.into(),
288 recorded_at: Instant::now(),
289 });
290 logs.len()
291 };
292 let process = s
293 .processes
294 .entry(name.to_string())
295 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
296 process.last_log_at = Some(Instant::now());
297 process.recent_log_lines = recent_log_lines;
298}
299
300pub fn set_process_log_path(state: &SharedState, name: &str, path: impl Into<String>) {
301 let mut s = write_state(state);
302 let process = s
303 .processes
304 .entry(name.to_string())
305 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
306 process.durable_log_path = Some(path.into());
307}
308
309pub fn set_control_tx(state: &SharedState, tx: mpsc::Sender<ProcessControlCommand>) {
310 let mut s = write_state(state);
311 s.control_tx = Some(tx);
312}
313
314pub fn set_control_plane_config(state: &SharedState, config: ControlPlaneConfigHandle) {
315 let mut s = write_state(state);
316 s.control_plane_config = Some(config);
317}
318
319pub fn mark_process_running(state: &SharedState, name: &str, pid: u32) {
320 let mut s = write_state(state);
321 let process = s
322 .processes
323 .entry(name.to_string())
324 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
325 process.status = "running".into();
326 process.ready = false;
327 process.ready_state = "starting".into();
328 process.pid = Some(pid);
329 process.started_at = Some(Instant::now());
330 process.last_transition_at = Some(Instant::now());
331 process.last_error = None;
332 process.status_detail = Some("process spawned".into());
333 push_capped(
334 &mut s.process_events,
335 ProcessEventRecord::new(name.to_string(), "running", Some(format!("pid={}", pid))),
336 );
337 drop(s);
338 reconcile_config_status_for_process(state, name, false, "running");
339}
340
341pub fn bump_process_restart_count(state: &SharedState, name: &str) {
342 let mut s = write_state(state);
343 let restart_count = {
344 let process = s
345 .processes
346 .entry(name.to_string())
347 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
348 process.restart_count = process.restart_count.saturating_add(1);
349 process.restart_count
350 };
351 push_capped(
352 &mut s.process_events,
353 ProcessEventRecord::new(
354 name.to_string(),
355 "restart_requested",
356 Some(format!("restart_count={}", restart_count)),
357 ),
358 );
359}
360
361pub fn record_process_termination_observation(
362 state: &SharedState,
363 name: &str,
364 drain_acknowledged: bool,
365 forced_kill: bool,
366) {
367 let mut s = write_state(state);
368 let detail = {
369 let process = s
370 .processes
371 .entry(name.to_string())
372 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
373 if drain_acknowledged {
374 process.drain_ack_count = process.drain_ack_count.saturating_add(1);
375 }
376 if forced_kill {
377 process.forced_kill_count = process.forced_kill_count.saturating_add(1);
378 }
379
380 let mut parts = Vec::new();
381 if drain_acknowledged {
382 parts.push(format!("drain_ack_count={}", process.drain_ack_count));
383 }
384 if forced_kill {
385 parts.push(format!("forced_kill_count={}", process.forced_kill_count));
386 }
387 (!parts.is_empty()).then(|| parts.join(", "))
388 };
389
390 if let Some(detail) = detail {
391 push_capped(
392 &mut s.process_events,
393 ProcessEventRecord::new(name.to_string(), "termination_observed", Some(detail)),
394 );
395 }
396}
397
398pub fn mark_process_stopped(state: &SharedState, name: &str, exit_code: Option<i32>) {
399 let mut s = write_state(state);
400 let process = s
401 .processes
402 .entry(name.to_string())
403 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
404 process.status = "stopped".into();
405 process.ready = false;
406 process.ready_state = "stopped".into();
407 process.pid = None;
408 process.last_exit_code = exit_code;
409 process.started_at = None;
410 process.last_transition_at = Some(Instant::now());
411 process.status_detail = Some("process stopped".into());
412 push_capped(
413 &mut s.process_events,
414 ProcessEventRecord::new(
415 name.to_string(),
416 "stopped",
417 Some(format!(
418 "exit_code={}",
419 exit_code
420 .map(|v| v.to_string())
421 .unwrap_or_else(|| "none".into())
422 )),
423 ),
424 );
425 drop(s);
426 reconcile_config_status_for_process(state, name, false, "stopped");
427}
428
429pub fn mark_process_failed_spawn(state: &SharedState, name: &str, error: String) {
430 let mut s = write_state(state);
431 let detail = {
432 let process = s
433 .processes
434 .entry(name.to_string())
435 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
436 process.status = "failed".into();
437 process.ready = false;
438 process.ready_state = "failed".into();
439 process.pid = None;
440 process.last_error = Some(error);
441 process.started_at = None;
442 process.last_transition_at = Some(Instant::now());
443 process.status_detail = process.last_error.clone();
444 process.last_error.clone()
445 };
446 push_capped(
447 &mut s.process_events,
448 ProcessEventRecord::new(name.to_string(), "spawn_failed", detail),
449 );
450 drop(s);
451 reconcile_config_status_for_process(state, name, false, "failed");
452}
453
454pub fn set_process_readiness(
455 state: &SharedState,
456 name: &str,
457 ready: bool,
458 ready_state: &str,
459 status_detail: Option<String>,
460) {
461 let mut s = write_state(state);
462 let detail_clone = {
463 let process = s
464 .processes
465 .entry(name.to_string())
466 .or_insert_with(|| ManagedProcessState::new(name.to_string()));
467 process.ready = ready;
468 process.ready_state = ready_state.to_string();
469 process.status_detail = status_detail;
470 process.status_detail.clone()
471 };
472 let should_record = match s.process_events.back() {
473 Some(last) => {
474 last.process != name || last.event != ready_state || last.detail != detail_clone
475 }
476 None => true,
477 };
478 if should_record {
479 push_capped(
480 &mut s.process_events,
481 ProcessEventRecord::new(name.to_string(), ready_state.to_string(), detail_clone),
482 );
483 }
484 drop(s);
485 reconcile_config_status_for_process(state, name, ready, ready_state);
486}
487
488#[derive(Debug, Clone, Serialize)]
491pub struct AnnounceRecord {
492 pub dest_hash: String,
493 pub identity_hash: String,
494 pub hops: u8,
495 pub app_data: Option<String>,
496 pub received_at: f64,
497}
498
499#[derive(Debug, Clone, Serialize)]
500pub struct PacketRecord {
501 pub dest_hash: String,
502 pub packet_hash: String,
503 pub data_base64: String,
504 pub received_at: f64,
505}
506
507#[derive(Debug, Clone, Serialize)]
508pub struct ProofRecord {
509 pub dest_hash: String,
510 pub packet_hash: String,
511 pub rtt: f64,
512}
513
514#[derive(Debug, Clone, Serialize)]
515pub struct LinkEventRecord {
516 pub link_id: String,
517 pub event_type: String,
518 pub is_initiator: Option<bool>,
519 pub rtt: Option<f64>,
520 pub identity_hash: Option<String>,
521 pub reason: Option<String>,
522}
523
524#[derive(Debug, Clone, Serialize)]
525pub struct ResourceEventRecord {
526 pub link_id: String,
527 pub event_type: String,
528 pub data_base64: Option<String>,
529 pub metadata_base64: Option<String>,
530 pub error: Option<String>,
531 pub received: Option<usize>,
532 pub total: Option<usize>,
533}
534
535#[derive(Debug, Clone)]
536pub struct ProcessEventRecord {
537 pub process: String,
538 pub event: String,
539 pub detail: Option<String>,
540 pub recorded_at: Instant,
541}
542
543#[derive(Debug, Clone)]
544pub struct ProcessLogRecord {
545 pub process: String,
546 pub stream: String,
547 pub line: String,
548 pub recorded_at: Instant,
549}
550
551impl ProcessEventRecord {
552 fn new(process: String, event: impl Into<String>, detail: Option<String>) -> Self {
553 Self {
554 process,
555 event: event.into(),
556 detail,
557 recorded_at: Instant::now(),
558 }
559 }
560}
561
562#[derive(Debug, Clone, Serialize)]
563pub struct ServerConfigSnapshot {
564 pub config_path: Option<String>,
565 pub resolved_config_dir: String,
566 pub server_config_file_path: String,
567 pub server_config_file_present: bool,
568 pub server_config_file_json: String,
569 pub stats_db_path: String,
570 pub rnsd_bin: String,
571 pub sentineld_bin: String,
572 pub statsd_bin: String,
573 pub http: ServerHttpConfigSnapshot,
574 pub launch_plan: Vec<LaunchProcessSnapshot>,
575}
576
577#[derive(Debug, Clone, Serialize)]
578pub struct ServerConfigSchemaSnapshot {
579 pub format: String,
580 pub example_config_json: String,
581 pub notes: Vec<String>,
582 pub fields: Vec<ServerConfigFieldSchema>,
583}
584
585#[derive(Debug, Clone, Serialize)]
586pub struct ServerConfigFieldSchema {
587 pub field: String,
588 pub field_type: String,
589 pub required: bool,
590 pub default_value: String,
591 pub description: String,
592 pub effect: String,
593}
594
595#[derive(Debug, Clone, Serialize)]
596pub struct ServerConfigStatusSnapshot {
597 pub last_saved_age_seconds: Option<f64>,
598 pub last_apply_age_seconds: Option<f64>,
599 pub last_action: Option<String>,
600 pub last_action_age_seconds: Option<f64>,
601 pub pending_action: Option<String>,
602 pub pending_targets: Vec<String>,
603 pub blocking_reason: Option<String>,
604 pub pending_process_restarts: Vec<String>,
605 pub control_plane_reload_required: bool,
606 pub control_plane_restart_required: bool,
607 pub runtime_differs_from_saved: bool,
608 pub converged: bool,
609 pub summary: String,
610 pub last_apply_plan: Option<ServerConfigApplyPlan>,
611}
612
613#[derive(Debug, Clone, Serialize)]
614pub struct ServerHttpConfigSnapshot {
615 pub enabled: bool,
616 pub host: String,
617 pub port: u16,
618 pub auth_mode: String,
619 pub token_configured: bool,
620 pub daemon_mode: bool,
621}
622
623#[derive(Debug, Clone, Serialize)]
624pub struct LaunchProcessSnapshot {
625 pub name: String,
626 pub bin: String,
627 pub args: Vec<String>,
628 pub command_line: String,
629}
630
631#[derive(Debug, Clone, Serialize)]
632pub struct ServerConfigValidationSnapshot {
633 pub valid: bool,
634 pub config: ServerConfigSnapshot,
635 pub warnings: Vec<String>,
636}
637
638#[derive(Debug, Clone, Serialize)]
639pub struct ServerConfigMutationResult {
640 pub action: String,
641 pub config: ServerConfigSnapshot,
642 pub apply_plan: ServerConfigApplyPlan,
643 pub warnings: Vec<String>,
644}
645
646#[derive(Debug, Clone, Serialize)]
647pub struct ServerConfigApplyPlan {
648 pub overall_action: String,
649 pub processes_to_restart: Vec<String>,
650 pub control_plane_reload_required: bool,
651 pub control_plane_restart_required: bool,
652 pub notes: Vec<String>,
653 pub changes: Vec<ServerConfigChange>,
654}
655
656#[derive(Debug, Clone, Serialize)]
657pub struct ServerConfigChange {
658 pub field: String,
659 pub before: String,
660 pub after: String,
661 pub effect: String,
662}
663
664#[derive(Debug, Clone, Copy)]
665pub enum ServerConfigMutationMode {
666 Save,
667 Apply,
668}
669
670#[derive(Debug, Clone, Default)]
671pub struct ServerConfigStatusState {
672 pub last_saved_at: Option<Instant>,
673 pub last_apply_at: Option<Instant>,
674 pub last_action: Option<String>,
675 pub last_action_at: Option<Instant>,
676 pub pending_process_restarts: Vec<String>,
677 pub control_plane_reload_required: bool,
678 pub control_plane_restart_required: bool,
679 pub runtime_differs_from_saved: bool,
680 pub last_apply_plan: Option<ServerConfigApplyPlan>,
681}
682
683impl ServerConfigStatusState {
684 pub fn snapshot(&self) -> ServerConfigStatusSnapshot {
685 let converged = self.pending_process_restarts.is_empty()
686 && !self.control_plane_reload_required
687 && !self.control_plane_restart_required;
688 let pending_action = (!converged)
689 .then(|| {
690 self.last_apply_plan
691 .as_ref()
692 .map(|plan| plan.overall_action.clone())
693 })
694 .flatten();
695 let mut pending_targets = self.pending_process_restarts.clone();
696 if self.control_plane_reload_required {
697 pending_targets.push("embedded-http-auth".into());
698 }
699 if self.control_plane_restart_required {
700 pending_targets.push("rns-server".into());
701 }
702 let blocking_reason = if self.control_plane_restart_required {
703 Some("Restart rns-server to apply embedded HTTP bind or enablement changes.".into())
704 } else if self.control_plane_reload_required {
705 Some("Apply config to reload embedded HTTP auth settings into the running control plane.".into())
706 } else if !self.pending_process_restarts.is_empty() {
707 Some(format!(
708 "Waiting for restarted processes to become ready: {}.",
709 self.pending_process_restarts.join(", ")
710 ))
711 } else {
712 None
713 };
714 let summary = if self.runtime_differs_from_saved {
715 if self.control_plane_restart_required {
716 "Saved config is not fully active; rns-server restart is still required.".into()
717 } else if self.control_plane_reload_required {
718 "Saved config is not fully active; embedded HTTP auth reload is still required."
719 .into()
720 } else if self.pending_process_restarts.is_empty() {
721 "Saved config differs from runtime state.".into()
722 } else {
723 format!(
724 "Waiting for restarted processes to converge: {}.",
725 self.pending_process_restarts.join(", ")
726 )
727 }
728 } else {
729 "Running state is converged with the saved config.".into()
730 };
731
732 ServerConfigStatusSnapshot {
733 last_saved_age_seconds: self
734 .last_saved_at
735 .map(|instant| instant.elapsed().as_secs_f64()),
736 last_apply_age_seconds: self
737 .last_apply_at
738 .map(|instant| instant.elapsed().as_secs_f64()),
739 last_action: self.last_action.clone(),
740 last_action_age_seconds: self
741 .last_action_at
742 .map(|instant| instant.elapsed().as_secs_f64()),
743 pending_action,
744 pending_targets,
745 blocking_reason,
746 pending_process_restarts: self.pending_process_restarts.clone(),
747 control_plane_reload_required: self.control_plane_reload_required,
748 control_plane_restart_required: self.control_plane_restart_required,
749 runtime_differs_from_saved: self.runtime_differs_from_saved,
750 converged,
751 summary,
752 last_apply_plan: self.last_apply_plan.clone(),
753 }
754 }
755}
756
757#[derive(Debug, Clone)]
758pub struct ManagedProcessState {
759 pub name: String,
760 pub status: String,
761 pub ready: bool,
762 pub ready_state: String,
763 pub pid: Option<u32>,
764 pub last_exit_code: Option<i32>,
765 pub restart_count: u32,
766 pub drain_ack_count: u32,
767 pub forced_kill_count: u32,
768 pub last_error: Option<String>,
769 pub status_detail: Option<String>,
770 pub durable_log_path: Option<String>,
771 pub last_log_at: Option<Instant>,
772 pub recent_log_lines: usize,
773 pub started_at: Option<Instant>,
774 pub last_transition_at: Option<Instant>,
775}
776
777#[derive(Debug, Clone)]
778pub enum ProcessControlCommand {
779 Restart(String),
780 Start(String),
781 Stop(String),
782}
783
784impl ManagedProcessState {
785 pub fn new(name: String) -> Self {
786 Self {
787 name,
788 status: "stopped".into(),
789 ready: false,
790 ready_state: "stopped".into(),
791 pid: None,
792 last_exit_code: None,
793 restart_count: 0,
794 drain_ack_count: 0,
795 forced_kill_count: 0,
796 last_error: None,
797 status_detail: None,
798 durable_log_path: None,
799 last_log_at: None,
800 recent_log_lines: 0,
801 started_at: None,
802 last_transition_at: None,
803 }
804 }
805
806 pub fn uptime_seconds(&self) -> Option<f64> {
807 self.started_at
808 .map(|started| started.elapsed().as_secs_f64())
809 }
810
811 pub fn last_transition_seconds(&self) -> Option<f64> {
812 self.last_transition_at
813 .map(|transition| transition.elapsed().as_secs_f64())
814 }
815
816 pub fn last_log_age_seconds(&self) -> Option<f64> {
817 self.last_log_at
818 .map(|logged| logged.elapsed().as_secs_f64())
819 }
820}
821
822#[derive(Debug, Clone)]
825pub struct WsEvent {
826 pub topic: &'static str,
827 pub payload: serde_json::Value,
828}
829
830impl WsEvent {
831 pub fn announce(record: &AnnounceRecord) -> Self {
832 WsEvent {
833 topic: "announces",
834 payload: serde_json::to_value(record).unwrap_or_default(),
835 }
836 }
837
838 pub fn packet(record: &PacketRecord) -> Self {
839 WsEvent {
840 topic: "packets",
841 payload: serde_json::to_value(record).unwrap_or_default(),
842 }
843 }
844
845 pub fn proof(record: &ProofRecord) -> Self {
846 WsEvent {
847 topic: "proofs",
848 payload: serde_json::to_value(record).unwrap_or_default(),
849 }
850 }
851
852 pub fn link(record: &LinkEventRecord) -> Self {
853 WsEvent {
854 topic: "links",
855 payload: serde_json::to_value(record).unwrap_or_default(),
856 }
857 }
858
859 pub fn resource(record: &ResourceEventRecord) -> Self {
860 WsEvent {
861 topic: "resources",
862 payload: serde_json::to_value(record).unwrap_or_default(),
863 }
864 }
865
866 pub fn to_json(&self) -> String {
867 let obj = serde_json::json!({
868 "type": self.topic.trim_end_matches('s'),
869 "data": self.payload,
870 });
871 serde_json::to_string(&obj).unwrap_or_default()
872 }
873}
874
875#[cfg(test)]
876mod tests {
877 use super::{
878 mark_process_running, record_process_termination_observation, CtlState, SharedState,
879 };
880 use std::sync::{Arc, RwLock};
881
882 #[test]
883 fn termination_observation_tracks_drain_ack_and_forced_kill_counts() {
884 let state: SharedState = Arc::new(RwLock::new(CtlState::new()));
885 mark_process_running(&state, "rnsd", 1234);
886
887 record_process_termination_observation(&state, "rnsd", true, false);
888 record_process_termination_observation(&state, "rnsd", false, true);
889
890 let snapshot = {
891 let s = state.read().unwrap();
892 s.processes.get("rnsd").cloned().unwrap()
893 };
894 assert_eq!(snapshot.drain_ack_count, 1);
895 assert_eq!(snapshot.forced_kill_count, 1);
896 }
897}
898
899pub fn make_announce_record(announced: &rns_net::AnnouncedIdentity) -> AnnounceRecord {
901 AnnounceRecord {
902 dest_hash: to_hex(&announced.dest_hash.0),
903 identity_hash: to_hex(&announced.identity_hash.0),
904 hops: announced.hops,
905 app_data: announced
906 .app_data
907 .as_ref()
908 .map(|d| crate::encode::to_base64(d)),
909 received_at: announced.received_at,
910 }
911}