bao_tree/io/
mod.rs

1//! Implementation of bao streaming for std io and tokio io
2use bytes::Bytes;
3
4use crate::{blake3, BlockSize, ChunkNum, ChunkRanges, TreeNode};
5
6mod error;
7
8pub use error::*;
9use range_collections::{range_set::RangeSetRange, RangeSetRef};
10
11#[cfg(feature = "tokio_fsm")]
12pub mod fsm;
13#[cfg(feature = "experimental-mixed")]
14pub mod mixed;
15pub mod outboard;
16pub mod sync;
17
18/// A parent hash pair.
19#[derive(Debug)]
20pub struct Parent {
21    /// The node in the tree for which the hashes are.
22    pub node: TreeNode,
23    /// The pair of hashes for the node.
24    pub pair: (blake3::Hash, blake3::Hash),
25}
26
#[cfg(feature = "serde")]
mod serde_support {
    use serde::{ser::SerializeSeq, Deserialize, Serialize};

    use super::{blake3, Parent, TreeNode};

    /// Number of elements in the serialized sequence of a [`Parent`]:
    /// the node, then the first hash, then the second hash.
    const PARENT_SEQ_LEN: usize = 3;

    /// Description of the expected input, used in deserialization errors.
    const EXPECTED: &str = "a parent node with 3 elements";

    /// Serializes a [`Parent`] as a flat sequence `[node, hash, hash]`, with each
    /// hash written as its 32 raw bytes.
    impl Serialize for Parent {
        fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
            let (l, r) = self.pair;
            // The declared length must equal the number of elements written below.
            // Serializers that emit the length up front would otherwise produce
            // output that the visitor in `Deserialize` (which reads three
            // elements) cannot read back.
            let mut seq = serializer.serialize_seq(Some(PARENT_SEQ_LEN))?;
            seq.serialize_element(&self.node)?;
            seq.serialize_element(l.as_bytes())?;
            seq.serialize_element(r.as_bytes())?;
            seq.end()
        }
    }

    /// Deserializes a [`Parent`] from the three element sequence written by the
    /// `Serialize` impl.
    impl<'a> Deserialize<'a> for Parent {
        fn deserialize<D: serde::Deserializer<'a>>(deserializer: D) -> Result<Self, D::Error> {
            /// Visitor that reads the node followed by the two 32 byte hashes.
            struct ParentVisitor;

            impl<'de> serde::de::Visitor<'de> for ParentVisitor {
                type Value = Parent;

                fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                    formatter.write_str("a parent node")
                }

                fn visit_seq<A: serde::de::SeqAccess<'de>>(
                    self,
                    mut seq: A,
                ) -> Result<Self::Value, A::Error> {
                    // Each missing element is reported with the number of
                    // elements that were successfully read before it.
                    let node = seq
                        .next_element::<TreeNode>()?
                        .ok_or_else(|| serde::de::Error::invalid_length(0, &EXPECTED))?;
                    let l = seq
                        .next_element::<[u8; 32]>()?
                        .ok_or_else(|| serde::de::Error::invalid_length(1, &EXPECTED))?;
                    let r = seq
                        .next_element::<[u8; 32]>()?
                        .ok_or_else(|| serde::de::Error::invalid_length(2, &EXPECTED))?;
                    Ok(Parent {
                        node,
                        pair: (blake3::Hash::from(l), blake3::Hash::from(r)),
                    })
                }
            }

            deserializer.deserialize_seq(ParentVisitor)
        }
    }
}
76
77/// A leaf node.
78#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
79pub struct Leaf {
80    /// The byte offset of the leaf in the file.
81    pub offset: u64,
82    /// The data of the leaf.
83    pub data: Bytes,
84}
85
86impl std::fmt::Debug for Leaf {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        f.debug_struct("Leaf")
89            .field("offset", &self.offset)
90            .field("data", &self.data.len())
91            .finish()
92    }
93}
94
95/// A content item for the bao streaming protocol.
96///
97/// After reading the initial header, the only possible items are `Parent` and
98/// `Leaf`.
99#[derive(Debug)]
100#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
101pub enum BaoContentItem {
102    /// a parent node, to update the outboard
103    Parent(Parent),
104    /// a leaf node, to write to the file
105    Leaf(Leaf),
106}
107
108impl From<Parent> for BaoContentItem {
109    fn from(p: Parent) -> Self {
110        Self::Parent(p)
111    }
112}
113
114impl From<Leaf> for BaoContentItem {
115    fn from(l: Leaf) -> Self {
116        Self::Leaf(l)
117    }
118}
119
120impl BaoContentItem {
121    /// True if this is a leaf node.
122    pub fn is_leaf(&self) -> bool {
123        matches!(self, BaoContentItem::Leaf(_))
124    }
125
126    /// True if this is a parent node.
127    pub fn is_parent(&self) -> bool {
128        matches!(self, BaoContentItem::Parent(_))
129    }
130}
131
132/// Given a range set of byte ranges, round it up to full chunks.
133///
134/// E.g. a byte range from 1..3 will be converted into the chunk range 0..1 (0..1024 bytes).
135pub fn round_up_to_chunks(ranges: &RangeSetRef<u64>) -> ChunkRanges {
136    let mut res = ChunkRanges::empty();
137    // we don't know if the ranges are overlapping, so we just compute the union
138    for item in ranges.iter() {
139        // full_chunks() rounds down, chunks() rounds up
140        match item {
141            RangeSetRange::RangeFrom(range) => {
142                res |= ChunkRanges::from(ChunkNum::full_chunks(*range.start)..)
143            }
144            RangeSetRange::Range(range) => {
145                res |= ChunkRanges::from(
146                    ChunkNum::full_chunks(*range.start)..ChunkNum::chunks(*range.end),
147                )
148            }
149        }
150    }
151    res
152}
153
154/// Given a range set of chunk ranges, round up to chunk groups of the given size.
155pub fn round_up_to_chunks_groups(ranges: ChunkRanges, chunk_size: BlockSize) -> ChunkRanges {
156    let mut res = ChunkRanges::empty();
157    for range in ranges.iter() {
158        res |= match range {
159            RangeSetRange::RangeFrom(range) => {
160                let start = ChunkNum::chunk_group_start(*range.start, chunk_size);
161                ChunkRanges::from(start..)
162            }
163            RangeSetRange::Range(range) => {
164                let start = ChunkNum::chunk_group_start(*range.start, chunk_size);
165                let end = ChunkNum::chunk_group_end(*range.end, chunk_size);
166                ChunkRanges::from(start..end)
167            }
168        }
169    }
170    res
171}
172
173/// Given a range set of byte ranges, round it up to chunk groups.
174///
175/// If we store outboard data at a level of granularity of `block_size`, we can only
176/// share full chunk groups because we don't have proofs for anything below a chunk group.
177pub fn full_chunk_groups(ranges: &ChunkRanges, block_size: BlockSize) -> ChunkRanges {
178    fn floor(value: u64, shift: u8) -> u64 {
179        value >> shift << shift
180    }
181
182    fn ceil(value: u64, shift: u8) -> u64 {
183        (value + (1 << shift) - 1) >> shift << shift
184    }
185    let mut res = ChunkRanges::empty();
186    for item in ranges.iter() {
187        match item {
188            RangeSetRange::RangeFrom(range) => {
189                let start = ceil(range.start.0, block_size.0);
190                res |= ChunkRanges::from(ChunkNum(start)..)
191            }
192            RangeSetRange::Range(range) => {
193                let start = ceil(range.start.0, block_size.0);
194                let end = floor(range.end.0, block_size.0);
195                if start < end {
196                    res |= ChunkRanges::from(ChunkNum(start)..ChunkNum(end))
197                }
198            }
199        }
200    }
201    res
202}
203
204pub(crate) fn combine_hash_pair(l: &blake3::Hash, r: &blake3::Hash) -> [u8; 64] {
205    let mut res = [0u8; 64];
206    let lb: &mut [u8; 32] = (&mut res[0..32]).try_into().unwrap();
207    *lb = *l.as_bytes();
208    let rb: &mut [u8; 32] = (&mut res[32..]).try_into().unwrap();
209    *rb = *r.as_bytes();
210    res
211}
212
/// A pinned, boxed future with output `T` that may borrow for `'a`.
///
/// The trait object carries no `Send` bound.
#[cfg(feature = "validate")]
pub(crate) type LocalBoxFuture<'a, T> =
    std::pin::Pin<Box<dyn std::future::Future<Output = T> + 'a>>;
216
#[cfg(test)]
mod tests {
    use crate::{BlockSize, ChunkNum};

    /// Checks `chunk_group_start` and `chunk_group_end` for `BlockSize(4)`
    /// against a table of `(input, expected)` chunk numbers.
    #[test]
    fn test_chunk_group_start() {
        let bs = BlockSize(4);

        let start_cases = [(0, 0), (1, 0), (16, 16)];
        for (input, expected) in start_cases {
            assert_eq!(
                ChunkNum::chunk_group_start(ChunkNum(input), bs),
                ChunkNum(expected)
            );
        }

        let end_cases = [(0, 0), (1, 16), (16, 16), (17, 32)];
        for (input, expected) in end_cases {
            assert_eq!(
                ChunkNum::chunk_group_end(ChunkNum(input), bs),
                ChunkNum(expected)
            );
        }
    }
}