cobs_codec/lib.rs
1//! This crate provides a COBS (Consistent Overhead Byte Stuffing) codec
2//! for Tokio.
3//!
4//! The COBS encoding is a very efficient framing method for network packets.
//! Basically, it allows you to send messages consisting of any bytes
6//! while still being able to detect where messages start and end.
7//!
8//! This is achieved by ending encoded messages with a specific
9//! (customizable) byte called a sentinel.
//! Any occurrence of this byte within the message is avoided by a substitution
//! scheme that adds very little overhead: `O(1 + N/254)` worst case.
12//!
//! See [the Wikipedia article on COBS][wiki] for details.
14//!
//! [wiki]: https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing
16//!
17//! ### Choosing a Sentinel Value
18//!
19//! This crate allows users to choose their own sentinel value.
20//! There are two guiding principles when choosing a value.
21//!
22//! **Size**: The encoding has the least overhead when the message
23//! contains one sentinel at least every 254 bytes.
24//! Note that this consideration is irrelevant for messages
25//! up to 254 bytes long.
26//!
27//! **Speed**: Encoding/decoding is fastest for messages with as few
28//! sentinel values as possible, ideally none.
29//!
30//!
31//! # Examples
32//!
33//! ```
34//! # #[tokio::main(flavor = "current_thread")]
35//! # async fn main() {
36//! use std::io::Cursor;
37//! use tokio_util::codec::{FramedWrite, FramedRead};
38//! use futures::{SinkExt, StreamExt};
39//!
40//! use cobs_codec::{Encoder, Decoder};
41//!
42//! // Choose a message separator that does not appear too frequently in your messages.
43//! const SENTINEL: u8 = 0x00;
44//!
45//! // It's a good idea to limit message size to prevent running out of memory.
46//! const MAX: usize = 32;
47//!
48//! let encoder = Encoder::<SENTINEL, MAX>::new();
49//! let decoder = Decoder::<SENTINEL, MAX>::new();
50//!
51//! // Imagine this buffer being sent from the server to the client.
52//! let mut buffer = Vec::with_capacity(128);
53//!
54//! let mut server_cursor = Cursor::new(&mut buffer);
55//! let mut server = FramedWrite::new(&mut server_cursor, encoder);
56//!
57//! // Send a few messages.
58//! assert!(server.send("hello").await.is_ok());
59//! assert!(server.send("world").await.is_ok());
60//!
61//! let mut client_cursor = Cursor::new(&mut buffer);
62//! let mut client = FramedRead::new(&mut client_cursor, decoder);
63//!
64//! // Receive the messages.
65//! assert_eq!(convert(&client.next().await), Some(Ok(b"hello".as_slice())));
66//! assert_eq!(convert(&client.next().await), Some(Ok(b"world".as_slice())));
67//! assert_eq!(convert(&client.next().await), None);
68//! # fn convert<E>(
69//! # bytes: &Option<Result<bytes::BytesMut, E>>,
70//! # ) -> Option<Result<&[u8], ()>> {
71//! # bytes
72//! # .as_ref()
73//! # .map(|res| res.as_ref().map(|bytes| bytes.as_ref()).map_err(|_| ()))
74//! # }
75//! # }
76//! ```
77
78#![forbid(unsafe_code)]
79
80use bytes::{Buf, BufMut, BytesMut};
81
82#[cfg(test)]
83mod test_utils;
84
85#[cfg(test)]
86mod test;
87
/// Sentinel used when the caller does not pick one: the zero byte.
const DEFAULT_SENTINEL: u8 = 0;
/// Frame length limit used when the caller does not pick one.
/// A limit of `0` means frames are unlimited.
const DEFAULT_MAX_LEN: usize = 0;

/// The decode output buffer size if there is no frame length limit.
const DEFAULT_DECODE_BUFFER_CAPACITY: usize = 8192;

/// Longest run of non-sentinel bytes that a single code byte can describe.
const MAX_RUN: usize = 254;
95
/// Upper bound on the number of bytes [`encode`] appends for an input of
/// `input_len` bytes, terminator included.
const fn max_encoded_len(input_len: usize) -> usize {
    // One code byte per started group of 254 input bytes; an empty message
    // still needs a single code byte.
    let code_bytes = match input_len {
        0 => 1,
        n => (n + 253) / 254,
    };
    // The trailing sentinel that terminates the frame.
    let terminator = 1;
    input_len + code_bytes + terminator
}
107
108const fn decode_buffer_cap(max_len: usize) -> usize {
109 if max_len == 0 {
110 // use a reasonable default in case the frame size is unlimited
111 DEFAULT_DECODE_BUFFER_CAPACITY
112 } else {
113 max_len
114 }
115}
116
117/// Encoding a len (between `0` and `MAX_RUN` inclusive) into a byte such that
118/// we avoid `SENTINEL`.
119#[inline(always)]
120fn encode_len<const SENTINEL: u8>(len: usize) -> u8 {
121 debug_assert!(len <= MAX_RUN);
122 // We're doing the addition on `usize` to ensure we don't generate
123 // additional zero extend instructions.
124 #[allow(clippy::collapsible_else_if)]
125 if SENTINEL == 0 {
126 len.wrapping_add(1) as u8
127 } else if SENTINEL == 255 {
128 assert!(SENTINEL as usize > MAX_RUN);
129 len as u8
130 } else {
131 if len >= SENTINEL as usize {
132 len.wrapping_add(1) as u8
133 } else {
134 len as u8
135 }
136 }
137}
138
139/// Decodes a length-or-terminator byte. If the byte is `SENTINEL`, returns `None`.
140/// Otherwise returns the length of the run encoded by the byte.
141#[inline(always)]
142fn decode_len<const SENTINEL: u8>(code: u8) -> Option<usize> {
143 let len = if SENTINEL == 0 {
144 usize::from(code).checked_sub(1)
145 } else if SENTINEL == 255 {
146 if code == SENTINEL {
147 None
148 } else {
149 Some(usize::from(code))
150 }
151 } else {
152 use std::cmp::Ordering;
153 match code.cmp(&SENTINEL) {
154 Ordering::Equal => None,
155 Ordering::Less => Some(usize::from(code)),
156 Ordering::Greater => Some(usize::from(code).wrapping_sub(1)),
157 }
158 };
159 if let Some(len) = len {
160 debug_assert!(len <= MAX_RUN);
161 };
162 len
163}
164
165#[inline(always)]
166fn encode<const SENTINEL: u8, const MAX_LEN: usize>(
167 input: &[u8],
168 output: &mut BytesMut,
169) {
170 output.reserve(max_encoded_len(input.len()));
171 if MAX_LEN != 0 {
172 debug_assert!(input.len() <= MAX_LEN);
173 }
174 if MAX_LEN != 0 && MAX_LEN <= MAX_RUN {
175 // The input is small enough to never need multiple chunks.
176 for run in input.split(|&b| b == SENTINEL) {
177 output.put_u8(encode_len::<SENTINEL>(run.len()));
178 output.put_slice(run);
179 }
180 } else {
181 let mut prev_run_was_maximal = false;
182
183 // The encoding process can be described in terms of "runs" of non-zero
184 // bytes in the input data. We process each run individually.
185 //
186 // Currently, the scanning-for-zeros loop here is the hottest part of the
187 // encode profile.
188 for mut run in input.split(|&b| b == SENTINEL) {
189 // If the last run we encoded was maximal length, we need to encode an
190 // explicit zero between it and our current `run`.
191 if prev_run_was_maximal {
192 output.put_u8(encode_len::<SENTINEL>(0));
193 }
194
195 // We can only encode a run of up to `MAX_RUN` bytes in COBS. This may
196 // require us to split `run` into multiple output chunks -- in the
197 // extreme case, if the input contains no zeroes, we'll process all of
198 // it here.
199 loop {
200 let chunk_len = usize::min(run.len(), MAX_RUN);
201 let (chunk, new_run) = run.split_at(chunk_len);
202 output.put_u8(encode_len::<SENTINEL>(chunk_len));
203 output.put_slice(chunk);
204
205 run = new_run;
206 prev_run_was_maximal = chunk_len == MAX_RUN;
207
208 // We test this condition here, rather than as a `while` loop,
209 // because we want to process empty runs once.
210 if run.is_empty() {
211 break;
212 }
213 }
214 }
215 }
216 output.put_u8(SENTINEL);
217}
218
219/// Frame encoder.
220///
221/// This type implements [`Encoder<impl AsRef<[i8]>>`](tokio_util::codec::Encoder);
222/// it encodes any message type that be converted to a byte slice
223/// using [`AsRef<[u8]>`](AsRef).
224///
225/// This type can be customized via generic parameters:\
226/// *`SENTINEL`*: Choose a byte to be used as a frame separator.
227/// The corresponding [`Decoder`] must use the same value.
228/// Refer to the crate documentation for more details on choosing a sentinel.\
229/// *`MAX_LEN`*: Choose the maximum size of a message,
230/// or set to `0` for unlimited message sizes.
231/// This parameter is used as an optimization.
232/// If any message exceeds this limit, encoding will panic.
233#[derive(Default, Debug)]
234pub struct Encoder<
235 const SENTINEL: u8 = DEFAULT_SENTINEL,
236 const MAX_LEN: usize = DEFAULT_MAX_LEN,
237>;
238
239impl<const SENTINEL: u8, const MAX_LEN: usize> Encoder<SENTINEL, MAX_LEN> {
240 /// Create a new encoder.
241 pub fn new() -> Self {
242 Self
243 }
244}
245
246impl<const SENTINEL: u8, const MAX_LEN: usize, T: AsRef<[u8]>>
247 tokio_util::codec::Encoder<T> for Encoder<SENTINEL, MAX_LEN>
248{
249 type Error = std::io::Error;
250
251 #[inline(always)]
252 fn encode(
253 &mut self,
254 item: T,
255 dst: &mut BytesMut,
256 ) -> Result<(), Self::Error> {
257 let bytes = item.as_ref();
258 assert!(MAX_LEN == 0 || bytes.len() <= MAX_LEN);
259 encode::<SENTINEL, MAX_LEN>(bytes, dst);
260 assert_eq!(dst.last(), Some(&SENTINEL));
261 Ok(())
262 }
263}
264
265#[derive(Debug)]
266enum DecoderReadResult {
267 NeedMoreData,
268 Frame(BytesMut),
269 UnexpectedSentinel,
270 FrameOverflow,
271}
272
/// Progress through a frame whose first code byte has been read but whose
/// terminating sentinel has not.
#[derive(Debug)]
struct DecoderReadingState {
    // Bytes of the current chunk still to be copied from the input into
    // `output`; `0` means the next input byte is a code byte.
    next_chunk_offset: usize,
    // The frame's decoded payload so far (without the terminator).
    output: BytesMut,
    // Set when the current chunk has the maximal length `MAX_RUN`; in that
    // case no implied sentinel is inserted before the following chunk.
    chunk_overflow: bool,
}
279
280impl DecoderReadingState {
281 #[inline(always)]
282 fn new<const MAX_LEN: usize>(offset: usize) -> Self {
283 let mut this = Self {
284 next_chunk_offset: 0,
285 output: BytesMut::with_capacity(decode_buffer_cap(MAX_LEN)),
286 chunk_overflow: false,
287 };
288 this.update(offset);
289 this
290 }
291
292 #[inline(always)]
293 fn update(&mut self, offset: usize) {
294 self.next_chunk_offset = offset;
295 self.chunk_overflow = offset == MAX_RUN;
296 }
297
298 #[inline(always)]
299 fn read<const SENTINEL: u8, const MAX_LEN: usize>(
300 &mut self,
301 src: &mut BytesMut,
302 ) -> DecoderReadResult {
303 loop {
304 if src.is_empty() {
305 return DecoderReadResult::NeedMoreData;
306 }
307 // Process the remainder of a chunk.
308 if self.next_chunk_offset > 0 {
309 let len = usize::min(self.next_chunk_offset, src.len());
310 if MAX_LEN != 0 && self.output.len() + len > MAX_LEN {
311 return DecoderReadResult::FrameOverflow;
312 }
313 self.next_chunk_offset -= len;
314 let chunk = src.split_to(len);
315 if chunk.contains(&SENTINEL) {
316 return DecoderReadResult::UnexpectedSentinel;
317 }
318 self.output.put(chunk);
319 if src.is_empty() {
320 return DecoderReadResult::NeedMoreData;
321 }
322 }
323 // Process the start of a new chunk.
324 debug_assert!(self.next_chunk_offset == 0);
325 debug_assert!(!src.is_empty());
326 if let Some(offset) = decode_len::<SENTINEL>(src.get_u8()) {
327 if !self.chunk_overflow {
328 if MAX_LEN != 0 && self.output.len() == MAX_LEN {
329 return DecoderReadResult::FrameOverflow;
330 }
331 self.output.put_u8(SENTINEL);
332 }
333 self.update(offset);
334 } else {
335 // The frame is complete.
336 let capacity = decode_buffer_cap(MAX_LEN);
337 let new_output = BytesMut::with_capacity(capacity);
338 let frame = std::mem::replace(&mut self.output, new_output);
339 return DecoderReadResult::Frame(frame);
340 }
341 }
342 }
343}
344
345#[derive(Debug)]
346enum DecoderState {
347 Initial,
348 Reading(DecoderReadingState),
349 Lost,
350}
351
/// Frame decoder.
///
/// This type implements [`Decoder`](tokio_util::codec::Decoder);
/// it decodes into [`BytesMut`]. The returned frames do not include
/// the terminating sentinel.
///
/// This type can be customized via generic parameters:\
/// *`SENTINEL`*: Choose a byte to be used as a frame separator.
/// The corresponding [`Encoder`] must use the same value.
/// Refer to the crate documentation for more details on choosing a sentinel.\
/// *`MAX_LEN`*: Choose the maximum size of a message,
/// or set to `0` for unlimited message sizes.
/// This parameter is used as a safety measure to prevent
/// running out of memory. If any message exceeds this limit,
/// decoding will return [`DecodeError::FrameOverflow`]
/// and then skip input up to and including the next sentinel.
#[derive(Debug)]
pub struct Decoder<
    const SENTINEL: u8 = DEFAULT_SENTINEL,
    const MAX_LEN: usize = DEFAULT_MAX_LEN,
> {
    // Where the decoder is within the input stream. It persists across
    // `decode` calls, so a frame may arrive split over several reads.
    state: DecoderState,
}
373
374impl<const SENTINEL: u8, const MAX_LEN: usize> Decoder<SENTINEL, MAX_LEN> {
375 /// Create a new decoder.
376 pub fn new() -> Self {
377 Self {
378 state: DecoderState::Initial,
379 }
380 }
381}
382
383impl<const SENTINEL: u8, const MAX_LEN: usize> Default
384 for Decoder<SENTINEL, MAX_LEN>
385{
386 fn default() -> Self {
387 Self::new()
388 }
389}
390
/// Error while decoding.
///
/// This is the error type of both [`Decoder`] and [`Codec`] when used
/// as a [`Decoder`](tokio_util::codec::Decoder).
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
    /// An error occurred while reading from the underlying IO object.
    ///
    /// This variant is not used by this crate itself
    /// since decoding does not interact with IO,
    /// but is required to implement [`Decoder`](tokio_util::codec::Decoder)
    /// because [`FramedRead`](tokio_util::codec::FramedRead)
    /// wraps both the decoder and the IO object and needs
    /// to present a single error type.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// A frame was found to start with a sentinel byte.
    ///
    /// This happens when the input starts with a sentinel or when two
    /// sentinels follow each other directly. Only that one sentinel byte
    /// is consumed; decoding can continue with the following byte.
    ///
    /// This variant indicates corrupted data,
    /// either by the sender or during transmission.
    #[error("missing frame")]
    MissingFrame,
    /// The sentinel byte was found in an invalid position.
    ///
    /// This variant indicates corrupted data,
    /// either by the sender or during transmission.
    #[error("unexpected sentinel")]
    UnexpectedSentinel,
    /// The frame was longer than the limit.
    ///
    /// This variant is never returned by unlimited decoders.
    ///
    /// Either the data was corrupted during transmission,
    /// or the sender encoded a frame that exceeds the limit.
    /// After this error, the decoder discards input up to and
    /// including the next sentinel before decoding another frame.
    #[error("frame overflow")]
    FrameOverflow,
}
425
426impl<const SENTINEL: u8, const MAX_LEN: usize> tokio_util::codec::Decoder
427 for Decoder<SENTINEL, MAX_LEN>
428{
429 type Item = BytesMut;
430 type Error = DecodeError;
431
432 fn decode(
433 &mut self,
434 src: &mut BytesMut,
435 ) -> Result<Option<BytesMut>, Self::Error> {
436 loop {
437 if matches!(self.state, DecoderState::Initial) {
438 src.reserve(max_encoded_len(decode_buffer_cap(MAX_LEN)));
439 if src.is_empty() {
440 // Need more data to start a new frame.
441 return Ok(None);
442 } else if let Some(offset) =
443 decode_len::<SENTINEL>(src.get_u8())
444 {
445 // The first byte of a frame is the offset to the next sentinel
446 // value in the frame or the sentinel that marks its end.
447 let read_state =
448 DecoderReadingState::new::<MAX_LEN>(offset);
449 self.state = DecoderState::Reading(read_state);
450 } else {
451 // A frame may not start with a sentinel value.
452 //
453 // Either this is the first byte received
454 // or it follows a previous sentinal that ended the last frame.
455 //
456 // Note that this case could be used to send a signal
457 // distinct from any other message.
458 return Err(DecodeError::MissingFrame);
459 }
460 }
461 match &mut self.state {
462 DecoderState::Initial => unreachable!(),
463 DecoderState::Reading(state) => {
464 match state.read::<SENTINEL, MAX_LEN>(src) {
465 DecoderReadResult::NeedMoreData => return Ok(None),
466 DecoderReadResult::Frame(frame) => {
467 self.state = DecoderState::Initial;
468 return Ok(Some(frame));
469 }
470 DecoderReadResult::UnexpectedSentinel => {
471 self.state = DecoderState::Initial;
472 return Err(DecodeError::UnexpectedSentinel);
473 }
474 DecoderReadResult::FrameOverflow => {
475 self.state = DecoderState::Lost;
476 return Err(DecodeError::FrameOverflow);
477 }
478 }
479 }
480 DecoderState::Lost => {
481 if let Some(index) =
482 src.iter().position(|byte| *byte == SENTINEL)
483 {
484 let _ = src.split_to(index + 1);
485 let total_capacity =
486 max_encoded_len(decode_buffer_cap(MAX_LEN));
487 src.reserve(total_capacity.saturating_sub(src.len()));
488 self.state = DecoderState::Initial;
489 } else {
490 src.clear();
491 return Ok(None);
492 }
493 }
494 }
495 }
496 }
497}
498
/// Frame codec.
///
/// This type contains both an [`Encoder`] and a [`Decoder`]
/// and implements [`Encoder`](tokio_util::codec::Encoder)
/// as well as [`Decoder`](tokio_util::codec::Decoder)
/// by forwarding to them.
///
/// Refer to the underlying encoder and decoder types
/// for details on the generic parameters.
/// The `*_ENCODE` parameters configure the encoder and the
/// `*_DECODE` parameters the decoder, so both directions
/// may use different sentinels and limits.
#[derive(Debug)]
pub struct Codec<
    const SENTINEL_ENCODE: u8 = DEFAULT_SENTINEL,
    const SENTINEL_DECODE: u8 = DEFAULT_SENTINEL,
    const MAX_LEN_ENCODE: usize = DEFAULT_MAX_LEN,
    const MAX_LEN_DECODE: usize = DEFAULT_MAX_LEN,
> {
    // Handles outgoing messages.
    encoder: Encoder<SENTINEL_ENCODE, MAX_LEN_ENCODE>,
    // Handles incoming frames, including any partially received one.
    decoder: Decoder<SENTINEL_DECODE, MAX_LEN_DECODE>,
}
517
518impl<
519 const SENTINEL_ENCODE: u8,
520 const SENTINEL_DECODE: u8,
521 const MAX_LEN_ENCODE: usize,
522 const MAX_LEN_DECODE: usize,
523 > Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
524{
525 /// Create a new codec.
526 pub fn new() -> Self {
527 Self {
528 encoder: Encoder::new(),
529 decoder: Decoder::new(),
530 }
531 }
532}
533
534impl<
535 const SENTINEL_ENCODE: u8,
536 const SENTINEL_DECODE: u8,
537 const MAX_LEN_ENCODE: usize,
538 const MAX_LEN_DECODE: usize,
539 > Default
540 for Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
541{
542 fn default() -> Self {
543 Self::new()
544 }
545}
546
547impl<
548 const SENTINEL_ENCODE: u8,
549 const SENTINEL_DECODE: u8,
550 const MAX_LEN_ENCODE: usize,
551 const MAX_LEN_DECODE: usize,
552 T: AsRef<[u8]>,
553 > tokio_util::codec::Encoder<T>
554 for Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
555{
556 type Error = std::io::Error;
557
558 fn encode(
559 &mut self,
560 item: T,
561 dst: &mut BytesMut,
562 ) -> Result<(), Self::Error> {
563 self.encoder.encode(item, dst)
564 }
565}
566
567impl<
568 const SENTINEL_ENCODE: u8,
569 const SENTINEL_DECODE: u8,
570 const MAX_LEN_ENCODE: usize,
571 const MAX_LEN_DECODE: usize,
572 > tokio_util::codec::Decoder
573 for Codec<SENTINEL_ENCODE, SENTINEL_DECODE, MAX_LEN_ENCODE, MAX_LEN_DECODE>
574{
575 type Item = BytesMut;
576 type Error = DecodeError;
577
578 fn decode(
579 &mut self,
580 src: &mut BytesMut,
581 ) -> Result<Option<BytesMut>, Self::Error> {
582 self.decoder.decode(src)
583 }
584}