khata-rs 0.1.1

A high-performance disk-persistent message queue. Optimized for low-latency scenarios.
Documentation

docs.rs

khata-rs

An ultra-low latency, disk-backed persistent message queue written in native Rust, inspired by Chronicle Queue.

Overview

khata-rs is designed for latency-sensitive applications that require durable message storage with near-nanosecond read/write performance. It uses memory-mapped I/O and lock-free data structures to achieve ultra-low latency while persisting all messages to disk, while also abiding by the Single Producer Multiple Consumer (SPMC) paradigm.

Features

  • Ultra-low latency - 3-50 nanoseconds per operation using memory-mapped I/O
  • Lock-free writes - No locks on the hot path for writers
  • Zero-copy reads - Readers get direct references to memory-mapped data
  • Time-based rolling - Automatic file rotation (daily, hourly, or custom intervals)
  • Data integrity - Optional SIMD-accelerated CRC-16 checksums

Installation

Add to your Cargo.toml:

[dependencies]
khata-rs = "0.1"

Quick Start

use khata_rs::Queue;

fn main() -> khata_rs::Result<()> {
    // Create or open a queue
    let queue = Queue::new("/tmp/my-queue").build()?;

    // Write messages
    {
        let mut writer = queue.writer()?;
        let index = writer.write(b"Hello, World!")?;
        println!("Written at index: {index:#x}");
    }

    // Read messages
    {
        let mut reader = queue.reader()?;
        reader.rewind()?;

        while let Some(data) = reader.read()? {
            println!("Read: {}", String::from_utf8_lossy(data));
        }
    }

    Ok(())
}

Configuration

Use the builder pattern to customize queue behavior:

use khata_rs::{Queue, RollCycle::FastHourly};

let queue = Queue::new("/tmp/queue")
    .roll_cycle(FastHourly)           // New file every hour
    .block_size(256 * 1024 * 1024)    // 256 MB memory blocks
    .checksums(true)                   // Enable CRC-16 verification
    .index_spacing(256)                // Index every 256th message
    .index_count(4096)                 // Index array size
    .read_only(false)                  // Read-write mode
    .build()?;

Roll Cycles

Cycle Duration Max Messages Use Case
FastDaily 24 hours ~4.3 billion General purpose (default)
FastHourly 1 hour ~4.3 billion Higher granularity
HalfHourly 30 min ~4.3 billion Moderate frequency
TenMinutely 10 min ~4.3 billion High frequency
FiveMinutely 5 min ~1 billion Very high frequency

API

Queue

The main entry point. Thread-safe (Send + Sync).

let queue = Queue::new(path).build()?;
let writer = queue.writer()?;  // One per thread
let reader = queue.reader()?;  // Multiple allowed

Writer

Appends messages with automatic cycle rolling.

let mut writer = queue.writer()?;

// Write and get index
let index = writer.write(b"message")?;

// Explicit flush (also happens on drop)
writer.flush()?;

Reader

Consumes messages with flexible positioning.

let mut reader = queue.reader()?;

// Position at start
reader.rewind()?;

// Sequential read
while let Some(data) = reader.read()? {
    // Process data
}

// Random access
reader.seek(index)?;

// Zero-copy read
reader.read_with(|data| {
    // Process without copying
})?;

// Bidirectional
reader.set_direction(Direction::Backward);

Performance

Designed for scenarios requiring both persistence and low latency:

Operation Latency
Append ~3-50 ns
Sequential read Sub-microsecond
Random access O(1) for indexed messages

Run benchmarks:

cargo bench
# or
cargo run --release --bin benchmark

Architecture

Queue
├── Writer (single-producer per thread)
│   └── Store (memory-mapped cycle file)
│       └── Two-level lock-free index
└── Reader (multiple consumers)
    └── Position cache for O(1) sequential reads

Messages are stored with an 8-byte header (4-byte length + 2-byte CRC + 2-byte padding) followed by the payload. Each message receives a unique 64-bit index encoding the cycle and sequence number.

License

Apache-2.0