//! OxiGDAL ETL - Streaming ETL framework for geospatial data processing
//!
//! This crate provides a comprehensive ETL (Extract, Transform, Load) framework
//! for continuous geospatial data processing with OxiGDAL.
//!
//! # Features
//!
//! - **Async Streaming**: Built on tokio for high-performance async I/O
//! - **Backpressure Handling**: Automatic backpressure management
//! - **Error Recovery**: Configurable error handling and retry logic
//! - **Checkpointing**: State persistence for fault tolerance
//! - **Monitoring**: Built-in metrics and logging
//! - **Resource Limits**: Control parallelism and memory usage
//!
//! # Architecture
//!
//! The ETL framework consists of several key components:
//!
//! - **Sources**: Data inputs (files, HTTP, S3, STAC, Kafka, PostGIS)
//! - **Transforms**: Data transformations (map, filter, window, join, etc.)
//! - **Sinks**: Data outputs (files, S3, PostGIS, Kafka)
//! - **Pipeline**: Fluent API for composing ETL workflows
//! - **Stream**: Async stream processing with backpressure
//! - **Scheduler**: Task scheduling and execution
//!
//! # Example
//!
//! ```rust,no_run
//! use oxigdal_etl::*;
//! use oxigdal_etl::source::FileSource;
//! use oxigdal_etl::sink::FileSink;
//! use oxigdal_etl::error::TransformError;
//! use std::path::PathBuf;
//!
//! # async fn example() -> Result<()> {
//! // Build ETL pipeline
//! let pipeline = Pipeline::builder()
//!     .source(Box::new(FileSource::new(PathBuf::from("input.json"))))
//!     .map("uppercase".to_string(), |item| {
//!         Box::pin(async move {
//!             let s = String::from_utf8(item).map_err(|e| {
//!                 TransformError::InvalidInput {
//!                     message: e.to_string(),
//!                 }
//!             })?;
//!             Ok(s.to_uppercase().into_bytes())
//!         })
//!     })
//!     .filter("non_empty".to_string(), |item| {
//!         let is_empty = item.is_empty();
//!         Box::pin(async move { Ok(!is_empty) })
//!     })
//!     .sink(Box::new(FileSink::new(PathBuf::from("output.json"))))
//!     .with_checkpointing()
//!     .buffer_size(1000)
//!     .build()?;
//!
//! // Execute pipeline
//! let stats = pipeline.run().await?;
//! println!("Processed {} items", stats.items_processed());
//! # Ok(())
//! # }
//! ```
//!
//! # Feature Flags
//!
//! - `std` (default): Enable standard library support
//! - `kafka`: Enable Kafka source and sink
//! - `postgres`: Enable PostgreSQL/PostGIS support
//! - `s3`: Enable Amazon S3 support
//! - `stac`: Enable STAC catalog support
//! - `http`: Enable HTTP source support
//! - `scheduler`: Enable cron-based scheduling
//! - `all`: Enable all optional features

76#![warn(missing_docs)]
77#![cfg_attr(not(feature = "std"), no_std)]
78
79// Re-export key types
80pub use error::{EtlError, Result};
81pub use pipeline::{ExecutionMode, Pipeline, PipelineBuilder, PipelineConfig, PipelineStats};
82pub use scheduler::{Schedule, Scheduler, TaskConfig, TaskResult};
83pub use sink::Sink;
84pub use source::Source;
85pub use stream::{
86    BoxStream, BufferedStream, ParallelProcessor, StateManager, StreamConfig, StreamItem,
87    StreamProcessor,
88};
89pub use transform::Transform;
90
91// Modules
92pub mod error;
93pub mod operators;
94pub mod pipeline;
95pub mod scheduler;
96pub mod sink;
97pub mod source;
98pub mod stream;
99pub mod transform;
100
101/// Prelude module for convenient imports
102pub mod prelude {
103    pub use crate::error::{EtlError, Result};
104    pub use crate::operators::{aggregate::*, filter::*, join::*, map::*, window::*};
105    pub use crate::pipeline::{ExecutionMode, Pipeline, PipelineBuilder};
106    pub use crate::scheduler::{Schedule, Scheduler, TaskConfig};
107    pub use crate::sink::{FileSink, Sink};
108    pub use crate::source::{FileSource, Source};
109    pub use crate::stream::{StreamConfig, StreamItem, StreamProcessor};
110    pub use crate::transform::Transform;
111
112    #[cfg(feature = "kafka")]
113    pub use crate::sink::KafkaSink;
114    #[cfg(feature = "kafka")]
115    pub use crate::source::KafkaSource;
116
117    #[cfg(feature = "postgres")]
118    pub use crate::sink::PostGisSink;
119
120    #[cfg(feature = "s3")]
121    pub use crate::sink::S3Sink;
122
123    #[cfg(feature = "stac")]
124    pub use crate::source::StacSource;
125
126    #[cfg(feature = "http")]
127    pub use crate::source::HttpSource;
128}
129
130/// Version information
131pub const VERSION: &str = env!("CARGO_PKG_VERSION");
132
133/// Crate name
134pub const CRATE_NAME: &str = env!("CARGO_PKG_NAME");
135