use std::error::Error;
use chrono::{DateTime, LocalResult, TimeZone, Utc};
use flatbuffers::FlatBufferBuilder;
use futures_core::Stream;
use redpanda::{
message::{BorrowedMessage, Message},
producer::RedpandaRecord,
};
/// Converts a Unix timestamp expressed in nanoseconds into a UTC [`DateTime`].
///
/// The input is split into whole seconds and a sub-second nanosecond part with
/// Euclidean division, so the nanosecond part is always in `0..1_000_000_000`,
/// including for instants before the epoch. Truncating `/` and `%` would give
/// a negative remainder for negative input, which wraps to an out-of-range
/// `u32` when cast and makes the conversion fail.
///
/// Returns whatever [`TimeZone::timestamp_opt`] yields for the split value.
pub fn nanos_to_date_time(unix_ns: i64) -> LocalResult<DateTime<Utc>> {
    const NANOS_PER_SEC: i64 = 1_000_000_000;

    let secs = unix_ns.div_euclid(NANOS_PER_SEC);
    // `rem_euclid` with a positive divisor is always in `0..NANOS_PER_SEC`,
    // so this cast can neither truncate nor change sign.
    let subsec_nanos = unix_ns.rem_euclid(NANOS_PER_SEC) as u32;

    Utc.timestamp_opt(secs, subsec_nanos)
}
/// Error type that can be produced while decoding a measurement.
///
/// Implementors must also implement [`Error`], and must be able to build the
/// value used by `Measurement::from_message` when a message has no payload.
pub trait MeasurementError
where
    Self: Error,
{
    /// Builds the error value that represents a message without a payload.
    fn empty_payload_error() -> Self;
}
/// A measurement that can be encoded through a [`FlatBufferBuilder`], wrapped
/// in a [`RedpandaRecord`], and decoded again from raw bytes or a message.
///
/// Implementors supply the conversion into a [`FlatBufferBuilder`] (via the
/// `Into` supertrait), [`from_bytes`](Measurement::from_bytes),
/// [`timestamp`](Measurement::timestamp) and
/// [`source_id`](Measurement::source_id); the remaining methods have default
/// implementations built on top of those.
pub trait Measurement<'a>: Into<FlatBufferBuilder<'a>> {
    /// Error returned when decoding fails.
    type Error: MeasurementError;

    /// Value passed as the first argument of [`RedpandaRecord::new`] by
    /// [`to_message`](Measurement::to_message).
    const TOPIC_NAME: &'static str;

    /// Consumes the measurement and returns a copy of the builder's finished
    /// data.
    ///
    /// NOTE(review): this presumably requires the `Into` conversion to hand
    /// back an already-finished builder — confirm against the flatbuffers docs.
    fn to_bytes(self) -> Vec<u8> {
        let builder: FlatBufferBuilder<'a> = self.into();
        builder.finished_data().to_owned()
    }

    /// Consumes the measurement and wraps its encoded bytes in a
    /// [`RedpandaRecord`] built from [`TOPIC_NAME`](Measurement::TOPIC_NAME),
    /// with `None` for the second and fourth constructor arguments.
    fn to_message(self) -> RedpandaRecord
    where
        Self: Sized,
    {
        RedpandaRecord::new(Self::TOPIC_NAME, None, self.to_bytes(), None)
    }

    /// Decodes a measurement from its encoded bytes.
    fn from_bytes(bytes: &[u8]) -> Result<Self, Self::Error>
    where
        Self: Sized;

    /// Decodes a measurement from the payload of `message`.
    ///
    /// # Errors
    ///
    /// Returns [`MeasurementError::empty_payload_error`] when the message has
    /// no payload, and otherwise whatever
    /// [`from_bytes`](Measurement::from_bytes) returns.
    fn from_message(message: BorrowedMessage) -> Result<Self, Self::Error>
    where
        Self: Sized,
    {
        message
            .payload()
            .ok_or_else(Self::Error::empty_payload_error)
            .and_then(Self::from_bytes)
    }

    /// Returns the measurement's timestamp in UTC.
    fn timestamp(&self) -> DateTime<Utc>;

    /// Returns [`timestamp`](Measurement::timestamp) converted with chrono's
    /// `DateTime::timestamp_nanos`.
    fn timestamp_nanos(&self) -> i64 {
        let instant = self.timestamp();
        instant.timestamp_nanos()
    }

    /// Returns the identifier of the measurement's source.
    fn source_id(&self) -> &str;
}
/// A source of measurements that can be consumed as an asynchronous
/// [`Stream`].
pub trait MeasurementStream {
    /// Measurement type yielded by the stream; it must implement
    /// [`Measurement`] for every lifetime.
    type Item: for<'a> Measurement<'a>;

    /// Returns a stream yielding [`Self::Item`](MeasurementStream::Item)
    /// values.
    ///
    /// The stream is handed back boxed and pinned because a bare `dyn Stream`
    /// is unsized: a method returning it by value can be declared but never
    /// implemented or called (E0746). The `'_` bound lets the returned stream
    /// borrow from `self`.
    fn stream(&self) -> std::pin::Pin<Box<dyn Stream<Item = Self::Item> + '_>>;
}