running_process/broker/protocol_v2/
loader.rs1use std::fs;
27use std::path::{Path, PathBuf};
28
29use prost::Message as _;
30
31use crate::broker::lifecycle::names::validate_service_name;
32use crate::broker::server::service_def_loader::ServiceDefinitionError;
33
34use super::io::{service_definition_dir_v2, service_definition_path_v2, SERVICE_DEF_V2_EXTENSION};
35use super::ServiceDefinition;
36
/// Loads v2 service definitions from files stored beneath a root directory.
///
/// The loader holds nothing but the root path, so cloning it is cheap and
/// every lookup goes back to the filesystem.
#[derive(Debug, Clone)]
pub struct ServiceDefinitionLoader {
    /// Directory that service-definition paths are resolved against.
    root: PathBuf,
}
46
47impl ServiceDefinitionLoader {
48 pub fn new(root: impl Into<PathBuf>) -> Self {
50 Self { root: root.into() }
51 }
52
53 #[must_use]
56 pub fn default_root() -> Self {
57 Self::new(service_definition_dir_v2())
58 }
59
60 #[must_use]
62 pub fn root(&self) -> &Path {
63 &self.root
64 }
65
66 pub fn load(&self, service_name: &str) -> Result<ServiceDefinition, ServiceDefinitionError> {
87 let path = service_definition_path_v2(&self.root, service_name)?;
88 let bytes = fs::read(&path)?;
89 let definition = ServiceDefinition::decode(bytes.as_slice())?;
90 validate_loaded_definition(&definition, service_name)?;
91 Ok(definition)
92 }
93
94 pub fn reload(&self, service_name: &str) -> Result<ServiceDefinition, ServiceDefinitionError> {
100 self.load(service_name)
101 }
102
103 pub fn lookup_or_reload(
111 &self,
112 service_name: &str,
113 ) -> Result<ServiceDefinition, ServiceDefinitionError> {
114 self.load(service_name)
115 }
116
117 #[must_use]
124 pub fn enumerate(&self) -> Vec<ServiceDefinition> {
125 self.scan()
126 .into_iter()
127 .filter_map(|entry| entry.result.ok())
128 .collect()
129 }
130
131 #[must_use]
136 pub fn scan(&self) -> Vec<ServiceDefinitionScanEntry> {
137 let read_dir = match fs::read_dir(&self.root) {
138 Ok(rd) => rd,
139 Err(_) => return Vec::new(),
140 };
141
142 let mut out: Vec<ServiceDefinitionScanEntry> = Vec::new();
143 for entry in read_dir.flatten() {
144 let path = entry.path();
145 let name = match path.file_name().and_then(|n| n.to_str()) {
150 Some(n) => n,
151 None => continue,
152 };
153 let suffix = format!(".{SERVICE_DEF_V2_EXTENSION}");
154 let Some(stem) = name.strip_suffix(&suffix) else {
155 continue;
156 };
157 let result = self.load_from_path(&path, stem);
161 out.push(ServiceDefinitionScanEntry { path, result });
162 }
163 out.sort_by(|a, b| a.path.cmp(&b.path));
164 out
165 }
166
167 fn load_from_path(
168 &self,
169 path: &Path,
170 filename_service: &str,
171 ) -> Result<ServiceDefinition, ServiceDefinitionError> {
172 let bytes = fs::read(path)?;
173 let definition = ServiceDefinition::decode(bytes.as_slice())?;
174 validate_loaded_definition(&definition, filename_service)?;
175 Ok(definition)
176 }
177}
178
179#[derive(Debug)]
181pub struct ServiceDefinitionScanEntry {
182 pub path: PathBuf,
184 pub result: Result<ServiceDefinition, ServiceDefinitionError>,
186}
187
188fn validate_loaded_definition(
189 definition: &ServiceDefinition,
190 expected_service: &str,
191) -> Result<(), ServiceDefinitionError> {
192 validate_service_name(expected_service)?;
193 if definition.service_name != expected_service {
194 return Err(ServiceDefinitionError::ServiceNameMismatch {
195 requested: expected_service.to_owned(),
196 actual: definition.service_name.clone(),
197 });
198 }
199 Ok(())
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::protocol_v2::{BrokerIsolation, ServiceDefinitionBuilder};
    use tempfile::tempdir;

    /// Installs a shared-broker definition named `name` under `root` and
    /// returns whatever path `install_in` reports.
    fn install_test_servicedef(root: &Path, name: &str) -> PathBuf {
        ServiceDefinitionBuilder::shared_broker(name, "/usr/bin/zccache-daemon")
            .min_version("1.0.0")
            .label("env", "test")
            .install_in(root)
            .expect("install_in")
    }

    /// Writes a file named `corrupt.servicedef.v2` holding bytes that are not
    /// a decodable definition.
    fn write_corrupt_servicedef(root: &Path) {
        let target = root.join("corrupt.servicedef.v2");
        fs::write(target, b"\xFF\xFF\xFF\xFF").expect("write corrupt");
    }

    #[test]
    fn load_round_trips_an_installed_servicedef() {
        let tmp = tempdir().expect("tempdir");
        install_test_servicedef(tmp.path(), "zccache");

        let loaded = ServiceDefinitionLoader::new(tmp.path())
            .load("zccache")
            .expect("load");

        assert_eq!(loaded.service_name, "zccache");
        assert_eq!(loaded.binary_path, "/usr/bin/zccache-daemon");
        assert_eq!(loaded.isolation, BrokerIsolation::SharedBroker as i32);
        assert_eq!(loaded.min_version, "1.0.0");
        let env_label = loaded.labels.get("env").map(String::as_str);
        assert_eq!(env_label, Some("test"));
    }

    #[test]
    fn load_returns_io_error_for_missing_file() {
        let tmp = tempdir().expect("tempdir");

        let err = ServiceDefinitionLoader::new(tmp.path())
            .load("no-such-service")
            .expect_err("must Err");

        let is_io = matches!(err, ServiceDefinitionError::Io(_));
        assert!(is_io, "missing file → Io, got: {err:?}");
    }

    #[test]
    fn load_rejects_invalid_service_name() {
        let tmp = tempdir().expect("tempdir");
        let loader = ServiceDefinitionLoader::new(tmp.path());

        // Upper-case, empty, path separator, embedded NUL.
        let bad_names = ["BAD-Caps", "", "a/b", "x\0y"];
        for bad in bad_names {
            let err = loader.load(bad).expect_err("must Err");
            let is_invalid_name = matches!(err, ServiceDefinitionError::InvalidName(_));
            assert!(is_invalid_name, "{bad:?} → InvalidName, got: {err:?}");
        }
    }

    #[test]
    fn load_detects_filename_service_mismatch() {
        let tmp = tempdir().expect("tempdir");
        install_test_servicedef(tmp.path(), "zccache");

        // Rename the installed file so its name no longer agrees with the
        // `service_name` stored inside it.
        let from = tmp.path().join("zccache.servicedef.v2");
        let to = tmp.path().join("other.servicedef.v2");
        fs::rename(&from, &to).expect("rename");

        let err = ServiceDefinitionLoader::new(tmp.path())
            .load("other")
            .expect_err("mismatch must Err");

        let is_expected_mismatch = matches!(
            err,
            ServiceDefinitionError::ServiceNameMismatch { ref requested, ref actual }
                if requested == "other" && actual == "zccache"
        );
        assert!(
            is_expected_mismatch,
            "expected ServiceNameMismatch, got: {err:?}"
        );
    }

    #[test]
    fn load_rejects_corrupt_protobuf_bytes() {
        let tmp = tempdir().expect("tempdir");
        let target = tmp.path().join("badproto.servicedef.v2");
        fs::write(&target, b"\x01\x02\x03\x04\x05\xFF\xFF\xFF\xFF\x00").expect("write");

        let result = ServiceDefinitionLoader::new(tmp.path()).load("badproto");

        // Either failure mode is accepted: the bytes may fail to decode, or
        // decode into a definition whose name does not match.
        let rejected = matches!(
            result,
            Err(ServiceDefinitionError::Decode(_))
                | Err(ServiceDefinitionError::ServiceNameMismatch { .. })
        );
        assert!(rejected, "corrupt proto must Err, got: {result:?}");
    }

    #[test]
    fn enumerate_returns_every_parseable_servicedef() {
        let tmp = tempdir().expect("tempdir");
        let expected = ["zccache", "fbuild", "soldr"];
        for name in expected {
            install_test_servicedef(tmp.path(), name);
        }

        let defs = ServiceDefinitionLoader::new(tmp.path()).enumerate();

        assert_eq!(defs.len(), 3);
        let names: std::collections::HashSet<&str> =
            defs.iter().map(|d| d.service_name.as_str()).collect();
        for name in expected {
            assert!(names.contains(name));
        }
    }

    #[test]
    fn enumerate_skips_corrupt_files_silently() {
        let tmp = tempdir().expect("tempdir");
        install_test_servicedef(tmp.path(), "zccache");
        write_corrupt_servicedef(tmp.path());

        let defs = ServiceDefinitionLoader::new(tmp.path()).enumerate();

        assert_eq!(defs.len(), 1, "only zccache should be returned");
        assert_eq!(defs[0].service_name, "zccache");
    }

    #[test]
    fn scan_surfaces_per_file_errors() {
        let tmp = tempdir().expect("tempdir");
        install_test_servicedef(tmp.path(), "zccache");
        write_corrupt_servicedef(tmp.path());

        let entries = ServiceDefinitionLoader::new(tmp.path()).scan();

        assert_eq!(entries.len(), 2);
        let (loaded, failed): (Vec<_>, Vec<_>) =
            entries.iter().partition(|e| e.result.is_ok());
        assert_eq!(loaded.len(), 1);
        assert_eq!(failed.len(), 1);
    }

    #[test]
    fn enumerate_ignores_files_with_wrong_extension() {
        let tmp = tempdir().expect("tempdir");
        install_test_servicedef(tmp.path(), "zccache");
        // Neither of these carries the `.servicedef.v2` suffix.
        fs::write(tmp.path().join("legacy.servicedef"), b"junk").expect("write legacy");
        fs::write(tmp.path().join("readme.txt"), b"hello").expect("write readme");

        let defs = ServiceDefinitionLoader::new(tmp.path()).enumerate();

        assert_eq!(
            defs.len(),
            1,
            "only the .servicedef.v2 file should be loaded"
        );
        assert_eq!(defs[0].service_name, "zccache");
    }

    #[test]
    fn enumerate_handles_missing_root_gracefully() {
        let loader = ServiceDefinitionLoader::new("/nonexistent/path/to/services");

        let defs = loader.enumerate();
        assert!(defs.is_empty(), "missing root → empty result");

        let entries = loader.scan();
        assert!(entries.is_empty(), "missing root → empty scan");
    }

    #[test]
    fn reload_is_equivalent_to_load() {
        let tmp = tempdir().expect("tempdir");
        install_test_servicedef(tmp.path(), "zccache");
        let loader = ServiceDefinitionLoader::new(tmp.path());

        let via_load = loader.load("zccache").expect("load");
        let via_reload = loader.reload("zccache").expect("reload");
        let via_lookup = loader.lookup_or_reload("zccache").expect("lookup_or_reload");

        assert_eq!(via_load, via_reload);
        assert_eq!(via_reload, via_lookup);
    }
}