Skip to main content

tansu_schema/
json.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! JSON schema
16
17use std::collections::BTreeMap;
18
19use crate::{
20    ARROW_LIST_FIELD_NAME, AsJsonValue, AsKafkaRecord, Error, Generator, Result, Validator,
21};
22
23use bytes::Bytes;
24
25use serde_json::Value;
26
27use tansu_sans_io::{ErrorCode, record::inflated::Batch};
28use tracing::{debug, instrument, warn};
29
30#[cfg(any(feature = "parquet", feature = "iceberg", feature = "delta"))]
31mod arrow;
32
33#[derive(Debug, Default)]
34pub struct Schema {
35    key: Option<jsonschema::Validator>,
36    value: Option<jsonschema::Validator>,
37
38    #[allow(dead_code)]
39    ids: BTreeMap<String, i32>,
40}
41
/// The part of a record that a section of the topic schema describes.
///
/// The variant order is significant: the derived `Ord`/`PartialOrd`
/// implementations compare variants in declaration order.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub(crate) enum MessageKind {
    /// The record key.
    Key,
    /// The metadata section that is merged into the schema on construction.
    Meta,
    /// The record value.
    Value,
}
48
49impl AsRef<str> for MessageKind {
50    fn as_ref(&self) -> &str {
51        match self {
52            MessageKind::Key => "key",
53            MessageKind::Meta => "meta",
54            MessageKind::Value => "value",
55        }
56    }
57}
58
59fn validate(validator: Option<&jsonschema::Validator>, encoded: Option<Bytes>) -> Result<()> {
60    debug!(validator = ?validator, ?encoded);
61
62    validator
63        .map_or(Ok(()), |validator| {
64            encoded.map_or(Err(Error::Api(ErrorCode::InvalidRecord)), |encoded| {
65                serde_json::from_reader(&encoded[..])
66                    .map_err(|err| {
67                        warn!(?err, ?encoded);
68                        Error::Api(ErrorCode::InvalidRecord)
69                    })
70                    .inspect(|instance| debug!(?instance))
71                    .and_then(|instance| {
72                        validator
73                            .validate(&instance)
74                            .inspect_err(|err| warn!(?err, ?validator, %instance))
75                            .map_err(|_err| Error::Api(ErrorCode::InvalidRecord))
76                    })
77            })
78        })
79        .inspect(|r| debug!(?r))
80        .inspect_err(|err| warn!(?err))
81}
82
83impl TryFrom<Bytes> for Schema {
84    type Error = Error;
85
86    fn try_from(encoded: Bytes) -> Result<Self, Self::Error> {
87        debug!(encoded = &encoded[..]);
88        const PROPERTIES: &str = "properties";
89
90        let mut schema = serde_json::from_slice::<Value>(&encoded[..])?;
91
92        let key = schema
93            .get(PROPERTIES)
94            .and_then(|properties| properties.get(MessageKind::Key.as_ref()))
95            .inspect(|key| debug!(?key))
96            .and_then(|key| jsonschema::validator_for(key).ok());
97
98        let value = schema
99            .get(PROPERTIES)
100            .and_then(|properties| properties.get(MessageKind::Value.as_ref()))
101            .inspect(|value| debug!(?value))
102            .and_then(|value| jsonschema::validator_for(value).ok());
103
104        let meta =
105            serde_json::from_slice::<Value>(&Bytes::from_static(include_bytes!("meta.json")))
106                .inspect(|meta| debug!(%meta))?;
107
108        _ = schema
109            .get_mut(PROPERTIES)
110            .and_then(|properties| properties.as_object_mut())
111            .inspect(|properties| debug!(?properties))
112            .and_then(|object| object.insert(MessageKind::Meta.as_ref().to_owned(), meta));
113
114        let ids = field_ids(&schema);
115        debug!(?ids);
116
117        Ok(Self { key, value, ids })
118    }
119}
120
121impl Validator for Schema {
122    #[instrument(skip(self, batch), ret)]
123    fn validate(&self, batch: &Batch) -> Result<()> {
124        for record in &batch.records {
125            debug!(?record);
126
127            validate(self.key.as_ref(), record.key.clone())
128                .and(validate(self.value.as_ref(), record.value.clone()))?
129        }
130
131        Ok(())
132    }
133}
134
135impl AsKafkaRecord for Schema {
136    fn as_kafka_record(&self, value: &Value) -> Result<tansu_sans_io::record::Builder> {
137        let mut builder = tansu_sans_io::record::Record::builder();
138
139        if let Some(value) = value.get(MessageKind::Key.as_ref()) {
140            debug!(?value);
141
142            if self.key.is_some() {
143                builder = builder.key(serde_json::to_vec(value).map(Bytes::from).map(Into::into)?);
144            }
145        }
146
147        if let Some(value) = value.get(MessageKind::Value.as_ref()) {
148            debug!(?value);
149
150            if self.value.is_some() {
151                builder =
152                    builder.value(serde_json::to_vec(value).map(Bytes::from).map(Into::into)?);
153            }
154        }
155
156        Ok(builder)
157    }
158}
159
160impl Generator for Schema {
161    fn generate(&self) -> Result<tansu_sans_io::record::Builder> {
162        todo!()
163    }
164}
165
166impl AsJsonValue for Schema {
167    fn as_json_value(&self, batch: &Batch) -> Result<Value> {
168        let _ = batch;
169        todo!()
170    }
171}
172
173#[instrument(skip(schema), ret)]
174fn field_ids(schema: &Value) -> BTreeMap<String, i32> {
175    fn field_ids_with_path(path: &[&str], schema: &Value, id: &mut i32) -> BTreeMap<String, i32> {
176        debug!(?path, %schema, id);
177
178        let mut ids = BTreeMap::new();
179
180        match schema.get("type").and_then(|r#type| r#type.as_str()) {
181            Some("object") => {
182                if let Some(properties) = schema
183                    .get("properties")
184                    .and_then(|properties| properties.as_object())
185                {
186                    for (k, v) in properties {
187                        let mut path = Vec::from(path);
188                        path.push(k);
189
190                        _ = ids.insert(path.join("."), *id);
191                        *id += 1;
192
193                        ids.extend(field_ids_with_path(&path[..], v, id))
194                    }
195                }
196            }
197
198            Some("array") => {
199                let mut path = Vec::from(path);
200                path.push(ARROW_LIST_FIELD_NAME);
201                _ = ids.insert(path.join("."), *id);
202                *id += 1;
203
204                if let Some(items) = schema.get("items") {
205                    debug!(?items);
206
207                    ids.extend(field_ids_with_path(&path[..], items, id))
208                }
209            }
210
211            None | Some(_) => (),
212        }
213
214        ids
215    }
216
217    let mut ids = BTreeMap::new();
218    let mut id = 1;
219    let kinds = [MessageKind::Meta, MessageKind::Key, MessageKind::Value];
220
221    for kind in kinds {
222        if schema
223            .get("properties")
224            .and_then(|schema| schema.get(kind.as_ref()))
225            .inspect(|schema| debug!(?kind, ?schema))
226            .is_some()
227        {
228            _ = ids.insert(kind.as_ref().into(), id);
229            id += 1;
230        }
231    }
232
233    for kind in kinds {
234        if let Some(schema) = schema
235            .get("properties")
236            .and_then(|schema| schema.get(kind.as_ref()))
237            .inspect(|schema| debug!(?kind, ?schema))
238        {
239            ids.extend(field_ids_with_path(&[kind.as_ref()], schema, &mut id));
240        }
241    }
242
243    ids
244}
245
#[cfg(test)]
mod tests {
    use crate::Registry;

    use super::*;

    use object_store::{ObjectStoreExt, PutPayload, memory::InMemory, path::Path};

    use serde_json::{Map, json};
    use std::{
        fs::{File, create_dir_all},
        sync::Arc,
        thread,
    };
    use tansu_sans_io::record::Record;
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;

    /// Topic used by every registry backed test.
    const TOPIC: &str = "def";

    /// Payloads that are deliberately not valid JSON.
    const LOREM: &[u8] = b"Lorem ipsum dolor sit amet";
    const CONSECTETUR: &[u8] = b"Consectetur adipiscing elit";

    /// Install a per-test tracing subscriber that writes to
    /// `../logs/<package>/<thread name>.log`, creating the directory if needed.
    fn init_tracing() -> Result<DefaultGuard> {
        let name = thread::current()
            .name()
            .map(str::to_owned)
            .ok_or_else(|| Error::Message(String::from("unnamed thread")))?;

        let directory = format!("../logs/{}", env!("CARGO_PKG_NAME"));

        // A fresh checkout has no log directory; without this every test that
        // initialises tracing would fail before exercising any schema code.
        create_dir_all(&directory)?;

        let log = File::create(format!("{directory}/{name}.log")).map(Arc::new)?;

        Ok(tracing::subscriber::set_default(
            tracing_subscriber::fmt()
                .with_level(true)
                .with_line_number(true)
                .with_thread_names(false)
                .with_env_filter(
                    EnvFilter::from_default_env()
                        .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
                )
                .with_writer(log)
                .finish(),
        ))
    }

    /// Schema used for record keys.
    fn number_schema() -> Value {
        json!({"type": "number"})
    }

    /// Schema used for record values.
    fn person_schema() -> Value {
        json!({
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                },
                "email": {
                    "type": "string",
                    "format": "email"
                }
            }
        })
    }

    /// A record value that satisfies [`person_schema`].
    fn alice() -> Value {
        json!({
            "name": "alice",
            "email": "alice@example.com"
        })
    }

    /// Topic schema containing only the supplied key and/or value sections.
    fn topic_schema(key: Option<Value>, value: Option<Value>) -> Value {
        let properties = [(MessageKind::Key, key), (MessageKind::Value, value)]
            .into_iter()
            .filter_map(|(kind, schema)| schema.map(|schema| (kind.as_ref().to_owned(), schema)))
            .collect::<Map<String, Value>>();

        json!({
            "type": "object",
            "properties": properties
        })
    }

    /// JSON encode `value` into bytes suitable for a record key or value.
    fn encode(value: &Value) -> Result<Bytes> {
        serde_json::to_vec(value)
            .map(Bytes::from)
            .map_err(Into::into)
    }

    /// Batch holding a single record with the supplied key and/or value.
    fn single_record_batch(key: Option<Bytes>, value: Option<Bytes>) -> Result<Batch> {
        let mut record = Record::builder();

        if let Some(key) = key {
            record = record.key(key.into());
        }

        if let Some(value) = value {
            record = record.value(value.into());
        }

        Batch::builder()
            .base_timestamp(1_234_567_890 * 1_000)
            .record(record)
            .build()
            .map_err(Into::into)
    }

    /// Validate `batch` through a registry backed by an in-memory store that
    /// holds `schema` (when given) for [`TOPIC`].
    async fn validate_batch(schema: Option<&Value>, batch: &Batch) -> Result<()> {
        let object_store = InMemory::new();

        if let Some(schema) = schema {
            let payload = serde_json::to_vec(schema)
                .map(Bytes::from)
                .map(PutPayload::from)?;

            let location = Path::from(format!("{TOPIC}.json"));
            _ = object_store.put(&location, payload).await?;
        }

        let registry = Registry::new(object_store);
        registry.validate(TOPIC, batch).await
    }

    /// Compile a topic schema the same way the registry would.
    fn compile(schema: &Value) -> Result<Schema> {
        serde_json::to_vec(schema)
            .map_err(Into::into)
            .map(Bytes::from)
            .and_then(Schema::try_from)
    }

    /// Whether `validator` (which must be present) accepts `instance`.
    fn accepts(validator: Option<&jsonschema::Validator>, instance: Value) -> bool {
        validator
            .expect("schema should define this validator")
            .evaluate(&instance)
            .flag()
            .valid
    }

    fn assert_invalid_record(outcome: Result<()>) {
        assert!(
            matches!(outcome, Err(Error::Api(ErrorCode::InvalidRecord))),
            "expected InvalidRecord, got {outcome:?}"
        );
    }

    #[test]
    fn assign_field_id() {
        let schema = topic_schema(Some(number_schema()), Some(person_schema()));

        let ids = field_ids(&schema);

        for path in ["key", "value", "value.name", "value.email"] {
            assert!(ids.contains_key(path), "missing id for {path}");
        }
    }

    #[test]
    fn assign_field_id_with_array() {
        let schema = topic_schema(
            Some(number_schema()),
            Some(json!({
                "type": "array",
                "items": {
                    "type": "string"
                }
            })),
        );

        let ids = field_ids(&schema);

        for path in ["key", "value", "value.element"] {
            assert!(ids.contains_key(path), "missing id for {path}");
        }
    }

    #[tokio::test]
    async fn key_only_invalid_record() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(Some(number_schema()), Some(person_schema()));
        let batch = single_record_batch(Some(encode(&json!(12320))?), None)?;

        assert_invalid_record(validate_batch(Some(&schema), &batch).await);

        Ok(())
    }

    #[tokio::test]
    async fn value_only_invalid_record() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(Some(number_schema()), Some(person_schema()));
        let batch = single_record_batch(None, Some(encode(&alice())?))?;

        assert_invalid_record(validate_batch(Some(&schema), &batch).await);

        Ok(())
    }

    #[tokio::test]
    async fn key_and_value() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(Some(number_schema()), Some(person_schema()));
        let batch =
            single_record_batch(Some(encode(&json!(12320))?), Some(encode(&alice())?))?;

        validate_batch(Some(&schema), &batch).await
    }

    #[tokio::test]
    async fn no_schema() -> Result<()> {
        let _guard = init_tracing()?;

        let batch = single_record_batch(
            Some(Bytes::from_static(LOREM)),
            Some(Bytes::from_static(CONSECTETUR)),
        )?;

        validate_batch(None, &batch).await
    }

    #[tokio::test]
    async fn empty_schema() -> Result<()> {
        let _guard = init_tracing()?;

        let batch = single_record_batch(
            Some(Bytes::from_static(LOREM)),
            Some(Bytes::from_static(CONSECTETUR)),
        )?;

        validate_batch(Some(&json!({})), &batch).await
    }

    #[tokio::test]
    async fn key_schema_only() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(Some(number_schema()), None);
        let batch = single_record_batch(
            Some(encode(&json!(12320))?),
            Some(Bytes::from_static(CONSECTETUR)),
        )?;

        validate_batch(Some(&schema), &batch).await
    }

    #[tokio::test]
    async fn bad_key() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(Some(number_schema()), None);
        let batch = single_record_batch(Some(Bytes::from_static(LOREM)), None)?;

        assert_invalid_record(validate_batch(Some(&schema), &batch).await);

        Ok(())
    }

    #[tokio::test]
    async fn value_schema_only() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(None, Some(person_schema()));
        let batch =
            single_record_batch(Some(Bytes::from_static(LOREM)), Some(encode(&alice())?))?;

        validate_batch(Some(&schema), &batch).await
    }

    #[tokio::test]
    async fn bad_value() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = topic_schema(None, Some(person_schema()));
        let batch = single_record_batch(None, Some(Bytes::from_static(CONSECTETUR)))?;

        assert_invalid_record(validate_batch(Some(&schema), &batch).await);

        Ok(())
    }

    #[test]
    fn integer_type_can_be_float_dot_zero() -> Result<()> {
        let schema = json!({"type": "integer"});
        let validator = jsonschema::validator_for(&schema)?;

        assert!(validator.is_valid(&json!(42)));
        assert!(validator.is_valid(&json!(-1)));
        assert!(validator.is_valid(&json!(1.0)));

        Ok(())
    }

    #[test]
    fn array_with_items_type_basic_output() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = compile(&topic_schema(
            None,
            Some(json!({
                "type": "array",
                "items": {
                    "type": "number"
                }
            })),
        ))?;

        let value = schema.value.as_ref();

        assert!(accepts(value, json!([1, 2, 3, 4, 5])));
        assert!(accepts(value, json!([-1, 2.3, 3, 4.0, 5])));
        assert!(!accepts(
            value,
            json!([3, "different", { "types": "of values" }])
        ));
        assert!(!accepts(value, json!({"Not": "an array"})));

        Ok(())
    }

    #[test]
    fn array_basic_output() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = compile(&topic_schema(
            None,
            Some(json!({
                "type": "array",
            })),
        ))?;

        let value = schema.value.as_ref();

        assert!(accepts(value, json!([1, 2, 3, 4, 5])));
        assert!(accepts(
            value,
            json!([3, "different", { "types": "of values" }])
        ));
        assert!(!accepts(value, json!({"Not": "an array"})));

        Ok(())
    }

    #[test]
    fn schema_basic_output() -> Result<()> {
        let _guard = init_tracing()?;

        let schema = compile(&topic_schema(Some(number_schema()), Some(person_schema())))?;

        debug!(?schema);

        assert!(accepts(schema.key.as_ref(), json!(12321)));
        assert!(accepts(schema.value.as_ref(), alice()));

        Ok(())
    }
}