nodedb_client/traits.rs
1//! The `NodeDb` trait: unified query interface for both Origin and Lite.
2//!
3//! Application code writes against this trait once. The runtime determines
4//! whether queries execute locally (in-memory engines on Lite) or remotely
5//! (pgwire to Origin).
6//!
7//! All methods are `async` — on native this runs on Tokio, on WASM this
8//! runs on `wasm-bindgen-futures`.
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use nodedb_types::document::Document;
15use nodedb_types::dropped_collection::DroppedCollection;
16use nodedb_types::error::{NodeDbError, NodeDbResult};
17use nodedb_types::filter::{EdgeFilter, MetadataFilter};
18use nodedb_types::id::{EdgeId, NodeId};
19use nodedb_types::result::{QueryResult, SearchResult, SubGraph};
20use nodedb_types::text_search::TextSearchParams;
21use nodedb_types::value::Value;
22
/// Event passed to `NodeDb::on_collection_purged` handlers.
///
/// Emitted on the sync client when Origin pushes a `CollectionPurged`
/// wire message and on Lite after local hard-delete completes, so
/// application code can flush UI caches, drop derived indexes, etc.
/// Handlers must not block — the dispatch path is on the sync
/// client's receive loop.
///
/// Derives `PartialEq`/`Eq` so handlers and tests can compare received
/// events directly; every field is a plain value type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectionPurgedEvent {
    /// Tenant identifier of the purged collection.
    pub tenant_id: u32,
    /// Name of the purged collection.
    pub name: String,
    /// WAL LSN at which the purge was applied. Handlers can compare
    /// this against locally-observed LSNs for resume/replay logic.
    pub purge_lsn: u64,
}

/// Handler registered via `NodeDb::on_collection_purged`.
///
/// Bound by `Fn` (not `FnMut`) so the same handler can be invoked from
/// multiple threads without interior-mutability ceremony at every call
/// site. A handler that needs mutable state must bring its own
/// synchronization (e.g. a `Mutex` or atomics captured by the closure).
/// `Send + Sync + 'static` lets the `Arc` be shared freely across threads.
pub type CollectionPurgedHandler = Arc<dyn Fn(CollectionPurgedEvent) + Send + Sync + 'static>;
43
44/// Unified database interface for NodeDB.
45///
46/// Two implementations:
47/// - `NodeDbLite`: executes queries against in-memory HNSW/CSR/Loro engines
48/// on the edge device. Writes produce CRDT deltas synced to Origin in background.
49/// - `NodeDbRemote`: translates trait calls into parameterized SQL and sends
50/// them over pgwire to the Origin cluster.
51///
52/// The developer writes agent logic once. Switching between local and cloud
53/// is a one-line configuration change.
54#[async_trait]
55pub trait NodeDb: Send + Sync {
56 // ─── Vector Operations ───────────────────────────────────────────
57
58 /// Search for the `k` nearest vectors to `query` in `collection`.
59 ///
60 /// Returns results ordered by ascending distance. Optional metadata
61 /// filter constrains which vectors are considered.
62 ///
63 /// On Lite: direct in-memory HNSW search. Sub-millisecond.
64 /// On Remote: translated to `SELECT ... ORDER BY embedding <-> $1 LIMIT $2`.
65 async fn vector_search(
66 &self,
67 collection: &str,
68 query: &[f32],
69 k: usize,
70 filter: Option<&MetadataFilter>,
71 ) -> NodeDbResult<Vec<SearchResult>>;
72
73 /// Insert a vector with optional metadata into `collection`.
74 ///
75 /// On Lite: inserts into in-memory HNSW + emits CRDT delta + persists to SQLite.
76 /// On Remote: translated to `INSERT INTO collection (id, embedding, metadata) VALUES (...)`.
77 async fn vector_insert(
78 &self,
79 collection: &str,
80 id: &str,
81 embedding: &[f32],
82 metadata: Option<Document>,
83 ) -> NodeDbResult<()>;
84
85 /// Delete a vector by ID from `collection`.
86 ///
87 /// On Lite: marks deleted in HNSW + emits CRDT tombstone.
88 /// On Remote: `DELETE FROM collection WHERE id = $1`.
89 async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
90
91 // ─── Graph Operations ────────────────────────────────────────────
92
93 /// Traverse the graph from `start` up to `depth` hops.
94 ///
95 /// Returns the discovered subgraph (nodes + edges). Optional edge filter
96 /// constrains which edges are followed during traversal.
97 ///
98 /// On Lite: direct CSR pointer-chasing in contiguous memory. Microseconds.
99 /// On Remote: `SELECT * FROM graph_traverse($1, $2, $3)`.
100 async fn graph_traverse(
101 &self,
102 start: &NodeId,
103 depth: u8,
104 edge_filter: Option<&EdgeFilter>,
105 ) -> NodeDbResult<SubGraph>;
106
107 /// Insert a directed edge from `from` to `to` with the given label.
108 ///
109 /// Returns the generated edge ID.
110 ///
111 /// On Lite: appends to mutable adjacency buffer + CRDT delta + SQLite.
112 /// On Remote: `INSERT INTO edges (src, dst, label, properties) VALUES (...)`.
113 async fn graph_insert_edge(
114 &self,
115 from: &NodeId,
116 to: &NodeId,
117 edge_type: &str,
118 properties: Option<Document>,
119 ) -> NodeDbResult<EdgeId>;
120
121 /// Delete a graph edge by ID.
122 ///
123 /// On Lite: marks deleted + CRDT tombstone.
124 /// On Remote: `DELETE FROM edges WHERE id = $1`.
125 async fn graph_delete_edge(&self, edge_id: &EdgeId) -> NodeDbResult<()>;
126
127 // ─── Document Operations ─────────────────────────────────────────
128
129 /// Get a document by ID from `collection`.
130 ///
131 /// On Lite: direct Loro state read. Sub-millisecond.
132 /// On Remote: `SELECT * FROM collection WHERE id = $1`.
133 async fn document_get(&self, collection: &str, id: &str) -> NodeDbResult<Option<Document>>;
134
135 /// Put (insert or update) a document into `collection`.
136 ///
137 /// The document's `id` field determines the key. If a document with that
138 /// ID already exists, it is overwritten (last-writer-wins locally; CRDT
139 /// merge on sync).
140 ///
141 /// On Lite: Loro apply + CRDT delta + SQLite persist.
142 /// On Remote: `INSERT ... ON CONFLICT (id) DO UPDATE SET ...`.
143 async fn document_put(&self, collection: &str, doc: Document) -> NodeDbResult<()>;
144
145 /// Delete a document by ID from `collection`.
146 ///
147 /// On Lite: Loro delete + CRDT tombstone.
148 /// On Remote: `DELETE FROM collection WHERE id = $1`.
149 async fn document_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
150
151 // ─── Named Vector Fields ──────────────────────────────────────────
152
153 /// Insert a vector into a named field within a collection.
154 ///
155 /// Enables multiple embeddings per collection (e.g., "title_embedding",
156 /// "body_embedding") with independent HNSW indexes.
157 /// Default: delegates to `vector_insert()` ignoring field_name.
158 async fn vector_insert_field(
159 &self,
160 collection: &str,
161 field_name: &str,
162 id: &str,
163 embedding: &[f32],
164 metadata: Option<Document>,
165 ) -> NodeDbResult<()> {
166 let _ = field_name;
167 self.vector_insert(collection, id, embedding, metadata)
168 .await
169 }
170
171 /// Search a named vector field.
172 ///
173 /// Default: delegates to `vector_search()` ignoring field_name.
174 async fn vector_search_field(
175 &self,
176 collection: &str,
177 field_name: &str,
178 query: &[f32],
179 k: usize,
180 filter: Option<&MetadataFilter>,
181 ) -> NodeDbResult<Vec<SearchResult>> {
182 let _ = field_name;
183 self.vector_search(collection, query, k, filter).await
184 }
185
186 // ─── Graph Shortest Path ────────────────────────────────────────
187
188 /// Find the shortest path between two nodes.
189 ///
190 /// Returns the path as a list of node IDs, or None if no path exists
191 /// within `max_depth` hops. Uses bidirectional BFS.
192 async fn graph_shortest_path(
193 &self,
194 from: &NodeId,
195 to: &NodeId,
196 max_depth: u8,
197 edge_filter: Option<&EdgeFilter>,
198 ) -> NodeDbResult<Option<Vec<NodeId>>> {
199 let _ = (from, to, max_depth, edge_filter);
200 Ok(None)
201 }
202
203 // ─── Text Search ────────────────────────────────────────────────
204
205 /// Full-text search with BM25 scoring.
206 ///
207 /// Returns document IDs with relevance scores, ordered by descending score.
208 /// Pass [`TextSearchParams::default()`] for standard OR-mode non-fuzzy search.
209 async fn text_search(
210 &self,
211 collection: &str,
212 query: &str,
213 top_k: usize,
214 params: TextSearchParams,
215 ) -> NodeDbResult<Vec<SearchResult>> {
216 let _ = (collection, query, top_k, params);
217 Ok(Vec::new())
218 }
219
220 // ─── Batch Operations ───────────────────────────────────────────
221
222 /// Batch insert vectors — amortizes CRDT delta export to O(1) per batch.
223 async fn batch_vector_insert(
224 &self,
225 collection: &str,
226 vectors: &[(&str, &[f32])],
227 ) -> NodeDbResult<()> {
228 for &(id, embedding) in vectors {
229 self.vector_insert(collection, id, embedding, None).await?;
230 }
231 Ok(())
232 }
233
234 /// Batch insert graph edges — amortizes CRDT delta export to O(1) per batch.
235 async fn batch_graph_insert_edges(&self, edges: &[(&str, &str, &str)]) -> NodeDbResult<()> {
236 for &(from, to, label) in edges {
237 self.graph_insert_edge(&NodeId::new(from), &NodeId::new(to), label, None)
238 .await?;
239 }
240 Ok(())
241 }
242
243 // ─── SQL Escape Hatch ────────────────────────────────────────────
244
245 /// Execute a raw SQL query with parameters.
246 ///
247 /// On Lite: requires the `sql` feature flag (compiles in DataFusion parser).
248 /// Returns `NodeDbError::SqlNotEnabled` if the feature is not compiled in.
249 /// On Remote: pass-through to Origin via pgwire.
250 ///
251 /// For most AI agent workloads, the typed methods above are sufficient
252 /// and faster. Use this for BI tools, existing ORMs, or ad-hoc queries.
253 async fn execute_sql(&self, query: &str, params: &[Value]) -> NodeDbResult<QueryResult>;
254
255 // ─── Collection Lifecycle (soft-delete / undrop / hard-delete) ───
256
257 /// Restore a soft-deleted collection within its retention window.
258 ///
259 /// Equivalent to `UNDROP COLLECTION <name>`. Fails with 42P01 if
260 /// the retention window has elapsed and the row is gone, or with
261 /// 42501 if the caller is neither preserved owner nor admin.
262 ///
263 /// Default impl routes through `execute_sql` so any implementation
264 /// that can execute SQL inherits the correct behavior for free.
265 async fn undrop_collection(&self, name: &str) -> NodeDbResult<()> {
266 let sql = format!("UNDROP COLLECTION {}", quote_ident(name));
267 self.execute_sql(&sql, &[]).await?;
268 Ok(())
269 }
270
271 /// Hard-delete a collection, skipping soft-delete and retention.
272 ///
273 /// Equivalent to `DROP COLLECTION <name> PURGE`. Admin-only on the
274 /// server; the server rejects non-admin callers with 42501.
275 /// Bypasses the retention safety net — data is unrecoverable.
276 async fn drop_collection_purge(&self, name: &str) -> NodeDbResult<()> {
277 let sql = format!("DROP COLLECTION {} PURGE", quote_ident(name));
278 self.execute_sql(&sql, &[]).await?;
279 Ok(())
280 }
281
282 /// List every soft-deleted collection in the current tenant that
283 /// is still within its retention window.
284 ///
285 /// Equivalent to `SELECT tenant_id, name, owner, deactivated_at_ns,
286 /// retention_expires_at_ns FROM _system.dropped_collections`.
287 /// Returns `Vec<DroppedCollection>` — empty if no soft-deleted rows
288 /// exist for the caller's tenant.
289 async fn list_dropped_collections(&self) -> NodeDbResult<Vec<DroppedCollection>> {
290 let sql = "SELECT tenant_id, name, owner, engine_type, \
291 deactivated_at_ns, retention_expires_at_ns \
292 FROM _system.dropped_collections";
293 let result = self.execute_sql(sql, &[]).await?;
294 parse_dropped_collection_rows(&result)
295 }
296
297 /// Register a handler fired when a collection the caller has
298 /// synced is purged on Origin and the local copy is removed.
299 ///
300 /// Default impl returns `NodeDbError::storage` with a
301 /// `"not supported"` detail — implementations that maintain a
302 /// sync client (Lite, any future push-capable remote client)
303 /// override with registration into their internal handler list.
304 /// Stateless clients (pgwire-only `NodeDbRemote`) have nothing
305 /// to push, so the default rejection is the correct behavior.
306 async fn on_collection_purged(&self, _handler: CollectionPurgedHandler) -> NodeDbResult<()> {
307 Err(NodeDbError::storage(
308 "on_collection_purged is not supported on this client — \
309 requires a push-capable sync connection (NodeDbLite or a \
310 sync-enabled remote client)",
311 ))
312 }
313}
314
/// Quote a SQL identifier for interpolation into generated SQL.
///
/// Mirrors the pgwire-side rule used by `remote_parse::quote_identifier`.
/// A name is left bare only when it is non-empty, does not start with an
/// ASCII digit, and consists solely of `[A-Za-z0-9_]`. Every other name is
/// wrapped in double quotes with each embedded `"` doubled, so the result
/// is always a single identifier token. SQL keywords are *not* treated
/// specially by this rule. Keeping plain names unquoted makes the common
/// case cheap.
fn quote_ident(name: &str) -> String {
    let bytes = name.as_bytes();
    // Checking bytes is equivalent to checking chars here: every byte of a
    // multi-byte UTF-8 sequence is >= 0x80 and fails the ASCII predicates,
    // exactly as the corresponding non-ASCII char would.
    let plain = match bytes.first() {
        None => false,
        Some(lead) if lead.is_ascii_digit() => false,
        Some(_) => bytes
            .iter()
            .all(|b| b.is_ascii_alphanumeric() || *b == b'_'),
    };
    if plain {
        return name.to_owned();
    }

    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    quoted.push_str(&name.replace('"', "\"\""));
    quoted.push('"');
    quoted
}
330
331/// Decode `_system.dropped_collections` rows into
332/// `Vec<DroppedCollection>`. Each row is a `Vec<Value>` aligned with
333/// the column order declared in the SELECT list above.
334fn parse_dropped_collection_rows(result: &QueryResult) -> NodeDbResult<Vec<DroppedCollection>> {
335 let mut out = Vec::with_capacity(result.rows.len());
336 for row in &result.rows {
337 if row.len() < 6 {
338 return Err(NodeDbError::storage(format!(
339 "dropped_collections row has {} columns; expected 6 \
340 (tenant_id, name, owner, engine_type, deactivated_at_ns, \
341 retention_expires_at_ns)",
342 row.len()
343 )));
344 }
345 out.push(DroppedCollection {
346 tenant_id: value_as_u32(&row[0])?,
347 name: value_as_string(&row[1])?,
348 owner: value_as_string(&row[2])?,
349 engine_type: value_as_string(&row[3])?,
350 deactivated_at_ns: value_as_u64(&row[4])?,
351 retention_expires_at_ns: value_as_u64(&row[5])?,
352 });
353 }
354 Ok(out)
355}
356
357fn value_as_u32(v: &Value) -> NodeDbResult<u32> {
358 match v {
359 Value::Integer(i) => Ok(*i as u32),
360 Value::String(s) => s
361 .parse::<u32>()
362 .map_err(|e| NodeDbError::storage(format!("parse u32 from '{s}': {e}"))),
363 _ => Err(NodeDbError::storage(format!(
364 "expected integer for u32 column, got {v:?}"
365 ))),
366 }
367}
368
369fn value_as_u64(v: &Value) -> NodeDbResult<u64> {
370 match v {
371 Value::Integer(i) => Ok(*i as u64),
372 Value::String(s) => s
373 .parse::<u64>()
374 .map_err(|e| NodeDbError::storage(format!("parse u64 from '{s}': {e}"))),
375 _ => Err(NodeDbError::storage(format!(
376 "expected integer for u64 column, got {v:?}"
377 ))),
378 }
379}
380
381fn value_as_string(v: &Value) -> NodeDbResult<String> {
382 match v {
383 Value::String(s) => Ok(s.clone()),
384 Value::Null => Ok(String::new()),
385 other => Err(NodeDbError::storage(format!(
386 "expected string column, got {other:?}"
387 ))),
388 }
389}
390
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Mock implementation to verify the trait is object-safe and
    /// can be used as `Arc<dyn NodeDb>`. Only the required methods are
    /// implemented, so every default method body on `NodeDb` is exercised
    /// unmodified through this mock.
    struct MockDb;

    #[async_trait]
    impl NodeDb for MockDb {
        async fn vector_search(
            &self,
            _collection: &str,
            _query: &[f32],
            _k: usize,
            _filter: Option<&MetadataFilter>,
        ) -> NodeDbResult<Vec<SearchResult>> {
            Ok(vec![SearchResult {
                id: "vec-1".into(),
                node_id: None,
                distance: 0.1,
                metadata: HashMap::new(),
            }])
        }

        async fn vector_insert(
            &self,
            _collection: &str,
            _id: &str,
            _embedding: &[f32],
            _metadata: Option<Document>,
        ) -> NodeDbResult<()> {
            Ok(())
        }

        async fn vector_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        async fn graph_traverse(
            &self,
            _start: &NodeId,
            _depth: u8,
            _edge_filter: Option<&EdgeFilter>,
        ) -> NodeDbResult<SubGraph> {
            Ok(SubGraph::empty())
        }

        async fn graph_insert_edge(
            &self,
            from: &NodeId,
            to: &NodeId,
            edge_type: &str,
            _properties: Option<Document>,
        ) -> NodeDbResult<EdgeId> {
            Ok(EdgeId::from_components(
                from.as_str(),
                to.as_str(),
                edge_type,
            ))
        }

        async fn graph_delete_edge(&self, _edge_id: &EdgeId) -> NodeDbResult<()> {
            Ok(())
        }

        async fn document_get(
            &self,
            _collection: &str,
            id: &str,
        ) -> NodeDbResult<Option<Document>> {
            let mut doc = Document::new(id);
            doc.set("title", Value::String("test".into()));
            Ok(Some(doc))
        }

        async fn document_put(&self, _collection: &str, _doc: Document) -> NodeDbResult<()> {
            Ok(())
        }

        async fn document_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        async fn execute_sql(&self, _query: &str, _params: &[Value]) -> NodeDbResult<QueryResult> {
            Ok(QueryResult::empty())
        }
    }

    /// Verify the trait is object-safe (can be used as `dyn NodeDb`).
    #[test]
    fn trait_is_object_safe() {
        fn _accepts_dyn(_db: &dyn NodeDb) {}
        let db = MockDb;
        _accepts_dyn(&db);
    }

    /// Verify the trait can be wrapped in `Arc<dyn NodeDb>`.
    #[test]
    fn trait_works_with_arc() {
        use std::sync::Arc;
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);
        // Just verify it compiles — the Arc<dyn> pattern is the primary API.
        let _ = db;
    }

    #[tokio::test]
    async fn mock_vector_search() {
        let db = MockDb;
        let results = db
            .vector_search("embeddings", &[0.1, 0.2, 0.3], 5, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "vec-1");
        assert!(results[0].distance < 1.0);
    }

    #[tokio::test]
    async fn mock_vector_insert_and_delete() {
        let db = MockDb;
        db.vector_insert("coll", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();
        db.vector_delete("coll", "v1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_graph_operations() {
        let db = MockDb;
        let start = NodeId::new("alice");
        let subgraph = db.graph_traverse(&start, 2, None).await.unwrap();
        assert_eq!(subgraph.node_count(), 0);

        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let edge_id = db
            .graph_insert_edge(&from, &to, "KNOWS", None)
            .await
            .unwrap();
        assert_eq!(edge_id.as_str(), "alice--KNOWS-->bob");

        db.graph_delete_edge(&edge_id).await.unwrap();
    }

    #[tokio::test]
    async fn mock_document_operations() {
        let db = MockDb;
        let doc = db.document_get("notes", "n1").await.unwrap().unwrap();
        assert_eq!(doc.id, "n1");
        assert_eq!(doc.get_str("title"), Some("test"));

        let mut new_doc = Document::new("n2");
        new_doc.set("body", Value::String("hello".into()));
        db.document_put("notes", new_doc).await.unwrap();

        db.document_delete("notes", "n1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_execute_sql() {
        let db = MockDb;
        let result = db.execute_sql("SELECT 1", &[]).await.unwrap();
        assert_eq!(result.row_count(), 0);
    }

    /// Verify the full "one API, any runtime" pattern from the TDD.
    #[tokio::test]
    async fn unified_api_pattern() {
        use std::sync::Arc;

        // This is the pattern from NodeDB.md:
        //   let db: Arc<dyn NodeDb> = Arc::new(NodeDbLite::open(...));
        //   OR
        //   let db: Arc<dyn NodeDb> = Arc::new(NodeDbRemote::connect(...));
        //
        // Application code is identical either way:
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);

        let results = db
            .vector_search("knowledge_base", &[0.1, 0.2], 5, None)
            .await
            .unwrap();
        assert!(!results.is_empty());

        let start = NodeId::new(results[0].id.clone());
        let _subgraph = db.graph_traverse(&start, 2, None).await.unwrap();

        let doc = Document::new("note-1");
        db.document_put("notes", doc).await.unwrap();
    }

    // ─── quote_ident ────────────────────────────────────────────────

    /// Plain `[A-Za-z0-9_]` names that don't start with a digit stay bare.
    #[test]
    fn quote_ident_leaves_plain_identifiers_bare() {
        assert_eq!(quote_ident("users"), "users");
        assert_eq!(quote_ident("_tmp_9"), "_tmp_9");
        assert_eq!(quote_ident("Users2"), "Users2");
    }

    /// Everything else is double-quoted, with embedded quotes doubled.
    #[test]
    fn quote_ident_quotes_and_escapes_everything_else() {
        assert_eq!(quote_ident(""), "\"\"");
        assert_eq!(quote_ident("1st"), "\"1st\"");
        assert_eq!(quote_ident("my coll"), "\"my coll\"");
        assert_eq!(quote_ident("a\"b"), "\"a\"\"b\"");
        assert_eq!(quote_ident("café"), "\"café\"");
    }

    // ─── Default trait method bodies ────────────────────────────────

    /// Named-field defaults ignore the field name and delegate.
    #[tokio::test]
    async fn default_named_field_methods_delegate() {
        let db = MockDb;
        let hits = db
            .vector_search_field("embeddings", "title_embedding", &[0.1, 0.2], 3, None)
            .await
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].id, "vec-1");

        db.vector_insert_field("embeddings", "title_embedding", "v1", &[0.1, 0.2], None)
            .await
            .unwrap();
    }

    /// Batch defaults loop over the single-item methods.
    #[tokio::test]
    async fn default_batch_methods_delegate() {
        let db = MockDb;
        let first: &[f32] = &[1.0, 2.0];
        let second: &[f32] = &[3.0, 4.0];
        db.batch_vector_insert("coll", &[("v1", first), ("v2", second)])
            .await
            .unwrap();
        db.batch_graph_insert_edges(&[("alice", "bob", "KNOWS"), ("bob", "carol", "KNOWS")])
            .await
            .unwrap();
        // Empty batches are a no-op.
        db.batch_vector_insert("coll", &[]).await.unwrap();
        db.batch_graph_insert_edges(&[]).await.unwrap();
    }

    /// Placeholder defaults report "nothing found" without searching.
    #[tokio::test]
    async fn default_placeholder_capabilities_return_empty() {
        let db = MockDb;
        let alice = NodeId::new("alice");
        let bob = NodeId::new("bob");
        let path = db
            .graph_shortest_path(&alice, &bob, 4, None)
            .await
            .unwrap();
        assert!(path.is_none());

        let hits = db
            .text_search("notes", "hello", 10, TextSearchParams::default())
            .await
            .unwrap();
        assert!(hits.is_empty());
    }

    /// Lifecycle defaults route through `execute_sql`; an empty result
    /// set decodes to no dropped collections.
    #[tokio::test]
    async fn default_lifecycle_methods_route_through_execute_sql() {
        let db = MockDb;
        db.undrop_collection("notes").await.unwrap();
        db.drop_collection_purge("my notes").await.unwrap();
        let dropped = db.list_dropped_collections().await.unwrap();
        assert!(dropped.is_empty());
    }

    /// Clients without a sync connection reject purge-handler registration.
    #[tokio::test]
    async fn default_on_collection_purged_is_unsupported() {
        use std::sync::Arc;
        let db = MockDb;
        let handler: CollectionPurgedHandler = Arc::new(|_event: CollectionPurgedEvent| {});
        assert!(db.on_collection_purged(handler).await.is_err());
    }
}