1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
//! # Datacake LMDB
//!
//! A pre-built implementation of the datacake-eventual-consistency `Storage` trait, this allows you to set up
//! a persistent cluster immediately without any hassle of implementing a correct store.
//!
//! For more info see <https://github.com/lnx-search/datacake>
//!
//! ## Example
//!
//! ```rust
//! use std::env::temp_dir;
//! use anyhow::Result;
//! use uuid::Uuid;
//! use datacake_eventual_consistency::EventuallyConsistentStoreExtension;
//! use datacake_node::{
//! ConnectionConfig,
//! Consistency,
//! DCAwareSelector,
//! DatacakeNodeBuilder,
//! };
//! use datacake_lmdb::LmdbStorage;
//!
//! static KEYSPACE: &str = "lmdb-store";
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! tracing_subscriber::fmt::init();
//!
//! let temp_dir = temp_dir().join(Uuid::new_v4().to_string());
//! std::fs::create_dir_all(&temp_dir)?;
//!
//! let store = LmdbStorage::open(temp_dir).await?;
//!
//! let addr = test_helper::get_unused_addr();
//! let connection_cfg = ConnectionConfig::new(addr, addr, Vec::<String>::new());
//!
//! let node = DatacakeNodeBuilder::<DCAwareSelector>::new(1, connection_cfg)
//! .connect()
//! .await?;
//! let store = node
//! .add_extension(EventuallyConsistentStoreExtension::new(store))
//! .await?;
//!
//! let handle = store.handle();
//!
//! handle.put(KEYSPACE, 1, b"Hello, world".to_vec(), Consistency::All).await?;
//!
//! let doc = handle
//! .get(KEYSPACE, 1)
//! .await?
//! .expect("Document should not be none");
//! assert_eq!(doc.id(), 1);
//! assert_eq!(doc.data(), b"Hello, world");
//!
//! handle.del(KEYSPACE, 1, Consistency::All).await?;
//! let doc = handle.get(KEYSPACE, 1).await?;
//! assert!(doc.is_none(), "No document should not exist!");
//!
//! handle.del(KEYSPACE, 2, Consistency::All).await?;
//! let doc = handle.get(KEYSPACE, 2).await?;
//! assert!(doc.is_none(), "No document should not exist!");
//!
//! node.shutdown().await;
//!
//! Ok(())
//! }
//! ```
mod db;
use std::path::Path;
use async_trait::async_trait;
use datacake_crdt::{HLCTimestamp, Key};
use datacake_eventual_consistency::{
BulkMutationError,
Document,
DocumentMetadata,
Storage,
};
pub use db::StorageHandle;
pub use heed;
pub use heed::Error;
pub struct LmdbStorage {
db: StorageHandle,
}
impl LmdbStorage {
/// Connects to the LMDB database.
/// This spawns 1 background threads with actions being executed within that thread.
/// This approach reduces the affect of writes blocking reads and vice-versa.
pub async fn open(path: impl AsRef<Path>) -> heed::Result<Self> {
let db = StorageHandle::open(path).await?;
Ok(Self { db })
}
/// Access to the LMDB storage handle.
///
/// This allows you to access the LMDB db directly
/// including it's environment, but it does
/// not provide any access to the KV databases used
/// by the datacake storage layer.
pub fn handle(&self) -> &StorageHandle {
&self.db
}
}
#[async_trait]
impl Storage for LmdbStorage {
type Error = heed::Error;
type DocsIter = Box<dyn Iterator<Item = Document>>;
type MetadataIter = Box<dyn Iterator<Item = (Key, HLCTimestamp, bool)>>;
async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error> {
self.handle().keyspace_list().await
}
async fn iter_metadata(
&self,
keyspace: &str,
) -> Result<Self::MetadataIter, Self::Error> {
self.handle()
.get_metadata(keyspace)
.await
.map(|v| Box::new(v.into_iter()) as Self::MetadataIter)
}
async fn remove_tombstones(
&self,
keyspace: &str,
keys: impl Iterator<Item = Key> + Send,
) -> Result<(), BulkMutationError<Self::Error>> {
self.handle()
.remove_tombstones(keyspace, keys)
.await
.map_err(BulkMutationError::empty_with_error)
}
async fn put(&self, keyspace: &str, document: Document) -> Result<(), Self::Error> {
self.handle().put_kv(keyspace, document).await
}
async fn multi_put(
&self,
keyspace: &str,
documents: impl Iterator<Item = Document> + Send,
) -> Result<(), BulkMutationError<Self::Error>> {
self.handle()
.put_many_kv(keyspace, documents)
.await
.map_err(BulkMutationError::empty_with_error)
}
async fn mark_as_tombstone(
&self,
keyspace: &str,
doc_id: Key,
timestamp: HLCTimestamp,
) -> Result<(), Self::Error> {
self.handle()
.mark_tombstone(keyspace, doc_id, timestamp)
.await
}
async fn mark_many_as_tombstone(
&self,
keyspace: &str,
documents: impl Iterator<Item = DocumentMetadata> + Send,
) -> Result<(), BulkMutationError<Self::Error>> {
self.handle()
.mark_many_as_tombstone(keyspace, documents)
.await
.map_err(BulkMutationError::empty_with_error)
}
async fn get(
&self,
keyspace: &str,
doc_id: Key,
) -> Result<Option<Document>, Self::Error> {
self.handle().get(keyspace, doc_id).await
}
async fn multi_get(
&self,
keyspace: &str,
doc_ids: impl Iterator<Item = Key> + Send,
) -> Result<Self::DocsIter, Self::Error> {
self.handle()
.get_many(keyspace, doc_ids)
.await
.map(|v| Box::new(v.into_iter()) as Self::DocsIter)
}
}
#[cfg(test)]
mod tests {
use std::env::temp_dir;
use datacake_eventual_consistency::test_suite;
use uuid::Uuid;
use crate::LmdbStorage;
#[tokio::test]
async fn test_storage_logic() {
let path = temp_dir().join(Uuid::new_v4().to_string());
std::fs::create_dir_all(&path).unwrap();
let storage = LmdbStorage::open(path).await.expect("Open DB");
test_suite::run_test_suite(storage).await;
}
}