use crate::alink::{AlinkRequest, AlinkResponse};
use crate::{Error, Result, ThreeTuple};
use log::*;
use regex::Regex;
use rumqttc::{AsyncClient, QoS};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct DataModelMsg {
pub product_key: Option<String>,
pub device_name: Option<String>,
pub data: MsgEnum,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct PropertyPost {
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct EventPost {
pub event_id: String,
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct PropertySetReply {
pub msg_id: u64,
pub code: u64,
pub data: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct SyncServiceReply {
pub msg_id: u64,
pub rrpc_id: String,
pub service_id: String,
pub code: u64,
pub data: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct AsyncServiceReply {
pub msg_id: u64,
pub service_id: String,
pub code: u64,
pub data: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct RawServiceReply {
pub rrpc_id: String,
pub data: Vec<u8>,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct GetDesired {
pub params: Vec<String>,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct DeleteDesired {
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct PropertyBatchPost {
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct HistoryPost {
pub params: Vec<Value>,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub enum MsgEnum {
PropertyPost(PropertyPost),
EventPost(EventPost),
PropertySetReply(PropertySetReply),
AsyncServiceReply(AsyncServiceReply),
SyncServiceReply(SyncServiceReply),
RawData(RawData),
RawServiceReply(RawServiceReply),
GetDesired(GetDesired),
DeleteDesired(DeleteDesired),
PropertyBatchPost(PropertyBatchPost),
HistoryPost(HistoryPost),
}
impl DataModelMsg {
pub fn new(data: MsgEnum) -> Self {
Self {
product_key: None,
device_name: None,
data,
}
}
pub fn to_payload(&self, ack: i32) -> Result<(String, Vec<u8>)> {
let pk = self.product_key.as_deref().unwrap_or("");
let dn = self.device_name.as_deref().unwrap_or("");
self.data.to_payload(pk, dn, ack)
}
}
impl DataModelMsg {
#[inline]
pub fn property_post(params: Value) -> Self {
DataModelMsg::new(MsgEnum::PropertyPost(PropertyPost { params }))
}
#[inline]
pub fn event_post(event_id: String, params: Value) -> Self {
DataModelMsg::new(MsgEnum::EventPost(EventPost { event_id, params }))
}
#[inline]
pub fn property_set_reply(code: u64, data: Value, msg_id: u64) -> Self {
DataModelMsg::new(MsgEnum::PropertySetReply(PropertySetReply {
msg_id,
code,
data,
}))
}
#[inline]
pub fn async_service_reply(code: u64, data: Value, msg_id: u64, service_id: String) -> Self {
DataModelMsg::new(MsgEnum::AsyncServiceReply(AsyncServiceReply {
msg_id,
code,
service_id,
data,
}))
}
#[inline]
pub fn sync_service_reply(
code: u64,
data: Value,
rrpc_id: String,
msg_id: u64,
service_id: String,
) -> Self {
DataModelMsg::new(MsgEnum::SyncServiceReply(SyncServiceReply {
rrpc_id,
msg_id,
code,
service_id,
data,
}))
}
#[inline]
pub fn raw_data(data: Vec<u8>) -> Self {
DataModelMsg::new(MsgEnum::RawData(RawData { data }))
}
#[inline]
pub fn raw_service_reply(data: Vec<u8>, rrpc_id: String) -> Self {
DataModelMsg::new(MsgEnum::RawServiceReply(RawServiceReply { rrpc_id, data }))
}
#[inline]
pub fn history_post(params: Vec<Value>) -> Self {
DataModelMsg::new(MsgEnum::HistoryPost(HistoryPost { params }))
}
}
impl MsgEnum {
pub fn to_payload(&self, pk: &str, dn: &str, ack: i32) -> Result<(String, Vec<u8>)> {
use MsgEnum::*;
match &self {
PropertyPost(data) => {
let topic = format!("/sys/{}/{}/thing/event/property/post", pk, dn);
let method = "thing.event.property.post";
let payload = AlinkRequest::new(method, data.params.clone(), ack);
Ok((topic, serde_json::to_vec(&payload)?))
}
EventPost(data) => {
let topic = format!("/sys/{}/{}/thing/event/{}/post", pk, dn, data.event_id);
let method = format!("thing.event.{}.post", data.event_id);
let payload = AlinkRequest::new(&method, data.params.clone(), ack);
Ok((topic, serde_json::to_vec(&payload)?))
}
PropertySetReply(data) => {
let topic = format!("/sys/{}/{}/thing/service/property/set_reply", pk, dn);
let payload = AlinkResponse::new(data.msg_id, data.code, data.data.clone());
Ok((topic, serde_json::to_vec(&payload)?))
}
AsyncServiceReply(data) => {
let topic = format!("/sys/{}/{}/thing/service/{}_reply", pk, dn, data.service_id);
let payload = AlinkResponse::new(data.msg_id, data.code, data.data.clone());
Ok((topic, serde_json::to_vec(&payload)?))
}
SyncServiceReply(data) => {
let topic = format!(
"/ext/rrpc/{}/sys/{}/{}/thing/service/{}",
data.rrpc_id, pk, dn, data.service_id
);
let payload = AlinkResponse::new(data.msg_id, data.code, data.data.clone());
Ok((topic, serde_json::to_vec(&payload)?))
}
RawData(data) => {
let topic = format!("/sys/{}/{}/thing/model/up_raw", pk, dn);
Ok((topic, data.data.clone()))
}
RawServiceReply(data) => {
let topic = format!(
"/ext/rrpc/{}/sys/{}/{}/thing/model/down_raw_reply",
data.rrpc_id, pk, dn
);
Ok((topic, data.data.clone()))
}
GetDesired(data) => {
let topic = format!("/sys/{}/{}/thing/property/desired/get", pk, dn);
let method = "thing.property.desired.get";
let params = Value::Array(
data.params
.iter()
.map(|p| Value::String(p.clone()))
.collect(),
);
let payload = AlinkRequest::new(method, params, ack);
Ok((topic, serde_json::to_vec(&payload)?))
}
DeleteDesired(data) => {
let topic = format!("/sys/{}/{}/thing/property/desired/delete", pk, dn);
let method = "thing.property.desired.delete";
let payload = AlinkRequest::new(method, data.params.clone(), ack);
Ok((topic, serde_json::to_vec(&payload)?))
}
PropertyBatchPost(data) => {
let topic = format!("/sys/{}/{}/thing/event/property/batch/post", pk, dn);
let method = "thing.event.property.batch.post";
let payload = AlinkRequest::new(method, data.params.clone(), ack);
Ok((topic, serde_json::to_vec(&payload)?))
}
HistoryPost(data) => {
let topic = format!("/sys/{}/{}/thing/event/property/history/post", pk, dn);
let method = "thing.event.property.history.post";
let payload = AlinkRequest::new(method, Value::Array(data.params.clone()), ack);
Ok((topic, serde_json::to_vec(&payload)?))
}
}
}
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct DataModelRecv<T> {
pub product_key: String,
pub device_name: String,
pub data: T,
}
impl<T> DataModelRecv<T> {
pub fn new(product_key: &str, device_name: &str, data: T) -> Self {
Self {
product_key: product_key.to_string(),
device_name: device_name.to_string(),
data,
}
}
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct GenericReply {
pub msg_id: u64,
pub code: u64,
pub data: Value,
pub message: String,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct PropertySet {
pub msg_id: u64,
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct SyncServiceInvoke {
pub msg_id: u64,
pub rrpc_id: String,
pub service_id: String,
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct AsyncServiceInvoke {
pub msg_id: u64,
pub service_id: String,
pub params: Value,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct RawData {
pub data: Vec<u8>,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct RawServiceInvoke {
pub rrpc_id: String,
pub data: Vec<u8>,
}