Skip to main content

v_common/v_api/
api_client.rs

1use v_individual_model::onto::individual::Individual;
2use crate::v_api::common_type::ResultCode;
3use nng::options::{Options, RecvTimeout, SendTimeout};
4use nng::{Message, Protocol, Socket};
5use serde_json::json;
6use serde_json::Value;
7use std::fmt;
8use std::net::IpAddr;
9use std::time::Duration;
10
/// Value passed as `assigned_subsystems` by the convenience update methods of
/// `MStorageClient` (`update`, `update_or_err`).
///
/// NOTE(review): presumably means "do not restrict the update to particular
/// subsystems" — confirm against the storage service.
pub const ALL_MODULES: i64 = 0;
12
13#[derive(Debug)]
14pub struct ApiError {
15    pub result: ResultCode,
16    info: String,
17}
18
19impl fmt::Display for ApiError {
20    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
21        write!(f, "There is an error: {} {:?}", self.info, self.result)
22    }
23}
24
25//impl Error for ApiError {}
26
27impl ApiError {
28    fn new(result: ResultCode, info: &str) -> Self {
29        ApiError {
30            result,
31            info: info.to_owned(),
32        }
33    }
34}
35
36impl Default for ApiError {
37    fn default() -> Self {
38        ApiError {
39            result: ResultCode::Zero,
40            info: Default::default(),
41        }
42    }
43}
44pub use v_module_queue::IndvOp;
45
46#[derive(Debug)]
47pub struct OpResult {
48    pub result: ResultCode,
49    pub op_id: i64,
50}
51
52impl OpResult {
53    pub fn res(r: ResultCode) -> Self {
54        OpResult {
55            result: r,
56            op_id: -1,
57        }
58    }
59}
60
61#[derive(Clone)]
62pub struct NngClient {
63    name: String,
64    soc: Socket,
65    addr: String,
66    is_ready: bool,
67}
68
69impl NngClient {
70    pub fn new(name: &str, addr: String) -> NngClient {
71        NngClient {
72            name: name.to_owned(),
73            soc: Socket::new(Protocol::Req0).unwrap(),
74            addr,
75            is_ready: false,
76        }
77    }
78
79    pub fn connect(&mut self) -> bool {
80        if self.addr.is_empty() {
81            error!("nng {} : invalid addr: [{}]", self.name, self.addr);
82            return self.is_ready;
83        }
84
85        if let Err(e) = self.soc.dial(self.addr.as_str()) {
86            error!("nng {}: fail dial to [{}], err={}", self.name, self.addr, e);
87        } else {
88            info!("nng {}: success connect to [{}]", self.name, self.addr);
89            self.is_ready = true;
90
91            if let Err(e) = self.soc.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
92                error!("nng {}: fail set recv timeout, err={}", self.name, e);
93            }
94            if let Err(e) = self.soc.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
95                error!("nng {}: fail set send timeout, err={}", self.name, e);
96            }
97        }
98        self.is_ready
99    }
100
101    pub(crate) fn req_recv(&mut self, query: Value) -> Result<Value, ApiError> {
102        if !self.is_ready {
103            self.connect();
104        }
105        if !self.is_ready {
106            return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail connect", self.name)));
107        }
108
109        let req = Message::from(query.to_string().as_bytes());
110
111        if let Err(e) = self.soc.send(req) {
112            return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail send, err={:?}", self.name, e)));
113        }
114
115        // Wait for the response from the server.
116        let wmsg = self.soc.recv();
117
118        if let Err(e) = wmsg {
119            return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail recv, err={:?}", self.name, e)));
120        }
121
122        let msg = wmsg.unwrap();
123
124        debug!("nng-client: recv msg = {}", &String::from_utf8_lossy(&msg));
125
126        let reply = serde_json::from_str(&String::from_utf8_lossy(&msg));
127
128        if let Err(e) = reply {
129            return Err(ApiError::new(ResultCode::BadRequest, &format!("nng {}: fail parse result operation [put], err={:?}", self.name, e)));
130        }
131        Ok(reply.unwrap())
132    }
133}
134
135pub struct AuthClient {
136    client: NngClient,
137}
138
139impl AuthClient {
140    pub fn new(addr: String) -> AuthClient {
141        AuthClient {
142            client: NngClient::new("auth client", addr),
143        }
144    }
145
146    pub fn connect(&mut self) -> bool {
147        self.client.connect()
148    }
149
150    fn req_recv(&mut self, query: Value) -> Result<Value, ApiError> {
151        match self.client.req_recv(query) {
152            Ok(v) => {
153                if let Some(r) = v["result"].as_i64() {
154                    let res = ResultCode::from_i64(r);
155                    if res != ResultCode::Ok {
156                        return Err(ApiError::new(res, "api:update - invalid \"data\" section"));
157                    }
158                    Ok(v)
159                } else {
160                    Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"))
161                }
162            },
163            Err(e) => Err(e),
164        }
165    }
166
167    pub fn authenticate(&mut self, login: &str, password: &Option<String>, addr: Option<IpAddr>, secret: &Option<String>, domain: Option<&str>, initiator: Option<&str>, provider: Option<&str>) -> Result<Value, ApiError> {
168        let mut query = json!({
169            "function": "authenticate",
170            "login": login,
171            "password": password,
172            "addr" : addr.unwrap().to_string(),
173            "secret" : secret
174        });
175
176        // Add optional parameters if provided
177        if let Some(d) = domain {
178            query["domain"] = json!(d);
179        }
180        if let Some(i) = initiator {
181            query["initiator"] = json!(i);
182        }
183        if let Some(p) = provider {
184            query["provider"] = json!(p);
185        }
186
187        self.req_recv(query)
188    }
189
190    pub fn get_ticket_trusted(&mut self, ticket: &str, login: Option<&String>, addr: Option<IpAddr>, domain: Option<&str>) -> Result<Value, ApiError> {
191        let mut query = json!({
192            "function": "get_ticket_trusted",
193            "login": login,
194            "addr" : addr.unwrap().to_string(),
195            "ticket": ticket,
196        });
197
198        // Add optional domain parameter if provided
199        if let Some(d) = domain {
200            query["domain"] = json!(d);
201        }
202
203        self.req_recv(query)
204    }
205
206    pub fn logout(&mut self, ticket: &Option<String>, addr: Option<IpAddr>) -> Result<Value, ApiError> {
207        let query = json!({
208            "function": "logout",
209            "addr" : addr.unwrap().to_string(),
210            "ticket": ticket
211        });
212        self.req_recv(query)
213    }
214}
215
216#[derive(Clone)]
217pub struct MStorageClient {
218    client: NngClient,
219    pub check_ticket_ip: bool,
220}
221
222impl MStorageClient {
223    pub fn new(addr: String) -> MStorageClient {
224        MStorageClient {
225            client: NngClient::new("mstorage client", addr),
226            check_ticket_ip: true,
227        }
228    }
229
230    pub fn connect(&mut self) -> bool {
231        self.client.connect()
232    }
233
234    pub fn update(&mut self, ticket: &str, cmd: IndvOp, indv: &Individual) -> OpResult {
235        match self.update_use_param(ticket, "", "", ALL_MODULES, cmd, indv) {
236            Ok(r) => r,
237            Err(e) => OpResult::res(e.result),
238        }
239    }
240
241    pub fn update_or_err(&mut self, ticket: &str, event_id: &str, src: &str, cmd: IndvOp, indv: &Individual) -> Result<OpResult, ApiError> {
242        self.update_use_param(ticket, event_id, src, ALL_MODULES, cmd, indv)
243    }
244
245    pub fn update_use_param(
246        &mut self,
247        ticket: &str,
248        event_id: &str,
249        src: &str,
250        assigned_subsystems: i64,
251        cmd: IndvOp,
252        indv: &Individual,
253    ) -> Result<OpResult, ApiError> {
254        let query = json!({
255            "function": cmd.as_string(),
256            "ticket": ticket,
257            "individuals": [indv.get_obj().as_json()],
258            "assigned_subsystems": assigned_subsystems,
259            "event_id" : event_id,
260            "src" : src,
261        });
262
263        self.update_form_json(query)
264    }
265
266    pub fn updates_use_param(
267        &mut self,
268        ticket: &str,
269        event_id: &str,
270        src: &str,
271        assigned_subsystems: i64,
272        cmd: IndvOp,
273        indvs: &[Individual],
274    ) -> Result<OpResult, ApiError> {
275        self.updates_use_param_with_addr((ticket, None), event_id, src, assigned_subsystems, cmd, indvs)
276    }
277
278    pub fn updates_use_param_with_addr(
279        &mut self,
280        ticket_addr: (&str, Option<IpAddr>),
281        event_id: &str,
282        src: &str,
283        assigned_subsystems: i64,
284        cmd: IndvOp,
285        indvs: &[Individual],
286    ) -> Result<OpResult, ApiError> {
287        let (ticket, addr) = ticket_addr;
288
289        let mut jindvs = vec![];
290        for indv in indvs {
291            jindvs.push(indv.get_obj().as_json());
292        }
293        let query = json!({
294            "function": cmd.as_string(),
295            "ticket": ticket,
296            "individuals": jindvs,
297            "assigned_subsystems": assigned_subsystems,
298            "event_id" : event_id,
299            "src" : src,
300            "addr": addr
301        });
302        self.update_form_json(query)
303    }
304
305    pub fn update_form_json(&mut self, query: Value) -> Result<OpResult, ApiError> {
306        let json: Value = self.client.req_recv(query)?;
307
308        if let Some(t) = json["type"].as_str() {
309            if t != "OpResult" {
310                return Err(ApiError::new(ResultCode::BadRequest, &format!("api:update - expecten \"type\" = \"OpResult\", found {}", t)));
311            }
312        } else {
313            return Err(ApiError::new(ResultCode::BadRequest, "api:update - not found \"type\""));
314        }
315
316        if let Some(arr) = json["data"].as_array() {
317            if arr.len() != 1 {
318                return Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"));
319            }
320
321            if let Some(res) = arr[0]["result"].as_i64() {
322                if let Some(op_id) = arr[0]["op_id"].as_i64() {
323                    return Ok(OpResult {
324                        result: ResultCode::from_i64(res),
325                        op_id,
326                    });
327                }
328            } else {
329                return Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"));
330            }
331        } else {
332            return if let Some(res) = json["result"].as_i64() {
333                Ok(OpResult {
334                    result: ResultCode::from_i64(res),
335                    op_id: 0,
336                })
337            } else {
338                error!("api:update - not found \"data\"");
339                return Err(ApiError::new(ResultCode::BadRequest, "api:update - not found \"data\""));
340            };
341        }
342
343        Err(ApiError::new(ResultCode::BadRequest, "api:update - unknown"))
344    }
345}