OxiGDAL ETL - Streaming ETL framework for geospatial data processing
This crate provides a comprehensive ETL (Extract, Transform, Load) framework for continuous geospatial data processing with OxiGDAL.
§Features
- Async Streaming: Built on tokio for high-performance async I/O
- Backpressure Handling: Automatic backpressure management
- Error Recovery: Configurable error handling and retry logic
- Checkpointing: State persistence for fault tolerance
- Monitoring: Built-in metrics and logging
- Resource Limits: Control parallelism and memory usage
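The backpressure idea above can be illustrated with a bounded channel: when the buffer is full, the producer blocks until the consumer catches up. This is a std-only sketch of the general mechanism, not this crate's API (which is built on tokio's async equivalents):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Push `n` items through a bounded channel of capacity `cap`,
/// returning everything the consumer received.
fn run_bounded(n: u32, cap: usize) -> Vec<u32> {
    // With a small capacity, `send` blocks whenever the buffer is full,
    // so a fast producer is paced to the consumer: backpressure.
    let (tx, rx) = sync_channel::<u32>(cap);
    let producer = thread::spawn(move || {
        for i in 0..n {
            tx.send(i).unwrap(); // blocks when the buffer is full
        }
        // `tx` drops here, closing the channel so `rx.iter()` terminates.
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}

fn main() {
    let received = run_bounded(10, 2);
    println!("received {} items", received.len());
}
```

The async version trades blocked threads for suspended tasks, but the pacing principle is the same.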
§Architecture
The ETL framework consists of several key components:
- Sources: Data inputs (files, HTTP, S3, STAC, Kafka, PostGIS)
- Transforms: Data transformations (map, filter, window, join, etc.)
- Sinks: Data outputs (files, S3, PostGIS, Kafka)
- Pipeline: Fluent API for composing ETL workflows
- Stream: Async stream processing with backpressure
- Scheduler: Task scheduling and execution
§Example
use oxigdal_etl::*;
use oxigdal_etl::source::FileSource;
use oxigdal_etl::sink::FileSink;
use oxigdal_etl::error::TransformError;
use std::path::PathBuf;

// Build ETL pipeline
let pipeline = Pipeline::builder()
    .source(Box::new(FileSource::new(PathBuf::from("input.json"))))
    .map("uppercase".to_string(), |item| {
        Box::pin(async move {
            let s = String::from_utf8(item).map_err(|e| TransformError::InvalidInput {
                message: e.to_string(),
            })?;
            Ok(s.to_uppercase().into_bytes())
        })
    })
    .filter("non_empty".to_string(), |item| {
        let is_empty = item.is_empty();
        Box::pin(async move { Ok(!is_empty) })
    })
    .sink(Box::new(FileSink::new(PathBuf::from("output.json"))))
    .with_checkpointing()
    .buffer_size(1000)
    .build()?;

// Execute pipeline
let stats = pipeline.run().await?;
println!("Processed {} items", stats.items_processed());
§Feature Flags
- std (default): Enable standard library support
- kafka: Enable Kafka source and sink
- postgres: Enable PostgreSQL/PostGIS support
- s3: Enable Amazon S3 support
- stac: Enable STAC catalog support
- http: Enable HTTP source support
- scheduler: Enable cron-based scheduling
- all: Enable all optional features
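As a hedged sketch, the optional features above would typically be enabled in Cargo.toml; the package name and version shown here are assumptions based on the crate name, not verified values:

```toml
[dependencies]
# Package name and version are illustrative; check the registry for the real ones.
oxigdal-etl = { version = "0.1", features = ["s3", "postgres", "scheduler"] }
```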
Re-exports§
pub use error::EtlError;
pub use error::Result;
pub use pipeline::ExecutionMode;
pub use pipeline::Pipeline;
pub use pipeline::PipelineBuilder;
pub use pipeline::PipelineConfig;
pub use pipeline::PipelineStats;
pub use scheduler::Schedule;
pub use scheduler::Scheduler;
pub use scheduler::TaskConfig;
pub use scheduler::TaskResult;
pub use sink::Sink;
pub use source::Source;
pub use stream::BoxStream;
pub use stream::BufferedStream;
pub use stream::ParallelProcessor;
pub use stream::StateManager;
pub use stream::StreamConfig;
pub use stream::StreamItem;
pub use stream::StreamProcessor;
pub use transform::Transform;
Modules§
- error
- Error types for OxiGDAL ETL operations
- operators
- Transformation operators for ETL pipelines
- pipeline
- Pipeline builder for constructing ETL workflows
- prelude
- Prelude module for convenient imports
- scheduler
- Task scheduling for ETL pipelines
- sink
- Data sink implementations for ETL pipelines
- source
- Data source implementations for ETL pipelines
- stream
- Stream processing primitives for ETL pipelines
- transform
- Transformation operators for ETL pipelines
Constants§
- CRATE_NAME
- Crate name
- VERSION
- Version information