1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::os::unix::io::AsRawFd;
use std::os::unix::io::RawFd;
use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::StreamExt;
use tokio_util::codec::Framed;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestMessage;
use fluvio_protocol::api::ResponseMessage;
use fluvio_protocol::codec::FluvioCodec;
use fluvio_future::net::DefaultTcpDomainConnector;
use fluvio_future::net::TcpDomainConnector;
use fluvio_future::net::TcpStream;
use fluvio_future::tls::AllTcpStream;
use super::FlvSocketError;
use crate::InnerFlvSink;
use crate::InnerFlvStream;
pub type FlvSocket = InnerFlvSocket<TcpStream>;
pub type AllFlvSocket = InnerFlvSocket<AllTcpStream>;
#[derive(Debug)]
pub struct InnerFlvSocket<S> {
sink: InnerFlvSink<S>,
stream: InnerFlvStream<S>,
stale: bool,
}
unsafe impl<S> Sync for InnerFlvSocket<S> {}
impl<S> InnerFlvSocket<S> {
pub fn new(sink: InnerFlvSink<S>, stream: InnerFlvStream<S>) -> Self {
Self {
sink,
stream,
stale: false,
}
}
pub fn split(self) -> (InnerFlvSink<S>, InnerFlvStream<S>) {
(self.sink, self.stream)
}
pub fn set_stale(&mut self) {
self.stale = true;
}
pub fn is_stale(&self) -> bool {
self.stale
}
pub fn get_mut_sink(&mut self) -> &mut InnerFlvSink<S> {
&mut self.sink
}
pub fn get_mut_stream(&mut self) -> &mut InnerFlvStream<S> {
&mut self.stream
}
}
impl<S> InnerFlvSocket<S>
where
S: AsyncRead + AsyncWrite + Unpin + Send,
{
pub async fn connect_with_connector<C>(
addr: &str,
connector: &C,
) -> Result<Self, FlvSocketError>
where
C: TcpDomainConnector<WrapperStream = S>,
{
debug!("trying to connect to addr at: {}", addr);
let (tcp_stream, fd) = connector.connect(addr).await?;
Ok(Self::from_stream(tcp_stream, fd))
}
pub fn from_stream(tcp_stream: S, raw_fd: RawFd) -> Self {
let framed = Framed::new(tcp_stream.compat(), FluvioCodec {});
let (sink, stream) = framed.split();
Self::new(InnerFlvSink::new(sink, raw_fd), stream.into())
}
pub async fn send<R>(
&mut self,
req_msg: &RequestMessage<R>,
) -> Result<ResponseMessage<R::Response>, FlvSocketError>
where
R: Request,
{
self.sink.send_request(&req_msg).await?;
self.stream.next_response(&req_msg).await
}
}
impl<S> From<(InnerFlvSink<S>, InnerFlvStream<S>)> for InnerFlvSocket<S> {
fn from(pair: (InnerFlvSink<S>, InnerFlvStream<S>)) -> Self {
let (sink, stream) = pair;
Self::new(sink, stream)
}
}
impl FlvSocket {
pub async fn connect(addr: &str) -> Result<Self, FlvSocketError> {
Self::connect_with_connector(addr, &DefaultTcpDomainConnector::new()).await
}
}
impl From<TcpStream> for FlvSocket {
fn from(tcp_stream: TcpStream) -> Self {
let fd = tcp_stream.as_raw_fd();
Self::from_stream(tcp_stream, fd)
}
}