1use std::fmt;
2use std::fmt::Debug;
3use std::io::Cursor;
4use std::io::Error as IoError;
5use std::io::ErrorKind;
6
7use fluvio_future::net::ConnectionFd;
8use fluvio_future::net::{BoxReadConnection};
9use fluvio_protocol::api::{ApiMessage, Request, RequestMessage, ResponseMessage};
10use fluvio_protocol::codec::FluvioCodec;
11use fluvio_protocol::Decoder as FluvioDecoder;
12use futures_util::stream::{Stream, StreamExt};
13use tokio_util::codec::{FramedRead};
14use tokio_util::compat::FuturesAsyncReadCompatExt;
15use tokio_util::compat::Compat;
16use tracing::debug;
17use tracing::error;
18use tracing::trace;
19
20use crate::SocketError;
21
22type FrameStream = FramedRead<Compat<BoxReadConnection>, FluvioCodec>;
23
24pub struct FluvioStream {
26 inner: FrameStream,
27 id: ConnectionFd,
28}
29
30impl Debug for FluvioStream {
31 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
32 write!(f, "Stream({})", self.id)
33 }
34}
35
36impl FluvioStream {
37 pub fn new(id: ConnectionFd, stream: BoxReadConnection) -> Self {
38 Self {
39 inner: FramedRead::new(stream.compat(), FluvioCodec::new()),
40 id,
41 }
42 }
43
44 pub fn get_mut_tcp_stream(&mut self) -> &mut FrameStream {
45 &mut self.inner
46 }
47
48 pub fn request_stream<R>(
50 &mut self,
51 ) -> impl Stream<Item = Result<RequestMessage<R>, SocketError>> + '_
52 where
53 RequestMessage<R>: FluvioDecoder + Debug,
54 {
55 (&mut self.inner).map(|req_bytes_r| match req_bytes_r {
56 Ok(req_bytes) => {
57 let mut src = Cursor::new(&req_bytes);
58 let msg: RequestMessage<R> = RequestMessage::decode_from(&mut src, 0)?;
59 Ok(msg)
60 }
61 Err(err) => Err(SocketError::Io {
62 source: err,
63 msg: "request stream".to_string(),
64 }),
65 })
66 }
67
68 pub async fn next_request_item<R>(&mut self) -> Option<Result<RequestMessage<R>, SocketError>>
70 where
71 RequestMessage<R>: FluvioDecoder + Debug,
72 {
73 let mut stream = self.request_stream();
74 stream.next().await
75 }
76
77 pub async fn next_response<R>(
79 &mut self,
80 req_msg: &RequestMessage<R>,
81 ) -> Result<ResponseMessage<R::Response>, SocketError>
82 where
83 R: Request,
84 {
85 trace!(api = R::API_KEY, "waiting for response");
86 let next = self.inner.next().await;
87 if let Some(result) = next {
88 match result {
89 Ok(req_bytes) => {
90 let response = req_msg.decode_response(
91 &mut Cursor::new(&req_bytes),
92 req_msg.header.api_version(),
93 )?;
94 trace!( len = req_bytes.len(), response = ?response,"received");
95 Ok(response)
96 }
97 Err(source) => {
98 error!("error receiving response: {:?}", source);
99 Err(SocketError::Io {
100 source,
101 msg: "next response".to_string(),
102 })
103 }
104 }
105 } else {
106 debug!("no more response. server has terminated connection");
107 Err(IoError::new(ErrorKind::UnexpectedEof, "server has terminated connection").into())
108 }
109 }
110
111 pub fn api_stream<R, A>(&mut self) -> impl Stream<Item = Result<R, SocketError>> + '_
113 where
114 R: ApiMessage<ApiKey = A>,
115 A: FluvioDecoder + Debug,
116 {
117 (&mut self.inner).map(|req_bytes_r| match req_bytes_r {
118 Ok(req_bytes) => {
119 trace!("received bytes from client len: {}", req_bytes.len());
120 let mut src = Cursor::new(&req_bytes);
121 R::decode_from(&mut src).map_err(|err| err.into())
122 }
123 Err(err) => Err(SocketError::Io {
124 source: err,
125 msg: "api stream".to_string(),
126 }),
127 })
128 }
129
130 pub async fn next_api_item<R, A>(&mut self) -> Option<Result<R, SocketError>>
131 where
132 R: ApiMessage<ApiKey = A>,
133 A: FluvioDecoder + Debug,
134 {
135 let mut stream = self.api_stream();
136 stream.next().await
137 }
138
139 pub fn response_stream<R>(
140 &mut self,
141 req_msg: RequestMessage<R>,
142 ) -> impl Stream<Item = R::Response> + '_
143 where
144 R: Request,
145 {
146 let version = req_msg.header.api_version();
147 (&mut self.inner).filter_map(move |req_bytes| async move {
148 match req_bytes {
149 Ok(mut bytes) => match ResponseMessage::decode_from(&mut bytes, version) {
150 Ok(res_msg) => {
151 trace!("receive response: {:#?}", &res_msg);
152 Some(res_msg.response)
153 }
154 Err(err) => {
155 error!("error decoding response: {:?}", err);
156 None
157 }
158 },
159 Err(err) => {
160 error!("error receiving response: {:?}", err);
161 None
162 }
163 }
164 })
165 }
166}
167
168