rns_server/
control_plane.rs1use std::sync::{mpsc, Arc, RwLock};
2
3use rns_ctl::state::{
4 ensure_process, note_server_config_applied, note_server_config_saved, set_control_tx,
5 set_server_config, set_server_config_mutator, set_server_config_schema,
6 set_server_config_validator, set_server_mode, CtlState, ProcessControlCommand, SharedState,
7};
8
9use crate::args::Args;
10use crate::config::ServerConfig;
11
12#[cfg(feature = "rns-hooks-wasm")]
13const MANAGED_PROCESSES: [&str; 3] = ["rnsd", "rns-sentineld", "rns-statsd"];
14
15#[cfg(not(feature = "rns-hooks-wasm"))]
16const MANAGED_PROCESSES: [&str; 1] = ["rnsd"];
17
18fn read_shared_state<'a>(
19 state: &'a SharedState,
20) -> std::sync::RwLockReadGuard<'a, rns_ctl::state::CtlState> {
21 match state.read() {
22 Ok(guard) => guard,
23 Err(poisoned) => {
24 log::error!("recovering from poisoned control-plane shared state read lock");
25 poisoned.into_inner()
26 }
27 }
28}
29
30pub fn new_supervised_state() -> (
31 SharedState,
32 mpsc::Sender<ProcessControlCommand>,
33 mpsc::Receiver<ProcessControlCommand>,
34) {
35 let shared_state: SharedState = Arc::new(RwLock::new(CtlState::new()));
36 let (control_tx, control_rx) = mpsc::channel();
37 set_server_mode(&shared_state, "supervised");
38 set_control_tx(&shared_state, control_tx.clone());
39 for process in MANAGED_PROCESSES {
40 ensure_process(&shared_state, process);
41 }
42 (shared_state, control_tx, control_rx)
43}
44
45pub fn install_config_bridge(shared_state: &SharedState, args: &Args, config: &ServerConfig) {
46 set_server_config(shared_state, config.snapshot());
47 set_server_config_schema(shared_state, config.schema_snapshot());
48 set_server_config_validator(
49 shared_state,
50 Arc::new({
51 let config = config.clone();
52 move |body| config.validate_json_with_current_context(body)
53 }),
54 );
55 set_server_config_mutator(
56 shared_state,
57 Arc::new({
58 let config = config.clone();
59 let args = args.clone();
60 let shared_state = shared_state.clone();
61 move |mode, body| {
62 let control_tx = {
63 let s = read_shared_state(&shared_state);
64 s.control_tx.clone()
65 };
66 let result = config.mutate_json_with_current_context(mode, body, control_tx)?;
67 match mode {
68 rns_ctl::state::ServerConfigMutationMode::Save => {
69 note_server_config_saved(&shared_state, &result.apply_plan);
70 }
71 rns_ctl::state::ServerConfigMutationMode::Apply => {
72 let refreshed = ServerConfig::from_args(&args);
73 reload_embedded_http_auth_if_needed(
74 &shared_state,
75 &config,
76 &refreshed,
77 &result.apply_plan,
78 );
79 note_server_config_applied(&shared_state, &result.apply_plan);
80 set_server_config(&shared_state, refreshed.snapshot());
81 return Ok(result);
82 }
83 }
84 let refreshed = ServerConfig::from_args(&args);
85 set_server_config(&shared_state, refreshed.snapshot());
86 Ok(result)
87 }
88 }),
89 );
90}
91
92fn reload_embedded_http_auth_if_needed(
93 shared_state: &SharedState,
94 current: &ServerConfig,
95 next: &ServerConfig,
96 apply_plan: &rns_ctl::state::ServerConfigApplyPlan,
97) {
98 if !apply_plan.control_plane_reload_required || apply_plan.control_plane_restart_required {
99 return;
100 }
101 if current.http.auth_token == next.http.auth_token
102 && current.http.disable_auth == next.http.disable_auth
103 {
104 return;
105 }
106
107 let config_handle = {
108 let s = read_shared_state(shared_state);
109 s.control_plane_config.clone()
110 };
111 let Some(config_handle) = config_handle else {
112 return;
113 };
114
115 let mut config = match config_handle.write() {
116 Ok(guard) => guard,
117 Err(poisoned) => {
118 log::error!("recovering from poisoned embedded control-plane config write lock");
119 poisoned.into_inner()
120 }
121 };
122 config.auth_token = next.http.auth_token.clone();
123 config.disable_auth = next.http.disable_auth;
124 log::info!("reloaded embedded control-plane auth settings in place");
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 #[test]
132 fn new_supervised_state_registers_managed_processes() {
133 let (shared_state, _tx, _rx) = new_supervised_state();
134 let state = shared_state.read().unwrap();
135 assert_eq!(state.server_mode, "supervised");
136 for process in MANAGED_PROCESSES {
137 assert!(state.processes.contains_key(process));
138 }
139 assert!(state.control_tx.is_some());
140 }
141}