Skip to main content

nodedb_client/
traits.rs

1//! The `NodeDb` trait: unified query interface for both Origin and Lite.
2//!
3//! Application code writes against this trait once. The runtime determines
4//! whether queries execute locally (in-memory engines on Lite) or remotely
5//! (pgwire to Origin).
6//!
7//! All methods are `async` — on native this runs on Tokio, on WASM this
8//! runs on `wasm-bindgen-futures`.
9
10use async_trait::async_trait;
11
12use nodedb_types::document::Document;
13use nodedb_types::error::NodeDbResult;
14use nodedb_types::filter::{EdgeFilter, MetadataFilter};
15use nodedb_types::id::{EdgeId, NodeId};
16use nodedb_types::result::{QueryResult, SearchResult, SubGraph};
17use nodedb_types::text_search::TextSearchParams;
18use nodedb_types::value::Value;
19
20/// Unified database interface for NodeDB.
21///
22/// Two implementations:
23/// - `NodeDbLite`: executes queries against in-memory HNSW/CSR/Loro engines
24///   on the edge device. Writes produce CRDT deltas synced to Origin in background.
25/// - `NodeDbRemote`: translates trait calls into parameterized SQL and sends
26///   them over pgwire to the Origin cluster.
27///
28/// The developer writes agent logic once. Switching between local and cloud
29/// is a one-line configuration change.
30#[async_trait]
31pub trait NodeDb: Send + Sync {
32    // ─── Vector Operations ───────────────────────────────────────────
33
34    /// Search for the `k` nearest vectors to `query` in `collection`.
35    ///
36    /// Returns results ordered by ascending distance. Optional metadata
37    /// filter constrains which vectors are considered.
38    ///
39    /// On Lite: direct in-memory HNSW search. Sub-millisecond.
40    /// On Remote: translated to `SELECT ... ORDER BY embedding <-> $1 LIMIT $2`.
41    async fn vector_search(
42        &self,
43        collection: &str,
44        query: &[f32],
45        k: usize,
46        filter: Option<&MetadataFilter>,
47    ) -> NodeDbResult<Vec<SearchResult>>;
48
49    /// Insert a vector with optional metadata into `collection`.
50    ///
51    /// On Lite: inserts into in-memory HNSW + emits CRDT delta + persists to SQLite.
52    /// On Remote: translated to `INSERT INTO collection (id, embedding, metadata) VALUES (...)`.
53    async fn vector_insert(
54        &self,
55        collection: &str,
56        id: &str,
57        embedding: &[f32],
58        metadata: Option<Document>,
59    ) -> NodeDbResult<()>;
60
61    /// Delete a vector by ID from `collection`.
62    ///
63    /// On Lite: marks deleted in HNSW + emits CRDT tombstone.
64    /// On Remote: `DELETE FROM collection WHERE id = $1`.
65    async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
66
67    // ─── Graph Operations ────────────────────────────────────────────
68
69    /// Traverse the graph from `start` up to `depth` hops.
70    ///
71    /// Returns the discovered subgraph (nodes + edges). Optional edge filter
72    /// constrains which edges are followed during traversal.
73    ///
74    /// On Lite: direct CSR pointer-chasing in contiguous memory. Microseconds.
75    /// On Remote: `SELECT * FROM graph_traverse($1, $2, $3)`.
76    async fn graph_traverse(
77        &self,
78        start: &NodeId,
79        depth: u8,
80        edge_filter: Option<&EdgeFilter>,
81    ) -> NodeDbResult<SubGraph>;
82
83    /// Insert a directed edge from `from` to `to` with the given label.
84    ///
85    /// Returns the generated edge ID.
86    ///
87    /// On Lite: appends to mutable adjacency buffer + CRDT delta + SQLite.
88    /// On Remote: `INSERT INTO edges (src, dst, label, properties) VALUES (...)`.
89    async fn graph_insert_edge(
90        &self,
91        from: &NodeId,
92        to: &NodeId,
93        edge_type: &str,
94        properties: Option<Document>,
95    ) -> NodeDbResult<EdgeId>;
96
97    /// Delete a graph edge by ID.
98    ///
99    /// On Lite: marks deleted + CRDT tombstone.
100    /// On Remote: `DELETE FROM edges WHERE id = $1`.
101    async fn graph_delete_edge(&self, edge_id: &EdgeId) -> NodeDbResult<()>;
102
103    // ─── Document Operations ─────────────────────────────────────────
104
105    /// Get a document by ID from `collection`.
106    ///
107    /// On Lite: direct Loro state read. Sub-millisecond.
108    /// On Remote: `SELECT * FROM collection WHERE id = $1`.
109    async fn document_get(&self, collection: &str, id: &str) -> NodeDbResult<Option<Document>>;
110
111    /// Put (insert or update) a document into `collection`.
112    ///
113    /// The document's `id` field determines the key. If a document with that
114    /// ID already exists, it is overwritten (last-writer-wins locally; CRDT
115    /// merge on sync).
116    ///
117    /// On Lite: Loro apply + CRDT delta + SQLite persist.
118    /// On Remote: `INSERT ... ON CONFLICT (id) DO UPDATE SET ...`.
119    async fn document_put(&self, collection: &str, doc: Document) -> NodeDbResult<()>;
120
121    /// Delete a document by ID from `collection`.
122    ///
123    /// On Lite: Loro delete + CRDT tombstone.
124    /// On Remote: `DELETE FROM collection WHERE id = $1`.
125    async fn document_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
126
127    // ─── Named Vector Fields ──────────────────────────────────────────
128
129    /// Insert a vector into a named field within a collection.
130    ///
131    /// Enables multiple embeddings per collection (e.g., "title_embedding",
132    /// "body_embedding") with independent HNSW indexes.
133    /// Default: delegates to `vector_insert()` ignoring field_name.
134    async fn vector_insert_field(
135        &self,
136        collection: &str,
137        field_name: &str,
138        id: &str,
139        embedding: &[f32],
140        metadata: Option<Document>,
141    ) -> NodeDbResult<()> {
142        let _ = field_name;
143        self.vector_insert(collection, id, embedding, metadata)
144            .await
145    }
146
147    /// Search a named vector field.
148    ///
149    /// Default: delegates to `vector_search()` ignoring field_name.
150    async fn vector_search_field(
151        &self,
152        collection: &str,
153        field_name: &str,
154        query: &[f32],
155        k: usize,
156        filter: Option<&MetadataFilter>,
157    ) -> NodeDbResult<Vec<SearchResult>> {
158        let _ = field_name;
159        self.vector_search(collection, query, k, filter).await
160    }
161
162    // ─── Graph Shortest Path ────────────────────────────────────────
163
164    /// Find the shortest path between two nodes.
165    ///
166    /// Returns the path as a list of node IDs, or None if no path exists
167    /// within `max_depth` hops. Uses bidirectional BFS.
168    async fn graph_shortest_path(
169        &self,
170        from: &NodeId,
171        to: &NodeId,
172        max_depth: u8,
173        edge_filter: Option<&EdgeFilter>,
174    ) -> NodeDbResult<Option<Vec<NodeId>>> {
175        let _ = (from, to, max_depth, edge_filter);
176        Ok(None)
177    }
178
179    // ─── Text Search ────────────────────────────────────────────────
180
181    /// Full-text search with BM25 scoring.
182    ///
183    /// Returns document IDs with relevance scores, ordered by descending score.
184    /// Pass [`TextSearchParams::default()`] for standard OR-mode non-fuzzy search.
185    async fn text_search(
186        &self,
187        collection: &str,
188        query: &str,
189        top_k: usize,
190        params: TextSearchParams,
191    ) -> NodeDbResult<Vec<SearchResult>> {
192        let _ = (collection, query, top_k, params);
193        Ok(Vec::new())
194    }
195
196    // ─── Batch Operations ───────────────────────────────────────────
197
198    /// Batch insert vectors — amortizes CRDT delta export to O(1) per batch.
199    async fn batch_vector_insert(
200        &self,
201        collection: &str,
202        vectors: &[(&str, &[f32])],
203    ) -> NodeDbResult<()> {
204        for &(id, embedding) in vectors {
205            self.vector_insert(collection, id, embedding, None).await?;
206        }
207        Ok(())
208    }
209
210    /// Batch insert graph edges — amortizes CRDT delta export to O(1) per batch.
211    async fn batch_graph_insert_edges(&self, edges: &[(&str, &str, &str)]) -> NodeDbResult<()> {
212        for &(from, to, label) in edges {
213            self.graph_insert_edge(&NodeId::new(from), &NodeId::new(to), label, None)
214                .await?;
215        }
216        Ok(())
217    }
218
219    // ─── SQL Escape Hatch ────────────────────────────────────────────
220
221    /// Execute a raw SQL query with parameters.
222    ///
223    /// On Lite: requires the `sql` feature flag (compiles in DataFusion parser).
224    ///   Returns `NodeDbError::SqlNotEnabled` if the feature is not compiled in.
225    /// On Remote: pass-through to Origin via pgwire.
226    ///
227    /// For most AI agent workloads, the typed methods above are sufficient
228    /// and faster. Use this for BI tools, existing ORMs, or ad-hoc queries.
229    async fn execute_sql(&self, query: &str, params: &[Value]) -> NodeDbResult<QueryResult>;
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Mock implementation to verify the trait is object-safe and
    /// can be used as `Arc<dyn NodeDb>`.
    ///
    /// Only the required methods are implemented, so every provided
    /// (default) method of `NodeDb` runs its fallback body against this mock.
    struct MockDb;

    #[async_trait]
    impl NodeDb for MockDb {
        /// Always returns a single canned hit with id `"vec-1"`.
        async fn vector_search(
            &self,
            _collection: &str,
            _query: &[f32],
            _k: usize,
            _filter: Option<&MetadataFilter>,
        ) -> NodeDbResult<Vec<SearchResult>> {
            Ok(vec![SearchResult {
                id: "vec-1".into(),
                node_id: None,
                distance: 0.1,
                metadata: HashMap::new(),
            }])
        }

        /// Accepts any insert and stores nothing.
        async fn vector_insert(
            &self,
            _collection: &str,
            _id: &str,
            _embedding: &[f32],
            _metadata: Option<Document>,
        ) -> NodeDbResult<()> {
            Ok(())
        }

        /// Accepts any delete.
        async fn vector_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        /// Always returns an empty subgraph.
        async fn graph_traverse(
            &self,
            _start: &NodeId,
            _depth: u8,
            _edge_filter: Option<&EdgeFilter>,
        ) -> NodeDbResult<SubGraph> {
            Ok(SubGraph::empty())
        }

        /// Derives the edge ID from its endpoints and type; stores nothing.
        async fn graph_insert_edge(
            &self,
            from: &NodeId,
            to: &NodeId,
            edge_type: &str,
            _properties: Option<Document>,
        ) -> NodeDbResult<EdgeId> {
            Ok(EdgeId::from_components(
                from.as_str(),
                to.as_str(),
                edge_type,
            ))
        }

        /// Accepts any delete.
        async fn graph_delete_edge(&self, _edge_id: &EdgeId) -> NodeDbResult<()> {
            Ok(())
        }

        /// Always finds a document: echoes `id` and sets `title = "test"`.
        async fn document_get(
            &self,
            _collection: &str,
            id: &str,
        ) -> NodeDbResult<Option<Document>> {
            let mut doc = Document::new(id);
            doc.set("title", Value::String("test".into()));
            Ok(Some(doc))
        }

        /// Accepts any document and stores nothing.
        async fn document_put(&self, _collection: &str, _doc: Document) -> NodeDbResult<()> {
            Ok(())
        }

        /// Accepts any delete.
        async fn document_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        /// Always returns an empty result set.
        async fn execute_sql(&self, _query: &str, _params: &[Value]) -> NodeDbResult<QueryResult> {
            Ok(QueryResult::empty())
        }
    }

    /// Verify the trait is object-safe (can be used as `dyn NodeDb`).
    #[test]
    fn trait_is_object_safe() {
        fn _accepts_dyn(_db: &dyn NodeDb) {}
        let db = MockDb;
        _accepts_dyn(&db);
    }

    /// Verify the trait can be wrapped in `Arc<dyn NodeDb>`.
    #[test]
    fn trait_works_with_arc() {
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);
        // Just verify it compiles — the Arc<dyn> pattern is the primary API.
        let _ = db;
    }

    #[tokio::test]
    async fn mock_vector_search() {
        let db = MockDb;
        let results = db
            .vector_search("embeddings", &[0.1, 0.2, 0.3], 5, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "vec-1");
        assert!(results[0].distance < 1.0);
    }

    #[tokio::test]
    async fn mock_vector_insert_and_delete() {
        let db = MockDb;
        db.vector_insert("coll", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();
        db.vector_delete("coll", "v1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_graph_operations() {
        let db = MockDb;
        let start = NodeId::new("alice");
        let subgraph = db.graph_traverse(&start, 2, None).await.unwrap();
        assert_eq!(subgraph.node_count(), 0);

        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let edge_id = db
            .graph_insert_edge(&from, &to, "KNOWS", None)
            .await
            .unwrap();
        assert_eq!(edge_id.as_str(), "alice--KNOWS-->bob");

        db.graph_delete_edge(&edge_id).await.unwrap();
    }

    #[tokio::test]
    async fn mock_document_operations() {
        let db = MockDb;
        let doc = db.document_get("notes", "n1").await.unwrap().unwrap();
        assert_eq!(doc.id, "n1");
        assert_eq!(doc.get_str("title"), Some("test"));

        let mut new_doc = Document::new("n2");
        new_doc.set("body", Value::String("hello".into()));
        db.document_put("notes", new_doc).await.unwrap();

        db.document_delete("notes", "n1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_execute_sql() {
        let db = MockDb;
        let result = db.execute_sql("SELECT 1", &[]).await.unwrap();
        assert_eq!(result.row_count(), 0);
    }

    // ─── Provided (default) methods ──────────────────────────────────
    //
    // MockDb overrides none of these, so the tests below pin the fallback
    // behavior every backend inherits unless it overrides the method.

    /// The named-field defaults ignore `field_name` and forward to the
    /// plain vector methods.
    #[tokio::test]
    async fn default_field_methods_delegate_to_plain_vector_methods() {
        let db = MockDb;
        db.vector_insert_field("coll", "title_embedding", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();

        let results = db
            .vector_search_field("coll", "title_embedding", &[1.0, 2.0], 3, None)
            .await
            .unwrap();
        // Same canned hit that `vector_search` returns.
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "vec-1");
    }

    /// The shortest-path default performs no search and reports `None`.
    #[tokio::test]
    async fn default_shortest_path_returns_none() {
        let db = MockDb;
        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let path = db.graph_shortest_path(&from, &to, 4, None).await.unwrap();
        assert!(path.is_none());
    }

    /// The text-search default performs no search and returns no hits.
    #[tokio::test]
    async fn default_text_search_returns_empty() {
        let db = MockDb;
        let hits = db
            .text_search("notes", "hello world", 10, TextSearchParams::default())
            .await
            .unwrap();
        assert!(hits.is_empty());
    }

    /// The batch defaults loop over the single-item methods and succeed
    /// when every item succeeds; empty batches are a no-op.
    #[tokio::test]
    async fn default_batch_methods_succeed() {
        let db = MockDb;

        let first = [1.0_f32, 2.0];
        let second = [3.0_f32, 4.0];
        let vectors: [(&str, &[f32]); 2] = [("v1", &first[..]), ("v2", &second[..])];
        db.batch_vector_insert("coll", &vectors).await.unwrap();
        db.batch_vector_insert("coll", &[]).await.unwrap();

        db.batch_graph_insert_edges(&[("alice", "bob", "KNOWS"), ("bob", "carol", "KNOWS")])
            .await
            .unwrap();
        db.batch_graph_insert_edges(&[]).await.unwrap();
    }

    /// Provided methods must also be callable through `Arc<dyn NodeDb>`.
    #[tokio::test]
    async fn default_methods_work_through_dyn() {
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);
        let results = db
            .vector_search_field("coll", "body_embedding", &[0.5], 1, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }

    /// Verify the full "one API, any runtime" pattern from the TDD.
    #[tokio::test]
    async fn unified_api_pattern() {
        // This is the pattern from NodeDB.md:
        // let db: Arc<dyn NodeDb> = Arc::new(NodeDbLite::open(...));
        //   OR
        // let db: Arc<dyn NodeDb> = Arc::new(NodeDbRemote::connect(...));
        //
        // Application code is identical either way:
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);

        let results = db
            .vector_search("knowledge_base", &[0.1, 0.2], 5, None)
            .await
            .unwrap();
        assert!(!results.is_empty());

        let start = NodeId::new(results[0].id.clone());
        let _subgraph = db.graph_traverse(&start, 2, None).await.unwrap();

        let doc = Document::new("note-1");
        db.document_put("notes", doc).await.unwrap();
    }
}