1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use actix_http::BoxedPayloadStream;
use actix_web::{
dev,
web::{BufMut as _, Bytes, BytesMut},
FromRequest, HttpRequest,
};
use futures_core::future::LocalBoxFuture;
use futures_util::StreamExt as _;
use local_channel::mpsc;
use tokio::try_join;
use tracing::trace;
/// Runs the `T` extractor over the request body while folding every body chunk into a
/// caller-supplied accumulator.
///
/// The request is cloned and its payload is taken, then wrapped in a proxy stream. Each
/// successfully read chunk is cloned and forwarded over a local channel, so the chunks can be
/// observed here while the wrapped extractor is handed the same stream.
///
/// - `init`: starting value of the accumulator.
/// - `update_fn`: called once per forwarded chunk with the accumulator, the request and the chunk.
/// - `finalize_fn`: called once after both halves have finished, with the extracted value, all
///   forwarded chunks concatenated into a single `Bytes`, and the final accumulator.
///
/// # Errors
///
/// Resolves to `T::Error` when the wrapped extractor fails; `finalize_fn` is not called then.
///
/// # Panics
///
/// Panics if forwarding a chunk over the channel fails.
pub(crate) fn body_extractor_fold<T, Init, Out>(
    req: &HttpRequest,
    payload: &mut dev::Payload,
    init: Init,
    mut update_fn: impl FnMut(&mut Init, &HttpRequest, Bytes) + 'static,
    mut finalize_fn: impl FnMut(T, Bytes, Init) -> Out + 'static,
) -> LocalBoxFuture<'static, Result<Out, T::Error>>
where
    T: FromRequest,
    Init: 'static,
{
    let req = req.clone();
    let payload = payload.take();

    Box::pin(async move {
        let (chunk_tx, mut chunk_rx) = mpsc::channel();

        // Tee the payload: every good chunk is copied into the channel as it passes through.
        let tee = payload.inspect(move |item| match item {
            Ok(bytes) => {
                trace!("yielding {} byte chunk", bytes.len());
                chunk_tx.send(bytes.clone()).unwrap();
            }
            Err(_) => {}
        });
        let tee: BoxedPayloadStream = Box::pin(tee);

        trace!("creating proxy payload");
        let mut tee_payload = dev::Payload::from(tee);
        let extract_fut = T::from_request(&req, &mut tee_payload);

        let mut collected = BytesMut::new();

        // Deliberately not a `move` block: `collected` must remain usable after the join.
        // NOTE(review): presumably `recv` yields `None` only once the sender owned by the proxy
        // stream has been dropped — confirm against the channel's documentation.
        let fold_fut = async {
            let mut state = init;
            loop {
                match chunk_rx.recv().await {
                    Some(bytes) => {
                        trace!("updating hasher with {} byte chunk", bytes.len());
                        collected.put_slice(&bytes);
                        update_fn(&mut state, &req, bytes);
                    }
                    None => break Ok(state),
                }
            }
        };

        trace!("driving both futures");
        let (extracted, state) = try_join!(extract_fut, fold_fut)?;

        Ok(finalize_fn(extracted, collected.freeze(), state))
    })
}