1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use log::debug;
use futures::stream::StreamExt;
use tokio_util::codec::Framed;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use futures::io::{AsyncRead, AsyncWrite};
use kf_protocol::api::Request;
use kf_protocol::api::RequestMessage;
use kf_protocol::api::ResponseMessage;
use kf_protocol::transport::KfCodec;
use flv_future_aio::net::TcpStream;
use flv_future_aio::net::TcpDomainConnector;
use flv_future_aio::net::DefaultTcpDomainConnector;
use flv_future_aio::net::tls::AllTcpStream;
use crate::InnerKfSink;
use crate::InnerKfStream;
use super::KfSocketError;
pub type KfSocket = InnerKfSocket<TcpStream>;
pub type AllKfSocket = InnerKfSocket<AllTcpStream>;
#[derive(Debug)]
pub struct InnerKfSocket<S> {
sink: InnerKfSink<S>,
stream: InnerKfStream<S>,
stale: bool,
}
unsafe impl <S>Sync for InnerKfSocket<S> {}
impl <S>InnerKfSocket<S> {
pub fn new(sink: InnerKfSink<S>, stream: InnerKfStream<S>) -> Self {
InnerKfSocket {
sink,
stream,
stale: false,
}
}
pub fn split(self) -> (InnerKfSink<S>, InnerKfStream<S>) {
(self.sink, self.stream)
}
pub fn set_stale(&mut self) {
self.stale = true;
}
pub fn is_stale(&self) -> bool {
self.stale
}
pub fn get_mut_sink(&mut self) -> &mut InnerKfSink<S> {
&mut self.sink
}
pub fn get_mut_stream(&mut self) -> &mut InnerKfStream<S> {
&mut self.stream
}
}
impl <S>InnerKfSocket<S> where S: AsyncRead + AsyncWrite + Unpin + Send {
pub async fn connect_with_connector<C>(addr: &str,connector: &C) -> Result<Self, KfSocketError>
where C: TcpDomainConnector<WrapperStream=S>
{
debug!("trying to connect to addr at: {}", addr);
let (tcp_stream,fd) = connector.connect(addr).await?;
Ok(Self::from_stream(tcp_stream,fd))
}
pub fn from_stream(tcp_stream: S, raw_fd: RawFd) -> Self {
let framed = Framed::new(tcp_stream.compat(), KfCodec {});
let (sink, stream) = framed.split();
Self::new(InnerKfSink::new(sink, raw_fd),stream.into())
}
pub async fn send<R>(
&mut self,
req_msg: &RequestMessage<R>,
) -> Result<ResponseMessage<R::Response>, KfSocketError>
where
R: Request,
{
self.sink.send_request(&req_msg).await?;
self.stream.next_response(&req_msg).await
}
}
impl <S>From<(InnerKfSink<S>, InnerKfStream<S>)> for InnerKfSocket<S>
{
fn from(pair: (InnerKfSink<S>, InnerKfStream<S>)) -> Self {
let (sink, stream) = pair;
InnerKfSocket::new(sink, stream)
}
}
impl KfSocket {
pub async fn connect(addr: &str) -> Result<Self, KfSocketError>
{
Self::connect_with_connector(addr,&DefaultTcpDomainConnector::new()).await
}
}
impl From<TcpStream> for KfSocket {
fn from(tcp_stream: TcpStream) -> Self {
let fd = tcp_stream.as_raw_fd();
Self::from_stream(tcp_stream,fd)
}
}