1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use crate::AsyncBody;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use http::HeaderMap;
use http_body::Body;
use self_reference::{RefDef, SelfReference};
#[pin_project::pin_project]
pub struct AsyncBodyAdapter<B>
where
B: AsyncBody + 'static,
{
#[pin]
state: SelfReference<B, AdapterStateRefDef<B>>,
}
impl<B> AsyncBodyAdapter<B>
where
B: AsyncBody,
{
pub fn new(body: B) -> Self {
Self {
state: SelfReference::new(body, || AdapterState::Empty),
}
}
}
enum AdapterState<'a, B>
where
B: AsyncBody + 'a,
{
ProcessingData(Pin<Box<dyn Future<Output = Option<Result<B::Data, B::Error>>> + 'a>>),
ProcessingTrailers(Pin<Box<dyn Future<Output = Result<Option<HeaderMap>, B::Error>> + 'a>>),
Empty,
}
impl<'a, B> AdapterState<'a, B>
where
B: AsyncBody + 'a,
{
pub fn is_empty(&self) -> bool {
if let AdapterState::Empty = self {
true
} else {
false
}
}
}
struct AdapterStateRefDef<B>(core::marker::PhantomData<B>);
impl<'this, B> RefDef<'this> for AdapterStateRefDef<B>
where
for<'a> B: AsyncBody + 'a,
{
type Type = AdapterState<'this, B>;
}
impl<B> Body for AsyncBodyAdapter<B>
where
for<'a> B: AsyncBody + 'a,
{
type Data = B::Data;
type Error = B::Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let mut proj = self.project();
let state = proj.state.as_mut().pin_mut();
if state.is_empty() {
proj.state
.as_mut()
.reset(|v| AdapterState::ProcessingData(v.data()));
}
let mut state = proj.state.pin_mut();
if let AdapterState::ProcessingData(fut) = state.as_mut().get_mut() {
return match fut.as_mut().poll(cx) {
Poll::Ready(v) => {
state.set(AdapterState::Empty);
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
};
}
unreachable!("bad async body adapter state while polling data!");
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
let mut proj = self.project();
let state = proj.state.as_mut().pin_mut();
if state.is_empty() {
proj.state
.as_mut()
.reset(|v| AdapterState::ProcessingTrailers(v.trailers()));
}
let mut state = proj.state.pin_mut();
if let AdapterState::ProcessingTrailers(fut) = state.as_mut().get_mut() {
return match fut.as_mut().poll(cx) {
Poll::Ready(v) => {
state.set(AdapterState::Empty);
Poll::Ready(v)
}
Poll::Pending => Poll::Pending,
};
}
unreachable!("bad async body adapter state while polling data!");
}
}