1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use crate::common::{FTQuery, QueryResult};
use nng::{Message, Protocol, Socket};
use serde_json::Value;
use std::{thread, time};
use v_api::app::ResultCode;
use std::time::Duration;
use nng::options::{RecvTimeout, Options, SendTimeout};
/// Request/reply client for the full-text search service.
pub struct FTClient {
    /// nng socket used for the request/reply exchange (created as `Req0` in `new`).
    client: Socket,
    /// Address of the search service that `connect` dials.
    addr: String,
    /// Set to `true` once `connect` has successfully dialed `addr`.
    is_ready: bool,
}
impl FTClient {
pub fn new(_ro_client_addr: String) -> FTClient {
FTClient {
client: Socket::new(Protocol::Req0).unwrap(),
addr: _ro_client_addr,
is_ready: false,
}
}
pub fn connect(&mut self) -> bool {
if let Err(e) = self.client.dial(self.addr.as_str()) {
error!("ft-client:fail dial to ft-service, [{}], err={}", self.addr, e);
} else {
info!("success connect to ft-service, [{}]", self.addr);
if let Err(e) = self.client.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
error!("fail set recv timeout, err={}", e);
}
if let Err(e) = self.client.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
error!("fail set send timeout, err={}", e);
}
self.is_ready = true;
}
self.is_ready
}
pub fn query(&mut self, query: FTQuery) -> QueryResult {
let mut res = QueryResult::default();
if !self.is_ready {
while !self.connect() {
error!("not ready, sleep...");
thread::sleep(time::Duration::from_millis(3000));
}
}
if !self.is_ready {
res.result_code = ResultCode::NotReady;
return res;
}
let req = Message::from(query.as_string().as_bytes());
if let Err(e) = self.client.send(req) {
error!("fail send to search module, err={:?}", e);
res.result_code = ResultCode::NotReady;
return res;
}
let wmsg = self.client.recv();
if let Err(e) = wmsg {
error!("fail recv from search module, err={:?}", e);
res.result_code = ResultCode::NotReady;
return res;
}
let msg = wmsg.unwrap();
let reply = String::from_utf8_lossy(&msg);
let v: Value = if let Ok(v) = serde_json::from_str(&reply) {
v
} else {
Value::Null
};
res.result_code = ResultCode::from_i64(v["result_code"].as_i64().unwrap_or_default());
if res.result_code == ResultCode::Ok {
let jarray: &Vec<_> = &v["result"].as_array().expect("array");
res.result = jarray.iter().map(|v| v.as_str().unwrap_or_default().to_owned()).collect();
res.count = v["count"].as_i64().unwrap_or_default();
res.estimated = v["estimated"].as_i64().unwrap_or_default();
res.processed = v["processed"].as_i64().unwrap_or_default();
res.cursor = v["cursor"].as_i64().unwrap_or_default();
}
res
}
}