Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12};
13use sync_wrapper::SyncWrapper;
14
/// Streaming body source that can optionally produce trailers.
///
/// Implement this on types that compute trailer headers dynamically as the body
/// is read — for example, a hashing wrapper that produces a `Digest` trailer
/// after all bytes have been streamed. For plain [`AsyncRead`] sources with no
/// trailers, [`Body::new_streaming`] is simpler.
///
/// [`Body`] stores its source as a boxed trait object (`Pin<Box<dyn BodySource>>`),
/// which is why the trait requires `Send + 'static` in addition to [`AsyncRead`].
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, called after the body has been fully read.
    ///
    /// Implementations may clear internal state on this call; the result is
    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
    ///
    /// Return `None` when there are no trailers to send.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
28
// Adapter that lets any plain `AsyncRead` act as a `BodySource` that never produces
// trailers. `Body::new_streaming` wraps its reader in this type.
pin_project! {
    struct PlainBody<T> {
        // Pinned field: `poll_read` reaches it through `project()`.
        #[pin]
        async_read: T,
    }
}
35
36impl<T: AsyncRead> AsyncRead for PlainBody<T> {
37    fn poll_read(
38        self: Pin<&mut Self>,
39        cx: &mut Context<'_>,
40        buf: &mut [u8],
41    ) -> Poll<Result<usize>> {
42        self.project().async_read.poll_read(cx, buf)
43    }
44}
45
46impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
47    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
48        None
49    }
50}
51
/// The trillium representation of a http body. This can contain
/// either `&'static [u8]` content, `Vec<u8>` content, shared
/// `Arc<[u8]>` / `Arc<str>` content, or a boxed
/// [`AsyncRead`]/[`BodySource`] type.
///
/// The default value is an empty body.
#[derive(Debug, Default)]
pub struct Body(pub(crate) BodyType);
57
58impl Body {
59    /// Construct a new body from a streaming [`AsyncRead`] source. If
60    /// you have the body content in memory already, prefer
61    /// [`Body::new_static`] or one of the From conversions.
62    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
63        Self::new_with_trailers(PlainBody { async_read }, len)
64    }
65
66    /// Construct a new body from a [`BodySource`] that can produce trailers after
67    /// the body has been fully read.
68    ///
69    /// Use this when trailers must be computed dynamically from the body bytes,
70    /// for example to append a content hash.
71    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
72        Self(Streaming {
73            async_read: SyncWrapper::new(Box::pin(body)),
74            len,
75            done: false,
76            progress: 0,
77            chunked_framing: true,
78            keep_open: false,
79        })
80    }
81
82    /// Disable chunked-encoding framing emitted by [`AsyncRead`] for streaming bodies
83    /// of unknown length.
84    ///
85    /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
86    /// implementation emits chunked framing so the h1 codec can write its bytes directly.
87    /// That framing is wrong for any consumer that wants raw body bytes.
88    #[doc(hidden)]
89    #[cfg(feature = "unstable")]
90    #[must_use]
91    pub fn without_chunked_framing(mut self) -> Self {
92        if let Streaming {
93            ref mut chunked_framing,
94            ..
95        } = self.0
96        {
97            *chunked_framing = false;
98        }
99        self
100    }
101
    /// Normalize this body to an open, chunk-framed stream: its content goes out as
    /// chunked-transfer chunks with **no** terminating `0\r\n`, leaving the outbound
    /// stream open for a following upgrade to continue and eventually close.
    ///
    /// Fixed-length content (`Static`, or a streaming body with a known length) is
    /// re-sourced through the chunked path so it too flows as ordinary chunks rather than
    /// raw bytes — the caller doesn't have to hand-wrap it in a length-less streaming body.
    /// An empty body stays empty (it contributes no bytes; the upgrade owns the whole
    /// stream).
    ///
    /// The send site that consumes the body is responsible for *not* writing the
    /// trailer-section terminator either; trailers (if any) ride onto the upgrade.
    ///
    /// This is the by-value builder form of the crate-internal `set_keep_open`.
    #[doc(hidden)]
    #[cfg(feature = "unstable")]
    #[must_use]
    pub fn keep_open(mut self) -> Self {
        self.set_keep_open();
        self
    }
121
122    /// In-crate counterpart to [`keep_open`](Self::keep_open) — the server send path sets
123    /// this from `should_upgrade()` and can't reach the `unstable`-gated public builder.
124    pub(crate) fn set_keep_open(&mut self) {
125        // Re-source fixed content through the chunked streaming path so it goes out as
126        // chunks instead of raw bytes. Streaming bodies are left in place (preserving their
127        // `BodySource`, hence any trailers) and just have their framing flags flipped below.
128        if matches!(self.0, Static { .. }) {
129            let reader = std::mem::take(self).into_reader();
130            *self = Self::new_streaming(reader, None);
131        }
132
133        if let Streaming {
134            ref mut len,
135            ref mut chunked_framing,
136            ref mut keep_open,
137            ..
138        } = self.0
139        {
140            *len = None;
141            *chunked_framing = true;
142            *keep_open = true;
143        }
144    }
145
146    /// Set whether this body's [`AsyncRead`] impl emits chunked framing for the
147    /// `len: None` case. The h1 send path drives this from the finalized response
148    /// headers: `chunked` when `Transfer-Encoding: chunked` is present, raw passthrough
149    /// (close-delimited) when neither it nor `Content-Length` is. Has no effect on
150    /// fixed-length or static bodies, which never chunk.
151    pub(crate) fn set_chunked_framing(&mut self, on: bool) {
152        if let Streaming {
153            ref mut chunked_framing,
154            ..
155        } = self.0
156        {
157            *chunked_framing = on;
158        }
159    }
160
161    /// Returns trailers from the body source, if any.
162    ///
163    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
164    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
165    /// [`Body::new_streaming`] or [`Body::new_static`].
166    #[doc(hidden)]
167    pub fn trailers(&mut self) -> Option<Headers> {
168        match &mut self.0 {
169            Streaming {
170                async_read, done, ..
171            } if *done => async_read.get_mut().as_mut().trailers(),
172            _ => None,
173        }
174    }
175
176    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
177    /// [u8]`.
178    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
179        Self(Static {
180            content: StaticContent::Cow(content.into()),
181            cursor: 0,
182        })
183    }
184
185    /// Retrieve a borrow of the static content in this body. If this
186    /// body is a streaming body or an empty body, this will return
187    /// None.
188    pub fn static_bytes(&self) -> Option<&[u8]> {
189        match &self.0 {
190            Static { content, .. } => Some(content.as_ref()),
191            _ => None,
192        }
193    }
194
195    /// Transform this Body into a dyn [`AsyncRead`], wrapping static content in
196    /// a [`Cursor`]. Unlike reading from the Body directly, this does not apply
197    /// chunked encoding.
198    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
199        match self.0 {
200            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
201            Static { content, .. } => Box::pin(Cursor::new(content)),
202            Empty => Box::pin(Cursor::new("")),
203        }
204    }
205
206    /// Consume this body and return the full content. If the body was constructed
207    /// with [`Body::new_streaming`], this will read the entire streaming body into
208    /// memory, awaiting the streaming source's completion.
209    ///
210    /// # Errors
211    ///
212    /// Returns an error if the underlying transport errors, or if a streaming body
213    /// has already been partially or fully read.
214    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
215        match self.0 {
216            Static { content, .. } => Ok(content.into_cow()),
217
218            Streaming {
219                async_read,
220                len,
221                progress: 0,
222                done: false,
223                ..
224            } => {
225                let mut async_read = async_read.into_inner();
226                let mut buf = len
227                    .and_then(|c| c.try_into().ok())
228                    .map(Vec::with_capacity)
229                    .unwrap_or_default();
230
231                async_read.read_to_end(&mut buf).await?;
232
233                Ok(Cow::Owned(buf))
234            }
235
236            Empty => Ok(Cow::Borrowed(b"")),
237
238            Streaming { .. } => Err(Error::other("body already read to completion")),
239        }
240    }
241
    /// Retrieve the number of bytes that have been read from this
    /// body.
    ///
    /// For static bodies this is the read cursor; for streaming bodies it is the
    /// recorded progress; for empty bodies it is always zero.
    pub fn bytes_read(&self) -> u64 {
        self.0.bytes_read()
    }
247
    /// Returns the content length of this body, if known and
    /// available.
    ///
    /// Empty bodies report `Some(0)`, static bodies report the length of their
    /// content, and streaming bodies report the length they were given, which may be
    /// `None`.
    pub fn len(&self) -> Option<u64> {
        self.0.len()
    }
253
    /// Determine if this body represents no data.
    ///
    /// True for an empty body, for static content of zero length, and for a streaming
    /// body whose length is `Some(0)`. A streaming body of unknown length is never
    /// reported as empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
258
    /// Determine if this body represents static (in-memory) content.
    pub fn is_static(&self) -> bool {
        matches!(self.0, Static { .. })
    }
263
    /// Determine if this body represents streaming content.
    pub fn is_streaming(&self) -> bool {
        matches!(self.0, Streaming { .. })
    }
268
269    /// Consume this body and return its underlying [`BodySource`], if it is a streaming body.
270    ///
271    /// Returns `None` for static and empty bodies, whose content is already in memory and whose
272    /// length is already known. This is the extraction point a sender needs to buffer a
273    /// streaming body while preserving its trailer-producing source — unlike
274    /// [`into_reader`](Self::into_reader), which erases the source to a plain `AsyncRead`.
275    #[cfg(feature = "unstable")]
276    #[doc(hidden)]
277    pub fn into_body_source(self) -> Option<Pin<Box<dyn BodySource>>> {
278        match self.0 {
279            Streaming { async_read, .. } => Some(async_read.into_inner()),
280            _ => None,
281        }
282    }
283
284    /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
285    ///
286    /// Static bodies clone cheaply — a `Cow` clone, which is a pointer copy for borrowed
287    /// `&'static` content and a `Vec` clone for owned content. The clone resets read
288    /// progress, so it can be sent again from the beginning. Empty bodies always clone
289    /// successfully.
290    #[doc(hidden)]
291    #[cfg(feature = "unstable")]
292    pub fn try_clone(&self) -> Option<Self> {
293        match &self.0 {
294            Empty => Some(Self::default()),
295            Static { content, .. } => Some(Self(Static {
296                content: content.clone(),
297                cursor: 0,
298            })),
299            Streaming { .. } => None,
300        }
301    }
302
    /// Convert this body into an `H3Body` for reading.
    ///
    /// Public when the `unstable` feature is enabled.
    #[cfg(feature = "unstable")]
    pub fn into_h3(self) -> H3Body {
        H3Body::new(self)
    }

    /// Convert this body into an `H3Body` for reading.
    ///
    /// Crate-private when the `unstable` feature is disabled.
    #[cfg(not(feature = "unstable"))]
    pub(crate) fn into_h3(self) -> H3Body {
        H3Body::new(self)
    }
314
    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
    ///
    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
    pub(crate) fn into_h2(self) -> H2Body {
        H2Body::new(self)
    }
324}
325
/// Largest payload that can be read into a `buf_len`-byte buffer and still leave room
/// for chunked-transfer framing around it.
///
/// A chunk is laid out as `{payload_len:X}\r\n{payload}\r\n`, so besides the payload the
/// buffer must hold two CRLF pairs (4 bytes) plus the hex digits of the payload length.
/// The digit count is chosen conservatively, so near a power of sixteen the result may
/// leave one byte of the buffer unused rather than risk overflowing it.
///
/// The computation uses integer arithmetic only, so it is exact and deterministic for
/// every `usize`.
///
/// # Panics
///
/// Panics if `buf_len` is less than 6, the smallest buffer that can hold a one-byte
/// chunk.
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    let bytes_remaining_after_two_cr_lns = buf_len - 4;

    // Reserve the smallest digit count `d` with `16^d >= remaining`, i.e. the number of
    // hex digits in `remaining - 1`. The payload is then strictly shorter than
    // `remaining`, so its hex length never needs more than `d` digits. `remaining` is
    // at least 2 here, so the subtraction cannot underflow and `d` is at least 1.
    let mut max_bytes_of_hex_framing = 0;
    let mut unconsumed = bytes_remaining_after_two_cr_lns - 1;
    while unconsumed > 0 {
        max_bytes_of_hex_framing += 1;
        unconsumed >>= 4;
    }

    bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing
}
345
impl AsyncRead for Body {
    /// Read the next bytes of this body, applying chunked-transfer framing where the
    /// body's state calls for it.
    ///
    /// * `Empty` bodies report end-of-stream immediately.
    /// * `Static` bodies copy from the in-memory content and advance `cursor`.
    /// * `Streaming` bodies with a known length pass source bytes through unframed and
    ///   never request more than `len - progress` bytes from the source.
    /// * `Streaming` bodies without a length either pass bytes through unframed
    ///   (`chunked_framing == false`) or wrap each read as one chunk
    ///   (`{size:X}\r\n{bytes}\r\n`), finishing with the `0\r\n` last-chunk marker
    ///   unless `keep_open` is set.
    ///
    /// `progress` counts payload bytes only; framing bytes are included in the returned
    /// count but not in `progress`.
    ///
    /// # Panics
    ///
    /// On the chunk-framing path, panics if `buf` is shorter than 6 bytes (see
    /// `max_bytes_to_read`).
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        match &mut self.0 {
            Empty => Poll::Ready(Ok(0)),
            Static { content, cursor } => {
                let length = content.len();
                // Everything has already been handed out.
                if length == *cursor {
                    return Poll::Ready(Ok(0));
                }
                // Copy as much of the unread tail as the caller's buffer can take.
                let bytes = (length - *cursor).min(buf.len());
                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
                *cursor += bytes;
                Poll::Ready(Ok(bytes))
            }

            Streaming {
                async_read,
                len: Some(len),
                done,
                progress,
                ..
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Limit the read to the bytes still owed under the declared length.
                // `try_into` only fails when that remainder exceeds `usize::MAX`, in
                // which case the buffer size is the effective limit anyway.
                let max_bytes_to_read = (*len - *progress)
                    .try_into()
                    .unwrap_or(buf.len())
                    .min(buf.len());

                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                // A zero-byte read is treated as end-of-stream. Once `progress` reaches
                // `len` the slice above is empty, which is how the declared length ends
                // the body.
                // NOTE(review): an empty caller-supplied `buf` presumably also yields a
                // zero-byte read and would mark the body done early — confirm callers
                // never pass one.
                if bytes == 0 {
                    *done = true;
                } else {
                    *progress += bytes as u64;
                }

                Poll::Ready(Ok(bytes))
            }

            Streaming {
                async_read,
                len: None,
                done,
                progress,
                chunked_framing,
                keep_open,
            } => {
                if *done {
                    return Poll::Ready(Ok(0));
                }

                // Raw passthrough: no framing, end-of-stream is simply a zero-byte read.
                if !*chunked_framing {
                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
                    if bytes == 0 {
                        *done = true;
                    } else {
                        *progress += bytes as u64;
                    }
                    return Poll::Ready(Ok(bytes));
                }

                // Read the payload into the front of `buf`, leaving enough of the buffer
                // unused to add the chunk-size line and both CRLFs afterwards.
                let max_bytes_to_read = max_bytes_to_read(buf.len());

                let bytes = ready!(
                    async_read
                        .get_mut()
                        .as_mut()
                        .poll_read(cx, &mut buf[..max_bytes_to_read])
                )?;

                if bytes == 0 {
                    *done = true;
                    if *keep_open {
                        // The outbound stream continues into an upgrade; the upgrade owns
                        // the terminator. Emit no last-chunk marker.
                        return Poll::Ready(Ok(0));
                    }
                    // Last-chunk marker only; the caller emits the trailer-section
                    // (possibly empty) followed by the terminating `\r\n`. Trailers come
                    // from `BodySource::trailers()` as structured `Headers`, not bytes,
                    // and the caller writes them in one shot so this path doesn't need
                    // a multi-poll state machine spanning buffers.
                    buf[..3].copy_from_slice(b"0\r\n");
                    return Poll::Ready(Ok(3));
                }

                *progress += bytes as u64;

                // Frame in place: shift the payload right to make room for the
                // chunk-size line, write that line at the front, then append the CRLF
                // that closes the chunk. The shift must happen before the prefix is
                // written, or the prefix would overwrite payload bytes.
                let start = format!("{bytes:X}\r\n");
                let start_length = start.len();
                let total = bytes + start_length + 2;
                buf.copy_within(..bytes, start_length);
                buf[..start_length].copy_from_slice(start.as_bytes());
                buf[total - 2..total].copy_from_slice(b"\r\n");
                Poll::Ready(Ok(total))
            }
        }
    }
}
457
/// Reader handed out by [`Body::into_reader`] for streaming bodies. It reads the boxed
/// [`BodySource`] directly, with no chunk framing and no access to trailers.
// NOTE(review): the `SyncWrapper` presumably supplies the `Sync` bound that
// `into_reader`'s return type requires — confirm against the `sync_wrapper` docs.
struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
459impl Debug for SyncAsyncReader {
460    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
461        f.debug_struct("SyncAsyncReader").finish()
462    }
463}
464impl AsyncRead for SyncAsyncReader {
465    fn poll_read(
466        self: Pin<&mut Self>,
467        cx: &mut Context<'_>,
468        buf: &mut [u8],
469    ) -> Poll<Result<usize>> {
470        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
471    }
472}
473
/// Fixed-length body content held in memory. Cloning is cheap for every variant: the
/// borrowed and `Arc`-backed forms copy a pointer, while an owned `Cow` clones its `Vec`.
#[derive(Clone)]
pub(crate) enum StaticContent {
    Cow(Cow<'static, [u8]>),
    Bytes(Arc<[u8]>),
    Str(Arc<str>),
}

impl std::ops::Deref for StaticContent {
    type Target = [u8];

    /// View the content as a byte slice, whichever variant holds it.
    fn deref(&self) -> &[u8] {
        match self {
            Self::Str(text) => text.as_bytes(),
            Self::Bytes(shared) => &**shared,
            Self::Cow(bytes) => &**bytes,
        }
    }
}

impl AsRef<[u8]> for StaticContent {
    fn as_ref(&self) -> &[u8] {
        &**self
    }
}

impl StaticContent {
    /// Convert into a `Cow`. A `Cow` variant is returned as-is with no copy; the
    /// `Arc`-backed variants have their bytes copied into a fresh `Vec`.
    fn into_cow(self) -> Cow<'static, [u8]> {
        match self {
            Self::Cow(bytes) => bytes,
            Self::Bytes(shared) => Cow::Owned(shared.to_vec()),
            Self::Str(text) => Cow::Owned(text.as_bytes().to_vec()),
        }
    }
}
511
/// Internal representation behind [`Body`]. The default is `Empty`.
#[derive(Default)]
pub(crate) enum BodyType {
    /// No content.
    #[default]
    Empty,

    /// Fixed-length in-memory content.
    Static {
        /// The bytes of the body.
        content: StaticContent,
        /// Read position: how many bytes of `content` have been handed out so far.
        cursor: usize,
    },

    /// Content pulled from a [`BodySource`] as it is read.
    Streaming {
        /// The boxed source the bytes (and any trailers) come from.
        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
        /// Number of payload bytes read from the source so far.
        progress: u64,
        /// Content length, when known.
        len: Option<u64>,
        /// Set once a read from the source has returned zero bytes.
        done: bool,
        /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits chunked
        /// framing for the `len: None` case; when false (via
        /// [`Body::without_chunked_framing`]), it passes through raw bytes.
        chunked_framing: bool,
        /// When true (via [`Body::keep_open`]), the chunked `len: None` read does not
        /// emit the `0\r\n` last-chunk marker at EOF — the outbound stream is left open
        /// for a following upgrade to terminate. Default false.
        keep_open: bool,
    },
}
537
538impl Debug for BodyType {
539    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
540        match self {
541            Empty => f.debug_tuple("BodyType::Empty").finish(),
542            Static { content, cursor } => f
543                .debug_struct("BodyType::Static")
544                .field("content", &String::from_utf8_lossy(content))
545                .field("cursor", cursor)
546                .finish(),
547            Streaming {
548                len,
549                done,
550                progress,
551                ..
552            } => f
553                .debug_struct("BodyType::Streaming")
554                .field("async_read", &format_args!(".."))
555                .field("len", &len)
556                .field("done", &done)
557                .field("progress", &progress)
558                .finish(),
559        }
560    }
561}
562
563impl BodyType {
564    fn is_empty(&self) -> bool {
565        match *self {
566            Empty => true,
567            Static { ref content, .. } => content.is_empty(),
568            Streaming { len, .. } => len == Some(0),
569        }
570    }
571
572    fn len(&self) -> Option<u64> {
573        match *self {
574            Empty => Some(0),
575            Static { ref content, .. } => Some(content.len() as u64),
576            Streaming { len, .. } => len,
577        }
578    }
579
580    fn bytes_read(&self) -> u64 {
581        match *self {
582            Empty => 0,
583            Static { cursor, .. } => cursor as u64,
584            Streaming { progress, .. } => progress,
585        }
586    }
587}
588
589impl From<String> for Body {
590    fn from(s: String) -> Self {
591        s.into_bytes().into()
592    }
593}
594
595impl From<&'static str> for Body {
596    fn from(s: &'static str) -> Self {
597        s.as_bytes().into()
598    }
599}
600
601impl From<&'static [u8]> for Body {
602    fn from(content: &'static [u8]) -> Self {
603        Self::new_static(content)
604    }
605}
606
607impl From<Vec<u8>> for Body {
608    fn from(content: Vec<u8>) -> Self {
609        Self::new_static(content)
610    }
611}
612
613impl From<Cow<'static, [u8]>> for Body {
614    fn from(value: Cow<'static, [u8]>) -> Self {
615        Self::new_static(value)
616    }
617}
618
619impl From<Cow<'static, str>> for Body {
620    fn from(value: Cow<'static, str>) -> Self {
621        match value {
622            Cow::Borrowed(b) => b.into(),
623            Cow::Owned(o) => o.into(),
624        }
625    }
626}
627
628impl From<Arc<[u8]>> for Body {
629    fn from(content: Arc<[u8]>) -> Self {
630        Self(Static {
631            content: StaticContent::Bytes(content),
632            cursor: 0,
633        })
634    }
635}
636
637impl From<Arc<str>> for Body {
638    fn from(content: Arc<str>) -> Self {
639        Self(Static {
640            content: StaticContent::Str(content),
641            cursor: 0,
642        })
643    }
644}
645
#[cfg(test)]
mod test_shared_content {
    use super::Body;
    use futures_lite::future::block_on;
    use std::sync::Arc;

    /// A body built from `Arc<[u8]>` reports its length and bytes, and collecting it
    /// leaves the caller's handle intact.
    #[test]
    fn arc_bytes_roundtrips() {
        let shared: Arc<[u8]> = Arc::from(&b"shared bytes"[..]);
        let body = Body::from(Arc::clone(&shared));

        assert_eq!(body.len(), Some(12));
        assert_eq!(body.static_bytes(), Some(&b"shared bytes"[..]));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), b"shared bytes");

        // the body only shared the buffer, so the original handle still works
        assert_eq!(&*shared, b"shared bytes");
    }

    /// A body built from `Arc<str>` exposes the string's UTF-8 bytes.
    #[test]
    fn arc_str_roundtrips() {
        let shared: Arc<str> = Arc::from("shared str");
        let body = Body::from(shared);

        assert_eq!(body.len(), Some(10));
        assert_eq!(body.static_bytes(), Some(&b"shared str"[..]));

        let collected = block_on(body.into_bytes()).unwrap();
        assert_eq!(collected.as_ref(), b"shared str");
    }

    /// Cloning an `Arc`-backed body bumps the reference count instead of copying.
    #[cfg(feature = "unstable")]
    #[test]
    fn shared_body_clones_without_copying_the_arc() {
        let shared: Arc<[u8]> = Arc::from(&b"abc"[..]);
        let body = Body::from(Arc::clone(&shared));

        let duplicate = body.try_clone().expect("static bodies clone");
        assert_eq!(duplicate.static_bytes(), Some(&b"abc"[..]));

        // three handles to one allocation: `shared`, `body`, and `duplicate`
        assert_eq!(Arc::strong_count(&shared), 3);
    }
}
686
#[cfg(test)]
mod test_bytes_to_read {
    /// `(buffer length, expected maximum payload)` pairs; the trailing comment is the
    /// payload length in hex.
    ///
    /// The rows marked `<-` matter most. They show a deliberate, if non-obvious,
    /// consequence of the implementation: to avoid overflowing the buffer, one byte
    /// fewer than is available must sometimes be used, because growing the read by one
    /// byte can grow the framed size by two. This happens when the hex form of the
    /// payload length is about to gain a digit (F->10, FF->100, FFF->1000, etc).
    const KNOWN_VALUES: [(usize, usize); 15] = [
        (6, 1),       // 1
        (7, 2),       // 2
        (20, 15),     // F
        (21, 15),     // F <-
        (22, 16),     // 10
        (23, 17),     // 11
        (260, 254),   // FE
        (261, 254),   // FE <-
        (262, 255),   // FF <-
        (263, 256),   // 100
        (4100, 4093), // FFD
        (4101, 4093), // FFD <-
        (4102, 4094), // FFE <-
        (4103, 4095), // FFF <-
        (4104, 4096), // 1000
    ];

    #[test]
    fn simple_check_of_known_values() {
        for (input, expected) in KNOWN_VALUES {
            let actual = super::max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // sanity-check the table itself: the framed chunk fills the buffer exactly
            // or leaves a single byte unused
            let hex_digits = format!("{expected:X}").len();
            let used_bytes = expected + 4 + hex_digits;
            let fills_buffer = used_bytes == input;
            let leaves_one_byte = used_bytes == input - 1;
            assert!(
                fills_buffer || leaves_one_byte,
                "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
                input,
                input,
                input - 1,
                used_bytes
            );
        }
    }
}