running_process/broker/server/
backend_registry.rs1use std::collections::HashMap;
4
5use crate::broker::backend_handle::BackendHandle;
6use crate::broker::protocol::ServiceDefinition;
7use crate::broker::server::hello_handler::RegisteredBackend;
8use crate::broker::server::instance::BrokerInstanceKey;
9
10#[derive(Clone, Debug, PartialEq, Eq, Hash)]
12pub struct BackendKey {
13 pub instance: BrokerInstanceKey,
15 pub service_name: String,
17 pub service_version: String,
19}
20
21impl BackendKey {
22 pub fn new(
24 instance: BrokerInstanceKey,
25 service_name: impl Into<String>,
26 service_version: impl Into<String>,
27 ) -> Self {
28 Self {
29 instance,
30 service_name: service_name.into(),
31 service_version: service_version.into(),
32 }
33 }
34}
35
36#[derive(Default)]
38pub struct BackendRegistry {
39 entries: HashMap<BackendKey, BackendHandle>,
40}
41
42impl BackendRegistry {
43 pub fn new() -> Self {
45 Self {
46 entries: HashMap::new(),
47 }
48 }
49
50 pub fn len(&self) -> usize {
52 self.entries.len()
53 }
54
55 pub fn is_empty(&self) -> bool {
57 self.entries.is_empty()
58 }
59
60 pub fn insert(
62 &mut self,
63 instance: BrokerInstanceKey,
64 handle: BackendHandle,
65 ) -> Option<BackendHandle> {
66 let key = BackendKey::new(
67 instance,
68 handle.service_name.clone(),
69 handle.service_version.clone(),
70 );
71 self.entries.insert(key, handle)
72 }
73
74 pub fn get(
76 &self,
77 instance: &BrokerInstanceKey,
78 service_name: &str,
79 service_version: &str,
80 ) -> Option<&BackendHandle> {
81 self.entries.get(&BackendKey::new(
82 instance.clone(),
83 service_name,
84 service_version,
85 ))
86 }
87
88 pub fn iter(&self) -> impl Iterator<Item = (&BackendKey, &BackendHandle)> {
90 self.entries.iter()
91 }
92
93 pub fn prune_stale(&mut self) -> Vec<BackendKey> {
98 let mut removed = Vec::new();
99 self.entries.retain(|key, handle| {
100 let alive = handle.is_alive();
101 if !alive {
102 removed.push(key.clone());
103 }
104 alive
105 });
106 removed
107 }
108
109 pub fn registered_backend_for(
111 &self,
112 instance: &BrokerInstanceKey,
113 service_definition: &ServiceDefinition,
114 service_version: &str,
115 ) -> Option<RegisteredBackend> {
116 let handle = self.get(instance, &service_definition.service_name, service_version)?;
117 Some(RegisteredBackend {
118 service_definition: service_definition.clone(),
119 daemon_version: handle.service_version.clone(),
120 backend_pipe: handle.daemon_process.ipc_endpoint.path.clone(),
121 server_capabilities: 0,
122 })
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use crate::broker::backend_handle::{BackendHandle, DaemonProcess};
129 use crate::broker::protocol::Endpoint;
130
131 use super::*;
132
133 fn handle(service_name: &str, version: &str, pid: u32) -> BackendHandle {
134 let endpoint = Endpoint {
135 namespace_id: "shared".into(),
136 path: format!("rpb-v1-test-{service_name}-{version}"),
137 };
138 let mut daemon = DaemonProcess::current_process(endpoint, Some(30)).unwrap();
139 daemon.pid = pid;
140
141 BackendHandle {
142 service_name: service_name.into(),
143 service_version: version.into(),
144 daemon_process: daemon,
145 #[cfg(unix)]
146 pid_handle: None,
147 #[cfg(windows)]
148 process_handle: None,
149 }
150 }
151
152 #[test]
153 fn prune_stale_removes_dead_handles_and_keeps_live_ones() {
154 let mut registry = BackendRegistry::new();
155 let live_key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.20");
156 let dead_key = BackendKey::new(BrokerInstanceKey::Shared, "zccache", "1.11.21");
157
158 registry.insert(
159 live_key.instance.clone(),
160 handle(
161 &live_key.service_name,
162 &live_key.service_version,
163 std::process::id(),
164 ),
165 );
166 registry.insert(
167 dead_key.instance.clone(),
168 handle(&dead_key.service_name, &dead_key.service_version, u32::MAX),
169 );
170
171 let removed = registry.prune_stale();
172
173 assert_eq!(removed, vec![dead_key.clone()]);
174 assert!(registry
175 .get(
176 &live_key.instance,
177 &live_key.service_name,
178 &live_key.service_version
179 )
180 .is_some());
181 assert!(registry
182 .get(
183 &dead_key.instance,
184 &dead_key.service_name,
185 &dead_key.service_version
186 )
187 .is_none());
188 }
189}