fluvio_socket/
socket.rs

1use std::fmt;
2
3use tracing::{debug, instrument};
4
5use fluvio_protocol::api::Request;
6use fluvio_protocol::api::RequestMessage;
7use fluvio_protocol::api::ResponseMessage;
8
9use fluvio_future::net::{
10    BoxReadConnection, BoxWriteConnection, ConnectionFd, DefaultDomainConnector, TcpDomainConnector,
11};
12
13use super::SocketError;
14use crate::FluvioSink;
15use crate::FluvioStream;
16
17/// Socket abstract that can send and receive fluvio objects
18pub struct FluvioSocket {
19    sink: FluvioSink,
20    stream: FluvioStream,
21    stale: bool,
22}
23
24impl fmt::Debug for FluvioSocket {
25    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
26        write!(f, "Socket({})", self.id())
27    }
28}
29
30impl FluvioSocket {
31    pub fn new(sink: FluvioSink, stream: FluvioStream) -> Self {
32        Self {
33            sink,
34            stream,
35            stale: false,
36        }
37    }
38
39    pub fn split(self) -> (FluvioSink, FluvioStream) {
40        (self.sink, self.stream)
41    }
42
43    /// mark as stale
44    pub fn set_stale(&mut self) {
45        self.stale = true;
46    }
47
48    pub fn is_stale(&self) -> bool {
49        self.stale
50    }
51
52    pub fn get_mut_sink(&mut self) -> &mut FluvioSink {
53        &mut self.sink
54    }
55
56    pub fn get_mut_stream(&mut self) -> &mut FluvioStream {
57        &mut self.stream
58    }
59
60    pub fn id(&self) -> ConnectionFd {
61        self.sink.id()
62    }
63
64    /// as client, send request and wait for reply from server
65    pub async fn send<R>(
66        &mut self,
67        req_msg: &RequestMessage<R>,
68    ) -> Result<ResponseMessage<R::Response>, SocketError>
69    where
70        R: Request,
71    {
72        self.sink.send_request(req_msg).await?;
73        self.stream.next_response(req_msg).await
74    }
75}
76
77impl FluvioSocket {
78    #[allow(clippy::clone_on_copy)]
79    pub fn from_stream(
80        write: BoxWriteConnection,
81        read: BoxReadConnection,
82        fd: ConnectionFd,
83    ) -> Self {
84        Self::new(
85            FluvioSink::new(write, fd.clone()),
86            FluvioStream::new(fd, read),
87        )
88    }
89
90    /// connect to target address with connector
91    #[instrument(skip(connector))]
92    pub async fn connect_with_connector(
93        addr: &str,
94        connector: &dyn TcpDomainConnector,
95    ) -> Result<Self, SocketError> {
96        debug!("connecting to addr at: {}", addr);
97
98        let (write, read, fd) = connector.connect(addr).await.map_err(|e| {
99            let emsg = e.to_string();
100            SocketError::Io {
101                source: e,
102                msg: format!("{emsg}, can't connect to {addr}"),
103            }
104        })?;
105
106        Ok(Self::from_stream(write, read, fd))
107    }
108}
109
110impl From<(FluvioSink, FluvioStream)> for FluvioSocket {
111    fn from(pair: (FluvioSink, FluvioStream)) -> Self {
112        let (sink, stream) = pair;
113        Self::new(sink, stream)
114    }
115}
116
117impl FluvioSocket {
118    pub async fn connect(addr: &str) -> Result<Self, SocketError> {
119        let connector = DefaultDomainConnector::new();
120        Self::connect_with_connector(addr, &connector).await
121    }
122}
123
124cfg_if::cfg_if! {
125    if #[cfg(any(unix, windows))] {
126        use fluvio_future::net::{
127            AsConnectionFd, TcpStream,
128        };
129        impl From<TcpStream> for FluvioSocket {
130            fn from(tcp_stream: TcpStream) -> Self {
131                let fd = tcp_stream.as_connection_fd();
132                Self::from_stream(Box::new(tcp_stream.clone()),Box::new(tcp_stream), fd)
133            }
134        }
135    }
136}