1use std::collections::btree_map::{BTreeMap, Entry};
2use std::fmt;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use destream::{de, en};
7use futures::{future, TryFutureExt, TryStreamExt};
8use get_size::GetSize;
9use log::debug;
10
11use tc_error::*;
12use tc_scalar::Scalar;
13use tc_transact::hash::{Digest, Hash, Output, Sha256};
14use tc_transact::TxnId;
15use tc_value::Value;
16
17use super::StoreEntry;
18
19pub enum MutationPending<Txn, FE> {
20 Delete(Value),
21 Put(Txn, Value, StoreEntry<Txn, FE>),
22}
23
24#[derive(Clone, Eq, PartialEq)]
25pub enum MutationRecord {
26 Delete(Value),
27 Put(Value, Scalar),
28}
29
30impl<'a, D: Digest> Hash<D> for &'a MutationRecord {
31 fn hash(self) -> Output<D> {
32 match self {
33 MutationRecord::Delete(key) => Hash::<D>::hash(key),
34 MutationRecord::Put(key, value) => Hash::<D>::hash((key, value)),
35 }
36 }
37}
38
39#[async_trait]
40impl de::FromStream for MutationRecord {
41 type Context = ();
42
43 async fn from_stream<D: de::Decoder>(_: (), decoder: &mut D) -> Result<Self, D::Error> {
44 decoder.decode_seq(MutationVisitor).await
45 }
46}
47
48impl<'en> en::ToStream<'en> for MutationRecord {
49 fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
50 use en::IntoStream;
51
52 match self {
53 Self::Delete(key) => (key,).into_stream(encoder),
54 Self::Put(key, value) => (key, value).into_stream(encoder),
55 }
56 }
57}
58
59impl<'en> en::IntoStream<'en> for MutationRecord {
60 fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
61 match self {
62 Self::Delete(key) => (key,).into_stream(encoder),
63 Self::Put(key, value) => (key, value).into_stream(encoder),
64 }
65 }
66}
67
68impl fmt::Debug for MutationRecord {
69 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70 match self {
71 Self::Delete(key) => write!(f, "DELETE {:?}", key),
72 Self::Put(key, value) => write!(f, "PUT {:?} <- {:?}", key, value),
73 }
74 }
75}
76
77struct MutationVisitor;
78
79#[async_trait]
80impl de::Visitor for MutationVisitor {
81 type Value = MutationRecord;
82
83 fn expecting() -> &'static str {
84 "a mutation record"
85 }
86
87 async fn visit_seq<A: de::SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
88 let key = seq
89 .next_element(())
90 .await?
91 .ok_or_else(|| de::Error::invalid_length(0, Self::expecting()))?;
92
93 match seq.next_element(()).await? {
94 Some(value) => Ok(MutationRecord::Put(key, value)),
95 None => Ok(MutationRecord::Delete(key)),
96 }
97 }
98}
99
100#[derive(Clone, Eq, PartialEq)]
102pub struct ChainBlock {
103 last_hash: Bytes,
104 pub mutations: BTreeMap<TxnId, Vec<MutationRecord>>,
105}
106
107impl GetSize for ChainBlock {
108 fn get_size(&self) -> usize {
109 let size = self
110 .mutations
111 .iter()
112 .map(|txn| txn.get_size())
113 .sum::<usize>();
114
115 self.last_hash.len() + size
116 }
117}
118
119impl ChainBlock {
120 fn txn_hash<'a, R>(txn_id: &TxnId, mutations: R) -> Output<Sha256>
121 where
122 R: IntoIterator<Item = &'a MutationRecord>,
123 {
124 let mut mutation_hasher = Sha256::new();
125 mutation_hasher.update(Hash::<Sha256>::hash(txn_id));
126
127 for mutation in mutations {
128 mutation_hasher.update(Hash::<Sha256>::hash(mutation));
129 }
130
131 mutation_hasher.finalize()
132 }
133
134 pub fn hash<'a, M, R>(last_hash: &'a [u8], mutations: M) -> Output<Sha256>
136 where
137 M: IntoIterator<Item = (&'a TxnId, &'a R)> + 'a,
138 &'a R: IntoIterator<Item = &'a MutationRecord> + 'a,
139 {
140 let mut hasher = Sha256::new();
141 hasher.update(last_hash);
142
143 let mut txn_hasher = Sha256::new();
144 for (txn_id, mutations) in mutations {
145 txn_hasher.update(Self::txn_hash(txn_id, mutations));
146 }
147
148 hasher.update(txn_hasher.finalize());
149 hasher.finalize()
150 }
151
152 pub fn pending_hash<'a, M, R, P>(
154 last_hash: &'a [u8],
155 mutations: M,
156 txn_id: &TxnId,
157 pending: P,
158 ) -> Output<Sha256>
159 where
160 M: IntoIterator<Item = (&'a TxnId, &'a R)> + 'a,
161 &'a R: IntoIterator<Item = &'a MutationRecord> + 'a,
162 P: IntoIterator<Item = &'a MutationRecord> + 'a,
163 {
164 let mut hasher = Sha256::new();
165 hasher.update(last_hash);
166
167 let mut txn_hasher = Sha256::new();
168 for (txn_id, mutations) in mutations {
169 txn_hasher.update(Self::txn_hash(txn_id, mutations));
170 }
171
172 txn_hasher.update(Self::txn_hash(txn_id, pending));
173
174 hasher.update(txn_hasher.finalize());
175 hasher.finalize()
176 }
177
178 pub fn new<H: Into<Bytes>>(hash: H) -> Self {
180 Self {
181 last_hash: hash.into(),
182 mutations: BTreeMap::new(),
183 }
184 }
185
186 pub fn with_txn<H: Into<Bytes>>(hash: H, txn_id: TxnId) -> Self {
188 let mut mutations = BTreeMap::new();
189 mutations.insert(txn_id, Vec::new());
190
191 Self {
192 last_hash: hash.into(),
193 mutations,
194 }
195 }
196
197 pub fn with_mutations(hash: Bytes, mutations: BTreeMap<TxnId, Vec<MutationRecord>>) -> Self {
199 Self {
200 last_hash: hash,
201 mutations,
202 }
203 }
204
205 pub(super) fn append(&mut self, txn_id: TxnId, mutation: MutationRecord) {
207 match self.mutations.entry(txn_id) {
208 Entry::Vacant(entry) => {
209 entry.insert(vec![mutation]);
210 }
211 Entry::Occupied(mut entry) => {
212 entry.get_mut().push(mutation);
213 }
214 }
215 }
216
217 pub fn append_delete(&mut self, txn_id: TxnId, key: Value) {
219 self.append(txn_id, MutationRecord::Delete(key))
220 }
221
222 pub fn append_put(&mut self, txn_id: TxnId, key: Value, value: Scalar) {
224 debug!("ChainBlock::append_put {} <- {:?}", key, value);
225 self.append(txn_id, MutationRecord::Put(key, value))
226 }
227
228 pub fn current_hash(&self) -> Output<Sha256> {
230 Self::hash(&self.last_hash, &self.mutations)
231 }
232
233 pub fn last_hash(&self) -> &Bytes {
235 &self.last_hash
236 }
237
238 pub async fn size(&self) -> TCResult<usize> {
241 let encoded = tbon::en::encode(self)
242 .map_err(|cause| internal!("TBON encoding error").consume(cause))?;
243
244 encoded
245 .map_err(|cause| internal!("TBON encoding error").consume(cause))
246 .try_fold(0, |size, chunk| future::ready(Ok(size + chunk.len())))
247 .await
248 }
249}
250
251#[async_trait]
252impl de::FromStream for ChainBlock {
253 type Context = ();
254
255 async fn from_stream<D: de::Decoder>(context: (), decoder: &mut D) -> Result<Self, D::Error> {
256 de::FromStream::from_stream(context, decoder)
257 .map_ok(|(hash, mutations)| Self {
258 last_hash: hash,
259 mutations,
260 })
261 .map_err(|e| de::Error::custom(format!("failed to decode ChainBlock: {}", e)))
262 .await
263 }
264}
265
266impl<'en> en::IntoStream<'en> for ChainBlock {
267 fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
268 en::IntoStream::into_stream((self.last_hash, self.mutations), encoder)
269 }
270}
271
272impl<'en> en::ToStream<'en> for ChainBlock {
273 fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
274 en::IntoStream::into_stream((&self.last_hash, &self.mutations), encoder)
275 }
276}
277
278#[cfg(debug_assertions)]
279impl fmt::Debug for ChainBlock {
280 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
281 writeln!(f, "chain block:")?;
282 writeln!(f, "\thash: {}", hex::encode(&self.last_hash))?;
283 writeln!(f, "\tentries: {}", self.mutations.len())?;
284
285 for (txn_id, mutations) in &self.mutations {
286 writeln!(f, "\t\t{}: {:?}", txn_id, mutations)?;
287 }
288
289 Ok(())
290 }
291}
292
293#[cfg(not(debug_assertions))]
294impl fmt::Debug for ChainBlock {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 write!(
297 f,
298 "(a Chain block starting at hash {} with {} entries)",
299 hex::encode(&self.last_hash),
300 self.mutations.len()
301 )
302 }
303}