blake_streams_core/
lib.rs

1pub use crate::buffer::{SliceBuffer, SliceInfo};
2pub use crate::read::StreamReader;
3pub use crate::store::StreamStorage;
4pub use crate::stream::{DocId, Head, PeerId, SignedHead, Slice, Stream, StreamId};
5pub use crate::write::StreamWriter;
6pub use bao::Hash;
7pub use ed25519_dalek::{Keypair, PublicKey, SecretKey};
8
9mod buffer;
10mod read;
11mod store;
12mod stream;
13mod write;
14
15#[cfg(test)]
16mod tests {
17    use super::*;
18    use anyhow::Result;
19    use bao::decode::SliceDecoder;
20    use rand::RngCore;
21    use std::io::{BufReader, Read, Write};
22    use tempdir::TempDir;
23
24    fn rand_bytes(size: usize) -> Vec<u8> {
25        let mut rng = rand::thread_rng();
26        let mut data = Vec::with_capacity(size);
27        data.resize(data.capacity(), 0);
28        rng.fill_bytes(&mut data);
29        data
30    }
31
32    fn keypair(secret: [u8; 32]) -> Keypair {
33        let secret = SecretKey::from_bytes(&secret).unwrap();
34        let public = PublicKey::from(&secret);
35        Keypair { secret, public }
36    }
37
38    #[test]
39    fn test_append_stream() -> Result<()> {
40        let tmp = TempDir::new("test_append_stream")?;
41        let mut storage = StreamStorage::open(tmp.path(), keypair([0; 32]))?;
42        let data = rand_bytes(1_000_000);
43
44        let doc = DocId::unique();
45        let mut stream = storage.append(doc)?;
46        stream.write_all(&data)?;
47        stream.commit()?;
48
49        let stream = storage.slice(stream.id(), 0, data.len() as u64)?;
50        let mut reader = BufReader::new(stream);
51        let mut data2 = Vec::with_capacity(data.len());
52        data2.resize(data2.capacity(), 0);
53        reader.read_exact(&mut data2)?;
54        assert_eq!(data, data2);
55
56        Ok(())
57    }
58
59    #[test]
60    fn test_extract_slice() -> Result<()> {
61        let tmp = TempDir::new("test_extract_slice")?;
62        let mut storage = StreamStorage::open(tmp.path(), keypair([0; 32]))?;
63        let data = rand_bytes(1027);
64
65        let doc = DocId::unique();
66        let mut stream = storage.append(doc)?;
67        stream.write_all(&data)?;
68        stream.commit()?;
69
70        let offset = 8;
71        let len = 32;
72        let slice = data[offset..(offset + len)].to_vec();
73
74        let mut stream = storage.slice(stream.id(), offset as u64, len as u64)?;
75        let mut slice2 = vec![];
76        stream.read_to_end(&mut slice2)?;
77        assert_eq!(slice2, slice);
78
79        let mut vslice = Slice::default();
80        storage.extract(stream.id(), offset as u64, len as u64, &mut vslice)?;
81
82        let mut slice2 = vec![];
83        vslice.head.verify(stream.id())?;
84        let mut decoder = SliceDecoder::new(
85            &vslice.data[..],
86            &Hash::from(vslice.head.head.hash),
87            offset as u64,
88            len as u64,
89        );
90        decoder.read_to_end(&mut slice2)?;
91        assert_eq!(slice2, slice);
92        Ok(())
93    }
94
95    #[test]
96    fn test_sync() -> Result<()> {
97        let tmp = TempDir::new("test_sync_1")?;
98        let mut storage = StreamStorage::open(tmp.path(), keypair([0; 32]))?;
99        let data = rand_bytes(8192);
100
101        let doc = DocId::unique();
102        let mut stream = storage.append(doc)?;
103        stream.write_all(&data[..4096])?;
104        let head1 = stream.commit()?;
105        stream.write_all(&data[4096..])?;
106        let head2 = stream.commit()?;
107
108        let tmp = TempDir::new("test_sync_2")?;
109        let mut storage2 = StreamStorage::open(tmp.path(), keypair([1; 32]))?;
110        let stream = storage2.subscribe(stream.id())?;
111        let mut stream = SliceBuffer::new(stream, 1024);
112
113        let mut slice = Slice::default();
114        for head in [head1, head2].iter() {
115            head.verify(stream.id())?;
116            stream.prepare(head.head().len() - stream.head().head().len);
117            for i in 0..stream.slices().len() {
118                let info = &stream.slices()[i];
119                storage.extract(stream.id(), info.offset, info.len, &mut slice)?;
120                stream.add_slice(&slice, i)?;
121            }
122            stream.commit(*head.sig())?;
123        }
124
125        let mut stream = storage2.slice(stream.id(), 0, 8192)?;
126        let mut data2 = vec![];
127        stream.read_to_end(&mut data2)?;
128        assert_eq!(data, data2);
129
130        let tmp = TempDir::new("test_sync_3")?;
131        let mut storage = StreamStorage::open(tmp.path(), keypair([1; 32]))?;
132        let stream = storage.subscribe(stream.id())?;
133        let mut stream = SliceBuffer::new(stream, 1024);
134
135        let mut slice = Slice::default();
136        for head in [head1, head2].iter() {
137            head.verify(stream.id()).unwrap();
138            stream.prepare(head.head().len() - stream.head().head().len);
139            for i in 0..stream.slices().len() {
140                let info = &stream.slices()[i];
141                storage2.extract(stream.id(), info.offset, info.len, &mut slice)?;
142                stream.add_slice(&slice, i)?;
143            }
144            stream.commit(*head.sig())?;
145        }
146
147        let mut stream = storage.slice(stream.id(), 0, 8192)?;
148        let mut data2 = vec![];
149        stream.read_to_end(&mut data2)?;
150        assert_eq!(data, data2);
151
152        Ok(())
153    }
154}