1use std::fmt;
4
5use live_data::{
6 FeedDescriptor, FilterExpr, FormatPreference, SubscriptionDescriptor, TransportTag,
7};
8
9#[derive(Debug, Clone, PartialEq, Eq)]
11#[non_exhaustive]
12pub enum ValidationError {
13 DuplicateFeedName(String),
14 EmptySchema(String),
15 EmptyTransports(String),
16 EmptyFormats(String),
17 EventTimeKeyNotInSchema { feed: String, key: String },
18 UnknownFeed(String),
19 UnknownColumn { feed: String, column: String },
20 FilterNotSupported { feed: String },
21 SamplingNotSupported { feed: String },
22 TransportNotSupported { feed: String, available: Vec<TransportTag> },
23 FormatNotSupported {
24 feed: String,
25 requested: FormatPreference,
26 available: Vec<FormatPreference>,
27 },
28}
29
30impl fmt::Display for ValidationError {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 match self {
33 Self::DuplicateFeedName(n) => write!(f, "duplicate feed name '{n}'"),
34 Self::EmptySchema(n) => write!(f, "feed '{n}' has empty schema"),
35 Self::EmptyTransports(n) => write!(f, "feed '{n}' has no transports declared"),
36 Self::EmptyFormats(n) => write!(f, "feed '{n}' has no formats declared"),
37 Self::EventTimeKeyNotInSchema { feed, key } => {
38 write!(f, "feed '{feed}' event_time_key '{key}' not in schema")
39 }
40 Self::UnknownFeed(n) => write!(f, "unknown feed '{n}'"),
41 Self::UnknownColumn { feed, column } => {
42 write!(f, "feed '{feed}' has no column '{column}'")
43 }
44 Self::FilterNotSupported { feed } => {
45 write!(f, "feed '{feed}' does not support filtering")
46 }
47 Self::SamplingNotSupported { feed } => {
48 write!(f, "feed '{feed}' does not support sampling")
49 }
50 Self::TransportNotSupported { feed, available } => write!(
51 f,
52 "feed '{feed}' does not support requested transport; available: {available:?}"
53 ),
54 Self::FormatNotSupported { feed, requested, available } => write!(
55 f,
56 "feed '{feed}' does not support format {requested:?}; available: {available:?}"
57 ),
58 }
59 }
60}
61
62impl std::error::Error for ValidationError {}
63
64pub fn validate_descriptor(d: &FeedDescriptor) -> Result<(), ValidationError> {
68 if d.schema.fields.is_empty() {
69 return Err(ValidationError::EmptySchema(d.name.clone()));
70 }
71 if d.transports.is_empty() {
72 return Err(ValidationError::EmptyTransports(d.name.clone()));
73 }
74 if d.formats.is_empty() {
75 return Err(ValidationError::EmptyFormats(d.name.clone()));
76 }
77 if let Some(key) = &d.event_time_key {
78 if !d.schema.fields.iter().any(|f| &f.name == key) {
79 return Err(ValidationError::EventTimeKeyNotInSchema {
80 feed: d.name.clone(),
81 key: key.clone(),
82 });
83 }
84 }
85 Ok(())
86}
87
88pub fn negotiate_subscription(
103 descriptor: &FeedDescriptor,
104 req: &SubscriptionDescriptor,
105) -> Result<NegotiatedSubscription, ValidationError> {
106 if let Some(cols) = &req.columns {
107 for c in cols {
108 if !descriptor.schema.fields.iter().any(|f| &f.name == c) {
109 return Err(ValidationError::UnknownColumn {
110 feed: descriptor.name.clone(),
111 column: c.clone(),
112 });
113 }
114 }
115 }
116 if !matches!(req.filter, FilterExpr::Empty) && !descriptor.capabilities.can_filter {
117 return Err(ValidationError::FilterNotSupported { feed: descriptor.name.clone() });
118 }
119 if req.sampling.is_some() && !descriptor.capabilities.can_sample {
120 return Err(ValidationError::SamplingNotSupported { feed: descriptor.name.clone() });
121 }
122 let transport = if req.transport_pref.is_any() {
123 *descriptor
124 .transports
125 .first()
126 .ok_or_else(|| ValidationError::EmptyTransports(descriptor.name.clone()))?
127 } else {
128 *req.transport_pref
129 .iter()
130 .find(|t| descriptor.transports.contains(*t))
131 .ok_or_else(|| ValidationError::TransportNotSupported {
132 feed: descriptor.name.clone(),
133 available: descriptor.transports.clone(),
134 })?
135 };
136 let format = match req.format_pref {
137 Some(f) => {
138 if !descriptor.formats.contains(&f) {
139 return Err(ValidationError::FormatNotSupported {
140 feed: descriptor.name.clone(),
141 requested: f,
142 available: descriptor.formats.clone(),
143 });
144 }
145 f
146 }
147 None => *descriptor
148 .formats
149 .first()
150 .ok_or_else(|| ValidationError::EmptyFormats(descriptor.name.clone()))?,
151 };
152 Ok(NegotiatedSubscription { transport, format, columns: req.columns.clone() })
153}
154
155#[derive(Debug, Clone, PartialEq, Eq)]
157pub struct NegotiatedSubscription {
158 pub transport: TransportTag,
159 pub format: FormatPreference,
160 pub columns: Option<Vec<String>>,
161}