use std::path::{Path, PathBuf};
use std::sync::Mutex;
use crate::features::query;
use crate::features::runtime;
use crate::features::storage::api as storage_api;
use super::{DriverConfig, DriverError, MutationOptions, Result, RowCursor};
pub struct RustDriver {
config: DriverConfig,
injected_store: Option<std::sync::Arc<dyn alloy_storage::BlobStore>>,
state: Mutex<DriverState>,
}
pub(super) struct DriverState {
pub(super) handle: Option<storage_api::StorageHandle>,
pub(super) ingest_session_active: bool,
pub(super) seen_mutations: std::collections::HashSet<String>,
pub(super) seen_mutation_order: std::collections::VecDeque<String>,
}
pub fn embedded_driver(config: DriverConfig) -> Result<RustDriver> {
RustDriver::new(config)
}
pub fn embedded_driver_with_store(
config: DriverConfig,
store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> Result<RustDriver> {
RustDriver::new_with_store(config, store)
}
impl RustDriver {
pub fn new(config: DriverConfig) -> Result<Self> {
if config.execute_params.morsel_size == 0 {
return Err(DriverError::InvalidConfig(
"execute_params.morsel_size must be > 0".to_string(),
));
}
if config.execute_params.scan_end_exclusive <= config.execute_params.scan_start {
return Err(DriverError::InvalidConfig(
"execute_params.scan_end_exclusive must be > scan_start".to_string(),
));
}
if config.idempotency_cache_capacity == 0 {
return Err(DriverError::InvalidConfig(
"idempotency_cache_capacity must be > 0".to_string(),
));
}
Ok(Self {
config,
injected_store: None,
state: Mutex::new(DriverState {
handle: None,
ingest_session_active: false,
seen_mutations: std::collections::HashSet::new(),
seen_mutation_order: std::collections::VecDeque::new(),
}),
})
}
pub fn new_with_store(
config: DriverConfig,
store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> Result<Self> {
if config.execute_params.morsel_size == 0 {
return Err(DriverError::InvalidConfig(
"execute_params.morsel_size must be > 0".to_string(),
));
}
if config.execute_params.scan_end_exclusive <= config.execute_params.scan_start {
return Err(DriverError::InvalidConfig(
"execute_params.scan_end_exclusive must be > scan_start".to_string(),
));
}
if config.idempotency_cache_capacity == 0 {
return Err(DriverError::InvalidConfig(
"idempotency_cache_capacity must be > 0".to_string(),
));
}
Ok(Self {
config,
injected_store: Some(store),
state: Mutex::new(DriverState {
handle: None,
ingest_session_active: false,
seen_mutations: std::collections::HashSet::new(),
seen_mutation_order: std::collections::VecDeque::new(),
}),
})
}
pub fn query(&self, query_text: &str) -> Result<runtime::RowStream> {
if self.config.execution_mode != runtime::ExecutionMode::Native {
return Err(DriverError::InvalidConfig(
"query text execution is only available in native mode; use query_serialized_plan in plexus mode"
.to_string(),
));
}
let ast = query::parse(query_text)?;
let typed = query::validate(&ast, &query::Catalog)?;
let plan = runtime::explain(&typed)?;
self.with_store_handle(|handle| {
Ok(runtime::execute(
&plan,
&self.config.execute_params,
handle,
)?)
})
}
pub fn query_serialized_plan(&self, serialized_plan: &[u8]) -> Result<runtime::RowStream> {
if self.config.execution_mode != runtime::ExecutionMode::Plexus {
return Err(DriverError::InvalidConfig(
"serialized plan execution requires plexus mode".to_string(),
));
}
self.with_store_handle(|handle| {
Ok(runtime::execute_serialized_plan(
serialized_plan,
&self.config.execute_params,
handle,
)?)
})
}
#[cfg(feature = "rhodium-backend")]
pub fn describe_compiled_plan_cache(
&self,
serialized_plan: &[u8],
) -> Result<rhodium_cache::core::storage::blob::CompiledPlanCacheDescriptor> {
Ok(runtime::describe_compiled_plan_cache(
serialized_plan,
None,
)?)
}
#[cfg(feature = "rhodium-backend")]
pub fn get_compiled_plan_with_options(
&self,
descriptor: &rhodium_cache::core::storage::blob::CompiledPlanCacheDescriptor,
options: storage_api::BlobReadOptions,
) -> Result<Option<storage_api::BlobGetResult>> {
self.with_store_handle(|handle| {
Ok(storage_api::get_compiled_plan_with_options(
handle, descriptor, options,
)?)
})
}
pub fn query_stream(&self, query_text: &str) -> Result<RowCursor> {
let rows = self.query(query_text)?;
Ok(RowCursor {
rows: rows.rows,
index: 0,
})
}
pub fn explain(&self, query_text: &str) -> Result<runtime::ExplainPlan> {
if self.config.execution_mode != runtime::ExecutionMode::Native {
return Err(DriverError::InvalidConfig(
"explain(query_text) is only available in native mode".to_string(),
));
}
let ast = query::parse(query_text)?;
let typed = query::validate(&ast, &query::Catalog)?;
Ok(runtime::explain(&typed)?)
}
pub fn ingest_node(&self, node_id: u64, version: u64, adjacency: &[u64]) -> Result<()> {
self.ingest_node_with_options(node_id, version, adjacency, &MutationOptions::default())
}
pub fn ingest_node_with_options(
&self,
node_id: u64,
version: u64,
adjacency: &[u64],
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
storage_api::put_full_node(handle, node_id, version, adjacency)
})
}
pub fn ingest_edge(&self, node_id: u64, version: u64, payload: &[u8]) -> Result<()> {
self.ingest_edge_with_options(node_id, version, payload, &MutationOptions::default())
}
pub fn ingest_edge_with_options(
&self,
node_id: u64,
version: u64,
payload: &[u8],
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
let delta = storage_api::encode_delta(node_id, version, payload);
storage_api::put_edge_delta(handle, &delta)
})
}
pub fn ingest_vector(&self, node_id: u64, version: u64, values: &[f32]) -> Result<()> {
self.ingest_vector_with_options(node_id, version, values, &MutationOptions::default())
}
pub fn ingest_vector_with_options(
&self,
node_id: u64,
version: u64,
values: &[f32],
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
let payload = storage_api::encode_vector_payload_f32(
1,
storage_api::VectorMetric::Cosine,
values,
false,
);
let delta = storage_api::encode_delta(node_id, version, &payload);
storage_api::put_vector_delta(handle, &delta)
})
}
pub fn ingest_nodes_batch(&self, nodes: &[(u64, u64, Vec<u64>)]) -> Result<()> {
self.ingest_nodes_batch_with_options(nodes, &MutationOptions::default())
}
pub fn ingest_nodes_batch_with_options(
&self,
nodes: &[(u64, u64, Vec<u64>)],
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
for (node_id, version, adjacency) in nodes {
storage_api::put_full_node(handle, *node_id, *version, adjacency)?;
}
Ok(())
})
}
pub fn ingest_edges_batch(&self, edges: &[(u64, u64, Vec<u8>)]) -> Result<()> {
self.ingest_edges_batch_with_options(edges, &MutationOptions::default())
}
pub fn ingest_edges_batch_with_options(
&self,
edges: &[(u64, u64, Vec<u8>)],
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
let mut deltas = Vec::with_capacity(edges.len());
for (node_id, version, payload) in edges {
deltas.push(storage_api::encode_delta(*node_id, *version, payload));
}
storage_api::put_edge_deltas_batch(handle, &deltas)
})
}
pub fn create_bitmap_index(&self, index_name: &str, field_path: &str) -> Result<()> {
self.create_bitmap_index_with_options(index_name, field_path, &MutationOptions::default())
}
pub fn create_bitmap_index_with_options(
&self,
index_name: &str,
field_path: &str,
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
storage_api::create_bitmap_index(handle, index_name, field_path)
})
}
pub fn list_bitmap_indexes(&self) -> Result<Vec<(String, String)>> {
self.with_store_handle(|handle| {
Ok(storage_api::list_bitmap_indexes(handle)
.into_iter()
.map(|desc| (desc.index_name, desc.field_path))
.collect())
})
}
pub fn bitmap_add_posting(
&self,
index_name: &str,
value_key: &str,
node_id: u64,
) -> Result<()> {
self.bitmap_add_posting_with_options(
index_name,
value_key,
node_id,
&MutationOptions::default(),
)
}
pub fn bitmap_add_posting_with_options(
&self,
index_name: &str,
value_key: &str,
node_id: u64,
options: &MutationOptions,
) -> Result<()> {
self.with_ingest_handle(options, |handle| {
storage_api::bitmap_add_posting(handle, index_name, value_key, node_id)
})
}
pub fn begin_ingest(&self) -> Result<()> {
let mut guard = self.state_guard()?;
let _ = self.ensure_handle(&mut guard)?;
guard.ingest_session_active = true;
Ok(())
}
pub fn finish_ingest(&self) -> Result<()> {
let mut guard = self.state_guard()?;
if let Some(handle) = guard.handle.as_mut() {
storage_api::flush(handle)?;
}
guard.ingest_session_active = false;
Ok(())
}
pub fn put_blob(&self, blob_id: &str, bytes: &[u8]) -> Result<()> {
self.with_store_handle(|handle| {
storage_api::put_blob(handle, blob_id, bytes)?;
Ok(())
})
}
pub fn put_blob_with_options(
&self,
blob_id: &str,
bytes: &[u8],
options: storage_api::BlobPutOptions,
) -> Result<storage_api::BlobPutResult> {
self.with_store_handle(|handle| {
Ok(storage_api::put_blob_with_options(
handle, blob_id, bytes, options,
)?)
})
}
pub fn get_blob(&self, blob_id: &str) -> Result<Option<Vec<u8>>> {
self.with_store_handle(|handle| Ok(storage_api::get_blob(handle, blob_id)?))
}
pub fn get_blob_with_options(
&self,
blob_id: &str,
options: storage_api::BlobReadOptions,
) -> Result<Option<storage_api::BlobGetResult>> {
self.with_store_handle(|handle| {
Ok(storage_api::get_blob_with_options(
handle, blob_id, options,
)?)
})
}
pub fn has_blob(&self, blob_id: &str) -> Result<bool> {
self.with_store_handle(|handle| Ok(storage_api::has_blob(handle, blob_id)?))
}
pub fn delete_blob(&self, blob_id: &str) -> Result<()> {
self.with_store_handle(|handle| {
storage_api::delete_blob(handle, blob_id)?;
Ok(())
})
}
pub fn has_blobs(&self, blob_ids: &[String]) -> Result<Vec<bool>> {
self.with_store_handle(|handle| Ok(storage_api::has_blobs(handle, blob_ids)?))
}
pub fn delete_blobs(&self, blob_ids: &[String]) -> Result<usize> {
self.with_store_handle(|handle| Ok(storage_api::delete_blobs(handle, blob_ids)?))
}
pub fn list_blob_prefix(
&self,
namespace: &str,
prefix: &str,
limit: usize,
) -> Result<Vec<String>> {
self.with_store_handle(|handle| {
Ok(storage_api::list_blob_prefix(
handle, namespace, prefix, limit,
)?)
})
}
pub fn delete_blob_prefix(
&self,
namespace: &str,
prefix: &str,
batch_limit: usize,
) -> Result<storage_api::BlobPrefixDeleteResult> {
self.with_store_handle(|handle| {
Ok(storage_api::delete_blob_prefix(
handle,
namespace,
prefix,
batch_limit,
)?)
})
}
fn with_ingest_handle<F>(&self, options: &MutationOptions, op: F) -> Result<()>
where
F: FnOnce(&mut storage_api::StorageHandle) -> storage_api::Result<()>,
{
let mut guard = self.state_guard()?;
let idempotency_key = validated_idempotency_key(options)?;
let ingest_session_active = guard.ingest_session_active;
let _ = self.ensure_handle(&mut guard)?;
if let Some(key) = idempotency_key.as_deref() {
if guard.seen_mutations.contains(key) {
return Ok(());
}
}
let handle = self.ensure_handle(&mut guard)?;
op(handle)?;
if !ingest_session_active {
storage_api::flush(handle)?;
}
if let Some(key) = idempotency_key {
remember_mutation_key(&mut guard, &self.config, key);
}
Ok(())
}
fn with_store_handle<F, T>(&self, op: F) -> Result<T>
where
F: FnOnce(&mut storage_api::StorageHandle) -> Result<T>,
{
let mut guard = self.state_guard()?;
let handle = self.ensure_handle(&mut guard)?;
op(handle)
}
pub(super) fn state_guard(&self) -> Result<std::sync::MutexGuard<'_, DriverState>> {
self.state
.lock()
.map_err(|_| DriverError::Internal("driver state mutex poisoned".to_string()))
}
pub(super) fn ensure_handle<'a>(
&self,
state: &'a mut DriverState,
) -> Result<&'a mut storage_api::StorageHandle> {
if state.handle.is_none() {
let mut handle = if let Some(store) = self.injected_store.clone() {
open_store_in_data_dir_with_store(&self.config.data_dir, store)?
} else {
open_store_in_data_dir(&self.config.data_dir, self.config.blob_backend)?
};
storage_api::recover_from_wal(&mut handle)?;
if self.config.persist_idempotency_keys {
load_idempotency_cache_from_disk(state, &self.config)?;
}
state.handle = Some(handle);
}
state
.handle
.as_mut()
.ok_or_else(|| DriverError::Internal("store handle unavailable".to_string()))
}
}
fn open_store_in_data_dir_with_store(
data_dir: &Path,
store: std::sync::Arc<dyn alloy_storage::BlobStore>,
) -> storage_api::Result<storage_api::StorageHandle> {
let wal_dir = data_dir.join("wal");
let manifest_path = data_dir.join("ir.manifest");
let sstable_dir = data_dir.join("sst");
storage_api::open_store_with_injected_blob_store(
storage_api::StorageConfig {
buffer_pool_pages: 1024,
wal_dir,
wal_segment_max_bytes: 1 << 20,
manifest_path,
sstable_dir,
},
store,
)
}
fn open_store_in_data_dir(
data_dir: &Path,
blob_backend: storage_api::BlobBackend,
) -> storage_api::Result<storage_api::StorageHandle> {
let wal_dir = data_dir.join("wal");
let manifest_path = data_dir.join("ir.manifest");
let sstable_dir = data_dir.join("sst");
storage_api::open_store_with_blob_backend(
storage_api::StorageConfig {
buffer_pool_pages: 1024,
wal_dir,
wal_segment_max_bytes: 1 << 20,
manifest_path,
sstable_dir,
},
blob_backend,
)
}
fn validated_idempotency_key(options: &MutationOptions) -> Result<Option<String>> {
let Some(raw) = &options.idempotency_key else {
return Ok(None);
};
let trimmed = raw.trim();
if trimmed.is_empty() {
return Err(DriverError::InvalidConfig(
"idempotency_key must not be empty when provided".to_string(),
));
}
Ok(Some(trimmed.to_string()))
}
fn remember_mutation_key(state: &mut DriverState, config: &DriverConfig, key: String) {
remember_mutation_key_in_memory(state, config, key.clone());
if config.persist_idempotency_keys {
let _ = append_idempotency_key_to_disk(config, &key);
}
}
fn remember_mutation_key_in_memory(state: &mut DriverState, config: &DriverConfig, key: String) {
if !state.seen_mutations.insert(key.clone()) {
return;
}
state.seen_mutation_order.push_back(key);
while state.seen_mutations.len() > config.idempotency_cache_capacity {
if let Some(evicted) = state.seen_mutation_order.pop_front() {
state.seen_mutations.remove(&evicted);
}
}
}
fn idempotency_store_path(config: &DriverConfig) -> PathBuf {
config.data_dir.join("idempotency.keys")
}
fn append_idempotency_key_to_disk(config: &DriverConfig, key: &str) -> std::io::Result<()> {
let path = idempotency_store_path(config);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
use std::io::Write;
let mut file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
writeln!(file, "{}", key)?;
file.flush()?;
Ok(())
}
fn load_idempotency_cache_from_disk(state: &mut DriverState, config: &DriverConfig) -> Result<()> {
let path = idempotency_store_path(config);
let text = match std::fs::read_to_string(path) {
Ok(text) => text,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(err) => {
return Err(DriverError::Internal(format!(
"idempotency load failed: {}",
err
)))
}
};
for line in text.lines() {
let key = line.trim();
if key.is_empty() {
continue;
}
remember_mutation_key_in_memory(state, config, key.to_string());
}
Ok(())
}