tc_chain/data/
block.rs

1use std::collections::btree_map::{BTreeMap, Entry};
2use std::fmt;
3
4use async_trait::async_trait;
5use bytes::Bytes;
6use destream::{de, en};
7use futures::{future, TryFutureExt, TryStreamExt};
8use get_size::GetSize;
9use log::debug;
10
11use tc_error::*;
12use tc_scalar::Scalar;
13use tc_transact::hash::{Digest, Hash, Output, Sha256};
14use tc_transact::TxnId;
15use tc_value::Value;
16
17use super::StoreEntry;
18
19pub enum MutationPending<Txn, FE> {
20    Delete(Value),
21    Put(Txn, Value, StoreEntry<Txn, FE>),
22}
23
24#[derive(Clone, Eq, PartialEq)]
25pub enum MutationRecord {
26    Delete(Value),
27    Put(Value, Scalar),
28}
29
30impl<'a, D: Digest> Hash<D> for &'a MutationRecord {
31    fn hash(self) -> Output<D> {
32        match self {
33            MutationRecord::Delete(key) => Hash::<D>::hash(key),
34            MutationRecord::Put(key, value) => Hash::<D>::hash((key, value)),
35        }
36    }
37}
38
39#[async_trait]
40impl de::FromStream for MutationRecord {
41    type Context = ();
42
43    async fn from_stream<D: de::Decoder>(_: (), decoder: &mut D) -> Result<Self, D::Error> {
44        decoder.decode_seq(MutationVisitor).await
45    }
46}
47
48impl<'en> en::ToStream<'en> for MutationRecord {
49    fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
50        use en::IntoStream;
51
52        match self {
53            Self::Delete(key) => (key,).into_stream(encoder),
54            Self::Put(key, value) => (key, value).into_stream(encoder),
55        }
56    }
57}
58
59impl<'en> en::IntoStream<'en> for MutationRecord {
60    fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
61        match self {
62            Self::Delete(key) => (key,).into_stream(encoder),
63            Self::Put(key, value) => (key, value).into_stream(encoder),
64        }
65    }
66}
67
68impl fmt::Debug for MutationRecord {
69    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70        match self {
71            Self::Delete(key) => write!(f, "DELETE {:?}", key),
72            Self::Put(key, value) => write!(f, "PUT {:?} <- {:?}", key, value),
73        }
74    }
75}
76
77struct MutationVisitor;
78
79#[async_trait]
80impl de::Visitor for MutationVisitor {
81    type Value = MutationRecord;
82
83    fn expecting() -> &'static str {
84        "a mutation record"
85    }
86
87    async fn visit_seq<A: de::SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
88        let key = seq
89            .next_element(())
90            .await?
91            .ok_or_else(|| de::Error::invalid_length(0, Self::expecting()))?;
92
93        match seq.next_element(()).await? {
94            Some(value) => Ok(MutationRecord::Put(key, value)),
95            None => Ok(MutationRecord::Delete(key)),
96        }
97    }
98}
99
100/// A single filesystem block belonging to a `Chain`.
101#[derive(Clone, Eq, PartialEq)]
102pub struct ChainBlock {
103    last_hash: Bytes,
104    pub mutations: BTreeMap<TxnId, Vec<MutationRecord>>,
105}
106
107impl GetSize for ChainBlock {
108    fn get_size(&self) -> usize {
109        let size = self
110            .mutations
111            .iter()
112            .map(|txn| txn.get_size())
113            .sum::<usize>();
114
115        self.last_hash.len() + size
116    }
117}
118
119impl ChainBlock {
120    fn txn_hash<'a, R>(txn_id: &TxnId, mutations: R) -> Output<Sha256>
121    where
122        R: IntoIterator<Item = &'a MutationRecord>,
123    {
124        let mut mutation_hasher = Sha256::new();
125        mutation_hasher.update(Hash::<Sha256>::hash(txn_id));
126
127        for mutation in mutations {
128            mutation_hasher.update(Hash::<Sha256>::hash(mutation));
129        }
130
131        mutation_hasher.finalize()
132    }
133
134    /// Compute the hash of a [`ChainBlock`] with the given contents
135    pub fn hash<'a, M, R>(last_hash: &'a [u8], mutations: M) -> Output<Sha256>
136    where
137        M: IntoIterator<Item = (&'a TxnId, &'a R)> + 'a,
138        &'a R: IntoIterator<Item = &'a MutationRecord> + 'a,
139    {
140        let mut hasher = Sha256::new();
141        hasher.update(last_hash);
142
143        let mut txn_hasher = Sha256::new();
144        for (txn_id, mutations) in mutations {
145            txn_hasher.update(Self::txn_hash(txn_id, mutations));
146        }
147
148        hasher.update(txn_hasher.finalize());
149        hasher.finalize()
150    }
151
152    /// Compute the hash of a [`ChainBlock`] with the given contents and pending contents
153    pub fn pending_hash<'a, M, R, P>(
154        last_hash: &'a [u8],
155        mutations: M,
156        txn_id: &TxnId,
157        pending: P,
158    ) -> Output<Sha256>
159    where
160        M: IntoIterator<Item = (&'a TxnId, &'a R)> + 'a,
161        &'a R: IntoIterator<Item = &'a MutationRecord> + 'a,
162        P: IntoIterator<Item = &'a MutationRecord> + 'a,
163    {
164        let mut hasher = Sha256::new();
165        hasher.update(last_hash);
166
167        let mut txn_hasher = Sha256::new();
168        for (txn_id, mutations) in mutations {
169            txn_hasher.update(Self::txn_hash(txn_id, mutations));
170        }
171
172        txn_hasher.update(Self::txn_hash(txn_id, pending));
173
174        hasher.update(txn_hasher.finalize());
175        hasher.finalize()
176    }
177
178    /// Return a new, empty block.
179    pub fn new<H: Into<Bytes>>(hash: H) -> Self {
180        Self {
181            last_hash: hash.into(),
182            mutations: BTreeMap::new(),
183        }
184    }
185
186    /// Return a new, empty block with an empty mutation list for the given `TxnId`.
187    pub fn with_txn<H: Into<Bytes>>(hash: H, txn_id: TxnId) -> Self {
188        let mut mutations = BTreeMap::new();
189        mutations.insert(txn_id, Vec::new());
190
191        Self {
192            last_hash: hash.into(),
193            mutations,
194        }
195    }
196
197    /// Return a new, empty block with an the given mutation list for the given `TxnId`.
198    pub fn with_mutations(hash: Bytes, mutations: BTreeMap<TxnId, Vec<MutationRecord>>) -> Self {
199        Self {
200            last_hash: hash,
201            mutations,
202        }
203    }
204
205    /// Append a [`MutationRecord`] to this [`ChainBlock`]
206    pub(super) fn append(&mut self, txn_id: TxnId, mutation: MutationRecord) {
207        match self.mutations.entry(txn_id) {
208            Entry::Vacant(entry) => {
209                entry.insert(vec![mutation]);
210            }
211            Entry::Occupied(mut entry) => {
212                entry.get_mut().push(mutation);
213            }
214        }
215    }
216
217    /// Append a DELETE op to this `ChainBlock`
218    pub fn append_delete(&mut self, txn_id: TxnId, key: Value) {
219        self.append(txn_id, MutationRecord::Delete(key))
220    }
221
222    /// Append a PUT op to the this `ChainBlock`
223    pub fn append_put(&mut self, txn_id: TxnId, key: Value, value: Scalar) {
224        debug!("ChainBlock::append_put {} <- {:?}", key, value);
225        self.append(txn_id, MutationRecord::Put(key, value))
226    }
227
228    /// The current hash of this block
229    pub fn current_hash(&self) -> Output<Sha256> {
230        Self::hash(&self.last_hash, &self.mutations)
231    }
232
233    /// The hash of the previous block in the chain
234    pub fn last_hash(&self) -> &Bytes {
235        &self.last_hash
236    }
237
238    /// The current size of this block
239    // TODO: delete
240    pub async fn size(&self) -> TCResult<usize> {
241        let encoded = tbon::en::encode(self)
242            .map_err(|cause| internal!("TBON encoding error").consume(cause))?;
243
244        encoded
245            .map_err(|cause| internal!("TBON encoding error").consume(cause))
246            .try_fold(0, |size, chunk| future::ready(Ok(size + chunk.len())))
247            .await
248    }
249}
250
251#[async_trait]
252impl de::FromStream for ChainBlock {
253    type Context = ();
254
255    async fn from_stream<D: de::Decoder>(context: (), decoder: &mut D) -> Result<Self, D::Error> {
256        de::FromStream::from_stream(context, decoder)
257            .map_ok(|(hash, mutations)| Self {
258                last_hash: hash,
259                mutations,
260            })
261            .map_err(|e| de::Error::custom(format!("failed to decode ChainBlock: {}", e)))
262            .await
263    }
264}
265
266impl<'en> en::IntoStream<'en> for ChainBlock {
267    fn into_stream<E: en::Encoder<'en>>(self, encoder: E) -> Result<E::Ok, E::Error> {
268        en::IntoStream::into_stream((self.last_hash, self.mutations), encoder)
269    }
270}
271
272impl<'en> en::ToStream<'en> for ChainBlock {
273    fn to_stream<E: en::Encoder<'en>>(&'en self, encoder: E) -> Result<E::Ok, E::Error> {
274        en::IntoStream::into_stream((&self.last_hash, &self.mutations), encoder)
275    }
276}
277
278#[cfg(debug_assertions)]
279impl fmt::Debug for ChainBlock {
280    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
281        writeln!(f, "chain block:")?;
282        writeln!(f, "\thash: {}", hex::encode(&self.last_hash))?;
283        writeln!(f, "\tentries: {}", self.mutations.len())?;
284
285        for (txn_id, mutations) in &self.mutations {
286            writeln!(f, "\t\t{}: {:?}", txn_id, mutations)?;
287        }
288
289        Ok(())
290    }
291}
292
293#[cfg(not(debug_assertions))]
294impl fmt::Debug for ChainBlock {
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        write!(
297            f,
298            "(a Chain block starting at hash {} with {} entries)",
299            hex::encode(&self.last_hash),
300            self.mutations.len()
301        )
302    }
303}