blocking_permit/
cleaver.rs

1use std::fmt;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use futures_core::stream::Stream;
7
/// Trait for buffer types that may be split at a maximum length.
///
/// Implementations are driven by `Cleaver`, which calls `split_if` on the
/// same buffer repeatedly until it returns `None`, and then yields whatever
/// is left. Each successful split is therefore expected to shorten `self`.
///
/// This is enabled via the *cleaver* feature.
pub trait Splittable: Sized {
    /// Split if larger than a maximum length.
    ///
    /// If self has length greater than the specified maximum, split it into
    /// two, returning the new leading segment and with self mutated to
    /// contain the remainder.
    ///
    /// Otherwise `None` is returned and self is expected to be left as it
    /// was, since the caller will go on to use self as the final segment.
    fn split_if(&mut self, max: usize) -> Option<Self>;
}
19
20/// This implementation is inexpensive, relying on `Bytes::split_to` which does
21/// not copy.
22impl Splittable for Bytes {
23    fn split_if(&mut self, max: usize) -> Option<Self> {
24        if self.len() > max {
25            Some(self.split_to(max))
26        } else {
27            None
28        }
29    }
30}
31
/// A `Stream` adapter that splits buffers from a source to a given, maximum
/// length.
///
/// Each `Ok` buffer received from the source is passed through
/// `Splittable::split_if` and yielded as one or more segments, in order.
/// Errors, pending state and end of stream are forwarded from the source
/// unchanged.
///
/// This may be useful to limit the amount of time spent processing each `Item`
/// of a `Splittable` stream.  This is enabled via the *cleaver* feature.
#[must_use = "streams do nothing unless polled"]
pub struct Cleaver<B, E, St>
    where B: Splittable + Unpin,
          St: Stream<Item=Result<B, E>>
{
    // The wrapped stream supplying buffers (or errors).
    source: St,
    // What is left of the most recent source buffer and still to be yielded;
    // `None` when the next item must come from `source`.
    rem: Option<B>,
    // Maximum length handed to `Splittable::split_if`; `new` asserts that it
    // is greater than zero.
    max: usize,
}
46
47impl<B, E, St> Cleaver<B, E, St>
48    where B: Splittable + Unpin,
49          St: Stream<Item=Result<B, E>>
50{
51    /// Construct with source and maximum size to split on.
52    ///
53    /// The size to split on must be at least 1.
54    pub fn new(source: St, max: usize) -> Self {
55        assert!(max > 0);
56        Cleaver { source, rem: None, max }
57    }
58}
59
60impl<B, E, St> Stream for Cleaver<B, E, St>
61    where B: Splittable + Unpin,
62          St: Stream<Item=Result<B, E>>
63{
64    type Item = Result<B, E>;
65
66    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
67        -> Poll<Option<Self::Item>>
68    {
69        // Safety: This is for projection to src below, which is exclusively
70        // owned by this wrapper and never moved. The `unsafe` could be
71        // avoided, but at the cost of requiring the source stream be `Unpin`.
72        let this = unsafe { self.get_unchecked_mut() };
73
74        if let Some(ref mut b) = this.rem {
75            match b.split_if(this.max) {
76                Some(l) => {
77                    return Poll::Ready(Some(Ok(l)))
78                }
79                None => {
80                    return Poll::Ready(Some(Ok(this.rem.take().unwrap())));
81                }
82            }
83        }
84
85        let src = unsafe { Pin::new_unchecked(&mut this.source) };
86        match src.poll_next(cx) {
87            Poll::Ready(Some(Ok(b))) => {
88                this.rem = Some(b);
89                // recurse for proper waking
90                let this = unsafe { Pin::new_unchecked(this) };
91                this.poll_next(cx)
92            }
93            other => other,
94        }
95    }
96}
97
98impl<B, E, St> fmt::Debug for Cleaver<B, E, St>
99    where B: Splittable + Unpin,
100          St: Stream<Item=Result<B, E>> + fmt::Debug
101{
102    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103        f.debug_struct("Cleaver")
104            .field("source", &self.source)
105            .field(
106                "rem",
107                if self.rem.is_none() {
108                    &"None"
109                } else {
110                    &"Some(...)"
111                }
112            )
113            .field("max", &self.max)
114            .finish()
115    }
116}