fluvio_socket/
stream.rs

1use std::fmt;
2use std::fmt::Debug;
3use std::io::Cursor;
4use std::io::Error as IoError;
5use std::io::ErrorKind;
6
7use fluvio_future::net::ConnectionFd;
8use fluvio_future::net::{BoxReadConnection};
9use fluvio_protocol::api::{ApiMessage, Request, RequestMessage, ResponseMessage};
10use fluvio_protocol::codec::FluvioCodec;
11use fluvio_protocol::Decoder as FluvioDecoder;
12use futures_util::stream::{Stream, StreamExt};
13use tokio_util::codec::{FramedRead};
14use tokio_util::compat::FuturesAsyncReadCompatExt;
15use tokio_util::compat::Compat;
16use tracing::debug;
17use tracing::error;
18use tracing::trace;
19
20use crate::SocketError;
21
22type FrameStream = FramedRead<Compat<BoxReadConnection>, FluvioCodec>;
23
24/// inner flv stream which is generic over stream
25pub struct FluvioStream {
26    inner: FrameStream,
27    id: ConnectionFd,
28}
29
30impl Debug for FluvioStream {
31    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32        write!(f, "Stream({})", self.id)
33    }
34}
35
36impl FluvioStream {
37    pub fn new(id: ConnectionFd, stream: BoxReadConnection) -> Self {
38        Self {
39            inner: FramedRead::new(stream.compat(), FluvioCodec::new()),
40            id,
41        }
42    }
43
44    pub fn get_mut_tcp_stream(&mut self) -> &mut FrameStream {
45        &mut self.inner
46    }
47
48    /// as server, get stream of request coming from client
49    pub fn request_stream<R>(
50        &mut self,
51    ) -> impl Stream<Item = Result<RequestMessage<R>, SocketError>> + '_
52    where
53        RequestMessage<R>: FluvioDecoder + Debug,
54    {
55        (&mut self.inner).map(|req_bytes_r| match req_bytes_r {
56            Ok(req_bytes) => {
57                let mut src = Cursor::new(&req_bytes);
58                let msg: RequestMessage<R> = RequestMessage::decode_from(&mut src, 0)?;
59                Ok(msg)
60            }
61            Err(err) => Err(SocketError::Io {
62                source: err,
63                msg: "request stream".to_string(),
64            }),
65        })
66    }
67
68    /// as server, get next request from client
69    pub async fn next_request_item<R>(&mut self) -> Option<Result<RequestMessage<R>, SocketError>>
70    where
71        RequestMessage<R>: FluvioDecoder + Debug,
72    {
73        let mut stream = self.request_stream();
74        stream.next().await
75    }
76
77    /// as client, get next response from server
78    pub async fn next_response<R>(
79        &mut self,
80        req_msg: &RequestMessage<R>,
81    ) -> Result<ResponseMessage<R::Response>, SocketError>
82    where
83        R: Request,
84    {
85        trace!(api = R::API_KEY, "waiting for response");
86        let next = self.inner.next().await;
87        if let Some(result) = next {
88            match result {
89                Ok(req_bytes) => {
90                    let response = req_msg.decode_response(
91                        &mut Cursor::new(&req_bytes),
92                        req_msg.header.api_version(),
93                    )?;
94                    trace!( len = req_bytes.len(), response = ?response,"received");
95                    Ok(response)
96                }
97                Err(source) => {
98                    error!("error receiving response: {:?}", source);
99                    Err(SocketError::Io {
100                        source,
101                        msg: "next response".to_string(),
102                    })
103                }
104            }
105        } else {
106            debug!("no more response. server has terminated connection");
107            Err(IoError::new(ErrorKind::UnexpectedEof, "server has terminated connection").into())
108        }
109    }
110
111    /// as server, get api request (PublicRequest, InternalRequest, etc)
112    pub fn api_stream<R, A>(&mut self) -> impl Stream<Item = Result<R, SocketError>> + '_
113    where
114        R: ApiMessage<ApiKey = A>,
115        A: FluvioDecoder + Debug,
116    {
117        (&mut self.inner).map(|req_bytes_r| match req_bytes_r {
118            Ok(req_bytes) => {
119                trace!("received bytes from client len: {}", req_bytes.len());
120                let mut src = Cursor::new(&req_bytes);
121                R::decode_from(&mut src).map_err(|err| err.into())
122            }
123            Err(err) => Err(SocketError::Io {
124                source: err,
125                msg: "api stream".to_string(),
126            }),
127        })
128    }
129
130    pub async fn next_api_item<R, A>(&mut self) -> Option<Result<R, SocketError>>
131    where
132        R: ApiMessage<ApiKey = A>,
133        A: FluvioDecoder + Debug,
134    {
135        let mut stream = self.api_stream();
136        stream.next().await
137    }
138
139    pub fn response_stream<R>(
140        &mut self,
141        req_msg: RequestMessage<R>,
142    ) -> impl Stream<Item = R::Response> + '_
143    where
144        R: Request,
145    {
146        let version = req_msg.header.api_version();
147        (&mut self.inner).filter_map(move |req_bytes| async move {
148            match req_bytes {
149                Ok(mut bytes) => match ResponseMessage::decode_from(&mut bytes, version) {
150                    Ok(res_msg) => {
151                        trace!("receive response: {:#?}", &res_msg);
152                        Some(res_msg.response)
153                    }
154                    Err(err) => {
155                        error!("error decoding response: {:?}", err);
156                        None
157                    }
158                },
159                Err(err) => {
160                    error!("error receiving response: {:?}", err);
161                    None
162                }
163            }
164        })
165    }
166}
167
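/*
// Disabled conversion: `FluvioStream` has `inner` and `id` fields, so it
// cannot be built from a `FrameStream` alone as written here.
impl From<FrameStream> for FluvioStream {
    fn from(stream: FrameStream) -> Self {
        FluvioStream { stream }
    }
}
*/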