Skip to main content

running_process/broker/
manifest.rs

1//! CacheManifest persistence and central-registry helpers.
2//!
3//! Phase 2 of #228 (#231). The broker and standalone cleanup tool both
4//! use this module. Manifests are prost-encoded protobuf and carry a
5//! `self_sha256` digest over the encoded manifest with that field
6//! cleared.
7
8use std::fs::{self, OpenOptions};
9use std::io::{self, Write};
10use std::path::{Path, PathBuf};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13#[cfg(not(windows))]
14use std::fs::File;
15
16use prost::Message;
17use sha2::{Digest, Sha256};
18
19use crate::broker::host_identity;
20use crate::broker::lifecycle::names::{validate_service_name, validate_version, PipePathError};
21use crate::broker::protocol::{CacheManifest, HostIdentity};
22use crate::broker::secure_dir;
23
24/// Filename written inside each daemon cache root.
25pub const ROOT_MANIFEST_FILE: &str = ".running-process-manifest.pb";
26
27/// Stable v1 manifest media type.
28pub const CACHE_MANIFEST_MEDIA_TYPE: &str = "application/vnd.running-process.cache-manifest.v1";
29
30/// Highest manifest schema this crate understands.
31pub const SUPPORTED_MANIFEST_SCHEMA_VERSION: u32 = 1;
32
33/// Errors returned by manifest persistence and validation.
34#[derive(Debug, thiserror::Error)]
35pub enum ManifestError {
36    /// Filesystem operation failed.
37    #[error("manifest I/O failed: {0}")]
38    Io(#[from] io::Error),
39    /// Protobuf decode failed.
40    #[error("manifest protobuf decode failed: {0}")]
41    Decode(#[from] prost::DecodeError),
42    /// Protobuf encode failed.
43    #[error("manifest protobuf encode failed: {0}")]
44    Encode(#[from] prost::EncodeError),
45    /// The manifest's self_sha256 digest did not match its content.
46    #[error("manifest self_sha256 mismatch")]
47    Corruption,
48    /// The manifest uses a newer schema version than this crate supports.
49    #[error("manifest schema too new: got {got}, supported {supported}")]
50    SchemaTooNew {
51        /// Manifest schema version read from disk.
52        got: u32,
53        /// Maximum schema version this crate can read.
54        supported: u32,
55    },
56    /// Service/version validation failed while deriving a registry path.
57    #[error(transparent)]
58    InvalidName(#[from] PipePathError),
59    /// A path had no parent directory.
60    #[error("manifest path has no parent: {0}")]
61    MissingParent(PathBuf),
62    /// Central-registry permissions are too broad.
63    #[error("central manifest registry has insecure permissions: {0}")]
64    InsecureRegistry(PathBuf),
65}
66
67/// Result of scanning one central-registry entry.
68#[derive(Debug)]
69pub struct ManifestScanEntry {
70    /// Full path to the manifest file.
71    pub path: PathBuf,
72    /// Read result for that path.
73    pub result: Result<CacheManifest, ManifestError>,
74}
75
76/// Write `<cache_root>/.running-process-manifest.pb` atomically.
77pub fn write_to_root(cache_root: &Path, manifest: &CacheManifest) -> Result<(), ManifestError> {
78    fs::create_dir_all(cache_root)?;
79    secure_dir::ensure_private_dir(cache_root)?;
80    let target = cache_root.join(ROOT_MANIFEST_FILE);
81    write_manifest_file(&target, manifest)
82}
83
84/// Write `<central_registry>/{service}-{version}.pb` atomically.
85pub fn write_to_central(
86    service_name: &str,
87    version: &str,
88    manifest: &CacheManifest,
89) -> Result<PathBuf, ManifestError> {
90    let dir = central_registry_dir();
91    write_to_central_in_dir(&dir, service_name, version, manifest)
92}
93
94/// Testable variant of [`write_to_central`] with an explicit registry dir.
95pub fn write_to_central_in_dir(
96    registry_dir: &Path,
97    service_name: &str,
98    version: &str,
99    manifest: &CacheManifest,
100) -> Result<PathBuf, ManifestError> {
101    ensure_central_registry_dir(registry_dir)?;
102    let target = central_manifest_path(registry_dir, service_name, version)?;
103    write_manifest_file(&target, manifest)?;
104    Ok(target)
105}
106
107/// Read and integrity-verify a CacheManifest.
108pub fn read_manifest(path: &Path) -> Result<CacheManifest, ManifestError> {
109    let bytes = fs::read(path)?;
110    let manifest = CacheManifest::decode(bytes.as_slice())?;
111    verify_schema(&manifest)?;
112    verify_self_sha256(&manifest)?;
113    Ok(manifest)
114}
115
116/// Enumerate parseable manifests for this host and boot.
117///
118/// Corrupt or stale manifests are skipped. Use [`scan_central`] when
119/// callers need error details.
120pub fn enumerate_central(registry_dir: &Path) -> Vec<CacheManifest> {
121    let current_host = host_identity::current();
122    enumerate_central_for_host(registry_dir, &current_host)
123}
124
125/// Testable variant of [`enumerate_central`] with an explicit current host.
126pub fn enumerate_central_for_host(
127    registry_dir: &Path,
128    current_host: &HostIdentity,
129) -> Vec<CacheManifest> {
130    scan_central(registry_dir)
131        .into_iter()
132        .filter_map(|entry| match entry.result {
133            Ok(manifest) if manifest_matches_host(&manifest, current_host) => Some(manifest),
134            _ => None,
135        })
136        .collect()
137}
138
139/// Scan every `.pb` file in a registry and keep per-file errors.
140pub fn scan_central(registry_dir: &Path) -> Vec<ManifestScanEntry> {
141    match secure_dir::private_dir_permissions_are_private(registry_dir) {
142        Ok(true) => {}
143        Ok(false) => {
144            return vec![ManifestScanEntry {
145                path: registry_dir.to_path_buf(),
146                result: Err(ManifestError::InsecureRegistry(registry_dir.to_path_buf())),
147            }];
148        }
149        Err(_) if !registry_dir.exists() => return Vec::new(),
150        Err(err) => {
151            return vec![ManifestScanEntry {
152                path: registry_dir.to_path_buf(),
153                result: Err(ManifestError::Io(err)),
154            }];
155        }
156    }
157
158    let read_dir = match fs::read_dir(registry_dir) {
159        Ok(read_dir) => read_dir,
160        Err(_) => return Vec::new(),
161    };
162
163    let mut out = Vec::new();
164    for entry in read_dir.flatten() {
165        let path = entry.path();
166        if path.extension().and_then(|s| s.to_str()) != Some("pb") {
167            continue;
168        }
169        let result = read_manifest(&path);
170        out.push(ManifestScanEntry { path, result });
171    }
172    out.sort_by(|a, b| a.path.cmp(&b.path));
173    out
174}
175
176/// Return the platform central-registry directory.
177///
178/// `RUNNING_PROCESS_MANIFEST_DIR` is honored as a test/development
179/// override. Production callers should leave it unset.
180pub fn central_registry_dir() -> PathBuf {
181    if let Some(path) = std::env::var_os("RUNNING_PROCESS_MANIFEST_DIR") {
182        return PathBuf::from(path);
183    }
184
185    #[cfg(windows)]
186    {
187        dirs::data_dir()
188            .unwrap_or_else(|| PathBuf::from(r"C:\ProgramData"))
189            .join("running-process")
190            .join("manifests")
191    }
192    #[cfg(target_os = "macos")]
193    {
194        dirs::home_dir()
195            .unwrap_or_else(std::env::temp_dir)
196            .join("Library")
197            .join("Application Support")
198            .join("running-process")
199            .join("manifests")
200    }
201    #[cfg(all(unix, not(target_os = "macos")))]
202    {
203        if let Some(data_home) = std::env::var_os("XDG_DATA_HOME") {
204            PathBuf::from(data_home)
205                .join("running-process")
206                .join("manifests")
207        } else {
208            dirs::home_dir()
209                .unwrap_or_else(std::env::temp_dir)
210                .join(".local")
211                .join("share")
212                .join("running-process")
213                .join("manifests")
214        }
215    }
216}
217
218/// Ensure the central-registry directory exists with private permissions.
219pub fn ensure_central_registry_dir(path: &Path) -> Result<(), ManifestError> {
220    secure_dir::ensure_private_dir(path)?;
221    if !secure_dir::private_dir_permissions_are_private(path)? {
222        return Err(ManifestError::InsecureRegistry(path.to_path_buf()));
223    }
224    Ok(())
225}
226
227/// Compute the central-registry path for one service/version manifest.
228pub fn central_manifest_path(
229    registry_dir: &Path,
230    service_name: &str,
231    version: &str,
232) -> Result<PathBuf, ManifestError> {
233    validate_service_name(service_name)?;
234    validate_version(version)?;
235    Ok(registry_dir.join(format!("{service_name}-{version}.pb")))
236}
237
238/// Clone `manifest`, fill schema/media/hash fields, and return the copy.
239pub fn manifest_with_self_sha256(manifest: &CacheManifest) -> Result<CacheManifest, ManifestError> {
240    let mut out = manifest.clone();
241    out.manifest_schema_version = SUPPORTED_MANIFEST_SCHEMA_VERSION;
242    if out.media_type.is_empty() {
243        out.media_type = CACHE_MANIFEST_MEDIA_TYPE.to_string();
244    }
245    out.self_sha256.clear();
246    let digest = sha256_for_manifest(&out)?;
247    out.self_sha256 = digest.to_vec();
248    Ok(out)
249}
250
251/// Compute the SHA-256 digest with `self_sha256` cleared.
252pub fn sha256_for_manifest(manifest: &CacheManifest) -> Result<[u8; 32], ManifestError> {
253    let mut clone = manifest.clone();
254    clone.self_sha256.clear();
255    let mut bytes = Vec::new();
256    clone.encode(&mut bytes)?;
257    let digest = Sha256::digest(&bytes);
258    let mut out = [0_u8; 32];
259    out.copy_from_slice(&digest);
260    Ok(out)
261}
262
263fn write_manifest_file(path: &Path, manifest: &CacheManifest) -> Result<(), ManifestError> {
264    let manifest = manifest_with_self_sha256(manifest)?;
265    let mut bytes = Vec::new();
266    manifest.encode(&mut bytes)?;
267    atomic_write(path, &bytes)
268}
269
270fn verify_schema(manifest: &CacheManifest) -> Result<(), ManifestError> {
271    if manifest.manifest_schema_version > SUPPORTED_MANIFEST_SCHEMA_VERSION {
272        return Err(ManifestError::SchemaTooNew {
273            got: manifest.manifest_schema_version,
274            supported: SUPPORTED_MANIFEST_SCHEMA_VERSION,
275        });
276    }
277    Ok(())
278}
279
280fn verify_self_sha256(manifest: &CacheManifest) -> Result<(), ManifestError> {
281    if manifest.self_sha256.len() != 32 {
282        return Err(ManifestError::Corruption);
283    }
284    let expected = sha256_for_manifest(manifest)?;
285    if manifest.self_sha256.as_slice() != expected {
286        return Err(ManifestError::Corruption);
287    }
288    Ok(())
289}
290
291fn manifest_matches_host(manifest: &CacheManifest, current_host: &HostIdentity) -> bool {
292    let Some(host) = manifest.host.as_ref() else {
293        return true;
294    };
295    (host.machine_id.is_empty() || host.machine_id == current_host.machine_id)
296        && (host.boot_id.is_empty() || host.boot_id == current_host.boot_id)
297}
298
299fn atomic_write(path: &Path, bytes: &[u8]) -> Result<(), ManifestError> {
300    let parent = path
301        .parent()
302        .ok_or_else(|| ManifestError::MissingParent(path.to_path_buf()))?;
303    fs::create_dir_all(parent)?;
304    let tmp = temp_path_for(path);
305
306    let write_result = (|| -> Result<(), ManifestError> {
307        let mut file = OpenOptions::new().write(true).create_new(true).open(&tmp)?;
308        file.write_all(bytes)?;
309        file.sync_all()?;
310        drop(file);
311        replace_file(&tmp, path)?;
312        sync_parent(parent)?;
313        Ok(())
314    })();
315
316    if write_result.is_err() {
317        let _ = fs::remove_file(&tmp);
318    }
319    write_result
320}
321
322fn temp_path_for(path: &Path) -> PathBuf {
323    let file_name = path
324        .file_name()
325        .and_then(|s| s.to_str())
326        .unwrap_or("manifest.pb");
327    let nanos = SystemTime::now()
328        .duration_since(UNIX_EPOCH)
329        .map(|d| d.as_nanos())
330        .unwrap_or(0);
331    path.with_file_name(format!(".{file_name}.tmp-{}-{nanos}", std::process::id()))
332}
333
334#[cfg(not(windows))]
335fn replace_file(tmp: &Path, target: &Path) -> io::Result<()> {
336    fs::rename(tmp, target)
337}
338
339#[cfg(windows)]
340fn replace_file(tmp: &Path, target: &Path) -> io::Result<()> {
341    use std::os::windows::ffi::OsStrExt;
342    use windows_sys::Win32::Storage::FileSystem::{ReplaceFileW, REPLACEFILE_WRITE_THROUGH};
343
344    if !target.exists() {
345        return fs::rename(tmp, target);
346    }
347
348    fn wide(path: &Path) -> Vec<u16> {
349        path.as_os_str()
350            .encode_wide()
351            .chain(std::iter::once(0))
352            .collect()
353    }
354
355    let target_w = wide(target);
356    let tmp_w = wide(tmp);
357    let ok = unsafe {
358        ReplaceFileW(
359            target_w.as_ptr(),
360            tmp_w.as_ptr(),
361            std::ptr::null(),
362            REPLACEFILE_WRITE_THROUGH,
363            std::ptr::null_mut(),
364            std::ptr::null_mut(),
365        )
366    };
367    if ok == 0 {
368        Err(io::Error::last_os_error())
369    } else {
370        Ok(())
371    }
372}
373
374#[cfg(not(windows))]
375fn sync_parent(parent: &Path) -> io::Result<()> {
376    File::open(parent)?.sync_all()
377}
378
379#[cfg(windows)]
380fn sync_parent(_parent: &Path) -> io::Result<()> {
381    Ok(())
382}
383
384#[cfg(test)]
385mod tests {
386    use super::*;
387    use crate::broker::protocol::Operation;
388
389    fn sample_manifest() -> CacheManifest {
390        let host = host_identity::current();
391        CacheManifest {
392            manifest_schema_version: 1,
393            media_type: CACHE_MANIFEST_MEDIA_TYPE.to_string(),
394            self_sha256: Vec::new(),
395            host: Some(host),
396            current_operation: Some(Operation {
397                kind: 0,
398                started_at_unix_ms: 1,
399                expected_done_unix_ms: 0,
400            }),
401            valid_until_unix_ms: 0,
402            service_name: "zccache".to_string(),
403            service_version: "1.2.3".to_string(),
404            broker_envelope_version: "v1".to_string(),
405            created_at_unix_ms: 1,
406            last_active_unix_ms: 2,
407            roots: Vec::new(),
408            current_daemon: None,
409            cleanup_policy: None,
410            broker_instance: "shared".to_string(),
411            depends_on: Vec::new(),
412            provides: Vec::new(),
413            observability: None,
414            bundle_id: "bundle".to_string(),
415        }
416    }
417
418    #[test]
419    fn self_hash_roundtrip() {
420        let manifest = manifest_with_self_sha256(&sample_manifest()).unwrap();
421        assert_eq!(manifest.self_sha256.len(), 32);
422        verify_self_sha256(&manifest).unwrap();
423    }
424
425    #[test]
426    fn central_path_validates_inputs() {
427        let dir = Path::new("/tmp/registry");
428        assert!(central_manifest_path(dir, "zccache", "1.2.3").is_ok());
429        assert!(central_manifest_path(dir, "Zccache", "1.2.3").is_err());
430        assert!(central_manifest_path(dir, "zccache", "../../../evil").is_err());
431    }
432
433    #[test]
434    fn central_registry_permissions_are_private_after_ensure() {
435        let tmp = tempfile::tempdir().unwrap();
436        let registry = tmp.path().join("registry");
437        ensure_central_registry_dir(&registry).unwrap();
438        assert!(secure_dir::private_dir_permissions_are_private(&registry).unwrap());
439    }
440}