cobble 0.1.0

A flexible embedded key-value storage engine for distributed systems as well as single-node applications.
Documentation
//! This module defines the `DbIterator` struct, which provides an iterator over key-value pairs in the database.
use crate::db::value_to_vec_of_columns_with_vlog;
use crate::db_state::DbState;
use crate::error::Result;
use crate::iterator::KvIterator;
use crate::iterator::{DeduplicatingIterator, MergingIterator};
use crate::lsm::DynKvIterator;
use crate::memtable::MemtableManager;
use crate::schema::Schema;
use crate::sst::row_codec::decode_key;
use crate::ttl::TTLProvider;
use crate::vlog::VlogStore;
use bytes::Bytes;
use std::sync::Arc;

/// Construction parameters consumed by `DbIterator::new`.
///
/// Every field is moved into the resulting `DbIterator`.
pub(crate) struct DbIteratorOptions<'a> {
    /// Optional upper bound of the scan as `(end_key, inclusive)`.
    /// The key is compared byte-wise against the encoded keys produced by the
    /// underlying iterators; `None` means the scan is unbounded.
    pub(crate) end_bound: Option<(Bytes, bool)>,
    /// Database state the iterator reads against; its `vlog_version` is used
    /// when resolving value-log pointers.
    pub(crate) snapshot: Arc<DbState>,
    /// Optional memtable manager, consulted for a value-log pointer only when
    /// the read through `vlog_store` fails.
    pub(crate) memtable_manager: Option<&'a MemtableManager>,
    /// Store tried first when resolving value-log pointers.
    pub(crate) vlog_store: Arc<VlogStore>,
    /// TTL provider shared with the deduplicating iterator; its time provider
    /// is also passed along during value resolution.
    pub(crate) ttl_provider: Arc<TTLProvider>,
    /// Effective schema for merge and value resolution.
    /// When column projection is active, this is a projected schema
    /// with remapped operators matching the selected column indices.
    pub(crate) schema: Arc<Schema>,
}

/// Iterator over rows of the database.
///
/// Implements `Iterator`, yielding `Result<(key, columns)>` items where
/// `columns` holds one `Option<Bytes>` per column. Iteration ends when the
/// underlying sources are exhausted or a key beyond `end_bound` is reached.
pub struct DbIterator<'a> {
    /// Merged and deduplicated view over all memtable and LSM iterators.
    inner: DeduplicatingIterator<MergingIterator<DynKvIterator>>,
    /// Optional upper bound as `(end_key, inclusive)`, compared against
    /// encoded keys.
    end_bound: Option<(Bytes, bool)>,
    /// Database state used when resolving value-log pointers.
    snapshot: Arc<DbState>,
    /// Optional fallback for value-log pointers that `vlog_store` fails to read.
    memtable_manager: Option<&'a MemtableManager>,
    /// Store tried first when resolving value-log pointers.
    vlog_store: Arc<VlogStore>,
    /// Supplies the time provider passed along during value resolution.
    ttl_provider: Arc<TTLProvider>,
    /// Schema used to turn a decoded value into its column vector.
    schema: Arc<Schema>,
    /// Value of `schema.num_columns()`, captured once at construction.
    num_columns: usize,
}

impl<'a> DbIterator<'a> {
    pub(crate) fn new(
        mut memtable_iters: Vec<DynKvIterator>,
        mut lsm_iters: Vec<DynKvIterator>,
        options: DbIteratorOptions<'a>,
    ) -> Self {
        let schema = options.schema;
        let num_columns = schema.num_columns();
        memtable_iters.append(&mut lsm_iters);
        let inner = DeduplicatingIterator::new(
            MergingIterator::new(memtable_iters),
            num_columns,
            Arc::clone(&options.ttl_provider),
            None,
            schema.clone(),
        );
        Self {
            inner,
            end_bound: options.end_bound,
            snapshot: options.snapshot,
            memtable_manager: options.memtable_manager,
            vlog_store: options.vlog_store,
            ttl_provider: options.ttl_provider,
            schema,
            num_columns,
        }
    }

    pub(crate) fn seek(&mut self, target: &[u8]) -> Result<()> {
        self.inner.seek(target)
    }

    fn is_past_end(&self, encoded_key: &[u8]) -> bool {
        if let Some((end_key, inclusive)) = &self.end_bound {
            encoded_key > end_key.as_ref() || (!inclusive && encoded_key == end_key.as_ref())
        } else {
            false
        }
    }

    fn next_row(&mut self) -> Result<Option<(Bytes, Vec<Option<Bytes>>)>> {
        while self.inner.valid() {
            let Some((mut encoded_key, kv_value)) = self.inner.take_current()? else {
                self.inner.next()?;
                continue;
            };
            self.inner.next()?;
            if self.is_past_end(encoded_key.as_ref()) {
                return Ok(None);
            }
            let key = decode_key(&mut encoded_key)?;
            let value = kv_value.into_decoded(self.num_columns)?;
            let columns = value_to_vec_of_columns_with_vlog(
                value,
                |pointer| match self
                    .vlog_store
                    .read_pointer(&self.snapshot.vlog_version, pointer)
                {
                    Ok(value) => Ok(value),
                    Err(vlog_err) => {
                        if let Some(memtable_manager) = self.memtable_manager {
                            memtable_manager
                                .read_vlog_pointer_with_snapshot(
                                    Arc::clone(&self.snapshot),
                                    pointer,
                                )?
                                .ok_or(vlog_err)
                        } else {
                            Err(vlog_err)
                        }
                    }
                },
                &self.schema,
                Some(self.ttl_provider.time_provider()),
            )?;
            if let Some(columns) = columns {
                return Ok(Some((Bytes::copy_from_slice(key.data()), columns)));
            }
        }
        Ok(None)
    }
}

impl Iterator for DbIterator<'_> {
    /// A `(key, columns)` row, or the error hit while producing it.
    type Item = Result<(Bytes, Vec<Option<Bytes>>)>;

    /// Yields the next row by delegating to `next_row`.
    ///
    /// A produced row becomes `Some(Ok(row))`, an error becomes
    /// `Some(Err(err))`, and the end of iteration becomes `None`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_row() {
            Ok(Some(row)) => Some(Ok(row)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}