1use std::collections::BTreeMap;
18
19use crate::{
20 ARROW_LIST_FIELD_NAME, AsJsonValue, AsKafkaRecord, Error, Generator, Result, Validator,
21};
22
23use bytes::Bytes;
24
25use serde_json::Value;
26
27use tansu_sans_io::{ErrorCode, record::inflated::Batch};
28use tracing::{debug, instrument, warn};
29
30#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
31mod arrow;
32
33#[derive(Debug, Default)]
34pub struct Schema {
35 key: Option<jsonschema::Validator>,
36 value: Option<jsonschema::Validator>,
37
38 #[allow(dead_code)]
39 ids: BTreeMap<String, i32>,
40}
41
/// The top-level sections of a record that a schema document can describe.
///
/// Ordering follows declaration order: key, then meta, then value.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub(crate) enum MessageKind {
    Key,
    Meta,
    Value,
}

impl AsRef<str> for MessageKind {
    /// The property name this section uses inside a schema document's `properties`.
    fn as_ref(&self) -> &str {
        match *self {
            Self::Key => "key",
            Self::Meta => "meta",
            Self::Value => "value",
        }
    }
}
58
59fn validate(validator: Option<&jsonschema::Validator>, encoded: Option<Bytes>) -> Result<()> {
60 debug!(validator = ?validator, ?encoded);
61
62 validator
63 .map_or(Ok(()), |validator| {
64 encoded.map_or(Err(Error::Api(ErrorCode::InvalidRecord)), |encoded| {
65 serde_json::from_reader(&encoded[..])
66 .map_err(|err| {
67 warn!(?err, ?encoded);
68 Error::Api(ErrorCode::InvalidRecord)
69 })
70 .inspect(|instance| debug!(?instance))
71 .and_then(|instance| {
72 validator
73 .validate(&instance)
74 .inspect_err(|err| warn!(?err, ?validator, %instance))
75 .map_err(|_err| Error::Api(ErrorCode::InvalidRecord))
76 })
77 })
78 })
79 .inspect(|r| debug!(?r))
80 .inspect_err(|err| warn!(?err))
81}
82
83impl TryFrom<Bytes> for Schema {
84 type Error = Error;
85
86 fn try_from(encoded: Bytes) -> Result<Self, Self::Error> {
87 debug!(encoded = &encoded[..]);
88 const PROPERTIES: &str = "properties";
89
90 let mut schema = serde_json::from_slice::<Value>(&encoded[..])?;
91
92 let key = schema
93 .get(PROPERTIES)
94 .and_then(|properties| properties.get(MessageKind::Key.as_ref()))
95 .inspect(|key| debug!(?key))
96 .and_then(|key| jsonschema::validator_for(key).ok());
97
98 let value = schema
99 .get(PROPERTIES)
100 .and_then(|properties| properties.get(MessageKind::Value.as_ref()))
101 .inspect(|value| debug!(?value))
102 .and_then(|value| jsonschema::validator_for(value).ok());
103
104 let meta =
105 serde_json::from_slice::<Value>(&Bytes::from_static(include_bytes!("meta.json")))
106 .inspect(|meta| debug!(%meta))?;
107
108 _ = schema
109 .get_mut(PROPERTIES)
110 .and_then(|properties| properties.as_object_mut())
111 .inspect(|properties| debug!(?properties))
112 .and_then(|object| object.insert(MessageKind::Meta.as_ref().to_owned(), meta));
113
114 let ids = field_ids(&schema);
115 debug!(?ids);
116
117 Ok(Self { key, value, ids })
118 }
119}
120
121impl Validator for Schema {
122 #[instrument(skip(self, batch), ret)]
123 fn validate(&self, batch: &Batch) -> Result<()> {
124 for record in &batch.records {
125 debug!(?record);
126
127 validate(self.key.as_ref(), record.key.clone())
128 .and(validate(self.value.as_ref(), record.value.clone()))?
129 }
130
131 Ok(())
132 }
133}
134
135impl AsKafkaRecord for Schema {
136 fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
137 let mut builder = tansu_sans_io::record::Record::builder();
138
139 if let Some(value) = value.get(MessageKind::Key.as_ref()) {
140 debug!(?value);
141
142 if self.key.is_some() {
143 builder = builder.key(serde_json::to_vec(value).map(Bytes::from).map(Into::into)?);
144 }
145 }
146
147 if let Some(value) = value.get(MessageKind::Value.as_ref()) {
148 debug!(?value);
149
150 if self.value.is_some() {
151 builder =
152 builder.value(serde_json::to_vec(value).map(Bytes::from).map(Into::into)?);
153 }
154 }
155
156 Ok(builder)
157 }
158}
159
160impl Generator for Schema {
161 fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
162 todo!()
163 }
164}
165
166impl AsJsonValue for Schema {
167 fn as_json_value(&self, batch: &Batch) -> Result<Value> {
168 let _ = batch;
169 todo!()
170 }
171}
172
173#[instrument(skip(schema), ret)]
174fn field_ids(schema: &Value) -> BTreeMap<String, i32> {
175 fn field_ids_with_path(path: &[&str], schema: &Value, id: &mut i32) -> BTreeMap<String, i32> {
176 debug!(?path, %schema, id);
177
178 let mut ids = BTreeMap::new();
179
180 match schema.get("type").and_then(|r#type| r#type.as_str()) {
181 Some("object") => {
182 if let Some(properties) = schema
183 .get("properties")
184 .and_then(|properties| properties.as_object())
185 {
186 for (k, v) in properties {
187 let mut path = Vec::from(path);
188 path.push(k);
189
190 _ = ids.insert(path.join("."), *id);
191 *id += 1;
192
193 ids.extend(field_ids_with_path(&path[..], v, id))
194 }
195 }
196 }
197
198 Some("array") => {
199 let mut path = Vec::from(path);
200 path.push(ARROW_LIST_FIELD_NAME);
201 _ = ids.insert(path.join("."), *id);
202 *id += 1;
203
204 if let Some(items) = schema.get("items") {
205 debug!(?items);
206
207 ids.extend(field_ids_with_path(&path[..], items, id))
208 }
209 }
210
211 None | Some(_) => (),
212 }
213
214 ids
215 }
216
217 let mut ids = BTreeMap::new();
218 let mut id = 1;
219 let kinds = [MessageKind::Meta, MessageKind::Key, MessageKind::Value];
220
221 for kind in kinds {
222 if schema
223 .get("properties")
224 .and_then(|schema| schema.get(kind.as_ref()))
225 .inspect(|schema| debug!(?kind, ?schema))
226 .is_some()
227 {
228 _ = ids.insert(kind.as_ref().into(), id);
229 id += 1;
230 }
231 }
232
233 for kind in kinds {
234 if let Some(schema) = schema
235 .get("properties")
236 .and_then(|schema| schema.get(kind.as_ref()))
237 .inspect(|schema| debug!(?kind, ?schema))
238 {
239 ids.extend(field_ids_with_path(&[kind.as_ref()], schema, &mut id));
240 }
241 }
242
243 ids
244}
245
#[cfg(test)]
mod tests {
    use crate::Registry;

    use super::*;

    use object_store::{ObjectStoreExt, PutPayload, memory::InMemory, path::Path};

    use serde_json::json;
    use std::{
        fs::{self, File},
        sync::Arc,
        thread,
    };
    use tansu_sans_io::record::Record;
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;

    /// Topic used by the registry tests; its schema is stored as `def.json`.
    const TOPIC: &str = "def";

    /// A key payload that is not JSON.
    const NOT_JSON_KEY: &[u8] = b"Lorem ipsum dolor sit amet";

    /// A value payload that is not JSON.
    const NOT_JSON_VALUE: &[u8] = b"Consectetur adipiscing elit";

    /// Routes this test's tracing output to `../logs/<package>/<test thread>.log`.
    fn init_tracing() -> Result<DefaultGuard> {
        let directory = format!("../logs/{}", env!("CARGO_PKG_NAME"));

        // Create the log directory on demand: without it, every test that traces fails with a
        // "No such file or directory" error in a fresh checkout.
        fs::create_dir_all(&directory)?;

        let current = thread::current();
        let name = current
            .name()
            .ok_or(Error::Message(String::from("unnamed thread")))?;

        let writer = File::create(format!("{directory}/{name}.log")).map(Arc::new)?;

        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(writer)
                .finish(),
        ))
    }

    /// Sub-schema for a numeric record key.
    fn number_key() -> Value {
        json!({ "type": "number" })
    }

    /// Sub-schema for a record value holding `name` and `email` strings.
    fn person_value() -> Value {
        json!({
            "type": "object",
            "properties": {
                "name": { "type": "string" },
                "email": { "type": "string", "format": "email" }
            }
        })
    }

    /// A value that satisfies [`person_value`].
    fn alice() -> Value {
        json!({ "name": "alice", "email": "alice@example.com" })
    }

    /// A top-level schema document with the given `properties`.
    fn document(properties: Value) -> Value {
        json!({ "type": "object", "properties": properties })
    }

    /// A schema document describing both a key and a value.
    fn key_and_value_schema() -> Value {
        document(json!({ "key": number_key(), "value": person_value() }))
    }

    /// `value` serialized as JSON bytes.
    fn json_bytes(value: &Value) -> Result<Bytes> {
        serde_json::to_vec(value)
            .map(Bytes::from)
            .map_err(Into::into)
    }

    /// An in-memory object store holding `schema` as the schema for `topic`.
    async fn store_with_schema(topic: &str, schema: &Value) -> Result<InMemory> {
        let payload = serde_json::to_vec(schema)
            .map(Bytes::from)
            .map(PutPayload::from)?;

        let object_store = InMemory::new();
        let location = Path::from(format!("{topic}.json"));
        _ = object_store.put(&location, payload).await?;

        Ok(object_store)
    }

    /// A batch containing just `record`.
    fn batch_of(record: tansu_sans_io::record::Builder) -> Result<Batch> {
        let batch = Batch::builder()
            .base_timestamp(1_234_567_890 * 1_000)
            .record(record)
            .build()?;

        Ok(batch)
    }

    /// Compiles `definition` through [`Schema::try_from`].
    fn schema_from(definition: &Value) -> Result<Schema> {
        json_bytes(definition).and_then(Schema::try_from)
    }

    #[test]
    fn assign_field_id() {
        let ids = field_ids(&key_and_value_schema());

        assert!(ids.contains_key("key"));
        assert!(ids.contains_key("value"));
        assert!(ids.contains_key("value.name"));
        assert!(ids.contains_key("value.email"));

        // Top-level sections are numbered before any nested field.
        assert_eq!(ids.get("key"), Some(&1));
        assert_eq!(ids.get("value"), Some(&2));

        let mut nested = [ids["value.name"], ids["value.email"]];
        nested.sort_unstable();
        assert_eq!(nested, [3, 4]);
        assert_eq!(ids.len(), 4);
    }

    #[test]
    fn assign_field_id_with_array() {
        let schema = document(json!({
            "key": number_key(),
            "value": { "type": "array", "items": { "type": "string" } }
        }));

        let ids = field_ids(&schema);

        assert!(ids.contains_key("key"));
        assert!(ids.contains_key("value"));
        assert!(ids.contains_key("value.element"));

        assert_eq!(ids.get("key"), Some(&1));
        assert_eq!(ids.get("value"), Some(&2));
        assert_eq!(ids.get("value.element"), Some(&3));
        assert_eq!(ids.len(), 3);
    }

    #[tokio::test]
    async fn key_only_invalid_record() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = Registry::new(store_with_schema(TOPIC, &key_and_value_schema()).await?);

        // The schema also describes a value, so a record without one is rejected.
        let key = json_bytes(&json!(12320))?;
        let batch = batch_of(Record::builder().key(key.into()))?;

        assert!(matches!(
            registry.validate(TOPIC, &batch).await,
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    #[tokio::test]
    async fn value_only_invalid_record() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = Registry::new(store_with_schema(TOPIC, &key_and_value_schema()).await?);

        // The schema also describes a key, so a record without one is rejected.
        let value = json_bytes(&alice())?;
        let batch = batch_of(Record::builder().value(value.into()))?;

        assert!(matches!(
            registry.validate(TOPIC, &batch).await,
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    #[tokio::test]
    async fn key_and_value() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = Registry::new(store_with_schema(TOPIC, &key_and_value_schema()).await?);

        let batch = batch_of(
            Record::builder()
                .key(json_bytes(&json!(12320))?.into())
                .value(json_bytes(&alice())?.into()),
        )?;

        registry.validate(TOPIC, &batch).await
    }

    #[tokio::test]
    async fn no_schema() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = Registry::new(InMemory::new());

        let batch = batch_of(
            Record::builder()
                .key(Bytes::from_static(NOT_JSON_KEY).into())
                .value(Bytes::from_static(NOT_JSON_VALUE).into()),
        )?;

        registry.validate(TOPIC, &batch).await
    }

    #[tokio::test]
    async fn empty_schema() -> Result<()> {
        let _guard = init_tracing()?;

        let registry = Registry::new(store_with_schema(TOPIC, &json!({})).await?);

        let batch = batch_of(
            Record::builder()
                .key(Bytes::from_static(NOT_JSON_KEY).into())
                .value(Bytes::from_static(NOT_JSON_VALUE).into()),
        )?;

        registry.validate(TOPIC, &batch).await
    }

    #[tokio::test]
    async fn key_schema_only() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = document(json!({ "key": number_key() }));
        let registry = Registry::new(store_with_schema(TOPIC, &schema).await?);

        // Only the key is described, so the non-JSON value is not checked.
        let batch = batch_of(
            Record::builder()
                .key(json_bytes(&json!(12320))?.into())
                .value(Bytes::from_static(NOT_JSON_VALUE).into()),
        )?;

        registry.validate(TOPIC, &batch).await
    }

    #[tokio::test]
    async fn bad_key() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = document(json!({ "key": number_key() }));
        let registry = Registry::new(store_with_schema(TOPIC, &schema).await?);

        let batch = batch_of(Record::builder().key(Bytes::from_static(NOT_JSON_KEY).into()))?;

        assert!(matches!(
            registry.validate(TOPIC, &batch).await,
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    #[tokio::test]
    async fn value_schema_only() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = document(json!({ "value": person_value() }));
        let registry = Registry::new(store_with_schema(TOPIC, &schema).await?);

        // Only the value is described, so the non-JSON key is not checked.
        let batch = batch_of(
            Record::builder()
                .key(Bytes::from_static(NOT_JSON_KEY).into())
                .value(json_bytes(&alice())?.into()),
        )?;

        registry.validate(TOPIC, &batch).await
    }

    #[tokio::test]
    async fn bad_value() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = document(json!({ "value": person_value() }));
        let registry = Registry::new(store_with_schema(TOPIC, &schema).await?);

        let batch =
            batch_of(Record::builder().value(Bytes::from_static(NOT_JSON_VALUE).into()))?;

        assert!(matches!(
            registry.validate(TOPIC, &batch).await,
            Err(Error::Api(ErrorCode::InvalidRecord))
        ));

        Ok(())
    }

    #[test]
    fn integer_type_can_be_float_dot_zero() -> Result<()> {
        let schema = json!({"type": "integer"});
        let validator = jsonschema::validator_for(&schema)?;

        assert!(validator.is_valid(&json!(42)));
        assert!(validator.is_valid(&json!(-1)));
        assert!(validator.is_valid(&json!(1.0)));

        Ok(())
    }

    #[test]
    fn array_with_items_type_basic_output() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = schema_from(&document(json!({
            "value": { "type": "array", "items": { "type": "number" } }
        })))?;
        let value = schema.value.as_ref().expect("value validator");

        assert!(value.evaluate(&json!([1, 2, 3, 4, 5])).flag().valid);
        assert!(value.evaluate(&json!([-1, 2.3, 3, 4.0, 5])).flag().valid);
        assert!(
            !value
                .evaluate(&json!([3, "different", { "types": "of values" }]))
                .flag()
                .valid
        );
        assert!(!value.evaluate(&json!({"Not": "an array"})).flag().valid);

        Ok(())
    }

    #[test]
    fn array_basic_output() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = schema_from(&document(json!({ "value": { "type": "array" } })))?;
        let value = schema.value.as_ref().expect("value validator");

        assert!(value.evaluate(&json!([1, 2, 3, 4, 5])).flag().valid);
        assert!(
            value
                .evaluate(&json!([3, "different", { "types": "of values" }]))
                .flag()
                .valid
        );
        assert!(!value.evaluate(&json!({"Not": "an array"})).flag().valid);

        Ok(())
    }

    #[test]
    fn schema_basic_output() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = schema_from(&key_and_value_schema())?;

        debug!(?schema);

        let key = schema.key.as_ref().expect("key validator");
        let value = schema.value.as_ref().expect("value validator");

        assert!(key.evaluate(&json!(12321)).flag().valid);
        assert!(value.evaluate(&alice()).flag().valid);

        Ok(())
    }
}