1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::fmt::Display;
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
use log::trace;
use futures::stream::StreamExt;
use futures_codec::Framed;
use flv_future_aio::net::ToSocketAddrs;
use kf_protocol::api::Request;
use kf_protocol::api::RequestMessage;
use kf_protocol::api::ResponseMessage;
use kf_protocol::transport::KfCodec;
use flv_future_aio::net::AsyncTcpStream;
use crate::KfSink;
use crate::KfStream;
use super::KfSocketError;
/// A request/response socket made of a write half (`KfSink`) and a read
/// half (`KfStream`), plus a flag that lets callers mark it as stale.
#[derive(Debug)]
pub struct KfSocket {
/// Write half; `send` uses it to transmit the request.
sink: KfSink,
/// Read half; `send` uses it to wait for the matching response.
stream: KfStream,
/// `false` on construction; flipped to `true` by `set_stale` and never reset in this file.
stale: bool,
}
// SAFETY: NOTE(review): no justification is visible in this file. This impl
// promises the compiler that `&KfSocket` may be shared between threads even if
// `KfSink` or `KfStream` are not automatically `Sync`. Every method here that
// touches the sink or stream takes `&mut self` or `self`, which is presumably
// the invariant being relied on — TODO confirm against `KfSink`/`KfStream`.
unsafe impl Sync for KfSocket {}
impl KfSocket {
    /// Builds a socket from an existing sink/stream pair.
    ///
    /// The returned socket is not marked stale.
    pub fn new(sink: KfSink, stream: KfStream) -> Self {
        Self {
            sink,
            stream,
            stale: false,
        }
    }

    /// Opens a TCP connection to `addr` and wraps it in a `KfSocket`.
    ///
    /// # Errors
    ///
    /// Returns a `KfSocketError` if `AsyncTcpStream::connect` fails.
    pub async fn connect<A>(addr: A) -> Result<KfSocket, KfSocketError>
    where
        A: ToSocketAddrs + Display,
    {
        trace!("trying to connect to server at: {}", addr);
        let tcp_stream = AsyncTcpStream::connect(addr).await?;
        Ok(tcp_stream.into())
    }

    /// Consumes the socket and hands back its sink and stream halves.
    ///
    /// The stale flag is discarded.
    pub fn split(self) -> (KfSink, KfStream) {
        (self.sink, self.stream)
    }

    /// Marks this socket as stale. Nothing in this type clears the flag again.
    pub fn set_stale(&mut self) {
        self.stale = true;
    }

    /// Returns `true` once `set_stale` has been called on this socket.
    pub fn is_stale(&self) -> bool {
        self.stale
    }

    /// Mutable access to the write half.
    pub fn get_mut_sink(&mut self) -> &mut KfSink {
        &mut self.sink
    }

    /// Mutable access to the read half.
    pub fn get_mut_stream(&mut self) -> &mut KfStream {
        &mut self.stream
    }

    /// Sends `req_msg` on the sink, then waits on the stream for its response.
    ///
    /// This method does not modify the stale flag; callers decide whether a
    /// failure should mark the socket stale.
    ///
    /// # Errors
    ///
    /// Returns a `KfSocketError` if sending the request fails (in which case no
    /// response is awaited) or if reading the response fails.
    pub async fn send<R>(
        &mut self,
        req_msg: &RequestMessage<R>,
    ) -> Result<ResponseMessage<R::Response>, KfSocketError>
    where
        R: Request,
    {
        // `req_msg` is already a shared reference (and therefore `Copy`), so it is
        // passed straight through instead of being re-borrowed as `&&RequestMessage<R>`.
        self.sink.send_request(req_msg).await?;
        self.stream.next_response(req_msg).await
    }
}
impl From<AsyncTcpStream> for KfSocket {
fn from(tcp_stream: AsyncTcpStream) -> Self {
let fd = tcp_stream.as_raw_fd();
let framed = Framed::new(tcp_stream, KfCodec {});
let (sink, stream) = framed.split();
KfSocket {
sink: KfSink::new(sink, fd),
stream: stream.into(),
stale: false,
}
}
}
impl From<(KfSink, KfStream)> for KfSocket {
fn from(pair: (KfSink, KfStream)) -> Self {
let (sink, stream) = pair;
KfSocket::new(sink, stream)
}
}