// SPDX-License-Identifier: Apache-2.0
//! Path-keyed parse cache for [`ParseOutcome`].
//!
//! `ferroair-dag-processor` re-scans the DAGs folder every few seconds
//! (the `dag_processor.poll_interval` setting). Re-parsing every file
//! every poll is wasteful — Airflow's reference implementation hashes
//! file content and skips the parse when the hash matches the cached
//! one. Phase 1 reproduces that behaviour using a `dashmap::DashMap`
//! keyed on the file's canonicalised path.
//!
//! The cache is intentionally process-local (no on-disk persistence)
//! and assumes the caller validated the path is inside the configured
//! DAGs folder. Cross-process invalidation is the dag-processor's job
//! (it owns the inotify / kqueue watcher).

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::SystemTime;

use dashmap::DashMap;

use crate::api::{ParseOutcome, parse_dag_file};
use crate::common::ParseError;
25/// Process-local parse cache. Cheap to clone (the inner [`DashMap`] is
26/// behind an [`Arc`]).
27#[derive(Debug, Clone, Default)]
28pub struct ParseCache {
29    inner: Arc<DashMap<PathBuf, CacheEntry>>,
30}
31
32#[derive(Debug, Clone)]
33struct CacheEntry {
34    /// Hash of the file contents at the time of the last parse.
35    source_hash: u64,
36    /// File mtime + size at the time of the last parse. Used as a
37    /// cheap "did the file change?" pre-check so the hot path can skip
38    /// the file read + hash on a stat-only match (the workspace target
39    /// is < 5 µs / hit, which a full `read_to_string` + `FxHash` blows
40    /// past on a 600-byte DAG).
41    fingerprint: FileFingerprint,
42    /// Memoised parse outcome.
43    outcome: ParseOutcome,
44}
45
/// Stat-level identity of a file: (mtime, size).
///
/// Two fingerprints comparing equal is treated as "probably unchanged";
/// inequality forces the read + hash path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FileFingerprint {
    /// Modification time as nanoseconds since the Unix epoch; `0` when
    /// the platform reports no usable mtime.
    mtime_ns: u128,
    /// File size in bytes.
    size: u64,
}
54
55impl FileFingerprint {
56    fn from_metadata(meta: &std::fs::Metadata) -> Self {
57        let mtime_ns = meta
58            .modified()
59            .ok()
60            .and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok())
61            .map_or(0u128, |d| d.as_nanos());
62        Self {
63            mtime_ns,
64            size: meta.len(),
65        }
66    }
67}
68
69impl ParseCache {
70    /// Create an empty cache.
71    #[must_use]
72    pub fn new() -> Self {
73        Self::default()
74    }
75
76    /// Number of cached entries.
77    #[must_use]
78    pub fn len(&self) -> usize {
79        self.inner.len()
80    }
81
82    /// `true` when the cache holds no entries.
83    #[must_use]
84    pub fn is_empty(&self) -> bool {
85        self.inner.is_empty()
86    }
87
88    /// Retrieve a cached [`ParseOutcome`] without re-parsing. Returns
89    /// `None` when the file has not been parsed yet (or was invalidated).
90    /// Useful for read-only consumers like a `/metrics` exporter.
91    #[must_use]
92    pub fn peek(&self, path: &Path) -> Option<ParseOutcome> {
93        let key = canonicalise(path);
94        self.inner.get(&key).map(|e| e.outcome.clone())
95    }
96
97    /// Parse `path` if the cached hash differs from the on-disk hash, or
98    /// return the cached [`ParseOutcome`] when they match.
99    ///
100    /// # Errors
101    ///
102    /// Returns [`ParseError::Io`] if the file cannot be read, or
103    /// [`ParseError::Parse`] / [`ParseError::InvalidIdentifier`] if the
104    /// underlying parse fails.
105    pub fn get_or_parse(&self, path: &Path) -> Result<ParseOutcome, ParseError> {
106        let key = canonicalise(path);
107        // Cheap pre-check: stat the file and look up the fingerprint.
108        // When the fingerprint matches the cached entry we return
109        // immediately — no `read_to_string` + hash on the hot path.
110        let meta = std::fs::metadata(&key).map_err(|e| ParseError::Io {
111            path: key.clone(),
112            source: e,
113        })?;
114        let fingerprint = FileFingerprint::from_metadata(&meta);
115        if let Some(entry) = self.inner.get(&key)
116            && entry.fingerprint == fingerprint
117        {
118            return Ok(entry.outcome.clone());
119        }
120        // Cold path: read + hash + parse.
121        let source = std::fs::read_to_string(&key).map_err(|e| ParseError::Io {
122            path: key.clone(),
123            source: e,
124        })?;
125        let hash = hash_source(&source);
126        if let Some(entry) = self.inner.get(&key)
127            && entry.source_hash == hash
128        {
129            // Content unchanged but fingerprint drifted (e.g. `touch`).
130            // Refresh the fingerprint to keep the fast path warm but
131            // skip the parse.
132            let mut e = entry.clone();
133            drop(entry);
134            e.fingerprint = fingerprint;
135            self.inner.insert(key.clone(), e.clone());
136            return Ok(e.outcome);
137        }
138        let outcome = parse_dag_file(&key, &source, hash)?;
139        self.inner.insert(
140            key,
141            CacheEntry {
142                source_hash: hash,
143                fingerprint,
144                outcome: outcome.clone(),
145            },
146        );
147        Ok(outcome)
148    }
149
150    /// Drop the cache entry for `path`, if any. Used by the
151    /// dag-processor when its watcher reports a delete / unlink.
152    pub fn invalidate(&self, path: &Path) {
153        let key = canonicalise(path);
154        self.inner.remove(&key);
155    }
156
157    /// Drop every entry. Used on configuration reload.
158    pub fn clear(&self) {
159        self.inner.clear();
160    }
161}
162
/// Best-effort path canonicalisation.
///
/// When canonicalisation fails (e.g. the file does not yet exist on
/// disk — the caller is about to parse a synthetic source string), the
/// input path is returned verbatim instead.
fn canonicalise(path: &Path) -> PathBuf {
    match std::fs::canonicalize(path) {
        Ok(real) => real,
        Err(_) => path.to_path_buf(),
    }
}
169
/// Stable, machine-independent FNV-1a (64-bit) hash of the file
/// contents. Stable because the dag-processor compares hashes across
/// scheduler restarts to decide whether to re-emit `ImportError` rows.
///
/// NOTE: an earlier comment called this "FxHash"; the constants below
/// (offset basis `0xcbf29ce484222325`, prime `0x100000001b3`) are the
/// standard FNV-1a 64-bit parameters. We avoid `DefaultHasher` because
/// it is randomised per-process, which would break cache-warm restarts
/// in tests; production code only relies on within-process consistency.
#[must_use]
pub(crate) fn hash_source(src: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x100_0000_01b3;
    src.as_bytes().iter().fold(FNV_OFFSET_BASIS, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
186
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Minimal single-DAG module used as the fixture body everywhere.
    const DAG_BODY: &str = r#"
from airflow import DAG

with DAG(dag_id="cached"):
    pass
"#;

    /// Write `body` to `path`, panicking on I/O failure.
    fn write_dag(path: &Path, body: &str) {
        std::fs::write(path, body).unwrap();
    }

    #[test]
    fn first_call_parses_second_call_hits_cache() {
        let tmp = tempdir().unwrap();
        let dag_path = tmp.path().join("d1.py");
        write_dag(&dag_path, DAG_BODY);
        let cache = ParseCache::new();
        let first = cache.get_or_parse(&dag_path).unwrap();
        assert_eq!(cache.len(), 1);
        let second = cache.get_or_parse(&dag_path).unwrap();
        assert_eq!(first.source_hash, second.source_hash);
        assert_eq!(first.dags.len(), second.dags.len());
    }

    #[test]
    fn rewrite_invalidates_cache_via_hash() {
        let tmp = tempdir().unwrap();
        let dag_path = tmp.path().join("d2.py");
        write_dag(&dag_path, DAG_BODY);
        let cache = ParseCache::new();
        let before = cache.get_or_parse(&dag_path).unwrap();
        // Rewrite with different content; the hash mismatch must force
        // a re-parse even though the path is unchanged.
        write_dag(&dag_path, &DAG_BODY.replace("cached", "renamed"));
        let after = cache.get_or_parse(&dag_path).unwrap();
        assert_ne!(before.source_hash, after.source_hash);
        assert_eq!(after.dags[0].dag_id.as_ref().unwrap().as_str(), "renamed");
    }

    #[test]
    fn invalidate_drops_entry() {
        let tmp = tempdir().unwrap();
        let dag_path = tmp.path().join("d3.py");
        write_dag(&dag_path, DAG_BODY);
        let cache = ParseCache::new();
        cache.get_or_parse(&dag_path).unwrap();
        assert_eq!(cache.len(), 1);
        cache.invalidate(&dag_path);
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn clear_drops_everything() {
        let tmp = tempdir().unwrap();
        let first = tmp.path().join("a.py");
        let second = tmp.path().join("b.py");
        write_dag(&first, DAG_BODY);
        write_dag(&second, DAG_BODY);
        let cache = ParseCache::new();
        cache.get_or_parse(&first).unwrap();
        cache.get_or_parse(&second).unwrap();
        assert_eq!(cache.len(), 2);
        cache.clear();
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn peek_returns_none_when_unparsed() {
        let tmp = tempdir().unwrap();
        let dag_path = tmp.path().join("never.py");
        write_dag(&dag_path, DAG_BODY);
        let cache = ParseCache::new();
        assert!(cache.peek(&dag_path).is_none());
        cache.get_or_parse(&dag_path).unwrap();
        assert!(cache.peek(&dag_path).is_some());
    }

    #[test]
    fn missing_file_surfaces_io_error() {
        let tmp = tempdir().unwrap();
        let dag_path = tmp.path().join("does_not_exist.py");
        let cache = ParseCache::new();
        let err = cache.get_or_parse(&dag_path).unwrap_err();
        assert!(matches!(err, ParseError::Io { .. }));
    }

    #[test]
    fn hash_is_stable_across_runs() {
        // Stable hash means two runs of the test process will produce
        // the same value for the same input. We rely on this so the
        // dag-processor can dedupe ImportError rows across restarts.
        let h1 = hash_source("hello");
        let h2 = hash_source("hello");
        assert_eq!(h1, h2);
        let h3 = hash_source("hellp");
        assert_ne!(h1, h3);
    }
}