1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
use actix_http::BoxedPayloadStream;
use actix_web::{
    dev,
    web::{BufMut as _, Bytes, BytesMut},
    FromRequest, HttpRequest,
};
use futures_core::future::LocalBoxFuture;
use futures_util::StreamExt as _;
use local_channel::mpsc;
use tokio::try_join;
use tracing::trace;

/// Runs the extractor `T` over the request body while folding every body chunk into an
/// accumulator.
///
/// The payload is taken out of `payload` and wrapped in a proxy stream. Each chunk that is read
/// successfully is passed on to `T` unchanged, and a clone of it is sent over a local channel to
/// a second future that appends it to a buffer and feeds it to `update_fn`. Both futures are
/// driven concurrently.
///
/// # Parameters
/// - `req`: request being extracted from; a clone is handed to `T` and to every `update_fn` call.
/// - `payload`: request payload; it is taken, leaving the default (empty) payload in its place.
/// - `init`: initial accumulator value.
/// - `update_fn`: called once per received chunk with the accumulator, the request and the chunk.
/// - `finalize_fn`: called once at the end with the extracted `T`, the concatenation of all
///   received chunks, and the final accumulator; its return value is the future's output.
///
/// # Errors
/// Resolves to `T::Error` if the inner extractor fails; `finalize_fn` is not called in that case.
///
/// # Notes
/// The folding future only finishes once the proxy stream (and with it the channel sender) has
/// been dropped. If `T` does not take the payload at all, the proxy is dropped immediately and the
/// fold sees an empty body. If `T` keeps the unread stream alive inside its output (a streaming
/// extractor), the sender is never dropped and the returned future will not resolve.
pub(crate) fn body_extractor_fold<T, Init, Out>(
    req: &HttpRequest,
    payload: &mut dev::Payload,
    init: Init,
    mut update_fn: impl FnMut(&mut Init, &HttpRequest, Bytes) + 'static,
    mut finalize_fn: impl FnMut(T, Bytes, Init) -> Out + 'static,
) -> LocalBoxFuture<'static, Result<Out, T::Error>>
where
    T: FromRequest,
    Init: 'static,
{
    let req = req.clone();
    let payload = payload.take();

    Box::pin(async move {
        let (tx, mut rx) = mpsc::channel();

        // wrap payload in stream that reads chunks and clones them (cheaply) back here
        let proxy_stream: BoxedPayloadStream = Box::pin(payload.inspect(move |res| {
            if let Ok(chunk) = res {
                trace!("yielding {} byte chunk", chunk.len());

                // A failed send means the receiving side is gone, so nobody is folding any more.
                // The chunk must still reach the inner extractor, so log instead of panicking.
                if tx.send(chunk.clone()).is_err() {
                    trace!("chunk receiver dropped; skipping fold for this chunk");
                }
            }
        }));

        trace!("creating proxy payload");
        let mut proxy_payload = dev::Payload::from(proxy_stream);
        let body_fut = T::from_request(&req, &mut proxy_payload);

        // `T::Future` cannot borrow from `proxy_payload`, so anything the extractor wants from the
        // payload has been taken by now. If it took nothing, the proxy stream (and the sender it
        // owns) is still in here; dropping it lets the fold loop below terminate instead of
        // waiting forever on a channel that will never be closed.
        drop(proxy_payload);

        let mut body_buf = BytesMut::new();

        // run update function as chunks are yielded from channel
        let hash_fut = async {
            let mut accumulator = init;

            // `recv` yields `None` once every sender is gone, i.e. when the proxy stream is dropped
            while let Some(chunk) = rx.recv().await {
                trace!("updating hasher with {} byte chunk", chunk.len());
                body_buf.put_slice(&chunk);
                update_fn(&mut accumulator, &req, chunk);
            }

            Ok(accumulator)
        };

        trace!("driving both futures");
        // short-circuits with the extractor's error; the fold future itself never fails
        let (body, hash) = try_join!(body_fut, hash_fut)?;

        Ok(finalize_fn(body, body_buf.freeze(), hash))
    })
}