Skip to main content

v_module_queue/
api.rs

1use nng::options::{Options, RecvTimeout, SendTimeout};
2use nng::{Protocol, Socket};
3use std::time::Duration;
4
5#[derive(Eq, PartialEq, Debug, Clone)]
6#[repr(u16)]
7pub enum IndvOp {
8    Put = 1,
9    SetIn = 45,
10    AddTo = 47,
11    RemoveFrom = 48,
12    Remove = 51,
13    None = 52,
14    RemovePredicates = 53,
15}
16
17impl IndvOp {
18    pub fn from_i64(value: i64) -> IndvOp {
19        match value {
20            1 => IndvOp::Put,
21            51 => IndvOp::Remove,
22            47 => IndvOp::AddTo,
23            45 => IndvOp::SetIn,
24            48 => IndvOp::RemoveFrom,
25            52 => IndvOp::None,
26            53 => IndvOp::RemovePredicates,
27            _ => IndvOp::None,
28        }
29    }
30
31    pub fn to_i64(&self) -> i64 {
32        match self {
33            IndvOp::Put => 1,
34            IndvOp::Remove => 51,
35            IndvOp::AddTo => 47,
36            IndvOp::SetIn => 45,
37            IndvOp::RemoveFrom => 48,
38            IndvOp::None => 52,
39            IndvOp::RemovePredicates => 53,
40        }
41    }
42
43    pub fn as_string(&self) -> String {
44        match self {
45            IndvOp::Put => "put",
46            IndvOp::Remove => "remove",
47            IndvOp::AddTo => "add_to",
48            IndvOp::SetIn => "set_in",
49            IndvOp::RemoveFrom => "remove_from",
50            IndvOp::None => "none",
51            IndvOp::RemovePredicates => "remove_predicates",
52        }
53        .to_string()
54    }
55}
56
57#[derive(Clone)]
58struct NngClient {
59    name: String,
60    soc: Socket,
61    addr: String,
62    is_ready: bool,
63}
64
65impl NngClient {
66    fn new(name: &str, addr: String) -> NngClient {
67        NngClient {
68            name: name.to_owned(),
69            soc: Socket::new(Protocol::Req0).unwrap(),
70            addr,
71            is_ready: false,
72        }
73    }
74
75    fn connect(&mut self) -> bool {
76        if self.addr.is_empty() {
77            error!("nng {} : invalid addr: [{}]", self.name, self.addr);
78            return self.is_ready;
79        }
80
81        if let Err(e) = self.soc.dial(self.addr.as_str()) {
82            error!("nng {}: fail dial to [{}], err={}", self.name, self.addr, e);
83        } else {
84            info!("nng {}: success connect to [{}]", self.name, self.addr);
85            self.is_ready = true;
86
87            if let Err(e) = self.soc.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
88                error!("nng {}: fail set recv timeout, err={}", self.name, e);
89            }
90            if let Err(e) = self.soc.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
91                error!("nng {}: fail set send timeout, err={}", self.name, e);
92            }
93        }
94        self.is_ready
95    }
96}
97
98#[derive(Clone)]
99pub struct MStorageClient {
100    client: NngClient,
101}
102
103impl MStorageClient {
104    pub fn new(addr: String) -> MStorageClient {
105        MStorageClient {
106            client: NngClient::new("mstorage client", addr),
107        }
108    }
109
110    pub fn connect(&mut self) -> bool {
111        self.client.connect()
112    }
113}