oxigdal_etl/lib.rs
1//! OxiGDAL ETL - Streaming ETL framework for geospatial data processing
2//!
3//! This crate provides a comprehensive ETL (Extract, Transform, Load) framework
4//! for continuous geospatial data processing with OxiGDAL.
5//!
6//! # Features
7//!
8//! - **Async Streaming**: Built on tokio for high-performance async I/O
9//! - **Backpressure Handling**: Automatic backpressure management
10//! - **Error Recovery**: Configurable error handling and retry logic
11//! - **Checkpointing**: State persistence for fault tolerance
12//! - **Monitoring**: Built-in metrics and logging
13//! - **Resource Limits**: Control parallelism and memory usage
14//!
15//! # Architecture
16//!
17//! The ETL framework consists of several key components:
18//!
19//! - **Sources**: Data inputs (files, HTTP, S3, STAC, Kafka, PostGIS)
20//! - **Transforms**: Data transformations (map, filter, window, join, etc.)
21//! - **Sinks**: Data outputs (files, S3, PostGIS, Kafka)
22//! - **Pipeline**: Fluent API for composing ETL workflows
23//! - **Stream**: Async stream processing with backpressure
24//! - **Scheduler**: Task scheduling and execution
25//!
26//! # Example
27//!
28//! ```rust,no_run
29//! use oxigdal_etl::*;
30//! use oxigdal_etl::source::FileSource;
31//! use oxigdal_etl::sink::FileSink;
32//! use oxigdal_etl::error::TransformError;
33//! use std::path::PathBuf;
34//!
35//! # async fn example() -> Result<()> {
36//! // Build ETL pipeline
37//! let pipeline = Pipeline::builder()
38//! .source(Box::new(FileSource::new(PathBuf::from("input.json"))))
39//! .map("uppercase".to_string(), |item| {
40//! Box::pin(async move {
41//! let s = String::from_utf8(item).map_err(|e| {
42//! TransformError::InvalidInput {
43//! message: e.to_string(),
44//! }
45//! })?;
46//! Ok(s.to_uppercase().into_bytes())
47//! })
48//! })
49//! .filter("non_empty".to_string(), |item| {
50//! let is_empty = item.is_empty();
51//! Box::pin(async move { Ok(!is_empty) })
52//! })
53//! .sink(Box::new(FileSink::new(PathBuf::from("output.json"))))
54//! .with_checkpointing()
55//! .buffer_size(1000)
56//! .build()?;
57//!
58//! // Execute pipeline
59//! let stats = pipeline.run().await?;
60//! println!("Processed {} items", stats.items_processed());
61//! # Ok(())
62//! # }
63//! ```
64//!
65//! # Feature Flags
66//!
67//! - `std` (default): Enable standard library support
68//! - `kafka`: Enable Kafka source and sink
69//! - `postgres`: Enable PostgreSQL/PostGIS support
70//! - `s3`: Enable Amazon S3 support
71//! - `stac`: Enable STAC catalog support
72//! - `http`: Enable HTTP source support
73//! - `scheduler`: Enable cron-based scheduling
74//! - `all`: Enable all optional features
75
76#![warn(missing_docs)]
77#![cfg_attr(not(feature = "std"), no_std)]
78
79// Re-export key types
80pub use error::{EtlError, Result};
81pub use pipeline::{ExecutionMode, Pipeline, PipelineBuilder, PipelineConfig, PipelineStats};
82pub use scheduler::{Schedule, Scheduler, TaskConfig, TaskResult};
83pub use sink::Sink;
84pub use source::Source;
85pub use stream::{
86 BoxStream, BufferedStream, ParallelProcessor, StateManager, StreamConfig, StreamItem,
87 StreamProcessor,
88};
89pub use transform::Transform;
90
91// Modules
92pub mod error;
93pub mod operators;
94pub mod pipeline;
95pub mod scheduler;
96pub mod sink;
97pub mod source;
98pub mod stream;
99pub mod transform;
100
101/// Prelude module for convenient imports
102pub mod prelude {
103 pub use crate::error::{EtlError, Result};
104 pub use crate::operators::{aggregate::*, filter::*, join::*, map::*, window::*};
105 pub use crate::pipeline::{ExecutionMode, Pipeline, PipelineBuilder};
106 pub use crate::scheduler::{Schedule, Scheduler, TaskConfig};
107 pub use crate::sink::{FileSink, Sink};
108 pub use crate::source::{FileSource, Source};
109 pub use crate::stream::{StreamConfig, StreamItem, StreamProcessor};
110 pub use crate::transform::Transform;
111
112 #[cfg(feature = "kafka")]
113 pub use crate::sink::KafkaSink;
114 #[cfg(feature = "kafka")]
115 pub use crate::source::KafkaSource;
116
117 #[cfg(feature = "postgres")]
118 pub use crate::sink::PostGisSink;
119
120 #[cfg(feature = "s3")]
121 pub use crate::sink::S3Sink;
122
123 #[cfg(feature = "stac")]
124 pub use crate::source::StacSource;
125
126 #[cfg(feature = "http")]
127 pub use crate::source::HttpSource;
128}
129
/// Crate version string, captured from `CARGO_PKG_VERSION` at compile time.
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Crate (Cargo package) name, captured from `CARGO_PKG_NAME` at compile time.
pub const CRATE_NAME: &str = env!("CARGO_PKG_NAME");
135
#[cfg(test)]
mod tests {
    use super::*;

    // The compile-time version string must never be empty.
    #[test]
    fn test_version() {
        assert_ne!(VERSION, "");
    }

    // The compile-time crate name must match the published package name.
    #[test]
    fn test_crate_name() {
        assert_eq!("oxigdal-etl", CRATE_NAME);
    }
}
149}