Crate aqueducts

§Aqueducts - Data Pipeline Framework

Aqueducts is a declarative framework for building ETL (Extract, Transform, Load) data pipelines. It allows you to define complex data processing workflows using configuration files in JSON, YAML, or TOML formats.

§Features

This crate provides a unified interface to all Aqueducts functionality through feature flags:

§Format Support

  • json - Enable JSON configuration file support
  • toml - Enable TOML configuration file support
  • yaml - Enable YAML configuration file support (enabled by default)

§Cloud Storage Providers

  • s3 - Amazon S3 and S3-compatible storage support (enabled by default)
  • gcs - Google Cloud Storage support (enabled by default)
  • azure - Azure Blob Storage support (enabled by default)

§Database Connectivity

  • odbc - ODBC database connectivity for sources and destinations
  • delta - Delta Lake table support for advanced analytics workloads

§Development Features

  • schema_gen - JSON schema generation for configuration validation (see the sketch after this list)
  • protocol - WebSocket protocol support for distributed execution
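
With the schema_gen feature enabled, a JSON schema for the pipeline configuration can be generated and used for editor completion or CI validation. The sketch below assumes the feature exposes a schemars::JsonSchema implementation on Aqueduct; the exact mechanism is an assumption, not confirmed by this page, and schemars/serde_json are separate dependencies:

use aqueducts::prelude::*;

fn main() {
    // Assumption: with the schema_gen feature, Aqueduct implements schemars::JsonSchema,
    // so the standard schemars macro can produce a schema for validating config files.
    let schema = schemars::schema_for!(Aqueduct);
    println!("{}", serde_json::to_string_pretty(&schema).expect("schema serializes to JSON"));
}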

§Quick Start

use aqueducts::prelude::*;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    // Load pipeline configuration
    let pipeline = Aqueduct::from_file("pipeline.yml", Default::default())?;
     
    // Create DataFusion context
    let ctx = Arc::new(SessionContext::new());
     
    // Execute pipeline
    let _result_ctx = run_pipeline(ctx, pipeline, None).await?;
     
    Ok(())
}
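
run_pipeline also accepts an optional progress tracker as its third argument (passed as None above). Below is a minimal sketch wiring in the bundled LoggingProgressTracker instead; it assumes the tracker can be constructed with Default::default() and that the parameter takes an Option<Arc<dyn ProgressTracker>>, neither of which is confirmed by this page:

use aqueducts::prelude::*;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    let pipeline = Aqueduct::from_file("pipeline.yml", Default::default())?;
    let ctx = Arc::new(SessionContext::new());

    // Assumption: LoggingProgressTracker implements Default and run_pipeline's
    // third parameter accepts Option<Arc<dyn ProgressTracker>>.
    let tracker: Arc<dyn ProgressTracker> = Arc::new(LoggingProgressTracker::default());
    let _result_ctx = run_pipeline(ctx, pipeline, Some(tracker)).await?;

    Ok(())
}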

§Configuration Examples

§Basic File Processing Pipeline

version: "v2"
sources:
  - type: file
    name: sales_data
    format:
      type: csv
      options:
        has_header: true
        delimiter: ","
    location: "s3://my-bucket/sales.csv"

stages:
  - - name: process_sales
      query: |
        SELECT
          product_id,
          SUM(quantity) as total_quantity,
          SUM(amount) as total_amount
        FROM sales_data
        GROUP BY product_id

destination:
  type: file
  name: processed_sales
  format:
    type: parquet
    options: {}
  location: "s3://my-bucket/processed/output.parquet"

§Working with Delta Tables

version: "v2"
sources:
  - type: delta
    name: events
    location: "s3://data-lake/events/"
    storage_config: {}
       
stages:
  - - name: daily_summary
      query: |
        SELECT
          DATE(timestamp) as date,
          event_type,
          COUNT(*) as event_count
        FROM events
        WHERE DATE(timestamp) = CURRENT_DATE
        GROUP BY DATE(timestamp), event_type

destination:
  type: delta
  name: daily_metrics
  location: "s3://data-lake/metrics/"
  storage_config: {}
  table_properties: {}
  write_mode:
    operation: append

§Template Parameters

Aqueducts supports template parameters in configuration files using ${parameter_name} syntax. This allows you to create reusable pipeline configurations that can be customized at runtime.

§Example: Parameterized Pipeline

# pipeline-template.yml
version: "v2"
sources:
  - type: file
    name: input_data
    format:
      type: csv
      options:
        has_header: true
    location: "${input_path}"

stages:
  - - name: filter_data
      query: |
        SELECT *
        FROM input_data
        WHERE date >= '${start_date}'
          AND region = '${target_region}'

destination:
  type: file
  name: filtered_output
  format:
    type: parquet
  location: "${output_path}"

§Loading Templates with Parameters

use aqueducts::prelude::*;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<()> {
    // Define template parameters
    let mut params = HashMap::new();
    params.insert("input_path".to_string(), "s3://data/sales-2024.csv".to_string());
    params.insert("start_date".to_string(), "2024-01-01".to_string());
    params.insert("target_region".to_string(), "US".to_string());
    params.insert("output_path".to_string(), "s3://results/us-sales.parquet".to_string());
     
    // Load templated pipeline
    let pipeline = Aqueduct::from_file("pipeline-template.yml", params)?;
    let ctx = std::sync::Arc::new(datafusion::prelude::SessionContext::new());
     
    run_pipeline(ctx, pipeline, None).await?;
    Ok(())
}

§Loading from Strings

use aqueducts::prelude::*;
use std::collections::HashMap;

fn main() -> Result<()> {
    let template = r#"
version: "v2"
sources:
  - type: file
    name: logs
    format: { type: json }
    location: "${log_path}"
stages:
  - - name: filter
      query: "SELECT * FROM logs WHERE level = '${log_level}'"
destination:
  type: file
  name: output
  format: { type: parquet }
  location: "${output_path}"
"#;

    let mut params = HashMap::new();
    params.insert("log_path".to_string(), "/var/log/app.jsonl".to_string());
    params.insert("log_level".to_string(), "ERROR".to_string());
    params.insert("output_path".to_string(), "/tmp/errors.parquet".to_string());

    let _pipeline = Aqueduct::from_str(template, TemplateFormat::Yaml, params)?;
    Ok(())
}

§Feature Flag Guide

When using Aqueducts in your Cargo.toml, enable only the features you need:

[dependencies]
# Minimal setup with just local file processing
aqueducts = { version = "0.10", default-features = false, features = ["yaml"] }

# Cloud data processing with S3 and Delta Lake
aqueducts = { version = "0.10", features = ["yaml", "s3", "delta"] }

# Full-featured setup with all storage providers and formats
aqueducts = { version = "0.10", features = ["json", "toml", "yaml", "s3", "gcs", "azure", "odbc", "delta"] }

§Error Handling

All operations return semantic errors through the unified AqueductsError type:

use aqueducts::prelude::*;
use datafusion::prelude::SessionContext;
use std::sync::Arc;

async fn example() -> Result<()> {
    let pipeline = Aqueduct::from_file("pipeline.yml", Default::default())?;
    let ctx = Arc::new(SessionContext::new());
     
    match run_pipeline(ctx, pipeline, None).await {
        Ok(_result) => println!("Pipeline executed successfully"),
        Err(AqueductsError::Source { name, message }) => {
            eprintln!("Source '{}' failed: {}", name, message);
        }
        Err(AqueductsError::SchemaValidation { message }) => {
            eprintln!("Schema validation error: {}", message);
        }
        Err(err) => eprintln!("Pipeline error: {}", err),
    }
    Ok(())
}

Modules§

prelude
Prelude module for convenient imports

Structs§

LoggingProgressTracker
A simple progress tracker that logs progress events and stage output using the tracing crate.

Enums§

AqueductsError
The unified error type returned by all Aqueducts operations.
TemplateFormat
Serialization format of the Aqueduct pipeline configuration.

Traits§

ProgressTracker
A trait for handling progress events and stage output during pipeline execution.
TemplateLoader
A trait for loading Aqueduct pipeline configurations from various sources with parameter substitution.

Functions§

run_pipeline
Execute an Aqueducts data pipeline.

Type Aliases§

Result
Convenience result type for operations that can fail with an AqueductsError.