universal-inserter 0.1.0

A runtime-agnostic batch inserter library implementing the ClickHouse Inserter pattern
Documentation
  • Coverage
  • 0%
    0 out of 6 items documented0 out of 0 items with examples
  • Size
  • Source code size: 22.19 kB This is the summed size of all the files inside the crates.io package for this release.
  • Documentation size: 2.33 MB This is the summed size of all files generated by rustdoc for all configured targets
  • Ø build duration
  • this release: 13s Average build duration of successful builds.
  • all releases: 13s Average build duration of successful builds in releases after 2024-10-23.
  • Links
  • Homepage
  • avbel/universal-inserter
    0 0 0
  • crates.io
  • Dependencies
  • Versions
  • Owners
  • avbel

universal-inserter

A runtime-agnostic Rust library implementing the batch inserter pattern (similar to ClickHouse's Inserter). Buffers items and flushes them via a user-provided async insert function based on configurable limits.

Features

  • Runtime-agnostic: Works with Tokio, async-std, smol, or any async runtime
  • Row limit: Flush when buffer reaches N items
  • Time period: Flush after duration elapsed
  • Period bias: Optional randomization to prevent synchronized flushes
  • Zero dependencies by default (only rand for period_bias feature)

Installation

[dependencies]
universal-inserter = "0.1"

# Optional: enable period bias randomization
universal-inserter = { version = "0.1", features = ["period_bias"] }

Usage

Basic Example

use universal_inserter::Inserter;
use std::time::Duration;

#[derive(Clone)]
struct MyRow {
    id: u64,
    name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut inserter = Inserter::new(|batch: Vec<MyRow>| async move {
        println!("Inserting {} rows", batch.len());
        // Your insert logic here (database, API, file, etc.)
        Ok::<_, std::io::Error>(())
    })
    .with_max_rows(1000)
    .with_period(Duration::from_secs(5));

    // Write rows
    for i in 0..2500 {
        inserter.write(&MyRow { id: i, name: format!("row_{}", i) });
        inserter.commit().await?;  // Flushes if limits reached
    }

    // Flush remaining and close
    let stats = inserter.end().await?;
    println!("Total: {} rows, {} transactions", stats.rows, stats.transactions);

    Ok(())
}

With Period Bias

use universal_inserter::Inserter;
use std::time::Duration;

let mut inserter = Inserter::new(insert_fn)
    .with_max_rows(500)
    .with_period(Duration::from_secs(10))
    .with_period_bias(0.2);  // ±20% randomization

With Commit Callback

use universal_inserter::Inserter;

let mut inserter = Inserter::new(insert_fn)
    .with_max_rows(100)
    .with_commit_callback(|stats| {
        println!("Committed {} rows", stats.rows);
    });

Force Commit

// Flush unconditionally, regardless of limits
let stats = inserter.force_commit().await?;

Check Pending Stats

let pending = inserter.pending();
println!("Buffered: {} rows", pending.rows);

if let Some(time_left) = inserter.time_left() {
    println!("Next flush in: {:?}", time_left);
}

API

Inserter Methods

Method Description
new(insert_fn) Create inserter with async insert function
with_max_rows(n) Set row limit (default: unlimited)
with_period(duration) Set time-based flush interval
with_period_bias(bias) Add randomization ±bias (requires period_bias feature)
with_commit_callback(fn) Register callback after successful commits
write(item) Add item to buffer (clones item)
write_owned(item) Add item to buffer (moves item)
commit() Check limits and flush if reached
force_commit() Flush unconditionally
end() Consume inserter and flush remaining
pending() Get current buffer statistics
time_left() Duration until next period tick

License

MIT