1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;

use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::StreamExt;
use tokio_util::codec::Framed;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;

use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestMessage;
use fluvio_protocol::api::ResponseMessage;
use fluvio_protocol::codec::FluvioCodec;

use fluvio_future::net::DefaultTcpDomainConnector;
use fluvio_future::net::TcpDomainConnector;
use fluvio_future::net::TcpStream;
use fluvio_future::tls::AllTcpStream;

use super::FlvSocketError;
use crate::InnerFlvSink;
use crate::InnerFlvStream;

pub type FlvSocket = InnerFlvSocket<TcpStream>;
pub type AllFlvSocket = InnerFlvSocket<AllTcpStream>;

/// FlvSocket is high level socket that can send and receive fluvio protocol
#[derive(Debug)]
pub struct InnerFlvSocket<S> {
    sink: InnerFlvSink<S>,
    stream: InnerFlvStream<S>,
    stale: bool,
}

unsafe impl<S> Sync for InnerFlvSocket<S> {}

impl<S> InnerFlvSocket<S> {
    pub fn new(sink: InnerFlvSink<S>, stream: InnerFlvStream<S>) -> Self {
        Self {
            sink,
            stream,
            stale: false,
        }
    }

    pub fn split(self) -> (InnerFlvSink<S>, InnerFlvStream<S>) {
        (self.sink, self.stream)
    }

    /// mark as stale
    pub fn set_stale(&mut self) {
        self.stale = true;
    }

    pub fn is_stale(&self) -> bool {
        self.stale
    }

    pub fn get_mut_sink(&mut self) -> &mut InnerFlvSink<S> {
        &mut self.sink
    }

    pub fn get_mut_stream(&mut self) -> &mut InnerFlvStream<S> {
        &mut self.stream
    }
}

impl<S> InnerFlvSocket<S>
where
    S: AsyncRead + AsyncWrite + Unpin + Send,
{
    /// connect to target address with connector
    pub async fn connect_with_connector<C>(
        addr: &str,
        connector: &C,
    ) -> Result<Self, FlvSocketError>
    where
        C: TcpDomainConnector<WrapperStream = S>,
    {
        debug!("trying to connect to addr at: {}", addr);
        let (tcp_stream, fd) = connector.connect(addr).await?;
        Ok(Self::from_stream(tcp_stream, fd))
    }

    pub fn from_stream(tcp_stream: S, raw_fd: RawFd) -> Self {
        let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
        let (sink, stream) = framed.split();
        Self::new(InnerFlvSink::new(sink, raw_fd), stream.into())
    }

    /// as client, send request and wait for reply from server
    pub async fn send<R>(
        &mut self,
        req_msg: &RequestMessage<R>,
    ) -> Result<ResponseMessage<R::Response>, FlvSocketError>
    where
        R: Request,
    {
        self.sink.send_request(&req_msg).await?;

        self.stream.next_response(&req_msg).await
    }
}

impl<S> From<(InnerFlvSink<S>, InnerFlvStream<S>)> for InnerFlvSocket<S> {
    fn from(pair: (InnerFlvSink<S>, InnerFlvStream<S>)) -> Self {
        let (sink, stream) = pair;
        Self::new(sink, stream)
    }
}

impl FlvSocket {
    pub async fn connect(addr: &str) -> Result<Self, FlvSocketError> {
        Self::connect_with_connector(addr, &DefaultTcpDomainConnector::new()).await
    }
}

impl From<TcpStream> for FlvSocket {
    fn from(tcp_stream: TcpStream) -> Self {
        let fd = tcp_stream.as_raw_fd();
        Self::from_stream(tcp_stream, fd)
    }
}