§Round Pipers
A high-performance Rust library for streaming data processing with circular buffers and read-only pipes. Designed for developers who want a simple way to process data without dealing with complex Rust concepts.
§Features
- Circular Buffer Pipes: High-performance streaming with Linux memfd + double-mapped memory
- Read-Only Pipes: Zero-copy access to pre-populated data using lifetimes
- Write-Only Pipes: Efficient buffer filling and streaming to external destinations
- Unified API: All pipe types provide identical reader interfaces
- RAII Memory Management: Automatic pointer advancement with ChunkGuard
- Thread Safety: Multiple readers, single writer model with Send + Sync
- Multi-dimensional Support: Built on ndarray for complex data shapes
§Quick Start
§Circular Buffer Pipes (Streaming Data)
use round_pipers::Pipe;
use std::sync::Arc;
use std::path::Path;
// Create a pipe for streaming f64 values
let pipe = Arc::new(Pipe::new("my_buffer", 1024, [])?);
let writer = pipe.clone().get_writer()?;
let reader = pipe.clone().get_reader();
// Write data
writer.write(100, |mut chunk, _state| {
    for i in 0..100 {
        chunk[i] = i as f64;
    }
})?;
// Read data with iterator - no lifetime management needed
for chunk_result in reader.iter_chunks(25, 25) {
    let chunk = chunk_result?;
    // Process chunk - automatically advances when dropped
    for &value in chunk.iter() {
        println!("Value: {}", value);
    }
}
§Read-Only Pipes (Pre-populated Data)
use round_pipers::ReadOnlyPipe;
// Super simple API - just pass your slice
let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let pipe = ReadOnlyPipe::new(&data, [])?;
let reader = pipe.get_reader();
// Same API as regular pipes - no learning curve
for chunk_result in reader.iter_chunks(2, 2) {
    let chunk = chunk_result?;
    for &value in chunk.iter() {
        println!("Value: {}", value);
    }
}
§Core Concepts
§Reading and Writing
You write ndarrays of (relatively) arbitrary shape into a pipe by calling the write function on the pipe’s writer and passing in a closure that receives an array to write into.
You read ndarrays from a pipe either by calling the read function on the pipe’s reader and passing in a closure that processes the buffer, or by iterating over the pipe with iter_chunks(). In both cases you generally specify how much data to read (n_to_read) and how far to advance your read pointer afterwards (n_to_consume).
- Overlapping reads: n_to_consume < n_to_read (sliding window)
- Skip data: n_to_consume > n_to_read
- Normal processing: n_to_consume = n_to_read
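The three cases above can be sketched over a plain slice. This is only an illustration of the n_to_read / n_to_consume arithmetic; `windows_with_advance` is a hypothetical helper written for this example, not a round_pipers function, and it assumes a reader that stops once a full window no longer fits.

```rust
/// Hypothetical helper (not part of round_pipers): collects the windows a
/// reader would see over `data` when it reads `n_to_read` items per step and
/// then advances its read position by `n_to_consume`.
fn windows_with_advance(data: &[f64], n_to_read: usize, n_to_consume: usize) -> Vec<&[f64]> {
    assert!(n_to_consume > 0, "the reader must advance to make progress");
    let mut out = Vec::new();
    let mut start = 0;
    // Stop as soon as a full window no longer fits in the remaining data.
    while start + n_to_read <= data.len() {
        out.push(&data[start..start + n_to_read]);
        start += n_to_consume;
    }
    out
}
```

With six items, reading 4 and consuming 2 yields two windows that share two items; reading 2 and consuming 3 yields two windows with one item skipped between them.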
§Pipe Types
§1. Circular Buffer Pipes (for streaming data)
A circular shared-memory buffer with support for a single writer and multiple readers.
Perfect for audio, signal processing, or streaming pipelines.
Uses Linux memfd + double-mapped memory for seamless wraparound.
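To see why double mapping gives seamless wraparound, the sketch below compares the index ranges needed for one read. It does not show the library's memfd or mmap code; both functions are hypothetical and only model the address arithmetic, assuming a buffer of `capacity` items that is mapped twice back to back.

```rust
use std::ops::Range;

/// Hypothetical helper: index ranges needed to read `len` items starting at
/// `start` from a plain ring buffer of `capacity` items (no double mapping).
/// A read that crosses the end of the buffer splits into two segments.
fn ring_segments(start: usize, len: usize, capacity: usize) -> (Range<usize>, Option<Range<usize>>) {
    assert!(start < capacity && len <= capacity);
    let end = start + len;
    if end <= capacity {
        (start..end, None)
    } else {
        (start..capacity, Some(0..end - capacity))
    }
}

/// With the buffer mapped twice back to back, the same read is one contiguous
/// range into a virtual region of 2 * capacity items.
fn double_mapped_range(start: usize, len: usize, capacity: usize) -> Range<usize> {
    assert!(start < capacity && len <= capacity);
    start..start + len
}
```

A single contiguous range is what allows a reader to hand out one array view per chunk, even when the chunk straddles the physical end of the buffer.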
§2. Read-Only Pipes (for pre-populated data)
Access pre-existing data slices with the same API as circular pipes. Simple lifetime-based approach - no data copying. Perfect for processing existing datasets or arrays.
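The lifetime-based, no-copy idea can be illustrated with a small standalone type. `BorrowedSource` is a hypothetical stand-in written for this example, not the library's ReadOnlyPipe; it only shows how borrowing the caller's slice lets chunks point straight into the original data.

```rust
/// Hypothetical stand-in for a read-only source: it borrows the caller's
/// slice for lifetime 'a instead of copying it.
struct BorrowedSource<'a> {
    data: &'a [f64],
}

impl<'a> BorrowedSource<'a> {
    fn new(data: &'a [f64]) -> Self {
        Self { data }
    }

    /// Returns `n` items starting at `start`, or None if that would run past
    /// the end of the borrowed data.
    fn chunk(&self, start: usize, n: usize) -> Option<&'a [f64]> {
        self.data.get(start..start.checked_add(n)?)
    }
}
```

Because the returned slice carries the lifetime of the original data, the compiler rejects any program in which the data is dropped while a chunk is still in use.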
§3. Write-Only Pipes (for filling buffers and streaming)
Two variants for different use cases:
§WriteOnlyPipeBuffer
Write data into pre-allocated mutable slices until full. Simple lifetime-based approach - no data copying, returns errors when full. Perfect for filling output buffers or collecting results.
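A minimal sketch of the fill-until-full behaviour, under the assumption that a write which does not fit is rejected as a whole. `FixedSink` and its error string are hypothetical; the real WriteOnlyPipeBuffer API and error type may differ.

```rust
/// Hypothetical sketch of a fill-until-full sink over a caller-owned slice.
struct FixedSink<'a> {
    buf: &'a mut [f64],
    filled: usize,
}

impl<'a> FixedSink<'a> {
    fn new(buf: &'a mut [f64]) -> Self {
        Self { buf, filled: 0 }
    }

    /// Hands the next `n` free slots to `fill`. If they do not fit, returns
    /// an error and leaves the buffer untouched.
    fn write<F: FnOnce(&mut [f64])>(&mut self, n: usize, fill: F) -> Result<(), String> {
        let end = self.filled + n;
        if end > self.buf.len() {
            return Err(format!(
                "buffer full: {} of {} slots used, {} requested",
                self.filled,
                self.buf.len(),
                n
            ));
        }
        fill(&mut self.buf[self.filled..end]);
        self.filled = end;
        Ok(())
    }
}
```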
§WriteOnlyPipeStream
Write typed data to any std::io::Write implementor (files, sockets, etc.). Owns a growable buffer, converts data to native-endian bytes and writes immediately. Perfect for streaming data to files or network destinations with minimal allocations.
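The conversion to native-endian bytes can be sketched with the standard library alone. Both functions below are hypothetical helpers for illustration, not the WriteOnlyPipeStream implementation; the read-back helper assumes the bytes were produced on a machine with the same byte order.

```rust
use std::io::{self, Write};

/// Hypothetical helper: serializes f64 samples as native-endian bytes and
/// writes them to any `Write` target with a single `write_all` call.
fn write_f64s_ne<W: Write>(target: &mut W, samples: &[f64]) -> io::Result<()> {
    let mut bytes = Vec::with_capacity(samples.len() * std::mem::size_of::<f64>());
    for s in samples {
        bytes.extend_from_slice(&s.to_ne_bytes());
    }
    target.write_all(&bytes)
}

/// Reads the samples back, assuming the same machine byte order.
fn read_f64s_ne(bytes: &[u8]) -> Vec<f64> {
    bytes
        .chunks_exact(8)
        .map(|c| {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(c);
            f64::from_ne_bytes(raw)
        })
        .collect()
}
```

Native-endian output avoids byte swapping on write, at the cost that the file is only portable between machines with the same byte order.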
§Advanced Usage
§Multi-dimensional Data
use round_pipers::Pipe;
use ndarray::Ix4;
use std::sync::Arc;
use std::path::Path;
// Create a 4D pipe for complex data structures
let pipe = Arc::new(Pipe::<f64, Ix4, ()>::new(
    Path::new("multi_dim"),
    1000,
    [10, 20, 30, 40] // Shape: 10×20×30×40
)?);
let reader = pipe.clone().get_reader();
for chunk_result in reader.iter_chunks(5, 5) {
    let chunk = chunk_result?;
    // chunk is ArrayView<f64, Ix5> with shape [5, 10, 20, 30, 40]
    println!("Chunk shape: {:?}", chunk.shape());
}
§Peek Semantics (Sliding Window)
// Read 50 elements but only consume 10 (sliding window)
for chunk_result in reader.iter_chunks(50, 10) {
    let chunk = chunk_result?;
    // Next iteration will start 10 elements forward,
    // but read 50 elements (40 overlap)
}
§Design Philosophy
This library is designed for developers who want a simple way to process data without dealing with complex Rust concepts:
§You DON’T Need To Understand:
- Complex lifetime annotations
- Interior mutability patterns
- Trait object dynamics
- Memory layout details
§You DO Need To Know:
- Data must outlive read-only pipes (Rust will tell you if you get this wrong)
- Use iter_chunks(n_to_read, n_to_consume) for processing
- Chunks are automatically consumed when dropped
- All pipe types work the same way from a user perspective
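"Consumed when dropped" is the usual RAII pattern: the read position moves in the guard's Drop implementation. The sketch below shows that pattern with hypothetical names (`DemoGuard`, `next_chunk`) and a `Cell` as the shared position; it is not the library's ChunkGuard.

```rust
use std::cell::Cell;

/// Hypothetical guard: exposes a window of data and advances the shared read
/// position by `n_to_consume` when it goes out of scope.
struct DemoGuard<'a> {
    window: &'a [f64],
    position: &'a Cell<usize>,
    n_to_consume: usize,
}

impl Drop for DemoGuard<'_> {
    fn drop(&mut self) {
        // The read position only moves once the caller is done with the window.
        self.position.set(self.position.get() + self.n_to_consume);
    }
}

/// Returns a guard over the next `n_to_read` items, or None if too few remain.
fn next_chunk<'a>(
    data: &'a [f64],
    position: &'a Cell<usize>,
    n_to_read: usize,
    n_to_consume: usize,
) -> Option<DemoGuard<'a>> {
    let start = position.get();
    let window = data.get(start..start + n_to_read)?;
    Some(DemoGuard { window, position, n_to_consume })
}
```

Tying the advance to Drop means a loop body cannot forget to consume its chunk, and an early return or `?` still leaves the position consistent.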
§Performance
- Zero-copy data access via ArrayView
- Minimal allocations (just reader state tracking)
- Read-only pipes have no synchronization overhead
- Circular buffer pipes handle writer synchronization automatically
- Lock-free reading in most cases with Fibonacci backoff for contention
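Fibonacci backoff means successive waits grow along the Fibonacci sequence rather than doubling. The sketch below is one possible schedule, assuming a fixed time unit and an upper cap; `FibBackoff` is hypothetical, and the library's actual delays and wait mechanism are not described in this document.

```rust
use std::time::Duration;

/// Hypothetical backoff schedule: wait times follow the Fibonacci sequence
/// (1, 1, 2, 3, 5, ... units), capped at `max`.
struct FibBackoff {
    prev: u32,
    curr: u32,
    unit: Duration,
    max: Duration,
}

impl FibBackoff {
    fn new(unit: Duration, max: Duration) -> Self {
        Self { prev: 0, curr: 1, unit, max }
    }

    /// Returns the next delay and advances the sequence.
    fn next_delay(&mut self) -> Duration {
        let delay = self.unit.saturating_mul(self.curr).min(self.max);
        let next = self.prev.saturating_add(self.curr);
        self.prev = self.curr;
        self.curr = next;
        delay
    }
}
```

A caller would typically sleep or spin for `next_delay()` after each failed attempt and create a fresh `FibBackoff` once an attempt succeeds. Fibonacci growth (a factor of about 1.6 per step) is gentler than exponential doubling.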
§Error Handling
- Clear error messages for common mistakes
- “Insufficient data” when trying to read past end
- “Reader not registered” for invalid reader usage
- All errors implement standard Rust error traits
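As an illustration of what implementing the standard error traits buys, the sketch below defines a small error enum with Display and std::error::Error. `DemoError`, its variants, and the exact message text are hypothetical; the library's own type is PipeError, whose variants are not listed here.

```rust
use std::fmt;

/// Hypothetical error type; the variant names are illustrative only.
#[derive(Debug, PartialEq)]
enum DemoError {
    InsufficientData { requested: usize, available: usize },
    ReaderNotRegistered,
}

impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DemoError::InsufficientData { requested, available } => write!(
                f,
                "Insufficient data: requested {}, only {} available",
                requested, available
            ),
            DemoError::ReaderNotRegistered => write!(f, "Reader not registered"),
        }
    }
}

impl std::error::Error for DemoError {}

/// Because DemoError implements std::error::Error, it converts into a boxed
/// error and can be mixed with other error types in one function.
fn read_past_end(len: usize, n: usize) -> Result<(), Box<dyn std::error::Error>> {
    if n > len {
        return Err(DemoError::InsufficientData { requested: n, available: len }.into());
    }
    Ok(())
}
```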
§Requirements
- Linux (uses memfd_create and mmap)
- Rust 1.70+
§Example FFT Pipeline
Here’s a complete example demonstrating a multi-threaded audio processing pipeline.
You can run this example with: cargo run --example audio_fft_pipeline
//! Audio FFT Pipeline Example
//!
//! This example demonstrates a complete audio processing pipeline using round_pipers:
//!
//! Thread 1: Tone Generator
//! - Generates a sine wave at 0.01 * fs (480 Hz at 48kHz sample rate)
//! - Writes audio samples to a circular buffer pipe
//!
//! Thread 2: FFT Processor
//! - Reads audio samples with 50% overlap (64k samples, consume 32k)
//! - Computes complex FFT using rustfft
//! - Writes complex FFT results to Ix1 pipe (frequency bins as vector)
//!
//! Thread 3: Magnitude Computer and File Writer
//! - Reads complex FFT results from Ix1 pipe using iterator style (iter_chunks)
//! - Processes chunks of 4 frames with automatic memory management via ChunkGuard
//! - Computes magnitude squared of complex FFT for each chunk
//! - Writes magnitude squared data to "example.out" using WriteOnlyPipeStream
//!
//! Key features demonstrated:
//! - Multi-threaded pipeline with circular buffer pipes
//! - Automatic overlap management using read(n_to_read, n_to_consume) parameters
//! - Iterator style reading with ChunkGuard RAII memory management
//! - FnOnce style reading for direct processing
//! - WriteOnlyPipeStream for file output
//! - Real FFT processing with rustfft
//! - Complex FFT data transfer using Ix1 pipes (time-varying frequency vectors)
//! - Proper dimensional modeling: audio=Ix0 (scalar), FFT=Ix1 (vector)
use ndarray::{Ix0, Ix1};
use round_pipers::{Pipe, Readable, Result, Writable, WriteOnlyPipeStream};
use rustfft::{num_complex::Complex, FftPlanner};
use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn main() -> Result<()> {
    println!("Starting audio FFT pipeline example...");
    // Create pipes for the processing chain
    let audio_pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(
        "audio_samples",
        1024 * 1024, // 1M samples buffer
        [],
    )?);
    const FFT_SIZE: usize = 65536; // 64k points
    // FFT pipe stores complex numbers with Ix1 dimension (frequency bins)
    let fft_pipe = Arc::new(Pipe::<Complex<f64>, Ix1, ()>::new(
        "fft_results",
        512,        // 512 FFT frames buffer
        [FFT_SIZE], // Each frame has FFT_SIZE frequency bins
    )?);
    // Thread 1: Tone generator
    let audio_pipe_writer = audio_pipe.clone();
    let tone_thread = thread::spawn(move || -> Result<()> {
        println!("Starting tone generator thread...");
        let writer = audio_pipe_writer.get_writer()?;
        let fs = 48000.0; // Sample rate
        let freq = 0.01 * fs; // 0.01 * fs = 480 Hz
        let samples_per_write = 1024;
        let mut sample_count = 0;
        loop {
            writer.write(samples_per_write, |mut chunk, _state| {
                for (i, sample) in chunk.iter_mut().enumerate() {
                    let t = (sample_count + i) as f64 / fs;
                    *sample = (2.0 * std::f64::consts::PI * freq * t).sin();
                }
                sample_count += samples_per_write;
                // Print progress occasionally
                if sample_count % (fs as usize) == 0 {
                    println!("Generated {} seconds of audio", sample_count as f64 / fs);
                }
            })?;
            // Simulate real-time audio generation
            thread::sleep(Duration::from_millis(20));
            // Stop after generating 10 seconds
            if sample_count >= (10.0 * fs) as usize {
                break;
            }
        }
        println!("Tone generator finished");
        Ok(())
    });
    // Thread 2: FFT processor with 50% overlap
    let audio_pipe_reader = audio_pipe.clone();
    let fft_pipe_writer = fft_pipe.clone();
    let fft_thread = thread::spawn(move || -> Result<()> {
        println!("Starting FFT processor thread...");
        let reader = audio_pipe_reader.get_reader();
        let writer = fft_pipe_writer.get_writer()?;
        const OVERLAP: usize = FFT_SIZE / 2; // 50% overlap
        const ADVANCE: usize = FFT_SIZE - OVERLAP; // How much to advance each time
        // Set up rustfft
        let mut planner = FftPlanner::<f64>::new();
        let fft = planner.plan_fft_forward(FFT_SIZE);
        let mut fft_count = 0;
        loop {
            // Use the read parameters to handle overlap automatically:
            // - Read FFT_SIZE samples (64k)
            // - Only consume ADVANCE samples (32k)
            // This creates 50% overlap: each new read includes the last 32k samples
            // from the previous read plus 32k new samples
            match reader.read(FFT_SIZE, ADVANCE, |chunk, _state| -> Result<()> {
                let input_samples = chunk.as_slice().unwrap();
                // Convert to complex numbers for FFT
                let mut fft_buffer: Vec<Complex<f64>> = input_samples
                    .iter()
                    .map(|&x| Complex::new(x, 0.0))
                    .collect();
                // Compute FFT
                fft.process(&mut fft_buffer);
                // Write complex FFT results to next pipe (1 frame = FFT_SIZE frequency bins)
                writer.write(1, |mut chunk, _state| {
                    // chunk is now a 2D array with shape [1, FFT_SIZE] (1 frame, FFT_SIZE bins)
                    for (i, &fft_value) in fft_buffer.iter().enumerate() {
                        chunk[(0, i)] = fft_value;
                    }
                })?;
                fft_count += 1;
                if fft_count % 10 == 0 {
                    println!("Processed {} FFT frames", fft_count);
                }
                Ok(())
            }) {
                Ok(_) => {}
                Err(e) => {
                    if e.is_end_of_stream() {
                        println!("Audio writer finished, FFT processor stopping");
                        break;
                    }
                    return Err(e);
                }
            }
        }
        println!("FFT processor finished");
        Ok(())
    });
    // Thread 3: Magnitude computer and file writer
    let fft_pipe_reader = fft_pipe.clone();
    let magnitude_thread = thread::spawn(move || -> Result<()> {
        println!("Starting magnitude computation and file writer thread...");
        let reader = fft_pipe_reader.get_reader();
        // Create write-only pipe stream to file
        let output_file = File::create("example.out")?;
        let file_pipe =
            WriteOnlyPipeStream::<_, f64, Ix1, String>::new(output_file, 1024, [FFT_SIZE])?;
        file_pipe.set_metadata("FFT magnitude squared data".to_string());
        let mut frame_count = 0;
        // Use iterator style reading instead of FnOnce style
        // Read 4 frames at a time, consume 4 frames
        for chunk_result in reader.iter_chunks(4, 4) {
            match chunk_result {
                Ok(chunk_guard) => {
                    // chunk_guard automatically manages the data lifetime and consumption
                    let chunk = chunk_guard.data();
                    // chunk is a 2D array with shape [actual_frames, FFT_SIZE] containing Complex<f64>
                    let actual_frames = chunk.shape()[0];
                    // Compute magnitude squared and write to file
                    file_pipe.write(actual_frames, |mut output_chunk, _state| {
                        // output_chunk is also 2D with shape [actual_frames, FFT_SIZE] but f64
                        for frame in 0..actual_frames {
                            for freq_bin in 0..FFT_SIZE {
                                let complex_val = chunk[(frame, freq_bin)];
                                output_chunk[(frame, freq_bin)] = complex_val.norm_sqr();
                            }
                        }
                    })?;
                    frame_count += actual_frames;
                    println!(
                        "Wrote {} FFT magnitude frames to file (chunk size: {})",
                        frame_count, actual_frames
                    );
                    // chunk_guard automatically advances the read pointer when dropped
                }
                Err(e) => {
                    if e.is_end_of_stream() || e.is_insufficient_data() {
                        println!("FFT writer finished, magnitude computation stopping");
                        break;
                    }
                    return Err(e);
                }
            }
        }
        println!("Magnitude computation finished");
        println!("Output written to example.out");
        Ok(())
    });
    // Wait for all threads to complete
    let tone_result = tone_thread.join().unwrap();
    let fft_result = fft_thread.join().unwrap();
    let magnitude_result = magnitude_thread.join().unwrap();
    // Check results
    tone_result?;
    fft_result?;
    magnitude_result?;
    println!("Pipeline completed successfully!");
    println!("Check 'example.out' for the FFT magnitude squared data");
    // Print file info
    let metadata = std::fs::metadata("example.out")?;
    println!(
        "Output file size: {} bytes ({} f64 values)",
        metadata.len(),
        metadata.len() / 8
    );
    Ok(())
}
Structs§
- ChunkGuard: RAII guard that holds an ArrayView with managed lifetime
- Pipe
- PipeReader
- PipeState: State information about a pipe at the time of a read/write operation
- PipeWriter
- ReadOnlyPipe: A read-only pipe that provides access to a pre-populated slice of data. Simple API: ReadOnlyPipe::new(my_slice) - no copying, uses lifetimes to ensure the slice survives
- ReadOnlyPipeReader: Reader for read-only pipes - provides the same API as regular PipeReader
- WriteOnlyPipeBuffer: A write-only pipe that provides access to a pre-allocated mutable slice. Simple API: WriteOnlyPipeBuffer::new(my_mut_slice) - no copying, uses lifetimes to ensure the slice survives
- WriteOnlyPipeStream: A write-only pipe that streams data to any std::io::Write implementor. Owns a buffer that grows as needed, writes data to the target after successful processing
Enums§
- PipeError: Main error type for round_pipers operations
Traits§
- ChunkSource: Trait for types that can provide chunks of data for iteration. This allows both circular buffer pipes and read-only pipes to work with the same iterator API
- Readable
- SizedDimension
- Writable
Type Aliases§
- PipeIterator
- ReadOnlyPipeIterator
- Result: Result type alias for round_pipers operations