sylvia_iot_broker/routes/v1/application/
api.rs

1use std::{
2    collections::HashMap,
3    error::Error as StdError,
4    io::{Error as IoError, ErrorKind},
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use axum::{
11    Extension,
12    body::{Body, Bytes},
13    extract::State,
14    http::{StatusCode, header},
15    response::IntoResponse,
16};
17use chrono::{DateTime, TimeZone, Utc};
18use log::{debug, error, info, warn};
19use serde::{Deserialize, Serialize};
20use serde_json::{self, Map, Value};
21use tokio::time;
22use url::Url;
23
24use general_mq::{
25    Queue,
26    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27};
28use sylvia_iot_corelib::{
29    constants::ContentType,
30    err::{self, ErrResp},
31    http::{Json, Path, Query},
32    role::Role,
33    strings::{self, time_str},
34};
35
36use super::{
37    super::{
38        super::{ErrReq, State as AppState, middleware::GetTokenInfoData},
39        lib::{check_application, check_unit, gen_mgr_key},
40    },
41    request, response,
42};
43use crate::{
44    libs::{
45        config::BrokerCtrl as CfgCtrl,
46        mq::{
47            self, Connection, MgrStatus, Options as MgrOptions,
48            application::{ApplicationMgr, DlData, DlDataResp, EventHandler},
49            network::{DlData as NetworkDlData, NetworkMgr},
50        },
51    },
52    models::{
53        Cache, Model,
54        application::{
55            Application, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey, UpdateQueryCond,
56            Updates,
57        },
58        device, device_route, dldata_buffer, network_route,
59    },
60};
61
62struct MgrHandler {
63    model: Arc<dyn Model>,
64    cache: Option<Arc<dyn Cache>>,
65    network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
66    data_sender: Option<Queue>,
67}
68
69#[derive(Deserialize)]
70#[serde(tag = "operation")]
71enum RecvCtrlMsg {
72    #[serde(rename = "del-application")]
73    DelApplication { new: CtrlDelApplication },
74    #[serde(rename = "add-manager")]
75    AddManager { new: CtrlAddManager },
76    #[serde(rename = "del-manager")]
77    DelManager { new: String },
78}
79
80/// Control channel.
81#[derive(Serialize)]
82#[serde(untagged)]
83enum SendCtrlMsg {
84    DelApplication {
85        operation: String,
86        new: CtrlDelApplication,
87    },
88    AddManager {
89        operation: String,
90        new: CtrlAddManager,
91    },
92    DelManager {
93        operation: String,
94        new: String,
95    },
96}
97
98/// Data channel.
99#[derive(Serialize)]
100struct SendDataMsg {
101    kind: String,
102    data: SendDataKind,
103}
104
105#[derive(Serialize)]
106#[serde(untagged)]
107enum SendDataKind {
108    AppDlData {
109        #[serde(rename = "dataId")]
110        data_id: String,
111        proc: String,
112        status: i32,
113        #[serde(rename = "unitId")]
114        unit_id: String,
115        #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
116        device_id: Option<String>,
117        #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
118        network_code: Option<String>,
119        #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
120        network_addr: Option<String>,
121        profile: String,
122        data: String,
123        #[serde(skip_serializing_if = "Option::is_none")]
124        extension: Option<Map<String, Value>>,
125    },
126    NetDlData {
127        #[serde(rename = "dataId")]
128        data_id: String,
129        proc: String,
130        #[serde(rename = "pub")]
131        publish: String,
132        status: i32,
133        #[serde(rename = "unitId")]
134        unit_id: String,
135        #[serde(rename = "deviceId")]
136        device_id: String,
137        #[serde(rename = "networkCode")]
138        network_code: String,
139        #[serde(rename = "networkAddr")]
140        network_addr: String,
141        profile: String,
142        data: String,
143        #[serde(skip_serializing_if = "Option::is_none")]
144        extension: Option<Map<String, Value>>,
145    },
146}
147
148struct CtrlMsgOp;
149struct DataMsgKind;
150
151#[derive(Deserialize, Serialize)]
152struct CtrlDelApplication {
153    #[serde(rename = "unitId")]
154    unit_id: String,
155    #[serde(rename = "unitCode")]
156    unit_code: String,
157    #[serde(rename = "applicationId")]
158    application_id: String,
159    #[serde(rename = "applicationCode")]
160    application_code: String,
161}
162
163#[derive(Deserialize, Serialize)]
164struct CtrlAddManager {
165    #[serde(rename = "hostUri")]
166    host_uri: String,
167    #[serde(rename = "mgrOptions")]
168    mgr_options: MgrOptions,
169}
170
171struct CtrlSenderHandler;
172
173struct CtrlReceiverHandler {
174    model: Arc<dyn Model>,
175    cache: Option<Arc<dyn Cache>>,
176    mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
177    application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
178    network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
179    data_sender: Option<Queue>,
180}
181
182impl CtrlMsgOp {
183    const DEL_APPLICATION: &'static str = "del-application";
184    const ADD_MANAGER: &'static str = "add-manager";
185    const DEL_MANAGER: &'static str = "del-manager";
186}
187
188impl DataMsgKind {
189    const APP_DLDATA: &'static str = "application-dldata";
190    const NET_DLDATA: &'static str = "network-dldata";
191}
192
193const LIST_LIMIT_DEFAULT: u64 = 100;
194const LIST_CURSOR_MAX: u64 = 100;
195const ID_RAND_LEN: usize = 8;
196const DATA_ID_RAND_LEN: usize = 12;
197const DATA_EXPIRES_IN: i64 = 86400; // in seconds
198const CTRL_QUEUE_NAME: &'static str = "application";
199const DEF_DLDATA_STATUS: i32 = -2;
200
201/// Initialize application managers and channels.
202pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
203    const FN_NAME: &'static str = "init";
204
205    let q = new_ctrl_receiver(state, ctrl_conf)?;
206    {
207        state
208            .ctrl_receivers
209            .lock()
210            .unwrap()
211            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
212    }
213
214    let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
215    // Wait for connected.
216    for _ in 0..500 {
217        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
218            break;
219        }
220        time::sleep(Duration::from_millis(10)).await;
221    }
222    if ctrl_sender.status() != Status::Connected {
223        error!(
224            "[{}] {} control sender not connected",
225            FN_NAME, CTRL_QUEUE_NAME
226        );
227        return Err(Box::new(IoError::new(
228            ErrorKind::NotConnected,
229            format!("control sender {} not connected", CTRL_QUEUE_NAME),
230        )));
231    }
232    if q.status() != Status::Connected {
233        error!(
234            "[{}] {} control receiver not connected",
235            FN_NAME, CTRL_QUEUE_NAME
236        );
237        return Err(Box::new(IoError::new(
238            ErrorKind::NotConnected,
239            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
240        )));
241    }
242
243    let cond = ListQueryCond {
244        ..Default::default()
245    };
246    let opts = ListOptions {
247        cond: &cond,
248        offset: None,
249        limit: None,
250        sort: None,
251        cursor_max: Some(LIST_CURSOR_MAX),
252    };
253    let mut list;
254    let mut cursor = None;
255    loop {
256        (list, cursor) = state.model.application().list(&opts, cursor).await?;
257        for item in list.iter() {
258            let url = Url::parse(item.host_uri.as_str())?;
259            let key = gen_mgr_key(item.unit_code.as_str(), item.code.as_str());
260            let opts = MgrOptions {
261                unit_id: item.unit_id.clone(),
262                unit_code: item.unit_code.clone(),
263                id: item.application_id.clone(),
264                name: item.code.clone(),
265                prefetch: Some(state.amqp_prefetch),
266                persistent: state.amqp_persistent,
267                shared_prefix: Some(state.mqtt_shared_prefix.clone()),
268            };
269            let handler = MgrHandler {
270                model: state.model.clone(),
271                cache: state.cache.clone(),
272                network_mgrs: state.network_mgrs.clone(),
273                data_sender: state.data_sender.clone(),
274            };
275            let mgr =
276                match ApplicationMgr::new(state.mq_conns.clone(), &url, opts, Arc::new(handler)) {
277                    Err(e) => {
278                        error!("[{}] new manager for {} error: {}", FN_NAME, key, e);
279                        return Err(Box::new(ErrResp::ErrRsc(Some(e))));
280                    }
281                    Ok(mgr) => mgr,
282                };
283            {
284                state
285                    .application_mgrs
286                    .lock()
287                    .unwrap()
288                    .insert(key.clone(), mgr);
289            }
290        }
291        if cursor.is_none() {
292            break;
293        }
294    }
295
296    Ok(())
297}
298
299/// Create control channel sender queue.
300pub fn new_ctrl_sender(
301    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
302    config: &CfgCtrl,
303) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
304    let url = match config.url.as_ref() {
305        None => {
306            return Err(Box::new(IoError::new(
307                ErrorKind::InvalidInput,
308                "empty control url",
309            )));
310        }
311        Some(url) => match Url::parse(url.as_str()) {
312            Err(e) => return Err(Box::new(e)),
313            Ok(url) => url,
314        },
315    };
316
317    match mq::control::new(
318        conn_pool.clone(),
319        &url,
320        config.prefetch,
321        CTRL_QUEUE_NAME,
322        false,
323        Arc::new(CtrlSenderHandler {}),
324        Arc::new(CtrlSenderHandler {}),
325    ) {
326        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
327        Ok(q) => Ok(Arc::new(Mutex::new(q))),
328    }
329}
330
331/// Create control channel receiver queue.
332pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
333    let url = match config.url.as_ref() {
334        None => {
335            return Err(Box::new(IoError::new(
336                ErrorKind::InvalidInput,
337                "empty control url",
338            )));
339        }
340        Some(url) => match Url::parse(url.as_str()) {
341            Err(e) => return Err(Box::new(e)),
342            Ok(url) => url,
343        },
344    };
345    let handler = Arc::new(CtrlReceiverHandler {
346        model: state.model.clone(),
347        cache: state.cache.clone(),
348        mq_conns: state.mq_conns.clone(),
349        application_mgrs: state.application_mgrs.clone(),
350        network_mgrs: state.network_mgrs.clone(),
351        data_sender: state.data_sender.clone(),
352    });
353    match mq::control::new(
354        state.mq_conns.clone(),
355        &url,
356        config.prefetch,
357        CTRL_QUEUE_NAME,
358        true,
359        handler.clone(),
360        handler,
361    ) {
362        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
363        Ok(q) => Ok(q),
364    }
365}
366
367/// `POST /{base}/api/v1/application`
368pub async fn post_application(
369    State(state): State<AppState>,
370    Extension(token_info): Extension<GetTokenInfoData>,
371    Json(body): Json<request::PostApplicationBody>,
372) -> impl IntoResponse {
373    const FN_NAME: &'static str = "post_application";
374
375    let user_id = token_info.user_id.as_str();
376    let roles = &token_info.roles;
377
378    let code = body.data.code.to_lowercase();
379    let host_uri = body.data.host_uri.as_str();
380    if !strings::is_code(code.as_str()) {
381        return Err(ErrResp::ErrParam(Some(
382            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
383        )));
384    }
385    let host_uri = match Url::parse(host_uri) {
386        Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
387        Ok(uri) => match mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
388            false => {
389                return Err(ErrResp::ErrParam(Some(
390                    "unsupport `hostUri` scheme".to_string(),
391                )));
392            }
393            true => uri,
394        },
395    };
396    if let Some(info) = body.data.info.as_ref() {
397        for (k, _) in info.iter() {
398            if k.len() == 0 {
399                return Err(ErrResp::ErrParam(Some(
400                    "`info` key must not be empty".to_string(),
401                )));
402            }
403        }
404    }
405    let unit_id = body.data.unit_id.as_str();
406    if unit_id.len() == 0 {
407        return Err(ErrResp::ErrParam(Some(
408            "`unitId` must with at least one character".to_string(),
409        )));
410    }
411    let unit_code = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
412        None => {
413            return Err(ErrResp::Custom(
414                ErrReq::UNIT_NOT_EXIST.0,
415                ErrReq::UNIT_NOT_EXIST.1,
416                None,
417            ));
418        }
419        Some(unit) => unit.code,
420    };
421    if check_code(FN_NAME, unit_id, code.as_str(), &state).await? {
422        return Err(ErrResp::Custom(
423            ErrReq::APPLICATION_EXIST.0,
424            ErrReq::APPLICATION_EXIST.1,
425            None,
426        ));
427    }
428
429    let now = Utc::now();
430    let application = Application {
431        application_id: strings::random_id(&now, ID_RAND_LEN),
432        code,
433        unit_id: unit_id.to_string(),
434        unit_code: unit_code.clone(),
435        created_at: now,
436        modified_at: now,
437        host_uri: host_uri.to_string(),
438        name: match body.data.name.as_ref() {
439            None => "".to_string(),
440            Some(name) => name.clone(),
441        },
442        info: match body.data.info.as_ref() {
443            None => Map::new(),
444            Some(info) => info.clone(),
445        },
446    };
447    if let Err(e) = state.model.application().add(&application).await {
448        error!("[{}] add error: {}", FN_NAME, e);
449        return Err(ErrResp::ErrDb(Some(e.to_string())));
450    }
451    add_manager(
452        FN_NAME,
453        &state,
454        &host_uri,
455        unit_id,
456        unit_code.as_str(),
457        application.application_id.as_str(),
458        application.code.as_str(),
459    )
460    .await?;
461    Ok(Json(response::PostApplication {
462        data: response::PostApplicationData {
463            application_id: application.application_id,
464        },
465    }))
466}
467
468/// `GET /{base}/api/v1/application/count`
469pub async fn get_application_count(
470    State(state): State<AppState>,
471    Extension(token_info): Extension<GetTokenInfoData>,
472    Query(query): Query<request::GetApplicationCountQuery>,
473) -> impl IntoResponse {
474    const FN_NAME: &'static str = "get_application_count";
475
476    let user_id = token_info.user_id.as_str();
477    let roles = &token_info.roles;
478
479    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
480        match query.unit.as_ref() {
481            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
482            Some(unit_id) => {
483                if unit_id.len() == 0 {
484                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
485                }
486            }
487        }
488    }
489    let unit_cond = match query.unit.as_ref() {
490        None => None,
491        Some(unit_id) => match unit_id.len() {
492            0 => None,
493            _ => {
494                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
495                    None => {
496                        return Err(ErrResp::Custom(
497                            ErrReq::UNIT_NOT_EXIST.0,
498                            ErrReq::UNIT_NOT_EXIST.1,
499                            None,
500                        ));
501                    }
502                    Some(_) => Some(unit_id.as_str()),
503                }
504            }
505        },
506    };
507    let mut code_cond = None;
508    let mut code_contains_cond = None;
509    if let Some(code) = query.code.as_ref() {
510        if code.len() > 0 {
511            code_cond = Some(code.as_str());
512        }
513    }
514    if code_cond.is_none() {
515        if let Some(contains) = query.contains.as_ref() {
516            if contains.len() > 0 {
517                code_contains_cond = Some(contains.as_str());
518            }
519        }
520    }
521    let cond = ListQueryCond {
522        unit_id: unit_cond,
523        code: code_cond,
524        code_contains: code_contains_cond,
525        ..Default::default()
526    };
527    match state.model.application().count(&cond).await {
528        Err(e) => {
529            error!("[{}] count error: {}", FN_NAME, e);
530            Err(ErrResp::ErrDb(Some(e.to_string())))
531        }
532        Ok(count) => Ok(Json(response::GetApplicationCount {
533            data: response::GetCountData { count },
534        })),
535    }
536}
537
538/// `GET /{base}/api/v1/application/list`
539pub async fn get_application_list(
540    State(state): State<AppState>,
541    Extension(token_info): Extension<GetTokenInfoData>,
542    Query(query): Query<request::GetApplicationListQuery>,
543) -> impl IntoResponse {
544    const FN_NAME: &'static str = "get_application_list";
545
546    let user_id = token_info.user_id.as_str();
547    let roles = &token_info.roles;
548
549    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
550        match query.unit.as_ref() {
551            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
552            Some(unit_id) => {
553                if unit_id.len() == 0 {
554                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
555                }
556            }
557        }
558    }
559    let unit_cond = match query.unit.as_ref() {
560        None => None,
561        Some(unit_id) => match unit_id.len() {
562            0 => None,
563            _ => {
564                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
565                    None => {
566                        return Err(ErrResp::Custom(
567                            ErrReq::UNIT_NOT_EXIST.0,
568                            ErrReq::UNIT_NOT_EXIST.1,
569                            None,
570                        ));
571                    }
572                    Some(_) => Some(unit_id.as_str()),
573                }
574            }
575        },
576    };
577    let mut code_cond = None;
578    let mut code_contains_cond = None;
579    if let Some(code) = query.code.as_ref() {
580        if code.len() > 0 {
581            code_cond = Some(code.as_str());
582        }
583    }
584    if code_cond.is_none() {
585        if let Some(contains) = query.contains.as_ref() {
586            if contains.len() > 0 {
587                code_contains_cond = Some(contains.as_str());
588            }
589        }
590    }
591    let cond = ListQueryCond {
592        unit_id: unit_cond,
593        code: code_cond,
594        code_contains: code_contains_cond,
595        ..Default::default()
596    };
597    let sort_cond = get_sort_cond(&query.sort)?;
598    let opts = ListOptions {
599        cond: &cond,
600        offset: query.offset,
601        limit: match query.limit {
602            None => Some(LIST_LIMIT_DEFAULT),
603            Some(limit) => match limit {
604                0 => None,
605                _ => Some(limit),
606            },
607        },
608        sort: Some(sort_cond.as_slice()),
609        cursor_max: Some(LIST_CURSOR_MAX),
610    };
611
612    let (list, cursor) = match state.model.application().list(&opts, None).await {
613        Err(e) => {
614            error!("[{}] list error: {}", FN_NAME, e);
615            return Err(ErrResp::ErrDb(Some(e.to_string())));
616        }
617        Ok((list, cursor)) => match cursor {
618            None => match query.format {
619                Some(request::ListFormat::Array) => {
620                    return Ok(Json(application_list_transform(&list)).into_response());
621                }
622                _ => {
623                    return Ok(Json(response::GetApplicationList {
624                        data: application_list_transform(&list),
625                    })
626                    .into_response());
627                }
628            },
629            Some(_) => (list, cursor),
630        },
631    };
632
633    let body = Body::from_stream(async_stream::stream! {
634        let unit_cond = match query.unit.as_ref() {
635            None => None,
636            Some(unit_id) => match unit_id.len() {
637                0 => None,
638                _ => Some(unit_id.as_str()),
639            },
640        };
641        let mut code_contains_cond = None;
642        if let Some(contains) = query.contains.as_ref() {
643            if contains.len() > 0 {
644                code_contains_cond = Some(contains.as_str());
645            }
646        }
647        let cond = ListQueryCond {
648            unit_id: unit_cond,
649            code_contains: code_contains_cond,
650            ..Default::default()
651        };
652        let opts = ListOptions {
653            cond: &cond,
654            offset: query.offset,
655            limit: match query.limit {
656                None => Some(LIST_LIMIT_DEFAULT),
657                Some(limit) => match limit {
658                    0 => None,
659                    _ => Some(limit),
660                },
661            },
662            sort: Some(sort_cond.as_slice()),
663            cursor_max: Some(LIST_CURSOR_MAX),
664        };
665
666        let mut list = list;
667        let mut cursor = cursor;
668        let mut is_first = true;
669        loop {
670            yield application_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
671            is_first = false;
672            if cursor.is_none() {
673                break;
674            }
675            let (_list, _cursor) = match state.model.application().list(&opts, cursor).await {
676                Err(_) => break,
677                Ok((list, cursor)) => (list, cursor),
678            };
679            list = _list;
680            cursor = _cursor;
681        }
682    });
683    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
684}
685
686/// `GET /{base}/api/v1/application/{applicationId}`
687pub async fn get_application(
688    State(state): State<AppState>,
689    Extension(token_info): Extension<GetTokenInfoData>,
690    Path(param): Path<request::ApplicationIdPath>,
691) -> impl IntoResponse {
692    const FN_NAME: &'static str = "get_application";
693
694    let user_id = token_info.user_id.as_str();
695    let roles = &token_info.roles;
696    let application_id = param.application_id.as_str();
697
698    match check_application(FN_NAME, application_id, user_id, false, roles, &state).await? {
699        None => Err(ErrResp::ErrNotFound(None)),
700        Some(application) => Ok(Json(response::GetApplication {
701            data: application_transform(&application),
702        })),
703    }
704}
705
706/// `PATCH /{base}/api/v1/application/{applicationId}`
707pub async fn patch_application(
708    State(state): State<AppState>,
709    Extension(token_info): Extension<GetTokenInfoData>,
710    Path(param): Path<request::ApplicationIdPath>,
711    Json(mut body): Json<request::PatchApplicationBody>,
712) -> impl IntoResponse {
713    const FN_NAME: &'static str = "patch_application";
714
715    let user_id = token_info.user_id.as_str();
716    let roles = &token_info.roles;
717    let application_id = param.application_id.as_str();
718
719    // To check if the application is for the user.
720    let application =
721        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
722            None => return Err(ErrResp::ErrNotFound(None)),
723            Some(application) => application,
724        };
725
726    let updates = get_updates(&mut body.data).await?;
727    let mut should_add_mgr = false;
728
729    // Remove old manager.
730    if let Some(host_uri) = updates.host_uri {
731        let uri = Url::parse(host_uri).unwrap();
732        if !uri.as_str().eq(application.host_uri.as_str()) {
733            delete_manager(FN_NAME, &state, &application).await?;
734            should_add_mgr = true;
735        }
736    }
737
738    // Update database.
739    let cond = UpdateQueryCond { application_id };
740    if let Err(e) = state.model.application().update(&cond, &updates).await {
741        error!("[{}] update error: {}", FN_NAME, e);
742        return Err(ErrResp::ErrDb(Some(e.to_string())));
743    }
744
745    // Add new manager.
746    if should_add_mgr {
747        if let Some(host_uri) = updates.host_uri {
748            let uri = Url::parse(host_uri).unwrap();
749            add_manager(
750                FN_NAME,
751                &state,
752                &uri,
753                application.unit_id.as_str(),
754                application.unit_code.as_str(),
755                application.application_id.as_str(),
756                application.code.as_str(),
757            )
758            .await?;
759        }
760    }
761    Ok(StatusCode::NO_CONTENT)
762}
763
764/// `DELETE /{base}/api/v1/application/{applicationId}`
765pub async fn delete_application(
766    State(state): State<AppState>,
767    Extension(token_info): Extension<GetTokenInfoData>,
768    Path(param): Path<request::ApplicationIdPath>,
769) -> impl IntoResponse {
770    const FN_NAME: &'static str = "delete_application";
771
772    let user_id = token_info.user_id.as_str();
773    let roles = &token_info.roles;
774    let application_id = param.application_id.as_str();
775
776    // To check if the application is for the user.
777    let application =
778        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await {
779            Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
780            Ok(application) => match application {
781                None => return Ok(StatusCode::NO_CONTENT),
782                Some(application) => application,
783            },
784        };
785
786    delete_manager(FN_NAME, &state, &application).await?;
787    del_application_rsc(FN_NAME, application_id, &state).await?;
788    send_del_ctrl_message(FN_NAME, application, &state).await?;
789
790    Ok(StatusCode::NO_CONTENT)
791}
792
793fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
794    match sort_args.as_ref() {
795        None => Ok(vec![SortCond {
796            key: SortKey::Code,
797            asc: true,
798        }]),
799        Some(args) => {
800            let mut args = args.split(",");
801            let mut sort_cond = vec![];
802            while let Some(arg) = args.next() {
803                let mut cond = arg.split(":");
804                let key = match cond.next() {
805                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
806                    Some(field) => match field {
807                        "code" => SortKey::Code,
808                        "created" => SortKey::CreatedAt,
809                        "modified" => SortKey::ModifiedAt,
810                        "name" => SortKey::Name,
811                        _ => {
812                            return Err(ErrResp::ErrParam(Some(format!(
813                                "invalid sort key {}",
814                                field
815                            ))));
816                        }
817                    },
818                };
819                let asc = match cond.next() {
820                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
821                    Some(asc) => match asc {
822                        "asc" => true,
823                        "desc" => false,
824                        _ => {
825                            return Err(ErrResp::ErrParam(Some(format!(
826                                "invalid sort asc {}",
827                                asc
828                            ))));
829                        }
830                    },
831                };
832                if cond.next().is_some() {
833                    return Err(ErrResp::ErrParam(Some(
834                        "invalid sort condition".to_string(),
835                    )));
836                }
837                sort_cond.push(SortCond { key, asc });
838            }
839            Ok(sort_cond)
840        }
841    }
842}
843
844async fn get_updates<'a>(
845    body: &'a mut request::PatchApplicationData,
846) -> Result<Updates<'a>, ErrResp> {
847    let mut updates = Updates {
848        ..Default::default()
849    };
850    let mut count = 0;
851    if let Some(host_uri) = body.host_uri.as_ref() {
852        match Url::parse(host_uri) {
853            Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
854            Ok(uri) => {
855                if !mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
856                    return Err(ErrResp::ErrParam(Some(
857                        "unsupport `hostUri` scheme".to_string(),
858                    )));
859                }
860                body.host_uri = Some(uri.to_string()); // change host name case.
861            }
862        }
863    }
864    if let Some(host_uri) = body.host_uri.as_ref() {
865        updates.host_uri = Some(host_uri.as_str());
866        count += 1;
867    }
868    if let Some(name) = body.name.as_ref() {
869        updates.name = Some(name.as_str());
870        count += 1;
871    }
872    if let Some(info) = body.info.as_ref() {
873        for (k, _) in info.iter() {
874            if k.len() == 0 {
875                return Err(ErrResp::ErrParam(Some(
876                    "`info` key must not be empty".to_string(),
877                )));
878            }
879        }
880        updates.info = Some(info);
881        count += 1;
882    }
883
884    if count == 0 {
885        return Err(ErrResp::ErrParam(Some(
886            "at least one parameter".to_string(),
887        )));
888    }
889    updates.modified_at = Some(Utc::now());
890    Ok(updates)
891}
892
893/// To check if the application code is used by the unit.
894///
895/// # Errors
896///
897/// Returns OK if the code is found or not. Otherwise errors will be returned.
898async fn check_code(
899    fn_name: &str,
900    unit_id: &str,
901    code: &str,
902    state: &AppState,
903) -> Result<bool, ErrResp> {
904    let cond = QueryCond {
905        unit_id: Some(unit_id),
906        code: Some(code),
907        ..Default::default()
908    };
909    match state.model.application().get(&cond).await {
910        Err(e) => {
911            error!("[{}] check code error: {}", fn_name, e);
912            return Err(ErrResp::ErrDb(Some(format!("check code error: {}", e))));
913        }
914        Ok(application) => match application {
915            None => Ok(false),
916            Some(_) => Ok(true),
917        },
918    }
919}
920
921fn application_list_transform(list: &Vec<Application>) -> Vec<response::GetApplicationData> {
922    let mut ret = vec![];
923    for application in list.iter() {
924        ret.push(application_transform(&application));
925    }
926    ret
927}
928
929fn application_list_transform_bytes(
930    list: &Vec<Application>,
931    with_start: bool,
932    with_end: bool,
933    format: Option<&request::ListFormat>,
934) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
935    let mut build_str = match with_start {
936        false => "".to_string(),
937        true => match format {
938            Some(request::ListFormat::Array) => "[".to_string(),
939            _ => "{\"data\":[".to_string(),
940        },
941    };
942    let mut is_first = with_start;
943
944    for item in list {
945        if is_first {
946            is_first = false;
947        } else {
948            build_str.push(',');
949        }
950        let json_str = match serde_json::to_string(&application_transform(item)) {
951            Err(e) => return Err(Box::new(e)),
952            Ok(str) => str,
953        };
954        build_str += json_str.as_str();
955    }
956
957    if with_end {
958        build_str += match format {
959            Some(request::ListFormat::Array) => "]",
960            _ => "]}",
961        }
962    }
963    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
964}
965
966fn application_transform(application: &Application) -> response::GetApplicationData {
967    response::GetApplicationData {
968        application_id: application.application_id.clone(),
969        code: application.code.clone(),
970        unit_id: application.unit_id.clone(),
971        unit_code: application.unit_code.clone(),
972        created_at: time_str(&application.created_at),
973        modified_at: time_str(&application.modified_at),
974        host_uri: application.host_uri.clone(),
975        name: application.name.clone(),
976        info: application.info.clone(),
977    }
978}
979
980async fn del_application_rsc(
981    fn_name: &str,
982    application_id: &str,
983    state: &AppState,
984) -> Result<(), ErrResp> {
985    let cond = network_route::QueryCond {
986        application_id: Some(application_id),
987        ..Default::default()
988    };
989    if let Err(e) = state.model.network_route().del(&cond).await {
990        error!("[{}] del network_route error: {}", fn_name, e);
991        return Err(ErrResp::ErrDb(Some(e.to_string())));
992    }
993
994    let cond = device_route::QueryCond {
995        application_id: Some(application_id),
996        ..Default::default()
997    };
998    if let Err(e) = state.model.device_route().del(&cond).await {
999        error!("[{}] del device_route error: {}", fn_name, e);
1000        return Err(ErrResp::ErrDb(Some(e.to_string())));
1001    }
1002
1003    let cond = dldata_buffer::QueryCond {
1004        application_id: Some(application_id),
1005        ..Default::default()
1006    };
1007    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
1008        error!("[{}] del dldata_buffer error: {}", fn_name, e);
1009        return Err(ErrResp::ErrDb(Some(e.to_string())));
1010    }
1011
1012    let cond = QueryCond {
1013        application_id: Some(application_id),
1014        ..Default::default()
1015    };
1016    if let Err(e) = state.model.application().del(&cond).await {
1017        error!("[{}] del application error: {}", fn_name, e);
1018        return Err(ErrResp::ErrDb(Some(e.to_string())));
1019    }
1020
1021    Ok(())
1022}
1023
1024/// Send delete control message.
1025async fn send_del_ctrl_message(
1026    fn_name: &str,
1027    application: Application,
1028    state: &AppState,
1029) -> Result<(), ErrResp> {
1030    if state.cache.is_some() {
1031        let msg = SendCtrlMsg::DelApplication {
1032            operation: CtrlMsgOp::DEL_APPLICATION.to_string(),
1033            new: CtrlDelApplication {
1034                unit_id: application.unit_id,
1035                unit_code: application.unit_code,
1036                application_id: application.application_id,
1037                application_code: application.code,
1038            },
1039        };
1040        let payload = match serde_json::to_vec(&msg) {
1041            Err(e) => {
1042                error!(
1043                    "[{}] marshal JSON for {} error: {}",
1044                    fn_name,
1045                    CtrlMsgOp::DEL_APPLICATION,
1046                    e
1047                );
1048                return Err(ErrResp::ErrRsc(Some(format!(
1049                    "marshal control message error: {}",
1050                    e
1051                ))));
1052            }
1053            Ok(payload) => payload,
1054        };
1055        let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
1056        if let Err(e) = ctrl_sender.send_msg(payload).await {
1057            error!(
1058                "[{}] send control message for {} error: {}",
1059                fn_name,
1060                CtrlMsgOp::DEL_APPLICATION,
1061                e
1062            );
1063            return Err(ErrResp::ErrIntMsg(Some(format!(
1064                "send control message error: {}",
1065                e
1066            ))));
1067        }
1068    }
1069
1070    Ok(())
1071}
1072
1073/// To create a manager by:
1074/// - get a connection from the pool.
1075/// - register manager handlers.
1076async fn add_manager(
1077    fn_name: &str,
1078    state: &AppState,
1079    host_uri: &Url,
1080    unit_id: &str,
1081    unit_code: &str,
1082    id: &str,
1083    name: &str,
1084) -> Result<(), ErrResp> {
1085    let opts = MgrOptions {
1086        unit_id: unit_id.to_string(),
1087        unit_code: unit_code.to_string(),
1088        id: id.to_string(),
1089        name: name.to_string(),
1090        prefetch: Some(state.amqp_prefetch),
1091        persistent: state.amqp_persistent,
1092        shared_prefix: Some(state.mqtt_shared_prefix.clone()),
1093    };
1094    let msg = SendCtrlMsg::AddManager {
1095        operation: CtrlMsgOp::ADD_MANAGER.to_string(),
1096        new: CtrlAddManager {
1097            host_uri: host_uri.to_string(),
1098            mgr_options: opts,
1099        },
1100    };
1101    let payload = match serde_json::to_vec(&msg) {
1102        Err(e) => {
1103            error!("[{}] marshal JSON for {} error: {}", fn_name, name, e);
1104            return Err(ErrResp::ErrRsc(Some(format!("new manager error:{}", e))));
1105        }
1106        Ok(payload) => payload,
1107    };
1108    let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
1109    if let Err(e) = ctrl_sender.send_msg(payload).await {
1110        error!(
1111            "[{}] send control message for {} error: {}",
1112            fn_name, name, e
1113        );
1114        return Err(ErrResp::ErrIntMsg(Some(format!("new manager error:{}", e))));
1115    }
1116    Ok(())
1117}
1118
1119/// To delete an application manager.
1120async fn delete_manager(
1121    fn_name: &str,
1122    state: &AppState,
1123    application: &Application,
1124) -> Result<(), ErrResp> {
1125    let key = gen_mgr_key(application.unit_code.as_str(), application.code.as_str());
1126    let msg = SendCtrlMsg::DelManager {
1127        operation: CtrlMsgOp::DEL_MANAGER.to_string(),
1128        new: key.clone(),
1129    };
1130    let payload = match serde_json::to_vec(&msg) {
1131        Err(e) => {
1132            error!("[{}] marshal JSON for {} error: {}", fn_name, key, e);
1133            return Err(ErrResp::ErrRsc(Some(format!("delete manager error:{}", e))));
1134        }
1135        Ok(payload) => payload,
1136    };
1137    let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
1138    if let Err(e) = ctrl_sender.send_msg(payload).await {
1139        error!(
1140            "[{}] send control message for {} error: {}",
1141            fn_name, key, e
1142        );
1143        return Err(ErrResp::ErrIntMsg(Some(format!(
1144            "delete manager error:{}",
1145            e
1146        ))));
1147    }
1148    Ok(())
1149}
1150
1151impl MgrHandler {
1152    /// Get device route information from cache or database. This function handles two cases:
1153    /// - with `network_code` and `network_addr` for private network devices.
1154    /// - with `device_id` for both private and public network devices.
1155    async fn get_device_route(
1156        &self,
1157        mgr: &ApplicationMgr,
1158        data: &Box<DlData>,
1159    ) -> Result<device_route::DeviceRouteCacheDlData, Box<DlDataResp>> {
1160        const FN_NAME: &'static str = "get_device_route";
1161
1162        if let Some(cache) = self.cache.as_ref() {
1163            match data.device_id.as_ref() {
1164                None => {
1165                    let cond = device_route::GetCacheQueryCond {
1166                        unit_code: mgr.unit_code(),
1167                        network_code: data.network_code.as_ref().unwrap().as_str(),
1168                        network_addr: data.network_addr.as_ref().unwrap().as_str(),
1169                    };
1170                    match cache.device_route().get_dldata(&cond).await {
1171                        Err(e) => {
1172                            error!("[{}] get device with error: {}", FN_NAME, e);
1173                            return Err(Box::new(DlDataResp {
1174                                correlation_id: data.correlation_id.clone(),
1175                                error: Some(err::E_DB.to_string()),
1176                                message: Some(format!("{}", e)),
1177                                ..Default::default()
1178                            }));
1179                        }
1180                        Ok(route) => match route {
1181                            None => {
1182                                warn!(
1183                                    "[{}] no device for {}.{}.{:?}",
1184                                    FN_NAME,
1185                                    mgr.unit_code(),
1186                                    mgr.name(),
1187                                    data.network_addr.as_ref()
1188                                );
1189                                return Err(Box::new(DlDataResp {
1190                                    correlation_id: data.correlation_id.clone(),
1191                                    error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1192                                    ..Default::default()
1193                                }));
1194                            }
1195                            Some(route) => return Ok(route),
1196                        },
1197                    }
1198                }
1199                Some(device_id) => {
1200                    let cond = device_route::GetCachePubQueryCond {
1201                        unit_id: mgr.unit_id(),
1202                        device_id: device_id.as_str(),
1203                    };
1204                    match cache.device_route().get_dldata_pub(&cond).await {
1205                        Err(e) => {
1206                            error!("[{}] get device with error: {}", FN_NAME, e);
1207                            return Err(Box::new(DlDataResp {
1208                                correlation_id: data.correlation_id.clone(),
1209                                error: Some(err::E_DB.to_string()),
1210                                message: Some(format!("{}", e)),
1211                                ..Default::default()
1212                            }));
1213                        }
1214                        Ok(route) => match route {
1215                            None => {
1216                                warn!(
1217                                    "[{}] no device for {}.{:?}",
1218                                    FN_NAME,
1219                                    mgr.unit_id(),
1220                                    data.device_id.as_ref(),
1221                                );
1222                                return Err(Box::new(DlDataResp {
1223                                    correlation_id: data.correlation_id.clone(),
1224                                    error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1225                                    ..Default::default()
1226                                }));
1227                            }
1228                            Some(route) => return Ok(route),
1229                        },
1230                    }
1231                }
1232            }
1233        }
1234
1235        // Get information from database.
1236        let cond = match data.device_id.as_ref() {
1237            None => device::QueryCond {
1238                device: Some(device::QueryOneCond {
1239                    unit_code: Some(mgr.unit_code()),
1240                    network_code: data.network_code.as_ref().unwrap().as_str(),
1241                    network_addr: data.network_addr.as_ref().unwrap().as_str(),
1242                }),
1243                ..Default::default()
1244            },
1245            Some(device_id) => device::QueryCond {
1246                unit_id: Some(mgr.unit_id()),
1247                device_id: Some(device_id.as_str()),
1248                ..Default::default()
1249            },
1250        };
1251        let device = match self.model.device().get(&cond).await {
1252            Err(e) => {
1253                error!("[{}] get device with error: {}", FN_NAME, e);
1254                return Err(Box::new(DlDataResp {
1255                    correlation_id: data.correlation_id.clone(),
1256                    error: Some(err::E_DB.to_string()),
1257                    message: Some(format!("{}", e)),
1258                    ..Default::default()
1259                }));
1260            }
1261            Ok(device) => match device {
1262                None => {
1263                    warn!(
1264                        "[{}] no device for {}.{:?} or {}.{}.{:?}",
1265                        FN_NAME,
1266                        mgr.unit_id(),
1267                        data.device_id.as_ref(),
1268                        mgr.unit_code(),
1269                        mgr.name(),
1270                        data.network_addr.as_ref()
1271                    );
1272                    return Err(Box::new(DlDataResp {
1273                        correlation_id: data.correlation_id.clone(),
1274                        error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1275                        ..Default::default()
1276                    }));
1277                }
1278                Some(device) => device,
1279            },
1280        };
1281        let unit_code = match device.unit_code.as_ref() {
1282            None => "",
1283            Some(_) => mgr.unit_code(),
1284        };
1285        Ok(device_route::DeviceRouteCacheDlData {
1286            net_mgr_key: gen_mgr_key(unit_code, device.network_code.as_str()),
1287            network_id: device.network_id,
1288            network_addr: device.network_addr,
1289            device_id: device.device_id,
1290            profile: device.profile,
1291        })
1292    }
1293
1294    async fn send_application_dldata_msg(
1295        &self,
1296        proc: &DateTime<Utc>,
1297        data_id: &str,
1298        unit_id: &str,
1299        profile: &str,
1300        data: &Box<DlData>,
1301    ) -> Result<(), ()> {
1302        const FN_NAME: &'static str = "send_application_dldata_msg";
1303
1304        if let Some(sender) = self.data_sender.as_ref() {
1305            let msg = SendDataMsg {
1306                kind: DataMsgKind::APP_DLDATA.to_string(),
1307                data: SendDataKind::AppDlData {
1308                    data_id: data_id.to_string(),
1309                    proc: time_str(proc),
1310                    status: DEF_DLDATA_STATUS,
1311                    unit_id: unit_id.to_string(),
1312                    device_id: data.device_id.clone(),
1313                    network_code: data.network_code.clone(),
1314                    network_addr: data.network_addr.clone(),
1315                    profile: profile.to_string(),
1316                    data: data.data.clone(),
1317                    extension: data.extension.clone(),
1318                },
1319            };
1320            let payload = match serde_json::to_vec(&msg) {
1321                Err(e) => {
1322                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
1323                    return Err(());
1324                }
1325                Ok(payload) => payload,
1326            };
1327            if let Err(e) = sender.send_msg(payload).await {
1328                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
1329                return Err(());
1330            }
1331        }
1332        Ok(())
1333    }
1334
1335    async fn send_network_dldata_msg(
1336        &self,
1337        proc: &DateTime<Utc>,
1338        netmgr_code: &str,
1339        dldata_buff: &dldata_buffer::DlDataBuffer,
1340        profile: &str,
1341        net_data: &NetworkDlData,
1342    ) -> Result<(), ()> {
1343        const FN_NAME: &'static str = "send_network_dldata_msg";
1344
1345        if let Some(sender) = self.data_sender.as_ref() {
1346            let msg = SendDataMsg {
1347                kind: DataMsgKind::NET_DLDATA.to_string(),
1348                data: SendDataKind::NetDlData {
1349                    data_id: dldata_buff.data_id.clone(),
1350                    proc: time_str(proc),
1351                    publish: net_data.publish.clone(),
1352                    status: DEF_DLDATA_STATUS,
1353                    unit_id: dldata_buff.unit_id.clone(),
1354                    device_id: dldata_buff.device_id.clone(),
1355                    network_code: netmgr_code.to_string(),
1356                    network_addr: dldata_buff.network_addr.clone(),
1357                    profile: profile.to_string(),
1358                    data: net_data.data.clone(),
1359                    extension: net_data.extension.clone(),
1360                },
1361            };
1362            let payload = match serde_json::to_vec(&msg) {
1363                Err(e) => {
1364                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
1365                    return Err(());
1366                }
1367                Ok(payload) => payload,
1368            };
1369            if let Err(e) = sender.send_msg(payload).await {
1370                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
1371                return Err(());
1372            }
1373        }
1374        Ok(())
1375    }
1376}
1377
1378#[async_trait]
1379impl EventHandler for MgrHandler {
1380    async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus) {
1381        // Clear cache when manager status changed.
1382        if let Some(cache) = self.cache.as_ref() {
1383            if let Err(e) = cache.device().clear().await {
1384                error!(
1385                    "[on_status_change] {}.{} clear device cache error: {}",
1386                    mgr.unit_code(),
1387                    mgr.name(),
1388                    e
1389                );
1390            }
1391            if let Err(e) = cache.device_route().clear().await {
1392                error!(
1393                    "[on_status_change] {}.{} clear device_route cache error: {}",
1394                    mgr.unit_code(),
1395                    mgr.name(),
1396                    e
1397                );
1398            }
1399            if let Err(e) = cache.network_route().clear().await {
1400                error!(
1401                    "[on_status_change] {}.{} clear network_route cache error: {}",
1402                    mgr.unit_code(),
1403                    mgr.name(),
1404                    e
1405                );
1406            }
1407        }
1408
1409        match status {
1410            MgrStatus::NotReady => {
1411                error!(
1412                    "[on_status_change] {}.{} to NotReady",
1413                    mgr.unit_code(),
1414                    mgr.name()
1415                );
1416            }
1417            MgrStatus::Ready => {
1418                info!(
1419                    "[on_status_change] {}.{} to Ready",
1420                    mgr.unit_code(),
1421                    mgr.name()
1422                );
1423            }
1424        }
1425    }
1426
1427    // Do the following jobs:
1428    // - check if the destination device is valid for the unit.
1429    // - generate dldata buffer to trace data processing.
1430    // - send to the network manager.
1431    async fn on_dldata(
1432        &self,
1433        mgr: &ApplicationMgr,
1434        data: Box<DlData>,
1435    ) -> Result<Box<DlDataResp>, ()> {
1436        const FN_NAME: &'static str = "on_dldata";
1437
1438        // Check if the device is valid.
1439        let dldata_route = match self.get_device_route(mgr, &data).await {
1440            Err(e) => return Ok(e),
1441            Ok(route) => route,
1442        };
1443
1444        let now = Utc::now();
1445        let data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
1446
1447        self.send_application_dldata_msg(
1448            &now,
1449            data_id.as_str(),
1450            mgr.unit_id(),
1451            &dldata_route.profile,
1452            &data,
1453        )
1454        .await?;
1455
1456        // Check if the network exists.
1457        let network_mgr = {
1458            match self
1459                .network_mgrs
1460                .lock()
1461                .unwrap()
1462                .get(&dldata_route.net_mgr_key)
1463            {
1464                None => {
1465                    return Ok(Box::new(DlDataResp {
1466                        correlation_id: data.correlation_id,
1467                        error: Some(ErrReq::NETWORK_NOT_EXIST.1.to_string()),
1468                        ..Default::default()
1469                    }));
1470                }
1471                Some(mgr) => mgr.clone(),
1472            }
1473        };
1474
1475        let ts_nanos = match now.timestamp_nanos_opt() {
1476            None => {
1477                error!("[{}] cannot generate valid nanoseconds", FN_NAME);
1478                return Ok(Box::new(DlDataResp {
1479                    correlation_id: data.correlation_id.clone(),
1480                    error: Some(err::E_RSC.to_string()),
1481                    message: Some(format!("cannot generate valid nanoseconds")),
1482                    ..Default::default()
1483                }));
1484            }
1485            Some(ts) => ts,
1486        };
1487        let expired_at = Utc.timestamp_nanos(ts_nanos + DATA_EXPIRES_IN * 1_000_000_000);
1488        let dldata = dldata_buffer::DlDataBuffer {
1489            data_id: data_id.clone(),
1490            unit_id: mgr.unit_id().to_string(),
1491            unit_code: mgr.unit_code().to_string(),
1492            application_id: mgr.id().to_string(),
1493            application_code: mgr.name().to_string(),
1494            network_id: dldata_route.network_id,
1495            network_addr: dldata_route.network_addr.clone(),
1496            device_id: dldata_route.device_id,
1497            created_at: now,
1498            expired_at,
1499        };
1500        match self.model.dldata_buffer().add(&dldata).await {
1501            Err(e) => {
1502                error!("[{}] add data buffer with error: {}", FN_NAME, e);
1503                return Ok(Box::new(DlDataResp {
1504                    correlation_id: data.correlation_id,
1505                    error: Some(err::E_DB.to_string()),
1506                    message: Some(format!("{}", e)),
1507                    ..Default::default()
1508                }));
1509            }
1510            Ok(_) => (),
1511        }
1512
1513        let net_data = NetworkDlData {
1514            data_id,
1515            publish: time_str(&now),
1516            expires_in: DATA_EXPIRES_IN,
1517            network_addr: dldata_route.network_addr,
1518            data: data.data,
1519            extension: data.extension,
1520        };
1521        self.send_network_dldata_msg(
1522            &now,
1523            network_mgr.name(),
1524            &dldata,
1525            &dldata_route.profile,
1526            &net_data,
1527        )
1528        .await?;
1529        if let Err(e) = network_mgr.send_dldata(&net_data) {
1530            error!("[{}] send dldata to network with error: {}", FN_NAME, e);
1531            return Ok(Box::new(DlDataResp {
1532                correlation_id: data.correlation_id,
1533                error: Some(err::E_INT_MSG.to_string()),
1534                message: Some(format!("send data with error: {}", e)),
1535                ..Default::default()
1536            }));
1537        }
1538
1539        Ok(Box::new(DlDataResp {
1540            correlation_id: data.correlation_id,
1541            data_id: Some(net_data.data_id),
1542            ..Default::default()
1543        }))
1544    }
1545}
1546
1547#[async_trait]
1548impl QueueEventHandler for CtrlSenderHandler {
1549    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
1550        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
1551        let queue_name = queue.name();
1552        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
1553    }
1554
1555    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
1556        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
1557        let queue_name = queue.name();
1558        match status {
1559            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
1560            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
1561        }
1562    }
1563}
1564
1565#[async_trait]
1566impl MessageHandler for CtrlSenderHandler {
1567    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
1568}
1569
1570#[async_trait]
1571impl QueueEventHandler for CtrlReceiverHandler {
1572    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
1573        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
1574        let queue_name = queue.name();
1575        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
1576    }
1577
1578    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
1579        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
1580        let queue_name = queue.name();
1581        match status {
1582            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
1583            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
1584        }
1585    }
1586}
1587
1588#[async_trait]
1589impl MessageHandler for CtrlReceiverHandler {
1590    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
1591        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
1592        let queue_name = queue.name();
1593
1594        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
1595            Err(e) => {
1596                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
1597                warn!(
1598                    "[{}] {} parse JSON error: {}, src: {}",
1599                    FN_NAME, queue_name, e, src_str
1600                );
1601                if let Err(e) = msg.ack().await {
1602                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1603                }
1604                return;
1605            }
1606            Ok(msg) => msg,
1607        };
1608        match ctrl_msg {
1609            RecvCtrlMsg::DelApplication { new: _new } => {}
1610            RecvCtrlMsg::AddManager { new } => {
1611                let host_uri = match Url::parse(new.host_uri.as_str()) {
1612                    Err(e) => {
1613                        warn!("[{}] {} hostUri error: {}", FN_NAME, queue_name, e);
1614                        if let Err(e) = msg.ack().await {
1615                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1616                        }
1617                        return;
1618                    }
1619                    Ok(uri) => uri,
1620                };
1621                let handler = MgrHandler {
1622                    model: self.model.clone(),
1623                    cache: self.cache.clone(),
1624                    network_mgrs: self.network_mgrs.clone(),
1625                    data_sender: self.data_sender.clone(),
1626                };
1627                let unit_code = new.mgr_options.unit_code.clone();
1628                let name = new.mgr_options.name.clone();
1629                let mgr = match ApplicationMgr::new(
1630                    self.mq_conns.clone(),
1631                    &host_uri,
1632                    new.mgr_options,
1633                    Arc::new(handler),
1634                ) {
1635                    Err(e) => {
1636                        error!("[{}] {} new manager error: {}", FN_NAME, queue_name, e);
1637                        if let Err(e) = msg.ack().await {
1638                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1639                        }
1640                        return;
1641                    }
1642                    Ok(mgr) => {
1643                        debug!("[{}] {} new manager", FN_NAME, queue_name);
1644                        mgr
1645                    }
1646                };
1647                let key = gen_mgr_key(unit_code.as_str(), name.as_str());
1648                let old_mgr = {
1649                    self.application_mgrs
1650                        .lock()
1651                        .unwrap()
1652                        .insert(key.clone(), mgr)
1653                };
1654                if let Some(mgr) = old_mgr {
1655                    if let Err(e) = mgr.close().await {
1656                        error!(
1657                            "[{}] {} close old manager {} error: {}",
1658                            FN_NAME, queue_name, key, e
1659                        );
1660                    } else {
1661                        debug!("[{}] {} close old manager {}", FN_NAME, queue_name, key);
1662                    }
1663                }
1664                info!("[{}] {} manager {} added", FN_NAME, queue_name, key);
1665            }
1666            RecvCtrlMsg::DelManager { new } => {
1667                let old_mgr = { self.application_mgrs.lock().unwrap().remove(&new) };
1668                match old_mgr {
1669                    None => {
1670                        error!("[{}] {} get no manager {}", FN_NAME, queue_name, new);
1671                        if let Err(e) = msg.ack().await {
1672                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1673                        }
1674                        return;
1675                    }
1676                    Some(mgr) => {
1677                        if let Err(e) = mgr.close().await {
1678                            error!(
1679                                "[{}] {} close old manager {} error: {}",
1680                                FN_NAME, queue_name, new, e
1681                            );
1682                        } else {
1683                            debug!("[{}] {} close old manager {}", FN_NAME, queue_name, new);
1684                        }
1685                    }
1686                }
1687                info!("[{}] {} manager {} deleted", FN_NAME, queue_name, new);
1688            }
1689        }
1690        if let Err(e) = msg.ack().await {
1691            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
1692        }
1693    }
1694}