Skip to main content

live_feed/
validate.rs

1//! Validation for descriptors and subscription requests.
2
3use std::fmt;
4
5use live_data::{
6    FeedDescriptor, FilterExpr, FormatPreference, SubscriptionDescriptor, TransportTag,
7};
8
9/// Reasons a descriptor or subscription request is refused.
10#[derive(Debug, Clone, PartialEq, Eq)]
11#[non_exhaustive]
12pub enum ValidationError {
13    DuplicateFeedName(String),
14    EmptySchema(String),
15    EmptyTransports(String),
16    EmptyFormats(String),
17    EventTimeKeyNotInSchema { feed: String, key: String },
18    UnknownFeed(String),
19    UnknownColumn { feed: String, column: String },
20    FilterNotSupported { feed: String },
21    SamplingNotSupported { feed: String },
22    TransportNotSupported { feed: String, available: Vec<TransportTag> },
23    FormatNotSupported {
24        feed: String,
25        requested: FormatPreference,
26        available: Vec<FormatPreference>,
27    },
28}
29
30impl fmt::Display for ValidationError {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        match self {
33            Self::DuplicateFeedName(n) => write!(f, "duplicate feed name '{n}'"),
34            Self::EmptySchema(n) => write!(f, "feed '{n}' has empty schema"),
35            Self::EmptyTransports(n) => write!(f, "feed '{n}' has no transports declared"),
36            Self::EmptyFormats(n) => write!(f, "feed '{n}' has no formats declared"),
37            Self::EventTimeKeyNotInSchema { feed, key } => {
38                write!(f, "feed '{feed}' event_time_key '{key}' not in schema")
39            }
40            Self::UnknownFeed(n) => write!(f, "unknown feed '{n}'"),
41            Self::UnknownColumn { feed, column } => {
42                write!(f, "feed '{feed}' has no column '{column}'")
43            }
44            Self::FilterNotSupported { feed } => {
45                write!(f, "feed '{feed}' does not support filtering")
46            }
47            Self::SamplingNotSupported { feed } => {
48                write!(f, "feed '{feed}' does not support sampling")
49            }
50            Self::TransportNotSupported { feed, available } => write!(
51                f,
52                "feed '{feed}' does not support requested transport; available: {available:?}"
53            ),
54            Self::FormatNotSupported { feed, requested, available } => write!(
55                f,
56                "feed '{feed}' does not support format {requested:?}; available: {available:?}"
57            ),
58        }
59    }
60}
61
62impl std::error::Error for ValidationError {}
63
64/// Validate a descriptor in isolation: schema non-empty, at least one
65/// transport and one format advertised, and the declared
66/// `event_time_key` (when set) names an actual column.
67pub fn validate_descriptor(d: &FeedDescriptor) -> Result<(), ValidationError> {
68    if d.schema.fields.is_empty() {
69        return Err(ValidationError::EmptySchema(d.name.clone()));
70    }
71    if d.transports.is_empty() {
72        return Err(ValidationError::EmptyTransports(d.name.clone()));
73    }
74    if d.formats.is_empty() {
75        return Err(ValidationError::EmptyFormats(d.name.clone()));
76    }
77    if let Some(key) = &d.event_time_key {
78        if !d.schema.fields.iter().any(|f| &f.name == key) {
79            return Err(ValidationError::EventTimeKeyNotInSchema {
80                feed: d.name.clone(),
81                key: key.clone(),
82            });
83        }
84    }
85    Ok(())
86}
87
88/// Negotiate a subscription against a feed descriptor's advertised
89/// capabilities. Returns the chosen transport, format, and column
90/// projection on success; the caller pairs these with an endpoint to
91/// form a [`SubscribeAck`](live_data::SubscribeAck).
92///
93/// Validation order:
94/// 1. Columns - every requested column must exist in the schema.
95/// 2. Filter - non-empty filter requires `Capabilities.can_filter`.
96/// 3. Sampling - requires `Capabilities.can_sample`.
97/// 4. Transport - at least one preferred transport must intersect
98///    with what the feed advertises; an empty preference defaults to
99///    the feed's first transport.
100/// 5. Format - if specified, must be advertised; otherwise the feed's
101///    first format wins.
102pub fn negotiate_subscription(
103    descriptor: &FeedDescriptor,
104    req: &SubscriptionDescriptor,
105) -> Result<NegotiatedSubscription, ValidationError> {
106    if let Some(cols) = &req.columns {
107        for c in cols {
108            if !descriptor.schema.fields.iter().any(|f| &f.name == c) {
109                return Err(ValidationError::UnknownColumn {
110                    feed: descriptor.name.clone(),
111                    column: c.clone(),
112                });
113            }
114        }
115    }
116    if !matches!(req.filter, FilterExpr::Empty) && !descriptor.capabilities.can_filter {
117        return Err(ValidationError::FilterNotSupported { feed: descriptor.name.clone() });
118    }
119    if req.sampling.is_some() && !descriptor.capabilities.can_sample {
120        return Err(ValidationError::SamplingNotSupported { feed: descriptor.name.clone() });
121    }
122    let transport = if req.transport_pref.is_any() {
123        *descriptor
124            .transports
125            .first()
126            .ok_or_else(|| ValidationError::EmptyTransports(descriptor.name.clone()))?
127    } else {
128        *req.transport_pref
129            .iter()
130            .find(|t| descriptor.transports.contains(*t))
131            .ok_or_else(|| ValidationError::TransportNotSupported {
132                feed: descriptor.name.clone(),
133                available: descriptor.transports.clone(),
134            })?
135    };
136    let format = match req.format_pref {
137        Some(f) => {
138            if !descriptor.formats.contains(&f) {
139                return Err(ValidationError::FormatNotSupported {
140                    feed: descriptor.name.clone(),
141                    requested: f,
142                    available: descriptor.formats.clone(),
143                });
144            }
145            f
146        }
147        None => *descriptor
148            .formats
149            .first()
150            .ok_or_else(|| ValidationError::EmptyFormats(descriptor.name.clone()))?,
151    };
152    Ok(NegotiatedSubscription { transport, format, columns: req.columns.clone() })
153}
154
155/// Result of a successful subscription negotiation.
156#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct NegotiatedSubscription {
158    pub transport: TransportTag,
159    pub format: FormatPreference,
160    pub columns: Option<Vec<String>>,
161}