scrappy_utils/
stream.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use scrappy_service::{IntoService, Service};
6use futures::{FutureExt, Stream};
7
8use crate::mpsc;
9
10#[pin_project::pin_project]
11pub struct Dispatcher<S, T>
12where
13    S: Stream,
14    T: Service<Request = S::Item, Response = ()> + 'static,
15{
16    #[pin]
17    stream: S,
18    service: T,
19    err_rx: mpsc::Receiver<T::Error>,
20    err_tx: mpsc::Sender<T::Error>,
21}
22
23impl<S, T> Dispatcher<S, T>
24where
25    S: Stream,
26    T: Service<Request = S::Item, Response = ()> + 'static,
27{
28    pub fn new<F>(stream: S, service: F) -> Self
29    where
30        F: IntoService<T>,
31    {
32        let (err_tx, err_rx) = mpsc::channel();
33        Dispatcher {
34            err_rx,
35            err_tx,
36            stream,
37            service: service.into_service(),
38        }
39    }
40}
41
42impl<S, T> Future for Dispatcher<S, T>
43where
44    S: Stream,
45    T: Service<Request = S::Item, Response = ()> + 'static,
46{
47    type Output = Result<(), T::Error>;
48
49    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50        let mut this = self.as_mut().project();
51
52        if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) {
53            return Poll::Ready(Err(e));
54        }
55
56        loop {
57            return match this.service.poll_ready(cx)? {
58                Poll::Ready(_) => match this.stream.poll_next(cx) {
59                    Poll::Ready(Some(item)) => {
60                        let stop = this.err_tx.clone();
61                        scrappy_rt::spawn(this.service.call(item).map(move |res| {
62                            if let Err(e) = res {
63                                let _ = stop.send(e);
64                            }
65                        }));
66                        this = self.as_mut().project();
67                        continue;
68                    }
69                    Poll::Pending => Poll::Pending,
70                    Poll::Ready(None) => Poll::Ready(Ok(())),
71                },
72                Poll::Pending => Poll::Pending,
73            };
74        }
75    }
76}