1use crate::{
20 encode::{encode, encode_internal, encode_to_vec},
21 rabin::Rabin,
22 schema::{AvroSchema, ResolvedOwnedSchema, ResolvedSchema, Schema},
23 ser::Serializer,
24 types::Value,
25 AvroResult, Codec, Error,
26};
27use serde::Serialize;
28use std::{collections::HashMap, io::Write, marker::PhantomData};
29
/// Buffer size (in bytes) at which the writer flushes a block, unless overridden via the builder.
const DEFAULT_BLOCK_SIZE: usize = 16_000;
/// Magic bytes that open every object container file: ASCII `Obj` followed by the byte 1.
const AVRO_OBJECT_HEADER: &[u8] = &[b'O', b'b', b'j', 0x01];
32
33#[derive(typed_builder::TypedBuilder)]
35pub struct Writer<'a, W> {
36 schema: &'a Schema,
37 writer: W,
38 #[builder(default, setter(skip))]
39 resolved_schema: Option<ResolvedSchema<'a>>,
40 #[builder(default = Codec::Null)]
41 codec: Codec,
42 #[builder(default = DEFAULT_BLOCK_SIZE)]
43 block_size: usize,
44 #[builder(default = Vec::with_capacity(block_size), setter(skip))]
45 buffer: Vec<u8>,
46 #[builder(default, setter(skip))]
47 serializer: Serializer,
48 #[builder(default = 0, setter(skip))]
49 num_values: usize,
50 #[builder(default = generate_sync_marker())]
51 marker: [u8; 16],
52 #[builder(default = false, setter(skip))]
53 has_header: bool,
54 #[builder(default)]
55 user_metadata: HashMap<String, Value>,
56}
57
58impl<'a, W: Write> Writer<'a, W> {
59 pub fn new(schema: &'a Schema, writer: W) -> Self {
63 Writer::with_codec(schema, writer, Codec::Null)
64 }
65
66 pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
69 let mut w = Self::builder()
70 .schema(schema)
71 .writer(writer)
72 .codec(codec)
73 .build();
74 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
75 w
76 }
77
78 pub fn with_schemata(
83 schema: &'a Schema,
84 schemata: Vec<&'a Schema>,
85 writer: W,
86 codec: Codec,
87 ) -> Self {
88 let mut w = Self::builder()
89 .schema(schema)
90 .writer(writer)
91 .codec(codec)
92 .build();
93 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
94 w
95 }
96
97 pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
101 Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
102 }
103
104 pub fn append_to_with_codec(
107 schema: &'a Schema,
108 writer: W,
109 codec: Codec,
110 marker: [u8; 16],
111 ) -> Self {
112 let mut w = Self::builder()
113 .schema(schema)
114 .writer(writer)
115 .codec(codec)
116 .marker(marker)
117 .build();
118 w.has_header = true;
119 w.resolved_schema = ResolvedSchema::try_from(schema).ok();
120 w
121 }
122
123 pub fn append_to_with_codec_schemata(
126 schema: &'a Schema,
127 schemata: Vec<&'a Schema>,
128 writer: W,
129 codec: Codec,
130 marker: [u8; 16],
131 ) -> Self {
132 let mut w = Self::builder()
133 .schema(schema)
134 .writer(writer)
135 .codec(codec)
136 .marker(marker)
137 .build();
138 w.has_header = true;
139 w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
140 w
141 }
142
143 pub fn schema(&self) -> &'a Schema {
145 self.schema
146 }
147
148 pub fn append<T: Into<Value>>(&mut self, value: T) -> AvroResult<usize> {
157 let n = self.maybe_write_header()?;
158
159 let avro = value.into();
160 self.append_value_ref(&avro).map(|m| m + n)
161 }
162
163 pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
171 let n = self.maybe_write_header()?;
172
173 match self.resolved_schema {
175 Some(ref rs) => {
176 write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
177 self.num_values += 1;
178
179 if self.buffer.len() >= self.block_size {
180 return self.flush().map(|b| b + n);
181 }
182
183 Ok(n)
184 }
185 None => {
186 let rs = ResolvedSchema::try_from(self.schema)?;
187 self.resolved_schema = Some(rs);
188 self.append_value_ref(value)
189 }
190 }
191 }
192
193 pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
203 let avro_value = value.serialize(&mut self.serializer)?;
204 self.append(avro_value)
205 }
206
207 pub fn extend<I, T: Into<Value>>(&mut self, values: I) -> AvroResult<usize>
215 where
216 I: IntoIterator<Item = T>,
217 {
218 let mut num_bytes = 0;
233 for value in values {
234 num_bytes += self.append(value)?;
235 }
236 num_bytes += self.flush()?;
237
238 Ok(num_bytes)
239 }
240
241 pub fn extend_ser<I, T: Serialize>(&mut self, values: I) -> AvroResult<usize>
250 where
251 I: IntoIterator<Item = T>,
252 {
253 let mut num_bytes = 0;
268 for value in values {
269 num_bytes += self.append_ser(value)?;
270 }
271 num_bytes += self.flush()?;
272
273 Ok(num_bytes)
274 }
275
276 pub fn extend_from_slice(&mut self, values: &[Value]) -> AvroResult<usize> {
284 let mut num_bytes = 0;
285 for value in values {
286 num_bytes += self.append_value_ref(value)?;
287 }
288 num_bytes += self.flush()?;
289
290 Ok(num_bytes)
291 }
292
293 pub fn flush(&mut self) -> AvroResult<usize> {
298 if self.num_values == 0 {
299 return Ok(0);
300 }
301
302 self.codec.compress(&mut self.buffer)?;
303
304 let num_values = self.num_values;
305 let stream_len = self.buffer.len();
306
307 let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
308 + self.append_raw(&stream_len.into(), &Schema::Long)?
309 + self
310 .writer
311 .write(self.buffer.as_ref())
312 .map_err(Error::WriteBytes)?
313 + self.append_marker()?;
314
315 self.buffer.clear();
316 self.num_values = 0;
317
318 Ok(num_bytes)
319 }
320
321 pub fn into_inner(mut self) -> AvroResult<W> {
326 self.maybe_write_header()?;
327 self.flush()?;
328 Ok(self.writer)
329 }
330
331 fn append_marker(&mut self) -> AvroResult<usize> {
333 self.writer.write(&self.marker).map_err(Error::WriteMarker)
336 }
337
338 fn append_raw(&mut self, value: &Value, schema: &Schema) -> AvroResult<usize> {
340 self.append_bytes(encode_to_vec(value, schema)?.as_ref())
341 }
342
343 fn append_bytes(&mut self, bytes: &[u8]) -> AvroResult<usize> {
345 self.writer.write(bytes).map_err(Error::WriteBytes)
346 }
347
348 pub fn add_user_metadata<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
351 if !self.has_header {
352 if key.starts_with("avro.") {
353 return Err(Error::InvalidMetadataKey(key));
354 }
355 self.user_metadata
356 .insert(key, Value::Bytes(value.as_ref().to_vec()));
357 Ok(())
358 } else {
359 Err(Error::FileHeaderAlreadyWritten)
360 }
361 }
362
363 fn header(&self) -> Result<Vec<u8>, Error> {
365 let schema_bytes = serde_json::to_string(self.schema)
366 .map_err(Error::ConvertJsonToString)?
367 .into_bytes();
368
369 let mut metadata = HashMap::with_capacity(2);
370 metadata.insert("avro.schema", Value::Bytes(schema_bytes));
371 metadata.insert("avro.codec", self.codec.into());
372
373 for (k, v) in &self.user_metadata {
374 metadata.insert(k.as_str(), v.clone());
375 }
376
377 let mut header = Vec::new();
378 header.extend_from_slice(AVRO_OBJECT_HEADER);
379 encode(&metadata.into(), &Schema::map(Schema::Bytes), &mut header)?;
380 header.extend_from_slice(&self.marker);
381
382 Ok(header)
383 }
384
385 fn maybe_write_header(&mut self) -> AvroResult<usize> {
386 if !self.has_header {
387 let header = self.header()?;
388 let n = self.append_bytes(header.as_ref())?;
389 self.has_header = true;
390 Ok(n)
391 } else {
392 Ok(0)
393 }
394 }
395}
396
397fn write_avro_datum<T: Into<Value>>(
403 schema: &Schema,
404 value: T,
405 buffer: &mut Vec<u8>,
406) -> Result<(), Error> {
407 let avro = value.into();
408 if !avro.validate(schema) {
409 return Err(Error::Validation);
410 }
411 encode(&avro, schema, buffer)?;
412 Ok(())
413}
414
415fn write_avro_datum_schemata<T: Into<Value>>(
416 schema: &Schema,
417 schemata: Vec<&Schema>,
418 value: T,
419 buffer: &mut Vec<u8>,
420) -> AvroResult<()> {
421 let avro = value.into();
422 let rs = ResolvedSchema::try_from(schemata)?;
423 let names = rs.get_names();
424 let enclosing_namespace = schema.namespace();
425 if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
426 return Err(Error::Validation);
427 }
428 encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
429}
430
431pub struct GenericSingleObjectWriter {
435 buffer: Vec<u8>,
436 resolved: ResolvedOwnedSchema,
437}
438
439impl GenericSingleObjectWriter {
440 pub fn new_with_capacity(
441 schema: &Schema,
442 initial_buffer_cap: usize,
443 ) -> AvroResult<GenericSingleObjectWriter> {
444 let fingerprint = schema.fingerprint::<Rabin>();
445 let mut buffer = Vec::with_capacity(initial_buffer_cap);
446 let header = [
447 0xC3,
448 0x01,
449 fingerprint.bytes[0],
450 fingerprint.bytes[1],
451 fingerprint.bytes[2],
452 fingerprint.bytes[3],
453 fingerprint.bytes[4],
454 fingerprint.bytes[5],
455 fingerprint.bytes[6],
456 fingerprint.bytes[7],
457 ];
458 buffer.extend_from_slice(&header);
459
460 Ok(GenericSingleObjectWriter {
461 buffer,
462 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
463 })
464 }
465
466 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
468 if self.buffer.len() != 10 {
469 Err(Error::IllegalSingleObjectWriterState)
470 } else {
471 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
472 writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
473 let len = self.buffer.len();
474 self.buffer.truncate(10);
475 Ok(len)
476 }
477 }
478
479 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
481 self.write_value_ref(&v, writer)
482 }
483}
484
485pub struct SpecificSingleObjectWriter<T>
487where
488 T: AvroSchema,
489{
490 inner: GenericSingleObjectWriter,
491 _model: PhantomData<T>,
492}
493
494impl<T> SpecificSingleObjectWriter<T>
495where
496 T: AvroSchema,
497{
498 pub fn with_capacity(buffer_cap: usize) -> AvroResult<SpecificSingleObjectWriter<T>> {
499 let schema = T::get_schema();
500 Ok(SpecificSingleObjectWriter {
501 inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?,
502 _model: PhantomData,
503 })
504 }
505}
506
507impl<T> SpecificSingleObjectWriter<T>
508where
509 T: AvroSchema + Into<Value>,
510{
511 pub fn write_value<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
514 let v: Value = data.into();
515 self.inner.write_value_ref(&v, writer)
516 }
517}
518
519impl<T> SpecificSingleObjectWriter<T>
520where
521 T: AvroSchema + Serialize,
522{
523 pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> AvroResult<usize> {
526 let mut serializer = Serializer::default();
527 let v = data.serialize(&mut serializer)?;
528 self.inner.write_value_ref(&v, writer)
529 }
530
531 pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> AvroResult<usize> {
534 self.write_ref(&data, writer)
535 }
536}
537
538fn write_value_ref_resolved(
539 schema: &Schema,
540 resolved_schema: &ResolvedSchema,
541 value: &Value,
542 buffer: &mut Vec<u8>,
543) -> AvroResult<()> {
544 match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
545 Some(reason) => Err(Error::ValidationWithReason {
546 value: value.clone(),
547 schema: schema.clone(),
548 reason,
549 }),
550 None => encode_internal(
551 value,
552 schema,
553 resolved_schema.get_names(),
554 &schema.namespace(),
555 buffer,
556 ),
557 }
558}
559
560fn write_value_ref_owned_resolved(
561 resolved_schema: &ResolvedOwnedSchema,
562 value: &Value,
563 buffer: &mut Vec<u8>,
564) -> AvroResult<()> {
565 let root_schema = resolved_schema.get_root_schema();
566 if let Some(reason) = value.validate_internal(
567 root_schema,
568 resolved_schema.get_names(),
569 &root_schema.namespace(),
570 ) {
571 return Err(Error::ValidationWithReason {
572 value: value.clone(),
573 schema: root_schema.clone(),
574 reason,
575 });
576 }
577 encode_internal(
578 value,
579 root_schema,
580 resolved_schema.get_names(),
581 &root_schema.namespace(),
582 buffer,
583 )?;
584 Ok(())
585}
586
587pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
594 let mut buffer = Vec::new();
595 write_avro_datum(schema, value, &mut buffer)?;
596 Ok(buffer)
597}
598
599pub fn to_avro_datum_schemata<T: Into<Value>>(
604 schema: &Schema,
605 schemata: Vec<&Schema>,
606 value: T,
607) -> AvroResult<Vec<u8>> {
608 let mut buffer = Vec::new();
609 write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
610 Ok(buffer)
611}
612
613#[cfg(not(target_arch = "wasm32"))]
614fn generate_sync_marker() -> [u8; 16] {
615 let mut marker = [0_u8; 16];
616 std::iter::repeat_with(rand::random)
617 .take(16)
618 .enumerate()
619 .for_each(|(i, n)| marker[i] = n);
620 marker
621}
622
/// Produces a fresh 16-byte sync marker on wasm32 from four `quad_rand::rand`
/// values, each laid out in big-endian byte order.
#[cfg(target_arch = "wasm32")]
fn generate_sync_marker() -> [u8; 16] {
    let mut marker = [0_u8; 16];
    for word in marker.chunks_exact_mut(4) {
        word.copy_from_slice(&quad_rand::rand().to_be_bytes());
    }
    marker
}
633
634#[cfg(test)]
635mod tests {
636 use super::*;
637 use crate::{
638 decimal::Decimal,
639 duration::{Days, Duration, Millis, Months},
640 schema::{DecimalSchema, FixedSchema, Name},
641 types::Record,
642 util::zig_i64,
643 Reader,
644 };
645 use pretty_assertions::assert_eq;
646 use serde::{Deserialize, Serialize};
647
648 use apache_avro_test_helper::TestResult;
649
650 const AVRO_OBJECT_HEADER_LEN: usize = AVRO_OBJECT_HEADER.len();
651
652 const SCHEMA: &str = r#"
653 {
654 "type": "record",
655 "name": "test",
656 "fields": [
657 {
658 "name": "a",
659 "type": "long",
660 "default": 42
661 },
662 {
663 "name": "b",
664 "type": "string"
665 }
666 ]
667 }
668 "#;
669 const UNION_SCHEMA: &str = r#"["null", "long"]"#;
670
671 #[test]
672 fn test_to_avro_datum() -> TestResult {
673 let schema = Schema::parse_str(SCHEMA)?;
674 let mut record = Record::new(&schema).unwrap();
675 record.put("a", 27i64);
676 record.put("b", "foo");
677
678 let mut expected = Vec::new();
679 zig_i64(27, &mut expected);
680 zig_i64(3, &mut expected);
681 expected.extend([b'f', b'o', b'o']);
682
683 assert_eq!(to_avro_datum(&schema, record)?, expected);
684
685 Ok(())
686 }
687
688 #[test]
689 fn test_union_not_null() -> TestResult {
690 let schema = Schema::parse_str(UNION_SCHEMA)?;
691 let union = Value::Union(1, Box::new(Value::Long(3)));
692
693 let mut expected = Vec::new();
694 zig_i64(1, &mut expected);
695 zig_i64(3, &mut expected);
696
697 assert_eq!(to_avro_datum(&schema, union)?, expected);
698
699 Ok(())
700 }
701
702 #[test]
703 fn test_union_null() -> TestResult {
704 let schema = Schema::parse_str(UNION_SCHEMA)?;
705 let union = Value::Union(0, Box::new(Value::Null));
706
707 let mut expected = Vec::new();
708 zig_i64(0, &mut expected);
709
710 assert_eq!(to_avro_datum(&schema, union)?, expected);
711
712 Ok(())
713 }
714
715 fn logical_type_test<T: Into<Value> + Clone>(
716 schema_str: &'static str,
717
718 expected_schema: &Schema,
719 value: Value,
720
721 raw_schema: &Schema,
722 raw_value: T,
723 ) -> TestResult {
724 let schema = Schema::parse_str(schema_str)?;
725 assert_eq!(&schema, expected_schema);
726 let ser = to_avro_datum(&schema, value.clone())?;
728 let raw_ser = to_avro_datum(raw_schema, raw_value)?;
729 assert_eq!(ser, raw_ser);
730
731 let mut r = ser.as_slice();
733 let de = crate::from_avro_datum(&schema, &mut r, None)?;
734 assert_eq!(de, value);
735 Ok(())
736 }
737
738 #[test]
739 fn date() -> TestResult {
740 logical_type_test(
741 r#"{"type": "int", "logicalType": "date"}"#,
742 &Schema::Date,
743 Value::Date(1_i32),
744 &Schema::Int,
745 1_i32,
746 )
747 }
748
749 #[test]
750 fn time_millis() -> TestResult {
751 logical_type_test(
752 r#"{"type": "int", "logicalType": "time-millis"}"#,
753 &Schema::TimeMillis,
754 Value::TimeMillis(1_i32),
755 &Schema::Int,
756 1_i32,
757 )
758 }
759
760 #[test]
761 fn time_micros() -> TestResult {
762 logical_type_test(
763 r#"{"type": "long", "logicalType": "time-micros"}"#,
764 &Schema::TimeMicros,
765 Value::TimeMicros(1_i64),
766 &Schema::Long,
767 1_i64,
768 )
769 }
770
771 #[test]
772 fn timestamp_millis() -> TestResult {
773 logical_type_test(
774 r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
775 &Schema::TimestampMillis,
776 Value::TimestampMillis(1_i64),
777 &Schema::Long,
778 1_i64,
779 )
780 }
781
782 #[test]
783 fn timestamp_micros() -> TestResult {
784 logical_type_test(
785 r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
786 &Schema::TimestampMicros,
787 Value::TimestampMicros(1_i64),
788 &Schema::Long,
789 1_i64,
790 )
791 }
792
793 #[test]
794 fn decimal_fixed() -> TestResult {
795 let size = 30;
796 let inner = Schema::Fixed(FixedSchema {
797 name: Name::new("decimal")?,
798 aliases: None,
799 doc: None,
800 size,
801 default: None,
802 attributes: Default::default(),
803 });
804 let value = vec![0u8; size];
805 logical_type_test(
806 r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
807 &Schema::Decimal(DecimalSchema {
808 precision: 20,
809 scale: 5,
810 inner: Box::new(inner.clone()),
811 }),
812 Value::Decimal(Decimal::from(value.clone())),
813 &inner,
814 Value::Fixed(size, value),
815 )
816 }
817
818 #[test]
819 fn decimal_bytes() -> TestResult {
820 let inner = Schema::Bytes;
821 let value = vec![0u8; 10];
822 logical_type_test(
823 r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
824 &Schema::Decimal(DecimalSchema {
825 precision: 4,
826 scale: 3,
827 inner: Box::new(inner.clone()),
828 }),
829 Value::Decimal(Decimal::from(value.clone())),
830 &inner,
831 value,
832 )
833 }
834
835 #[test]
836 fn duration() -> TestResult {
837 let inner = Schema::Fixed(FixedSchema {
838 name: Name::new("duration")?,
839 aliases: None,
840 doc: None,
841 size: 12,
842 default: None,
843 attributes: Default::default(),
844 });
845 let value = Value::Duration(Duration::new(
846 Months::new(256),
847 Days::new(512),
848 Millis::new(1024),
849 ));
850 logical_type_test(
851 r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
852 &Schema::Duration,
853 value,
854 &inner,
855 Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
856 )
857 }
858
859 #[test]
860 fn test_writer_append() -> TestResult {
861 let schema = Schema::parse_str(SCHEMA)?;
862 let mut writer = Writer::new(&schema, Vec::new());
863
864 let mut record = Record::new(&schema).unwrap();
865 record.put("a", 27i64);
866 record.put("b", "foo");
867
868 let n1 = writer.append(record.clone())?;
869 let n2 = writer.append(record.clone())?;
870 let n3 = writer.flush()?;
871 let result = writer.into_inner()?;
872
873 assert_eq!(n1 + n2 + n3, result.len());
874
875 let mut data = Vec::new();
876 zig_i64(27, &mut data);
877 zig_i64(3, &mut data);
878 data.extend(b"foo");
879 data.extend(data.clone());
880
881 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
883 let last_data_byte = result.len() - 16;
885 assert_eq!(
886 &result[last_data_byte - data.len()..last_data_byte],
887 data.as_slice()
888 );
889
890 Ok(())
891 }
892
893 #[test]
894 fn test_writer_extend() -> TestResult {
895 let schema = Schema::parse_str(SCHEMA)?;
896 let mut writer = Writer::new(&schema, Vec::new());
897
898 let mut record = Record::new(&schema).unwrap();
899 record.put("a", 27i64);
900 record.put("b", "foo");
901 let record_copy = record.clone();
902 let records = vec![record, record_copy];
903
904 let n1 = writer.extend(records)?;
905 let n2 = writer.flush()?;
906 let result = writer.into_inner()?;
907
908 assert_eq!(n1 + n2, result.len());
909
910 let mut data = Vec::new();
911 zig_i64(27, &mut data);
912 zig_i64(3, &mut data);
913 data.extend(b"foo");
914 data.extend(data.clone());
915
916 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
918 let last_data_byte = result.len() - 16;
920 assert_eq!(
921 &result[last_data_byte - data.len()..last_data_byte],
922 data.as_slice()
923 );
924
925 Ok(())
926 }
927
928 #[derive(Debug, Clone, Deserialize, Serialize)]
929 struct TestSerdeSerialize {
930 a: i64,
931 b: String,
932 }
933
934 #[test]
935 fn test_writer_append_ser() -> TestResult {
936 let schema = Schema::parse_str(SCHEMA)?;
937 let mut writer = Writer::new(&schema, Vec::new());
938
939 let record = TestSerdeSerialize {
940 a: 27,
941 b: "foo".to_owned(),
942 };
943
944 let n1 = writer.append_ser(record)?;
945 let n2 = writer.flush()?;
946 let result = writer.into_inner()?;
947
948 assert_eq!(n1 + n2, result.len());
949
950 let mut data = Vec::new();
951 zig_i64(27, &mut data);
952 zig_i64(3, &mut data);
953 data.extend(b"foo");
954
955 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
957 let last_data_byte = result.len() - 16;
959 assert_eq!(
960 &result[last_data_byte - data.len()..last_data_byte],
961 data.as_slice()
962 );
963
964 Ok(())
965 }
966
967 #[test]
968 fn test_writer_extend_ser() -> TestResult {
969 let schema = Schema::parse_str(SCHEMA)?;
970 let mut writer = Writer::new(&schema, Vec::new());
971
972 let record = TestSerdeSerialize {
973 a: 27,
974 b: "foo".to_owned(),
975 };
976 let record_copy = record.clone();
977 let records = vec![record, record_copy];
978
979 let n1 = writer.extend_ser(records)?;
980 let n2 = writer.flush()?;
981 let result = writer.into_inner()?;
982
983 assert_eq!(n1 + n2, result.len());
984
985 let mut data = Vec::new();
986 zig_i64(27, &mut data);
987 zig_i64(3, &mut data);
988 data.extend(b"foo");
989 data.extend(data.clone());
990
991 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
993 let last_data_byte = result.len() - 16;
995 assert_eq!(
996 &result[last_data_byte - data.len()..last_data_byte],
997 data.as_slice()
998 );
999
1000 Ok(())
1001 }
1002
1003 fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
1004 Writer::with_codec(schema, Vec::new(), Codec::Deflate)
1005 }
1006
1007 fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
1008 Writer::builder()
1009 .writer(Vec::new())
1010 .schema(schema)
1011 .codec(Codec::Deflate)
1012 .block_size(100)
1013 .build()
1014 }
1015
1016 fn check_writer(mut writer: Writer<'_, Vec<u8>>, schema: &Schema) -> TestResult {
1017 let mut record = Record::new(schema).unwrap();
1018 record.put("a", 27i64);
1019 record.put("b", "foo");
1020
1021 let n1 = writer.append(record.clone())?;
1022 let n2 = writer.append(record.clone())?;
1023 let n3 = writer.flush()?;
1024 let result = writer.into_inner()?;
1025
1026 assert_eq!(n1 + n2 + n3, result.len());
1027
1028 let mut data = Vec::new();
1029 zig_i64(27, &mut data);
1030 zig_i64(3, &mut data);
1031 data.extend(b"foo");
1032 data.extend(data.clone());
1033 Codec::Deflate.compress(&mut data)?;
1034
1035 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1037 let last_data_byte = result.len() - 16;
1039 assert_eq!(
1040 &result[last_data_byte - data.len()..last_data_byte],
1041 data.as_slice()
1042 );
1043
1044 Ok(())
1045 }
1046
1047 #[test]
1048 fn test_writer_with_codec() -> TestResult {
1049 let schema = Schema::parse_str(SCHEMA)?;
1050 let writer = make_writer_with_codec(&schema);
1051 check_writer(writer, &schema)
1052 }
1053
1054 #[test]
1055 fn test_writer_with_builder() -> TestResult {
1056 let schema = Schema::parse_str(SCHEMA)?;
1057 let writer = make_writer_with_builder(&schema);
1058 check_writer(writer, &schema)
1059 }
1060
1061 #[test]
1062 fn test_logical_writer() -> TestResult {
1063 const LOGICAL_TYPE_SCHEMA: &str = r#"
1064 {
1065 "type": "record",
1066 "name": "logical_type_test",
1067 "fields": [
1068 {
1069 "name": "a",
1070 "type": [
1071 "null",
1072 {
1073 "type": "long",
1074 "logicalType": "timestamp-micros"
1075 }
1076 ]
1077 }
1078 ]
1079 }
1080 "#;
1081 let codec = Codec::Deflate;
1082 let schema = Schema::parse_str(LOGICAL_TYPE_SCHEMA)?;
1083 let mut writer = Writer::builder()
1084 .schema(&schema)
1085 .codec(codec)
1086 .writer(Vec::new())
1087 .build();
1088
1089 let mut record1 = Record::new(&schema).unwrap();
1090 record1.put(
1091 "a",
1092 Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
1093 );
1094
1095 let mut record2 = Record::new(&schema).unwrap();
1096 record2.put("a", Value::Union(0, Box::new(Value::Null)));
1097
1098 let n1 = writer.append(record1)?;
1099 let n2 = writer.append(record2)?;
1100 let n3 = writer.flush()?;
1101 let result = writer.into_inner()?;
1102
1103 assert_eq!(n1 + n2 + n3, result.len());
1104
1105 let mut data = Vec::new();
1106 zig_i64(1, &mut data);
1108 zig_i64(1234, &mut data);
1109
1110 zig_i64(0, &mut data);
1112 codec.compress(&mut data)?;
1113
1114 assert_eq!(&result[..AVRO_OBJECT_HEADER_LEN], AVRO_OBJECT_HEADER);
1116 let last_data_byte = result.len() - 16;
1118 assert_eq!(
1119 &result[last_data_byte - data.len()..last_data_byte],
1120 data.as_slice()
1121 );
1122
1123 Ok(())
1124 }
1125
1126 #[test]
1127 fn test_avro_3405_writer_add_metadata_success() -> TestResult {
1128 let schema = Schema::parse_str(SCHEMA)?;
1129 let mut writer = Writer::new(&schema, Vec::new());
1130
1131 writer.add_user_metadata("stringKey".to_string(), String::from("stringValue"))?;
1132 writer.add_user_metadata("strKey".to_string(), "strValue")?;
1133 writer.add_user_metadata("bytesKey".to_string(), b"bytesValue")?;
1134 writer.add_user_metadata("vecKey".to_string(), vec![1, 2, 3])?;
1135
1136 let mut record = Record::new(&schema).unwrap();
1137 record.put("a", 27i64);
1138 record.put("b", "foo");
1139
1140 writer.append(record.clone())?;
1141 writer.append(record.clone())?;
1142 writer.flush()?;
1143 let result = writer.into_inner()?;
1144
1145 assert_eq!(result.len(), 260);
1146
1147 Ok(())
1148 }
1149
1150 #[test]
1151 fn test_avro_3881_metadata_empty_body() -> TestResult {
1152 let schema = Schema::parse_str(SCHEMA)?;
1153 let mut writer = Writer::new(&schema, Vec::new());
1154 writer.add_user_metadata("a".to_string(), "b")?;
1155 let result = writer.into_inner()?;
1156
1157 let reader = Reader::with_schema(&schema, &result[..])?;
1158 let mut expected = HashMap::new();
1159 expected.insert("a".to_string(), vec![b'b']);
1160 assert_eq!(reader.user_metadata(), &expected);
1161 assert_eq!(reader.into_iter().count(), 0);
1162
1163 Ok(())
1164 }
1165
1166 #[test]
1167 fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
1168 let schema = Schema::parse_str(SCHEMA)?;
1169 let mut writer = Writer::new(&schema, Vec::new());
1170
1171 let mut record = Record::new(&schema).unwrap();
1172 record.put("a", 27i64);
1173 record.put("b", "foo");
1174 writer.append(record.clone())?;
1175
1176 match writer.add_user_metadata("stringKey".to_string(), String::from("value2")) {
1177 Err(e @ Error::FileHeaderAlreadyWritten) => {
1178 assert_eq!(e.to_string(), "The file metadata is already flushed.")
1179 }
1180 Err(e) => panic!("Unexpected error occurred while writing user metadata: {e:?}"),
1181 Ok(_) => panic!("Expected an error that metadata cannot be added after adding data"),
1182 }
1183
1184 Ok(())
1185 }
1186
1187 #[test]
1188 fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() -> TestResult {
1189 let schema = Schema::parse_str(SCHEMA)?;
1190 let mut writer = Writer::new(&schema, Vec::new());
1191
1192 let key = "avro.stringKey".to_string();
1193 match writer.add_user_metadata(key.clone(), "value") {
1194 Err(ref e @ Error::InvalidMetadataKey(_)) => {
1195 assert_eq!(e.to_string(), format!("Metadata keys starting with 'avro.' are reserved for internal usage: {key}."))
1196 }
1197 Err(e) => panic!(
1198 "Unexpected error occurred while writing user metadata with reserved prefix ('avro.'): {e:?}"
1199 ),
1200 Ok(_) => panic!("Expected an error that the metadata key cannot be prefixed with 'avro.'"),
1201 }
1202
1203 Ok(())
1204 }
1205
1206 #[test]
1207 fn test_avro_3405_writer_add_metadata_with_builder_api_success() -> TestResult {
1208 let schema = Schema::parse_str(SCHEMA)?;
1209
1210 let mut user_meta_data: HashMap<String, Value> = HashMap::new();
1211 user_meta_data.insert(
1212 "stringKey".to_string(),
1213 Value::String("stringValue".to_string()),
1214 );
1215 user_meta_data.insert("bytesKey".to_string(), Value::Bytes(b"bytesValue".to_vec()));
1216 user_meta_data.insert("vecKey".to_string(), Value::Bytes(vec![1, 2, 3]));
1217
1218 let writer: Writer<'_, Vec<u8>> = Writer::builder()
1219 .writer(Vec::new())
1220 .schema(&schema)
1221 .user_metadata(user_meta_data.clone())
1222 .build();
1223
1224 assert_eq!(writer.user_metadata, user_meta_data);
1225
1226 Ok(())
1227 }
1228
1229 #[derive(Serialize, Clone)]
1230 struct TestSingleObjectWriter {
1231 a: i64,
1232 b: f64,
1233 c: Vec<String>,
1234 }
1235
1236 impl AvroSchema for TestSingleObjectWriter {
1237 fn get_schema() -> Schema {
1238 let schema = r#"
1239 {
1240 "type":"record",
1241 "name":"TestSingleObjectWrtierSerialize",
1242 "fields":[
1243 {
1244 "name":"a",
1245 "type":"long"
1246 },
1247 {
1248 "name":"b",
1249 "type":"double"
1250 },
1251 {
1252 "name":"c",
1253 "type":{
1254 "type":"array",
1255 "items":"string"
1256 }
1257 }
1258 ]
1259 }
1260 "#;
1261 Schema::parse_str(schema).unwrap()
1262 }
1263 }
1264
1265 impl From<TestSingleObjectWriter> for Value {
1266 fn from(obj: TestSingleObjectWriter) -> Value {
1267 Value::Record(vec![
1268 ("a".into(), obj.a.into()),
1269 ("b".into(), obj.b.into()),
1270 (
1271 "c".into(),
1272 Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
1273 ),
1274 ])
1275 }
1276 }
1277
1278 #[test]
1279 fn test_single_object_writer() -> TestResult {
1280 let mut buf: Vec<u8> = Vec::new();
1281 let obj = TestSingleObjectWriter {
1282 a: 300,
1283 b: 34.555,
1284 c: vec!["cat".into(), "dog".into()],
1285 };
1286 let mut writer = GenericSingleObjectWriter::new_with_capacity(
1287 &TestSingleObjectWriter::get_schema(),
1288 1024,
1289 )
1290 .expect("Should resolve schema");
1291 let value = obj.into();
1292 let written_bytes = writer
1293 .write_value_ref(&value, &mut buf)
1294 .expect("Error serializing properly");
1295
1296 assert!(buf.len() > 10, "no bytes written");
1297 assert_eq!(buf.len(), written_bytes);
1298 assert_eq!(buf[0], 0xC3);
1299 assert_eq!(buf[1], 0x01);
1300 assert_eq!(
1301 &buf[2..10],
1302 &TestSingleObjectWriter::get_schema()
1303 .fingerprint::<Rabin>()
1304 .bytes[..]
1305 );
1306 let mut msg_binary = Vec::new();
1307 encode(
1308 &value,
1309 &TestSingleObjectWriter::get_schema(),
1310 &mut msg_binary,
1311 )
1312 .expect("encode should have failed by here as a dependency of any writing");
1313 assert_eq!(&buf[10..], &msg_binary[..]);
1314
1315 Ok(())
1316 }
1317
1318 #[test]
1319 fn test_writer_parity() -> TestResult {
1320 let obj1 = TestSingleObjectWriter {
1321 a: 300,
1322 b: 34.555,
1323 c: vec!["cat".into(), "dog".into()],
1324 };
1325
1326 let mut buf1: Vec<u8> = Vec::new();
1327 let mut buf2: Vec<u8> = Vec::new();
1328 let mut buf3: Vec<u8> = Vec::new();
1329
1330 let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
1331 &TestSingleObjectWriter::get_schema(),
1332 1024,
1333 )
1334 .expect("Should resolve schema");
1335 let mut specific_writer =
1336 SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
1337 .expect("Resolved should pass");
1338 specific_writer
1339 .write(obj1.clone(), &mut buf1)
1340 .expect("Serialization expected");
1341 specific_writer
1342 .write_value(obj1.clone(), &mut buf2)
1343 .expect("Serialization expected");
1344 generic_writer
1345 .write_value(obj1.into(), &mut buf3)
1346 .expect("Serialization expected");
1347 assert_eq!(buf1, buf2);
1348 assert_eq!(buf1, buf3);
1349
1350 Ok(())
1351 }
1352
1353 #[test]
1354 fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
1355 const SCHEMA: &str = r#"
1356 {
1357 "type": "record",
1358 "name": "Conference",
1359 "fields": [
1360 {"type": "string", "name": "name"},
1361 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1362 ]
1363 }"#;
1364
1365 #[derive(Debug, PartialEq, Eq, Clone, Serialize)]
1366 pub struct Conference {
1367 pub name: String,
1368 pub time: Option<i64>,
1369 }
1370
1371 let conf = Conference {
1372 name: "RustConf".to_string(),
1373 time: Some(1234567890),
1374 };
1375
1376 let schema = Schema::parse_str(SCHEMA)?;
1377 let mut writer = Writer::new(&schema, Vec::new());
1378
1379 let bytes = writer.append_ser(conf)?;
1380
1381 assert_eq!(198, bytes);
1382
1383 Ok(())
1384 }
1385
1386 #[test]
1387 fn avro_4014_validation_returns_a_detailed_error() -> TestResult {
1388 const SCHEMA: &str = r#"
1389 {
1390 "type": "record",
1391 "name": "Conference",
1392 "fields": [
1393 {"type": "string", "name": "name"},
1394 {"type": ["null", "long"], "name": "date", "aliases" : [ "time2", "time" ]}
1395 ]
1396 }"#;
1397
1398 #[derive(Debug, PartialEq, Clone, Serialize)]
1399 pub struct Conference {
1400 pub name: String,
1401 pub time: Option<f64>, }
1403
1404 let conf = Conference {
1405 name: "RustConf".to_string(),
1406 time: Some(12345678.90),
1407 };
1408
1409 let schema = Schema::parse_str(SCHEMA)?;
1410 let mut writer = Writer::new(&schema, Vec::new());
1411
1412 match writer.append_ser(conf) {
1413 Ok(bytes) => panic!("Expected an error, but got {} bytes written", bytes),
1414 Err(e) => {
1415 assert_eq!(
1416 e.to_string(),
1417 r#"Value Record([("name", String("RustConf")), ("time", Union(1, Double(12345678.9)))]) does not match schema Record(RecordSchema { name: Name { name: "Conference", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "name", doc: None, aliases: None, default: None, schema: String, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "date", doc: None, aliases: Some(["time2", "time"]), default: None, schema: Union(UnionSchema { schemas: [Null, Long], variant_index: {Null: 0, Long: 1} }), order: Ascending, position: 1, custom_attributes: {} }], lookup: {"date": 1, "name": 0, "time": 1, "time2": 1}, attributes: {} }): Reason: Unsupported value-schema combination! Value: Double(12345678.9), schema: Long"#
1418 );
1419 }
1420 }
1421 Ok(())
1422 }
1423}