use super::{LedgerEntry, LifecycleProjection, LifecycleStore, internal::ensure_parent_dir};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
pub(super) const PROJECTION_SNAPSHOT_FILE_NAME: &str = "memory-ledger.latest-state.json";
const PROJECTION_SNAPSHOT_SCHEMA_VERSION: &str = "memory-projection-cache.v1";
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
enum LedgerFingerprint {
Missing,
Existing { len: u64, modified_nanos: u128 },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProjectionSource {
Memory,
Persistent,
Rebuilt,
}
#[derive(Debug, Clone)]
struct ProjectionCacheEntry {
fingerprint: LedgerFingerprint,
projection: LifecycleProjection,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProjectionSnapshotFile {
schema_version: String,
fingerprint: LedgerFingerprint,
latest_entries: Vec<LedgerEntry>,
}
static PROJECTION_CACHE: Lazy<Mutex<HashMap<PathBuf, ProjectionCacheEntry>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
pub fn read_projection(store: &LifecycleStore) -> anyhow::Result<LifecycleProjection> {
Ok(read_projection_internal(store)?.0)
}
fn read_projection_internal(
store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, ProjectionSource)> {
let fingerprint = ledger_fingerprint(store.ledger_path())?;
{
let cache = PROJECTION_CACHE.lock().unwrap();
if let Some(entry) = cache.get(store.ledger_path())
&& entry.fingerprint == fingerprint
{
return Ok((entry.projection.clone(), ProjectionSource::Memory));
}
}
let snapshot_path = store.projection_snapshot_path();
if let Some(projection) = read_projection_snapshot(snapshot_path.as_path(), &fingerprint)? {
let mut cache = PROJECTION_CACHE.lock().unwrap();
cache.insert(
store.ledger_path().to_path_buf(),
ProjectionCacheEntry {
fingerprint,
projection: projection.clone(),
},
);
return Ok((projection, ProjectionSource::Persistent));
}
let projection = LifecycleProjection::from_entries(store.read_all()?);
let mut cache = PROJECTION_CACHE.lock().unwrap();
cache.insert(
store.ledger_path().to_path_buf(),
ProjectionCacheEntry {
fingerprint: fingerprint.clone(),
projection: projection.clone(),
},
);
write_projection_snapshot(snapshot_path.as_path(), &fingerprint, &projection)?;
Ok((projection, ProjectionSource::Rebuilt))
}
pub(super) fn invalidate_projection_cache(path: &Path, snapshot_path: &Path) {
PROJECTION_CACHE.lock().unwrap().remove(path);
match fs::remove_file(snapshot_path) {
Ok(()) => {}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
Err(_error) => {}
}
}
fn ledger_fingerprint(path: &Path) -> anyhow::Result<LedgerFingerprint> {
let metadata = match fs::metadata(path) {
Ok(metadata) => metadata,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
return Ok(LedgerFingerprint::Missing);
}
Err(error) => return Err(error.into()),
};
let modified_nanos = metadata
.modified()
.ok()
.and_then(|time| time.duration_since(std::time::UNIX_EPOCH).ok())
.map(|duration| duration.as_nanos())
.unwrap_or_default();
Ok(LedgerFingerprint::Existing {
len: metadata.len(),
modified_nanos,
})
}
fn read_projection_snapshot(
path: &Path,
fingerprint: &LedgerFingerprint,
) -> anyhow::Result<Option<LifecycleProjection>> {
let raw = match fs::read_to_string(path) {
Ok(raw) => raw,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(error) => return Err(error.into()),
};
let snapshot: ProjectionSnapshotFile = match serde_json::from_str(&raw) {
Ok(snapshot) => snapshot,
Err(_) => {
let _ = fs::remove_file(path);
return Ok(None);
}
};
if snapshot.schema_version != PROJECTION_SNAPSHOT_SCHEMA_VERSION {
let _ = fs::remove_file(path);
return Ok(None);
}
if &snapshot.fingerprint != fingerprint {
return Ok(None);
}
Ok(Some(LifecycleProjection::from_entries(
snapshot.latest_entries,
)))
}
fn write_projection_snapshot(
path: &Path,
fingerprint: &LedgerFingerprint,
projection: &LifecycleProjection,
) -> anyhow::Result<()> {
ensure_parent_dir(path)?;
let snapshot = ProjectionSnapshotFile {
schema_version: PROJECTION_SNAPSHOT_SCHEMA_VERSION.to_string(),
fingerprint: fingerprint.clone(),
latest_entries: projection.latest_entries().to_vec(),
};
let temp_path = path.with_extension("tmp");
fs::write(&temp_path, serde_json::to_vec(&snapshot)?)?;
fs::rename(temp_path, path)?;
Ok(())
}
#[cfg(test)]
pub(crate) fn clear_projection_cache() {
PROJECTION_CACHE.lock().unwrap().clear();
}
#[cfg(test)]
pub(crate) fn read_projection_with_cache_hit(
store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, bool)> {
let (projection, source) = read_projection_internal(store)?;
Ok((
projection,
matches!(
source,
ProjectionSource::Memory | ProjectionSource::Persistent
),
))
}
#[cfg(test)]
pub(crate) fn read_projection_with_source(
store: &LifecycleStore,
) -> anyhow::Result<(LifecycleProjection, String)> {
let (projection, source) = read_projection_internal(store)?;
Ok((
projection,
match source {
ProjectionSource::Memory => "memory".to_string(),
ProjectionSource::Persistent => "persistent".to_string(),
ProjectionSource::Rebuilt => "rebuilt".to_string(),
},
))
}