time_key_stream_set/
lib.rs

1use serde::{Deserialize, Serialize};
2use std::{
3    fmt::{Display, Formatter},
4    path::PathBuf,
5    time::Duration,
6};
7use stream_set::TkStreamSetState;
8use tempdir::TempDir;
9
10pub mod stream_set;
11pub mod time_key;
12
13pub mod prelude {
14    pub use crate::time_key::{TimeKey, TimeKeyItem, UserDeviceTimeKey};
15    pub use crate::{MemoryLimit, TkStreamSet, TkStreamSetBuilder, TkssError, TkssResult};
16}
17
18///
19/// A `TkStreamSet` is a data structure that is primarily backed on disk.
20/// The data structure will only be temporarily loaded into memory with
21/// pages cached in memory as needed, but limited by the `MemoryLimit` setting.
22///
23/// The `MemoryLimit` is a soft limit that is attempted to be enforced with best
24/// effort but may be exceeded in some cases by a relatively small margin, ~5%.
25///
26
27#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
28pub enum MemoryLimit {
29    /// 1 GiB
30    VeryLow,
31    /// 8 GiB
32    Low,
33    /// 32 GiB
34    Medium,
35    /// 128 GiB
36    High,
37    /// 1 TiB
38    VeryHigh,
39    /// No enforced limit
40    Unlimited,
41}
42
43///
44/// A `TkStreamSet` is the top-level context for
45/// a _time key_ stream set used for deduplication of a
46/// mostly continuous stream of time-keyed data.
47///
48#[derive(Debug)]
49pub struct TkStreamSet {
50    memory_limit: MemoryLimit,
51    working_dir: PathBuf,
52    segment_time_interval: Duration,
53    segment_retention_period: Duration,
54    state: TkStreamSetState,
55}
56
57///
58/// A `TkStreamSetBuilder` is used to create a `TkStreamSet` with clear configuration.
59///
60#[derive(Default)]
61pub struct TkStreamSetBuilder {
62    memory_limit: Option<MemoryLimit>,
63    working_dir: Option<PathBuf>,
64    segment_time_interval: Option<Duration>,
65    segment_retention_period: Option<Duration>,
66}
67
68impl TkStreamSetBuilder {
69    ///
70    /// Create a new instance of the builder
71    /// with default configuration.
72    ///
73    pub fn new() -> TkStreamSetBuilder {
74        TkStreamSetBuilder::default()
75    }
76
77    ///
78    /// Create an instance while consuming the builder
79    ///
80    /// Default the rest of the configuration.
81    ///
82    pub async fn build(self) -> TkssResult<TkStreamSet> {
83        let memory_limit = self.memory_limit.unwrap_or(MemoryLimit::VeryLow);
84        let working_dir = self.working_dir.unwrap_or_else(|| {
85            let tmp = TempDir::new("tkstreamset").unwrap();
86            tmp.into_path()
87        });
88        let segment_time_interval = self
89            .segment_time_interval
90            .unwrap_or(Duration::from_secs(60 * 60));
91        let segment_retention_period = self
92            .segment_retention_period
93            .unwrap_or(Duration::from_secs(60 * 60 * 24 * 7));
94        let mut sset = TkStreamSet {
95            memory_limit,
96            working_dir,
97            segment_time_interval,
98            segment_retention_period,
99            state: TkStreamSetState::default(),
100        };
101        sset.initialize().await?;
102        Ok(sset)
103    }
104
105    ///
106    /// Set the memory limit for the stream set.
107    ///
108    /// If not set, the limit will be set to `MemoryLimit::VeryLow`.
109    /// If set, the limit will be enforced with best effort.
110    /// The limit is a soft limit and may be exceeded in some cases
111    /// by a relatively small margin, ~5%.
112    ///
113    pub fn with_memory_limit(self, limit: MemoryLimit) -> TkStreamSetBuilder {
114        TkStreamSetBuilder {
115            memory_limit: Some(limit),
116            ..self
117        }
118    }
119
120    ///
121    /// Set the working directory for the stream set.
122    ///
123    /// If not set, a temporary directory will be created
124    /// and cleaned up on process exit.
125    /// If set, the directory must exist and be writable.
126    ///
127    pub fn with_working_dir(self, dir: PathBuf) -> TkStreamSetBuilder {
128        TkStreamSetBuilder {
129            working_dir: Some(dir),
130            ..self
131        }
132    }
133
134    ///
135    /// Set the segment time interval for the stream set.
136    /// This determines the size of the time-keyed segments that need
137    /// to be loaded into memory to perform deduplication of a batch insert.
138    ///
139    /// If not set, the interval will be set to 1 hour.
140    /// If set, the interval will be enforced.
141    ///
142    pub fn with_segment_time_interval(self, interval: Duration) -> TkStreamSetBuilder {
143        TkStreamSetBuilder {
144            segment_time_interval: Some(interval),
145            ..self
146        }
147    }
148
149    ///
150    /// Set the segment retention period for the stream set.
151    /// This determines how long time-keyed segments are kept on disk (even if inactive or small).
152    /// This period is enforced while inserting data into the stream set.
153    ///
154    /// If not set, the retention period will be set to 7 days.
155    /// If set, the retention period will be enforced.
156    ///
157    pub fn with_segment_retention_period(self, period: Duration) -> TkStreamSetBuilder {
158        TkStreamSetBuilder {
159            segment_retention_period: Some(period),
160            ..self
161        }
162    }
163}
164
/// Result type used throughout the crate.
///
/// The error side is a boxed, thread-safe trait object, so any error that is
/// `std::error::Error + Send + Sync` (including `TkssError` and
/// `std::io::Error`) can be propagated with `?`.
pub type TkssResult<T> =
    std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
166
/// Errors specific to the time-key stream set.
///
/// Each path-carrying variant holds the path that its message reports.
/// `PartialEq`/`Eq` are derived so callers and tests can compare errors by
/// value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TkssError {
    /// An error that fits no other variant; carries a free-form message.
    Unknown(String),
    /// The given directory was rejected as invalid.
    InvalidDirectory(PathBuf),
    /// The file at the given path has an invalid header.
    InvalidHeader(PathBuf),
    /// The file at the given path has an invalid length.
    InvalidLength(PathBuf),
}
174
175impl Display for TkssError {
176    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177        match self {
178            TkssError::Unknown(s) => write!(f, "Unknown error: {}", s),
179            TkssError::InvalidDirectory(p) => write!(f, "Invalid directory: {}", p.display()),
180            TkssError::InvalidHeader(p) => write!(f, "Invalid header: {}", p.display()),
181            TkssError::InvalidLength(p) => write!(f, "Invalid length: {}", p.display()),
182        }
183    }
184}
185
186impl std::error::Error for TkssError {}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds stream sets with default and explicit configuration and checks
    /// that the chosen settings end up on the resulting instance.
    #[tokio::test]
    async fn can_build_instance() {
        // Nothing configured: every option falls back to its default.
        let t = TkStreamSetBuilder::new().build().await.unwrap();
        assert_eq!(t.memory_limit, MemoryLimit::VeryLow);
        assert_eq!(t.segment_time_interval, Duration::from_secs(60 * 60));
        assert_eq!(
            t.segment_retention_period,
            Duration::from_secs(60 * 60 * 24 * 7)
        );

        let t = TkStreamSetBuilder::new()
            .with_memory_limit(MemoryLimit::Medium)
            .build()
            .await
            .unwrap();
        assert_eq!(t.memory_limit, MemoryLimit::Medium);

        // Use a dedicated scratch directory rather than a hard-coded `/tmp`:
        // it is portable across platforms and is not shared with unrelated
        // files. It is removed when `scratch` is dropped at the end of the
        // test (after `t`, which is declared later).
        let scratch = TempDir::new("tkstreamset-test").unwrap();
        let dir = scratch.path().to_path_buf();
        let t = TkStreamSetBuilder::new()
            .with_memory_limit(MemoryLimit::High)
            .with_working_dir(dir.clone())
            .build()
            .await
            .unwrap();
        assert_eq!(t.memory_limit, MemoryLimit::High);
        assert_eq!(t.working_dir, dir);
    }
}
213}