1#![forbid(unsafe_code)]
13#![warn(missing_docs)]
14#![deny(clippy::unwrap_used)]
15#![warn(clippy::expect_used)]
16#![deny(clippy::panic)]
17
18pub mod error;
19pub mod resolve;
20pub mod resource_graph;
21pub mod schedule;
22pub mod traits;
23
24pub use error::{ResolveError, ResolveResult, SourceError, StageError};
25pub use traits::{
26 ExtractedDocument, PipelineItem, PushSourceAdapter, Record, SourceAdapter, SourceItem, Stage,
27 StageContext,
28};
29
30use std::collections::BTreeMap;
31use std::path::PathBuf;
32use std::sync::Arc;
33use std::time::Duration;
34
35use serde::{Deserialize, Serialize};
36
37use ecl_pipeline_spec::PipelineSpec;
38use ecl_pipeline_state::{Blake3Hash, StageId};
39
40#[derive(Debug, Clone)]
43pub struct PipelineTopology {
44 pub spec: Arc<PipelineSpec>,
46
47 pub spec_hash: Blake3Hash,
50
51 pub sources: BTreeMap<String, Arc<dyn SourceAdapter>>,
53
54 pub push_sources: BTreeMap<String, Arc<dyn PushSourceAdapter>>,
59
60 pub stages: BTreeMap<String, ResolvedStage>,
62
63 pub schedule: Vec<Vec<StageId>>,
66
67 pub output_dir: PathBuf,
69}
70
71#[derive(Debug, Clone)]
73pub struct ResolvedStage {
74 pub id: StageId,
76
77 pub handler: Arc<dyn Stage>,
79
80 pub retry: RetryPolicy,
82
83 pub skip_on_error: bool,
85
86 pub timeout: Option<Duration>,
88
89 pub source: Option<String>,
91
92 pub condition: Option<ConditionExpr>,
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct RetryPolicy {
102 pub max_attempts: u32,
104 pub initial_backoff: Duration,
106 pub backoff_multiplier: f64,
108 pub max_backoff: Duration,
110}
111
112impl RetryPolicy {
113 pub fn from_spec(spec: &ecl_pipeline_spec::RetrySpec) -> Self {
115 Self {
116 max_attempts: spec.max_attempts,
117 initial_backoff: Duration::from_millis(spec.initial_backoff_ms),
118 backoff_multiplier: spec.backoff_multiplier,
119 max_backoff: Duration::from_millis(spec.max_backoff_ms),
120 }
121 }
122}
123
124impl Default for RetryPolicy {
125 fn default() -> Self {
126 Self {
127 max_attempts: 3,
128 initial_backoff: Duration::from_millis(1000),
129 backoff_multiplier: 2.0,
130 max_backoff: Duration::from_millis(30_000),
131 }
132 }
133}
134
135#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139pub struct ConditionExpr(String);
140
141impl ConditionExpr {
142 pub fn new(expr: impl Into<String>) -> Self {
144 Self(expr.into())
145 }
146
147 pub fn as_str(&self) -> &str {
149 &self.0
150 }
151}
152
153impl std::fmt::Display for ConditionExpr {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 f.write_str(&self.0)
156 }
157}
158
#[cfg(test)]
// Unwrapping is acceptable in tests: a panic is exactly the failure signal we want.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use ecl_pipeline_spec::RetrySpec;

    /// Minimal valid spec: one filesystem source feeding one `extract` stage.
    const SAMPLE_SPEC_TOML: &str = r#"
name = "test"
version = 1
output_dir = "./out"

[sources.local]
kind = "filesystem"
root = "/tmp"

[stages.extract]
adapter = "extract"
source = "local"
resources = { creates = ["docs"] }
"#;

    /// Parses [`SAMPLE_SPEC_TOML`]; panics if the fixture no longer parses.
    fn sample_spec() -> Arc<PipelineSpec> {
        Arc::new(PipelineSpec::from_toml(SAMPLE_SPEC_TOML).unwrap())
    }

    #[test]
    fn test_condition_expr_new_and_as_str() {
        let expr = ConditionExpr::new("x > 1");
        assert_eq!(expr.as_str(), "x > 1");
    }

    #[test]
    fn test_condition_expr_display() {
        let expr = ConditionExpr::new("x > 1");
        assert_eq!(format!("{expr}"), "x > 1");
    }

    #[test]
    fn test_condition_expr_serde_roundtrip() {
        let expr = ConditionExpr::new("items.count > 0");
        let json = serde_json::to_string(&expr).unwrap();
        let deserialized: ConditionExpr = serde_json::from_str(&json).unwrap();
        assert_eq!(expr, deserialized);
    }

    #[test]
    fn test_retry_policy_default_values() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_attempts, 3);
        assert_eq!(policy.initial_backoff, Duration::from_millis(1000));
        assert!((policy.backoff_multiplier - 2.0).abs() < f64::EPSILON);
        assert_eq!(policy.max_backoff, Duration::from_millis(30_000));
    }

    #[test]
    fn test_retry_policy_from_spec() {
        let spec = RetrySpec {
            max_attempts: 5,
            initial_backoff_ms: 500,
            backoff_multiplier: 1.5,
            max_backoff_ms: 10_000,
        };
        let policy = RetryPolicy::from_spec(&spec);
        assert_eq!(policy.max_attempts, 5);
        assert_eq!(policy.initial_backoff, Duration::from_millis(500));
        assert!((policy.backoff_multiplier - 1.5).abs() < f64::EPSILON);
        assert_eq!(policy.max_backoff, Duration::from_millis(10_000));
    }

    #[test]
    fn test_retry_policy_serde_roundtrip() {
        let policy = RetryPolicy {
            max_attempts: 4,
            initial_backoff: Duration::from_millis(2000),
            backoff_multiplier: 3.0,
            max_backoff: Duration::from_millis(60_000),
        };
        let json = serde_json::to_string(&policy).unwrap();
        let deserialized: RetryPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(policy, deserialized);
    }

    /// Source adapter stub: enumerates nothing and fails every fetch with `NotFound`.
    #[derive(Debug)]
    struct MockSourceAdapter;

    #[async_trait]
    impl SourceAdapter for MockSourceAdapter {
        fn source_kind(&self) -> &str {
            "mock"
        }
        async fn enumerate(&self) -> Result<Vec<SourceItem>, SourceError> {
            Ok(vec![])
        }
        async fn fetch(&self, _item: &SourceItem) -> Result<ExtractedDocument, SourceError> {
            Err(SourceError::NotFound {
                source_name: "mock".to_string(),
                item_id: "none".to_string(),
            })
        }
    }

    /// Identity stage stub: returns its input item unchanged.
    #[derive(Debug)]
    struct MockStage;

    #[async_trait]
    impl Stage for MockStage {
        fn name(&self) -> &str {
            "mock"
        }
        async fn process(
            &self,
            item: PipelineItem,
            _ctx: &StageContext,
        ) -> Result<Vec<PipelineItem>, StageError> {
            Ok(vec![item])
        }
    }

    #[test]
    fn test_pipeline_topology_has_expected_fields() {
        let mut sources: BTreeMap<String, Arc<dyn SourceAdapter>> = BTreeMap::new();
        sources.insert("local".to_string(), Arc::new(MockSourceAdapter));

        let mut stages_map = BTreeMap::new();
        stages_map.insert(
            "extract".to_string(),
            ResolvedStage {
                id: StageId::new("extract"),
                handler: Arc::new(MockStage),
                retry: RetryPolicy::default(),
                skip_on_error: false,
                timeout: Some(Duration::from_secs(300)),
                source: Some("local".to_string()),
                condition: None,
            },
        );

        let topo = PipelineTopology {
            spec: sample_spec(),
            spec_hash: Blake3Hash::new("abc123"),
            sources,
            push_sources: BTreeMap::new(),
            stages: stages_map,
            schedule: vec![vec![StageId::new("extract")]],
            output_dir: PathBuf::from("./out"),
        };

        assert_eq!(topo.spec.name, "test");
        assert_eq!(topo.schedule.len(), 1);
        assert_eq!(topo.sources.len(), 1);
        assert!(topo.push_sources.is_empty());
        assert_eq!(topo.stages.len(), 1);
        assert_eq!(topo.output_dir, PathBuf::from("./out"));
    }

    #[test]
    fn test_resolved_stage_has_expected_fields() {
        let stage = ResolvedStage {
            id: StageId::new("normalize"),
            handler: Arc::new(MockStage),
            retry: RetryPolicy::default(),
            skip_on_error: true,
            timeout: Some(Duration::from_secs(60)),
            source: Some("gdrive".to_string()),
            condition: Some(ConditionExpr::new("items.count > 0")),
        };

        assert_eq!(stage.id.as_str(), "normalize");
        assert!(stage.skip_on_error);
        assert_eq!(stage.timeout, Some(Duration::from_secs(60)));
        assert_eq!(stage.source, Some("gdrive".to_string()));
        assert_eq!(stage.condition, Some(ConditionExpr::new("items.count > 0")));
    }

    #[tokio::test]
    async fn test_mock_source_adapter_methods() {
        let adapter: Arc<dyn SourceAdapter> = Arc::new(MockSourceAdapter);
        assert_eq!(adapter.source_kind(), "mock");
        let items = adapter.enumerate().await.unwrap();
        assert!(items.is_empty());
    }

    #[tokio::test]
    async fn test_mock_stage_process() {
        use ecl_pipeline_state::ItemProvenance;
        let stage: Arc<dyn Stage> = Arc::new(MockStage);
        assert_eq!(stage.name(), "mock");

        let item = PipelineItem {
            id: "test-item".to_string(),
            display_name: "test".to_string(),
            content: Arc::from(b"data" as &[u8]),
            mime_type: "text/plain".to_string(),
            source_name: "local".to_string(),
            source_content_hash: Blake3Hash::new("aabb"),
            provenance: ItemProvenance {
                source_kind: "filesystem".to_string(),
                metadata: BTreeMap::new(),
                source_modified: None,
                extracted_at: chrono::Utc::now(),
            },
            metadata: BTreeMap::new(),
            record: None,
            stream: None,
        };

        let ctx = StageContext {
            spec: sample_spec(),
            output_dir: PathBuf::from("./output"),
            params: serde_json::Value::Null,
            span: tracing::Span::none(),
        };

        let result = stage.process(item, &ctx).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id, "test-item");
    }
}
397
#[cfg(test)]
// Property tests unwrap freely: a panic is the intended failure signal.
#[allow(clippy::unwrap_used)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // A RetryPolicy survives a JSON round-trip. Integer and Duration fields
        // must match exactly; the f64 multiplier within a 1e-10 tolerance.
        #[test]
        fn test_retry_policy_proptest_roundtrip(
            attempts in 1..100u32,
            initial_millis in 1..100_000u64,
            factor in 1.0..10.0f64,
            cap_millis in 1..1_000_000u64,
        ) {
            let original = RetryPolicy {
                max_attempts: attempts,
                initial_backoff: Duration::from_millis(initial_millis),
                backoff_multiplier: factor,
                max_backoff: Duration::from_millis(cap_millis),
            };
            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: RetryPolicy = serde_json::from_str(&encoded).unwrap();

            prop_assert_eq!(original.max_attempts, decoded.max_attempts);
            prop_assert_eq!(original.initial_backoff, decoded.initial_backoff);
            prop_assert_eq!(original.max_backoff, decoded.max_backoff);
            let drift = (original.backoff_multiplier - decoded.backoff_multiplier).abs();
            prop_assert!(drift < 1e-10);
        }

        // Any non-empty printable string round-trips through JSON unchanged.
        #[test]
        fn test_condition_expr_proptest_roundtrip(text in "\\PC{1,100}") {
            let expr = ConditionExpr::new(text);
            let decoded: ConditionExpr =
                serde_json::from_str(&serde_json::to_string(&expr).unwrap()).unwrap();
            prop_assert_eq!(expr, decoded);
        }
    }
}