Skip to main content

v_common/search/
ft_client.rs

1use crate::search::common::{FTQuery, QueryResult};
2use crate::v_api::common_type::ResultCode;
3use nng::options::{Options, RecvTimeout, SendTimeout};
4use nng::{Message, Protocol, Socket};
5use serde_json::Value;
6use std::time::Duration;
7use std::{thread, time};
8
9pub struct FTClient {
10    client: Socket,
11    addr: String,
12    is_ready: bool,
13}
14
15impl FTClient {
16    pub fn new(_ro_client_addr: String) -> FTClient {
17        FTClient {
18            client: Socket::new(Protocol::Req0).unwrap(),
19            addr: _ro_client_addr,
20            is_ready: false,
21        }
22    }
23
24    pub fn connect(&mut self) -> bool {
25        if let Err(e) = self.client.dial(self.addr.as_str()) {
26            error!("ft-client:fail dial to ft-service, [{}], err={}", self.addr, e);
27            self.client = Socket::new(Protocol::Req0).unwrap();
28        } else {
29            info!("success connect to ft-service, [{}]", self.addr);
30
31            if let Err(e) = self.client.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
32                error!("fail set recv timeout, err={}", e);
33            }
34            if let Err(e) = self.client.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
35                error!("fail set send timeout, err={}", e);
36            }
37
38            self.is_ready = true;
39        }
40        self.is_ready
41    }
42
43    pub fn query(&mut self, query: FTQuery) -> QueryResult {
44        let mut res = QueryResult::default();
45
46        if !self.is_ready {
47            while !self.connect() {
48                error!("not ready, sleep...");
49                thread::sleep(time::Duration::from_millis(3000));
50            }
51        }
52
53        if !self.is_ready {
54            res.result_code = ResultCode::NotReady;
55            return res;
56        }
57
58        let req = Message::from(query.as_string().as_bytes());
59
60        if let Err(e) = self.client.send(req) {
61            error!("fail send to search module, err={:?}", e);
62            res.result_code = ResultCode::NotReady;
63            self.is_ready = false;
64            return res;
65        }
66
67        // Wait for the response from the server.
68        let wmsg = self.client.recv();
69
70        if let Err(e) = wmsg {
71            error!("fail recv from search module, err={:?}", e);
72            res.result_code = ResultCode::NotReady;
73            self.is_ready = false;
74            return res;
75        }
76
77        let msg = wmsg.unwrap();
78
79        let reply = String::from_utf8_lossy(&msg);
80
81        let v: Value = if let Ok(v) = serde_json::from_str(&reply) {
82            v
83        } else {
84            Value::Null
85        };
86
87        res.result_code = ResultCode::from_i64(v["result_code"].as_i64().unwrap_or_default());
88
89        if res.result_code == ResultCode::Ok {
90            let jarray: &Vec<_> = v["result"].as_array().expect("array");
91            res.result = jarray.iter().map(|v| v.as_str().unwrap_or_default().to_owned()).collect();
92
93            res.count = v["count"].as_i64().unwrap_or_default();
94            res.estimated = v["estimated"].as_i64().unwrap_or_default();
95            res.processed = v["processed"].as_i64().unwrap_or_default();
96            res.cursor = v["cursor"].as_i64().unwrap_or_default();
97        }
98
99        //info!("msg={}", v);
100        res
101    }
102}