1pub mod query;
7pub mod read_record;
8pub mod remove_record;
9pub mod update_record;
10pub mod write_batched_records;
11pub mod write_record;
12
13use bytes::{Bytes, BytesMut};
14
15use futures::stream::Stream;
16
17use futures_util::StreamExt;
18use reduct_base::error::ReductError;
19
20use std::fmt::{Debug, Formatter};
21use std::pin::Pin;
22
23use async_stream::stream;
24
25use std::time::SystemTime;
26
27pub use reduct_base::Labels;
28
29pub type RecordStream = Pin<Box<dyn Stream<Item = Result<Bytes, ReductError>> + Send + Sync>>;
30
31impl Debug for Record {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 f.debug_struct("Record")
34 .field("timestamp", &self.timestamp())
35 .field("labels", &self.labels())
36 .field("content_type", &self.content_type())
37 .field("content_length", &self.content_length())
38 .finish()
39 }
40}
41
42pub struct Record {
44 timestamp: u64,
45 labels: Labels,
46 content_type: String,
47 content_length: u64,
48 data: Option<RecordStream>,
49}
50
51pub struct RecordBuilder {
52 record: Record,
53}
54
55impl Record {
56 pub fn builder() -> RecordBuilder {
57 RecordBuilder::new()
58 }
59
60 pub fn timestamp_us(&self) -> u64 {
62 self.timestamp
63 }
64
65 pub fn timestamp(&self) -> SystemTime {
67 SystemTime::UNIX_EPOCH + std::time::Duration::from_micros(self.timestamp)
68 }
69
70 pub fn labels(&self) -> &Labels {
72 &self.labels
73 }
74
75 pub fn content_type(&self) -> &str {
77 &self.content_type
78 }
79
80 pub fn content_length(&self) -> usize {
82 self.content_length as usize
83 }
84
85 pub fn bytes(
89 self,
90 ) -> Pin<Box<dyn futures::Future<Output = Result<Bytes, ReductError>> + Send + Sync>> {
91 Box::pin(async move {
92 if let Some(mut data) = self.data {
93 let mut bytes = BytesMut::new();
94 while let Some(chunk) = data.next().await {
95 bytes.extend_from_slice(&chunk?);
96 }
97 Ok(bytes.into())
98 } else {
99 Ok(Bytes::new())
100 }
101 })
102 }
103
104 pub fn stream_bytes(
106 self,
107 ) -> Pin<Box<dyn Stream<Item = Result<Bytes, ReductError>> + Sync + Send>> {
108 if let Some(data) = self.data {
109 data
110 } else {
111 let stream = stream! {
112 yield Ok(Bytes::new());
113 };
114 Box::pin(stream)
115 }
116 }
117}
118
119impl RecordBuilder {
120 pub fn new() -> Self {
121 Self {
122 record: Record {
123 timestamp: from_system_time(SystemTime::now()),
124 labels: Default::default(),
125 content_type: "".to_string(),
126 content_length: 0,
127 data: None,
128 },
129 }
130 }
131
132 pub fn timestamp_us(mut self, timestamp: u64) -> Self {
134 self.record.timestamp = timestamp;
135 self
136 }
137
138 pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
140 self.record.timestamp = from_system_time(timestamp);
141 self
142 }
143
144 pub fn labels(mut self, labels: Labels) -> Self {
147 self.record.labels = labels;
148 self
149 }
150
151 pub fn add_label<Str>(mut self, key: Str, value: Str) -> Self
153 where
154 Str: Into<String>,
155 {
156 self.record.labels.insert(key.into(), value.into());
157 self
158 }
159
160 pub fn content_type<Str>(mut self, content_type: Str) -> Self
162 where
163 Str: Into<String>,
164 {
165 self.record.content_type = content_type.into();
166 self
167 }
168
169 pub fn content_length(mut self, content_length: usize) -> Self {
173 self.record.content_length = content_length as u64;
174 self
175 }
176
177 pub fn data<D>(mut self, data: D) -> Self
181 where
182 D: Into<Bytes>,
183 {
184 let bytes = data.into();
185 self.record.content_length = bytes.len() as u64;
186 self.record.data = Some(Box::pin(futures::stream::once(async move { Ok(bytes) })));
187 self
188 }
189
190 pub fn stream(mut self, stream: RecordStream) -> Self {
192 self.record.data = Some(stream);
193 self
194 }
195
196 pub fn build(self) -> Record {
199 self.record
200 }
201}
202
203pub(crate) fn from_system_time(timestamp: SystemTime) -> u64 {
204 timestamp
205 .duration_since(SystemTime::UNIX_EPOCH)
206 .unwrap()
207 .as_micros() as u64
208}