1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::mpsc;
use std::thread::spawn;
use protobuf::Message;
use failure::bail;
use crate::error::Result;
use crate::messages::abci::{Request, Response};
use crate::varint;

/// Upper bound, in bytes, on the payload of a single incoming request.
///
/// The reader thread sizes its frame buffer with this value and rejects any
/// frame whose length prefix exceeds it.
pub const MAX_MESSAGE_LENGTH: usize = 256 * 1024; // TODO: make configurable?

/// An ABCI connection over a TCP socket, serviced by two background threads.
///
/// Constructing a `Connection` spawns a reader thread and a writer thread
/// (see `create_reader` / `create_writer`), each operating on its own clone of
/// the socket. The struct itself only holds the channel endpoints used to talk
/// to those threads plus the original socket handle.
pub struct Connection {
    /// Receiving end of the channel fed by the reader thread; every item is
    /// the outcome of one attempt to read a request.
    read_channel: mpsc::Receiver<Result<Request>>,
    /// Sending end of the channel drained by the writer thread.
    write_channel: mpsc::SyncSender<Response>,
    /// The socket handle passed to the constructor; used to shut the
    /// connection down.
    socket: TcpStream
    // TODO: make generic for io::Read/Write
}

impl Connection {
    pub fn new(socket: TcpStream) -> Result<Self> {
        Self::buffered(socket, 0)
    }

    pub fn buffered(socket: TcpStream, capacity: usize) -> Result<Self> {
        let read_socket = socket.try_clone()?;
        let read_channel = Self::create_reader(read_socket, capacity);

        let write_socket = socket.try_clone()?;
        let write_channel = Self::create_writer(write_socket, capacity);

        Ok(Connection {
            read_channel,
            write_channel,
            socket
        })
    }

    pub fn read(&self) -> Result<Request> {
        Ok(self.read_channel.recv()??)
        // TODO: close connection if there was an error
    }

    pub fn write(&self, res: Response) -> Result<()> {
        self.write_channel.send(res)?;
        // TODO: get last write error?
        // TODO: close connection if there was an error
        Ok(())
    }

    pub fn close(mut self) -> Result<()> {
        self.end()
    }

    fn create_reader(socket: TcpStream, capacity: usize) -> mpsc::Receiver<Result<Request>> {
        let (sender, receiver) = mpsc::sync_channel(capacity);
        spawn(move || read(socket, sender));
        receiver
    }

    fn create_writer(socket: TcpStream, capacity: usize) -> mpsc::SyncSender<Response> {
        let (sender, receiver) = mpsc::sync_channel(capacity);
        spawn(move || write(socket, receiver));
        sender
    }

    fn end(&mut self) -> Result<()> {
        self.socket.shutdown(std::net::Shutdown::Both)?;
        // read and write threads will end as the connection will now error when
        // trying to use the socket or channels, whichever happens first
        Ok(())
    }
}

impl Drop for Connection {
    fn drop(&mut self) {
        match self.end() {
            // swallow NotConnected errors since we want to disconnect anyway
            // TODO:
            // Err(err) if err.as_fail() == std::io::ErrorKind::NotConnected
            //     => {},

            Err(err) => panic!(err),
            _ => {}
        };
    }
}

/// Body of the reader thread: decodes length-prefixed requests from `socket`
/// and forwards each outcome (request or error) through `sender`.
///
/// Each frame is a varint length followed by that many bytes of
/// protobuf-encoded `Request`. Frames longer than `MAX_MESSAGE_LENGTH` are
/// reported as errors without reading their payload.
///
/// The function returns, ending the thread, once the receiving side of
/// `sender` has been dropped.
fn read(mut socket: TcpStream, sender: mpsc::SyncSender<Result<Request>>) {
    // Heap-allocated so the frame buffer does not occupy 256 KiB of this
    // thread's stack.
    let mut buf = vec![0u8; MAX_MESSAGE_LENGTH];

    let mut read_request = || -> Result<Request> {
        let length = varint::read(&mut socket)? as usize;
        if length > MAX_MESSAGE_LENGTH {
            bail!("Incoming ABCI request exceeds maximum length ({})", length);
        }

        socket.read_exact(&mut buf[..length])?;

        let req: Request = protobuf::parse_from_bytes(&buf[..length])?;
        Ok(req)
    };

    loop {
        // A failed send means nobody is listening any more (the receiver was
        // dropped), so stop quietly instead of panicking.
        if sender.send(read_request()).is_err() {
            return;
        }
    }
}

/// Body of the writer thread: takes responses from `receiver` and writes each
/// one to `socket` as a varint length prefix followed by the protobuf-encoded
/// message.
///
/// The function returns, ending the thread, once every sender for `receiver`
/// has been dropped.
///
/// # Panics
///
/// Panics if writing a response to the socket fails.
fn write(mut socket: TcpStream, receiver: mpsc::Receiver<Response>) {
    let mut write_response = |res: Response| -> Result<()> {
        let mut buf = [0u8; 8];
        let length = res.compute_size() as i64;
        let varint_length = varint::encode(&mut buf, length);
        // `write_all` rather than `write`: a short write of the length prefix
        // would silently corrupt the framing of the stream.
        socket.write_all(&buf[..varint_length])?;

        res.write_to_writer(&mut socket)?;

        Ok(())
    };

    // Iterating the receiver blocks for the next response and finishes when
    // the channel is closed, so a hang-up ends the thread without a panic.
    for res in receiver {
        if let Err(err) = write_response(res) {
            panic!("failed to write ABCI response: {}", err) // TODO: send in error channel
        }
    }
}