1#![allow(dead_code)]
5
6use std::{
7 convert::Infallible,
8 io,
9 pin::Pin,
10 task::{Context, Poll, ready},
11};
12
13use actix_http::{BoxedPayloadStream, error::PayloadError};
14use actix_web::{dev, web::BufMut};
15use futures_core::Stream;
16use futures_util::StreamExt as _;
17use local_channel::mpsc;
18
19pub fn fork_request_payload(orig_payload: &mut dev::Payload) -> dev::Payload {
30 const TARGET: &str = concat!(module_path!(), "::fork_request_payload");
31
32 let payload = orig_payload.take();
33
34 let (tx, rx) = mpsc::channel();
35
36 let proxy_stream: BoxedPayloadStream = Box::pin(payload.inspect(move |res| {
37 match res {
38 Ok(chunk) => {
39 tracing::trace!(target: TARGET, "yielding {} byte chunk", chunk.len());
40 tx.send(Ok(chunk.clone())).unwrap();
41 }
42
43 Err(err) => tx
44 .send(Err(PayloadError::Io(io::Error::other(format!(
45 "error from original stream: {err}"
46 )))))
47 .unwrap(),
48 }
49 }));
50
51 tracing::trace!(target: TARGET, "creating proxy payload");
52 *orig_payload = dev::Payload::from(proxy_stream);
53
54 dev::Payload::Stream {
55 payload: Box::pin(rx),
56 }
57}
58
/// Thin wrapper around a mutable borrow of a buffer `B`.
///
/// The wrapped reference is exposed as the tuple field so crate-internal code can build the
/// wrapper directly.
pub(crate) struct MutWriter<'a, B>(pub(crate) &'a mut B);

impl<'a, B> MutWriter<'a, B> {
    /// Returns a shared reference to the wrapped buffer.
    pub fn get_ref(&self) -> &B {
        &*self.0
    }
}
72
73impl<B: BufMut> io::Write for MutWriter<'_, B> {
74 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
75 self.0.put_slice(buf);
76 Ok(buf.len())
77 }
78
79 fn flush(&mut self) -> io::Result<()> {
80 Ok(())
81 }
82}
83
84pin_project_lite::pin_project! {
85 pub struct InfallibleStream<S> {
87 #[pin]
88 stream: S,
89 }
90}
91
92impl<S> InfallibleStream<S> {
93 pub fn new(stream: S) -> Self {
95 Self { stream }
96 }
97}
98
99impl<S: Stream> Stream for InfallibleStream<S> {
100 type Item = Result<S::Item, Infallible>;
101
102 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
103 Poll::Ready(ready!(self.project().stream.poll_next(cx)).map(Ok))
104 }
105
106 fn size_hint(&self) -> (usize, Option<usize>) {
107 self.stream.size_hint()
108 }
109}
110
/// Test-only queue of values of type `T`, consumed from the front.
#[cfg(test)]
#[derive(Clone, Debug, Default)]
pub(crate) struct PollSeq<T> {
    // Remaining items, in the order they will be handed out.
    seq: std::collections::VecDeque<T>,
}
116
#[cfg(test)]
mod poll_seq_impls {
    use std::collections::VecDeque;

    use futures_util::stream;

    use super::*;

    impl<T> PollSeq<T> {
        /// Builds a sequence that hands out the items of `seq` front to back.
        pub fn new(seq: VecDeque<T>) -> Self {
            PollSeq { seq }
        }
    }

    impl<T> PollSeq<Poll<Option<T>>> {
        /// Turns the scripted poll results into a stream.
        ///
        /// Each poll pops and returns the next scripted result; once the script is exhausted
        /// every further poll returns `Poll::Ready(None)`. The task context is ignored.
        pub fn into_stream(mut self) -> impl Stream<Item = T> {
            stream::poll_fn(move |_cx| self.seq.pop_front().unwrap_or(Poll::Ready(None)))
        }
    }

    impl<T, const N: usize> From<[T; N]> for PollSeq<T> {
        /// Builds a sequence from a fixed-size array, preserving element order.
        fn from(seq: [T; N]) -> Self {
            let queue: VecDeque<T> = seq.into();
            Self::new(queue)
        }
    }
}