# adaptive-pipeline
High-performance adaptive file processing pipeline with configurable stages, binary format support, and cross-platform compatibility.
## 🎯 Overview
This crate provides the application and infrastructure layers for the Adaptive Pipeline system - including use cases, services, adapters, repositories, and a production-ready CLI.
### Key Features

- ⚡ Channel-Based Concurrency - Reader → CPU Workers → Direct Writer pattern
- 🎯 Adaptive Performance - Dynamic chunk sizing and worker scaling (see the sketch below)
- 🔒 Enterprise Security - AES-256-GCM, ChaCha20-Poly1305, Argon2 KDF
- 📊 Observable - Prometheus metrics, structured tracing
- 🛡️ Zero-Panic - No unwrap/expect/panic in production
- 🌍 Cross-Platform - macOS, Linux, Windows support
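As a concrete illustration of dynamic chunk sizing, a minimal heuristic might scale chunk size with file size. The sketch below is illustrative only, not the crate's actual logic:

```rust
/// Illustrative heuristic only: the crate's real sizing logic may also
/// weigh device type, core count, and memory pressure.
fn adaptive_chunk_size(file_len: u64) -> u64 {
    const MB: u64 = 1024 * 1024;
    match file_len {
        0..=64_000_000 => 4 * MB,              // small files: 4 MB chunks
        64_000_001..=1_000_000_000 => 16 * MB, // medium files: 16 MB chunks
        _ => 64 * MB,                          // large files: 64 MB chunks
    }
}

fn main() {
    // A 100 MB file lands in the middle tier.
    assert_eq!(adaptive_chunk_size(100 * 1024 * 1024), 16 * 1024 * 1024);
}
```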
## 📦 Installation

### As a Library

```toml
[dependencies]
adaptive-pipeline = "1.0"
```

### As a CLI Tool

```bash
cargo install adaptive-pipeline
```
## 🏗️ Architecture
This crate implements the Application and Infrastructure layers:
```text
┌─────────────────────────────────────────────┐
│              APPLICATION LAYER              │
│  ┌───────────────────────────────────────┐  │
│  │ Use Cases                             │  │
│  │  - ProcessFile                        │  │
│  │  - RestoreFile                        │  │
│  │  - CreatePipeline                     │  │
│  │  - ValidateFile                       │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │ Application Services                  │  │
│  │  - ConcurrentPipeline (orchestrator)  │  │
│  │  - StreamingFileProcessor             │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘
                      │
                      ▼
┌─────────────────────────────────────────────┐
│            INFRASTRUCTURE LAYER             │
│  ┌───────────────────────────────────────┐  │
│  │ Adapters                              │  │
│  │  - TokioFileIO                        │  │
│  │  - AsyncCompression                   │  │
│  │  - AsyncEncryption                    │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │ Repositories                          │  │
│  │  - SqlitePipelineRepository           │  │
│  └───────────────────────────────────────┘  │
│  ┌───────────────────────────────────────┐  │
│  │ Runtime Management                    │  │
│  │  - ResourceManager (global tokens)    │  │
│  │  - StageExecutor                      │  │
│  │  - Supervisor (task management)       │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘
```
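The boundary between the layers follows the usual ports-and-adapters shape: the domain declares a trait, the infrastructure implements it, and the application depends only on the trait. A minimal sketch (the names here are illustrative, not the crate's exact API):

```rust
// Illustrative sketch of the layering; not the crate's exact API.

// Domain layer: a port (trait) with no I/O dependencies.
trait PipelineRepository {
    fn find_by_name(&self, name: &str) -> Option<String>;
}

// Infrastructure layer: an adapter that would be backed by SQLite via sqlx.
struct SqlitePipelineRepository;

impl PipelineRepository for SqlitePipelineRepository {
    fn find_by_name(&self, name: &str) -> Option<String> {
        // Stubbed for illustration; the real adapter queries the database.
        Some(format!("pipeline:{name}"))
    }
}

// Application layer: a use case that depends only on the trait.
fn process_with(repo: &dyn PipelineRepository, name: &str) {
    if let Some(p) = repo.find_by_name(name) {
        println!("processing with {p}");
    }
}

fn main() {
    process_with(&SqlitePipelineRepository, "compress-encrypt");
}
```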
## 📚 Library Usage
### Processing Files

```rust
// Sketch: import paths and constructor/execute signatures are abbreviated
// here; consult the crate's API docs for the exact forms.
use std::path::Path;

use adaptive_pipeline::{ConcurrentPipeline, ProcessFileUseCase};
use adaptive_pipeline_domain::PipelineId;

async fn process_file(orchestrator: ConcurrentPipeline, id: PipelineId) -> anyhow::Result<()> {
    let use_case = ProcessFileUseCase::new(orchestrator);
    use_case
        .execute(id, Path::new("input.dat"), Path::new("output.adapipe"))
        .await?;
    Ok(())
}
```
### Creating Custom Pipelines

```rust
// Sketch: the stage list and `execute` signature are abbreviated; see the
// API docs for the exact stage constructors.
use adaptive_pipeline::CreatePipelineUseCase;

// Define pipeline stages (order matters: e.g. compression, then encryption).
let stages = vec![
    // e.g. a compression stage followed by an encryption stage
];

// Create and save the pipeline.
let pipeline = use_case.execute("my-pipeline", stages).await?;
println!("Created pipeline: {:?}", pipeline);
```
### Restoring Files

```rust
// Sketch: the `execute` signature is abbreviated; see the API docs.
use std::path::Path;

use adaptive_pipeline::RestoreFileUseCase;

// Restore from the .adapipe format back to the original contents.
let result = use_case
    .execute(Path::new("output.adapipe"), Path::new("restored.dat"))
    .await?;
println!("Restored: {:?}", result);
```
## 🖥️ CLI Usage

The invocations below are illustrative; run `adaptive_pipeline --help` for the exact subcommands and flags.
### Process Files

```bash
# Basic processing
adaptive_pipeline process --pipeline my-pipeline --input data.bin --output data.adapipe

# With custom settings
adaptive_pipeline process --pipeline my-pipeline --input data.bin --output data.adapipe \
    --chunk-size-mb 16 --workers 8
```

### Create Pipelines

```bash
# Compression only
adaptive_pipeline create --name compress-only --stages compression

# Full security pipeline
adaptive_pipeline create --name secure --stages compression,encryption
```

### Restore Files

```bash
# Restore to original location
adaptive_pipeline restore --input data.adapipe

# Restore to specific directory
adaptive_pipeline restore --input data.adapipe --output-dir ./restored
```

### Validate Files

```bash
# Quick format validation
adaptive_pipeline validate --file data.adapipe

# Full integrity check
adaptive_pipeline validate --file data.adapipe --full
```

### System Benchmarking

```bash
# Quick benchmark
adaptive_pipeline benchmark --file /path/to/testfile

# Comprehensive test (larger file, more configurations)
adaptive_pipeline benchmark --file /path/to/large-testfile
```
For complete CLI documentation, see the root README.
## ⚡ Performance

### Concurrency Model
Channel-Based Execution:
```text
┌─────────────┐   Channel    ┌──────────────┐    Direct     ┌────────────┐
│   Reader    │─────────────▶│ CPU Workers  │──────────────▶│   Writer   │
│    Task     │ Backpressure │  (Parallel)  │ Random Access │ (.adapipe) │
└─────────────┘              └──────────────┘               └────────────┘
       │                            │                              │
   File I/O                   Rayon Threads               Concurrent Seeks
  (Streaming)                  (CPU-bound)                  (No Mutex!)
```
Key Optimizations:
- Reader streams with backpressure
- Rayon work-stealing for CPU ops
- Direct concurrent writes (no bottleneck)
- Global resource semaphores
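Reduced to a generic sketch, the pattern looks like this (assuming the `tokio` crate; this is not the crate's internal code, and the CPU work is shown with `spawn_blocking` rather than a shared rayon pool for brevity):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // A bounded channel gives the reader natural backpressure.
    let (tx, mut rx) = mpsc::channel::<(u64, Vec<u8>)>(8);

    // Reader task: streams (chunk_index, chunk) pairs into the channel.
    tokio::spawn(async move {
        for i in 0u64..4 {
            tx.send((i, vec![0u8; 1024])).await.ok();
        }
    });

    // CPU workers: process chunks off the async runtime. Because each
    // result is written at offset chunk_index * chunk_size, the writer
    // needs no mutex; concurrent seeks suffice.
    while let Some((index, chunk)) = rx.recv().await {
        let processed = tokio::task::spawn_blocking(move || {
            chunk.iter().map(|b| b ^ 0xFF).collect::<Vec<u8>>()
        })
        .await
        .expect("worker task panicked");
        println!("chunk {index}: {} bytes processed", processed.len());
    }
}
```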
### Benchmarks

Measured with the `adaptive_pipeline benchmark` command (2025-10-07) on a Mac Pro 2019: Intel Xeon W-3235 @ 3.3 GHz, 12 cores/24 threads, 48 GB RAM, NVMe SSD.
| File Size | Best Throughput | Optimal Config | Adaptive Config |
|---|---|---|---|
| 100 MB | 811 MB/s | 16MB chunks, 7 workers | 502 MB/s (16MB, 8 workers) |
| 1 GB | 822 MB/s | 64MB chunks, 5 workers | 660 MB/s (64MB, 10 workers) |
Performance Insights:
- Best-case throughput exceeds 800 MB/s at both file sizes, showing consistent scalability
- Lower worker counts (5-7) often outperform higher counts due to reduced context switching
- Larger chunks (16-64 MB) maximize sequential I/O performance
- Adaptive configuration provides a good baseline; in the runs above, manual fine-tuning improved throughput by 25% (1 GB) to 62% (100 MB)
Run your own benchmarks: `adaptive_pipeline benchmark --file <path>`
## 🔧 Configuration

### Environment Variables

Apart from `RUST_LOG`, the variable names below are hypothetical placeholders; check the crate docs for the exact names.

```bash
# Database (hypothetical name)
export ADAPIPE_DB_PATH=./pipeline.db

# Logging
export RUST_LOG=adaptive_pipeline=info

# Performance (hypothetical name)
export ADAPIPE_WORKERS=8
```
### Configuration File

The section and key names below are inferred from the values and are illustrative, not a guaranteed schema:

```toml
# pipeline.toml
[performance]
workers = 8
chunk_size_mb = 0  # Auto-detect

[compression]
algorithm = "zstd"
level = "balanced"

[encryption]
algorithm = "aes256gcm"
kdf = "argon2id"
```
## 📊 Observability

### Prometheus Metrics
```bash
# Start with the metrics endpoint enabled (flag name is illustrative)
adaptive_pipeline process --pipeline my-pipeline --input data.bin --metrics

# Query metrics (default port: 9090)
curl http://localhost:9090/metrics
```
Key Metrics:
- `pipeline_throughput_bytes_per_second`
- `pipeline_cpu_queue_depth`
- `pipeline_worker_utilization`
- `pipeline_chunk_processing_duration_ms`
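Registering metrics like these with the `prometheus` crate looks roughly as follows (a sketch of the pattern, not the crate's internal wiring):

```rust
use prometheus::{Gauge, Registry};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();

    let queue_depth = Gauge::new(
        "pipeline_cpu_queue_depth",
        "Chunks waiting for a CPU worker",
    )?;
    registry.register(Box::new(queue_depth.clone()))?;

    // Updated as chunks enter and leave the worker queue.
    queue_depth.set(3.0);

    // A metrics endpoint would encode `registry.gather()` for Prometheus.
    Ok(())
}
```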
### Structured Logging

```bash
# Enable debug logging
RUST_LOG=adaptive_pipeline=debug adaptive_pipeline process --pipeline my-pipeline --input data.bin

# Log to file
RUST_LOG=adaptive_pipeline=debug adaptive_pipeline process --pipeline my-pipeline --input data.bin 2>&1 | tee pipeline.log
```
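Inside your own binary, the equivalent setup with `tracing-subscriber` (using its "env-filter" feature) honors `RUST_LOG` like this:

```rust
use tracing_subscriber::EnvFilter;

fn main() {
    // Respect RUST_LOG, e.g. RUST_LOG=adaptive_pipeline=debug.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();

    tracing::info!(stage = "compression", "pipeline stage started");
}
```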
## 🎯 Advanced Features

### Custom Stages

Implement the `StageService` trait:
```rust
// Sketch only: the real trait is defined in adaptive-pipeline-domain and
// its methods may differ; `process_chunk` below is a hypothetical shape.
use adaptive_pipeline_domain::services::StageService;

struct UppercaseStage;

impl StageService for UppercaseStage {
    fn process_chunk(&self, data: Vec<u8>) -> Result<Vec<u8>, String> {
        // Transform each chunk as it flows between reader and writer.
        Ok(data.to_ascii_uppercase())
    }
}
```
### Resource Management

```rust
// NOTE: the import path is abbreviated; see the crate docs for the exact module.
use adaptive_pipeline::ResourceManager;

// Global resource manager
let rm = ResourceManager::global();

// Acquire a CPU token (respects core count)
let cpu_token = rm.acquire_cpu_token().await?;

// Acquire an I/O token (respects device type)
let io_token = rm.acquire_io_token().await?;

// Tokens auto-release on drop
```
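The drop-to-release behavior is the standard RAII-over-semaphore pattern. A generic sketch with `tokio` (not the ResourceManager internals):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One permit per core bounds CPU-bound concurrency.
    let cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
    let cpu = Arc::new(Semaphore::new(cores));

    let token = cpu.clone().acquire_owned().await.expect("semaphore closed");
    // ... do CPU-bound work while holding the token ...
    drop(token); // released here; falling out of scope has the same effect
}
```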
### Binary Format

The `.adapipe` binary format includes:
- Header - Metadata, algorithms, original file info
- Chunks - Processed data with checksums
- Footer - Final statistics and verification data
```text
┌──────────────────────────────────────┐
│ Header (1024 bytes)                  │
│  - Magic bytes: ADAPIPE\0            │
│  - Version: 1                        │
│  - Original filename & checksum      │
│  - Pipeline ID and stages            │
│  - Compression/encryption config     │
├──────────────────────────────────────┤
│ Chunk 0 (variable size)              │
│  - Sequence number                   │
│  - Compressed size                   │
│  - Data                              │
│  - Checksum                          │
├──────────────────────────────────────┤
│ ... more chunks ...                  │
├──────────────────────────────────────┤
│ Footer (1024 bytes)                  │
│  - Total chunks                      │
│  - Output checksum                   │
│  - Processing timestamp              │
└──────────────────────────────────────┘
```
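A reader can cheaply reject a corrupt or foreign file before doing any real work by checking the fixed-size header. This sketch uses only the fields shown in the diagram (magic bytes and version); the byte offset of the version field is an assumption:

```rust
use std::fs::File;
use std::io::Read;

const MAGIC: &[u8; 8] = b"ADAPIPE\0";
const HEADER_LEN: usize = 1024;

fn validate_header(path: &str) -> std::io::Result<bool> {
    let mut header = [0u8; HEADER_LEN];
    File::open(path)?.read_exact(&mut header)?;
    let magic_ok = &header[..8] == &MAGIC[..];
    let version = header[8]; // assumed to follow the magic directly
    Ok(magic_ok && version == 1)
}

fn main() -> std::io::Result<()> {
    println!("valid: {}", validate_header("output.adapipe")?);
    Ok(())
}
```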
## 🧪 Testing

```bash
# Run all tests
cargo test

# Unit tests only
cargo test --lib

# Integration tests
cargo test --test '*'

# With logging
RUST_LOG=debug cargo test
```
## 📦 Dependencies

### Application Layer

- `adaptive-pipeline-domain` - Business logic
- `adaptive-pipeline-bootstrap` - Platform abstraction
- `tokio` - Async runtime
- `rayon` - CPU parallelism

### Infrastructure Layer

- `sqlx` - Database (SQLite)
- `prometheus` - Metrics
- `tracing` - Structured logging
- `brotli` / `zstd` / `lz4` / `flate2` - Compression
- `aes-gcm` / `chacha20poly1305` - Encryption
- `argon2` / `scrypt` - Key derivation
## 🔗 Related Crates

- `adaptive-pipeline-domain` - Pure business logic
- `adaptive-pipeline-bootstrap` - Platform abstraction
## 📄 License

BSD 3-Clause License - see the LICENSE file for details.
## 🤝 Contributing

Contributions welcome! Focus areas:

- ✅ New pipeline stages
- ✅ Performance optimizations
- ✅ Additional compression/encryption algorithms
- ✅ Enhanced observability
- ✅ Bug fixes and tests
High Performance | Production Ready | Enterprise Security