exocore_transport/
streams.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{
7 channel::{mpsc, mpsc::SendError},
8 prelude::*,
9 sink::{Sink, SinkErrInto},
10 StreamExt,
11};
12use pin_project::pin_project;
13
14use crate::{Error, InEvent, OutEvent};
15
16pub struct MpscHandleStream {
19 receiver: mpsc::Receiver<InEvent>,
20}
21
22impl MpscHandleStream {
23 pub fn new(receiver: mpsc::Receiver<InEvent>) -> MpscHandleStream {
24 MpscHandleStream { receiver }
25 }
26}
27
28impl Stream for MpscHandleStream {
29 type Item = InEvent;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 self.receiver.poll_next_unpin(cx)
33 }
34}
35
36#[pin_project]
39pub struct MpscHandleSink {
40 #[pin]
41 sender: SinkErrInto<mpsc::Sender<OutEvent>, OutEvent, Error>,
42}
43
44impl MpscHandleSink {
45 pub fn new(sender: mpsc::Sender<OutEvent>) -> MpscHandleSink {
46 MpscHandleSink {
47 sender: sender.sink_err_into(),
48 }
49 }
50}
51
52impl Sink<OutEvent> for MpscHandleSink {
53 type Error = Error;
54
55 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56 self.project().sender.poll_ready(cx)
57 }
58
59 fn start_send(self: Pin<&mut Self>, item: OutEvent) -> Result<(), Self::Error> {
60 self.project().sender.start_send(item)
61 }
62
63 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64 self.project().sender.poll_flush(cx)
65 }
66
67 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68 self.project().sender.poll_close(cx)
69 }
70}
71
72impl From<SendError> for Error {
73 fn from(s: SendError) -> Self {
74 Error::Other(format!("Sink error: {}", s))
75 }
76}