blake_streams_core/
stream.rs

1use anyhow::Result;
2use ed25519_dalek::{Keypair, PublicKey, Signature, Signer};
3use fnv::FnvHashSet;
4use parking_lot::Mutex;
5use rkyv::ser::serializers::AllocSerializer;
6use rkyv::ser::Serializer;
7use rkyv::{AlignedVec, Archive, Deserialize, Serialize};
8use std::sync::Arc;
9use zerocopy::{AsBytes, FromBytes};
10
11#[derive(Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Eq, Hash, PartialEq)]
12#[archive(as = "DocId")]
13#[repr(C)]
14#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
15pub struct DocId(u128);
16
17impl std::fmt::Debug for DocId {
18    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
19        let mut doc_id = [0; 24];
20        base64::encode_config_slice(&self.0.to_be_bytes(), base64::URL_SAFE, &mut doc_id);
21        write!(f, "{}", std::str::from_utf8(&doc_id).expect("wtf?"))
22    }
23}
24
25impl std::fmt::Display for DocId {
26    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
27        write!(f, "{:?}", self)
28    }
29}
30
31impl std::str::FromStr for DocId {
32    type Err = anyhow::Error;
33
34    fn from_str(s: &str) -> Result<Self, Self::Err> {
35        if s.len() != 24 {
36            return Err(anyhow::anyhow!("invalid doc_id length {}", s.len()));
37        }
38        let mut doc_id = [0; 16];
39        base64::decode_config_slice(s, base64::URL_SAFE, &mut doc_id)?;
40        Ok(Self(u128::from_be_bytes(doc_id)))
41    }
42}
43
44impl DocId {
45    pub fn unique() -> Self {
46        let mut bytes = [0; 16];
47        getrandom::getrandom(&mut bytes).expect("failed to get random bytes");
48        Self(u128::from_be_bytes(bytes))
49    }
50}
51
52#[derive(Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Eq, Hash, PartialEq)]
53#[archive(as = "PeerId")]
54#[repr(C)]
55#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
56pub struct PeerId([u8; 32]);
57
58impl std::fmt::Debug for PeerId {
59    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
60        let mut peer_id = [0; 44];
61        base64::encode_config_slice(&self.0, base64::URL_SAFE, &mut peer_id);
62        write!(f, "{}", std::str::from_utf8(&peer_id).expect("wtf?"))
63    }
64}
65
66impl std::fmt::Display for PeerId {
67    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
68        write!(f, "{:?}", self)
69    }
70}
71
72impl std::str::FromStr for PeerId {
73    type Err = anyhow::Error;
74
75    fn from_str(s: &str) -> Result<Self, Self::Err> {
76        if s.len() != 44 {
77            return Err(anyhow::anyhow!("invalid peer_id length {}", s.len()));
78        }
79        let mut peer_id = [0; 32];
80        base64::decode_config_slice(s, base64::URL_SAFE, &mut peer_id)?;
81        Ok(Self(peer_id))
82    }
83}
84
85impl From<PublicKey> for PeerId {
86    fn from(key: PublicKey) -> Self {
87        Self(key.to_bytes())
88    }
89}
90
91impl From<PeerId> for PublicKey {
92    fn from(peer_id: PeerId) -> Self {
93        PublicKey::from_bytes(&peer_id.0).unwrap()
94    }
95}
96
97#[derive(Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Eq, Hash, PartialEq)]
98#[archive(as = "StreamId")]
99#[repr(C)]
100#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
101pub struct StreamId {
102    peer: PeerId,
103    doc: DocId,
104}
105
106impl std::fmt::Debug for StreamId {
107    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
108        write!(f, "{}{}", self.peer, self.doc)
109    }
110}
111
112impl std::fmt::Display for StreamId {
113    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
114        write!(f, "{:?}", self)
115    }
116}
117
118impl std::str::FromStr for StreamId {
119    type Err = anyhow::Error;
120
121    fn from_str(s: &str) -> Result<Self, Self::Err> {
122        if s.len() != 192 {
123            return Err(anyhow::anyhow!("invalid stream_id length {}", s.len()));
124        }
125        let (peer, doc) = s.split_at(128);
126        let peer = peer.parse()?;
127        let doc = doc.parse()?;
128        Ok(Self { peer, doc })
129    }
130}
131
132impl StreamId {
133    pub fn new(peer: PeerId, doc: DocId) -> Self {
134        Self { peer, doc }
135    }
136
137    pub fn peer(&self) -> PeerId {
138        self.peer
139    }
140
141    pub fn doc(&self) -> DocId {
142        self.doc
143    }
144}
145
146#[derive(
147    Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Debug, Eq, PartialEq,
148)]
149#[archive(as = "Head")]
150#[repr(C)]
151#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
152pub struct Head {
153    pub id: StreamId,
154    pub hash: [u8; 32],
155    pub len: u64,
156    _padding: u64,
157}
158
159impl Head {
160    pub fn id(&self) -> &StreamId {
161        &self.id
162    }
163
164    pub fn hash(&self) -> &[u8; 32] {
165        &self.hash
166    }
167
168    pub fn len(&self) -> u64 {
169        self.len
170    }
171}
172
173impl Head {
174    pub(crate) fn new(id: StreamId) -> Self {
175        Self {
176            id,
177            hash: [
178                175, 19, 73, 185, 245, 249, 161, 166, 160, 64, 77, 234, 54, 220, 201, 73, 155, 203,
179                37, 201, 173, 193, 18, 183, 204, 154, 147, 202, 228, 31, 50, 98,
180            ],
181            len: 0,
182            _padding: 0,
183        }
184    }
185}
186
187#[derive(
188    Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Debug, Eq, PartialEq,
189)]
190#[archive(as = "SignedHead")]
191#[repr(C)]
192#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
193pub struct SignedHead {
194    pub head: Head,
195    #[cfg_attr(feature = "serde-derive", serde(with = "serde_big_array::BigArray"))]
196    pub sig: [u8; 64],
197}
198
199impl Default for SignedHead {
200    fn default() -> Self {
201        Self::new(StreamId {
202            peer: PeerId([0; 32]),
203            doc: DocId(0),
204        })
205    }
206}
207
208impl SignedHead {
209    pub fn head(&self) -> &Head {
210        &self.head
211    }
212
213    pub fn sig(&self) -> &[u8; 64] {
214        &self.sig
215    }
216
217    pub fn verify(&self, id: &StreamId) -> Result<()> {
218        if id != self.head().id() {
219            return Err(anyhow::anyhow!("missmatched stream id"));
220        }
221        let sig = Signature::from(self.sig);
222        PublicKey::from(id.peer()).verify_strict(self.head.as_bytes(), &sig)?;
223        Ok(())
224    }
225}
226
227impl SignedHead {
228    pub(crate) fn new(id: StreamId) -> Self {
229        Self {
230            head: Head::new(id),
231            sig: [0; 64],
232        }
233    }
234
235    pub(crate) fn sign(&mut self, key: &Keypair) {
236        debug_assert_eq!(PeerId::from(key.public), self.head.id().peer());
237        self.sig = key.sign(self.head.as_bytes()).to_bytes();
238    }
239
240    pub(crate) fn set_signature(&mut self, sig: [u8; 64]) -> Result<()> {
241        let sig2 = Signature::from(sig);
242        PublicKey::from(self.head.id().peer()).verify_strict(self.head.as_bytes(), &sig2)?;
243        self.sig = sig;
244        Ok(())
245    }
246}
247
248#[derive(Archive, Deserialize, Serialize, Clone, Debug, Eq, PartialEq)]
249pub struct Stream {
250    pub(crate) head: SignedHead,
251    pub(crate) outboard: Vec<u8>,
252}
253
254impl Stream {
255    pub fn head(&self) -> &Head {
256        self.head.head()
257    }
258}
259
260impl Stream {
261    pub(crate) fn new(id: StreamId) -> Self {
262        Self {
263            head: SignedHead::new(id),
264            outboard: vec![0, 0, 0, 0, 0, 0, 0, 0],
265        }
266    }
267
268    pub(crate) fn to_bytes(&self) -> Result<AlignedVec> {
269        let mut ser = AllocSerializer::<4096>::default();
270        ser.serialize_value(self).unwrap();
271        Ok(ser.into_serializer().into_inner())
272    }
273}
274
275#[derive(Archive, Deserialize, Serialize, Clone, Debug, Default, Eq, PartialEq)]
276#[repr(C)]
277#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
278pub struct Slice {
279    pub head: SignedHead,
280    pub data: Vec<u8>,
281}
282
283impl Slice {
284    pub fn with_capacity(capacity: usize) -> Self {
285        Self {
286            head: Default::default(),
287            data: Vec::with_capacity(capacity),
288        }
289    }
290
291    pub fn to_bytes(&self) -> Vec<u8> {
292        let mut ser = AllocSerializer::<4096>::default();
293        ser.serialize_value(self).unwrap();
294        ser.into_serializer().into_inner().into_vec()
295    }
296}
297
298pub(crate) struct StreamLock {
299    id: StreamId,
300    locks: Arc<Mutex<FnvHashSet<StreamId>>>,
301}
302
303impl StreamLock {
304    pub fn new(id: StreamId, locks: Arc<Mutex<FnvHashSet<StreamId>>>) -> Self {
305        Self { id, locks }
306    }
307}
308
309impl Drop for StreamLock {
310    fn drop(&mut self) {
311        let mut locks = self.locks.lock();
312        debug_assert!(locks.remove(&self.id));
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319
320    #[test]
321    fn test_default_stream() {
322        let (outboard, hash) = bao::encode::outboard(&[]);
323        let id = StreamId::new(PeerId([0; 32]), DocId(42));
324        let expect = Stream {
325            head: SignedHead {
326                head: Head {
327                    id,
328                    hash: *hash.as_bytes(),
329                    len: 0,
330                },
331                sig: [0; 64],
332            },
333            outboard,
334        };
335        let actual = Stream::new(id);
336        assert_eq!(actual, expect);
337    }
338}