1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread::spawn;
use protobuf::Message;
use failure::bail;
use crate::error::Result;
use crate::messages::abci::{Request, Response};
use crate::varint;
/// Upper bound, in bytes, on the length prefix the reader thread accepts for an
/// incoming request (256 KiB). It is also the size of the reader's scratch buffer;
/// a larger announced length is reported as an error instead of being read.
pub const MAX_MESSAGE_LENGTH: usize = 256 * 1024;
/// Handle to a single ABCI socket connection.
///
/// Reading and writing are performed by two worker threads (see `create_reader`
/// and `create_writer`); this struct holds the channel endpoints used to talk to
/// them, plus the socket handle that `end` shuts down.
pub struct Connection {
// Receives each decoded request (or the error hit while reading it) from the reader thread.
read_channel: mpsc::Receiver<Result<Request>>,
// Hands responses to the writer thread, which serializes them onto the socket.
write_channel: mpsc::SyncSender<Response>,
// The socket passed to the constructor; the worker threads operate on clones of it.
socket: TcpStream
}
impl Connection {
    /// Creates a connection over `socket` whose channels have capacity zero.
    ///
    /// Shorthand for `Connection::buffered(socket, 0)`.
    pub fn new(socket: TcpStream) -> Result<Self> {
        Self::buffered(socket, 0)
    }

    /// Creates a connection over `socket` whose request and response channels
    /// can each hold up to `capacity` queued messages.
    ///
    /// The socket is cloned once for the reader thread and once for the writer
    /// thread; both threads are spawned before this function returns.
    ///
    /// # Errors
    ///
    /// Returns an error if cloning the socket handle fails.
    pub fn buffered(socket: TcpStream, capacity: usize) -> Result<Self> {
        // Reader first, then writer: each clone is made right before its thread starts.
        let read_channel = Self::create_reader(socket.try_clone()?, capacity);
        let write_channel = Self::create_writer(socket.try_clone()?, capacity);

        Ok(Connection {
            read_channel,
            write_channel,
            socket,
        })
    }

    /// Blocks until the reader thread delivers the next request.
    ///
    /// # Errors
    ///
    /// Returns an error if the reader thread has gone away (channel closed) or
    /// if the reader thread itself reported an error for this request.
    pub fn read(&self) -> Result<Request> {
        // The outer `?` handles a closed channel; what remains is the reader's own result.
        self.read_channel.recv()?
    }

    /// Passes `res` to the writer thread for transmission.
    ///
    /// # Errors
    ///
    /// Returns an error if the writer thread is no longer receiving.
    pub fn write(&self, res: Response) -> Result<()> {
        Ok(self.write_channel.send(res)?)
    }

    /// Consumes the connection and shuts down both directions of the socket,
    /// returning any error reported by the shutdown.
    pub fn close(mut self) -> Result<()> {
        self.end()
    }

    /// Spawns the reader thread on `socket` and returns the receiving end of a
    /// bounded channel (bound: `capacity`) that it feeds.
    fn create_reader(socket: TcpStream, capacity: usize) -> mpsc::Receiver<Result<Request>> {
        let (tx, rx) = mpsc::sync_channel(capacity);
        spawn(move || read(socket, tx));
        rx
    }

    /// Spawns the writer thread on `socket` and returns the sending end of a
    /// bounded channel (bound: `capacity`) that it drains.
    fn create_writer(socket: TcpStream, capacity: usize) -> mpsc::SyncSender<Response> {
        let (tx, rx) = mpsc::sync_channel(capacity);
        spawn(move || write(socket, rx));
        tx
    }

    /// Shuts down both the read and write halves of the underlying socket.
    fn end(&mut self) -> Result<()> {
        Ok(self.socket.shutdown(std::net::Shutdown::Both)?)
    }
}
impl Drop for Connection {
fn drop(&mut self) {
match self.end() {
Err(err) => panic!(err),
_ => {}
};
}
}
/// Body of the reader thread: decodes length-prefixed requests from `socket`
/// and forwards each result (decoded request or error) through `sender`.
///
/// Each message is a varint length followed by that many bytes of
/// protobuf-encoded `Request`. A length above `MAX_MESSAGE_LENGTH` is reported
/// as an error without reading the payload.
///
/// The function returns, ending the thread, once the receiving side of the
/// channel has been dropped. That is the normal shutdown path and must not panic.
fn read(mut socket: TcpStream, sender: mpsc::SyncSender<Result<Request>>) {
    // Scratch buffer reused for every message; kept on the heap so the
    // 256 KiB do not count against the thread's stack.
    let mut buf = vec![0u8; MAX_MESSAGE_LENGTH];

    let mut read_request = || -> Result<Request> {
        let length = varint::read(&mut socket)? as usize;
        if length > MAX_MESSAGE_LENGTH {
            bail!("Incoming ABCI request exceeds maximum length ({})", length);
        }
        socket.read_exact(&mut buf[..length])?;
        let req: Request = protobuf::parse_from_bytes(&buf[..length])?;
        Ok(req)
    };

    loop {
        // `send` only fails when the receiver is gone, i.e. the owning
        // `Connection` was dropped; nobody is left to read, so stop quietly.
        if sender.send(read_request()).is_err() {
            return;
        }
    }
}
/// Body of the writer thread: takes responses from `receiver` and writes each
/// one to `socket` as a varint length prefix followed by the protobuf-encoded
/// message.
///
/// The function returns, ending the thread, once every sender for the channel
/// has been dropped. That is the normal shutdown path and must not panic.
///
/// # Panics
///
/// Panics if writing a response to the socket fails.
fn write(mut socket: TcpStream, receiver: mpsc::Receiver<Response>) {
    let mut write_response = |res: &Response| -> Result<()> {
        let mut buf = [0 as u8; 8];
        let length = res.compute_size() as i64;
        let varint_length = varint::encode(&mut buf, length);
        // `write_all`, not `write`: a short write would truncate the length
        // prefix and desynchronize the peer's framing.
        socket.write_all(&buf[..varint_length])?;
        res.write_to_writer(&mut socket)?;
        Ok(())
    };

    // Iterating the receiver blocks for each response and ends when the
    // channel is closed.
    for res in receiver {
        if let Err(err) = write_response(&res) {
            panic!("{}", err);
        }
    }
}