Skip to main content

running_process/broker/server/
backend_registry.rs

1//! Verified backend registry keyed by broker instance, service, and version.
2
3use std::collections::HashMap;
4
5use crate::broker::backend_handle::BackendHandle;
6use crate::broker::protocol::ServiceDefinition;
7use crate::broker::server::hello_handler::RegisteredBackend;
8use crate::broker::server::instance::BrokerInstanceKey;
9
10/// Lookup key for one backend process.
11#[derive(Clone, Debug, PartialEq, Eq, Hash)]
12pub struct BackendKey {
13    /// Broker trust-domain instance.
14    pub instance: BrokerInstanceKey,
15    /// Logical service name.
16    pub service_name: String,
17    /// Service version.
18    pub service_version: String,
19}
20
21impl BackendKey {
22    /// Build a key from an instance and service tuple.
23    pub fn new(
24        instance: BrokerInstanceKey,
25        service_name: impl Into<String>,
26        service_version: impl Into<String>,
27    ) -> Self {
28        Self {
29            instance,
30            service_name: service_name.into(),
31            service_version: service_version.into(),
32        }
33    }
34}
35
36/// In-memory table of verified backend handles.
37#[derive(Default)]
38pub struct BackendRegistry {
39    entries: HashMap<BackendKey, BackendHandle>,
40}
41
42impl BackendRegistry {
43    /// Create an empty registry.
44    pub fn new() -> Self {
45        Self {
46            entries: HashMap::new(),
47        }
48    }
49
50    /// Number of registered backend handles.
51    pub fn len(&self) -> usize {
52        self.entries.len()
53    }
54
55    /// Return true when the registry has no entries.
56    pub fn is_empty(&self) -> bool {
57        self.entries.is_empty()
58    }
59
60    /// Insert or replace one verified backend handle.
61    pub fn insert(
62        &mut self,
63        instance: BrokerInstanceKey,
64        handle: BackendHandle,
65    ) -> Option<BackendHandle> {
66        let key = BackendKey::new(
67            instance,
68            handle.service_name.clone(),
69            handle.service_version.clone(),
70        );
71        self.entries.insert(key, handle)
72    }
73
74    /// Return one handle by exact instance/service/version key.
75    pub fn get(
76        &self,
77        instance: &BrokerInstanceKey,
78        service_name: &str,
79        service_version: &str,
80    ) -> Option<&BackendHandle> {
81        self.entries.get(&BackendKey::new(
82            instance.clone(),
83            service_name,
84            service_version,
85        ))
86    }
87
88    /// Iterate over all registered backend handles.
89    pub fn iter(&self) -> impl Iterator<Item = (&BackendKey, &BackendHandle)> {
90        self.entries.iter()
91    }
92
93    /// Remove backend handles whose verified process is no longer alive.
94    ///
95    /// Returns the removed keys so the lifecycle monitor can emit events,
96    /// metrics, or diagnostics after the registry mutation is complete.
97    pub fn prune_stale(&mut self) -> Vec<BackendKey> {
98        let mut removed = Vec::new();
99        self.entries.retain(|key, handle| {
100            let alive = handle.is_alive();
101            if !alive {
102                removed.push(key.clone());
103            }
104            alive
105        });
106        removed
107    }
108
109    /// Return Hello negotiation metadata for one registered backend.
110    pub fn registered_backend_for(
111        &self,
112        instance: &BrokerInstanceKey,
113        service_definition: &ServiceDefinition,
114        service_version: &str,
115    ) -> Option<RegisteredBackend> {
116        let handle = self.get(
117            instance,
118            &service_definition.service_name,
119            service_version,
120        )?;
121        Some(RegisteredBackend {
122            service_definition: service_definition.clone(),
123            daemon_version: handle.service_version.clone(),
124            backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
125            server_capabilities: 0,
126        })
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
133    use crate::broker::protocol::Endpoint;
134
135    use super::*;
136
137    fn handle(service_name: &str, version: &str, pid: u32) -> BackendHandle {
138        let endpoint = Endpoint {
139            namespace_id: "shared".into(),
140            path: format!("rpb-v1-test-{service_name}-{version}"),
141        };
142        let mut daemon = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
143        daemon.pid = pid;
144
145        BackendHandle {
146            service_name: service_name.into(),
147            service_version: version.into(),
148            daemon_process: daemon,
149            #[cfg(unix)]
150            pid_handle: None,
151            #[cfg(windows)]
152            process_handle: None,
153        }
154    }
155
156    #[test]
157    fn prune_stale_removes_dead_handles_and_keeps_live_ones() {
158        let mut registry = BackendRegistry::new();
159        let live_key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.20");
160        let dead_key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.21");
161
162        registry.insert(
163            live_key.instance.clone(),
164            handle(&live_key.service_name, &live_key.service_version, std::process::id()),
165        );
166        registry.insert(
167            dead_key.instance.clone(),
168            handle(&dead_key.service_name, &dead_key.service_version, u32::MAX),
169        );
170
171        let removed = registry.prune_stale();
172
173        assert_eq!(removed, vec![dead_key.clone()]);
174        assert!(registry
175            .get(
176                &live_key.instance,
177                &live_key.service_name,
178                &live_key.service_version
179            )
180            .is_some());
181        assert!(registry
182            .get(
183                &dead_key.instance,
184                &dead_key.service_name,
185                &dead_key.service_version
186            )
187            .is_none());
188    }
189}