ferro_airflow_dag_parser/
common.rs1use std::fmt;
15use std::str::FromStr;
16
17use serde::{Deserialize, Serialize};
18use thiserror::Error;
19
20pub const MAX_IDENTIFIER_LEN: usize = 250;
26
27#[derive(Debug, Clone, Error, PartialEq, Eq)]
29#[non_exhaustive]
30pub enum IdentifierError {
31 #[error("{kind} must not be empty")]
33 Empty {
34 kind: &'static str,
36 },
37 #[error("{kind} must be at most {max_len} characters (got {len})")]
39 TooLong {
40 kind: &'static str,
42 max_len: usize,
45 len: usize,
47 },
48 #[error("{kind} contains invalid character {bad:?}; allowed: [a-zA-Z0-9_\\-\\.]")]
50 InvalidCharacter {
51 kind: &'static str,
53 bad: char,
55 },
56}
57
58#[inline]
59const fn is_safe_airflow_char(c: char) -> bool {
60 c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.'
61}
62
63fn validate_safe_identifier(
64 kind: &'static str,
65 value: &str,
66 max_len: usize,
67) -> Result<(), IdentifierError> {
68 if value.is_empty() {
69 return Err(IdentifierError::Empty { kind });
70 }
71 let len = value.chars().count();
72 if len > max_len {
73 return Err(IdentifierError::TooLong { kind, max_len, len });
74 }
75 if let Some(bad) = value.chars().find(|c| !is_safe_airflow_char(*c)) {
76 return Err(IdentifierError::InvalidCharacter { kind, bad });
77 }
78 Ok(())
79}
80
81macro_rules! define_safe_identifier {
82 ($(#[$meta:meta])* $name:ident, $kind:literal) => {
83 $(#[$meta])*
84 #[derive(
85 Debug,
86 Clone,
87 PartialEq,
88 Eq,
89 Hash,
90 PartialOrd,
91 Ord,
92 Serialize,
93 Deserialize,
94 )]
95 #[serde(transparent)]
96 pub struct $name(String);
97
98 impl $name {
99 pub fn new(value: impl Into<String>) -> Result<Self, IdentifierError> {
101 let s = value.into();
102 validate_safe_identifier($kind, &s, MAX_IDENTIFIER_LEN)?;
103 Ok(Self(s))
104 }
105
106 #[must_use]
108 pub fn as_str(&self) -> &str {
109 &self.0
110 }
111
112 #[must_use]
114 pub fn into_inner(self) -> String {
115 self.0
116 }
117 }
118
119 impl fmt::Display for $name {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.write_str(&self.0)
122 }
123 }
124
125 impl AsRef<str> for $name {
126 fn as_ref(&self) -> &str {
127 &self.0
128 }
129 }
130
131 impl FromStr for $name {
132 type Err = IdentifierError;
133 fn from_str(s: &str) -> Result<Self, Self::Err> {
134 Self::new(s)
135 }
136 }
137
138 impl TryFrom<String> for $name {
139 type Error = IdentifierError;
140 fn try_from(value: String) -> Result<Self, Self::Error> {
141 Self::new(value)
142 }
143 }
144
145 impl TryFrom<&str> for $name {
146 type Error = IdentifierError;
147 fn try_from(value: &str) -> Result<Self, Self::Error> {
148 Self::new(value.to_owned())
149 }
150 }
151 };
152}
153
154define_safe_identifier!(
155 DagId,
161 "dag_id"
162);
163
164define_safe_identifier!(
165 TaskId,
169 "task_id"
170);
171
172#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
177pub struct ExtractedDag {
178 pub dag_id: Option<DagId>,
182 pub task_ids: Vec<TaskId>,
185 pub schedule: Option<String>,
189 pub has_default_args: bool,
191 pub deps_edges: Vec<(TaskId, TaskId)>,
196 pub source_span: Option<SourceSpan>,
201}
202
203#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
206pub struct SourceSpan {
207 pub start_line: u32,
209 pub end_line: u32,
211}
212
213#[derive(Debug, Error)]
215#[non_exhaustive]
216pub enum ParseError {
217 #[error("python parse error: {0}")]
219 Parse(String),
220 #[error("invalid {kind} {value:?}: {reason}")]
222 InvalidIdentifier {
223 kind: &'static str,
227 value: String,
229 reason: String,
231 },
232 #[error("internal parser error: {0}")]
234 Internal(String),
235 #[error("io error reading {path:?}: {source}")]
237 Io {
238 path: std::path::PathBuf,
240 #[source]
242 source: std::io::Error,
243 },
244 #[error("no parser backend enabled (compile with the `parser-ruff` feature)")]
246 NoBackend,
247}
248
249impl ParseError {
250 pub(crate) fn from_id_error(kind: &'static str, value: String, err: &IdentifierError) -> Self {
253 Self::InvalidIdentifier {
254 kind,
255 value,
256 reason: err.to_string(),
257 }
258 }
259}
260
261#[cfg(any(feature = "parser-ruff", test))]
264pub(crate) fn push_unique_task(into: &mut Vec<TaskId>, name: TaskId) {
265 if !into
266 .iter()
267 .any(|existing| existing.as_str() == name.as_str())
268 {
269 into.push(name);
270 }
271}
272
273pub(crate) fn make_dag_id(value: String) -> Result<DagId, ParseError> {
277 DagId::new(value.clone()).map_err(|e| ParseError::from_id_error("dag_id", value, &e))
278}
279
280pub(crate) fn make_task_id(value: String) -> Result<TaskId, ParseError> {
284 TaskId::new(value.clone()).map_err(|e| ParseError::from_id_error("task_id", value, &e))
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290
291 #[test]
292 fn extracted_dag_round_trips_through_json() {
293 let dag = ExtractedDag {
294 dag_id: Some(DagId::new("hello").unwrap()),
295 task_ids: vec![TaskId::new("a").unwrap(), TaskId::new("b").unwrap()],
296 schedule: Some("@daily".into()),
297 has_default_args: true,
298 deps_edges: vec![(TaskId::new("a").unwrap(), TaskId::new("b").unwrap())],
299 source_span: Some(SourceSpan {
300 start_line: 1,
301 end_line: 8,
302 }),
303 };
304 let json = serde_json::to_string(&dag).expect("serialise");
305 let back: ExtractedDag = serde_json::from_str(&json).expect("deserialise");
306 assert_eq!(dag, back);
307 }
308
309 #[test]
310 fn dag_id_rejects_invalid_chars() {
311 let e = DagId::new("has space").expect_err("must reject");
312 assert!(matches!(
313 e,
314 IdentifierError::InvalidCharacter {
315 kind: "dag_id",
316 bad: ' '
317 }
318 ));
319 }
320
321 #[test]
322 fn dag_id_rejects_too_long() {
323 let long = "a".repeat(MAX_IDENTIFIER_LEN + 1);
324 let e = DagId::new(long).expect_err("must reject");
325 assert!(matches!(e, IdentifierError::TooLong { kind: "dag_id", .. }));
326 }
327
328 #[test]
329 fn dag_id_rejects_empty() {
330 let e = DagId::new("").expect_err("must reject");
331 assert!(matches!(e, IdentifierError::Empty { kind: "dag_id" }));
332 }
333
334 #[test]
335 fn task_id_accepts_dotted_dashed_underscored() {
336 for ok in &["a", "a.b", "a-b", "a_b", "a.b-c_d.0"] {
337 TaskId::new(*ok).unwrap_or_else(|_| panic!("must accept {ok:?}"));
338 }
339 }
340
341 #[test]
342 fn dag_id_displays_inner() {
343 let id = DagId::new("hello").unwrap();
344 assert_eq!(id.to_string(), "hello");
345 assert_eq!(id.as_str(), "hello");
346 }
347
348 #[test]
349 fn dag_id_from_str_round_trips() {
350 let parsed: DagId = "ok".parse().unwrap();
351 assert_eq!(parsed.as_str(), "ok");
352 }
353
354 #[test]
355 fn task_id_try_from_string_round_trips() {
356 let s = String::from("foo");
357 let id = TaskId::try_from(s).unwrap();
358 assert_eq!(id.as_str(), "foo");
359 }
360
361 #[test]
362 fn make_dag_id_wraps_validation_error() {
363 let e = make_dag_id("has space".into()).expect_err("must reject");
364 match e {
365 ParseError::InvalidIdentifier { kind, value, .. } => {
366 assert_eq!(kind, "dag_id");
367 assert_eq!(value, "has space");
368 }
369 other => panic!("expected InvalidIdentifier, got {other:?}"),
370 }
371 }
372
373 #[test]
374 fn push_unique_task_keeps_first_seen_order() {
375 let mut v = Vec::<TaskId>::new();
376 push_unique_task(&mut v, TaskId::new("x").unwrap());
377 push_unique_task(&mut v, TaskId::new("y").unwrap());
378 push_unique_task(&mut v, TaskId::new("x").unwrap());
379 assert_eq!(v.len(), 2);
380 assert_eq!(v[0].as_str(), "x");
381 assert_eq!(v[1].as_str(), "y");
382 }
383}