Skip to main content

running_process/broker/server/
backend_registry.rs

1//! Verified backend registry keyed by broker instance, service, and version.
2
3use std::collections::HashMap;
4
5use crate::broker::backend_handle::BackendHandle;
6use crate::broker::protocol::ServiceDefinition;
7use crate::broker::server::hello_handler::RegisteredBackend;
8use crate::broker::server::instance::BrokerInstanceKey;
9
10/// Lookup key for one backend process.
11#[derive(Clone, Debug, PartialEq, Eq, Hash)]
12pub struct BackendKey {
13    /// Broker trust-domain instance.
14    pub instance: BrokerInstanceKey,
15    /// Logical service name.
16    pub service_name: String,
17    /// Service version.
18    pub service_version: String,
19}
20
21impl BackendKey {
22    /// Build a key from an instance and service tuple.
23    pub fn new(
24        instance: BrokerInstanceKey,
25        service_name: impl Into<String>,
26        service_version: impl Into<String>,
27    ) -> Self {
28        Self {
29            instance,
30            service_name: service_name.into(),
31            service_version: service_version.into(),
32        }
33    }
34}
35
36/// In-memory table of verified backend handles.
37#[derive(Default)]
38pub struct BackendRegistry {
39    entries: HashMap<BackendKey, BackendHandle>,
40}
41
42impl BackendRegistry {
43    /// Create an empty registry.
44    pub fn new() -> Self {
45        Self {
46            entries: HashMap::new(),
47        }
48    }
49
50    /// Number of registered backend handles.
51    pub fn len(&self) -> usize {
52        self.entries.len()
53    }
54
55    /// Return true when the registry has no entries.
56    pub fn is_empty(&self) -> bool {
57        self.entries.is_empty()
58    }
59
60    /// Insert or replace one verified backend handle.
61    pub fn insert(
62        &mut self,
63        instance: BrokerInstanceKey,
64        handle: BackendHandle,
65    ) -> Option<BackendHandle> {
66        let key = BackendKey::new(
67            instance,
68            handle.service_name.clone(),
69            handle.service_version.clone(),
70        );
71        self.entries.insert(key, handle)
72    }
73
74    /// Return one handle by exact instance/service/version key.
75    pub fn get(
76        &self,
77        instance: &BrokerInstanceKey,
78        service_name: &str,
79        service_version: &str,
80    ) -> Option<&BackendHandle> {
81        self.entries.get(&BackendKey::new(
82            instance.clone(),
83            service_name,
84            service_version,
85        ))
86    }
87
88    /// Iterate over all registered backend handles.
89    pub fn iter(&self) -> impl Iterator<Item = (&BackendKey, &BackendHandle)> {
90        self.entries.iter()
91    }
92
93    /// Remove backend handles whose verified process is no longer alive.
94    ///
95    /// Returns the removed keys so the lifecycle monitor can emit events,
96    /// metrics, or diagnostics after the registry mutation is complete.
97    pub fn prune_stale(&mut self) -> Vec<BackendKey> {
98        let mut removed = Vec::new();
99        self.entries.retain(|key, handle| {
100            let alive = handle.is_alive();
101            if !alive {
102                removed.push(key.clone());
103            }
104            alive
105        });
106        removed
107    }
108
109    /// Return Hello negotiation metadata for one registered backend.
110    pub fn registered_backend_for(
111        &self,
112        instance: &BrokerInstanceKey,
113        service_definition: &ServiceDefinition,
114        service_version: &str,
115    ) -> Option<RegisteredBackend> {
116        let handle = self.get(instance, &service_definition.service_name, service_version)?;
117        Some(RegisteredBackend {
118            service_definition: service_definition.clone(),
119            daemon_version: handle.service_version.clone(),
120            backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
121            server_capabilities: 0,
122        })
123    }
124}
125
#[cfg(test)]
mod tests {
    use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
    use crate::broker::protocol::Endpoint;

    use super::*;

    /// Build a handle for `service_name`/`version` whose daemon record reports `pid`.
    ///
    /// The daemon record starts from `DaemonProcess::current_process` and then
    /// has its `pid` overwritten, so a test can point the handle at any
    /// process id it likes.
    fn handle(service_name: &str, version: &str, pid: u32) -> BackendHandle {
        let path = format!("rpb-v1-test-{service_name}-{version}");
        let endpoint = Endpoint {
            namespace_id: "shared".into(),
            path,
        };

        let mut daemon_process = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
        daemon_process.pid = pid;

        BackendHandle {
            service_name: service_name.into(),
            service_version: version.into(),
            daemon_process,
            #[cfg(unix)]
            pid_handle: None,
            #[cfg(windows)]
            process_handle: None,
        }
    }

    #[test]
    fn prune_stale_removes_dead_handles_and_keeps_live_ones() {
        let live_key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.20");
        let dead_key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.21");
        let mut registry = BackendRegistry::new();

        // The test process's own pid is certainly alive; `u32::MAX` stands in
        // for a pid that is expected to be dead.
        // NOTE(review): `u32::MAX` becomes -1 if narrowed to a signed pid type;
        // confirm `is_alive` does not treat that as "any process".
        let fixtures = [(&live_key, std::process::id()), (&dead_key, u32::MAX)];
        for (key, pid) in fixtures {
            registry.insert(
                key.instance.clone(),
                handle(&key.service_name, &key.service_version, pid),
            );
        }

        let removed = registry.prune_stale();
        assert_eq!(removed, vec![dead_key.clone()]);

        // After pruning, only the live entry should still be retrievable.
        let is_registered = |key: &BackendKey| {
            registry
                .get(&key.instance, &key.service_name, &key.service_version)
                .is_some()
        };
        assert!(is_registered(&live_key));
        assert!(!is_registered(&dead_key));
    }
}
189}