nodedb 0.0.0-beta.1

Local-first, real-time, edge-to-cloud hybrid database for multi-modal workloads
Documentation
//! Table scanning and import/export methods for `SparseEngine`.

use tracing::debug;

use super::btree::{DOCUMENTS, INDEXES, SparseEngine, redb_err};

impl SparseEngine {
    /// Scan documents in a collection (reads DOCUMENTS table, not INDEXES).
    ///
    /// Returns `(document_id, document_bytes)` pairs for all documents in the
    /// collection, up to `limit`. Use for full table scans and post-scan filtering.
    pub fn scan_documents(
        &self,
        tenant_id: u32,
        collection: &str,
        limit: usize,
    ) -> crate::Result<Vec<(String, Vec<u8>)>> {
        let prefix = format!("{tenant_id}:{collection}:");
        let end = format!("{tenant_id}:{collection}:\u{ffff}");

        let read_txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
        let table = read_txn
            .open_table(DOCUMENTS)
            .map_err(|e| redb_err("open table", e))?;

        let range = table
            .range(prefix.as_str()..end.as_str())
            .map_err(|e| redb_err("doc range", e))?;

        let mut results = Vec::with_capacity(limit.min(256));
        for entry in range {
            if results.len() >= limit {
                break;
            }
            let entry = entry.map_err(|e| redb_err("doc entry", e))?;
            let key = entry.0.value().to_string();
            // Extract document_id from key format "{tenant}:{collection}:{doc_id}"
            let doc_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
            let value = entry.1.value().to_vec();
            results.push((doc_id, value));
        }

        debug!(collection, count = results.len(), "document scan");
        Ok(results)
    }

    /// Scan index entries grouped by value for a field.
    ///
    /// Returns `(value, count)` pairs by scanning the INDEXES table for
    /// `{tenant}:{collection}:{field}:*` entries and counting documents
    /// per value. Used for index-backed `GROUP BY field` + `COUNT(*)`
    /// queries — no document table access needed at all.
    pub fn scan_index_groups(
        &self,
        tenant_id: u32,
        collection: &str,
        field: &str,
    ) -> crate::Result<Vec<(String, usize)>> {
        let prefix = format!("{tenant_id}:{collection}:{field}:");
        let end = format!("{tenant_id}:{collection}:{field}:\u{ffff}");

        let read_txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
        let table = read_txn
            .open_table(INDEXES)
            .map_err(|e| redb_err("open table", e))?;

        let range = table
            .range(prefix.as_str()..end.as_str())
            .map_err(|e| redb_err("index range", e))?;

        // Index keys have format: {tenant}:{collection}:{field}:{value}:{doc_id}
        // Group by {value} (4th component) and count occurrences.
        let mut groups: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
        for entry in range {
            let entry = entry.map_err(|e| redb_err("index entry", e))?;
            let key = entry.0.value().to_string();
            // Extract value from key: skip prefix, split on ':', take first part.
            // rest = "{value}:{doc_id}" — split at last ':' to get value.
            if let Some(rest) = key.strip_prefix(&prefix)
                && let Some(colon_pos) = rest.rfind(':')
            {
                let value = &rest[..colon_pos];
                *groups.entry(value.to_string()).or_default() += 1;
            }
        }

        let mut result: Vec<(String, usize)> = groups.into_iter().collect();
        result.sort_by(|a, b| a.0.cmp(&b.0));
        debug!(collection, field, groups = result.len(), "index group scan");
        Ok(result)
    }

    /// Scan documents with predicate evaluation at the storage layer.
    ///
    /// Unlike `scan_documents` which returns all documents and filters
    /// post-fetch, this method evaluates predicates during the scan.
    /// Non-matching documents are never allocated or returned — only their
    /// raw bytes are decoded for predicate evaluation, then dropped if they
    /// don't match. This avoids O(N) allocation for large collections when
    /// only a small fraction matches the predicate.
    ///
    /// `predicate` receives the raw document bytes and returns true if the
    /// document should be included in results.
    pub fn scan_documents_filtered(
        &self,
        tenant_id: u32,
        collection: &str,
        limit: usize,
        predicate: &dyn Fn(&[u8]) -> bool,
    ) -> crate::Result<Vec<(String, Vec<u8>)>> {
        let prefix = format!("{tenant_id}:{collection}:");
        let end = format!("{tenant_id}:{collection}:\u{ffff}");

        let read_txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
        let table = read_txn
            .open_table(DOCUMENTS)
            .map_err(|e| redb_err("open table", e))?;

        let range = table
            .range(prefix.as_str()..end.as_str())
            .map_err(|e| redb_err("doc range", e))?;

        let mut results = Vec::with_capacity(limit.min(256));
        for entry in range {
            if results.len() >= limit {
                break;
            }
            let entry = entry.map_err(|e| redb_err("doc entry", e))?;
            let value_bytes = entry.1.value();

            // Evaluate predicate on raw bytes — skip allocation if no match.
            if !predicate(value_bytes) {
                continue;
            }

            let key = entry.0.value().to_string();
            let doc_id = key.strip_prefix(&prefix).unwrap_or(&key).to_string();
            results.push((doc_id, value_bytes.to_vec()));
        }

        debug!(collection, count = results.len(), "filtered document scan");
        Ok(results)
    }

    /// Export all documents as key-value pairs (for snapshot transfer).
    pub fn export_documents(&self) -> crate::Result<Vec<(String, Vec<u8>)>> {
        let txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
        let table = txn
            .open_table(DOCUMENTS)
            .map_err(|e| redb_err("open docs", e))?;
        let mut pairs = Vec::new();
        let iter = table
            .range::<&str>(..)
            .map_err(|e| redb_err("iter docs", e))?;
        for entry in iter {
            let entry = entry.map_err(|e| redb_err("read doc entry", e))?;
            pairs.push((entry.0.value().to_string(), entry.1.value().to_vec()));
        }
        Ok(pairs)
    }

    /// Export all index entries as key-value pairs (for snapshot transfer).
    pub fn export_indexes(&self) -> crate::Result<Vec<(String, Vec<u8>)>> {
        let txn = self.db.begin_read().map_err(|e| redb_err("read txn", e))?;
        let table = txn
            .open_table(INDEXES)
            .map_err(|e| redb_err("open indexes", e))?;
        let mut pairs = Vec::new();
        let iter = table
            .range::<&str>(..)
            .map_err(|e| redb_err("iter indexes", e))?;
        for entry in iter {
            let entry = entry.map_err(|e| redb_err("read index entry", e))?;
            pairs.push((entry.0.value().to_string(), entry.1.value().to_vec()));
        }
        Ok(pairs)
    }

    /// Import documents from a snapshot (overwrites existing data).
    pub fn import_documents(&self, pairs: &[(String, Vec<u8>)]) -> crate::Result<()> {
        let txn = self
            .db
            .begin_write()
            .map_err(|e| redb_err("write txn", e))?;
        {
            let mut table = txn
                .open_table(DOCUMENTS)
                .map_err(|e| redb_err("open docs", e))?;
            for (key, value) in pairs {
                table
                    .insert(key.as_str(), value.as_slice())
                    .map_err(|e| redb_err("insert doc", e))?;
            }
        }
        txn.commit().map_err(|e| redb_err("commit", e))?;
        Ok(())
    }

    /// Import index entries from a snapshot.
    pub fn import_indexes(&self, pairs: &[(String, Vec<u8>)]) -> crate::Result<()> {
        let txn = self
            .db
            .begin_write()
            .map_err(|e| redb_err("write txn", e))?;
        {
            let mut table = txn
                .open_table(INDEXES)
                .map_err(|e| redb_err("open indexes", e))?;
            for (key, value) in pairs {
                table
                    .insert(key.as_str(), value.as_slice())
                    .map_err(|e| redb_err("insert idx", e))?;
            }
        }
        txn.commit().map_err(|e| redb_err("commit", e))?;
        Ok(())
    }
}