sdf_metadata/metadata/io/
topic.rs

1use crate::{
2    util::{
3        config_error::{ConfigError, INDENT},
4        sdf_types_map::SdfTypesMap,
5    },
6    wit::{
7        dataflow::{DefaultConfigurations, Topic},
8        io::TypeRef,
9        metadata::{OutputType, SdfKeyValue},
10    },
11};
12
13#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
14pub struct TopicValidationFailure {
15    pub name: String,
16    pub errors: Vec<TopicValidationError>,
17}
18
19impl ConfigError for TopicValidationFailure {
20    fn readable(&self, indents: usize) -> String {
21        let mut result = format!(
22            "{}Topic `{}` is invalid:\n",
23            INDENT.repeat(indents),
24            self.name
25        );
26
27        for error in &self.errors {
28            result.push_str(&error.readable(indents + 1));
29        }
30
31        result
32    }
33}
34
35#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
36pub enum TopicValidationError {
37    InvalidKeyRef(String),
38    InvalidValueRef(String),
39    Name(Vec<TopicNameError>),
40    MissingConverter,
41}
42
43impl ConfigError for TopicValidationError {
44    fn readable(&self, indents: usize) -> String {
45        let indent = INDENT.repeat(indents);
46
47        match self {
48            Self::InvalidKeyRef(key) => {
49                format!(
50                    "{}Referenced key type `{}` not found in config or imported types\n",
51                    indent, key
52                )
53            }
54            Self::InvalidValueRef(value) => {
55                format!(
56                    "{}Referenced type `{}` not found in config or imported types\n",
57                    indent, value
58                )
59            }
60            Self::Name(errors) => {
61                let mut result = format!("{}Topic name is invalid:\n", indent);
62
63                for error in errors {
64                    result.push_str(&error.readable(indents + 1));
65                }
66                result
67            }
68            Self::MissingConverter => {
69                format!(
70                    "{}Topic needs to have a \"converter\" specified for serializing/deserializing records\n",
71                    indent
72                )
73            }
74        }
75    }
76}
77
/// A naming rule that a topic name failed to satisfy.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum TopicNameError {
    /// The name is the empty string.
    Empty,
    /// The name is longer than [`MAX_TOPIC_NAME_LEN`].
    TooLong,
    /// The name contains something other than lowercase ASCII letters, ASCII digits, or `-`.
    InvalidChars,
    /// The name begins or ends with `-`.
    StartsOrEndsWithDash,
}
85
86impl ConfigError for TopicNameError {
87    fn readable(&self, indents: usize) -> String {
88        let indent = INDENT.repeat(indents);
89
90        match self {
91            Self::Empty => format!("{}Name cannot be empty\n", indent),
92            Self::TooLong => format!(
93                "{}Name is too long, Topic names may only have {MAX_TOPIC_NAME_LEN} characters\n",
94                indent
95            ),
96            Self::InvalidChars => format!(
97                "{}Name may only contain lowercase alphanumeric characters or '-'\n",
98                indent
99            ),
100            Self::StartsOrEndsWithDash => {
101                format!("{}Name cannot start or end with a dash\n", indent)
102            }
103        }
104    }
105}
106
107#[derive(Debug, Clone, Eq, PartialEq)]
108pub struct KVSchemaType {
109    pub key: Option<TypeRef>,
110    pub value: TypeRef,
111}
112
113impl KVSchemaType {
114    pub fn timestamp() -> Self {
115        Self {
116            key: None,
117            value: TypeRef {
118                name: "s64".to_string(),
119            },
120        }
121    }
122}
123
124impl From<(Option<TypeRef>, TypeRef)> for KVSchemaType {
125    fn from((key, value): (Option<TypeRef>, TypeRef)) -> Self {
126        Self { key, value }
127    }
128}
129
130impl From<OutputType> for KVSchemaType {
131    fn from(output_type: OutputType) -> Self {
132        match output_type {
133            OutputType::Ref(r) => Self {
134                key: None,
135                value: r,
136            },
137            OutputType::KeyValue(SdfKeyValue { key, value }) => Self {
138                key: Some(key),
139                value,
140            },
141        }
142    }
143}
144
/// Upper bound on topic name length enforced by [`validate_topic_name`].
const MAX_TOPIC_NAME_LEN: usize = 63;
146
147impl Topic {
148    pub fn validate(
149        &self,
150        default_configs: Option<DefaultConfigurations>,
151        types_map: &SdfTypesMap,
152    ) -> Result<(), TopicValidationFailure> {
153        let mut failure = TopicValidationFailure {
154            name: self.name.clone(),
155            errors: vec![],
156        };
157
158        if let Err(name_errors) = validate_topic_name(&self.name) {
159            failure.errors.push(TopicValidationError::Name(name_errors));
160        }
161
162        if let Some(key) = &self.schema.key {
163            if !types_map.contains_key(&key.type_.name) {
164                // Important! if we extract a ValidationError trait, see if we want to impl a push_str to
165                // to simplify things like this
166
167                failure
168                    .errors
169                    .push(TopicValidationError::InvalidKeyRef(key.type_.name.clone()));
170            }
171        }
172
173        if !types_map.contains_key(&self.schema.value.type_.name) {
174            failure.errors.push(TopicValidationError::InvalidValueRef(
175                self.schema.value.type_.name.clone(),
176            ))
177        }
178
179        if self
180            .schema
181            .value
182            .converter
183            .or(default_configs.and_then(|c| c.converter))
184            .is_none()
185        {
186            failure.errors.push(TopicValidationError::MissingConverter);
187        }
188        if failure.errors.is_empty() {
189            Ok(())
190        } else {
191            Err(failure)
192        }
193    }
194
195    pub fn type_(&self) -> KVSchemaType {
196        (
197            self.schema.key.as_ref().map(|key| key.type_.clone()),
198            self.schema.value.type_.clone(),
199        )
200            .into()
201    }
202}
203
204pub fn validate_topic_name(name: &str) -> Result<(), Vec<TopicNameError>> {
205    let mut errors = vec![];
206
207    if name.is_empty() {
208        errors.push(TopicNameError::Empty);
209    }
210
211    if name.len() > MAX_TOPIC_NAME_LEN {
212        errors.push(TopicNameError::TooLong);
213    }
214
215    if !name
216        .chars()
217        .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '-')
218    {
219        errors.push(TopicNameError::InvalidChars);
220    }
221
222    if name.ends_with('-') || name.starts_with('-') {
223        errors.push(TopicNameError::StartsOrEndsWithDash);
224    }
225
226    if errors.is_empty() {
227        Ok(())
228    } else {
229        Err(errors)
230    }
231}
232
#[cfg(test)]
mod test {
    use crate::util::config_error::ConfigError;
    use crate::wit::io::{SchemaSerDe, SerdeConverter, TopicSchema, TypeRef};

    use super::*;

    /// Builds a schema entry for `type_name` with the given converter.
    fn schema_serde(type_name: &str, converter: Option<SerdeConverter>) -> SchemaSerDe {
        SchemaSerDe {
            converter,
            type_: TypeRef {
                name: type_name.to_string(),
            },
        }
    }

    /// Builds a topic with no consumer, producer, or profile settings.
    fn make_topic(name: &str, key: Option<SchemaSerDe>, value: SchemaSerDe) -> Topic {
        Topic {
            name: name.to_string(),
            schema: TopicSchema { key, value },
            consumer: None,
            producer: None,
            profile: None,
        }
    }

    #[test]
    fn test_validate_topic_name_rejects_long_name() {
        let too_long = "a".repeat(MAX_TOPIC_NAME_LEN + 1);
        let errors = validate_topic_name(&too_long).expect_err("should error for long name");

        assert!(errors.contains(&TopicNameError::TooLong));
        assert_eq!(
            errors[0].readable(0),
            "Name is too long, Topic names may only have 63 characters\n"
        );
    }

    #[test]
    fn test_validate_topic_name_rejects_non_alphanumeric_name() {
        let errors = validate_topic_name("invalid-to&pic-name")
            .expect_err("should error for invalid name");

        assert!(errors.contains(&TopicNameError::InvalidChars));
        assert_eq!(
            errors[0].readable(0),
            "Name may only contain lowercase alphanumeric characters or '-'\n"
        );
    }

    #[test]
    fn test_validate_topic_name_rejects_name_starting_with_dash() {
        let errors = validate_topic_name("-invalid-topic-name")
            .expect_err("should error for invalid name");

        assert!(errors.contains(&TopicNameError::StartsOrEndsWithDash));
        assert_eq!(errors[0].readable(0), "Name cannot start or end with a dash\n");
    }

    #[test]
    fn test_validate_topic_name_rejects_name_ending_with_dash() {
        let errors = validate_topic_name("invalid-topic-name-")
            .expect_err("should error for invalid name");

        assert!(errors.contains(&TopicNameError::StartsOrEndsWithDash));
        assert_eq!(errors[0].readable(0), "Name cannot start or end with a dash\n");
    }

    #[test]
    fn test_validate_rejects_invalid_topic_name() {
        let types_map = SdfTypesMap::default();
        let topic = make_topic(
            "invalid-to&pic-name",
            None,
            schema_serde("u8", Some(SerdeConverter::Json)),
        );

        let failure = topic
            .validate(None, &types_map)
            .expect_err("should error for invalid name");

        assert!(failure
            .errors
            .contains(&TopicValidationError::Name(vec![TopicNameError::InvalidChars])));
        assert_eq!(
            failure.readable(0),
            r#"Topic `invalid-to&pic-name` is invalid:
    Topic name is invalid:
        Name may only contain lowercase alphanumeric characters or '-'
"#
        );
    }

    #[test]
    fn test_validate_rejects_invalid_record_key_datatype() {
        let types_map = SdfTypesMap::default();
        let topic = make_topic(
            "topic-name",
            Some(schema_serde("foobar", Some(SerdeConverter::Json))),
            schema_serde("u8", Some(SerdeConverter::Json)),
        );

        let failure = topic
            .validate(None, &types_map)
            .expect_err("should error for invalid record key type");

        assert!(failure
            .errors
            .contains(&TopicValidationError::InvalidKeyRef("foobar".to_string())));
        assert_eq!(
            failure.readable(0),
            r#"Topic `topic-name` is invalid:
    Referenced key type `foobar` not found in config or imported types
"#
        );
    }

    #[test]
    fn test_validate_rejects_invalid_record_value_datatype() {
        let types_map = SdfTypesMap::default();
        let topic = make_topic(
            "topic-name",
            None,
            schema_serde("foobar", Some(SerdeConverter::Json)),
        );

        let failure = topic
            .validate(None, &types_map)
            .expect_err("should error for invalid record type");

        assert!(failure
            .errors
            .contains(&TopicValidationError::InvalidValueRef("foobar".to_string())));
        assert_eq!(
            failure.readable(0),
            r#"Topic `topic-name` is invalid:
    Referenced type `foobar` not found in config or imported types
"#
        );
    }

    #[test]
    fn test_validate_rejects_topics_with_missing_converter() {
        let types_map = SdfTypesMap::default();
        let topic = make_topic("topic-name", None, schema_serde("u8", None));

        let failure = topic
            .validate(None, &types_map)
            .expect_err("should error missing converter");

        assert!(failure
            .errors
            .contains(&TopicValidationError::MissingConverter));
        assert_eq!(
            failure.readable(0),
            r#"Topic `topic-name` is invalid:
    Topic needs to have a "converter" specified for serializing/deserializing records
"#
        );
    }

    #[test]
    fn test_validate_accepts_valid_topic() {
        let types_map = SdfTypesMap::default();
        let topic = make_topic(
            "topic-name",
            Some(schema_serde("string", Some(SerdeConverter::Json))),
            schema_serde("u8", Some(SerdeConverter::Json)),
        );

        topic.validate(None, &types_map).expect("should validate");
    }
}