essential_transaction_storage/
lib.rs1#![deny(missing_docs)]
2use essential_state_read_vm::StateRead;
7use essential_storage::{key_range, QueryState, StateStorage};
8use essential_types::{ContentAddress, Key, Value, Word};
9use futures::future::FutureExt;
10use imbl::HashMap;
11use std::{pin::Pin, sync::Arc};
12use thiserror::Error;
13
14#[cfg(test)]
15mod tests;
16
17pub trait Transaction {
19 fn transaction(self) -> TransactionStorage<Self>
21 where
22 Self: StateStorage + Sized;
23}
24
25impl<S> Transaction for S
26where
27 S: StateStorage,
28{
29 fn transaction(self) -> TransactionStorage<Self> {
30 TransactionStorage::new(self)
31 }
32}
33
34#[derive(Clone)]
36pub struct TransactionStorage<S> {
37 state: HashMap<ContentAddress, HashMap<Key, Mutation>>,
38 storage: S,
39}
40
41#[derive(Clone)]
43pub struct TransactionView<S>(Arc<TransactionStorage<S>>);
44
45#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
46enum Mutation {
47 Insert(Vec<Word>),
48 Delete,
49}
50
51#[derive(Debug, Error)]
53pub enum TransactionViewError {
54 #[error("failed to read")]
56 ReadError(#[from] anyhow::Error),
57}
58
59impl<S> StateRead for TransactionView<S>
60where
61 S: StateStorage + Clone + Send + Sync + 'static,
62{
63 type Error = TransactionViewError;
64
65 type Future =
66 Pin<Box<dyn std::future::Future<Output = Result<Vec<Vec<Word>>, Self::Error>> + Send>>;
67
68 fn key_range(&self, contract_addr: ContentAddress, key: Key, num_words: usize) -> Self::Future {
69 let storage = self.clone();
70 async move { key_range(&storage, contract_addr, key, num_words).await }.boxed()
71 }
72}
73
74impl<S> TransactionStorage<S> {
75 pub fn new(storage: S) -> Self {
77 Self {
78 state: Default::default(),
79 storage,
80 }
81 }
82
83 pub fn view(&self) -> TransactionView<S>
85 where
86 S: Clone,
87 {
88 TransactionView(Arc::new(Self {
89 state: self.state.clone(),
90 storage: self.storage.clone(),
91 }))
92 }
93
94 pub fn snapshot(&self) -> Self
96 where
97 S: Clone,
98 {
99 Self {
100 state: self.state.clone(),
101 storage: self.storage.clone(),
102 }
103 }
104
105 pub async fn commit(&mut self) -> anyhow::Result<()>
107 where
108 S: StateStorage,
109 {
110 let updates = self.state.clone().into_iter().flat_map(|(address, m)| {
111 m.into_iter().map(move |(key, mutation)| {
112 (
113 address.clone(),
114 key,
115 match mutation {
116 Mutation::Insert(v) => v,
117 Mutation::Delete => Vec::new(),
118 },
119 )
120 })
121 });
122 self.storage.update_state_batch(updates).await?;
123 self.state.clear();
124 Ok(())
125 }
126
127 pub fn into_updates(self) -> impl Iterator<Item = (ContentAddress, Key, Value)> {
129 self.state.into_iter().flat_map(|(address, m)| {
130 m.into_iter().map(move |(key, mutation)| {
131 (
132 address.clone(),
133 key,
134 match mutation {
135 Mutation::Insert(v) => v,
136 Mutation::Delete => Vec::new(),
137 },
138 )
139 })
140 })
141 }
142
143 pub fn rollback(&mut self) {
145 self.state.clear()
146 }
147
148 pub async fn update_state(
150 &mut self,
151 address: &ContentAddress,
152 key: Key,
153 value: Vec<Word>,
154 ) -> anyhow::Result<Vec<Word>>
155 where
156 S: QueryState,
157 {
158 let m = self.state.entry(address.clone()).or_default();
159 let entry = m.entry(key.clone());
160 let mutation = match entry {
161 imbl::hashmap::Entry::Occupied(mut v) => {
162 if value.is_empty() {
163 Some(v.insert(Mutation::Delete))
164 } else {
165 Some(v.insert(Mutation::Insert(value)))
166 }
167 }
168 imbl::hashmap::Entry::Vacant(v) => {
169 if value.is_empty() {
170 v.insert(Mutation::Delete);
171 } else {
172 v.insert(Mutation::Insert(value));
173 }
174 None
175 }
176 };
177
178 match mutation {
179 Some(Mutation::Insert(v)) => Ok(v),
180 Some(Mutation::Delete) => Ok(Vec::new()),
181 None => self.storage.query_state(address, &key).await,
182 }
183 }
184
185 pub fn apply_state(&mut self, address: &ContentAddress, key: Key, value: Vec<Word>) {
187 let m = self.state.entry(address.clone()).or_default();
188 let entry = m.entry(key);
189 match entry {
190 imbl::hashmap::Entry::Occupied(mut v) => {
191 if value.is_empty() {
192 v.insert(Mutation::Delete);
193 } else {
194 v.insert(Mutation::Insert(value));
195 }
196 }
197 imbl::hashmap::Entry::Vacant(v) => {
198 if value.is_empty() {
199 v.insert(Mutation::Delete);
200 } else {
201 v.insert(Mutation::Insert(value));
202 }
203 }
204 }
205 }
206
207 pub async fn query_state(
209 &self,
210 address: &ContentAddress,
211 key: &Key,
212 ) -> anyhow::Result<Vec<Word>>
213 where
214 S: QueryState,
215 {
216 let mutation = self.state.get(address).and_then(|m| m.get(key)).cloned();
217 match mutation {
218 Some(Mutation::Insert(v)) => Ok(v),
219 Some(Mutation::Delete) => Ok(Vec::new()),
220 None => self.storage.query_state(address, key).await,
221 }
222 }
223}
224
225impl<S> QueryState for TransactionView<S>
226where
227 S: QueryState + Clone + Send + Sync + 'static,
228{
229 async fn query_state(&self, address: &ContentAddress, key: &Key) -> anyhow::Result<Vec<Word>> {
230 self.0.query_state(address, key).await
231 }
232}