1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use scrappy_service::{IntoService, Service};
6use futures::{FutureExt, Stream};
7
8use crate::mpsc;
9
10#[pin_project::pin_project]
11pub struct Dispatcher<S, T>
12where
13 S: Stream,
14 T: Service<Request = S::Item, Response = ()> + 'static,
15{
16 #[pin]
17 stream: S,
18 service: T,
19 err_rx: mpsc::Receiver<T::Error>,
20 err_tx: mpsc::Sender<T::Error>,
21}
22
23impl<S, T> Dispatcher<S, T>
24where
25 S: Stream,
26 T: Service<Request = S::Item, Response = ()> + 'static,
27{
28 pub fn new<F>(stream: S, service: F) -> Self
29 where
30 F: IntoService<T>,
31 {
32 let (err_tx, err_rx) = mpsc::channel();
33 Dispatcher {
34 err_rx,
35 err_tx,
36 stream,
37 service: service.into_service(),
38 }
39 }
40}
41
42impl<S, T> Future for Dispatcher<S, T>
43where
44 S: Stream,
45 T: Service<Request = S::Item, Response = ()> + 'static,
46{
47 type Output = Result<(), T::Error>;
48
49 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
50 let mut this = self.as_mut().project();
51
52 if let Poll::Ready(Some(e)) = Pin::new(&mut this.err_rx).poll_next(cx) {
53 return Poll::Ready(Err(e));
54 }
55
56 loop {
57 return match this.service.poll_ready(cx)? {
58 Poll::Ready(_) => match this.stream.poll_next(cx) {
59 Poll::Ready(Some(item)) => {
60 let stop = this.err_tx.clone();
61 scrappy_rt::spawn(this.service.call(item).map(move |res| {
62 if let Err(e) = res {
63 let _ = stop.send(e);
64 }
65 }));
66 this = self.as_mut().project();
67 continue;
68 }
69 Poll::Pending => Poll::Pending,
70 Poll::Ready(None) => Poll::Ready(Ok(())),
71 },
72 Poll::Pending => Poll::Pending,
73 };
74 }
75 }
76}