// blocking_permit/cleaver.rs
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures_core::stream::Stream;
7
/// A buffer-like value from which a leading piece can be split off in place.
///
/// `Sized` is required because `split_if` hands the split-off piece back by
/// value.
pub trait Splittable: Sized {
    /// Conditionally split off the front of `self`.
    ///
    /// Returns `Some(piece)` when a split was performed, in which case `self`
    /// now holds whatever was left over. Returns `None` when no split was
    /// needed, in which case `self` is the complete (final) piece.
    ///
    /// `max` is the size limit that triggers the split; its unit is up to the
    /// implementation (the `Bytes` implementation in this file compares it
    /// against `len()`, i.e. a byte count).
    fn split_if(&mut self, max: usize) -> Option<Self>;
}
19
20impl Splittable for Bytes {
23 fn split_if(&mut self, max: usize) -> Option<Self> {
24 if self.len() > max {
25 Some(self.split_to(max))
26 } else {
27 None
28 }
29 }
30}
31
32#[must_use = "streams do nothing unless polled"]
38pub struct Cleaver<B, E, St>
39 where B: Splittable + Unpin,
40 St: Stream<Item=Result<B, E>>
41{
42 source: St,
43 rem: Option<B>,
44 max: usize,
45}
46
47impl<B, E, St> Cleaver<B, E, St>
48 where B: Splittable + Unpin,
49 St: Stream<Item=Result<B, E>>
50{
51 pub fn new(source: St, max: usize) -> Self {
55 assert!(max > 0);
56 Cleaver { source, rem: None, max }
57 }
58}
59
60impl<B, E, St> Stream for Cleaver<B, E, St>
61 where B: Splittable + Unpin,
62 St: Stream<Item=Result<B, E>>
63{
64 type Item = Result<B, E>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
67 -> Poll<Option<Self::Item>>
68 {
69 let this = unsafe { self.get_unchecked_mut() };
73
74 if let Some(ref mut b) = this.rem {
75 match b.split_if(this.max) {
76 Some(l) => {
77 return Poll::Ready(Some(Ok(l)))
78 }
79 None => {
80 return Poll::Ready(Some(Ok(this.rem.take().unwrap())));
81 }
82 }
83 }
84
85 let src = unsafe { Pin::new_unchecked(&mut this.source) };
86 match src.poll_next(cx) {
87 Poll::Ready(Some(Ok(b))) => {
88 this.rem = Some(b);
89 let this = unsafe { Pin::new_unchecked(this) };
91 this.poll_next(cx)
92 }
93 other => other,
94 }
95 }
96}
97
98impl<B, E, St> fmt::Debug for Cleaver<B, E, St>
99 where B: Splittable + Unpin,
100 St: Stream<Item=Result<B, E>> + fmt::Debug
101{
102 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103 f.debug_struct("Cleaver")
104 .field("source", &self.source)
105 .field(
106 "rem",
107 if self.rem.is_none() {
108 &"None"
109 } else {
110 &"Some(...)"
111 }
112 )
113 .field("max", &self.max)
114 .finish()
115 }
116}