fluvio_socket/
socket.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use std::fmt;

use tracing::{debug, instrument};

use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestMessage;
use fluvio_protocol::api::ResponseMessage;

use fluvio_future::net::{
    BoxReadConnection, BoxWriteConnection, ConnectionFd, DefaultDomainConnector, TcpDomainConnector,
};

use super::SocketError;
use crate::FluvioSink;
use crate::FluvioStream;

/// Socket abstract that can send and receive fluvio objects
pub struct FluvioSocket {
    sink: FluvioSink,
    stream: FluvioStream,
    stale: bool,
}

impl fmt::Debug for FluvioSocket {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Socket({})", self.id())
    }
}

impl FluvioSocket {
    pub fn new(sink: FluvioSink, stream: FluvioStream) -> Self {
        Self {
            sink,
            stream,
            stale: false,
        }
    }

    pub fn split(self) -> (FluvioSink, FluvioStream) {
        (self.sink, self.stream)
    }

    /// mark as stale
    pub fn set_stale(&mut self) {
        self.stale = true;
    }

    pub fn is_stale(&self) -> bool {
        self.stale
    }

    pub fn get_mut_sink(&mut self) -> &mut FluvioSink {
        &mut self.sink
    }

    pub fn get_mut_stream(&mut self) -> &mut FluvioStream {
        &mut self.stream
    }

    pub fn id(&self) -> ConnectionFd {
        self.sink.id()
    }

    /// as client, send request and wait for reply from server
    pub async fn send<R>(
        &mut self,
        req_msg: &RequestMessage<R>,
    ) -> Result<ResponseMessage<R::Response>, SocketError>
    where
        R: Request,
    {
        self.sink.send_request(req_msg).await?;
        self.stream.next_response(req_msg).await
    }
}

impl FluvioSocket {
    #[allow(clippy::clone_on_copy)]
    pub fn from_stream(
        write: BoxWriteConnection,
        read: BoxReadConnection,
        fd: ConnectionFd,
    ) -> Self {
        Self::new(
            FluvioSink::new(write, fd.clone()),
            FluvioStream::new(fd, read),
        )
    }

    /// connect to target address with connector
    #[instrument(skip(connector))]
    pub async fn connect_with_connector(
        addr: &str,
        connector: &dyn TcpDomainConnector,
    ) -> Result<Self, SocketError> {
        debug!("connecting to addr at: {}", addr);

        let (write, read, fd) = connector.connect(addr).await.map_err(|e| {
            let emsg = e.to_string();
            SocketError::Io {
                source: e,
                msg: format!("{emsg}, can't connect to {addr}"),
            }
        })?;

        Ok(Self::from_stream(write, read, fd))
    }
}

impl From<(FluvioSink, FluvioStream)> for FluvioSocket {
    fn from(pair: (FluvioSink, FluvioStream)) -> Self {
        let (sink, stream) = pair;
        Self::new(sink, stream)
    }
}

impl FluvioSocket {
    pub async fn connect(addr: &str) -> Result<Self, SocketError> {
        let connector = DefaultDomainConnector::new();
        Self::connect_with_connector(addr, &connector).await
    }
}

cfg_if::cfg_if! {
    if #[cfg(any(unix, windows))] {
        use fluvio_future::net::{
            AsConnectionFd, TcpStream,
        };
        impl From<TcpStream> for FluvioSocket {
            fn from(tcp_stream: TcpStream) -> Self {
                let fd = tcp_stream.as_connection_fd();
                Self::from_stream(Box::new(tcp_stream.clone()),Box::new(tcp_stream), fd)
            }
        }
    }
}