Skip to main content

ecl_pipeline_topo/
lib.rs

//! Pipeline topology, resource graph, and core traits for the ECL pipeline runner.
//!
//! This crate defines:
//! - The resolved pipeline topology (`PipelineTopology`, `ResolvedStage`)
//! - Resource graph computation and parallel schedule derivation
//! - Core traits (`SourceAdapter`, `PushSourceAdapter`, `Stage`) and their
//!   supporting types (`PipelineItem`, `SourceItem`, `ExtractedDocument`,
//!   `StageContext`)
//!
//! The topology is computed from a `PipelineSpec` (from `ecl-pipeline-spec`)
//! at init time and is immutable during execution.
11
12#![forbid(unsafe_code)]
13#![warn(missing_docs)]
14#![deny(clippy::unwrap_used)]
15#![warn(clippy::expect_used)]
16#![deny(clippy::panic)]
17
18pub mod error;
19pub mod resolve;
20pub mod resource_graph;
21pub mod schedule;
22pub mod traits;
23
24pub use error::{ResolveError, ResolveResult, SourceError, StageError};
25pub use traits::{
26    ExtractedDocument, PipelineItem, PushSourceAdapter, Record, SourceAdapter, SourceItem, Stage,
27    StageContext,
28};
29
30use std::collections::BTreeMap;
31use std::path::PathBuf;
32use std::sync::Arc;
33use std::time::Duration;
34
35use serde::{Deserialize, Serialize};
36
37use ecl_pipeline_spec::PipelineSpec;
38use ecl_pipeline_state::{Blake3Hash, StageId};
39
40/// The resolved pipeline, ready to execute.
41/// Computed from `PipelineSpec` at init time. Immutable during execution.
42#[derive(Debug, Clone)]
43pub struct PipelineTopology {
44    /// The original spec, preserved for checkpoint embedding.
45    pub spec: Arc<PipelineSpec>,
46
47    /// Blake3 hash of the serialized spec, for detecting config drift
48    /// between a checkpoint and the current TOML file.
49    pub spec_hash: Blake3Hash,
50
51    /// Resolved source adapters, keyed by source name from the spec.
52    pub sources: BTreeMap<String, Arc<dyn SourceAdapter>>,
53
54    /// Resolved push-based source adapters, keyed by source name from the spec.
55    /// Push sources receive data via external events (webhooks, etc.) rather
56    /// than polling. Separated from `sources` to avoid disrupting the existing
57    /// pull-based pipeline flow.
58    pub push_sources: BTreeMap<String, Arc<dyn PushSourceAdapter>>,
59
60    /// Resolved stage implementations, keyed by stage name from the spec.
61    pub stages: BTreeMap<String, ResolvedStage>,
62
63    /// The computed execution schedule: batches of parallel stages.
64    /// Each inner `Vec` contains stages that can run concurrently.
65    pub schedule: Vec<Vec<StageId>>,
66
67    /// Resolved output directory (created if needed at init).
68    pub output_dir: PathBuf,
69}
70
71/// A resolved stage: the concrete implementation with merged configuration.
72#[derive(Debug, Clone)]
73pub struct ResolvedStage {
74    /// Name from the spec (stable across runs for checkpointing).
75    pub id: StageId,
76
77    /// The concrete stage implementation.
78    pub handler: Arc<dyn Stage>,
79
80    /// Resolved retry policy (stage override merged with global default).
81    pub retry: RetryPolicy,
82
83    /// Skip-on-error behavior for item-level failures.
84    pub skip_on_error: bool,
85
86    /// Timeout for stage execution.
87    pub timeout: Option<Duration>,
88
89    /// Which source this stage operates on (for extract stages).
90    pub source: Option<String>,
91
92    /// Condition predicate (`None` = always run).
93    pub condition: Option<ConditionExpr>,
94}
95
96/// Retry policy with resolved, concrete values.
97/// Unlike `RetrySpec` (from ecl-pipeline-spec) which stores milliseconds as
98/// `u64`, this stores `Duration` values — the resolved form ready for use
99/// by the runner.
100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
101pub struct RetryPolicy {
102    /// Total attempts (1 = no retry).
103    pub max_attempts: u32,
104    /// Initial backoff duration before the first retry.
105    pub initial_backoff: Duration,
106    /// Multiplier applied to backoff after each attempt.
107    pub backoff_multiplier: f64,
108    /// Maximum backoff duration (caps exponential growth).
109    pub max_backoff: Duration,
110}
111
112impl RetryPolicy {
113    /// Create a `RetryPolicy` from a `RetrySpec` (millisecond-based config form).
114    pub fn from_spec(spec: &ecl_pipeline_spec::RetrySpec) -> Self {
115        Self {
116            max_attempts: spec.max_attempts,
117            initial_backoff: Duration::from_millis(spec.initial_backoff_ms),
118            backoff_multiplier: spec.backoff_multiplier,
119            max_backoff: Duration::from_millis(spec.max_backoff_ms),
120        }
121    }
122}
123
124impl Default for RetryPolicy {
125    fn default() -> Self {
126        Self {
127            max_attempts: 3,
128            initial_backoff: Duration::from_millis(1000),
129            backoff_multiplier: 2.0,
130            max_backoff: Duration::from_millis(30_000),
131        }
132    }
133}
134
135/// A condition expression that determines whether a stage should run.
136/// Currently a simple string wrapper; the evaluator is deferred to a
137/// future milestone.
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139pub struct ConditionExpr(String);
140
141impl ConditionExpr {
142    /// Create a new condition expression from a string.
143    pub fn new(expr: impl Into<String>) -> Self {
144        Self(expr.into())
145    }
146
147    /// Get the expression string.
148    pub fn as_str(&self) -> &str {
149        &self.0
150    }
151}
152
153impl std::fmt::Display for ConditionExpr {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        f.write_str(&self.0)
156    }
157}
158
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use ecl_pipeline_spec::RetrySpec;

    /// Minimal spec shared by the structural tests: one filesystem source
    /// and one extract stage.
    const MINIMAL_SPEC_TOML: &str = r#"
name = "test"
version = 1
output_dir = "./out"

[sources.local]
kind = "filesystem"
root = "/tmp"

[stages.extract]
adapter = "extract"
source = "local"
resources = { creates = ["docs"] }
"#;

    /// Parse the shared TOML fixture into a reference-counted `PipelineSpec`.
    fn minimal_spec() -> Arc<PipelineSpec> {
        Arc::new(PipelineSpec::from_toml(MINIMAL_SPEC_TOML).unwrap())
    }

    #[test]
    fn test_condition_expr_new_and_as_str() {
        let expr = ConditionExpr::new("x > 1");
        assert_eq!(expr.as_str(), "x > 1");
    }

    #[test]
    fn test_condition_expr_display() {
        let expr = ConditionExpr::new("x > 1");
        assert_eq!(format!("{expr}"), "x > 1");
    }

    #[test]
    fn test_condition_expr_serde_roundtrip() {
        let expr = ConditionExpr::new("items.count > 0");
        let json = serde_json::to_string(&expr).unwrap();
        let deserialized: ConditionExpr = serde_json::from_str(&json).unwrap();
        assert_eq!(expr, deserialized);
    }

    #[test]
    fn test_retry_policy_default_values() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_attempts, 3);
        assert_eq!(policy.initial_backoff, Duration::from_millis(1000));
        assert!((policy.backoff_multiplier - 2.0).abs() < f64::EPSILON);
        assert_eq!(policy.max_backoff, Duration::from_millis(30_000));
    }

    #[test]
    fn test_retry_policy_from_spec() {
        let spec = RetrySpec {
            max_attempts: 5,
            initial_backoff_ms: 500,
            backoff_multiplier: 1.5,
            max_backoff_ms: 10_000,
        };
        let policy = RetryPolicy::from_spec(&spec);
        assert_eq!(policy.max_attempts, 5);
        assert_eq!(policy.initial_backoff, Duration::from_millis(500));
        assert!((policy.backoff_multiplier - 1.5).abs() < f64::EPSILON);
        assert_eq!(policy.max_backoff, Duration::from_millis(10_000));
    }

    #[test]
    fn test_retry_policy_serde_roundtrip() {
        let policy = RetryPolicy {
            max_attempts: 4,
            initial_backoff: Duration::from_millis(2000),
            backoff_multiplier: 3.0,
            max_backoff: Duration::from_millis(60_000),
        };
        let json = serde_json::to_string(&policy).unwrap();
        let deserialized: RetryPolicy = serde_json::from_str(&json).unwrap();
        assert_eq!(policy, deserialized);
    }

    // Mock types for structural tests

    /// Source adapter stub: enumerates nothing and fails every fetch.
    #[derive(Debug)]
    struct MockSourceAdapter;

    #[async_trait]
    impl SourceAdapter for MockSourceAdapter {
        fn source_kind(&self) -> &str {
            "mock"
        }
        async fn enumerate(&self) -> Result<Vec<SourceItem>, SourceError> {
            Ok(vec![])
        }
        async fn fetch(&self, _item: &SourceItem) -> Result<ExtractedDocument, SourceError> {
            Err(SourceError::NotFound {
                source_name: "mock".to_string(),
                item_id: "none".to_string(),
            })
        }
    }

    /// Stage stub that passes each item through unchanged.
    #[derive(Debug)]
    struct MockStage;

    #[async_trait]
    impl Stage for MockStage {
        fn name(&self) -> &str {
            "mock"
        }
        async fn process(
            &self,
            item: PipelineItem,
            _ctx: &StageContext,
        ) -> Result<Vec<PipelineItem>, StageError> {
            Ok(vec![item])
        }
    }

    #[test]
    fn test_pipeline_topology_has_expected_fields() {
        let mut sources: BTreeMap<String, Arc<dyn SourceAdapter>> = BTreeMap::new();
        sources.insert("local".to_string(), Arc::new(MockSourceAdapter));

        let mut stages_map = BTreeMap::new();
        stages_map.insert(
            "extract".to_string(),
            ResolvedStage {
                id: StageId::new("extract"),
                handler: Arc::new(MockStage),
                retry: RetryPolicy::default(),
                skip_on_error: false,
                timeout: Some(Duration::from_secs(300)),
                source: Some("local".to_string()),
                condition: None,
            },
        );

        let topo = PipelineTopology {
            spec: minimal_spec(),
            spec_hash: Blake3Hash::new("abc123"),
            sources,
            push_sources: BTreeMap::new(),
            stages: stages_map,
            schedule: vec![vec![StageId::new("extract")]],
            output_dir: PathBuf::from("./out"),
        };

        assert_eq!(topo.spec.name, "test");
        assert_eq!(topo.schedule.len(), 1);
        assert_eq!(topo.sources.len(), 1);
        assert!(topo.push_sources.is_empty());
        assert_eq!(topo.stages.len(), 1);
        assert_eq!(topo.output_dir, PathBuf::from("./out"));
    }

    #[test]
    fn test_resolved_stage_has_expected_fields() {
        let stage = ResolvedStage {
            id: StageId::new("normalize"),
            handler: Arc::new(MockStage),
            retry: RetryPolicy::default(),
            skip_on_error: true,
            timeout: Some(Duration::from_secs(60)),
            source: Some("gdrive".to_string()),
            condition: Some(ConditionExpr::new("items.count > 0")),
        };

        assert_eq!(stage.id.as_str(), "normalize");
        assert!(stage.skip_on_error);
        assert_eq!(stage.timeout, Some(Duration::from_secs(60)));
        assert_eq!(stage.source, Some("gdrive".to_string()));
        assert_eq!(stage.condition, Some(ConditionExpr::new("items.count > 0")));
    }

    #[tokio::test]
    async fn test_mock_source_adapter_methods() {
        let adapter: Arc<dyn SourceAdapter> = Arc::new(MockSourceAdapter);
        assert_eq!(adapter.source_kind(), "mock");
        let items = adapter.enumerate().await.unwrap();
        assert!(items.is_empty());
    }

    #[tokio::test]
    async fn test_mock_stage_process() {
        use ecl_pipeline_state::ItemProvenance;
        let stage: Arc<dyn Stage> = Arc::new(MockStage);
        assert_eq!(stage.name(), "mock");

        let item = PipelineItem {
            id: "test-item".to_string(),
            display_name: "test".to_string(),
            content: Arc::from(b"data" as &[u8]),
            mime_type: "text/plain".to_string(),
            source_name: "local".to_string(),
            source_content_hash: Blake3Hash::new("aabb"),
            provenance: ItemProvenance {
                source_kind: "filesystem".to_string(),
                metadata: BTreeMap::new(),
                source_modified: None,
                extracted_at: chrono::Utc::now(),
            },
            metadata: BTreeMap::new(),
            record: None,
            stream: None,
        };

        let ctx = StageContext {
            spec: minimal_spec(),
            output_dir: PathBuf::from("./output"),
            params: serde_json::Value::Null,
            span: tracing::Span::none(),
        };

        let result = stage.process(item, &ctx).await.unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id, "test-item");
    }
}
397
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // A generated `RetryPolicy` must survive a JSON encode/decode cycle.
        #[test]
        fn test_retry_policy_proptest_roundtrip(
            attempts in 1..100u32,
            initial_backoff_ms in 1..100_000u64,
            factor in 1.0..10.0f64,
            max_backoff_ms in 1..1_000_000u64,
        ) {
            let original = RetryPolicy {
                max_attempts: attempts,
                initial_backoff: Duration::from_millis(initial_backoff_ms),
                backoff_multiplier: factor,
                max_backoff: Duration::from_millis(max_backoff_ms),
            };
            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: RetryPolicy = serde_json::from_str(&encoded).unwrap();

            // Fields are checked one by one so the f64 multiplier can be
            // compared with a tolerance instead of exact equality.
            prop_assert_eq!(original.max_attempts, decoded.max_attempts);
            prop_assert_eq!(original.initial_backoff, decoded.initial_backoff);
            prop_assert_eq!(original.max_backoff, decoded.max_backoff);
            let drift = (original.backoff_multiplier - decoded.backoff_multiplier).abs();
            prop_assert!(drift < 1e-10);
        }

        // A generated expression string must survive a JSON encode/decode cycle.
        #[test]
        fn test_condition_expr_proptest_roundtrip(text in "\\PC{1,100}") {
            let original = ConditionExpr::new(text);
            let encoded = serde_json::to_string(&original).unwrap();
            let decoded: ConditionExpr = serde_json::from_str(&encoded).unwrap();
            prop_assert_eq!(original, decoded);
        }
    }
}