# Prestige
A high-performance Rust library for working with Parquet files and S3 storage, built on Apache Arrow. Prestige provides a complete toolkit for streaming data to/from Parquet format with automatic batching, file rotation, and S3 integration — plus optional Apache Iceberg table format support for catalog-managed lakehouse workloads.
Side note: the name "Prestige" is a nod to the PrestoDB query engine (since rebranded as Trino), which provides a relational SQL interface over columnar data files, including Parquet, in S3-compatible object storage.
## Features
- Type-safe Parquet I/O: Derive macros for automatic schema generation and serialization
- Streaming Architecture: Process large datasets without loading everything into memory
- Automatic File Rotation: Configure rotation based on row count, byte size, or time intervals
- S3 Integration: Native support for reading from and writing to S3
- Crash Recovery: Automatic recovery and cleanup of incomplete files
- File Monitoring: Poll S3 buckets for new files with configurable lookback
- Batching & Buffering: Configurable batch sizes for optimal performance
- Metrics Support: Built-in metrics via `metrics` or `opentelemetry` crates
- Apache Iceberg: Catalog-managed tables with streaming writes, incremental reads, time travel, and automatic compaction
## Architecture
Prestige is organized into several key components:
### ParquetSink
A managed actor that writes Rust types to Parquet files with automatic batching and rotation.
```rust
use prestige::ParquetSinkBuilder;
use std::time::Duration;

// `MyRecord` is any #[prestige::prestige_schema]-annotated type;
// the output directory and the builder values shown are illustrative.
let (client, sink) = ParquetSinkBuilder::<MyRecord>::new("output")
    .batch_size(1_000)                            // Buffer 1000 records before writing
    .max_rows(100_000)                            // Rotate after 100k rows
    .rotation_interval(Duration::from_secs(3600)) // Or rotate every hour
    .auto_commit(true)                            // Auto-upload completed files
    .create()
    .await?;

// Write records
client.write(record).await?;

// Commit to finalize and get file manifest
let manifest = client.commit().await?;
```
### File Source
Stream Parquet files from local filesystem or S3 as Arrow RecordBatch.
```rust
use prestige::file_source::{list_files, source, source_s3_files};
use futures::StreamExt;

// Read from local files (paths are illustrative)
let paths = vec!["data/part-0000.parquet", "data/part-0001.parquet"];
let mut stream = source(paths);
while let Some(batch) = stream.next().await {
    // process each Arrow RecordBatch
}

// Read from S3 (bucket/prefix argument shapes are illustrative)
let client = prestige::new_client().await;
let metas = list_files(&client, "my-bucket", "prefix/").await?;
let mut stream = source_s3_files(&client, metas);
```
### File Upload
Managed service for uploading files to S3 with automatic retries and metrics.
```rust
use prestige::FileUpload;

// Bucket name and argument shapes are illustrative.
let client = prestige::new_client().await;
let (uploader, task) = FileUpload::new(client, "my-bucket").await;

// Upload returns immediately, actual upload happens in background
uploader.upload("output/data-0001.parquet").await?;
```
### File Poller
Monitor S3 buckets for new files with configurable polling intervals and state tracking.
```rust
// Type names and values below are reconstructed for illustration.
use prestige::{FilePoller, FilePollerConfig};
use std::time::Duration;

let config = FilePollerConfig::default()
    .bucket("my-bucket")
    .file_type("parquet")
    .poll_interval(Duration::from_secs(30))
    .lookback(Duration::from_secs(3600))
    .build()?;

let (poller, mut file_stream) = FilePoller::new(config).await?;

// Receive new files as they appear
while let Some(file) = file_stream.recv().await {
    // process the newly discovered file
}
```
### Schema Attribute Macro

The `#[prestige::prestige_schema]` attribute macro automatically generates all necessary schema and serialization code. It also auto-injects `Serialize` and `Deserialize` derives if not already present.
```rust
// Generated methods:
// - arrow_schema() -> Schema
// - from_arrow_records() -> Result<Vec<Self>>
// - to_arrow_arrays() -> Result<(Vec<Arc<dyn Array>>, Schema)>
// - from_arrow_reader() / write_arrow_file() / write_arrow_stream()
```
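As a sketch, annotating a plain struct (the type and field names here are illustrative, not from the crate's docs):

```rust
// Hypothetical record type; the macro generates the Arrow schema and
// (de)serialization code, injecting Serialize/Deserialize if absent.
#[prestige::prestige_schema]
struct Trade {
    symbol: String,
    price: f64,
    ts: i64,
}

// Generated entry points can then be used directly, e.g.:
// let schema = Trade::arrow_schema();
```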
Individual derive macros are also available for advanced use cases:
- `#[derive(ArrowGroup)]` - Schema generation only
- `#[derive(ArrowReader)]` - Reading from Arrow/Parquet (requires `Deserialize`)
- `#[derive(ArrowWriter)]` - Writing to Arrow/Parquet (requires `Serialize`)
## Apache Iceberg

Enable with the `iceberg` feature:

```toml
[dependencies]
prestige = { version = "0.3", features = ["iceberg"] }
```
Prestige targets Iceberg V2 and V3 table formats only. V1 tables are not supported.
### Schema Definition with Iceberg Annotations

The `prestige_schema` macro supports Iceberg-specific annotations for table name, namespace, partition spec, sort order, and identifier fields:

- Partition transforms: `identity` (default), `year`, `month`, `day`, `hour`, `bucket(n)`, `truncate(n)`
- Sort key options: `sort_key` (ascending), `sort_key(desc)`, `sort_key(order = N)` for explicit ordering, `sort_key(desc, order = N)` for descending with order
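The exact attribute syntax is not shown in this README, so the spellings below (`table`, `namespace`, `partition`, `sort_key`, `id`) are assumptions based on the options listed above; a sketch of an annotated type might look like:

```rust
// Hypothetical annotation syntax, for illustration only.
#[prestige::prestige_schema(table = "trades", namespace = "market")]
struct Trade {
    #[iceberg(id)]                  // identifier field (assumed spelling)
    trade_id: i64,
    #[iceberg(partition = "day")]   // day partition transform
    ts: i64,
    #[iceberg(sort_key(order = 1))] // primary sort key, ascending
    symbol: String,
    price: f64,
}
```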
### Catalog Connection
Connect to an Iceberg REST catalog:
```rust
// Type names and endpoint values are reconstructed for illustration.
use prestige::iceberg::{connect_catalog, IcebergConfigBuilder};

let config = IcebergConfigBuilder::default()
    .catalog_uri("http://localhost:8181")
    .catalog_name("rest")
    .warehouse("s3://my-warehouse")
    .s3(s3_client)
    .build()?;

let catalog = connect_catalog(&config).await?;
```
### Streaming Writes (IcebergSink)

The Iceberg sink provides pipelined writes to catalog-managed tables with crash recovery:
```rust
use prestige::iceberg::IcebergSinkBuilder;
use std::time::Duration;

// `Trade` is a #[prestige_schema]-annotated type; values are illustrative.
let (client, sink) = IcebergSinkBuilder::for_type::<Trade>(&catalog).await?
    .max_rows(100_000)
    .max_size_bytes(64 * 1024 * 1024)    // 64 MB
    .roll_time(Duration::from_secs(60))  // Commit every 60s
    .auto_commit(true)                   // Auto-commit on rotation
    .manifest_dir("/var/lib/prestige")   // Enable crash recovery
    .max_pending_commits(3)              // Pipeline up to 3 catalog commits
    .create();

// Run the sink (typically via a supervisor)
tokio::spawn(sink.run());

// Write records — returns immediately, batching happens internally
client.write(record).await?;

// Explicit commit returns file paths after all in-flight commits complete
let file_paths = client.commit().await?;
```
The sink pipelines S3 writes with catalog commits: while commit N is landing in the catalog, new data can be written to S3 for commit N+1. Per-commit manifest files on local disk ensure crash recovery — on restart, orphaned manifests are detected and re-committed.
### Incremental Reads (IcebergPoller)
Stream new data as it arrives via incremental snapshot scanning:
```rust
use prestige::iceberg::IcebergPollerConfigBuilder;
use std::sync::Arc;
use std::time::Duration;

// Argument shapes and values are illustrative.
let (poller, mut receiver) = IcebergPollerConfigBuilder::new(Arc::clone(&catalog), "trades")
    .poll_interval(Duration::from_secs(10)) // Check every 10s
    .channel_size(5)                        // Buffer up to 5 snapshots
    .send_timeout(Duration::from_secs(30))  // Backpressure timeout
    .start_after_snapshot(snapshot_id)      // Resume from checkpoint
    .create();

// Run the poller
tokio::spawn(poller.run());

// Consume incremental data
while let Some(batch) = receiver.recv().await {
    // process each incremental RecordBatch
}
```
The poller is compaction-aware: when the compactor rewrites files, the incremental scan correctly filters out Replace snapshots to avoid double-delivering data that was already consumed.
### Scan API — Windowed Reads and Time Travel
Prestige provides composable scan functions for arbitrary access patterns:
```rust
// Function names come from the crate; argument shapes are illustrative.
use prestige::iceberg::scan::{
    earliest_snapshot, scan_at_timestamp, scan_columns, scan_since_snapshot,
    scan_snapshot, scan_snapshot_range, scan_table, scan_with_filter,
    snapshot_at_timestamp,
};

// Full table scan (current state)
let stream = scan_table(&catalog, "trades").await?;

// Point-in-time snapshot read
let stream = scan_snapshot(&catalog, "trades", snapshot_id).await?;

// Incremental: all data added after a checkpoint snapshot
let stream = scan_since_snapshot(&catalog, "trades", checkpoint_id).await?;

// Windowed: data added between two arbitrary snapshots
let stream = scan_snapshot_range(&catalog, "trades", from_id, to_id).await?;

// Time travel: table state as of a timestamp (epoch millis)
let stream = scan_at_timestamp(&catalog, "trades", ts_millis).await?;

// Resolve snapshots by timestamp
let snap_id = snapshot_at_timestamp(&table, ts_millis);

// Discover the earliest snapshot for full replay
let first = earliest_snapshot(&table);

// Predicate pushdown (partition pruning + row-group filtering)
use prestige::iceberg::scan::Predicate; // predicate type name is illustrative
let filter = Predicate::new("price").greater_than(100.0);
let stream = scan_with_filter(&catalog, "trades", filter).await?;

// Column projection
let stream = scan_columns(&catalog, "trades", &["symbol", "price"]).await?;
```
### Compaction
The compactor consolidates small files into larger, sorted files:
```rust
use prestige::iceberg::IcebergCompactorConfigBuilder;

// Builder values are illustrative.
let result = IcebergCompactorConfigBuilder::default()
    .table("trades")
    .catalog(catalog.clone())
    .target_file_size_bytes(128 * 1024 * 1024) // 128 MB target
    .min_files_to_compact(5)                   // Compact when >= 5 files/partition
    .deduplicate(true)                         // Remove duplicate rows by identifier fields
    .compression("zstd")
    .build()?
    .execute()
    .await?;

println!("compaction result: {result:?}");
```
Compaction produces Operation::Replace snapshots that atomically swap old files for new ones. Output files are sorted according to the table's sort order for optimal query performance. The compactor handles concurrent writes via optimistic concurrency with automatic retry, and correctly applies delete files so that deleted rows are not resurrected in compacted output.
### Automatic Compaction Scheduling
For streaming workloads with frequent small commits, run compaction on a timer:
```rust
use prestige::iceberg::CompactionSchedulerBuilder;
use std::time::Duration;

// Constructor arguments and values are illustrative.
let scheduler = CompactionSchedulerBuilder::new(catalog.clone(), "trades")
    .interval(Duration::from_secs(300)) // Check every 5 minutes
    .min_files_to_compact(5)            // Threshold per partition
    .target_file_size_bytes(128 * 1024 * 1024)
    .deduplicate(true)
    .build();

// Run as a managed process alongside sinks and pollers
tokio::spawn(scheduler.run());
```
The scheduler performs a lightweight file-count check per partition each cycle, only invoking the full compaction when thresholds are exceeded.
### Write-Audit-Publish (WAP)
For workflows requiring validation before data becomes visible:
```rust
// `Trade` is a #[prestige_schema]-annotated type.
let (client, sink) = IcebergSinkBuilder::for_type::<Trade>(&catalog)
    .await?
    .wap_enabled(true)   // Write to audit branch
    .auto_publish(false) // Don't auto-publish to main
    .create();

// Write data to audit branch
client.write(record).await?;
client.commit().await?;

// Validate, then publish to main
client.publish().await?;
```
### Complete Iceberg Pipeline Example

```rust
use prestige::iceberg::{connect_catalog, IcebergConfigBuilder, IcebergSinkBuilder};
use std::time::Duration;

// End-to-end sketch: connect to the catalog, stream writes, commit.
// Identifiers such as `Trade` and the endpoint values are illustrative.
async fn run_pipeline() -> anyhow::Result<()> {
    let config = IcebergConfigBuilder::default()
        .catalog_uri("http://localhost:8181")
        .warehouse("s3://my-warehouse")
        .build()?;
    let catalog = connect_catalog(&config).await?;

    let (client, sink) = IcebergSinkBuilder::for_type::<Trade>(&catalog).await?
        .roll_time(Duration::from_secs(60))
        .auto_commit(true)
        .create();
    tokio::spawn(sink.run());

    // Write records, then commit to make them visible in the table
    client.write(record).await?;
    client.commit().await?;
    Ok(())
}
```
## S3 Configuration

Prestige uses the AWS SDK for S3 operations. Configure credentials using standard AWS methods:

```shell
# Environment variables
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=us-east-1

# Or use AWS profiles
export AWS_PROFILE=my-profile
```

For local testing with LocalStack:

```rust
// Point the AWS SDK at LocalStack, e.g. via AWS_ENDPOINT_URL=http://localhost:4566
let client = new_client().await;
```
## Crash Recovery
ParquetSink includes automatic crash recovery:
- Auto-commit enabled: Incomplete files are moved to an `.incomplete` directory on restart
- Auto-commit disabled: Incomplete files are deleted on restart
- Completed files: Automatically re-uploaded if found in output directory
IcebergSink uses per-commit manifest files for crash recovery:
- Data file paths are recorded to local disk before each catalog commit
- On restart, orphaned manifests are discovered and their data files re-committed
- Files already committed are detected and manifests cleaned up automatically
## Optional Features

Enable additional functionality via Cargo features:

```toml
[dependencies]
prestige = { version = "0.3", features = ["iceberg"] }
```

- `chrono` (default): Support for `chrono::DateTime` types
- `decimal`: Support for `rust_decimal::Decimal` types
- `iceberg`: Apache Iceberg table format support (REST catalog, streaming sink, poller, compactor, scanner)
- `iceberg-test-harness`: Test utilities for Iceberg integration tests
- `sqlx`: Enable SQLx integration
- `sqlx-postgres`: PostgreSQL support via SQLx
- `metrics`: Instrument with performance metrics compatible with the `metrics` crate
- `opentelemetry`: Instrument with performance metrics compatible with the `opentelemetry` crate
## Metrics Support
Prestige supports optional metrics collection via two backends:
### Using metrics-rs

```toml
[dependencies]
prestige = { version = "0.3", features = ["metrics"] }
metrics-exporter-prometheus = "0.16" # or your preferred exporter library
```
### Using OpenTelemetry

```toml
[dependencies]
prestige = { version = "0.3", features = ["opentelemetry"] }
opentelemetry = { version = "0.31", features = ["metrics"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "metrics"] }
opentelemetry-otlp = { version = "0.31", features = ["metrics", "grpc-tonic"] }
```
### Disabling Metrics

To compile without any metrics overhead, simply don't enable either feature:

```toml
[dependencies]
prestige = "0.3" # No metrics features = no-op metrics impl
```
## Performance Considerations

- Batch Size: Larger batches reduce overhead but increase memory usage (default: 8192 for reading, configurable for writing)
- File Rotation: Balance between number of files and file size (default: no rotation)
- Buffering: File source reads up to 2 files concurrently by default
- Parallel S3 Reads: Use `source_s3_files_unordered()` for maximum throughput when order doesn't matter
- Iceberg Commit Pipeline: The sink pipelines S3 writes with catalog commits — tune `max_pending_commits` based on commit latency vs memory
- Compaction Interval: More frequent compaction keeps file counts low for query performance, but each compaction cycle has I/O cost
## License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.