use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};

use kayrx::krse::io::{AsyncRead, AsyncWrite};
use kayrx::codec::{Decoder, Encoder, Framed2 as Framed};
use kayrx::util::mpsc::Receiver;
use futures::Stream;

/// An I/O object that has not yet been paired with a codec.
pub struct Connect<Io, Codec>
where
    Codec: Encoder + Decoder,
{
    io: Io,
    _t: PhantomData<Codec>,
}

impl<Io, Codec> Connect<Io, Codec>
where
    Io: AsyncRead + AsyncWrite,
    Codec: Encoder + Decoder,
{
    pub(crate) fn new(io: Io) -> Self {
        Self {
            io,
            _t: PhantomData,
        }
    }

    /// Wraps the I/O object in a framed transport that uses `codec`.
    ///
    /// The result starts with unit state and no outgoing stream.
    pub fn codec(
        self,
        codec: Codec,
    ) -> ConnectResult<Io, (), Codec, Receiver<<Codec as Encoder>::Item>> {
        ConnectResult {
            state: (),
            out: None,
            framed: Framed::new(self.io, codec),
        }
    }
}

/// A framed connection together with its state and an optional stream of
/// outgoing items.
#[pin_project::pin_project]
pub struct ConnectResult<Io, St, Codec: Encoder + Decoder, Out> {
    pub(crate) state: St,
    pub(crate) out: Option<Out>,
    pub(crate) framed: Framed<Io, Codec>,
}

impl<Io, St, Codec: Encoder + Decoder, Out: Unpin> ConnectResult<Io, St, Codec, Out> {
    /// Returns a shared reference to the underlying I/O object.
    #[inline]
    pub fn get_ref(&self) -> &Io {
        self.framed.get_ref()
    }

    /// Returns a mutable reference to the underlying I/O object.
    #[inline]
    pub fn get_mut(&mut self) -> &mut Io {
        self.framed.get_mut()
    }

    /// Replaces the outgoing stream, changing the `Out` type parameter.
    pub fn out<U>(self, out: U) -> ConnectResult<Io, St, Codec, U>
    where
        U: Stream<Item = <Codec as Encoder>::Item> + Unpin,
    {
        ConnectResult {
            state: self.state,
            framed: self.framed,
            out: Some(out),
        }
    }

    /// Replaces the connection state, changing the `St` type parameter.
    #[inline]
    pub fn state<S>(self, state: S) -> ConnectResult<Io, S, Codec, Out> {
        ConnectResult {
            state,
            framed: self.framed,
            out: self.out,
        }
    }
}

// Reading: decoded frames are pulled from the framed transport.
impl<Io, St, Codec, Out> Stream for ConnectResult<Io, St, Codec, Out>
where
    Io: AsyncRead + AsyncWrite,
    Codec: Encoder + Decoder,
{
    type Item = Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().framed.next_item(cx)
    }
}

// Writing: every `Sink` method delegates to the framed transport.
impl<Io, St, Codec, Out> futures::Sink<<Codec as Encoder>::Item>
    for ConnectResult<Io, St, Codec, Out>
where
    Io: AsyncRead + AsyncWrite,
    Codec: Encoder + Decoder,
{
    type Error = <Codec as Encoder>::Error;

    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // The context is unused here, so no waker is registered on `Pending`.
        if self.framed.is_write_ready() {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }

    fn start_send(
        self: Pin<&mut Self>,
        item: <Codec as Encoder>::Item,
    ) -> Result<(), Self::Error> {
        self.project().framed.write(item)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.get_mut().framed.flush(cx)
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.get_mut().framed.close(cx)
    }
}