pub mod query;
pub mod read_record;
pub mod remove_record;
pub mod update_record;
pub mod write_batched_records;
pub mod write_record;
use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use futures_util::StreamExt;
use reduct_base::error::ReductError;
use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use async_stream::stream;
use std::time::SystemTime;
pub use reduct_base::Labels;
/// Heap-allocated, pinned stream of payload chunks.
///
/// Each item is either a chunk of bytes or a [`ReductError`]; the stream is
/// required to be both `Send` and `Sync`.
pub type RecordStream = Pin<Box<dyn Stream<Item = Result<Bytes, ReductError>> + Send + Sync>>;
/// Manual `Debug` implementation that prints the record's metadata obtained
/// through its public accessors; the payload stream itself is not printed.
impl Debug for Record {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Record");
        out.field("timestamp", &self.timestamp());
        out.field("labels", &self.labels());
        out.field("content_type", &self.content_type());
        out.field("content_length", &self.content_length());
        out.finish()
    }
}
/// A single record: metadata plus an optional stream carrying the payload.
///
/// `Debug` is implemented by hand in this file rather than derived.
pub struct Record {
/// Timestamp in microseconds since the UNIX epoch.
timestamp: u64,
/// Labels attached to the record.
labels: Labels,
/// Content type of the payload (presumably a MIME type -- confirm against the server API).
content_type: String,
/// Payload length; `RecordBuilder::data` sets it to the number of payload bytes.
content_length: u64,
/// Payload as a stream of chunks; `None` is read back as an empty payload.
data: Option<RecordStream>,
}
/// Builder for [`Record`]: holds the record under construction, which the
/// chained setter methods modify and `build` finally returns.
pub struct RecordBuilder {
/// The record being assembled.
record: Record,
}
impl Record {
    /// Starts building a new record; shorthand for [`RecordBuilder::new`].
    pub fn builder() -> RecordBuilder {
        RecordBuilder::new()
    }

    /// Returns the timestamp as microseconds since the UNIX epoch.
    pub fn timestamp_us(&self) -> u64 {
        self.timestamp
    }

    /// Returns the timestamp as a [`SystemTime`], computed as the UNIX epoch
    /// plus the stored number of microseconds.
    pub fn timestamp(&self) -> SystemTime {
        let since_epoch = std::time::Duration::from_micros(self.timestamp);
        SystemTime::UNIX_EPOCH + since_epoch
    }

    /// Returns the labels attached to the record.
    pub fn labels(&self) -> &Labels {
        &self.labels
    }

    /// Returns the content type of the record.
    pub fn content_type(&self) -> &str {
        self.content_type.as_str()
    }

    /// Returns the stored content length, cast to `usize`.
    pub fn content_length(&self) -> usize {
        self.content_length as usize
    }

    /// Consumes the record and collects its whole payload into one buffer.
    ///
    /// The returned future resolves to the concatenation of all chunks, to an
    /// empty [`Bytes`] when the record has no payload stream, or to the first
    /// error produced by the stream.
    pub fn bytes(
        self,
    ) -> Pin<Box<dyn futures::Future<Output = Result<Bytes, ReductError>> + Send + Sync>> {
        Box::pin(async move {
            match self.data {
                None => Ok(Bytes::new()),
                Some(mut chunks) => {
                    let mut buffer = BytesMut::new();
                    while let Some(next) = chunks.next().await {
                        // Stop at the first failed chunk and report its error.
                        let part = next?;
                        buffer.extend_from_slice(&part);
                    }
                    Ok(buffer.into())
                }
            }
        })
    }

    /// Consumes the record and returns its payload as a stream of chunks.
    ///
    /// When the record has no payload stream, the result is a stream that
    /// yields exactly one empty chunk.
    pub fn stream_bytes(
        self,
    ) -> Pin<Box<dyn Stream<Item = Result<Bytes, ReductError>> + Sync + Send>> {
        match self.data {
            Some(chunks) => chunks,
            None => {
                let single_empty_chunk = stream! {
                    yield Ok(Bytes::new());
                };
                Box::pin(single_empty_chunk)
            }
        }
    }
}
impl RecordBuilder {
pub fn new() -> Self {
Self {
record: Record {
timestamp: from_system_time(SystemTime::now()),
labels: Default::default(),
content_type: "".to_string(),
content_length: 0,
data: None,
},
}
}
pub fn timestamp_us(mut self, timestamp: u64) -> Self {
self.record.timestamp = timestamp;
self
}
pub fn timestamp(mut self, timestamp: SystemTime) -> Self {
self.record.timestamp = from_system_time(timestamp);
self
}
pub fn labels(mut self, labels: Labels) -> Self {
self.record.labels = labels;
self
}
pub fn add_label<Str>(mut self, key: Str, value: Str) -> Self
where
Str: Into<String>,
{
self.record.labels.insert(key.into(), value.into());
self
}
pub fn content_type<Str>(mut self, content_type: Str) -> Self
where
Str: Into<String>,
{
self.record.content_type = content_type.into();
self
}
pub fn content_length(mut self, content_length: usize) -> Self {
self.record.content_length = content_length as u64;
self
}
pub fn data<D>(mut self, data: D) -> Self
where
D: Into<Bytes>,
{
let bytes = data.into();
self.record.content_length = bytes.len() as u64;
self.record.data = Some(Box::pin(futures::stream::once(async move { Ok(bytes) })));
self
}
pub fn stream(mut self, stream: RecordStream) -> Self {
self.record.data = Some(stream);
self
}
pub fn build(self) -> Record {
self.record
}
}
/// Converts a [`SystemTime`] into microseconds since the UNIX epoch.
///
/// The conversion never panics:
/// * a time earlier than the UNIX epoch cannot be represented as an unsigned
///   offset and is clamped to `0`;
/// * a microsecond count that does not fit into `u64` saturates at `u64::MAX`
///   instead of being silently truncated.
pub(crate) fn from_system_time(timestamp: SystemTime) -> u64 {
    match timestamp.duration_since(SystemTime::UNIX_EPOCH) {
        // `as_micros` returns `u128`; bound it first so the cast is lossless.
        Ok(elapsed) => elapsed.as_micros().min(u128::from(u64::MAX)) as u64,
        // `duration_since` fails only when `timestamp` precedes the epoch.
        Err(_) => 0,
    }
}