dataz/
kvtd.rs

1// Copyright 2022 Daniel Harrison. All Rights Reserved.
2
3//! `(Key, Value, Time, Diff)` tuples a la [Differential Dataflow]
4//!
5//! [Differential Dataflow]: https://crates.io/crates/differential-dataflow
6
7use crate::col::Col;
8use crate::{DynTable, Set, Table};
9
/// Knobs controlling the shape of the data generated by [Kvtd].
#[derive(Debug)]
pub struct KvtdConfig {
    /// Length, in bytes, of the val column in every generated row.
    pub val_bytes: usize,
    /// How many rows exist in total, summed over every batch.
    pub num_rows: usize,
    /// Upper bound on the number of rows placed in a single batch.
    ///
    /// Must be non-zero: the batch count is computed by dividing by this
    /// value.
    pub max_rows_per_batch: usize,
}
20
21/// `(Key, Val, Time, Diff)` tuples a la [Differential Dataflow]
22///
23/// [Differential Dataflow]: https://crates.io/crates/differential-dataflow
24#[derive(Debug)]
25pub struct Kvtd {
26    pub(crate) config: KvtdConfig,
27}
28
29impl Kvtd {}
30
31impl Set for Kvtd {
32    type Config = KvtdConfig;
33
34    fn init(config: Self::Config) -> Self {
35        Kvtd { config }
36    }
37
38    fn tables(&self) -> Vec<&dyn DynTable> {
39        vec![self]
40    }
41}
42
43impl DynTable for Kvtd {
44    fn name(&self) -> &'static str {
45        "kvtd"
46    }
47
48    fn num_batches(&self) -> usize {
49        (self.config.num_rows + self.config.max_rows_per_batch - 1) / self.config.max_rows_per_batch
50    }
51}
52
53impl Table for Kvtd {
54    type Data = (String, Vec<u8>, u64, i64);
55
56    fn gen_batch<C: Col<Self::Data>>(&self, idx: usize, batch: &mut C) {
57        batch.clear();
58
59        let row_start = idx * self.config.max_rows_per_batch;
60        let row_end = std::cmp::min(
61            row_start + self.config.max_rows_per_batch,
62            self.config.num_rows,
63        );
64        let len = row_end.saturating_sub(row_start);
65        if len == 0 {
66            return;
67        }
68
69        let mut key_buf = String::with_capacity(KEY_BYTES);
70        let mut val_buf = Vec::with_capacity(self.config.val_bytes);
71        for idx in 0..len {
72            key_buf.clear();
73            val_buf.clear();
74
75            to_hex(&mut key_buf, idx);
76            gen_vals(&mut val_buf, idx, 1, self.config.val_bytes);
77            let ts = idx as u64;
78            let diff = 1;
79            batch.push((key_buf.as_str(), val_buf.as_slice(), ts, diff));
80        }
81    }
82}
83
/// Number of hex characters in a generated key: one character for every four
/// bits of a 64-bit value.
const KEY_BYTES: usize = (u64::BITS / 4) as usize;
85#[allow(dead_code)]
86fn gen_keys(col: &mut String, start: usize, len: usize) {
87    col.clear();
88    col.reserve(len * KEY_BYTES);
89    for x in start..start + len {
90        to_hex(col, x);
91    }
92}
93
/// Replaces the contents of `col` with `val_bytes` pseudo-random bytes for
/// each of the rows `start..start + len`, concatenated in row order.
///
/// The bytes for a row depend only on its row index, so the output is
/// deterministic.
fn gen_vals(col: &mut Vec<u8>, start: usize, len: usize, val_bytes: usize) {
    // Kept as `u64` (not `usize`) so the 64-bit literal is valid on targets
    // where `usize` is narrower than 64 bits.
    const LARGE_PRIME: u64 = 18_446_744_073_709_551_557;

    col.clear();
    col.reserve(len * val_bytes);

    for idx in start..start + len {
        // Generate val_bytes bytes using Knuth's multiplicative integer hashing
        // method, seeded with the row index (plus one so that we don't start
        // with all zeros for idx 0).
        let mut x = idx as u64 + 1;
        for _ in 0..val_bytes {
            x = x.wrapping_mul(LARGE_PRIME);
            // TODO: Do this 8 bytes at a time instead.
            // Deliberate truncation: keep only the low byte of the hash state.
            col.push(x as u8);
        }
    }
}
111
/// Replaces the contents of `col` with the timestamps of rows
/// `start..start + len`; each row's timestamp is its row index.
// Not called anywhere in the visible part of this file, hence the allowance.
#[allow(dead_code)]
fn gen_times(col: &mut Vec<u64>, start: usize, len: usize) {
    col.clear();
    col.extend((start..start + len).map(|row| row as u64));
}
120
/// Replaces the contents of `col` with `len` diffs, each equal to `1`.
// Not called anywhere in the visible part of this file, hence the allowance.
#[allow(dead_code)]
fn gen_diffs(col: &mut Vec<i64>, len: usize) {
    col.clear();
    col.extend(std::iter::repeat(1i64).take(len));
}
126
127fn to_hex(col: &mut String, x: usize) {
128    const TOP_FOUR_BITS_MASK: usize = 0xf000_0000_0000_0000;
129    const HEX_LOOKUP: &[char; 16] = &[
130        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f',
131    ];
132    let mut x = x;
133    for _ in 0..KEY_BYTES {
134        // TODO: Do this two characters at a time instead.
135        col.push(HEX_LOOKUP[(x & TOP_FOUR_BITS_MASK) >> 60]);
136        x = x << 4;
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `to_hex` against a table of inputs and their expected
    /// fixed-width hex encodings.
    #[test]
    fn hex() {
        let cases: &[(usize, &str)] = &[
            (0, "0000000000000000"),
            (1, "0000000000000001"),
            (2, "0000000000000002"),
            (3, "0000000000000003"),
            (4, "0000000000000004"),
            (5, "0000000000000005"),
            (6, "0000000000000006"),
            (7, "0000000000000007"),
            (8, "0000000000000008"),
            (9, "0000000000000009"),
            (10, "000000000000000a"),
            (11, "000000000000000b"),
            (12, "000000000000000c"),
            (13, "000000000000000d"),
            (14, "000000000000000e"),
            (15, "000000000000000f"),
            (16, "0000000000000010"),
            (17, "0000000000000011"),
            (u64::MAX as usize, "ffffffffffffffff"),
        ];

        for &(input, expected) in cases {
            let mut actual = String::new();
            to_hex(&mut actual, input);
            // Name the input so a failure identifies the offending case.
            assert_eq!(actual, expected, "to_hex({})", input);
        }
    }
}