Skip to main content

meshdb_executor/
writer.rs

1use crate::error::Result;
2use meshdb_core::{Edge, EdgeId, Node, NodeId};
3use meshdb_storage::StorageEngine;
4
5/// Sink for mutating graph operations produced by the executor. Isolates
6/// write-side concerns from read-side traversal so we can plug in either a
7/// direct-to-storage writer (single-node mode) or a Raft-backed writer that
8/// proposes each mutation through consensus (cluster mode).
9///
10/// Methods are sync because the executor's iterator model is sync.
11/// Async-backed implementations (e.g. the Raft writer) bridge via
12/// `Handle::block_on`; callers must run the executor inside
13/// `spawn_blocking` so they don't stall the tokio runtime.
14pub trait GraphWriter {
15    fn put_node(&self, node: &Node) -> Result<()>;
16    fn put_edge(&self, edge: &Edge) -> Result<()>;
17    fn delete_edge(&self, id: EdgeId) -> Result<()>;
18    fn detach_delete_node(&self, id: NodeId) -> Result<()>;
19
20    /// Declare a new property index. Default impl errors so remote
21    /// writers (Raft, routing) that don't yet support cluster-aware
22    /// DDL surface the limitation immediately. Storage-backed writers
23    /// override this via the blanket impl.
24    fn create_property_index(&self, _label: &str, _property: &str) -> Result<()> {
25        Err(crate::error::Error::Unsupported(
26            "property-index DDL is not supported by this writer".into(),
27        ))
28    }
29
30    /// Tear down a property index. Mirrors [`Self::create_property_index`].
31    fn drop_property_index(&self, _label: &str, _property: &str) -> Result<()> {
32        Err(crate::error::Error::Unsupported(
33            "property-index DDL is not supported by this writer".into(),
34        ))
35    }
36
37    /// Snapshot the currently-registered property indexes as
38    /// `(label, property)` pairs for `SHOW INDEXES`. Default impl
39    /// returns an empty list — remote writers will wire real
40    /// fan-out in Phase C.
41    fn list_property_indexes(&self) -> Result<Vec<(String, String)>> {
42        Ok(Vec::new())
43    }
44}
45
46/// Blanket impl: any **sized** type that implements [`StorageEngine`]
47/// is automatically a [`GraphWriter`]. See the matching
48/// [`crate::reader::GraphReader`] blanket for rationale, and
49/// [`StorageWriterAdapter`] for the trait-object adapter.
50impl<T: StorageEngine> GraphWriter for T {
51    fn put_node(&self, node: &Node) -> Result<()> {
52        StorageEngine::put_node(self, node)?;
53        Ok(())
54    }
55
56    fn put_edge(&self, edge: &Edge) -> Result<()> {
57        StorageEngine::put_edge(self, edge)?;
58        Ok(())
59    }
60
61    fn delete_edge(&self, id: EdgeId) -> Result<()> {
62        if StorageEngine::get_edge(self, id)?.is_some() {
63            StorageEngine::delete_edge(self, id)?;
64        }
65        Ok(())
66    }
67
68    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
69        StorageEngine::detach_delete_node(self, id)?;
70        Ok(())
71    }
72
73    fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
74        StorageEngine::create_property_index(self, label, property)?;
75        Ok(())
76    }
77
78    fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
79        StorageEngine::drop_property_index(self, label, property)?;
80        Ok(())
81    }
82
83    fn list_property_indexes(&self) -> Result<Vec<(String, String)>> {
84        Ok(StorageEngine::list_property_indexes(self)
85            .into_iter()
86            .map(|s| (s.label, s.property))
87            .collect())
88    }
89}
90
91/// Adapter that lets a `&dyn StorageEngine` act as a `GraphWriter`.
92/// See [`crate::reader::StorageReaderAdapter`] for the rationale.
93pub struct StorageWriterAdapter<'a>(pub &'a dyn StorageEngine);
94
95impl GraphWriter for StorageWriterAdapter<'_> {
96    fn put_node(&self, node: &Node) -> Result<()> {
97        self.0.put_node(node)?;
98        Ok(())
99    }
100
101    fn put_edge(&self, edge: &Edge) -> Result<()> {
102        self.0.put_edge(edge)?;
103        Ok(())
104    }
105
106    fn delete_edge(&self, id: EdgeId) -> Result<()> {
107        if self.0.get_edge(id)?.is_some() {
108            self.0.delete_edge(id)?;
109        }
110        Ok(())
111    }
112
113    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
114        self.0.detach_delete_node(id)?;
115        Ok(())
116    }
117
118    fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
119        self.0.create_property_index(label, property)?;
120        Ok(())
121    }
122
123    fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
124        self.0.drop_property_index(label, property)?;
125        Ok(())
126    }
127
128    fn list_property_indexes(&self) -> Result<Vec<(String, String)>> {
129        Ok(self
130            .0
131            .list_property_indexes()
132            .into_iter()
133            .map(|s| (s.label, s.property))
134            .collect())
135    }
136}