live-feed 0.1.0

Publisher SDK for advertising and serving live data feeds. Consumers use the live-stream crate.
Documentation
//! Validation for descriptors and subscription requests.

use std::fmt;

use live_data::{
    FeedDescriptor, FilterExpr, FormatPreference, SubscriptionDescriptor, TransportTag,
};

/// Reasons a descriptor or subscription request is refused.
///
/// Every variant carries the name of the feed it concerns so the rendered
/// message (see the `Display` impl) can identify the offending feed. The enum
/// is `#[non_exhaustive]`, so new refusal reasons may be added later.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ValidationError {
    /// A feed name was declared more than once; carries the duplicated name.
    // NOTE(review): no producer is visible in this part of the file —
    // presumably raised by a registry-level check; confirm.
    DuplicateFeedName(String),
    /// The named feed's schema has no fields.
    EmptySchema(String),
    /// The named feed advertises no transports.
    EmptyTransports(String),
    /// The named feed advertises no formats.
    EmptyFormats(String),
    /// The feed declares an `event_time_key` (`key`) that is not the name of
    /// any field in its schema.
    EventTimeKeyNotInSchema { feed: String, key: String },
    /// The named feed is not known.
    // NOTE(review): no producer is visible in this part of the file —
    // presumably raised when looking a feed up by name; confirm.
    UnknownFeed(String),
    /// A subscription requested `column`, which is not in the feed's schema.
    UnknownColumn { feed: String, column: String },
    /// A subscription carried a non-empty filter but the feed cannot filter.
    FilterNotSupported { feed: String },
    /// A subscription requested sampling but the feed cannot sample.
    SamplingNotSupported { feed: String },
    /// None of the transports the subscriber asked for is advertised by the
    /// feed; `available` lists the transports the feed does advertise.
    TransportNotSupported { feed: String, available: Vec<TransportTag> },
    /// The subscriber asked for the format `requested`, which the feed does
    /// not advertise; `available` lists the formats it does advertise.
    FormatNotSupported {
        feed: String,
        requested: FormatPreference,
        available: Vec<FormatPreference>,
    },
}

impl fmt::Display for ValidationError {
    /// Writes a one-line, human-readable description of the refusal.
    ///
    /// Feed, column and key names are wrapped in single quotes; transport and
    /// format values are rendered with their `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Variants whose only payload is the feed name.
            Self::UnknownFeed(n) => {
                write!(f, "unknown feed '{n}'")
            }
            Self::DuplicateFeedName(n) => {
                write!(f, "duplicate feed name '{n}'")
            }
            Self::EmptySchema(n) => {
                write!(f, "feed '{n}' has empty schema")
            }
            Self::EmptyTransports(n) => {
                write!(f, "feed '{n}' has no transports declared")
            }
            Self::EmptyFormats(n) => {
                write!(f, "feed '{n}' has no formats declared")
            }

            // Variants that name a specific schema element.
            Self::UnknownColumn { feed, column } => {
                write!(f, "feed '{feed}' has no column '{column}'")
            }
            Self::EventTimeKeyNotInSchema { feed, key } => {
                write!(f, "feed '{feed}' event_time_key '{key}' not in schema")
            }

            // Capability mismatches discovered while negotiating.
            Self::SamplingNotSupported { feed } => {
                write!(f, "feed '{feed}' does not support sampling")
            }
            Self::FilterNotSupported { feed } => {
                write!(f, "feed '{feed}' does not support filtering")
            }
            Self::FormatNotSupported { feed, requested, available } => {
                write!(
                    f,
                    "feed '{feed}' does not support format {requested:?}; available: {available:?}"
                )
            }
            Self::TransportNotSupported { feed, available } => {
                write!(
                    f,
                    "feed '{feed}' does not support requested transport; available: {available:?}"
                )
            }
        }
    }
}

// Empty impl on purpose: the trait's provided methods are used as-is, so a
// `ValidationError` reports no underlying `source()`; the message comes from
// the `Display` impl.
impl std::error::Error for ValidationError {}

/// Check a single feed descriptor for internal consistency.
///
/// The checks run in this order and stop at the first failure:
/// 1. the schema has at least one field;
/// 2. at least one transport is advertised;
/// 3. at least one format is advertised;
/// 4. if `event_time_key` is set, it equals the name of a schema field.
///
/// # Errors
///
/// Returns [`ValidationError::EmptySchema`], [`ValidationError::EmptyTransports`],
/// [`ValidationError::EmptyFormats`] or
/// [`ValidationError::EventTimeKeyNotInSchema`] for the corresponding failed
/// check, each carrying a copy of the descriptor's name.
pub fn validate_descriptor(d: &FeedDescriptor) -> Result<(), ValidationError> {
    // Only clone the feed name once we know an error is being built.
    let feed_name = || d.name.clone();

    if d.schema.fields.is_empty() {
        return Err(ValidationError::EmptySchema(feed_name()));
    }
    if d.transports.is_empty() {
        return Err(ValidationError::EmptyTransports(feed_name()));
    }
    if d.formats.is_empty() {
        return Err(ValidationError::EmptyFormats(feed_name()));
    }

    match &d.event_time_key {
        // A key is declared but no schema field carries that name.
        Some(key) if d.schema.fields.iter().all(|field| &field.name != key) => {
            Err(ValidationError::EventTimeKeyNotInSchema {
                feed: feed_name(),
                key: key.clone(),
            })
        }
        // Either no key is declared or it matched a field.
        _ => Ok(()),
    }
}

/// Match a subscription request against what a feed descriptor advertises.
///
/// On success the result holds the selected transport, the selected format
/// and the request's column projection; the caller pairs these with an
/// endpoint to form a [`SubscribeAck`](live_data::SubscribeAck).
///
/// Checks are applied in a fixed order and the first failure is returned:
/// 1. Columns - each requested column must be a field of the schema.
/// 2. Filter - any filter other than `FilterExpr::Empty` requires
///    `capabilities.can_filter`.
/// 3. Sampling - a sampling request requires `capabilities.can_sample`.
/// 4. Transport - when the preference is "any", the feed's first transport
///    is used; otherwise the first preferred transport that the feed also
///    advertises is used.
/// 5. Format - a requested format must be advertised; with no request the
///    feed's first format is used.
///
/// # Errors
///
/// Returns [`ValidationError::UnknownColumn`],
/// [`ValidationError::FilterNotSupported`],
/// [`ValidationError::SamplingNotSupported`],
/// [`ValidationError::TransportNotSupported`] or
/// [`ValidationError::FormatNotSupported`] for the respective failed check,
/// and [`ValidationError::EmptyTransports`] / [`ValidationError::EmptyFormats`]
/// when a default is needed but the feed advertises nothing to default to.
pub fn negotiate_subscription(
    descriptor: &FeedDescriptor,
    req: &SubscriptionDescriptor,
) -> Result<NegotiatedSubscription, ValidationError> {
    // Only clone the feed name once we know an error is being built.
    let feed_name = || descriptor.name.clone();

    // 1. Columns: report the first requested column missing from the schema.
    let first_missing = req
        .columns
        .iter()
        .flatten()
        .find(|wanted| !descriptor.schema.fields.iter().any(|f| &f.name == *wanted));
    if let Some(column) = first_missing {
        return Err(ValidationError::UnknownColumn {
            feed: feed_name(),
            column: column.clone(),
        });
    }

    // 2. Filter.
    let wants_filter = !matches!(req.filter, FilterExpr::Empty);
    if wants_filter && !descriptor.capabilities.can_filter {
        return Err(ValidationError::FilterNotSupported { feed: feed_name() });
    }

    // 3. Sampling.
    let wants_sampling = req.sampling.is_some();
    if wants_sampling && !descriptor.capabilities.can_sample {
        return Err(ValidationError::SamplingNotSupported { feed: feed_name() });
    }

    // 4. Transport: the subscriber's preference order decides among matches.
    let transport = if req.transport_pref.is_any() {
        match descriptor.transports.first() {
            Some(default_transport) => *default_transport,
            None => return Err(ValidationError::EmptyTransports(feed_name())),
        }
    } else {
        match req.transport_pref.iter().find(|t| descriptor.transports.contains(*t)) {
            Some(shared) => *shared,
            None => {
                return Err(ValidationError::TransportNotSupported {
                    feed: feed_name(),
                    available: descriptor.transports.clone(),
                });
            }
        }
    };

    // 5. Format.
    let format = if let Some(wanted) = req.format_pref {
        if !descriptor.formats.contains(&wanted) {
            return Err(ValidationError::FormatNotSupported {
                feed: feed_name(),
                requested: wanted,
                available: descriptor.formats.clone(),
            });
        }
        wanted
    } else {
        match descriptor.formats.first() {
            Some(default_format) => *default_format,
            None => return Err(ValidationError::EmptyFormats(feed_name())),
        }
    };

    Ok(NegotiatedSubscription {
        transport,
        format,
        columns: req.columns.clone(),
    })
}

/// Result of a successful subscription negotiation, as produced by
/// `negotiate_subscription`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NegotiatedSubscription {
    /// Transport selected for the subscription; always one the feed advertises.
    pub transport: TransportTag,
    /// Format selected for the subscription; always one the feed advertises.
    pub format: FormatPreference,
    /// Column projection copied unchanged from the subscription request.
    // NOTE(review): `None` presumably means "no projection requested" (all
    // columns) — confirm against the consumer of this struct.
    pub columns: Option<Vec<String>>,
}