// ferro_airflow_dag_parser/common.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Backend-agnostic types used by both the `ruff` and `rustpython`
3//! parser backends.
4//!
5//! [`ExtractedDag`] is the unit produced by either backend on a single
6//! Python source file; [`ParseError`] is the unified error surface;
7//! [`DagId`] and [`TaskId`] are validated newtypes that reject
8//! Airflow-incompatible identifiers up-front so downstream consumers
9//! (a metadata DB, a UI, a metrics label) do not have to re-validate.
10//!
11//! The validation rule mirrors Apache Airflow™ exactly: at most 250
12//! characters, drawn from `[a-zA-Z0-9_\-\.]`, non-empty.
13
14use std::fmt;
15use std::str::FromStr;
16
17use serde::{Deserialize, Serialize};
18use thiserror::Error;
19
/// Maximum length of any DAG / task identifier, in characters.
///
/// The public constructors measure length in `char`s (Unicode scalar
/// values), not bytes.
///
/// Matches the upstream Airflow constraint
/// (`AIRFLOW__CORE__MAX_DAG_ID_LENGTH` and the equivalent task-id rule
/// in `airflow.models.baseoperator`).
// NOTE(review): the upstream names cited above cannot be checked from
// this file — confirm them against the Airflow sources.
pub const MAX_IDENTIFIER_LEN: usize = 250;
26
/// Validation failure for [`DagId`] / [`TaskId`].
///
/// Every variant carries the identifier `kind`, so one error type serves
/// both newtypes. The validator checks emptiness first, then length,
/// then the alphabet, and reports only the first rule that fails. The
/// `Display` text comes from the `#[error(...)]` format strings below.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum IdentifierError {
    /// Empty input.
    #[error("{kind} must not be empty")]
    Empty {
        /// Identifier kind (`"dag_id"` / `"task_id"`).
        kind: &'static str,
    },
    /// Identifier exceeded the 250-character cap.
    #[error("{kind} must be at most {max_len} characters (got {len})")]
    TooLong {
        /// Identifier kind.
        kind: &'static str,
        /// Configured maximum (always [`MAX_IDENTIFIER_LEN`] in the
        /// public constructors).
        max_len: usize,
        /// Length actually supplied (in `char` units, not bytes).
        len: usize,
    },
    /// Identifier contained a character outside `[a-zA-Z0-9_\-\.]`.
    #[error("{kind} contains invalid character {bad:?}; allowed: [a-zA-Z0-9_\\-\\.]")]
    InvalidCharacter {
        /// Identifier kind.
        kind: &'static str,
        /// Offending character: the first one, in source order, that
        /// falls outside the allowed set.
        bad: char,
    },
}
57
/// Returns `true` when `c` belongs to the identifier alphabet: an ASCII
/// letter, an ASCII digit, `_`, `-` or `.`.
#[inline]
const fn is_safe_airflow_char(c: char) -> bool {
    matches!(c, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' | '.')
}
62
63fn validate_safe_identifier(
64    kind: &'static str,
65    value: &str,
66    max_len: usize,
67) -> Result<(), IdentifierError> {
68    if value.is_empty() {
69        return Err(IdentifierError::Empty { kind });
70    }
71    let len = value.chars().count();
72    if len > max_len {
73        return Err(IdentifierError::TooLong { kind, max_len, len });
74    }
75    if let Some(bad) = value.chars().find(|c| !is_safe_airflow_char(*c)) {
76        return Err(IdentifierError::InvalidCharacter { kind, bad });
77    }
78    Ok(())
79}
80
// Generates a validated identifier newtype wrapping a `String`.
//
// `$name` is the type to define and `$kind` is the label (`"dag_id"`,
// `"task_id"`) embedded in every `IdentifierError` it produces. Leading
// attributes / doc comments are forwarded onto the generated struct.
//
// Every way of obtaining a value — `new`, `FromStr`, `TryFrom` and
// `Deserialize` — runs the same validation, so holding a `$name` is
// proof that the wrapped string satisfies the identifier rules.
macro_rules! define_safe_identifier {
    ($(#[$meta:meta])* $name:ident, $kind:literal) => {
        $(#[$meta])*
        #[derive(
            Debug,
            Clone,
            PartialEq,
            Eq,
            Hash,
            PartialOrd,
            Ord,
            Deserialize,
        )]
        // Deserialize an owned `String` and route it through
        // `TryFrom<String>`; a plain transparent derive would construct
        // the newtype directly and let invalid identifiers in.
        #[serde(try_from = "String")]
        pub struct $name(String);

        // Hand-written (rather than derived) so the serialized form stays
        // a bare string, exactly mirroring what `Deserialize` accepts.
        impl Serialize for $name {
            fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
            where
                S: ::serde::Serializer,
            {
                serializer.serialize_str(&self.0)
            }
        }

        impl $name {
            /// Validate and construct from any type that converts into a `String`.
            ///
            /// # Errors
            ///
            /// Returns an [`IdentifierError`] when the value is empty,
            /// longer than [`MAX_IDENTIFIER_LEN`] characters, or contains
            /// a character outside `[a-zA-Z0-9_\-\.]`.
            pub fn new(value: impl Into<String>) -> Result<Self, IdentifierError> {
                let s = value.into();
                validate_safe_identifier($kind, &s, MAX_IDENTIFIER_LEN)?;
                Ok(Self(s))
            }

            /// Borrow the underlying string slice.
            #[must_use]
            pub fn as_str(&self) -> &str {
                &self.0
            }

            /// Consume and return the wrapped `String`.
            #[must_use]
            pub fn into_inner(self) -> String {
                self.0
            }
        }

        // Displays as the raw identifier, with no quoting or decoration.
        impl fmt::Display for $name {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str(&self.0)
            }
        }

        impl AsRef<str> for $name {
            fn as_ref(&self) -> &str {
                &self.0
            }
        }

        // The conversions below all delegate to `new`, so they share its
        // validation and error type.
        impl FromStr for $name {
            type Err = IdentifierError;
            fn from_str(s: &str) -> Result<Self, Self::Err> {
                Self::new(s)
            }
        }

        impl TryFrom<String> for $name {
            type Error = IdentifierError;
            fn try_from(value: String) -> Result<Self, Self::Error> {
                Self::new(value)
            }
        }

        impl TryFrom<&str> for $name {
            type Error = IdentifierError;
            fn try_from(value: &str) -> Result<Self, Self::Error> {
                Self::new(value)
            }
        }
    };
}
153
define_safe_identifier!(
    /// Validated Apache Airflow DAG identifier.
    ///
    /// Constructed via [`DagId::new`] (or the `FromStr` / `TryFrom`
    /// conversions, which delegate to it); refuses inputs that are
    /// empty, exceed [`MAX_IDENTIFIER_LEN`] characters, or contain
    /// characters outside `[a-zA-Z0-9_\-\.]`.
    DagId,
    "dag_id"
);
163
define_safe_identifier!(
    /// Validated Apache Airflow task identifier.
    ///
    /// Same rule as [`DagId`]: 1–250 chars from `[a-zA-Z0-9_\-\.]`.
    /// Validation failures are reported with the kind label `"task_id"`.
    TaskId,
    "task_id"
);
171
/// Parse-time output of either backend on a single Python source file.
///
/// Multiple DAGs in the same file flatten to multiple [`ExtractedDag`]
/// values (one per `with DAG(...)` block or `@dag`-decorated function).
///
/// The derived `Default` is the "nothing recovered" record: no
/// `dag_id`, no tasks or edges, no schedule, `has_default_args` set to
/// `false`, and no source span.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExtractedDag {
    /// `dag_id="..."` on `DAG(...)` or function name under `@dag`.
    /// `None` means the literal could not be recovered statically and
    /// the caller must fall back to runtime evaluation (e.g. `PyO3`).
    pub dag_id: Option<DagId>,
    /// All `task_id="..."` operator kwargs and `@task`-decorated
    /// function names. De-duplicated, source order preserved.
    pub task_ids: Vec<TaskId>,
    /// `schedule=...` or legacy `schedule_interval=...` literal value.
    /// Non-string literals are best-effort stringified (e.g. `None`,
    /// `timedelta(days=1)`).
    pub schedule: Option<String>,
    /// `default_args={...}` keyword present at DAG construction.
    pub has_default_args: bool,
    /// `>>` / `<<` / `set_upstream` / `set_downstream` edges. Each
    /// tuple is `(upstream_task_id, downstream_task_id)`. When the
    /// referent is a chain helper or a list, the edge is omitted (those
    /// shapes need runtime fallback).
    pub deps_edges: Vec<(TaskId, TaskId)>,
    /// Source span of the DAG construct: the `with DAG(...)` block or
    /// the `@dag def fn():` definition. Useful for error messages and
    /// editor jump-to-DAG; `None` when the backend did not surface
    /// span info.
    pub source_span: Option<SourceSpan>,
}
202
/// Inclusive line range of a DAG construct in the source file.
/// Lines are 1-indexed, matching Python tracebacks and most editors.
///
/// Note that the derived `Default` yields `start_line == end_line == 0`,
/// which lies outside the 1-indexed range described here; treat a
/// default span as a placeholder rather than a real location.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct SourceSpan {
    /// First line of the DAG construct (1-indexed, inclusive).
    pub start_line: u32,
    /// Last line of the DAG construct (1-indexed, inclusive).
    pub end_line: u32,
}
212
/// Errors returned by either backend.
///
/// Only `Debug` and `Error` are derived, so values cannot be cloned or
/// compared; match on the variant instead. The enum is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard
/// arm.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ParseError {
    /// Underlying Python parser rejected the source.
    #[error("python parse error: {0}")]
    Parse(String),
    /// Recovered identifier failed Airflow-safe validation.
    #[error("invalid {kind} {value:?}: {reason}")]
    InvalidIdentifier {
        /// Identifier kind (`"dag_id"` / `"task_id"`) — propagated as a
        /// static string so the caller can pattern-match without
        /// allocating.
        kind: &'static str,
        /// The raw literal that failed validation.
        value: String,
        /// Reason from the validator (the `Display` rendering of the
        /// underlying [`IdentifierError`]).
        reason: String,
    },
    /// Internal invariant violated; should never reach a caller.
    #[error("internal parser error: {0}")]
    Internal(String),
    /// I/O failure while loading a DAG file from disk.
    #[error("io error reading {path:?}: {source}")]
    Io {
        /// Path that failed to read.
        path: std::path::PathBuf,
        /// Underlying OS error.
        #[source]
        source: std::io::Error,
    },
    /// No backend feature was enabled at compile time.
    // NOTE(review): the message names only `parser-ruff`, while the
    // module docs mention both a `ruff` and a `rustpython` backend —
    // confirm whether a second feature flag should be listed here.
    #[error("no parser backend enabled (compile with the `parser-ruff` feature)")]
    NoBackend,
}
248
249impl ParseError {
250    /// Build an [`InvalidIdentifier`](Self::InvalidIdentifier) variant
251    /// from an [`IdentifierError`].
252    pub(crate) fn from_id_error(kind: &'static str, value: String, err: &IdentifierError) -> Self {
253        Self::InvalidIdentifier {
254            kind,
255            value,
256            reason: err.to_string(),
257        }
258    }
259}
260
/// Record `name` at the end of `into` unless an id with the same text is
/// already there. Existing entries are never moved, so the vector keeps
/// first-seen order.
#[cfg(any(feature = "parser-ruff", test))]
pub(crate) fn push_unique_task(into: &mut Vec<TaskId>, name: TaskId) {
    let wanted = name.as_str();
    let already_present = into.iter().map(TaskId::as_str).any(|seen| seen == wanted);
    if already_present {
        return;
    }
    into.push(name);
}
272
273/// Validate-and-wrap a recovered DAG-id literal. Surfaces
274/// [`ParseError::InvalidIdentifier`] when the literal violates the
275/// 250-char / safe-charset rule.
276pub(crate) fn make_dag_id(value: String) -> Result<DagId, ParseError> {
277    DagId::new(value.clone()).map_err(|e| ParseError::from_id_error("dag_id", value, &e))
278}
279
280/// Validate-and-wrap a recovered task-id literal. Surfaces
281/// [`ParseError::InvalidIdentifier`] when the literal violates the
282/// 250-char / safe-charset rule.
283pub(crate) fn make_task_id(value: String) -> Result<TaskId, ParseError> {
284    TaskId::new(value.clone()).map_err(|e| ParseError::from_id_error("task_id", value, &e))
285}
286
#[cfg(test)]
mod tests {
    //! Unit tests for the identifier newtypes, the error wrappers and the
    //! serde round-trip of [`ExtractedDag`].

    use super::*;

    /// A fully populated record survives JSON serialisation unchanged.
    #[test]
    fn extracted_dag_round_trips_through_json() {
        let dag = ExtractedDag {
            dag_id: Some(DagId::new("hello").unwrap()),
            task_ids: vec![TaskId::new("a").unwrap(), TaskId::new("b").unwrap()],
            schedule: Some("@daily".into()),
            has_default_args: true,
            deps_edges: vec![(TaskId::new("a").unwrap(), TaskId::new("b").unwrap())],
            source_span: Some(SourceSpan {
                start_line: 1,
                end_line: 8,
            }),
        };
        let json = serde_json::to_string(&dag).expect("serialise");
        let back: ExtractedDag = serde_json::from_str(&json).expect("deserialise");
        assert_eq!(dag, back);
    }

    #[test]
    fn dag_id_rejects_invalid_chars() {
        let e = DagId::new("has space").expect_err("must reject");
        assert!(matches!(
            e,
            IdentifierError::InvalidCharacter {
                kind: "dag_id",
                bad: ' '
            }
        ));
    }

    /// One character over the cap is rejected and both lengths are
    /// reported.
    #[test]
    fn dag_id_rejects_too_long() {
        let long = "a".repeat(MAX_IDENTIFIER_LEN + 1);
        let e = DagId::new(long).expect_err("must reject");
        assert_eq!(
            e,
            IdentifierError::TooLong {
                kind: "dag_id",
                max_len: MAX_IDENTIFIER_LEN,
                len: MAX_IDENTIFIER_LEN + 1,
            }
        );
    }

    /// The cap itself is inclusive: exactly `MAX_IDENTIFIER_LEN` chars
    /// is still a valid identifier.
    #[test]
    fn dag_id_accepts_exactly_max_len() {
        let at_cap = "a".repeat(MAX_IDENTIFIER_LEN);
        let id = DagId::new(at_cap).expect("must accept");
        assert_eq!(id.as_str().len(), MAX_IDENTIFIER_LEN);
    }

    /// Length is measured in `char`s: 251 two-byte characters must be
    /// reported as 251, not as their 502-byte encoding.
    #[test]
    fn length_is_counted_in_chars_not_bytes() {
        let e = DagId::new("\u{e9}".repeat(MAX_IDENTIFIER_LEN + 1)).expect_err("must reject");
        assert_eq!(
            e,
            IdentifierError::TooLong {
                kind: "dag_id",
                max_len: MAX_IDENTIFIER_LEN,
                len: MAX_IDENTIFIER_LEN + 1,
            }
        );
    }

    /// Letters outside ASCII are not part of the allowed alphabet.
    #[test]
    fn task_id_rejects_non_ascii_letters() {
        let e = TaskId::new("caf\u{e9}").expect_err("must reject");
        assert_eq!(
            e,
            IdentifierError::InvalidCharacter {
                kind: "task_id",
                bad: '\u{e9}',
            }
        );
    }

    #[test]
    fn dag_id_rejects_empty() {
        let e = DagId::new("").expect_err("must reject");
        assert!(matches!(e, IdentifierError::Empty { kind: "dag_id" }));
    }

    #[test]
    fn task_id_accepts_dotted_dashed_underscored() {
        for ok in &["a", "a.b", "a-b", "a_b", "a.b-c_d.0"] {
            TaskId::new(*ok).unwrap_or_else(|_| panic!("must accept {ok:?}"));
        }
    }

    #[test]
    fn dag_id_displays_inner() {
        let id = DagId::new("hello").unwrap();
        assert_eq!(id.to_string(), "hello");
        assert_eq!(id.as_str(), "hello");
    }

    #[test]
    fn dag_id_from_str_round_trips() {
        let parsed: DagId = "ok".parse().unwrap();
        assert_eq!(parsed.as_str(), "ok");
    }

    #[test]
    fn task_id_try_from_string_round_trips() {
        let s = String::from("foo");
        let id = TaskId::try_from(s).unwrap();
        assert_eq!(id.as_str(), "foo");
    }

    /// The borrowed conversion validates exactly like the owned one.
    #[test]
    fn task_id_try_from_str_validates() {
        let id = TaskId::try_from("foo").unwrap();
        assert_eq!(id.as_str(), "foo");
        let e = TaskId::try_from("").expect_err("must reject");
        assert_eq!(e, IdentifierError::Empty { kind: "task_id" });
    }

    #[test]
    fn make_dag_id_wraps_validation_error() {
        let e = make_dag_id("has space".into()).expect_err("must reject");
        match e {
            ParseError::InvalidIdentifier { kind, value, .. } => {
                assert_eq!(kind, "dag_id");
                assert_eq!(value, "has space");
            }
            other => panic!("expected InvalidIdentifier, got {other:?}"),
        }
    }

    /// A valid literal passes through the wrapper untouched.
    #[test]
    fn make_dag_id_accepts_valid_literal() {
        let id = make_dag_id("etl.daily-v2".into()).expect("must accept");
        assert_eq!(id.as_str(), "etl.daily-v2");
    }

    #[test]
    fn push_unique_task_keeps_first_seen_order() {
        let mut v = Vec::<TaskId>::new();
        push_unique_task(&mut v, TaskId::new("x").unwrap());
        push_unique_task(&mut v, TaskId::new("y").unwrap());
        push_unique_task(&mut v, TaskId::new("x").unwrap());
        assert_eq!(v.len(), 2);
        assert_eq!(v[0].as_str(), "x");
        assert_eq!(v[1].as_str(), "y");
    }
}