1use std::{
2 collections::HashMap,
3 error::Error as StdError,
4 io::{Error as IoError, ErrorKind},
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use axum::{
11 Extension,
12 body::{Body, Bytes},
13 extract::State,
14 http::{StatusCode, header},
15 response::IntoResponse,
16};
17use chrono::{DateTime, TimeZone, Utc};
18use log::{debug, error, info, warn};
19use serde::{Deserialize, Serialize};
20use serde_json::{self, Map, Value};
21use tokio::time;
22use url::Url;
23
24use general_mq::{
25 Queue,
26 queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27};
28use sylvia_iot_corelib::{
29 constants::ContentType,
30 err::{self, ErrResp},
31 http::{Json, Path, Query},
32 role::Role,
33 strings::{self, time_str},
34};
35
36use super::{
37 super::{
38 super::{ErrReq, State as AppState, middleware::GetTokenInfoData},
39 lib::{check_application, check_unit, gen_mgr_key},
40 },
41 request, response,
42};
43use crate::{
44 libs::{
45 config::BrokerCtrl as CfgCtrl,
46 mq::{
47 self, Connection, MgrStatus, Options as MgrOptions,
48 application::{ApplicationMgr, DlData, DlDataResp, EventHandler},
49 network::{DlData as NetworkDlData, NetworkMgr},
50 },
51 },
52 models::{
53 Cache, Model,
54 application::{
55 Application, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey, UpdateQueryCond,
56 Updates,
57 },
58 device, device_route, dldata_buffer, network_route,
59 },
60};
61
62struct MgrHandler {
63 model: Arc<dyn Model>,
64 cache: Option<Arc<dyn Cache>>,
65 network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
66 data_sender: Option<Queue>,
67}
68
69#[derive(Deserialize)]
70#[serde(tag = "operation")]
71enum RecvCtrlMsg {
72 #[serde(rename = "del-application")]
73 DelApplication { new: CtrlDelApplication },
74 #[serde(rename = "add-manager")]
75 AddManager { new: CtrlAddManager },
76 #[serde(rename = "del-manager")]
77 DelManager { new: String },
78}
79
80#[derive(Serialize)]
82#[serde(untagged)]
83enum SendCtrlMsg {
84 DelApplication {
85 operation: String,
86 new: CtrlDelApplication,
87 },
88 AddManager {
89 operation: String,
90 new: CtrlAddManager,
91 },
92 DelManager {
93 operation: String,
94 new: String,
95 },
96}
97
98#[derive(Serialize)]
100struct SendDataMsg {
101 kind: String,
102 data: SendDataKind,
103}
104
105#[derive(Serialize)]
106#[serde(untagged)]
107enum SendDataKind {
108 AppDlData {
109 #[serde(rename = "dataId")]
110 data_id: String,
111 proc: String,
112 status: i32,
113 #[serde(rename = "unitId")]
114 unit_id: String,
115 #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
116 device_id: Option<String>,
117 #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
118 network_code: Option<String>,
119 #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
120 network_addr: Option<String>,
121 profile: String,
122 data: String,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 extension: Option<Map<String, Value>>,
125 },
126 NetDlData {
127 #[serde(rename = "dataId")]
128 data_id: String,
129 proc: String,
130 #[serde(rename = "pub")]
131 publish: String,
132 status: i32,
133 #[serde(rename = "unitId")]
134 unit_id: String,
135 #[serde(rename = "deviceId")]
136 device_id: String,
137 #[serde(rename = "networkCode")]
138 network_code: String,
139 #[serde(rename = "networkAddr")]
140 network_addr: String,
141 profile: String,
142 data: String,
143 #[serde(skip_serializing_if = "Option::is_none")]
144 extension: Option<Map<String, Value>>,
145 },
146}
147
148struct CtrlMsgOp;
149struct DataMsgKind;
150
151#[derive(Deserialize, Serialize)]
152struct CtrlDelApplication {
153 #[serde(rename = "unitId")]
154 unit_id: String,
155 #[serde(rename = "unitCode")]
156 unit_code: String,
157 #[serde(rename = "applicationId")]
158 application_id: String,
159 #[serde(rename = "applicationCode")]
160 application_code: String,
161}
162
163#[derive(Deserialize, Serialize)]
164struct CtrlAddManager {
165 #[serde(rename = "hostUri")]
166 host_uri: String,
167 #[serde(rename = "mgrOptions")]
168 mgr_options: MgrOptions,
169}
170
171struct CtrlSenderHandler;
172
173struct CtrlReceiverHandler {
174 model: Arc<dyn Model>,
175 cache: Option<Arc<dyn Cache>>,
176 mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
177 application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
178 network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
179 data_sender: Option<Queue>,
180}
181
182impl CtrlMsgOp {
183 const DEL_APPLICATION: &'static str = "del-application";
184 const ADD_MANAGER: &'static str = "add-manager";
185 const DEL_MANAGER: &'static str = "del-manager";
186}
187
188impl DataMsgKind {
189 const APP_DLDATA: &'static str = "application-dldata";
190 const NET_DLDATA: &'static str = "network-dldata";
191}
192
193const LIST_LIMIT_DEFAULT: u64 = 100;
194const LIST_CURSOR_MAX: u64 = 100;
195const ID_RAND_LEN: usize = 8;
196const DATA_ID_RAND_LEN: usize = 12;
197const DATA_EXPIRES_IN: i64 = 86400; const CTRL_QUEUE_NAME: &'static str = "application";
199const DEF_DLDATA_STATUS: i32 = -2;
200
201pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
203 const FN_NAME: &'static str = "init";
204
205 let q = new_ctrl_receiver(state, ctrl_conf)?;
206 {
207 state
208 .ctrl_receivers
209 .lock()
210 .unwrap()
211 .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
212 }
213
214 let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
215 for _ in 0..500 {
217 if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
218 break;
219 }
220 time::sleep(Duration::from_millis(10)).await;
221 }
222 if ctrl_sender.status() != Status::Connected {
223 error!(
224 "[{}] {} control sender not connected",
225 FN_NAME, CTRL_QUEUE_NAME
226 );
227 return Err(Box::new(IoError::new(
228 ErrorKind::NotConnected,
229 format!("control sender {} not connected", CTRL_QUEUE_NAME),
230 )));
231 }
232 if q.status() != Status::Connected {
233 error!(
234 "[{}] {} control receiver not connected",
235 FN_NAME, CTRL_QUEUE_NAME
236 );
237 return Err(Box::new(IoError::new(
238 ErrorKind::NotConnected,
239 format!("control receiver {} not connected", CTRL_QUEUE_NAME),
240 )));
241 }
242
243 let cond = ListQueryCond {
244 ..Default::default()
245 };
246 let opts = ListOptions {
247 cond: &cond,
248 offset: None,
249 limit: None,
250 sort: None,
251 cursor_max: Some(LIST_CURSOR_MAX),
252 };
253 let mut list;
254 let mut cursor = None;
255 loop {
256 (list, cursor) = state.model.application().list(&opts, cursor).await?;
257 for item in list.iter() {
258 let url = Url::parse(item.host_uri.as_str())?;
259 let key = gen_mgr_key(item.unit_code.as_str(), item.code.as_str());
260 let opts = MgrOptions {
261 unit_id: item.unit_id.clone(),
262 unit_code: item.unit_code.clone(),
263 id: item.application_id.clone(),
264 name: item.code.clone(),
265 prefetch: Some(state.amqp_prefetch),
266 persistent: state.amqp_persistent,
267 shared_prefix: Some(state.mqtt_shared_prefix.clone()),
268 };
269 let handler = MgrHandler {
270 model: state.model.clone(),
271 cache: state.cache.clone(),
272 network_mgrs: state.network_mgrs.clone(),
273 data_sender: state.data_sender.clone(),
274 };
275 let mgr =
276 match ApplicationMgr::new(state.mq_conns.clone(), &url, opts, Arc::new(handler)) {
277 Err(e) => {
278 error!("[{}] new manager for {} error: {}", FN_NAME, key, e);
279 return Err(Box::new(ErrResp::ErrRsc(Some(e))));
280 }
281 Ok(mgr) => mgr,
282 };
283 {
284 state
285 .application_mgrs
286 .lock()
287 .unwrap()
288 .insert(key.clone(), mgr);
289 }
290 }
291 if cursor.is_none() {
292 break;
293 }
294 }
295
296 Ok(())
297}
298
299pub fn new_ctrl_sender(
301 conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
302 config: &CfgCtrl,
303) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
304 let url = match config.url.as_ref() {
305 None => {
306 return Err(Box::new(IoError::new(
307 ErrorKind::InvalidInput,
308 "empty control url",
309 )));
310 }
311 Some(url) => match Url::parse(url.as_str()) {
312 Err(e) => return Err(Box::new(e)),
313 Ok(url) => url,
314 },
315 };
316
317 match mq::control::new(
318 conn_pool.clone(),
319 &url,
320 config.prefetch,
321 CTRL_QUEUE_NAME,
322 false,
323 Arc::new(CtrlSenderHandler {}),
324 Arc::new(CtrlSenderHandler {}),
325 ) {
326 Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
327 Ok(q) => Ok(Arc::new(Mutex::new(q))),
328 }
329}
330
331pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
333 let url = match config.url.as_ref() {
334 None => {
335 return Err(Box::new(IoError::new(
336 ErrorKind::InvalidInput,
337 "empty control url",
338 )));
339 }
340 Some(url) => match Url::parse(url.as_str()) {
341 Err(e) => return Err(Box::new(e)),
342 Ok(url) => url,
343 },
344 };
345 let handler = Arc::new(CtrlReceiverHandler {
346 model: state.model.clone(),
347 cache: state.cache.clone(),
348 mq_conns: state.mq_conns.clone(),
349 application_mgrs: state.application_mgrs.clone(),
350 network_mgrs: state.network_mgrs.clone(),
351 data_sender: state.data_sender.clone(),
352 });
353 match mq::control::new(
354 state.mq_conns.clone(),
355 &url,
356 config.prefetch,
357 CTRL_QUEUE_NAME,
358 true,
359 handler.clone(),
360 handler,
361 ) {
362 Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
363 Ok(q) => Ok(q),
364 }
365}
366
367pub async fn post_application(
369 State(state): State<AppState>,
370 Extension(token_info): Extension<GetTokenInfoData>,
371 Json(body): Json<request::PostApplicationBody>,
372) -> impl IntoResponse {
373 const FN_NAME: &'static str = "post_application";
374
375 let user_id = token_info.user_id.as_str();
376 let roles = &token_info.roles;
377
378 let code = body.data.code.to_lowercase();
379 let host_uri = body.data.host_uri.as_str();
380 if !strings::is_code(code.as_str()) {
381 return Err(ErrResp::ErrParam(Some(
382 "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
383 )));
384 }
385 let host_uri = match Url::parse(host_uri) {
386 Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
387 Ok(uri) => match mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
388 false => {
389 return Err(ErrResp::ErrParam(Some(
390 "unsupport `hostUri` scheme".to_string(),
391 )));
392 }
393 true => uri,
394 },
395 };
396 if let Some(info) = body.data.info.as_ref() {
397 for (k, _) in info.iter() {
398 if k.len() == 0 {
399 return Err(ErrResp::ErrParam(Some(
400 "`info` key must not be empty".to_string(),
401 )));
402 }
403 }
404 }
405 let unit_id = body.data.unit_id.as_str();
406 if unit_id.len() == 0 {
407 return Err(ErrResp::ErrParam(Some(
408 "`unitId` must with at least one character".to_string(),
409 )));
410 }
411 let unit_code = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
412 None => {
413 return Err(ErrResp::Custom(
414 ErrReq::UNIT_NOT_EXIST.0,
415 ErrReq::UNIT_NOT_EXIST.1,
416 None,
417 ));
418 }
419 Some(unit) => unit.code,
420 };
421 if check_code(FN_NAME, unit_id, code.as_str(), &state).await? {
422 return Err(ErrResp::Custom(
423 ErrReq::APPLICATION_EXIST.0,
424 ErrReq::APPLICATION_EXIST.1,
425 None,
426 ));
427 }
428
429 let now = Utc::now();
430 let application = Application {
431 application_id: strings::random_id(&now, ID_RAND_LEN),
432 code,
433 unit_id: unit_id.to_string(),
434 unit_code: unit_code.clone(),
435 created_at: now,
436 modified_at: now,
437 host_uri: host_uri.to_string(),
438 name: match body.data.name.as_ref() {
439 None => "".to_string(),
440 Some(name) => name.clone(),
441 },
442 info: match body.data.info.as_ref() {
443 None => Map::new(),
444 Some(info) => info.clone(),
445 },
446 };
447 if let Err(e) = state.model.application().add(&application).await {
448 error!("[{}] add error: {}", FN_NAME, e);
449 return Err(ErrResp::ErrDb(Some(e.to_string())));
450 }
451 add_manager(
452 FN_NAME,
453 &state,
454 &host_uri,
455 unit_id,
456 unit_code.as_str(),
457 application.application_id.as_str(),
458 application.code.as_str(),
459 )
460 .await?;
461 Ok(Json(response::PostApplication {
462 data: response::PostApplicationData {
463 application_id: application.application_id,
464 },
465 }))
466}
467
468pub async fn get_application_count(
470 State(state): State<AppState>,
471 Extension(token_info): Extension<GetTokenInfoData>,
472 Query(query): Query<request::GetApplicationCountQuery>,
473) -> impl IntoResponse {
474 const FN_NAME: &'static str = "get_application_count";
475
476 let user_id = token_info.user_id.as_str();
477 let roles = &token_info.roles;
478
479 if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
480 match query.unit.as_ref() {
481 None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
482 Some(unit_id) => {
483 if unit_id.len() == 0 {
484 return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
485 }
486 }
487 }
488 }
489 let unit_cond = match query.unit.as_ref() {
490 None => None,
491 Some(unit_id) => match unit_id.len() {
492 0 => None,
493 _ => {
494 match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
495 None => {
496 return Err(ErrResp::Custom(
497 ErrReq::UNIT_NOT_EXIST.0,
498 ErrReq::UNIT_NOT_EXIST.1,
499 None,
500 ));
501 }
502 Some(_) => Some(unit_id.as_str()),
503 }
504 }
505 },
506 };
507 let mut code_cond = None;
508 let mut code_contains_cond = None;
509 if let Some(code) = query.code.as_ref() {
510 if code.len() > 0 {
511 code_cond = Some(code.as_str());
512 }
513 }
514 if code_cond.is_none() {
515 if let Some(contains) = query.contains.as_ref() {
516 if contains.len() > 0 {
517 code_contains_cond = Some(contains.as_str());
518 }
519 }
520 }
521 let cond = ListQueryCond {
522 unit_id: unit_cond,
523 code: code_cond,
524 code_contains: code_contains_cond,
525 ..Default::default()
526 };
527 match state.model.application().count(&cond).await {
528 Err(e) => {
529 error!("[{}] count error: {}", FN_NAME, e);
530 Err(ErrResp::ErrDb(Some(e.to_string())))
531 }
532 Ok(count) => Ok(Json(response::GetApplicationCount {
533 data: response::GetCountData { count },
534 })),
535 }
536}
537
538pub async fn get_application_list(
540 State(state): State<AppState>,
541 Extension(token_info): Extension<GetTokenInfoData>,
542 Query(query): Query<request::GetApplicationListQuery>,
543) -> impl IntoResponse {
544 const FN_NAME: &'static str = "get_application_list";
545
546 let user_id = token_info.user_id.as_str();
547 let roles = &token_info.roles;
548
549 if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
550 match query.unit.as_ref() {
551 None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
552 Some(unit_id) => {
553 if unit_id.len() == 0 {
554 return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
555 }
556 }
557 }
558 }
559 let unit_cond = match query.unit.as_ref() {
560 None => None,
561 Some(unit_id) => match unit_id.len() {
562 0 => None,
563 _ => {
564 match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
565 None => {
566 return Err(ErrResp::Custom(
567 ErrReq::UNIT_NOT_EXIST.0,
568 ErrReq::UNIT_NOT_EXIST.1,
569 None,
570 ));
571 }
572 Some(_) => Some(unit_id.as_str()),
573 }
574 }
575 },
576 };
577 let mut code_cond = None;
578 let mut code_contains_cond = None;
579 if let Some(code) = query.code.as_ref() {
580 if code.len() > 0 {
581 code_cond = Some(code.as_str());
582 }
583 }
584 if code_cond.is_none() {
585 if let Some(contains) = query.contains.as_ref() {
586 if contains.len() > 0 {
587 code_contains_cond = Some(contains.as_str());
588 }
589 }
590 }
591 let cond = ListQueryCond {
592 unit_id: unit_cond,
593 code: code_cond,
594 code_contains: code_contains_cond,
595 ..Default::default()
596 };
597 let sort_cond = get_sort_cond(&query.sort)?;
598 let opts = ListOptions {
599 cond: &cond,
600 offset: query.offset,
601 limit: match query.limit {
602 None => Some(LIST_LIMIT_DEFAULT),
603 Some(limit) => match limit {
604 0 => None,
605 _ => Some(limit),
606 },
607 },
608 sort: Some(sort_cond.as_slice()),
609 cursor_max: Some(LIST_CURSOR_MAX),
610 };
611
612 let (list, cursor) = match state.model.application().list(&opts, None).await {
613 Err(e) => {
614 error!("[{}] list error: {}", FN_NAME, e);
615 return Err(ErrResp::ErrDb(Some(e.to_string())));
616 }
617 Ok((list, cursor)) => match cursor {
618 None => match query.format {
619 Some(request::ListFormat::Array) => {
620 return Ok(Json(application_list_transform(&list)).into_response());
621 }
622 _ => {
623 return Ok(Json(response::GetApplicationList {
624 data: application_list_transform(&list),
625 })
626 .into_response());
627 }
628 },
629 Some(_) => (list, cursor),
630 },
631 };
632
633 let body = Body::from_stream(async_stream::stream! {
634 let unit_cond = match query.unit.as_ref() {
635 None => None,
636 Some(unit_id) => match unit_id.len() {
637 0 => None,
638 _ => Some(unit_id.as_str()),
639 },
640 };
641 let mut code_contains_cond = None;
642 if let Some(contains) = query.contains.as_ref() {
643 if contains.len() > 0 {
644 code_contains_cond = Some(contains.as_str());
645 }
646 }
647 let cond = ListQueryCond {
648 unit_id: unit_cond,
649 code_contains: code_contains_cond,
650 ..Default::default()
651 };
652 let opts = ListOptions {
653 cond: &cond,
654 offset: query.offset,
655 limit: match query.limit {
656 None => Some(LIST_LIMIT_DEFAULT),
657 Some(limit) => match limit {
658 0 => None,
659 _ => Some(limit),
660 },
661 },
662 sort: Some(sort_cond.as_slice()),
663 cursor_max: Some(LIST_CURSOR_MAX),
664 };
665
666 let mut list = list;
667 let mut cursor = cursor;
668 let mut is_first = true;
669 loop {
670 yield application_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
671 is_first = false;
672 if cursor.is_none() {
673 break;
674 }
675 let (_list, _cursor) = match state.model.application().list(&opts, cursor).await {
676 Err(_) => break,
677 Ok((list, cursor)) => (list, cursor),
678 };
679 list = _list;
680 cursor = _cursor;
681 }
682 });
683 Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
684}
685
686pub async fn get_application(
688 State(state): State<AppState>,
689 Extension(token_info): Extension<GetTokenInfoData>,
690 Path(param): Path<request::ApplicationIdPath>,
691) -> impl IntoResponse {
692 const FN_NAME: &'static str = "get_application";
693
694 let user_id = token_info.user_id.as_str();
695 let roles = &token_info.roles;
696 let application_id = param.application_id.as_str();
697
698 match check_application(FN_NAME, application_id, user_id, false, roles, &state).await? {
699 None => Err(ErrResp::ErrNotFound(None)),
700 Some(application) => Ok(Json(response::GetApplication {
701 data: application_transform(&application),
702 })),
703 }
704}
705
706pub async fn patch_application(
708 State(state): State<AppState>,
709 Extension(token_info): Extension<GetTokenInfoData>,
710 Path(param): Path<request::ApplicationIdPath>,
711 Json(mut body): Json<request::PatchApplicationBody>,
712) -> impl IntoResponse {
713 const FN_NAME: &'static str = "patch_application";
714
715 let user_id = token_info.user_id.as_str();
716 let roles = &token_info.roles;
717 let application_id = param.application_id.as_str();
718
719 let application =
721 match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
722 None => return Err(ErrResp::ErrNotFound(None)),
723 Some(application) => application,
724 };
725
726 let updates = get_updates(&mut body.data).await?;
727 let mut should_add_mgr = false;
728
729 if let Some(host_uri) = updates.host_uri {
731 let uri = Url::parse(host_uri).unwrap();
732 if !uri.as_str().eq(application.host_uri.as_str()) {
733 delete_manager(FN_NAME, &state, &application).await?;
734 should_add_mgr = true;
735 }
736 }
737
738 let cond = UpdateQueryCond { application_id };
740 if let Err(e) = state.model.application().update(&cond, &updates).await {
741 error!("[{}] update error: {}", FN_NAME, e);
742 return Err(ErrResp::ErrDb(Some(e.to_string())));
743 }
744
745 if should_add_mgr {
747 if let Some(host_uri) = updates.host_uri {
748 let uri = Url::parse(host_uri).unwrap();
749 add_manager(
750 FN_NAME,
751 &state,
752 &uri,
753 application.unit_id.as_str(),
754 application.unit_code.as_str(),
755 application.application_id.as_str(),
756 application.code.as_str(),
757 )
758 .await?;
759 }
760 }
761 Ok(StatusCode::NO_CONTENT)
762}
763
764pub async fn delete_application(
766 State(state): State<AppState>,
767 Extension(token_info): Extension<GetTokenInfoData>,
768 Path(param): Path<request::ApplicationIdPath>,
769) -> impl IntoResponse {
770 const FN_NAME: &'static str = "delete_application";
771
772 let user_id = token_info.user_id.as_str();
773 let roles = &token_info.roles;
774 let application_id = param.application_id.as_str();
775
776 let application =
778 match check_application(FN_NAME, application_id, user_id, true, roles, &state).await {
779 Err(e) => return Err(e), Ok(application) => match application {
781 None => return Ok(StatusCode::NO_CONTENT),
782 Some(application) => application,
783 },
784 };
785
786 delete_manager(FN_NAME, &state, &application).await?;
787 del_application_rsc(FN_NAME, application_id, &state).await?;
788 send_del_ctrl_message(FN_NAME, application, &state).await?;
789
790 Ok(StatusCode::NO_CONTENT)
791}
792
793fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
794 match sort_args.as_ref() {
795 None => Ok(vec![SortCond {
796 key: SortKey::Code,
797 asc: true,
798 }]),
799 Some(args) => {
800 let mut args = args.split(",");
801 let mut sort_cond = vec![];
802 while let Some(arg) = args.next() {
803 let mut cond = arg.split(":");
804 let key = match cond.next() {
805 None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
806 Some(field) => match field {
807 "code" => SortKey::Code,
808 "created" => SortKey::CreatedAt,
809 "modified" => SortKey::ModifiedAt,
810 "name" => SortKey::Name,
811 _ => {
812 return Err(ErrResp::ErrParam(Some(format!(
813 "invalid sort key {}",
814 field
815 ))));
816 }
817 },
818 };
819 let asc = match cond.next() {
820 None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
821 Some(asc) => match asc {
822 "asc" => true,
823 "desc" => false,
824 _ => {
825 return Err(ErrResp::ErrParam(Some(format!(
826 "invalid sort asc {}",
827 asc
828 ))));
829 }
830 },
831 };
832 if cond.next().is_some() {
833 return Err(ErrResp::ErrParam(Some(
834 "invalid sort condition".to_string(),
835 )));
836 }
837 sort_cond.push(SortCond { key, asc });
838 }
839 Ok(sort_cond)
840 }
841 }
842}
843
844async fn get_updates<'a>(
845 body: &'a mut request::PatchApplicationData,
846) -> Result<Updates<'a>, ErrResp> {
847 let mut updates = Updates {
848 ..Default::default()
849 };
850 let mut count = 0;
851 if let Some(host_uri) = body.host_uri.as_ref() {
852 match Url::parse(host_uri) {
853 Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
854 Ok(uri) => {
855 if !mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
856 return Err(ErrResp::ErrParam(Some(
857 "unsupport `hostUri` scheme".to_string(),
858 )));
859 }
860 body.host_uri = Some(uri.to_string()); }
862 }
863 }
864 if let Some(host_uri) = body.host_uri.as_ref() {
865 updates.host_uri = Some(host_uri.as_str());
866 count += 1;
867 }
868 if let Some(name) = body.name.as_ref() {
869 updates.name = Some(name.as_str());
870 count += 1;
871 }
872 if let Some(info) = body.info.as_ref() {
873 for (k, _) in info.iter() {
874 if k.len() == 0 {
875 return Err(ErrResp::ErrParam(Some(
876 "`info` key must not be empty".to_string(),
877 )));
878 }
879 }
880 updates.info = Some(info);
881 count += 1;
882 }
883
884 if count == 0 {
885 return Err(ErrResp::ErrParam(Some(
886 "at least one parameter".to_string(),
887 )));
888 }
889 updates.modified_at = Some(Utc::now());
890 Ok(updates)
891}
892
893async fn check_code(
899 fn_name: &str,
900 unit_id: &str,
901 code: &str,
902 state: &AppState,
903) -> Result<bool, ErrResp> {
904 let cond = QueryCond {
905 unit_id: Some(unit_id),
906 code: Some(code),
907 ..Default::default()
908 };
909 match state.model.application().get(&cond).await {
910 Err(e) => {
911 error!("[{}] check code error: {}", fn_name, e);
912 return Err(ErrResp::ErrDb(Some(format!("check code error: {}", e))));
913 }
914 Ok(application) => match application {
915 None => Ok(false),
916 Some(_) => Ok(true),
917 },
918 }
919}
920
921fn application_list_transform(list: &Vec<Application>) -> Vec<response::GetApplicationData> {
922 let mut ret = vec![];
923 for application in list.iter() {
924 ret.push(application_transform(&application));
925 }
926 ret
927}
928
929fn application_list_transform_bytes(
930 list: &Vec<Application>,
931 with_start: bool,
932 with_end: bool,
933 format: Option<&request::ListFormat>,
934) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
935 let mut build_str = match with_start {
936 false => "".to_string(),
937 true => match format {
938 Some(request::ListFormat::Array) => "[".to_string(),
939 _ => "{\"data\":[".to_string(),
940 },
941 };
942 let mut is_first = with_start;
943
944 for item in list {
945 if is_first {
946 is_first = false;
947 } else {
948 build_str.push(',');
949 }
950 let json_str = match serde_json::to_string(&application_transform(item)) {
951 Err(e) => return Err(Box::new(e)),
952 Ok(str) => str,
953 };
954 build_str += json_str.as_str();
955 }
956
957 if with_end {
958 build_str += match format {
959 Some(request::ListFormat::Array) => "]",
960 _ => "]}",
961 }
962 }
963 Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
964}
965
966fn application_transform(application: &Application) -> response::GetApplicationData {
967 response::GetApplicationData {
968 application_id: application.application_id.clone(),
969 code: application.code.clone(),
970 unit_id: application.unit_id.clone(),
971 unit_code: application.unit_code.clone(),
972 created_at: time_str(&application.created_at),
973 modified_at: time_str(&application.modified_at),
974 host_uri: application.host_uri.clone(),
975 name: application.name.clone(),
976 info: application.info.clone(),
977 }
978}
979
980async fn del_application_rsc(
981 fn_name: &str,
982 application_id: &str,
983 state: &AppState,
984) -> Result<(), ErrResp> {
985 let cond = network_route::QueryCond {
986 application_id: Some(application_id),
987 ..Default::default()
988 };
989 if let Err(e) = state.model.network_route().del(&cond).await {
990 error!("[{}] del network_route error: {}", fn_name, e);
991 return Err(ErrResp::ErrDb(Some(e.to_string())));
992 }
993
994 let cond = device_route::QueryCond {
995 application_id: Some(application_id),
996 ..Default::default()
997 };
998 if let Err(e) = state.model.device_route().del(&cond).await {
999 error!("[{}] del device_route error: {}", fn_name, e);
1000 return Err(ErrResp::ErrDb(Some(e.to_string())));
1001 }
1002
1003 let cond = dldata_buffer::QueryCond {
1004 application_id: Some(application_id),
1005 ..Default::default()
1006 };
1007 if let Err(e) = state.model.dldata_buffer().del(&cond).await {
1008 error!("[{}] del dldata_buffer error: {}", fn_name, e);
1009 return Err(ErrResp::ErrDb(Some(e.to_string())));
1010 }
1011
1012 let cond = QueryCond {
1013 application_id: Some(application_id),
1014 ..Default::default()
1015 };
1016 if let Err(e) = state.model.application().del(&cond).await {
1017 error!("[{}] del application error: {}", fn_name, e);
1018 return Err(ErrResp::ErrDb(Some(e.to_string())));
1019 }
1020
1021 Ok(())
1022}
1023
1024async fn send_del_ctrl_message(
1026 fn_name: &str,
1027 application: Application,
1028 state: &AppState,
1029) -> Result<(), ErrResp> {
1030 if state.cache.is_some() {
1031 let msg = SendCtrlMsg::DelApplication {
1032 operation: CtrlMsgOp::DEL_APPLICATION.to_string(),
1033 new: CtrlDelApplication {
1034 unit_id: application.unit_id,
1035 unit_code: application.unit_code,
1036 application_id: application.application_id,
1037 application_code: application.code,
1038 },
1039 };
1040 let payload = match serde_json::to_vec(&msg) {
1041 Err(e) => {
1042 error!(
1043 "[{}] marshal JSON for {} error: {}",
1044 fn_name,
1045 CtrlMsgOp::DEL_APPLICATION,
1046 e
1047 );
1048 return Err(ErrResp::ErrRsc(Some(format!(
1049 "marshal control message error: {}",
1050 e
1051 ))));
1052 }
1053 Ok(payload) => payload,
1054 };
1055 let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
1056 if let Err(e) = ctrl_sender.send_msg(payload).await {
1057 error!(
1058 "[{}] send control message for {} error: {}",
1059 fn_name,
1060 CtrlMsgOp::DEL_APPLICATION,
1061 e
1062 );
1063 return Err(ErrResp::ErrIntMsg(Some(format!(
1064 "send control message error: {}",
1065 e
1066 ))));
1067 }
1068 }
1069
1070 Ok(())
1071}
1072
1073async fn add_manager(
1077 fn_name: &str,
1078 state: &AppState,
1079 host_uri: &Url,
1080 unit_id: &str,
1081 unit_code: &str,
1082 id: &str,
1083 name: &str,
1084) -> Result<(), ErrResp> {
1085 let opts = MgrOptions {
1086 unit_id: unit_id.to_string(),
1087 unit_code: unit_code.to_string(),
1088 id: id.to_string(),
1089 name: name.to_string(),
1090 prefetch: Some(state.amqp_prefetch),
1091 persistent: state.amqp_persistent,
1092 shared_prefix: Some(state.mqtt_shared_prefix.clone()),
1093 };
1094 let msg = SendCtrlMsg::AddManager {
1095 operation: CtrlMsgOp::ADD_MANAGER.to_string(),
1096 new: CtrlAddManager {
1097 host_uri: host_uri.to_string(),
1098 mgr_options: opts,
1099 },
1100 };
1101 let payload = match serde_json::to_vec(&msg) {
1102 Err(e) => {
1103 error!("[{}] marshal JSON for {} error: {}", fn_name, name, e);
1104 return Err(ErrResp::ErrRsc(Some(format!("new manager error:{}", e))));
1105 }
1106 Ok(payload) => payload,
1107 };
1108 let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
1109 if let Err(e) = ctrl_sender.send_msg(payload).await {
1110 error!(
1111 "[{}] send control message for {} error: {}",
1112 fn_name, name, e
1113 );
1114 return Err(ErrResp::ErrIntMsg(Some(format!("new manager error:{}", e))));
1115 }
1116 Ok(())
1117}
1118
1119async fn delete_manager(
1121 fn_name: &str,
1122 state: &AppState,
1123 application: &Application,
1124) -> Result<(), ErrResp> {
1125 let key = gen_mgr_key(application.unit_code.as_str(), application.code.as_str());
1126 let msg = SendCtrlMsg::DelManager {
1127 operation: CtrlMsgOp::DEL_MANAGER.to_string(),
1128 new: key.clone(),
1129 };
1130 let payload = match serde_json::to_vec(&msg) {
1131 Err(e) => {
1132 error!("[{}] marshal JSON for {} error: {}", fn_name, key, e);
1133 return Err(ErrResp::ErrRsc(Some(format!("delete manager error:{}", e))));
1134 }
1135 Ok(payload) => payload,
1136 };
1137 let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
1138 if let Err(e) = ctrl_sender.send_msg(payload).await {
1139 error!(
1140 "[{}] send control message for {} error: {}",
1141 fn_name, key, e
1142 );
1143 return Err(ErrResp::ErrIntMsg(Some(format!(
1144 "delete manager error:{}",
1145 e
1146 ))));
1147 }
1148 Ok(())
1149}
1150
1151impl MgrHandler {
1152 async fn get_device_route(
1156 &self,
1157 mgr: &ApplicationMgr,
1158 data: &Box<DlData>,
1159 ) -> Result<device_route::DeviceRouteCacheDlData, Box<DlDataResp>> {
1160 const FN_NAME: &'static str = "get_device_route";
1161
1162 if let Some(cache) = self.cache.as_ref() {
1163 match data.device_id.as_ref() {
1164 None => {
1165 let cond = device_route::GetCacheQueryCond {
1166 unit_code: mgr.unit_code(),
1167 network_code: data.network_code.as_ref().unwrap().as_str(),
1168 network_addr: data.network_addr.as_ref().unwrap().as_str(),
1169 };
1170 match cache.device_route().get_dldata(&cond).await {
1171 Err(e) => {
1172 error!("[{}] get device with error: {}", FN_NAME, e);
1173 return Err(Box::new(DlDataResp {
1174 correlation_id: data.correlation_id.clone(),
1175 error: Some(err::E_DB.to_string()),
1176 message: Some(format!("{}", e)),
1177 ..Default::default()
1178 }));
1179 }
1180 Ok(route) => match route {
1181 None => {
1182 warn!(
1183 "[{}] no device for {}.{}.{:?}",
1184 FN_NAME,
1185 mgr.unit_code(),
1186 mgr.name(),
1187 data.network_addr.as_ref()
1188 );
1189 return Err(Box::new(DlDataResp {
1190 correlation_id: data.correlation_id.clone(),
1191 error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1192 ..Default::default()
1193 }));
1194 }
1195 Some(route) => return Ok(route),
1196 },
1197 }
1198 }
1199 Some(device_id) => {
1200 let cond = device_route::GetCachePubQueryCond {
1201 unit_id: mgr.unit_id(),
1202 device_id: device_id.as_str(),
1203 };
1204 match cache.device_route().get_dldata_pub(&cond).await {
1205 Err(e) => {
1206 error!("[{}] get device with error: {}", FN_NAME, e);
1207 return Err(Box::new(DlDataResp {
1208 correlation_id: data.correlation_id.clone(),
1209 error: Some(err::E_DB.to_string()),
1210 message: Some(format!("{}", e)),
1211 ..Default::default()
1212 }));
1213 }
1214 Ok(route) => match route {
1215 None => {
1216 warn!(
1217 "[{}] no device for {}.{:?}",
1218 FN_NAME,
1219 mgr.unit_id(),
1220 data.device_id.as_ref(),
1221 );
1222 return Err(Box::new(DlDataResp {
1223 correlation_id: data.correlation_id.clone(),
1224 error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1225 ..Default::default()
1226 }));
1227 }
1228 Some(route) => return Ok(route),
1229 },
1230 }
1231 }
1232 }
1233 }
1234
1235 let cond = match data.device_id.as_ref() {
1237 None => device::QueryCond {
1238 device: Some(device::QueryOneCond {
1239 unit_code: Some(mgr.unit_code()),
1240 network_code: data.network_code.as_ref().unwrap().as_str(),
1241 network_addr: data.network_addr.as_ref().unwrap().as_str(),
1242 }),
1243 ..Default::default()
1244 },
1245 Some(device_id) => device::QueryCond {
1246 unit_id: Some(mgr.unit_id()),
1247 device_id: Some(device_id.as_str()),
1248 ..Default::default()
1249 },
1250 };
1251 let device = match self.model.device().get(&cond).await {
1252 Err(e) => {
1253 error!("[{}] get device with error: {}", FN_NAME, e);
1254 return Err(Box::new(DlDataResp {
1255 correlation_id: data.correlation_id.clone(),
1256 error: Some(err::E_DB.to_string()),
1257 message: Some(format!("{}", e)),
1258 ..Default::default()
1259 }));
1260 }
1261 Ok(device) => match device {
1262 None => {
1263 warn!(
1264 "[{}] no device for {}.{:?} or {}.{}.{:?}",
1265 FN_NAME,
1266 mgr.unit_id(),
1267 data.device_id.as_ref(),
1268 mgr.unit_code(),
1269 mgr.name(),
1270 data.network_addr.as_ref()
1271 );
1272 return Err(Box::new(DlDataResp {
1273 correlation_id: data.correlation_id.clone(),
1274 error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1275 ..Default::default()
1276 }));
1277 }
1278 Some(device) => device,
1279 },
1280 };
1281 let unit_code = match device.unit_code.as_ref() {
1282 None => "",
1283 Some(_) => mgr.unit_code(),
1284 };
1285 Ok(device_route::DeviceRouteCacheDlData {
1286 net_mgr_key: gen_mgr_key(unit_code, device.network_code.as_str()),
1287 network_id: device.network_id,
1288 network_addr: device.network_addr,
1289 device_id: device.device_id,
1290 profile: device.profile,
1291 })
1292 }
1293
1294 async fn send_application_dldata_msg(
1295 &self,
1296 proc: &DateTime<Utc>,
1297 data_id: &str,
1298 unit_id: &str,
1299 profile: &str,
1300 data: &Box<DlData>,
1301 ) -> Result<(), ()> {
1302 const FN_NAME: &'static str = "send_application_dldata_msg";
1303
1304 if let Some(sender) = self.data_sender.as_ref() {
1305 let msg = SendDataMsg {
1306 kind: DataMsgKind::APP_DLDATA.to_string(),
1307 data: SendDataKind::AppDlData {
1308 data_id: data_id.to_string(),
1309 proc: time_str(proc),
1310 status: DEF_DLDATA_STATUS,
1311 unit_id: unit_id.to_string(),
1312 device_id: data.device_id.clone(),
1313 network_code: data.network_code.clone(),
1314 network_addr: data.network_addr.clone(),
1315 profile: profile.to_string(),
1316 data: data.data.clone(),
1317 extension: data.extension.clone(),
1318 },
1319 };
1320 let payload = match serde_json::to_vec(&msg) {
1321 Err(e) => {
1322 error!("[{}] marshal JSON error: {}", FN_NAME, e);
1323 return Err(());
1324 }
1325 Ok(payload) => payload,
1326 };
1327 if let Err(e) = sender.send_msg(payload).await {
1328 error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
1329 return Err(());
1330 }
1331 }
1332 Ok(())
1333 }
1334
1335 async fn send_network_dldata_msg(
1336 &self,
1337 proc: &DateTime<Utc>,
1338 netmgr_code: &str,
1339 dldata_buff: &dldata_buffer::DlDataBuffer,
1340 profile: &str,
1341 net_data: &NetworkDlData,
1342 ) -> Result<(), ()> {
1343 const FN_NAME: &'static str = "send_network_dldata_msg";
1344
1345 if let Some(sender) = self.data_sender.as_ref() {
1346 let msg = SendDataMsg {
1347 kind: DataMsgKind::NET_DLDATA.to_string(),
1348 data: SendDataKind::NetDlData {
1349 data_id: dldata_buff.data_id.clone(),
1350 proc: time_str(proc),
1351 publish: net_data.publish.clone(),
1352 status: DEF_DLDATA_STATUS,
1353 unit_id: dldata_buff.unit_id.clone(),
1354 device_id: dldata_buff.device_id.clone(),
1355 network_code: netmgr_code.to_string(),
1356 network_addr: dldata_buff.network_addr.clone(),
1357 profile: profile.to_string(),
1358 data: net_data.data.clone(),
1359 extension: net_data.extension.clone(),
1360 },
1361 };
1362 let payload = match serde_json::to_vec(&msg) {
1363 Err(e) => {
1364 error!("[{}] marshal JSON error: {}", FN_NAME, e);
1365 return Err(());
1366 }
1367 Ok(payload) => payload,
1368 };
1369 if let Err(e) = sender.send_msg(payload).await {
1370 error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
1371 return Err(());
1372 }
1373 }
1374 Ok(())
1375 }
1376}
1377
1378#[async_trait]
1379impl EventHandler for MgrHandler {
1380 async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus) {
1381 if let Some(cache) = self.cache.as_ref() {
1383 if let Err(e) = cache.device().clear().await {
1384 error!(
1385 "[on_status_change] {}.{} clear device cache error: {}",
1386 mgr.unit_code(),
1387 mgr.name(),
1388 e
1389 );
1390 }
1391 if let Err(e) = cache.device_route().clear().await {
1392 error!(
1393 "[on_status_change] {}.{} clear device_route cache error: {}",
1394 mgr.unit_code(),
1395 mgr.name(),
1396 e
1397 );
1398 }
1399 if let Err(e) = cache.network_route().clear().await {
1400 error!(
1401 "[on_status_change] {}.{} clear network_route cache error: {}",
1402 mgr.unit_code(),
1403 mgr.name(),
1404 e
1405 );
1406 }
1407 }
1408
1409 match status {
1410 MgrStatus::NotReady => {
1411 error!(
1412 "[on_status_change] {}.{} to NotReady",
1413 mgr.unit_code(),
1414 mgr.name()
1415 );
1416 }
1417 MgrStatus::Ready => {
1418 info!(
1419 "[on_status_change] {}.{} to Ready",
1420 mgr.unit_code(),
1421 mgr.name()
1422 );
1423 }
1424 }
1425 }
1426
1427 async fn on_dldata(
1432 &self,
1433 mgr: &ApplicationMgr,
1434 data: Box<DlData>,
1435 ) -> Result<Box<DlDataResp>, ()> {
1436 const FN_NAME: &'static str = "on_dldata";
1437
1438 let dldata_route = match self.get_device_route(mgr, &data).await {
1440 Err(e) => return Ok(e),
1441 Ok(route) => route,
1442 };
1443
1444 let now = Utc::now();
1445 let data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
1446
1447 self.send_application_dldata_msg(
1448 &now,
1449 data_id.as_str(),
1450 mgr.unit_id(),
1451 &dldata_route.profile,
1452 &data,
1453 )
1454 .await?;
1455
1456 let network_mgr = {
1458 match self
1459 .network_mgrs
1460 .lock()
1461 .unwrap()
1462 .get(&dldata_route.net_mgr_key)
1463 {
1464 None => {
1465 return Ok(Box::new(DlDataResp {
1466 correlation_id: data.correlation_id,
1467 error: Some(ErrReq::NETWORK_NOT_EXIST.1.to_string()),
1468 ..Default::default()
1469 }));
1470 }
1471 Some(mgr) => mgr.clone(),
1472 }
1473 };
1474
1475 let ts_nanos = match now.timestamp_nanos_opt() {
1476 None => {
1477 error!("[{}] cannot generate valid nanoseconds", FN_NAME);
1478 return Ok(Box::new(DlDataResp {
1479 correlation_id: data.correlation_id.clone(),
1480 error: Some(err::E_RSC.to_string()),
1481 message: Some(format!("cannot generate valid nanoseconds")),
1482 ..Default::default()
1483 }));
1484 }
1485 Some(ts) => ts,
1486 };
1487 let expired_at = Utc.timestamp_nanos(ts_nanos + DATA_EXPIRES_IN * 1_000_000_000);
1488 let dldata = dldata_buffer::DlDataBuffer {
1489 data_id: data_id.clone(),
1490 unit_id: mgr.unit_id().to_string(),
1491 unit_code: mgr.unit_code().to_string(),
1492 application_id: mgr.id().to_string(),
1493 application_code: mgr.name().to_string(),
1494 network_id: dldata_route.network_id,
1495 network_addr: dldata_route.network_addr.clone(),
1496 device_id: dldata_route.device_id,
1497 created_at: now,
1498 expired_at,
1499 };
1500 match self.model.dldata_buffer().add(&dldata).await {
1501 Err(e) => {
1502 error!("[{}] add data buffer with error: {}", FN_NAME, e);
1503 return Ok(Box::new(DlDataResp {
1504 correlation_id: data.correlation_id,
1505 error: Some(err::E_DB.to_string()),
1506 message: Some(format!("{}", e)),
1507 ..Default::default()
1508 }));
1509 }
1510 Ok(_) => (),
1511 }
1512
1513 let net_data = NetworkDlData {
1514 data_id,
1515 publish: time_str(&now),
1516 expires_in: DATA_EXPIRES_IN,
1517 network_addr: dldata_route.network_addr,
1518 data: data.data,
1519 extension: data.extension,
1520 };
1521 self.send_network_dldata_msg(
1522 &now,
1523 network_mgr.name(),
1524 &dldata,
1525 &dldata_route.profile,
1526 &net_data,
1527 )
1528 .await?;
1529 if let Err(e) = network_mgr.send_dldata(&net_data) {
1530 error!("[{}] send dldata to network with error: {}", FN_NAME, e);
1531 return Ok(Box::new(DlDataResp {
1532 correlation_id: data.correlation_id,
1533 error: Some(err::E_INT_MSG.to_string()),
1534 message: Some(format!("send data with error: {}", e)),
1535 ..Default::default()
1536 }));
1537 }
1538
1539 Ok(Box::new(DlDataResp {
1540 correlation_id: data.correlation_id,
1541 data_id: Some(net_data.data_id),
1542 ..Default::default()
1543 }))
1544 }
1545}
1546
1547#[async_trait]
1548impl QueueEventHandler for CtrlSenderHandler {
1549 async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
1550 const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
1551 let queue_name = queue.name();
1552 error!("[{}] {} error: {}", FN_NAME, queue_name, err);
1553 }
1554
1555 async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
1556 const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
1557 let queue_name = queue.name();
1558 match status {
1559 Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
1560 _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
1561 }
1562 }
1563}
1564
1565#[async_trait]
1566impl MessageHandler for CtrlSenderHandler {
1567 async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
1568}
1569
1570#[async_trait]
1571impl QueueEventHandler for CtrlReceiverHandler {
1572 async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
1573 const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
1574 let queue_name = queue.name();
1575 error!("[{}] {} error: {}", FN_NAME, queue_name, err);
1576 }
1577
1578 async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
1579 const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
1580 let queue_name = queue.name();
1581 match status {
1582 Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
1583 _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
1584 }
1585 }
1586}
1587
1588#[async_trait]
1589impl MessageHandler for CtrlReceiverHandler {
1590 async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
1591 const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
1592 let queue_name = queue.name();
1593
1594 let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
1595 Err(e) => {
1596 let src_str: String = String::from_utf8_lossy(msg.payload()).into();
1597 warn!(
1598 "[{}] {} parse JSON error: {}, src: {}",
1599 FN_NAME, queue_name, e, src_str
1600 );
1601 if let Err(e) = msg.ack().await {
1602 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1603 }
1604 return;
1605 }
1606 Ok(msg) => msg,
1607 };
1608 match ctrl_msg {
1609 RecvCtrlMsg::DelApplication { new: _new } => {}
1610 RecvCtrlMsg::AddManager { new } => {
1611 let host_uri = match Url::parse(new.host_uri.as_str()) {
1612 Err(e) => {
1613 warn!("[{}] {} hostUri error: {}", FN_NAME, queue_name, e);
1614 if let Err(e) = msg.ack().await {
1615 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1616 }
1617 return;
1618 }
1619 Ok(uri) => uri,
1620 };
1621 let handler = MgrHandler {
1622 model: self.model.clone(),
1623 cache: self.cache.clone(),
1624 network_mgrs: self.network_mgrs.clone(),
1625 data_sender: self.data_sender.clone(),
1626 };
1627 let unit_code = new.mgr_options.unit_code.clone();
1628 let name = new.mgr_options.name.clone();
1629 let mgr = match ApplicationMgr::new(
1630 self.mq_conns.clone(),
1631 &host_uri,
1632 new.mgr_options,
1633 Arc::new(handler),
1634 ) {
1635 Err(e) => {
1636 error!("[{}] {} new manager error: {}", FN_NAME, queue_name, e);
1637 if let Err(e) = msg.ack().await {
1638 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1639 }
1640 return;
1641 }
1642 Ok(mgr) => {
1643 debug!("[{}] {} new manager", FN_NAME, queue_name);
1644 mgr
1645 }
1646 };
1647 let key = gen_mgr_key(unit_code.as_str(), name.as_str());
1648 let old_mgr = {
1649 self.application_mgrs
1650 .lock()
1651 .unwrap()
1652 .insert(key.clone(), mgr)
1653 };
1654 if let Some(mgr) = old_mgr {
1655 if let Err(e) = mgr.close().await {
1656 error!(
1657 "[{}] {} close old manager {} error: {}",
1658 FN_NAME, queue_name, key, e
1659 );
1660 } else {
1661 debug!("[{}] {} close old manager {}", FN_NAME, queue_name, key);
1662 }
1663 }
1664 info!("[{}] {} manager {} added", FN_NAME, queue_name, key);
1665 }
1666 RecvCtrlMsg::DelManager { new } => {
1667 let old_mgr = { self.application_mgrs.lock().unwrap().remove(&new) };
1668 match old_mgr {
1669 None => {
1670 error!("[{}] {} get no manager {}", FN_NAME, queue_name, new);
1671 if let Err(e) = msg.ack().await {
1672 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1673 }
1674 return;
1675 }
1676 Some(mgr) => {
1677 if let Err(e) = mgr.close().await {
1678 error!(
1679 "[{}] {} close old manager {} error: {}",
1680 FN_NAME, queue_name, new, e
1681 );
1682 } else {
1683 debug!("[{}] {} close old manager {}", FN_NAME, queue_name, new);
1684 }
1685 }
1686 }
1687 info!("[{}] {} manager {} deleted", FN_NAME, queue_name, new);
1688 }
1689 }
1690 if let Err(e) = msg.ack().await {
1691 error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1692 }
1693 }
1694}