use std::sync::Arc;
use async_trait::async_trait;
use nodedb_types::document::Document;
use nodedb_types::dropped_collection::DroppedCollection;
use nodedb_types::error::{NodeDbError, NodeDbResult};
use nodedb_types::filter::{EdgeFilter, MetadataFilter};
use nodedb_types::id::{EdgeId, NodeId};
use nodedb_types::protocol::Limits;
use nodedb_types::result::{QueryResult, SearchResult, SubGraph};
use nodedb_types::text_search::TextSearchParams;
use nodedb_types::value::Value;
/// Event value handed to a [`CollectionPurgedHandler`] callback.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names only; confirm them against the code that constructs this event.
#[derive(Debug, Clone)]
pub struct CollectionPurgedEvent {
/// Tenant identifier (presumably the tenant that owned the purged collection).
pub tenant_id: u64,
/// Collection name (presumably the collection that was purged).
pub name: String,
/// Presumably the log sequence number associated with the purge — TODO confirm.
pub purge_lsn: u64,
}
/// Reference-counted callback that receives a [`CollectionPurgedEvent`] by value.
///
/// The `Send + Sync + 'static` bounds let the handler be shared across threads;
/// it is the argument type of `NodeDb::on_collection_purged`.
pub type CollectionPurgedHandler = Arc<dyn Fn(CollectionPurgedEvent) + Send + Sync + 'static>;
/// Marker supertrait of `NodeDb` that carries the target-specific
/// thread-safety bounds.
///
/// On every target except `wasm32` it requires `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
pub trait NodeDbMarker: Send + Sync {}
// Blanket impl: any `Send + Sync` type (sized or unsized) is a marker
// automatically, so implementors never write this impl by hand.
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send + Sync + ?Sized> NodeDbMarker for T {}
/// `wasm32` variant of the marker: no `Send`/`Sync` requirement.
#[cfg(target_arch = "wasm32")]
pub trait NodeDbMarker {}
// Blanket impl: on `wasm32` every type (sized or unsized) is a marker.
#[cfg(target_arch = "wasm32")]
impl<T: ?Sized> NodeDbMarker for T {}
/// Async client interface spanning vector, graph, document, full-text and SQL
/// operations.
///
/// Implementors must supply the nine vector/graph/document primitives plus
/// [`NodeDb::execute_sql`]; every other method has a default body defined
/// here. The trait is expanded with `async_trait` on all targets, using the
/// `?Send` form on `wasm32`.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait NodeDb: NodeDbMarker {
    /// Runs a vector query against `collection`.
    ///
    /// Receives the query embedding, a result-count parameter `k` and an
    /// optional metadata filter. Ranking and filtering semantics are defined
    /// by the implementor.
    async fn vector_search(
        &self,
        collection: &str,
        query: &[f32],
        k: usize,
        filter: Option<&MetadataFilter>,
    ) -> NodeDbResult<Vec<SearchResult>>;

    /// Stores `embedding` under `id` in `collection`, with optional metadata.
    async fn vector_insert(
        &self,
        collection: &str,
        id: &str,
        embedding: &[f32],
        metadata: Option<Document>,
    ) -> NodeDbResult<()>;

    /// Removes the vector identified by `id` from `collection`.
    async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;

    /// Returns the sub-graph reached from `start` within `depth`, optionally
    /// restricted by `edge_filter`. Exact traversal semantics are defined by
    /// the implementor.
    async fn graph_traverse(
        &self,
        collection: &str,
        start: &NodeId,
        depth: u8,
        edge_filter: Option<&EdgeFilter>,
    ) -> NodeDbResult<SubGraph>;

    /// Creates an edge of type `edge_type` from `from` to `to` and returns
    /// its identifier.
    async fn graph_insert_edge(
        &self,
        collection: &str,
        from: &NodeId,
        to: &NodeId,
        edge_type: &str,
        properties: Option<Document>,
    ) -> NodeDbResult<EdgeId>;

    /// Deletes the edge identified by `edge_id`.
    async fn graph_delete_edge(&self, collection: &str, edge_id: &EdgeId) -> NodeDbResult<()>;

    /// Looks up a document by `id`; the `Option` in the return type allows an
    /// implementor to report a missing document without an error.
    async fn document_get(&self, collection: &str, id: &str) -> NodeDbResult<Option<Document>>;

    /// Stores `doc` in `collection`.
    async fn document_put(&self, collection: &str, doc: Document) -> NodeDbResult<()>;

    /// Deletes the document identified by `id` from `collection`.
    async fn document_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;

    /// Field-scoped variant of [`NodeDb::vector_insert`].
    ///
    /// # Errors
    /// The default body always fails with a storage error naming
    /// `field_name`, so a client that does not override this method cannot
    /// silently ignore the field.
    async fn vector_insert_field(
        &self,
        collection: &str,
        field_name: &str,
        id: &str,
        embedding: &[f32],
        metadata: Option<Document>,
    ) -> NodeDbResult<()> {
        // Only `field_name` is reported; the other arguments are consumed so
        // they do not trigger unused-variable warnings.
        let _ = (collection, id, embedding, metadata);
        let message = format!(
            "vector_insert_field is not implemented on this client; \
             field_name={field_name} would have been silently dropped"
        );
        Err(NodeDbError::storage(message))
    }

    /// Field-scoped variant of [`NodeDb::vector_search`].
    ///
    /// # Errors
    /// The default body always fails with a storage error naming
    /// `field_name`.
    async fn vector_search_field(
        &self,
        collection: &str,
        field_name: &str,
        query: &[f32],
        k: usize,
        filter: Option<&MetadataFilter>,
    ) -> NodeDbResult<Vec<SearchResult>> {
        // Only `field_name` is reported; the other arguments are consumed so
        // they do not trigger unused-variable warnings.
        let _ = (collection, query, k, filter);
        let message = format!(
            "vector_search_field is not implemented on this client; \
             field_name={field_name} would have been silently dropped"
        );
        Err(NodeDbError::storage(message))
    }

    /// Breadth-first shortest-path search built on [`NodeDb::graph_traverse`].
    ///
    /// Each expanded node costs one depth-1 `graph_traverse` call, and only
    /// edges whose `from` endpoint equals the node being expanded are
    /// followed.
    ///
    /// # Returns
    /// * `Ok(Some(path))` — node ids from `from` to `to`, both inclusive
    ///   (`vec![from]` when the endpoints are equal).
    /// * `Ok(None)` — `to` was not reached within `max_depth` hops.
    ///
    /// # Errors
    /// Propagates the first error returned by `graph_traverse`.
    ///
    /// # Panics
    /// Only if the predecessor map is missing an ancestor of `to`; every node
    /// is recorded before it can be expanded, so this guards an invariant.
    async fn graph_shortest_path(
        &self,
        collection: &str,
        from: &NodeId,
        to: &NodeId,
        max_depth: u8,
        edge_filter: Option<&EdgeFilter>,
    ) -> NodeDbResult<Option<Vec<NodeId>>> {
        if from == to {
            return Ok(Some(vec![from.clone()]));
        }

        // Maps each discovered node to the node it was first reached from.
        // `from` is never a key, which is what terminates path reconstruction.
        let mut predecessor = std::collections::HashMap::<NodeId, NodeId>::new();
        let mut level = vec![from.clone()];
        let mut hops_left = max_depth;

        // One iteration per hop; stops when the budget is spent or the
        // current level has nothing left to expand.
        while hops_left > 0 && !level.is_empty() {
            hops_left -= 1;
            let mut discovered: Vec<NodeId> = Vec::new();

            for current in &level {
                let neighbourhood = self
                    .graph_traverse(collection, current, 1, edge_filter)
                    .await?;

                for edge in &neighbourhood.edges {
                    let target = &edge.to;
                    let outgoing = &edge.from == current;
                    if !outgoing || target == from || predecessor.contains_key(target) {
                        continue;
                    }
                    predecessor.insert(target.clone(), current.clone());

                    if target != to {
                        discovered.push(target.clone());
                        continue;
                    }

                    // Destination reached: walk the predecessor chain back to
                    // `from`, then flip it into source-to-destination order.
                    let mut path = vec![to.clone()];
                    let mut cursor: &NodeId = to;
                    while cursor != from {
                        let previous = predecessor
                            .get(cursor)
                            .expect("BFS reached `to` so all ancestors are tracked");
                        path.push(previous.clone());
                        cursor = previous;
                    }
                    path.reverse();
                    return Ok(Some(path));
                }
            }

            level = discovered;
        }

        Ok(None)
    }

    /// Full-text search entry point.
    ///
    /// # Errors
    /// The default body always fails with a storage error; clients that
    /// support text search override it.
    async fn text_search(
        &self,
        collection: &str,
        field: &str,
        query: &str,
        top_k: usize,
        params: TextSearchParams,
    ) -> NodeDbResult<Vec<SearchResult>> {
        let _ = (collection, field, query, top_k, params);
        let message = "text_search is not implemented on this client";
        Err(NodeDbError::storage(message))
    }

    /// Inserts every `(id, embedding)` pair through
    /// [`NodeDb::vector_insert`], one call at a time and without metadata.
    ///
    /// # Errors
    /// Stops at the first failing insert and returns its error; pairs
    /// inserted before the failure are not rolled back by this default.
    async fn batch_vector_insert(
        &self,
        collection: &str,
        vectors: &[(&str, &[f32])],
    ) -> NodeDbResult<()> {
        for (id, embedding) in vectors.iter().copied() {
            self.vector_insert(collection, id, embedding, None).await?;
        }
        Ok(())
    }

    /// Inserts every `(from, to, label)` triple through
    /// [`NodeDb::graph_insert_edge`], one call at a time and without
    /// properties.
    ///
    /// # Errors
    /// Returns a storage error when an endpoint is rejected by
    /// `NodeId::try_new`, or the first error from `graph_insert_edge`.
    /// Processing stops at the first failure.
    async fn batch_graph_insert_edges(
        &self,
        collection: &str,
        edges: &[(&str, &str, &str)],
    ) -> NodeDbResult<()> {
        // Shared validation for both endpoints of an edge.
        let parse_endpoint = |raw: &str| {
            NodeId::try_new(raw)
                .map_err(|e| NodeDbError::storage(format!("invalid node id: {e}")))
        };

        for (from, to, label) in edges.iter().copied() {
            let source = parse_endpoint(from)?;
            let target = parse_endpoint(to)?;
            self.graph_insert_edge(collection, &source, &target, label, None)
                .await?;
        }
        Ok(())
    }

    /// Protocol version reported by this client; the default is `0`.
    fn proto_version(&self) -> u16 {
        0
    }

    /// Raw capability bits reported by this client; the default is `0`
    /// (no bits set).
    fn capabilities(&self) -> u64 {
        0
    }

    /// Server version string; the default is the empty string.
    fn server_version(&self) -> String {
        String::new()
    }

    /// Limits reported by this client; the default is `Limits::default()`.
    fn limits(&self) -> Limits {
        Limits::default()
    }

    /// Executes `query` with the given positional `params` and returns the
    /// result set.
    async fn execute_sql(&self, query: &str, params: &[Value]) -> NodeDbResult<QueryResult>;

    /// Issues `UNDROP COLLECTION <name>` through [`NodeDb::execute_sql`].
    ///
    /// `name` is passed through `quote_ident` before being spliced into the
    /// statement; the query result is discarded.
    async fn undrop_collection(&self, name: &str) -> NodeDbResult<()> {
        let statement = format!("UNDROP COLLECTION {}", quote_ident(name));
        let outcome = self.execute_sql(&statement, &[]).await;
        outcome.map(|_| ())
    }

    /// Issues `DROP COLLECTION <name> PURGE` through [`NodeDb::execute_sql`].
    ///
    /// `name` is passed through `quote_ident` before being spliced into the
    /// statement; the query result is discarded.
    async fn drop_collection_purge(&self, name: &str) -> NodeDbResult<()> {
        let statement = format!("DROP COLLECTION {} PURGE", quote_ident(name));
        let outcome = self.execute_sql(&statement, &[]).await;
        outcome.map(|_| ())
    }

    /// Queries `_system.dropped_collections` and decodes the rows with
    /// `crate::row_decode::parse_dropped_collection_rows`.
    ///
    /// # Errors
    /// Propagates failures from [`NodeDb::execute_sql`] and from row decoding.
    async fn list_dropped_collections(&self) -> NodeDbResult<Vec<DroppedCollection>> {
        const LIST_DROPPED_SQL: &str = "SELECT tenant_id, name, owner, engine_type, \
             deactivated_at_ns, retention_expires_at_ns \
             FROM _system.dropped_collections";

        let result = self.execute_sql(LIST_DROPPED_SQL, &[]).await?;
        crate::row_decode::parse_dropped_collection_rows(&result.rows)
    }

    /// Registers a callback for collection-purge events.
    ///
    /// # Errors
    /// The default body ignores the handler and always fails with a storage
    /// error stating that a push-capable sync connection is required.
    async fn on_collection_purged(&self, _handler: CollectionPurgedHandler) -> NodeDbResult<()> {
        let message = "on_collection_purged is not supported on this client — \
             requires a push-capable sync connection (NodeDbLite or a \
             sync-enabled remote client)";
        Err(NodeDbError::storage(message))
    }
}
/// Renders `name` as an identifier for the SQL statements built in this file.
///
/// A name is returned unchanged when it is non-empty, does not begin with an
/// ASCII digit, and consists solely of ASCII letters, ASCII digits and `_`.
/// Any other name is wrapped in double quotes, with each embedded `"` doubled.
fn quote_ident(name: &str) -> String {
    // Byte-level checks are sufficient: every byte of a non-ASCII character
    // is >= 0x80 and therefore fails both ASCII predicates.
    let bytes = name.as_bytes();
    let starts_plain = bytes.first().is_some_and(|b| !b.is_ascii_digit());
    let is_bare = starts_plain
        && bytes
            .iter()
            .all(|b| b.is_ascii_alphanumeric() || *b == b'_');

    if is_bare {
        return name.to_owned();
    }

    // Two extra bytes for the surrounding quotes; doubled quotes may grow it.
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::capabilities::Capabilities;
    use std::collections::HashMap;

    /// Stateless stand-in that implements only the required `NodeDb` methods,
    /// so every default method on the trait is exercised as written.
    struct MockDb;

    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    impl NodeDb for MockDb {
        /// Always yields a single canned hit, whatever the query.
        async fn vector_search(
            &self,
            _collection: &str,
            _query: &[f32],
            _k: usize,
            _filter: Option<&MetadataFilter>,
        ) -> NodeDbResult<Vec<SearchResult>> {
            let canned_hit = SearchResult {
                id: "vec-1".into(),
                node_id: None,
                distance: 0.1,
                metadata: HashMap::new(),
            };
            Ok(vec![canned_hit])
        }

        /// Accepts and discards the vector.
        async fn vector_insert(
            &self,
            _collection: &str,
            _id: &str,
            _embedding: &[f32],
            _metadata: Option<Document>,
        ) -> NodeDbResult<()> {
            Ok(())
        }

        /// No-op delete.
        async fn vector_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        /// Every traversal returns an empty sub-graph.
        async fn graph_traverse(
            &self,
            _collection: &str,
            _start: &NodeId,
            _depth: u8,
            _edge_filter: Option<&EdgeFilter>,
        ) -> NodeDbResult<SubGraph> {
            Ok(SubGraph::empty())
        }

        /// Builds the edge id from its endpoints and label without storing
        /// anything.
        async fn graph_insert_edge(
            &self,
            _collection: &str,
            from: &NodeId,
            to: &NodeId,
            edge_type: &str,
            _properties: Option<Document>,
        ) -> NodeDbResult<EdgeId> {
            let built = EdgeId::try_first(from.clone(), to.clone(), edge_type);
            built.map_err(|e| NodeDbError::storage(format!("invalid edge label: {e}")))
        }

        /// No-op delete.
        async fn graph_delete_edge(
            &self,
            _collection: &str,
            _edge_id: &EdgeId,
        ) -> NodeDbResult<()> {
            Ok(())
        }

        /// Fabricates a document with the requested id and a fixed `title`.
        async fn document_get(
            &self,
            _collection: &str,
            id: &str,
        ) -> NodeDbResult<Option<Document>> {
            let mut fabricated = Document::new(id);
            fabricated.set("title", Value::String("test".into()));
            Ok(Some(fabricated))
        }

        /// Accepts and discards the document.
        async fn document_put(&self, _collection: &str, _doc: Document) -> NodeDbResult<()> {
            Ok(())
        }

        /// No-op delete.
        async fn document_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        /// Every statement yields an empty result set.
        async fn execute_sql(&self, _query: &str, _params: &[Value]) -> NodeDbResult<QueryResult> {
            Ok(QueryResult::empty())
        }
    }

    /// Compiles only if `NodeDb` can be used behind `&dyn`.
    #[test]
    fn trait_is_object_safe() {
        fn takes_trait_object(_: &dyn NodeDb) {}
        let mock = MockDb;
        takes_trait_object(&mock);
    }

    /// Compiles only if `NodeDb` can be used behind `Arc<dyn _>`.
    #[test]
    fn trait_works_with_arc() {
        use std::sync::Arc;
        let shared: Arc<dyn NodeDb> = Arc::new(MockDb);
        drop(shared);
    }

    #[tokio::test]
    async fn mock_vector_search() {
        let mock = MockDb;
        let query: [f32; 3] = [0.1, 0.2, 0.3];
        let hits = mock
            .vector_search("embeddings", &query, 5, None)
            .await
            .unwrap();

        assert_eq!(hits.len(), 1);
        let first = &hits[0];
        assert_eq!(first.id, "vec-1");
        assert!(first.distance < 1.0);
    }

    #[tokio::test]
    async fn mock_vector_insert_and_delete() {
        let mock = MockDb;
        let embedding: [f32; 2] = [1.0, 2.0];
        mock.vector_insert("coll", "v1", &embedding, None)
            .await
            .unwrap();
        mock.vector_delete("coll", "v1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_graph_operations() {
        let mock = MockDb;

        // Traversal: the mock always answers with an empty sub-graph.
        let origin = NodeId::try_new("alice").expect("test fixture");
        let reached = mock.graph_traverse("social", &origin, 2, None).await.unwrap();
        assert_eq!(reached.node_count(), 0);

        // Insertion: the returned id must echo the endpoints and the label.
        let alice = NodeId::try_new("alice").expect("test fixture");
        let bob = NodeId::try_new("bob").expect("test fixture");
        let knows = mock
            .graph_insert_edge("social", &alice, &bob, "KNOWS", None)
            .await
            .unwrap();
        assert_eq!(knows.src.as_str(), "alice");
        assert_eq!(knows.dst.as_str(), "bob");
        assert_eq!(knows.label, "KNOWS");
        assert_eq!(knows.seq, 0);

        mock.graph_delete_edge("social", &knows).await.unwrap();
    }

    #[tokio::test]
    async fn mock_document_operations() {
        let mock = MockDb;

        let fetched = mock.document_get("notes", "n1").await.unwrap().unwrap();
        assert_eq!(fetched.id, "n1");
        assert_eq!(fetched.get_str("title"), Some("test"));

        let mut draft = Document::new("n2");
        draft.set("body", Value::String("hello".into()));
        mock.document_put("notes", draft).await.unwrap();

        mock.document_delete("notes", "n1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_execute_sql() {
        let mock = MockDb;
        let outcome = mock.execute_sql("SELECT 1", &[]).await.unwrap();
        assert_eq!(outcome.row_count(), 0);
    }

    /// Drives vector, graph and document calls through one shared trait object.
    #[tokio::test]
    async fn unified_api_pattern() {
        use std::sync::Arc;
        let client: Arc<dyn NodeDb> = Arc::new(MockDb);

        let hits = client
            .vector_search("knowledge_base", &[0.1, 0.2], 5, None)
            .await
            .unwrap();
        assert!(!hits.is_empty());

        // Feed the first hit's id into a graph traversal.
        let seed = NodeId::from_validated(hits[0].id.clone());
        let _neighbourhood = client
            .graph_traverse("knowledge_base", &seed, 2, None)
            .await
            .unwrap();

        let note = Document::new("note-1");
        client.document_put("notes", note).await.unwrap();
    }

    #[test]
    fn default_proto_version_is_zero() {
        let mock = MockDb;
        assert_eq!(mock.proto_version(), 0);
    }

    #[test]
    fn default_capabilities_is_zero() {
        let mock = MockDb;
        assert_eq!(mock.capabilities(), 0);

        // With no bits set, no optional feature may be reported.
        let decoded = Capabilities::from_raw(mock.capabilities());
        assert!(!decoded.supports_streaming());
        assert!(!decoded.supports_graphrag());
    }

    #[test]
    fn default_server_version_is_empty() {
        let mock = MockDb;
        assert!(mock.server_version().is_empty());
    }

    #[test]
    fn default_limits_all_none() {
        let mock = MockDb;
        let reported = mock.limits();
        assert!(reported.max_vector_dim.is_none());
        assert!(reported.max_top_k.is_none());
        assert!(reported.max_scan_limit.is_none());
        assert!(reported.max_batch_size.is_none());
        assert!(reported.max_crdt_delta_bytes.is_none());
        assert!(reported.max_query_text_bytes.is_none());
        assert!(reported.max_graph_depth.is_none());
    }

    #[test]
    fn capabilities_newtype_smoke() {
        use nodedb_types::protocol::{CAP_FTS, CAP_STREAMING};
        let decoded = Capabilities::from_raw(CAP_STREAMING | CAP_FTS);

        // Bits that were set are reported...
        assert!(decoded.supports_streaming());
        assert!(decoded.supports_fts());
        // ...and bits that were not set are not.
        assert!(!decoded.supports_graphrag());
        assert!(!decoded.supports_crdt());
    }
}