1use std::fmt::Debug;
2
3use buffertk::{stack_pack, v64, Packable};
4use prototk::field_types::*;
5use prototk::{FieldNumber, Tag, WireType};
6use sst::KeyValueRef;
7use tuple_key::{Element, KeyDataType, TupleKey, TupleKeyIterator};
8use zerror::Z;
9use zerror_core::ErrorCore;
10
11use super::{DataType, Error, IoToZ, Schema, SchemaEntry};
12
13#[derive(Copy, Clone, Debug)]
17enum MessageFrame {
18 Begin {
19 offset: usize,
20 tag_sz: usize,
21 },
22 BeginMapWithMessage {
23 offset: usize,
24 tag_sz: usize,
25 key_offset: usize,
26 },
27 End {
28 offset: usize,
29 begin: usize,
30 },
31}
32
33struct MessageFrameWrapper {
36 index: usize,
37}
38
39#[derive(Debug, Default)]
42struct ProtoBuilder {
43 msg: Vec<u8>,
44 frames: Vec<MessageFrame>,
45}
46
47impl ProtoBuilder {
48 fn begin_message(&mut self, tag: Tag, buffers: &[&[u8]]) -> MessageFrameWrapper {
49 let begin = MessageFrame::Begin {
50 offset: self.msg.len(),
51 tag_sz: tag.pack_sz(),
52 };
53 let wrap = MessageFrameWrapper {
54 index: self.frames.len(),
55 };
56 self.frames.push(begin);
57 stack_pack(tag).append_to_vec(&mut self.msg);
58 for _ in 0..8 {
59 self.msg.push(0);
60 }
61 for buffer in buffers {
62 self.msg.extend_from_slice(buffer)
63 }
64 wrap
65 }
66
67 fn begin_map_with_message(&mut self, tag: Tag, buffers: &[&[u8]]) -> MessageFrameWrapper {
68 let msg_len = self.msg.len();
69 stack_pack(&tag).append_to_vec(&mut self.msg);
70 for _ in 0..8 {
71 self.msg.push(0);
72 }
73 for buffer in buffers {
74 self.msg.extend_from_slice(buffer)
75 }
76 let begin = MessageFrame::BeginMapWithMessage {
77 offset: msg_len,
78 tag_sz: tag.pack_sz(),
79 key_offset: self.msg.len(),
80 };
81 let wrap = MessageFrameWrapper {
82 index: self.frames.len(),
83 };
84 self.frames.push(begin);
85 for _ in 0..8 {
86 self.msg.push(0);
87 }
88 wrap
89 }
90
91 fn end_message(&mut self, wrapper: MessageFrameWrapper) {
92 let end = MessageFrame::End {
93 offset: self.msg.len(),
94 begin: wrapper.index,
95 };
96 self.frames.push(end);
97 }
98
99 fn emit_inline(&mut self, value: &[u8]) {
100 self.msg.extend_from_slice(value);
101 }
102
103 fn emit_breakout(&mut self, tag: Tag, value: &[u8]) {
104 stack_pack(&tag).append_to_vec(&mut self.msg);
105 self.msg.extend_from_slice(value);
106 }
107
108 fn shift_frame(
109 &mut self,
110 offset_of_u64: usize,
111 in_progress_offset: usize,
112 msg_sz: usize,
113 bytes_dropped: usize,
114 ) -> Result<usize, Error> {
115 let post_u64_offset = offset_of_u64 + 8;
116 if post_u64_offset > in_progress_offset {
117 return Err(Error::LogicError {
118 core: ErrorCore::default(),
119 what: "offset_too_small".to_owned(),
120 })
121 .as_z()
122 .with_info("post_u64_offset", post_u64_offset)
123 .with_info("in_progress_offset", in_progress_offset);
124 }
125 let msg_sz_v64 = v64::from(msg_sz);
126 let msg_sz_v64_pack_sz = msg_sz_v64.pack_sz();
127 if msg_sz_v64_pack_sz > 8 {
128 return Err(Error::LogicError {
129 core: ErrorCore::default(),
130 what: "offset_too_small".to_owned(),
131 })
132 .as_z()
133 .with_info("msg_sz_v64_pack_sz", msg_sz_v64_pack_sz);
134 }
135 let newly_dropped_bytes = 8 - msg_sz_v64_pack_sz;
136 for src in (post_u64_offset..in_progress_offset).rev() {
137 let dst = src + bytes_dropped;
138 self.msg[dst] = self.msg[src];
139 }
140 let length_start = offset_of_u64 + bytes_dropped + newly_dropped_bytes;
141 let length_slice = &mut self.msg[length_start..length_start + msg_sz_v64_pack_sz];
142 stack_pack(msg_sz_v64).into_slice(length_slice);
143 Ok(newly_dropped_bytes)
144 }
145
146 fn seal(mut self) -> Result<Vec<u8>, Error> {
147 let mut in_progress = Vec::new();
148 let mut bytes_dropped = 0;
149 while !self.frames.is_empty() {
150 let frame_idx = self.frames.len() - 1;
151 let back = self.frames[frame_idx];
152 match back {
153 MessageFrame::Begin {
154 offset: begin_offset,
155 tag_sz,
156 } => {
157 if in_progress.is_empty() {
158 return Err(Error::LogicError {
159 core: ErrorCore::default(),
160 what: "in_progress was empty".to_owned(),
161 });
162 }
163 let (in_progress_offset, in_progress_idx) = in_progress.pop().unwrap();
164 if in_progress_idx != frame_idx {
165 return Err(Error::LogicError {
166 core: ErrorCore::default(),
167 what: "index miscalculation".to_owned(),
168 })
169 .as_z()
170 .with_info("in_progress_idx", in_progress_idx)
171 .with_info("frame_idx", frame_idx);
172 }
173 let msg_sz = in_progress_offset - begin_offset - tag_sz - 8;
174 let newly_dropped_bytes = self.shift_frame(
175 begin_offset + tag_sz,
176 in_progress_offset,
177 msg_sz,
178 bytes_dropped,
179 )?;
180 for tag_byte in (begin_offset..begin_offset + tag_sz).rev() {
181 self.msg[tag_byte + newly_dropped_bytes] = self.msg[tag_byte];
182 }
183 bytes_dropped += newly_dropped_bytes;
184 self.frames.pop();
185 }
186 MessageFrame::BeginMapWithMessage {
187 offset: begin_offset,
188 tag_sz,
189 key_offset,
190 } => {
191 if in_progress.is_empty() {
192 return Err(Error::LogicError {
193 core: ErrorCore::default(),
194 what: "in_progress was empty".to_owned(),
195 });
196 }
197 let (in_progress_offset, in_progress_idx) = in_progress.pop().unwrap();
198 if in_progress_idx != frame_idx {
199 return Err(Error::LogicError {
200 core: ErrorCore::default(),
201 what: "index miscalculation".to_owned(),
202 })
203 .as_z()
204 .with_info("in_progress_idx", in_progress_idx)
205 .with_info("frame_idx", frame_idx);
206 }
207 let msg_sz = in_progress_offset - key_offset - 8;
208 let first_dropped_bytes =
209 self.shift_frame(key_offset, in_progress_offset, msg_sz, bytes_dropped)?;
210 bytes_dropped += first_dropped_bytes;
211 let msg_sz =
212 in_progress_offset - begin_offset - tag_sz - 16 + (8 - first_dropped_bytes);
213 let second_dropped_bytes =
214 self.shift_frame(begin_offset + tag_sz, key_offset, msg_sz, bytes_dropped)?;
215 bytes_dropped += second_dropped_bytes;
216 for tag_byte in (begin_offset..begin_offset + tag_sz).rev() {
217 self.msg[tag_byte + bytes_dropped] = self.msg[tag_byte];
218 }
219 self.frames.pop();
220 }
221 MessageFrame::End { offset, begin } => {
222 in_progress.push((offset, begin));
223 self.frames.pop();
224 }
225 }
226 }
227 for i in 0..self.msg.len() - bytes_dropped {
228 self.msg[i] = self.msg[i + bytes_dropped];
229 }
230 self.msg.truncate(self.msg.len() - bytes_dropped);
231 Ok(self.msg)
232 }
233}
234
235pub fn parse_as_prototk(val: &[u8], ty: KeyDataType) -> Result<Vec<u8>, &'static str> {
238 match ty {
239 KeyDataType::unit => Ok(Vec::new()),
240 KeyDataType::fixed32 => {
241 Ok(stack_pack(fixed32(<u32 as Element>::parse_from(val)?)).to_vec())
242 }
243 KeyDataType::fixed64 => {
244 Ok(stack_pack(fixed64(<u64 as Element>::parse_from(val)?)).to_vec())
245 }
246 KeyDataType::sfixed32 => {
247 Ok(stack_pack(sfixed32(<i32 as Element>::parse_from(val)?)).to_vec())
248 }
249 KeyDataType::sfixed64 => {
250 Ok(stack_pack(sfixed64(<i64 as Element>::parse_from(val)?)).to_vec())
251 }
252 KeyDataType::string => {
253 Ok(stack_pack(string(&<String as Element>::parse_from(val)?)).to_vec())
254 }
255 }
256}
257
258pub struct ObjectBuilder {
261 schema: Schema,
262 builder: ProtoBuilder,
263 current_type: SchemaEntry,
264 frame_stack: Vec<Option<MessageFrameWrapper>>,
265 prev_tuple_key: TupleKey,
266}
267
268impl ObjectBuilder {
269 pub fn new(schema: Schema) -> Self {
270 let builder = ProtoBuilder::default();
271 let current_type = SchemaEntry::default();
272 let frame_stack = vec![];
273 let prev_tuple_key = TupleKey::default();
274 Self {
275 schema,
276 builder,
277 current_type,
278 frame_stack,
279 prev_tuple_key,
280 }
281 }
282
283 pub fn next(&mut self, kvr: KeyValueRef) -> Result<(), Error> {
284 if kvr.value.is_none() {
285 return Ok(());
287 }
288 let schema_entry = match self.schema.lookup_schema_for_key(kvr.key)? {
289 Some(schema_entry) => schema_entry,
290 None => {
291 return Ok(());
293 }
294 };
295 let common = TupleKeyIterator::number_of_elements_in_common_prefix(
296 self.prev_tuple_key.iter(),
297 TupleKeyIterator::from(kvr.key),
298 );
299 self.prev_tuple_key = TupleKey::from(kvr.key);
300 while !self.current_type.is_extendable_by(schema_entry)
301 || self.current_type.key().elements().len() > common
302 {
303 self.current_type.pop_field();
304 if let Some(frame) = self.frame_stack.pop().unwrap() {
306 self.builder.end_message(frame);
307 }
308 assert_eq!(
311 self.current_type.key().elements().len(),
312 self.frame_stack.len()
313 );
314 }
315 let mut tki = TupleKeyIterator::from(kvr.key);
317 for _ in 0..2 * self.current_type.key().elements().len() {
318 tki.next().ok_or(Error::Corruption {
319 core: ErrorCore::default(),
320 what: "tuple key exhausted".to_owned(),
321 })?;
322 }
323 while !schema_entry.is_extendable_by(&self.current_type) {
324 assert!(self.current_type.is_extendable_by(schema_entry));
326 let ct_sz = self.current_type.key().elements().len();
328 let st_sz = schema_entry.key().elements().len();
329 assert!(ct_sz < st_sz);
330 let next_field = &schema_entry.key().elements()[ct_sz];
331 let (value_ty, terminal) = if ct_sz + 1 >= st_sz {
332 (schema_entry.value(), true)
333 } else {
334 (DataType::message, false)
335 };
336 let _ = tki.next().ok_or(Error::Corruption {
337 core: ErrorCore::default(),
338 what: "tuple key exhausted".to_owned(),
339 })?;
340 let tk_key = tki.next().ok_or(Error::Corruption {
341 core: ErrorCore::default(),
342 what: "tuple key exhausted".to_owned(),
343 })?;
344 let tk_key =
345 parse_as_prototk(tk_key, next_field.ty()).map_err(|err| Error::Corruption {
346 core: ErrorCore::default(),
347 what: err.to_string(),
348 })?;
349 self.current_type.push_field(next_field.clone(), value_ty);
350 match (next_field.ty(), value_ty) {
351 (KeyDataType::unit, DataType::message) => {
352 let msg_tag = Tag {
353 field_number: next_field.number(),
354 wire_type: value_ty.wire_type(),
355 };
356 self.frame_stack
357 .push(Some(self.builder.begin_message(msg_tag, &[])));
358 if terminal {
359 self.builder.emit_inline(kvr.value.unwrap());
360 }
361 }
362 (KeyDataType::unit, _) => {
363 let unit_tag = Tag {
364 field_number: next_field.number(),
365 wire_type: value_ty.wire_type(),
366 };
367 self.frame_stack.push(None);
368 if terminal {
369 self.builder.emit_breakout(unit_tag, kvr.value.unwrap());
370 }
371 }
372 (_, _) => {
373 let map_tag = Tag {
374 field_number: next_field.number(),
375 wire_type: WireType::LengthDelimited,
376 };
377 let key_tag: &[u8] = &stack_pack(Tag {
378 field_number: FieldNumber::must(1),
379 wire_type: next_field.ty().wire_type(),
380 })
381 .to_vec();
382 let value_tag: &[u8] = &stack_pack(Tag {
383 field_number: FieldNumber::must(2),
384 wire_type: value_ty.wire_type(),
385 })
386 .to_vec();
387 self.frame_stack.push(Some(
388 self.builder
389 .begin_map_with_message(map_tag, &[key_tag, &tk_key, value_tag]),
390 ));
391 if terminal {
392 self.builder.emit_inline(kvr.value.unwrap());
393 }
394 }
395 }
396 }
397 Ok(())
398 }
399
400 pub fn seal(mut self) -> Result<Vec<u8>, Error> {
401 while !self.frame_stack.is_empty() {
402 if let Some(frame) = self.frame_stack.pop().unwrap() {
404 self.builder.end_message(frame);
405 }
406 }
407 self.builder.seal()
408 }
409}
410
411#[cfg(test)]
414mod tests {
415 #[test]
416 fn default() {
417 ProtoBuilder::default();
418 }
419
420 #[test]
421 fn single_scalar_terminal() {
422 let mut pb = ProtoBuilder::default();
423 let tag = Tag {
425 field_number: FieldNumber::must(1),
426 wire_type: WireType::Varint,
427 };
428 pb.emit_breakout(tag, &[42]);
429 let msg: &[u8] = &pb.seal().unwrap();
431 assert_eq!(&[8, 42], &msg);
432 }
433
434 #[test]
435 fn single_message_terminal() {
436 let mut pb = ProtoBuilder::default();
437 let tag = Tag {
439 field_number: FieldNumber::must(1),
440 wire_type: WireType::LengthDelimited,
441 };
442 let begin = pb.begin_message(tag, &[&[8u8, 42]]);
443 pb.end_message(begin);
444 let msg: &[u8] = &pb.seal().unwrap();
446 assert_eq!(&[10, 2, 8, 42], &msg);
447 }
448
449 #[test]
450 fn scalar_within_message() {
451 let mut pb = ProtoBuilder::default();
452 let tag = Tag {
454 field_number: FieldNumber::must(1),
455 wire_type: WireType::LengthDelimited,
456 };
457 let begin = pb.begin_message(tag, &[]);
458 let tag = Tag {
460 field_number: FieldNumber::must(2),
461 wire_type: WireType::Varint,
462 };
463 pb.emit_breakout(tag, &[42]);
464 pb.end_message(begin);
466 let msg: &[u8] = &pb.seal().unwrap();
467 assert_eq!(&[10, 2, 16, 42], &msg);
468 }
469
470 #[test]
471 fn message_within_message1() {
472 let mut pb = ProtoBuilder::default();
473 let tag = Tag {
475 field_number: FieldNumber::must(1),
476 wire_type: WireType::LengthDelimited,
477 };
478 let begin = pb.begin_message(tag, &[]);
479 pb.emit_inline(&[8, 42]);
481 pb.end_message(begin);
483 let msg: &[u8] = &pb.seal().unwrap();
484 assert_eq!(&[10, 2, 8, 42], &msg);
485 }
486
487 #[test]
488 fn message_within_message2() {
489 let mut pb = ProtoBuilder::default();
490 let tag = Tag {
492 field_number: FieldNumber::must(1),
493 wire_type: WireType::LengthDelimited,
494 };
495 let begin = pb.begin_message(tag, &[&[8u8, 42]]);
496 pb.end_message(begin);
498 let msg: &[u8] = &pb.seal().unwrap();
499 assert_eq!(&[10, 2, 8, 42], &msg);
500 }
501
502 #[test]
503 fn map_with_scalar_key_scalar_value_two_entries() {
504 let mut pb = ProtoBuilder::default();
505 let key_tag1: &[u8] = &stack_pack(Tag {
507 field_number: FieldNumber::must(1),
508 wire_type: WireType::Varint,
509 })
510 .to_vec();
511 let key_buf1: &[u8] = &[42];
512 let value_tag1: &[u8] = &stack_pack(Tag {
513 field_number: FieldNumber::must(2),
514 wire_type: WireType::LengthDelimited,
515 })
516 .to_vec();
517 let value_buf1: &[u8] = &[103, 117, 105, 116, 97, 114];
518 let key_tag2: &[u8] = &stack_pack(Tag {
520 field_number: FieldNumber::must(1),
521 wire_type: WireType::Varint,
522 })
523 .to_vec();
524 let key_buf2: &[u8] = &[42];
525 let value_tag2: &[u8] = &stack_pack(Tag {
526 field_number: FieldNumber::must(2),
527 wire_type: WireType::LengthDelimited,
528 })
529 .to_vec();
530 let value_buf2: &[u8] = &[100, 114, 117, 109, 115];
531 let tag = Tag {
533 field_number: FieldNumber::must(7),
534 wire_type: WireType::LengthDelimited,
535 };
536 let begin = pb.begin_map_with_message(tag, &[key_tag1, key_buf1, value_tag1]);
537 pb.emit_inline(value_buf1);
539 pb.end_message(begin);
541 let begin = pb.begin_map_with_message(tag, &[key_tag2, key_buf2, value_tag2]);
543 pb.emit_inline(value_buf2);
545 pb.end_message(begin);
547 let msg: &[u8] = &pb.seal().unwrap();
548 assert_eq!(
549 &[
550 58, 10, 8, 42, 18, 6, 103, 117, 105, 116, 97, 114, 58, 9, 8, 42, 18, 5, 100, 114,
551 117, 109, 115
552 ],
553 &msg
554 );
555 }
556
557 #[test]
558 fn map_with_scalar_key_message_value() {
559 let mut pb = ProtoBuilder::default();
560 let key_tag: &[u8] = &stack_pack(Tag {
562 field_number: FieldNumber::must(1),
563 wire_type: WireType::Varint,
564 })
565 .to_vec();
566 let key_buf: &[u8] = &[42];
567 let value_tag: &[u8] = &stack_pack(Tag {
568 field_number: FieldNumber::must(2),
569 wire_type: WireType::LengthDelimited,
570 })
571 .to_vec();
572 let value_buf: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7];
573 let tag = Tag {
575 field_number: FieldNumber::must(7),
576 wire_type: WireType::LengthDelimited,
577 };
578 let begin = pb.begin_map_with_message(tag, &[key_tag, key_buf, value_tag]);
579 pb.emit_inline(value_buf);
581 pb.end_message(begin);
583 let msg: &[u8] = &pb.seal().unwrap();
584 assert_eq!(&[58, 12, 8, 42, 18, 8, 0, 1, 2, 3, 4, 5, 6, 7], &msg);
585 }
586
587 #[test]
588 fn map_with_message_key_message_value() {
589 let mut pb = ProtoBuilder::default();
590 let key_tag: &[u8] = &stack_pack(Tag {
592 field_number: FieldNumber::must(1),
593 wire_type: WireType::LengthDelimited,
594 })
595 .to_vec();
596 let key_buf: &[u8] = &[4, 42, 43, 44, 45];
597 let value_tag: &[u8] = &stack_pack(Tag {
598 field_number: FieldNumber::must(2),
599 wire_type: WireType::LengthDelimited,
600 })
601 .to_vec();
602 let value_buf: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7];
603 let tag = Tag {
605 field_number: FieldNumber::must(7),
606 wire_type: WireType::LengthDelimited,
607 };
608 let begin = pb.begin_map_with_message(tag, &[key_tag, key_buf, value_tag]);
609 pb.emit_inline(value_buf);
611 pb.end_message(begin);
613 let msg: &[u8] = &pb.seal().unwrap();
614 assert_eq!(
615 &[58, 16, 10, 4, 42, 43, 44, 45, 18, 8, 0, 1, 2, 3, 4, 5, 6, 7],
616 &msg
617 );
618 }
619
620 #[test]
621 fn map_with_string_key_message_value() {
622 let mut pb = ProtoBuilder::default();
623 let key_tag: &[u8] = &stack_pack(Tag {
625 field_number: FieldNumber::must(1),
626 wire_type: WireType::LengthDelimited,
627 })
628 .to_vec();
629 let key_buf: &[u8] = &[5, 104, 101, 108, 108, 111];
630 let value_tag: &[u8] = &stack_pack(Tag {
631 field_number: FieldNumber::must(2),
632 wire_type: WireType::LengthDelimited,
633 })
634 .to_vec();
635 let value_buf: &[u8] = &[];
636 let tag = Tag {
638 field_number: FieldNumber::must(5),
639 wire_type: WireType::LengthDelimited,
640 };
641 let begin = pb.begin_map_with_message(tag, &[key_tag, key_buf, value_tag]);
642 pb.emit_inline(value_buf);
644 pb.end_message(begin);
646 let msg: &[u8] = &pb.seal().unwrap();
647 assert_eq!(&[42, 9, 10, 5, 104, 101, 108, 108, 111, 18, 0], &msg);
648 }
649
650 use tuple_key::Direction;
651
652 use crate::{SchemaKey, SchemaKeyElement};
653
654 use super::*;
655
656 fn test_schema() -> Schema {
657 let mut schema = Schema::default();
658 schema
660 .add_to_schema(SchemaEntry::new(
661 SchemaKey::new(vec![SchemaKeyElement::new(
662 FieldNumber::must(1),
663 KeyDataType::unit,
664 Direction::Forward,
665 )]),
666 DataType::uint64,
667 ))
668 .unwrap();
669 schema
671 .add_to_schema(SchemaEntry::new(
672 SchemaKey::new(vec![SchemaKeyElement::new(
673 FieldNumber::must(2),
674 KeyDataType::unit,
675 Direction::Forward,
676 )]),
677 DataType::message,
678 ))
679 .unwrap();
680 schema
682 .add_to_schema(SchemaEntry::new(
683 SchemaKey::new(vec![
684 SchemaKeyElement::new(
685 FieldNumber::must(2),
686 KeyDataType::unit,
687 Direction::Forward,
688 ),
689 SchemaKeyElement::new(
690 FieldNumber::must(3),
691 KeyDataType::unit,
692 Direction::Forward,
693 ),
694 ]),
695 DataType::uint64,
696 ))
697 .unwrap();
698 schema
700 .add_to_schema(SchemaEntry::new(
701 SchemaKey::new(vec![SchemaKeyElement::new(
702 FieldNumber::must(4),
703 KeyDataType::fixed64,
704 Direction::Forward,
705 )]),
706 DataType::string,
707 ))
708 .unwrap();
709 schema
711 .add_to_schema(SchemaEntry::new(
712 SchemaKey::new(vec![SchemaKeyElement::new(
713 FieldNumber::must(5),
714 KeyDataType::string,
715 Direction::Forward,
716 )]),
717 DataType::message,
718 ))
719 .unwrap();
720 schema
722 .add_to_schema(SchemaEntry::new(
723 SchemaKey::new(vec![
724 SchemaKeyElement::new(
725 FieldNumber::must(5),
726 KeyDataType::string,
727 Direction::Forward,
728 ),
729 SchemaKeyElement::new(
730 FieldNumber::must(6),
731 KeyDataType::unit,
732 Direction::Forward,
733 ),
734 ]),
735 DataType::uint64,
736 ))
737 .unwrap();
738 schema
739 }
740
741 #[test]
742 fn obj_builder_default() {
743 let schema = test_schema();
744 let obj_builder = ObjectBuilder::new(schema);
745 let buf = obj_builder.seal().unwrap();
746 assert!(buf.is_empty());
747 }
748
749 #[test]
750 fn unit_to_uint64() {
751 let mut key = TupleKey::default();
752 key.extend(FieldNumber::must(1));
753 let schema = test_schema();
754 let mut obj_builder = ObjectBuilder::new(schema);
755 obj_builder
756 .next(KeyValueRef {
757 key: key.as_bytes(),
758 timestamp: 0,
759 value: Some(&[42]),
760 })
761 .unwrap();
762 let buf = obj_builder.seal().unwrap();
763 assert_eq!(&[8, 42], &buf.as_slice());
764 }
765
766 #[test]
767 fn unit_to_message() {
768 let mut key = TupleKey::default();
769 key.extend(FieldNumber::must(2));
770 let schema = test_schema();
771 let mut obj_builder = ObjectBuilder::new(schema);
772 obj_builder
773 .next(KeyValueRef {
774 key: key.as_bytes(),
775 timestamp: 0,
776 value: Some(&[8, 42]),
777 })
778 .unwrap();
779 let buf = obj_builder.seal().unwrap();
780 assert_eq!(&[18, 2, 8, 42], &buf.as_slice());
781 }
782
783 #[test]
784 fn scalar_within_message1() {
785 let mut key = TupleKey::default();
786 key.extend(FieldNumber::must(2));
787 key.extend(FieldNumber::must(3));
788 let schema = test_schema();
789 let mut obj_builder = ObjectBuilder::new(schema);
790 obj_builder
791 .next(KeyValueRef {
792 key: key.as_bytes(),
793 timestamp: 0,
794 value: Some(&[42]),
795 })
796 .unwrap();
797 let buf = obj_builder.seal().unwrap();
798 assert_eq!(&[18, 2, 24, 42], &buf.as_slice());
799 }
800
801 #[test]
802 fn scalar_within_message2() {
803 let mut key1 = TupleKey::default();
804 key1.extend(FieldNumber::must(2));
805 let mut key2 = key1.clone();
806 key2.extend(FieldNumber::must(3));
807 let schema = test_schema();
808 let mut obj_builder = ObjectBuilder::new(schema);
809 obj_builder
810 .next(KeyValueRef {
811 key: key1.as_bytes(),
812 timestamp: 0,
813 value: Some(&[8, 33]),
814 })
815 .unwrap();
816 obj_builder
817 .next(KeyValueRef {
818 key: key2.as_bytes(),
819 timestamp: 0,
820 value: Some(&[42]),
821 })
822 .unwrap();
823 let buf = obj_builder.seal().unwrap();
824 assert_eq!(&[18, 4, 8, 33, 24, 42], &buf.as_slice());
825 }
826
827 #[test]
828 fn map_uint64_to_string1() {
829 let mut key = TupleKey::default();
830 key.extend_with_key(FieldNumber::must(4), 42u64, Direction::Forward);
831 let schema = test_schema();
832 let mut obj_builder = ObjectBuilder::new(schema);
833 obj_builder
834 .next(KeyValueRef {
835 key: key.as_bytes(),
836 timestamp: 0,
837 value: Some(&[104, 101, 108, 108, 111]),
838 })
839 .unwrap();
840 let buf = obj_builder.seal().unwrap();
841 assert_eq!(
842 &[34, 16, 9, 42, 0, 0, 0, 0, 0, 0, 0, 18, 5, 104, 101, 108, 108, 111],
843 &buf.as_slice()
844 );
845 }
846
847 #[test]
848 fn map_uint64_to_string2() {
849 let mut key1 = TupleKey::default();
850 key1.extend_with_key(FieldNumber::must(4), 42u64, Direction::Forward);
851 let mut key2 = TupleKey::default();
852 key2.extend_with_key(FieldNumber::must(4), 69u64, Direction::Forward);
853 let schema = test_schema();
854 let mut obj_builder = ObjectBuilder::new(schema);
855 obj_builder
856 .next(KeyValueRef {
857 key: key1.as_bytes(),
858 timestamp: 0,
859 value: Some(&[104, 101, 108, 108, 111]),
860 })
861 .unwrap();
862 obj_builder
863 .next(KeyValueRef {
864 key: key2.as_bytes(),
865 timestamp: 0,
866 value: Some(&[119, 111, 114, 108, 100]),
867 })
868 .unwrap();
869 let buf = obj_builder.seal().unwrap();
870 assert_eq!(
871 &[
872 34, 16, 9, 42, 0, 0, 0, 0, 0, 0, 0, 18, 5, 104, 101, 108, 108, 111, 34, 16, 9, 69,
873 0, 0, 0, 0, 0, 0, 0, 18, 5, 119, 111, 114, 108, 100
874 ],
875 &buf.as_slice(),
876 );
877 }
878
879 #[test]
880 fn map_string_to_message1() {
881 let mut key = TupleKey::default();
882 key.extend_with_key(FieldNumber::must(5), "hello".to_owned(), Direction::Forward);
883 let schema = test_schema();
884 let mut obj_builder = ObjectBuilder::new(schema);
885 obj_builder
886 .next(KeyValueRef {
887 key: key.as_bytes(),
888 timestamp: 0,
889 value: Some(&[]),
890 })
891 .unwrap();
892 let buf = obj_builder.seal().unwrap();
893 assert_eq!(
894 &[42, 9, 10, 5, 104, 101, 108, 108, 111, 18, 0],
895 &buf.as_slice()
896 );
897 }
898
899 #[test]
900 fn map_string_to_message2() {
901 let mut key1 = TupleKey::default();
902 key1.extend_with_key(FieldNumber::must(5), "hello".to_owned(), Direction::Forward);
903 let mut key2 = key1.clone();
904 key2.extend(FieldNumber::must(6));
905 let schema = test_schema();
906 let mut obj_builder = ObjectBuilder::new(schema);
907 obj_builder
908 .next(KeyValueRef {
909 key: key1.as_bytes(),
910 timestamp: 0,
911 value: Some(&[]),
912 })
913 .unwrap();
914 obj_builder
915 .next(KeyValueRef {
916 key: key2.as_bytes(),
917 timestamp: 0,
918 value: Some(&[33]),
919 })
920 .unwrap();
921 let buf = obj_builder.seal().unwrap();
922 assert_eq!(
923 &[42, 11, 10, 5, 104, 101, 108, 108, 111, 18, 2, 48, 33],
924 &buf.as_slice(),
925 );
926 }
927}