use crate::envelope::EnvelopeKind;
use crate::mapping::{CompiledMappedJson, MappedJsonSpec, MappedRule};
use crate::northward::codec::{decode_downlink_envelope, DecodeError};
use crate::northward::payload::MappedJsonConfig;
use crate::{NorthwardEvent, ServerRpcResponse, WritePoint};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
/// The kind of event a downlink route decodes into.
///
/// Each variant has an identically named `EnvelopeKind` counterpart and an
/// identically named `NorthwardEvent` variant that the decoders in this file produce.
#[derive(Debug, Clone, Copy)]
pub enum DownlinkKind {
    /// A point-write request.
    WritePoint,
    /// A command delivered from the northward side.
    CommandReceived,
    /// A response to a server RPC.
    RpcResponseReceived,
}
/// Infallible widening: every downlink kind maps to the envelope kind of the same name.
impl From<DownlinkKind> for EnvelopeKind {
    #[inline]
    fn from(value: DownlinkKind) -> Self {
        // Kept exhaustive on purpose: adding a `DownlinkKind` variant must fail to
        // compile here until its envelope counterpart is chosen.
        match value {
            DownlinkKind::WritePoint => Self::WritePoint,
            DownlinkKind::CommandReceived => Self::CommandReceived,
            DownlinkKind::RpcResponseReceived => Self::RpcResponseReceived,
        }
    }
}
/// Fallible narrowing: only the three envelope kinds that have a downlink counterpart
/// convert; any other `EnvelopeKind` yields `Err(())`.
impl TryFrom<EnvelopeKind> for DownlinkKind {
    type Error = ();

    #[inline]
    fn try_from(value: EnvelopeKind) -> Result<Self, Self::Error> {
        let kind = match value {
            EnvelopeKind::WritePoint => Self::WritePoint,
            EnvelopeKind::CommandReceived => Self::CommandReceived,
            EnvelopeKind::RpcResponseReceived => Self::RpcResponseReceived,
            // Every remaining envelope kind is not a downlink kind.
            _ => return Err(()),
        };
        Ok(kind)
    }
}
/// Top-level downlink configuration: an `enabled` flag plus one [`EventDownlink`] per
/// downlink event kind.
///
/// Fields are (de)serialized with camelCase names (`writePoint`, `commandReceived`,
/// `rpcResponseReceived`), and every field may be omitted on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DownlinkConfig {
/// Defaults to `true` when omitted.
/// NOTE(review): presumably the master switch for all downlink handling — the code
/// that reads this flag is not in this section; confirm.
#[serde(default = "default_enabled_true")]
pub enabled: bool,
/// Settings for the write-point downlink; defaults to `EventDownlink::default()`.
#[serde(default)]
pub write_point: EventDownlink,
/// Settings for the command downlink; defaults to `EventDownlink::default()`.
#[serde(default)]
pub command_received: EventDownlink,
/// Settings for the RPC-response downlink; defaults to `EventDownlink::default()`.
#[serde(default)]
pub rpc_response_received: EventDownlink,
}
impl Default for DownlinkConfig {
fn default() -> Self {
Self {
enabled: true,
write_point: EventDownlink::default(),
command_received: EventDownlink::default(),
rpc_response_received: EventDownlink::default(),
}
}
}
/// Serde default provider for boolean fields that should be `true` when omitted.
#[inline]
fn default_enabled_true() -> bool {
    true
}
/// Downlink settings for a single event kind.
///
/// Fields are (de)serialized with camelCase names (`ackPolicy`, `failurePolicy`), and
/// every field may be omitted on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EventDownlink {
/// Whether this downlink is active; defaults to `false` when omitted.
#[serde(default)]
pub enabled: bool,
/// Topic to consume from; defaults to `persistent://public/default/ng.downlink`.
/// `build_route_table` trims it and rejects templates, `*` wildcards and regex prefixes.
#[serde(default = "EventDownlink::default_downlink_topic")]
pub topic: String,
/// How the message payload is decoded; defaults to `DownlinkPayloadConfig::EnvelopeJson`.
#[serde(default)]
pub payload: DownlinkPayloadConfig,
/// Acknowledgement policy; defaults to `AckPolicy::OnSuccess`.
/// Routes sharing a topic must agree on it (checked by `build_route_table`).
#[serde(default)]
pub ack_policy: AckPolicy,
/// Failure policy; defaults to `FailurePolicy::Drop`.
/// Routes sharing a topic must agree on it (checked by `build_route_table`).
#[serde(default)]
pub failure_policy: FailurePolicy,
}
impl Default for EventDownlink {
    /// A disabled downlink on the default topic, decoding envelope JSON, acknowledging on
    /// success and dropping on failure.
    fn default() -> Self {
        let topic = Self::default_downlink_topic();
        let payload = DownlinkPayloadConfig::default();
        Self {
            enabled: false,
            topic,
            payload,
            ack_policy: AckPolicy::OnSuccess,
            failure_policy: FailurePolicy::Drop,
        }
    }
}
impl EventDownlink {
    /// Topic used when the configuration does not name one; also serves as the serde
    /// default provider for the `topic` field.
    fn default_downlink_topic() -> String {
        String::from("persistent://public/default/ng.downlink")
    }
}
/// How the payload of a downlink message is decoded.
///
/// Serialized as an internally tagged enum: the `mode` key selects the variant
/// (`envelope_json` or `mapped_json`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum DownlinkPayloadConfig {
/// The payload is handed to `decode_downlink_envelope` together with the route's kind.
/// This is the default.
#[default]
EnvelopeJson,
/// The payload is parsed as JSON, checked against `filter`, and then transformed by the
/// mapping rules in `config` before being deserialized into the event type.
MappedJson {
/// Mapping rules; iterated as `(out_path, expr)` pairs when decoding.
config: MappedJsonConfig,
/// Which messages this route accepts; defaults to `MappedDownlinkFilterConfig::None`.
#[serde(default)]
filter: MappedDownlinkFilterConfig,
},
}
/// Predicate deciding whether a mapped-JSON route accepts a given message.
///
/// Serialized as an internally tagged enum: the `mode` key selects the variant
/// (`none`, `json_pointer`, `property` or `key`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(tag = "mode", rename_all = "snake_case")]
pub enum MappedDownlinkFilterConfig {
/// Accept every message. This is the default.
#[default]
None,
/// Accept when the value at `pointer` in the parsed JSON payload equals `equals`.
/// Strings are compared directly; numbers and booleans are compared through their
/// string form. A missing value or any other JSON type never matches.
JsonPointer {
/// JSON Pointer passed to `serde_json::Value::pointer`.
pointer: String,
/// Expected value, in string form.
equals: String,
},
/// Accept when some message property has exactly this `key` and this value.
/// A message without properties never matches.
Property {
/// Property name to look for.
key: String,
/// Expected property value.
equals: String,
},
/// Accept when the message key is present and equals `equals`.
Key {
/// Expected message key.
equals: String,
},
}
/// Acknowledgement policy of a downlink route, serialized in snake_case
/// (`on_success`, `always`, `never`).
///
/// Routes that share a topic must all use the same policy; `build_route_table`
/// rejects the table otherwise.
///
/// NOTE(review): the code that acts on this policy is not in this section; the
/// per-variant descriptions below are inferred from the names — confirm against the consumer.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AckPolicy {
/// Default. Presumably: acknowledge only when handling succeeded.
#[default]
OnSuccess,
/// Presumably: acknowledge regardless of the handling outcome.
Always,
/// Presumably: never acknowledge.
Never,
}
/// Failure policy of a downlink route, serialized in snake_case (`drop`, `error`).
///
/// Routes that share a topic must all use the same policy; `build_route_table`
/// rejects the table otherwise.
///
/// NOTE(review): the code that acts on this policy is not in this section; the
/// per-variant descriptions below are inferred from the names — confirm against the consumer.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum FailurePolicy {
/// Default. Presumably: discard a message that could not be handled.
#[default]
Drop,
/// Presumably: surface the failure as an error.
Error,
}
/// One downlink route: the event kind to produce, paired with the settings
/// (topic, payload decoding, policies) that govern it.
#[derive(Debug, Clone)]
pub struct DownlinkRoute {
    /// Event kind this route decodes messages into.
    pub kind: DownlinkKind,
    /// Topic, payload and policy settings of the route.
    pub mapping: EventDownlink,
}
/// Routes grouped by topic.
///
/// When produced by `build_route_table`, `topics` holds the distinct normalized topics
/// in sorted order and each `by_topic` entry lists that topic's routes, most specific first.
#[derive(Debug)]
pub struct DownlinkRouteTable {
    /// All topics that have at least one route.
    pub topics: Arc<[String]>,
    /// Routes registered for each topic.
    pub by_topic: HashMap<String, Arc<[DownlinkRoute]>>,
}
/// A borrowed key/value pair, used for message properties.
#[derive(Debug, Clone, Copy)]
pub struct KeyValue<'a> {
    /// Property name.
    pub key: &'a str,
    /// Property value.
    pub value: &'a str,
}
/// Borrowed metadata of an incoming downlink message, consulted by the
/// `Key` and `Property` filters.
#[derive(Debug, Clone, Copy)]
pub struct DownlinkMessageMeta<'a> {
    /// Message key, if the message carries one.
    pub key: Option<&'a str>,
    /// Message properties, if the message carries any.
    pub properties: Option<&'a [KeyValue<'a>]>,
}
/// Checks that `topic` names exactly one topic and returns it with surrounding
/// whitespace removed.
///
/// # Errors
///
/// Returns a human-readable message when the trimmed topic
/// - is empty,
/// - contains `{{` or `}}` (template syntax),
/// - contains `*` (wildcard), or
/// - starts with `re:` or `regex:` (regex subscription).
///
/// The checks run in that order, so the first matching rule decides the message.
pub fn validate_exact_topic(topic: &str) -> Result<String, String> {
    let trimmed = topic.trim();

    let rejection = if trimmed.is_empty() {
        Some("downlink topic is empty")
    } else if ["{{", "}}"].iter().any(|marker| trimmed.contains(*marker)) {
        Some("downlink topic must be exact (templates are not supported)")
    } else if trimmed.contains('*') {
        Some("downlink topic must be exact (wildcards are not supported)")
    } else if ["re:", "regex:"]
        .iter()
        .any(|prefix| trimmed.starts_with(*prefix))
    {
        Some("downlink topic must be exact (regex is not supported)")
    } else {
        None
    };

    match rejection {
        Some(reason) => Err(reason.to_owned()),
        None => Ok(trimmed.to_owned()),
    }
}
/// Groups `routes` by normalized topic and builds the lookup table used for dispatch.
///
/// Each route's topic is validated and trimmed via `validate_exact_topic`, and the
/// normalized value is written back into the route. Within a topic, routes are ordered
/// most specific first (see `route_specificity`); routes of equal specificity keep the
/// order in which they were supplied. `topics` in the result is sorted.
///
/// # Errors
///
/// Returns a message when
/// - any topic fails `validate_exact_topic`, or
/// - two routes on the same topic differ in `ack_policy` or `failure_policy`.
///
/// Topics are validated in sorted order, so when several topics have conflicting
/// policies the same one is reported on every run.
pub fn build_route_table(routes: Vec<DownlinkRoute>) -> Result<DownlinkRouteTable, String> {
    let mut by_topic: HashMap<String, Vec<DownlinkRoute>> = HashMap::new();
    for mut route in routes {
        let normalized = validate_exact_topic(&route.mapping.topic)?;
        route.mapping.topic.clone_from(&normalized);
        by_topic.entry(normalized).or_default().push(route);
    }

    // Sort before validating: `HashMap` iteration order is arbitrary, and walking the
    // map directly would make the reported conflict vary between runs.
    let mut topics: Vec<String> = by_topic.keys().cloned().collect();
    topics.sort_unstable();

    for topic in &topics {
        let Some((first, rest)) = by_topic.get(topic).and_then(|rs| rs.split_first()) else {
            continue;
        };
        for route in rest {
            if route.mapping.ack_policy != first.mapping.ack_policy {
                return Err(format!(
                    "downlink routes on the same topic must share the same ack_policy (topic={topic})"
                ));
            }
            if route.mapping.failure_policy != first.mapping.failure_policy {
                return Err(format!(
                    "downlink routes on the same topic must share the same failure_policy (topic={topic})"
                ));
            }
        }
    }

    let mut compiled: HashMap<String, Arc<[DownlinkRoute]>> =
        HashMap::with_capacity(by_topic.len());
    for (topic, mut group) in by_topic {
        // Stable sort, descending by specificity: ties keep their configured order.
        group.sort_by_key(|r| std::cmp::Reverse(route_specificity(r)));
        compiled.insert(topic, Arc::from(group.into_boxed_slice()));
    }

    Ok(DownlinkRouteTable {
        topics: Arc::from(topics.into_boxed_slice()),
        by_topic: compiled,
    })
}
/// Ranks a route by how narrowly it selects messages; a higher score is tried earlier
/// because `build_route_table` sorts each topic's routes by descending score.
///
/// Mapped routes filtered on message metadata (key or property) rank highest, then
/// mapped routes filtered on payload content, then envelope routes, and finally mapped
/// routes without any filter.
#[inline]
fn route_specificity(r: &DownlinkRoute) -> i32 {
    const METADATA_FILTER: i32 = 100;
    const JSON_POINTER_FILTER: i32 = 80;
    const ENVELOPE: i32 = 50;
    const UNFILTERED_MAPPED: i32 = 0;

    match &r.mapping.payload {
        DownlinkPayloadConfig::EnvelopeJson => ENVELOPE,
        DownlinkPayloadConfig::MappedJson {
            filter: MappedDownlinkFilterConfig::None,
            ..
        } => UNFILTERED_MAPPED,
        DownlinkPayloadConfig::MappedJson {
            filter: MappedDownlinkFilterConfig::JsonPointer { .. },
            ..
        } => JSON_POINTER_FILTER,
        DownlinkPayloadConfig::MappedJson {
            filter:
                MappedDownlinkFilterConfig::Property { .. } | MappedDownlinkFilterConfig::Key { .. },
            ..
        } => METADATA_FILTER,
    }
}
/// Evaluates `filter` against one message.
///
/// * `meta` – message key and properties, used by the `Key` and `Property` filters.
/// * `input` – the parsed JSON payload, used by the `JsonPointer` filter.
///
/// Returns `true` when the message passes. `None` accepts everything. `JsonPointer`
/// compares string, number and boolean values through their string form and rejects a
/// missing value or any other JSON type. `Property` needs a property with both the
/// configured key and value. `Key` needs a message key equal to the configured one.
fn mapped_filter_matches(
    meta: &DownlinkMessageMeta<'_>,
    input: &Value,
    filter: &MappedDownlinkFilterConfig,
) -> bool {
    match filter {
        MappedDownlinkFilterConfig::None => true,
        MappedDownlinkFilterConfig::JsonPointer { pointer, equals } => {
            match input.pointer(pointer.as_str()) {
                Some(Value::String(text)) => text == equals,
                Some(Value::Number(number)) => number.to_string() == *equals,
                Some(Value::Bool(flag)) => flag.to_string() == *equals,
                // Null, arrays, objects, or nothing at that pointer.
                Some(_) | None => false,
            }
        }
        MappedDownlinkFilterConfig::Property { key, equals } => {
            meta.properties.is_some_and(|props| {
                props
                    .iter()
                    .any(|kv| kv.key == key.as_str() && kv.value == equals.as_str())
            })
        }
        MappedDownlinkFilterConfig::Key { equals } => meta.key == Some(equals.as_str()),
    }
}
/// Decodes one downlink message according to the payload mode of `route`.
///
/// * `route` – supplies the event kind and the payload configuration.
/// * `meta` – message key/properties; only consulted by mapped-JSON filters.
/// * `bytes` – the raw message payload.
///
/// Envelope routes delegate to `decode_downlink_envelope`; mapped routes delegate to
/// `decode_mapped`, which yields `Ok(None)` when the route's filter rejects the message.
///
/// # Errors
///
/// Propagates the `DecodeError` of whichever decoder handled the message.
pub fn decode_event(
    route: &DownlinkRoute,
    meta: &DownlinkMessageMeta<'_>,
    bytes: &[u8],
) -> Result<Option<NorthwardEvent>, DecodeError> {
    let kind = route.kind;
    match &route.mapping.payload {
        DownlinkPayloadConfig::MappedJson { config, filter } => {
            decode_mapped(kind, meta, bytes, config, filter)
        }
        DownlinkPayloadConfig::EnvelopeJson => {
            decode_downlink_envelope(bytes, EnvelopeKind::from(kind))
        }
    }
}
fn decode_mapped(
kind: DownlinkKind,
meta: &DownlinkMessageMeta<'_>,
bytes: &[u8],
cfg: &MappedJsonConfig,
filter: &MappedDownlinkFilterConfig,
) -> Result<Option<NorthwardEvent>, DecodeError> {
let input: Value = serde_json::from_slice(bytes)?;
if !mapped_filter_matches(meta, &input, filter) {
return Ok(None);
}
let rules: Vec<MappedRule> = cfg
.iter()
.map(|(out_path, expr)| MappedRule {
out_path: out_path.clone(),
expr: expr.clone(),
})
.collect();
let spec = MappedJsonSpec { rules };
let compiled = CompiledMappedJson::compile(&spec)?;
let out = compiled.apply(&input)?;
match kind {
DownlinkKind::WritePoint => {
let wp: WritePoint =
serde_json::from_value(out).map_err(|e| DecodeError::Payload(e.to_string()))?;
Ok(Some(NorthwardEvent::WritePoint(wp)))
}
DownlinkKind::CommandReceived => {
let cmd: crate::Command =
serde_json::from_value(out).map_err(|e| DecodeError::Payload(e.to_string()))?;
Ok(Some(NorthwardEvent::CommandReceived(cmd)))
}
DownlinkKind::RpcResponseReceived => {
let resp: ServerRpcResponse =
serde_json::from_value(out).map_err(|e| DecodeError::Payload(e.to_string()))?;
Ok(Some(NorthwardEvent::RpcResponseReceived(resp)))
}
}
}