meshdb-executor 0.1.0-alpha.2

Physical operators and query execution for Mesh
Documentation
use crate::error::Result;
use meshdb_core::{Edge, EdgeId, Node, NodeId};
use meshdb_storage::StorageEngine;

/// Write-side counterpart to the executor's read path: every mutating
/// graph operation the executor produces is handed to a `GraphWriter`.
/// Keeping writes behind this trait lets the same executor run against
/// a direct-to-storage writer (single-node mode) or a Raft-backed
/// writer that proposes each mutation through consensus (cluster mode).
///
/// All methods are synchronous because the executor's iterator model is
/// synchronous. Implementations backed by async machinery (e.g. the
/// Raft writer) bridge with `Handle::block_on`, so callers must drive
/// the executor from inside `spawn_blocking` to avoid stalling the
/// tokio runtime.
pub trait GraphWriter {
    /// Write `node` through this writer.
    fn put_node(&self, node: &Node) -> Result<()>;

    /// Write `edge` through this writer.
    fn put_edge(&self, edge: &Edge) -> Result<()>;

    /// Delete the edge identified by `id`.
    fn delete_edge(&self, id: EdgeId) -> Result<()>;

    /// Delete the node identified by `id`.
    ///
    /// NOTE(review): the name suggests incident edges are removed as
    /// well; confirm against the backing implementation.
    fn detach_delete_node(&self, id: NodeId) -> Result<()>;

    /// Declare a property index on `(label, property)`.
    ///
    /// The default implementation fails with `Error::Unsupported` so
    /// that remote writers (Raft, routing) without cluster-aware DDL
    /// report the limitation immediately instead of silently ignoring
    /// the request. Storage-backed writers get a working version from
    /// the blanket impl.
    fn create_property_index(&self, _label: &str, _property: &str) -> Result<()> {
        unsupported_property_index_ddl()
    }

    /// Remove a property index on `(label, property)`.
    ///
    /// Same default as [`Self::create_property_index`]: fails with
    /// `Error::Unsupported` unless the writer overrides it.
    fn drop_property_index(&self, _label: &str, _property: &str) -> Result<()> {
        unsupported_property_index_ddl()
    }

    /// Report the currently-registered property indexes as
    /// `(label, property)` pairs, as needed by `SHOW INDEXES`.
    ///
    /// The default implementation reports no indexes; remote writers
    /// are expected to wire real fan-out in Phase C.
    fn list_property_indexes(&self) -> Result<Vec<(String, String)>> {
        Ok(vec![])
    }
}

/// Shared failure returned by the default property-index DDL methods of
/// [`GraphWriter`].
fn unsupported_property_index_ddl() -> Result<()> {
    Err(crate::error::Error::Unsupported(
        "property-index DDL is not supported by this writer".into(),
    ))
}

/// Blanket impl: every **sized** [`StorageEngine`] is a [`GraphWriter`]
/// for free, with each method forwarding to the storage call of the
/// same name. See the matching [`crate::reader::GraphReader`] blanket
/// for rationale, and [`StorageWriterAdapter`] for the trait-object
/// adapter.
///
/// Calls are spelled `<T as StorageEngine>::…` because `T` implements
/// both traits and several method names are shared between them.
impl<T: StorageEngine> GraphWriter for T {
    /// Forwards to the storage engine's `put_node`.
    fn put_node(&self, node: &Node) -> Result<()> {
        <T as StorageEngine>::put_node(self, node)?;
        Ok(())
    }

    /// Forwards to the storage engine's `put_edge`.
    fn put_edge(&self, edge: &Edge) -> Result<()> {
        <T as StorageEngine>::put_edge(self, edge)?;
        Ok(())
    }

    /// Deletes the edge only if a lookup finds it; deleting an edge
    /// that is not present succeeds without touching storage.
    fn delete_edge(&self, id: EdgeId) -> Result<()> {
        let edge_exists = <T as StorageEngine>::get_edge(self, id)?.is_some();
        if !edge_exists {
            return Ok(());
        }
        <T as StorageEngine>::delete_edge(self, id)?;
        Ok(())
    }

    /// Forwards to the storage engine's `detach_delete_node`.
    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
        <T as StorageEngine>::detach_delete_node(self, id)?;
        Ok(())
    }

    /// Forwards to the storage engine's `create_property_index`,
    /// replacing the trait's "unsupported" default.
    fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
        <T as StorageEngine>::create_property_index(self, label, property)?;
        Ok(())
    }

    /// Forwards to the storage engine's `drop_property_index`,
    /// replacing the trait's "unsupported" default.
    fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
        <T as StorageEngine>::drop_property_index(self, label, property)?;
        Ok(())
    }

    /// Reduces each index entry reported by the storage engine to its
    /// `(label, property)` pair.
    fn list_property_indexes(&self) -> Result<Vec<(String, String)>> {
        let specs = <T as StorageEngine>::list_property_indexes(self);
        let pairs = specs
            .into_iter()
            .map(|spec| (spec.label, spec.property));
        Ok(pairs.collect())
    }
}

/// Newtype adapter that lets a borrowed `&dyn StorageEngine` act as a
/// [`GraphWriter`].
///
/// The blanket `impl<T: StorageEngine> GraphWriter for T` carries the
/// implicit `Sized` bound, so it does not apply to the unsized
/// `dyn StorageEngine`; wrap the trait-object reference in this type
/// instead. The single public field is the wrapped engine reference,
/// which lives for `'a`.
///
/// See [`crate::reader::StorageReaderAdapter`] for the rationale.
pub struct StorageWriterAdapter<'a>(pub &'a dyn StorageEngine);

/// Forwards every [`GraphWriter`] operation to the wrapped
/// `&dyn StorageEngine`.
impl GraphWriter for StorageWriterAdapter<'_> {
    /// Forwards to the wrapped engine's `put_node`.
    fn put_node(&self, node: &Node) -> Result<()> {
        let engine = self.0;
        engine.put_node(node)?;
        Ok(())
    }

    /// Forwards to the wrapped engine's `put_edge`.
    fn put_edge(&self, edge: &Edge) -> Result<()> {
        let engine = self.0;
        engine.put_edge(edge)?;
        Ok(())
    }

    /// Deletes the edge only if a lookup finds it; deleting an edge
    /// that is not present succeeds without issuing a delete.
    fn delete_edge(&self, id: EdgeId) -> Result<()> {
        let engine = self.0;
        let edge_exists = engine.get_edge(id)?.is_some();
        if !edge_exists {
            return Ok(());
        }
        engine.delete_edge(id)?;
        Ok(())
    }

    /// Forwards to the wrapped engine's `detach_delete_node`.
    fn detach_delete_node(&self, id: NodeId) -> Result<()> {
        let engine = self.0;
        engine.detach_delete_node(id)?;
        Ok(())
    }

    /// Forwards to the wrapped engine's `create_property_index`,
    /// replacing the trait's "unsupported" default.
    fn create_property_index(&self, label: &str, property: &str) -> Result<()> {
        let engine = self.0;
        engine.create_property_index(label, property)?;
        Ok(())
    }

    /// Forwards to the wrapped engine's `drop_property_index`,
    /// replacing the trait's "unsupported" default.
    fn drop_property_index(&self, label: &str, property: &str) -> Result<()> {
        let engine = self.0;
        engine.drop_property_index(label, property)?;
        Ok(())
    }

    /// Reduces each index entry reported by the wrapped engine to its
    /// `(label, property)` pair.
    fn list_property_indexes(&self) -> Result<Vec<(String, String)>> {
        let engine = self.0;
        let specs = engine.list_property_indexes();
        let pairs = specs
            .into_iter()
            .map(|spec| (spec.label, spec.property));
        Ok(pairs.collect())
    }
}