blake_streams_core/
buffer.rs

1use crate::{Hash, Head, SignedHead, Slice, StreamId, StreamWriter};
2use anyhow::Result;
3use bao::decode::SliceDecoder;
4use fnv::FnvHashSet;
5use std::io::{Read, Write};
6
7pub struct SliceBuffer {
8    stream: StreamWriter<()>,
9    slice_len: u64,
10    buf: Vec<u8>,
11    slices: Vec<SliceInfo>,
12    written: u64,
13    hashes: FnvHashSet<[u8; 32]>,
14}
15
16#[derive(Debug)]
17pub struct SliceInfo {
18    pub offset: u64,
19    pub len: u64,
20    pub written: bool,
21}
22
23impl SliceBuffer {
24    pub fn new(stream: StreamWriter<()>, slice_len: u64) -> Self {
25        Self {
26            stream,
27            slice_len,
28            buf: vec![],
29            slices: vec![],
30            written: 0,
31            hashes: Default::default(),
32        }
33    }
34
35    pub fn id(&self) -> &StreamId {
36        self.stream.id()
37    }
38
39    pub fn head(&self) -> &SignedHead {
40        self.stream.head()
41    }
42
43    pub fn prepare(&mut self, len: u64) {
44        assert!(self.commitable());
45        self.slices.clear();
46        self.slices.reserve((len % self.slice_len + 2) as _);
47        let mut pos = self.head().head().len();
48        let end = pos + len;
49        if pos % self.slice_len != 0 {
50            let alignment_slice = u64::min(self.slice_len - pos % self.slice_len, len);
51            self.slices.push(SliceInfo {
52                offset: pos,
53                len: alignment_slice,
54                written: false,
55            });
56            pos += alignment_slice;
57        }
58        while pos + self.slice_len < end {
59            self.slices.push(SliceInfo {
60                offset: pos,
61                len: self.slice_len,
62                written: false,
63            });
64            pos += self.slice_len;
65        }
66        if pos < end {
67            let final_slice = end - pos;
68            self.slices.push(SliceInfo {
69                offset: pos,
70                len: final_slice,
71                written: false,
72            });
73        }
74        self.buf.clear();
75        self.buf.reserve(len as usize);
76        unsafe { self.buf.set_len(len as usize) };
77        self.written = 0;
78    }
79
80    pub fn slices(&self) -> &[SliceInfo] {
81        &self.slices
82    }
83
84    pub fn add_slice(&mut self, slice: &Slice, i: usize) -> Result<()> {
85        let head = slice.head.head();
86        if !self.hashes.contains(head.hash()) {
87            slice.head.verify(self.stream.id())?;
88            self.hashes.insert(*head.hash());
89        }
90        let info = &self.slices[i];
91        if info.written {
92            return Ok(());
93        }
94        let mut decoder = SliceDecoder::new(
95            &slice.data[..],
96            &Hash::from(*head.hash()),
97            info.offset,
98            info.len,
99        );
100        let start = info.offset - self.head().head().len();
101        let end = start + info.len;
102        decoder.read_exact(&mut self.buf[(start as usize)..(end as usize)])?;
103        let mut end = [0u8];
104        assert_eq!(decoder.read(&mut end).unwrap(), 0);
105        self.slices[i].written = true;
106        self.written += 1;
107        Ok(())
108    }
109
110    pub fn commitable(&self) -> bool {
111        self.written >= self.slices.len() as u64
112    }
113
114    pub fn write_buffer(&mut self) -> Result<()> {
115        if !self.commitable() {
116            return Err(anyhow::anyhow!("missing slices"));
117        }
118        self.stream.write_all(&self.buf)?;
119        Ok(())
120    }
121
122    pub fn commit(&mut self, sig: [u8; 64]) -> Result<Head> {
123        self.write_buffer()?;
124        self.stream.flush()?;
125        Ok(self.stream.commit(sig)?)
126    }
127}