sdf_metadata/metadata/io/
topic.rs1use crate::{
2 util::{
3 config_error::{ConfigError, INDENT},
4 sdf_types_map::SdfTypesMap,
5 },
6 wit::{
7 dataflow::{DefaultConfigurations, Topic},
8 io::TypeRef,
9 metadata::{OutputType, SdfKeyValue},
10 },
11};
12
13#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
14pub struct TopicValidationFailure {
15 pub name: String,
16 pub errors: Vec<TopicValidationError>,
17}
18
19impl ConfigError for TopicValidationFailure {
20 fn readable(&self, indents: usize) -> String {
21 let mut result = format!(
22 "{}Topic `{}` is invalid:\n",
23 INDENT.repeat(indents),
24 self.name
25 );
26
27 for error in &self.errors {
28 result.push_str(&error.readable(indents + 1));
29 }
30
31 result
32 }
33}
34
35#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
36pub enum TopicValidationError {
37 InvalidKeyRef(String),
38 InvalidValueRef(String),
39 Name(Vec<TopicNameError>),
40 MissingConverter,
41}
42
43impl ConfigError for TopicValidationError {
44 fn readable(&self, indents: usize) -> String {
45 let indent = INDENT.repeat(indents);
46
47 match self {
48 Self::InvalidKeyRef(key) => {
49 format!(
50 "{}Referenced key type `{}` not found in config or imported types\n",
51 indent, key
52 )
53 }
54 Self::InvalidValueRef(value) => {
55 format!(
56 "{}Referenced type `{}` not found in config or imported types\n",
57 indent, value
58 )
59 }
60 Self::Name(errors) => {
61 let mut result = format!("{}Topic name is invalid:\n", indent);
62
63 for error in errors {
64 result.push_str(&error.readable(indents + 1));
65 }
66 result
67 }
68 Self::MissingConverter => {
69 format!(
70 "{}Topic needs to have a \"converter\" specified for serializing/deserializing records\n",
71 indent
72 )
73 }
74 }
75 }
76}
77
/// A naming rule that a topic name failed to satisfy.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum TopicNameError {
    /// The name is the empty string.
    Empty,
    /// The name exceeds the maximum allowed length.
    TooLong,
    /// The name contains something other than lowercase ASCII letters,
    /// ASCII digits or `-`.
    InvalidChars,
    /// The name begins or ends with `-`.
    StartsOrEndsWithDash,
}
85
86impl ConfigError for TopicNameError {
87 fn readable(&self, indents: usize) -> String {
88 let indent = INDENT.repeat(indents);
89
90 match self {
91 Self::Empty => format!("{}Name cannot be empty\n", indent),
92 Self::TooLong => format!(
93 "{}Name is too long, Topic names may only have {MAX_TOPIC_NAME_LEN} characters\n",
94 indent
95 ),
96 Self::InvalidChars => format!(
97 "{}Name may only contain lowercase alphanumeric characters or '-'\n",
98 indent
99 ),
100 Self::StartsOrEndsWithDash => {
101 format!("{}Name cannot start or end with a dash\n", indent)
102 }
103 }
104 }
105}
106
107#[derive(Debug, Clone, Eq, PartialEq)]
108pub struct KVSchemaType {
109 pub key: Option<TypeRef>,
110 pub value: TypeRef,
111}
112
113impl KVSchemaType {
114 pub fn timestamp() -> Self {
115 Self {
116 key: None,
117 value: TypeRef {
118 name: "s64".to_string(),
119 },
120 }
121 }
122}
123
124impl From<(Option<TypeRef>, TypeRef)> for KVSchemaType {
125 fn from((key, value): (Option<TypeRef>, TypeRef)) -> Self {
126 Self { key, value }
127 }
128}
129
130impl From<OutputType> for KVSchemaType {
131 fn from(output_type: OutputType) -> Self {
132 match output_type {
133 OutputType::Ref(r) => Self {
134 key: None,
135 value: r,
136 },
137 OutputType::KeyValue(SdfKeyValue { key, value }) => Self {
138 key: Some(key),
139 value,
140 },
141 }
142 }
143}
144
/// Upper bound on the length of a topic name; longer names are reported as
/// [`TopicNameError::TooLong`].
const MAX_TOPIC_NAME_LEN: usize = 63;
146
147impl Topic {
148 pub fn validate(
149 &self,
150 default_configs: Option<DefaultConfigurations>,
151 types_map: &SdfTypesMap,
152 ) -> Result<(), TopicValidationFailure> {
153 let mut failure = TopicValidationFailure {
154 name: self.name.clone(),
155 errors: vec![],
156 };
157
158 if let Err(name_errors) = validate_topic_name(&self.name) {
159 failure.errors.push(TopicValidationError::Name(name_errors));
160 }
161
162 if let Some(key) = &self.schema.key {
163 if !types_map.contains_key(&key.type_.name) {
164 failure
168 .errors
169 .push(TopicValidationError::InvalidKeyRef(key.type_.name.clone()));
170 }
171 }
172
173 if !types_map.contains_key(&self.schema.value.type_.name) {
174 failure.errors.push(TopicValidationError::InvalidValueRef(
175 self.schema.value.type_.name.clone(),
176 ))
177 }
178
179 if self
180 .schema
181 .value
182 .converter
183 .or(default_configs.and_then(|c| c.converter))
184 .is_none()
185 {
186 failure.errors.push(TopicValidationError::MissingConverter);
187 }
188 if failure.errors.is_empty() {
189 Ok(())
190 } else {
191 Err(failure)
192 }
193 }
194
195 pub fn type_(&self) -> KVSchemaType {
196 (
197 self.schema.key.as_ref().map(|key| key.type_.clone()),
198 self.schema.value.type_.clone(),
199 )
200 .into()
201 }
202}
203
204pub fn validate_topic_name(name: &str) -> Result<(), Vec<TopicNameError>> {
205 let mut errors = vec![];
206
207 if name.is_empty() {
208 errors.push(TopicNameError::Empty);
209 }
210
211 if name.len() > MAX_TOPIC_NAME_LEN {
212 errors.push(TopicNameError::TooLong);
213 }
214
215 if !name
216 .chars()
217 .all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '-')
218 {
219 errors.push(TopicNameError::InvalidChars);
220 }
221
222 if name.ends_with('-') || name.starts_with('-') {
223 errors.push(TopicNameError::StartsOrEndsWithDash);
224 }
225
226 if errors.is_empty() {
227 Ok(())
228 } else {
229 Err(errors)
230 }
231}
232
#[cfg(test)]
mod test {
    use crate::util::config_error::{ConfigError, INDENT};
    use crate::wit::io::{SchemaSerDe, SerdeConverter::Json, TopicSchema, TypeRef};

    use super::*;

    /// Builds a schema entry for `type_name` that uses the JSON converter.
    fn json_schema(type_name: &str) -> SchemaSerDe {
        SchemaSerDe {
            converter: Some(Json),
            type_: TypeRef {
                name: type_name.to_string(),
            },
        }
    }

    /// Builds a topic with the given name and schema and no consumer,
    /// producer or profile configuration.
    fn topic(name: &str, key: Option<SchemaSerDe>, value: SchemaSerDe) -> Topic {
        Topic {
            name: name.to_string(),
            schema: TopicSchema { key, value },
            consumer: None,
            producer: None,
            profile: None,
        }
    }

    #[test]
    fn test_validate_topic_name_rejects_long_name() {
        let name = "a".repeat(MAX_TOPIC_NAME_LEN + 1);
        let res = validate_topic_name(&name).expect_err("should error for long name");

        assert_eq!(res, vec![TopicNameError::TooLong]);
        assert_eq!(
            res[0].readable(0),
            "Name is too long, Topic names may only have 63 characters\n"
        )
    }

    #[test]
    fn test_validate_topic_name_rejects_non_alphanumeric_name() {
        let name = "invalid-to&pic-name";
        let res = validate_topic_name(name).expect_err("should error for invalid name");

        assert_eq!(res, vec![TopicNameError::InvalidChars]);
        assert_eq!(
            res[0].readable(0),
            "Name may only contain lowercase alphanumeric characters or '-'\n"
        )
    }

    #[test]
    fn test_validate_topic_name_rejects_name_starting_with_dash() {
        let name = "-invalid-topic-name";
        let res = validate_topic_name(name).expect_err("should error for invalid name");

        assert_eq!(res, vec![TopicNameError::StartsOrEndsWithDash]);
        assert_eq!(res[0].readable(0), "Name cannot start or end with a dash\n")
    }

    #[test]
    fn test_validate_topic_name_rejects_name_ending_with_dash() {
        let name = "invalid-topic-name-";
        let res = validate_topic_name(name).expect_err("should error for invalid name");

        assert_eq!(res, vec![TopicNameError::StartsOrEndsWithDash]);
        assert_eq!(res[0].readable(0), "Name cannot start or end with a dash\n")
    }

    #[test]
    fn test_validate_rejects_invalid_topic_name() {
        let types_map = SdfTypesMap::default();
        let topic = topic("invalid-to&pic-name", None, json_schema("u8"));

        let res = topic
            .validate(None, &types_map)
            .expect_err("should error for invalid name");

        assert!(res.errors.contains(&TopicValidationError::Name(vec![
            TopicNameError::InvalidChars
        ])));
        // Nested lines are indented one and two levels below the header.
        assert_eq!(
            res.readable(0),
            format!(
                "Topic `invalid-to&pic-name` is invalid:\n{}Topic name is invalid:\n{}Name may only contain lowercase alphanumeric characters or '-'\n",
                INDENT,
                INDENT.repeat(2)
            )
        )
    }

    #[test]
    fn test_validate_rejects_invalid_record_key_datatype() {
        let types_map = SdfTypesMap::default();
        let topic = topic("topic-name", Some(json_schema("foobar")), json_schema("u8"));

        let res = topic
            .validate(None, &types_map)
            .expect_err("should error for invalid record key type");

        assert!(res
            .errors
            .contains(&TopicValidationError::InvalidKeyRef("foobar".to_string())));
        assert_eq!(
            res.readable(0),
            format!(
                "Topic `topic-name` is invalid:\n{}Referenced key type `foobar` not found in config or imported types\n",
                INDENT
            )
        )
    }

    #[test]
    fn test_validate_rejects_invalid_record_value_datatype() {
        let types_map = SdfTypesMap::default();
        let topic = topic("topic-name", None, json_schema("foobar"));

        let res = topic
            .validate(None, &types_map)
            .expect_err("should error for invalid record type");

        assert!(res
            .errors
            .contains(&TopicValidationError::InvalidValueRef("foobar".to_string())));
        assert_eq!(
            res.readable(0),
            format!(
                "Topic `topic-name` is invalid:\n{}Referenced type `foobar` not found in config or imported types\n",
                INDENT
            )
        )
    }

    #[test]
    fn test_validate_rejects_topics_with_missing_converter() {
        let types_map = SdfTypesMap::default();
        let value = SchemaSerDe {
            converter: None,
            ..json_schema("u8")
        };
        let topic = topic("topic-name", None, value);

        let res = topic
            .validate(None, &types_map)
            .expect_err("should error missing converter");

        assert!(res.errors.contains(&TopicValidationError::MissingConverter));
        assert_eq!(
            res.readable(0),
            format!(
                "Topic `topic-name` is invalid:\n{}Topic needs to have a \"converter\" specified for serializing/deserializing records\n",
                INDENT
            )
        )
    }

    #[test]
    fn test_validate_accepts_valid_topic() {
        let types_map = SdfTypesMap::default();
        let topic = topic("topic-name", Some(json_schema("string")), json_schema("u8"));

        topic.validate(None, &types_map).expect("should validate");
    }
}