cobs_codec/
lib.rs

1//! This crate provides a COBS (Consistent Overhead Byte Stuffing) codec
2//! for Tokio.
3//!
4//! The COBS encoding is a very efficient framing method for network packets.
//! Basically, it allows you to send messages consisting of arbitrary bytes
6//! while still being able to detect where messages start and end.
7//!
8//! This is achieved by ending encoded messages with a specific
9//! (customizable) byte called a sentinel.
//! Any occurrence of this byte within the message is avoided by a substitution
//! scheme that adds very little overhead: `O(1 + N/254)` bytes in the worst case.
12//!
//! See [the Wikipedia article on COBS][wiki] for details.
//!
//! [wiki]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
16//!
17//! ### Choosing a Sentinel Value
18//!
19//! This crate allows users to choose their own sentinel value.
20//! There are two guiding principles when choosing a value.
21//!
//! **Size**: The encoding has the least overhead when the message
//!   contains a sentinel at least once every 254 bytes.
//!   Note that this consideration is irrelevant for messages
//!   up to 254 bytes long.
26//!
27//! **Speed**: Encoding/decoding is fastest for messages with as few
28//!   sentinel values as possible, ideally none.
29//!
30//!
31//! # Examples
32//!
33//! ```
34//! # #[tokio::main(flavor = "current_thread")]
35//! # async fn main() {
36//! use std::io::Cursor;
37//! use tokio_util::codec::{FramedWrite, FramedRead};
38//! use futures::{SinkExt, StreamExt};
39//!
40//! use cobs_codec::{Encoder, Decoder};
41//!
42//! // Choose a message separator that does not appear too frequently in your messages.
43//! const SENTINEL: u8 = 0x00;
44//!
45//! // It's a good idea to limit message size to prevent running out of memory.
46//! const MAX: usize = 32;
47//!
48//! let encoder = Encoder::<SENTINEL, MAX>::new();
49//! let decoder = Decoder::<SENTINEL, MAX>::new();
50//!
51//! // Imagine this buffer being sent from the server to the client.
52//! let mut buffer = Vec::with_capacity(128);
53//!
54//! let mut server_cursor = Cursor::new(&mut buffer);
55//! let mut server = FramedWrite::new(&mut server_cursor, encoder);
56//!
57//! // Send a few messages.
58//! assert!(server.send("hello").await.is_ok());
59//! assert!(server.send("world").await.is_ok());
60//!
61//! let mut client_cursor = Cursor::new(&mut buffer);
62//! let mut client = FramedRead::new(&mut client_cursor, decoder);
63//!
64//! // Receive the messages.
65//! assert_eq!(convert(&client.next().await), Some(Ok(b"hello".as_slice())));
66//! assert_eq!(convert(&client.next().await), Some(Ok(b"world".as_slice())));
67//! assert_eq!(convert(&client.next().await), None);
68//! # fn convert<E>(
69//! #     bytes: &Option<Result<bytes::BytesMut, E>>,
70//! # ) -> Option<Result<&[u8], ()>> {
71//! #     bytes
72//! #         .as_ref()
73//! #         .map(|res| res.as_ref().map(|bytes| bytes.as_ref()).map_err(|_| ()))
74//! # }
75//! # }
76//! ```
77
78#![forbid(unsafe_code)]
79
80use bytes::{Buf, BufMut, BytesMut};
81
82#[cfg(test)]
83mod test_utils;
84
85#[cfg(test)]
86mod test;
87
/// Frame separator used when the `SENTINEL` parameter is left at its default.
const DEFAULT_SENTINEL: u8 = 0;
/// Default `MAX_LEN`; `0` means frames of unlimited size.
const DEFAULT_MAX_LEN: usize = 0;

/// Initial capacity of the decode output buffer when frames are unlimited.
const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8 << 10;

/// Longest run of data bytes that a single code byte can describe.
const MAX_RUN: usize = 254;
95
/// Upper bound on the encoded size of an `input_len`-byte message,
/// including the terminating sentinel.
const fn max_encoded_len(input_len: usize) -> usize {
    // One code byte per started 254-byte chunk; an empty message still
    // produces a single code byte.
    let code_bytes = match input_len {
        0 => 1,
        n => (n + 253) / 254,
    };
    // The sentinel that terminates the frame.
    let terminator = 1;
    input_len + code_bytes + terminator
}
107
108const fn decode_buffer_cap(max_len: usize) -> usize {
109    if max_len == 0 {
110        // use a reasonable default in case the frame size is unlimited
111        DEFAULT_DECODE_BUFFER_CAPACITY
112    } else {
113        max_len
114    }
115}
116
117/// Encoding a len (between `0` and `MAX_RUN` inclusive) into a byte such that
118/// we avoid `SENTINEL`.
119#[inline(always)]
120fn encode_len<const SENTINEL: u8>(len: usize) -> u8 {
121    debug_assert!(len <= MAX_RUN);
122    // We're doing the addition on `usize` to ensure we don't generate
123    // additional zero extend instructions.
124    #[allow(clippy::collapsible_else_if)]
125    if SENTINEL == 0 {
126        len.wrapping_add(1) as u8
127    } else if SENTINEL == 255 {
128        assert!(SENTINEL as usize > MAX_RUN);
129        len as u8
130    } else {
131        if len >= SENTINEL as usize {
132            len.wrapping_add(1) as u8
133        } else {
134            len as u8
135        }
136    }
137}
138
139/// Decodes a length-or-terminator byte. If the byte is `SENTINEL`, returns `None`.
140/// Otherwise returns the length of the run encoded by the byte.
141#[inline(always)]
142fn decode_len<const SENTINEL: u8>(code: u8) -> Option<usize> {
143    let len = if SENTINEL == 0 {
144        usize::from(code).checked_sub(1)
145    } else if SENTINEL == 255 {
146        if code == SENTINEL {
147            None
148        } else {
149            Some(usize::from(code))
150        }
151    } else {
152        use std::cmp::Ordering;
153        match code.cmp(&SENTINEL) {
154            Ordering::Equal => None,
155            Ordering::Less => Some(usize::from(code)),
156            Ordering::Greater => Some(usize::from(code).wrapping_sub(1)),
157        }
158    };
159    if let Some(len) = len {
160        debug_assert!(len <= MAX_RUN);
161    };
162    len
163}
164
165#[inline(always)]
166fn encode<const SENTINEL: u8, const MAX_LEN: usize>(
167    input: &[u8],
168    output: &mut BytesMut,
169) {
170    output.reserve(max_encoded_len(input.len()));
171    if MAX_LEN != 0 {
172        debug_assert!(input.len() <= MAX_LEN);
173    }
174    if MAX_LEN != 0 && MAX_LEN <= MAX_RUN {
175        // The input is small enough to never need multiple chunks.
176        for run in input.split(|&b| b == SENTINEL) {
177            output.put_u8(encode_len::<SENTINEL>(run.len()));
178            output.put_slice(run);
179        }
180    } else {
181        let mut prev_run_was_maximal = false;
182
183        // The encoding process can be described in terms of "runs" of non-zero
184        // bytes in the input data. We process each run individually.
185        //
186        // Currently, the scanning-for-zeros loop here is the hottest part of the
187        // encode profile.
188        for mut run in input.split(|&b| b == SENTINEL) {
189            // If the last run we encoded was maximal length, we need to encode an
190            // explicit zero between it and our current `run`.
191            if prev_run_was_maximal {
192                output.put_u8(encode_len::<SENTINEL>(0));
193            }
194
195            // We can only encode a run of up to `MAX_RUN` bytes in COBS. This may
196            // require us to split `run` into multiple output chunks -- in the
197            // extreme case, if the input contains no zeroes, we'll process all of
198            // it here.
199            loop {
200                let chunk_len = usize::min(run.len(), MAX_RUN);
201                let (chunk, new_run) = run.split_at(chunk_len);
202                output.put_u8(encode_len::<SENTINEL>(chunk_len));
203                output.put_slice(chunk);
204
205                run = new_run;
206                prev_run_was_maximal = chunk_len == MAX_RUN;
207
208                // We test this condition here, rather than as a `while` loop,
209                // because we want to process empty runs once.
210                if run.is_empty() {
211                    break;
212                }
213            }
214        }
215    }
216    output.put_u8(SENTINEL);
217}
218
219/// Frame encoder.
220///
221/// This type implements [`Encoder<impl AsRef<[i8]>>`](tokio_util::codec::Encoder);
222/// it encodes any message type that be converted to a byte slice
223/// using [`AsRef<[u8]>`](AsRef).
224///
225/// This type can be customized via generic parameters:\
226/// *`SENTINEL`*: Choose a byte to be used as a frame separator.
227///   The corresponding [`Decoder`] must use the same value.
228///   Refer to the crate documentation for more details on choosing a sentinel.\
229/// *`MAX_LEN`*: Choose the maximum size of a message,
230///   or set to `0` for unlimited message sizes.
231///   This parameter is used as an optimization.
232///   If any message exceeds this limit, encoding will panic.
233#[derive(Default, Debug)]
234pub struct Encoder<
235    const SENTINEL: u8 = DEFAULT_SENTINEL,
236    const MAX_LEN: usize = DEFAULT_MAX_LEN,
237>;
238
239impl<const SENTINEL: u8, const MAX_LEN: usize> Encoder<SENTINEL, MAX_LEN> {
240    /// Create a new encoder.
241    pub fn new() -> Self {
242        Self
243    }
244}
245
246impl<const SENTINEL: u8, const MAX_LEN: usize, T: AsRef<[u8]>>
247    tokio_util::codec::Encoder<T> for Encoder<SENTINEL, MAX_LEN>
248{
249    type Error = std::io::Error;
250
251    #[inline(always)]
252    fn encode(
253        &mut self,
254        item: T,
255        dst: &mut BytesMut,
256    ) -> Result<(), Self::Error> {
257        let bytes = item.as_ref();
258        assert!(MAX_LEN == 0 || bytes.len() <= MAX_LEN);
259        encode::<SENTINEL, MAX_LEN>(bytes, dst);
260        assert_eq!(dst.last(), Some(&SENTINEL));
261        Ok(())
262    }
263}
264
265#[derive(Debug)]
266enum DecoderReadResult {
267    NeedMoreData,
268    Frame(BytesMut),
269    UnexpectedSentinel,
270    FrameOverflow,
271}
272
273#[derive(Debug)]
274struct DecoderReadingState {
275    next_chunk_offset: usize,
276    output: BytesMut,
277    chunk_overflow: bool,
278}
279
280impl DecoderReadingState {
281    #[inline(always)]
282    fn new<const MAX_LEN: usize>(offset: usize) -> Self {
283        let mut this = Self {
284            next_chunk_offset: 0,
285            output: BytesMut::with_capacity(decode_buffer_cap(MAX_LEN)),
286            chunk_overflow: false,
287        };
288        this.update(offset);
289        this
290    }
291
292    #[inline(always)]
293    fn update(&mut self, offset: usize) {
294        self.next_chunk_offset = offset;
295        self.chunk_overflow = offset == MAX_RUN;
296    }
297
298    #[inline(always)]
299    fn read<const SENTINEL: u8, const MAX_LEN: usize>(
300        &mut self,
301        src: &mut BytesMut,
302    ) -> DecoderReadResult {
303        loop {
304            if src.is_empty() {
305                return DecoderReadResult::NeedMoreData;
306            }
307            // Process the remainder of a chunk.
308            if self.next_chunk_offset > 0 {
309                let len = usize::min(self.next_chunk_offset, src.len());
310                if MAX_LEN != 0 && self.output.len() + len > MAX_LEN {
311                    return DecoderReadResult::FrameOverflow;
312                }
313                self.next_chunk_offset -= len;
314                let chunk = src.split_to(len);
315                if chunk.contains(&SENTINEL) {
316                    return DecoderReadResult::UnexpectedSentinel;
317                }
318                self.output.put(chunk);
319                if src.is_empty() {
320                    return DecoderReadResult::NeedMoreData;
321                }
322            }
323            // Process the start of a new chunk.
324            debug_assert!(self.next_chunk_offset == 0);
325            debug_assert!(!src.is_empty());
326            if let Some(offset) = decode_len::<SENTINEL>(src.get_u8()) {
327                if !self.chunk_overflow {
328                    if MAX_LEN != 0 && self.output.len() == MAX_LEN {
329                        return DecoderReadResult::FrameOverflow;
330                    }
331                    self.output.put_u8(SENTINEL);
332                }
333                self.update(offset);
334            } else {
335                // The frame is complete.
336                let capacity = decode_buffer_cap(MAX_LEN);
337                let new_output = BytesMut::with_capacity(capacity);
338                let frame = std::mem::replace(&mut self.output, new_output);
339                return DecoderReadResult::Frame(frame);
340            }
341        }
342    }
343}
344
345#[derive(Debug)]
346enum DecoderState {
347    Initial,
348    Reading(DecoderReadingState),
349    Lost,
350}
351
352/// Frame decoder.
353///
354/// This type implements [`Decoder`](tokio_util::codec::Decoder);
355/// it decodes into [`BytesMut`].
356///
357/// This type can be customized via generic parameters:\
358/// *`SENTINEL`*: Choose a byte to be used as a frame separator.
359///   The corresponding [`Encoder`] must use the same value.
360///   Refer to the crate documentation for more details on choosing a sentinel.\
361/// *`MAX_LEN`*: Choose the maximum size of a message,
362///   or set to `0` for unlimited message sizes.
363///   This parameter is used as a safety measure to prevent
364///   running out of memory. If any message exceeds this limit,
365///   decoding will return [`DecodeError::FrameOverflow`].
366#[derive(Debug)]
367pub struct Decoder<
368    const SENTINEL: u8 = DEFAULT_SENTINEL,
369    const MAX_LEN: usize = DEFAULT_MAX_LEN,
370> {
371    state: DecoderState,
372}
373
374impl<const SENTINEL: u8, const MAX_LEN: usize> Decoder<SENTINEL, MAX_LEN> {
375    /// Create a new decoder.
376    pub fn new() -> Self {
377        Self {
378            state: DecoderState::Initial,
379        }
380    }
381}
382
383impl<const SENTINEL: u8, const MAX_LEN: usize> Default
384    for Decoder<SENTINEL, MAX_LEN>
385{
386    fn default() -> Self {
387        Self::new()
388    }
389}
390
391/// Error while decoding.
392#[derive(thiserror::Error, Debug)]
393pub enum DecodeError {
394    /// An error occured while reading from the underlying IO object.
395    ///
396    /// This variant is not used by this crate itself
397    /// since decoding does not interact with IO,
398    /// but is required to implement [`Decoder`](tokio_util::codec::Decoder)
399    /// because [`FramedRead`](tokio_util::codec::FramedRead)
400    /// wraps both the decoder and the IO object and needs
401    /// to present a single error type.
402    #[error(transparent)]
403    Io(#[from] std::io::Error),
404    /// A frame was found to start with a sentinel byte.
405    ///
406    /// This variant indicates corrupted data,
407    /// either by the sender or during transmission.
408    #[error("missing frame")]
409    MissingFrame,
410    /// The sentinel byte was found in an invalid position.
411    ///
412    /// This variant indicates corrupted data,
413    /// either by the sender or during transmission.
414    #[error("unexpected sentinel")]
415    UnexpectedSentinel,
416    /// The frame was longer than the limit.
417    ///
418    /// This variant is never returned by unlimited decoders.
419    ///
420    /// Either the data was corrupted during transmission,
421    /// or the sender encoded a frame that exceeds the limit.
422    #[error("frame overflow")]
423    FrameOverflow,
424}
425
426impl<const SENTINEL: u8, const MAX_LEN: usize> tokio_util::codec::Decoder
427    for Decoder<SENTINEL, MAX_LEN>
428{
429    type Item = BytesMut;
430    type Error = DecodeError;
431
432    fn decode(
433        &mut self,
434        src: &mut BytesMut,
435    ) -> Result<Option<BytesMut>, Self::Error> {
436        loop {
437            if matches!(self.state, DecoderState::Initial) {
438                src.reserve(max_encoded_len(decode_buffer_cap(MAX_LEN)));
439                if src.is_empty() {
440                    // Need more data to start a new frame.
441                    return Ok(None);
442                } else if let Some(offset) =
443                    decode_len::<SENTINEL>(src.get_u8())
444                {
445                    // The first byte of a frame is the offset to the next sentinel
446                    // value in the frame or the sentinel that marks its end.
447                    let read_state =
448                        DecoderReadingState::new::<MAX_LEN>(offset);
449                    self.state = DecoderState::Reading(read_state);
450                } else {
451                    // A frame may not start with a sentinel value.
452                    //
453                    // Either this is the first byte received
454                    // or it follows a previous sentinal that ended the last frame.
455                    //
456                    // Note that this case could be used to send a signal
457                    // distinct from any other message.
458                    return Err(DecodeError::MissingFrame);
459                }
460            }
461            match &mut self.state {
462                DecoderState::Initial => unreachable!(),
463                DecoderState::Reading(state) => {
464                    match state.read::<SENTINEL, MAX_LEN>(src) {
465                        DecoderReadResult::NeedMoreData => return Ok(None),
466                        DecoderReadResult::Frame(frame) => {
467                            self.state = DecoderState::Initial;
468                            return Ok(Some(frame));
469                        }
470                        DecoderReadResult::UnexpectedSentinel => {
471                            self.state = DecoderState::Initial;
472                            return Err(DecodeError::UnexpectedSentinel);
473                        }
474                        DecoderReadResult::FrameOverflow => {
475                            self.state = DecoderState::Lost;
476                            return Err(DecodeError::FrameOverflow);
477                        }
478                    }
479                }
480                DecoderState::Lost => {
481                    if let Some(index) =
482                        src.iter().position(|byte| *byte == SENTINEL)
483                    {
484                        let _ = src.split_to(index + 1);
485                        let total_capacity =
486                            max_encoded_len(decode_buffer_cap(MAX_LEN));
487                        src.reserve(total_capacity.saturating_sub(src.len()));
488                        self.state = DecoderState::Initial;
489                    } else {
490                        src.clear();
491                        return Ok(None);
492                    }
493                }
494            }
495        }
496    }
497}
498
499/// Frame codec.
500///
501/// This type contains both an [`Encoder`] and a [`Decoder`]
502/// and implements [`Encoder`](tokio_util::codec::Encoder)
503/// as well as [`Decoder`](tokio_util::codec::Decoder).
504///
505/// Refer to the underlying encoder and decoder types
506/// for details on the generic parameters.
507#[derive(Debug)]
508pub struct Codec<
509    const SENTINEL_ENCODE: u8 = DEFAULT_SENTINEL,
510    const SENTINEL_DECODE: u8 = DEFAULT_SENTINEL,
511    const MAX_LEN_ENCODE: usize = DEFAULT_MAX_LEN,
512    const MAX_LEN_DECODE: usize = DEFAULT_MAX_LEN,
513> {
514    encoder: Encoder<SENTINEL_ENCODE, MAX_LEN_ENCODE>,
515    decoder: Decoder<SENTINEL_DECODE, MAX_LEN_DECODE>,
516}
517
518impl<
519        const SENTINEL_ENCODE: u8,
520        const SENTINEL_DECODE: u8,
521        const MAX_LEN_ENCODE: usize,
522        const MAX_LEN_DECODE: usize,
523    > Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
524{
525    /// Create a new codec.
526    pub fn new() -> Self {
527        Self {
528            encoder: Encoder::new(),
529            decoder: Decoder::new(),
530        }
531    }
532}
533
534impl<
535        const SENTINEL_ENCODE: u8,
536        const SENTINEL_DECODE: u8,
537        const MAX_LEN_ENCODE: usize,
538        const MAX_LEN_DECODE: usize,
539    > Default
540    for Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
541{
542    fn default() -> Self {
543        Self::new()
544    }
545}
546
547impl<
548        const SENTINEL_ENCODE: u8,
549        const SENTINEL_DECODE: u8,
550        const MAX_LEN_ENCODE: usize,
551        const MAX_LEN_DECODE: usize,
552        T: AsRef<[u8]>,
553    > tokio_util::codec::Encoder<T>
554    for Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
555{
556    type Error = std::io::Error;
557
558    fn encode(
559        &mut self,
560        item: T,
561        dst: &mut BytesMut,
562    ) -> Result<(), Self::Error> {
563        self.encoder.encode(item, dst)
564    }
565}
566
567impl<
568        const SENTINEL_ENCODE: u8,
569        const SENTINEL_DECODE: u8,
570        const MAX_LEN_ENCODE: usize,
571        const MAX_LEN_DECODE: usize,
572    > tokio_util::codec::Decoder
573    for Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
574{
575    type Item = BytesMut;
576    type Error = DecodeError;
577
578    fn decode(
579        &mut self,
580        src: &mut BytesMut,
581    ) -> Result<Option<BytesMut>, Self::Error> {
582        self.decoder.decode(src)
583    }
584}