nodedb_client/traits.rs
1//! The `NodeDb` trait: unified query interface for both Origin and Lite.
2//!
3//! Application code writes against this trait once. The runtime determines
4//! whether queries execute locally (in-memory engines on Lite) or remotely
5//! (pgwire to Origin).
6//!
7//! All methods are `async` — on native this runs on Tokio, on WASM this
8//! runs on `wasm-bindgen-futures`.
9
10use async_trait::async_trait;
11
12use nodedb_types::document::Document;
13use nodedb_types::error::NodeDbResult;
14use nodedb_types::filter::{EdgeFilter, MetadataFilter};
15use nodedb_types::id::{EdgeId, NodeId};
16use nodedb_types::result::{QueryResult, SearchResult, SubGraph};
17use nodedb_types::text_search::TextSearchParams;
18use nodedb_types::value::Value;
19
20/// Unified database interface for NodeDB.
21///
22/// Two implementations:
23/// - `NodeDbLite`: executes queries against in-memory HNSW/CSR/Loro engines
24/// on the edge device. Writes produce CRDT deltas synced to Origin in background.
25/// - `NodeDbRemote`: translates trait calls into parameterized SQL and sends
26/// them over pgwire to the Origin cluster.
27///
28/// The developer writes agent logic once. Switching between local and cloud
29/// is a one-line configuration change.
30#[async_trait]
31pub trait NodeDb: Send + Sync {
32 // ─── Vector Operations ───────────────────────────────────────────
33
34 /// Search for the `k` nearest vectors to `query` in `collection`.
35 ///
36 /// Returns results ordered by ascending distance. Optional metadata
37 /// filter constrains which vectors are considered.
38 ///
39 /// On Lite: direct in-memory HNSW search. Sub-millisecond.
40 /// On Remote: translated to `SELECT ... ORDER BY embedding <-> $1 LIMIT $2`.
41 async fn vector_search(
42 &self,
43 collection: &str,
44 query: &[f32],
45 k: usize,
46 filter: Option<&MetadataFilter>,
47 ) -> NodeDbResult<Vec<SearchResult>>;
48
49 /// Insert a vector with optional metadata into `collection`.
50 ///
51 /// On Lite: inserts into in-memory HNSW + emits CRDT delta + persists to SQLite.
52 /// On Remote: translated to `INSERT INTO collection (id, embedding, metadata) VALUES (...)`.
53 async fn vector_insert(
54 &self,
55 collection: &str,
56 id: &str,
57 embedding: &[f32],
58 metadata: Option<Document>,
59 ) -> NodeDbResult<()>;
60
61 /// Delete a vector by ID from `collection`.
62 ///
63 /// On Lite: marks deleted in HNSW + emits CRDT tombstone.
64 /// On Remote: `DELETE FROM collection WHERE id = $1`.
65 async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
66
67 // ─── Graph Operations ────────────────────────────────────────────
68
69 /// Traverse the graph from `start` up to `depth` hops.
70 ///
71 /// Returns the discovered subgraph (nodes + edges). Optional edge filter
72 /// constrains which edges are followed during traversal.
73 ///
74 /// On Lite: direct CSR pointer-chasing in contiguous memory. Microseconds.
75 /// On Remote: `SELECT * FROM graph_traverse($1, $2, $3)`.
76 async fn graph_traverse(
77 &self,
78 start: &NodeId,
79 depth: u8,
80 edge_filter: Option<&EdgeFilter>,
81 ) -> NodeDbResult<SubGraph>;
82
83 /// Insert a directed edge from `from` to `to` with the given label.
84 ///
85 /// Returns the generated edge ID.
86 ///
87 /// On Lite: appends to mutable adjacency buffer + CRDT delta + SQLite.
88 /// On Remote: `INSERT INTO edges (src, dst, label, properties) VALUES (...)`.
89 async fn graph_insert_edge(
90 &self,
91 from: &NodeId,
92 to: &NodeId,
93 edge_type: &str,
94 properties: Option<Document>,
95 ) -> NodeDbResult<EdgeId>;
96
97 /// Delete a graph edge by ID.
98 ///
99 /// On Lite: marks deleted + CRDT tombstone.
100 /// On Remote: `DELETE FROM edges WHERE id = $1`.
101 async fn graph_delete_edge(&self, edge_id: &EdgeId) -> NodeDbResult<()>;
102
103 // ─── Document Operations ─────────────────────────────────────────
104
105 /// Get a document by ID from `collection`.
106 ///
107 /// On Lite: direct Loro state read. Sub-millisecond.
108 /// On Remote: `SELECT * FROM collection WHERE id = $1`.
109 async fn document_get(&self, collection: &str, id: &str) -> NodeDbResult<Option<Document>>;
110
111 /// Put (insert or update) a document into `collection`.
112 ///
113 /// The document's `id` field determines the key. If a document with that
114 /// ID already exists, it is overwritten (last-writer-wins locally; CRDT
115 /// merge on sync).
116 ///
117 /// On Lite: Loro apply + CRDT delta + SQLite persist.
118 /// On Remote: `INSERT ... ON CONFLICT (id) DO UPDATE SET ...`.
119 async fn document_put(&self, collection: &str, doc: Document) -> NodeDbResult<()>;
120
121 /// Delete a document by ID from `collection`.
122 ///
123 /// On Lite: Loro delete + CRDT tombstone.
124 /// On Remote: `DELETE FROM collection WHERE id = $1`.
125 async fn document_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
126
127 // ─── Named Vector Fields ──────────────────────────────────────────
128
129 /// Insert a vector into a named field within a collection.
130 ///
131 /// Enables multiple embeddings per collection (e.g., "title_embedding",
132 /// "body_embedding") with independent HNSW indexes.
133 /// Default: delegates to `vector_insert()` ignoring field_name.
134 async fn vector_insert_field(
135 &self,
136 collection: &str,
137 field_name: &str,
138 id: &str,
139 embedding: &[f32],
140 metadata: Option<Document>,
141 ) -> NodeDbResult<()> {
142 let _ = field_name;
143 self.vector_insert(collection, id, embedding, metadata)
144 .await
145 }
146
147 /// Search a named vector field.
148 ///
149 /// Default: delegates to `vector_search()` ignoring field_name.
150 async fn vector_search_field(
151 &self,
152 collection: &str,
153 field_name: &str,
154 query: &[f32],
155 k: usize,
156 filter: Option<&MetadataFilter>,
157 ) -> NodeDbResult<Vec<SearchResult>> {
158 let _ = field_name;
159 self.vector_search(collection, query, k, filter).await
160 }
161
162 // ─── Graph Shortest Path ────────────────────────────────────────
163
164 /// Find the shortest path between two nodes.
165 ///
166 /// Returns the path as a list of node IDs, or None if no path exists
167 /// within `max_depth` hops. Uses bidirectional BFS.
168 async fn graph_shortest_path(
169 &self,
170 from: &NodeId,
171 to: &NodeId,
172 max_depth: u8,
173 edge_filter: Option<&EdgeFilter>,
174 ) -> NodeDbResult<Option<Vec<NodeId>>> {
175 let _ = (from, to, max_depth, edge_filter);
176 Ok(None)
177 }
178
179 // ─── Text Search ────────────────────────────────────────────────
180
181 /// Full-text search with BM25 scoring.
182 ///
183 /// Returns document IDs with relevance scores, ordered by descending score.
184 /// Pass [`TextSearchParams::default()`] for standard OR-mode non-fuzzy search.
185 async fn text_search(
186 &self,
187 collection: &str,
188 query: &str,
189 top_k: usize,
190 params: TextSearchParams,
191 ) -> NodeDbResult<Vec<SearchResult>> {
192 let _ = (collection, query, top_k, params);
193 Ok(Vec::new())
194 }
195
196 // ─── Batch Operations ───────────────────────────────────────────
197
198 /// Batch insert vectors — amortizes CRDT delta export to O(1) per batch.
199 async fn batch_vector_insert(
200 &self,
201 collection: &str,
202 vectors: &[(&str, &[f32])],
203 ) -> NodeDbResult<()> {
204 for &(id, embedding) in vectors {
205 self.vector_insert(collection, id, embedding, None).await?;
206 }
207 Ok(())
208 }
209
210 /// Batch insert graph edges — amortizes CRDT delta export to O(1) per batch.
211 async fn batch_graph_insert_edges(&self, edges: &[(&str, &str, &str)]) -> NodeDbResult<()> {
212 for &(from, to, label) in edges {
213 self.graph_insert_edge(&NodeId::new(from), &NodeId::new(to), label, None)
214 .await?;
215 }
216 Ok(())
217 }
218
219 // ─── SQL Escape Hatch ────────────────────────────────────────────
220
221 /// Execute a raw SQL query with parameters.
222 ///
223 /// On Lite: requires the `sql` feature flag (compiles in DataFusion parser).
224 /// Returns `NodeDbError::SqlNotEnabled` if the feature is not compiled in.
225 /// On Remote: pass-through to Origin via pgwire.
226 ///
227 /// For most AI agent workloads, the typed methods above are sufficient
228 /// and faster. Use this for BI tools, existing ORMs, or ad-hoc queries.
229 async fn execute_sql(&self, query: &str, params: &[Value]) -> NodeDbResult<QueryResult>;
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Stateless mock used to verify that the trait is object-safe, usable as
    /// `Arc<dyn NodeDb>`, and that the provided default methods behave as
    /// documented.
    ///
    /// Only the required methods are implemented, so every provided method
    /// (`vector_insert_field`, `vector_search_field`, `graph_shortest_path`,
    /// `text_search`, and the batch helpers) runs the trait's default body.
    struct MockDb;

    #[async_trait]
    impl NodeDb for MockDb {
        /// Always returns a single canned hit with id `"vec-1"`.
        async fn vector_search(
            &self,
            _collection: &str,
            _query: &[f32],
            _k: usize,
            _filter: Option<&MetadataFilter>,
        ) -> NodeDbResult<Vec<SearchResult>> {
            Ok(vec![SearchResult {
                id: "vec-1".into(),
                node_id: None,
                distance: 0.1,
                metadata: HashMap::new(),
            }])
        }

        async fn vector_insert(
            &self,
            _collection: &str,
            _id: &str,
            _embedding: &[f32],
            _metadata: Option<Document>,
        ) -> NodeDbResult<()> {
            Ok(())
        }

        async fn vector_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        async fn graph_traverse(
            &self,
            _start: &NodeId,
            _depth: u8,
            _edge_filter: Option<&EdgeFilter>,
        ) -> NodeDbResult<SubGraph> {
            Ok(SubGraph::empty())
        }

        /// Builds the edge ID from the endpoints and the edge type.
        async fn graph_insert_edge(
            &self,
            from: &NodeId,
            to: &NodeId,
            edge_type: &str,
            _properties: Option<Document>,
        ) -> NodeDbResult<EdgeId> {
            Ok(EdgeId::from_components(
                from.as_str(),
                to.as_str(),
                edge_type,
            ))
        }

        async fn graph_delete_edge(&self, _edge_id: &EdgeId) -> NodeDbResult<()> {
            Ok(())
        }

        /// Always finds a document with the requested ID and a `title` of `"test"`.
        async fn document_get(
            &self,
            _collection: &str,
            id: &str,
        ) -> NodeDbResult<Option<Document>> {
            let mut doc = Document::new(id);
            doc.set("title", Value::String("test".into()));
            Ok(Some(doc))
        }

        async fn document_put(&self, _collection: &str, _doc: Document) -> NodeDbResult<()> {
            Ok(())
        }

        async fn document_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        async fn execute_sql(&self, _query: &str, _params: &[Value]) -> NodeDbResult<QueryResult> {
            Ok(QueryResult::empty())
        }
    }

    /// Verify the trait is object-safe (can be used as `dyn NodeDb`).
    #[test]
    fn trait_is_object_safe() {
        fn _accepts_dyn(_db: &dyn NodeDb) {}
        let db = MockDb;
        _accepts_dyn(&db);
    }

    /// Verify the trait can be wrapped in `Arc<dyn NodeDb>`.
    #[test]
    fn trait_works_with_arc() {
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);
        // Just verify it compiles — the Arc<dyn> pattern is the primary API.
        let _ = db;
    }

    #[tokio::test]
    async fn mock_vector_search() {
        let db = MockDb;
        let results = db
            .vector_search("embeddings", &[0.1, 0.2, 0.3], 5, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "vec-1");
        assert!(results[0].distance < 1.0);
    }

    #[tokio::test]
    async fn mock_vector_insert_and_delete() {
        let db = MockDb;
        db.vector_insert("coll", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();
        db.vector_delete("coll", "v1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_graph_operations() {
        let db = MockDb;
        let start = NodeId::new("alice");
        let subgraph = db.graph_traverse(&start, 2, None).await.unwrap();
        assert_eq!(subgraph.node_count(), 0);

        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let edge_id = db
            .graph_insert_edge(&from, &to, "KNOWS", None)
            .await
            .unwrap();
        assert_eq!(edge_id.as_str(), "alice--KNOWS-->bob");

        db.graph_delete_edge(&edge_id).await.unwrap();
    }

    #[tokio::test]
    async fn mock_document_operations() {
        let db = MockDb;
        let doc = db.document_get("notes", "n1").await.unwrap().unwrap();
        assert_eq!(doc.id, "n1");
        assert_eq!(doc.get_str("title"), Some("test"));

        let mut new_doc = Document::new("n2");
        new_doc.set("body", Value::String("hello".into()));
        db.document_put("notes", new_doc).await.unwrap();

        db.document_delete("notes", "n1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_execute_sql() {
        let db = MockDb;
        let result = db.execute_sql("SELECT 1", &[]).await.unwrap();
        assert_eq!(result.row_count(), 0);
    }

    /// The default named-field methods drop `field_name` and delegate to the
    /// plain vector methods, so the mock's canned search hit must come back.
    #[tokio::test]
    async fn default_named_vector_fields_delegate() {
        let db = MockDb;
        db.vector_insert_field("docs", "title_embedding", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();

        let results = db
            .vector_search_field("docs", "title_embedding", &[0.1, 0.2], 3, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "vec-1");
    }

    /// The default shortest-path implementation performs no search and
    /// reports that no path was found.
    #[tokio::test]
    async fn default_graph_shortest_path_is_none() {
        let db = MockDb;
        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let path = db.graph_shortest_path(&from, &to, 4, None).await.unwrap();
        assert!(path.is_none());
    }

    /// The default text search returns no hits.
    #[tokio::test]
    async fn default_text_search_is_empty() {
        let db = MockDb;
        let hits = db
            .text_search("notes", "hello", 10, TextSearchParams::default())
            .await
            .unwrap();
        assert!(hits.is_empty());
    }

    /// The default batch helpers loop over the single-item methods; both a
    /// populated and an empty batch must succeed against the mock.
    #[tokio::test]
    async fn default_batch_operations_succeed() {
        let db = MockDb;

        let first: &[f32] = &[1.0, 2.0];
        let second: &[f32] = &[3.0, 4.0];
        db.batch_vector_insert("coll", &[("v1", first), ("v2", second)])
            .await
            .unwrap();
        db.batch_vector_insert("coll", &[]).await.unwrap();

        db.batch_graph_insert_edges(&[("alice", "bob", "KNOWS"), ("bob", "carol", "KNOWS")])
            .await
            .unwrap();
        db.batch_graph_insert_edges(&[]).await.unwrap();
    }

    /// Verify the full "one API, any runtime" pattern from the TDD.
    #[tokio::test]
    async fn unified_api_pattern() {
        // This is the pattern from NodeDB.md:
        //   let db: Arc<dyn NodeDb> = Arc::new(NodeDbLite::open(...));
        // OR
        //   let db: Arc<dyn NodeDb> = Arc::new(NodeDbRemote::connect(...));
        //
        // Application code is identical either way:
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);

        let results = db
            .vector_search("knowledge_base", &[0.1, 0.2], 5, None)
            .await
            .unwrap();
        assert!(!results.is_empty());

        let start = NodeId::new(results[0].id.clone());
        let _subgraph = db.graph_traverse(&start, 2, None).await.unwrap();

        let doc = Document::new("note-1");
        db.document_put("notes", doc).await.unwrap();
    }
}