flowdb 0.6.1

A high-performance embedded time-series + JSON document storage engine (LSM-tree), with built-in IndexedDB-compatible API.
Documentation
use crate::block_meta_index::BlockMetaIndex;
use crate::error::Result;
use crate::manifest::{Manifest, ManifestEntry, SstInfo};
use crate::memtable::MemTables;
use crate::record::{Config, InternalRecord, SyncMode};
use crate::sstable::SstWriter;
use crate::stats::StatsCounters;
use crate::wal::Wal;
use std::sync::Arc;

pub(crate) struct WriteWorker {
    config: Config,
    pub(crate) wal: Wal,
    pub(crate) memtables: Arc<MemTables>,
    manifest: Arc<parking_lot::Mutex<Manifest>>,
    index: Arc<parking_lot::RwLock<BlockMetaIndex>>,
    stats: Arc<StatsCounters>,
}

impl WriteWorker {
    pub fn new(
        config: Config,
        wal: Wal,
        memtables: Arc<MemTables>,
        manifest: Arc<parking_lot::Mutex<Manifest>>,
        index: Arc<parking_lot::RwLock<BlockMetaIndex>>,
        stats: Arc<StatsCounters>,
    ) -> Self {
        Self {
            config,
            wal,
            memtables,
            manifest,
            index,
            stats,
        }
    }

    pub fn flush_wal(&mut self) -> Result<()> {
        self.wal.flush()
    }

    pub fn process_batch_encoded(
        &mut self,
        records: Vec<InternalRecord>,
        wal_buf: &[u8],
        mem_bytes: u64,
        num_records: u64,
        batch_max_seq: u64,
        sync_mode: SyncMode,
    ) -> Result<()> {
        // Backpressure: if frozen memtables have piled up beyond the limit,
        // flush them before accepting new writes. This prevents unbounded
        // memory growth when the flush rate can't keep up with writes.
        while self.memtables.frozen_backpressure() {
            self.do_flush()?;
        }

        let bytes_written = mem_bytes;

        self.wal.write_encoded(wal_buf, batch_max_seq)?;

        {
            let mut active = self.memtables.active_for_batch();
            for rec in records {
                active.insert(rec);
            }
        }

        // Durability: if the caller requested synchronous durability,
        // fsync the WAL immediately so this batch survives a crash.
        if sync_mode == SyncMode::Always {
            self.wal.sync_all()?;
        }

        self.stats.add_wal_bytes(bytes_written);
        let (rec_count, byte_count) = self.memtables.active_stats();
        self.stats.set_memtable(rec_count, byte_count);
        self.stats.set_frozen_count(self.memtables.frozen_count());
        self.stats.records_written(num_records, bytes_written);

        if self.memtables.should_flush() {
            self.do_flush()?;
        }

        Ok(())
    }

    pub fn do_flush(&mut self) -> Result<()> {
        let _did_freeze = self.memtables.freeze();

        // Pop the oldest frozen memtable and flush it to SST.
        // When freeze() succeeds the active was just moved to frozen, so
        // we flush the oldest entry.  When freeze() fails (active empty)
        // we still pop one — this relieves backpressure and prevents a
        // livelock when frozen_backpressure is true with an empty active.
        let frozen = self.memtables.pop_frozen();

        let frozen = match frozen {
            Some(f) => f,
            None => return Ok(()),
        };

        let start = std::time::Instant::now();
        let mut all_records: Vec<InternalRecord> = frozen.iter_sorted().cloned().collect();
        let now_us = now_micros();
        all_records.retain(|r| r.expire_at > now_us);
        // Records are already sorted by (key, ts, seq) from freeze().

        if all_records.is_empty() {
            self.stats.set_frozen_count(self.memtables.frozen_count());
            return Ok(());
        }

        let sst_id;
        {
            let mf = self.manifest.lock();
            sst_id = mf.next_sst_id();
        }

        let sst_dir = self.config.data_dir.join("SST");
        std::fs::create_dir_all(&sst_dir)?;
        let sst_path = sst_dir.join(format!("{:09}.sst", sst_id));
        let tmp_path = sst_path.with_extension("sst.tmp");

        let (sst_bytes, block_infos, bloom) = SstWriter::write(
            &tmp_path,
            &all_records,
            self.config.block_size,
            self.config.zstd_level,
            self.config.bloom_bits_per_key,
            false,
        )?;

        std::fs::rename(&tmp_path, &sst_path)?;

        // Make the rename durable by fsyncing the SST directory.
        // SstWriter::write already called sync_all on the file content,
        // but the directory entry (the rename) is not durable until the
        // parent directory's metadata is synced.
        let sst_dir_file = std::fs::File::open(&sst_dir)?;
        sst_dir_file.sync_all()?;

        let min_ts = all_records.iter().map(|r| r.ts).min().unwrap_or(0);
        let max_ts = all_records.iter().map(|r| r.ts).max().unwrap_or(0);
        let min_expire = all_records.iter().map(|r| r.expire_at).min().unwrap_or(0);
        let max_expire = all_records.iter().map(|r| r.expire_at).max().unwrap_or(0);
        let last_seq = all_records.iter().map(|r| r.seq).max().unwrap_or(0);

        let raw_bytes: u64 = all_records.iter().map(|r| r.estimated_size() as u64).sum();
        if raw_bytes > 0 {
            self.stats
                .set_compression_ratio(sst_bytes as f64 / raw_bytes as f64);
        }

        let sst_info = SstInfo {
            id: sst_id,
            records: all_records.len() as u64,
            bytes: sst_bytes,
            min_ts,
            max_ts,
            min_expire,
            max_expire,
            bloom: Some(bloom.clone()),
        };

        {
            let mut mf = self.manifest.lock();
            mf.append(&ManifestEntry::Flush {
                seq: last_seq,
                sst: sst_info,
                blocks: block_infos.clone(),
            })?;
            mf.append(&ManifestEntry::Checkpoint {
                last_flushed_seq: last_seq,
            })?;
        }

        {
            let mut idx = self.index.write();
            idx.add_sst(sst_id, &block_infos);
            idx.set_bloom(sst_id, bloom);
        }

        {
            if let Err(e) = self.wal.truncate_before(last_seq) {
                tracing::warn!("WAL truncate_before({}) failed: {}", last_seq, e);
            }
            if let Err(e) = self.wal.sync_all() {
                tracing::warn!("WAL sync_all after flush failed: {}", e);
            }
        }

        let elapsed = start.elapsed();
        self.stats.flush_done();
        self.stats.record_flush_latency(elapsed.as_micros() as u64);

        let mf = self.manifest.lock();
        let sst_count = mf.state().sstables.len();
        let total_sst_bytes: u64 = mf.state().sstables.values().map(|s| s.bytes).sum();
        self.stats.set_sstable(sst_count, total_sst_bytes);

        self.stats.set_index_stats(
            self.index.read().total_entries(),
            self.index.read().bucket_count(),
        );

        let (rec_count, byte_count) = self.memtables.active_stats();
        self.stats.set_memtable(rec_count, byte_count);
        self.stats.set_frozen_count(self.memtables.frozen_count());

        Ok(())
    }
}

fn now_micros() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_micros() as i64
}