v_common/search/
ft_client.rs1use crate::search::common::{FTQuery, QueryResult};
2use crate::v_api::common_type::ResultCode;
3use nng::options::{Options, RecvTimeout, SendTimeout};
4use nng::{Message, Protocol, Socket};
5use serde_json::Value;
6use std::time::Duration;
7use std::{thread, time};
8
9pub struct FTClient {
10 client: Socket,
11 addr: String,
12 is_ready: bool,
13}
14
15impl FTClient {
16 pub fn new(_ro_client_addr: String) -> FTClient {
17 FTClient {
18 client: Socket::new(Protocol::Req0).unwrap(),
19 addr: _ro_client_addr,
20 is_ready: false,
21 }
22 }
23
24 pub fn connect(&mut self) -> bool {
25 if let Err(e) = self.client.dial(self.addr.as_str()) {
26 error!("ft-client:fail dial to ft-service, [{}], err={}", self.addr, e);
27 self.client = Socket::new(Protocol::Req0).unwrap();
28 } else {
29 info!("success connect to ft-service, [{}]", self.addr);
30
31 if let Err(e) = self.client.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
32 error!("fail set recv timeout, err={}", e);
33 }
34 if let Err(e) = self.client.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
35 error!("fail set send timeout, err={}", e);
36 }
37
38 self.is_ready = true;
39 }
40 self.is_ready
41 }
42
43 pub fn query(&mut self, query: FTQuery) -> QueryResult {
44 let mut res = QueryResult::default();
45
46 if !self.is_ready {
47 while !self.connect() {
48 error!("not ready, sleep...");
49 thread::sleep(time::Duration::from_millis(3000));
50 }
51 }
52
53 if !self.is_ready {
54 res.result_code = ResultCode::NotReady;
55 return res;
56 }
57
58 let req = Message::from(query.as_string().as_bytes());
59
60 if let Err(e) = self.client.send(req) {
61 error!("fail send to search module, err={:?}", e);
62 res.result_code = ResultCode::NotReady;
63 self.is_ready = false;
64 return res;
65 }
66
67 let wmsg = self.client.recv();
69
70 if let Err(e) = wmsg {
71 error!("fail recv from search module, err={:?}", e);
72 res.result_code = ResultCode::NotReady;
73 self.is_ready = false;
74 return res;
75 }
76
77 let msg = wmsg.unwrap();
78
79 let reply = String::from_utf8_lossy(&msg);
80
81 let v: Value = if let Ok(v) = serde_json::from_str(&reply) {
82 v
83 } else {
84 Value::Null
85 };
86
87 res.result_code = ResultCode::from_i64(v["result_code"].as_i64().unwrap_or_default());
88
89 if res.result_code == ResultCode::Ok {
90 let jarray: &Vec<_> = v["result"].as_array().expect("array");
91 res.result = jarray.iter().map(|v| v.as_str().unwrap_or_default().to_owned()).collect();
92
93 res.count = v["count"].as_i64().unwrap_or_default();
94 res.estimated = v["estimated"].as_i64().unwrap_or_default();
95 res.processed = v["processed"].as_i64().unwrap_or_default();
96 res.cursor = v["cursor"].as_i64().unwrap_or_default();
97 }
98
99 res
101 }
102}