Crate ferro_airflow_dag_parser


§ferro-airflow-dag-parser


Static, AST-based extractor for Apache Airflow™ Python DAG files. Recovers dag_id, task_ids, task dependencies, schedule, and a catalogue of “this can’t be resolved statically” markers — without running the source.

⚠️ Alpha (v0.0.1). The API will change between 0.0.x releases. This is the static fast path that orchestrators use to skip CPython evaluation when a DAG file’s structure can be determined by looking at the Python AST alone.

Part of the Ferro ecosystem. Extracted from production use in FerroAir (an Airflow-3-compatible orchestrator written in Rust).

§What this crate does

Apache Airflow’s reference scheduler imports every dags/*.py file through CPython on every poll cycle so it can read the resulting DAG objects. That works, but it pays the full cost of evaluating every import-time expression — including the ones that have no side effects relevant to the scheduler.

This crate parses the Python source with the ruff_python_parser (vendored as littrs-ruff-python-parser) and walks the AST to recover the same information statically:

  • DAG ID — from with DAG(dag_id="…") or an @dag-decorated function definition.
  • Task IDs — from every task_id="…" operator kwarg and every @task-decorated function name, deduplicated, source-order preserved.
  • Schedule — schedule="@daily" / schedule_interval="…" / timetable=…. Best-effort stringified for non-string literals.
  • Dependency edges — >> / << / set_upstream / set_downstream.
  • default_args={…} presence flag.
  • Source span (1-indexed inclusive lines) for error messages and jump-to-DAG features.

It also detects seven dynamic-fallback markers that say “static analysis is incomplete; if you need full fidelity, route this DAG through CPython”:

  1. dag_id=Path(__file__).stem (or any non-literal expression)
  2. chain(*list) / cross_downstream(*list) splat
  3. task_id=f"task_{i}" (f-string task IDs in a loop)
  4. schedule=Asset("…") / schedule=Timetable() (non-literal schedule)
  5. @task(expand=…) / @task(partial=…) / dynamic taskflow decorators
  6. if X: with DAG(...) (import-time conditional DAG)
  7. for x in …: PythonOperator(...) (operator construction in a loop)

§What this crate does not do

  • Run the file. This is a static analyzer, not an executor. If a DAG’s structure depends on runtime state, this crate will surface a dynamic-fallback marker rather than try to evaluate the expression.
  • Validate semantics. Recovered identifiers are validated against Airflow’s identifier rule (1–250 chars, [a-zA-Z0-9_\-\.]). The crate does not check whether task_ids match operator contracts, whether DAG runs would succeed, or whether the schedule is sane.
  • Mirror the full upstream DagBag API. Where airflow.models.DagBag reports import errors, plugin lookups, dag-pickle round-trips, and more, this crate reports Result<ExtractedDag, ParseError>. The call site decides what to do with parse failures.

§Quick start

```rust
use ferro_airflow_dag_parser::{extract_static_dag, dynamic_markers_for};

let src = r#"
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="hello", schedule="@daily"):
    a = BashOperator(task_id="a", bash_command="echo a")
    b = BashOperator(task_id="b", bash_command="echo b")
    a >> b
"#;

let dag = extract_static_dag(src).unwrap();
assert_eq!(dag.dag_id.as_ref().map(|d| d.as_str()), Some("hello"));
assert_eq!(dag.task_ids.len(), 2);
assert_eq!(dag.schedule.as_deref(), Some("@daily"));
assert!(dag.deps_edges.iter().any(|(u, d)| u.as_str() == "a" && d.as_str() == "b"));
assert!(dynamic_markers_for(src).is_empty());
```

§Cache (for filesystem-watching consumers)

If you are a DAG-folder watcher (the typical use case), construct a process-local ParseCache and call get_or_parse(path) instead of re-parsing on every poll:

```rust
use std::path::Path;
use ferro_airflow_dag_parser::ParseCache;

let cache = ParseCache::new();
let outcome = cache.get_or_parse(Path::new("dags/hello.py")).unwrap();
println!("{} DAG(s), source_hash = {:#x}", outcome.dags.len(), outcome.source_hash);
```

The cache uses a stat-only fast path (mtime + size fingerprint) before falling back to re-hashing the file contents, matching the behaviour of Airflow’s reference DAG processor.
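The stat-only fast path can be sketched with the standard library alone. stat_fingerprint below is a hypothetical helper, not the crate's actual cache key; it just shows why an (mtime, size) pair is enough to skip re-hashing an unchanged file.

```rust
use std::fs;
use std::io::Write;
use std::path::Path;
use std::time::SystemTime;

/// Hypothetical stat-only fingerprint: (mtime, size). If it matches the
/// cached value, the file is assumed unchanged and re-hashing is skipped;
/// on mismatch, the cache falls back to hashing the file contents.
fn stat_fingerprint(path: &Path) -> std::io::Result<(SystemTime, u64)> {
    let meta = fs::metadata(path)?;
    Ok((meta.modified()?, meta.len()))
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("fingerprint_demo.py");
    fs::write(&path, "print('v1')")?;
    let first = stat_fingerprint(&path)?;
    // Untouched file: fingerprint matches, so no re-hash is needed.
    assert_eq!(first, stat_fingerprint(&path)?);
    // Append a byte: the size changes, so the fast path reports "stale".
    fs::OpenOptions::new().append(true).open(&path)?.write_all(b"\n")?;
    assert_ne!(first, stat_fingerprint(&path)?);
    fs::remove_file(&path)?;
    Ok(())
}
```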

§Backend

The crate uses ruff_python_parser (vendored as the littrs-ruff-python-parser crates.io mirror, pinned for reproducibility). It is the only backend, gated behind the parser-ruff feature, which is on by default. Set default-features = false if you only need the backend-agnostic types in common and line_index.
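Opting out of the backend might look like this in a consumer's Cargo.toml (version number taken from the alpha release noted above):

```toml
[dependencies]
# Disable the default parser-ruff backend; only the backend-agnostic
# `common` and `line_index` modules remain available.
ferro-airflow-dag-parser = { version = "0.0.1", default-features = false }
```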

A second rustpython-parser backend was used as a parity-checking companion during the originating Ferro PoC and removed before publication: its transitive dependency closure pulls LGPL-3.0-only crates (the malachite-* family) and unmaintained Unicode crates, neither of which are appropriate for this workspace’s Apache-2.0-clean license profile.

§Status

| Aspect | Status |
| --- | --- |
| API stability | alpha (v0.0.x — breaking changes allowed at any release) |
| Use in production | Yes, in FerroAir |
| MSRV | rustc 1.88 |
| Coverage target | 80%+ line; current coverage is measured in CI |
| Async runtime | None (synchronous; the ParseCache uses dashmap for thread safety) |
| Test fixtures | Inline source strings only — no vendored Apache Airflow™ DAGs |

§Used in production by

  • FerroAir — Apache Airflow-3-compatible orchestrator written in Rust. The static fast-path uses this crate (private at time of writing; will switch to ferro-airflow-dag-parser once published).

§Compatibility note

Apache Airflow™ is a registered trademark of the Apache Software Foundation. This crate implements a static analyzer compatible with the Airflow DAG Python API; it is not endorsed by, or affiliated with, the ASF.

The identifier-validation rule (1–250 characters drawn from [a-zA-Z0-9_\-\.]) is taken from Airflow’s reference implementation (airflow.models.dag.DAG_ID_RE_VALID_CHARS).
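The identifier rule can be restated as a self-contained sketch. is_valid_airflow_id is a hypothetical helper that only mirrors the rule as described above; the crate's actual validator surfaces failures through IdentifierError rather than a bool.

```rust
/// Illustrative restatement of the identifier rule described above:
/// 1–250 characters drawn from [a-zA-Z0-9_\-\.]. A sketch, not the
/// crate's actual `IdentifierError`-based validator.
fn is_valid_airflow_id(id: &str) -> bool {
    (1..=250).contains(&id.len())
        && id
            .bytes()
            .all(|b| b.is_ascii_alphanumeric() || b == b'_' || b == b'-' || b == b'.')
}

fn main() {
    assert!(is_valid_airflow_id("hello"));
    assert!(is_valid_airflow_id("etl.daily-v2_final"));
    assert!(!is_valid_airflow_id(""));               // too short
    assert!(!is_valid_airflow_id("has space"));      // disallowed character
    assert!(!is_valid_airflow_id(&"x".repeat(251))); // too long
}
```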

§Triage policy

See the workspace CONTRIBUTING.md. In short: security reports are acknowledged within 48 hours; bugs with a reproducer are triaged within 14 days on a best-effort basis; feature requests are collected for the next minor release.

§License

Apache-2.0. See LICENSE.

Re-exports§

pub use common::DagId;
pub use common::ExtractedDag;
pub use common::IdentifierError;
pub use common::ParseError;
pub use common::SourceSpan;
pub use common::TaskId;
pub use api::ParseOutcome; (parser-ruff)
pub use api::dynamic_markers_for; (parser-ruff)
pub use api::extract_all_static_dags; (parser-ruff)
pub use api::extract_static_dag; (parser-ruff)
pub use api::parse_dag_path; (parser-ruff)
pub use cache::ParseCache; (parser-ruff)
pub use dynamic_markers::DynamicMarker; (parser-ruff)
pub use dynamic_markers::detect_dynamic_markers; (parser-ruff)

Modules§

api (parser-ruff)
Stable public API for the static DAG extractor.
cache (parser-ruff)
Path-keyed parse cache for ParseOutcome.
common
Backend-agnostic types used by both the ruff and rustpython parser backends.
dynamic_markers (parser-ruff)
Detect dynamic patterns in a Python DAG file that the static AST extractor cannot resolve and that therefore need PyO3 fallback.
line_index
Byte-offset → 1-indexed line/column resolver shared by both backends.
ruff_impl (parser-ruff)
ruff_python_parser (vendored as littrs-ruff-python-parser) backend for static Airflow DAG extraction.

Constants§

CRATE_NAME
Crate name, exposed for diagnostics and /metrics labelling.