Skip to main content

spool/lifecycle_store/
projection.rs

1//! Projection / snapshot cache / fingerprint 管理。
2//! 读路径:内存缓存 → 磁盘快照 → 重建并写回快照。
3
4use super::{LedgerEntry, LifecycleProjection, LifecycleStore, internal::ensure_parent_dir};
5use once_cell::sync::Lazy;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::fs;
9use std::path::{Path, PathBuf};
10use std::sync::Mutex;
11
12pub(super) const PROJECTION_SNAPSHOT_FILE_NAME: &str = "memory-ledger.latest-state.json";
13const PROJECTION_SNAPSHOT_SCHEMA_VERSION: &str = "memory-projection-cache.v1";
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16enum LedgerFingerprint {
17    Missing,
18    Existing { len: u64, modified_nanos: u128 },
19}
20
/// Where a projection returned by `read_projection_internal` came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProjectionSource {
    /// Served from the in-process cache.
    Memory,
    /// Loaded from the snapshot file on disk.
    Persistent,
    /// Recomputed from the full ledger contents.
    Rebuilt,
}
27
28#[derive(Debug, Clone)]
29struct ProjectionCacheEntry {
30    fingerprint: LedgerFingerprint,
31    projection: LifecycleProjection,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35struct ProjectionSnapshotFile {
36    schema_version: String,
37    fingerprint: LedgerFingerprint,
38    latest_entries: Vec<LedgerEntry>,
39}
40
41static PROJECTION_CACHE: Lazy<Mutex<HashMap<PathBuf, ProjectionCacheEntry>>> =
42    Lazy::new(|| Mutex::new(HashMap::new()));
43
44pub fn read_projection(store: &LifecycleStore) -> anyhow::Result<LifecycleProjection> {
45    Ok(read_projection_internal(store)?.0)
46}
47
48fn read_projection_internal(
49    store: &LifecycleStore,
50) -> anyhow::Result<(LifecycleProjection, ProjectionSource)> {
51    let fingerprint = ledger_fingerprint(store.ledger_path())?;
52    {
53        let cache = PROJECTION_CACHE.lock().unwrap();
54        if let Some(entry) = cache.get(store.ledger_path())
55            && entry.fingerprint == fingerprint
56        {
57            return Ok((entry.projection.clone(), ProjectionSource::Memory));
58        }
59    }
60
61    let snapshot_path = store.projection_snapshot_path();
62    if let Some(projection) = read_projection_snapshot(snapshot_path.as_path(), &fingerprint)? {
63        let mut cache = PROJECTION_CACHE.lock().unwrap();
64        cache.insert(
65            store.ledger_path().to_path_buf(),
66            ProjectionCacheEntry {
67                fingerprint,
68                projection: projection.clone(),
69            },
70        );
71        return Ok((projection, ProjectionSource::Persistent));
72    }
73
74    let projection = LifecycleProjection::from_entries(store.read_all()?);
75    let mut cache = PROJECTION_CACHE.lock().unwrap();
76    cache.insert(
77        store.ledger_path().to_path_buf(),
78        ProjectionCacheEntry {
79            fingerprint: fingerprint.clone(),
80            projection: projection.clone(),
81        },
82    );
83    write_projection_snapshot(snapshot_path.as_path(), &fingerprint, &projection)?;
84    Ok((projection, ProjectionSource::Rebuilt))
85}
86
87pub(super) fn invalidate_projection_cache(path: &Path, snapshot_path: &Path) {
88    PROJECTION_CACHE.lock().unwrap().remove(path);
89    match fs::remove_file(snapshot_path) {
90        Ok(()) => {}
91        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
92        Err(_error) => {}
93    }
94}
95
96fn ledger_fingerprint(path: &Path) -> anyhow::Result<LedgerFingerprint> {
97    let metadata = match fs::metadata(path) {
98        Ok(metadata) => metadata,
99        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
100            return Ok(LedgerFingerprint::Missing);
101        }
102        Err(error) => return Err(error.into()),
103    };
104
105    let modified_nanos = metadata
106        .modified()
107        .ok()
108        .and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
109        .map(|duration| duration.as_nanos())
110        .unwrap_or_default();
111
112    Ok(LedgerFingerprint::Existing {
113        len: metadata.len(),
114        modified_nanos,
115    })
116}
117
118fn read_projection_snapshot(
119    path: &Path,
120    fingerprint: &LedgerFingerprint,
121) -> anyhow::Result<Option<LifecycleProjection>> {
122    let raw = match fs::read_to_string(path) {
123        Ok(raw) => raw,
124        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
125        Err(error) => return Err(error.into()),
126    };
127
128    let snapshot: ProjectionSnapshotFile = match serde_json::from_str(&raw) {
129        Ok(snapshot) => snapshot,
130        Err(_) => {
131            let _ = fs::remove_file(path);
132            return Ok(None);
133        }
134    };
135    if snapshot.schema_version != PROJECTION_SNAPSHOT_SCHEMA_VERSION {
136        let _ = fs::remove_file(path);
137        return Ok(None);
138    }
139    if &snapshot.fingerprint != fingerprint {
140        return Ok(None);
141    }
142    Ok(Some(LifecycleProjection::from_entries(
143        snapshot.latest_entries,
144    )))
145}
146
147fn write_projection_snapshot(
148    path: &Path,
149    fingerprint: &LedgerFingerprint,
150    projection: &LifecycleProjection,
151) -> anyhow::Result<()> {
152    ensure_parent_dir(path)?;
153    let snapshot = ProjectionSnapshotFile {
154        schema_version: PROJECTION_SNAPSHOT_SCHEMA_VERSION.to_string(),
155        fingerprint: fingerprint.clone(),
156        latest_entries: projection.latest_entries().to_vec(),
157    };
158    let temp_path = path.with_extension("tmp");
159    fs::write(&temp_path, serde_json::to_vec(&snapshot)?)?;
160    fs::rename(temp_path, path)?;
161    Ok(())
162}
163
/// Test helper: empties the in-process projection cache for every ledger.
///
/// # Panics
///
/// Panics if the cache mutex is poisoned.
#[cfg(test)]
pub(crate) fn clear_projection_cache() {
    let mut cache = PROJECTION_CACHE.lock().unwrap();
    cache.clear();
}
168
/// Test helper: reads the projection and reports whether it was served from
/// a cache.
///
/// The flag is `true` for both the in-process cache and the on-disk
/// snapshot, and `false` only when the projection had to be rebuilt.
#[cfg(test)]
pub(crate) fn read_projection_with_cache_hit(
    store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, bool)> {
    let (projection, source) = read_projection_internal(store)?;
    let cache_hit = match source {
        ProjectionSource::Memory | ProjectionSource::Persistent => true,
        ProjectionSource::Rebuilt => false,
    };
    Ok((projection, cache_hit))
}
182
/// Test helper: reads the projection and names its origin as one of
/// `"memory"`, `"persistent"` or `"rebuilt"`.
#[cfg(test)]
pub(crate) fn read_projection_with_source(
    store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, String)> {
    let (projection, source) = read_projection_internal(store)?;
    let label = match source {
        ProjectionSource::Memory => "memory",
        ProjectionSource::Persistent => "persistent",
        ProjectionSource::Rebuilt => "rebuilt",
    };
    Ok((projection, label.to_string()))
}
196}