1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
use async_trait::async_trait; use sled::{ transaction::{ConflictableTransactionError, TransactionError}, IVec, }; use super::{err_into, error::StorageError, SledStorage}; use crate::{MutResult, Row, Schema, StoreMut}; macro_rules! try_into { ($self: expr, $expr: expr) => { match $expr.map_err(err_into) { Err(e) => { return Err(($self, e)); } Ok(v) => v, } }; } macro_rules! transaction { ($self: expr, $expr: expr) => {{ let result = $self.tree.transaction($expr).map_err(|e| match e { TransactionError::Abort(e) => e, TransactionError::Storage(e) => StorageError::Sled(e).into(), }); match result { Ok(_) => Ok(($self, ())), Err(e) => Err(($self, e)), } }}; } #[async_trait(?Send)] impl StoreMut<IVec> for SledStorage { async fn insert_schema(self, schema: &Schema) -> MutResult<Self, ()> { let key = format!("schema/{}", schema.table_name); let key = key.as_bytes(); let value = try_into!(self, bincode::serialize(schema)); try_into!(self, self.tree.insert(key, value)); Ok((self, ())) } async fn delete_schema(self, table_name: &str) -> MutResult<Self, ()> { let prefix = format!("data/{}/", table_name); let tree = &self.tree; for item in tree.scan_prefix(prefix.as_bytes()) { let (key, _) = try_into!(self, item); try_into!(self, tree.remove(key)); } let key = format!("schema/{}", table_name); try_into!(self, tree.remove(key)); Ok((self, ())) } async fn insert_data(self, table_name: &str, rows: Vec<Row>) -> MutResult<Self, ()> { transaction!(self, |tree| { for row in rows.iter() { let id = tree.generate_id()?; let id = id.to_be_bytes(); let prefix = format!("data/{}/", table_name); let bytes = prefix .into_bytes() .into_iter() .chain(id.iter().copied()) .collect::<Vec<_>>(); let key = IVec::from(bytes); let value = bincode::serialize(row) .map_err(err_into) .map_err(ConflictableTransactionError::Abort)?; tree.insert(key, value)?; } Ok(()) }) } async fn update_data(self, rows: Vec<(IVec, Row)>) -> MutResult<Self, ()> { transaction!(self, |tree| { for (key, row) in rows.iter() { let value = bincode::serialize(row) .map_err(err_into) .map_err(ConflictableTransactionError::Abort)?; tree.insert(key, value)?; } Ok(()) }) } async fn delete_data(self, keys: Vec<IVec>) -> MutResult<Self, ()> { transaction!(self, |tree| { for key in keys.iter() { tree.remove(key)?; } Ok(()) }) } }