Skip to main content

meshdb_executor/
writer.rs

1use crate::error::Result;
2use meshdb_core::{Edge, EdgeId, Node, NodeId};
3use meshdb_storage::{
4    ConstraintScope, PropertyConstraintKind, PropertyConstraintSpec, StorageEngine,
5};
6
7/// `(label, property)` pair identifying a single-property node point
8/// / spatial index. Always length-1 on `property` side for now —
9/// composite spatial indexes are a separate design and get their own
10/// spec shape if they ship.
11pub type PointIndexSpec = (String, String);
12
13/// `(label, properties)` pair identifying a node property index.
14/// `properties` is a `Vec<String>` so composite indexes round-trip
15/// through the reader/writer boundary without truncating —
16/// previously this was `(String, String)` and `SHOW INDEXES`
17/// silently dropped everything past the first property.
18pub type NodeIndexSpec = (String, Vec<String>);
19
20/// `(edge_type, properties)` pair identifying an edge property
21/// index. Relationship-scope analogue of [`NodeIndexSpec`].
22pub type EdgeIndexSpec = (String, Vec<String>);
23
24/// Sink for mutating graph operations produced by the executor. Isolates
25/// write-side concerns from read-side traversal so we can plug in either a
26/// direct-to-storage writer (single-node mode) or a Raft-backed writer that
27/// proposes each mutation through consensus (cluster mode).
28///
29/// Methods are sync because the executor's iterator model is sync.
30/// Async-backed implementations (e.g. the Raft writer) bridge via
31/// `Handle::block_on`; callers must run the executor inside
32/// `spawn_blocking` so they don't stall the tokio runtime.
33pub trait GraphWriter {
34    fn put_node(&self, node: &Node) -> Result<()>;
35    fn put_edge(&self, edge: &Edge) -> Result<()>;
36    fn delete_edge(&self, id: EdgeId) -> Result<()>;
37    fn detach_delete_node(&self, id: NodeId) -> Result<()>;
38
39    /// Declare a new property index. `properties` is a slice so the
40    /// composite form (`CREATE INDEX FOR (n:L) ON (n.a, n.b)`) fits
41    /// the same surface as single-property. Default impl errors so
42    /// remote writers that don't yet support cluster-aware DDL
43    /// surface the limitation immediately; storage-backed writers
44    /// override via the blanket impl.
45    fn create_property_index(&self, _label: &str, _properties: &[String]) -> Result<()> {
46        Err(crate::error::Error::Unsupported(
47            "property-index DDL is not supported by this writer".into(),
48        ))
49    }
50
51    /// Tear down a property index. Mirrors [`Self::create_property_index`].
52    fn drop_property_index(&self, _label: &str, _properties: &[String]) -> Result<()> {
53        Err(crate::error::Error::Unsupported(
54            "property-index DDL is not supported by this writer".into(),
55        ))
56    }
57
58    /// Snapshot the currently-registered property indexes as
59    /// `(label, property)` pairs for `SHOW INDEXES`. Default impl
60    /// returns an empty list — remote writers will wire real
61    /// fan-out in Phase C.
62    fn list_property_indexes(&self) -> Result<Vec<NodeIndexSpec>> {
63        Ok(Vec::new())
64    }
65
66    /// Relationship-scope analogue of
67    /// [`Self::create_property_index`].
68    fn create_edge_property_index(&self, _edge_type: &str, _properties: &[String]) -> Result<()> {
69        Err(crate::error::Error::Unsupported(
70            "edge-property-index DDL is not supported by this writer".into(),
71        ))
72    }
73
74    /// Tear down an edge property index. Mirrors
75    /// [`Self::create_edge_property_index`].
76    fn drop_edge_property_index(&self, _edge_type: &str, _properties: &[String]) -> Result<()> {
77        Err(crate::error::Error::Unsupported(
78            "edge-property-index DDL is not supported by this writer".into(),
79        ))
80    }
81
82    /// Snapshot the currently-registered edge property indexes as
83    /// `(edge_type, property)` pairs. Default impl returns an empty
84    /// list.
85    fn list_edge_property_indexes(&self) -> Result<Vec<EdgeIndexSpec>> {
86        Ok(Vec::new())
87    }
88
89    /// Declare a point / spatial index on `(label, property)`.
90    /// Default impl errors — remote writers opt in via the blanket
91    /// `StorageEngine` impl or a cluster-aware override.
92    fn create_point_index(&self, _label: &str, _property: &str) -> Result<()> {
93        Err(crate::error::Error::Unsupported(
94            "point-index DDL is not supported by this writer".into(),
95        ))
96    }
97
98    /// Tear down a point index. Mirrors
99    /// [`Self::create_point_index`].
100    fn drop_point_index(&self, _label: &str, _property: &str) -> Result<()> {
101        Err(crate::error::Error::Unsupported(
102            "point-index DDL is not supported by this writer".into(),
103        ))
104    }
105
106    /// Snapshot the currently-registered point indexes as
107    /// `(label, property)` pairs. Default impl returns empty.
108    fn list_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
109        Ok(Vec::new())
110    }
111
112    /// Relationship-scope analogue of
113    /// [`Self::create_point_index`].
114    fn create_edge_point_index(&self, _edge_type: &str, _property: &str) -> Result<()> {
115        Err(crate::error::Error::Unsupported(
116            "edge-point-index DDL is not supported by this writer".into(),
117        ))
118    }
119
120    /// Tear down an edge point index. Mirrors
121    /// [`Self::create_edge_point_index`].
122    fn drop_edge_point_index(&self, _edge_type: &str, _property: &str) -> Result<()> {
123        Err(crate::error::Error::Unsupported(
124            "edge-point-index DDL is not supported by this writer".into(),
125        ))
126    }
127
128    /// Snapshot the currently-registered edge point indexes as
129    /// `(edge_type, property)` pairs.
130    fn list_edge_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
131        Ok(Vec::new())
132    }
133
134    /// Declare a new property constraint. Default impl errors so
135    /// remote writers that haven't plumbed constraint DDL yet surface
136    /// the limitation immediately — storage-backed writers override
137    /// via the blanket impl. `properties` is a list to accommodate
138    /// composite kinds (`NodeKey`); single-property kinds pass a
139    /// one-element slice.
140    fn create_property_constraint(
141        &self,
142        _name: Option<&str>,
143        _scope: &ConstraintScope,
144        _properties: &[String],
145        _kind: PropertyConstraintKind,
146        _if_not_exists: bool,
147    ) -> Result<PropertyConstraintSpec> {
148        Err(crate::error::Error::Unsupported(
149            "constraint DDL is not supported by this writer".into(),
150        ))
151    }
152
153    /// Tear down a constraint by name. Mirrors
154    /// [`Self::create_property_constraint`].
155    fn drop_property_constraint(&self, _name: &str, _if_exists: bool) -> Result<()> {
156        Err(crate::error::Error::Unsupported(
157            "constraint DDL is not supported by this writer".into(),
158        ))
159    }
160
161    /// Snapshot the currently-registered constraints for
162    /// `SHOW CONSTRAINTS`. Default impl returns an empty list.
163    fn list_property_constraints(&self) -> Result<Vec<PropertyConstraintSpec>> {
164        Ok(Vec::new())
165    }
166
167    /// Install (or replace) an `apoc.trigger.*` registration.
168    /// `spec_blob` is the serde-encoded trigger spec — opaque
169    /// to the writer; the storage layer just persists the
170    /// bytes. Cluster-aware writers buffer this as a
171    /// `GraphCommand::InstallTrigger` so the commit path
172    /// replicates it; direct-to-storage writers persist
173    /// immediately.
174    fn install_trigger(&self, _name: &str, _spec_blob: &[u8]) -> Result<()> {
175        Err(crate::error::Error::Unsupported(
176            "trigger DDL is not supported by this writer".into(),
177        ))
178    }
179
180    /// Drop a registered trigger by name. Idempotent — a
181    /// missing name is not an error so routing-mode rollback
182    /// can re-issue partially-applied DROPs.
183    fn drop_trigger(&self, _name: &str) -> Result<()> {
184        Err(crate::error::Error::Unsupported(
185            "trigger DDL is not supported by this writer".into(),
186        ))
187    }
188}
189
190/// Blanket impl: any **sized** type that implements [`StorageEngine`]
191/// is automatically a [`GraphWriter`]. See the matching
192/// [`crate::reader::GraphReader`] blanket for rationale, and
193/// [`StorageWriterAdapter`] for the trait-object adapter.
194impl<T: StorageEngine> GraphWriter for T {
195    fn put_node(&self, node: &Node) -> Result<()> {
196        StorageEngine::put_node(self, node)?;
197        Ok(())
198    }
199
200    fn put_edge(&self, edge: &Edge) -> Result<()> {
201        StorageEngine::put_edge(self, edge)?;
202        Ok(())
203    }
204
205    fn delete_edge(&self, id: EdgeId) -> Result<()> {
206        if StorageEngine::get_edge(self, id)?.is_some() {
207            StorageEngine::delete_edge(self, id)?;
208        }
209        Ok(())
210    }
211
212    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
213        StorageEngine::detach_delete_node(self, id)?;
214        Ok(())
215    }
216
217    fn create_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
218        StorageEngine::create_property_index_composite(self, label, properties)?;
219        Ok(())
220    }
221
222    fn drop_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
223        StorageEngine::drop_property_index_composite(self, label, properties)?;
224        Ok(())
225    }
226
227    fn list_property_indexes(&self) -> Result<Vec<NodeIndexSpec>> {
228        Ok(StorageEngine::list_property_indexes(self)
229            .into_iter()
230            .map(|s| (s.label, s.properties))
231            .collect())
232    }
233
234    fn create_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
235        StorageEngine::create_edge_property_index_composite(self, edge_type, properties)?;
236        Ok(())
237    }
238
239    fn drop_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
240        StorageEngine::drop_edge_property_index_composite(self, edge_type, properties)?;
241        Ok(())
242    }
243
244    fn list_edge_property_indexes(&self) -> Result<Vec<EdgeIndexSpec>> {
245        Ok(StorageEngine::list_edge_property_indexes(self)
246            .into_iter()
247            .map(|s| (s.edge_type, s.properties))
248            .collect())
249    }
250
251    fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
252        StorageEngine::create_point_index(self, label, property)?;
253        Ok(())
254    }
255
256    fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
257        StorageEngine::drop_point_index(self, label, property)?;
258        Ok(())
259    }
260
261    fn list_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
262        Ok(StorageEngine::list_point_indexes(self)
263            .into_iter()
264            .map(|s| (s.label, s.property))
265            .collect())
266    }
267
268    fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
269        StorageEngine::create_edge_point_index(self, edge_type, property)?;
270        Ok(())
271    }
272
273    fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
274        StorageEngine::drop_edge_point_index(self, edge_type, property)?;
275        Ok(())
276    }
277
278    fn list_edge_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
279        Ok(StorageEngine::list_edge_point_indexes(self)
280            .into_iter()
281            .map(|s| (s.edge_type, s.property))
282            .collect())
283    }
284
285    fn create_property_constraint(
286        &self,
287        name: Option<&str>,
288        scope: &ConstraintScope,
289        properties: &[String],
290        kind: PropertyConstraintKind,
291        if_not_exists: bool,
292    ) -> Result<PropertyConstraintSpec> {
293        Ok(StorageEngine::create_property_constraint(
294            self,
295            name,
296            scope,
297            properties,
298            kind,
299            if_not_exists,
300        )?)
301    }
302
303    fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
304        StorageEngine::drop_property_constraint(self, name, if_exists)?;
305        Ok(())
306    }
307
308    fn list_property_constraints(&self) -> Result<Vec<PropertyConstraintSpec>> {
309        Ok(StorageEngine::list_property_constraints(self))
310    }
311
312    fn install_trigger(&self, name: &str, spec_blob: &[u8]) -> Result<()> {
313        StorageEngine::put_trigger(self, name, spec_blob)?;
314        Ok(())
315    }
316
317    fn drop_trigger(&self, name: &str) -> Result<()> {
318        StorageEngine::delete_trigger(self, name)?;
319        Ok(())
320    }
321}
322
323/// Adapter that lets a `&dyn StorageEngine` act as a `GraphWriter`.
324/// See [`crate::reader::StorageReaderAdapter`] for the rationale.
325pub struct StorageWriterAdapter<'a>(pub &'a dyn StorageEngine);
326
327impl GraphWriter for StorageWriterAdapter<'_> {
328    fn put_node(&self, node: &Node) -> Result<()> {
329        self.0.put_node(node)?;
330        Ok(())
331    }
332
333    fn put_edge(&self, edge: &Edge) -> Result<()> {
334        self.0.put_edge(edge)?;
335        Ok(())
336    }
337
338    fn delete_edge(&self, id: EdgeId) -> Result<()> {
339        if self.0.get_edge(id)?.is_some() {
340            self.0.delete_edge(id)?;
341        }
342        Ok(())
343    }
344
345    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
346        self.0.detach_delete_node(id)?;
347        Ok(())
348    }
349
350    fn create_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
351        self.0.create_property_index_composite(label, properties)?;
352        Ok(())
353    }
354
355    fn drop_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
356        self.0.drop_property_index_composite(label, properties)?;
357        Ok(())
358    }
359
360    fn list_property_indexes(&self) -> Result<Vec<NodeIndexSpec>> {
361        Ok(self
362            .0
363            .list_property_indexes()
364            .into_iter()
365            .map(|s| (s.label, s.properties))
366            .collect())
367    }
368
369    fn create_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
370        self.0
371            .create_edge_property_index_composite(edge_type, properties)?;
372        Ok(())
373    }
374
375    fn drop_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
376        self.0
377            .drop_edge_property_index_composite(edge_type, properties)?;
378        Ok(())
379    }
380
381    fn list_edge_property_indexes(&self) -> Result<Vec<EdgeIndexSpec>> {
382        Ok(self
383            .0
384            .list_edge_property_indexes()
385            .into_iter()
386            .map(|s| (s.edge_type, s.properties))
387            .collect())
388    }
389
390    fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
391        self.0.create_point_index(label, property)?;
392        Ok(())
393    }
394
395    fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
396        self.0.drop_point_index(label, property)?;
397        Ok(())
398    }
399
400    fn list_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
401        Ok(self
402            .0
403            .list_point_indexes()
404            .into_iter()
405            .map(|s| (s.label, s.property))
406            .collect())
407    }
408
409    fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
410        self.0.create_edge_point_index(edge_type, property)?;
411        Ok(())
412    }
413
414    fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
415        self.0.drop_edge_point_index(edge_type, property)?;
416        Ok(())
417    }
418
419    fn list_edge_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
420        Ok(self
421            .0
422            .list_edge_point_indexes()
423            .into_iter()
424            .map(|s| (s.edge_type, s.property))
425            .collect())
426    }
427
428    fn create_property_constraint(
429        &self,
430        name: Option<&str>,
431        scope: &ConstraintScope,
432        properties: &[String],
433        kind: PropertyConstraintKind,
434        if_not_exists: bool,
435    ) -> Result<PropertyConstraintSpec> {
436        Ok(self
437            .0
438            .create_property_constraint(name, scope, properties, kind, if_not_exists)?)
439    }
440
441    fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
442        self.0.drop_property_constraint(name, if_exists)?;
443        Ok(())
444    }
445
446    fn list_property_constraints(&self) -> Result<Vec<PropertyConstraintSpec>> {
447        Ok(self.0.list_property_constraints())
448    }
449
450    fn install_trigger(&self, name: &str, spec_blob: &[u8]) -> Result<()> {
451        self.0.put_trigger(name, spec_blob)?;
452        Ok(())
453    }
454
455    fn drop_trigger(&self, name: &str) -> Result<()> {
456        self.0.delete_trigger(name)?;
457        Ok(())
458    }
459}