use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use manifoldb_core::encoding::keys::encode_edge_key;
use manifoldb_core::encoding::Encoder;
use manifoldb_core::{Edge, EdgeId, Entity, EntityId};
use manifoldb_storage::backends::redb::{RedbConfig, RedbEngine};
use manifoldb_storage::Transaction;
use crate::cache::{extract_cache_hint, CacheHint, CacheMetrics, QueryCache, QueryCacheKey};
use crate::config::{Config, DatabaseBuilder};
use crate::error::{Error, Result};
use crate::execution::{execute_statement, extract_tables_from_sql};
use crate::index::{IndexInfo, IndexManager, IndexMetadata, IndexStats, IndexType};
use crate::metrics::{CacheMetricsSnapshot, DatabaseMetrics, MetricsSnapshot};
use crate::prepared::{PreparedStatement, PreparedStatementCache};
use crate::schema::SchemaManager;
use crate::transaction::{DatabaseTransaction, TransactionManager};
/// Handle to a database instance.
///
/// The handle is `Clone`; every clone points at the same [`DatabaseInner`] through an
/// [`Arc`], so cloning does not copy the underlying state.
#[derive(Clone)]
pub struct Database {
/// State shared by all clones of this handle.
inner: Arc<DatabaseInner>,
}
/// State shared by every clone of a [`Database`] handle.
struct DatabaseInner {
/// Transaction manager over the redb storage engine.
manager: TransactionManager<RedbEngine>,
/// Configuration the database was opened with.
config: Config,
/// Cache of query results; entries are invalidated per table after writes.
query_cache: QueryCache,
/// Cache of prepared statements; also holds the schema version used to validate them.
prepared_cache: PreparedStatementCache,
/// Collector for query and transaction metrics.
db_metrics: Arc<DatabaseMetrics>,
/// Manager for property indexes, constructed from the engine handle of `manager`.
index_manager: IndexManager,
}
impl Database {
/// Opens a database at `path` using an otherwise default [`DatabaseBuilder`].
///
/// # Errors
/// Returns whatever error the builder's `open` step produces.
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    let builder = DatabaseBuilder::new().path(path);
    builder.open()
}
/// Opens a database through the in-memory [`DatabaseBuilder`] preset.
///
/// # Errors
/// Returns whatever error the builder's `open` step produces.
pub fn in_memory() -> Result<Self> {
    let builder = DatabaseBuilder::in_memory();
    builder.open()
}
pub fn open_with_config(config: Config) -> Result<Self> {
let engine = if config.in_memory {
RedbEngine::in_memory().map_err(|e| Error::Open(e.to_string()))?
} else {
let mut redb_config = RedbConfig::new();
if let Some(cache_size) = config.cache_size {
redb_config = redb_config.cache_size(cache_size);
}
if let Some(max_size) = config.max_size {
redb_config = redb_config.max_size(max_size);
}
RedbEngine::open_with_config(&config.path, redb_config)
.map_err(|e| Error::Open(e.to_string()))?
};
let manager = TransactionManager::with_config(engine, config.transaction_config());
let query_cache = QueryCache::new(config.query_cache_config.clone());
let prepared_cache = PreparedStatementCache::default();
let db_metrics = Arc::new(DatabaseMetrics::new());
let index_manager = IndexManager::new(manager.engine_arc());
let inner = DatabaseInner {
manager,
config,
query_cache,
prepared_cache,
db_metrics,
index_manager,
};
let db = Self { inner: Arc::new(inner) };
if let Ok(tx) = db.begin_read() {
if let Ok(version) = SchemaManager::get_version(&tx) {
db.inner.prepared_cache.set_schema_version(version);
}
}
Ok(db)
}
/// Returns a fresh [`DatabaseBuilder`] (the result of `DatabaseBuilder::new()`).
#[must_use]
pub fn builder() -> DatabaseBuilder {
DatabaseBuilder::new()
}
#[must_use]
pub fn config(&self) -> &Config {
&self.inner.config
}
pub fn begin(
&self,
) -> Result<
DatabaseTransaction<<RedbEngine as manifoldb_storage::StorageEngine>::Transaction<'_>>,
> {
self.inner.manager.begin_write().map_err(Error::Transaction)
}
pub fn begin_read(
&self,
) -> Result<
DatabaseTransaction<<RedbEngine as manifoldb_storage::StorageEngine>::Transaction<'_>>,
> {
self.inner.manager.begin_read().map_err(Error::Transaction)
}
pub fn execute(&self, sql: &str) -> Result<u64> {
self.execute_with_params(sql, &[])
}
/// Executes a statement with bound parameters inside its own write transaction.
///
/// On success the transaction is committed, commit/query metrics are recorded, and both the
/// query cache and the prepared-statement cache are invalidated for the tables extracted
/// from `sql`. For statements classified as DDL the schema version read just before the
/// commit is pushed into the prepared-statement cache afterwards.
///
/// # Errors
/// Returns the statement's error (after recording a failed query and a rollback), a
/// [`Error::Transaction`] if the commit fails, or the error from invalidating the
/// prepared-statement cache.
pub fn execute_with_params(&self, sql: &str, params: &[manifoldb_core::Value]) -> Result<u64> {
    let metrics = &self.inner.db_metrics;
    let started = Instant::now();
    let affected_tables = extract_tables_from_sql(sql);
    let is_ddl = Self::is_ddl_statement(sql);

    let mut tx = self.begin()?;
    metrics.transactions.record_start();

    let count = match execute_statement(&mut tx, sql, params) {
        Ok(count) => count,
        Err(err) => {
            // `tx` is dropped without a commit when we return.
            metrics.record_query(started.elapsed(), false);
            metrics.record_rollback();
            return Err(err);
        }
    };

    // The version must be read while the transaction is still open.
    let new_schema_version = is_ddl.then(|| SchemaManager::get_version(&tx).ok()).flatten();

    let commit_started = Instant::now();
    tx.commit().map_err(Error::Transaction)?;
    metrics.record_commit(commit_started.elapsed());
    metrics.record_query(started.elapsed(), true);

    self.inner.query_cache.invalidate_tables(&affected_tables);
    self.inner.prepared_cache.invalidate_tables(&affected_tables)?;
    if let Some(version) = new_schema_version {
        self.inner.prepared_cache.set_schema_version(version);
    }
    Ok(count)
}
/// Reports whether `sql` starts with one of the schema-changing statements
/// `CREATE TABLE`, `DROP TABLE`, `CREATE INDEX`, `DROP INDEX` or `ALTER TABLE`.
///
/// Matching is ASCII case-insensitive and tolerates any amount of whitespace (spaces, tabs,
/// newlines) before the statement and between its two keywords, so `create\n  table t` is
/// recognised. No uppercase copy of the statement is allocated.
fn is_ddl_statement(sql: &str) -> bool {
    const DDL_PREFIXES: [(&str, &str); 5] = [
        ("CREATE", "TABLE"),
        ("DROP", "TABLE"),
        ("CREATE", "INDEX"),
        ("DROP", "INDEX"),
        ("ALTER", "TABLE"),
    ];

    let statement = sql.trim_start();
    let verb_len = statement.find(char::is_whitespace).unwrap_or(statement.len());
    let (verb, remainder) = statement.split_at(verb_len);
    let remainder = remainder.trim_start();

    DDL_PREFIXES.iter().any(|&(first, second)| {
        verb.eq_ignore_ascii_case(first)
            // `get` yields `None` when the remainder is too short or the cut would split a
            // multi-byte character, so this can never panic.
            && remainder
                .get(..second.len())
                .map_or(false, |head| head.eq_ignore_ascii_case(second))
    })
}
pub fn query(&self, sql: &str) -> Result<QueryResult> {
self.query_with_params(sql, &[])
}
/// Runs a query with bound parameters, consulting the query cache when allowed.
///
/// A cache hint extracted from `sql` decides whether the cache is used: `Cache` forces it,
/// `NoCache` disables it, and `Default` follows the cache's enabled flag. On a cache hit the
/// entry is touched and returned without opening a transaction. Otherwise the query runs in
/// a read transaction, bounded by `max_rows_in_memory`, and a successful result is stored in
/// the cache together with the tables extracted from the cleaned SQL.
///
/// The cache key is built once and reused for both the lookup and the insert.
///
/// # Errors
/// Returns the error from starting the read transaction or from executing the query; a
/// failed execution is recorded in the query metrics.
pub fn query_with_params(
    &self,
    sql: &str,
    params: &[manifoldb_core::Value],
) -> Result<QueryResult> {
    let (hint, clean_sql) = extract_cache_hint(sql);
    let use_cache = match hint {
        CacheHint::Cache => true,
        CacheHint::NoCache => false,
        CacheHint::Default => self.inner.query_cache.is_enabled(),
    };

    // `Some` exactly when caching is in effect for this call.
    let cache_key = if use_cache { Some(QueryCacheKey::new(&clean_sql, params)) } else { None };
    if let Some(key) = &cache_key {
        if let Some(cached_result) = self.inner.query_cache.get(key) {
            self.inner.query_cache.touch(key);
            return Ok(cached_result);
        }
    }

    let start = Instant::now();
    let tx = self.begin_read()?;
    let result = crate::execution::execute_query_with_limit(
        &tx,
        &clean_sql,
        params,
        self.inner.config.max_rows_in_memory,
    );
    match result {
        Ok(result_set) => {
            self.inner.db_metrics.record_query(start.elapsed(), true);
            let result = QueryResult::from_result_set(result_set);
            if let Some(key) = cache_key {
                let accessed_tables = extract_tables_from_sql(&clean_sql);
                self.inner.query_cache.insert(key, result.clone(), accessed_tables);
            }
            Ok(result)
        }
        Err(e) => {
            self.inner.db_metrics.record_query(start.elapsed(), false);
            Err(e)
        }
    }
}
pub fn flush(&self) -> Result<()> {
self.inner.manager.flush().map_err(Error::Transaction)
}
#[must_use]
pub fn transaction_manager(&self) -> &TransactionManager<RedbEngine> {
&self.inner.manager
}
#[must_use]
pub fn query_cache(&self) -> &QueryCache {
&self.inner.query_cache
}
/// Returns the metrics handle exposed by the query cache.
#[must_use]
pub fn cache_metrics(&self) -> Arc<CacheMetrics> {
    let cache = &self.inner.query_cache;
    cache.metrics()
}
/// Clears the query cache.
pub fn clear_cache(&self) {
    let cache = &self.inner.query_cache;
    cache.clear();
}
/// Invalidates query-cache entries for the given `tables`.
///
/// Only the query cache is touched; the prepared-statement cache is left alone.
pub fn invalidate_cache_for_tables(&self, tables: &[String]) {
    let cache = &self.inner.query_cache;
    cache.invalidate_tables(tables);
}
/// Returns a snapshot of the database metrics with the query-cache metrics attached.
///
/// The database snapshot is taken first; its `cache` field is then filled from a snapshot
/// of the query cache's own metrics.
#[must_use]
pub fn metrics(&self) -> MetricsSnapshot {
    let mut combined = self.inner.db_metrics.snapshot();
    combined.cache = Some(CacheMetricsSnapshot::from_cache_snapshot(
        self.inner.query_cache.metrics().snapshot(),
    ));
    combined
}
/// Returns another `Arc` handle to the live [`DatabaseMetrics`] collector.
#[must_use]
pub fn raw_metrics(&self) -> Arc<DatabaseMetrics> {
    let collector = &self.inner.db_metrics;
    Arc::clone(collector)
}
pub fn reset_metrics(&self) {
self.inner.db_metrics.reset();
self.inner.query_cache.metrics().reset();
}
/// Creates an index of type [`IndexType::Equality`] on `property` for entities with `label`.
///
/// # Errors
/// Same as [`Self::create_index_with_type`].
pub fn create_index(&self, label: &str, property: &str) -> Result<()> {
    let default_type = IndexType::Equality;
    self.create_index_with_type(label, property, default_type)
}
/// Creates an index of the given `index_type` on `property` for entities with `label`.
///
/// # Errors
/// Returns the error reported by the index manager.
pub fn create_index_with_type(
    &self,
    label: &str,
    property: &str,
    index_type: IndexType,
) -> Result<()> {
    let indexes = &self.inner.index_manager;
    indexes.create_index(label, property, index_type)
}
/// Drops the index on `property` for `label`.
///
/// # Errors
/// Returns the error reported by the index manager.
pub fn drop_index(&self, label: &str, property: &str) -> Result<()> {
    let indexes = &self.inner.index_manager;
    indexes.drop_index(label, property)
}
/// Lists the indexes known to the index manager.
///
/// # Errors
/// Returns the error reported by the index manager.
pub fn list_indexes(&self) -> Result<Vec<IndexInfo>> {
    let indexes = &self.inner.index_manager;
    indexes.list_indexes()
}
/// Returns statistics for the index on `property` for `label`.
///
/// # Errors
/// Returns the error reported by the index manager.
pub fn index_stats(&self, label: &str, property: &str) -> Result<IndexStats> {
    let indexes = &self.inner.index_manager;
    indexes.index_stats(label, property)
}
/// Looks up the metadata of the index on `property` for `label`, if the index manager has
/// any.
///
/// # Errors
/// Returns the error reported by the index manager.
pub fn get_index_metadata(&self, label: &str, property: &str) -> Result<Option<IndexMetadata>> {
    let indexes = &self.inner.index_manager;
    indexes.get_index_metadata(label, property)
}
/// Performs an equality lookup of `value` through the index on `property` for `label`.
///
/// The result is passed through from `IndexManager::lookup_eq` unchanged.
/// NOTE(review): `None` presumably means no usable index exists — confirm against
/// `IndexManager`.
///
/// # Errors
/// Returns the error reported by the index manager.
pub fn index_lookup(
    &self,
    label: &str,
    property: &str,
    value: &manifoldb_core::Value,
) -> Result<Option<Vec<EntityId>>> {
    let indexes = &self.inner.index_manager;
    indexes.lookup_eq(label, property, value)
}
/// Builds a query-planner catalog from the currently listed indexes.
///
/// Every index contributes a B-tree index entry named `{label}_{property}_idx` and a table
/// statistics entry for its label carrying the index's `entry_count`.
///
/// # Errors
/// Returns the error from [`Self::list_indexes`].
pub fn build_planner_catalog(&self) -> Result<manifoldb_query::PlannerCatalog> {
    use manifoldb_query::{PlannerCatalog, PlannerIndexInfo, TableStats};

    let empty = PlannerCatalog::new();
    let indexes = self.list_indexes()?;
    let catalog = indexes.into_iter().fold(empty, |catalog, idx| {
        let index_name = format!("{}_{}_idx", idx.label, idx.property);
        let planner_idx =
            PlannerIndexInfo::btree(index_name, &idx.label, vec![idx.property.clone()]);
        catalog
            .with_index(planner_idx)
            .with_table(TableStats::new(&idx.label, idx.entry_count as usize))
    });
    Ok(catalog)
}
/// Prepares `sql` via `PreparedStatementCache::prepare`.
///
/// # Errors
/// Returns the error reported by the prepared-statement cache.
pub fn prepare(&self, sql: &str) -> Result<Arc<PreparedStatement>> {
    let cache = &self.inner.prepared_cache;
    cache.prepare(sql)
}
/// Returns a prepared statement for `sql` via `PreparedStatementCache::get_or_prepare`.
///
/// # Errors
/// Returns the error reported by the prepared-statement cache.
pub fn prepare_cached(&self, sql: &str) -> Result<Arc<PreparedStatement>> {
    let cache = &self.inner.prepared_cache;
    cache.get_or_prepare(sql)
}
/// Runs a prepared query with bound parameters in a read transaction.
///
/// The statement is first checked against the current schema version held by the
/// prepared-statement cache. The query's outcome (success or failure) is recorded in the
/// query metrics. The query cache is not consulted.
///
/// # Errors
/// Returns [`Error::Execution`] when the statement is no longer valid for the current
/// schema version, the error from starting the read transaction, or the execution error.
pub fn query_prepared(
    &self,
    stmt: &PreparedStatement,
    params: &[manifoldb_core::Value],
) -> Result<QueryResult> {
    let schema_version = self.inner.prepared_cache.schema_version();
    if !stmt.is_valid(schema_version) {
        return Err(Error::Execution(
            "Prepared statement is invalid due to schema changes. Please re-prepare."
                .to_string(),
        ));
    }

    let started = Instant::now();
    let tx = self.begin_read()?;
    let outcome = crate::execution::execute_prepared_query(&tx, stmt, params);
    self.inner.db_metrics.record_query(started.elapsed(), outcome.is_ok());
    outcome.map(QueryResult::from_result_set)
}
/// Executes a prepared statement with bound parameters inside its own write transaction.
///
/// The statement is first checked against the current schema version. On success the
/// transaction is committed, metrics are recorded, and both caches are invalidated for the
/// statement's accessed tables. For DDL statements the schema version read just before the
/// commit is pushed into the prepared-statement cache afterwards.
///
/// # Errors
/// Returns [`Error::Execution`] for a stale statement, the execution error (after recording
/// a failed query and a rollback), [`Error::Transaction`] if the commit fails, or the error
/// from invalidating the prepared-statement cache.
pub fn execute_prepared(
    &self,
    stmt: &PreparedStatement,
    params: &[manifoldb_core::Value],
) -> Result<u64> {
    let prepared_cache = &self.inner.prepared_cache;
    let metrics = &self.inner.db_metrics;

    if !stmt.is_valid(prepared_cache.schema_version()) {
        return Err(Error::Execution(
            "Prepared statement is invalid due to schema changes. Please re-prepare."
                .to_string(),
        ));
    }

    let started = Instant::now();
    let mut tx = self.begin()?;
    metrics.transactions.record_start();

    let count = match crate::execution::execute_prepared_statement(&mut tx, stmt, params) {
        Ok(count) => count,
        Err(err) => {
            // `tx` is dropped without a commit when we return.
            metrics.record_query(started.elapsed(), false);
            metrics.record_rollback();
            return Err(err);
        }
    };

    // The version must be read while the transaction is still open.
    let new_schema_version =
        if stmt.is_ddl() { SchemaManager::get_version(&tx).ok() } else { None };

    let commit_started = Instant::now();
    tx.commit().map_err(Error::Transaction)?;
    metrics.record_commit(commit_started.elapsed());
    metrics.record_query(started.elapsed(), true);

    let affected_tables: Vec<String> = stmt.accessed_tables().iter().cloned().collect();
    self.inner.query_cache.invalidate_tables(&affected_tables);
    prepared_cache.invalidate_tables(&affected_tables)?;
    if let Some(version) = new_schema_version {
        prepared_cache.set_schema_version(version);
    }
    Ok(count)
}
#[must_use]
pub fn prepared_cache(&self) -> &PreparedStatementCache {
&self.inner.prepared_cache
}
/// Clears the prepared-statement cache.
///
/// # Errors
/// Returns the error reported by the prepared-statement cache.
pub fn clear_prepared_cache(&self) -> Result<()> {
    let cache = &self.inner.prepared_cache;
    cache.clear()
}
pub fn bulk_insert_entities(&self, entities: &[Entity]) -> Result<Vec<EntityId>> {
use crate::execution::EntityIndexMaintenance;
use rayon::prelude::*;
if entities.is_empty() {
return Ok(Vec::new());
}
let start = std::time::Instant::now();
let serialized: std::result::Result<Vec<(usize, Vec<u8>)>, Error> = entities
.par_iter()
.enumerate()
.map(|(idx, entity)| {
bincode::serde::encode_to_vec(entity, bincode::config::standard())
.map(|bytes| (idx, bytes))
.map_err(|e| {
Error::Execution(format!(
"Failed to serialize entity at index {}: {}",
idx, e
))
})
})
.collect();
let serialized = serialized?;
let mut tx = self.begin()?;
self.inner.db_metrics.transactions.record_start();
let entity_count = entities.len() as u64;
let start_id = {
let current = match tx.get_metadata(b"next_entity_id")? {
Some(bytes) if bytes.len() == 8 => {
let arr: [u8; 8] = bytes
.try_into()
.map_err(|_| Error::Execution("invalid entity counter".to_string()))?;
u64::from_be_bytes(arr)
}
_ => 1, };
let next = current + entity_count;
tx.put_metadata(b"next_entity_id", &next.to_be_bytes())?;
current
};
let entities_with_ids: std::result::Result<Vec<(EntityId, Entity, Vec<u8>)>, Error> =
entities
.par_iter()
.enumerate()
.map(|(idx, entity)| {
let id = EntityId::new(start_id + idx as u64);
let mut entity_with_id = entity.clone();
entity_with_id.id = id;
bincode::serde::encode_to_vec(&entity_with_id, bincode::config::standard())
.map(|bytes| (id, entity_with_id, bytes))
.map_err(|e| {
Error::Execution(format!(
"Failed to serialize entity at index {}: {}",
idx, e
))
})
})
.collect();
let entities_with_ids = entities_with_ids?;
drop(serialized);
let ids: Vec<EntityId> = entities_with_ids.iter().map(|(id, _, _)| *id).collect();
for (id, entity, bytes) in &entities_with_ids {
let key = id.as_u64().to_be_bytes();
let storage = tx.storage_mut_ref().map_err(Error::Transaction)?;
storage
.put("nodes", &key, bytes)
.map_err(|e| Error::Execution(format!("Failed to write entity: {}", e)))?;
for label in &entity.labels {
let label_bytes = label.as_str().as_bytes();
let len = label_bytes.len() as u16;
let mut label_key = Vec::with_capacity(2 + label_bytes.len() + 8);
label_key.extend_from_slice(&len.to_be_bytes());
label_key.extend_from_slice(label_bytes);
label_key.extend_from_slice(&id.as_u64().to_be_bytes());
storage
.put("label_index", &label_key, &[])
.map_err(|e| Error::Execution(format!("Failed to write label index: {}", e)))?;
}
EntityIndexMaintenance::on_insert(&mut tx, entity)
.map_err(|e| Error::Execution(format!("Index maintenance failed: {}", e)))?;
self.inner.index_manager.on_entity_upsert_tx(
tx.storage_mut_ref().map_err(Error::Transaction)?,
entity,
None, )?;
}
let commit_start = std::time::Instant::now();
tx.commit().map_err(Error::Transaction)?;
self.inner.db_metrics.record_commit(commit_start.elapsed());
self.inner.db_metrics.record_query(start.elapsed(), true);
Ok(ids)
}
pub fn bulk_insert_edges(&self, edges: &[Edge]) -> Result<Vec<EdgeId>> {
use manifoldb_graph::index::IndexMaintenance;
use rayon::prelude::*;
const TABLE_EDGES: &str = "edges";
const TABLE_EDGES_OUT: &str = "edges_out";
const TABLE_EDGES_IN: &str = "edges_in";
fn make_adjacency_key(entity_id: EntityId, edge_id: EdgeId) -> [u8; 16] {
let mut key = [0u8; 16];
key[0..8].copy_from_slice(&entity_id.as_u64().to_be_bytes());
key[8..16].copy_from_slice(&edge_id.as_u64().to_be_bytes());
key
}
if edges.is_empty() {
return Ok(Vec::new());
}
let start = std::time::Instant::now();
{
let tx = self.begin_read()?;
let mut entity_ids_to_check: Vec<EntityId> =
edges.iter().flat_map(|e| [e.source, e.target]).collect();
entity_ids_to_check.sort_unstable();
entity_ids_to_check.dedup();
for entity_id in &entity_ids_to_check {
if tx.get_entity(*entity_id)?.is_none() {
return Err(Error::InvalidEntityReference(*entity_id));
}
}
}
let serialized: std::result::Result<Vec<(usize, Vec<u8>)>, Error> = edges
.par_iter()
.enumerate()
.map(|(idx, edge)| {
bincode::serde::encode_to_vec(edge, bincode::config::standard())
.map(|bytes| (idx, bytes))
.map_err(|e| {
Error::Execution(format!(
"Failed to serialize edge at index {}: {}",
idx, e
))
})
})
.collect();
let serialized = serialized?;
let mut tx = self.begin()?;
self.inner.db_metrics.transactions.record_start();
let edge_count = edges.len() as u64;
let start_id = {
let current = match tx.get_metadata(b"next_edge_id")? {
Some(bytes) if bytes.len() == 8 => {
let arr: [u8; 8] = bytes
.try_into()
.map_err(|_| Error::Execution("invalid edge counter".to_string()))?;
u64::from_be_bytes(arr)
}
_ => 1, };
let next = current + edge_count;
tx.put_metadata(b"next_edge_id", &next.to_be_bytes())?;
current
};
let edges_with_ids: std::result::Result<Vec<(EdgeId, Edge, Vec<u8>)>, Error> = edges
.par_iter()
.enumerate()
.map(|(idx, edge)| {
let id = EdgeId::new(start_id + idx as u64);
let mut edge_with_id = edge.clone();
edge_with_id.id = id;
edge_with_id.encode().map(|bytes| (id, edge_with_id, bytes)).map_err(|e| {
Error::Execution(format!("Failed to serialize edge at index {}: {}", idx, e))
})
})
.collect();
let edges_with_ids = edges_with_ids?;
drop(serialized);
let ids: Vec<EdgeId> = edges_with_ids.iter().map(|(id, _, _)| *id).collect();
for (id, edge, bytes) in &edges_with_ids {
let key = encode_edge_key(*id);
tx.storage_mut_ref()
.map_err(Error::Transaction)?
.put(TABLE_EDGES, &key, bytes)
.map_err(|e| Error::Execution(format!("Failed to write edge: {}", e)))?;
let out_key = make_adjacency_key(edge.source, *id);
tx.storage_mut_ref()
.map_err(Error::Transaction)?
.put(TABLE_EDGES_OUT, &out_key, &[])
.map_err(|e| Error::Execution(format!("Failed to write outgoing index: {}", e)))?;
let in_key = make_adjacency_key(edge.target, *id);
tx.storage_mut_ref()
.map_err(Error::Transaction)?
.put(TABLE_EDGES_IN, &in_key, &[])
.map_err(|e| Error::Execution(format!("Failed to write incoming index: {}", e)))?;
IndexMaintenance::add_edge_indexes(
tx.storage_mut_ref().map_err(Error::Transaction)?,
edge,
)
.map_err(|e| Error::Execution(format!("Edge index maintenance failed: {}", e)))?;
}
let commit_start = std::time::Instant::now();
tx.commit().map_err(Error::Transaction)?;
self.inner.db_metrics.record_commit(commit_start.elapsed());
self.inner.db_metrics.record_query(start.elapsed(), true);
Ok(ids)
}
pub fn bulk_insert_vectors(
&self,
collection_name: &str,
vectors: &[(manifoldb_core::EntityId, String, Vec<f32>)],
) -> Result<usize> {
use crate::collection::{CollectionManager, CollectionName};
use crate::vector::update_point_vector_in_index;
use manifoldb_core::PointId;
use manifoldb_vector::{
encode_vector_value, encoding::encode_collection_vector_key, VectorData,
TABLE_COLLECTION_VECTORS,
};
if vectors.is_empty() {
return Ok(0);
}
let start = std::time::Instant::now();
let count = vectors.len();
let coll_name =
CollectionName::new(collection_name).map_err(|e| Error::InvalidInput(e.to_string()))?;
{
let tx = self.begin_read()?;
for (entity_id, _, _) in vectors {
if tx.get_entity(*entity_id)?.is_none() {
return Err(Error::EntityNotFound(*entity_id));
}
}
}
let mut tx = self.begin()?;
self.inner.db_metrics.transactions.record_start();
let collection_id = match CollectionManager::get(&tx, &coll_name)
.map_err(|e| Error::Collection(e.to_string()))?
{
Some(collection) => collection.id(),
None => {
let collection = CollectionManager::create(&mut tx, &coll_name, std::iter::empty())
.map_err(|e| Error::Collection(e.to_string()))?;
collection.id()
}
};
{
let storage = tx.storage_mut().map_err(Error::Transaction)?;
for (entity_id, vector_name, data) in vectors {
let vector_data = VectorData::Dense(data.clone());
let key = encode_collection_vector_key(collection_id, *entity_id, vector_name);
let value = encode_vector_value(&vector_data, vector_name);
storage.put(TABLE_COLLECTION_VECTORS, &key, &value).map_err(Error::Storage)?;
}
}
for (entity_id, vector_name, data) in vectors {
let point_id = PointId::new(entity_id.as_u64());
update_point_vector_in_index(&mut tx, collection_name, vector_name, point_id, data)
.map_err(|e| Error::Vector(e.to_string()))?;
}
let commit_start = std::time::Instant::now();
tx.commit().map_err(Error::Transaction)?;
self.inner.db_metrics.record_commit(commit_start.elapsed());
self.inner.db_metrics.record_query(start.elapsed(), true);
Ok(count)
}
/// Convenience wrapper around [`Self::bulk_insert_vectors`] for the case where every vector
/// shares the same `vector_name`.
///
/// Each `(entity ID, data)` pair is expanded to `(entity ID, vector_name, data)`; the data
/// is cloned for the expanded list.
///
/// # Errors
/// Same as [`Self::bulk_insert_vectors`].
pub fn bulk_insert_named_vectors(
    &self,
    collection_name: &str,
    vector_name: &str,
    vectors: &[(manifoldb_core::EntityId, Vec<f32>)],
) -> Result<usize> {
    let mut expanded: Vec<(manifoldb_core::EntityId, String, Vec<f32>)> =
        Vec::with_capacity(vectors.len());
    for (id, data) in vectors {
        expanded.push((*id, vector_name.to_string(), data.clone()));
    }
    self.bulk_insert_vectors(collection_name, &expanded)
}
pub fn bulk_delete_vectors(
&self,
vectors: &[(manifoldb_core::EntityId, String)],
) -> Result<usize> {
use crate::collection::{CollectionManager, CollectionName};
use crate::vector::remove_point_vector_from_index;
use manifoldb_core::PointId;
use manifoldb_vector::{encoding::encode_collection_vector_key, TABLE_COLLECTION_VECTORS};
if vectors.is_empty() {
return Ok(0);
}
let start = std::time::Instant::now();
{
let tx = self.begin_read()?;
for (entity_id, _) in vectors {
if tx.get_entity(*entity_id)?.is_none() {
return Err(Error::EntityNotFound(*entity_id));
}
}
}
let mut tx = self.begin()?;
self.inner.db_metrics.transactions.record_start();
let mut deleted_count = 0;
let collections: Vec<(CollectionName, manifoldb_core::CollectionId)> = {
let names =
CollectionManager::list(&tx).map_err(|e| Error::Collection(e.to_string()))?;
let mut result = Vec::new();
for name in names {
if let Some(collection) = CollectionManager::get(&tx, &name)
.map_err(|e| Error::Collection(e.to_string()))?
{
result.push((name, collection.id()));
}
}
result
};
let mut hnsw_updates: Vec<(&str, manifoldb_core::EntityId, &str)> = Vec::new();
{
let storage = tx.storage_mut().map_err(Error::Transaction)?;
for (entity_id, vector_name) in vectors {
for (collection_name, collection_id) in &collections {
let key = encode_collection_vector_key(*collection_id, *entity_id, vector_name);
if storage
.get(TABLE_COLLECTION_VECTORS, &key)
.map_err(Error::Storage)?
.is_some()
{
storage.delete(TABLE_COLLECTION_VECTORS, &key).map_err(Error::Storage)?;
deleted_count += 1;
hnsw_updates.push((collection_name.as_str(), *entity_id, vector_name));
break; }
}
}
}
for (collection_name, entity_id, vector_name) in hnsw_updates {
let point_id = PointId::new(entity_id.as_u64());
remove_point_vector_from_index(&mut tx, collection_name, vector_name, point_id)
.map_err(|e| Error::Vector(e.to_string()))?;
}
let commit_start = std::time::Instant::now();
tx.commit().map_err(Error::Transaction)?;
self.inner.db_metrics.record_commit(commit_start.elapsed());
self.inner.db_metrics.record_query(start.elapsed(), true);
Ok(deleted_count)
}
/// Convenience wrapper around [`Self::bulk_delete_vectors`] that deletes the vector called
/// `vector_name` from every entity in `entity_ids`.
///
/// # Errors
/// Same as [`Self::bulk_delete_vectors`].
pub fn bulk_delete_vectors_by_name(
    &self,
    vector_name: &str,
    entity_ids: &[manifoldb_core::EntityId],
) -> Result<usize> {
    let mut expanded: Vec<(manifoldb_core::EntityId, String)> =
        Vec::with_capacity(entity_ids.len());
    for id in entity_ids {
        expanded.push((*id, vector_name.to_string()));
    }
    self.bulk_delete_vectors(&expanded)
}
/// Overwrites existing dense vectors in `collection_name` within a single write transaction
/// and returns how many vectors were supplied.
///
/// Each element of `vectors` is `(entity ID, vector name, data)`. A read transaction first
/// verifies that the collection exists, that every entity exists, and that every
/// `(entity, vector name)` pair already has a stored vector. The write transaction then
/// rewrites all vector records before updating the vector index for each point.
///
/// # Errors
/// - [`Error::InvalidInput`] if `collection_name` is not a valid collection name.
/// - [`Error::Collection`] if the collection does not exist or cannot be read.
/// - [`Error::EntityNotFound`] if a referenced entity does not exist.
/// - [`Error::Vector`] if a vector to update does not exist or the index update fails.
/// - [`Error::Storage`] if a storage read or write fails.
/// - [`Error::Transaction`] if a transaction cannot be started, accessed or committed.
///
/// Any failure after the write transaction was started is recorded as a failed query and a
/// rollback in the metrics, mirroring `execute_with_params`.
pub fn bulk_update_vectors(
    &self,
    collection_name: &str,
    vectors: &[(manifoldb_core::EntityId, String, Vec<f32>)],
) -> Result<usize> {
    use crate::collection::{CollectionManager, CollectionName};
    use crate::vector::update_point_vector_in_index;
    use manifoldb_core::PointId;
    use manifoldb_vector::{
        encode_vector_value, encoding::encode_collection_vector_key, VectorData,
        TABLE_COLLECTION_VECTORS,
    };

    if vectors.is_empty() {
        return Ok(0);
    }
    let start = Instant::now();
    let count = vectors.len();
    let coll_name =
        CollectionName::new(collection_name).map_err(|e| Error::InvalidInput(e.to_string()))?;

    // Validation pass on a read transaction: nothing is written unless every target exists.
    {
        let tx = self.begin_read()?;
        let collection = CollectionManager::get(&tx, &coll_name)
            .map_err(|e| Error::Collection(e.to_string()))?
            .ok_or_else(|| {
                Error::Collection(format!("Collection '{}' not found", collection_name))
            })?;
        let collection_id = collection.id();
        let storage = tx.storage_ref().map_err(Error::Transaction)?;
        for (entity_id, vector_name, _) in vectors {
            if tx.get_entity(*entity_id)?.is_none() {
                return Err(Error::EntityNotFound(*entity_id));
            }
            let key = encode_collection_vector_key(collection_id, *entity_id, vector_name);
            if storage.get(TABLE_COLLECTION_VECTORS, &key).map_err(Error::Storage)?.is_none() {
                return Err(Error::Vector(format!(
                    "Entity {} does not have vector '{}' to update",
                    entity_id, vector_name
                )));
            }
        }
    }

    let mut tx = self.begin()?;
    self.inner.db_metrics.transactions.record_start();

    // Everything that can fail inside the transaction runs in this closure so that a single
    // place below can record the failure in the metrics.
    let written = (|| -> Result<()> {
        // Looked up again because the write transaction is separate from the check above.
        let collection_id = CollectionManager::get(&tx, &coll_name)
            .map_err(|e| Error::Collection(e.to_string()))?
            .map(|c| c.id())
            .ok_or_else(|| {
                Error::Collection(format!("Collection '{}' not found", collection_name))
            })?;

        // Pass 1: raw vector records.
        {
            let storage = tx.storage_mut().map_err(Error::Transaction)?;
            for (entity_id, vector_name, data) in vectors {
                let vector_data = VectorData::Dense(data.clone());
                let key = encode_collection_vector_key(collection_id, *entity_id, vector_name);
                let value = encode_vector_value(&vector_data, vector_name);
                storage.put(TABLE_COLLECTION_VECTORS, &key, &value).map_err(Error::Storage)?;
            }
        }

        // Pass 2: vector index entries.
        for (entity_id, vector_name, data) in vectors {
            let point_id = PointId::new(entity_id.as_u64());
            update_point_vector_in_index(&mut tx, collection_name, vector_name, point_id, data)
                .map_err(|e| Error::Vector(e.to_string()))?;
        }
        Ok(())
    })();

    if let Err(err) = written {
        // `tx` is dropped without a commit when we return.
        self.inner.db_metrics.record_query(start.elapsed(), false);
        self.inner.db_metrics.record_rollback();
        return Err(err);
    }

    let commit_start = Instant::now();
    tx.commit().map_err(Error::Transaction)?;
    self.inner.db_metrics.record_commit(commit_start.elapsed());
    self.inner.db_metrics.record_query(start.elapsed(), true);
    Ok(count)
}
/// Convenience wrapper around [`Self::bulk_update_vectors`] for the case where every vector
/// shares the same `vector_name`.
///
/// Each `(entity ID, data)` pair is expanded to `(entity ID, vector_name, data)`; the data
/// is cloned for the expanded list.
///
/// # Errors
/// Same as [`Self::bulk_update_vectors`].
pub fn bulk_replace_named_vectors(
    &self,
    collection_name: &str,
    vector_name: &str,
    vectors: &[(manifoldb_core::EntityId, Vec<f32>)],
) -> Result<usize> {
    let mut expanded: Vec<(manifoldb_core::EntityId, String, Vec<f32>)> =
        Vec::with_capacity(vectors.len());
    for (id, data) in vectors {
        expanded.push((*id, vector_name.to_string(), data.clone()));
    }
    self.bulk_update_vectors(collection_name, &expanded)
}
pub fn get_vector(
&self,
collection_name: &str,
entity_id: manifoldb_core::EntityId,
vector_name: &str,
) -> Result<Option<manifoldb_vector::VectorData>> {
use crate::collection::{CollectionManager, CollectionName};
use manifoldb_vector::{encoding::encode_collection_vector_key, TABLE_COLLECTION_VECTORS};
let coll_name =
CollectionName::new(collection_name).map_err(|e| Error::InvalidInput(e.to_string()))?;
let tx = self.begin_read()?;
let collection = CollectionManager::get(&tx, &coll_name)
.map_err(|e| Error::Collection(e.to_string()))?
.ok_or_else(|| {
Error::Collection(format!("collection '{}' not found", collection_name))
})?;
let collection_id = collection.id();
let storage = tx.storage_ref().map_err(Error::Transaction)?;
let key = encode_collection_vector_key(collection_id, entity_id, vector_name);
match storage.get(TABLE_COLLECTION_VECTORS, &key).map_err(Error::Storage)? {
Some(bytes) => {
let (data, _name) =
manifoldb_vector::store::decode_vector_value(&bytes).map_err(|e| {
Error::Storage(manifoldb_storage::StorageError::Serialization(
e.to_string(),
))
})?;
Ok(Some(data))
}
None => Ok(None),
}
}
pub fn get_all_vectors(
&self,
collection_name: &str,
entity_id: manifoldb_core::EntityId,
) -> Result<std::collections::HashMap<String, manifoldb_vector::VectorData>> {
use crate::collection::{CollectionManager, CollectionName};
use manifoldb_storage::Cursor;
use manifoldb_vector::{encoding::encode_entity_vector_prefix, TABLE_COLLECTION_VECTORS};
use std::ops::Bound;
let coll_name =
CollectionName::new(collection_name).map_err(|e| Error::InvalidInput(e.to_string()))?;
let tx = self.begin_read()?;
let collection = CollectionManager::get(&tx, &coll_name)
.map_err(|e| Error::Collection(e.to_string()))?
.ok_or_else(|| {
Error::Collection(format!("collection '{}' not found", collection_name))
})?;
let collection_id = collection.id();
let storage = tx.storage_ref().map_err(Error::Transaction)?;
let prefix = encode_entity_vector_prefix(collection_id, entity_id);
let prefix_end = next_prefix(&prefix);
let mut cursor = storage
.range(
TABLE_COLLECTION_VECTORS,
Bound::Included(prefix.as_slice()),
Bound::Excluded(prefix_end.as_slice()),
)
.map_err(Error::Storage)?;
let mut vectors = std::collections::HashMap::new();
while let Some((_key, value)) = cursor.next().map_err(Error::Storage)? {
let (data, vector_name) = manifoldb_vector::store::decode_vector_value(&value)
.map_err(|e| {
Error::Storage(manifoldb_storage::StorageError::Serialization(e.to_string()))
})?;
vectors.insert(vector_name, data);
}
Ok(vectors)
}
pub fn has_vector(
&self,
collection_name: &str,
entity_id: manifoldb_core::EntityId,
vector_name: &str,
) -> Result<bool> {
use crate::collection::{CollectionManager, CollectionName};
use manifoldb_vector::{encoding::encode_collection_vector_key, TABLE_COLLECTION_VECTORS};
let coll_name =
CollectionName::new(collection_name).map_err(|e| Error::InvalidInput(e.to_string()))?;
let tx = self.begin_read()?;
let collection = CollectionManager::get(&tx, &coll_name)
.map_err(|e| Error::Collection(e.to_string()))?
.ok_or_else(|| {
Error::Collection(format!("collection '{}' not found", collection_name))
})?;
let collection_id = collection.id();
let storage = tx.storage_ref().map_err(Error::Transaction)?;
let key = encode_collection_vector_key(collection_id, entity_id, vector_name);
Ok(storage.get(TABLE_COLLECTION_VECTORS, &key).map_err(Error::Storage)?.is_some())
}
pub fn bulk_upsert_entities(&self, entities: &[Entity]) -> Result<(usize, usize)> {
use crate::execution::EntityIndexMaintenance;
use rayon::prelude::*;
if entities.is_empty() {
return Ok((0, 0));
}
let start = std::time::Instant::now();
let mut to_insert: Vec<(usize, &Entity)> = Vec::new();
let mut to_update: Vec<(usize, &Entity, Entity)> = Vec::new();
{
let tx = self.begin_read()?;
for (idx, entity) in entities.iter().enumerate() {
if entity.id.as_u64() == 0 {
to_insert.push((idx, entity));
} else {
match tx.get_entity(entity.id)? {
Some(old_entity) => {
to_update.push((idx, entity, old_entity));
}
None => {
to_insert.push((idx, entity));
}
}
}
}
}
let inserted_count = to_insert.len();
let updated_count = to_update.len();
let validation_result: std::result::Result<(), Error> =
entities.par_iter().enumerate().try_for_each(|(idx, entity)| {
bincode::serde::encode_to_vec(entity, bincode::config::standard())
.map(|_| ())
.map_err(|e| {
Error::Execution(format!(
"Failed to serialize entity at index {}: {}",
idx, e
))
})
});
validation_result?;
let mut tx = self.begin()?;
self.inner.db_metrics.transactions.record_start();
let new_entity_ids = if to_insert.is_empty() {
Vec::new()
} else {
let entity_count = to_insert.len() as u64;
let start_id = {
let current = match tx.get_metadata(b"next_entity_id")? {
Some(bytes) if bytes.len() == 8 => {
let arr: [u8; 8] = bytes
.try_into()
.map_err(|_| Error::Execution("invalid entity counter".to_string()))?;
u64::from_be_bytes(arr)
}
_ => 1, };
let next = current + entity_count;
tx.put_metadata(b"next_entity_id", &next.to_be_bytes())?;
current
};
to_insert
.iter()
.enumerate()
.map(|(i, _)| EntityId::new(start_id + i as u64))
.collect::<Vec<_>>()
};
let serialized_inserts: std::result::Result<Vec<(EntityId, Entity, Vec<u8>)>, Error> =
to_insert
.par_iter()
.zip(new_entity_ids.par_iter())
.map(|((_, entity), &id)| {
let mut entity_with_id = (*entity).clone();
entity_with_id.id = id;
bincode::serde::encode_to_vec(&entity_with_id, bincode::config::standard())
.map(|bytes| (id, entity_with_id, bytes))
.map_err(|e| {
Error::Execution(format!(
"Failed to serialize entity for insert: {}",
e
))
})
})
.collect();
let serialized_inserts = serialized_inserts?;
let serialized_updates: std::result::Result<
Vec<(EntityId, Entity, Entity, Vec<u8>)>,
Error,
> = to_update
.par_iter()
.map(|(_, new_entity, old_entity)| {
let entity_with_id = (*new_entity).clone();
bincode::serde::encode_to_vec(&entity_with_id, bincode::config::standard())
.map(|bytes| (entity_with_id.id, entity_with_id, old_entity.clone(), bytes))
.map_err(|e| {
Error::Execution(format!("Failed to serialize entity for update: {}", e))
})
})
.collect();
let serialized_updates = serialized_updates?;
for (id, entity, bytes) in &serialized_inserts {
let key = id.as_u64().to_be_bytes();
let storage = tx.storage_mut_ref().map_err(Error::Transaction)?;
storage
.put("nodes", &key, bytes)
.map_err(|e| Error::Execution(format!("Failed to write entity: {}", e)))?;
for label in &entity.labels {
let label_bytes = label.as_str().as_bytes();
let len = label_bytes.len() as u16;
let mut label_key = Vec::with_capacity(2 + label_bytes.len() + 8);
label_key.extend_from_slice(&len.to_be_bytes());
label_key.extend_from_slice(label_bytes);
label_key.extend_from_slice(&id.as_u64().to_be_bytes());
storage
.put("label_index", &label_key, &[])
.map_err(|e| Error::Execution(format!("Failed to write label index: {}", e)))?;
}
EntityIndexMaintenance::on_insert(&mut tx, entity)
.map_err(|e| Error::Execution(format!("Index maintenance failed: {}", e)))?;
self.inner.index_manager.on_entity_upsert_tx(
tx.storage_mut_ref().map_err(Error::Transaction)?,
entity,
None, )?;
}
for (id, new_entity, old_entity, bytes) in &serialized_updates {
let key = id.as_u64().to_be_bytes();
let storage = tx.storage_mut_ref().map_err(Error::Transaction)?;
storage
.put("nodes", &key, bytes)
.map_err(|e| Error::Execution(format!("Failed to write entity: {}", e)))?;
for old_label in &old_entity.labels {
if !new_entity.labels.contains(old_label) {
let label_bytes = old_label.as_str().as_bytes();
let len = label_bytes.len() as u16;
let mut label_key = Vec::with_capacity(2 + label_bytes.len() + 8);
label_key.extend_from_slice(&len.to_be_bytes());
label_key.extend_from_slice(label_bytes);
label_key.extend_from_slice(&id.as_u64().to_be_bytes());
storage.delete("label_index", &label_key).map_err(|e| {
Error::Execution(format!("Failed to delete label index: {}", e))
})?;
}
}
for new_label in &new_entity.labels {
if !old_entity.labels.contains(new_label) {
let label_bytes = new_label.as_str().as_bytes();
let len = label_bytes.len() as u16;
let mut label_key = Vec::with_capacity(2 + label_bytes.len() + 8);
label_key.extend_from_slice(&len.to_be_bytes());
label_key.extend_from_slice(label_bytes);
label_key.extend_from_slice(&id.as_u64().to_be_bytes());
storage.put("label_index", &label_key, &[]).map_err(|e| {
Error::Execution(format!("Failed to write label index: {}", e))
})?;
}
}
EntityIndexMaintenance::on_update(&mut tx, old_entity, new_entity)
.map_err(|e| Error::Execution(format!("Index maintenance failed: {}", e)))?;
self.inner.index_manager.on_entity_upsert_tx(
tx.storage_mut_ref().map_err(Error::Transaction)?,
new_entity,
Some(old_entity),
)?;
}
let commit_start = std::time::Instant::now();
tx.commit().map_err(Error::Transaction)?;
self.inner.db_metrics.record_commit(commit_start.elapsed());
self.inner.db_metrics.record_query(start.elapsed(), true);
Ok((inserted_count, updated_count))
}
/// Deletes the given entities in a single transaction, cascading to their edges.
///
/// Delegates to `bulk_delete_entities_impl` with edge cascading enabled, so the
/// outgoing and incoming edges of every deleted entity are removed as well.
/// Returns the number of entities that were actually deleted.
pub fn bulk_delete_entities(&self, entity_ids: &[EntityId]) -> Result<usize> {
    let cascade_edges = true;
    self.bulk_delete_entities_impl(entity_ids, cascade_edges)
}
/// Deletes the given entities in a single transaction, refusing to cascade.
///
/// Delegates to `bulk_delete_entities_impl` with edge cascading disabled: if any
/// entity still has connected edges the call returns a bulk-operation error and
/// the transaction is never committed. Returns the number of deleted entities.
pub fn bulk_delete_entities_checked(&self, entity_ids: &[EntityId]) -> Result<usize> {
    let cascade_edges = false;
    self.bulk_delete_entities_impl(entity_ids, cascade_edges)
}
/// Shared implementation behind `bulk_delete_entities` and
/// `bulk_delete_entities_checked`.
///
/// All work happens in one write transaction. For every id that resolves to an
/// existing entity (unknown ids are skipped) this:
///
/// 1. records the entity's labels as affected tables,
/// 2. deletes its edges (`cascade_edges == true`) or fails if it has any
///    (`cascade_edges == false`),
/// 3. deletes its stored vectors in every collection,
/// 4. removes it from the property, index-manager and vector indexes,
/// 5. deletes the entity itself.
///
/// After a successful commit the query cache and prepared-statement cache are
/// invalidated for the affected labels.
///
/// # Returns
///
/// The number of entities for which `tx.delete_entity` reported a deletion.
/// An empty `entity_ids` slice returns `Ok(0)` without opening a transaction.
///
/// # Errors
///
/// Propagates transaction, storage, collection and index-maintenance errors, and
/// returns a bulk-operation error when `cascade_edges` is `false` and an entity
/// still has connected edges.
fn bulk_delete_entities_impl(
    &self,
    entity_ids: &[EntityId],
    cascade_edges: bool,
) -> Result<usize> {
    use crate::collection::CollectionManager;
    use crate::execution::EntityIndexMaintenance;
    use manifoldb_storage::Cursor;
    use manifoldb_vector::{encoding::encode_entity_vector_prefix, TABLE_COLLECTION_VECTORS};
    use std::collections::HashSet;
    use std::ops::Bound;

    if entity_ids.is_empty() {
        return Ok(0);
    }

    let start = std::time::Instant::now();
    let mut tx = self.begin()?;
    self.inner.db_metrics.transactions.record_start();

    let mut deleted_count = 0;
    let mut affected_tables: HashSet<String> = HashSet::new();

    // Resolve every collection id once; the per-entity loop reuses this list.
    let collection_ids: Vec<_> = {
        let names =
            CollectionManager::list(&tx).map_err(|e| Error::Collection(e.to_string()))?;
        let mut ids = Vec::new();
        for name in names {
            if let Some(collection) = CollectionManager::get(&tx, &name)
                .map_err(|e| Error::Collection(e.to_string()))?
            {
                ids.push(collection.id());
            }
        }
        ids
    };

    for &entity_id in entity_ids {
        // Ids that do not resolve to an entity are silently skipped.
        let entity = match tx.get_entity(entity_id)? {
            Some(e) => e,
            None => continue,
        };

        for label in &entity.labels {
            affected_tables.insert(label.as_str().to_string());
        }

        if cascade_edges {
            let outgoing = tx.get_outgoing_edges(entity_id)?;
            let incoming = tx.get_incoming_edges(entity_id)?;
            for edge in &outgoing {
                tx.delete_edge(edge.id)?;
            }
            for edge in &incoming {
                // Self-loops were already removed through the outgoing list.
                if edge.source != edge.target {
                    tx.delete_edge(edge.id)?;
                }
            }
        } else if tx.has_edges(entity_id)? {
            return Err(Error::bulk_operation(format!(
                "entity {} has connected edges; use bulk_delete_entities for cascade delete",
                entity_id.as_u64()
            )));
        }

        {
            let storage = tx.storage_mut().map_err(Error::Transaction)?;
            for &collection_id in &collection_ids {
                let prefix = encode_entity_vector_prefix(collection_id, entity_id);

                // Exclusive upper bound of the prefix scan: drop trailing 0xFF bytes
                // and increment the last remaining byte. Keeping the trailing 0xFF
                // bytes (as the previous code did) produced a bound that also covered
                // keys belonging to other prefixes, so unrelated vectors were deleted.
                // `None` means every byte is 0xFF and no finite successor exists.
                let prefix_end: Option<Vec<u8>> = prefix
                    .as_slice()
                    .iter()
                    .rposition(|&byte| byte != 0xFF)
                    .map(|idx| {
                        let mut bound = prefix.as_slice()[..=idx].to_vec();
                        bound[idx] += 1;
                        bound
                    });
                let upper_bound = match prefix_end.as_deref() {
                    Some(end) => Bound::Excluded(end),
                    None => Bound::Unbounded,
                };

                // Collect first, delete afterwards: the cursor must be released
                // before the storage is mutated.
                let mut keys_to_delete: Vec<Vec<u8>> = Vec::new();
                {
                    let mut cursor = storage
                        .range(
                            TABLE_COLLECTION_VECTORS,
                            Bound::Included(prefix.as_slice()),
                            upper_bound,
                        )
                        .map_err(Error::Storage)?;
                    while let Some((key, _)) = cursor.next().map_err(Error::Storage)? {
                        keys_to_delete.push(key.clone());
                    }
                }
                for key in keys_to_delete {
                    storage.delete(TABLE_COLLECTION_VECTORS, &key).map_err(Error::Storage)?;
                }
            }
        }

        EntityIndexMaintenance::on_delete(&mut tx, &entity)
            .map_err(|e| Error::Execution(format!("property index removal failed: {e}")))?;
        self.inner
            .index_manager
            .on_entity_delete_tx(tx.storage_mut_ref().map_err(Error::Transaction)?, &entity)?;
        crate::vector::remove_entity_from_indexes(&mut tx, &entity)
            .map_err(|e| Error::Execution(format!("vector index removal failed: {e}")))?;

        if tx.delete_entity(entity_id)? {
            deleted_count += 1;
        }
    }

    let commit_start = std::time::Instant::now();
    tx.commit().map_err(Error::Transaction)?;
    self.inner.db_metrics.record_commit(commit_start.elapsed());

    // Caches are only invalidated once the deletion is committed.
    let table_list: Vec<String> = affected_tables.into_iter().collect();
    self.inner.query_cache.invalidate_tables(&table_list);
    if let Err(e) = self.inner.prepared_cache.invalidate_tables(&table_list) {
        // Best effort: a failed prepared-cache invalidation does not fail the delete.
        eprintln!("Warning: failed to invalidate prepared cache: {e}");
    }

    self.inner.db_metrics.record_query(start.elapsed(), true);
    Ok(deleted_count)
}
/// Deletes the given edges inside a single write transaction.
///
/// Returns how many ids referred to an edge that `tx.delete_edge` reported as
/// deleted. An empty slice returns `Ok(0)` without opening a transaction.
pub fn bulk_delete_edges(&self, edge_ids: &[EdgeId]) -> Result<usize> {
    if edge_ids.is_empty() {
        return Ok(0);
    }

    let started_at = std::time::Instant::now();
    let mut tx = self.begin()?;
    self.inner.db_metrics.transactions.record_start();

    let mut removed = 0;
    for id in edge_ids.iter().copied() {
        let was_deleted = tx.delete_edge(id)?;
        if was_deleted {
            removed += 1;
        }
    }

    let commit_started_at = std::time::Instant::now();
    tx.commit().map_err(Error::Transaction)?;
    self.inner.db_metrics.record_commit(commit_started_at.elapsed());
    self.inner.db_metrics.record_query(started_at.elapsed(), true);

    Ok(removed)
}
pub fn create_collection(
&self,
name: &str,
) -> Result<
crate::collection::CollectionBuilder<
std::sync::Arc<manifoldb_storage::backends::RedbEngine>,
>,
> {
let coll_name = crate::collection::CollectionName::new(name)
.map_err(|e| Error::Collection(e.to_string()))?;
Ok(crate::collection::CollectionBuilder::new(self.inner.manager.engine_arc(), coll_name))
}
pub fn collection(
&self,
name: &str,
) -> Result<
crate::collection::CollectionHandle<
std::sync::Arc<manifoldb_storage::backends::RedbEngine>,
>,
> {
let coll_name = crate::collection::CollectionName::new(name)
.map_err(|e| Error::Collection(e.to_string()))?;
crate::collection::CollectionHandle::open(self.inner.manager.engine_arc(), coll_name)
.map_err(|e| Error::Collection(e.to_string()))
}
pub fn drop_collection(&self, name: &str) -> Result<()> {
use crate::collection::{CollectionManager, CollectionName};
use crate::vector::drop_indexes_for_collection;
use manifoldb_storage::Cursor;
use manifoldb_vector::{
encoding::encode_collection_vector_prefix, TABLE_COLLECTION_VECTORS,
};
use std::ops::Bound;
let coll_name = CollectionName::new(name).map_err(|e| Error::Collection(e.to_string()))?;
let mut tx = self.begin()?;
let collection = CollectionManager::get(&tx, &coll_name)
.map_err(|e| Error::Collection(e.to_string()))?
.ok_or_else(|| Error::Collection(format!("Collection '{}' not found", name)))?;
let collection_id = collection.id();
drop_indexes_for_collection(&mut tx, name).map_err(|e| Error::Vector(e.to_string()))?;
{
let prefix = encode_collection_vector_prefix(collection_id);
let next_prefix = next_prefix(&prefix);
let storage = tx.storage_mut().map_err(Error::Transaction)?;
let mut keys_to_delete: Vec<Vec<u8>> = Vec::new();
let mut cursor = storage
.range(
TABLE_COLLECTION_VECTORS,
Bound::Included(prefix.as_slice()),
Bound::Excluded(next_prefix.as_slice()),
)
.map_err(Error::Storage)?;
while let Some((key, _)) = cursor.next()? {
keys_to_delete.push(key.clone());
}
drop(cursor);
for key in &keys_to_delete {
storage.delete(TABLE_COLLECTION_VECTORS, key).map_err(Error::Storage)?;
}
}
CollectionManager::delete(&mut tx, &coll_name, false)
.map_err(|e| Error::Collection(e.to_string()))?;
tx.commit().map_err(Error::Transaction)?;
Ok(())
}
pub fn list_collections(&self) -> Result<Vec<String>> {
use crate::collection::CollectionManager;
let tx = self.begin_read()?;
let collections =
CollectionManager::list(&tx).map_err(|e| Error::Collection(e.to_string()))?;
Ok(collections.into_iter().map(|c| c.as_str().to_string()).collect())
}
pub fn search(
&self,
collection: &str,
vector_name: &str,
) -> Result<crate::search::EntitySearchBuilder> {
let handle = self.collection(collection)?;
let engine = self.inner.manager.engine_arc();
Ok(crate::search::EntitySearchBuilder::new(handle, engine, vector_name))
}
pub fn upsert(&self, collection: &str, entity: &Entity) -> Result<()> {
use crate::search::entity_to_point_struct;
let handle = self.collection(collection)?;
let point = entity_to_point_struct(entity, collection);
handle.upsert_point(point).map_err(|e| Error::Collection(e.to_string()))
}
pub fn upsert_batch(&self, collection: &str, entities: &[Entity]) -> Result<()> {
use crate::search::entity_to_point_struct;
let handle = self.collection(collection)?;
for entity in entities {
let point = entity_to_point_struct(entity, collection);
handle.upsert_point(point).map_err(|e| Error::Collection(e.to_string()))?;
}
Ok(())
}
}
/// Tabular result of a query: a list of column names plus the returned rows.
#[derive(Debug, Clone)]
pub struct QueryResult {
/// Column names; `column_index` searches this list by name.
columns: Vec<String>,
/// Result rows in the order they were supplied to the constructor.
rows: Vec<QueryRow>,
}
impl QueryResult {
#[must_use]
pub fn empty() -> Self {
Self { columns: Vec::new(), rows: Vec::new() }
}
#[must_use]
pub fn new(columns: Vec<String>, rows: Vec<QueryRow>) -> Self {
Self { columns, rows }
}
#[must_use]
pub fn from_result_set(result_set: manifoldb_query::ResultSet) -> Self {
let columns = result_set.columns().into_iter().map(|s| s.to_owned()).collect();
let rows =
result_set.into_iter().map(|row| QueryRow { values: row.values().to_vec() }).collect();
Self { columns, rows }
}
#[must_use]
pub fn columns(&self) -> &[String] {
&self.columns
}
#[must_use]
pub fn num_columns(&self) -> usize {
self.columns.len()
}
#[must_use]
pub fn rows(&self) -> &[QueryRow] {
&self.rows
}
#[must_use]
pub fn len(&self) -> usize {
self.rows.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
#[must_use]
pub fn get(&self, index: usize) -> Option<&QueryRow> {
self.rows.get(index)
}
#[must_use]
pub fn first(&self) -> Option<&QueryRow> {
self.rows.first()
}
pub fn iter(&self) -> impl Iterator<Item = &QueryRow> {
self.rows.iter()
}
#[must_use]
pub fn column_index(&self, name: &str) -> Option<usize> {
self.columns.iter().position(|c| c == name)
}
}
impl IntoIterator for QueryResult {
type Item = QueryRow;
type IntoIter = std::vec::IntoIter<QueryRow>;
fn into_iter(self) -> Self::IntoIter {
self.rows.into_iter()
}
}
/// Borrowing iteration: yields references to the rows.
impl<'a> IntoIterator for &'a QueryResult {
    type Item = &'a QueryRow;
    type IntoIter = std::slice::Iter<'a, QueryRow>;

    fn into_iter(self) -> Self::IntoIter {
        self.rows.as_slice().iter()
    }
}
/// A single row of a [`QueryResult`]: an ordered list of values.
#[derive(Debug, Clone)]
pub struct QueryRow {
/// Values of this row, accessed by position through `get` / `get_as`.
values: Vec<manifoldb_core::Value>,
}
impl QueryRow {
#[must_use]
pub fn new(values: Vec<manifoldb_core::Value>) -> Self {
Self { values }
}
#[must_use]
pub fn get(&self, index: usize) -> Option<&manifoldb_core::Value> {
self.values.get(index)
}
#[must_use]
pub fn values(&self) -> &[manifoldb_core::Value] {
&self.values
}
#[must_use]
pub fn len(&self) -> usize {
self.values.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn get_as<T: FromValue>(&self, index: usize) -> Result<T> {
self.values
.get(index)
.ok_or_else(|| Error::InvalidParameter(format!("column index {} out of bounds", index)))
.and_then(T::from_value)
}
}
/// Conversion from a borrowed database value into a concrete Rust type.
///
/// Used by `QueryRow::get_as` to read a typed value out of a row.
pub trait FromValue: Sized {
/// Converts `value` into `Self`, or returns an error when the conversion fails.
fn from_value(value: &manifoldb_core::Value) -> Result<Self>;
}
impl FromValue for String {
fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
match value {
manifoldb_core::Value::String(s) => Ok(s.clone()),
_ => Err(Error::Type(format!("expected string, got {:?}", value))),
}
}
}
impl FromValue for i64 {
fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
match value {
manifoldb_core::Value::Int(n) => Ok(*n),
_ => Err(Error::Type(format!("expected integer, got {:?}", value))),
}
}
}
impl FromValue for f64 {
fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
match value {
manifoldb_core::Value::Float(f) => Ok(*f),
manifoldb_core::Value::Int(n) => Ok(*n as f64),
_ => Err(Error::Type(format!("expected float, got {:?}", value))),
}
}
}
impl FromValue for bool {
fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
match value {
manifoldb_core::Value::Bool(b) => Ok(*b),
_ => Err(Error::Type(format!("expected boolean, got {:?}", value))),
}
}
}
impl FromValue for Vec<f32> {
fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
match value {
manifoldb_core::Value::Vector(v) => Ok(v.clone()),
_ => Err(Error::Type(format!("expected vector, got {:?}", value))),
}
}
}
/// Maps `Value::Null` to `None`; any other value is converted with `T` and
/// wrapped in `Some`, propagating `T`'s conversion error.
impl<T: FromValue> FromValue for Option<T> {
    fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
        if let manifoldb_core::Value::Null = value {
            Ok(None)
        } else {
            let converted = T::from_value(value)?;
            Ok(Some(converted))
        }
    }
}
/// Identity conversion: always succeeds with a clone of the value.
impl FromValue for manifoldb_core::Value {
    fn from_value(value: &manifoldb_core::Value) -> Result<Self> {
        let owned = value.clone();
        Ok(owned)
    }
}
/// Ordered list of values to pass as query parameters.
#[derive(Debug, Clone, Default)]
pub struct QueryParams {
/// Parameter values in the order they were added.
values: Vec<manifoldb_core::Value>,
}
impl QueryParams {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn add(&mut self, value: impl Into<manifoldb_core::Value>) -> &mut Self {
self.values.push(value.into());
self
}
#[must_use]
pub fn with(mut self, value: impl Into<manifoldb_core::Value>) -> Self {
self.values.push(value.into());
self
}
#[must_use]
pub fn values(&self) -> &[manifoldb_core::Value] {
&self.values
}
#[must_use]
pub fn len(&self) -> usize {
self.values.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn clear(&mut self) {
self.values.clear();
}
}
/// Builds an array of query parameter values.
///
/// `params![]` expands to an empty array. With one or more comma-separated
/// expressions (a trailing comma is allowed) each expression is converted with
/// `$crate::Value::from` and the results are collected into an array.
#[macro_export]
macro_rules! params {
// No arguments: an empty array.
() => {
[]
};
// One or more expressions, optional trailing comma: convert each to a `Value`.
($($value:expr),+ $(,)?) => {
[$($crate::Value::from($value)),+]
};
}
/// Returns the exclusive upper bound for a lexicographic range scan over every
/// key that starts with `prefix`.
///
/// The bound is `prefix` with its trailing `0xFF` bytes removed and the last
/// remaining byte incremented (e.g. `[0x01, 0xFF]` -> `[0x02]`). Keeping the
/// trailing `0xFF` bytes would yield a bound such as `[0x02, 0xFF]`, which also
/// covers keys like `[0x02, 0x00]` that do not start with `prefix`.
///
/// When `prefix` is empty or consists solely of `0xFF` bytes no finite successor
/// exists; in that case `prefix` followed by one extra `0xFF` byte is returned,
/// which keeps the historical fallback of this helper.
fn next_prefix(prefix: &[u8]) -> Vec<u8> {
    match prefix.iter().rposition(|&byte| byte != 0xFF) {
        Some(idx) => {
            // Everything after `idx` is 0xFF and is dropped; the byte at `idx` is
            // below 0xFF, so the increment cannot overflow.
            let mut bound = prefix[..=idx].to_vec();
            bound[idx] += 1;
            bound
        }
        None => {
            let mut bound = prefix.to_vec();
            bound.push(0xFF);
            bound
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_database_in_memory() {
let db = Database::in_memory().expect("failed to create in-memory db");
assert!(db.config().in_memory);
}
#[test]
fn test_database_begin_transaction() {
let db = Database::in_memory().expect("failed to create in-memory db");
let tx = db.begin().expect("failed to begin transaction");
assert!(!tx.is_read_only());
tx.rollback().expect("failed to rollback");
}
#[test]
fn test_database_begin_read_transaction() {
let db = Database::in_memory().expect("failed to create in-memory db");
let tx = db.begin_read().expect("failed to begin read transaction");
assert!(tx.is_read_only());
tx.rollback().expect("failed to rollback");
}
#[test]
fn test_query_result_empty() {
let result = QueryResult::empty();
assert!(result.is_empty());
assert_eq!(result.len(), 0);
assert_eq!(result.num_columns(), 0);
}
#[test]
fn test_query_result_with_data() {
let columns = vec!["id".to_string(), "name".to_string()];
let rows = vec![
QueryRow::new(vec![
manifoldb_core::Value::Int(1),
manifoldb_core::Value::String("Alice".to_string()),
]),
QueryRow::new(vec![
manifoldb_core::Value::Int(2),
manifoldb_core::Value::String("Bob".to_string()),
]),
];
let result = QueryResult::new(columns, rows);
assert_eq!(result.len(), 2);
assert_eq!(result.num_columns(), 2);
assert_eq!(result.columns(), &["id", "name"]);
assert_eq!(result.column_index("name"), Some(1));
}
#[test]
fn test_query_row_get_as() {
let row = QueryRow::new(vec![
manifoldb_core::Value::Int(42),
manifoldb_core::Value::String("test".to_string()),
manifoldb_core::Value::Bool(true),
manifoldb_core::Value::Float(2.5),
]);
assert_eq!(row.get_as::<i64>(0).unwrap(), 42);
assert_eq!(row.get_as::<String>(1).unwrap(), "test");
assert_eq!(row.get_as::<bool>(2).unwrap(), true);
assert!((row.get_as::<f64>(3).unwrap() - 2.5).abs() < f64::EPSILON);
}
#[test]
fn test_query_row_get_as_error() {
let row = QueryRow::new(vec![manifoldb_core::Value::Int(42)]);
assert!(row.get_as::<String>(0).is_err());
assert!(row.get_as::<i64>(999).is_err());
}
#[test]
fn test_query_params() {
let params = QueryParams::new().with("Alice").with(30i64).with(true);
assert_eq!(params.len(), 3);
assert_eq!(params.values()[0], manifoldb_core::Value::String("Alice".to_string()));
assert_eq!(params.values()[1], manifoldb_core::Value::Int(30));
assert_eq!(params.values()[2], manifoldb_core::Value::Bool(true));
}
#[test]
fn test_query_result_iterator() {
let columns = vec!["n".to_string()];
let rows = vec![
QueryRow::new(vec![manifoldb_core::Value::Int(1)]),
QueryRow::new(vec![manifoldb_core::Value::Int(2)]),
QueryRow::new(vec![manifoldb_core::Value::Int(3)]),
];
let result = QueryResult::new(columns, rows);
let sum: i64 = result.iter().filter_map(|r| r.get_as::<i64>(0).ok()).sum();
assert_eq!(sum, 6);
}
#[test]
fn test_entity_crud_via_database() {
let db = Database::in_memory().expect("failed to create in-memory db");
let mut tx = db.begin().expect("failed to begin write");
let entity = tx
.create_entity()
.expect("failed to create entity")
.with_label("Person")
.with_property("name", "Alice");
let entity_id = entity.id;
tx.put_entity(&entity).expect("failed to put entity");
tx.commit().expect("failed to commit");
let tx = db.begin_read().expect("failed to begin read");
let retrieved =
tx.get_entity(entity_id).expect("failed to get entity").expect("entity not found");
assert_eq!(retrieved.id, entity_id);
assert!(retrieved.has_label("Person"));
}
#[test]
fn test_parse_and_execute_query() {
let db = Database::in_memory().expect("failed to create in-memory db");
let result = db.query("SELECT * FROM users WHERE id = 1");
assert!(result.is_ok());
}
#[test]
fn test_parse_invalid_query() {
let db = Database::in_memory().expect("failed to create in-memory db");
let result = db.query("INVALID SQL SYNTAX !!!");
assert!(result.is_err());
}
#[test]
fn test_query_cache_hit() {
let db = Database::in_memory().expect("failed to create in-memory db");
let _result1 = db.query("SELECT * FROM users").expect("query failed");
let metrics = db.cache_metrics();
assert_eq!(metrics.misses(), 1);
assert_eq!(metrics.hits(), 0);
let _result2 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(metrics.misses(), 1);
assert_eq!(metrics.hits(), 1);
}
#[test]
fn test_query_cache_invalidation_on_insert() {
let db = Database::in_memory().expect("failed to create in-memory db");
let _result1 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().misses(), 1);
let _result2 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().hits(), 1);
db.execute("INSERT INTO users (name) VALUES ('Alice')").expect("insert failed");
let _result3 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().misses(), 2);
}
#[test]
fn test_query_cache_no_cache_hint() {
let db = Database::in_memory().expect("failed to create in-memory db");
let _result1 = db.query("/*+ NO_CACHE */ SELECT * FROM users").expect("query failed");
let _result2 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().misses(), 1);
let _result3 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().hits(), 1);
}
#[test]
fn test_query_cache_with_params() {
let db = Database::in_memory().expect("failed to create in-memory db");
let params1 = &[manifoldb_core::Value::Int(1)];
let params2 = &[manifoldb_core::Value::Int(2)];
let _result1 = db
.query_with_params("SELECT * FROM users WHERE id = $1", params1)
.expect("query failed");
assert_eq!(db.cache_metrics().misses(), 1);
let _result2 = db
.query_with_params("SELECT * FROM users WHERE id = $1", params1)
.expect("query failed");
assert_eq!(db.cache_metrics().hits(), 1);
let _result3 = db
.query_with_params("SELECT * FROM users WHERE id = $1", params2)
.expect("query failed");
assert_eq!(db.cache_metrics().misses(), 2);
}
#[test]
fn test_query_cache_clear() {
let db = Database::in_memory().expect("failed to create in-memory db");
let _result1 = db.query("SELECT * FROM users").expect("query failed");
let _result2 = db.query("SELECT * FROM orders").expect("query failed");
assert_eq!(db.query_cache().len(), 2);
db.clear_cache();
assert!(db.query_cache().is_empty());
let _result3 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().misses(), 3);
}
#[test]
fn test_query_cache_disabled() {
use crate::cache::CacheConfig;
let db = DatabaseBuilder::in_memory()
.query_cache_config(CacheConfig::disabled())
.open()
.expect("failed to create db");
let _result1 = db.query("SELECT * FROM users").expect("query failed");
let _result2 = db.query("SELECT * FROM users").expect("query failed");
assert_eq!(db.cache_metrics().hits(), 0);
}
#[test]
fn test_query_cache_metrics() {
let db = Database::in_memory().expect("failed to create in-memory db");
let _result1 = db.query("SELECT * FROM users").expect("query failed");
let _result2 = db.query("SELECT * FROM users").expect("query failed");
let _result3 = db.query("SELECT * FROM orders").expect("query failed");
let _result4 = db.query("SELECT * FROM users").expect("query failed");
let metrics = db.cache_metrics();
assert_eq!(metrics.total_lookups(), 4);
assert_eq!(metrics.hits(), 2); assert_eq!(metrics.misses(), 2);
let hit_rate = metrics.hit_rate().expect("should have hit rate");
assert!((hit_rate - 50.0).abs() < 0.1); }
#[test]
fn test_bulk_insert_empty() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities: Vec<Entity> = Vec::new();
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
assert!(ids.is_empty());
}
#[test]
fn test_bulk_insert_single_entity() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities =
vec![Entity::new(EntityId::new(0)).with_label("Person").with_property("name", "Alice")];
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
assert_eq!(ids.len(), 1);
let tx = db.begin_read().expect("failed to begin read");
let retrieved = tx.get_entity(ids[0]).expect("get failed").expect("entity not found");
assert!(retrieved.has_label("Person"));
assert_eq!(
retrieved.get_property("name"),
Some(&manifoldb_core::Value::String("Alice".to_string()))
);
}
#[test]
fn test_bulk_insert_multiple_entities() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities: Vec<Entity> = (0..100)
.map(|i| {
Entity::new(EntityId::new(0))
.with_label("Document")
.with_property("index", i as i64)
})
.collect();
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
assert_eq!(ids.len(), 100);
for (i, id) in ids.iter().enumerate() {
assert_eq!(id.as_u64(), (i + 1) as u64);
}
let tx = db.begin_read().expect("failed to begin read");
for (i, id) in ids.iter().enumerate() {
let entity = tx.get_entity(*id).expect("get failed").expect("entity not found");
assert!(entity.has_label("Document"));
assert_eq!(entity.get_property("index"), Some(&manifoldb_core::Value::Int(i as i64)));
}
}
#[test]
fn test_bulk_insert_preserves_existing_id_sequence() {
let db = Database::in_memory().expect("failed to create in-memory db");
{
let mut tx = db.begin().expect("failed to begin");
for _ in 0..5 {
let entity = tx.create_entity().expect("failed to create");
tx.put_entity(&entity).expect("failed to put");
}
tx.commit().expect("failed to commit");
}
let entities: Vec<Entity> = (0..10)
.map(|i| Entity::new(EntityId::new(0)).with_property("bulk_index", i as i64))
.collect();
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
assert_eq!(ids[0].as_u64(), 6);
assert_eq!(ids[9].as_u64(), 15);
}
#[test]
fn test_bulk_insert_with_multiple_labels() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities = vec![Entity::new(EntityId::new(0))
.with_label("Person")
.with_label("Employee")
.with_label("Manager")
.with_property("name", "Alice")];
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
let tx = db.begin_read().expect("failed to begin read");
let retrieved = tx.get_entity(ids[0]).expect("get failed").expect("entity not found");
assert!(retrieved.has_label("Person"));
assert!(retrieved.has_label("Employee"));
assert!(retrieved.has_label("Manager"));
}
#[test]
fn test_bulk_insert_large_batch() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities: Vec<Entity> = (0..10_000)
.map(|i| {
Entity::new(EntityId::new(0))
.with_label("Item")
.with_property("id", i as i64)
.with_property("data", format!("item_{}", i))
})
.collect();
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
assert_eq!(ids.len(), 10_000);
let tx = db.begin_read().expect("failed to begin read");
for check_idx in [0, 100, 5000, 9999] {
let entity =
tx.get_entity(ids[check_idx]).expect("get failed").expect("entity not found");
assert!(entity.has_label("Item"));
assert_eq!(
entity.get_property("id"),
Some(&manifoldb_core::Value::Int(check_idx as i64))
);
}
}
#[test]
fn test_bulk_insert_returns_correct_order() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities: Vec<Entity> = (0..50)
.map(|i| Entity::new(EntityId::new(0)).with_property("unique_marker", i * 1000 + 42))
.collect();
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
let tx = db.begin_read().expect("failed to begin read");
for (i, id) in ids.iter().enumerate() {
let entity = tx.get_entity(*id).expect("get failed").expect("entity not found");
let expected_marker = i as i64 * 1000 + 42;
assert_eq!(
entity.get_property("unique_marker"),
Some(&manifoldb_core::Value::Int(expected_marker)),
"Entity at position {} has wrong marker",
i
);
}
}
#[test]
fn test_bulk_upsert_empty() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities: Vec<Entity> = Vec::new();
let (inserted, updated) = db.bulk_upsert_entities(&entities).expect("bulk upsert failed");
assert_eq!(inserted, 0);
assert_eq!(updated, 0);
}
#[test]
fn test_bulk_upsert_all_inserts() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities: Vec<Entity> = (0..10)
.map(|i| {
Entity::new(EntityId::new(0))
.with_label("Document")
.with_property("index", i as i64)
})
.collect();
let (inserted, updated) = db.bulk_upsert_entities(&entities).expect("bulk upsert failed");
assert_eq!(inserted, 10);
assert_eq!(updated, 0);
let tx = db.begin_read().expect("failed to begin read");
for i in 1..=10 {
let entity =
tx.get_entity(EntityId::new(i)).expect("get failed").expect("entity not found");
assert!(entity.has_label("Document"));
assert_eq!(
entity.get_property("index"),
Some(&manifoldb_core::Value::Int((i - 1) as i64))
);
}
}
#[test]
fn test_bulk_upsert_all_updates() {
let db = Database::in_memory().expect("failed to create in-memory db");
let initial_entities: Vec<Entity> = (0..10)
.map(|i| {
Entity::new(EntityId::new(0))
.with_label("Document")
.with_property("version", 1i64)
.with_property("index", i as i64)
})
.collect();
let ids = db.bulk_insert_entities(&initial_entities).expect("bulk insert failed");
let update_entities: Vec<Entity> = ids
.iter()
.enumerate()
.map(|(i, id)| {
Entity::new(*id)
.with_label("Document")
.with_property("version", 2i64)
.with_property("index", i as i64)
})
.collect();
let (inserted, updated) =
db.bulk_upsert_entities(&update_entities).expect("bulk upsert failed");
assert_eq!(inserted, 0);
assert_eq!(updated, 10);
let tx = db.begin_read().expect("failed to begin read");
for id in &ids {
let entity = tx.get_entity(*id).expect("get failed").expect("entity not found");
assert_eq!(entity.get_property("version"), Some(&manifoldb_core::Value::Int(2)));
}
}
#[test]
fn test_bulk_upsert_mixed_insert_and_update() {
let db = Database::in_memory().expect("failed to create in-memory db");
let initial_entities: Vec<Entity> = (0..50)
.map(|i| {
Entity::new(EntityId::new(0))
.with_label("Document")
.with_property("version", 1i64)
.with_property("original_index", i as i64)
})
.collect();
let ids = db.bulk_insert_entities(&initial_entities).expect("bulk insert failed");
let mut upsert_entities: Vec<Entity> = Vec::new();
for (i, id) in ids.iter().take(30).enumerate() {
upsert_entities.push(
Entity::new(*id)
.with_label("Document")
.with_property("version", 2i64)
.with_property("original_index", i as i64),
);
}
for i in 0..20 {
upsert_entities.push(
Entity::new(EntityId::new(0))
.with_label("Document")
.with_property("version", 1i64)
.with_property("new_index", i as i64),
);
}
let (inserted, updated) =
db.bulk_upsert_entities(&upsert_entities).expect("bulk upsert failed");
assert_eq!(inserted, 20);
assert_eq!(updated, 30);
let tx = db.begin_read().expect("failed to begin read");
for id in ids.iter().take(30) {
let entity = tx.get_entity(*id).expect("get failed").expect("entity not found");
assert_eq!(entity.get_property("version"), Some(&manifoldb_core::Value::Int(2)));
}
let expected_start_id = ids.len() as u64 + 1;
for i in 0..20u64 {
let entity = tx
.get_entity(EntityId::new(expected_start_id + i))
.expect("get failed")
.expect("entity not found");
assert_eq!(entity.get_property("version"), Some(&manifoldb_core::Value::Int(1)));
assert_eq!(
entity.get_property("new_index"),
Some(&manifoldb_core::Value::Int(i as i64))
);
}
}
#[test]
fn test_bulk_upsert_preserves_labels_on_update() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities = vec![Entity::new(EntityId::new(0))
.with_label("Person")
.with_label("Employee")
.with_property("name", "Alice")];
let ids = db.bulk_insert_entities(&entities).expect("bulk insert failed");
let update_entities = vec![Entity::new(ids[0])
.with_label("Person")
.with_label("Manager") .with_property("name", "Alice")
.with_property("promoted", true)];
let (inserted, updated) =
db.bulk_upsert_entities(&update_entities).expect("bulk upsert failed");
assert_eq!(inserted, 0);
assert_eq!(updated, 1);
let tx = db.begin_read().expect("failed to begin read");
let entity = tx.get_entity(ids[0]).expect("get failed").expect("entity not found");
assert!(entity.has_label("Person"));
assert!(entity.has_label("Manager"));
assert!(!entity.has_label("Employee")); assert_eq!(entity.get_property("promoted"), Some(&manifoldb_core::Value::Bool(true)));
}
#[test]
fn test_bulk_upsert_nonexistent_id_becomes_insert() {
let db = Database::in_memory().expect("failed to create in-memory db");
let entities = vec![Entity::new(EntityId::new(999))
.with_label("Ghost")
.with_property("name", "Phantom")];
let (inserted, updated) = db.bulk_upsert_entities(&entities).expect("bulk upsert failed");
assert_eq!(inserted, 1);
assert_eq!(updated, 0);
let tx = db.begin_read().expect("failed to begin read");
let entity =
tx.get_entity(EntityId::new(1)).expect("get failed").expect("entity not found");
assert!(entity.has_label("Ghost"));
assert_eq!(
entity.get_property("name"),
Some(&manifoldb_core::Value::String("Phantom".to_string()))
);
}
/// Upserts a 5000-element batch that is half updates of existing entities and
/// half brand-new entities, then checks the reported counts and spot-checks
/// one updated and one untouched entity.
#[test]
fn test_bulk_upsert_large_batch() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    // Seed the store with 5000 "version 1" items.
    let mut seed: Vec<Entity> = Vec::with_capacity(5000);
    for index in 0..5000i64 {
        seed.push(
            Entity::new(EntityId::new(0))
                .with_label("Item")
                .with_property("index", index)
                .with_property("version", 1i64),
        );
    }
    let ids = db.bulk_insert_entities(&seed).expect("bulk insert failed");

    // First 2500 entries reuse returned IDs with version 2 (updates) ...
    let refreshed = ids.iter().take(2500).enumerate().map(|(position, id)| {
        Entity::new(*id)
            .with_label("Item")
            .with_property("index", position as i64)
            .with_property("version", 2i64)
    });
    // ... followed by 2500 entries with ID 0 (inserts).
    let fresh = (0..2500i64).map(|offset| {
        Entity::new(EntityId::new(0))
            .with_label("Item")
            .with_property("index", 5000 + offset)
            .with_property("version", 1i64)
    });
    let upsert_entities: Vec<Entity> = refreshed.chain(fresh).collect();

    let (inserted, updated) =
        db.bulk_upsert_entities(&upsert_entities).expect("bulk upsert failed");
    assert_eq!(inserted, 2500);
    assert_eq!(updated, 2500);

    let tx = db.begin_read().expect("failed to begin read");

    // Index 100 lies inside the updated half.
    let touched = tx.get_entity(ids[100]).expect("get failed").expect("entity not found");
    assert_eq!(touched.get_property("version"), Some(&Value::Int(2)));

    // Index 4000 lies outside the updated half and must keep version 1.
    let untouched = tx.get_entity(ids[4000]).expect("get failed").expect("entity not found");
    assert_eq!(untouched.get_property("version"), Some(&Value::Int(1)));
}
/// Updating an entity through upsert with a smaller property set must leave
/// the omitted property (`email`) absent on the stored entity.
#[test]
fn test_bulk_upsert_update_removes_property() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    // Original record carries three properties.
    let original = Entity::new(EntityId::new(0))
        .with_label("Person")
        .with_property("name", "Alice")
        .with_property("age", 30i64)
        .with_property("email", "alice@example.com");
    let ids = db.bulk_insert_entities(&vec![original]).expect("bulk insert failed");
    let alice_id = ids[0];

    // Replacement record drops `email` and changes `age`.
    let replacement = Entity::new(alice_id)
        .with_label("Person")
        .with_property("name", "Alice")
        .with_property("age", 31i64);
    let update_batch = vec![replacement];

    let (inserted, updated) =
        db.bulk_upsert_entities(&update_batch).expect("bulk upsert failed");
    assert_eq!(inserted, 0);
    assert_eq!(updated, 1);

    let tx = db.begin_read().expect("failed to begin read");
    let stored = tx.get_entity(alice_id).expect("get failed").expect("entity not found");

    let expected_name = Value::String(String::from("Alice"));
    assert_eq!(stored.get_property("name"), Some(&expected_name));
    assert_eq!(stored.get_property("age"), Some(&Value::Int(31)));
    assert_eq!(stored.get_property("email"), None);
}
/// Bulk-inserting an empty edge list succeeds and yields no IDs.
#[test]
fn test_bulk_insert_edges_empty() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    let no_edges = Vec::<Edge>::new();
    let returned = db.bulk_insert_edges(&no_edges).expect("bulk insert failed");

    assert!(returned.is_empty());
}
/// Inserts a single edge between two freshly inserted entities and checks
/// that endpoints, type and property round-trip through the store.
#[test]
fn test_bulk_insert_edges_single() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    let people: Vec<Entity> =
        (0..2).map(|_| Entity::new(EntityId::new(0)).with_label("Person")).collect();
    let entity_ids = db.bulk_insert_entities(&people).expect("entity insert failed");
    let (follower, followed) = (entity_ids[0], entity_ids[1]);

    let follows = Edge::new(EdgeId::new(0), follower, followed, "FOLLOWS")
        .with_property("since", "2024-01-01");
    let edges = vec![follows];
    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");
    assert_eq!(edge_ids.len(), 1);

    let tx = db.begin_read().expect("failed to begin read");
    let lookup = tx.get_edge(edge_ids[0]).expect("get failed");
    let retrieved = lookup.expect("edge not found");

    assert_eq!(retrieved.source, follower);
    assert_eq!(retrieved.target, followed);
    assert_eq!(retrieved.edge_type.as_str(), "FOLLOWS");

    let expected_since = Value::String(String::from("2024-01-01"));
    assert_eq!(retrieved.get_property("since"), Some(&expected_since));
}
/// Links ten entities into a chain of nine `NEXT` edges and verifies that
/// the returned IDs count up from 1 and that every edge is stored intact.
#[test]
fn test_bulk_insert_edges_multiple() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    let mut nodes: Vec<Entity> = Vec::new();
    for idx in 0..10i64 {
        nodes.push(Entity::new(EntityId::new(0)).with_label("Node").with_property("idx", idx));
    }
    let entity_ids = db.bulk_insert_entities(&nodes).expect("entity insert failed");

    // Pair every entity with its successor to form the chain.
    let edges: Vec<Edge> = entity_ids
        .iter()
        .zip(entity_ids.iter().skip(1))
        .enumerate()
        .map(|(order, (from, to))| {
            Edge::new(EdgeId::new(0), *from, *to, "NEXT").with_property("order", order as i64)
        })
        .collect();

    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");
    assert_eq!(edge_ids.len(), 9);

    // IDs are expected to be 1, 2, 3, ... in input order.
    for (id, expected) in edge_ids.iter().zip(1u64..) {
        assert_eq!(id.as_u64(), expected);
    }

    let tx = db.begin_read().expect("failed to begin read");
    for (position, edge_id) in edge_ids.iter().enumerate() {
        let stored = tx.get_edge(*edge_id).expect("get failed").expect("edge not found");
        let expected_order = Value::Int(position as i64);

        assert_eq!(stored.source, entity_ids[position]);
        assert_eq!(stored.target, entity_ids[position + 1]);
        assert_eq!(stored.edge_type.as_str(), "NEXT");
        assert_eq!(stored.get_property("order"), Some(&expected_order));
    }
}
/// An edge whose source ID (999) was not returned by any insert must be
/// rejected with `Error::InvalidEntityReference`.
#[test]
fn test_bulk_insert_edges_invalid_source() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    // Only a single entity is inserted; it serves as the valid target.
    let person = Entity::new(EntityId::new(0)).with_label("Person");
    let entity_ids = db.bulk_insert_entities(&vec![person]).expect("entity insert failed");

    let missing_source = EntityId::new(999);
    let dangling = Edge::new(EdgeId::new(0), missing_source, entity_ids[0], "FOLLOWS");
    let edges = vec![dangling];

    let outcome = db.bulk_insert_edges(&edges);
    assert!(outcome.is_err());
    assert!(matches!(outcome.unwrap_err(), Error::InvalidEntityReference(_)));
}
/// An edge whose target ID (999) was not returned by any insert must be
/// rejected with `Error::InvalidEntityReference`.
#[test]
fn test_bulk_insert_edges_invalid_target() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    // Only a single entity is inserted; it serves as the valid source.
    let person = Entity::new(EntityId::new(0)).with_label("Person");
    let entity_ids = db.bulk_insert_entities(&vec![person]).expect("entity insert failed");

    let missing_target = EntityId::new(999);
    let dangling = Edge::new(EdgeId::new(0), entity_ids[0], missing_target, "FOLLOWS");
    let edges = vec![dangling];

    let outcome = db.bulk_insert_edges(&edges);
    assert!(outcome.is_err());
    assert!(matches!(outcome.unwrap_err(), Error::InvalidEntityReference(_)));
}
/// After three edges have been created through a regular transaction, a
/// bulk insert is expected to hand out edge ID 4.
#[test]
fn test_bulk_insert_edges_preserves_id_sequence() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    let mut nodes: Vec<Entity> = Vec::with_capacity(5);
    for _ in 0..5 {
        nodes.push(Entity::new(EntityId::new(0)).with_label("Node"));
    }
    let entity_ids = db.bulk_insert_entities(&nodes).expect("entity insert failed");

    // Create edges 0->1, 1->2 and 2->3 one at a time inside a scoped transaction.
    {
        let mut tx = db.begin().expect("failed to begin");
        for pair in entity_ids.windows(2).take(3) {
            let link = tx.create_edge(pair[0], pair[1], "LINK").expect("failed to create edge");
            tx.put_edge(&link).expect("failed to put edge");
        }
        tx.commit().expect("failed to commit");
    }

    let trailing = Edge::new(EdgeId::new(0), entity_ids[3], entity_ids[4], "LINK");
    let edges: Vec<Edge> = vec![trailing];
    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");

    assert_eq!(edge_ids[0].as_u64(), 4);
}
/// Edge properties of float, string, bool and integer kinds must be read
/// back unchanged after a bulk insert.
#[test]
fn test_bulk_insert_edges_with_properties() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    let mut users: Vec<Entity> = Vec::with_capacity(3);
    for _ in 0..3 {
        users.push(Entity::new(EntityId::new(0)).with_label("User"));
    }
    let entity_ids = db.bulk_insert_entities(&users).expect("entity insert failed");

    let follows = Edge::new(EdgeId::new(0), entity_ids[0], entity_ids[1], "FOLLOWS")
        .with_property("weight", 0.8f64)
        .with_property("since", "2024-01-01")
        .with_property("mutual", true);
    let knows = Edge::new(EdgeId::new(0), entity_ids[1], entity_ids[2], "KNOWS")
        .with_property("strength", 5i64)
        .with_property("context", "work");
    let edges = vec![follows, knows];

    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");
    assert_eq!(edge_ids.len(), 2);

    let tx = db.begin_read().expect("failed to begin read");

    // First edge: float, string and bool properties.
    let stored_follows = tx.get_edge(edge_ids[0]).expect("get failed").expect("edge not found");
    let expected_since = Value::String(String::from("2024-01-01"));
    assert_eq!(stored_follows.get_property("weight"), Some(&Value::Float(0.8)));
    assert_eq!(stored_follows.get_property("since"), Some(&expected_since));
    assert_eq!(stored_follows.get_property("mutual"), Some(&Value::Bool(true)));

    // Second edge: integer and string properties.
    let stored_knows = tx.get_edge(edge_ids[1]).expect("get failed").expect("edge not found");
    let expected_context = Value::String(String::from("work"));
    assert_eq!(stored_knows.get_property("strength"), Some(&Value::Int(5)));
    assert_eq!(stored_knows.get_property("context"), Some(&expected_context));
}
/// An edge whose source and target are the same entity is accepted and
/// stored with both endpoints pointing at that entity.
#[test]
fn test_bulk_insert_edges_self_referential() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    let node = Entity::new(EntityId::new(0)).with_label("Node");
    let entity_ids = db.bulk_insert_entities(&vec![node]).expect("entity insert failed");
    let only_node = entity_ids[0];

    let self_loop = Edge::new(EdgeId::new(0), only_node, only_node, "SELF_LINK");
    let edges = vec![self_loop];
    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");
    assert_eq!(edge_ids.len(), 1);

    let tx = db.begin_read().expect("failed to begin read");
    let lookup = tx.get_edge(edge_ids[0]).expect("get failed");
    let stored = lookup.expect("edge not found");

    assert_eq!(stored.source, only_node);
    assert_eq!(stored.target, only_node);
    assert_eq!(stored.edge_type.as_str(), "SELF_LINK");
}
/// Inserts 1000 edges over 100 entities in one call and spot-checks that
/// the edge at a given input position carries the matching `edge_idx`.
#[test]
fn test_bulk_insert_edges_large_batch() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    let mut nodes: Vec<Entity> = Vec::with_capacity(100);
    for idx in 0..100i64 {
        nodes.push(Entity::new(EntityId::new(0)).with_label("Node").with_property("idx", idx));
    }
    let entity_ids = db.bulk_insert_entities(&nodes).expect("entity insert failed");

    // Endpoints are derived from the edge number so every entity is touched.
    let mut edges: Vec<Edge> = Vec::with_capacity(1000);
    for n in 0..1000usize {
        let from = entity_ids[n % 100];
        let to = entity_ids[(n * 7 + 13) % 100];
        edges.push(
            Edge::new(EdgeId::new(0), from, to, "LINK").with_property("edge_idx", n as i64),
        );
    }

    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");
    assert_eq!(edge_ids.len(), 1000);

    let tx = db.begin_read().expect("failed to begin read");
    for &probe in &[0usize, 100, 500, 999] {
        let stored = tx.get_edge(edge_ids[probe]).expect("get failed").expect("edge not found");
        let expected_idx = Value::Int(probe as i64);
        assert_eq!(stored.get_property("edge_idx"), Some(&expected_idx));
    }
}
/// The i-th returned edge ID must refer to the i-th input edge; each edge
/// is tagged with a position-derived marker to make that checkable.
#[test]
fn test_bulk_insert_edges_returns_correct_order() {
    use manifoldb_core::Value;

    let db = Database::in_memory().expect("failed to create in-memory db");

    let mut nodes: Vec<Entity> = Vec::with_capacity(10);
    for _ in 0..10 {
        nodes.push(Entity::new(EntityId::new(0)).with_label("Node"));
    }
    let entity_ids = db.bulk_insert_entities(&nodes).expect("entity insert failed");

    // Marker that uniquely identifies an edge by its input position.
    let marker_for = |position: usize| position as i64 * 1000 + 42;

    let mut edges: Vec<Edge> = Vec::with_capacity(20);
    for n in 0..20usize {
        let from = entity_ids[n % 10];
        let to = entity_ids[(n + 1) % 10];
        edges.push(
            Edge::new(EdgeId::new(0), from, to, "LINK")
                .with_property("unique_marker", marker_for(n)),
        );
    }

    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");

    let tx = db.begin_read().expect("failed to begin read");
    for (i, id) in edge_ids.iter().enumerate() {
        let stored = tx.get_edge(*id).expect("get failed").expect("edge not found");
        let expected = Value::Int(marker_for(i));
        assert_eq!(
            stored.get_property("unique_marker"),
            Some(&expected),
            "Edge at position {} has wrong marker",
            i
        );
    }
}
/// A single bulk insert may mix edge types; each stored edge must keep the
/// type it was submitted with.
#[test]
fn test_bulk_insert_edges_multiple_types() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    let mut nodes: Vec<Entity> = Vec::with_capacity(4);
    for _ in 0..4 {
        nodes.push(Entity::new(EntityId::new(0)).with_label("Node"));
    }
    let entity_ids = db.bulk_insert_entities(&nodes).expect("entity insert failed");

    // One edge per type, arranged as a ring: 0->1, 1->2, 2->3, 3->0.
    let edge_types = ["FOLLOWS", "LIKES", "KNOWS", "WORKS_WITH"];
    let edges: Vec<Edge> = edge_types
        .iter()
        .enumerate()
        .map(|(slot, kind)| {
            Edge::new(EdgeId::new(0), entity_ids[slot], entity_ids[(slot + 1) % 4], *kind)
        })
        .collect();

    let edge_ids = db.bulk_insert_edges(&edges).expect("bulk insert failed");
    assert_eq!(edge_ids.len(), 4);

    let tx = db.begin_read().expect("failed to begin read");
    for (edge_id, expected_kind) in edge_ids.iter().zip(edge_types.iter()) {
        let stored = tx.get_edge(*edge_id).expect("get failed").expect("edge not found");
        assert_eq!(stored.edge_type.as_str(), *expected_kind);
    }
}
/// When one edge in a batch references an unknown entity, the whole batch
/// must fail and the valid edge that preceded it must not be stored.
#[test]
fn test_bulk_insert_edges_all_invalid_rejected() {
    let db = Database::in_memory().expect("failed to create in-memory db");

    let node = Entity::new(EntityId::new(0)).with_label("Node");
    let entity_ids = db.bulk_insert_entities(&vec![node]).expect("entity insert failed");
    let known = entity_ids[0];

    // First edge is well-formed; second targets ID 999, which no insert returned.
    let acceptable = Edge::new(EdgeId::new(0), known, known, "VALID");
    let dangling = Edge::new(EdgeId::new(0), known, EntityId::new(999), "INVALID");
    let edges = vec![acceptable, dangling];

    let outcome = db.bulk_insert_edges(&edges);
    assert!(outcome.is_err());

    // Edge ID 1 is where the test expects the first edge would have landed.
    let tx = db.begin_read().expect("failed to begin read");
    let leftover = tx.get_edge(EdgeId::new(1)).expect("get failed");
    assert!(leftover.is_none(), "No edges should have been created");
}
}