use std::{fmt::Display, time::Duration};
use cfg_if::cfg_if;
use serde::{Deserialize, Serialize};
use crate::interface::{
DatabaseRetention as InterfaceDatabaseRetention, Retention as InterfaceRetention,
};
#[non_exhaustive]
#[derive(thiserror::Error, Debug)]
pub enum SchemaError {
#[error("expiry cannot be negative {0}")]
NegativeExpiry(i64),
#[error("database retention ttl cannot be negative {0}")]
NegativeDatabaseRetentionTtl(i64),
#[error("database retention ttl must be greater than 60s, instead of {0}")]
DatabaseRetentionTtlTooLow(u64),
#[error("database retention ttl is missing, but policy is use_ttl")]
MissingDatabaseRetentionTtl,
#[cfg(feature = "strict")]
#[cfg_attr(docsrs, doc(cfg(feature = "strict")))]
#[error("database_retention_ttl is set to {0}, but database_retention_policy is no_ttl")]
DatabaseRetentionTtlWithNoTtl(i64),
#[cfg(feature = "strict")]
#[cfg_attr(docsrs, doc(cfg(feature = "strict")))]
#[error("expiry is set to {0}, but retention is discard")]
ExpiryWithDiscard(i64),
}
/// Returns `true` when the option is `None` or holds an empty string.
///
/// Used as a serde `skip_serializing_if` predicate for optional text fields.
fn is_none_or_empty<T>(value: &Option<T>) -> bool
where
    T: AsRef<str>,
{
    let Some(text) = value else {
        return true;
    };

    text.as_ref().is_empty()
}
/// Returns `true` when the option is `None` or holds the type's default value.
///
/// Used as a serde `skip_serializing_if` predicate for optional fields.
fn is_none_or_default<T>(value: &Option<T>) -> bool
where
    T: Default + Eq,
{
    let Some(inner) = value else {
        return true;
    };

    *inner == T::default()
}
/// Serde representation of an interface definition.
///
/// Generic over `T`, the string-like type used for every textual field.
/// With the `strict` feature enabled, unknown fields are rejected during deserialization.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
#[cfg_attr(feature = "strict", serde(deny_unknown_fields))]
pub struct InterfaceJson<T>
where
T: AsRef<str>,
{
/// Name of the interface.
pub interface_name: T,
/// Major version of the interface.
pub version_major: i32,
/// Minor version of the interface.
pub version_minor: i32,
/// Kind of interface; (de)serialized under the `type` key.
#[serde(rename = "type")]
pub interface_type: InterfaceType,
/// Ownership of the interface.
pub ownership: Ownership,
/// Aggregation of the mappings; not serialized when `None` or the default variant.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub aggregation: Option<Aggregation>,
/// Optional description; not serialized when `None` or empty.
#[serde(default, skip_serializing_if = "is_none_or_empty")]
pub description: Option<T>,
/// Optional documentation text; not serialized when `None` or empty.
#[serde(default, skip_serializing_if = "is_none_or_empty")]
pub doc: Option<T>,
/// Mappings declared by the interface.
pub mappings: Vec<Mapping<T>>,
}
/// Serde representation of a single mapping of an interface.
///
/// Generic over `T`, the string-like type used for every textual field.
/// With the `strict` feature enabled, unknown fields are rejected during deserialization.
/// Every optional field defaults to `None` when missing from the input.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(feature = "strict", serde(deny_unknown_fields))]
pub struct Mapping<T>
where
T: AsRef<str>,
{
/// Endpoint of the mapping.
pub endpoint: T,
/// Type of the mapping value; (de)serialized under the `type` key.
#[serde(rename = "type")]
pub mapping_type: MappingType,
/// Reliability of the mapping; not serialized when `None` or the default variant.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub reliability: Option<Reliability>,
/// Explicit timestamp flag; not serialized when `None` or `false`.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub explicit_timestamp: Option<bool>,
/// Retention of the mapping; not serialized when `None` or the default variant.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub retention: Option<Retention>,
/// Raw expiry value, converted to seconds by `expiry_as_duration`; not serialized when
/// `None` or `0`.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub expiry: Option<i64>,
/// Database retention policy; not serialized when `None` or the default variant.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub database_retention_policy: Option<DatabaseRetentionPolicy>,
/// Raw database retention TTL, converted to seconds by
/// `database_retention_ttl_as_duration`; not serialized when `None` or `0`.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub database_retention_ttl: Option<i64>,
/// Allow-unset flag; not serialized when `None` or `false`.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub allow_unset: Option<bool>,
/// Required flag; not serialized when `None` or `false`.
#[serde(default, skip_serializing_if = "is_none_or_default")]
pub required: Option<bool>,
/// Optional description; not serialized when `None` or empty.
#[serde(default, skip_serializing_if = "is_none_or_empty")]
pub description: Option<T>,
/// Optional documentation text; not serialized when `None` or empty.
#[serde(default, skip_serializing_if = "is_none_or_empty")]
pub doc: Option<T>,
}
impl<T> Mapping<T>
where
    T: AsRef<str>,
{
    /// Returns the mapping expiry as a [`Duration`] expressed in seconds.
    ///
    /// An unset expiry and an expiry of `0` both produce `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::NegativeExpiry`] when the expiry is below zero.
    pub fn expiry_as_duration(&self) -> Result<Option<Duration>, SchemaError> {
        match self.expiry {
            None | Some(0) => Ok(None),
            Some(raw) => u64::try_from(raw)
                .map(|secs| Some(Duration::from_secs(secs)))
                .map_err(|_| SchemaError::NegativeExpiry(raw)),
        }
    }

    /// Combines the `retention` and `expiry` fields into a single retention value.
    ///
    /// A missing retention falls back to the default variant. For `discard` a positive
    /// expiry has no meaning: with the `strict` feature it is reported as an error,
    /// otherwise a warning is logged and the expiry is ignored.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::NegativeExpiry`] for a negative expiry on a volatile or
    /// stored mapping and, with the `strict` feature, `SchemaError::ExpiryWithDiscard`
    /// for a positive expiry on a discard mapping.
    pub fn retention_with_expiry(&self) -> Result<InterfaceRetention, SchemaError> {
        match self.retention.unwrap_or_default() {
            Retention::Volatile => {
                let expiry = self.expiry_as_duration()?;

                Ok(InterfaceRetention::Volatile { expiry })
            }
            Retention::Stored => {
                let expiry = self.expiry_as_duration()?;

                Ok(InterfaceRetention::Stored { expiry })
            }
            Retention::Discard => {
                // Only a strictly positive expiry is considered "set".
                let unused_expiry = self.expiry.filter(|exp| *exp > 0);

                if let Some(expiry) = unused_expiry {
                    cfg_if! {
                        if #[cfg(feature = "strict")] {
                            return Err(SchemaError::ExpiryWithDiscard(expiry));
                        } else {
                            tracing::warn!(expiry, "discard retention policy with expiry set, ignoring expiry");
                        }
                    }
                }

                Ok(InterfaceRetention::Discard)
            }
        }
    }

    /// Returns the database retention TTL as a [`Duration`] expressed in seconds.
    ///
    /// An unset TTL produces `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns [`SchemaError::NegativeDatabaseRetentionTtl`] when the TTL is below zero
    /// and [`SchemaError::DatabaseRetentionTtlTooLow`] when it is lower than 60.
    pub fn database_retention_ttl_as_duration(&self) -> Result<Option<Duration>, SchemaError> {
        match self.database_retention_ttl {
            None => Ok(None),
            Some(raw) => match u64::try_from(raw) {
                Err(_) => Err(SchemaError::NegativeDatabaseRetentionTtl(raw)),
                Ok(secs) if secs < 60 => Err(SchemaError::DatabaseRetentionTtlTooLow(secs)),
                Ok(secs) => Ok(Some(Duration::from_secs(secs))),
            },
        }
    }

    /// Combines the `database_retention_policy` and `database_retention_ttl` fields.
    ///
    /// A missing policy falls back to the default variant. For `no_ttl` a positive TTL
    /// has no meaning: with the `strict` feature it is reported as an error, otherwise a
    /// warning is logged and the TTL is ignored.
    ///
    /// # Errors
    ///
    /// With the `use_ttl` policy, returns the errors of
    /// [`database_retention_ttl_as_duration`](Self::database_retention_ttl_as_duration)
    /// or [`SchemaError::MissingDatabaseRetentionTtl`] when the TTL is unset. With the
    /// `strict` feature, returns `SchemaError::DatabaseRetentionTtlWithNoTtl` for a
    /// positive TTL on a `no_ttl` mapping.
    pub fn database_retention_with_ttl(&self) -> Result<InterfaceDatabaseRetention, SchemaError> {
        match self.database_retention_policy.unwrap_or_default() {
            DatabaseRetentionPolicy::UseTtl => {
                let ttl = self
                    .database_retention_ttl_as_duration()?
                    .ok_or(SchemaError::MissingDatabaseRetentionTtl)?;

                Ok(InterfaceDatabaseRetention::UseTtl { ttl })
            }
            DatabaseRetentionPolicy::NoTtl => {
                // Only a strictly positive TTL is considered "set".
                let unused_ttl = self.database_retention_ttl.filter(|ttl| *ttl > 0);

                if let Some(ttl) = unused_ttl {
                    cfg_if! {
                        if #[cfg(feature = "strict")] {
                            return Err(SchemaError::DatabaseRetentionTtlWithNoTtl(ttl));
                        } else {
                            tracing::warn!(ttl, "no_ttl retention policy with ttl set, ignoring ttl");
                        }
                    }
                }

                Ok(InterfaceDatabaseRetention::NoTtl)
            }
        }
    }
}
/// Kind of an interface, (de)serialized in `snake_case`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum InterfaceType {
/// Serialized as `datastream`.
Datastream,
/// Serialized as `properties`.
Properties,
}
impl Display for InterfaceType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InterfaceType::Datastream => write!(f, "datastream"),
InterfaceType::Properties => write!(f, "properties"),
}
}
}
/// Ownership of an interface, (de)serialized in `snake_case`.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Debug, Copy, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Ownership {
/// Serialized as `device`.
Device,
/// Serialized as `server`.
Server,
}
impl Ownership {
    /// Returns `true` if the ownership is [`Ownership::Device`].
    #[must_use]
    pub fn is_device(&self) -> bool {
        *self == Self::Device
    }

    /// Returns `true` if the ownership is [`Ownership::Server`].
    #[must_use]
    pub fn is_server(&self) -> bool {
        *self == Self::Server
    }
}
impl Display for Ownership {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Ownership::Device => write!(f, "device"),
Ownership::Server => write!(f, "server"),
}
}
}
/// Aggregation of an interface, (de)serialized in `snake_case`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy, Default)]
#[serde(rename_all = "snake_case")]
pub enum Aggregation {
/// Default variant, serialized as `individual`.
#[default]
Individual,
/// Serialized as `object`.
Object,
}
impl Display for Aggregation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Aggregation::Individual => write!(f, "individual"),
Aggregation::Object => write!(f, "object"),
}
}
}
/// Type of the value carried by a mapping.
///
/// Variants are (de)serialized in lowercase without separators, e.g. `LongInteger`
/// becomes `longinteger`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Clone, Copy)]
#[serde(rename_all = "lowercase")]
pub enum MappingType {
/// Serialized as `double`.
Double,
/// Serialized as `integer`.
Integer,
/// Serialized as `boolean`.
Boolean,
/// Serialized as `longinteger`.
LongInteger,
/// Serialized as `string`.
String,
/// Serialized as `binaryblob`.
BinaryBlob,
/// Serialized as `datetime`.
DateTime,
/// Serialized as `doublearray`.
DoubleArray,
/// Serialized as `integerarray`.
IntegerArray,
/// Serialized as `booleanarray`.
BooleanArray,
/// Serialized as `longintegerarray`.
LongIntegerArray,
/// Serialized as `stringarray`.
StringArray,
/// Serialized as `binaryblobarray`.
BinaryBlobArray,
/// Serialized as `datetimearray`.
DateTimeArray,
}
impl Display for MappingType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MappingType::Double => write!(f, "double"),
MappingType::Integer => write!(f, "integer"),
MappingType::Boolean => write!(f, "boolean"),
MappingType::LongInteger => write!(f, "longinteger"),
MappingType::String => write!(f, "string"),
MappingType::BinaryBlob => write!(f, "binaryblob"),
MappingType::DateTime => write!(f, "datetime"),
MappingType::DoubleArray => write!(f, "doublearray"),
MappingType::IntegerArray => write!(f, "integerarray"),
MappingType::BooleanArray => write!(f, "booleanarray"),
MappingType::LongIntegerArray => write!(f, "longintegerarray"),
MappingType::StringArray => write!(f, "stringarray"),
MappingType::BinaryBlobArray => write!(f, "binaryblobarray"),
MappingType::DateTimeArray => write!(f, "datetimearray"),
}
}
}
/// Reliability of a mapping, (de)serialized in `snake_case`.
///
/// The derived ordering follows the declaration order of the variants.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Copy, Clone, Default, PartialOrd, Ord)]
#[serde(rename_all = "snake_case")]
pub enum Reliability {
/// Default variant, serialized as `unreliable`.
#[default]
Unreliable,
/// Serialized as `guaranteed`.
Guaranteed,
/// Serialized as `unique`.
Unique,
}
impl Reliability {
    /// Returns `true` if the reliability is [`Reliability::Unreliable`].
    #[must_use]
    pub fn is_unreliable(&self) -> bool {
        *self == Self::Unreliable
    }

    /// Returns `true` if the reliability is [`Reliability::Guaranteed`].
    #[must_use]
    pub fn is_guaranteed(&self) -> bool {
        *self == Self::Guaranteed
    }

    /// Returns `true` if the reliability is [`Reliability::Unique`].
    #[must_use]
    pub fn is_unique(&self) -> bool {
        *self == Self::Unique
    }
}
impl Display for Reliability {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Reliability::Unreliable => write!(f, "unreliable"),
Reliability::Guaranteed => write!(f, "guaranteed"),
Reliability::Unique => write!(f, "unique"),
}
}
}
/// Retention of a mapping, (de)serialized in `snake_case`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Copy, Clone, Default)]
#[serde(rename_all = "snake_case")]
pub enum Retention {
/// Default variant, serialized as `discard`.
#[default]
Discard,
/// Serialized as `volatile`.
Volatile,
/// Serialized as `stored`.
Stored,
}
impl Display for Retention {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Retention::Discard => write!(f, "discard"),
Retention::Volatile => write!(f, "volatile"),
Retention::Stored => write!(f, "stored"),
}
}
}
/// Database retention policy of a mapping, (de)serialized in `snake_case`.
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Copy, Clone, Default)]
#[serde(rename_all = "snake_case")]
pub enum DatabaseRetentionPolicy {
/// Default variant, serialized as `no_ttl`.
#[default]
NoTtl,
/// Serialized as `use_ttl`.
UseTtl,
}
impl Display for DatabaseRetentionPolicy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DatabaseRetentionPolicy::NoTtl => write!(f, "no_ttl"),
DatabaseRetentionPolicy::UseTtl => write!(f, "use_ttl"),
}
}
}
// Unit tests for the schema types: deserialization, expiry/TTL validation and the
// `Display` implementations.
#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use super::*;
// With the `strict` feature, misspelled keys (`interfaceS_name`, `ownArship`) must make
// the deserialization fail.
#[cfg(feature = "strict")]
#[test]
fn should_be_strict() {
let json = r#"{
"interfaceS_name": "org.astarte-platform.genericproperties.Values",
"version_major": 1,
"version_minor": 0,
"type": "properties",
"ownArship": "server",
"description": "Interface description \"escaped\"",
"doc": "Interface doc \"escaped\"",
"mappings": [{
"endpoint": "/double_endpoint",
"type": "double",
"doc": "Mapping doc \"escaped\""
}]
}"#;
serde_json::from_str::<InterfaceJson<String>>(json)
.expect_err("should error for misspelled fields");
}
// Checks `expiry_as_duration` for a positive, a negative, a zero and a minimal positive
// expiry.
#[test]
fn should_get_expiry() {
// Builds the interface JSON with the given expiry on its only mapping.
let json = |expiry: i64| {
format!(
r#"{{
"interface_name": "org.astarte-platform.genericproperties.Values",
"version_major": 1,
"version_minor": 0,
"type": "properties",
"ownership": "server",
"mappings": [{{
"endpoint": "/double_endpoint",
"expiry": {expiry},
"type": "double"
}}]
}}"#
)
};
// Positive expiry is converted to seconds.
let i = serde_json::from_str::<InterfaceJson<String>>(&json(10)).unwrap();
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.expiry, Some(10));
assert_eq!(
mapping.expiry_as_duration().unwrap(),
Some(Duration::from_secs(10))
);
// Negative expiry deserializes but fails the conversion.
let i: InterfaceJson<String> = serde_json::from_str(&json(-42)).unwrap();
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.expiry, Some(-42));
assert!(matches!(
mapping.expiry_as_duration().unwrap_err(),
SchemaError::NegativeExpiry(-42)
));
// Zero expiry is treated as no expiry.
let i: InterfaceJson<String> = serde_json::from_str(&json(0)).unwrap();
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.expiry, Some(0));
assert_eq!(mapping.expiry_as_duration().unwrap(), None);
// Smallest positive expiry.
let i: InterfaceJson<String> = serde_json::from_str(&json(1)).unwrap();
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.expiry, Some(1));
assert_eq!(
mapping.expiry_as_duration().unwrap(),
Some(Duration::from_secs(1))
);
}
// Checks `database_retention_with_ttl` with the `use_ttl` policy for the minimum accepted
// TTL, a zero TTL and a negative TTL.
#[test]
fn should_get_retention() {
// Deserializes an interface whose only mapping uses `use_ttl` with the given TTL.
let json = |ttl: i64| {
serde_json::from_str::<InterfaceJson<String>>(&format!(
r#"{{
"interface_name": "org.astarte-platform.genericproperties.Values",
"version_major": 1,
"version_minor": 0,
"type": "properties",
"ownership": "server",
"mappings": [{{
"endpoint": "/double_endpoint",
"database_retention_policy": "use_ttl",
"database_retention_ttl": {ttl},
"type": "double"
}}]
}}"#
))
.unwrap()
};
// A TTL of exactly 60 is accepted.
let i = json(60);
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.database_retention_ttl, Some(60));
assert_eq!(
mapping.database_retention_with_ttl().unwrap(),
InterfaceDatabaseRetention::UseTtl {
ttl: Duration::from_secs(60)
}
);
// A TTL of zero is reported as too low.
let i = json(0);
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.database_retention_ttl, Some(0));
assert!(matches!(
mapping.database_retention_with_ttl().unwrap_err(),
SchemaError::DatabaseRetentionTtlTooLow(0)
));
// A negative TTL is reported with its original value.
let i = json(-32);
let mapping = i.mappings.first().unwrap();
assert_eq!(mapping.database_retention_ttl, Some(-32));
assert!(matches!(
mapping.database_retention_with_ttl().unwrap_err(),
SchemaError::NegativeDatabaseRetentionTtl(-32)
));
}
// Checks `retention_with_expiry` for every retention variant, with and without expiry.
#[test]
fn retention_and_expiry() {
let mut mapping = Mapping {
endpoint: "/some/path",
mapping_type: MappingType::Boolean,
reliability: None,
explicit_timestamp: None,
retention: Some(Retention::Discard),
expiry: Some(420),
database_retention_policy: None,
database_retention_ttl: None,
allow_unset: None,
required: None,
description: None,
doc: None,
};
// Discard with an expiry: an error when strict, otherwise the expiry is ignored.
cfg_if! {
if #[cfg(feature = "strict")] {
let err = mapping.retention_with_expiry().unwrap_err();
assert!(matches!(err, SchemaError::ExpiryWithDiscard(420)));
} else {
let retention = mapping.retention_with_expiry().unwrap();
assert_eq!(retention, InterfaceRetention::Discard);
}
}
mapping.retention = Some(Retention::Volatile);
let exp = InterfaceRetention::Volatile {
expiry: Some(Duration::from_secs(420)),
};
assert_eq!(mapping.retention_with_expiry().unwrap(), exp);
mapping.retention = Some(Retention::Stored);
let exp = InterfaceRetention::Stored {
expiry: Some(Duration::from_secs(420)),
};
assert_eq!(mapping.retention_with_expiry().unwrap(), exp);
// Without an expiry the stored retention carries `None`.
mapping.expiry = None;
let exp = InterfaceRetention::Stored { expiry: None };
assert_eq!(mapping.retention_with_expiry().unwrap(), exp);
}
// Checks `database_retention_with_ttl` for both policies and the TTL conversion when the
// TTL is unset.
#[test]
fn database_retention_ttl() {
let mut mapping = Mapping {
endpoint: "/some/path",
mapping_type: MappingType::Boolean,
reliability: None,
explicit_timestamp: None,
retention: None,
expiry: None,
database_retention_policy: Some(DatabaseRetentionPolicy::NoTtl),
database_retention_ttl: Some(420),
allow_unset: None,
required: None,
description: None,
doc: None,
};
// `no_ttl` with a TTL: an error when strict, otherwise the TTL is ignored.
cfg_if! {
if #[cfg(feature = "strict")] {
let err = mapping.database_retention_with_ttl().unwrap_err();
assert!(matches!(err, SchemaError::DatabaseRetentionTtlWithNoTtl(420)));
} else {
let retention = mapping.database_retention_with_ttl().unwrap();
assert_eq!(retention, InterfaceDatabaseRetention::NoTtl);
}
}
mapping.database_retention_policy = Some(DatabaseRetentionPolicy::UseTtl);
let exp = InterfaceDatabaseRetention::UseTtl {
ttl: Duration::from_secs(420),
};
assert_eq!(mapping.database_retention_with_ttl().unwrap(), exp);
// An unset TTL converts to `None`.
mapping.database_retention_ttl = None;
assert_eq!(mapping.database_retention_ttl_as_duration().unwrap(), None);
}
// `Display` output of `InterfaceType`.
#[test]
fn interface_type_functions() {
assert_eq!(InterfaceType::Datastream.to_string(), "datastream");
assert_eq!(InterfaceType::Properties.to_string(), "properties");
}
// `Display` output and predicates of `Ownership`.
#[test]
fn ownership_functions() {
assert_eq!(Ownership::Device.to_string(), "device");
assert_eq!(Ownership::Server.to_string(), "server");
assert!(Ownership::Server.is_server());
assert!(!Ownership::Device.is_server());
assert!(Ownership::Device.is_device());
assert!(!Ownership::Server.is_device());
}
// `Display` output of `Aggregation`.
#[test]
fn aggregation_functions() {
assert_eq!(Aggregation::Individual.to_string(), "individual");
assert_eq!(Aggregation::Object.to_string(), "object");
}
// `Display` output of every `MappingType` variant.
#[test]
fn mapping_type_functions() {
assert_eq!(MappingType::Double.to_string(), "double");
assert_eq!(MappingType::Integer.to_string(), "integer");
assert_eq!(MappingType::Boolean.to_string(), "boolean");
assert_eq!(MappingType::LongInteger.to_string(), "longinteger");
assert_eq!(MappingType::String.to_string(), "string");
assert_eq!(MappingType::BinaryBlob.to_string(), "binaryblob");
assert_eq!(MappingType::DateTime.to_string(), "datetime");
assert_eq!(MappingType::DoubleArray.to_string(), "doublearray");
assert_eq!(MappingType::IntegerArray.to_string(), "integerarray");
assert_eq!(MappingType::BooleanArray.to_string(), "booleanarray");
assert_eq!(
MappingType::LongIntegerArray.to_string(),
"longintegerarray"
);
assert_eq!(MappingType::StringArray.to_string(), "stringarray");
assert_eq!(MappingType::BinaryBlobArray.to_string(), "binaryblobarray");
assert_eq!(MappingType::DateTimeArray.to_string(), "datetimearray");
}
// `Display` output and predicates of `Reliability`.
#[test]
fn reliability_functions() {
assert_eq!(Reliability::Unreliable.to_string(), "unreliable");
assert_eq!(Reliability::Guaranteed.to_string(), "guaranteed");
assert_eq!(Reliability::Unique.to_string(), "unique");
assert!(Reliability::Unreliable.is_unreliable());
assert!(Reliability::Guaranteed.is_guaranteed());
assert!(Reliability::Unique.is_unique());
}
// `Display` output of `Retention`.
#[test]
fn retention_functions() {
assert_eq!(Retention::Discard.to_string(), "discard");
assert_eq!(Retention::Volatile.to_string(), "volatile");
assert_eq!(Retention::Stored.to_string(), "stored");
}
// `Display` output of `DatabaseRetentionPolicy`.
#[test]
fn database_retention_policy_functions() {
assert_eq!(DatabaseRetentionPolicy::NoTtl.to_string(), "no_ttl");
assert_eq!(DatabaseRetentionPolicy::UseTtl.to_string(), "use_ttl");
}
}