1use v_individual_model::onto::individual::Individual;
2use crate::v_api::common_type::ResultCode;
3use nng::options::{Options, RecvTimeout, SendTimeout};
4use nng::{Message, Protocol, Socket};
5use serde_json::json;
6use serde_json::Value;
7use std::fmt;
8use std::net::IpAddr;
9use std::time::Duration;
10
/// Value sent in the `assigned_subsystems` request field by
/// `MStorageClient::update` and `MStorageClient::update_or_err`.
// NOTE(review): presumably 0 means "no restriction, all modules" — confirm against the storage service.
pub const ALL_MODULES: i64 = 0;
12
13#[derive(Debug)]
14pub struct ApiError {
15 pub result: ResultCode,
16 info: String,
17}
18
19impl fmt::Display for ApiError {
20 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
21 write!(f, "There is an error: {} {:?}", self.info, self.result)
22 }
23}
24
25impl ApiError {
28 fn new(result: ResultCode, info: &str) -> Self {
29 ApiError {
30 result,
31 info: info.to_owned(),
32 }
33 }
34}
35
36impl Default for ApiError {
37 fn default() -> Self {
38 ApiError {
39 result: ResultCode::Zero,
40 info: Default::default(),
41 }
42 }
43}
/// Operation requested for an individual.
///
/// Every variant has a fixed numeric code (its discriminant) and a textual
/// command name returned by [`IndvOp::as_string`].
#[derive(Eq, PartialEq, Debug, Clone)]
#[repr(u16)]
pub enum IndvOp {
    /// Code `1`, command name `put`.
    Put = 1,

    /// Code `45`, command name `set_in`.
    SetIn = 45,

    /// Code `47`, command name `add_to`.
    AddTo = 47,

    /// Code `48`, command name `remove_from`.
    RemoveFrom = 48,

    /// Code `51`, command name `remove`.
    Remove = 51,

    /// Code `52`, command name `none`; also the result of decoding an unknown code.
    None = 52,

    /// Code `53`, command name `remove_predicates`.
    RemovePredicates = 53,
}

impl IndvOp {
    /// Decodes a numeric operation code.
    ///
    /// Any value that is not the code of a variant yields [`IndvOp::None`].
    pub fn from_i64(value: i64) -> IndvOp {
        match value {
            1 => Self::Put,
            45 => Self::SetIn,
            47 => Self::AddTo,
            48 => Self::RemoveFrom,
            51 => Self::Remove,
            53 => Self::RemovePredicates,
            // 52 is `None`'s own code; every unrecognised code ends up here as well.
            _ => Self::None,
        }
    }

    /// Returns the numeric code of the operation.
    pub fn to_i64(&self) -> i64 {
        // The discriminants declared on the enum are exactly the numeric codes.
        self.clone() as i64
    }

    /// Returns the textual command name of the operation as an owned string.
    pub fn as_string(&self) -> String {
        let name = match self {
            Self::Put => "put",
            Self::SetIn => "set_in",
            Self::AddTo => "add_to",
            Self::RemoveFrom => "remove_from",
            Self::Remove => "remove",
            Self::None => "none",
            Self::RemovePredicates => "remove_predicates",
        };
        String::from(name)
    }
}
109
110#[derive(Debug)]
111pub struct OpResult {
112 pub result: ResultCode,
113 pub op_id: i64,
114}
115
116impl OpResult {
117 pub fn res(r: ResultCode) -> Self {
118 OpResult {
119 result: r,
120 op_id: -1,
121 }
122 }
123}
124
125#[derive(Clone)]
126pub struct NngClient {
127 name: String,
128 soc: Socket,
129 addr: String,
130 is_ready: bool,
131}
132
133impl NngClient {
134 pub fn new(name: &str, addr: String) -> NngClient {
135 NngClient {
136 name: name.to_owned(),
137 soc: Socket::new(Protocol::Req0).unwrap(),
138 addr,
139 is_ready: false,
140 }
141 }
142
143 pub fn connect(&mut self) -> bool {
144 if self.addr.is_empty() {
145 error!("nng {} : invalid addr: [{}]", self.name, self.addr);
146 return self.is_ready;
147 }
148
149 if let Err(e) = self.soc.dial(self.addr.as_str()) {
150 error!("nng {}: fail dial to [{}], err={}", self.name, self.addr, e);
151 } else {
152 info!("nng {}: success connect to [{}]", self.name, self.addr);
153 self.is_ready = true;
154
155 if let Err(e) = self.soc.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
156 error!("nng {}: fail set recv timeout, err={}", self.name, e);
157 }
158 if let Err(e) = self.soc.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
159 error!("nng {}: fail set send timeout, err={}", self.name, e);
160 }
161 }
162 self.is_ready
163 }
164
165 pub(crate) fn req_recv(&mut self, query: Value) -> Result<Value, ApiError> {
166 if !self.is_ready {
167 self.connect();
168 }
169 if !self.is_ready {
170 return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail connect", self.name)));
171 }
172
173 let req = Message::from(query.to_string().as_bytes());
174
175 if let Err(e) = self.soc.send(req) {
176 return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail send, err={:?}", self.name, e)));
177 }
178
179 let wmsg = self.soc.recv();
181
182 if let Err(e) = wmsg {
183 return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail recv, err={:?}", self.name, e)));
184 }
185
186 let msg = wmsg.unwrap();
187
188 debug!("nng-client: recv msg = {}", &String::from_utf8_lossy(&msg));
189
190 let reply = serde_json::from_str(&String::from_utf8_lossy(&msg));
191
192 if let Err(e) = reply {
193 return Err(ApiError::new(ResultCode::BadRequest, &format!("nng {}: fail parse result operation [put], err={:?}", self.name, e)));
194 }
195 Ok(reply.unwrap())
196 }
197}
198
199pub struct AuthClient {
200 client: NngClient,
201}
202
203impl AuthClient {
204 pub fn new(addr: String) -> AuthClient {
205 AuthClient {
206 client: NngClient::new("auth client", addr),
207 }
208 }
209
210 pub fn connect(&mut self) -> bool {
211 self.client.connect()
212 }
213
214 fn req_recv(&mut self, query: Value) -> Result<Value, ApiError> {
215 match self.client.req_recv(query) {
216 Ok(v) => {
217 if let Some(r) = v["result"].as_i64() {
218 let res = ResultCode::from_i64(r);
219 if res != ResultCode::Ok {
220 return Err(ApiError::new(res, "api:update - invalid \"data\" section"));
221 }
222 Ok(v)
223 } else {
224 Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"))
225 }
226 },
227 Err(e) => Err(e),
228 }
229 }
230
231 pub fn authenticate(&mut self, login: &str, password: &Option<String>, addr: Option<IpAddr>, secret: &Option<String>, domain: Option<&str>, initiator: Option<&str>, provider: Option<&str>) -> Result<Value, ApiError> {
232 let mut query = json!({
233 "function": "authenticate",
234 "login": login,
235 "password": password,
236 "addr" : addr.unwrap().to_string(),
237 "secret" : secret
238 });
239
240 if let Some(d) = domain {
242 query["domain"] = json!(d);
243 }
244 if let Some(i) = initiator {
245 query["initiator"] = json!(i);
246 }
247 if let Some(p) = provider {
248 query["provider"] = json!(p);
249 }
250
251 self.req_recv(query)
252 }
253
254 pub fn get_ticket_trusted(&mut self, ticket: &str, login: Option<&String>, addr: Option<IpAddr>, domain: Option<&str>) -> Result<Value, ApiError> {
255 let mut query = json!({
256 "function": "get_ticket_trusted",
257 "login": login,
258 "addr" : addr.unwrap().to_string(),
259 "ticket": ticket,
260 });
261
262 if let Some(d) = domain {
264 query["domain"] = json!(d);
265 }
266
267 self.req_recv(query)
268 }
269
270 pub fn logout(&mut self, ticket: &Option<String>, addr: Option<IpAddr>) -> Result<Value, ApiError> {
271 let query = json!({
272 "function": "logout",
273 "addr" : addr.unwrap().to_string(),
274 "ticket": ticket
275 });
276 self.req_recv(query)
277 }
278}
279
280#[derive(Clone)]
281pub struct MStorageClient {
282 client: NngClient,
283 pub check_ticket_ip: bool,
284}
285
286impl MStorageClient {
287 pub fn new(addr: String) -> MStorageClient {
288 MStorageClient {
289 client: NngClient::new("mstorage client", addr),
290 check_ticket_ip: true,
291 }
292 }
293
294 pub fn connect(&mut self) -> bool {
295 self.client.connect()
296 }
297
298 pub fn update(&mut self, ticket: &str, cmd: IndvOp, indv: &Individual) -> OpResult {
299 match self.update_use_param(ticket, "", "", ALL_MODULES, cmd, indv) {
300 Ok(r) => r,
301 Err(e) => OpResult::res(e.result),
302 }
303 }
304
305 pub fn update_or_err(&mut self, ticket: &str, event_id: &str, src: &str, cmd: IndvOp, indv: &Individual) -> Result<OpResult, ApiError> {
306 self.update_use_param(ticket, event_id, src, ALL_MODULES, cmd, indv)
307 }
308
309 pub fn update_use_param(
310 &mut self,
311 ticket: &str,
312 event_id: &str,
313 src: &str,
314 assigned_subsystems: i64,
315 cmd: IndvOp,
316 indv: &Individual,
317 ) -> Result<OpResult, ApiError> {
318 let query = json!({
319 "function": cmd.as_string(),
320 "ticket": ticket,
321 "individuals": [indv.get_obj().as_json()],
322 "assigned_subsystems": assigned_subsystems,
323 "event_id" : event_id,
324 "src" : src,
325 });
326
327 self.update_form_json(query)
328 }
329
330 pub fn updates_use_param(
331 &mut self,
332 ticket: &str,
333 event_id: &str,
334 src: &str,
335 assigned_subsystems: i64,
336 cmd: IndvOp,
337 indvs: &[Individual],
338 ) -> Result<OpResult, ApiError> {
339 self.updates_use_param_with_addr((ticket, None), event_id, src, assigned_subsystems, cmd, indvs)
340 }
341
342 pub fn updates_use_param_with_addr(
343 &mut self,
344 ticket_addr: (&str, Option<IpAddr>),
345 event_id: &str,
346 src: &str,
347 assigned_subsystems: i64,
348 cmd: IndvOp,
349 indvs: &[Individual],
350 ) -> Result<OpResult, ApiError> {
351 let (ticket, addr) = ticket_addr;
352
353 let mut jindvs = vec![];
354 for indv in indvs {
355 jindvs.push(indv.get_obj().as_json());
356 }
357 let query = json!({
358 "function": cmd.as_string(),
359 "ticket": ticket,
360 "individuals": jindvs,
361 "assigned_subsystems": assigned_subsystems,
362 "event_id" : event_id,
363 "src" : src,
364 "addr": addr
365 });
366 self.update_form_json(query)
367 }
368
369 pub fn update_form_json(&mut self, query: Value) -> Result<OpResult, ApiError> {
370 let json: Value = self.client.req_recv(query)?;
371
372 if let Some(t) = json["type"].as_str() {
373 if t != "OpResult" {
374 return Err(ApiError::new(ResultCode::BadRequest, &format!("api:update - expecten \"type\" = \"OpResult\", found {}", t)));
375 }
376 } else {
377 return Err(ApiError::new(ResultCode::BadRequest, "api:update - not found \"type\""));
378 }
379
380 if let Some(arr) = json["data"].as_array() {
381 if arr.len() != 1 {
382 return Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"));
383 }
384
385 if let Some(res) = arr[0]["result"].as_i64() {
386 if let Some(op_id) = arr[0]["op_id"].as_i64() {
387 return Ok(OpResult {
388 result: ResultCode::from_i64(res),
389 op_id,
390 });
391 }
392 } else {
393 return Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"));
394 }
395 } else {
396 return if let Some(res) = json["result"].as_i64() {
397 Ok(OpResult {
398 result: ResultCode::from_i64(res),
399 op_id: 0,
400 })
401 } else {
402 error!("api:update - not found \"data\"");
403 return Err(ApiError::new(ResultCode::BadRequest, "api:update - not found \"data\""));
404 };
405 }
406
407 Err(ApiError::new(ResultCode::BadRequest, "api:update - unknown"))
408 }
409}