time_key_stream_set/
time_key.rs

1use tsz_compress::prelude::*;
2
3///
4/// TimeKey is a trait that defines the interface for a time key.
5///
6/// A stream set will be
7/// * ordered on timestamp
8/// * columnar on partition
9/// * compressed at rest
10/// * backed by a u128 value
11pub trait TimeKey: Copy {
12    fn timestamp_us(&self) -> u64;
13    fn partition(&self) -> u64;
14    fn key(&self) -> TimeKeyItem {
15        TimeKeyItem::new(*self)
16    }
17}
18
/// Time key identifying an event by user, device and timestamp.
///
/// Used to deduplicate a stream of events that a user produces on a
/// device.
///
/// All three fields are assumed to be positive integers, as is the case
/// for epoch timestamps and serial database ids.
#[derive(Clone, Copy, Debug)]
pub struct UserDeviceTimeKey {
    /// Event timestamp (microseconds, per the `_us` naming; e.g. epoch time).
    pub timestamp_us: i64,
    /// Serial database id of the user.
    pub user_id: i32,
    /// Serial database id of the device.
    pub device_id: i32,
}

33///
34/// Implement the `TimeKey` trait for `UserDeviceTimeKey`
35/// so that it can be used in a `TkStreamSet`.
36///
37impl TimeKey for UserDeviceTimeKey {
38    fn timestamp_us(&self) -> u64 {
39        self.timestamp_us as u64
40    }
41    fn partition(&self) -> u64 {
42        ((self.user_id as u64) << 32) | self.device_id as u64
43    }
44}
45
46///
47/// When a `TimeKey` is persisted it is ordered by partition and timestamp,
48/// and compressed using the `tsz_compress` crate for efficient storage.
49///
50#[derive(
51    Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, DeltaEncodable, Compressible, Decompressible,
52)]
53pub struct TimeKeyItem {
54    pub partition: i64,
55    pub timestamp_us: i64,
56}
57
58impl TimeKeyItem {
59    pub fn new<T: TimeKey>(key: T) -> Self {
60        TimeKeyItem {
61            partition: key.partition() as i64,
62            timestamp_us: key.timestamp_us() as i64,
63        }
64    }
65}