reduct_rs/
record.rs

1// Copyright 2023 ReductStore
2// This Source Code Form is subject to the terms of the Mozilla Public
3//    License, v. 2.0. If a copy of the MPL was not distributed with this
4//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5
6pub mod query;
7pub mod read_record;
8pub mod remove_record;
9pub mod update_record;
10pub mod write_batched_records;
11pub mod write_record;
12
13use bytes::{Bytes, BytesMut};
14
15use futures::stream::Stream;
16
17use futures_util::StreamExt;
18use reduct_base::error::ReductError;
19
20use std::fmt::{Debug, Formatter};
21use std::pin::Pin;
22
23use async_stream::stream;
24
25use std::time::SystemTime;
26
27pub use reduct_base::Labels;
28
/// Pinned, boxed stream of record content.
///
/// Each item is either a chunk of [`Bytes`] or a [`ReductError`]. This is the
/// type stored as the content of a [`Record`].
pub type RecordStream = Pin<Box<dyn Stream<Item = Result<Bytes, ReductError>> + Send + Sync>>;
30
31impl Debug for Record {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        f.debug_struct("Record")
34            .field("timestamp", &self.timestamp())
35            .field("labels", &self.labels())
36            .field("content_type", &self.content_type())
37            .field("content_length", &self.content_length())
38            .finish()
39    }
40}
41
/// A record is a timestamped piece of data with labels
///
/// Instances are assembled with [`RecordBuilder`], obtained through
/// [`Record::builder`].
pub struct Record {
    /// Unix timestamp in microseconds.
    timestamp: u64,
    /// Labels associated with the record.
    labels: Labels,
    /// Content type of the record.
    content_type: String,
    /// Content length of the record.
    content_length: u64,
    /// Content of the record as a stream of chunks; `None` when no content was set.
    data: Option<RecordStream>,
}
50
/// Builder for [`Record`].
///
/// Holds the record under construction; the setter methods modify it and
/// [`RecordBuilder::build`] hands it out.
pub struct RecordBuilder {
    /// The record being assembled.
    record: Record,
}
54
55impl Record {
56    pub fn builder() -> RecordBuilder {
57        RecordBuilder::new()
58    }
59
60    /// Unix timestamp in microseconds
61    pub fn timestamp_us(&self) -> u64 {
62        self.timestamp
63    }
64
65    /// Timestamp as a SystemTime
66    pub fn timestamp(&self) -> SystemTime {
67        SystemTime::UNIX_EPOCH + std::time::Duration::from_micros(self.timestamp)
68    }
69
70    /// Labels associated with the record
71    pub fn labels(&self) -> &Labels {
72        &self.labels
73    }
74
75    /// Content type of the record
76    pub fn content_type(&self) -> &str {
77        &self.content_type
78    }
79
80    /// Content length of the record
81    pub fn content_length(&self) -> usize {
82        self.content_length as usize
83    }
84
85    /// Content of the record
86    ///
87    /// This consumes the record and returns bytes
88    pub fn bytes(
89        self,
90    ) -> Pin<Box<dyn futures::Future<Output = Result<Bytes, ReductError>> + Send + Sync>> {
91        Box::pin(async move {
92            if let Some(mut data) = self.data {
93                let mut bytes = BytesMut::new();
94                while let Some(chunk) = data.next().await {
95                    bytes.extend_from_slice(&chunk?);
96                }
97                Ok(bytes.into())
98            } else {
99                Ok(Bytes::new())
100            }
101        })
102    }
103
104    /// Content of the record as a stream
105    pub fn stream_bytes(
106        self,
107    ) -> Pin<Box<dyn Stream<Item = Result<Bytes, ReductError>> + Sync + Send>> {
108        if let Some(data) = self.data {
109            data
110        } else {
111            let stream = stream! {
112               yield Ok(Bytes::new());
113            };
114            Box::pin(stream)
115        }
116    }
117}
118
119impl RecordBuilder {
120    pub fn new() -> Self {
121        Self {
122            record: Record {
123                timestamp: from_system_time(SystemTime::now()),
124                labels: Default::default(),
125                content_type: "".to_string(),
126                content_length: 0,
127                data: None,
128            },
129        }
130    }
131
132    /// Set the timestamp of the record to write as a unix timestamp in microseconds.
133    pub fn timestamp_us(mut self, timestamp: u64) -> Self {
134        self.record.timestamp = timestamp;
135        self
136    }
137
138    /// Set the timestamp of the record to write.
139    pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
140        self.record.timestamp = from_system_time(timestamp);
141        self
142    }
143
144    /// Set the labels of the record to write.
145    /// This replaces all existing labels.
146    pub fn labels(mut self, labels: Labels) -> Self {
147        self.record.labels = labels;
148        self
149    }
150
151    /// Add a label to the record to write.
152    pub fn add_label<Str>(mut self, key: Str, value: Str) -> Self
153    where
154        Str: Into<String>,
155    {
156        self.record.labels.insert(key.into(), value.into());
157        self
158    }
159
160    /// Set the content type of the record to write.
161    pub fn content_type<Str>(mut self, content_type: Str) -> Self
162    where
163        Str: Into<String>,
164    {
165        self.record.content_type = content_type.into();
166        self
167    }
168
169    /// Set the content length of the record to write
170    ///
171    /// Note: use this with stream data
172    pub fn content_length(mut self, content_length: usize) -> Self {
173        self.record.content_length = content_length as u64;
174        self
175    }
176
177    /// Set the content of the record
178    ///
179    /// Note: use this with data that fits in memory
180    pub fn data<D>(mut self, data: D) -> Self
181    where
182        D: Into<Bytes>,
183    {
184        let bytes = data.into();
185        self.record.content_length = bytes.len() as u64;
186        self.record.data = Some(Box::pin(futures::stream::once(async move { Ok(bytes) })));
187        self
188    }
189
190    /// Set the content of the record as a stream
191    pub fn stream(mut self, stream: RecordStream) -> Self {
192        self.record.data = Some(stream);
193        self
194    }
195
196    /// Build the record
197    /// This consumes the builder
198    pub fn build(self) -> Record {
199        self.record
200    }
201}
202
/// Convert a [`SystemTime`] into a Unix timestamp in microseconds.
///
/// Record timestamps are unsigned, so times that cannot be represented are
/// clamped instead of panicking or wrapping: a time before the Unix epoch
/// maps to `0`, and a microsecond count that does not fit into `u64` maps to
/// `u64::MAX`. Sub-microsecond precision is truncated.
pub(crate) fn from_system_time(timestamp: SystemTime) -> u64 {
    let since_epoch = timestamp
        .duration_since(SystemTime::UNIX_EPOCH)
        // `Err` means the time lies before the epoch; fall back to a zero duration.
        .unwrap_or_default();

    // `as_micros` returns `u128`; saturate rather than silently truncate.
    since_epoch.as_micros().min(u128::from(u64::MAX)) as u64
}