exocore_transport/
streams.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{
7    channel::{mpsc, mpsc::SendError},
8    prelude::*,
9    sink::{Sink, SinkErrInto},
10    StreamExt,
11};
12use pin_project::pin_project;
13
14use crate::{Error, InEvent, OutEvent};
15
16/// Wraps mpsc Stream channel to map Transport's error without having a
17/// convoluted type
18pub struct MpscHandleStream {
19    receiver: mpsc::Receiver<InEvent>,
20}
21
22impl MpscHandleStream {
23    pub fn new(receiver: mpsc::Receiver<InEvent>) -> MpscHandleStream {
24        MpscHandleStream { receiver }
25    }
26}
27
28impl Stream for MpscHandleStream {
29    type Item = InEvent;
30
31    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        self.receiver.poll_next_unpin(cx)
33    }
34}
35
36/// Wraps mpsc Sink channel to map Transport's error without having a convoluted
37/// type
38#[pin_project]
39pub struct MpscHandleSink {
40    #[pin]
41    sender: SinkErrInto<mpsc::Sender<OutEvent>, OutEvent, Error>,
42}
43
44impl MpscHandleSink {
45    pub fn new(sender: mpsc::Sender<OutEvent>) -> MpscHandleSink {
46        MpscHandleSink {
47            sender: sender.sink_err_into(),
48        }
49    }
50}
51
52impl Sink<OutEvent> for MpscHandleSink {
53    type Error = Error;
54
55    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56        self.project().sender.poll_ready(cx)
57    }
58
59    fn start_send(self: Pin<&mut Self>, item: OutEvent) -> Result<(), Self::Error> {
60        self.project().sender.start_send(item)
61    }
62
63    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64        self.project().sender.poll_flush(cx)
65    }
66
67    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68        self.project().sender.poll_close(cx)
69    }
70}
71
72impl From<SendError> for Error {
73    fn from(s: SendError) -> Self {
74        Error::Other(format!("Sink error: {}", s))
75    }
76}