use crate::core::reactor::Reactor;
use crate::features::ops;
use crate::features::storage::{bitmap, buffer_pool, manifest, wal};
use crate::features::storage::{compaction, hnsw, memtable, sstable};
use super::blob::{
BlobBackend, BlobStore, InjectedBlobStoreAdapter, LocalBlobStore, RhodiumBlobStore,
};
use super::fs::{
infer_data_root, load_existing_l0_runs, recover_compaction_artifacts, wal_sync_policy_from_env,
DataDirLock,
};
use super::reads::{initialize_inbound_adjacency_index, initialize_typed_inbound_edge_index};
use super::types::{
Result, StorageConfig, StorageError, StorageHandle, ThreadCoreLaneConfig, ThreadCoreRequest,
};
use super::vector::{default_hnsw_graph_view, vector_payload_for_hnsw};
pub fn open_store(config: StorageConfig) -> Result<StorageHandle> {
open_store_with_reactor_and_blob_backend(
config,
std::sync::Arc::new(crate::core::reactor::SystemReactor),
BlobBackend::Local,
)
}
pub fn open_store_with_blob_backend(
config: StorageConfig,
blob_backend: BlobBackend,
) -> Result<StorageHandle> {
open_store_with_reactor_and_blob_backend(
config,
std::sync::Arc::new(crate::core::reactor::SystemReactor),
blob_backend,
)
}
/// Opens a store for the shard owned by `request`, optionally giving that
/// shard its own WAL and/or SSTable directory.
///
/// When `lanes.partition_wal` is set, the WAL directory becomes
/// `<wal_dir>/core-NNNN`; when `lanes.partition_sstable` is set, the SSTable
/// directory becomes `<sstable_dir>/core-NNNN`. `NNNN` is the owned shard
/// formatted with `{:04}`. All other configuration values are passed through
/// unchanged, and the store is opened via [`open_store`].
///
/// NOTE(review): `manifest_path` is never partitioned per shard here — confirm
/// that lanes sharing one manifest path is intended.
///
/// # Errors
///
/// Propagates any error returned by [`open_store`].
pub fn open_store_for_request(
    config: StorageConfig,
    request: &ThreadCoreRequest,
    lanes: &ThreadCoreLaneConfig,
) -> Result<StorageHandle> {
    // Both partitioned directories share the same per-shard component, so
    // format it once.
    let shard_component = format!("core-{:04}", request.owned_shard());

    // `config` is owned, so take it over directly instead of cloning each
    // field into a fresh struct.
    let mut lane_config = config;
    if lanes.partition_wal {
        lane_config.wal_dir = lane_config.wal_dir.join(&shard_component);
    }
    if lanes.partition_sstable {
        lane_config.sstable_dir = lane_config.sstable_dir.join(&shard_component);
    }
    open_store(lane_config)
}
/// Opens a store on a caller-supplied reactor, using the local blob backend.
///
/// # Errors
///
/// Propagates any error from [`open_store_with_reactor_and_blob_backend`].
pub fn open_store_with_reactor(
    config: StorageConfig,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
) -> Result<StorageHandle> {
    let default_backend = BlobBackend::Local;
    open_store_with_reactor_and_blob_backend(config, reactor, default_backend)
}
pub fn open_store_with_reactor_and_blob_backend(
config: StorageConfig,
reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
blob_backend: BlobBackend,
) -> Result<StorageHandle> {
let buffer_pool = buffer_pool::BufferPool::new(config.buffer_pool_pages);
let data_root = infer_data_root(&config.wal_dir, &config.sstable_dir, &config.manifest_path);
let data_dir_lock = if let Some(root) = data_root.as_ref() {
Some(DataDirLock::acquire(root)?)
} else {
None
};
let wal_sync_policy = wal_sync_policy_from_env();
let wal = wal::Wal::open_with_reactor_and_policy(
config.wal_dir,
config.wal_segment_max_bytes,
wal_sync_policy,
reactor.clone(),
)?;
let mut manifest =
manifest::Manifest::load_or_create_with_reactor(config.manifest_path, reactor.clone())?;
if manifest.schema_version < manifest::CURRENT_SCHEMA_VERSION {
manifest.schema_version = manifest::CURRENT_SCHEMA_VERSION;
manifest.persist()?;
}
reactor.create_dir_all(&config.sstable_dir)?;
let bitmap_dir = config.sstable_dir.join("bitmap");
recover_compaction_artifacts(&config.sstable_dir, reactor.as_ref())?;
let bitmap_store =
bitmap::BitmapStore::load_or_create_with_reactor(bitmap_dir, reactor.as_ref())?;
let memtable = memtable::MemTable::new();
let (l0_runs, sstable_cache) = load_existing_l0_runs(&config.sstable_dir, reactor.as_ref())?;
let mut hnsw_graphs = std::collections::HashMap::new();
for table in sstable_cache.values() {
for entry in &table.entries {
if entry.kind == sstable::EntryKind::VectorDelta {
if let Some((space_id, vec)) = vector_payload_for_hnsw(&manifest, &entry.value)? {
hnsw_graphs
.entry(space_id)
.or_insert_with(|| hnsw::HnswGraph::new(16, 32, 200))
.insert(entry.key, vec, entry.key ^ entry.version);
}
}
}
}
let hnsw_total_vectors = hnsw_graphs.values().map(|graph| graph.len() as u64).sum();
let hnsw_graph = default_hnsw_graph_view(&manifest, &hnsw_graphs);
let blob_store = make_blob_store(blob_backend, &config.sstable_dir)?;
let mut handle = StorageHandle {
buffer_pool,
wal,
manifest,
bitmap_store,
memtable,
l0_runs,
sstable_cache,
sstable_dir: config.sstable_dir,
metrics: super::types::AmpMetrics::default(),
reactor,
compaction_policy: compaction::CompactionPolicy::new(32, 10),
hnsw_scheduler: hnsw::HnswMaintenanceScheduler::new(0.05),
hnsw_graph,
hnsw_graphs,
hnsw_total_vectors,
hnsw_updated_vectors: 0,
last_hnsw_rebuild_reason: None,
pending_deltas_per_node: std::collections::HashMap::new(),
embedding_pending_nodes: std::collections::HashSet::new(),
tombstoned_node_ids: std::collections::HashSet::new(),
suspended_spaces: std::collections::HashSet::new(),
logical_node_cache: std::collections::HashMap::new(),
logical_node_cache_order: std::collections::VecDeque::new(),
logical_node_cache_capacity: 8192,
_data_dir_lock: data_dir_lock,
compaction_in_progress: false,
compaction_jobs: ops::BackgroundJobTracker::new(ops::BackgroundJobConfig {
max_queued_jobs: 16,
max_retained_terminal_jobs: 128,
})
.map_err(|err| {
StorageError::InvalidInput(format!("invalid compaction job config: {:?}", err))
})?,
last_compaction_job_id: None,
compaction_max_retries: 1,
blob_backend,
blob_store,
};
initialize_inbound_adjacency_index(&mut handle)?;
initialize_typed_inbound_edge_index(&mut handle)?;
Ok(handle)
}
/// Builds the boxed blob store for `blob_backend`, rooted at
/// `<sstable_dir>/blobs`.
///
/// # Errors
///
/// Returns `StorageError::InvalidInput` for `BlobBackend::Injected`, which
/// cannot be constructed here, and propagates any error from the selected
/// store's constructor.
fn make_blob_store(
    blob_backend: BlobBackend,
    sstable_dir: &std::path::Path,
) -> Result<Box<dyn BlobStore + Send + Sync>> {
    let blob_root = sstable_dir.join("blobs");
    let store: Box<dyn BlobStore + Send + Sync> = match blob_backend {
        BlobBackend::Injected => {
            // An injected store is supplied by the caller; there is nothing to build.
            return Err(StorageError::InvalidInput(
                "BlobBackend::Injected must be wired via open_store_with_injected_blob_store, not make_blob_store"
                    .to_string(),
            ));
        }
        BlobBackend::Local => Box::new(LocalBlobStore::new(blob_root)?),
        BlobBackend::Rhodium => Box::new(RhodiumBlobStore::new(blob_root)?),
    };
    Ok(store)
}
pub fn open_store_with_injected_blob_store(
config: StorageConfig,
store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> Result<StorageHandle> {
open_store_with_reactor_and_injected_blob_store(
config,
std::sync::Arc::new(crate::core::reactor::SystemReactor),
store,
)
}
/// Opens a store on the given reactor and swaps in a caller-provided blob
/// store.
///
/// `store` is wrapped in an `InjectedBlobStoreAdapter` first, so an adapter
/// failure is reported before the store is opened. The store is then opened
/// with `BlobBackend::Local`, after which the handle's blob store is replaced
/// by the adapter and its backend is marked `BlobBackend::Injected`.
///
/// NOTE(review): the blob store built for `BlobBackend::Local` during opening
/// is discarded when the adapter replaces it — confirm that constructing it
/// has no unwanted side effects.
///
/// # Errors
///
/// Propagates errors from the adapter constructor and from
/// [`open_store_with_reactor_and_blob_backend`].
pub fn open_store_with_reactor_and_injected_blob_store(
    config: StorageConfig,
    reactor: std::sync::Arc<dyn Reactor + Send + Sync>,
    store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> Result<StorageHandle> {
    let injected: Box<dyn BlobStore + Send + Sync> =
        Box::new(InjectedBlobStoreAdapter::new(store)?);

    let opened = open_store_with_reactor_and_blob_backend(config, reactor, BlobBackend::Local)?;
    let mut handle = opened;
    handle.blob_store = injected;
    handle.blob_backend = BlobBackend::Injected;
    Ok(handle)
}