use std::fmt;
use live_data::{
FeedDescriptor, FilterExpr, FormatPreference, SubscriptionDescriptor, TransportTag,
};
/// Reasons a feed descriptor or a subscription request can be rejected.
///
/// Every variant carries the name of the feed it refers to, either as the
/// sole tuple payload or as the `feed` field. The enum is `#[non_exhaustive]`,
/// so matches outside this crate need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ValidationError {
/// A feed name was used more than once; the payload is that name.
/// (Not raised by the functions visible in this part of the file.)
DuplicateFeedName(String),
/// The named feed's schema has no fields.
EmptySchema(String),
/// The named feed declares no transports.
EmptyTransports(String),
/// The named feed declares no formats.
EmptyFormats(String),
/// The feed's `event_time_key` does not match the name of any schema field.
EventTimeKeyNotInSchema { feed: String, key: String },
/// No feed with this name is known; the payload is the requested name.
/// (Not raised by the functions visible in this part of the file.)
UnknownFeed(String),
/// A subscription asked for a column whose name is not in the feed's schema.
UnknownColumn { feed: String, column: String },
/// A non-empty filter was requested but the feed cannot filter.
FilterNotSupported { feed: String },
/// Sampling was requested but the feed cannot sample.
SamplingNotSupported { feed: String },
/// None of the requested transports is offered by the feed; `available`
/// lists the transports the feed does declare.
TransportNotSupported { feed: String, available: Vec<TransportTag> },
/// The requested format is not offered by the feed; `available` lists the
/// formats the feed does declare.
FormatNotSupported {
feed: String,
requested: FormatPreference,
available: Vec<FormatPreference>,
},
}
impl fmt::Display for ValidationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::DuplicateFeedName(n) => write!(f, "duplicate feed name '{n}'"),
Self::EmptySchema(n) => write!(f, "feed '{n}' has empty schema"),
Self::EmptyTransports(n) => write!(f, "feed '{n}' has no transports declared"),
Self::EmptyFormats(n) => write!(f, "feed '{n}' has no formats declared"),
Self::EventTimeKeyNotInSchema { feed, key } => {
write!(f, "feed '{feed}' event_time_key '{key}' not in schema")
}
Self::UnknownFeed(n) => write!(f, "unknown feed '{n}'"),
Self::UnknownColumn { feed, column } => {
write!(f, "feed '{feed}' has no column '{column}'")
}
Self::FilterNotSupported { feed } => {
write!(f, "feed '{feed}' does not support filtering")
}
Self::SamplingNotSupported { feed } => {
write!(f, "feed '{feed}' does not support sampling")
}
Self::TransportNotSupported { feed, available } => write!(
f,
"feed '{feed}' does not support requested transport; available: {available:?}"
),
Self::FormatNotSupported { feed, requested, available } => write!(
f,
"feed '{feed}' does not support format {requested:?}; available: {available:?}"
),
}
}
}
// Empty impl: the trait's default methods are used as-is, so no underlying
// `source` error is reported for any variant.
impl std::error::Error for ValidationError {}
/// Checks that a feed descriptor is internally consistent.
///
/// The checks run in a fixed order and the first failure is returned.
///
/// # Errors
///
/// * [`ValidationError::EmptySchema`] if the schema has no fields.
/// * [`ValidationError::EmptyTransports`] if no transports are declared.
/// * [`ValidationError::EmptyFormats`] if no formats are declared.
/// * [`ValidationError::EventTimeKeyNotInSchema`] if `event_time_key` is set
///   but no schema field has that name.
pub fn validate_descriptor(d: &FeedDescriptor) -> Result<(), ValidationError> {
    // Every error carries an owned copy of the feed name.
    let owner = || d.name.clone();

    if d.schema.fields.is_empty() {
        Err(ValidationError::EmptySchema(owner()))
    } else if d.transports.is_empty() {
        Err(ValidationError::EmptyTransports(owner()))
    } else if d.formats.is_empty() {
        Err(ValidationError::EmptyFormats(owner()))
    } else {
        match &d.event_time_key {
            // A key is configured but no field in the schema carries that name.
            Some(key) if d.schema.fields.iter().all(|field| &field.name != key) => {
                Err(ValidationError::EventTimeKeyNotInSchema {
                    feed: owner(),
                    key: key.clone(),
                })
            }
            // Either no key is configured or it names an existing field.
            _ => Ok(()),
        }
    }
}
/// Matches a subscription request against what a feed offers and picks the
/// transport and format to use.
///
/// Checks run in this order, returning on the first failure: requested
/// columns, filter capability, sampling capability, transport, format.
///
/// * Transport: when `req.transport_pref.is_any()` the feed's first declared
///   transport is used; otherwise the first entry yielded by
///   `req.transport_pref.iter()` that the feed also declares.
/// * Format: the requested format if the feed declares it; with no requested
///   format, the feed's first declared format.
///
/// The returned `columns` is a clone of `req.columns`.
///
/// # Errors
///
/// * [`ValidationError::UnknownColumn`] for the first requested column that
///   is not a schema field name.
/// * [`ValidationError::FilterNotSupported`] if the filter is not
///   `FilterExpr::Empty` and the feed cannot filter.
/// * [`ValidationError::SamplingNotSupported`] if sampling is requested and
///   the feed cannot sample.
/// * [`ValidationError::EmptyTransports`] / [`ValidationError::EmptyFormats`]
///   if a default has to be picked but the feed declares none.
/// * [`ValidationError::TransportNotSupported`] if no preferred transport is
///   declared by the feed.
/// * [`ValidationError::FormatNotSupported`] if the requested format is not
///   declared by the feed.
pub fn negotiate_subscription(
    descriptor: &FeedDescriptor,
    req: &SubscriptionDescriptor,
) -> Result<NegotiatedSubscription, ValidationError> {
    // Columns: locate the first requested name that the schema does not define.
    let schema_fields = &descriptor.schema.fields;
    let unknown_column = req
        .columns
        .iter()
        .flatten()
        .find(|col| !schema_fields.iter().any(|field| &field.name == *col));
    if let Some(column) = unknown_column {
        return Err(ValidationError::UnknownColumn {
            feed: descriptor.name.clone(),
            column: column.clone(),
        });
    }

    // Capabilities: a request may only use features the feed advertises.
    let wants_filter = !matches!(req.filter, FilterExpr::Empty);
    if wants_filter && !descriptor.capabilities.can_filter {
        return Err(ValidationError::FilterNotSupported { feed: descriptor.name.clone() });
    }
    let wants_sampling = req.sampling.is_some();
    if wants_sampling && !descriptor.capabilities.can_sample {
        return Err(ValidationError::SamplingNotSupported { feed: descriptor.name.clone() });
    }

    // Transport: the feed's first one when anything goes, otherwise the first
    // preference the feed also declares.
    let transport = if req.transport_pref.is_any() {
        match descriptor.transports.first() {
            Some(tag) => *tag,
            None => return Err(ValidationError::EmptyTransports(descriptor.name.clone())),
        }
    } else {
        let preferred = req
            .transport_pref
            .iter()
            .find(|candidate| descriptor.transports.contains(*candidate));
        match preferred {
            Some(tag) => *tag,
            None => {
                return Err(ValidationError::TransportNotSupported {
                    feed: descriptor.name.clone(),
                    available: descriptor.transports.clone(),
                })
            }
        }
    };

    // Format: an explicit request must be declared by the feed; otherwise the
    // feed's first declared format is used.
    let format = if let Some(requested) = req.format_pref {
        if descriptor.formats.contains(&requested) {
            requested
        } else {
            return Err(ValidationError::FormatNotSupported {
                feed: descriptor.name.clone(),
                requested,
                available: descriptor.formats.clone(),
            });
        }
    } else {
        match descriptor.formats.first() {
            Some(first) => *first,
            None => return Err(ValidationError::EmptyFormats(descriptor.name.clone())),
        }
    };

    Ok(NegotiatedSubscription { transport, format, columns: req.columns.clone() })
}
/// Outcome of a successful `negotiate_subscription` call: the concrete
/// transport and format chosen for the subscription, plus the requested
/// column selection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NegotiatedSubscription {
/// Transport selected from the feed's declared transports.
pub transport: TransportTag,
/// Format selected from the feed's declared formats.
pub format: FormatPreference,
/// Column names copied from the request; `None` when the request named no
/// columns (presumably meaning "all columns" — confirm against consumers).
pub columns: Option<Vec<String>>,
}