spool/lifecycle_store/
projection.rs1use super::{LedgerEntry, LifecycleProjection, LifecycleStore, internal::ensure_parent_dir};
5use once_cell::sync::Lazy;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::sync::Mutex;
11
/// File name of the persisted projection snapshot.
// NOTE(review): presumably joined onto the store directory by
// `LifecycleStore::projection_snapshot_path` — that code is not visible here; confirm.
12pub(super) const PROJECTION_SNAPSHOT_FILE_NAME: &str = "memory-ledger.latest-state.json";
/// Schema tag stored in every snapshot file. `read_projection_snapshot` deletes and ignores
/// any snapshot whose `schema_version` differs from this value.
13const PROJECTION_SNAPSHOT_SCHEMA_VERSION: &str = "memory-projection-cache.v1";
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16enum LedgerFingerprint {
17 Missing,
18 Existing { len: u64, modified_nanos: u128 },
19}
20
/// Which tier answered a projection read.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ProjectionSource {
    /// Served from the in-process `PROJECTION_CACHE`.
    Memory,
    /// Served from the on-disk snapshot file.
    Persistent,
    /// Recomputed from the ledger entries.
    Rebuilt,
}
27
28#[derive(Debug, Clone)]
29struct ProjectionCacheEntry {
30 fingerprint: LedgerFingerprint,
31 projection: LifecycleProjection,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35struct ProjectionSnapshotFile {
36 schema_version: String,
37 fingerprint: LedgerFingerprint,
38 latest_entries: Vec<LedgerEntry>,
39}
40
41static PROJECTION_CACHE: Lazy<Mutex<HashMap<PathBuf, ProjectionCacheEntry>>> =
42 Lazy::new(|| Mutex::new(HashMap::new()));
43
44pub fn read_projection(store: &LifecycleStore) -> anyhow::Result<LifecycleProjection> {
45 Ok(read_projection_internal(store)?.0)
46}
47
48fn read_projection_internal(
49 store: &LifecycleStore,
50) -> anyhow::Result<(LifecycleProjection, ProjectionSource)> {
51 let fingerprint = ledger_fingerprint(store.ledger_path())?;
52 {
53 let cache = PROJECTION_CACHE.lock().unwrap();
54 if let Some(entry) = cache.get(store.ledger_path())
55 && entry.fingerprint == fingerprint
56 {
57 return Ok((entry.projection.clone(), ProjectionSource::Memory));
58 }
59 }
60
61 let snapshot_path = store.projection_snapshot_path();
62 if let Some(projection) = read_projection_snapshot(snapshot_path.as_path(), &fingerprint)? {
63 let mut cache = PROJECTION_CACHE.lock().unwrap();
64 cache.insert(
65 store.ledger_path().to_path_buf(),
66 ProjectionCacheEntry {
67 fingerprint,
68 projection: projection.clone(),
69 },
70 );
71 return Ok((projection, ProjectionSource::Persistent));
72 }
73
74 let projection = LifecycleProjection::from_entries(store.read_all()?);
75 let mut cache = PROJECTION_CACHE.lock().unwrap();
76 cache.insert(
77 store.ledger_path().to_path_buf(),
78 ProjectionCacheEntry {
79 fingerprint: fingerprint.clone(),
80 projection: projection.clone(),
81 },
82 );
83 write_projection_snapshot(snapshot_path.as_path(), &fingerprint, &projection)?;
84 Ok((projection, ProjectionSource::Rebuilt))
85}
86
87pub(super) fn invalidate_projection_cache(path: &Path, snapshot_path: &Path) {
88 PROJECTION_CACHE.lock().unwrap().remove(path);
89 match fs::remove_file(snapshot_path) {
90 Ok(()) => {}
91 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
92 Err(_error) => {}
93 }
94}
95
96fn ledger_fingerprint(path: &Path) -> anyhow::Result<LedgerFingerprint> {
97 let metadata = match fs::metadata(path) {
98 Ok(metadata) => metadata,
99 Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
100 return Ok(LedgerFingerprint::Missing);
101 }
102 Err(error) => return Err(error.into()),
103 };
104
105 let modified_nanos = metadata
106 .modified()
107 .ok()
108 .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
109 .map(|duration| duration.as_nanos())
110 .unwrap_or_default();
111
112 Ok(LedgerFingerprint::Existing {
113 len: metadata.len(),
114 modified_nanos,
115 })
116}
117
118fn read_projection_snapshot(
119 path: &Path,
120 fingerprint: &LedgerFingerprint,
121) -> anyhow::Result<Option<LifecycleProjection>> {
122 let raw = match fs::read_to_string(path) {
123 Ok(raw) => raw,
124 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
125 Err(error) => return Err(error.into()),
126 };
127
128 let snapshot: ProjectionSnapshotFile = match serde_json::from_str(&raw) {
129 Ok(snapshot) => snapshot,
130 Err(_) => {
131 let _ = fs::remove_file(path);
132 return Ok(None);
133 }
134 };
135 if snapshot.schema_version != PROJECTION_SNAPSHOT_SCHEMA_VERSION {
136 let _ = fs::remove_file(path);
137 return Ok(None);
138 }
139 if &snapshot.fingerprint != fingerprint {
140 return Ok(None);
141 }
142 Ok(Some(LifecycleProjection::from_entries(
143 snapshot.latest_entries,
144 )))
145}
146
147fn write_projection_snapshot(
148 path: &Path,
149 fingerprint: &LedgerFingerprint,
150 projection: &LifecycleProjection,
151) -> anyhow::Result<()> {
152 ensure_parent_dir(path)?;
153 let snapshot = ProjectionSnapshotFile {
154 schema_version: PROJECTION_SNAPSHOT_SCHEMA_VERSION.to_string(),
155 fingerprint: fingerprint.clone(),
156 latest_entries: projection.latest_entries().to_vec(),
157 };
158 let temp_path = path.with_extension("tmp");
159 fs::write(&temp_path, serde_json::to_vec(&snapshot)?)?;
160 fs::rename(temp_path, path)?;
161 Ok(())
162}
163
/// Test helper: empties the in-process projection cache for every ledger path.
///
/// # Panics
/// Panics if the `PROJECTION_CACHE` mutex is poisoned.
#[cfg(test)]
pub(crate) fn clear_projection_cache() {
    let mut cache = PROJECTION_CACHE.lock().unwrap();
    cache.clear();
}
168
/// Test helper: reads the projection and reports whether it came from a cache tier.
///
/// The flag is `true` for the in-process cache and the on-disk snapshot, `false` when the
/// projection had to be rebuilt from the ledger.
#[cfg(test)]
pub(crate) fn read_projection_with_cache_hit(
    store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, bool)> {
    let (projection, source) = read_projection_internal(store)?;
    let cache_hit = match source {
        ProjectionSource::Memory | ProjectionSource::Persistent => true,
        ProjectionSource::Rebuilt => false,
    };
    Ok((projection, cache_hit))
}
182
/// Test helper: reads the projection and names the tier that served it as
/// `"memory"`, `"persistent"`, or `"rebuilt"`.
#[cfg(test)]
pub(crate) fn read_projection_with_source(
    store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, String)> {
    let (projection, source) = read_projection_internal(store)?;
    let label = match source {
        ProjectionSource::Memory => "memory",
        ProjectionSource::Persistent => "persistent",
        ProjectionSource::Rebuilt => "rebuilt",
    };
    Ok((projection, label.to_string()))
}