1use anyhow::Result;
2use ed25519_dalek::{Keypair, PublicKey, Signature, Signer};
3use fnv::FnvHashSet;
4use parking_lot::Mutex;
5use rkyv::ser::serializers::AllocSerializer;
6use rkyv::ser::Serializer;
7use rkyv::{AlignedVec, Archive, Deserialize, Serialize};
8use std::sync::Arc;
9use zerocopy::{AsBytes, FromBytes};
10
11#[derive(Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Eq, Hash, PartialEq)]
12#[archive(as = "DocId")]
13#[repr(C)]
14#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
15pub struct DocId(u128);
16
17impl std::fmt::Debug for DocId {
18 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
19 let mut doc_id = [0; 24];
20 base64::encode_config_slice(&self.0.to_be_bytes(), base64::URL_SAFE, &mut doc_id);
21 write!(f, "{}", std::str::from_utf8(&doc_id).expect("wtf?"))
22 }
23}
24
25impl std::fmt::Display for DocId {
26 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
27 write!(f, "{:?}", self)
28 }
29}
30
31impl std::str::FromStr for DocId {
32 type Err = anyhow::Error;
33
34 fn from_str(s: &str) -> Result<Self, Self::Err> {
35 if s.len() != 24 {
36 return Err(anyhow::anyhow!("invalid doc_id length {}", s.len()));
37 }
38 let mut doc_id = [0; 16];
39 base64::decode_config_slice(s, base64::URL_SAFE, &mut doc_id)?;
40 Ok(Self(u128::from_be_bytes(doc_id)))
41 }
42}
43
44impl DocId {
45 pub fn unique() -> Self {
46 let mut bytes = [0; 16];
47 getrandom::getrandom(&mut bytes).expect("failed to get random bytes");
48 Self(u128::from_be_bytes(bytes))
49 }
50}
51
52#[derive(Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Eq, Hash, PartialEq)]
53#[archive(as = "PeerId")]
54#[repr(C)]
55#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
56pub struct PeerId([u8; 32]);
57
58impl std::fmt::Debug for PeerId {
59 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
60 let mut peer_id = [0; 44];
61 base64::encode_config_slice(&self.0, base64::URL_SAFE, &mut peer_id);
62 write!(f, "{}", std::str::from_utf8(&peer_id).expect("wtf?"))
63 }
64}
65
66impl std::fmt::Display for PeerId {
67 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
68 write!(f, "{:?}", self)
69 }
70}
71
72impl std::str::FromStr for PeerId {
73 type Err = anyhow::Error;
74
75 fn from_str(s: &str) -> Result<Self, Self::Err> {
76 if s.len() != 44 {
77 return Err(anyhow::anyhow!("invalid peer_id length {}", s.len()));
78 }
79 let mut peer_id = [0; 32];
80 base64::decode_config_slice(s, base64::URL_SAFE, &mut peer_id)?;
81 Ok(Self(peer_id))
82 }
83}
84
85impl From<PublicKey> for PeerId {
86 fn from(key: PublicKey) -> Self {
87 Self(key.to_bytes())
88 }
89}
90
91impl From<PeerId> for PublicKey {
92 fn from(peer_id: PeerId) -> Self {
93 PublicKey::from_bytes(&peer_id.0).unwrap()
94 }
95}
96
97#[derive(Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Eq, Hash, PartialEq)]
98#[archive(as = "StreamId")]
99#[repr(C)]
100#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
101pub struct StreamId {
102 peer: PeerId,
103 doc: DocId,
104}
105
106impl std::fmt::Debug for StreamId {
107 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
108 write!(f, "{}{}", self.peer, self.doc)
109 }
110}
111
112impl std::fmt::Display for StreamId {
113 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
114 write!(f, "{:?}", self)
115 }
116}
117
118impl std::str::FromStr for StreamId {
119 type Err = anyhow::Error;
120
121 fn from_str(s: &str) -> Result<Self, Self::Err> {
122 if s.len() != 192 {
123 return Err(anyhow::anyhow!("invalid stream_id length {}", s.len()));
124 }
125 let (peer, doc) = s.split_at(128);
126 let peer = peer.parse()?;
127 let doc = doc.parse()?;
128 Ok(Self { peer, doc })
129 }
130}
131
132impl StreamId {
133 pub fn new(peer: PeerId, doc: DocId) -> Self {
134 Self { peer, doc }
135 }
136
137 pub fn peer(&self) -> PeerId {
138 self.peer
139 }
140
141 pub fn doc(&self) -> DocId {
142 self.doc
143 }
144}
145
146#[derive(
147 Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Debug, Eq, PartialEq,
148)]
149#[archive(as = "Head")]
150#[repr(C)]
151#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
152pub struct Head {
153 pub id: StreamId,
154 pub hash: [u8; 32],
155 pub len: u64,
156 _padding: u64,
157}
158
159impl Head {
160 pub fn id(&self) -> &StreamId {
161 &self.id
162 }
163
164 pub fn hash(&self) -> &[u8; 32] {
165 &self.hash
166 }
167
168 pub fn len(&self) -> u64 {
169 self.len
170 }
171}
172
173impl Head {
174 pub(crate) fn new(id: StreamId) -> Self {
175 Self {
176 id,
177 hash: [
178 175, 19, 73, 185, 245, 249, 161, 166, 160, 64, 77, 234, 54, 220, 201, 73, 155, 203,
179 37, 201, 173, 193, 18, 183, 204, 154, 147, 202, 228, 31, 50, 98,
180 ],
181 len: 0,
182 _padding: 0,
183 }
184 }
185}
186
187#[derive(
188 Archive, Deserialize, Serialize, AsBytes, FromBytes, Clone, Copy, Debug, Eq, PartialEq,
189)]
190#[archive(as = "SignedHead")]
191#[repr(C)]
192#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
193pub struct SignedHead {
194 pub head: Head,
195 #[cfg_attr(feature = "serde-derive", serde(with = "serde_big_array::BigArray"))]
196 pub sig: [u8; 64],
197}
198
199impl Default for SignedHead {
200 fn default() -> Self {
201 Self::new(StreamId {
202 peer: PeerId([0; 32]),
203 doc: DocId(0),
204 })
205 }
206}
207
208impl SignedHead {
209 pub fn head(&self) -> &Head {
210 &self.head
211 }
212
213 pub fn sig(&self) -> &[u8; 64] {
214 &self.sig
215 }
216
217 pub fn verify(&self, id: &StreamId) -> Result<()> {
218 if id != self.head().id() {
219 return Err(anyhow::anyhow!("missmatched stream id"));
220 }
221 let sig = Signature::from(self.sig);
222 PublicKey::from(id.peer()).verify_strict(self.head.as_bytes(), &sig)?;
223 Ok(())
224 }
225}
226
227impl SignedHead {
228 pub(crate) fn new(id: StreamId) -> Self {
229 Self {
230 head: Head::new(id),
231 sig: [0; 64],
232 }
233 }
234
235 pub(crate) fn sign(&mut self, key: &Keypair) {
236 debug_assert_eq!(PeerId::from(key.public), self.head.id().peer());
237 self.sig = key.sign(self.head.as_bytes()).to_bytes();
238 }
239
240 pub(crate) fn set_signature(&mut self, sig: [u8; 64]) -> Result<()> {
241 let sig2 = Signature::from(sig);
242 PublicKey::from(self.head.id().peer()).verify_strict(self.head.as_bytes(), &sig2)?;
243 self.sig = sig;
244 Ok(())
245 }
246}
247
248#[derive(Archive, Deserialize, Serialize, Clone, Debug, Eq, PartialEq)]
249pub struct Stream {
250 pub(crate) head: SignedHead,
251 pub(crate) outboard: Vec<u8>,
252}
253
254impl Stream {
255 pub fn head(&self) -> &Head {
256 self.head.head()
257 }
258}
259
260impl Stream {
261 pub(crate) fn new(id: StreamId) -> Self {
262 Self {
263 head: SignedHead::new(id),
264 outboard: vec![0, 0, 0, 0, 0, 0, 0, 0],
265 }
266 }
267
268 pub(crate) fn to_bytes(&self) -> Result<AlignedVec> {
269 let mut ser = AllocSerializer::<4096>::default();
270 ser.serialize_value(self).unwrap();
271 Ok(ser.into_serializer().into_inner())
272 }
273}
274
275#[derive(Archive, Deserialize, Serialize, Clone, Debug, Default, Eq, PartialEq)]
276#[repr(C)]
277#[cfg_attr(feature = "serde-derive", derive(serde::Deserialize, serde::Serialize))]
278pub struct Slice {
279 pub head: SignedHead,
280 pub data: Vec<u8>,
281}
282
283impl Slice {
284 pub fn with_capacity(capacity: usize) -> Self {
285 Self {
286 head: Default::default(),
287 data: Vec::with_capacity(capacity),
288 }
289 }
290
291 pub fn to_bytes(&self) -> Vec<u8> {
292 let mut ser = AllocSerializer::<4096>::default();
293 ser.serialize_value(self).unwrap();
294 ser.into_serializer().into_inner().into_vec()
295 }
296}
297
298pub(crate) struct StreamLock {
299 id: StreamId,
300 locks: Arc<Mutex<FnvHashSet<StreamId>>>,
301}
302
303impl StreamLock {
304 pub fn new(id: StreamId, locks: Arc<Mutex<FnvHashSet<StreamId>>>) -> Self {
305 Self { id, locks }
306 }
307}
308
309impl Drop for StreamLock {
310 fn drop(&mut self) {
311 let mut locks = self.locks.lock();
312 debug_assert!(locks.remove(&self.id));
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created stream must carry the hash and outboard that bao
    /// produces for empty input, a zero length and an all-zero signature.
    #[test]
    fn test_default_stream() {
        let (outboard, hash) = bao::encode::outboard(&[]);
        let id = StreamId::new(PeerId([0; 32]), DocId(42));
        let expect = Stream {
            head: SignedHead {
                head: Head {
                    id,
                    hash: *hash.as_bytes(),
                    len: 0,
                    // Every field must be listed in a struct literal; the
                    // private padding field is always zero.
                    _padding: 0,
                },
                sig: [0; 64],
            },
            outboard,
        };
        let actual = Stream::new(id);
        assert_eq!(actual, expect);
    }
}