iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use crate::core::reactor::Reactor;
use crate::features::ops;
use crate::features::storage::{bitmap, buffer_pool, manifest, wal};
use crate::features::storage::{compaction, hnsw, memtable, sstable};

use super::blob::{
    BlobBackend, BlobStore, InjectedBlobStoreAdapter, LocalBlobStore, RhodiumBlobStore,
};
use super::fs::{
    infer_data_root, load_existing_l0_runs, recover_compaction_artifacts, wal_sync_policy_from_env,
    DataDirLock,
};
use super::reads::{initialize_inbound_adjacency_index, initialize_typed_inbound_edge_index};
use super::types::{
    Result, StorageConfig, StorageError, StorageHandle, ThreadCoreLaneConfig, ThreadCoreRequest,
};
use super::vector::{default_hnsw_graph_view, vector_payload_for_hnsw};

pub fn open_store(config: StorageConfig) -> Result<StorageHandle> {
    open_store_with_reactor_and_blob_backend(
        config,
        std::sync::Arc::new(crate::core::reactor::SystemReactor),
        BlobBackend::Local,
    )
}

pub fn open_store_with_blob_backend(
    config: StorageConfig,
    blob_backend: BlobBackend,
) -> Result<StorageHandle> {
    open_store_with_reactor_and_blob_backend(
        config,
        std::sync::Arc::new(crate::core::reactor::SystemReactor),
        blob_backend,
    )
}

/// Opens a store for the core that owns `request`'s shard.
///
/// When `lanes.partition_wal` is set, the WAL directory becomes
/// `<wal_dir>/core-NNNN`. When `lanes.partition_sstable` is set, the SSTable
/// directory becomes `<sstable_dir>/core-NNNN`. `NNNN` is
/// `request.owned_shard()` formatted with `{:04}`.
///
/// The manifest path and every other setting are passed through unchanged.
/// The store is opened via [`open_store`], i.e. with the system reactor and
/// the local blob backend.
///
/// # Errors
///
/// Propagates any error from [`open_store`].
pub fn open_store_for_request(
    config: StorageConfig,
    request: &ThreadCoreRequest,
    lanes: &ThreadCoreLaneConfig,
) -> Result<StorageHandle> {
    // `config` is owned, so adjust it in place rather than cloning every path
    // field into a hand-built copy.
    let mut lane_config = config;
    let lane_dir = format!("core-{:04}", request.owned_shard());
    if lanes.partition_wal {
        lane_config.wal_dir = lane_config.wal_dir.join(&lane_dir);
    }
    if lanes.partition_sstable {
        lane_config.sstable_dir = lane_config.sstable_dir.join(&lane_dir);
    }
    open_store(lane_config)
}

/// Opens a [`StorageHandle`] for `config`.
///
/// All storage I/O goes through the caller-supplied `reactor`. Blobs are kept
/// in the local blob backend.
///
/// # Errors
///
/// Propagates any error raised while opening the store.
pub fn open_store_with_reactor(
    config: StorageConfig,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
) -> Result<StorageHandle> {
    let local_backend = BlobBackend::Local;
    open_store_with_reactor_and_blob_backend(config, reactor, local_backend)
}

pub fn open_store_with_reactor_and_blob_backend(
    config: StorageConfig,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
    blob_backend: BlobBackend,
) -> Result<StorageHandle> {
    let buffer_pool = buffer_pool::BufferPool::new(config.buffer_pool_pages);
    let data_root = infer_data_root(&config.wal_dir, &config.sstable_dir, &config.manifest_path);
    let data_dir_lock = if let Some(root) = data_root.as_ref() {
        Some(DataDirLock::acquire(root)?)
    } else {
        None
    };
    let wal_sync_policy = wal_sync_policy_from_env();
    let wal = wal::Wal::open_with_reactor_and_policy(
        config.wal_dir,
        config.wal_segment_max_bytes,
        wal_sync_policy,
        reactor.clone(),
    )?;
    let mut manifest =
        manifest::Manifest::load_or_create_with_reactor(config.manifest_path, reactor.clone())?;
    if manifest.schema_version < manifest::CURRENT_SCHEMA_VERSION {
        manifest.schema_version = manifest::CURRENT_SCHEMA_VERSION;
        manifest.persist()?;
    }
    reactor.create_dir_all(&config.sstable_dir)?;
    let bitmap_dir = config.sstable_dir.join("bitmap");
    recover_compaction_artifacts(&config.sstable_dir, reactor.as_ref())?;
    let bitmap_store =
        bitmap::BitmapStore::load_or_create_with_reactor(bitmap_dir, reactor.as_ref())?;
    let memtable = memtable::MemTable::new();
    let (l0_runs, sstable_cache) = load_existing_l0_runs(&config.sstable_dir, reactor.as_ref())?;

    let mut hnsw_graphs = std::collections::HashMap::new();
    for table in sstable_cache.values() {
        for entry in &table.entries {
            if entry.kind == sstable::EntryKind::VectorDelta {
                if let Some((space_id, vec)) = vector_payload_for_hnsw(&manifest, &entry.value)? {
                    hnsw_graphs
                        .entry(space_id)
                        .or_insert_with(|| hnsw::HnswGraph::new(16, 32, 200))
                        .insert(entry.key, vec, entry.key ^ entry.version);
                }
            }
        }
    }
    let hnsw_total_vectors = hnsw_graphs.values().map(|graph| graph.len() as u64).sum();
    let hnsw_graph = default_hnsw_graph_view(&manifest, &hnsw_graphs);

    let blob_store = make_blob_store(blob_backend, &config.sstable_dir)?;

    let mut handle = StorageHandle {
        buffer_pool,
        wal,
        manifest,
        bitmap_store,
        memtable,
        l0_runs,
        sstable_cache,
        sstable_dir: config.sstable_dir,
        metrics: super::types::AmpMetrics::default(),
        reactor,
        compaction_policy: compaction::CompactionPolicy::new(32, 10),
        hnsw_scheduler: hnsw::HnswMaintenanceScheduler::new(0.05),
        hnsw_graph,
        hnsw_graphs,
        hnsw_total_vectors,
        hnsw_updated_vectors: 0,
        last_hnsw_rebuild_reason: None,
        pending_deltas_per_node: std::collections::HashMap::new(),
        embedding_pending_nodes: std::collections::HashSet::new(),
        tombstoned_node_ids: std::collections::HashSet::new(),
        suspended_spaces: std::collections::HashSet::new(),
        logical_node_cache: std::collections::HashMap::new(),
        logical_node_cache_order: std::collections::VecDeque::new(),
        logical_node_cache_capacity: 8192,
        _data_dir_lock: data_dir_lock,
        compaction_in_progress: false,
        compaction_jobs: ops::BackgroundJobTracker::new(ops::BackgroundJobConfig {
            max_queued_jobs: 16,
            max_retained_terminal_jobs: 128,
        })
        .map_err(|err| {
            StorageError::InvalidInput(format!("invalid compaction job config: {:?}", err))
        })?,
        last_compaction_job_id: None,
        compaction_max_retries: 1,
        blob_backend,
        blob_store,
    };
    initialize_inbound_adjacency_index(&mut handle)?;
    initialize_typed_inbound_edge_index(&mut handle)?;
    Ok(handle)
}

fn make_blob_store(
    blob_backend: BlobBackend,
    sstable_dir: &std::path::Path,
) -> Result<Box<dyn BlobStore + Send + Sync>> {
    let blob_root = sstable_dir.join("blobs");
    match blob_backend {
        BlobBackend::Local => Ok(Box::new(LocalBlobStore::new(blob_root)?)),
        BlobBackend::Rhodium => Ok(Box::new(RhodiumBlobStore::new(blob_root)?)),
        BlobBackend::Injected => Err(StorageError::InvalidInput(
            "BlobBackend::Injected must be wired via open_store_with_injected_blob_store, not make_blob_store"
                .to_string(),
        )),
    }
}

pub fn open_store_with_injected_blob_store(
    config: StorageConfig,
    store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> Result<StorageHandle> {
    open_store_with_reactor_and_injected_blob_store(
        config,
        std::sync::Arc::new(crate::core::reactor::SystemReactor),
        store,
    )
}

pub fn open_store_with_reactor_and_injected_blob_store(
    config: StorageConfig,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
    store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> Result<StorageHandle> {
    let blob_store =
        Box::new(InjectedBlobStoreAdapter::new(store)?) as Box<dyn BlobStore + Send + Sync>;
    let mut handle = open_store_with_reactor_and_blob_backend(config, reactor, BlobBackend::Local)?;
    handle.blob_store = blob_store;
    handle.blob_backend = BlobBackend::Injected;
    Ok(handle)
}