1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
cfg_if::cfg_if! {
    if #[cfg(unix)] {
        use std::os::unix::io::AsRawFd;
        use std::os::unix::io::RawFd;
    }
}

use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::StreamExt;
use tokio_util::codec::Framed;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;

use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestMessage;
use fluvio_protocol::api::ResponseMessage;
use fluvio_protocol::codec::FluvioCodec;

use fluvio_future::net::DefaultTcpDomainConnector;
use fluvio_future::net::TcpDomainConnector;
use fluvio_future::net::TcpStream;

use super::FlvSocketError;
use crate::InnerFlvSink;
use crate::InnerFlvStream;

pub type FlvSocket = InnerFlvSocket<TcpStream>;

#[cfg(feature = "tls")]
pub type AllFlvSocket = InnerFlvSocket<fluvio_future::native_tls::AllTcpStream>;

/// Socket abstract that can send and receive fluvio objects
#[derive(Debug)]
pub struct InnerFlvSocket<S> {
    sink: InnerFlvSink<S>,
    stream: InnerFlvStream<S>,
    stale: bool,
}

unsafe impl<S> Sync for InnerFlvSocket<S> {}

impl<S> InnerFlvSocket<S> {
    pub fn new(sink: InnerFlvSink<S>, stream: InnerFlvStream<S>) -> Self {
        Self {
            sink,
            stream,
            stale: false,
        }
    }

    pub fn split(self) -> (InnerFlvSink<S>, InnerFlvStream<S>) {
        (self.sink, self.stream)
    }

    /// mark as stale
    pub fn set_stale(&mut self) {
        self.stale = true;
    }

    pub fn is_stale(&self) -> bool {
        self.stale
    }

    pub fn get_mut_sink(&mut self) -> &mut InnerFlvSink<S> {
        &mut self.sink
    }

    pub fn get_mut_stream(&mut self) -> &mut InnerFlvStream<S> {
        &mut self.stream
    }

    pub fn id(&self) -> RawFd {
        self.sink.id()
    }
}

impl<S> InnerFlvSocket<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
{
    /// connect to target address with connector
    pub async fn connect_with_connector<C>(
        addr: &str,
        connector: &C,
    ) -> Result<Self, FlvSocketError>
    where
        C: TcpDomainConnector<WrapperStream = S>,
    {
        debug!("trying to connect to addr at: {}", addr);
        let (tcp_stream, fd) = connector.connect(addr).await?;
        Ok(Self::from_stream(tcp_stream, fd))
    }

    pub fn from_stream(tcp_stream: S, raw_fd: RawFd) -> Self {
        let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
        let (sink, stream) = framed.split();
        Self::new(InnerFlvSink::new(sink, raw_fd), stream.into())
    }

    /// as client, send request and wait for reply from server
    pub async fn send<R>(
        &mut self,
        req_msg: &RequestMessage<R>,
    ) -> Result<ResponseMessage<R::Response>, FlvSocketError>
    where
        R: Request,
    {
        self.sink.send_request(&req_msg).await?;

        self.stream.next_response(&req_msg).await
    }
}

impl<S> From<(InnerFlvSink<S>, InnerFlvStream<S>)> for InnerFlvSocket<S> {
    fn from(pair: (InnerFlvSink<S>, InnerFlvStream<S>)) -> Self {
        let (sink, stream) = pair;
        Self::new(sink, stream)
    }
}

impl FlvSocket {
    pub async fn connect(addr: &str) -> Result<Self, FlvSocketError> {
        Self::connect_with_connector(addr, &DefaultTcpDomainConnector {}).await
    }
}

cfg_if::cfg_if! {
    if #[cfg(unix)] {
        impl From<TcpStream> for FlvSocket {
            fn from(tcp_stream: TcpStream) -> Self {
                let fd = tcp_stream.as_raw_fd();
                Self::from_stream(tcp_stream, fd)
            }
        }
    }
}