use serde::{Deserialize, Serialize};
use crate::error::{Result, RoutingError};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RouterConfig {
#[serde(default = "default_router_name")]
pub name: String,
pub routes: Vec<Route>,
}
fn default_router_name() -> String {
"zerodds-router".to_string()
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Route {
pub name: String,
pub input: Endpoint,
pub output: Endpoint,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filter: Option<ContentFilter>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transform: Option<Transform>,
#[serde(default = "default_true")]
pub loop_guard: bool,
#[serde(default)]
pub preserve_source_timestamp: bool,
}
fn default_true() -> bool {
true
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Endpoint {
pub domain: i32,
pub topic: String,
#[serde(default = "default_type")]
pub type_name: String,
#[serde(default)]
pub keyed: bool,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub partition: Vec<String>,
#[serde(default)]
pub qos: QosSpec,
}
fn default_type() -> String {
"*".to_string()
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QosSpec {
#[serde(default = "default_true")]
pub reliable: bool,
#[serde(default)]
pub durability: Durability,
#[serde(default)]
pub ownership: Ownership,
#[serde(default)]
pub ownership_strength: i32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data_representation: Option<Vec<i16>>,
}
impl Default for QosSpec {
fn default() -> Self {
Self {
reliable: true,
durability: Durability::default(),
ownership: Ownership::default(),
ownership_strength: 0,
data_representation: None,
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Durability {
#[default]
Volatile,
TransientLocal,
Transient,
Persistent,
}
impl From<Durability> for zerodds_qos::DurabilityKind {
fn from(d: Durability) -> Self {
match d {
Durability::Volatile => zerodds_qos::DurabilityKind::Volatile,
Durability::TransientLocal => zerodds_qos::DurabilityKind::TransientLocal,
Durability::Transient => zerodds_qos::DurabilityKind::Transient,
Durability::Persistent => zerodds_qos::DurabilityKind::Persistent,
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Ownership {
#[default]
Shared,
Exclusive,
}
impl From<Ownership> for zerodds_qos::OwnershipKind {
fn from(o: Ownership) -> Self {
match o {
Ownership::Shared => zerodds_qos::OwnershipKind::Shared,
Ownership::Exclusive => zerodds_qos::OwnershipKind::Exclusive,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContentFilter {
pub expression: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub parameters: Vec<String>,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct Transform {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub rename: Vec<RenameRule>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub set_const: Vec<SetConstRule>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub drop: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RenameRule {
pub to: String,
pub from: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SetConstRule {
pub field: String,
pub value: String,
}
impl RouterConfig {
pub fn validate(&self) -> Result<()> {
let mut seen = std::collections::BTreeSet::new();
for r in &self.routes {
if r.name.trim().is_empty() {
return Err(RoutingError::Config("route with empty name".into()));
}
if !seen.insert(r.name.clone()) {
return Err(RoutingError::Config(format!(
"duplicate route name '{}'",
r.name
)));
}
if r.input.keyed != r.output.keyed {
return Err(RoutingError::Config(format!(
"route '{}': input/output keyedness disagree",
r.name
)));
}
let same_endpoint = r.input.domain == r.output.domain
&& r.input.topic == r.output.topic
&& r.input.partition == r.output.partition;
if same_endpoint && !r.loop_guard {
return Err(RoutingError::Config(format!(
"route '{}': identical input and output endpoint without loop_guard \
would forward to itself",
r.name
)));
}
}
Ok(())
}
pub fn from_json(s: &str) -> Result<Self> {
let cfg: Self =
serde_json::from_str(s).map_err(|e| RoutingError::Config(format!("json: {e}")))?;
cfg.validate()?;
Ok(cfg)
}
pub fn to_json(&self) -> Result<String> {
serde_json::to_string_pretty(self).map_err(|e| RoutingError::Config(format!("json: {e}")))
}
pub fn from_xml(s: &str) -> Result<Self> {
let cfg = crate::xml::parse_router_xml(s)?;
cfg.validate()?;
Ok(cfg)
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
fn route(name: &str, in_dom: i32, out_dom: i32) -> Route {
Route {
name: name.into(),
input: Endpoint {
domain: in_dom,
topic: "T".into(),
type_name: "*".into(),
keyed: false,
partition: vec![],
qos: QosSpec::default(),
},
output: Endpoint {
domain: out_dom,
topic: "T".into(),
type_name: "*".into(),
keyed: false,
partition: vec![],
qos: QosSpec::default(),
},
filter: None,
transform: None,
loop_guard: true,
preserve_source_timestamp: false,
}
}
#[test]
fn validate_accepts_distinct_domains() {
let c = RouterConfig {
name: "r".into(),
routes: vec![route("a", 0, 1)],
};
assert!(c.validate().is_ok());
}
#[test]
fn validate_rejects_duplicate_names() {
let c = RouterConfig {
name: "r".into(),
routes: vec![route("a", 0, 1), route("a", 1, 2)],
};
assert!(c.validate().is_err());
}
#[test]
fn validate_rejects_self_loop_without_guard() {
let mut r = route("a", 7, 7);
r.loop_guard = false;
let c = RouterConfig {
name: "r".into(),
routes: vec![r],
};
assert!(c.validate().is_err());
}
#[test]
fn validate_rejects_keyedness_mismatch() {
let mut r = route("a", 0, 1);
r.input.keyed = true;
let c = RouterConfig {
name: "r".into(),
routes: vec![r],
};
assert!(c.validate().is_err());
}
#[test]
fn json_roundtrip() {
let c = RouterConfig {
name: "r".into(),
routes: vec![route("a", 0, 1)],
};
let js = c.to_json().unwrap();
let back = RouterConfig::from_json(&js).unwrap();
assert_eq!(c, back);
}
#[test]
fn durability_maps_to_qos() {
assert_eq!(
zerodds_qos::DurabilityKind::from(Durability::Transient),
zerodds_qos::DurabilityKind::Transient
);
}
}