use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;
use dashmap::DashMap;
use crate::api::{ParseOutcome, parse_dag_file};
use crate::common::ParseError;
/// Cache of parse outcomes keyed by file path.
///
/// The map lives behind an `Arc`, so the derived `Clone` produces another
/// handle to the same underlying map rather than an independent copy.
#[derive(Debug, Clone, Default)]
pub struct ParseCache {
// Keys are the paths produced by `canonicalise`.
inner: Arc<DashMap<PathBuf, CacheEntry>>,
}
/// One cached parse result together with the data used to decide whether it
/// is still valid for the file on disk.
#[derive(Debug, Clone)]
struct CacheEntry {
// Hash of the source text, as computed by `hash_source`.
source_hash: u64,
// Metadata fingerprint (mtime + size) taken before the source was read.
fingerprint: FileFingerprint,
// The value returned by `parse_dag_file` for that source text.
outcome: ParseOutcome,
}
/// Metadata snapshot compared against the file on disk before its contents
/// are read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FileFingerprint {
    /// Modification time in nanoseconds since the Unix epoch; 0 when the
    /// time is unavailable or lies before the epoch.
    mtime_ns: u128,
    /// File length in bytes as reported by the metadata.
    size: u64,
}

impl FileFingerprint {
    /// Builds a fingerprint from already-fetched file metadata.
    ///
    /// A modification time that cannot be read, or that precedes the Unix
    /// epoch, is recorded as 0, so in that case only `size` distinguishes
    /// two fingerprints.
    fn from_metadata(meta: &std::fs::Metadata) -> Self {
        let mtime_ns = match meta.modified() {
            Ok(stamp) => match stamp.duration_since(SystemTime::UNIX_EPOCH) {
                Ok(since_epoch) => since_epoch.as_nanos(),
                Err(_) => 0,
            },
            Err(_) => 0,
        };
        let size = meta.len();
        Self { mtime_ns, size }
    }
}
impl ParseCache {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn len(&self) -> usize {
self.inner.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
#[must_use]
pub fn peek(&self, path: &Path) -> Option<ParseOutcome> {
let key = canonicalise(path);
self.inner.get(&key).map(|e| e.outcome.clone())
}
pub fn get_or_parse(&self, path: &Path) -> Result<ParseOutcome, ParseError> {
let key = canonicalise(path);
let meta = std::fs::metadata(&key).map_err(|e| ParseError::Io {
path: key.clone(),
source: e,
})?;
let fingerprint = FileFingerprint::from_metadata(&meta);
if let Some(entry) = self.inner.get(&key)
&& entry.fingerprint == fingerprint
{
return Ok(entry.outcome.clone());
}
let source = std::fs::read_to_string(&key).map_err(|e| ParseError::Io {
path: key.clone(),
source: e,
})?;
let hash = hash_source(&source);
if let Some(entry) = self.inner.get(&key)
&& entry.source_hash == hash
{
let mut e = entry.clone();
drop(entry);
e.fingerprint = fingerprint;
self.inner.insert(key.clone(), e.clone());
return Ok(e.outcome);
}
let outcome = parse_dag_file(&key, &source, hash)?;
self.inner.insert(
key,
CacheEntry {
source_hash: hash,
fingerprint,
outcome: outcome.clone(),
},
);
Ok(outcome)
}
pub fn invalidate(&self, path: &Path) {
let key = canonicalise(path);
self.inner.remove(&key);
}
pub fn clear(&self) {
self.inner.clear();
}
}
/// Resolves `path` with `std::fs::canonicalize`, falling back to an owned
/// copy of the input when resolution fails (for example, when the file does
/// not exist).
fn canonicalise(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
/// Hashes `src` with 64-bit FNV-1a over its UTF-8 bytes.
///
/// The result depends only on the bytes of `src`, so equal inputs always
/// produce equal hashes.
#[must_use]
pub(crate) fn hash_source(src: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: XOR the byte in first, then multiply by the prime.
    src.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Fixture source declaring a single DAG whose id is `cached`.
    const DAG_BODY: &str = r#"
from airflow import DAG
with DAG(dag_id="cached"):
pass
"#;

    /// Writes `body` to `path`, panicking if the write fails.
    fn write_dag(path: &Path, body: &str) {
        std::fs::write(path, body).unwrap();
    }

    /// Two consecutive calls on an unchanged file return matching outcomes.
    #[test]
    fn first_call_parses_second_call_hits_cache() {
        let scratch = tempdir().unwrap();
        let dag_path = scratch.path().join("d1.py");
        write_dag(&dag_path, DAG_BODY);

        let cache = ParseCache::new();
        let first = cache.get_or_parse(&dag_path).unwrap();
        assert_eq!(cache.len(), 1);

        let second = cache.get_or_parse(&dag_path).unwrap();
        assert_eq!(first.source_hash, second.source_hash);
        assert_eq!(first.dags.len(), second.dags.len());
    }

    /// Rewriting the file with different content produces a new outcome.
    #[test]
    fn rewrite_invalidates_cache_via_hash() {
        let scratch = tempdir().unwrap();
        let dag_path = scratch.path().join("d2.py");
        write_dag(&dag_path, DAG_BODY);

        let cache = ParseCache::new();
        let before = cache.get_or_parse(&dag_path).unwrap();

        let renamed_body = DAG_BODY.replace("cached", "renamed");
        write_dag(&dag_path, &renamed_body);
        let after = cache.get_or_parse(&dag_path).unwrap();

        assert_ne!(before.source_hash, after.source_hash);
        assert_eq!(after.dags[0].dag_id.as_ref().unwrap().as_str(), "renamed");
    }

    /// `invalidate` removes the entry for the given path.
    #[test]
    fn invalidate_drops_entry() {
        let scratch = tempdir().unwrap();
        let dag_path = scratch.path().join("d3.py");
        write_dag(&dag_path, DAG_BODY);

        let cache = ParseCache::new();
        cache.get_or_parse(&dag_path).unwrap();
        assert_eq!(cache.len(), 1);

        cache.invalidate(&dag_path);
        assert_eq!(cache.len(), 0);
    }

    /// `clear` removes every entry.
    #[test]
    fn clear_drops_everything() {
        let scratch = tempdir().unwrap();
        let first_path = scratch.path().join("a.py");
        let second_path = scratch.path().join("b.py");
        write_dag(&first_path, DAG_BODY);
        write_dag(&second_path, DAG_BODY);

        let cache = ParseCache::new();
        cache.get_or_parse(&first_path).unwrap();
        cache.get_or_parse(&second_path).unwrap();
        assert_eq!(cache.len(), 2);

        cache.clear();
        assert_eq!(cache.len(), 0);
    }

    /// `peek` sees nothing before the first parse and the entry afterwards.
    #[test]
    fn peek_returns_none_when_unparsed() {
        let scratch = tempdir().unwrap();
        let dag_path = scratch.path().join("never.py");
        write_dag(&dag_path, DAG_BODY);

        let cache = ParseCache::new();
        assert!(cache.peek(&dag_path).is_none());

        cache.get_or_parse(&dag_path).unwrap();
        assert!(cache.peek(&dag_path).is_some());
    }

    /// A path that does not exist is reported as `ParseError::Io`.
    #[test]
    fn missing_file_surfaces_io_error() {
        let scratch = tempdir().unwrap();
        let absent_path = scratch.path().join("does_not_exist.py");

        let cache = ParseCache::new();
        let err = cache.get_or_parse(&absent_path).unwrap_err();
        assert!(matches!(err, ParseError::Io { .. }));
    }

    /// Equal inputs hash equally; a one-byte change alters the hash.
    #[test]
    fn hash_is_stable_across_runs() {
        let baseline = hash_source("hello");
        let repeat = hash_source("hello");
        assert_eq!(baseline, repeat);

        let altered = hash_source("hellp");
        assert_ne!(baseline, altered);
    }
}