open-protocol-client 0.0.0

Library for implementing Atlas Copco/Torque Open Protocol clients.
Documentation
use std::collections::VecDeque;
use std::io;
use bytes::{Buf, BytesMut};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use open_protocol::{Header, Message};
use open_protocol::decode::{self, Decoder, Decode};
use crate::client::{ConnectionError, Event};

pub struct Network {
    pub socket: TcpStream,
    pub read_buf: BytesMut,
}

impl Network {
    pub fn new(socket: TcpStream) -> Self {
        Self {
            socket,
            read_buf: BytesMut::with_capacity(10 * 1024),
        }
    }

    async fn read_bytes(&mut self, required: usize) -> io::Result<usize> {
        let mut total_read = 0;
        loop {
            let read = self.socket.read_buf(&mut self.read_buf).await?;

            if 0 == read {
                return if self.read_buf.is_empty() {
                    Err(io::Error::new(
                        io::ErrorKind::ConnectionAborted,
                        "connection closed by peer",
                    ))
                } else {
                    Err(io::Error::new(
                        io::ErrorKind::ConnectionReset,
                        "connection reset by peer",
                    ))
                };
            }

            total_read += read;
            if total_read >= required {
                return Ok(total_read);
            }
        }
    }

    pub async fn read(&mut self, events: &mut VecDeque<Event>) -> io::Result<()> {
        loop {
            let required = match read_message(&mut self.read_buf) {
                Ok(message) => {
                    events.push_back(Event::Incoming(message));
                    return Ok(());
                },
                Err(decode::Error::InsufficientBytes { have, need }) => need - have,
                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e.to_string())),
            };

            self.read_bytes(required).await?;
        }
    }

    pub async fn flush(&mut self, write_buf: &mut BytesMut) -> Result<(), ConnectionError> {
        if write_buf.is_empty() {
            return Ok(());
        }

        self.socket.write_all(&write_buf[..]).await?;
        write_buf.clear();
        Ok(())
    }
}

fn read_message(stream: &mut BytesMut) -> decode::Result<Message> {
    if stream.len() < 20 {
        return Err(decode::Error::InsufficientBytes { have: stream.len(), need: 20 });
    }

    let mut decoder = Decoder::new(&stream[..]);
    let header = Header::decode(&mut decoder)?;

    if stream.len() < (header.length as usize) {
        return Err(decode::Error::InsufficientBytes { have: stream.len(), need: header.length as usize });
    }

    let message = Message::decode_payload(header.mid, header.revision_number(), &mut decoder)?;
    decoder.expect_char(0x0 as char)?;
    stream.advance((header.length + 1) as usize);
    Ok(message)
}