buffer_ring 0.1.1

A latch-free I/O buffer ring implementation for concurrent log-structured storage using io_uring

Buffer Ring

A latch-free I/O buffer ring implementation for concurrent log-structured storage. Built on io_uring for efficient asynchronous I/O operations on Linux systems. It is intended to be the sole write path for Bloom_Db.

Overview

This crate implements a fixed-size ring of 1 MB-aligned buffers that amortizes individual I/O operations into larger, sequential writes before they are dispatched to stable storage. It provides latch-free concurrent access using a single packed atomic state word per buffer, making it suitable for high-throughput scenarios where multiple threads need to write to the same buffer simultaneously.

Key Features

  • Latch-free writes: No global locks; all state is managed through atomic operations
  • O_DIRECT compatible: All buffers are 1 MB-aligned (ONE_MEGABYTE_BLOCK) for direct I/O
  • Concurrent amortization: Multiple threads fill one buffer before flush
  • Flexible flushing: Automatic or manual control over when buffers are dispatched
  • Ring-based rotation: Seamlessly rotates to the next buffer when sealed

System Architecture

State Word Layout

All per-buffer metadata is packed into a single AtomicUsize, ensuring self-consistent snapshots:

┌────────────────┬────────────────┬──────────────────┬───────────────────┬──────────┐
│  Bits 63..32   │  Bits 31..8    │  Bits 7..2       │  Bit 1            │  Bit 0   │
│  write offset  │  writer count  │  (reserved)      │  flush-in-prog    │  sealed  │
└────────────────┴────────────────┴──────────────────┴───────────────────┴──────────┘
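The helpers below decode and re-pack a state word according to this layout. This is a minimal sketch that assumes a 64-bit usize. The names state_writers, pack_state, and snapshot are hypothetical. state_offset mirrors the helper used later in this README, but its real signature may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// The layout needs a 64-bit usize (bits 63..32 hold the write offset).
const _: () = assert!(usize::BITS == 64);

const SEALED_BIT: usize = 1 << 0; // bit 0
const FLUSH_IN_PROGRESS_BIT: usize = 1 << 1; // bit 1
const WRITER_SHIFT: usize = 8; // writer count starts at bit 8
const WRITER_MASK: usize = (1 << 24) - 1; // bits 31..8 => 24-bit count
const OFFSET_SHIFT: usize = 32; // write offset in bits 63..32

/// Write offset (bits 63..32).
pub fn state_offset(state: usize) -> usize {
    state >> OFFSET_SHIFT
}

/// Active writer count (bits 31..8).
pub fn state_writers(state: usize) -> usize {
    (state >> WRITER_SHIFT) & WRITER_MASK
}

pub fn state_sealed(state: usize) -> bool {
    state & SEALED_BIT != 0
}

pub fn state_flush_in_progress(state: usize) -> bool {
    state & FLUSH_IN_PROGRESS_BIT != 0
}

/// Pack all fields back into one word (reserved bits 7..2 stay zero).
pub fn pack_state(offset: usize, writers: usize, flushing: bool, sealed: bool) -> usize {
    (offset << OFFSET_SHIFT)
        | ((writers & WRITER_MASK) << WRITER_SHIFT)
        | ((flushing as usize) << 1)
        | (sealed as usize)
}

/// A single Acquire load yields a self-consistent view of every field:
/// (offset, writers, flush_in_progress, sealed).
pub fn snapshot(word: &AtomicUsize) -> (usize, usize, bool, bool) {
    let s = word.load(Ordering::Acquire);
    (state_offset(s), state_writers(s), state_flush_in_progress(s), state_sealed(s))
}
```

Because every field lives in the same word, a reader never sees an offset from one moment paired with a sealed bit from another.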

Flush Protocol

The ring implements the flush protocol from the LLAMA paper without global locks:

  1. Identify the page state to write
  2. Seize space in the active buffer via atomic fetch-and-add
  3. Check atomically whether reservation succeeded; if the buffer is full, seal and rotate
  4. Write payload into reserved range while flush-in-progress bit prevents premature dispatch
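Steps 2 and 3 can be sketched with one fetch_add on the packed word. The offset and the writer count move together, and the returned previous value tells the caller whether the reservation fit. This is an illustration under stated assumptions, not the crate's actual put() path. The Reservation enum, reserve, and release are hypothetical, and the capacity is taken to be ONE_MEGABYTE_BLOCK.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const ONE_MEGABYTE_BLOCK: usize = 1024 * 1024;
const SEALED_BIT: usize = 1;
const WRITER_ONE: usize = 1 << 8; // +1 in the writer-count field
const OFFSET_SHIFT: usize = 32; // offset lives in bits 63..32

#[derive(Debug, PartialEq)]
pub enum Reservation {
    /// Bytes [start, start + len) belong to the caller, who must call release().
    Granted(usize),
    /// This call overflowed the buffer and set the sealed bit; caller should rotate.
    SealedByUs,
    /// Someone else sealed the buffer; retry on the next one.
    AlreadySealed,
}

pub fn reserve(state: &AtomicUsize, len: usize) -> Reservation {
    // Step 2: bump offset and writer count with a single fetch-and-add.
    let prev = state.fetch_add((len << OFFSET_SHIFT) + WRITER_ONE, Ordering::AcqRel);
    let start = prev >> OFFSET_SHIFT;

    // Step 3: inspect the snapshot we raced against.
    if prev & SEALED_BIT != 0 {
        state.fetch_sub(WRITER_ONE, Ordering::AcqRel); // give back the writer slot
        return Reservation::AlreadySealed;
    }
    if start + len > ONE_MEGABYTE_BLOCK {
        // Did not fit: seal (first sealer wins) and drop our writer slot.
        let before = state.fetch_or(SEALED_BIT, Ordering::AcqRel);
        state.fetch_sub(WRITER_ONE, Ordering::AcqRel);
        return if before & SEALED_BIT == 0 {
            Reservation::SealedByUs
        } else {
            Reservation::AlreadySealed
        };
    }
    // Step 4 happens in the caller: copy the payload into the granted range.
    Reservation::Granted(start)
}

/// Called once the payload has been copied in.
pub fn release(state: &AtomicUsize) {
    state.fetch_sub(WRITER_ONE, Ordering::AcqRel);
}
```

The offset is allowed to run past the end after a failed reservation. That is deliberate in this sketch: the buffer is sealed at that point, so the overshoot is never written. The writer count is what lets a flusher see that copies into granted ranges are still in flight.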

Usage

Basic Setup with Automatic Flushing

For most applications, leave automatic flushing enabled (the default):

use std::sync::Arc;
use flush_buffer_ring::{BufferRing, FlushRingOptions, QuickIO};

let flusher = Arc::new(QuickIO::new(io_uring, file));

// Create a ring with 4 buffers, 1 MB each, auto-flushing enabled
let ring = FlushRingOptions::new()
    .buffers(4)
    .flusher(flusher)
    .build();

Manual Flushing for Custom Protocols

If you need to implement custom buffer protocols or have specific flushing requirements, opt out of automatic flushing using FlushRingOptions:

use std::sync::Arc;
use flush_buffer_ring::{BufferRing, FlushRingOptions, QuickIO};

let flusher = Arc::new(QuickIO::new(io_uring, file));

// Create a ring with MANUAL flushing
let ring = FlushRingOptions::new()
    .buffers(4)
    .auto_flush(false)  // ⚠️ Disable automatic flushing
    .flusher(flusher)
    .build();

// Now you have full control over when buffers are flushed

Key Methods for Manual Flushing

When auto_flush is false, use these methods to control flushing:

// Check if the current buffer is sealed (full)
if ring.is_current_buffer_sealed() {
    // Implement your custom flushing logic here
    let buffer = ring.current_buffer();
    ring.flush(buffer);
}

// Or flush the current buffer explicitly at any time
ring.flush_current_buffer();

// Or flush a specific buffer
ring.flush(&my_buffer);

Builder Configuration

FlushRingOptions provides a fluent API for customization:

let ring = FlushRingOptions::new()
    .buffers(8)                      // Set number of buffers
    .auto_flush(true)                // Enable/disable auto-flush (default: true)
    .flusher(flusher_behavior)       // Set the flush dispatcher
    .build();

Configuration Details

Option         Type           Default   Description
buffers()      usize          4         Number of buffers in the ring
auto_flush()   bool           true      Flush automatically when a buffer is sealed
flusher()      Arc<QuickIO>   None      I/O dispatcher (test mode if None)
Buffer size    (fixed)        1 MB      Always ONE_MEGABYTE_BLOCK

Note: Buffer size is intentionally fixed at 1 MB for O_DIRECT compatibility and efficient page-aligned I/O. All buffers in the ring use this size.

When to Use Manual Flushing

Choose manual flushing (auto_flush(false)) when:

  • Implementing custom buffer protocols or serialization formats
  • You need explicit control over flush timing for performance tuning
  • You must batch multiple buffers before dispatching to storage
  • Your workload has specific flush semantics beyond simple "on seal" behavior

Choose automatic flushing (auto_flush(true), the default) when:

  • You want simplicity and predictable, automatic I/O dispatch
  • Standard log-structured storage semantics are sufficient
  • You want the ring itself, not your code, to keep sealing, flushing, and resetting correctly synchronized

⚠️ Critical Warnings: Manual Flushing Pitfalls

When you disable automatic flushing, you assume significant responsibility for system correctness. Below are the most dangerous scenarios:

1. Ring Exhaustion (Deadlock)

The most critical danger. If all buffers become sealed and none are flushed, the ring becomes completely stuck. New write attempts will fail with BufferError::RingExhausted.

// DANGEROUS: Auto-flush disabled, but never flush!
let ring = FlushRingOptions::new()
    .buffers(4)
    .auto_flush(false)  // ⚠️ Manual flush required
    .build();

// Write until all buffers seal...
// Now ring.put() returns RingExhausted on every thread!
// Application DEADLOCKED - cannot progress.

Fix: Establish a flush schedule. Every sealed buffer MUST eventually be flushed.

// CORRECT: Regular flushing prevents exhaustion
for batch in incoming_batches {
    // ... write data ...
    if ring.is_current_buffer_sealed() {
        ring.flush_current_buffer();  // ✓ Flush regularly
    }
}
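A small single-threaded model of the ring shows why exhaustion is a hard stop. The sketch assumes a simple forward index scan for a free buffer, matching the rotation strategy described under Implementation Notes. Slot, RingModel, and the one-variant BufferError stand-in are hypothetical and far simpler than the real ring.

```rust
/// Hypothetical per-buffer lifecycle in a simplified, single-threaded model.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Slot {
    Active,   // current buffer accepting writes
    Sealed,   // full, waiting for someone to call flush
    Flushing, // I/O dispatched, waiting for completion
    Free,     // reset and available for rotation
}

/// Stand-in for the crate's error type (only the variant this model needs).
#[derive(Debug, PartialEq)]
pub enum BufferError {
    RingExhausted,
}

pub struct RingModel {
    slots: Vec<Slot>,
    current: usize,
}

impl RingModel {
    pub fn new(n: usize) -> Self {
        let mut slots = vec![Slot::Free; n];
        slots[0] = Slot::Active;
        RingModel { slots, current: 0 }
    }

    /// Seal the current buffer, then scan forward for a free one.
    pub fn seal_and_rotate(&mut self) -> Result<usize, BufferError> {
        self.slots[self.current] = Slot::Sealed;
        let n = self.slots.len();
        for step in 1..=n {
            let idx = (self.current + step) % n;
            if self.slots[idx] == Slot::Free {
                self.slots[idx] = Slot::Active;
                self.current = idx;
                return Ok(idx);
            }
        }
        // Every buffer is sealed or flushing: nothing can accept writes.
        Err(BufferError::RingExhausted)
    }

    /// Manual flush: a sealed buffer moves to Flushing.
    pub fn flush(&mut self, idx: usize) {
        if self.slots[idx] == Slot::Sealed {
            self.slots[idx] = Slot::Flushing;
        }
    }

    /// I/O completion: only now does the buffer become Free again.
    pub fn complete_io(&mut self, idx: usize) {
        if self.slots[idx] == Slot::Flushing {
            self.slots[idx] = Slot::Free;
        }
    }
}
```

With four buffers, the fourth seal finds nothing free. After one flush and its completion, rotation succeeds again.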

2. Premature Buffer Reuse (Data Corruption)

If a buffer is reset before its I/O completes, new data can overwrite in-flight data:

// DANGEROUS: Reset before I/O completion
let buffer = ring.current_buffer();
ring.flush(buffer);
ring.reset_buffer(buffer);  // ⚠️ Too early! I/O not done!
// Now buffer can be reused and new data overwrites pending I/O

Fix: Call reset_buffer() only after I/O completion is confirmed:

// CORRECT: Reset after I/O completion handler
fn on_io_complete(ring: &BufferRing, buffer: &FlushBuffer) {
    // I/O is confirmed done
    ring.reset_buffer(buffer);  // ✓ Safe now
}
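The ordering that matters is: dispatch, wait for completion, then reset. The sketch below models that order with a background thread standing in for io_uring and an mpsc channel standing in for the completion event. It uses a Mutex only to keep the model short, since the real ring is latch-free. flush_then_reset is a hypothetical name.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Flush `buffer` on a background "I/O" thread and reset it only after the
/// completion message arrives. Returns what reached the simulated disk.
pub fn flush_then_reset(buffer: Arc<Mutex<Vec<u8>>>) -> Vec<u8> {
    let disk: Arc<Mutex<Vec<u8>>> = Arc::new(Mutex::new(Vec::new()));
    let (done_tx, done_rx) = mpsc::channel::<()>();

    let (buf, dsk) = (Arc::clone(&buffer), Arc::clone(&disk));
    let io = thread::spawn(move || {
        // Stand-in for the io_uring write: copy the buffer to "disk".
        let bytes = buf.lock().unwrap().clone();
        dsk.lock().unwrap().extend_from_slice(&bytes);
        done_tx.send(()).unwrap(); // completion event
    });

    // Completion handler: block until the I/O reports done, then reset.
    done_rx.recv().unwrap();
    buffer.lock().unwrap().fill(0); // reset_buffer() stand-in
    io.join().unwrap();

    let on_disk = disk.lock().unwrap().clone();
    on_disk
}
```

If the reset ran before the recv(), the simulated write could copy zeros instead of the payload. That is the in-flight overwrite described above.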

3. Lost Writes and Data Races

Without careful synchronization, concurrent threads can corrupt buffer state:

// DANGEROUS: Unsynchronized concurrent access
let buffer = ring.current_buffer();
buffer.increment_writers();    // Thread A
let _ = buffer.set_sealed_bit_true();  // Thread B
ring.flush(buffer);            // Both threads unsynchronized!

Fix: Use the put() method, which handles synchronization internally, or implement your own CAS-based locking. Bear in mind that put() also automatically rotates away from a buffer with insufficient space.
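If you roll your own protocol, checking the sealed bit and registering as a writer must happen as one atomic step. Below is a minimal CAS-loop sketch that assumes the state word layout from the System Architecture section. try_join_writer, seal, and leave_writer are hypothetical names, not crate APIs.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const SEALED_BIT: usize = 1 << 0;
const WRITER_ONE: usize = 1 << 8; // +1 in the writer-count field (bits 31..8)
const WRITER_MASK: usize = 0xFF_FFFF << 8;

/// Register as a writer only if the buffer is not sealed. The sealed check and
/// the increment are one CAS, so a concurrent seal cannot slip in between.
pub fn try_join_writer(state: &AtomicUsize) -> bool {
    let mut cur = state.load(Ordering::Acquire);
    loop {
        if cur & SEALED_BIT != 0 {
            return false; // caller should move on to the next buffer
        }
        match state.compare_exchange_weak(cur, cur + WRITER_ONE, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => return true,
            Err(actual) => cur = actual, // lost the race; re-check with fresh state
        }
    }
}

/// Seal the buffer; returns true if this call set the bit.
pub fn seal(state: &AtomicUsize) -> bool {
    state.fetch_or(SEALED_BIT, Ordering::AcqRel) & SEALED_BIT == 0
}

/// Deregister as a writer. Returns true when the caller was the last writer of
/// a sealed buffer, i.e. the one thread that should hand it to flush().
/// (If a buffer is sealed while no writers remain, the sealer dispatches it.)
pub fn leave_writer(state: &AtomicUsize) -> bool {
    let prev = state.fetch_sub(WRITER_ONE, Ordering::AcqRel);
    let writers_before = (prev & WRITER_MASK) >> 8;
    writers_before == 1 && prev & SEALED_BIT != 0
}
```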

4. Flushed Buffer Still Locked (Ring Stall)

If a buffer is stuck in flush-in-progress state, it never re-enters the ring:

// DANGEROUS: Set flush-in-progress without resetting
let buffer = ring.current_buffer();
buffer.set_flush_in_progress();
// ... forget to call reset_buffer() ...
// This buffer is now **permanently locked**
// Ring slowly exhausts as buffers are permanently claimed

Fix: Always pair flush() with an eventual reset_buffer() call:

// CORRECT: Flush and reset in matching pair
ring.flush(buffer);
// ... I/O dispatcher receives callback ...
on_io_completion(buffer);
ring.reset_buffer(buffer);  // ✓ Re-enable for ring
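The flush-in-progress bit can be modeled as a gate. Only a sealed, writer-free buffer may enter it, and only the completion path may leave it. This sketch uses AtomicUsize::fetch_update with the bit layout from the state word diagram. begin_flush, reset_after_io, and is_reusable are hypothetical helpers, and the crate's actual flush() and reset_buffer() may check different conditions.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const SEALED_BIT: usize = 1 << 0;
const FLUSH_BIT: usize = 1 << 1;
const WRITER_MASK: usize = 0xFF_FFFF << 8;

/// Claim the flush. Succeeds only for a sealed buffer with no active writers
/// that is not already being flushed.
pub fn begin_flush(state: &AtomicUsize) -> bool {
    state
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |s| {
            let ready = s & SEALED_BIT != 0 && s & WRITER_MASK == 0 && s & FLUSH_BIT == 0;
            if ready { Some(s | FLUSH_BIT) } else { None }
        })
        .is_ok()
}

/// Called from the I/O completion path: clears offset, writers, and both
/// flag bits so the buffer can re-enter the ring.
pub fn reset_after_io(state: &AtomicUsize) -> bool {
    state
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |s| {
            if s & FLUSH_BIT != 0 { Some(0) } else { None }
        })
        .is_ok()
}

/// A buffer may be rotated back in only when neither flag is set.
pub fn is_reusable(state: &AtomicUsize) -> bool {
    state.load(Ordering::Acquire) & (SEALED_BIT | FLUSH_BIT) == 0
}
```

If reset_after_io is never called, is_reusable stays false forever. That is the permanent lock described in this pitfall.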

5. No Flusher Registered (Automatic Resets)

When auto_flush is false and no QuickIO is registered, buffers reset immediately (test mode):

let ring = FlushRingOptions::new()
    .auto_flush(false)
    .flusher(None)  // No actual I/O dispatcher
    .build();

// Buffers reset immediately without actual I/O
// Data is lost! Designed for testing only.

Fix: Always register a flusher in production:

let flusher = Arc::new(QuickIO::with_wait_appender(io_uring, file));
let ring = FlushRingOptions::new()
    .auto_flush(false)
    .flusher(flusher)  // ✓ Real dispatcher
    .build();

6. No New Current Buffer Set After Seal

When a caller seals a buffer, they must ensure that a new current buffer is set. They can do so manually through their own protocol or through the built-in rotate_after_seal() method, which rotates the ring away from the sealed buffer to the next available one.

    let buffer = ring.current_buffer();
    let _ = buffer.set_sealed_bit_true();

    ring.rotate_after_seal(buffer.pos)?; // Rotate to the next available buffer

    ring.flush(buffer);                  // Flush the sealed buffer, not the new current one
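Rotation should happen exactly once per seal, even if several threads notice the full buffer at the same moment. One way to get that is a compare_exchange from the sealed position to the next one. This sketch assumes the ring stores the active position in a single AtomicUsize. RingIndex is hypothetical. Unlike the crate's rotate_after_seal, which returns a Result, this version only reports whether the caller performed the rotation, and it skips the search for a free buffer.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical ring index: `current` holds the position of the active buffer.
pub struct RingIndex {
    current: AtomicUsize,
    len: usize,
}

impl RingIndex {
    pub fn new(len: usize) -> Self {
        RingIndex { current: AtomicUsize::new(0), len }
    }

    pub fn current(&self) -> usize {
        self.current.load(Ordering::Acquire)
    }

    /// Advance from `sealed_pos` to the next slot. Only the first caller that
    /// still sees `sealed_pos` as current moves the ring; late callers find
    /// that the rotation already happened and do nothing.
    pub fn rotate_after_seal(&self, sealed_pos: usize) -> bool {
        let next = (sealed_pos + 1) % self.len;
        self.current
            .compare_exchange(sealed_pos, next, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}
```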

Detailed Reference: Manual Flushing APIs

Methods for Manual Control

current_buffer() -> &'static FlushBuffer

Get the active buffer for custom protocols:

use std::sync::atomic::Ordering;

let active = ring.current_buffer();
let state = active.state.load(Ordering::Acquire);
let offset = state_offset(state);  // Parse packed state

Safety Notes:

  • The returned reference is only valid for the current rotation cycle
  • Ring may rotate anytime if the current buffer is sealed
  • Safe for read-only inspection only

is_current_buffer_sealed() -> bool

Check if current buffer is sealed (full):

if ring.is_current_buffer_sealed() {
    ring.flush_current_buffer();
}

Use for: deciding when to flush in batching or custom flush schedules

flush_current_buffer()

Convenience method to flush the active buffer:

let buffer = ring.current_buffer();
buffer.set_sealed_bit_true()?;
ring.flush_current_buffer();  // Dispatch to I/O

Equivalent to:

ring.flush(ring.current_buffer());

flush(buffer: &FlushBuffer)

Explicit dispatch of a specific buffer:

ring.flush(buffer);  // Sets flush-in-progress bit

Must be paired with:

  • reset_buffer() after I/O completion
  • Monitoring via your QuickIO dispatcher

reset_buffer(buffer: &FlushBuffer)

Clear state after I/O completion:

// Called from I/O completion handler
fn on_completion(ring: &BufferRing, buffer: &FlushBuffer) {
    ring.reset_buffer(buffer);  // Re-enable for ring
}

Critical: Do NOT call until I/O is confirmed complete.

Complete Manual Flushing Protocol

use flush_buffer_ring::{FlushRingOptions, QuickIO};
use std::sync::Arc;

// 1. Create ring with manual control
let flusher = Arc::new(QuickIO::with_wait_appender(...));
let ring = Arc::new(
    FlushRingOptions::new()
        .buffers(8)
        .auto_flush(false)  // Enable manual mode
        .flusher(flusher)
        .build()
);

// 2. Register an I/O completion callback (placeholder name; hook this into your io_uring completion path)
let ring_clone = Arc::clone(&ring);
async_register_completion_handler(move |buffer| {
    // I/O is done, disk is safe
    ring_clone.reset_buffer(buffer);
});

// 3. Main write loop with manual flush control
for entry in entries {
    loop {
        let current = ring.current_buffer();
        
        match current.reserve_space(entry.len()) {
            Ok(offset) => {
                current.write(offset, entry);
                current.decrement_writers();
                break;
            }
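            Err(_) => {
                // Buffer full: seal it and rotate so other writers move on
                let _ = current.set_sealed_bit_true();
                ring.rotate_after_seal(current.pos); // Must rotate

                // Flush the sealed buffer; flush_current_buffer() would now
                // target the newly rotated-in buffer instead
                ring.flush(current);
            }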
        }
    }
}



If needed, the ring can track an address-range slot for every buffer. Log-structured systems require this, because a buffer must never be written to the same log location twice. The next_address_range attribute can be incremented atomically to move the BufferRing logically forward along the log.
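Here is a sketch of that idea. It assumes a single shared cursor and one ONE_MEGABYTE_BLOCK range per flushed buffer. LogCursor and claim_range are hypothetical; next_address_range is the attribute named above.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};

const ONE_MEGABYTE_BLOCK: u64 = 1024 * 1024;

/// Hypothetical log cursor shared by all buffers: `next_address_range` is the
/// next unclaimed byte offset in the log file.
pub struct LogCursor {
    next_address_range: AtomicU64,
}

impl LogCursor {
    pub fn new(start: u64) -> Self {
        LogCursor { next_address_range: AtomicU64::new(start) }
    }

    /// Claim a disjoint, block-sized range for a buffer about to be flushed.
    /// fetch_add guarantees no two buffers ever receive the same range.
    pub fn claim_range(&self) -> Range<u64> {
        let start = self.next_address_range.fetch_add(ONE_MEGABYTE_BLOCK, Ordering::AcqRel);
        start..start + ONE_MEGABYTE_BLOCK
    }
}
```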

Implementation Checklist for Manual Flushing

When implementing manual flushing, verify:

  • Every sealed buffer is eventually flushed
  • A new current buffer is set after every seal
  • reset_buffer() is called only after I/O completion
  • No buffer is preemptively reset before I/O starts
  • QuickIO is registered (not None)
  • Completion callbacks are properly synchronized
  • Ring exhaustion is monitored and alerts configured
  • Tests verify your flush schedule cannot deadlock
  • Documentation explains custom flush semantics to users

Flush Behaviors

The crate provides built-in flush strategies via QuickIO:

use flush_buffer_ring::QuickIO;
use std::sync::Arc;

// Parallel flushing: multiple buffers dispatched concurrently
let parallel = QuickIO::new(io_uring, file);

// Serial flushing: buffers dispatched one at a time
let serial = QuickIO::link(io_uring, file);

Error Handling

The ring returns BufferError variants to indicate various conditions:

pub enum BufferError {
    InsufficientSpace,        // Buffer too full for this write
    EncounteredSealedBuffer,  // Buffer was sealed; retry with new one
    RingExhausted,            // All buffers busy; back off and retry
    FlushInProgress,          // Flush operation already in progress
    InvalidState,             // Internal state corrupted
}
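Callers typically wrap writes in a retry loop that treats most variants as transient. The policy below is an assumption, not the crate's documented behavior. The enum is re-declared locally so the sketch compiles on its own, put_with_retry is hypothetical, and treating FlushInProgress as retryable is a guess.

```rust
use std::hint;
use std::thread;

// Local copy of the documented enum so this sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BufferError {
    InsufficientSpace,
    EncounteredSealedBuffer,
    RingExhausted,
    FlushInProgress,
    InvalidState,
}

/// Hypothetical retry policy around one write attempt (for example a call to put()).
pub fn put_with_retry<F>(mut attempt: F, max_tries: usize) -> Result<usize, BufferError>
where
    F: FnMut() -> Result<usize, BufferError>,
{
    let mut last = BufferError::RingExhausted;
    for tries in 0..max_tries {
        match attempt() {
            Ok(offset) => return Ok(offset),
            // The ring will have rotated (or the flush will finish); try again soon.
            Err(e @ (BufferError::InsufficientSpace
            | BufferError::EncounteredSealedBuffer
            | BufferError::FlushInProgress)) => {
                last = e;
                hint::spin_loop();
            }
            // All buffers busy: spin briefly, then yield so flushes can complete.
            Err(BufferError::RingExhausted) => {
                last = BufferError::RingExhausted;
                if tries < 8 {
                    hint::spin_loop();
                } else {
                    thread::yield_now();
                }
            }
            // Corrupted state is not something a retry can fix.
            Err(BufferError::InvalidState) => return Err(BufferError::InvalidState),
        }
    }
    Err(last)
}
```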

Thread Safety

The ring is fully thread-safe:

  • All buffers can be accessed from multiple threads simultaneously
  • No global locks; only atomic operations and CAS loops
  • State is self-consistent within each atomic snapshot
  • Gracefully handles concurrent sealing, rotation, and flushing

Performance Characteristics

  • Write latency: Sub-microsecond atomic operations (no locks)
  • Memory overhead: Fixed ~64 bytes per buffer for metadata
  • I/O batching: Amortizes overhead by buffering multiple writes per flush

Examples

Simple Concurrent Writes

use std::sync::Arc;
use std::thread;
use flush_buffer_ring::{BufferRing, FlushRingOptions, QuickIO};

let flusher = Arc::new(QuickIO::new(io_uring, file));
let ring = Arc::new(
    FlushRingOptions::new()
        .buffers(4)
        .flusher(flusher)
        .build()
);

let mut handles = vec![];

for _ in 0..4 {
    let r = ring.clone();
    handles.push(thread::spawn(move || {
        for i in 0..1000 {
            let _payload = format!("entry_{}", i).into_bytes();
            // Writes are flushed automatically once a buffer is sealed
            // (real usage would reserve space in the ring first)
        }
    }));
}

for handle in handles {
    handle.join().unwrap();
}
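The loop body above only builds payloads. The reservation step it alludes to can be sketched with threads reserving from one shared offset counter, which shows why fetch_add alone hands out non-overlapping ranges. concurrent_reserve is hypothetical and ignores sealing and rotation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

const ONE_MEGABYTE_BLOCK: usize = 1024 * 1024;

/// `threads` writers each reserve `per_thread` entries of `len` bytes from one
/// shared offset counter. Returns every granted (start, end) range, sorted.
pub fn concurrent_reserve(threads: usize, per_thread: usize, len: usize) -> Vec<(usize, usize)> {
    let offset = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let offset = Arc::clone(&offset);
            thread::spawn(move || {
                let mut mine = Vec::new();
                for _ in 0..per_thread {
                    let start = offset.fetch_add(len, Ordering::AcqRel);
                    if start + len <= ONE_MEGABYTE_BLOCK {
                        mine.push((start, start + len));
                    } // else: buffer full; a real writer would seal and rotate
                }
                mine
            })
        })
        .collect();
    let mut all: Vec<_> = handles.into_iter().flat_map(|h| h.join().unwrap()).collect();
    all.sort();
    all
}
```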

Custom Flushing Logic

use flush_buffer_ring::{FlushBufferRing, FlushRingOptions};
use std::sync::Arc;

let ring = Arc::new(
    FlushRingOptions::new()
        .buffers(4)
        .auto_flush(false)  // Disable automatic flushing
        .flusher(flusher)
        .build()
);

// Custom flush strategy: flush each sealed buffer, with extra logic every 5 flushes
let mut flush_count = 0;

// ... write operations ...

if ring.is_current_buffer_sealed() {
    ring.flush_current_buffer();
    flush_count += 1;
    
    if flush_count >= 5 {
        // Custom logic: wait for all flushes to complete, etc.
        flush_count = 0;
    }
}

Constants

  • ONE_MEGABYTE_BLOCK = 1024 * 1024 (1 MB): Fixed buffer size for all rings

Implementation Notes

  • Buffers are allocated with malloc and manually aligned to ONE_MEGABYTE_BLOCK
  • All state transitions use atomic compare-exchange loops
  • The flush-in-progress bit prevents race conditions during I/O dispatch
  • Ring rotation uses a simple index scanning strategy to find available buffers
  • No memory barriers are used beyond those in atomic operations
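The manual alignment mentioned in the first note comes down to over-allocating and rounding the start address up. In this sketch a Vec<u8> stands in for malloc, and aligned_block and align_up are hypothetical. Calling std::alloc::alloc with a Layout whose alignment is ONE_MEGABYTE_BLOCK would avoid the extra megabyte.

```rust
const ONE_MEGABYTE_BLOCK: usize = 1024 * 1024;

/// Round `addr` up to the next multiple of `align` (which must be a power of two).
pub fn align_up(addr: usize, align: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (addr + align - 1) & !(align - 1)
}

/// Over-allocate by one alignment unit, then locate the first 1 MB-aligned
/// byte inside the allocation. Returns the backing storage and the index at
/// which the aligned 1 MB buffer starts. Moving the Vec does not move its heap
/// data, so the index stays valid.
pub fn aligned_block() -> (Vec<u8>, usize) {
    let backing = vec![0u8; 2 * ONE_MEGABYTE_BLOCK];
    let base = backing.as_ptr() as usize;
    let start = align_up(base, ONE_MEGABYTE_BLOCK) - base;
    (backing, start)
}
```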

Notes

Work to fully extract the FlushBufferRing implementation from Bloom_lfs is currently underway.

Testing

Run the comprehensive test suite:

cargo test

License

license = "GPL-3.0"