radicle_node/
wire.rs

1mod frame;
2mod message;
3mod protocol;
4mod varint;
5
6pub use frame::StreamId;
7pub use message::{AddressType, MessageType};
8pub use protocol::{Control, Wire, WireReader, WireSession, WireWriter};
9use radicle::node::UserAgent;
10
11use std::collections::BTreeMap;
12use std::convert::TryFrom;
13use std::ops::Deref;
14use std::str::FromStr;
15use std::string::FromUtf8Error;
16use std::{io, mem};
17
18use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
19use cyphernet::addr::tor;
20
21use crate::crypto::{PublicKey, Signature, Unverified};
22use crate::git;
23use crate::git::fmt;
24use crate::identity::RepoId;
25use crate::node;
26use crate::node::Alias;
27use crate::prelude::*;
28use crate::service::filter;
29use crate::storage::refs::Refs;
30use crate::storage::refs::RefsAt;
31use crate::storage::refs::SignedRefs;
32use crate::Timestamp;
33
/// The default type we use to represent sizes on the wire.
///
/// Since wire messages are limited to 64KB by the transport layer,
/// two bytes are enough to represent the size of any message.
///
/// Note that in certain cases, we may use a smaller type: strings, for
/// example, are prefixed with a single-byte length.
pub type Size = u16;
41
/// Errors that can occur while decoding wire data.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An i/o error was returned by the underlying reader.
    #[error("i/o: {0}")]
    Io(#[from] io::Error),
    /// The bytes of a decoded string were not valid UTF-8.
    #[error("UTF-8 error: {0}")]
    FromUtf8(#[from] FromUtf8Error),
    /// A decoded length did not match, or exceeded, what the decoder accepts.
    #[error("invalid size: expected {expected}, got {actual}")]
    InvalidSize { expected: usize, actual: usize },
    /// A filter was announced with a size that is not one of [`filter::FILTER_SIZES`].
    #[error("invalid filter size: {0}")]
    InvalidFilterSize(usize),
    /// A stream kind byte was not recognized. Not constructed in this file.
    #[error("invalid channel type {0:x}")]
    InvalidStreamKind(u8),
    /// A decoded string is not a valid git reference name.
    #[error(transparent)]
    InvalidRefName(#[from] fmt::Error),
    /// A decoded string is not a valid node alias.
    #[error(transparent)]
    InvalidAlias(#[from] node::AliasError),
    /// A decoded string is not a valid user agent; carries the parser's message.
    #[error("invalid user agent string: {0:?}")]
    InvalidUserAgent(String),
    /// A control message had an invalid type. Not constructed in this file.
    #[error("invalid control message with type `{0}`")]
    InvalidControlMessage(u8),
    /// The protocol version header was invalid. Not constructed in this file.
    #[error("invalid protocol version header `{0:x?}`")]
    InvalidProtocolVersion([u8; 4]),
    /// The raw bytes of an onion address could not be decoded.
    #[error("invalid onion address: {0}")]
    InvalidOnionAddr(#[from] tor::OnionAddrDecodeError),
    /// A decoded integer could not be converted into a [`Timestamp`].
    #[error("invalid timestamp: {0}")]
    InvalidTimestamp(u64),
    /// The protocol version was not the expected one. Not constructed in this file.
    #[error("wrong protocol version `{0}`")]
    WrongProtocolVersion(u8),
    /// An address type was not recognized. Not constructed in this file.
    #[error("unknown address type `{0}`")]
    UnknownAddressType(u8),
    /// A message type was not recognized. Not constructed in this file.
    #[error("unknown message type `{0}`")]
    UnknownMessageType(u16),
    /// An info type was not recognized. Not constructed in this file.
    #[error("unknown info type `{0}`")]
    UnknownInfoType(u16),
    /// Bytes were left over after an object was fully decoded (see [`deserialize`]).
    #[error("unexpected bytes")]
    UnexpectedBytes,
}
79
80impl Error {
81    /// Whether we've reached the end of file. This will be true when we fail to decode
82    /// a message because there's not enough data in the stream.
83    pub fn is_eof(&self) -> bool {
84        matches!(self, Self::Io(err) if err.kind() == io::ErrorKind::UnexpectedEof)
85    }
86}
87
/// Things that can be encoded as binary.
pub trait Encode {
    /// Write the binary encoding of `self` to `writer`.
    ///
    /// On success, returns the number of bytes that were written.
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error>;
}
92
/// Things that can be decoded from binary.
pub trait Decode: Sized {
    /// Read one value of this type from `reader`.
    ///
    /// Fails with an [`Error`] if the reader returns an error or the bytes
    /// read do not form a valid value.
    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error>;
}
97
98/// Encode an object into a byte vector.
99///
100/// # Panics
101///
102/// If the encoded object exceeds [`Size::MAX`].
103pub fn serialize<T: Encode + ?Sized>(data: &T) -> Vec<u8> {
104    let mut buffer = Vec::new();
105    // SAFETY: We expect this to panic if the user passes
106    // in data that exceeds the maximum allowed size.
107    #[allow(clippy::unwrap_used)]
108    let len = data.encode(&mut buffer).unwrap();
109
110    debug_assert_eq!(len, buffer.len());
111
112    buffer
113}
114
115/// Decode an object from a vector.
116pub fn deserialize<T: Decode>(data: &[u8]) -> Result<T, Error> {
117    let mut cursor = io::Cursor::new(data);
118    let obj = T::decode(&mut cursor)?;
119
120    if cursor.position() as usize != cursor.get_ref().len() {
121        return Err(Error::UnexpectedBytes);
122    }
123    Ok(obj)
124}
125
126impl Encode for u8 {
127    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
128        writer.write_u8(*self)?;
129
130        Ok(mem::size_of::<Self>())
131    }
132}
133
134impl Encode for u16 {
135    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
136        writer.write_u16::<NetworkEndian>(*self)?;
137
138        Ok(mem::size_of::<Self>())
139    }
140}
141
142impl Encode for u32 {
143    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
144        writer.write_u32::<NetworkEndian>(*self)?;
145
146        Ok(mem::size_of::<Self>())
147    }
148}
149
150impl Encode for u64 {
151    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
152        writer.write_u64::<NetworkEndian>(*self)?;
153
154        Ok(mem::size_of::<Self>())
155    }
156}
157
158impl Encode for PublicKey {
159    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
160        self.deref().encode(writer)
161    }
162}
163
164impl<const T: usize> Encode for &[u8; T] {
165    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
166        writer.write_all(&**self)?;
167        Ok(mem::size_of::<Self>())
168    }
169}
170
171impl<const T: usize> Encode for [u8; T] {
172    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
173        writer.write_all(self)?;
174
175        Ok(mem::size_of::<Self>())
176    }
177}
178
179impl<T> Encode for &[T]
180where
181    T: Encode,
182{
183    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
184        let mut n = (self.len() as Size).encode(writer)?;
185
186        for item in self.iter() {
187            n += item.encode(writer)?;
188        }
189        Ok(n)
190    }
191}
192
193impl<T, const N: usize> Encode for BoundedVec<T, N>
194where
195    T: Encode,
196{
197    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
198        self.as_slice().encode(writer)
199    }
200}
201
202impl Encode for &str {
203    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
204        assert!(self.len() <= u8::MAX as usize);
205
206        let n = (self.len() as u8).encode(writer)?;
207        let bytes = self.as_bytes();
208
209        // Nb. Don't use the [`Encode`] instance here for &[u8], because we are prefixing the
210        // length ourselves.
211        writer.write_all(bytes)?;
212
213        Ok(n + bytes.len())
214    }
215}
216
217impl Encode for String {
218    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
219        self.as_str().encode(writer)
220    }
221}
222
223impl Encode for git::Url {
224    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
225        self.to_string().encode(writer)
226    }
227}
228
229impl Encode for RepoId {
230    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
231        self.deref().encode(writer)
232    }
233}
234
235impl Encode for Refs {
236    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
237        let len: Size = self
238            .len()
239            .try_into()
240            .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
241        let mut n = len.encode(writer)?;
242
243        for (name, oid) in self.iter() {
244            n += name.as_str().encode(writer)?;
245            n += oid.encode(writer)?;
246        }
247        Ok(n)
248    }
249}
250
251impl Encode for cyphernet::addr::tor::OnionAddrV3 {
252    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
253        self.into_raw_bytes().encode(writer)
254    }
255}
256
257impl Encode for UserAgent {
258    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
259        self.as_ref().encode(writer)
260    }
261}
262
263impl Encode for Alias {
264    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
265        self.as_ref().encode(writer)
266    }
267}
268
269impl<A, B> Encode for (A, B)
270where
271    A: Encode,
272    B: Encode,
273{
274    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
275        let mut n = self.0.encode(writer)?;
276        n += self.1.encode(writer)?;
277        Ok(n)
278    }
279}
280
281impl Encode for git::RefString {
282    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
283        self.as_str().encode(writer)
284    }
285}
286
287impl Encode for Signature {
288    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
289        self.deref().encode(writer)
290    }
291}
292
293impl Encode for git::Oid {
294    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
295        // Nb. We use length-encoding here to support future SHA-2 object ids.
296        self.as_bytes().encode(writer)
297    }
298}
299
300////////////////////////////////////////////////////////////////////////////////
301
302impl Decode for PublicKey {
303    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
304        let buf: [u8; 32] = Decode::decode(reader)?;
305
306        Ok(PublicKey::from(buf))
307    }
308}
309
310impl Decode for Refs {
311    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
312        let len = Size::decode(reader)?;
313        let mut refs = BTreeMap::new();
314
315        for _ in 0..len {
316            let name = String::decode(reader)?;
317            let name = git::RefString::try_from(name).map_err(Error::from)?;
318            let oid = git::Oid::decode(reader)?;
319
320            refs.insert(name, oid);
321        }
322        Ok(refs.into())
323    }
324}
325
326impl Decode for git::RefString {
327    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
328        let ref_str = String::decode(reader)?;
329        git::RefString::try_from(ref_str).map_err(Error::from)
330    }
331}
332
333impl Decode for UserAgent {
334    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
335        String::decode(reader)
336            .and_then(|s| UserAgent::from_str(&s).map_err(Error::InvalidUserAgent))
337    }
338}
339
340impl Decode for Alias {
341    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
342        String::decode(reader).and_then(|s| Alias::from_str(&s).map_err(Error::from))
343    }
344}
345
346impl<A, B> Decode for (A, B)
347where
348    A: Decode,
349    B: Decode,
350{
351    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
352        let a = A::decode(reader)?;
353        let b = B::decode(reader)?;
354        Ok((a, b))
355    }
356}
357
358impl Decode for git::Oid {
359    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
360        let len = Size::decode(reader)? as usize;
361        #[allow(non_upper_case_globals)]
362        const expected: usize = mem::size_of::<git::raw::Oid>();
363
364        if len != expected {
365            return Err(Error::InvalidSize {
366                expected,
367                actual: len,
368            });
369        }
370
371        let buf: [u8; expected] = Decode::decode(reader)?;
372        let oid = git::raw::Oid::from_bytes(&buf).expect("the buffer is exactly the right size");
373        let oid = git::Oid::from(oid);
374
375        Ok(oid)
376    }
377}
378
379impl Decode for Signature {
380    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
381        let bytes: [u8; 64] = Decode::decode(reader)?;
382
383        Ok(Signature::from(bytes))
384    }
385}
386
387impl Decode for u8 {
388    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
389        reader.read_u8().map_err(Error::from)
390    }
391}
392
393impl Decode for u16 {
394    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
395        reader.read_u16::<NetworkEndian>().map_err(Error::from)
396    }
397}
398
399impl Decode for u32 {
400    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
401        reader.read_u32::<NetworkEndian>().map_err(Error::from)
402    }
403}
404
405impl Decode for u64 {
406    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
407        reader.read_u64::<NetworkEndian>().map_err(Error::from)
408    }
409}
410
411impl<const N: usize> Decode for [u8; N] {
412    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
413        let mut ary = [0; N];
414        reader.read_exact(&mut ary)?;
415
416        Ok(ary)
417    }
418}
419
420impl<T, const N: usize> Decode for BoundedVec<T, N>
421where
422    T: Decode,
423{
424    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
425        let len: usize = Size::decode(reader)? as usize;
426        let mut items = Self::with_capacity(len).map_err(|_| Error::InvalidSize {
427            expected: Self::max(),
428            actual: len,
429        })?;
430
431        for _ in 0..items.capacity() {
432            let item = T::decode(reader)?;
433            items.push(item).ok();
434        }
435        Ok(items)
436    }
437}
438
439impl Decode for String {
440    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
441        let len = u8::decode(reader)?;
442        let mut bytes = vec![0; len as usize];
443
444        reader.read_exact(&mut bytes)?;
445
446        let string = String::from_utf8(bytes)?;
447
448        Ok(string)
449    }
450}
451
452impl Decode for RepoId {
453    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
454        let oid: git::Oid = Decode::decode(reader)?;
455
456        Ok(Self::from(oid))
457    }
458}
459
460impl Encode for filter::Filter {
461    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
462        let mut n = 0;
463
464        n += self.deref().as_bytes().encode(writer)?;
465
466        Ok(n)
467    }
468}
469
470impl Decode for filter::Filter {
471    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
472        let size: usize = Size::decode(reader)? as usize;
473        if !filter::FILTER_SIZES.contains(&size) {
474            return Err(Error::InvalidFilterSize(size));
475        }
476
477        let mut bytes = vec![0; size];
478        reader.read_exact(&mut bytes[..])?;
479
480        let f = filter::BloomFilter::from(bytes);
481        debug_assert_eq!(f.hashes(), filter::FILTER_HASHES);
482
483        Ok(Self::from(f))
484    }
485}
486
487impl<V> Encode for SignedRefs<V> {
488    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
489        let mut n = 0;
490
491        n += self.id.encode(writer)?;
492        n += self.refs.encode(writer)?;
493        n += self.signature.encode(writer)?;
494
495        Ok(n)
496    }
497}
498
499impl Decode for SignedRefs<Unverified> {
500    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
501        let id = NodeId::decode(reader)?;
502        let refs = Refs::decode(reader)?;
503        let signature = Signature::decode(reader)?;
504
505        Ok(Self::new(refs, id, signature))
506    }
507}
508
509impl Encode for RefsAt {
510    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
511        let mut n = 0;
512
513        n += self.remote.encode(writer)?;
514        n += self.at.encode(writer)?;
515
516        Ok(n)
517    }
518}
519
520impl Decode for RefsAt {
521    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
522        let remote = NodeId::decode(reader)?;
523        let at = git::Oid::decode(reader)?;
524        Ok(Self { remote, at })
525    }
526}
527
528impl Encode for node::Features {
529    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
530        self.deref().encode(writer)
531    }
532}
533
534impl Decode for node::Features {
535    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
536        let features = u64::decode(reader)?;
537
538        Ok(Self::from(features))
539    }
540}
541
542impl Decode for tor::OnionAddrV3 {
543    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
544        let bytes: [u8; tor::ONION_V3_RAW_LEN] = Decode::decode(reader)?;
545        let addr = tor::OnionAddrV3::from_raw_bytes(bytes)?;
546
547        Ok(addr)
548    }
549}
550
551impl Encode for Timestamp {
552    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
553        self.deref().encode(writer)
554    }
555}
556
557impl Decode for Timestamp {
558    fn decode<R: io::Read + ?Sized>(reader: &mut R) -> Result<Self, Error> {
559        let millis = u64::decode(reader)?;
560        let ts = Timestamp::try_from(millis).map_err(Error::InvalidTimestamp)?;
561
562        Ok(ts)
563    }
564}
565
// Tests for the wire encoding: round-trip properties (`prop_*`) check that
// `deserialize(serialize(x)) == x`, and unit tests pin exact bytes and errors.
#[cfg(test)]
mod tests {
    use super::*;
    use qcheck;
    use qcheck_macros::quickcheck;

    use crate::crypto::Unverified;
    use crate::storage::refs::SignedRefs;
    use crate::test::assert_matches;

    // Round-trip: `u8`.
    #[quickcheck]
    fn prop_u8(input: u8) {
        assert_eq!(deserialize::<u8>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: `u16`.
    #[quickcheck]
    fn prop_u16(input: u16) {
        assert_eq!(deserialize::<u16>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: `u32`.
    #[quickcheck]
    fn prop_u32(input: u32) {
        assert_eq!(deserialize::<u32>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: `u64`.
    #[quickcheck]
    fn prop_u64(input: u64) {
        assert_eq!(deserialize::<u64>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: strings. Inputs longer than `u8::MAX` bytes are discarded,
    // since the length prefix of a string is a single byte.
    #[quickcheck]
    fn prop_string(input: String) -> qcheck::TestResult {
        if input.len() > u8::MAX as usize {
            return qcheck::TestResult::discard();
        }
        assert_eq!(deserialize::<String>(&serialize(&input)).unwrap(), input);

        qcheck::TestResult::passed()
    }

    // Round-trip: a slice encoding decodes back into a bounded vector.
    #[quickcheck]
    fn prop_vec(input: BoundedVec<String, 16>) {
        assert_eq!(
            deserialize::<BoundedVec<String, 16>>(&serialize(&input.as_slice())).unwrap(),
            input
        );
    }

    // Round-trip: public keys.
    #[quickcheck]
    fn prop_pubkey(input: PublicKey) {
        assert_eq!(deserialize::<PublicKey>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: filters.
    #[quickcheck]
    fn prop_filter(input: filter::Filter) {
        assert_eq!(
            deserialize::<filter::Filter>(&serialize(&input)).unwrap(),
            input
        );
    }

    // Round-trip: repository ids.
    #[quickcheck]
    fn prop_id(input: RepoId) {
        assert_eq!(deserialize::<RepoId>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: refs.
    #[quickcheck]
    fn prop_refs(input: Refs) {
        assert_eq!(deserialize::<Refs>(&serialize(&input)).unwrap(), input);
    }

    // Round-trip: tuples.
    #[quickcheck]
    fn prop_tuple(input: (String, String)) {
        assert_eq!(
            deserialize::<(String, String)>(&serialize(&input)).unwrap(),
            input
        );
    }

    // Round-trip: signatures, built from 64 arbitrary bytes.
    #[quickcheck]
    fn prop_signature(input: [u8; 64]) {
        let signature = Signature::from(input);

        assert_eq!(
            deserialize::<Signature>(&serialize(&signature)).unwrap(),
            signature
        );
    }

    // Round-trip: object ids, built from 20 arbitrary bytes.
    #[quickcheck]
    fn prop_oid(input: [u8; 20]) {
        let oid = git::Oid::try_from(input.as_slice()).unwrap();

        assert_eq!(deserialize::<git::Oid>(&serialize(&oid)).unwrap(), oid);
    }

    // Round-trip: signed refs.
    #[quickcheck]
    fn prop_signed_refs(input: SignedRefs<Unverified>) {
        assert_eq!(
            deserialize::<SignedRefs<Unverified>>(&serialize(&input)).unwrap(),
            input
        );
    }

    // A string is encoded as a one-byte length followed by its bytes.
    #[test]
    fn test_string() {
        assert_eq!(
            serialize(&String::from("hello")),
            vec![5, b'h', b'e', b'l', b'l', b'o']
        );
    }

    // An alias has the same encoding as the equivalent string.
    #[test]
    fn test_alias() {
        assert_eq!(
            serialize(&Alias::from_str("hello").unwrap()),
            vec![5, b'h', b'e', b'l', b'l', b'o']
        );
    }

    // Decoding a filter built with a size of `FILTER_SIZE_M / 3` must be rejected
    // with `InvalidFilterSize`.
    #[test]
    fn test_filter_invalid() {
        let b = bloomy::BloomFilter::with_size(filter::FILTER_SIZE_M / 3);
        let f = filter::Filter::from(b);
        let bytes = serialize(&f);

        assert_matches!(
            deserialize::<filter::Filter>(&bytes).unwrap_err(),
            Error::InvalidFilterSize(_)
        );
    }

    // Decoding two items into a vector bounded to one item fails with
    // `InvalidSize`, while a vector bounded to two items accepts them.
    #[test]
    fn test_bounded_vec_limit() {
        let v: BoundedVec<u8, 2> = vec![1, 2].try_into().unwrap();
        let buf = serialize(&v);

        assert_matches!(
            deserialize::<BoundedVec<u8, 1>>(&buf),
            Err(Error::InvalidSize {
                expected: 1,
                actual: 2
            }),
            "fail when vector is too small for buffer",
        );

        assert!(
            deserialize::<BoundedVec<u8, 2>>(&buf).is_ok(),
            "successfully decode vector of same size",
        );
    }
}