simple-triplestore 0.1.1

A simple graph database for storing triples with support for custom node and edge properties.
Documentation
use serde::{de::DeserializeOwned, Serialize};
use sled::transaction::{ConflictableTransactionError, Transactional};

use crate::{
    prelude::*,
    traits::{IdType, Mergeable, Property},
    MergeError, Triple,
};

use super::{SledTripleStore, SledTripleStoreError};

impl<
        Id: IdType,
        NodeProps: Property + Mergeable + Serialize + DeserializeOwned,
        EdgeProps: Property + Mergeable + Serialize + DeserializeOwned,
    > TripleStoreMerge<Id, NodeProps, EdgeProps> for SledTripleStore<Id, NodeProps, EdgeProps>
{
    fn merge<E: std::fmt::Debug>(
        &mut self,
        other: impl TripleStore<Id, NodeProps, EdgeProps, Error = E>,
    ) -> Result<(), MergeError<Self::Error, E>> {
        let (other_nodes, other_edges) = other.into_iter_nodes(crate::EdgeOrder::SPO);

        for r in other_nodes {
            let (id, data) = r.map_err(|e| MergeError::Right(e))?;
            self.merge_node(id, data).map_err(|e| MergeError::Left(e))?;
        }

        for r in other_edges {
            let (id, other_edge_props) = r.map_err(|e| MergeError::Right(e))?;
            self.merge_edge(id, other_edge_props)
                .map_err(|e| MergeError::Left(e))?;
        }

        Ok(())
    }

    fn merge_node(&mut self, node: Id, props: NodeProps) -> Result<(), Self::Error> {
        let key_bytes = &node.to_be_bytes();

        (&self.node_props)
            .transaction(|node_props| {
                match node_props.get(key_bytes.as_ref())? {
                    None => {
                        node_props.insert(
                            key_bytes.as_ref(),
                            bincode::serialize(&props).map_err(|e| {
                                ConflictableTransactionError::Abort(
                                    SledTripleStoreError::SerializationError(e),
                                )
                            })?,
                        )?;
                    }

                    Some(existing_value) => {
                        let mut old_props: NodeProps = bincode::deserialize(&existing_value)
                            .map_err(|e| {
                                ConflictableTransactionError::Abort(
                                    SledTripleStoreError::SerializationError(e),
                                )
                            })?;
                        old_props.merge(props.clone());
                        node_props.insert(
                            key_bytes.as_ref(),
                            bincode::serialize(&old_props).map_err(|e| {
                                ConflictableTransactionError::Abort(
                                    SledTripleStoreError::SerializationError(e),
                                )
                            })?,
                        )?;
                    }
                }

                Ok(())
            })
            .map_err(|e| match e {
                sled::transaction::TransactionError::Abort(e) => e,
                sled::transaction::TransactionError::Storage(e) => {
                    SledTripleStoreError::SledError(e)
                }
            })?;
        Ok(())
    }

    fn merge_edge(&mut self, triple: Triple<Id>, props: EdgeProps) -> Result<(), Self::Error> {
        let new_edge_props_id = self.id_generator.fresh().to_be_bytes();
        let spo_triple = Id::encode_spo_triple(&triple);
        let pos_triple = Id::encode_pos_triple(&triple);
        let osp_triple = Id::encode_osp_triple(&triple);

        (
            &self.edge_props,
            &self.spo_data,
            &self.pos_data,
            &self.osp_data,
        )
            .transaction(|(edge_props, spo_data, pos_data, osp_data)| {
                let old_edge_props_id =
                    spo_data.insert(spo_triple.as_ref(), new_edge_props_id.as_ref())?;
                pos_data.insert(pos_triple.as_ref(), new_edge_props_id.as_ref())?;
                osp_data.insert(osp_triple.as_ref(), new_edge_props_id.as_ref())?;

                match old_edge_props_id {
                    None => {
                        edge_props.insert(
                            new_edge_props_id.as_ref(),
                            bincode::serialize(&props).map_err(|e| {
                                ConflictableTransactionError::Abort(
                                    SledTripleStoreError::SerializationError(e),
                                )
                            })?,
                        )?;
                    }

                    Some(old_edge_props_id) => {
                        let old_value = edge_props.remove(old_edge_props_id)?;
                        match old_value {
                            None => {
                                edge_props.insert(
                                    new_edge_props_id.as_ref(),
                                    bincode::serialize(&props).map_err(|e| {
                                        ConflictableTransactionError::Abort(
                                            SledTripleStoreError::SerializationError(e),
                                        )
                                    })?,
                                )?;
                            }

                            Some(old_value) => {
                                let mut old_props: EdgeProps = bincode::deserialize(&old_value)
                                    .map_err(|e| {
                                        ConflictableTransactionError::Abort(
                                            SledTripleStoreError::SerializationError(e),
                                        )
                                    })?;
                                old_props.merge(props.clone());
                                edge_props.insert(
                                    new_edge_props_id.as_ref(),
                                    bincode::serialize(&old_props).map_err(|e| {
                                        ConflictableTransactionError::Abort(
                                            SledTripleStoreError::SerializationError(e),
                                        )
                                    })?,
                                )?;
                            }
                        }
                    }
                }

                Ok(())
            })
            .map_err(|e| match e {
                sled::transaction::TransactionError::Abort(e) => e,
                sled::transaction::TransactionError::Storage(e) => {
                    SledTripleStoreError::SledError(e)
                }
            })?;

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use crate::{SledTripleStore, UlidIdGenerator};

    #[test]
    fn test_merge() {
        let mut temp_dirs = Vec::new();
        crate::conformance::merge::test_merge(|| {
            let (temp_dir, db) = crate::sled::create_test_db().expect("ok");
            let sled_db = SledTripleStore::new(&db, UlidIdGenerator::new()).expect("ok");
            temp_dirs.push((temp_dir, db));
            sled_db
        });
    }

    #[test]
    fn test_merge_node() {
        let mut temp_dirs = Vec::new();
        crate::conformance::merge::test_merge_node(|| {
            let (temp_dir, db) = crate::sled::create_test_db().expect("ok");
            let sled_db = SledTripleStore::new(&db, UlidIdGenerator::new()).expect("ok");
            temp_dirs.push((temp_dir, db));
            sled_db
        });
    }

    #[test]
    fn test_merge_edge() {
        let mut temp_dirs = Vec::new();
        crate::conformance::merge::test_merge_edge(|| {
            let (temp_dir, db) = crate::sled::create_test_db().expect("ok");
            let sled_db = SledTripleStore::new(&db, UlidIdGenerator::new()).expect("ok");
            temp_dirs.push((temp_dir, db));
            sled_db
        });
    }
}