Crate oxigdal_etl

OxiGDAL ETL - Streaming ETL framework for geospatial data processing

This crate provides a comprehensive ETL (Extract, Transform, Load) framework for continuous geospatial data processing with OxiGDAL.

§Features

  • Async Streaming: Built on tokio for high-performance async I/O
  • Backpressure Handling: Automatic backpressure management
  • Error Recovery: Configurable error handling and retry logic
  • Checkpointing: State persistence for fault tolerance
  • Monitoring: Built-in metrics and logging
  • Resource Limits: Control parallelism and memory usage
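
As a rough illustration of what backpressure means here, the sketch below pushes items through a bounded buffer so that a fast producer is forced to wait for a slow consumer. It is not the crate's implementation: the crate is described as async and tokio-based, while this sketch deliberately uses only `std` threads and `std::sync::mpsc::sync_channel` so it runs without dependencies. The function name `run_bounded` is hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical helper: pushes `items` through a bounded channel with the
/// given capacity and returns what the consumer received, in order.
fn run_bounded(items: Vec<Vec<u8>>, capacity: usize) -> Vec<Vec<u8>> {
    // A bounded channel: `send` blocks once `capacity` items are queued.
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);

    let producer = thread::spawn(move || {
        for item in items {
            // Blocking here while the buffer is full is the backpressure.
            if tx.send(item).is_err() {
                break; // the consumer went away
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    // The consumer drains the channel until the producer closes it.
    let received: Vec<Vec<u8>> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The capacity plays the same role as a buffer size setting: a smaller value bounds memory more tightly but makes the producer wait more often.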

§Architecture

The ETL framework consists of several key components:

  • Sources: Data inputs (files, HTTP, S3, STAC, Kafka, PostGIS)
  • Transforms: Data transformations (map, filter, window, join, etc.)
  • Sinks: Data outputs (files, S3, PostGIS, Kafka)
  • Pipeline: Fluent API for composing ETL workflows
  • Stream: Async stream processing with backpressure
  • Scheduler: Task scheduling and execution
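
To make the relationship between these components concrete, here is a minimal synchronous sketch of a source feeding transforms feeding a sink. All traits and types in it (`Source`, `Transform`, `Sink`, `VecSource`, `Uppercase`, `NonEmpty`, `VecSink`, `run_pipeline`) are hypothetical stand-ins written for this illustration; the crate's real traits are async and their signatures are not shown on this page.

```rust
/// Items are raw bytes in this sketch.
type Item = Vec<u8>;

// Hypothetical, synchronous stand-ins for the crate's components.
trait Source {
    fn next_item(&mut self) -> Option<Item>;
}
trait Transform {
    /// Returns `None` to drop the item (a filter), `Some` to pass it on.
    fn apply(&self, item: Item) -> Option<Item>;
}
trait Sink {
    fn write(&mut self, item: Item);
}

struct VecSource {
    items: std::vec::IntoIter<Item>,
}
impl Source for VecSource {
    fn next_item(&mut self) -> Option<Item> {
        self.items.next()
    }
}

/// A map-style transform: ASCII-uppercases every item.
struct Uppercase;
impl Transform for Uppercase {
    fn apply(&self, item: Item) -> Option<Item> {
        Some(item.to_ascii_uppercase())
    }
}

/// A filter-style transform: drops empty items.
struct NonEmpty;
impl Transform for NonEmpty {
    fn apply(&self, item: Item) -> Option<Item> {
        if item.is_empty() { None } else { Some(item) }
    }
}

struct VecSink {
    items: Vec<Item>,
}
impl Sink for VecSink {
    fn write(&mut self, item: Item) {
        self.items.push(item);
    }
}

/// Pulls every item from the source, applies the transforms in order,
/// and writes survivors to the sink. Returns the number of items written.
fn run_pipeline(
    source: &mut dyn Source,
    transforms: &[Box<dyn Transform>],
    sink: &mut dyn Sink,
) -> usize {
    let mut written = 0;
    'items: while let Some(mut item) = source.next_item() {
        for t in transforms {
            match t.apply(item) {
                Some(next) => item = next,
                None => continue 'items, // filtered out
            }
        }
        sink.write(item);
        written += 1;
    }
    written
}
```

In this model the pipeline is only the loop in `run_pipeline`; scheduling, streaming, and backpressure are left out to keep the data flow visible.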

§Example

use oxigdal_etl::*;
use oxigdal_etl::source::FileSource;
use oxigdal_etl::sink::FileSink;
use oxigdal_etl::error::TransformError;
use std::path::PathBuf;

// `?` and `.await` need an async function that returns a Result.
// Assumes tokio is a dependency with its macros and runtime features enabled.
#[tokio::main]
async fn main() -> Result<()> {
    // Build ETL pipeline
    let pipeline = Pipeline::builder()
        .source(Box::new(FileSource::new(PathBuf::from("input.json"))))
        // Map: decode each item as UTF-8 and uppercase it
        .map("uppercase".to_string(), |item| {
            Box::pin(async move {
                let s = String::from_utf8(item).map_err(|e| {
                    TransformError::InvalidInput {
                        message: e.to_string(),
                    }
                })?;
                Ok(s.to_uppercase().into_bytes())
            })
        })
        // Filter: keep only non-empty items
        .filter("non_empty".to_string(), |item| {
            let is_empty = item.is_empty();
            Box::pin(async move { Ok(!is_empty) })
        })
        .sink(Box::new(FileSink::new(PathBuf::from("output.json"))))
        .with_checkpointing()
        .buffer_size(1000)
        .build()?;

    // Execute pipeline
    let stats = pipeline.run().await?;
    println!("Processed {} items", stats.items_processed());
    Ok(())
}
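
The logic inside the two closures above can be checked in isolation, without building a pipeline. The sketch below restates it as plain synchronous functions; `uppercase_item` and `is_non_empty` are hypothetical names, and a `String` error stands in for `TransformError::InvalidInput` so the block depends only on `std`.

```rust
/// Synchronous restatement of the "uppercase" step: rejects input that is
/// not valid UTF-8, otherwise returns the uppercased bytes.
/// A `String` error stands in for the crate's `TransformError` here.
fn uppercase_item(item: Vec<u8>) -> Result<Vec<u8>, String> {
    let s = String::from_utf8(item).map_err(|e| e.to_string())?;
    Ok(s.to_uppercase().into_bytes())
}

/// Synchronous restatement of the "non_empty" filter predicate.
fn is_non_empty(item: &[u8]) -> bool {
    !item.is_empty()
}
```

Because the filter runs after the map in the example, an empty input item passes through the uppercase step unchanged and is then dropped by the filter, while an item that is not valid UTF-8 produces an error at the map step.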

§Feature Flags

  • std (default): Enable standard library support
  • kafka: Enable Kafka source and sink
  • postgres: Enable PostgreSQL/PostGIS support
  • s3: Enable Amazon S3 support
  • stac: Enable STAC catalog support
  • http: Enable HTTP source support
  • scheduler: Enable cron-based scheduling
  • all: Enable all optional features

Re-exports§

pub use error::EtlError;
pub use error::Result;
pub use pipeline::ExecutionMode;
pub use pipeline::Pipeline;
pub use pipeline::PipelineBuilder;
pub use pipeline::PipelineConfig;
pub use pipeline::PipelineStats;
pub use scheduler::Schedule;
pub use scheduler::Scheduler;
pub use scheduler::TaskConfig;
pub use scheduler::TaskResult;
pub use sink::Sink;
pub use source::Source;
pub use stream::BoxStream;
pub use stream::BufferedStream;
pub use stream::ParallelProcessor;
pub use stream::StateManager;
pub use stream::StreamConfig;
pub use stream::StreamItem;
pub use stream::StreamProcessor;
pub use transform::Transform;

Modules§

error
Error types for OxiGDAL ETL operations
operators
Transformation operators for ETL pipelines
pipeline
Pipeline builder for constructing ETL workflows
prelude
Prelude module for convenient imports
scheduler
Task scheduling for ETL pipelines
sink
Data sink implementations for ETL pipelines
source
Data source implementations for ETL pipelines
stream
Stream processing primitives for ETL pipelines
transform
Transformation operators for ETL pipelines

Constants§

CRATE_NAME
Crate name
VERSION
Version information