flowdb 0.2.2

# FlowDB

A high-performance embedded time-series storage engine written in Rust, powered by an LSM-tree architecture with WAL, SSTables, and Bloom filters.

## Benchmark Results

FlowDB vs RocksDB comparison (100K records, 128B values, batch=100, release build, Apple M-series):

| Category | FlowDB | RocksDB | Result |
|---|---|---|---|
| Sequential Write | 3.7M ops/s | 3.2M ops/s | **FlowDB 1.18x faster** |
| Concurrent Write (8 threads) | 10.2M ops/s | 4.6M ops/s | **FlowDB 2.24x faster** |
| Point Query | 6.1M ops/s | 548K ops/s | **FlowDB 11.1x faster** |
| Prefix Scan (~200 recs) | 71K ops/s | 11K ops/s | **FlowDB 6.3x faster** |
| Full Scan (200K recs) | 73 ops/s | 40 ops/s | **FlowDB 1.82x faster** |
| Storage | 2.0MB | 1.8MB | ~same |

**FlowDB is faster in every throughput category** (writes, point queries, and scans); on-disk size is roughly the same.

```bash
cargo run --release --example flowdb-vs-rocksdb
```

## Features

- LSM-tree storage with WAL (write-ahead log) for crash recovery
- **Per-record WAL checksums** — corruption detected on replay, bad records rejected
- **Config validation** — invalid configs rejected at startup instead of crashing
- **Frozen memtable backpressure** — writes stall when flush can't keep up
- **Lazy scan iterator** (RocksDB-style `ScanIterator`) for bounded-memory range scans
- **`get_latest(key)`** for retrieving the most recent record by key
- Bloom filters for fast negative lookups on point queries
- Dual compression: lz4 for flush (speed), zstd for compaction (ratio)
- Buffered WAL writes (256KB buffer) for reduced syscall overhead
- WAL pre-encoding outside the write lock for better concurrency
- Time-bucketed block index with binary search
- **LRU block cache** (64 shards, powered by the `lru` crate) with true LRU eviction
- BTreeMap-based frozen memtable for O(log n) range queries on flush
- Vec-based active memtable for O(1) writes (no key clone, no tree traversal)
- Zero-copy owned write path (`write_batch_owned`)
- Synchronous write path (`write_batch_sync`) for non-async callers
- **Size-tiered compaction** with streaming heap merge (low memory footprint)
- **Range tombstones** (`delete_range`) for efficient bulk key-range deletion
- Garbage collection (TTL expiry) and point deletes
- **Graceful shutdown**: `shutdown()` flushes WAL + memtables before exit
- **Engine stats**: `engine.stats()` returns structured counters; `engine.metrics_text()` returns a Prometheus-format string
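
To make the per-record WAL checksum idea concrete, here is a minimal std-only sketch. It is not FlowDB's on-disk format: the `[len][checksum][payload]` framing, the function names, and the use of FNV-1a as the checksum are all assumptions chosen so the example needs no external crates.

```rust
/// FNV-1a 32-bit hash, used here as a stand-in checksum (assumption:
/// the real engine may use CRC32 or another algorithm).
fn fnv1a32(data: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c_9dc5;
    for &b in data {
        hash ^= b as u32;
        hash = hash.wrapping_mul(0x0100_0193);
    }
    hash
}

/// Frame a payload as [len: u32 LE][checksum: u32 LE][payload].
fn encode_record(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(8 + payload.len());
    out.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    out.extend_from_slice(&fnv1a32(payload).to_le_bytes());
    out.extend_from_slice(payload);
    out
}

/// Replay a log buffer. Returns the payloads whose checksum matched and
/// the number of records rejected. Stops at the first truncated frame.
fn replay(log: &[u8]) -> (Vec<Vec<u8>>, usize) {
    let mut good = Vec::new();
    let mut rejected = 0;
    let mut pos = 0;
    while pos + 8 <= log.len() {
        let len = u32::from_le_bytes([log[pos], log[pos + 1], log[pos + 2], log[pos + 3]]) as usize;
        let sum = u32::from_le_bytes([log[pos + 4], log[pos + 5], log[pos + 6], log[pos + 7]]);
        let start = pos + 8;
        let end = start + len;
        if end > log.len() {
            break; // truncated tail, e.g. a crash in the middle of a write
        }
        let payload = &log[start..end];
        if fnv1a32(payload) == sum {
            good.push(payload.to_vec());
        } else {
            rejected += 1; // corrupted record: skip it, keep replaying
        }
        pos = end;
    }
    (good, rejected)
}
```

Because each record carries its own checksum, a single flipped byte invalidates only that record; the records before and after it are still recovered during replay.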

## Quick Start

```toml
[dependencies]
flowdb = "0.2"
```

### Rust Library Usage

```rust
use flowdb::{
    Engine, Config, Record, Query, ScanRange, ScanIterator, ReadOptions,
};

let config = Config::default();
let engine = Engine::open(config).await?;

// Write
let records = vec![Record {
    key: "sensor.temp".into(),
    ts: 1700000000,
    expire_at: i64::MAX,
    value: b"22.5".to_vec(),
}];
engine.write_batch(&records).await?;

// Zero-copy write (moves key/value, no clones)
engine.write_batch_owned(records).await?;

// Delete a range of keys (range tombstone)
engine.delete_range("sensor.a", "sensor.z").await?;

// ── Eager query (returns Vec<Record>) ──
let results = engine.query_by_prefix("sensor.").await?;
let results = engine.query_prefix_time_range("sensor.", 1700000000, 1700003600).await?;

// ── Lazy scan iterator (RocksDB-style, recommended for large ranges) ──

// Prefix scan — yields records one-at-a-time, bounded memory
let iter: ScanIterator = engine.scan_prefix("sensor.")?;
for result in iter {
    let record = result?;
    println!("{} @ {} = {:?}", record.key, record.ts, record.value);
}

// Prefix + time range scan
let iter = engine.scan_prefix_time_range("sensor.", 1700000000, 1700003600)?;

// Full key+time range scan with ReadOptions
let iter = engine.scan_opt(
    ScanRange::prefix_time_range("sensor.", 1700000000, 1700003600),
    &ReadOptions { fill_cache: true, verify_checksums: true },
)?;

// Key range scan
let iter = engine.scan(ScanRange::key_range("sensor.a", "sensor.z"))?;

// Full scan
let iter = engine.scan(ScanRange::all())?;

// Take only first N records (lazy — doesn't read the rest)
let first_10: Vec<Record> = engine
    .scan_prefix("sensor.")?
    .take(10)
    .collect::<Result<_, _>>()?; // propagate scan errors instead of panicking

// ── get_latest: retrieve the most recent record for a key ──
let latest = engine.get_latest("sensor.temp").await?; // Option<Record>

// Graceful shutdown (flushes WAL + memtables)
engine.shutdown().await?;
```
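
The "take only the first N records" pattern above relies on the iterator being lazy. The following std-only sketch illustrates that property without FlowDB: `CountingScan` is a hypothetical stand-in for a scan iterator, and `first_n` is a hypothetical helper, neither is part of the crate.

```rust
use std::cell::Cell;

/// Collect at most `n` items from a fallible iterator,
/// returning the first error encountered, if any.
fn first_n<I, T, E>(iter: I, n: usize) -> Result<Vec<T>, E>
where
    I: Iterator<Item = Result<T, E>>,
{
    iter.take(n).collect()
}

/// Stand-in for a lazy scan: yields 0..limit and counts how many
/// items were actually produced.
struct CountingScan<'a> {
    next: u64,
    limit: u64,
    produced: &'a Cell<usize>,
}

impl<'a> Iterator for CountingScan<'a> {
    type Item = Result<u64, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.next >= self.limit {
            return None;
        }
        let value = self.next;
        self.next += 1;
        self.produced.set(self.produced.get() + 1);
        Some(Ok(value))
    }
}
```

Calling `first_n` with `n = 10` on a `CountingScan` whose `limit` is one million produces only ten items; the remaining items are never generated, which is the behavior a bounded-memory scan depends on.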

### ScanRange Builders

| Method | Description |
|---|---|
| `ScanRange::prefix(p)` | All records with key prefix `p` |
| `ScanRange::time_range(t1, t2)` | All records in time range `[t1, t2]` |
| `ScanRange::prefix_time_range(p, t1, t2)` | Prefix + time range |
| `ScanRange::key_range(k1, k2)` | Key range `[k1, k2]` |
| `ScanRange::key_time_range(k1, k2, t1, t2)` | Key range + time range |
| `ScanRange::all()` | Full scan |
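
The sketch below shows one way the inclusive bounds in this table could be evaluated against a `(key, ts)` pair. `RangeFilter` is a hypothetical type written for illustration, not the crate's `ScanRange`, and it assumes keys are compared as plain byte-wise strings.

```rust
/// Hypothetical range filter; the real `ScanRange` may be laid out differently.
#[derive(Debug, Clone, Default)]
struct RangeFilter {
    prefix: Option<String>,
    keys: Option<(String, String)>, // inclusive [k1, k2]
    times: Option<(i64, i64)>,      // inclusive [t1, t2]
}

impl RangeFilter {
    /// No constraints: matches every record.
    fn all() -> Self {
        Self::default()
    }

    fn prefix_time_range(p: &str, t1: i64, t2: i64) -> Self {
        RangeFilter { prefix: Some(p.to_string()), keys: None, times: Some((t1, t2)) }
    }

    fn key_range(k1: &str, k2: &str) -> Self {
        RangeFilter { prefix: None, keys: Some((k1.to_string(), k2.to_string())), times: None }
    }

    /// A record matches when it satisfies every constraint that is set.
    fn matches(&self, key: &str, ts: i64) -> bool {
        let prefix_ok = self.prefix.as_ref().map_or(true, |p| key.starts_with(p.as_str()));
        let key_ok = self
            .keys
            .as_ref()
            .map_or(true, |(lo, hi)| lo.as_str() <= key && key <= hi.as_str());
        let time_ok = self.times.map_or(true, |(t1, t2)| t1 <= ts && ts <= t2);
        prefix_ok && key_ok && time_ok
    }
}
```

Under these assumptions a key range of `["sensor.a", "sensor.z"]` includes `sensor.temp` but excludes `sensor.zeta`, because `sensor.zeta` sorts after `sensor.z`. A prefix scan is the safer choice when every key under a prefix is wanted.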

### Engine API Reference

| Method | Returns | Description |
|---|---|---|
| `scan(range)` | `Result<ScanIterator>` | Lazy iterator scan |
| `scan_opt(range, opts)` | `Result<ScanIterator>` | Lazy scan with `ReadOptions` |
| `scan_prefix(p)` | `Result<ScanIterator>` | Prefix scan (convenience) |
| `scan_prefix_time_range(p, t1, t2)` | `Result<ScanIterator>` | Prefix + time scan (convenience) |
| `get_latest(key)` | `Result<Option<Record>>` | Latest record for key |
| `query(query)` | `Result<Vec<Record>>` | Eager query (backward compat) |
| `get(key, ts)` | `Result<Option<Record>>` | Point get by exact `(key, ts)` |
| `write_batch(recs)` | `Result<()>` | Batch write |
| `write_batch_owned(recs)` | `Result<()>` | Zero-copy batch write |
| `delete_batch(recs)` | `Result<()>` | Batch point deletes |
| `delete_range(start, end)` | `Result<()>` | Range tombstone delete |
| `patch_record(...)` | `Result<Record>` | Update value/TTL of existing record |
| `flush()` | `Result<()>` | Force memtable flush to SSTable |
| `trigger_gc()` | `Result<usize>` | Run garbage collection |
| `trigger_compaction()` | `Result<bool>` | Trigger size-tiered compaction |
| `shutdown()` | `Result<()>` | Graceful shutdown (flush + cleanup) |
| `stats()` | `EngineStats` | Structured engine counters |
| `metrics_text()` | `String` | Prometheus-format metrics string |
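
For readers unfamiliar with the Prometheus text format that `metrics_text()` is described as returning, here is a small sketch of how counters can be rendered in that format. The `Counters` struct and the metric names are hypothetical; the fields of the real `EngineStats` are not listed in this README.

```rust
use std::fmt::Write;

/// Hypothetical counters, for illustration only.
struct Counters {
    writes_total: u64,
    flushes_total: u64,
}

/// Render counters in the Prometheus text exposition format:
/// a HELP line, a TYPE line, then one `name value` sample per metric.
fn to_prometheus(c: &Counters) -> String {
    let metrics = [
        ("flowdb_writes_total", "Records written.", c.writes_total),
        ("flowdb_flushes_total", "Memtable flushes completed.", c.flushes_total),
    ];
    let mut out = String::new();
    for &(name, help, value) in metrics.iter() {
        // Writing to a String cannot fail, so unwrap is safe here.
        writeln!(out, "# HELP {} {}", name, help).unwrap();
        writeln!(out, "# TYPE {} counter", name).unwrap();
        writeln!(out, "{} {}", name, value).unwrap();
    }
    out
}
```

A string in this shape can be served directly from an HTTP `/metrics` endpoint for a Prometheus scraper to collect.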

## Configuration

```rust
use flowdb::Config;

let config = Config {
    data_dir: "./data".into(),
    memtable_size_mb: 64,        // Active memtable flush threshold
    max_frozen_memtables: 2,     // Backpressure limit
    block_size: 8192,            // SSTable block size (records per block)
    zstd_level: 3,               // Compaction compression level (1-22)
    bloom_bits_per_key: 10,      // Bloom filter precision
    wal_segment_size_mb: 64,     // WAL segment rotation size
    compaction_threshold: 2,     // SSTable count to trigger compaction
    flush_interval_ms: 1000,     // Background flush interval
    gc_interval_secs: 3600,      // GC interval
    time_bucket_secs: 3600,      // Block index time granularity
    block_cache_capacity_mb: 128, // LRU block cache size
    index_memory_budget_mb: 256, // Block index memory budget
    ..Default::default()
};
```
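
Config validation at startup can be pictured roughly as follows. This is a sketch only: `ConfigSketch` is a hypothetical subset of the fields above, and apart from the documented 1-22 range for `zstd_level`, the individual rules are assumptions about what a sensible validator would reject.

```rust
/// Hypothetical subset of the configuration, for illustration only.
#[derive(Debug, Clone)]
struct ConfigSketch {
    memtable_size_mb: u64,
    max_frozen_memtables: usize,
    zstd_level: i32,
    bloom_bits_per_key: u32,
}

impl ConfigSketch {
    /// Collect every problem found so all of them can be reported at once.
    fn validate(&self) -> Result<(), Vec<String>> {
        let mut errors = Vec::new();
        if self.memtable_size_mb == 0 {
            errors.push("memtable_size_mb must be greater than 0".to_string());
        }
        if self.max_frozen_memtables == 0 {
            errors.push("max_frozen_memtables must be at least 1".to_string());
        }
        if self.zstd_level < 1 || self.zstd_level > 22 {
            errors.push(format!("zstd_level {} is outside 1-22", self.zstd_level));
        }
        if self.bloom_bits_per_key == 0 {
            errors.push("bloom_bits_per_key must be greater than 0".to_string());
        }
        if errors.is_empty() {
            Ok(())
        } else {
            Err(errors)
        }
    }
}
```

Returning all violations in one `Err` lets an operator fix a bad configuration in a single pass rather than discovering the problems one restart at a time.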

| Parameter | Default | Description |
|---|---|---|
| `data_dir` | `"./data"` | Data directory path |
| `create_if_missing` | `true` | Create data directory if it doesn't exist |
| `memtable_size_mb` | `64` | Active memtable size threshold (MB) before flush |
| `max_frozen_memtables` | `2` | Max frozen memtables before writes block |
| `block_size` | `8192` | SSTable block size (number of records per block) |
| `zstd_level` | `3` | Zstd compression level (1-22) |
| `bloom_bits_per_key` | `10` | Bloom filter bits per key |
| `wal_segment_size_mb` | `64` | WAL segment file size before rotation (MB) |
| `compaction_threshold` | `2` | Number of SSTables to trigger compaction |
| `flush_interval_ms` | `1000` | Background flush interval (ms) |
| `gc_interval_secs` | `3600` | Garbage collection interval (seconds) |
| `default_ttl_secs` | `None` | Default TTL for records without explicit expiry |
| `time_bucket_secs` | `3600` | Time bucket granularity for block index |
| `index_memory_budget_mb` | `256` | Memory budget for block index (MB) |
| `block_cache_capacity_mb` | `128` | Block cache capacity (MB) |
| `wal_sync_mode` | `IntervalMs(1000)` | WAL fsync mode (`Always`, `IntervalMs(n)`, `None`) |
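
To get a feel for what `bloom_bits_per_key` buys, the standard Bloom filter estimate can be computed directly. The functions below use the textbook formulas; whether FlowDB chooses its number of hash functions this way is an assumption.

```rust
/// Number of hash functions that minimizes the false-positive rate:
/// k = bits_per_key * ln 2, rounded to the nearest integer (at least 1).
fn optimal_hashes(bits_per_key: f64) -> u32 {
    let k = (bits_per_key * std::f64::consts::LN_2).round();
    if k < 1.0 {
        1
    } else {
        k as u32
    }
}

/// Expected false-positive rate for a filter with `bits_per_key` bits
/// per inserted key and `k` hash functions: (1 - e^(-k / bits_per_key))^k.
fn false_positive_rate(bits_per_key: f64, k: u32) -> f64 {
    let k = k as f64;
    (1.0 - (-k / bits_per_key).exp()).powf(k)
}
```

With the default of 10 bits per key, this gives 7 hash functions and an expected false-positive rate of roughly 0.8%. Raising the setting lowers the rate at the cost of larger filters.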

## Architecture

```
Write Path:
  Client → encode_batch() (outside lock) → WriteWorker mutex → WAL (buffered + checksummed) + MemTable
                                                                           ↓ (when full)
                                                                       Flush → SSTable

Read Path:
  Query → Active MemTable → Frozen MemTables → Block Index → Bloom Filter → SSTable (LRU cached)
  Scan   → ScanIterator (lazy merge heap over memtable + SST block sources)

Background:
  Flush:    MemTable → SSTable (sorted, lz4-compressed, bloom-filtered)
  Compact:  Size-tiered merge (streaming heap merge, zstd-compressed)
  GC:       Remove fully-expired SSTables
  Delete:   Point deletes (tombstones) + Range deletes (range tombstones)
```
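
The "lazy merge heap" used by scans and compaction can be sketched as a k-way merge over already-sorted sources. This is a simplified illustration, not the engine's code: `Entry` and `MergeIter` are hypothetical names, and the sketch omits deduplication, tombstone handling, and TTL checks.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// (key, ts) pair; tuple ordering sorts by key first, then timestamp.
type Entry = (String, i64);

/// Merges several sorted sources, holding one pending entry per source.
struct MergeIter<I: Iterator<Item = Entry>> {
    sources: Vec<I>,
    // Min-heap via `Reverse`; the source index breaks ties between equal entries.
    heap: BinaryHeap<Reverse<(Entry, usize)>>,
}

impl<I: Iterator<Item = Entry>> MergeIter<I> {
    fn new(mut sources: Vec<I>) -> Self {
        let mut heap = BinaryHeap::new();
        // Prime the heap with the first entry of each source.
        for (idx, src) in sources.iter_mut().enumerate() {
            if let Some(entry) = src.next() {
                heap.push(Reverse((entry, idx)));
            }
        }
        MergeIter { sources, heap }
    }
}

impl<I: Iterator<Item = Entry>> Iterator for MergeIter<I> {
    type Item = Entry;

    fn next(&mut self) -> Option<Entry> {
        // Pop the smallest pending entry, then refill from the same source.
        let Reverse((entry, idx)) = self.heap.pop()?;
        if let Some(next) = self.sources[idx].next() {
            self.heap.push(Reverse((next, idx)));
        }
        Some(entry)
    }
}
```

The heap never holds more than one entry per source, so memory use depends on the number of sources being merged rather than on the number of records, which is what keeps large scans and compactions bounded.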

## Benchmarks

```bash
# Stress test
cargo run --release --bin flowdb-stress

# Micro-benchmarks
cargo bench

# Coverage
cargo llvm-cov --summary-only
```

## Project Layout

```
src/
  lib.rs              – public API surface (Config, Engine, Record, Query, ...)
  engine.rs           – Engine + ScanIterator (the core)
  memtable.rs         – in-memory write buffer (MemTables)
  wal.rs              – write-ahead log (checksummed)
  sstable.rs          – on-disk sorted-string table reader/writer
  block_meta_index.rs – fine-grained block-level index
  bloom.rs            – bloom filter for SST point queries
  cache.rs            – block cache (LRU)
  compaction.rs       – size-tiered compaction
  gc.rs               – expired-SST garbage collection
  manifest.rs         – append-only manifest log
  record.rs           – Record / InternalRecord / Query / Config types
  write_worker.rs     – single-writer worker driving WAL + memtable
  stats.rs            – engine stats + Prometheus exporter
  error.rs            – FlowError / Result
  bin/
    flowdb-stress.rs  – stress-testing / benchmarking binary
```

## License

MIT.