1use v_individual_model::onto::individual::Individual;
2use crate::v_api::common_type::ResultCode;
3use nng::options::{Options, RecvTimeout, SendTimeout};
4use nng::{Message, Protocol, Socket};
5use serde_json::json;
6use serde_json::Value;
7use std::fmt;
8use std::net::IpAddr;
9use std::time::Duration;
10
11pub const ALL_MODULES: i64 = 0;
12
13#[derive(Debug)]
14pub struct ApiError {
15 pub result: ResultCode,
16 info: String,
17}
18
19impl fmt::Display for ApiError {
20 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
21 write!(f, "There is an error: {} {:?}", self.info, self.result)
22 }
23}
24
25impl ApiError {
28 fn new(result: ResultCode, info: &str) -> Self {
29 ApiError {
30 result,
31 info: info.to_owned(),
32 }
33 }
34}
35
36impl Default for ApiError {
37 fn default() -> Self {
38 ApiError {
39 result: ResultCode::Zero,
40 info: Default::default(),
41 }
42 }
43}
44pub use v_module_queue::IndvOp;
45
46#[derive(Debug)]
47pub struct OpResult {
48 pub result: ResultCode,
49 pub op_id: i64,
50}
51
52impl OpResult {
53 pub fn res(r: ResultCode) -> Self {
54 OpResult {
55 result: r,
56 op_id: -1,
57 }
58 }
59}
60
61#[derive(Clone)]
62pub struct NngClient {
63 name: String,
64 soc: Socket,
65 addr: String,
66 is_ready: bool,
67}
68
69impl NngClient {
70 pub fn new(name: &str, addr: String) -> NngClient {
71 NngClient {
72 name: name.to_owned(),
73 soc: Socket::new(Protocol::Req0).unwrap(),
74 addr,
75 is_ready: false,
76 }
77 }
78
79 pub fn connect(&mut self) -> bool {
80 if self.addr.is_empty() {
81 error!("nng {} : invalid addr: [{}]", self.name, self.addr);
82 return self.is_ready;
83 }
84
85 if let Err(e) = self.soc.dial(self.addr.as_str()) {
86 error!("nng {}: fail dial to [{}], err={}", self.name, self.addr, e);
87 } else {
88 info!("nng {}: success connect to [{}]", self.name, self.addr);
89 self.is_ready = true;
90
91 if let Err(e) = self.soc.set_opt::<RecvTimeout>(Some(Duration::from_secs(30))) {
92 error!("nng {}: fail set recv timeout, err={}", self.name, e);
93 }
94 if let Err(e) = self.soc.set_opt::<SendTimeout>(Some(Duration::from_secs(30))) {
95 error!("nng {}: fail set send timeout, err={}", self.name, e);
96 }
97 }
98 self.is_ready
99 }
100
101 pub(crate) fn req_recv(&mut self, query: Value) -> Result<Value, ApiError> {
102 if !self.is_ready {
103 self.connect();
104 }
105 if !self.is_ready {
106 return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail connect", self.name)));
107 }
108
109 let req = Message::from(query.to_string().as_bytes());
110
111 if let Err(e) = self.soc.send(req) {
112 return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail send, err={:?}", self.name, e)));
113 }
114
115 let wmsg = self.soc.recv();
117
118 if let Err(e) = wmsg {
119 return Err(ApiError::new(ResultCode::NotReady, &format!("nng {}: fail recv, err={:?}", self.name, e)));
120 }
121
122 let msg = wmsg.unwrap();
123
124 debug!("nng-client: recv msg = {}", &String::from_utf8_lossy(&msg));
125
126 let reply = serde_json::from_str(&String::from_utf8_lossy(&msg));
127
128 if let Err(e) = reply {
129 return Err(ApiError::new(ResultCode::BadRequest, &format!("nng {}: fail parse result operation [put], err={:?}", self.name, e)));
130 }
131 Ok(reply.unwrap())
132 }
133}
134
135pub struct AuthClient {
136 client: NngClient,
137}
138
139impl AuthClient {
140 pub fn new(addr: String) -> AuthClient {
141 AuthClient {
142 client: NngClient::new("auth client", addr),
143 }
144 }
145
146 pub fn connect(&mut self) -> bool {
147 self.client.connect()
148 }
149
150 fn req_recv(&mut self, query: Value) -> Result<Value, ApiError> {
151 match self.client.req_recv(query) {
152 Ok(v) => {
153 if let Some(r) = v["result"].as_i64() {
154 let res = ResultCode::from_i64(r);
155 if res != ResultCode::Ok {
156 return Err(ApiError::new(res, "api:update - invalid \"data\" section"));
157 }
158 Ok(v)
159 } else {
160 Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"))
161 }
162 },
163 Err(e) => Err(e),
164 }
165 }
166
167 pub fn authenticate(&mut self, login: &str, password: &Option<String>, addr: Option<IpAddr>, secret: &Option<String>, domain: Option<&str>, initiator: Option<&str>, provider: Option<&str>) -> Result<Value, ApiError> {
168 let mut query = json!({
169 "function": "authenticate",
170 "login": login,
171 "password": password,
172 "addr" : addr.unwrap().to_string(),
173 "secret" : secret
174 });
175
176 if let Some(d) = domain {
178 query["domain"] = json!(d);
179 }
180 if let Some(i) = initiator {
181 query["initiator"] = json!(i);
182 }
183 if let Some(p) = provider {
184 query["provider"] = json!(p);
185 }
186
187 self.req_recv(query)
188 }
189
190 pub fn get_ticket_trusted(&mut self, ticket: &str, login: Option<&String>, addr: Option<IpAddr>, domain: Option<&str>) -> Result<Value, ApiError> {
191 let mut query = json!({
192 "function": "get_ticket_trusted",
193 "login": login,
194 "addr" : addr.unwrap().to_string(),
195 "ticket": ticket,
196 });
197
198 if let Some(d) = domain {
200 query["domain"] = json!(d);
201 }
202
203 self.req_recv(query)
204 }
205
206 pub fn logout(&mut self, ticket: &Option<String>, addr: Option<IpAddr>) -> Result<Value, ApiError> {
207 let query = json!({
208 "function": "logout",
209 "addr" : addr.unwrap().to_string(),
210 "ticket": ticket
211 });
212 self.req_recv(query)
213 }
214}
215
216#[derive(Clone)]
217pub struct MStorageClient {
218 client: NngClient,
219 pub check_ticket_ip: bool,
220}
221
222impl MStorageClient {
223 pub fn new(addr: String) -> MStorageClient {
224 MStorageClient {
225 client: NngClient::new("mstorage client", addr),
226 check_ticket_ip: true,
227 }
228 }
229
230 pub fn connect(&mut self) -> bool {
231 self.client.connect()
232 }
233
234 pub fn update(&mut self, ticket: &str, cmd: IndvOp, indv: &Individual) -> OpResult {
235 match self.update_use_param(ticket, "", "", ALL_MODULES, cmd, indv) {
236 Ok(r) => r,
237 Err(e) => OpResult::res(e.result),
238 }
239 }
240
241 pub fn update_or_err(&mut self, ticket: &str, event_id: &str, src: &str, cmd: IndvOp, indv: &Individual) -> Result<OpResult, ApiError> {
242 self.update_use_param(ticket, event_id, src, ALL_MODULES, cmd, indv)
243 }
244
245 pub fn update_use_param(
246 &mut self,
247 ticket: &str,
248 event_id: &str,
249 src: &str,
250 assigned_subsystems: i64,
251 cmd: IndvOp,
252 indv: &Individual,
253 ) -> Result<OpResult, ApiError> {
254 let query = json!({
255 "function": cmd.as_string(),
256 "ticket": ticket,
257 "individuals": [indv.get_obj().as_json()],
258 "assigned_subsystems": assigned_subsystems,
259 "event_id" : event_id,
260 "src" : src,
261 });
262
263 self.update_form_json(query)
264 }
265
266 pub fn updates_use_param(
267 &mut self,
268 ticket: &str,
269 event_id: &str,
270 src: &str,
271 assigned_subsystems: i64,
272 cmd: IndvOp,
273 indvs: &[Individual],
274 ) -> Result<OpResult, ApiError> {
275 self.updates_use_param_with_addr((ticket, None), event_id, src, assigned_subsystems, cmd, indvs)
276 }
277
278 pub fn updates_use_param_with_addr(
279 &mut self,
280 ticket_addr: (&str, Option<IpAddr>),
281 event_id: &str,
282 src: &str,
283 assigned_subsystems: i64,
284 cmd: IndvOp,
285 indvs: &[Individual],
286 ) -> Result<OpResult, ApiError> {
287 let (ticket, addr) = ticket_addr;
288
289 let mut jindvs = vec![];
290 for indv in indvs {
291 jindvs.push(indv.get_obj().as_json());
292 }
293 let query = json!({
294 "function": cmd.as_string(),
295 "ticket": ticket,
296 "individuals": jindvs,
297 "assigned_subsystems": assigned_subsystems,
298 "event_id" : event_id,
299 "src" : src,
300 "addr": addr
301 });
302 self.update_form_json(query)
303 }
304
305 pub fn update_form_json(&mut self, query: Value) -> Result<OpResult, ApiError> {
306 let json: Value = self.client.req_recv(query)?;
307
308 if let Some(t) = json["type"].as_str() {
309 if t != "OpResult" {
310 return Err(ApiError::new(ResultCode::BadRequest, &format!("api:update - expecten \"type\" = \"OpResult\", found {}", t)));
311 }
312 } else {
313 return Err(ApiError::new(ResultCode::BadRequest, "api:update - not found \"type\""));
314 }
315
316 if let Some(arr) = json["data"].as_array() {
317 if arr.len() != 1 {
318 return Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"));
319 }
320
321 if let Some(res) = arr[0]["result"].as_i64() {
322 if let Some(op_id) = arr[0]["op_id"].as_i64() {
323 return Ok(OpResult {
324 result: ResultCode::from_i64(res),
325 op_id,
326 });
327 }
328 } else {
329 return Err(ApiError::new(ResultCode::BadRequest, "api:update - invalid \"data\" section"));
330 }
331 } else {
332 return if let Some(res) = json["result"].as_i64() {
333 Ok(OpResult {
334 result: ResultCode::from_i64(res),
335 op_id: 0,
336 })
337 } else {
338 error!("api:update - not found \"data\"");
339 return Err(ApiError::new(ResultCode::BadRequest, "api:update - not found \"data\""));
340 };
341 }
342
343 Err(ApiError::new(ResultCode::BadRequest, "api:update - unknown"))
344 }
345}