iridium_core 0.1.10

SQL Server-compatible Rust engine core for Iridium SQL
Documentation
use std::collections::HashMap;

use redb::{AccessGuard, ReadableTable, TableDefinition};

use crate::error::DbError;
use crate::storage::{CheckpointableStorage, StorageCheckpointData, StoredRow};

use super::redb_storage::RedbStorage;

const ROWS_TABLE: TableDefinition<(u32, u64), &[u8]> = TableDefinition::new("rows");
const TABLE_META: TableDefinition<u32, u64> = TableDefinition::new("rows_meta");

impl CheckpointableStorage for RedbStorage {
    fn get_checkpoint_data(&self) -> StorageCheckpointData {
        let Ok(db) = self.db() else {
            return StorageCheckpointData::InMemory(HashMap::new());
        };
        let read_txn = match db.begin_read() {
            Ok(txn) => txn,
            Err(_) => return StorageCheckpointData::InMemory(HashMap::new()),
        };
        let table = match read_txn.open_table(ROWS_TABLE) {
            Ok(t) => t,
            Err(_) => return StorageCheckpointData::InMemory(HashMap::new()),
        };
        let meta = match read_txn.open_table(TABLE_META) {
            Ok(m) => m,
            Err(_) => return StorageCheckpointData::InMemory(HashMap::new()),
        };
        let range = match table.range((0u32, 0u64)..) {
            Ok(r) => r,
            Err(_) => return StorageCheckpointData::InMemory(HashMap::new()),
        };

        let mut rows_map: HashMap<u32, Vec<StoredRow>> = HashMap::new();
        for entry in range {
            let (key, value): (AccessGuard<'_, (u32, u64)>, AccessGuard<'_, &[u8]>) = match entry {
                Ok(e) => e,
                Err(_) => continue,
            };
            let (table_id, _row_idx) = key.value();
            let row = match RedbStorage::deserialize_row(value.value()) {
                Ok(r) => r,
                Err(_) => continue,
            };
            rows_map.entry(table_id).or_default().push(row);
        }

        let meta_range = match meta.range(0u32..) {
            Ok(r) => r,
            Err(_) => return StorageCheckpointData::InMemory(rows_map),
        };
        for entry in meta_range {
            let (table_id, _next_idx): (AccessGuard<'_, u32>, AccessGuard<'_, u64>) = match entry {
                Ok(e) => e,
                Err(_) => continue,
            };
            rows_map.entry(table_id.value()).or_default();
        }

        StorageCheckpointData::InMemory(rows_map)
    }

    fn restore_from_checkpoint(&mut self, data: StorageCheckpointData) -> Result<(), DbError> {
        let rows_map = match data {
            StorageCheckpointData::InMemory(map) => map,
            StorageCheckpointData::Persistent => return Ok(()),
        };
        let db = self.db()?;
        RedbStorage::with_write_txn(db, |write_txn| {
            let mut table = write_txn
                .open_table(ROWS_TABLE)
                .map_err(|e| RedbStorage::storage_err("failed to open rows table", e))?;
            let mut meta = write_txn
                .open_table(TABLE_META)
                .map_err(|e| RedbStorage::storage_err("failed to open meta table", e))?;

            let keys: Vec<(u32, u64)> = table
                .range((0u32, 0u64)..)
                .map_err(|e| RedbStorage::storage_err("failed to scan rows", e))?
                .flatten()
                .map(|(k, _)| k.value())
                .collect();
            for key in keys {
                table
                    .remove(key)
                    .map_err(|e| RedbStorage::storage_err("failed to remove row", e))?;
            }
            let meta_keys: Vec<u32> = meta
                .range(0u32..)
                .map_err(|e| RedbStorage::storage_err("failed to scan meta", e))?
                .flatten()
                .map(|(k, _)| k.value())
                .collect();
            for key in meta_keys {
                meta.remove(key)
                    .map_err(|e| RedbStorage::storage_err("failed to remove meta", e))?;
            }

            for (table_id, rows) in &rows_map {
                let next_idx = rows.len() as u64;
                meta.insert(*table_id, &next_idx)
                    .map_err(|e| RedbStorage::storage_err("failed to set next_idx", e))?;
                for (idx, row) in rows.iter().enumerate() {
                    let row_bytes = RedbStorage::serialize_row(row)?;
                    table
                        .insert((*table_id, idx as u64), row_bytes.as_slice())
                        .map_err(|e| RedbStorage::storage_err("failed to insert row", e))?;
                }
            }
            Ok(())
        })
    }

    fn clone_checkpointable(&self) -> Box<dyn crate::storage::CheckpointableStorage> {
        Box::new(self.clone())
    }
}