ferro_airflow_dag_parser/
cache.rs1use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::time::SystemTime;
19
20use dashmap::DashMap;
21
22use crate::api::{ParseOutcome, parse_dag_file};
23use crate::common::ParseError;
24
25#[derive(Debug, Clone, Default)]
28pub struct ParseCache {
29 inner: Arc<DashMap<PathBuf, CacheEntry>>,
30}
31
32#[derive(Debug, Clone)]
33struct CacheEntry {
34 source_hash: u64,
36 fingerprint: FileFingerprint,
42 outcome: ParseOutcome,
44}
45
/// Cheap metadata snapshot used to skip re-reading files that look unchanged.
///
/// Two fingerprints are equal when both the modification time and the byte
/// length match.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct FileFingerprint {
    /// Modification time in nanoseconds since the Unix epoch, or `0` when the
    /// platform cannot report it or the timestamp predates the epoch.
    mtime_ns: u128,
    /// File length in bytes.
    size: u64,
}

impl FileFingerprint {
    /// Builds a fingerprint from already-fetched file metadata.
    fn from_metadata(meta: &std::fs::Metadata) -> Self {
        let since_epoch = meta
            .modified()
            .map(|stamp| stamp.duration_since(SystemTime::UNIX_EPOCH));
        let mtime_ns = match since_epoch {
            Ok(Ok(elapsed)) => elapsed.as_nanos(),
            // mtime unsupported on this platform, or earlier than the epoch.
            Ok(Err(_)) | Err(_) => 0,
        };
        Self {
            mtime_ns,
            size: meta.len(),
        }
    }
}
68
69impl ParseCache {
70 #[must_use]
72 pub fn new() -> Self {
73 Self::default()
74 }
75
76 #[must_use]
78 pub fn len(&self) -> usize {
79 self.inner.len()
80 }
81
82 #[must_use]
84 pub fn is_empty(&self) -> bool {
85 self.inner.is_empty()
86 }
87
88 #[must_use]
92 pub fn peek(&self, path: &Path) -> Option<ParseOutcome> {
93 let key = canonicalise(path);
94 self.inner.get(&key).map(|e| e.outcome.clone())
95 }
96
97 pub fn get_or_parse(&self, path: &Path) -> Result<ParseOutcome, ParseError> {
106 let key = canonicalise(path);
107 let meta = std::fs::metadata(&key).map_err(|e| ParseError::Io {
111 path: key.clone(),
112 source: e,
113 })?;
114 let fingerprint = FileFingerprint::from_metadata(&meta);
115 if let Some(entry) = self.inner.get(&key)
116 && entry.fingerprint == fingerprint
117 {
118 return Ok(entry.outcome.clone());
119 }
120 let source = std::fs::read_to_string(&key).map_err(|e| ParseError::Io {
122 path: key.clone(),
123 source: e,
124 })?;
125 let hash = hash_source(&source);
126 if let Some(entry) = self.inner.get(&key)
127 && entry.source_hash == hash
128 {
129 let mut e = entry.clone();
133 drop(entry);
134 e.fingerprint = fingerprint;
135 self.inner.insert(key.clone(), e.clone());
136 return Ok(e.outcome);
137 }
138 let outcome = parse_dag_file(&key, &source, hash)?;
139 self.inner.insert(
140 key,
141 CacheEntry {
142 source_hash: hash,
143 fingerprint,
144 outcome: outcome.clone(),
145 },
146 );
147 Ok(outcome)
148 }
149
150 pub fn invalidate(&self, path: &Path) {
153 let key = canonicalise(path);
154 self.inner.remove(&key);
155 }
156
157 pub fn clear(&self) {
159 self.inner.clear();
160 }
161}
162
/// Normalises `path` into the key used by the cache.
///
/// Paths that exist are fully canonicalised (absolute, with symlinks and `..`
/// resolved). When `path` itself cannot be canonicalised, most commonly because
/// the file has been deleted, the nearest ancestor that *can* be canonicalised
/// is resolved and the remaining components are re-appended. This keeps the key
/// for a deleted file equal to the one produced while it existed, so
/// `ParseCache::invalidate` still finds the entry. The exception is when the
/// file itself was a symlink, whose target cannot be recovered. If no ancestor
/// resolves, `path` is returned unchanged.
fn canonicalise(path: &Path) -> PathBuf {
    if let Ok(resolved) = std::fs::canonicalize(path) {
        return resolved;
    }
    // Trailing components peeled off so far, innermost first.
    let mut tail = Vec::new();
    let mut cursor = path;
    while let (Some(parent), Some(name)) = (cursor.parent(), cursor.file_name()) {
        tail.push(name);
        // `Path::new("a.py").parent()` is `Some("")`, which `canonicalize`
        // rejects; treat it as the current directory.
        let parent = if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        };
        if let Ok(mut resolved) = std::fs::canonicalize(parent) {
            resolved.extend(tail.iter().rev().copied());
            return resolved;
        }
        cursor = parent;
    }
    path.to_path_buf()
}
169
/// Computes the 64-bit FNV-1a hash of the UTF-8 bytes of `src`.
///
/// FNV-1a is tiny, dependency-free and deterministic across processes and
/// platforms (unlike `std`'s randomly seeded `HashMap` hasher). It is not
/// collision-resistant against adversarial input.
#[must_use]
pub(crate) fn hash_source(src: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    src.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
186
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Writes `body` to `path`, panicking on I/O failure (test-only helper).
    fn write_dag(path: &Path, body: &str) {
        std::fs::write(path, body).unwrap();
    }

    const DAG_BODY: &str = r#"
from airflow import DAG

with DAG(dag_id="cached"):
    pass
"#;

    #[test]
    fn new_cache_is_empty() {
        let cache = ParseCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn first_call_parses_second_call_hits_cache() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("d1.py");
        write_dag(&p, DAG_BODY);
        let cache = ParseCache::new();
        let a = cache.get_or_parse(&p).unwrap();
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());
        let b = cache.get_or_parse(&p).unwrap();
        assert_eq!(a.source_hash, b.source_hash);
        assert_eq!(a.dags.len(), b.dags.len());
    }

    #[test]
    fn rewrite_invalidates_cache_via_hash() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("d2.py");
        write_dag(&p, DAG_BODY);
        let cache = ParseCache::new();
        let a = cache.get_or_parse(&p).unwrap();
        write_dag(&p, &DAG_BODY.replace("cached", "renamed"));
        let b = cache.get_or_parse(&p).unwrap();
        assert_ne!(a.source_hash, b.source_hash);
        assert_eq!(b.dags[0].dag_id.as_ref().unwrap().as_str(), "renamed");
    }

    #[test]
    fn identical_rewrite_keeps_cached_outcome() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("d4.py");
        write_dag(&p, DAG_BODY);
        let cache = ParseCache::new();
        let a = cache.get_or_parse(&p).unwrap();
        // Rewriting identical bytes may change the mtime; the content hash still
        // matches, so the existing entry must be reused rather than duplicated.
        write_dag(&p, DAG_BODY);
        let b = cache.get_or_parse(&p).unwrap();
        assert_eq!(a.source_hash, b.source_hash);
        assert_eq!(cache.len(), 1);
        assert_eq!(b.dags[0].dag_id.as_ref().unwrap().as_str(), "cached");
    }

    #[test]
    fn invalidate_drops_entry() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("d3.py");
        write_dag(&p, DAG_BODY);
        let cache = ParseCache::new();
        cache.get_or_parse(&p).unwrap();
        assert_eq!(cache.len(), 1);
        cache.invalidate(&p);
        assert_eq!(cache.len(), 0);
        assert!(cache.peek(&p).is_none());
    }

    #[test]
    fn clear_drops_everything() {
        let dir = tempdir().unwrap();
        let p1 = dir.path().join("a.py");
        let p2 = dir.path().join("b.py");
        write_dag(&p1, DAG_BODY);
        write_dag(&p2, DAG_BODY);
        let cache = ParseCache::new();
        cache.get_or_parse(&p1).unwrap();
        cache.get_or_parse(&p2).unwrap();
        assert_eq!(cache.len(), 2);
        cache.clear();
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn clones_share_the_same_map() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("shared.py");
        write_dag(&p, DAG_BODY);
        let cache = ParseCache::new();
        let handle = cache.clone();
        handle.get_or_parse(&p).unwrap();
        assert_eq!(cache.len(), 1);
        assert!(cache.peek(&p).is_some());
    }

    #[test]
    fn peek_returns_none_when_unparsed() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("never.py");
        write_dag(&p, DAG_BODY);
        let cache = ParseCache::new();
        assert!(cache.peek(&p).is_none());
        cache.get_or_parse(&p).unwrap();
        assert!(cache.peek(&p).is_some());
    }

    #[test]
    fn missing_file_surfaces_io_error() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("does_not_exist.py");
        let cache = ParseCache::new();
        let err = cache.get_or_parse(&p).unwrap_err();
        assert!(matches!(err, ParseError::Io { .. }));
    }

    #[test]
    fn hash_is_stable_across_runs() {
        let h1 = hash_source("hello");
        let h2 = hash_source("hello");
        assert_eq!(h1, h2);
        let h3 = hash_source("hellp");
        assert_ne!(h1, h3);
    }
}