Skip to main content

tl_stream/
pipeline.rs

1// ThinkingLanguage — Pipeline definition and execution
2
3use chrono::Utc;
4use std::fmt;
5
/// Definition of a pipeline (ETL or general).
///
/// Within this module only `name` and `retries` are read (by
/// [`PipelineRunner::execute`]); `schedule` and `timeout_ms` are stored but
/// not consulted here.
#[derive(Clone, Debug)]
pub struct PipelineDef {
    /// Human-readable pipeline name; shown by `Display` and copied into results.
    pub name: String,
    /// Optional schedule expression. Its format is not interpreted in this module.
    pub schedule: Option<String>,
    /// Optional timeout, presumably in milliseconds (per the `_ms` suffix).
    /// NOTE(review): not enforced by `PipelineRunner::execute`.
    pub timeout_ms: Option<u64>,
    /// Retries allowed after the first attempt (`execute` makes `retries + 1` attempts).
    pub retries: u32,
}

impl fmt::Display for PipelineDef {
    /// Renders the definition as `<pipeline NAME>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { name, .. } = self;
        write!(f, "<pipeline {}>", name)
    }
}
20
/// Status of a pipeline execution.
///
/// `Display` renders the variants as `running`, `success`, `failed: <msg>`
/// and `timed_out`.
#[derive(Clone, Debug, PartialEq)]
pub enum PipelineStatus {
    /// Run in progress.
    Running,
    /// Run completed successfully.
    Success,
    /// Run failed; carries the error message.
    Failed(String),
    /// Run timed out (presumably against `PipelineDef::timeout_ms` — confirm
    /// with the producer; `PipelineRunner::execute` never yields this variant).
    TimedOut,
}

impl fmt::Display for PipelineStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Failed(msg) => write!(f, "failed: {msg}"),
            Self::Running => f.write_str("running"),
            Self::Success => f.write_str("success"),
            Self::TimedOut => f.write_str("timed_out"),
        }
    }
}
40
41/// Result of a pipeline execution.
42#[derive(Debug, Clone)]
43pub struct PipelineResult {
44    pub name: String,
45    pub status: PipelineStatus,
46    pub started_at: String,
47    pub ended_at: String,
48    pub rows_processed: u64,
49    pub attempts: u32,
50}
51
52impl fmt::Display for PipelineResult {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        write!(
55            f,
56            "Pipeline '{}': {} (rows: {}, attempts: {}, duration: {} → {})",
57            self.name,
58            self.status,
59            self.rows_processed,
60            self.attempts,
61            self.started_at,
62            self.ended_at
63        )
64    }
65}
66
67/// Pipeline runner that executes extract/transform/load with retry logic.
68pub struct PipelineRunner {
69    pub def: PipelineDef,
70}
71
72impl PipelineRunner {
73    pub fn new(def: PipelineDef) -> Self {
74        PipelineRunner { def }
75    }
76
77    /// Execute the pipeline with retry logic.
78    /// `run_fn` is called for each attempt — it should return Ok(rows) or Err(message).
79    pub fn execute<F>(&self, mut run_fn: F) -> PipelineResult
80    where
81        F: FnMut(u32) -> Result<u64, String>,
82    {
83        let started_at = Utc::now().to_rfc3339();
84        let max_attempts = self.def.retries + 1;
85
86        for attempt in 1..=max_attempts {
87            match run_fn(attempt) {
88                Ok(rows) => {
89                    return PipelineResult {
90                        name: self.def.name.clone(),
91                        status: PipelineStatus::Success,
92                        started_at: started_at.clone(),
93                        ended_at: Utc::now().to_rfc3339(),
94                        rows_processed: rows,
95                        attempts: attempt,
96                    };
97                }
98                Err(msg) => {
99                    if attempt == max_attempts {
100                        return PipelineResult {
101                            name: self.def.name.clone(),
102                            status: PipelineStatus::Failed(msg),
103                            started_at: started_at.clone(),
104                            ended_at: Utc::now().to_rfc3339(),
105                            rows_processed: 0,
106                            attempts: attempt,
107                        };
108                    }
109                    // retry
110                }
111            }
112        }
113
114        // Should not reach here, but just in case
115        PipelineResult {
116            name: self.def.name.clone(),
117            status: PipelineStatus::Failed("exhausted retries".to_string()),
118            started_at,
119            ended_at: Utc::now().to_rfc3339(),
120            rows_processed: 0,
121            attempts: max_attempts,
122        }
123    }
124}
125
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a definition with no schedule or timeout and the given retry budget.
    fn def(name: &str, retries: u32) -> PipelineDef {
        PipelineDef {
            name: name.to_string(),
            schedule: None,
            timeout_ms: None,
            retries,
        }
    }

    #[test]
    fn test_pipeline_success() {
        let runner = PipelineRunner::new(def("test_etl", 0));
        let result = runner.execute(|_| Ok(100));
        assert_eq!(result.name, "test_etl");
        assert_eq!(result.status, PipelineStatus::Success);
        assert_eq!(result.rows_processed, 100);
        assert_eq!(result.attempts, 1);
        // Timestamps are produced with `to_rfc3339`, so they must round-trip.
        assert!(chrono::DateTime::parse_from_rfc3339(&result.started_at).is_ok());
        assert!(chrono::DateTime::parse_from_rfc3339(&result.ended_at).is_ok());
    }

    #[test]
    fn test_pipeline_failure() {
        let runner = PipelineRunner::new(def("failing", 0));
        let result = runner.execute(|_| Err("boom".to_string()));
        assert_eq!(result.status, PipelineStatus::Failed("boom".to_string()));
        assert_eq!(result.rows_processed, 0);
        assert_eq!(result.attempts, 1);
    }

    #[test]
    fn test_pipeline_retry_then_success() {
        let runner = PipelineRunner::new(def("retry_me", 2));
        let result = runner.execute(|attempt| {
            if attempt < 3 {
                Err("not yet".to_string())
            } else {
                Ok(50)
            }
        });
        assert_eq!(result.status, PipelineStatus::Success);
        assert_eq!(result.rows_processed, 50);
        assert_eq!(result.attempts, 3);
    }

    #[test]
    fn test_pipeline_retry_exhausted() {
        let runner = PipelineRunner::new(def("always_fail", 2));
        let mut seen = Vec::new();
        let result = runner.execute(|attempt| {
            seen.push(attempt);
            Err(format!("fail #{attempt}"))
        });
        // Every attempt runs in order, and the error from the *last* one is reported.
        assert_eq!(seen, vec![1, 2, 3]);
        assert_eq!(result.status, PipelineStatus::Failed("fail #3".to_string()));
        assert_eq!(result.rows_processed, 0);
        assert_eq!(result.attempts, 3);
    }

    #[test]
    fn test_pipeline_result_display() {
        let result = PipelineResult {
            name: "test".to_string(),
            status: PipelineStatus::Success,
            started_at: "2024-01-01T00:00:00Z".to_string(),
            ended_at: "2024-01-01T00:01:00Z".to_string(),
            rows_processed: 1000,
            attempts: 1,
        };
        assert_eq!(
            result.to_string(),
            "Pipeline 'test': success (rows: 1000, attempts: 1, duration: 2024-01-01T00:00:00Z → 2024-01-01T00:01:00Z)"
        );
    }

    #[test]
    fn test_status_and_def_display() {
        assert_eq!(PipelineStatus::Running.to_string(), "running");
        assert_eq!(PipelineStatus::Success.to_string(), "success");
        assert_eq!(PipelineStatus::Failed("x".to_string()).to_string(), "failed: x");
        assert_eq!(PipelineStatus::TimedOut.to_string(), "timed_out");
        assert_eq!(def("nightly", 0).to_string(), "<pipeline nightly>");
    }
}