Skip to main content

rns_server/
control_plane.rs

1use std::sync::{mpsc, Arc, RwLock};
2
3use rns_ctl::state::{
4    ensure_process, note_server_config_applied, note_server_config_saved, set_control_tx,
5    set_server_config, set_server_config_mutator, set_server_config_schema,
6    set_server_config_validator, set_server_mode, CtlState, ProcessControlCommand, SharedState,
7};
8
9use crate::args::Args;
10use crate::config::ServerConfig;
11
/// Names of the processes that `new_supervised_state` registers in the shared
/// state when the `rns-hooks-wasm` feature is enabled.
#[cfg(feature = "rns-hooks-wasm")]
const MANAGED_PROCESSES: [&str; 3] = ["rnsd", "rns-sentineld", "rns-statsd"];

/// Names of the processes that `new_supervised_state` registers in the shared
/// state when the `rns-hooks-wasm` feature is disabled: only `rnsd`.
#[cfg(not(feature = "rns-hooks-wasm"))]
const MANAGED_PROCESSES: [&str; 1] = ["rnsd"];
17
18fn read_shared_state<'a>(
19    state: &'a SharedState,
20) -> std::sync::RwLockReadGuard<'a, rns_ctl::state::CtlState> {
21    match state.read() {
22        Ok(guard) => guard,
23        Err(poisoned) => {
24            log::error!("recovering from poisoned control-plane shared state read lock");
25            poisoned.into_inner()
26        }
27    }
28}
29
30pub fn new_supervised_state() -> (
31    SharedState,
32    mpsc::Sender<ProcessControlCommand>,
33    mpsc::Receiver<ProcessControlCommand>,
34) {
35    let shared_state: SharedState = Arc::new(RwLock::new(CtlState::new()));
36    let (control_tx, control_rx) = mpsc::channel();
37    set_server_mode(&shared_state, "supervised");
38    set_control_tx(&shared_state, control_tx.clone());
39    for process in MANAGED_PROCESSES {
40        ensure_process(&shared_state, process);
41    }
42    (shared_state, control_tx, control_rx)
43}
44
45pub fn install_config_bridge(shared_state: &SharedState, args: &Args, config: &ServerConfig) {
46    set_server_config(shared_state, config.snapshot());
47    set_server_config_schema(shared_state, config.schema_snapshot());
48    set_server_config_validator(
49        shared_state,
50        Arc::new({
51            let config = config.clone();
52            move |body| config.validate_json_with_current_context(body)
53        }),
54    );
55    set_server_config_mutator(
56        shared_state,
57        Arc::new({
58            let config = config.clone();
59            let args = args.clone();
60            let shared_state = shared_state.clone();
61            move |mode, body| {
62                let control_tx = {
63                    let s = read_shared_state(&shared_state);
64                    s.control_tx.clone()
65                };
66                let result = config.mutate_json_with_current_context(mode, body, control_tx)?;
67                match mode {
68                    rns_ctl::state::ServerConfigMutationMode::Save => {
69                        note_server_config_saved(&shared_state, &result.apply_plan);
70                    }
71                    rns_ctl::state::ServerConfigMutationMode::Apply => {
72                        let refreshed = ServerConfig::from_args(&args);
73                        reload_embedded_http_auth_if_needed(
74                            &shared_state,
75                            &config,
76                            &refreshed,
77                            &result.apply_plan,
78                        );
79                        note_server_config_applied(&shared_state, &result.apply_plan);
80                        set_server_config(&shared_state, refreshed.snapshot());
81                        return Ok(result);
82                    }
83                }
84                let refreshed = ServerConfig::from_args(&args);
85                set_server_config(&shared_state, refreshed.snapshot());
86                Ok(result)
87            }
88        }),
89    );
90}
91
92fn reload_embedded_http_auth_if_needed(
93    shared_state: &SharedState,
94    current: &ServerConfig,
95    next: &ServerConfig,
96    apply_plan: &rns_ctl::state::ServerConfigApplyPlan,
97) {
98    if !apply_plan.control_plane_reload_required || apply_plan.control_plane_restart_required {
99        return;
100    }
101    if current.http.auth_token == next.http.auth_token
102        && current.http.disable_auth == next.http.disable_auth
103    {
104        return;
105    }
106
107    let config_handle = {
108        let s = read_shared_state(shared_state);
109        s.control_plane_config.clone()
110    };
111    let Some(config_handle) = config_handle else {
112        return;
113    };
114
115    let mut config = match config_handle.write() {
116        Ok(guard) => guard,
117        Err(poisoned) => {
118            log::error!("recovering from poisoned embedded control-plane config write lock");
119            poisoned.into_inner()
120        }
121    };
122    config.auth_token = next.http.auth_token.clone();
123    config.disable_auth = next.http.disable_auth;
124    log::info!("reloaded embedded control-plane auth settings in place");
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built supervised state reports supervised mode, has an entry
    /// for every managed process, and holds a control sender.
    #[test]
    fn new_supervised_state_registers_managed_processes() {
        let (shared, _sender, _receiver) = new_supervised_state();
        let snapshot = shared.read().unwrap();

        assert_eq!(snapshot.server_mode, "supervised");
        MANAGED_PROCESSES
            .iter()
            .for_each(|name| assert!(snapshot.processes.contains_key(*name)));
        assert!(snapshot.control_tx.is_some());
    }
}