dbsp 0.290.0

Continuous streaming analytics engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
//! # Layer file format
//!
//! A layer file is a sequence of variable-sized binary blocks, each a multiple
//! of 512 bytes in length.  The order of the blocks in a file is unspecified,
//! except that the last block in a file is a [`FileTrailer`] block that is
//! exactly 512 bytes.
//!
//! The layer file implementation uses [`mod@binrw`] for serializing and
//! deserializing fixed-length data, and [`rkyv`] for serializing and
//! deserializing variable-length data.  The layer file implementation
//! configures [`mod@binrw`] for little-endian input and output.
//!
//! Each block begins with an 8-byte [`BlockHeader`].
//!
//! # Data blocks
//!
//! A data block consists of the following, in order:
//!
//! * [`DataBlockHeader`].
//!
//! * [`Item<K, A>`](`super::Item`) data items serialized with [`rkyv`].  The
//!   number of these is specified in [`DataBlockHeader::n_values`].
//!
//! * The "value map", which holds a byte offset from the start of the block to
//!   the start of each [`Item`](`super::Item`).  The offset to the value map is
//!   specified in [`DataBlockHeader::value_map_ofs`] and the format in
//!   [`DataBlockHeader::value_map_varint`].
//!
//! * The "row groups", which point from a row in this column to the associated
//!   rows in the next column.  The offset to the row groups is specified in
//!   [`DataBlockHeader::row_groups_ofs`] and the format in
//!   [`DataBlockHeader::row_group_varint`].  Data blocks in the last column do
//!   not have row groups.
//!
//! # Index blocks
//!
//! An index block consists of the following, in order:
//!
//! * [`IndexBlockHeader`].
//!
//! * A sequence of bounds serialized with [`rkyv`], one pair for each of
//!   [`IndexBlockHeader::n_children`].  The first value in each pair is the
//!   smallest value in the child tree and the second is the largest value.
//!
//! * A "bound map", which holds a byte offset from the start of the block to
//!   the start of each bound.  The offset to the bound map is specified in
//!   [`IndexBlockHeader::bound_map_offset`] and the format in
//!   [`IndexBlockHeader::bound_map_varint`].
//!
//! * An array of "row totals", one for each of
//!   [`IndexBlockHeader::n_children`].  The first row total is the total number
//!   of rows in the first child tree, the second row total is that plus the
//!   total number of rows in the second child tree, and so on.
//!
//! * An array of "child offsets", one for each of
//!   [`IndexBlockHeader::n_children`].  Each one of these is the offset from
//!   the beginning of the file to the child block, expressed in 512-byte units.
//!
//! * An array of "child sizes", one for each of
//!   [`IndexBlockHeader::n_children`].  Each one of these is the size of the
//!   corresponding child block, expressed in 512-byte units.
//!
//! # Compression
//!
//! If [`FileTrailer::compression`] is not `None`, then each block in the file
//! other than the trailer block itself is compressed using the indicated
//! algorithm. A compressed block consists of:
//!
//! * `compressed_len`, a 4-byte little-endian integer that indicates the number
//!   of bytes of compressed data to follow.
//!
//! * `compressed_len` bytes of compressed data.
//!
//! * Padding with 0-bytes to a 512-byte alignment.
//!
//! Decompressing a compressed block yields the regular index or data block
//! format starting with a [`BlockHeader`].
use crate::storage::tracking_bloom_filter::TrackingBloomFilter;
use crate::storage::{buffer_cache::FBuf, file::BLOOM_FILTER_SEED};
use binrw::{BinRead, BinResult, BinWrite, Error as BinError, binrw, binwrite};
#[cfg(doc)]
use crc32c;
use fastbloom::BloomFilter;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use size_of::SizeOf;

/// Layer file format version, as stored in [`FileTrailer::version`].
///
/// Increment this on each incompatible change.
///
/// - v1: Initial version.
/// - v2: TODO.
/// - v3: Bloom filter format change.
/// - v4: Tup None optimizations.
/// - v5: Change in representation for Timestamp, ShortInterval.
///
/// When a new version is created, make sure to generate new golden
/// files for it in crate `storage-test-compat` to check for
/// backwards compatibility.
pub const VERSION_NUMBER: u32 = 5;

/// Magic number for data blocks, checked in [`DataBlockHeader::header`].
pub const DATA_BLOCK_MAGIC: [u8; 4] = *b"LFDB";

/// Magic number for index blocks, checked in [`IndexBlockHeader::header`].
pub const INDEX_BLOCK_MAGIC: [u8; 4] = *b"LFIB";

/// Magic number for the file trailer block, checked in
/// [`FileTrailer::header`].
pub const FILE_TRAILER_BLOCK_MAGIC: [u8; 4] = *b"LFFT";

/// Magic number for filter blocks, checked in [`FilterBlock::header`].
pub const FILTER_BLOCK_MAGIC: [u8; 4] = *b"LFFB";

/// 8-byte header at the beginning of each block.
///
/// The header is a 4-byte checksum followed by a 4-byte magic number.
///
/// A block does not identify its own size, so any reference to a block must
/// also include the block's size.
#[binrw]
#[derive(Copy, Clone, Debug)]
pub struct BlockHeader {
    /// 32-bit [`crc32c`] checksum of the remainder of the block.
    pub checksum: u32,

    /// Magic number that identifies the kind of block.  Magic numbers begin
    /// with `LF`, which stands for "layer file".
    pub magic: [u8; 4],
}

impl BlockHeader {
    /// Creates a header carrying `magic` with its checksum field zeroed.
    pub(crate) fn new(magic: &[u8; 4]) -> Self {
        let magic = *magic;
        Self { checksum: 0, magic }
    }
}

/// Additional metadata added to the file by the writer.
///
/// Stored at the end of the [`FileTrailer`], in [`FileTrailer::metadata`].
#[binrw]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BatchMetadata {
    /// The number of records with negative weights in the batch.
    pub negative_weight_count: u64,
}

/// File trailer block.
///
/// Padded with zeros to exactly fill a 4-kB block.
///
/// NOTE(review): the module-level documentation says the trailer is exactly
/// 512 bytes; confirm which size is current and make the two agree.
///
/// Serialized and deserialized automatically with [`mod@binrw`].
#[binrw]
#[derive(Clone, Debug)]
pub struct FileTrailer {
    /// Block header with "LFFT" magic.
    #[brw(assert(header.magic == FILE_TRAILER_BLOCK_MAGIC, "file trailer has bad magic"))]
    pub header: BlockHeader,

    /// Currently, must be [`VERSION_NUMBER`].  In the future, this allows for
    /// detecting version changes and supporting backward compatibility.
    pub version: u32,

    /// Type of compression, or `None` if blocks are not compressed.
    ///
    /// Serialized as a single byte: 0 for `None`, otherwise the
    /// [`Compression`] discriminant.
    #[bw(write_with = Compression::write_opt)]
    #[br(parse_with = Compression::parse_opt)]
    pub compression: Option<Compression>,

    /// Number of columns.
    ///
    /// Written as `columns.len()`; on read, this is the number of entries to
    /// parse into `columns`.
    #[bw(calc(columns.len() as u32))]
    pub n_columns: u32,

    /// The columns.
    #[br(count = n_columns)]
    pub columns: Vec<FileTrailerColumn>,

    /// File offset in bytes of the [FilterBlock].
    ///
    /// This is 0 if there is no filter block, or if the filter block size is
    /// bigger than `i32::MAX`.
    pub filter_offset: u64,

    /// Size in bytes of the [FilterBlock].
    ///
    /// This is 0 if there is no filter block, or if the filter block size is
    /// bigger than `i32::MAX`.
    pub filter_size: u32,

    /// Compatible feature bitmap.
    ///
    /// Each 1-bit in this bitmap indicates that a particular feature is
    /// enabled.  A reader that does not support a feature in this bitmap can
    /// still read the file (and log that a feature that it does not support is
    /// in use).
    ///
    /// Two compatible features are currently defined:
    /// [COMPATIBLE_FEATURE_FILTER64] and
    /// [COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT].
    pub compatible_features: u64,

    /// Incompatible feature bitmap.
    ///
    /// Each 1-bit in this bitmap indicates that a particular feature is
    /// enabled.  A reader that does not support a feature in this bitmap must
    /// not attempt to read the file.
    ///
    /// If any of these bits are set, the version number must be at least 3.
    ///
    /// No incompatible features are currently defined.  This bitmap is for
    /// future expansion.
    pub incompatible_features: u64,

    /// File offset in bytes of the [FilterBlock], for a filter block described
    /// by the 64-bit fields.
    ///
    /// This is 0 if there is no filter block, or if the filter block size is
    /// less than `i32::MAX`.  If this is nonzero, then
    /// [COMPATIBLE_FEATURE_FILTER64] is set to 1 in
    /// [FileTrailer::compatible_features].
    pub filter_offset64: u64,

    /// Size in bytes of the [FilterBlock], for a filter block described by the
    /// 64-bit fields.
    ///
    /// This is 0 if there is no filter block, or if the filter block size is
    /// less than `i32::MAX`.  If this is nonzero, then
    /// [COMPATIBLE_FEATURE_FILTER64] is set to 1 in
    /// [FileTrailer::compatible_features].
    pub filter_size64: u64,

    /// Additional metadata added to the file by the writer.
    ///
    /// See [COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT].
    pub metadata: BatchMetadata,
}

impl FileTrailer {
    /// Returns the bits set in the compatible feature bitmap that this
    /// implementation does not recognize, or `None` if every set bit is
    /// recognized.
    pub fn unsupported_compatible_features(&self) -> Option<u64> {
        // Every compatible feature bit this reader understands.
        const KNOWN_FEATURES: u64 =
            COMPATIBLE_FEATURE_FILTER64 | COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT;

        match self.compatible_features & !KNOWN_FEATURES {
            0 => None,
            unknown => Some(unknown),
        }
    }

    /// Reports whether any bit of `feature` is set in the compatible feature
    /// bitmap.
    pub fn has_compatible_feature(&self, feature: u64) -> bool {
        let overlap = self.compatible_features & feature;
        overlap != 0
    }

    /// Reports whether [COMPATIBLE_FEATURE_FILTER64] is set, that is, whether
    /// this file trailer has a 64-bit filter.
    pub fn has_filter64(&self) -> bool {
        (self.compatible_features & COMPATIBLE_FEATURE_FILTER64) != 0
    }
}

/// Bit set to 1 in [FileTrailer::compatible_features] if a file has a Bloom
/// filter whose size does not fit in 32 bits.
///
/// See [FileTrailer::filter_offset64] and [FileTrailer::filter_size64].
pub const COMPATIBLE_FEATURE_FILTER64: u64 = 1 << 0;

/// Bit set to 1 in [FileTrailer::compatible_features] if the writer added the
/// `metadata` field to the trailer, including the `negative_weight_count`
/// field.
///
/// This feature is backward and forward compatible: trailers without this
/// field are deserialized as if its value is 0, and old readers simply ignore
/// the field.
pub const COMPATIBLE_FEATURE_NEGATIVE_WEIGHT_COUNT: u64 = 1 << 1;

/// Information about a column.
///
/// Embedded inside the [`FileTrailer`] block, one per column, in
/// [`FileTrailer::columns`].
///
/// Serialized and deserialized automatically with [`mod@binrw`].
#[binrw]
#[derive(Debug, Copy, Clone)]
pub struct FileTrailerColumn {
    /// File offset in bytes of the top-level block.  If the column has no rows,
    /// this should be 0.
    pub node_offset: u64,

    /// Length of the top-level block in bytes.  If the column has no rows, this
    /// should be 0.
    pub node_size: u32,

    /// Type of the top-level node.  If the column has no rows, this should be
    /// [`NodeType::Data`].
    ///
    /// This is a single byte; `align_after = 4` pads after it to a 4-byte
    /// boundary before `n_rows`.
    #[brw(align_after = 4)]
    pub node_type: NodeType,

    /// Number of rows in the column.  Column 0 may have any number of rows;
    /// subsequent columns must each have more rows than the previous.
    pub n_rows: u64,
}

/// Type of a node in a column B-tree.
///
/// Serialized and deserialized automatically with [`mod@binrw`], as a single
/// byte holding the discriminant.
#[derive(Copy, Clone, PartialEq, Eq, Debug, SizeOf)]
#[binrw]
#[brw(repr(u8))]
pub enum NodeType {
    /// A data node that begins with a [`DataBlockHeader`].
    Data = 0,

    /// An index node that begins with an [`IndexBlockHeader`].
    Index = 1,
}

/// A type with a fixed length.
///
/// The implementations in this file, for [`IndexBlockHeader`] and
/// [`DataBlockHeader`], set [`LEN`](Self::LEN) to the header's serialized
/// size in bytes.
pub(crate) trait FixedLen {
    /// The fixed length.
    const LEN: usize;
}

/// Index block header.
///
/// This is the first part of an index block.  It locates the bound map, row
/// totals, child offsets, and child sizes arrays within the block and says how
/// wide the integers in each of those arrays are.
///
/// Serialized and deserialized automatically with [`mod@binrw`].
#[binrw]
pub struct IndexBlockHeader {
    /// Block header with "LFIB" magic.
    #[brw(assert(header.magic == INDEX_BLOCK_MAGIC, "index block has bad magic"))]
    pub header: BlockHeader,

    /// Offset, in bytes from the beginning of the block, to the bound map.
    ///
    /// The bound map has `2 * n_children` entries.
    pub bound_map_offset: u32,

    /// Offset, in bytes from the beginning of the block, to the row totals.
    ///
    /// There are [`n_children`](Self::n_children) row totals.
    pub row_totals_offset: u32,

    /// Offset, in bytes from the beginning of the block, to the child offsets.
    ///
    /// There are [`n_children`](Self::n_children) child offsets.
    pub child_offsets_offset: u32,

    /// Offset, in bytes from the beginning of the block, to the child sizes.
    ///
    /// There are [`n_children`](Self::n_children) child sizes.
    pub child_sizes_offset: u32,

    /// Number of child nodes.
    pub n_children: u16,

    /// Child node type.  All of the child nodes have the same type.
    pub child_type: NodeType,

    /// The representation of the bound map.
    pub bound_map_varint: Varint,

    /// The representation of the row totals.
    pub row_total_varint: Varint,

    /// The representation of the child offsets.
    pub child_offset_varint: Varint,

    /// The representation of the child sizes.
    ///
    /// This is the last field; `align_after = 16` pads after it to a 16-byte
    /// boundary.
    #[brw(align_after = 16)]
    pub child_size_varint: Varint,
}

impl FixedLen for IndexBlockHeader {
    // 8-byte `BlockHeader` + four `u32` offsets (16 bytes) + `u16` child count
    // (2 bytes) + 1-byte `NodeType` + four 1-byte `Varint`s = 31 bytes, which
    // `align_after = 16` on the last field pads to 32 (for a header that
    // starts on a 16-byte boundary).
    const LEN: usize = 32;
}

/// Header for each data block.
///
/// This is the first part of a data block.  It gives the number of values in
/// the block and locates the value map and row groups within it.
///
/// Serialized and deserialized automatically with [`mod@binrw`].
#[binrw]
pub struct DataBlockHeader {
    /// Block header with `LFDB` magic.
    #[brw(assert(header.magic == DATA_BLOCK_MAGIC, "data block has bad magic"))]
    pub header: BlockHeader,

    /// Number of values (rows) in the block.
    pub n_values: u32,

    /// Offset, in bytes from the beginning of the block, to the value map.
    pub value_map_ofs: u32,

    /// Offset, in bytes from the beginning of the block, to the row groups.
    pub row_groups_ofs: u32,

    /// The representation of the value map.
    ///
    /// The value map is, logically, an array of [`n_values`](Self::n_values)
    /// integers, in which the `i`th entry is a byte offset from the
    /// beginning of the block to the [`Item<K, A>`](`super::Item`) that
    /// represents the `i`th value in the data block.
    ///
    /// The physical representation of the value map depends on
    /// [`value_map_varint`](Self::value_map_varint):
    ///
    /// * When this is `Some(varint)`, [`value_map_ofs`](Self::value_map_ofs)
    ///   points to an array of [`n_values`](Self::n_values) integers, each
    ///   `varint` bytes long, that directly represent the value map.
    ///
    /// * When this is `None`, [`value_map_ofs`](Self::value_map_ofs) points to
    ///   an array of 2 32-bit integers `(start, stride)`.  The `i`th value in
    ///   the value map is then calculated as `start + stride * i`.
    ///
    /// This single-byte value is serialized as either a valid [`Varint`] for
    /// `Some(<value>)` or as a zero byte for `None`.
    #[bw(write_with = Varint::write_opt)]
    #[br(parse_with = Varint::parse_opt)]
    pub value_map_varint: Option<Varint>,

    /// The representation of row groups.
    ///
    /// In columns other than the last column, this value is `Some(varint)` and
    /// the row groups are an array of `n_values + 1` `varint`-byte integers
    /// starting at byte offset [`row_groups_ofs`](Self::row_groups_ofs) within
    /// this block.  The entries with indexes `i` and `i + 1` are the range of
    /// rows in the next column associated with this column's row `i`.
    ///
    /// In the last column, this value is `None` and there are no row groups.
    /// NOTE(review): this text previously said that `value_map_ofs` should be
    /// 0 in the last column, which looks like a slip for
    /// [`row_groups_ofs`](Self::row_groups_ofs) -- confirm against the writer.
    ///
    /// This single-byte value is serialized as either a valid [`Varint`] for
    /// `Some(<value>)` or as a zero byte for `None`.
    #[bw(write_with = Varint::write_opt)]
    #[br(parse_with = Varint::parse_opt)]
    #[brw(align_after = 16)]
    pub row_group_varint: Option<Varint>,
}

impl FixedLen for DataBlockHeader {
    // 8-byte `BlockHeader` + three `u32` fields (12 bytes) + two single-byte
    // optional `Varint`s = 22 bytes, which `align_after = 16` on the last
    // field pads to 32 (for a header that starts on a 16-byte boundary).
    const LEN: usize = 32;
}

/// Variable-length integer identifier.
///
/// A `Varint` identifies the size of integers in arrays.  This saves space when
/// the integers are small, which is common in practice.  Saving space to reduce
/// overhead is important because it reduces the size and the depth of the index
/// structure.
///
/// To save space, arrays of `Varint`s values aren't aligned.
///
/// Each variant's discriminant is the integer's length in bytes, and that is
/// also the single byte used to serialize the `Varint` itself.
#[derive(Copy, Clone, Debug, PartialEq, Eq, FromPrimitive)]
#[binrw]
#[brw(repr(u8))]
pub enum Varint {
    /// 8-bit integer.
    B8 = 1,
    /// 16-bit integer.
    B16 = 2,
    /// 24-bit integer.
    B24 = 3,
    /// 32-bit integer.
    B32 = 4,
    /// 48-bit integer.
    B48 = 6,
    /// 64-bit integer.
    B64 = 8,
}
impl Varint {
    /// Returns the narrowest `Varint` whose integers can hold `max_value`.
    pub(crate) fn from_max_value(max_value: u64) -> Varint {
        // The arms overlap on purpose: each open-ended range is only reached
        // when the narrower ranges above it did not match.
        #[allow(clippy::unusual_byte_groupings, clippy::match_overlapping_arm)]
        match max_value {
            ..=0xff => Varint::B8,
            ..=0xffff => Varint::B16,
            ..=0xffff_ff => Varint::B24,
            ..=0xffff_ffff => Varint::B32,
            ..=0xffff_ffff_ffff => Varint::B48,
            _ => Varint::B64,
        }
    }

    /// Returns the narrowest `Varint` that can hold every value in `0..len`,
    /// that is, every value up to `len - 1`.
    ///
    /// A `len` of 0 has no values to represent and yields [`Varint::B8`].
    pub(crate) fn from_len(len: usize) -> Varint {
        // `saturating_sub` keeps `len == 0` from underflowing, which would
        // panic in debug builds and wrap around to `u64::MAX` (selecting
        // `B64`) in release builds.
        Self::from_max_value((len as u64).saturating_sub(1))
    }

    /// Returns the alignment, in bytes, used for integers of this size.
    ///
    /// This is the integer's length, except for the 3-byte and 6-byte sizes,
    /// which are not powers of 2 and use 1 and 2, respectively.
    pub(crate) fn alignment(&self) -> usize {
        match self {
            Self::B24 => 1,
            Self::B48 => 2,
            _ => *self as usize,
        }
    }

    /// Rounds `offset` up to a multiple of [`alignment`](Self::alignment).
    pub(crate) fn align(&self, offset: usize) -> usize {
        next_multiple_of_pow2(offset, self.alignment())
    }

    /// Returns the length, in bytes, of integers of this size.
    pub(crate) fn len(&self) -> usize {
        *self as usize
    }

    /// Appends the low [`len`](Self::len) bytes of `value` to `dst`, in
    /// little-endian order.
    ///
    /// Bits of `value` that do not fit in this size are discarded.
    pub(crate) fn put(&self, dst: &mut FBuf, value: u64) {
        #[allow(clippy::unnecessary_cast)]
        match *self {
            Self::B8 => dst.push(value as u8),
            Self::B16 => dst.extend_from_slice(&(value as u16).to_le_bytes()),
            Self::B24 => dst.extend_from_slice(&(value as u32).to_le_bytes()[..3]),
            Self::B32 => dst.extend_from_slice(&(value as u32).to_le_bytes()),
            Self::B48 => dst.extend_from_slice(&(value as u64).to_le_bytes()[..6]),
            Self::B64 => dst.extend_from_slice(&(value as u64).to_le_bytes()),
        }
    }

    /// Reads a little-endian integer of this size from `src`, starting at byte
    /// `offset`, and zero-extends it to 64 bits.
    ///
    /// The range `offset..offset + self.len()` is indexed directly, so the
    /// caller must ensure it lies within `src`.
    pub(crate) fn get(&self, src: &FBuf, offset: usize) -> u64 {
        let mut raw = [0u8; 8];
        raw[..self.len()].copy_from_slice(&src[offset..offset + self.len()]);
        u64::from_le_bytes(raw)
    }

    /// Parses a single byte as an optional `Varint`.
    ///
    /// A zero byte yields `None` and a valid `Varint` discriminant yields
    /// `Some`.  Any other byte fails with [`BinError::NoVariantMatch`] at the
    /// position of that byte.
    #[binrw::parser(reader, endian)]
    pub(crate) fn parse_opt() -> BinResult<Option<Varint>> {
        let byte: u8 = <_>::read_options(reader, endian, ())?;
        match byte {
            0 => Ok(None),
            _ => match FromPrimitive::from_u8(byte) {
                Some(varint) => Ok(Some(varint)),
                None => Err(BinError::NoVariantMatch {
                    // The byte has already been consumed, so step back by one
                    // to report where it was.
                    pos: reader.stream_position()? - 1,
                }),
            },
        }
    }

    /// Writes an optional `Varint` as a single byte: its discriminant for
    /// `Some`, or zero for `None`.
    #[binrw::writer(writer, endian)]
    pub(crate) fn write_opt(value: &Option<Varint>) -> BinResult<()> {
        value
            .map_or(0, |varint| varint as u8)
            .write_options(writer, endian, ())
    }
}

// Returns the smallest multiple of `alignment` that is at least `offset`.
// `alignment` must be a power of 2.  This gives the same answer as
// `offset.next_multiple_of(alignment)`, but relying on the power-of-2
// property makes it faster and smaller when the compiler can't prove that
// property on its own.
fn next_multiple_of_pow2(offset: usize, alignment: usize) -> usize {
    let low_bits = alignment - 1;
    let bumped = offset + low_bits;
    // Dropping the low bits is the same as masking with `!low_bits`.
    bumped - (bumped & low_bits)
}

/// Type of compression.
///
/// Stored in [`FileTrailer::compression`], where the absence of compression is
/// represented as `None` and serialized as a zero byte.
#[derive(Copy, Clone, Debug, PartialEq, Eq, FromPrimitive, SizeOf)]
#[binrw]
#[brw(repr(u8))]
pub enum Compression {
    /// [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)).
    Snappy = 1,
}

impl Compression {
    /// Parses a single byte as an optional `Compression`.
    ///
    /// A zero byte yields `None` and a valid `Compression` discriminant yields
    /// `Some`.  Any other byte fails with [`BinError::NoVariantMatch`] at the
    /// position of that byte.
    #[binrw::parser(reader, endian)]
    pub(crate) fn parse_opt() -> BinResult<Option<Self>> {
        let raw: u8 = <_>::read_options(reader, endian, ())?;
        if raw == 0 {
            return Ok(None);
        }
        let Some(compression) = FromPrimitive::from_u8(raw) else {
            // The byte has already been consumed, so step back by one to
            // report where it was.
            return Err(BinError::NoVariantMatch {
                pos: reader.stream_position()? - 1,
            });
        };
        Ok(Some(compression))
    }

    /// Writes an optional `Compression` as a single byte: its discriminant for
    /// `Some`, or zero for `None`.
    #[binrw::writer(writer, endian)]
    pub(crate) fn write_opt(value: &Option<Self>) -> BinResult<()> {
        let raw: u8 = match *value {
            Some(compression) => compression as u8,
            None => 0,
        };
        raw.write_options(writer, endian, ())
    }
}

/// A block representing a Bloom filter.
///
/// The Bloom filter contains a member for each key in column 0.
///
/// This form owns its data.  [`FilterBlockRef`] has the same fields but
/// borrows its data and can only be written.
#[binrw]
pub struct FilterBlock {
    /// Block header with "LFFB" magic.
    #[brw(assert(header.magic == FILTER_BLOCK_MAGIC, "filter block has bad magic"))]
    pub header: BlockHeader,

    /// [BloomFilter::num_hashes].
    pub num_hashes: u32,

    /// Number of elements in `data`.
    ///
    /// Computed from `data.len()` when writing.
    #[bw(try_calc(u64::try_from(data.len())))]
    pub len: u64,

    /// Bloom filter contents, as `len` 64-bit words.
    #[br(count = len)]
    pub data: Vec<u64>,
}

impl From<FilterBlock> for TrackingBloomFilter {
    fn from(block: FilterBlock) -> Self {
        TrackingBloomFilter::new(
            BloomFilter::from_vec(block.data)
                .seed(&BLOOM_FILTER_SEED)
                .hashes(block.num_hashes),
        )
    }
}

/// A block representing a Bloom filter (with data by reference).
///
/// This is the write-only, borrowing counterpart of [`FilterBlock`], with the
/// same fields in the same order.
#[binwrite]
pub struct FilterBlockRef<'a> {
    /// Block header with "LFFB" magic.
    #[bw(assert(header.magic == FILTER_BLOCK_MAGIC, "filter block has bad magic"))]
    pub header: BlockHeader,

    /// [BloomFilter::num_hashes].
    pub num_hashes: u32,

    /// Number of elements in `data`.
    ///
    /// Computed from `data.len()` when writing.
    #[bw(try_calc(u64::try_from(data.len())))]
    pub len: u64,

    /// Bloom filter contents, borrowed from the filter being written.
    pub data: &'a [u64],
}

impl<'a> From<&'a TrackingBloomFilter> for FilterBlockRef<'a> {
    /// Builds a writable filter block that borrows the contents of `value`,
    /// with a fresh "LFFB" block header.
    fn from(value: &'a TrackingBloomFilter) -> Self {
        let header = BlockHeader::new(&FILTER_BLOCK_MAGIC);
        let num_hashes = value.num_hashes();
        let data = value.as_slice();
        Self {
            header,
            num_hashes,
            data,
        }
    }
}