v-module-queue 0.1.0

Veda module queue consumer runtime (without full-text / xapian stack)
Documentation
use nng::options::{Options, RecvTimeout, SendTimeout};
use nng::{Protocol, Socket};
use std::time::Duration;

#[derive(Eq, PartialEq, Debug, Clone)]
#[repr(u16)]
pub enum IndvOp {
    Put = 1,
    SetIn = 45,
    AddTo = 47,
    RemoveFrom = 48,
    Remove = 51,
    None = 52,
    RemovePredicates = 53,
}

impl IndvOp {
    pub fn from_i64(value: i64) -> IndvOp {
        match value {
            1 => IndvOp::Put,
            51 => IndvOp::Remove,
            47 => IndvOp::AddTo,
            45 => IndvOp::SetIn,
            48 => IndvOp::RemoveFrom,
            52 => IndvOp::None,
            53 => IndvOp::RemovePredicates,
            _ => IndvOp::None,
        }
    }

    pub fn to_i64(&self) -> i64 {
        match self {
            IndvOp::Put => 1,
            IndvOp::Remove => 51,
            IndvOp::AddTo => 47,
            IndvOp::SetIn => 45,
            IndvOp::RemoveFrom => 48,
            IndvOp::None => 52,
            IndvOp::RemovePredicates => 53,
        }
    }

    pub fn as_string(&self) -> String {
        match self {
            IndvOp::Put => "put",
            IndvOp::Remove => "remove",
            IndvOp::AddTo => "add_to",
            IndvOp::SetIn => "set_in",
            IndvOp::RemoveFrom => "remove_from",
            IndvOp::None => "none",
            IndvOp::RemovePredicates => "remove_predicates",
        }
        .to_string()
    }
}

#[derive(Clone)]
struct NngClient {
    name: String,
    soc: Socket,
    addr: String,
    is_ready: bool,
}

impl NngClient {
    fn new(name: &str, addr: String) -> NngClient {
        NngClient {
            name: name.to_owned(),
            soc: Socket::new(Protocol::Req0).unwrap(),
            addr,
            is_ready: false,
        }
    }

    fn connect(&mut self) -> bool {
        if self.addr.is_empty() {
            error!("nng {} : invalid addr: [{}]", self.name, self.addr);
            return self.is_ready;
        }

        if let Err(e) = self.soc.dial(self.addr.as_str()) {
            error!("nng {}: fail dial to [{}], err={}", self.name, self.addr, e);
        } else {
            info!("nng {}: success connect to [{}]", self.name, self.addr);
            self.is_ready = true;

            if let Err(e) = self.soc.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
                error!("nng {}: fail set recv timeout, err={}", self.name, e);
            }
            if let Err(e) = self.soc.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
                error!("nng {}: fail set send timeout, err={}", self.name, e);
            }
        }
        self.is_ready
    }
}

#[derive(Clone)]
pub struct MStorageClient {
    client: NngClient,
}

impl MStorageClient {
    pub fn new(addr: String) -> MStorageClient {
        MStorageClient {
            client: NngClient::new("mstorage client", addr),
        }
    }

    pub fn connect(&mut self) -> bool {
        self.client.connect()
    }
}