blake_streams_core/
buffer.rs1use crate::{Hash, Head, SignedHead, Slice, StreamId, StreamWriter};
2use anyhow::Result;
3use bao::decode::SliceDecoder;
4use fnv::FnvHashSet;
5use std::io::{Read, Write};
6
7pub struct SliceBuffer {
8 stream: StreamWriter<()>,
9 slice_len: u64,
10 buf: Vec<u8>,
11 slices: Vec<SliceInfo>,
12 written: u64,
13 hashes: FnvHashSet<[u8; 32]>,
14}
15
16#[derive(Debug)]
17pub struct SliceInfo {
18 pub offset: u64,
19 pub len: u64,
20 pub written: bool,
21}
22
23impl SliceBuffer {
24 pub fn new(stream: StreamWriter<()>, slice_len: u64) -> Self {
25 Self {
26 stream,
27 slice_len,
28 buf: vec![],
29 slices: vec![],
30 written: 0,
31 hashes: Default::default(),
32 }
33 }
34
35 pub fn id(&self) -> &StreamId {
36 self.stream.id()
37 }
38
39 pub fn head(&self) -> &SignedHead {
40 self.stream.head()
41 }
42
43 pub fn prepare(&mut self, len: u64) {
44 assert!(self.commitable());
45 self.slices.clear();
46 self.slices.reserve((len % self.slice_len + 2) as _);
47 let mut pos = self.head().head().len();
48 let end = pos + len;
49 if pos % self.slice_len != 0 {
50 let alignment_slice = u64::min(self.slice_len - pos % self.slice_len, len);
51 self.slices.push(SliceInfo {
52 offset: pos,
53 len: alignment_slice,
54 written: false,
55 });
56 pos += alignment_slice;
57 }
58 while pos + self.slice_len < end {
59 self.slices.push(SliceInfo {
60 offset: pos,
61 len: self.slice_len,
62 written: false,
63 });
64 pos += self.slice_len;
65 }
66 if pos < end {
67 let final_slice = end - pos;
68 self.slices.push(SliceInfo {
69 offset: pos,
70 len: final_slice,
71 written: false,
72 });
73 }
74 self.buf.clear();
75 self.buf.reserve(len as usize);
76 unsafe { self.buf.set_len(len as usize) };
77 self.written = 0;
78 }
79
80 pub fn slices(&self) -> &[SliceInfo] {
81 &self.slices
82 }
83
84 pub fn add_slice(&mut self, slice: &Slice, i: usize) -> Result<()> {
85 let head = slice.head.head();
86 if !self.hashes.contains(head.hash()) {
87 slice.head.verify(self.stream.id())?;
88 self.hashes.insert(*head.hash());
89 }
90 let info = &self.slices[i];
91 if info.written {
92 return Ok(());
93 }
94 let mut decoder = SliceDecoder::new(
95 &slice.data[..],
96 &Hash::from(*head.hash()),
97 info.offset,
98 info.len,
99 );
100 let start = info.offset - self.head().head().len();
101 let end = start + info.len;
102 decoder.read_exact(&mut self.buf[(start as usize)..(end as usize)])?;
103 let mut end = [0u8];
104 assert_eq!(decoder.read(&mut end).unwrap(), 0);
105 self.slices[i].written = true;
106 self.written += 1;
107 Ok(())
108 }
109
110 pub fn commitable(&self) -> bool {
111 self.written >= self.slices.len() as u64
112 }
113
114 pub fn write_buffer(&mut self) -> Result<()> {
115 if !self.commitable() {
116 return Err(anyhow::anyhow!("missing slices"));
117 }
118 self.stream.write_all(&self.buf)?;
119 Ok(())
120 }
121
122 pub fn commit(&mut self, sig: [u8; 64]) -> Result<Head> {
123 self.write_buffer()?;
124 self.stream.flush()?;
125 Ok(self.stream.commit(sig)?)
126 }
127}