1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use std::fmt::Debug;
use std::io::Cursor;
use std::io::Error as IoError;
use std::io::ErrorKind;

use fluvio_future::net::TcpStream;
use fluvio_protocol::api::{ApiMessage, Request, RequestMessage, ResponseMessage};
use fluvio_protocol::codec::FluvioCodec;
use fluvio_protocol::Decoder as FluvioDecoder;
use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::stream::{SplitStream, Stream, StreamExt};
use tokio_util::codec::Framed;
use tokio_util::compat::Compat;
use tracing::error;
use tracing::trace;

use crate::FlvSocketError;

/// Receiving side of a socket whose transport is a plain `TcpStream`.
pub type FlvStream = InnerFlvStream<TcpStream>;

/// Receiving side of a socket whose transport is `AllTcpStream` from
/// `fluvio_future::native_tls`. Only compiled when the `tls` feature is enabled.
#[cfg(feature = "tls")]
pub type AllFlvStream = InnerFlvStream<fluvio_future::native_tls::AllTcpStream>;

/// Stream half (`SplitStream`) of a `Framed` transport that wraps `S` in `Compat`
/// and frames it with `FluvioCodec`.
type FrameStream<S> = SplitStream<Framed<Compat<S>, FluvioCodec>>;

/// Receiving side of a socket, generic over the underlying transport `S`.
///
/// This is a thin newtype around the stream half of a split
/// `Framed<Compat<S>, FluvioCodec>`; the methods on it decode the frames
/// yielded by that stream into requests, API messages, or responses.
#[derive(Debug)]
pub struct InnerFlvStream<S>(FrameStream<S>);

impl<S> InnerFlvStream<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    pub fn get_mut_tcp_stream(&mut self) -> &mut FrameStream<S> {
        &mut self.0
    }

    /// Server side: view the incoming frames as a stream of decoded requests.
    ///
    /// Each frame is decoded into a `RequestMessage<R>` (decoding is invoked with
    /// version `0`). A transport error or a decoding error is converted into
    /// `FlvSocketError` and yielded as the `Err` item for that frame.
    pub fn request_stream<R>(
        &mut self,
    ) -> impl Stream<Item = Result<RequestMessage<R>, FlvSocketError>> + '_
    where
        RequestMessage<R>: FluvioDecoder + Debug,
    {
        let frames = &mut self.0;
        frames.map(|frame| -> Result<RequestMessage<R>, FlvSocketError> {
            // Bail out with the converted transport error before attempting to decode.
            let req_bytes = frame?;
            let mut src = Cursor::new(&req_bytes);
            let msg = RequestMessage::<R>::decode_from(&mut src, 0)?;
            Ok(msg)
        })
    }

    /// Server side: wait for the next request sent by the client.
    ///
    /// Returns `None` when the underlying stream has ended; otherwise the decoded
    /// request or the error produced while receiving/decoding it.
    pub async fn next_request_item<R>(
        &mut self,
    ) -> Option<Result<RequestMessage<R>, FlvSocketError>>
    where
        RequestMessage<R>: FluvioDecoder + Debug,
    {
        self.request_stream::<R>().next().await
    }

    /// Client side: wait for the next frame and decode it as the response to `req_msg`.
    ///
    /// The response is decoded with the API version taken from the request header.
    ///
    /// # Errors
    ///
    /// * `FlvSocketError::IoError` if the stream yields an error instead of a frame.
    /// * An `UnexpectedEof` I/O error (converted into `FlvSocketError`) if the stream
    ///   has ended, i.e. the server closed the connection.
    /// * Whatever error decoding the response produces, converted into `FlvSocketError`.
    pub async fn next_response<R>(
        &mut self,
        req_msg: &RequestMessage<R>,
    ) -> Result<ResponseMessage<R::Response>, FlvSocketError>
    where
        R: Request,
    {
        trace!("waiting for response");
        match self.0.next().await {
            Some(Ok(req_bytes)) => {
                let version = req_msg.header.api_version();
                let mut src = Cursor::new(&req_bytes);
                let response = req_msg.decode_response(&mut src, version)?;
                trace!("received {} bytes: {:#?}", req_bytes.len(), &response);
                Ok(response)
            }
            Some(Err(source)) => {
                error!("error receiving response: {:?}", source);
                Err(FlvSocketError::IoError(source))
            }
            None => {
                error!("no more response. server has terminated connection");
                let eof = IoError::new(ErrorKind::UnexpectedEof, "server has terminated connection");
                Err(eof.into())
            }
        }
    }

    /// Server side: view the incoming frames as a stream of API requests
    /// (PublicRequest, InternalRequest, etc).
    ///
    /// Each frame is decoded through `R::decode_from`; transport and decoding
    /// errors are converted into `FlvSocketError` and yielded as `Err` items.
    pub fn api_stream<R, A>(&mut self) -> impl Stream<Item = Result<R, FlvSocketError>> + '_
    where
        R: ApiMessage<ApiKey = A>,
        A: FluvioDecoder + Debug,
    {
        let frames = &mut self.0;
        frames.map(|frame| -> Result<R, FlvSocketError> {
            let req_bytes = frame?;
            trace!("received bytes from client len: {}", req_bytes.len());
            let mut src = Cursor::new(&req_bytes);
            let api_request = R::decode_from(&mut src)?;
            Ok(api_request)
        })
    }

    /// Server side: wait for the next API request from the client.
    ///
    /// Returns `None` when the underlying stream has ended; otherwise the decoded
    /// API message or the error produced while receiving/decoding it.
    pub async fn next_api_item<R, A>(&mut self) -> Option<Result<R, FlvSocketError>>
    where
        R: ApiMessage<ApiKey = A>,
        A: FluvioDecoder + Debug,
    {
        self.api_stream::<R, A>().next().await
    }

    /// Client side: view the incoming frames as a stream of responses to `req_msg`.
    ///
    /// Only the API version from the request header is used; every frame is decoded
    /// as a `ResponseMessage` with that version and its `response` payload is yielded.
    /// Frames that cannot be received or decoded are logged via `error!` and skipped,
    /// so the returned stream never yields an error item.
    pub fn response_stream<R>(
        &mut self,
        req_msg: RequestMessage<R>,
    ) -> impl Stream<Item = R::Response> + '_
    where
        R: Request,
    {
        let version = req_msg.header.api_version();
        let frames = &mut self.0;
        frames.filter_map(move |frame| async move {
            // Transport failure: log it and drop this item from the output stream.
            let mut bytes = match frame {
                Ok(bytes) => bytes,
                Err(err) => {
                    error!("error receiving response: {:?}", err);
                    return None;
                }
            };

            match ResponseMessage::<R::Response>::decode_from(&mut bytes, version) {
                Ok(res_msg) => {
                    trace!("receive response: {:#?}", &res_msg);
                    Some(res_msg.response)
                }
                Err(err) => {
                    error!("error decoding response: {:?}", err);
                    None
                }
            }
        })
    }
}

impl<S> From<FrameStream<S>> for InnerFlvStream<S> {
    fn from(stream: FrameStream<S>) -> Self {
        InnerFlvStream(stream)
    }
}