flowdb 0.2.2

A high-performance embedded time-series storage engine written in Rust, powered by an LSM-tree architecture.
Documentation
use arbitrary::{Arbitrary, Unstructured};
use flowdb::{Config, Engine, Query, Record};
use tempfile::TempDir;

/// Number of deterministic seeds each fuzz test iterates over (`0..FUZZ_ITERS`); every
/// seed gets its own temporary directory and engine instance.
const FUZZ_ITERS: u64 = 200;

/// Builds the engine [`Config`] shared by every fuzz test, rooted at `dir`.
///
/// Sizes are kept small (1 MiB memtable, 100-entry blocks, compaction after 2 tables).
/// NOTE(review): `auto_background: false` together with the very long GC/flush/WAL-sync
/// intervals presumably keeps the engine from doing work on its own between the explicit
/// `flush`/`shutdown` calls the tests make — confirm against flowdb's `Config` docs.
fn make_config(dir: &std::path::Path) -> Config {
    let data_dir = dir.to_path_buf();
    Config {
        data_dir,
        create_if_missing: true,
        auto_background: false,
        // Storage layout.
        memtable_size_mb: 1,
        max_frozen_memtables: 2,
        block_size: 100,
        time_bucket_secs: 3600,
        compaction_threshold: 2,
        zstd_level: 1,
        bloom_bits_per_key: 10,
        // Memory budgets.
        block_cache_capacity_mb: 16,
        index_memory_budget_mb: 64,
        // Timers, retention and WAL.
        gc_interval_secs: 3600,
        flush_interval_ms: 60000,
        default_ttl_secs: None,
        wal_segment_size_mb: 64,
        wal_sync_mode: flowdb::SyncMode::IntervalMs(u64::MAX),
    }
}

/// A randomly generated record used as fuzz input.
///
/// Kept separate from the engine's `Record` so it can implement `Arbitrary` and carry the
/// key as a `String`, which makes assertion messages readable.
#[derive(Clone, Debug)]
struct FuzzRecord {
    /// Record key; the `Arbitrary` impl produces 1..=16 lowercase ASCII letters.
    key: String,
    /// Timestamp; the `Arbitrary` impl draws it from `0..=1_000_000`.
    ts: i64,
    /// Opaque payload; the `Arbitrary` impl produces 0..=32 bytes.
    value: Vec<u8>,
}

impl<'a> Arbitrary<'a> for FuzzRecord {
    /// Draws one record from `u`: a 1..=16 byte lowercase ASCII key, a timestamp in
    /// `0..=1_000_000`, and a 0..=32 byte payload.
    ///
    /// Input is consumed in a fixed order: key length, key bytes, timestamp, value length,
    /// value bytes.
    fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
        let mut raw_key = vec![0u8; u.int_in_range(1usize..=16)?];
        u.fill_buffer(&mut raw_key)?;
        // Fold every byte onto 'a'..='z' so the key is always valid, printable UTF-8.
        let key: String = raw_key
            .iter()
            .map(|&byte| char::from(b'a' + byte % 26))
            .collect();

        let ts = u.int_in_range(0i64..=1_000_000)?;

        let mut value = vec![0u8; u.int_in_range(0usize..=32)?];
        u.fill_buffer(&mut value)?;

        Ok(Self { key, ts, value })
    }
}

fn to_record(fr: &FuzzRecord) -> Record {
    Record {
        key: fr.key.clone().into_bytes(),
        ts: fr.ts,
        expire_at: i64::MAX,
        value: fr.value.clone(),
    }
}

/// Produces 32 768 deterministic pseudo-random bytes derived from `seed`.
///
/// Steps a 64-bit linear congruential generator (Knuth's MMIX constants) and keeps bits
/// 33..=40 of each new state, presumably because the low bits of an LCG are weak.
fn generate_seed_data(seed: u64) -> Vec<u8> {
    const LEN: usize = 32768;
    const MUL: u64 = 6364136223846793005;
    const INC: u64 = 1442695040888963407;

    (0..LEN)
        .scan(seed, |state, _| {
            *state = state.wrapping_mul(MUL).wrapping_add(INC);
            Some((*state >> 33) as u8)
        })
        .collect()
}

/// Draws up to `max` fuzz records from `u`.
///
/// The target count is drawn from `1..=max`, falling back to 1 if that draw fails.
/// Generation stops at the first record that fails to decode, so fewer records than the
/// target may be returned. `max == 0` yields an empty vector, because
/// `Unstructured::int_in_range` panics on an empty range.
fn gen_records(u: &mut Unstructured, max: usize) -> Vec<FuzzRecord> {
    if max == 0 {
        return Vec::new();
    }
    let count = u.int_in_range(1usize..=max).unwrap_or(1);
    // Best-effort: a decode error ends the batch early instead of failing the test.
    (0..count)
        .map_while(|_| FuzzRecord::arbitrary(u).ok())
        .collect()
}

/// Writes a batch, shuts the engine down, reopens the same directory and requires every
/// record to come back (matched by key prefix, timestamp and value).
///
/// NOTE(review): named for WAL round-tripping; whether `shutdown` replays from the WAL or
/// has already flushed to tables depends on flowdb internals — confirm.
#[tokio::test]
async fn fuzz_wal_encode_decode() {
    for seed in 0..FUZZ_ITERS {
        let dir = TempDir::new().unwrap();
        let config = make_config(dir.path());

        let bytes = generate_seed_data(seed);
        let mut input = Unstructured::new(&bytes);
        let expected = gen_records(&mut input, 10);
        if expected.is_empty() {
            continue;
        }
        let batch: Vec<Record> = expected.iter().map(to_record).collect();

        // First session, scoped so the engine is dropped before the directory is reopened.
        {
            let writer = Engine::open(config.clone()).await.unwrap();
            writer.write_batch(&batch).unwrap();
            writer.shutdown().await.unwrap();
        }

        // Second session: everything written above must be visible again.
        let reader = Engine::open(config).await.unwrap();
        for want in &expected {
            let hits = reader.query_by_prefix(&want.key).await.unwrap();
            let recovered = hits
                .iter()
                .any(|hit| hit.ts == want.ts && hit.value == want.value);
            assert!(
                recovered,
                "seed={}: record key={} ts={} not recovered",
                seed, want.key, want.ts
            );
        }
        reader.shutdown().await.unwrap();

        drop(dir);
    }
}

/// Writes up to 20 records, calls `flush`, and requires each record to be readable from
/// the same engine instance (matched by key prefix, timestamp and value).
///
/// No reopen happens here, so this presumably targets the read path for flushed tables
/// rather than recovery.
#[tokio::test]
async fn fuzz_sstable_write_read() {
    for seed in 0..FUZZ_ITERS {
        let dir = TempDir::new().unwrap();
        let config = make_config(dir.path());

        let bytes = generate_seed_data(seed);
        let mut input = Unstructured::new(&bytes);
        let expected = gen_records(&mut input, 20);
        if expected.is_empty() {
            continue;
        }
        let batch: Vec<Record> = expected.iter().map(to_record).collect();

        let engine = Engine::open(config).await.unwrap();
        engine.write_batch(&batch).unwrap();
        engine.flush().await.unwrap();

        for want in &expected {
            let hits = engine.query_by_prefix(&want.key).await.unwrap();
            let present = hits
                .iter()
                .any(|hit| hit.ts == want.ts && hit.value == want.value);
            assert!(
                present,
                "seed={}: record key={} ts={} not found after flush",
                seed, want.key, want.ts
            );
        }

        engine.shutdown().await.unwrap();
        drop(dir);
    }
}

/// Queries records that were written but never explicitly flushed.
///
/// `memtable_size_mb` is raised to 64 so this small batch presumably stays in the memtable.
/// Each record must be found by key prefix with its exact timestamp and value. A key-range
/// query spanning the smallest and largest written key must return something.
#[tokio::test]
async fn fuzz_memtable_query() {
    for seed in 0..FUZZ_ITERS {
        let dir = TempDir::new().unwrap();
        let mut config = make_config(dir.path());
        config.memtable_size_mb = 64;

        let data = generate_seed_data(seed);
        let mut u = Unstructured::new(&data);
        let fuzz_records = gen_records(&mut u, 10);
        if fuzz_records.is_empty() {
            continue;
        }

        let records: Vec<Record> = fuzz_records.iter().map(to_record).collect();

        let engine = Engine::open(config).await.unwrap();
        engine.write_batch(&records).unwrap();

        for fr in &fuzz_records {
            let results = engine.query_by_prefix(&fr.key).await.unwrap();
            // Match on (ts, value) like the other fuzz tests: a merely non-empty result is
            // also satisfied by an unrelated key that happens to share this prefix.
            let found = results
                .iter()
                .any(|r| r.ts == fr.ts && r.value == fr.value);
            assert!(
                found,
                "seed={}: memtable query missing record key={} ts={}",
                seed, fr.key, fr.ts
            );
        }

        // Only the two bounds need to be owned; no need to clone every key first.
        let min_key = fuzz_records
            .iter()
            .map(|r| &r.key)
            .min()
            .expect("fuzz_records is non-empty")
            .clone();
        let max_key = fuzz_records
            .iter()
            .map(|r| &r.key)
            .max()
            .expect("fuzz_records is non-empty")
            .clone();
        let all_results = engine
            .query(Query::key_range(min_key, max_key))
            .await
            .unwrap();
        assert!(
            !all_results.is_empty(),
            "seed={}: key_range query returned empty",
            seed
        );

        engine.shutdown().await.unwrap();
        drop(dir);
    }
}

#[tokio::test]
async fn fuzz_engine_write_query() {
    for seed in 0..FUZZ_ITERS {
        let dir = TempDir::new().unwrap();
        let config = make_config(dir.path());

        let data = generate_seed_data(seed);
        let mut u = Unstructured::new(&data);

        let engine = Engine::open(config).await.unwrap();

        let batch_count = u.int_in_range(1usize..=3).unwrap_or(1);
        let mut all_written: Vec<Record> = Vec::new();

        for _ in 0..batch_count {
            let fuzz_recs = gen_records(&mut u, 5);
            let records: Vec<Record> = fuzz_recs.iter().map(to_record).collect();
            if records.is_empty() {
                continue;
            }
            engine.write_batch(&records).unwrap();
            all_written.extend(records);
        }

        if all_written.is_empty() {
            engine.shutdown().await.unwrap();
            continue;
        }

        if let Some(key) = all_written.first().map(|r| r.key.clone()) {
            let key_str = String::from_utf8_lossy(&key);
            let results = engine.query_by_prefix(&key_str).await.unwrap();
            assert!(
                !results.is_empty(),
                "seed={}: prefix query empty for key={:?}",
                seed,
                key
            );
        }

        let min_ts = all_written.iter().map(|r| r.ts).min().unwrap();
        let max_ts = all_written.iter().map(|r| r.ts).max().unwrap();
        let results = engine.query_time_range(min_ts, max_ts).await.unwrap();
        assert!(!results.is_empty(), "seed={}: time_range query empty", seed);

        engine.shutdown().await.unwrap();
        drop(dir);
    }
}

/// Checks that both flushed and not-yet-flushed data survive a shutdown/reopen cycle.
///
/// Each seed first writes up to 20 records and calls `flush`. It then writes up to 10 more
/// records without flushing and shuts down. After reopening the same directory, every
/// record from both batches must be found by key prefix with a matching timestamp and
/// value.
#[tokio::test]
async fn fuzz_manifest_recovery() {
    for seed in 0..FUZZ_ITERS {
        let dir = TempDir::new().unwrap();
        let config = make_config(dir.path());

        let data = generate_seed_data(seed);
        let mut u = Unstructured::new(&data);

        let fuzz_records = gen_records(&mut u, 20);
        if fuzz_records.is_empty() {
            continue;
        }
        let records: Vec<Record> = fuzz_records.iter().map(to_record).collect();

        // Scoped so the first engine is dropped before the directory is reopened; the
        // unflushed tail is handed out so it can be verified after recovery too.
        let fuzz_more = {
            let engine = Engine::open(config.clone()).await.unwrap();
            engine.write_batch(&records).unwrap();
            engine.flush().await.unwrap();

            let fuzz_more = gen_records(&mut u, 10);
            let more_records: Vec<Record> = fuzz_more.iter().map(to_record).collect();
            if !more_records.is_empty() {
                engine.write_batch(&more_records).unwrap();
            }

            engine.shutdown().await.unwrap();
            fuzz_more
        };

        let engine2 = Engine::open(config).await.unwrap();
        let flushed = fuzz_records.iter().map(|fr| (fr, "flushed"));
        let unflushed = fuzz_more.iter().map(|fr| (fr, "unflushed"));
        for (fr, stage) in flushed.chain(unflushed) {
            let results = engine2.query_by_prefix(&fr.key).await.unwrap();
            let found = results
                .iter()
                .any(|r| r.ts == fr.ts && r.value == fr.value);
            assert!(
                found,
                "seed={}: {} record key={} ts={} lost after recovery",
                seed, stage, fr.key, fr.ts
            );
        }
        engine2.shutdown().await.unwrap();
        drop(dir);
    }
}

/// Spreads records over up to three flushes, then exercises every query entry point.
///
/// Every probe is built from keys, or a timestamp window, that was actually written, so
/// each query must return at least one row. An empty result means an index or
/// block-metadata lookup skipped data.
#[tokio::test]
async fn fuzz_block_meta_index_queries() {
    for seed in 0..FUZZ_ITERS {
        let dir = TempDir::new().unwrap();
        let config = make_config(dir.path());

        let data = generate_seed_data(seed);
        let mut u = Unstructured::new(&data);

        let engine = Engine::open(config).await.unwrap();

        let mut all_keys = Vec::new();
        for _ in 0..3 {
            let fuzz_recs = gen_records(&mut u, 10);
            let records: Vec<Record> = fuzz_recs.iter().map(to_record).collect();
            if records.is_empty() {
                continue;
            }
            all_keys.extend(fuzz_recs.iter().map(|r| r.key.clone()));
            engine.write_batch(&records).unwrap();
            engine.flush().await.unwrap();
        }

        if all_keys.is_empty() {
            engine.shutdown().await.unwrap();
            continue;
        }

        for key in &all_keys {
            let results = engine.query_by_prefix(key).await.unwrap();
            assert!(
                !results.is_empty(),
                "seed={}: prefix query empty for key={}",
                seed,
                key
            );
        }

        if all_keys.len() >= 2 {
            // Same bounds as sorting a copy and taking both ends, without the clone + sort.
            let lo = all_keys.iter().min().expect("all_keys is non-empty");
            let hi = all_keys.iter().max().expect("all_keys is non-empty");
            let results = engine.query_by_key_range(lo, hi).await.unwrap();
            assert!(
                !results.is_empty(),
                "seed={}: key range query empty from {} to {}",
                seed,
                lo,
                hi
            );
        }

        // Generated timestamps always lie in 0..=1_000_000 (see `FuzzRecord::arbitrary`).
        let results = engine.query_time_range(0, 1_000_000).await.unwrap();
        assert!(!results.is_empty(), "seed={}: time_range query empty", seed);

        if let Some(key) = all_keys.first() {
            let results = engine
                .query_prefix_time_range(key, 0, 1_000_000)
                .await
                .unwrap();
            assert!(
                !results.is_empty(),
                "seed={}: prefix_time_range query empty for key={}",
                seed,
                key
            );
        }

        engine.shutdown().await.unwrap();
        drop(dir);
    }
}