essential_transaction_storage/
lib.rs

1#![deny(missing_docs)]
2//! # Transaction Storage
3//!
4//! Provides a transactional layer on top of a state storage.
5
6use essential_state_read_vm::StateRead;
7use essential_storage::{key_range, QueryState, StateStorage};
8use essential_types::{ContentAddress, Key, Value, Word};
9use futures::future::FutureExt;
10use imbl::HashMap;
11use std::{pin::Pin, sync::Arc};
12use thiserror::Error;
13
14#[cfg(test)]
15mod tests;
16
17/// Utility trait to provide transactional semantics on top of a state storage.
18pub trait Transaction {
19    /// Start a new transaction.
20    fn transaction(self) -> TransactionStorage<Self>
21    where
22        Self: StateStorage + Sized;
23}
24
25impl<S> Transaction for S
26where
27    S: StateStorage,
28{
29    fn transaction(self) -> TransactionStorage<Self> {
30        TransactionStorage::new(self)
31    }
32}
33
34/// Wrapper around a state storage that provides transactional semantics.
35#[derive(Clone)]
36pub struct TransactionStorage<S> {
37    state: HashMap<ContentAddress, HashMap<Key, Mutation>>,
38    storage: S,
39}
40
41/// View of a transaction.
42#[derive(Clone)]
43pub struct TransactionView<S>(Arc<TransactionStorage<S>>);
44
45#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
46enum Mutation {
47    Insert(Vec<Word>),
48    Delete,
49}
50
51/// Error for transaction view.
52#[derive(Debug, Error)]
53pub enum TransactionViewError {
54    /// Error during read
55    #[error("failed to read")]
56    ReadError(#[from] anyhow::Error),
57}
58
59impl<S> StateRead for TransactionView<S>
60where
61    S: StateStorage + Clone + Send + Sync + 'static,
62{
63    type Error = TransactionViewError;
64
65    type Future =
66        Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<Word>>, Self::Error>> + Send>>;
67
68    fn key_range(&self, contract_addr: ContentAddress, key: Key, num_words: usize) -> Self::Future {
69        let storage = self.clone();
70        async move { key_range(&storage, contract_addr, key, num_words).await }.boxed()
71    }
72}
73
74impl<S> TransactionStorage<S> {
75    /// Create a new transaction storage around the given state storage.
76    pub fn new(storage: S) -> Self {
77        Self {
78            state: Default::default(),
79            storage,
80        }
81    }
82
83    /// Create a view of this transaction.
84    pub fn view(&self) -> TransactionView<S>
85    where
86        S: Clone,
87    {
88        TransactionView(Arc::new(Self {
89            state: self.state.clone(),
90            storage: self.storage.clone(),
91        }))
92    }
93
94    /// Take a snapshow of this transaction.
95    pub fn snapshot(&self) -> Self
96    where
97        S: Clone,
98    {
99        Self {
100            state: self.state.clone(),
101            storage: self.storage.clone(),
102        }
103    }
104
105    /// Commit the transaction.
106    pub async fn commit(&mut self) -> anyhow::Result<()>
107    where
108        S: StateStorage,
109    {
110        let updates = self.state.clone().into_iter().flat_map(|(address, m)| {
111            m.into_iter().map(move |(key, mutation)| {
112                (
113                    address.clone(),
114                    key,
115                    match mutation {
116                        Mutation::Insert(v) => v,
117                        Mutation::Delete => Vec::new(),
118                    },
119                )
120            })
121        });
122        self.storage.update_state_batch(updates).await?;
123        self.state.clear();
124        Ok(())
125    }
126
127    /// Extract the updates from this transaction.
128    pub fn into_updates(self) -> impl Iterator<Item = (ContentAddress, Key, Value)> {
129        self.state.into_iter().flat_map(|(address, m)| {
130            m.into_iter().map(move |(key, mutation)| {
131                (
132                    address.clone(),
133                    key,
134                    match mutation {
135                        Mutation::Insert(v) => v,
136                        Mutation::Delete => Vec::new(),
137                    },
138                )
139            })
140        })
141    }
142
143    /// Rollback the transaction.
144    pub fn rollback(&mut self) {
145        self.state.clear()
146    }
147
148    /// Update the state of this transaction.
149    pub async fn update_state(
150        &mut self,
151        address: &ContentAddress,
152        key: Key,
153        value: Vec<Word>,
154    ) -> anyhow::Result<Vec<Word>>
155    where
156        S: QueryState,
157    {
158        let m = self.state.entry(address.clone()).or_default();
159        let entry = m.entry(key.clone());
160        let mutation = match entry {
161            imbl::hashmap::Entry::Occupied(mut v) => {
162                if value.is_empty() {
163                    Some(v.insert(Mutation::Delete))
164                } else {
165                    Some(v.insert(Mutation::Insert(value)))
166                }
167            }
168            imbl::hashmap::Entry::Vacant(v) => {
169                if value.is_empty() {
170                    v.insert(Mutation::Delete);
171                } else {
172                    v.insert(Mutation::Insert(value));
173                }
174                None
175            }
176        };
177
178        match mutation {
179            Some(Mutation::Insert(v)) => Ok(v),
180            Some(Mutation::Delete) => Ok(Vec::new()),
181            None => self.storage.query_state(address, &key).await,
182        }
183    }
184
185    /// Apply state changes without returning the previous value.
186    pub fn apply_state(&mut self, address: &ContentAddress, key: Key, value: Vec<Word>) {
187        let m = self.state.entry(address.clone()).or_default();
188        let entry = m.entry(key);
189        match entry {
190            imbl::hashmap::Entry::Occupied(mut v) => {
191                if value.is_empty() {
192                    v.insert(Mutation::Delete);
193                } else {
194                    v.insert(Mutation::Insert(value));
195                }
196            }
197            imbl::hashmap::Entry::Vacant(v) => {
198                if value.is_empty() {
199                    v.insert(Mutation::Delete);
200                } else {
201                    v.insert(Mutation::Insert(value));
202                }
203            }
204        }
205    }
206
207    /// Query the state of this transaction.
208    pub async fn query_state(
209        &self,
210        address: &ContentAddress,
211        key: &Key,
212    ) -> anyhow::Result<Vec<Word>>
213    where
214        S: QueryState,
215    {
216        let mutation = self.state.get(address).and_then(|m| m.get(key)).cloned();
217        match mutation {
218            Some(Mutation::Insert(v)) => Ok(v),
219            Some(Mutation::Delete) => Ok(Vec::new()),
220            None => self.storage.query_state(address, key).await,
221        }
222    }
223}
224
225impl<S> QueryState for TransactionView<S>
226where
227    S: QueryState + Clone + Send + Sync + 'static,
228{
229    async fn query_state(&self, address: &ContentAddress, key: &Key) -> anyhow::Result<Vec<Word>> {
230        self.0.query_state(address, key).await
231    }
232}