agilulf_protocol 0.1.0

Agilulf Protocol serializer and deserializer
Documentation
use super::message::{MessageHead, PartHead};
use super::{AsyncReadBuffer, ProtocolError, Result, Slice};

use futures::{AsyncRead, Stream};

/// Reads one complete message from `buf` and returns its parts in order.
///
/// The first line is parsed as a `MessageHead`, whose `count` gives the number of parts
/// that follow. Each part consists of a line parsed as a `PartHead`, then `size` payload
/// bytes, then two trailing bytes (the `\r\n` terminator) which are dropped without
/// being inspected.
///
/// # Errors
///
/// Propagates any error returned while reading from `buf` or while parsing either head.
async fn read_message<T: AsyncRead + Unpin>(buf: &mut AsyncReadBuffer<T>) -> Result<Vec<Vec<u8>>> {
    let head_line = buf.read_line().await?;
    let message_head = MessageHead::from_buf(head_line)?;

    let mut parts = Vec::new();
    for _ in 0..message_head.count {
        let part_line = buf.read_line().await?;
        let part_head = PartHead::from_buf(part_line)?;

        // The payload is followed by "\r\n", so fetch two extra bytes and cut them off.
        let mut raw = buf.read_exact(part_head.size + 2).await?;
        let payload_len = raw.len() - 2;
        let payload = raw.drain(0..payload_len).collect();

        parts.push(payload);
    }

    Ok(parts)
}

/// Arguments of a `PUT` command.
#[derive(Clone)]
pub struct PutCommand {
    /// Key the value is stored under (second part of the message).
    pub key: Slice,
    /// Value to store (third part of the message).
    pub value: Slice,
}

/// Arguments of a `GET` command.
#[derive(Clone)]
pub struct GetCommand {
    /// Key to look up (second part of the message).
    pub key: Slice,
}

/// Arguments of a `SCAN` command.
// NOTE(review): whether `start`/`end` are inclusive bounds is decided by the code that
// executes the scan, which is not visible here — confirm before relying on it.
#[derive(Clone)]
pub struct ScanCommand {
    /// First argument of the scan (second part of the message).
    pub start: Slice,
    /// Second argument of the scan (third part of the message).
    pub end: Slice,
}

/// Arguments of a `DELETE` command.
#[derive(Clone)]
pub struct DeleteCommand {
    /// Key to delete (second part of the message).
    pub key: Slice,
}

/// A parsed protocol command together with its arguments.
///
/// Each variant corresponds to the command name carried in the first part of a message.
#[derive(Clone)]
pub enum Command {
    /// `PUT key value`
    PUT(PutCommand),
    /// `GET key`
    GET(GetCommand),
    /// `DELETE key`
    DELETE(DeleteCommand),
    /// `SCAN start end`
    SCAN(ScanCommand),
}

impl Command {
    /// Builds a [`Command`] from the raw parts of one message.
    ///
    /// The first part is the command name, compared case-insensitively (it is upper-cased
    /// before matching). The remaining parts are the command's arguments.
    ///
    /// # Errors
    ///
    /// * `ProtocolError::GrammarCheckFailed` if the message has no parts at all, or if the
    ///   number of arguments does not match the command.
    /// * `ProtocolError::CommandNotSupport` if the command name is not recognised; the
    ///   error carries the upper-cased name.
    /// * The error converted from `Utf8Error` if the command name is not valid UTF-8.
    fn from_message(mut message: Vec<Vec<u8>>) -> Result<Command> {
        // A message head may declare zero parts; indexing `message[0]` would then panic,
        // so reject the empty message as a grammar error instead.
        let name = match message.first() {
            Some(name) => name,
            None => {
                return Err(ProtocolError::GrammarCheckFailed(
                    "message should contain a command",
                ))
            }
        };

        let command = std::str::from_utf8(name)?.to_uppercase();
        match command.as_ref() {
            "PUT" => {
                if message.len() == 3 {
                    // Remove from the back first so the earlier index stays valid.
                    let value = Slice(message.remove(2));
                    let key = Slice(message.remove(1));
                    Ok(Command::PUT(PutCommand { key, value }))
                } else {
                    Err(ProtocolError::GrammarCheckFailed(
                        "PUT should have two arguments",
                    ))
                }
            }
            "GET" => {
                if message.len() == 2 {
                    let key = Slice(message.remove(1));
                    Ok(Command::GET(GetCommand { key }))
                } else {
                    Err(ProtocolError::GrammarCheckFailed(
                        "GET should have one argument",
                    ))
                }
            }
            "DELETE" => {
                if message.len() == 2 {
                    let key = Slice(message.remove(1));
                    Ok(Command::DELETE(DeleteCommand { key }))
                } else {
                    Err(ProtocolError::GrammarCheckFailed(
                        "DELETE should have one argument",
                    ))
                }
            }
            "SCAN" => {
                if message.len() == 3 {
                    // Remove from the back first so the earlier index stays valid.
                    let end = Slice(message.remove(2));
                    let start = Slice(message.remove(1));
                    Ok(Command::SCAN(ScanCommand { start, end }))
                } else {
                    Err(ProtocolError::GrammarCheckFailed(
                        "SCAN should have two arguments",
                    ))
                }
            }
            _ => Err(ProtocolError::CommandNotSupport(command)),
        }
    }
}

/// Reads the next message from `buf` and parses it into a [`Command`].
///
/// # Errors
///
/// Returns whatever error `read_message` or `Command::from_message` produces.
async fn read_command<T: AsyncRead + Unpin>(buf: &mut AsyncReadBuffer<T>) -> Result<Command> {
    let parts = read_message(buf).await?;
    Command::from_message(parts)
}

/// Extension trait for appending one message part to an output buffer.
trait AppendSlice {
    /// Appends `slice` to `self` as a single message part.
    fn append_part(&mut self, slice: &[u8]);
}

impl AppendSlice for Vec<u8> {
    /// Writes a `PartHead` carrying the length of `slice`, then the bytes of `slice`,
    /// then a terminating `\r\n`.
    fn append_part(&mut self, slice: &[u8]) {
        let head = PartHead { size: slice.len() };
        let head_bytes = head.into_bytes();

        self.extend_from_slice(head_bytes.as_slice());
        self.extend_from_slice(slice);
        self.extend_from_slice(b"\r\n");
    }
}

/// Serializes a [`Command`] into its wire representation.
///
/// The output is a `MessageHead` holding the number of parts, followed by the command
/// name and each argument written as one part via `append_part`.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still provides
/// `Into<Vec<u8>> for Command`, so existing `.into()` callers keep working, and
/// `Vec::from(command)` becomes available too.
impl From<Command> for Vec<u8> {
    fn from(command: Command) -> Self {
        let mut message = Vec::new();
        match command {
            Command::PUT(put) => {
                message.extend_from_slice((MessageHead { count: 3 }).into_bytes().as_slice());

                message.append_part(b"PUT");
                message.append_part(put.key.0.as_slice());
                message.append_part(put.value.0.as_slice());
            }
            Command::GET(get) => {
                message.extend_from_slice((MessageHead { count: 2 }).into_bytes().as_slice());

                message.append_part(b"GET");
                message.append_part(get.key.0.as_slice());
            }
            Command::DELETE(delete) => {
                message.extend_from_slice((MessageHead { count: 2 }).into_bytes().as_slice());

                message.append_part(b"DELETE");
                message.append_part(delete.key.0.as_slice());
            }
            Command::SCAN(scan) => {
                message.extend_from_slice((MessageHead { count: 3 }).into_bytes().as_slice());

                message.append_part(b"SCAN");
                message.append_part(scan.start.0.as_slice());
                message.append_part(scan.end.0.as_slice());
            }
        }

        message
    }
}

impl<T: AsyncRead + Unpin + 'static> AsyncReadBuffer<T> {
    /// Consumes the `AsyncReadBuffer` and turns it into a `Stream<Item = Result<Command>>`.
    ///
    /// Every poll reads and parses one command from the buffer. Failures are delivered as
    /// `Err` items; the closure always yields `Some`, so the stream never finishes on its
    /// own and the consumer decides when to stop polling.
    pub fn into_command_stream(self) -> impl Stream<Item = Result<Command>> {
        futures::stream::unfold(self, |mut reader| {
            // The future is boxed and pinned, as in the original implementation.
            Box::pin(async move {
                let item = read_command(&mut reader).await;
                Some((item, reader))
            })
        })
    }
}