quee 0.1.0

Quee is a contract-driven message queue for distributed systems.
Documentation
use std::collections::HashMap;

use parser::Contract;
use serde::{Deserialize, Serialize};

use tonic::transport::Channel;

#[path = "engine.rs"]
pub(crate) mod engine;

use engine::{
    core_client::CoreClient, handshake_client::HandshakeClient, ContractRequest, Message,
    MessageBuf, MessageStatus, Ping, QueueBuf,
};

pub mod parser;

/// Wrapper that pairs a payload of type `T` with a numeric message id.
pub struct XMessage<T> {
    // Stored at construction time but never read in this file, hence the
    // lint allowance.
    #[allow(dead_code)]
    id: i64,
    inner: T,
}

impl<T> XMessage<T> {
    /// Builds a wrapper around `msg`, tagged with `id`.
    pub fn new(id: i64, msg: T) -> Self {
        XMessage { id, inner: msg }
    }

    /// Consumes the wrapper and hands back the payload; the id is dropped.
    pub fn into_inner(self) -> T {
        let XMessage { inner, .. } = self;
        inner
    }
}

/// Errors returned by the client API in this file.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so values can be
/// propagated with `?` into `Box<dyn std::error::Error>` and similar types.
#[derive(Debug)]
pub enum Error {
    /// Connecting the handshake client or the core client failed.
    HandshakeError,
    /// The ping call to the server failed.
    PingError,
    /// A failure described by the carried message, e.g. the server replied
    /// with `ok == false`.
    InternalError(String),
    /// The outgoing message could not be serialized to JSON.
    ProduceSerializeError,
    /// The send call for an outgoing message failed.
    ProduceError,
    /// The call that sends the contract to the server failed.
    ContractSendError,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::HandshakeError => f.write_str("handshake with the server failed"),
            Error::PingError => f.write_str("ping to the server failed"),
            Error::InternalError(msg) => write!(f, "internal error: {}", msg),
            Error::ProduceSerializeError => f.write_str("failed to serialize message to JSON"),
            Error::ProduceError => f.write_str("failed to send message"),
            Error::ContractSendError => f.write_str("failed to send contract"),
        }
    }
}

impl std::error::Error for Error {}

/// State of a `Connection` as tracked by its ping logic.
///
/// `Debug`, `Clone`, `Copy` and `Eq` are derived so callers can print,
/// copy and `assert_eq!` the value returned by `Connection::get_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state after creation; also set when the server answers a ping
    /// with `ok == false`.
    Idle,
    /// The last ping succeeded.
    Ready,
    /// The last ping call itself failed.
    Lost,
}

/// Client-side handle bundling the two gRPC clients used to talk to the
/// server, the current connection state, and user-replaceable callbacks.
pub struct Connection {
    // Kept for later use; nothing in this file reads it back yet.
    #[allow(dead_code)]
    address: String,
    /// Token taken from the most recent successful ping reply.
    token: String,
    /// Client used for the ping call.
    hs_client: HandshakeClient<Channel>,
    /// Client used for contract loading and message sending.
    core_client: CoreClient<Channel>,
    /// Last known state, updated by the ping logic.
    state: ConnectionState,

    /// Invoked after `connect` succeeds.
    pub on_connect: fn(),
    /// Reconnect callback; not invoked anywhere in this file.
    pub on_reconnect: fn(),
    /// Error callback taking a message; not invoked anywhere in this file.
    pub on_error: fn(String),
    /// Disconnect callback taking a message; not invoked anywhere in this file.
    pub on_disconnect: fn(String),
}

/// Writes `msg` and a trailing newline to standard output.
///
/// `Connection::create` installs this as the default `on_error` and
/// `on_disconnect` callback.
fn sys_out(msg: String) {
    println!("{}", msg);
}

impl Connection {
    pub async fn create(addr: String) -> Result<Self, Error> {
        let client = HandshakeClient::connect(addr.clone()).await.map_err(|e| {
            println!("Error while handshake: {}", e);
            return Error::HandshakeError;
        })?;

        let core_client = CoreClient::connect(addr.clone()).await.map_err(|e| {
            println!("Error while core handshake: {}", e);
            return Error::HandshakeError;
        })?;

        let conn = Self {
            address: addr,
            token: "".into(),
            hs_client: client,
            core_client,
            state: ConnectionState::Idle,
            on_connect: || {},
            on_reconnect: || {},
            on_error: sys_out,
            on_disconnect: sys_out,
        };

        Ok(conn)
    }

    pub async fn connect(&mut self) -> Result<(), Error> {
        self.heartbeat().await?;

        (self.on_connect)();
        // TODO: start calling the server to check for its
        // liveliness
        Ok(())
    }

    async fn heartbeat(&mut self) -> Result<(), Error> {
        let reply = self
            .hs_client
            .ping(Ping {
                version: "1".into(),
            })
            .await
            .map_err(|e| {
                self.state = ConnectionState::Lost;
                println!("Error while pinging for channel: {}", e);
                return Error::PingError;
            })?;

        let resp = reply.into_inner();
        if !resp.ok {
            self.state = ConnectionState::Idle;
            return Err(Error::InternalError(resp.message));
        }

        self.state = ConnectionState::Ready;
        self.token = resp.token;

        Ok(())
    }

    pub fn get_state(&self) -> &ConnectionState {
        &self.state
    }

    // TODO: for now let it be here ..but need to think of some place
    // better suited for this job
    // Here we clone a SHIT TON of things, find a better way
    pub async fn load_contract(&mut self, contract: Contract) -> Result<(), Error> {
        let mut mbufs: HashMap<String, MessageBuf> = HashMap::new();
        let mut qbufs: HashMap<String, QueueBuf> = HashMap::new();
        for k in contract.queue.keys() {
            let v = contract.queue.get(k).unwrap();
            let msg = contract.message.get(&v.message).unwrap();
            mbufs.insert(v.message.clone(), MessageBuf { inner: msg.clone() });
            qbufs.insert(
                k.clone(),
                QueueBuf {
                    ns: v.ns.clone(),
                    fork: v.fork.clone(),
                    push_back: v.push_back,
                    message: v.message.clone(),
                },
            );
        }
        let contract_req = ContractRequest {
            message: mbufs,
            queues: qbufs,
        };

        // TODO: take in token here!!
        let reply = self
            .core_client
            .load_contract(contract_req)
            .await
            .map_err(|e| {
                eprintln!("Error while loading contract: {}", e);
                Error::ContractSendError
            })?;

        let reply = reply.into_inner();
        if !reply.ok {
            return Err(Error::InternalError(reply.message.unwrap()));
        }
        Ok(())
    }
}

/// A named queue bound to a mutably borrowed `Connection`.
///
/// `M` is the message type; it must implement serde's `Serialize` and
/// `Deserialize<'a>`.
pub struct Queue<'a, M: Serialize + Deserialize<'a>> {
    /// Queue name; used as the topic of produced messages.
    name: String,
    /// Connection whose core client is used to send messages.
    conn: &'a mut Connection,
    /// Handler stored by `consume`; `None` until one is registered.
    consume_fn: Option<fn(XMessage<M>) -> MessageStatus>,
}

impl<'a, M> Queue<'a, M>
where
    M: Serialize + Deserialize<'a>,
{
    pub fn new(conn: &'a mut Connection, name: String) -> Self {
        Self {
            name,
            conn,
            consume_fn: None,
        }
    }

    pub fn consume(&mut self, handler: fn(XMessage<M>) -> MessageStatus) {
        self.consume_fn = Some(handler);
    }

    // TODO: maybe create our own errors instead of dyn Error
    pub async fn produce(&mut self, message: &M) -> Result<bool, Error> {
        let j: String = serde_json::to_string(message).map_err(|e| {
            eprintln!("Error while converting into json string: {}", e);
            return Error::ProduceSerializeError;
        })?;

        // TODO: create id here

        // send this message
        let payload = j.into_bytes();
        let msg = Message {
            id: 1,
            payload: payload.clone(), // See something here about not cloning and using
            length: payload.len() as i64,
            topic: self.name.clone(),
        };

        let reply = self.conn.core_client.send(msg).await.map_err(|e| {
            eprintln!("Error: e : {}", e);
            return Error::ProduceError;
        })?;

        let reply = reply.into_inner();
        if !reply.ok {
            return Err(Error::InternalError(reply.message.unwrap()));
        }

        Ok(true)
    }
}