use crate::{
envelope::{
EnvelopeApp, EnvelopeDevice, EnvelopeKind, EnvelopeMeta, NorthwardEnvelope,
NorthwardEnvelopePayload, NorthwardEnvelopePayloadRef, NorthwardEnvelopeRef, WireEnvelope,
},
mapping::MappingError,
northward::{runtime_api::NorthwardRuntimeApi, NorthwardData, NorthwardEvent},
DataType, NGValue, PointValue,
};
use chrono::Utc;
use serde::Serialize;
use serde_json::Value;
use std::{collections::BTreeMap, sync::Arc};
use thiserror::Error;
#[derive(Debug, Clone, Copy)]
pub enum NorthwardUplinkFormat {
EnvelopeJson,
Kv {
include_meta: bool,
},
TimeseriesRows {
include_meta: bool,
},
}
#[derive(Debug, Error)]
pub enum EncodeError {
#[error("json encode failed: {0}")]
Json(#[from] serde_json::Error),
#[error("mapping failed: {0}")]
Mapping(#[from] MappingError),
}
#[derive(Debug, Error)]
pub enum DecodeError {
#[error("json decode failed: {0}")]
Json(#[from] serde_json::Error),
#[error("schema error: {0}")]
Schema(String),
#[error("payload decode failed: {0}")]
Payload(String),
#[error("mapping failed: {0}")]
Mapping(#[from] MappingError),
}
pub fn encode_uplink(
format: NorthwardUplinkFormat,
plugin_type: &Arc<str>,
app_id: i32,
app_name: &Arc<str>,
data: &NorthwardData,
runtime: &dyn NorthwardRuntimeApi,
) -> Result<Vec<u8>, EncodeError> {
match format {
NorthwardUplinkFormat::EnvelopeJson => {
let env = NorthwardEnvelopeRef::v1(NorthwardEnvelopePayloadRef::Data(data))
.with_meta(build_envelope_meta(plugin_type, app_id, app_name, data));
Ok(serde_json::to_vec(&env)?)
}
NorthwardUplinkFormat::Kv { include_meta } => {
let kv = build_kv(data, runtime, include_meta);
Ok(serde_json::to_vec(&kv)?)
}
NorthwardUplinkFormat::TimeseriesRows { include_meta } => {
let rows = build_timeseries_rows(data, runtime, include_meta);
Ok(serde_json::to_vec(&rows)?)
}
}
}
pub fn decode_downlink_envelope(
bytes: &[u8],
expected_kind: EnvelopeKind,
) -> Result<Option<NorthwardEvent>, DecodeError> {
let wire: WireEnvelope<Value> = serde_json::from_slice(bytes)?;
if wire.schema_version != 1 {
return Err(DecodeError::Schema(format!(
"unsupported schema_version {}",
wire.schema_version
)));
}
if wire.event.kind != expected_kind {
return Ok(None);
}
let env = NorthwardEnvelope::try_from(wire).map_err(|e| DecodeError::Payload(e.to_string()))?;
match (expected_kind, env.payload) {
(
EnvelopeKind::WritePoint,
NorthwardEnvelopePayload::Event(ev @ NorthwardEvent::WritePoint(_)),
) => Ok(Some(ev)),
(
EnvelopeKind::CommandReceived,
NorthwardEnvelopePayload::Event(ev @ NorthwardEvent::CommandReceived(_)),
) => Ok(Some(ev)),
(
EnvelopeKind::RpcResponseReceived,
NorthwardEnvelopePayload::Event(ev @ NorthwardEvent::RpcResponseReceived(_)),
) => Ok(Some(ev)),
(other, _) => Err(DecodeError::Schema(format!(
"expected_kind {other:?} is not a supported downlink kind"
))),
}
}
fn build_envelope_meta(
plugin_type: &Arc<str>,
app_id: i32,
app_name: &Arc<str>,
data: &NorthwardData,
) -> EnvelopeMeta {
let (device_id, device_name, device_type) = match data {
NorthwardData::DeviceConnected(d) => (
d.device_id,
Arc::<str>::from(d.device_name.as_str()),
Some(Arc::<str>::from(d.device_type.as_str())),
),
NorthwardData::DeviceDisconnected(d) => (
d.device_id,
Arc::<str>::from(d.device_name.as_str()),
Some(Arc::<str>::from(d.device_type.as_str())),
),
NorthwardData::Telemetry(t) => {
(t.device_id, Arc::<str>::from(t.device_name.as_str()), None)
}
NorthwardData::Attributes(a) => {
(a.device_id, Arc::<str>::from(a.device_name.as_str()), None)
}
NorthwardData::Alarm(a) => (a.device_id, Arc::<str>::from(a.device_name.as_str()), None),
NorthwardData::RpcResponse(r) => {
let name = r.device_name.as_deref().unwrap_or_default();
(r.device_id, Arc::<str>::from(name), None)
}
NorthwardData::WritePointResponse(r) => (r.device_id, Arc::<str>::from(""), None),
};
let ts_ms = match data {
NorthwardData::DeviceConnected(_) | NorthwardData::DeviceDisconnected(_) => {
Utc::now().timestamp_millis()
}
NorthwardData::Telemetry(t) => t.timestamp.timestamp_millis(),
NorthwardData::Attributes(a) => a.timestamp.timestamp_millis(),
NorthwardData::Alarm(a) => a.timestamp.timestamp_millis(),
NorthwardData::RpcResponse(r) => r.timestamp.timestamp_millis(),
NorthwardData::WritePointResponse(r) => r.completed_at.timestamp_millis(),
};
EnvelopeMeta {
ts_ms,
app: EnvelopeApp {
id: app_id,
name: Arc::clone(app_name),
plugin_type: Arc::clone(plugin_type),
},
device: EnvelopeDevice {
id: device_id,
name: device_name,
r#type: device_type,
},
}
}
#[derive(Debug, Serialize)]
pub struct KvEnvelope {
ts_ms: i64,
values: BTreeMap<Arc<str>, KvValue>,
}
#[derive(Debug, Serialize)]
pub struct KvTypedValue {
value: NGValue,
data_type: DataType,
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum KvValue {
Plain(NGValue),
Typed(KvTypedValue),
}
fn build_kv(
data: &NorthwardData,
runtime: &dyn NorthwardRuntimeApi,
include_meta: bool,
) -> KvEnvelope {
match data {
NorthwardData::Telemetry(t) => {
let mut values: BTreeMap<Arc<str>, KvValue> = BTreeMap::new();
for pv in t.values.iter() {
if include_meta {
if let Some(pm) = runtime.get_point_meta(pv.point_id) {
values.insert(
Arc::clone(&pv.point_key),
KvValue::Typed(KvTypedValue {
value: pv.value.clone(),
data_type: pm.data_type,
}),
);
continue;
}
}
values.insert(Arc::clone(&pv.point_key), KvValue::Plain(pv.value.clone()));
}
KvEnvelope {
ts_ms: t.timestamp.timestamp_millis(),
values,
}
}
NorthwardData::Attributes(a) => {
let mut values: BTreeMap<Arc<str>, KvValue> = BTreeMap::new();
for pv in a
.client_attributes
.iter()
.chain(a.shared_attributes.iter())
.chain(a.server_attributes.iter())
{
if include_meta {
if let Some(pm) = runtime.get_point_meta(pv.point_id) {
values.insert(
Arc::clone(&pv.point_key),
KvValue::Typed(KvTypedValue {
value: pv.value.clone(),
data_type: pm.data_type,
}),
);
continue;
}
}
values.insert(Arc::clone(&pv.point_key), KvValue::Plain(pv.value.clone()));
}
KvEnvelope {
ts_ms: a.timestamp.timestamp_millis(),
values,
}
}
_ => unreachable!(),
}
}
#[derive(Debug, Serialize)]
pub struct TimeseriesRow {
ts_ms: i64,
point_id: i32,
point_key: Arc<str>,
value: NGValue,
data_type: Option<DataType>,
}
fn build_timeseries_rows(
data: &NorthwardData,
runtime: &dyn NorthwardRuntimeApi,
include_meta: bool,
) -> Vec<TimeseriesRow> {
let mk_row = |ts_ms: i64, pv: &PointValue| -> TimeseriesRow {
let data_type = if include_meta {
runtime.get_point_meta(pv.point_id).map(|m| m.data_type)
} else {
None
};
TimeseriesRow {
ts_ms,
point_id: pv.point_id,
point_key: Arc::clone(&pv.point_key),
value: pv.value.clone(),
data_type,
}
};
match data {
NorthwardData::Telemetry(t) => {
let ts_ms = t.timestamp.timestamp_millis();
t.values.iter().map(|pv| mk_row(ts_ms, pv)).collect()
}
NorthwardData::Attributes(a) => {
let ts_ms = a.timestamp.timestamp_millis();
a.client_attributes
.iter()
.chain(a.shared_attributes.iter())
.chain(a.server_attributes.iter())
.map(|pv| mk_row(ts_ms, pv))
.collect()
}
_ => vec![],
}
}