1use std::fmt;
2
3use tracing::{debug, instrument};
4
5use fluvio_protocol::api::Request;
6use fluvio_protocol::api::RequestMessage;
7use fluvio_protocol::api::ResponseMessage;
8
9use fluvio_future::net::{
10 BoxReadConnection, BoxWriteConnection, ConnectionFd, DefaultDomainConnector, TcpDomainConnector,
11};
12
13use super::SocketError;
14use crate::FluvioSink;
15use crate::FluvioStream;
16
17pub struct FluvioSocket {
19 sink: FluvioSink,
20 stream: FluvioStream,
21 stale: bool,
22}
23
24impl fmt::Debug for FluvioSocket {
25 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26 write!(f, "Socket({})", self.id())
27 }
28}
29
30impl FluvioSocket {
31 pub fn new(sink: FluvioSink, stream: FluvioStream) -> Self {
32 Self {
33 sink,
34 stream,
35 stale: false,
36 }
37 }
38
39 pub fn split(self) -> (FluvioSink, FluvioStream) {
40 (self.sink, self.stream)
41 }
42
43 pub fn set_stale(&mut self) {
45 self.stale = true;
46 }
47
48 pub fn is_stale(&self) -> bool {
49 self.stale
50 }
51
52 pub fn get_mut_sink(&mut self) -> &mut FluvioSink {
53 &mut self.sink
54 }
55
56 pub fn get_mut_stream(&mut self) -> &mut FluvioStream {
57 &mut self.stream
58 }
59
60 pub fn id(&self) -> ConnectionFd {
61 self.sink.id()
62 }
63
64 pub async fn send<R>(
66 &mut self,
67 req_msg: &RequestMessage<R>,
68 ) -> Result<ResponseMessage<R::Response>, SocketError>
69 where
70 R: Request,
71 {
72 self.sink.send_request(req_msg).await?;
73 self.stream.next_response(req_msg).await
74 }
75}
76
77impl FluvioSocket {
78 #[allow(clippy::clone_on_copy)]
79 pub fn from_stream(
80 write: BoxWriteConnection,
81 read: BoxReadConnection,
82 fd: ConnectionFd,
83 ) -> Self {
84 Self::new(
85 FluvioSink::new(write, fd.clone()),
86 FluvioStream::new(fd, read),
87 )
88 }
89
90 #[instrument(skip(connector))]
92 pub async fn connect_with_connector(
93 addr: &str,
94 connector: &dyn TcpDomainConnector,
95 ) -> Result<Self, SocketError> {
96 debug!("connecting to addr at: {}", addr);
97
98 let (write, read, fd) = connector.connect(addr).await.map_err(|e| {
99 let emsg = e.to_string();
100 SocketError::Io {
101 source: e,
102 msg: format!("{emsg}, can't connect to {addr}"),
103 }
104 })?;
105
106 Ok(Self::from_stream(write, read, fd))
107 }
108}
109
110impl From<(FluvioSink, FluvioStream)> for FluvioSocket {
111 fn from(pair: (FluvioSink, FluvioStream)) -> Self {
112 let (sink, stream) = pair;
113 Self::new(sink, stream)
114 }
115}
116
117impl FluvioSocket {
118 pub async fn connect(addr: &str) -> Result<Self, SocketError> {
119 let connector = DefaultDomainConnector::new();
120 Self::connect_with_connector(addr, &connector).await
121 }
122}
123
124cfg_if::cfg_if! {
125 if #[cfg(any(unix, windows))] {
126 use fluvio_future::net::{
127 AsConnectionFd, TcpStream,
128 };
129 impl From<TcpStream> for FluvioSocket {
130 fn from(tcp_stream: TcpStream) -> Self {
131 let fd = tcp_stream.as_connection_fd();
132 Self::from_stream(Box::new(tcp_stream.clone()),Box::new(tcp_stream), fd)
133 }
134 }
135 }
136}