1use std::{
4 collections::VecDeque,
5 io::Error as IoError,
6 num::{NonZeroU32, NonZeroU64, NonZeroUsize},
7 path::PathBuf,
8 result::Result as StdResult,
9 time::{Duration, SystemTime},
10};
11
12use base64::prelude::*;
13use thiserror::Error;
14
15use crate::{
16 fs::WriteResult,
17 time::{Interval, SystemInstant},
18};
19
20pub mod field;
22
23#[cfg(feature = "csv-storage")]
24pub mod csv;
25
26#[derive(Error, Debug)]
27pub enum Error {
28 #[error(transparent)]
29 Io(#[from] IoError),
30
31 #[cfg(feature = "csv-storage")]
32 #[error(transparent)]
33 Csv(#[from] ::csv::Error),
34
35 #[error(transparent)]
36 Other(#[from] anyhow::Error),
37}
38
39#[cfg(feature = "csv-storage")]
40impl From<crate::fs::csv::Error> for Error {
41 fn from(err: crate::fs::csv::Error) -> Self {
42 use crate::fs::csv::Error::*;
43 match err {
44 Io(err) => Error::Io(err),
45 Csv(err) => Error::Csv(err),
46 }
47 }
48}
49
50pub type Result<T> = StdResult<T, Error>;
51
52pub const MAX_PREALLOCATED_CAPACITY_LIMIT: usize = 16_384; #[derive(Debug, Clone)]
57pub struct StorageStatus {
58 pub descriptor: StorageDescriptor,
59 pub statistics: Option<StorageStatistics>,
60}
61
62#[derive(Debug, Clone, Copy, Eq, PartialEq)]
63pub enum TimeInterval {
64 Days(NonZeroU32),
65}
66
67const SECONDS_PER_DAY: u64 = 24 * 3_600;
68
69impl From<TimeInterval> for Duration {
70 fn from(from: TimeInterval) -> Self {
71 use TimeInterval::*;
72 match from {
73 Days(days) => Duration::from_secs(SECONDS_PER_DAY * u64::from(days.get())),
74 }
75 }
76}
77
78impl From<TimeInterval> for Interval {
79 fn from(from: TimeInterval) -> Self {
80 use TimeInterval::*;
81 match from {
82 Days(days) => Interval::Days(days.get()),
83 }
84 }
85}
86
87#[derive(Debug, Clone, Copy, Eq, PartialEq)]
88pub enum MemorySize {
89 Bytes(NonZeroU64),
90}
91
92#[derive(Debug, Clone, Eq, PartialEq)]
93pub struct StorageConfig {
94 pub retention_time: TimeInterval,
95 pub segmentation: StorageSegmentConfig,
96}
97
98#[derive(Debug, Clone, Eq, PartialEq)]
99pub struct StorageSegmentConfig {
100 pub time_interval: TimeInterval,
101 pub size_limit: MemorySize,
102}
103
104#[derive(Debug, Clone)]
105pub struct StorageDescriptor {
106 pub kind: String,
107 pub base_path: Option<PathBuf>,
108 pub binary_data_format: BinaryDataFormat,
109}
110
111#[derive(Debug, Clone)]
112pub struct StorageStatistics {
113 pub total_records: Option<usize>,
115
116 pub total_bytes: Option<u64>,
118
119 pub segments: Option<Vec<StorageSegmentStatistics>>,
121}
122
123#[derive(Debug, Clone)]
124pub struct StorageSegmentStatistics {
125 pub created_at: SystemTime,
126
127 pub total_records: usize,
128
129 pub total_bytes: Option<u64>,
131}
132
133pub trait ReadableRecordPrelude {
134 fn created_at_offset(&self) -> CreatedAtOffset;
135}
136
137pub trait WritableRecordPrelude {
138 fn set_created_at_offset(&mut self, created_at_offset: CreatedAtOffset);
139}
140
141pub type CreatedAtOffsetNanos = u64;
142
143#[derive(Default, Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
144pub struct CreatedAtOffset {
145 nanos: CreatedAtOffsetNanos,
146}
147
148impl CreatedAtOffset {
149 #[must_use]
150 pub fn system_time_from_origin(self, origin: SystemTime) -> SystemTime {
151 origin + Duration::from(self)
152 }
153
154 #[must_use]
155 pub const fn to_duration(self) -> Duration {
156 let Self { nanos } = self;
157 Duration::from_nanos(nanos)
158 }
159}
160
161impl From<CreatedAtOffsetNanos> for CreatedAtOffset {
162 fn from(nanos: CreatedAtOffsetNanos) -> Self {
163 Self { nanos }
164 }
165}
166
167impl From<CreatedAtOffset> for CreatedAtOffsetNanos {
168 fn from(from: CreatedAtOffset) -> Self {
169 let CreatedAtOffset { nanos } = from;
170 nanos
171 }
172}
173
174impl From<Duration> for CreatedAtOffset {
175 fn from(from: Duration) -> Self {
176 let nanos = from.as_nanos();
177 debug_assert!(nanos <= u128::from(CreatedAtOffsetNanos::MAX));
179 Self {
180 nanos: nanos as CreatedAtOffsetNanos,
181 }
182 }
183}
184
185impl From<CreatedAtOffset> for Duration {
186 fn from(from: CreatedAtOffset) -> Self {
187 from.to_duration()
188 }
189}
190
191#[derive(Debug, Default, Clone, Eq, PartialEq)]
192pub struct RecordPreludeFilter {
193 pub since_created_at: Option<SystemTime>,
194 pub until_created_at: Option<SystemTime>,
195}
196
197pub trait RecordStorageBase {
198 fn descriptor(&self) -> &StorageDescriptor;
199
200 fn config(&self) -> &StorageConfig;
201
202 fn replace_config(&mut self, new_config: StorageConfig) -> StorageConfig;
203
204 fn perform_housekeeping(&mut self) -> Result<()>;
205
206 fn retain_all_records_created_since(&mut self, created_since: SystemTime) -> Result<()>;
208
209 fn report_statistics(&mut self) -> Result<StorageStatistics>;
210}
211
212#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
214pub enum BinaryDataFormat {
215 #[default]
219 Bytes,
220
221 Utf8,
225}
226
227fn encode_binary_data_bytes(input: impl AsRef<[u8]>) -> String {
228 BASE64_STANDARD_NO_PAD.encode(input)
229}
230
231fn encode_binary_data_utf8(input: Vec<u8>) -> anyhow::Result<String> {
232 String::from_utf8(input).map_err(Into::into)
233}
234
235pub fn encode_binary_data_into_string(
236 input: Vec<u8>,
237 format: BinaryDataFormat,
238) -> anyhow::Result<String> {
239 match format {
240 BinaryDataFormat::Bytes => Ok(encode_binary_data_bytes(&input)),
241 BinaryDataFormat::Utf8 => encode_binary_data_utf8(input),
242 }
243}
244
245fn decode_binary_data_bytes(input: impl AsRef<[u8]>) -> anyhow::Result<Vec<u8>> {
246 BASE64_STANDARD_NO_PAD.decode(input).map_err(Into::into)
247}
248
249fn decode_binary_data_utf8(input: String) -> Vec<u8> {
250 input.into_bytes()
251}
252
253pub fn decode_binary_data_from_string(
254 input: String,
255 format: BinaryDataFormat,
256) -> anyhow::Result<Vec<u8>> {
257 match format {
258 BinaryDataFormat::Bytes => decode_binary_data_bytes(&input),
259 BinaryDataFormat::Utf8 => Ok(decode_binary_data_utf8(input)),
260 }
261}
262
263pub trait RecordStorageWrite<R>: RecordStorageBase
264where
265 R: WritableRecordPrelude,
266{
267 fn append_record(
268 &mut self,
269 created_at: &SystemInstant,
270 record: R,
271 ) -> Result<(WriteResult, CreatedAtOffset)>;
272}
273
274pub trait RecordStorageRead<R>: RecordStorageBase {
275 fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<(SystemTime, R)>>;
276}
277
278#[allow(missing_debug_implementations)]
279pub struct InMemoryRecordStorage<R> {
280 config: StorageConfig,
281 descriptor: StorageDescriptor,
282 created_at_origin: SystemInstant,
283 records: VecDeque<R>,
284 _record_phantom: std::marker::PhantomData<R>,
285}
286
287impl<R> InMemoryRecordStorage<R>
288where
289 R: Clone,
290{
291 #[must_use]
292 pub fn new(config: StorageConfig) -> Self {
293 let descriptor = StorageDescriptor {
294 kind: "in-memory".to_string(),
295 base_path: None,
296 binary_data_format: Default::default(), };
298 Self {
299 config,
300 descriptor,
301 created_at_origin: SystemInstant::now(),
302 records: VecDeque::with_capacity(MAX_PREALLOCATED_CAPACITY_LIMIT),
303 _record_phantom: Default::default(),
304 }
305 }
306
307 pub fn recent_records(&mut self, limit: NonZeroUsize) -> Result<Vec<R>> {
308 let total_count = self.records.len();
309 let limited_count = limit.get().min(total_count);
310 Ok(self
311 .records
312 .iter()
313 .skip(total_count - limited_count)
314 .cloned()
315 .collect())
316 }
317}
318
319impl<R> RecordStorageBase for InMemoryRecordStorage<R>
320where
321 R: ReadableRecordPrelude,
322{
323 fn descriptor(&self) -> &StorageDescriptor {
324 &self.descriptor
325 }
326
327 fn config(&self) -> &StorageConfig {
328 &self.config
329 }
330
331 fn replace_config(&mut self, new_config: StorageConfig) -> StorageConfig {
332 std::mem::replace(&mut self.config, new_config)
333 }
334
335 fn perform_housekeeping(&mut self) -> Result<()> {
336 Ok(())
337 }
338
339 fn retain_all_records_created_since(&mut self, created_since: SystemTime) -> Result<()> {
340 let created_since_offset = created_since
341 .duration_since(self.created_at_origin.system_time())
342 .unwrap_or_default()
343 .into();
344 while let Some(first) = self.records.front() {
345 if first.created_at_offset() >= created_since_offset {
346 break;
347 }
348 self.records.pop_front();
349 }
350 Ok(())
351 }
352
353 fn report_statistics(&mut self) -> Result<StorageStatistics> {
354 Ok(StorageStatistics {
355 total_records: Some(self.records.len()),
356 total_bytes: None,
357 segments: None,
358 })
359 }
360}
361
362impl<R> RecordStorageWrite<R> for InMemoryRecordStorage<R>
363where
364 R: ReadableRecordPrelude + WritableRecordPrelude,
365{
366 fn append_record(
367 &mut self,
368 created_at: &SystemInstant,
369 mut record: R,
370 ) -> Result<(WriteResult, CreatedAtOffset)> {
371 debug_assert!(created_at.instant() >= self.created_at_origin.instant());
372 let created_at_offset =
373 CreatedAtOffset::from(created_at.instant() - self.created_at_origin.instant());
374 debug_assert_eq!(record.created_at_offset(), Default::default()); record.set_created_at_offset(created_at_offset);
376 self.records.push_back(record);
377 Ok((Ok(()), created_at_offset))
378 }
379}