Crate round_pipers

Crate round_pipers 

Source
Expand description

§Round Pipers

A high-performance Rust library for streaming data processing with circular buffers and read-only pipes. Designed for simple developers who want to process data without dealing with complex Rust concepts.

§Features

  • Circular Buffer Pipes: High-performance streaming with Linux memfd + double-mapped memory
  • Read-Only Pipes: Zero-copy access to pre-populated data using lifetimes
  • Write-Only Pipes: Efficient buffer filling and streaming to external destinations
  • Unified API: All pipe types provide identical reader interfaces
  • RAII Memory Management: Automatic pointer advancement with ChunkGuard
  • Thread Safety: Multiple readers, single writer model with Send + Sync
  • Multi-dimensional Support: Built on ndarray for complex data shapes

§Quick Start

§Circular Buffer Pipes (Streaming Data)

use round_pipers::Pipe;
use std::sync::Arc;
use std::path::Path;

// Create a pipe for streaming f64 values
let pipe = Arc::new(Pipe::new("my_buffer", 1024, [])?);
let writer = pipe.clone().get_writer()?;
let reader = pipe.clone().get_reader();

// Write data
writer.write(100, |mut chunk, _state| {
    for i in 0..100 {
        chunk[i] = i as f64;
    }
})?;

// Read data with iterator - no lifetime management needed
for chunk_result in reader.iter_chunks(25, 25) {
    let chunk = chunk_result?;
    // Process chunk - automatically advances when dropped
    for &value in chunk.iter() {
        println!("Value: {}", value);
    }
}

§Read-Only Pipes (Pre-populated Data)

use round_pipers::ReadOnlyPipe;

// Super simple API - just pass your slice
let data = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let pipe = ReadOnlyPipe::new(&data, [])?;
let reader = pipe.get_reader();

// Same API as regular pipes - no learning curve
for chunk_result in reader.iter_chunks(2, 2) {
    let chunk = chunk_result?;
    for &value in chunk.iter() {
        println!("Value: {}", value);
    }
}

§Core Concepts

§Reading and Writing

You write ndarrays of (relatively) arbitrary shape into a pipe by calling the write function on a pipe’s writer, and pass in a closure that gets an array to write to.

You read ndarrays from a pipe by calling the read function on a pipe’s reader, and pass in a closure with the buffer to process, or by iterating over the pipe using iter_chunks(). Generally you specify how much data to transfer (n_to_read), and how much to advance your read pointer by (n_to_consume).

  • Overlapping reads: n_to_consume < n_to_read (sliding window)
  • Skip data: n_to_consume > n_to_read
  • Normal processing: n_to_consume = n_to_read

§Pipe Types

§1. Circular Buffer Pipes (for streaming data)

A circular shared-memory buffer with support for a single writer and multiple readers. Perfect for audio, signal processing, or streaming pipelines. Uses Linux memfd + double-mapped memory for seamless wraparound.

§2. Read-Only Pipes (for pre-populated data)

Access pre-existing data slices with the same API as circular pipes. Simple lifetime-based approach - no data copying. Perfect for processing existing datasets or arrays.

§3. Write-Only Pipes (for filling buffers and streaming)

Two variants for different use cases:

§WriteOnlyPipeBuffer

Write data into pre-allocated mutable slices until full. Simple lifetime-based approach - no data copying, returns errors when full. Perfect for filling output buffers or collecting results.

§WriteOnlyPipeStream

Write typed data to any std::io::Write implementor (files, sockets, etc.). Owns a growable buffer, converts data to native-endian bytes and writes immediately. Perfect for streaming data to files or network destinations with minimal allocations.

§Advanced Usage

§Multi-dimensional Data

use round_pipers::Pipe;
use ndarray::Ix4;
use std::sync::Arc;
use std::path::Path;

// Create a 4D pipe for complex data structures
let pipe = Arc::new(Pipe::<f64, Ix4, ()>::new(
    Path::new("multi_dim"),
    1000,
    [10, 20, 30, 40]  // Shape: 10×20×30×40
)?);

let reader = pipe.clone().get_reader();
for chunk_result in reader.iter_chunks(5, 5) {
    let chunk = chunk_result?;
    // chunk is ArrayView<f64, Ix5> with shape [5, 10, 20, 30, 40]
    println!("Chunk shape: {:?}", chunk.shape());
}

§Peek Semantics (Sliding Window)

// Read 50 elements but only consume 10 (sliding window)
for chunk_result in reader.iter_chunks(50, 10) {
    let chunk = chunk_result?;
    // Next iteration will start 10 elements forward, 
    // but read 50 elements (40 overlap)
}

§Design Philosophy

This library is designed for simple developers who want to process data without dealing with complex Rust concepts:

§You DON’T Need To Understand:

  • Complex lifetime annotations
  • Interior mutability patterns
  • Trait object dynamics
  • Memory layout details

§You DO Need To Know:

  • Data must outlive read-only pipes (Rust will tell you if you get this wrong)
  • Use iter_chunks(n_to_read, n_to_consume) for processing
  • Chunks are automatically consumed when dropped
  • All pipe types work the same way from a user perspective

§Performance

  • Zero-copy data access via ArrayView
  • Minimal allocations (just reader state tracking)
  • Read-only pipes have no synchronization overhead
  • Circular buffer pipes handle writer synchronization automatically
  • Lock-free reading in most cases with Fibonacci backoff for contention

§Error Handling

  • Clear error messages for common mistakes
  • “Insufficient data” when trying to read past end
  • “Reader not registered” for invalid reader usage
  • All errors implement standard Rust error traits

§Requirements

  • Linux (uses memfd_create and mmap)
  • Rust 1.70+

§Example FFT Pipeline

Here’s a complete example demonstrating a multi-threaded audio processing pipeline. You can run this example with: cargo run --example audio_fft_pipeline

//! Audio FFT Pipeline Example
//!
//! This example demonstrates a complete audio processing pipeline using round_pipers:
//!
//! Thread 1: Tone Generator
//! - Generates a sine wave at 0.01 * fs (480 Hz at 48kHz sample rate)
//! - Writes audio samples to a circular buffer pipe
//!
//! Thread 2: FFT Processor  
//! - Reads audio samples with 50% overlap (64k samples, consume 32k)
//! - Computes complex FFT using rustfft
//! - Writes complex FFT results to Ix1 pipe (frequency bins as vector)
//!
//! Thread 3: Magnitude Computer and File Writer
//! - Reads complex FFT results from Ix1 pipe using iterator style (iter_chunks)
//! - Processes chunks of 4 frames with automatic memory management via ChunkGuard
//! - Computes magnitude squared of complex FFT for each chunk
//! - Writes magnitude squared data to "example.out" using WriteOnlyPipeStream
//!
//! Key features demonstrated:
//! - Multi-threaded pipeline with circular buffer pipes
//! - Automatic overlap management using read(n_to_read, n_to_consume) parameters
//! - Iterator style reading with ChunkGuard RAII memory management
//! - FnOnce style reading for direct processing
//! - WriteOnlyPipeStream for file output
//! - Real FFT processing with rustfft
//! - Complex FFT data transfer using Ix1 pipes (time-varying frequency vectors)
//! - Proper dimensional modeling: audio=Ix0 (scalar), FFT=Ix1 (vector)

use ndarray::{Ix0, Ix1};
use round_pipers::{Pipe, Readable, Result, Writable, WriteOnlyPipeStream};
use rustfft::{num_complex::Complex, FftPlanner};
use std::fs::File;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() -> Result<()> {
    println!("Starting audio FFT pipeline example...");

    // Create pipes for the processing chain
    let audio_pipe = Arc::new(Pipe::<f64, Ix0, ()>::new(
        "audio_samples",
        1024 * 1024, // 1M samples buffer
        [],
    )?);

    const FFT_SIZE: usize = 65536; // 64k points

    // FFT pipe stores complex numbers with Ix1 dimension (frequency bins)
    let fft_pipe = Arc::new(Pipe::<Complex<f64>, Ix1, ()>::new(
        "fft_results",
        512,        // 512 FFT frames buffer
        [FFT_SIZE], // Each frame has FFT_SIZE frequency bins
    )?);

    // Thread 1: Tone generator
    let audio_pipe_writer = audio_pipe.clone();
    let tone_thread = thread::spawn(move || -> Result<()> {
        println!("Starting tone generator thread...");
        let writer = audio_pipe_writer.get_writer()?;

        let fs = 48000.0; // Sample rate
        let freq = 0.01 * fs; // 0.01 * fs = 480 Hz
        let samples_per_write = 1024;
        let mut sample_count = 0;

        loop {
            writer.write(samples_per_write, |mut chunk, _state| {
                for (i, sample) in chunk.iter_mut().enumerate() {
                    let t = (sample_count + i) as f64 / fs;
                    *sample = (2.0 * std::f64::consts::PI * freq * t).sin();
                }
                sample_count += samples_per_write;

                // Print progress occasionally
                if sample_count % (fs as usize) == 0 {
                    println!("Generated {} seconds of audio", sample_count as f64 / fs);
                }
            })?;

            // Simulate real-time audio generation
            thread::sleep(Duration::from_millis(20));

            // Stop after generating 10 seconds
            if sample_count >= (10.0 * fs) as usize {
                break;
            }
        }

        println!("Tone generator finished");
        Ok(())
    });

    // Thread 2: FFT processor with 50% overlap
    let audio_pipe_reader = audio_pipe.clone();
    let fft_pipe_writer = fft_pipe.clone();
    let fft_thread = thread::spawn(move || -> Result<()> {
        println!("Starting FFT processor thread...");
        let reader = audio_pipe_reader.get_reader();
        let writer = fft_pipe_writer.get_writer()?;

        const OVERLAP: usize = FFT_SIZE / 2; // 50% overlap
        const ADVANCE: usize = FFT_SIZE - OVERLAP; // How much to advance each time

        // Set up rustfft
        let mut planner = FftPlanner::<f64>::new();
        let fft = planner.plan_fft_forward(FFT_SIZE);

        let mut fft_count = 0;

        loop {
            // Use the read parameters to handle overlap automatically:
            // - Read FFT_SIZE samples (64k)
            // - Only consume ADVANCE samples (32k)
            // This creates 50% overlap: each new read includes the last 32k samples
            // from the previous read plus 32k new samples
            match reader.read(FFT_SIZE, ADVANCE, |chunk, _state| -> Result<()> {
                let input_samples = chunk.as_slice().unwrap();

                // Convert to complex numbers for FFT
                let mut fft_buffer: Vec<Complex<f64>> = input_samples
                    .iter()
                    .map(|&x| Complex::new(x, 0.0))
                    .collect();

                // Compute FFT
                fft.process(&mut fft_buffer);

                // Write complex FFT results to next pipe (1 frame = FFT_SIZE frequency bins)
                writer.write(1, |mut chunk, _state| {
                    // chunk is now a 2D array with shape [1, FFT_SIZE] (1 frame, FFT_SIZE bins)
                    for (i, &fft_value) in fft_buffer.iter().enumerate() {
                        chunk[(0, i)] = fft_value;
                    }
                })?;

                fft_count += 1;
                if fft_count % 10 == 0 {
                    println!("Processed {} FFT frames", fft_count);
                }

                Ok(())
            }) {
                Ok(_) => {}
                Err(e) => {
                    if e.is_end_of_stream() {
                        println!("Audio writer finished, FFT processor stopping");
                        break;
                    }
                    return Err(e);
                }
            }
        }

        println!("FFT processor finished");
        Ok(())
    });

    // Thread 3: Magnitude computer and file writer
    let fft_pipe_reader = fft_pipe.clone();
    let magnitude_thread = thread::spawn(move || -> Result<()> {
        println!("Starting magnitude computation and file writer thread...");
        let reader = fft_pipe_reader.get_reader();

        // Create write-only pipe stream to file
        let output_file = File::create("example.out")?;
        let file_pipe =
            WriteOnlyPipeStream::<_, f64, Ix1, String>::new(output_file, 1024, [FFT_SIZE])?;
        file_pipe.set_metadata("FFT magnitude squared data".to_string());

        let mut frame_count = 0;

        // Use iterator style reading instead of FnOnce style
        // Read 4 frames at a time, consume 4 frames
        for chunk_result in reader.iter_chunks(4, 4) {
            match chunk_result {
                Ok(chunk_guard) => {
                    // chunk_guard automatically manages the data lifetime and consumption
                    let chunk = chunk_guard.data();
                    // chunk is a 2D array with shape [actual_frames, FFT_SIZE] containing Complex<f64>
                    let actual_frames = chunk.shape()[0];

                    // Compute magnitude squared and write to file
                    file_pipe.write(actual_frames, |mut output_chunk, _state| {
                        // output_chunk is also 2D with shape [actual_frames, FFT_SIZE] but f64
                        for frame in 0..actual_frames {
                            for freq_bin in 0..FFT_SIZE {
                                let complex_val = chunk[(frame, freq_bin)];
                                output_chunk[(frame, freq_bin)] = complex_val.norm_sqr();
                            }
                        }
                    })?;

                    frame_count += actual_frames;
                    println!(
                        "Wrote {} FFT magnitude frames to file (chunk size: {})",
                        frame_count, actual_frames
                    );

                    // chunk_guard automatically advances the read pointer when dropped
                }
                Err(e) => {
                    if e.is_end_of_stream() || e.is_insufficient_data() {
                        println!("FFT writer finished, magnitude computation stopping");
                        break;
                    }
                    return Err(e);
                }
            }
        }

        println!("Magnitude computation finished");
        println!("Output written to example.out");
        Ok(())
    });

    // Wait for all threads to complete
    let tone_result = tone_thread.join().unwrap();
    let fft_result = fft_thread.join().unwrap();
    let magnitude_result = magnitude_thread.join().unwrap();

    // Check results
    tone_result?;
    fft_result?;
    magnitude_result?;

    println!("Pipeline completed successfully!");
    println!("Check 'example.out' for the FFT magnitude squared data");

    // Print file info
    let metadata = std::fs::metadata("example.out")?;
    println!(
        "Output file size: {} bytes ({} f64 values)",
        metadata.len(),
        metadata.len() / 8
    );

    Ok(())
}

Structs§

ChunkGuard
RAII guard that holds an ArrayView with managed lifetime
Pipe
PipeReader
PipeState
State information about a pipe at the time of a read/write operation
PipeWriter
ReadOnlyPipe
A read-only pipe that provides access to a pre-populated slice of data Simple API: ReadOnlyPipe::new(my_slice) - no copying, uses lifetimes to ensure slice survives
ReadOnlyPipeReader
Reader for read-only pipes - provides the same API as regular PipeReader
WriteOnlyPipeBuffer
A write-only pipe that provides access to a pre-allocated mutable slice Simple API: WriteOnlyPipeBuffer::new(my_mut_slice) - no copying, uses lifetimes to ensure slice survives
WriteOnlyPipeStream
A write-only pipe that streams data to any std::io::Write implementor Owns a buffer that grows as needed, writes data to the target after successful processing

Enums§

PipeError
Main error type for round_pipers operations

Traits§

ChunkSource
Trait for types that can provide chunks of data for iteration This allows both circular buffer pipes and read-only pipes to work with the same iterator API
Readable
SizedDimension
Writable

Type Aliases§

PipeIterator
ReadOnlyPipeIterator
Result
Result type alias for round_pipers operations