blake_streams_core/
lib.rs1pub use crate::buffer::{SliceBuffer, SliceInfo};
2pub use crate::read::StreamReader;
3pub use crate::store::StreamStorage;
4pub use crate::stream::{DocId, Head, PeerId, SignedHead, Slice, Stream, StreamId};
5pub use crate::write::StreamWriter;
6pub use bao::Hash;
7pub use ed25519_dalek::{Keypair, PublicKey, SecretKey};
8
9mod buffer;
10mod read;
11mod store;
12mod stream;
13mod write;
14
15#[cfg(test)]
16mod tests {
17 use super::*;
18 use anyhow::Result;
19 use bao::decode::SliceDecoder;
20 use rand::RngCore;
21 use std::io::{BufReader, Read, Write};
22 use tempdir::TempDir;
23
24 fn rand_bytes(size: usize) -> Vec<u8> {
25 let mut rng = rand::thread_rng();
26 let mut data = Vec::with_capacity(size);
27 data.resize(data.capacity(), 0);
28 rng.fill_bytes(&mut data);
29 data
30 }
31
32 fn keypair(secret: [u8; 32]) -> Keypair {
33 let secret = SecretKey::from_bytes(&secret).unwrap();
34 let public = PublicKey::from(&secret);
35 Keypair { secret, public }
36 }
37
38 #[test]
39 fn test_append_stream() -> Result<()> {
40 let tmp = TempDir::new("test_append_stream")?;
41 let mut storage = StreamStorage::open(tmp.path(), keypair([0; 32]))?;
42 let data = rand_bytes(1_000_000);
43
44 let doc = DocId::unique();
45 let mut stream = storage.append(doc)?;
46 stream.write_all(&data)?;
47 stream.commit()?;
48
49 let stream = storage.slice(stream.id(), 0, data.len() as u64)?;
50 let mut reader = BufReader::new(stream);
51 let mut data2 = Vec::with_capacity(data.len());
52 data2.resize(data2.capacity(), 0);
53 reader.read_exact(&mut data2)?;
54 assert_eq!(data, data2);
55
56 Ok(())
57 }
58
59 #[test]
60 fn test_extract_slice() -> Result<()> {
61 let tmp = TempDir::new("test_extract_slice")?;
62 let mut storage = StreamStorage::open(tmp.path(), keypair([0; 32]))?;
63 let data = rand_bytes(1027);
64
65 let doc = DocId::unique();
66 let mut stream = storage.append(doc)?;
67 stream.write_all(&data)?;
68 stream.commit()?;
69
70 let offset = 8;
71 let len = 32;
72 let slice = data[offset..(offset + len)].to_vec();
73
74 let mut stream = storage.slice(stream.id(), offset as u64, len as u64)?;
75 let mut slice2 = vec![];
76 stream.read_to_end(&mut slice2)?;
77 assert_eq!(slice2, slice);
78
79 let mut vslice = Slice::default();
80 storage.extract(stream.id(), offset as u64, len as u64, &mut vslice)?;
81
82 let mut slice2 = vec![];
83 vslice.head.verify(stream.id())?;
84 let mut decoder = SliceDecoder::new(
85 &vslice.data[..],
86 &Hash::from(vslice.head.head.hash),
87 offset as u64,
88 len as u64,
89 );
90 decoder.read_to_end(&mut slice2)?;
91 assert_eq!(slice2, slice);
92 Ok(())
93 }
94
95 #[test]
96 fn test_sync() -> Result<()> {
97 let tmp = TempDir::new("test_sync_1")?;
98 let mut storage = StreamStorage::open(tmp.path(), keypair([0; 32]))?;
99 let data = rand_bytes(8192);
100
101 let doc = DocId::unique();
102 let mut stream = storage.append(doc)?;
103 stream.write_all(&data[..4096])?;
104 let head1 = stream.commit()?;
105 stream.write_all(&data[4096..])?;
106 let head2 = stream.commit()?;
107
108 let tmp = TempDir::new("test_sync_2")?;
109 let mut storage2 = StreamStorage::open(tmp.path(), keypair([1; 32]))?;
110 let stream = storage2.subscribe(stream.id())?;
111 let mut stream = SliceBuffer::new(stream, 1024);
112
113 let mut slice = Slice::default();
114 for head in [head1, head2].iter() {
115 head.verify(stream.id())?;
116 stream.prepare(head.head().len() - stream.head().head().len);
117 for i in 0..stream.slices().len() {
118 let info = &stream.slices()[i];
119 storage.extract(stream.id(), info.offset, info.len, &mut slice)?;
120 stream.add_slice(&slice, i)?;
121 }
122 stream.commit(*head.sig())?;
123 }
124
125 let mut stream = storage2.slice(stream.id(), 0, 8192)?;
126 let mut data2 = vec![];
127 stream.read_to_end(&mut data2)?;
128 assert_eq!(data, data2);
129
130 let tmp = TempDir::new("test_sync_3")?;
131 let mut storage = StreamStorage::open(tmp.path(), keypair([1; 32]))?;
132 let stream = storage.subscribe(stream.id())?;
133 let mut stream = SliceBuffer::new(stream, 1024);
134
135 let mut slice = Slice::default();
136 for head in [head1, head2].iter() {
137 head.verify(stream.id()).unwrap();
138 stream.prepare(head.head().len() - stream.head().head().len);
139 for i in 0..stream.slices().len() {
140 let info = &stream.slices()[i];
141 storage2.extract(stream.id(), info.offset, info.len, &mut slice)?;
142 stream.add_slice(&slice, i)?;
143 }
144 stream.commit(*head.sig())?;
145 }
146
147 let mut stream = storage.slice(stream.id(), 0, 8192)?;
148 let mut data2 = vec![];
149 stream.read_to_end(&mut data2)?;
150 assert_eq!(data, data2);
151
152 Ok(())
153 }
154}