radicle_protocol/wire/
frame.rs

1//! Framing protocol.
2#![warn(clippy::missing_docs_in_private_items)]
3use std::{fmt, io};
4
5use bytes::{Buf, BufMut};
6use radicle::node::Link;
7
8use crate::service::Message;
9use crate::{wire, wire::varint, wire::varint::VarInt, PROTOCOL_VERSION};
10
11/// Protocol version strings all start with the magic sequence `rad`, followed
12/// by a version number.
13pub const PROTOCOL_VERSION_STRING: Version = Version([b'r', b'a', b'd', PROTOCOL_VERSION]);
14
15/// Control open byte.
16const CONTROL_OPEN: u8 = 0;
17/// Control close byte.
18const CONTROL_CLOSE: u8 = 1;
19/// Control EOF byte.
20const CONTROL_EOF: u8 = 2;
21
22/// Protocol version.
23#[derive(Debug, PartialEq, Eq)]
24pub struct Version([u8; 4]);
25
26impl Version {
27    /// Version number.
28    pub fn number(&self) -> u8 {
29        self.0[3]
30    }
31}
32
33impl wire::Encode for Version {
34    fn encode(&self, buf: &mut impl BufMut) {
35        buf.put_slice(&PROTOCOL_VERSION_STRING.0);
36    }
37}
38
39impl wire::Decode for Version {
40    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
41        let mut version = [0u8; 4];
42
43        buf.try_copy_to_slice(&mut version[..])?;
44
45        if version != PROTOCOL_VERSION_STRING.0 {
46            return Err(wire::Error::InvalidProtocolVersion(version));
47        }
48        Ok(Self(version))
49    }
50}
51
52/// Identifies a (multiplexed) stream.
53///
54/// Stream IDs are variable-length integers with the least significant 3 bits
55/// denoting the stream type and initiator.
56///
57/// The first bit denotes the initiator (outbound or inbound), while the second
58/// and third bit denote the stream type. See `StreamKind`.
59///
60/// In a situation where Alice connects to Bob, Alice will have the initiator
61/// bit set to `1` for all streams she creates, while Bob will have it set to `0`.
62///
63/// This ensures that Stream IDs never collide.
64/// Additionally, Stream IDs must never be re-used within a connection.
65///
66/// +=======+==================================+
67/// | Bits  | Stream Type                      |
68/// +=======+==================================+
69/// | 0b000 | Outbound Control stream          |
70/// +-------+----------------------------------+
71/// | 0b001 | Inbound Control stream           |
72/// +-------+----------------------------------+
73/// | 0b010 | Outbound Gossip stream           |
74/// +-------+----------------------------------+
75/// | 0b011 | Inbound Gossip stream            |
76/// +-------+----------------------------------+
77/// | 0b100 | Outbound Git stream              |
78/// +-------+----------------------------------+
79/// | 0b101 | Inbound Git stream               |
80/// +-------+----------------------------------+
81///
82#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
83pub struct StreamId(VarInt);
84
85impl StreamId {
86    /// Get the initiator of this stream.
87    pub fn link(&self) -> Link {
88        let n = *self.0;
89        if 0b1 & n == 0 {
90            Link::Outbound
91        } else {
92            Link::Inbound
93        }
94    }
95
96    /// Get the kind of stream this is.
97    pub fn kind(&self) -> Result<StreamKind, u8> {
98        let id = *self.0;
99        let kind = ((id >> 1) & 0b11) as u8;
100
101        StreamKind::try_from(kind)
102    }
103
104    /// Create a control identifier.
105    pub fn control(link: Link) -> Self {
106        let link = if link.is_outbound() { 0 } else { 1 };
107        Self(VarInt::from(((StreamKind::Control as u8) << 1) | link))
108    }
109
110    /// Create a gossip identifier.
111    pub fn gossip(link: Link) -> Self {
112        let link = if link.is_outbound() { 0 } else { 1 };
113        Self(VarInt::from(((StreamKind::Gossip as u8) << 1) | link))
114    }
115
116    /// Create a git identifier.
117    pub fn git(link: Link) -> Self {
118        let link = if link.is_outbound() { 0 } else { 1 };
119        Self(VarInt::from(((StreamKind::Git as u8) << 1) | link))
120    }
121
122    /// Get the nth identifier while preserving the stream type and initiator.
123    pub fn nth(self, n: u64) -> Result<Self, varint::BoundsExceeded> {
124        let id = *self.0 + (n << 3);
125        VarInt::new(id).map(Self)
126    }
127}
128
129impl From<StreamId> for u64 {
130    fn from(value: StreamId) -> Self {
131        *value.0
132    }
133}
134
135impl From<StreamId> for VarInt {
136    fn from(value: StreamId) -> Self {
137        value.0
138    }
139}
140
141impl fmt::Display for StreamId {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        write!(f, "{}", *self.0)
144    }
145}
146
147impl wire::Decode for StreamId {
148    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
149        let id = VarInt::decode(buf)?;
150        Ok(Self(id))
151    }
152}
153
154impl wire::Encode for StreamId {
155    fn encode(&self, buf: &mut impl BufMut) {
156        self.0.encode(buf)
157    }
158}
159
160/// Type of stream.
161#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
162#[repr(u8)]
163pub enum StreamKind {
164    /// Control stream, used to open and close streams.
165    Control = 0b00,
166    /// Gossip stream, used to exchange messages.
167    Gossip = 0b01,
168    /// Git stream, used for replication.
169    Git = 0b10,
170}
171
172impl TryFrom<u8> for StreamKind {
173    type Error = u8;
174
175    fn try_from(value: u8) -> Result<Self, Self::Error> {
176        match value {
177            0b00 => Ok(StreamKind::Control),
178            0b01 => Ok(StreamKind::Gossip),
179            0b10 => Ok(StreamKind::Git),
180            n => Err(n),
181        }
182    }
183}
184
185/// Protocol frame.
186///
187/// ```text
188///  0                   1                   2                   3
189///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
190/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
191/// |      'r'      |      'a'      |      'd'      |      0x1      | Version
192/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
193/// |                     Stream ID                           |TTT|I| Stream ID with Stream [T]ype and [I]nitiator bits
194/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
195/// |                     Data                                   ...| Data (variable size)
196/// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
197/// ```
198#[derive(Debug, PartialEq, Eq)]
199pub struct Frame<M = Message> {
200    /// The protocol version.
201    pub version: Version,
202    /// The stream identifier.
203    pub stream: StreamId,
204    /// The frame payload.
205    pub data: FrameData<M>,
206}
207
208impl<M> Frame<M> {
209    /// Create a 'git' protocol frame.
210    pub fn git(stream: StreamId, data: Vec<u8>) -> Self {
211        Self {
212            version: PROTOCOL_VERSION_STRING,
213            stream,
214            data: FrameData::Git(data),
215        }
216    }
217
218    /// Create a 'control' protocol frame.
219    pub fn control(link: Link, ctrl: Control) -> Self {
220        Self {
221            version: PROTOCOL_VERSION_STRING,
222            stream: StreamId::control(link),
223            data: FrameData::Control(ctrl),
224        }
225    }
226
227    /// Create a 'gossip' protocol frame.
228    pub fn gossip(link: Link, msg: M) -> Self {
229        Self {
230            version: PROTOCOL_VERSION_STRING,
231            stream: StreamId::gossip(link),
232            data: FrameData::Gossip(msg),
233        }
234    }
235}
236
237impl<M: wire::Encode> Frame<M> {
238    /// Serialize frame to bytes.
239    pub fn to_bytes(&self) -> Vec<u8> {
240        wire::serialize(self)
241    }
242}
243
244/// Frame payload.
245#[derive(Debug, PartialEq, Eq)]
246pub enum FrameData<M> {
247    /// Control frame payload.
248    Control(Control),
249    /// Gossip frame payload.
250    Gossip(M),
251    /// Git frame payload. May contain packet-lines as well as packfile data.
252    Git(Vec<u8>),
253}
254
255/// A control message sent over a control stream.
256#[derive(Debug, PartialEq, Eq)]
257pub enum Control {
258    /// Open a new stream.
259    Open {
260        /// The stream to open.
261        stream: StreamId,
262    },
263    /// Close an existing stream.
264    Close {
265        /// The stream to close.
266        stream: StreamId,
267    },
268    /// Signal an end-of-file. This can be used to simulate connections terminating
269    /// without having to close the connection. These control messages are turned into
270    /// [`io::ErrorKind::UnexpectedEof`] errors on read.
271    Eof {
272        /// The stream to send an EOF on.
273        stream: StreamId,
274    },
275}
276
277impl wire::Decode for Control {
278    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
279        let command = u8::decode(buf)?;
280        match command {
281            CONTROL_OPEN => {
282                let stream = StreamId::decode(buf)?;
283                Ok(Control::Open { stream })
284            }
285            CONTROL_CLOSE => {
286                let stream = StreamId::decode(buf)?;
287                Ok(Control::Close { stream })
288            }
289            CONTROL_EOF => {
290                let stream = StreamId::decode(buf)?;
291                Ok(Control::Eof { stream })
292            }
293            other => Err(wire::Error::InvalidControlMessage(other)),
294        }
295    }
296}
297
298impl wire::Encode for Control {
299    fn encode(&self, buf: &mut impl BufMut) {
300        match self {
301            Self::Open { stream: id } => {
302                CONTROL_OPEN.encode(buf);
303                id.encode(buf);
304            }
305            Self::Eof { stream: id } => {
306                CONTROL_EOF.encode(buf);
307                id.encode(buf);
308            }
309            Self::Close { stream: id } => {
310                CONTROL_CLOSE.encode(buf);
311                id.encode(buf);
312            }
313        }
314    }
315}
316
317impl<M: wire::Decode> wire::Decode for Frame<M> {
318    fn decode(buf: &mut impl Buf) -> Result<Self, wire::Error> {
319        let version = Version::decode(buf)?;
320        if version.number() != PROTOCOL_VERSION {
321            return Err(wire::Error::WrongProtocolVersion(version.number()));
322        }
323        let stream = StreamId::decode(buf)?;
324
325        match stream.kind() {
326            Ok(StreamKind::Control) => {
327                let ctrl = Control::decode(buf)?;
328                let frame = Frame {
329                    version,
330                    stream,
331                    data: FrameData::Control(ctrl),
332                };
333                Ok(frame)
334            }
335            Ok(StreamKind::Gossip) => {
336                let data = varint::payload::decode(buf)?;
337                let mut cursor = io::Cursor::new(data);
338                let msg = M::decode(&mut cursor)?;
339                let frame = Frame {
340                    version,
341                    stream,
342                    data: FrameData::Gossip(msg),
343                };
344
345                // Nb. If there is data after the `Message` that is not decoded,
346                // it is simply dropped here.
347
348                Ok(frame)
349            }
350            Ok(StreamKind::Git) => {
351                let data = varint::payload::decode(buf)?;
352                Ok(Frame::git(stream, data))
353            }
354            Err(n) => Err(wire::Error::InvalidStreamKind(n)),
355        }
356    }
357}
358
359impl<M: wire::Encode> wire::Encode for Frame<M> {
360    fn encode(&self, buf: &mut impl BufMut) {
361        self.version.encode(buf);
362        self.stream.encode(buf);
363        match &self.data {
364            FrameData::Control(ctrl) => ctrl.encode(buf),
365            FrameData::Git(data) => varint::payload::encode(data, buf),
366            FrameData::Gossip(msg) => varint::payload::encode(&wire::serialize(msg), buf),
367        }
368    }
369}
370
371#[cfg(test)]
372mod test {
373    use super::*;
374
375    #[test]
376    fn test_stream_id() {
377        assert_eq!(StreamId(VarInt(0b000)).kind().unwrap(), StreamKind::Control);
378        assert_eq!(StreamId(VarInt(0b010)).kind().unwrap(), StreamKind::Gossip);
379        assert_eq!(StreamId(VarInt(0b100)).kind().unwrap(), StreamKind::Git);
380        assert_eq!(StreamId(VarInt(0b001)).link(), Link::Inbound);
381        assert_eq!(StreamId(VarInt(0b000)).link(), Link::Outbound);
382        assert_eq!(StreamId(VarInt(0b101)).link(), Link::Inbound);
383        assert_eq!(StreamId(VarInt(0b100)).link(), Link::Outbound);
384
385        assert_eq!(StreamId::git(Link::Outbound), StreamId(VarInt(0b100)));
386        assert_eq!(StreamId::control(Link::Outbound), StreamId(VarInt(0b000)));
387        assert_eq!(StreamId::gossip(Link::Outbound), StreamId(VarInt(0b010)));
388
389        assert_eq!(StreamId::git(Link::Inbound), StreamId(VarInt(0b101)));
390        assert_eq!(StreamId::control(Link::Inbound), StreamId(VarInt(0b001)));
391        assert_eq!(StreamId::gossip(Link::Inbound), StreamId(VarInt(0b011)));
392    }
393}