1use chrono::Utc;
4use std::fmt;
5
6#[derive(Debug, Clone)]
8pub struct PipelineDef {
9 pub name: String,
10 pub schedule: Option<String>,
11 pub timeout_ms: Option<u64>,
12 pub retries: u32,
13}
14
15impl fmt::Display for PipelineDef {
16 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
17 write!(f, "<pipeline {}>", self.name)
18 }
19}
20
21#[derive(Debug, Clone, PartialEq)]
23pub enum PipelineStatus {
24 Running,
25 Success,
26 Failed(String),
27 TimedOut,
28}
29
30impl fmt::Display for PipelineStatus {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 PipelineStatus::Running => write!(f, "running"),
34 PipelineStatus::Success => write!(f, "success"),
35 PipelineStatus::Failed(msg) => write!(f, "failed: {msg}"),
36 PipelineStatus::TimedOut => write!(f, "timed_out"),
37 }
38 }
39}
40
41#[derive(Debug, Clone)]
43pub struct PipelineResult {
44 pub name: String,
45 pub status: PipelineStatus,
46 pub started_at: String,
47 pub ended_at: String,
48 pub rows_processed: u64,
49 pub attempts: u32,
50}
51
52impl fmt::Display for PipelineResult {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 write!(
55 f,
56 "Pipeline '{}': {} (rows: {}, attempts: {}, duration: {} → {})",
57 self.name,
58 self.status,
59 self.rows_processed,
60 self.attempts,
61 self.started_at,
62 self.ended_at
63 )
64 }
65}
66
67pub struct PipelineRunner {
69 pub def: PipelineDef,
70}
71
72impl PipelineRunner {
73 pub fn new(def: PipelineDef) -> Self {
74 PipelineRunner { def }
75 }
76
77 pub fn execute<F>(&self, mut run_fn: F) -> PipelineResult
80 where
81 F: FnMut(u32) -> Result<u64, String>,
82 {
83 let started_at = Utc::now().to_rfc3339();
84 let max_attempts = self.def.retries + 1;
85
86 for attempt in 1..=max_attempts {
87 match run_fn(attempt) {
88 Ok(rows) => {
89 return PipelineResult {
90 name: self.def.name.clone(),
91 status: PipelineStatus::Success,
92 started_at: started_at.clone(),
93 ended_at: Utc::now().to_rfc3339(),
94 rows_processed: rows,
95 attempts: attempt,
96 };
97 }
98 Err(msg) => {
99 if attempt == max_attempts {
100 return PipelineResult {
101 name: self.def.name.clone(),
102 status: PipelineStatus::Failed(msg),
103 started_at: started_at.clone(),
104 ended_at: Utc::now().to_rfc3339(),
105 rows_processed: 0,
106 attempts: attempt,
107 };
108 }
109 }
111 }
112 }
113
114 PipelineResult {
116 name: self.def.name.clone(),
117 status: PipelineStatus::Failed("exhausted retries".to_string()),
118 started_at,
119 ended_at: Utc::now().to_rfc3339(),
120 rows_processed: 0,
121 attempts: max_attempts,
122 }
123 }
124}
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129
130 #[test]
131 fn test_pipeline_success() {
132 let def = PipelineDef {
133 name: "test_etl".to_string(),
134 schedule: None,
135 timeout_ms: None,
136 retries: 0,
137 };
138 let runner = PipelineRunner::new(def);
139 let result = runner.execute(|_| Ok(100));
140 assert_eq!(result.status, PipelineStatus::Success);
141 assert_eq!(result.rows_processed, 100);
142 assert_eq!(result.attempts, 1);
143 }
144
145 #[test]
146 fn test_pipeline_failure() {
147 let def = PipelineDef {
148 name: "failing".to_string(),
149 schedule: None,
150 timeout_ms: None,
151 retries: 0,
152 };
153 let runner = PipelineRunner::new(def);
154 let result = runner.execute(|_| Err("boom".to_string()));
155 assert!(matches!(result.status, PipelineStatus::Failed(ref m) if m == "boom"));
156 assert_eq!(result.attempts, 1);
157 }
158
159 #[test]
160 fn test_pipeline_retry_then_success() {
161 let def = PipelineDef {
162 name: "retry_me".to_string(),
163 schedule: None,
164 timeout_ms: None,
165 retries: 2,
166 };
167 let runner = PipelineRunner::new(def);
168 let result = runner.execute(|attempt| {
169 if attempt < 3 {
170 Err("not yet".to_string())
171 } else {
172 Ok(50)
173 }
174 });
175 assert_eq!(result.status, PipelineStatus::Success);
176 assert_eq!(result.rows_processed, 50);
177 assert_eq!(result.attempts, 3);
178 }
179
180 #[test]
181 fn test_pipeline_retry_exhausted() {
182 let def = PipelineDef {
183 name: "always_fail".to_string(),
184 schedule: None,
185 timeout_ms: None,
186 retries: 2,
187 };
188 let runner = PipelineRunner::new(def);
189 let result = runner.execute(|_| Err("always fails".to_string()));
190 assert!(matches!(result.status, PipelineStatus::Failed(_)));
191 assert_eq!(result.attempts, 3);
192 }
193
194 #[test]
195 fn test_pipeline_result_display() {
196 let result = PipelineResult {
197 name: "test".to_string(),
198 status: PipelineStatus::Success,
199 started_at: "2024-01-01T00:00:00Z".to_string(),
200 ended_at: "2024-01-01T00:01:00Z".to_string(),
201 rows_processed: 1000,
202 attempts: 1,
203 };
204 let s = format!("{result}");
205 assert!(s.contains("test"));
206 assert!(s.contains("success"));
207 assert!(s.contains("1000"));
208 }
209}