Skip to main content

nodedb_client/
traits.rs

1//! The `NodeDb` trait: unified query interface for both Origin and Lite.
2//!
3//! Application code writes against this trait once. The runtime determines
4//! whether queries execute locally (in-memory engines on Lite) or remotely
5//! (pgwire to Origin).
6//!
7//! All methods are `async` — on native this runs on Tokio, on WASM this
8//! runs on `wasm-bindgen-futures`.
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use nodedb_types::document::Document;
15use nodedb_types::dropped_collection::DroppedCollection;
16use nodedb_types::error::{NodeDbError, NodeDbResult};
17use nodedb_types::filter::{EdgeFilter, MetadataFilter};
18use nodedb_types::id::{EdgeId, NodeId};
19use nodedb_types::result::{QueryResult, SearchResult, SubGraph};
20use nodedb_types::text_search::TextSearchParams;
21use nodedb_types::value::Value;
22
/// Payload delivered to handlers registered through
/// `NodeDb::on_collection_purged`.
///
/// The event is raised in two situations: on the sync client when Origin
/// pushes a `CollectionPurged` wire message, and on Lite once a local
/// hard-delete has completed. Typical reactions are flushing UI caches or
/// dropping derived indexes.
///
/// Handlers must return promptly and never block — they are dispatched
/// from the sync client's receive loop.
#[derive(Clone, Debug)]
pub struct CollectionPurgedEvent {
    /// Tenant identifier carried by the purge notification.
    pub tenant_id: u32,
    /// Name of the collection that was purged.
    pub name: String,
    /// WAL LSN at which the purge was applied. Compare it with LSNs
    /// observed locally when implementing resume/replay logic.
    pub purge_lsn: u64,
}
38
39/// Handler registered via `NodeDb::on_collection_purged`. Fn-ref
40/// (not FnMut) so the same handler can fire from multiple threads
41/// without interior mutability ceremony at every call site.
42pub type CollectionPurgedHandler = Arc<dyn Fn(CollectionPurgedEvent) + Send + Sync + 'static>;
43
44/// Unified database interface for NodeDB.
45///
46/// Two implementations:
47/// - `NodeDbLite`: executes queries against in-memory HNSW/CSR/Loro engines
48///   on the edge device. Writes produce CRDT deltas synced to Origin in background.
49/// - `NodeDbRemote`: translates trait calls into parameterized SQL and sends
50///   them over pgwire to the Origin cluster.
51///
52/// The developer writes agent logic once. Switching between local and cloud
53/// is a one-line configuration change.
54#[async_trait]
55pub trait NodeDb: Send + Sync {
56    // ─── Vector Operations ───────────────────────────────────────────
57
58    /// Search for the `k` nearest vectors to `query` in `collection`.
59    ///
60    /// Returns results ordered by ascending distance. Optional metadata
61    /// filter constrains which vectors are considered.
62    ///
63    /// On Lite: direct in-memory HNSW search. Sub-millisecond.
64    /// On Remote: translated to `SELECT ... ORDER BY embedding <-> $1 LIMIT $2`.
65    async fn vector_search(
66        &self,
67        collection: &str,
68        query: &[f32],
69        k: usize,
70        filter: Option<&MetadataFilter>,
71    ) -> NodeDbResult<Vec<SearchResult>>;
72
73    /// Insert a vector with optional metadata into `collection`.
74    ///
75    /// On Lite: inserts into in-memory HNSW + emits CRDT delta + persists to SQLite.
76    /// On Remote: translated to `INSERT INTO collection (id, embedding, metadata) VALUES (...)`.
77    async fn vector_insert(
78        &self,
79        collection: &str,
80        id: &str,
81        embedding: &[f32],
82        metadata: Option<Document>,
83    ) -> NodeDbResult<()>;
84
85    /// Delete a vector by ID from `collection`.
86    ///
87    /// On Lite: marks deleted in HNSW + emits CRDT tombstone.
88    /// On Remote: `DELETE FROM collection WHERE id = $1`.
89    async fn vector_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
90
91    // ─── Graph Operations ────────────────────────────────────────────
92
93    /// Traverse the graph from `start` up to `depth` hops.
94    ///
95    /// Returns the discovered subgraph (nodes + edges). Optional edge filter
96    /// constrains which edges are followed during traversal.
97    ///
98    /// On Lite: direct CSR pointer-chasing in contiguous memory. Microseconds.
99    /// On Remote: `SELECT * FROM graph_traverse($1, $2, $3)`.
100    async fn graph_traverse(
101        &self,
102        start: &NodeId,
103        depth: u8,
104        edge_filter: Option<&EdgeFilter>,
105    ) -> NodeDbResult<SubGraph>;
106
107    /// Insert a directed edge from `from` to `to` with the given label.
108    ///
109    /// Returns the generated edge ID.
110    ///
111    /// On Lite: appends to mutable adjacency buffer + CRDT delta + SQLite.
112    /// On Remote: `INSERT INTO edges (src, dst, label, properties) VALUES (...)`.
113    async fn graph_insert_edge(
114        &self,
115        from: &NodeId,
116        to: &NodeId,
117        edge_type: &str,
118        properties: Option<Document>,
119    ) -> NodeDbResult<EdgeId>;
120
121    /// Delete a graph edge by ID.
122    ///
123    /// On Lite: marks deleted + CRDT tombstone.
124    /// On Remote: `DELETE FROM edges WHERE id = $1`.
125    async fn graph_delete_edge(&self, edge_id: &EdgeId) -> NodeDbResult<()>;
126
127    // ─── Document Operations ─────────────────────────────────────────
128
129    /// Get a document by ID from `collection`.
130    ///
131    /// On Lite: direct Loro state read. Sub-millisecond.
132    /// On Remote: `SELECT * FROM collection WHERE id = $1`.
133    async fn document_get(&self, collection: &str, id: &str) -> NodeDbResult<Option<Document>>;
134
135    /// Put (insert or update) a document into `collection`.
136    ///
137    /// The document's `id` field determines the key. If a document with that
138    /// ID already exists, it is overwritten (last-writer-wins locally; CRDT
139    /// merge on sync).
140    ///
141    /// On Lite: Loro apply + CRDT delta + SQLite persist.
142    /// On Remote: `INSERT ... ON CONFLICT (id) DO UPDATE SET ...`.
143    async fn document_put(&self, collection: &str, doc: Document) -> NodeDbResult<()>;
144
145    /// Delete a document by ID from `collection`.
146    ///
147    /// On Lite: Loro delete + CRDT tombstone.
148    /// On Remote: `DELETE FROM collection WHERE id = $1`.
149    async fn document_delete(&self, collection: &str, id: &str) -> NodeDbResult<()>;
150
151    // ─── Named Vector Fields ──────────────────────────────────────────
152
153    /// Insert a vector into a named field within a collection.
154    ///
155    /// Enables multiple embeddings per collection (e.g., "title_embedding",
156    /// "body_embedding") with independent HNSW indexes.
157    /// Default: delegates to `vector_insert()` ignoring field_name.
158    async fn vector_insert_field(
159        &self,
160        collection: &str,
161        field_name: &str,
162        id: &str,
163        embedding: &[f32],
164        metadata: Option<Document>,
165    ) -> NodeDbResult<()> {
166        let _ = field_name;
167        self.vector_insert(collection, id, embedding, metadata)
168            .await
169    }
170
171    /// Search a named vector field.
172    ///
173    /// Default: delegates to `vector_search()` ignoring field_name.
174    async fn vector_search_field(
175        &self,
176        collection: &str,
177        field_name: &str,
178        query: &[f32],
179        k: usize,
180        filter: Option<&MetadataFilter>,
181    ) -> NodeDbResult<Vec<SearchResult>> {
182        let _ = field_name;
183        self.vector_search(collection, query, k, filter).await
184    }
185
186    // ─── Graph Shortest Path ────────────────────────────────────────
187
188    /// Find the shortest path between two nodes.
189    ///
190    /// Returns the path as a list of node IDs, or None if no path exists
191    /// within `max_depth` hops. Uses bidirectional BFS.
192    async fn graph_shortest_path(
193        &self,
194        from: &NodeId,
195        to: &NodeId,
196        max_depth: u8,
197        edge_filter: Option<&EdgeFilter>,
198    ) -> NodeDbResult<Option<Vec<NodeId>>> {
199        let _ = (from, to, max_depth, edge_filter);
200        Ok(None)
201    }
202
203    // ─── Text Search ────────────────────────────────────────────────
204
205    /// Full-text search with BM25 scoring.
206    ///
207    /// Returns document IDs with relevance scores, ordered by descending score.
208    /// Pass [`TextSearchParams::default()`] for standard OR-mode non-fuzzy search.
209    async fn text_search(
210        &self,
211        collection: &str,
212        query: &str,
213        top_k: usize,
214        params: TextSearchParams,
215    ) -> NodeDbResult<Vec<SearchResult>> {
216        let _ = (collection, query, top_k, params);
217        Ok(Vec::new())
218    }
219
220    // ─── Batch Operations ───────────────────────────────────────────
221
222    /// Batch insert vectors — amortizes CRDT delta export to O(1) per batch.
223    async fn batch_vector_insert(
224        &self,
225        collection: &str,
226        vectors: &[(&str, &[f32])],
227    ) -> NodeDbResult<()> {
228        for &(id, embedding) in vectors {
229            self.vector_insert(collection, id, embedding, None).await?;
230        }
231        Ok(())
232    }
233
234    /// Batch insert graph edges — amortizes CRDT delta export to O(1) per batch.
235    async fn batch_graph_insert_edges(&self, edges: &[(&str, &str, &str)]) -> NodeDbResult<()> {
236        for &(from, to, label) in edges {
237            self.graph_insert_edge(&NodeId::new(from), &NodeId::new(to), label, None)
238                .await?;
239        }
240        Ok(())
241    }
242
243    // ─── SQL Escape Hatch ────────────────────────────────────────────
244
245    /// Execute a raw SQL query with parameters.
246    ///
247    /// On Lite: requires the `sql` feature flag (compiles in DataFusion parser).
248    ///   Returns `NodeDbError::SqlNotEnabled` if the feature is not compiled in.
249    /// On Remote: pass-through to Origin via pgwire.
250    ///
251    /// For most AI agent workloads, the typed methods above are sufficient
252    /// and faster. Use this for BI tools, existing ORMs, or ad-hoc queries.
253    async fn execute_sql(&self, query: &str, params: &[Value]) -> NodeDbResult<QueryResult>;
254
255    // ─── Collection Lifecycle (soft-delete / undrop / hard-delete) ───
256
257    /// Restore a soft-deleted collection within its retention window.
258    ///
259    /// Equivalent to `UNDROP COLLECTION <name>`. Fails with 42P01 if
260    /// the retention window has elapsed and the row is gone, or with
261    /// 42501 if the caller is neither preserved owner nor admin.
262    ///
263    /// Default impl routes through `execute_sql` so any implementation
264    /// that can execute SQL inherits the correct behavior for free.
265    async fn undrop_collection(&self, name: &str) -> NodeDbResult<()> {
266        let sql = format!("UNDROP COLLECTION {}", quote_ident(name));
267        self.execute_sql(&sql, &[]).await?;
268        Ok(())
269    }
270
271    /// Hard-delete a collection, skipping soft-delete and retention.
272    ///
273    /// Equivalent to `DROP COLLECTION <name> PURGE`. Admin-only on the
274    /// server; the server rejects non-admin callers with 42501.
275    /// Bypasses the retention safety net — data is unrecoverable.
276    async fn drop_collection_purge(&self, name: &str) -> NodeDbResult<()> {
277        let sql = format!("DROP COLLECTION {} PURGE", quote_ident(name));
278        self.execute_sql(&sql, &[]).await?;
279        Ok(())
280    }
281
282    /// List every soft-deleted collection in the current tenant that
283    /// is still within its retention window.
284    ///
285    /// Equivalent to `SELECT tenant_id, name, owner, deactivated_at_ns,
286    /// retention_expires_at_ns FROM _system.dropped_collections`.
287    /// Returns `Vec<DroppedCollection>` — empty if no soft-deleted rows
288    /// exist for the caller's tenant.
289    async fn list_dropped_collections(&self) -> NodeDbResult<Vec<DroppedCollection>> {
290        let sql = "SELECT tenant_id, name, owner, engine_type, \
291                   deactivated_at_ns, retention_expires_at_ns \
292                   FROM _system.dropped_collections";
293        let result = self.execute_sql(sql, &[]).await?;
294        parse_dropped_collection_rows(&result)
295    }
296
297    /// Register a handler fired when a collection the caller has
298    /// synced is purged on Origin and the local copy is removed.
299    ///
300    /// Default impl returns `NodeDbError::storage` with a
301    /// `"not supported"` detail — implementations that maintain a
302    /// sync client (Lite, any future push-capable remote client)
303    /// override with registration into their internal handler list.
304    /// Stateless clients (pgwire-only `NodeDbRemote`) have nothing
305    /// to push, so the default rejection is the correct behavior.
306    async fn on_collection_purged(&self, _handler: CollectionPurgedHandler) -> NodeDbResult<()> {
307        Err(NodeDbError::storage(
308            "on_collection_purged is not supported on this client — \
309             requires a push-capable sync connection (NodeDbLite or a \
310             sync-enabled remote client)",
311        ))
312    }
313}
314
/// Render `name` as a SQL identifier, quoting it only when required.
///
/// Follows the same rule as the pgwire-side
/// `remote_parse::quote_identifier`: a name made solely of
/// `[A-Za-z0-9_]` that does not begin with a digit is returned verbatim
/// (the cheap, common case). Anything else — including the empty string —
/// is wrapped in double quotes, with every embedded `"` doubled.
fn quote_ident(name: &str) -> String {
    let is_bare_char = |c: char| c.is_ascii_alphanumeric() || c == '_';
    let can_stay_bare = match name.chars().next() {
        Some(first) => !first.is_ascii_digit() && name.chars().all(is_bare_char),
        // An empty name cannot be written without quotes.
        None => false,
    };
    if can_stay_bare {
        return name.to_string();
    }

    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Escape an embedded quote by doubling it.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
330
331/// Decode `_system.dropped_collections` rows into
332/// `Vec<DroppedCollection>`. Each row is a `Vec<Value>` aligned with
333/// the column order declared in the SELECT list above.
334fn parse_dropped_collection_rows(result: &QueryResult) -> NodeDbResult<Vec<DroppedCollection>> {
335    let mut out = Vec::with_capacity(result.rows.len());
336    for row in &result.rows {
337        if row.len() < 6 {
338            return Err(NodeDbError::storage(format!(
339                "dropped_collections row has {} columns; expected 6 \
340                 (tenant_id, name, owner, engine_type, deactivated_at_ns, \
341                 retention_expires_at_ns)",
342                row.len()
343            )));
344        }
345        out.push(DroppedCollection {
346            tenant_id: value_as_u32(&row[0])?,
347            name: value_as_string(&row[1])?,
348            owner: value_as_string(&row[2])?,
349            engine_type: value_as_string(&row[3])?,
350            deactivated_at_ns: value_as_u64(&row[4])?,
351            retention_expires_at_ns: value_as_u64(&row[5])?,
352        });
353    }
354    Ok(out)
355}
356
357fn value_as_u32(v: &Value) -> NodeDbResult<u32> {
358    match v {
359        Value::Integer(i) => Ok(*i as u32),
360        Value::String(s) => s
361            .parse::<u32>()
362            .map_err(|e| NodeDbError::storage(format!("parse u32 from '{s}': {e}"))),
363        _ => Err(NodeDbError::storage(format!(
364            "expected integer for u32 column, got {v:?}"
365        ))),
366    }
367}
368
369fn value_as_u64(v: &Value) -> NodeDbResult<u64> {
370    match v {
371        Value::Integer(i) => Ok(*i as u64),
372        Value::String(s) => s
373            .parse::<u64>()
374            .map_err(|e| NodeDbError::storage(format!("parse u64 from '{s}': {e}"))),
375        _ => Err(NodeDbError::storage(format!(
376            "expected integer for u64 column, got {v:?}"
377        ))),
378    }
379}
380
381fn value_as_string(v: &Value) -> NodeDbResult<String> {
382    match v {
383        Value::String(s) => Ok(s.clone()),
384        Value::Null => Ok(String::new()),
385        other => Err(NodeDbError::storage(format!(
386            "expected string column, got {other:?}"
387        ))),
388    }
389}
390
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Mock implementation to verify the trait is object-safe and
    /// can be used as `Arc<dyn NodeDb>`.
    ///
    /// Only the required methods are implemented, so every default method
    /// of the trait can be exercised through this type as well.
    struct MockDb;

    #[async_trait]
    impl NodeDb for MockDb {
        /// Always returns a single hit, `vec-1`, at distance 0.1.
        async fn vector_search(
            &self,
            _collection: &str,
            _query: &[f32],
            _k: usize,
            _filter: Option<&MetadataFilter>,
        ) -> NodeDbResult<Vec<SearchResult>> {
            Ok(vec![SearchResult {
                id: "vec-1".into(),
                node_id: None,
                distance: 0.1,
                metadata: HashMap::new(),
            }])
        }

        async fn vector_insert(
            &self,
            _collection: &str,
            _id: &str,
            _embedding: &[f32],
            _metadata: Option<Document>,
        ) -> NodeDbResult<()> {
            Ok(())
        }

        async fn vector_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        async fn graph_traverse(
            &self,
            _start: &NodeId,
            _depth: u8,
            _edge_filter: Option<&EdgeFilter>,
        ) -> NodeDbResult<SubGraph> {
            Ok(SubGraph::empty())
        }

        /// Builds the edge ID from its endpoints and label without storing anything.
        async fn graph_insert_edge(
            &self,
            from: &NodeId,
            to: &NodeId,
            edge_type: &str,
            _properties: Option<Document>,
        ) -> NodeDbResult<EdgeId> {
            Ok(EdgeId::from_components(
                from.as_str(),
                to.as_str(),
                edge_type,
            ))
        }

        async fn graph_delete_edge(&self, _edge_id: &EdgeId) -> NodeDbResult<()> {
            Ok(())
        }

        /// Always returns a document with the requested ID and `title = "test"`.
        async fn document_get(
            &self,
            _collection: &str,
            id: &str,
        ) -> NodeDbResult<Option<Document>> {
            let mut doc = Document::new(id);
            doc.set("title", Value::String("test".into()));
            Ok(Some(doc))
        }

        async fn document_put(&self, _collection: &str, _doc: Document) -> NodeDbResult<()> {
            Ok(())
        }

        async fn document_delete(&self, _collection: &str, _id: &str) -> NodeDbResult<()> {
            Ok(())
        }

        /// Accepts any statement and returns an empty result.
        async fn execute_sql(&self, _query: &str, _params: &[Value]) -> NodeDbResult<QueryResult> {
            Ok(QueryResult::empty())
        }
    }

    /// Verify the trait is object-safe (can be used as `dyn NodeDb`).
    #[test]
    fn trait_is_object_safe() {
        fn _accepts_dyn(_db: &dyn NodeDb) {}
        let db = MockDb;
        _accepts_dyn(&db);
    }

    /// Verify the trait can be wrapped in `Arc<dyn NodeDb>`.
    #[test]
    fn trait_works_with_arc() {
        use std::sync::Arc;
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);
        // Just verify it compiles — the Arc<dyn> pattern is the primary API.
        let _ = db;
    }

    #[tokio::test]
    async fn mock_vector_search() {
        let db = MockDb;
        let results = db
            .vector_search("embeddings", &[0.1, 0.2, 0.3], 5, None)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, "vec-1");
        assert!(results[0].distance < 1.0);
    }

    #[tokio::test]
    async fn mock_vector_insert_and_delete() {
        let db = MockDb;
        db.vector_insert("coll", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();
        db.vector_delete("coll", "v1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_graph_operations() {
        let db = MockDb;
        let start = NodeId::new("alice");
        let subgraph = db.graph_traverse(&start, 2, None).await.unwrap();
        assert_eq!(subgraph.node_count(), 0);

        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let edge_id = db
            .graph_insert_edge(&from, &to, "KNOWS", None)
            .await
            .unwrap();
        assert_eq!(edge_id.as_str(), "alice--KNOWS-->bob");

        db.graph_delete_edge(&edge_id).await.unwrap();
    }

    #[tokio::test]
    async fn mock_document_operations() {
        let db = MockDb;
        let doc = db.document_get("notes", "n1").await.unwrap().unwrap();
        assert_eq!(doc.id, "n1");
        assert_eq!(doc.get_str("title"), Some("test"));

        let mut new_doc = Document::new("n2");
        new_doc.set("body", Value::String("hello".into()));
        db.document_put("notes", new_doc).await.unwrap();

        db.document_delete("notes", "n1").await.unwrap();
    }

    #[tokio::test]
    async fn mock_execute_sql() {
        let db = MockDb;
        let result = db.execute_sql("SELECT 1", &[]).await.unwrap();
        assert_eq!(result.row_count(), 0);
    }

    /// Verify the full "one API, any runtime" pattern from the TDD.
    #[tokio::test]
    async fn unified_api_pattern() {
        use std::sync::Arc;

        // This is the pattern from NodeDB.md:
        // let db: Arc<dyn NodeDb> = Arc::new(NodeDbLite::open(...));
        //   OR
        // let db: Arc<dyn NodeDb> = Arc::new(NodeDbRemote::connect(...));
        //
        // Application code is identical either way:
        let db: Arc<dyn NodeDb> = Arc::new(MockDb);

        let results = db
            .vector_search("knowledge_base", &[0.1, 0.2], 5, None)
            .await
            .unwrap();
        assert!(!results.is_empty());

        let start = NodeId::new(results[0].id.clone());
        let _subgraph = db.graph_traverse(&start, 2, None).await.unwrap();

        let doc = Document::new("note-1");
        db.document_put("notes", doc).await.unwrap();
    }

    // ─── Identifier quoting ──────────────────────────────────────────

    /// Names made of `[A-Za-z0-9_]` that do not start with a digit pass
    /// through unquoted.
    #[test]
    fn quote_ident_leaves_plain_names_bare() {
        assert_eq!(quote_ident("users"), "users");
        assert_eq!(quote_ident("_tmp_9"), "_tmp_9");
        assert_eq!(quote_ident("MixedCase"), "MixedCase");
    }

    /// Everything else is wrapped in double quotes with embedded quotes doubled.
    #[test]
    fn quote_ident_quotes_and_escapes() {
        assert_eq!(quote_ident(""), "\"\"");
        assert_eq!(quote_ident("1abc"), "\"1abc\"");
        assert_eq!(quote_ident("my table"), "\"my table\"");
        assert_eq!(quote_ident("a\"b"), "\"a\"\"b\"");
        assert_eq!(quote_ident("x; DROP TABLE y"), "\"x; DROP TABLE y\"");
    }

    // ─── Default trait methods ───────────────────────────────────────

    /// The named-field defaults forward to the plain vector methods.
    #[tokio::test]
    async fn default_named_field_methods_delegate() {
        let db = MockDb;
        db.vector_insert_field("coll", "title_embedding", "v1", &[1.0, 2.0], None)
            .await
            .unwrap();

        let hits = db
            .vector_search_field("coll", "title_embedding", &[0.1, 0.2], 3, None)
            .await
            .unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].id, "vec-1");
    }

    /// Without an override, shortest-path finds nothing and text search is empty.
    #[tokio::test]
    async fn default_shortest_path_and_text_search_are_empty() {
        let db = MockDb;
        let from = NodeId::new("alice");
        let to = NodeId::new("bob");
        let path = db.graph_shortest_path(&from, &to, 4, None).await.unwrap();
        assert!(path.is_none());

        let hits = db
            .text_search("notes", "hello", 10, TextSearchParams::default())
            .await
            .unwrap();
        assert!(hits.is_empty());
    }

    /// The batch defaults loop over the single-item methods, including for
    /// an empty batch.
    #[tokio::test]
    async fn default_batch_inserts_succeed() {
        let db = MockDb;
        let embedding = [0.1_f32, 0.2];
        let vectors: Vec<(&str, &[f32])> = vec![("v1", &embedding[..]), ("v2", &embedding[..])];
        db.batch_vector_insert("coll", &vectors).await.unwrap();
        db.batch_vector_insert("coll", &[]).await.unwrap();

        db.batch_graph_insert_edges(&[("alice", "bob", "KNOWS"), ("bob", "carol", "KNOWS")])
            .await
            .unwrap();
        db.batch_graph_insert_edges(&[]).await.unwrap();
    }

    /// The lifecycle defaults succeed whenever `execute_sql` succeeds.
    #[tokio::test]
    async fn default_collection_lifecycle_routes_through_sql() {
        let db = MockDb;
        db.undrop_collection("notes").await.unwrap();
        db.drop_collection_purge("odd name").await.unwrap();
    }

    /// Clients that do not override `on_collection_purged` reject registration.
    #[tokio::test]
    async fn default_on_collection_purged_is_rejected() {
        use std::sync::Arc;

        let db = MockDb;
        let handler: CollectionPurgedHandler = Arc::new(|_event: CollectionPurgedEvent| {});
        assert!(db.on_collection_purged(handler).await.is_err());
    }
}