protoql/
object_builder.rs

1use std::fmt::Debug;
2
3use buffertk::{stack_pack, v64, Packable};
4use prototk::field_types::*;
5use prototk::{FieldNumber, Tag, WireType};
6use sst::KeyValueRef;
7use tuple_key::{Element, KeyDataType, TupleKey, TupleKeyIterator};
8use zerror::Z;
9use zerror_core::ErrorCore;
10
11use super::{DataType, Error, IoToZ, Schema, SchemaEntry};
12
13/////////////////////////////////////////// MessageFrame ///////////////////////////////////////////
14
/// MessageFrame captures the boundaries of a message and the internal holes left for sizes.
///
/// Every offset is an index into the builder's byte buffer, taken at the moment the frame was
/// recorded (before any holes are closed).
#[derive(Copy, Clone, Debug)]
enum MessageFrame {
    /// Start of a length-delimited field: a tag followed by one eight-byte hole for its size.
    Begin {
        /// Buffer offset of the first byte of the tag.
        offset: usize,
        /// Packed size of the tag in bytes; the size hole starts at `offset + tag_sz`.
        tag_sz: usize,
    },
    /// Start of a map entry, which carries two eight-byte holes: one right after the tag and a
    /// second one after the caller-supplied bytes that follow it.
    BeginMapWithMessage {
        /// Buffer offset of the first byte of the tag.
        offset: usize,
        /// Packed size of the tag in bytes; the first hole starts at `offset + tag_sz`.
        tag_sz: usize,
        /// Buffer offset of the second hole.
        key_offset: usize,
    },
    /// Marks where a previously begun frame stops.
    End {
        /// Buffer length at the time the frame was closed.
        offset: usize,
        /// Index, within the builder's frame list, of the begin frame this closes.
        begin: usize,
    },
}
32
33//////////////////////////////////////// MessageFrameWrapper ///////////////////////////////////////
34
/// Handle returned when a frame is begun and consumed by value when it is ended.
struct MessageFrameWrapper {
    /// Index of the corresponding begin frame in the builder's frame list.
    index: usize,
}
38
39/////////////////////////////////////////// ProtoBuilder ///////////////////////////////////////////
40
/// Low-level builder that appends encoded bytes to a buffer and records frame markers so that
/// size prefixes can be filled in when the message is sealed.
#[derive(Debug, Default)]
struct ProtoBuilder {
    /// Bytes emitted so far, including eight zero bytes for every size not yet known.
    msg: Vec<u8>,
    /// Begin/end markers in the order they were recorded.
    frames: Vec<MessageFrame>,
}
46
47impl ProtoBuilder {
48    fn begin_message(&mut self, tag: Tag, buffers: &[&[u8]]) -> MessageFrameWrapper {
49        let begin = MessageFrame::Begin {
50            offset: self.msg.len(),
51            tag_sz: tag.pack_sz(),
52        };
53        let wrap = MessageFrameWrapper {
54            index: self.frames.len(),
55        };
56        self.frames.push(begin);
57        stack_pack(tag).append_to_vec(&mut self.msg);
58        for _ in 0..8 {
59            self.msg.push(0);
60        }
61        for buffer in buffers {
62            self.msg.extend_from_slice(buffer)
63        }
64        wrap
65    }
66
67    fn begin_map_with_message(&mut self, tag: Tag, buffers: &[&[u8]]) -> MessageFrameWrapper {
68        let msg_len = self.msg.len();
69        stack_pack(&tag).append_to_vec(&mut self.msg);
70        for _ in 0..8 {
71            self.msg.push(0);
72        }
73        for buffer in buffers {
74            self.msg.extend_from_slice(buffer)
75        }
76        let begin = MessageFrame::BeginMapWithMessage {
77            offset: msg_len,
78            tag_sz: tag.pack_sz(),
79            key_offset: self.msg.len(),
80        };
81        let wrap = MessageFrameWrapper {
82            index: self.frames.len(),
83        };
84        self.frames.push(begin);
85        for _ in 0..8 {
86            self.msg.push(0);
87        }
88        wrap
89    }
90
91    fn end_message(&mut self, wrapper: MessageFrameWrapper) {
92        let end = MessageFrame::End {
93            offset: self.msg.len(),
94            begin: wrapper.index,
95        };
96        self.frames.push(end);
97    }
98
99    fn emit_inline(&mut self, value: &[u8]) {
100        self.msg.extend_from_slice(value);
101    }
102
103    fn emit_breakout(&mut self, tag: Tag, value: &[u8]) {
104        stack_pack(&tag).append_to_vec(&mut self.msg);
105        self.msg.extend_from_slice(value);
106    }
107
108    fn shift_frame(
109        &mut self,
110        offset_of_u64: usize,
111        in_progress_offset: usize,
112        msg_sz: usize,
113        bytes_dropped: usize,
114    ) -> Result<usize, Error> {
115        let post_u64_offset = offset_of_u64 + 8;
116        if post_u64_offset > in_progress_offset {
117            return Err(Error::LogicError {
118                core: ErrorCore::default(),
119                what: "offset_too_small".to_owned(),
120            })
121            .as_z()
122            .with_info("post_u64_offset", post_u64_offset)
123            .with_info("in_progress_offset", in_progress_offset);
124        }
125        let msg_sz_v64 = v64::from(msg_sz);
126        let msg_sz_v64_pack_sz = msg_sz_v64.pack_sz();
127        if msg_sz_v64_pack_sz > 8 {
128            return Err(Error::LogicError {
129                core: ErrorCore::default(),
130                what: "offset_too_small".to_owned(),
131            })
132            .as_z()
133            .with_info("msg_sz_v64_pack_sz", msg_sz_v64_pack_sz);
134        }
135        let newly_dropped_bytes = 8 - msg_sz_v64_pack_sz;
136        for src in (post_u64_offset..in_progress_offset).rev() {
137            let dst = src + bytes_dropped;
138            self.msg[dst] = self.msg[src];
139        }
140        let length_start = offset_of_u64 + bytes_dropped + newly_dropped_bytes;
141        let length_slice = &mut self.msg[length_start..length_start + msg_sz_v64_pack_sz];
142        stack_pack(msg_sz_v64).into_slice(length_slice);
143        Ok(newly_dropped_bytes)
144    }
145
146    fn seal(mut self) -> Result<Vec<u8>, Error> {
147        let mut in_progress = Vec::new();
148        let mut bytes_dropped = 0;
149        while !self.frames.is_empty() {
150            let frame_idx = self.frames.len() - 1;
151            let back = self.frames[frame_idx];
152            match back {
153                MessageFrame::Begin {
154                    offset: begin_offset,
155                    tag_sz,
156                } => {
157                    if in_progress.is_empty() {
158                        return Err(Error::LogicError {
159                            core: ErrorCore::default(),
160                            what: "in_progress was empty".to_owned(),
161                        });
162                    }
163                    let (in_progress_offset, in_progress_idx) = in_progress.pop().unwrap();
164                    if in_progress_idx != frame_idx {
165                        return Err(Error::LogicError {
166                            core: ErrorCore::default(),
167                            what: "index miscalculation".to_owned(),
168                        })
169                        .as_z()
170                        .with_info("in_progress_idx", in_progress_idx)
171                        .with_info("frame_idx", frame_idx);
172                    }
173                    let msg_sz = in_progress_offset - begin_offset - tag_sz - 8;
174                    let newly_dropped_bytes = self.shift_frame(
175                        begin_offset + tag_sz,
176                        in_progress_offset,
177                        msg_sz,
178                        bytes_dropped,
179                    )?;
180                    for tag_byte in (begin_offset..begin_offset + tag_sz).rev() {
181                        self.msg[tag_byte + newly_dropped_bytes] = self.msg[tag_byte];
182                    }
183                    bytes_dropped += newly_dropped_bytes;
184                    self.frames.pop();
185                }
186                MessageFrame::BeginMapWithMessage {
187                    offset: begin_offset,
188                    tag_sz,
189                    key_offset,
190                } => {
191                    if in_progress.is_empty() {
192                        return Err(Error::LogicError {
193                            core: ErrorCore::default(),
194                            what: "in_progress was empty".to_owned(),
195                        });
196                    }
197                    let (in_progress_offset, in_progress_idx) = in_progress.pop().unwrap();
198                    if in_progress_idx != frame_idx {
199                        return Err(Error::LogicError {
200                            core: ErrorCore::default(),
201                            what: "index miscalculation".to_owned(),
202                        })
203                        .as_z()
204                        .with_info("in_progress_idx", in_progress_idx)
205                        .with_info("frame_idx", frame_idx);
206                    }
207                    let msg_sz = in_progress_offset - key_offset - 8;
208                    let first_dropped_bytes =
209                        self.shift_frame(key_offset, in_progress_offset, msg_sz, bytes_dropped)?;
210                    bytes_dropped += first_dropped_bytes;
211                    let msg_sz =
212                        in_progress_offset - begin_offset - tag_sz - 16 + (8 - first_dropped_bytes);
213                    let second_dropped_bytes =
214                        self.shift_frame(begin_offset + tag_sz, key_offset, msg_sz, bytes_dropped)?;
215                    bytes_dropped += second_dropped_bytes;
216                    for tag_byte in (begin_offset..begin_offset + tag_sz).rev() {
217                        self.msg[tag_byte + bytes_dropped] = self.msg[tag_byte];
218                    }
219                    self.frames.pop();
220                }
221                MessageFrame::End { offset, begin } => {
222                    in_progress.push((offset, begin));
223                    self.frames.pop();
224                }
225            }
226        }
227        for i in 0..self.msg.len() - bytes_dropped {
228            self.msg[i] = self.msg[i + bytes_dropped];
229        }
230        self.msg.truncate(self.msg.len() - bytes_dropped);
231        Ok(self.msg)
232    }
233}
234
235///////////////////////////////////////////// utilities ////////////////////////////////////////////
236
237pub fn parse_as_prototk(val: &[u8], ty: KeyDataType) -> Result<Vec<u8>, &'static str> {
238    match ty {
239        KeyDataType::unit => Ok(Vec::new()),
240        KeyDataType::fixed32 => {
241            Ok(stack_pack(fixed32(<u32 as Element>::parse_from(val)?)).to_vec())
242        }
243        KeyDataType::fixed64 => {
244            Ok(stack_pack(fixed64(<u64 as Element>::parse_from(val)?)).to_vec())
245        }
246        KeyDataType::sfixed32 => {
247            Ok(stack_pack(sfixed32(<i32 as Element>::parse_from(val)?)).to_vec())
248        }
249        KeyDataType::sfixed64 => {
250            Ok(stack_pack(sfixed64(<i64 as Element>::parse_from(val)?)).to_vec())
251        }
252        KeyDataType::string => {
253            Ok(stack_pack(string(&<String as Element>::parse_from(val)?)).to_vec())
254        }
255    }
256}
257
258/////////////////////////////////////////// ObjectBuilder //////////////////////////////////////////
259
/// Builds one encoded object from a stream of key-value pairs interpreted against a schema.
pub struct ObjectBuilder {
    // Decides which keys are accepted and how their fields are typed.
    schema: Schema,
    // Receives the encoded bytes and the frame markers.
    builder: ProtoBuilder,
    // Schema path of the field that is currently open.
    current_type: SchemaEntry,
    // One slot per key element of `current_type`: `Some` when that element opened a frame in
    // `builder`, `None` when it did not.
    frame_stack: Vec<Option<MessageFrameWrapper>>,
    // Most recent key that matched the schema; used to find the prefix shared with the next key.
    prev_tuple_key: TupleKey,
}
267
268impl ObjectBuilder {
269    pub fn new(schema: Schema) -> Self {
270        let builder = ProtoBuilder::default();
271        let current_type = SchemaEntry::default();
272        let frame_stack = vec![];
273        let prev_tuple_key = TupleKey::default();
274        Self {
275            schema,
276            builder,
277            current_type,
278            frame_stack,
279            prev_tuple_key,
280        }
281    }
282
283    pub fn next(&mut self, kvr: KeyValueRef) -> Result<(), Error> {
284        if kvr.value.is_none() {
285            // NOTE(rescrv): deletes have no place in protoql; drop them silently.
286            return Ok(());
287        }
288        let schema_entry = match self.schema.lookup_schema_for_key(kvr.key)? {
289            Some(schema_entry) => schema_entry,
290            None => {
291                // NOTE(rescrv): keys that don't match schema have no place in protoql; drop them silently.
292                return Ok(());
293            }
294        };
295        let common = TupleKeyIterator::number_of_elements_in_common_prefix(
296            self.prev_tuple_key.iter(),
297            TupleKeyIterator::from(kvr.key),
298        );
299        self.prev_tuple_key = TupleKey::from(kvr.key);
300        while !self.current_type.is_extendable_by(schema_entry)
301            || self.current_type.key().elements().len() > common
302        {
303            self.current_type.pop_field();
304            // SAFETY(rescrv): We know we pushed onto the stack.
305            if let Some(frame) = self.frame_stack.pop().unwrap() {
306                self.builder.end_message(frame);
307            }
308            // We know that we only push to the stacks in unison.
309            // The zero key will always extend so we won't pop zero.
310            assert_eq!(
311                self.current_type.key().elements().len(),
312                self.frame_stack.len()
313            );
314        }
315        // We'll go until we add everything necessary to extend.
316        let mut tki = TupleKeyIterator::from(kvr.key);
317        for _ in 0..2 * self.current_type.key().elements().len() {
318            tki.next().ok_or(Error::Corruption {
319                core: ErrorCore::default(),
320                what: "tuple key exhausted".to_owned(),
321            })?;
322        }
323        while !schema_entry.is_extendable_by(&self.current_type) {
324            // We know the current type should be extensible by construction.
325            assert!(self.current_type.is_extendable_by(schema_entry));
326            // We know we have the longer key from the popping above.
327            let ct_sz = self.current_type.key().elements().len();
328            let st_sz = schema_entry.key().elements().len();
329            assert!(ct_sz < st_sz);
330            let next_field = &schema_entry.key().elements()[ct_sz];
331            let (value_ty, terminal) = if ct_sz + 1 >= st_sz {
332                (schema_entry.value(), true)
333            } else {
334                (DataType::message, false)
335            };
336            let _ = tki.next().ok_or(Error::Corruption {
337                core: ErrorCore::default(),
338                what: "tuple key exhausted".to_owned(),
339            })?;
340            let tk_key = tki.next().ok_or(Error::Corruption {
341                core: ErrorCore::default(),
342                what: "tuple key exhausted".to_owned(),
343            })?;
344            let tk_key =
345                parse_as_prototk(tk_key, next_field.ty()).map_err(|err| Error::Corruption {
346                    core: ErrorCore::default(),
347                    what: err.to_string(),
348                })?;
349            self.current_type.push_field(next_field.clone(), value_ty);
350            match (next_field.ty(), value_ty) {
351                (KeyDataType::unit, DataType::message) => {
352                    let msg_tag = Tag {
353                        field_number: next_field.number(),
354                        wire_type: value_ty.wire_type(),
355                    };
356                    self.frame_stack
357                        .push(Some(self.builder.begin_message(msg_tag, &[])));
358                    if terminal {
359                        self.builder.emit_inline(kvr.value.unwrap());
360                    }
361                }
362                (KeyDataType::unit, _) => {
363                    let unit_tag = Tag {
364                        field_number: next_field.number(),
365                        wire_type: value_ty.wire_type(),
366                    };
367                    self.frame_stack.push(None);
368                    if terminal {
369                        self.builder.emit_breakout(unit_tag, kvr.value.unwrap());
370                    }
371                }
372                (_, _) => {
373                    let map_tag = Tag {
374                        field_number: next_field.number(),
375                        wire_type: WireType::LengthDelimited,
376                    };
377                    let key_tag: &[u8] = &stack_pack(Tag {
378                        field_number: FieldNumber::must(1),
379                        wire_type: next_field.ty().wire_type(),
380                    })
381                    .to_vec();
382                    let value_tag: &[u8] = &stack_pack(Tag {
383                        field_number: FieldNumber::must(2),
384                        wire_type: value_ty.wire_type(),
385                    })
386                    .to_vec();
387                    self.frame_stack.push(Some(
388                        self.builder
389                            .begin_map_with_message(map_tag, &[key_tag, &tk_key, value_tag]),
390                    ));
391                    if terminal {
392                        self.builder.emit_inline(kvr.value.unwrap());
393                    }
394                }
395            }
396        }
397        Ok(())
398    }
399
400    pub fn seal(mut self) -> Result<Vec<u8>, Error> {
401        while !self.frame_stack.is_empty() {
402            // SAFETY(rescrv): We're popping from a non-empty stack.
403            if let Some(frame) = self.frame_stack.pop().unwrap() {
404                self.builder.end_message(frame);
405            }
406        }
407        self.builder.seal()
408    }
409}
410
411/////////////////////////////////////////////// tests //////////////////////////////////////////////
412
413#[cfg(test)]
414mod tests {
415    #[test]
416    fn default() {
417        ProtoBuilder::default();
418    }
419
420    #[test]
421    fn single_scalar_terminal() {
422        let mut pb = ProtoBuilder::default();
423        // A protocol buffers message of { 1:uint64 => 42 }.
424        let tag = Tag {
425            field_number: FieldNumber::must(1),
426            wire_type: WireType::Varint,
427        };
428        pb.emit_breakout(tag, &[42]);
429        // The message
430        let msg: &[u8] = &pb.seal().unwrap();
431        assert_eq!(&[8, 42], &msg);
432    }
433
434    #[test]
435    fn single_message_terminal() {
436        let mut pb = ProtoBuilder::default();
437        // A protocol buffers message of { 1:uint64 => 42 }.
438        let tag = Tag {
439            field_number: FieldNumber::must(1),
440            wire_type: WireType::LengthDelimited,
441        };
442        let begin = pb.begin_message(tag, &[&[8u8, 42]]);
443        pb.end_message(begin);
444        // The message
445        let msg: &[u8] = &pb.seal().unwrap();
446        assert_eq!(&[10, 2, 8, 42], &msg);
447    }
448
449    #[test]
450    fn scalar_within_message() {
451        let mut pb = ProtoBuilder::default();
452        // Let's create a protocol buffers message with a breakout field.
453        let tag = Tag {
454            field_number: FieldNumber::must(1),
455            wire_type: WireType::LengthDelimited,
456        };
457        let begin = pb.begin_message(tag, &[]);
458        // A protocol buffers message of { 1:uint64 => 42 }.
459        let tag = Tag {
460            field_number: FieldNumber::must(2),
461            wire_type: WireType::Varint,
462        };
463        pb.emit_breakout(tag, &[42]);
464        // Finish the message
465        pb.end_message(begin);
466        let msg: &[u8] = &pb.seal().unwrap();
467        assert_eq!(&[10, 2, 16, 42], &msg);
468    }
469
470    #[test]
471    fn message_within_message1() {
472        let mut pb = ProtoBuilder::default();
473        // Let's create a protocol buffers message with a breakout field.
474        let tag = Tag {
475            field_number: FieldNumber::must(1),
476            wire_type: WireType::LengthDelimited,
477        };
478        let begin = pb.begin_message(tag, &[]);
479        // A protocol buffers message of { 1:uint64 => 42 }.
480        pb.emit_inline(&[8, 42]);
481        // Finish the message
482        pb.end_message(begin);
483        let msg: &[u8] = &pb.seal().unwrap();
484        assert_eq!(&[10, 2, 8, 42], &msg);
485    }
486
487    #[test]
488    fn message_within_message2() {
489        let mut pb = ProtoBuilder::default();
490        // Let's create a protocol buffers message with a breakout field.
491        let tag = Tag {
492            field_number: FieldNumber::must(1),
493            wire_type: WireType::LengthDelimited,
494        };
495        let begin = pb.begin_message(tag, &[&[8u8, 42]]);
496        // Finish the message
497        pb.end_message(begin);
498        let msg: &[u8] = &pb.seal().unwrap();
499        assert_eq!(&[10, 2, 8, 42], &msg);
500    }
501
502    #[test]
503    fn map_with_scalar_key_scalar_value_two_entries() {
504        let mut pb = ProtoBuilder::default();
505        // The first key for the map.  The value will be a string.
506        let key_tag1: &[u8] = &stack_pack(Tag {
507            field_number: FieldNumber::must(1),
508            wire_type: WireType::Varint,
509        })
510        .to_vec();
511        let key_buf1: &[u8] = &[42];
512        let value_tag1: &[u8] = &stack_pack(Tag {
513            field_number: FieldNumber::must(2),
514            wire_type: WireType::LengthDelimited,
515        })
516        .to_vec();
517        let value_buf1: &[u8] = &[103, 117, 105, 116, 97, 114];
518        // The second key for the map.  The value will be a string.
519        let key_tag2: &[u8] = &stack_pack(Tag {
520            field_number: FieldNumber::must(1),
521            wire_type: WireType::Varint,
522        })
523        .to_vec();
524        let key_buf2: &[u8] = &[42];
525        let value_tag2: &[u8] = &stack_pack(Tag {
526            field_number: FieldNumber::must(2),
527            wire_type: WireType::LengthDelimited,
528        })
529        .to_vec();
530        let value_buf2: &[u8] = &[100, 114, 117, 109, 115];
531        // Let's create a protocol buffers message with a map field.
532        let tag = Tag {
533            field_number: FieldNumber::must(7),
534            wire_type: WireType::LengthDelimited,
535        };
536        let begin = pb.begin_map_with_message(tag, &[key_tag1, key_buf1, value_tag1]);
537        // Emit the value inline because we captured the value tag.
538        pb.emit_inline(value_buf1);
539        // Finish the message
540        pb.end_message(begin);
541        // Second verse, practically the first.
542        let begin = pb.begin_map_with_message(tag, &[key_tag2, key_buf2, value_tag2]);
543        // Emit the value inline because we captured the value tag.
544        pb.emit_inline(value_buf2);
545        // Finish the message
546        pb.end_message(begin);
547        let msg: &[u8] = &pb.seal().unwrap();
548        assert_eq!(
549            &[
550                58, 10, 8, 42, 18, 6, 103, 117, 105, 116, 97, 114, 58, 9, 8, 42, 18, 5, 100, 114,
551                117, 109, 115
552            ],
553            &msg
554        );
555    }
556
557    #[test]
558    fn map_with_scalar_key_message_value() {
559        let mut pb = ProtoBuilder::default();
560        // The key for the map.  The value will be a message.
561        let key_tag: &[u8] = &stack_pack(Tag {
562            field_number: FieldNumber::must(1),
563            wire_type: WireType::Varint,
564        })
565        .to_vec();
566        let key_buf: &[u8] = &[42];
567        let value_tag: &[u8] = &stack_pack(Tag {
568            field_number: FieldNumber::must(2),
569            wire_type: WireType::LengthDelimited,
570        })
571        .to_vec();
572        let value_buf: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7];
573        // Let's create a protocol buffers message with a map field.
574        let tag = Tag {
575            field_number: FieldNumber::must(7),
576            wire_type: WireType::LengthDelimited,
577        };
578        let begin = pb.begin_map_with_message(tag, &[key_tag, key_buf, value_tag]);
579        // Emit the value inline because we captured the value tag.
580        pb.emit_inline(value_buf);
581        // Finish the message
582        pb.end_message(begin);
583        let msg: &[u8] = &pb.seal().unwrap();
584        assert_eq!(&[58, 12, 8, 42, 18, 8, 0, 1, 2, 3, 4, 5, 6, 7], &msg);
585    }
586
587    #[test]
588    fn map_with_message_key_message_value() {
589        let mut pb = ProtoBuilder::default();
590        // The key for the map.  The value will be a message.
591        let key_tag: &[u8] = &stack_pack(Tag {
592            field_number: FieldNumber::must(1),
593            wire_type: WireType::LengthDelimited,
594        })
595        .to_vec();
596        let key_buf: &[u8] = &[4, 42, 43, 44, 45];
597        let value_tag: &[u8] = &stack_pack(Tag {
598            field_number: FieldNumber::must(2),
599            wire_type: WireType::LengthDelimited,
600        })
601        .to_vec();
602        let value_buf: &[u8] = &[0, 1, 2, 3, 4, 5, 6, 7];
603        // Let's create a protocol buffers message with a map field.
604        let tag = Tag {
605            field_number: FieldNumber::must(7),
606            wire_type: WireType::LengthDelimited,
607        };
608        let begin = pb.begin_map_with_message(tag, &[key_tag, key_buf, value_tag]);
609        // Emit the value inline because we captured the value tag.
610        pb.emit_inline(value_buf);
611        // Finish the message
612        pb.end_message(begin);
613        let msg: &[u8] = &pb.seal().unwrap();
614        assert_eq!(
615            &[58, 16, 10, 4, 42, 43, 44, 45, 18, 8, 0, 1, 2, 3, 4, 5, 6, 7],
616            &msg
617        );
618    }
619
620    #[test]
621    fn map_with_string_key_message_value() {
622        let mut pb = ProtoBuilder::default();
623        // The key for the map.  The value will be a message.
624        let key_tag: &[u8] = &stack_pack(Tag {
625            field_number: FieldNumber::must(1),
626            wire_type: WireType::LengthDelimited,
627        })
628        .to_vec();
629        let key_buf: &[u8] = &[5, 104, 101, 108, 108, 111];
630        let value_tag: &[u8] = &stack_pack(Tag {
631            field_number: FieldNumber::must(2),
632            wire_type: WireType::LengthDelimited,
633        })
634        .to_vec();
635        let value_buf: &[u8] = &[];
636        // Let's create a protocol buffers message with a map field.
637        let tag = Tag {
638            field_number: FieldNumber::must(5),
639            wire_type: WireType::LengthDelimited,
640        };
641        let begin = pb.begin_map_with_message(tag, &[key_tag, key_buf, value_tag]);
642        // Emit the value inline because we captured the value tag.
643        pb.emit_inline(value_buf);
644        // Finish the message
645        pb.end_message(begin);
646        let msg: &[u8] = &pb.seal().unwrap();
647        assert_eq!(&[42, 9, 10, 5, 104, 101, 108, 108, 111, 18, 0], &msg);
648    }
649
650    use tuple_key::Direction;
651
652    use crate::{SchemaKey, SchemaKeyElement};
653
654    use super::*;
655
656    fn test_schema() -> Schema {
657        let mut schema = Schema::default();
658        // Create field 1 such that it is a uint64 scalar.
659        schema
660            .add_to_schema(SchemaEntry::new(
661                SchemaKey::new(vec![SchemaKeyElement::new(
662                    FieldNumber::must(1),
663                    KeyDataType::unit,
664                    Direction::Forward,
665                )]),
666                DataType::uint64,
667            ))
668            .unwrap();
669        // Create field 2 such that it is a message.
670        schema
671            .add_to_schema(SchemaEntry::new(
672                SchemaKey::new(vec![SchemaKeyElement::new(
673                    FieldNumber::must(2),
674                    KeyDataType::unit,
675                    Direction::Forward,
676                )]),
677                DataType::message,
678            ))
679            .unwrap();
680        // Extend field 2 with a breakout.
681        schema
682            .add_to_schema(SchemaEntry::new(
683                SchemaKey::new(vec![
684                    SchemaKeyElement::new(
685                        FieldNumber::must(2),
686                        KeyDataType::unit,
687                        Direction::Forward,
688                    ),
689                    SchemaKeyElement::new(
690                        FieldNumber::must(3),
691                        KeyDataType::unit,
692                        Direction::Forward,
693                    ),
694                ]),
695                DataType::uint64,
696            ))
697            .unwrap();
698        // Create field 4 such that it is a map of fixed64 to string.
699        schema
700            .add_to_schema(SchemaEntry::new(
701                SchemaKey::new(vec![SchemaKeyElement::new(
702                    FieldNumber::must(4),
703                    KeyDataType::fixed64,
704                    Direction::Forward,
705                )]),
706                DataType::string,
707            ))
708            .unwrap();
709        // Create field 5 such that it is a map of string to message.
710        schema
711            .add_to_schema(SchemaEntry::new(
712                SchemaKey::new(vec![SchemaKeyElement::new(
713                    FieldNumber::must(5),
714                    KeyDataType::string,
715                    Direction::Forward,
716                )]),
717                DataType::message,
718            ))
719            .unwrap();
720        // Extend field 5 with a breakout.
721        schema
722            .add_to_schema(SchemaEntry::new(
723                SchemaKey::new(vec![
724                    SchemaKeyElement::new(
725                        FieldNumber::must(5),
726                        KeyDataType::string,
727                        Direction::Forward,
728                    ),
729                    SchemaKeyElement::new(
730                        FieldNumber::must(6),
731                        KeyDataType::unit,
732                        Direction::Forward,
733                    ),
734                ]),
735                DataType::uint64,
736            ))
737            .unwrap();
738        schema
739    }
740
741    #[test]
742    fn obj_builder_default() {
743        let schema = test_schema();
744        let obj_builder = ObjectBuilder::new(schema);
745        let buf = obj_builder.seal().unwrap();
746        assert!(buf.is_empty());
747    }
748
749    #[test]
750    fn unit_to_uint64() {
751        let mut key = TupleKey::default();
752        key.extend(FieldNumber::must(1));
753        let schema = test_schema();
754        let mut obj_builder = ObjectBuilder::new(schema);
755        obj_builder
756            .next(KeyValueRef {
757                key: key.as_bytes(),
758                timestamp: 0,
759                value: Some(&[42]),
760            })
761            .unwrap();
762        let buf = obj_builder.seal().unwrap();
763        assert_eq!(&[8, 42], &buf.as_slice());
764    }
765
766    #[test]
767    fn unit_to_message() {
768        let mut key = TupleKey::default();
769        key.extend(FieldNumber::must(2));
770        let schema = test_schema();
771        let mut obj_builder = ObjectBuilder::new(schema);
772        obj_builder
773            .next(KeyValueRef {
774                key: key.as_bytes(),
775                timestamp: 0,
776                value: Some(&[8, 42]),
777            })
778            .unwrap();
779        let buf = obj_builder.seal().unwrap();
780        assert_eq!(&[18, 2, 8, 42], &buf.as_slice());
781    }
782
783    #[test]
784    fn scalar_within_message1() {
785        let mut key = TupleKey::default();
786        key.extend(FieldNumber::must(2));
787        key.extend(FieldNumber::must(3));
788        let schema = test_schema();
789        let mut obj_builder = ObjectBuilder::new(schema);
790        obj_builder
791            .next(KeyValueRef {
792                key: key.as_bytes(),
793                timestamp: 0,
794                value: Some(&[42]),
795            })
796            .unwrap();
797        let buf = obj_builder.seal().unwrap();
798        assert_eq!(&[18, 2, 24, 42], &buf.as_slice());
799    }
800
801    #[test]
802    fn scalar_within_message2() {
803        let mut key1 = TupleKey::default();
804        key1.extend(FieldNumber::must(2));
805        let mut key2 = key1.clone();
806        key2.extend(FieldNumber::must(3));
807        let schema = test_schema();
808        let mut obj_builder = ObjectBuilder::new(schema);
809        obj_builder
810            .next(KeyValueRef {
811                key: key1.as_bytes(),
812                timestamp: 0,
813                value: Some(&[8, 33]),
814            })
815            .unwrap();
816        obj_builder
817            .next(KeyValueRef {
818                key: key2.as_bytes(),
819                timestamp: 0,
820                value: Some(&[42]),
821            })
822            .unwrap();
823        let buf = obj_builder.seal().unwrap();
824        assert_eq!(&[18, 4, 8, 33, 24, 42], &buf.as_slice());
825    }
826
827    #[test]
828    fn map_uint64_to_string1() {
829        let mut key = TupleKey::default();
830        key.extend_with_key(FieldNumber::must(4), 42u64, Direction::Forward);
831        let schema = test_schema();
832        let mut obj_builder = ObjectBuilder::new(schema);
833        obj_builder
834            .next(KeyValueRef {
835                key: key.as_bytes(),
836                timestamp: 0,
837                value: Some(&[104, 101, 108, 108, 111]),
838            })
839            .unwrap();
840        let buf = obj_builder.seal().unwrap();
841        assert_eq!(
842            &[34, 16, 9, 42, 0, 0, 0, 0, 0, 0, 0, 18, 5, 104, 101, 108, 108, 111],
843            &buf.as_slice()
844        );
845    }
846
847    #[test]
848    fn map_uint64_to_string2() {
849        let mut key1 = TupleKey::default();
850        key1.extend_with_key(FieldNumber::must(4), 42u64, Direction::Forward);
851        let mut key2 = TupleKey::default();
852        key2.extend_with_key(FieldNumber::must(4), 69u64, Direction::Forward);
853        let schema = test_schema();
854        let mut obj_builder = ObjectBuilder::new(schema);
855        obj_builder
856            .next(KeyValueRef {
857                key: key1.as_bytes(),
858                timestamp: 0,
859                value: Some(&[104, 101, 108, 108, 111]),
860            })
861            .unwrap();
862        obj_builder
863            .next(KeyValueRef {
864                key: key2.as_bytes(),
865                timestamp: 0,
866                value: Some(&[119, 111, 114, 108, 100]),
867            })
868            .unwrap();
869        let buf = obj_builder.seal().unwrap();
870        assert_eq!(
871            &[
872                34, 16, 9, 42, 0, 0, 0, 0, 0, 0, 0, 18, 5, 104, 101, 108, 108, 111, 34, 16, 9, 69,
873                0, 0, 0, 0, 0, 0, 0, 18, 5, 119, 111, 114, 108, 100
874            ],
875            &buf.as_slice(),
876        );
877    }
878
879    #[test]
880    fn map_string_to_message1() {
881        let mut key = TupleKey::default();
882        key.extend_with_key(FieldNumber::must(5), "hello".to_owned(), Direction::Forward);
883        let schema = test_schema();
884        let mut obj_builder = ObjectBuilder::new(schema);
885        obj_builder
886            .next(KeyValueRef {
887                key: key.as_bytes(),
888                timestamp: 0,
889                value: Some(&[]),
890            })
891            .unwrap();
892        let buf = obj_builder.seal().unwrap();
893        assert_eq!(
894            &[42, 9, 10, 5, 104, 101, 108, 108, 111, 18, 0],
895            &buf.as_slice()
896        );
897    }
898
899    #[test]
900    fn map_string_to_message2() {
901        let mut key1 = TupleKey::default();
902        key1.extend_with_key(FieldNumber::must(5), "hello".to_owned(), Direction::Forward);
903        let mut key2 = key1.clone();
904        key2.extend(FieldNumber::must(6));
905        let schema = test_schema();
906        let mut obj_builder = ObjectBuilder::new(schema);
907        obj_builder
908            .next(KeyValueRef {
909                key: key1.as_bytes(),
910                timestamp: 0,
911                value: Some(&[]),
912            })
913            .unwrap();
914        obj_builder
915            .next(KeyValueRef {
916                key: key2.as_bytes(),
917                timestamp: 0,
918                value: Some(&[33]),
919            })
920            .unwrap();
921        let buf = obj_builder.seal().unwrap();
922        assert_eq!(
923            &[42, 11, 10, 5, 104, 101, 108, 108, 111, 18, 2, 48, 33],
924            &buf.as_slice(),
925        );
926    }
927}