use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use polyglot_sql::DialectType;
use serde::{Deserialize, Serialize};
use crate::parser::cache::hash_str;
use super::ModelColumnLineage;
/// File name of the persisted column-lineage cache; it is joined onto the
/// cache directory when the cache path is resolved.
pub(super) const COLUMN_LINEAGE_CACHE_FILENAME: &str = "column_lineage_cache.json";
/// Directory name joined onto the project directory when the caller does not
/// supply an explicit cache directory.
pub(super) const CACHE_DIR: &str = ".dlin_cache";
/// One cached lineage result for a single model, stored together with the
/// fingerprints that are compared on lookup to decide whether it is still valid.
///
/// Fields marked `#[serde(default)]` deserialize to zero when they are missing
/// from the cache file.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ColumnLineageCacheEntry {
// `hash_str` of the compiled code the lineage was computed from.
compiled_code_hash: u64,
// `Debug` rendering (`format!("{:?}", ..)`) of the dialect used at insert time.
dialect: String,
// The `manifest_columns_hash` value the caller passed to `insert`.
#[serde(default)]
manifest_columns_hash: u64,
// Whole seconds of the manifest file's modification time since the Unix
// epoch; 0 when no manifest stat was available at insert time.
#[serde(default)]
manifest_mtime_secs: u64,
// Sub-second nanoseconds of the manifest file's modification time; 0 when
// no manifest stat was available at insert time.
#[serde(default)]
manifest_mtime_nanos: u32,
// Length of the manifest file in bytes; 0 when no manifest stat was
// available at insert time.
#[serde(default)]
manifest_size_bytes: u64,
// The cached lineage payload handed back on a cache hit.
lineage: ModelColumnLineage,
}
/// On-disk (JSON) representation of the column-lineage cache.
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct ColumnLineageCacheFile {
// Version string written by `save`. `load` compares it against
// `CARGO_PKG_VERSION` and discards the entries on mismatch. Deserializes to
// an empty string when the field is missing from the file.
#[serde(default)]
pub(super) version: String,
// Cached entries keyed by model name.
entries: HashMap<String, ColumnLineageCacheEntry>,
}
/// In-memory column-lineage cache that can be persisted to a JSON file.
///
/// Entries are keyed by model name. A lookup only hits when the fingerprints
/// stored with the entry match the ones supplied by the caller.
pub struct ColumnLineageCache {
// Version string written into the cache file on `save`.
version: String,
// Cached entries keyed by model name.
entries: HashMap<String, ColumnLineageCacheEntry>,
// Destination file for `save`; `None` means nothing is ever written.
cache_path: Option<PathBuf>,
// Set by `insert`; `save` does nothing while this is `false`.
dirty: bool,
}
impl ColumnLineageCache {
pub fn disabled() -> Self {
Self {
version: String::new(),
entries: HashMap::new(),
cache_path: None,
dirty: false,
}
}
pub fn load(project_dir: &Path, cache_dir: Option<&Path>) -> Self {
let cache_path = match cache_dir {
Some(dir) => dir.join(COLUMN_LINEAGE_CACHE_FILENAME),
None => project_dir
.join(CACHE_DIR)
.join(COLUMN_LINEAGE_CACHE_FILENAME),
};
let version = env!("CARGO_PKG_VERSION").to_string();
let entries = std::fs::read_to_string(&cache_path)
.ok()
.and_then(|content| serde_json::from_str::<ColumnLineageCacheFile>(&content).ok())
.filter(|cf| cf.version == version)
.map(|cf| cf.entries)
.unwrap_or_default();
Self {
version,
entries,
cache_path: Some(cache_path),
dirty: false,
}
}
pub fn fresh(project_dir: &Path, cache_dir: Option<&Path>) -> Self {
let cache_path = match cache_dir {
Some(dir) => dir.join(COLUMN_LINEAGE_CACHE_FILENAME),
None => project_dir
.join(CACHE_DIR)
.join(COLUMN_LINEAGE_CACHE_FILENAME),
};
Self {
version: env!("CARGO_PKG_VERSION").to_string(),
entries: HashMap::new(),
cache_path: Some(cache_path),
dirty: false,
}
}
pub fn get(
&self,
model_name: &str,
compiled_code: &str,
dialect: DialectType,
manifest_path: Option<&Path>,
manifest_columns_hash: Option<u64>,
) -> Option<&ModelColumnLineage> {
let entry = self.entries.get(model_name)?;
let code_hash = hash_str(compiled_code);
let dialect_str = format!("{:?}", dialect);
if entry.compiled_code_hash != code_hash || entry.dialect != dialect_str {
return None;
}
if let Some(path) = manifest_path
&& let Some(stat) = manifest_stat(path)
&& (entry.manifest_mtime_secs != stat.mtime_secs
|| entry.manifest_mtime_nanos != stat.mtime_nanos
|| entry.manifest_size_bytes != stat.size_bytes)
{
return None;
}
if let Some(hash) = manifest_columns_hash
&& entry.manifest_columns_hash == hash
{
return Some(&entry.lineage);
}
None
}
pub fn insert(
&mut self,
model_name: &str,
compiled_code: &str,
dialect: DialectType,
manifest_columns_hash: u64,
manifest_path: Option<&Path>,
lineage: ModelColumnLineage,
) {
let stat = manifest_path.and_then(manifest_stat);
self.entries.insert(
model_name.to_string(),
ColumnLineageCacheEntry {
compiled_code_hash: hash_str(compiled_code),
dialect: format!("{:?}", dialect),
manifest_columns_hash,
manifest_mtime_secs: stat.as_ref().map_or(0, |s| s.mtime_secs),
manifest_mtime_nanos: stat.as_ref().map_or(0, |s| s.mtime_nanos),
manifest_size_bytes: stat.as_ref().map_or(0, |s| s.size_bytes),
lineage,
},
);
self.dirty = true;
}
pub fn save(&self) {
let cache_path = match (&self.cache_path, self.dirty) {
(Some(p), true) => p,
_ => return,
};
let cf = ColumnLineageCacheFile {
version: self.version.clone(),
entries: self.entries.clone(),
};
if let Some(parent) = cache_path.parent() {
if std::fs::create_dir_all(parent).is_err() {
crate::warn!("could not create cache directory: {}", parent.display());
return;
}
let gitignore = parent.join(".gitignore");
if !gitignore.exists()
&& let Err(e) = std::fs::write(&gitignore, "# Automatically created by dlin\n*\n")
{
crate::warn!("could not create {}: {}", gitignore.display(), e);
}
}
match serde_json::to_string(&cf) {
Ok(json) => {
if let Err(e) = std::fs::write(cache_path, json) {
crate::warn!("could not write cache file {}: {}", cache_path.display(), e);
}
}
Err(e) => {
crate::warn!("could not serialize column lineage cache: {}", e);
}
}
}
}
/// Modification time and size of a manifest file, as read from the filesystem.
struct ManifestStat {
    /// Whole seconds of the modification time since the Unix epoch.
    mtime_secs: u64,
    /// Sub-second part of the modification time, in nanoseconds.
    mtime_nanos: u32,
    /// File length in bytes.
    size_bytes: u64,
}

/// Reads the modification time and size of the file at `path`.
///
/// Returns `None` when the metadata cannot be read, when the modification time
/// is unavailable, or when it lies before the Unix epoch.
fn manifest_stat(path: &Path) -> Option<ManifestStat> {
    let Ok(metadata) = std::fs::metadata(path) else {
        return None;
    };
    let Ok(modified) = metadata.modified() else {
        return None;
    };

    match modified.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => Some(ManifestStat {
            mtime_secs: since_epoch.as_secs(),
            mtime_nanos: since_epoch.subsec_nanos(),
            size_bytes: metadata.len(),
        }),
        // A modification time earlier than the epoch cannot be represented.
        Err(_) => None,
    }
}