time_key_stream_set/
lib.rs1use serde::{Deserialize, Serialize};
2use std::{
3 fmt::{Display, Formatter},
4 path::PathBuf,
5 time::Duration,
6};
7use stream_set::TkStreamSetState;
8use tempdir::TempDir;
9
10pub mod stream_set;
11pub mod time_key;
12
13pub mod prelude {
14 pub use crate::time_key::{TimeKey, TimeKeyItem, UserDeviceTimeKey};
15 pub use crate::{MemoryLimit, TkStreamSet, TkStreamSetBuilder, TkssError, TkssResult};
16}
17
18#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
28pub enum MemoryLimit {
29 VeryLow,
31 Low,
33 Medium,
35 High,
37 VeryHigh,
39 Unlimited,
41}
42
43#[derive(Debug)]
49pub struct TkStreamSet {
50 memory_limit: MemoryLimit,
51 working_dir: PathBuf,
52 segment_time_interval: Duration,
53 segment_retention_period: Duration,
54 state: TkStreamSetState,
55}
56
57#[derive(Default)]
61pub struct TkStreamSetBuilder {
62 memory_limit: Option<MemoryLimit>,
63 working_dir: Option<PathBuf>,
64 segment_time_interval: Option<Duration>,
65 segment_retention_period: Option<Duration>,
66}
67
68impl TkStreamSetBuilder {
69 pub fn new() -> TkStreamSetBuilder {
74 TkStreamSetBuilder::default()
75 }
76
77 pub async fn build(self) -> TkssResult<TkStreamSet> {
83 let memory_limit = self.memory_limit.unwrap_or(MemoryLimit::VeryLow);
84 let working_dir = self.working_dir.unwrap_or_else(|| {
85 let tmp = TempDir::new("tkstreamset").unwrap();
86 tmp.into_path()
87 });
88 let segment_time_interval = self
89 .segment_time_interval
90 .unwrap_or(Duration::from_secs(60 * 60));
91 let segment_retention_period = self
92 .segment_retention_period
93 .unwrap_or(Duration::from_secs(60 * 60 * 24 * 7));
94 let mut sset = TkStreamSet {
95 memory_limit,
96 working_dir,
97 segment_time_interval,
98 segment_retention_period,
99 state: TkStreamSetState::default(),
100 };
101 sset.initialize().await?;
102 Ok(sset)
103 }
104
105 pub fn with_memory_limit(self, limit: MemoryLimit) -> TkStreamSetBuilder {
114 TkStreamSetBuilder {
115 memory_limit: Some(limit),
116 ..self
117 }
118 }
119
120 pub fn with_working_dir(self, dir: PathBuf) -> TkStreamSetBuilder {
128 TkStreamSetBuilder {
129 working_dir: Some(dir),
130 ..self
131 }
132 }
133
134 pub fn with_segment_time_interval(self, interval: Duration) -> TkStreamSetBuilder {
143 TkStreamSetBuilder {
144 segment_time_interval: Some(interval),
145 ..self
146 }
147 }
148
149 pub fn with_segment_retention_period(self, period: Duration) -> TkStreamSetBuilder {
158 TkStreamSetBuilder {
159 segment_retention_period: Some(period),
160 ..self
161 }
162 }
163}
164
165pub type TkssResult<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
166
167#[derive(Debug, Clone)]
168pub enum TkssError {
169 Unknown(String),
170 InvalidDirectory(PathBuf),
171 InvalidHeader(PathBuf),
172 InvalidLength(PathBuf),
173}
174
175impl Display for TkssError {
176 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177 match self {
178 TkssError::Unknown(s) => write!(f, "Unknown error: {}", s),
179 TkssError::InvalidDirectory(p) => write!(f, "Invalid directory: {}", p.display()),
180 TkssError::InvalidHeader(p) => write!(f, "Invalid header: {}", p.display()),
181 TkssError::InvalidLength(p) => write!(f, "Invalid length: {}", p.display()),
182 }
183 }
184}
185
186impl std::error::Error for TkssError {}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191
192 #[tokio::test]
193 async fn can_build_instance() {
194 let t = TkStreamSetBuilder::new().build().await.unwrap();
195 assert_eq!(t.memory_limit, MemoryLimit::VeryLow);
196
197 let t = TkStreamSetBuilder::new()
198 .with_memory_limit(MemoryLimit::Medium)
199 .build()
200 .await
201 .unwrap();
202 assert_eq!(t.memory_limit, MemoryLimit::Medium);
203
204 let t = TkStreamSetBuilder::new()
205 .with_memory_limit(MemoryLimit::High)
206 .with_working_dir(PathBuf::from("/tmp"))
207 .build()
208 .await
209 .unwrap();
210 assert_eq!(t.memory_limit, MemoryLimit::High);
211 assert_eq!(t.working_dir, PathBuf::from("/tmp"));
212 }
213}