prestige 0.3.2

Prestige file reading and writing utilities and tools
Documentation

Prestige

A high-performance Rust library for working with Parquet files and S3 storage, built on Apache Arrow. Prestige provides a complete toolkit for streaming data to/from Parquet format with automatic batching, file rotation, and S3 integration — plus optional Apache Iceberg table format support for catalog-managed lakehouse workloads.

Side note: the name "Prestige" is a reference to the "PrestoDB" query engine (since rebranded "Trino") for providing a relational SQL interface to columnar data files, including Parquet, in S3-compatible block storage.

Features

  • Type-safe Parquet I/O: Derive macros for automatic schema generation and serialization
  • Streaming Architecture: Process large datasets without loading everything into memory
  • Automatic File Rotation: Configure rotation based on row count, byte size, or time intervals
  • S3 Integration: Native support for reading from and writing to S3
  • Crash Recovery: Automatic recovery and cleanup of incomplete files
  • File Monitoring: Poll S3 buckets for new files with configurable lookback
  • Batching & Buffering: Configurable batch sizes for optimal performance
  • Metrics Support: Built-in metrics via metrics or opentelemetry crates
  • Apache Iceberg: Catalog-managed tables with streaming writes, incremental reads, time travel, and automatic compaction

Architecture

Prestige is organized into several key components:

ParquetSink

A managed actor that writes Rust types to Parquet files with automatic batching and rotation.

use prestige::ParquetSinkBuilder;

#[prestige::prestige_schema]
#[derive(Debug, Clone)]
struct SensorData {
    timestamp: u64,
    sensor_id: String,
    temperature: f32,
}

// Create a sink with file rotation
let (client, sink) = ParquetSinkBuilder::<SensorData>::new(
    "sensor_data",
    output_dir,
    file_upload,
    "sensor_metrics",
)
.batch_size(1000)           // Buffer 1000 records before writing
.max_rows(100_000)          // Rotate after 100k rows
.rotation_interval(3600)    // Or rotate every hour
.auto_commit(true)          // Auto-upload completed files
.create()
.await?;

// Write records
client.write(sensor_data, &[]).await?;

// Commit to finalize and get file manifest
let manifest = client.commit().await?;

File Source

Stream Parquet files from local filesystem or S3 as Arrow RecordBatch.

use prestige::file_source;
use futures::StreamExt;

// Read from local files
let paths = vec!["data/file1.parquet", "data/file2.parquet"];
let mut stream = file_source::source(paths, None, None);

while let Some(batch) = stream.next().await {
    let batch = batch?;
    // Process RecordBatch
}

// Read from S3
let client = prestige::new_client(None, None, None, None).await;
let metas = prestige::list_files(&client, "my-bucket", "sensor_data/", None, None);
let mut stream = file_source::source_s3_files(&client, "my-bucket", metas, None, None);

File Upload

Managed service for uploading files to S3 with automatic retries and metrics.

use prestige::FileUpload;

let client = prestige::new_client(None, None, None, None).await;
let (uploader, server) = FileUpload::new(client, "my-bucket".to_string()).await;

// Upload returns immediately, actual upload happens in background
uploader.upload(file_path).await?;

File Poller

Monitor S3 buckets for new files with configurable polling intervals and state tracking.

use prestige::{FilePollerConfigBuilder, LookbackBehavior};
use chrono::Duration;

let config = FilePollerConfigBuilder::default()
    .bucket("my-bucket".to_string())
    .file_type("sensor_data/".to_string())
    .poll_interval(Duration::seconds(60))
    .lookback(LookbackBehavior::from_duration(Duration::hours(24)))
    .build()?;

let (poller, mut file_stream) = FilePollerServer::new(config, state_store).await?;

// Receive new files as they appear
while let Some(file_meta) = file_stream.recv().await {
    println!("New file: {}", file_meta.key);
}

Schema Attribute Macro

The #[prestige::prestige_schema] attribute macro automatically generates all necessary schema and serialization code. It also auto-injects Serialize and Deserialize derives if not already present.

#[prestige::prestige_schema]
#[derive(Debug, Clone)]
struct MyData {
    id: u64,
    name: String,
    value: f64,
    optional_field: Option<i32>,
    raw_bytes: [u8; 16],             // Default: List(UInt8) — structural representation
    #[prestige(as_binary)]
    binary_data: [u8; 16],           // Opt-in: FixedSizeBinary(16)
    #[prestige(as_binary)]
    payload: Vec<u8>,                // Opt-in: Binary
}

// Generated methods:
// - arrow_schema() -> Schema
// - from_arrow_records() -> Result<Vec<Self>>
// - to_arrow_arrays() -> Result<(Vec<Arc<Array>>, Schema)>
// - from_arrow_reader() / write_arrow_file() / write_arrow_stream()

Individual derive macros are also available for advanced use cases:

  • #[derive(ArrowGroup)] - Schema generation only
  • #[derive(ArrowReader)] - Reading from Arrow/Parquet (requires Deserialize)
  • #[derive(ArrowWriter)] - Writing to Arrow/Parquet (requires Serialize)

Apache Iceberg

Enable with the iceberg feature:

[dependencies]
prestige = { version = "0.3", features = ["iceberg"] }

Prestige targets Iceberg V2 and V3 table formats only. V1 tables are not supported.

Schema Definition with Iceberg Annotations

The prestige_schema macro supports iceberg-specific annotations for table name, namespace, partition spec, sort order, and identifier fields:

#[prestige::prestige_schema]
#[prestige(table = "sensor_readings", namespace = "telemetry")]
#[derive(Debug, Clone)]
struct SensorReading {
    #[prestige(identifier)]                    // Identifier field (used for dedup)
    sensor_id: String,

    #[prestige(partition(day), sort_key(order = 1))]  // Partition by day, sort first
    timestamp: i64,

    #[prestige(sort_key(order = 2))]           // Sort second (ascending by default)
    temperature: f64,

    humidity: Option<f64>,

    #[prestige(partition)]                     // Identity partition
    location: String,
}

Partition transforms: identity (default), year, month, day, hour, bucket(n), truncate(n)

Sort key options: sort_key (ascending), sort_key(desc), sort_key(order = N) for explicit ordering, sort_key(desc, order = N) for descending with order

Catalog Connection

Connect to an Iceberg REST catalog:

use prestige::iceberg::{CatalogConfigBuilder, connect_catalog};

let config = CatalogConfigBuilder::default()
    .catalog_uri("http://localhost:8181")
    .catalog_name("my_catalog")
    .warehouse("s3://my-warehouse")
    .s3(S3Config {
        region: "us-east-1".into(),
        endpoint: Some("http://localhost:9000".into()),  // MinIO/LocalStack
        access_key: Some("minioadmin".into()),
        secret_key: Some("minioadmin".into()),
    })
    .build()?;

let catalog = connect_catalog(&config).await?;

Streaming Writes (IcebergSink)

The iceberg sink provides pipelined writes to catalog-managed tables with crash recovery:

use prestige::iceberg::IcebergSinkBuilder;
use std::time::Duration;

let (client, sink) = IcebergSinkBuilder::<SensorReading>::for_type(
    catalog.clone(),
    "sensor_readings",
).await?
.max_rows(500_000)
.max_size_bytes(64 * 1024 * 1024)     // 64 MB
.roll_time(Duration::from_secs(60))    // Commit every 60s
.auto_commit(true)                     // Auto-commit on rotation
.manifest_dir("/tmp/prestige/manifests") // Enable crash recovery
.max_pending_commits(3)                // Pipeline up to 3 catalog commits
.create();

// Run the sink (typically via a supervisor)
tokio::spawn(sink.run(shutdown_signal));

// Write records — returns immediately, batching happens internally
client.write(reading).await?;

// Explicit commit returns file paths after all in-flight commits complete
let file_paths = client.commit().await?;

The sink pipelines S3 writes with catalog commits: while commit N is landing in the catalog, new data can be written to S3 for commit N+1. Per-commit manifest files on local disk ensure crash recovery — on restart, orphaned manifests are detected and re-committed.

Incremental Reads (IcebergPoller)

Stream new data as it arrives via incremental snapshot scanning:

use prestige::iceberg::IcebergPollerConfigBuilder;
use std::sync::Arc;

let (mut receiver, poller) = IcebergPollerConfigBuilder::new(
    table,
    Arc::new(catalog) as Arc<dyn iceberg::Catalog>,
    "sensor-consumer",
)
.poll_interval(Duration::from_secs(10))  // Check every 10s
.channel_size(5)                          // Buffer up to 5 snapshots
.send_timeout(Duration::from_secs(30))   // Backpressure timeout
.start_after_snapshot(last_checkpoint)    // Resume from checkpoint
.create();

// Run the poller
tokio::spawn(poller.run(shutdown_signal));

// Consume incremental data
while let Some(file_stream) = receiver.recv().await {
    println!("snapshot {} from {}", file_stream.snapshot_id, file_stream.table_name);

    // Process all batches in the stream
    let mut stream = file_stream.batches;
    while let Some(batch) = stream.try_next().await? {
        // Process RecordBatch...
    }

    // Acknowledge after processing — poller won't advance until acked
    file_stream.ack();
}

The poller is compaction-aware: when the compactor rewrites files, the incremental scan correctly filters out Replace snapshots to avoid double-delivering data that was already consumed.

Scan API — Windowed Reads and Time Travel

Prestige provides composable scan functions for arbitrary access patterns:

use prestige::iceberg::{
    scan_table, scan_snapshot, scan_since_snapshot, scan_snapshot_range,
    scan_at_timestamp, snapshot_at_timestamp, earliest_snapshot,
    scan_with_filter, scan_columns,
};

// Full table scan (current state)
let stream = scan_table(&table).await?;

// Point-in-time snapshot read
let stream = scan_snapshot(&table, snapshot_id).await?;

// Incremental: all data added after a checkpoint snapshot
let stream = scan_since_snapshot(&table, after_snapshot_id).await?;

// Windowed: data added between two arbitrary snapshots
let stream = scan_snapshot_range(&table, from_snapshot, Some(to_snapshot)).await?;

// Time travel: table state as of a timestamp (epoch millis)
let stream = scan_at_timestamp(&table, 1710547200000).await?;

// Resolve snapshots by timestamp
let snap_id = snapshot_at_timestamp(&table, timestamp_ms);

// Discover the earliest snapshot for full replay
let first = earliest_snapshot(&table);

// Predicate pushdown (partition pruning + row-group filtering)
use prestige::iceberg::{Reference, Datum};
let filter = Reference::new("temperature").greater_than(Datum::double(100.0));
let stream = scan_with_filter(&table, filter).await?;

// Column projection
let stream = scan_columns(&table, vec!["sensor_id", "temperature"]).await?;

Compaction

The compactor consolidates small files into larger, sorted files:

use prestige::iceberg::IcebergCompactorConfigBuilder;

let result = IcebergCompactorConfigBuilder::default()
    .table(table.clone())
    .catalog(catalog.clone())
    .target_file_size_bytes(128 * 1024 * 1024)  // 128 MB target
    .min_files_to_compact(5_usize)               // Compact when >= 5 files/partition
    .deduplicate(true)                            // Remove duplicate rows by identifier fields
    .compression(Compression::ZSTD(Default::default()))
    .build()?
    .execute()
    .await?;

println!(
    "Compacted {} files into {}, eliminated {} duplicates",
    result.files_read, result.files_written, result.duplicates_eliminated
);

Compaction produces Operation::Replace snapshots that atomically swap old files for new ones. Output files are sorted according to the table's sort order for optimal query performance. The compactor handles concurrent writes via optimistic concurrency with automatic retry, and correctly applies delete files so that deleted rows are not resurrected in compacted output.

Automatic Compaction Scheduling

For streaming workloads with frequent small commits, run compaction on a timer:

use prestige::iceberg::CompactionSchedulerBuilder;

let scheduler = CompactionSchedulerBuilder::new(
    table.clone(),
    catalog.clone(),
    "sensor_readings",
)
.interval(Duration::from_secs(300))       // Check every 5 minutes
.min_files_to_compact(5)                  // Threshold per partition
.target_file_size_bytes(128 * 1024 * 1024)
.deduplicate(true)
.build();

// Run as a managed process alongside sinks and pollers
tokio::spawn(scheduler.run(shutdown_signal));

The scheduler performs a lightweight file-count check per partition each cycle, only invoking the full compaction when thresholds are exceeded.

Write-Audit-Publish (WAP)

For workflows requiring validation before data becomes visible:

let (client, sink) = IcebergSinkBuilder::<SensorReading>::for_type(catalog, "sensor_readings")
    .await?
    .wap_enabled("audit-branch")   // Write to audit branch
    .auto_publish(false)           // Don't auto-publish to main
    .create();

// Write data to audit branch
client.write(reading).await?;
client.commit().await?;

// Validate, then publish to main
client.publish().await?;

Complete Iceberg Pipeline Example

use prestige::iceberg::{
    CatalogConfigBuilder, IcebergSinkBuilder, IcebergPollerConfigBuilder,
    CompactionSchedulerBuilder, connect_catalog,
};
use std::time::Duration;

#[prestige::prestige_schema]
#[prestige(table = "events", namespace = "analytics")]
#[derive(Debug, Clone)]
struct Event {
    #[prestige(identifier)]
    event_id: String,
    #[prestige(partition(day), sort_key(order = 1))]
    timestamp: i64,
    #[prestige(partition)]
    event_type: String,
    user_id: String,
    payload: String,
}

#[tokio::main]
async fn main() -> prestige::Result<()> {
    let catalog = connect_catalog(&config).await?;

    // --- Writer ---
    let (sink_client, sink) = IcebergSinkBuilder::<Event>::for_type(catalog.clone(), "events")
        .await?
        .auto_commit(true)
        .roll_time(Duration::from_secs(30))
        .manifest_dir("/tmp/prestige/manifests")
        .create();
    tokio::spawn(sink.run(shutdown.clone()));

    // --- Compaction scheduler ---
    let table = prestige::iceberg::load_table(&catalog, "analytics", "events").await?;
    let scheduler = CompactionSchedulerBuilder::new(table.clone(), catalog.clone(), "events")
        .interval(Duration::from_secs(120))
        .min_files_to_compact(3)
        .deduplicate(true)
        .build();
    tokio::spawn(scheduler.run(shutdown.clone()));

    // --- Consumer ---
    let (mut rx, poller) = IcebergPollerConfigBuilder::new(
        table,
        catalog.as_iceberg_catalog(),
        "events-consumer",
    )
    .poll_interval(Duration::from_secs(5))
    .create();
    tokio::spawn(poller.run(shutdown.clone()));

    // Write events
    sink_client.write(Event { /* ... */ }).await?;

    // Consume events
    while let Some(stream) = rx.recv().await {
        // Process stream.batches...
        stream.ack();
    }

    Ok(())
}

S3 Configuration

Prestige uses the AWS SDK for S3 operations. Configure credentials using standard AWS methods:

# Environment variables
export AWS_REGION=us-east-1
export AWS_ACCESS_KEY_ID=your_key
export AWS_SECRET_ACCESS_KEY=your_secret

# Or use AWS profiles
export AWS_PROFILE=my-profile

For local testing with LocalStack:

let client = prestige::new_client(
    Some("us-east-1".to_string()),
    Some("http://localhost:4566".to_string()),  // LocalStack endpoint
    Some("test".to_string()),
    Some("test".to_string()),
).await;

Crash Recovery

ParquetSink includes automatic crash recovery:

  • Auto-commit enabled: Incomplete files are moved to .incomplete directory on restart
  • Auto-commit disabled: Incomplete files are deleted on restart
  • Completed files: Automatically re-uploaded if found in output directory

IcebergSink uses per-commit manifest files for crash recovery:

  • Data file paths are recorded to local disk before each catalog commit
  • On restart, orphaned manifests are discovered and their data files re-committed
  • Files already committed are detected and manifests cleaned up automatically

Optional Features

Enable additional functionality via Cargo features:

[dependencies]
prestige = { version = "0.3", features = ["iceberg"] }
  • chrono (default): Support for chrono::DateTime types
  • decimal: Support for rust_decimal::Decimal types
  • iceberg: Apache Iceberg table format support (REST catalog, streaming sink, poller, compactor, scanner)
  • iceberg-test-harness: Test utilities for iceberg integration tests
  • sqlx: Enable SQLx integration
  • sqlx-postgres: PostgreSQL support via SQLx
  • metrics: Instrument with performance metrics compatible with the metrics crate
  • opentelemetry: Instrument with performance metrics compatible with the opentelemetry crate

Metrics Support

Prestige supports optional metrics collection via two backends:

Using metrics-rs

[dependencies]
prestige = { version = "0.3", features = ["metrics"] }
metrics-exporter-prometheus = "0.16" # or your preferred exporter library

Using OpenTelemetry

[dependencies]
prestige = { version = "0.3", features = ["opentelemetry"] }
opentelemetry = { version = "0.31", features = ["metrics"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.31", features = ["metrics", "grpc-tonic"] }

Disabling Metrics

To compile without any metrics overhead, simply don't enable either feature:

[dependencies]
prestige = "0.3"  # No features = no-op metrics impl

Performance Considerations

  • Batch Size: Larger batches reduce overhead but increase memory usage (default: 8192 for reading, configurable for writing)
  • File Rotation: Balance between number of files and file size (default: no rotation)
  • Buffering: File source reads up to 2 files concurrently by default
  • Parallel S3 Reads: Use source_s3_files_unordered() for maximum throughput when order doesn't matter
  • Iceberg Commit Pipeline: The sink pipelines S3 writes with catalog commits — tune max_pending_commits based on commit latency vs memory
  • Compaction Interval: More frequent compaction keeps file counts low for query performance, but each compaction cycle has I/O cost

License

Licensed under either of:

at your option.