Skip to main content

meshdb_executor/
writer.rs

1use crate::error::Result;
2use meshdb_core::{Edge, EdgeId, Node, NodeId};
3use meshdb_storage::{
4    ConstraintScope, PropertyConstraintKind, PropertyConstraintSpec, StorageEngine,
5};
6
7/// `(label, property)` pair identifying a single-property node point
8/// / spatial index. Always length-1 on `property` side for now —
9/// composite spatial indexes are a separate design and get their own
10/// spec shape if they ship.
11pub type PointIndexSpec = (String, String);
12
13/// `(label, properties)` pair identifying a node property index.
14/// `properties` is a `Vec<String>` so composite indexes round-trip
15/// through the reader/writer boundary without truncating —
16/// previously this was `(String, String)` and `SHOW INDEXES`
17/// silently dropped everything past the first property.
18pub type NodeIndexSpec = (String, Vec<String>);
19
20/// `(edge_type, properties)` pair identifying an edge property
21/// index. Relationship-scope analogue of [`NodeIndexSpec`].
22pub type EdgeIndexSpec = (String, Vec<String>);
23
24/// Sink for mutating graph operations produced by the executor. Isolates
25/// write-side concerns from read-side traversal so we can plug in either a
26/// direct-to-storage writer (single-node mode) or a Raft-backed writer that
27/// proposes each mutation through consensus (cluster mode).
28///
29/// Methods are sync because the executor's iterator model is sync.
30/// Async-backed implementations (e.g. the Raft writer) bridge via
31/// `Handle::block_on`; callers must run the executor inside
32/// `spawn_blocking` so they don't stall the tokio runtime.
33pub trait GraphWriter {
34    fn put_node(&self, node: &Node) -> Result<()>;
35    fn put_edge(&self, edge: &Edge) -> Result<()>;
36    fn delete_edge(&self, id: EdgeId) -> Result<()>;
37    fn detach_delete_node(&self, id: NodeId) -> Result<()>;
38
39    /// Declare a new property index. `properties` is a slice so the
40    /// composite form (`CREATE INDEX FOR (n:L) ON (n.a, n.b)`) fits
41    /// the same surface as single-property. Default impl errors so
42    /// remote writers that don't yet support cluster-aware DDL
43    /// surface the limitation immediately; storage-backed writers
44    /// override via the blanket impl.
45    fn create_property_index(&self, _label: &str, _properties: &[String]) -> Result<()> {
46        Err(crate::error::Error::Unsupported(
47            "property-index DDL is not supported by this writer".into(),
48        ))
49    }
50
51    /// Tear down a property index. Mirrors [`Self::create_property_index`].
52    fn drop_property_index(&self, _label: &str, _properties: &[String]) -> Result<()> {
53        Err(crate::error::Error::Unsupported(
54            "property-index DDL is not supported by this writer".into(),
55        ))
56    }
57
58    /// Snapshot the currently-registered property indexes as
59    /// `(label, property)` pairs for `SHOW INDEXES`. Default impl
60    /// returns an empty list — remote writers will wire real
61    /// fan-out in Phase C.
62    fn list_property_indexes(&self) -> Result<Vec<NodeIndexSpec>> {
63        Ok(Vec::new())
64    }
65
66    /// Relationship-scope analogue of
67    /// [`Self::create_property_index`].
68    fn create_edge_property_index(&self, _edge_type: &str, _properties: &[String]) -> Result<()> {
69        Err(crate::error::Error::Unsupported(
70            "edge-property-index DDL is not supported by this writer".into(),
71        ))
72    }
73
74    /// Tear down an edge property index. Mirrors
75    /// [`Self::create_edge_property_index`].
76    fn drop_edge_property_index(&self, _edge_type: &str, _properties: &[String]) -> Result<()> {
77        Err(crate::error::Error::Unsupported(
78            "edge-property-index DDL is not supported by this writer".into(),
79        ))
80    }
81
82    /// Snapshot the currently-registered edge property indexes as
83    /// `(edge_type, property)` pairs. Default impl returns an empty
84    /// list.
85    fn list_edge_property_indexes(&self) -> Result<Vec<EdgeIndexSpec>> {
86        Ok(Vec::new())
87    }
88
89    /// Declare a point / spatial index on `(label, property)`.
90    /// Default impl errors — remote writers opt in via the blanket
91    /// `StorageEngine` impl or a cluster-aware override.
92    fn create_point_index(&self, _label: &str, _property: &str) -> Result<()> {
93        Err(crate::error::Error::Unsupported(
94            "point-index DDL is not supported by this writer".into(),
95        ))
96    }
97
98    /// Tear down a point index. Mirrors
99    /// [`Self::create_point_index`].
100    fn drop_point_index(&self, _label: &str, _property: &str) -> Result<()> {
101        Err(crate::error::Error::Unsupported(
102            "point-index DDL is not supported by this writer".into(),
103        ))
104    }
105
106    /// Snapshot the currently-registered point indexes as
107    /// `(label, property)` pairs. Default impl returns empty.
108    fn list_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
109        Ok(Vec::new())
110    }
111
112    /// Relationship-scope analogue of
113    /// [`Self::create_point_index`].
114    fn create_edge_point_index(&self, _edge_type: &str, _property: &str) -> Result<()> {
115        Err(crate::error::Error::Unsupported(
116            "edge-point-index DDL is not supported by this writer".into(),
117        ))
118    }
119
120    /// Tear down an edge point index. Mirrors
121    /// [`Self::create_edge_point_index`].
122    fn drop_edge_point_index(&self, _edge_type: &str, _property: &str) -> Result<()> {
123        Err(crate::error::Error::Unsupported(
124            "edge-point-index DDL is not supported by this writer".into(),
125        ))
126    }
127
128    /// Snapshot the currently-registered edge point indexes as
129    /// `(edge_type, property)` pairs.
130    fn list_edge_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
131        Ok(Vec::new())
132    }
133
134    /// Declare a new property constraint. Default impl errors so
135    /// remote writers that haven't plumbed constraint DDL yet surface
136    /// the limitation immediately — storage-backed writers override
137    /// via the blanket impl. `properties` is a list to accommodate
138    /// composite kinds (`NodeKey`); single-property kinds pass a
139    /// one-element slice.
140    fn create_property_constraint(
141        &self,
142        _name: Option<&str>,
143        _scope: &ConstraintScope,
144        _properties: &[String],
145        _kind: PropertyConstraintKind,
146        _if_not_exists: bool,
147    ) -> Result<PropertyConstraintSpec> {
148        Err(crate::error::Error::Unsupported(
149            "constraint DDL is not supported by this writer".into(),
150        ))
151    }
152
153    /// Tear down a constraint by name. Mirrors
154    /// [`Self::create_property_constraint`].
155    fn drop_property_constraint(&self, _name: &str, _if_exists: bool) -> Result<()> {
156        Err(crate::error::Error::Unsupported(
157            "constraint DDL is not supported by this writer".into(),
158        ))
159    }
160
161    /// Snapshot the currently-registered constraints for
162    /// `SHOW CONSTRAINTS`. Default impl returns an empty list.
163    fn list_property_constraints(&self) -> Result<Vec<PropertyConstraintSpec>> {
164        Ok(Vec::new())
165    }
166}
167
168/// Blanket impl: any **sized** type that implements [`StorageEngine`]
169/// is automatically a [`GraphWriter`]. See the matching
170/// [`crate::reader::GraphReader`] blanket for rationale, and
171/// [`StorageWriterAdapter`] for the trait-object adapter.
172impl<T: StorageEngine> GraphWriter for T {
173    fn put_node(&self, node: &Node) -> Result<()> {
174        StorageEngine::put_node(self, node)?;
175        Ok(())
176    }
177
178    fn put_edge(&self, edge: &Edge) -> Result<()> {
179        StorageEngine::put_edge(self, edge)?;
180        Ok(())
181    }
182
183    fn delete_edge(&self, id: EdgeId) -> Result<()> {
184        if StorageEngine::get_edge(self, id)?.is_some() {
185            StorageEngine::delete_edge(self, id)?;
186        }
187        Ok(())
188    }
189
190    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
191        StorageEngine::detach_delete_node(self, id)?;
192        Ok(())
193    }
194
195    fn create_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
196        StorageEngine::create_property_index_composite(self, label, properties)?;
197        Ok(())
198    }
199
200    fn drop_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
201        StorageEngine::drop_property_index_composite(self, label, properties)?;
202        Ok(())
203    }
204
205    fn list_property_indexes(&self) -> Result<Vec<NodeIndexSpec>> {
206        Ok(StorageEngine::list_property_indexes(self)
207            .into_iter()
208            .map(|s| (s.label, s.properties))
209            .collect())
210    }
211
212    fn create_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
213        StorageEngine::create_edge_property_index_composite(self, edge_type, properties)?;
214        Ok(())
215    }
216
217    fn drop_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
218        StorageEngine::drop_edge_property_index_composite(self, edge_type, properties)?;
219        Ok(())
220    }
221
222    fn list_edge_property_indexes(&self) -> Result<Vec<EdgeIndexSpec>> {
223        Ok(StorageEngine::list_edge_property_indexes(self)
224            .into_iter()
225            .map(|s| (s.edge_type, s.properties))
226            .collect())
227    }
228
229    fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
230        StorageEngine::create_point_index(self, label, property)?;
231        Ok(())
232    }
233
234    fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
235        StorageEngine::drop_point_index(self, label, property)?;
236        Ok(())
237    }
238
239    fn list_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
240        Ok(StorageEngine::list_point_indexes(self)
241            .into_iter()
242            .map(|s| (s.label, s.property))
243            .collect())
244    }
245
246    fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
247        StorageEngine::create_edge_point_index(self, edge_type, property)?;
248        Ok(())
249    }
250
251    fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
252        StorageEngine::drop_edge_point_index(self, edge_type, property)?;
253        Ok(())
254    }
255
256    fn list_edge_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
257        Ok(StorageEngine::list_edge_point_indexes(self)
258            .into_iter()
259            .map(|s| (s.edge_type, s.property))
260            .collect())
261    }
262
263    fn create_property_constraint(
264        &self,
265        name: Option<&str>,
266        scope: &ConstraintScope,
267        properties: &[String],
268        kind: PropertyConstraintKind,
269        if_not_exists: bool,
270    ) -> Result<PropertyConstraintSpec> {
271        Ok(StorageEngine::create_property_constraint(
272            self,
273            name,
274            scope,
275            properties,
276            kind,
277            if_not_exists,
278        )?)
279    }
280
281    fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
282        StorageEngine::drop_property_constraint(self, name, if_exists)?;
283        Ok(())
284    }
285
286    fn list_property_constraints(&self) -> Result<Vec<PropertyConstraintSpec>> {
287        Ok(StorageEngine::list_property_constraints(self))
288    }
289}
290
291/// Adapter that lets a `&dyn StorageEngine` act as a `GraphWriter`.
292/// See [`crate::reader::StorageReaderAdapter`] for the rationale.
293pub struct StorageWriterAdapter<'a>(pub &'a dyn StorageEngine);
294
295impl GraphWriter for StorageWriterAdapter<'_> {
296    fn put_node(&self, node: &Node) -> Result<()> {
297        self.0.put_node(node)?;
298        Ok(())
299    }
300
301    fn put_edge(&self, edge: &Edge) -> Result<()> {
302        self.0.put_edge(edge)?;
303        Ok(())
304    }
305
306    fn delete_edge(&self, id: EdgeId) -> Result<()> {
307        if self.0.get_edge(id)?.is_some() {
308            self.0.delete_edge(id)?;
309        }
310        Ok(())
311    }
312
313    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
314        self.0.detach_delete_node(id)?;
315        Ok(())
316    }
317
318    fn create_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
319        self.0.create_property_index_composite(label, properties)?;
320        Ok(())
321    }
322
323    fn drop_property_index(&self, label: &str, properties: &[String]) -> Result<()> {
324        self.0.drop_property_index_composite(label, properties)?;
325        Ok(())
326    }
327
328    fn list_property_indexes(&self) -> Result<Vec<NodeIndexSpec>> {
329        Ok(self
330            .0
331            .list_property_indexes()
332            .into_iter()
333            .map(|s| (s.label, s.properties))
334            .collect())
335    }
336
337    fn create_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
338        self.0
339            .create_edge_property_index_composite(edge_type, properties)?;
340        Ok(())
341    }
342
343    fn drop_edge_property_index(&self, edge_type: &str, properties: &[String]) -> Result<()> {
344        self.0
345            .drop_edge_property_index_composite(edge_type, properties)?;
346        Ok(())
347    }
348
349    fn list_edge_property_indexes(&self) -> Result<Vec<EdgeIndexSpec>> {
350        Ok(self
351            .0
352            .list_edge_property_indexes()
353            .into_iter()
354            .map(|s| (s.edge_type, s.properties))
355            .collect())
356    }
357
358    fn create_point_index(&self, label: &str, property: &str) -> Result<()> {
359        self.0.create_point_index(label, property)?;
360        Ok(())
361    }
362
363    fn drop_point_index(&self, label: &str, property: &str) -> Result<()> {
364        self.0.drop_point_index(label, property)?;
365        Ok(())
366    }
367
368    fn list_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
369        Ok(self
370            .0
371            .list_point_indexes()
372            .into_iter()
373            .map(|s| (s.label, s.property))
374            .collect())
375    }
376
377    fn create_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
378        self.0.create_edge_point_index(edge_type, property)?;
379        Ok(())
380    }
381
382    fn drop_edge_point_index(&self, edge_type: &str, property: &str) -> Result<()> {
383        self.0.drop_edge_point_index(edge_type, property)?;
384        Ok(())
385    }
386
387    fn list_edge_point_indexes(&self) -> Result<Vec<PointIndexSpec>> {
388        Ok(self
389            .0
390            .list_edge_point_indexes()
391            .into_iter()
392            .map(|s| (s.edge_type, s.property))
393            .collect())
394    }
395
396    fn create_property_constraint(
397        &self,
398        name: Option<&str>,
399        scope: &ConstraintScope,
400        properties: &[String],
401        kind: PropertyConstraintKind,
402        if_not_exists: bool,
403    ) -> Result<PropertyConstraintSpec> {
404        Ok(self
405            .0
406            .create_property_constraint(name, scope, properties, kind, if_not_exists)?)
407    }
408
409    fn drop_property_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
410        self.0.drop_property_constraint(name, if_exists)?;
411        Ok(())
412    }
413
414    fn list_property_constraints(&self) -> Result<Vec<PropertyConstraintSpec>> {
415        Ok(self.0.list_property_constraints())
416    }
417}