use std::io::Cursor;
use std::time::Duration;
use std::{collections::BTreeMap, fmt::Display};
use std::fmt;
use fluvio_protocol::{Decoder, Encoder, Version};
use fluvio_protocol::record::{Offset, Record};
use fluvio_protocol::types::Timestamp;
use crate::SmartModuleRecord;
/// Protocol version constant used by this module's tests when encoding and
/// decoding records. Its value (22) equals the `min_version` declared on
/// `SmartModuleInput::base_timestamp` — presumably the first version whose
/// wire format carries timestamps (TODO confirm against the protocol history).
pub const SMARTMODULE_TIMESTAMPS_VERSION: Version = 22;
/// Extra parameters for a SmartModule: a string-to-string map plus an
/// optional [`Lookback`] setting.
///
/// Field order is significant for the derived `Encoder`/`Decoder`
/// implementations and must not be rearranged.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleExtraParams {
    /// Key/value parameters, kept sorted by key by the `BTreeMap`.
    inner: BTreeMap<String, String>,

    /// Optional lookback configuration; tagged with `min_version = 20` for
    /// the derived wire format.
    #[fluvio(min_version = 20)]
    lookback: Option<Lookback>,
}
impl From<BTreeMap<String, String>> for SmartModuleExtraParams {
fn from(inner: BTreeMap<String, String>) -> SmartModuleExtraParams {
SmartModuleExtraParams {
inner,
..Default::default()
}
}
}
impl SmartModuleExtraParams {
pub fn new(params: BTreeMap<String, String>, lookback: Option<Lookback>) -> Self {
Self {
inner: params,
lookback,
}
}
pub fn get(&self, key: &str) -> Option<&String> {
self.inner.get(key)
}
pub fn insert(&mut self, key: String, value: String) {
self.inner.insert(key, value);
}
pub fn lookback(&self) -> Option<&Lookback> {
self.lookback.as_ref()
}
pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
self.lookback = lookback;
}
}
/// Lookback configuration made of a count and an optional age.
///
/// Field order is significant for the derived `Encoder`/`Decoder`
/// implementations and must not be rearranged.
#[derive(Debug, Default, Clone, Encoder, Decoder, PartialEq, Eq)]
pub struct Lookback {
    /// Count component; presumably the number of most recent records to look
    /// back over (TODO confirm with the consumer of this value).
    pub last: u64,

    /// Optional age component; tagged with `min_version = 21` for the derived
    /// wire format. Presumably the maximum record age (TODO confirm).
    #[fluvio(min_version = 21)]
    pub age: Option<Duration>,
}
impl Lookback {
    /// Creates a lookback with the given `last` count and no age.
    pub fn last(last: u64) -> Self {
        Self { last, age: None }
    }

    /// Creates a lookback with the given `age`. The `last` count falls back to
    /// `0` when `last` is `None`.
    pub fn age(age: Duration, last: Option<u64>) -> Self {
        let last = last.unwrap_or(0);
        Self {
            age: Some(age),
            last,
        }
    }
}
/// Input handed to a SmartModule: encoded records plus the base offset and
/// base timestamp that apply to them.
///
/// Field order and the `#[fluvio(...)]` version bounds define the derived
/// wire format and must not be changed.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInput {
    /// Base offset attached to the records in `raw_bytes`.
    base_offset: Offset,

    /// Encoded `Vec<Record>` (see `try_from_records` / `try_into_records`).
    raw_bytes: Vec<u8>,

    /// Deprecated extra parameters; tagged with `max_version = 22`.
    #[deprecated]
    #[fluvio(max_version = 22)]
    params: SmartModuleExtraParams,

    /// Raw join-record bytes; tagged with versions 16 through 22.
    #[fluvio(min_version = 16, max_version = 22)]
    join_record: Vec<u8>,

    /// Base timestamp attached to the records; tagged with `min_version = 22`.
    #[fluvio(min_version = 22)]
    base_timestamp: Timestamp,
}
impl SmartModuleInput {
pub fn new(raw_bytes: Vec<u8>, base_offset: Offset, base_timestamp: Timestamp) -> Self {
Self {
base_offset,
raw_bytes,
base_timestamp,
..Default::default()
}
}
pub fn base_offset(&self) -> Offset {
self.base_offset
}
pub fn set_base_offset(&mut self, base_offset: Offset) {
self.base_offset = base_offset;
}
pub fn base_timestamp(&self) -> Timestamp {
self.base_timestamp
}
pub fn set_base_timestamp(&mut self, base_timestamp: Timestamp) {
self.base_timestamp = base_timestamp;
}
pub fn raw_bytes(&self) -> &[u8] {
&self.raw_bytes
}
pub fn into_raw_bytes(self) -> Vec<u8> {
self.raw_bytes
}
pub fn parts(self) -> (Vec<u8>, Vec<u8>) {
(self.raw_bytes, self.join_record)
}
#[deprecated = "use SmartModuleRecord instead. Read more here: https://www.fluvio.io/smartmodules/smdk/smartmodulerecord/."]
pub fn try_into_records(mut self, version: Version) -> Result<Vec<Record>, std::io::Error> {
Decoder::decode_from(&mut Cursor::new(&mut self.raw_bytes), version)
}
pub fn try_into_smartmodule_records(
self,
version: Version,
) -> Result<Vec<SmartModuleRecord>, std::io::Error> {
let base_offset = self.base_offset();
let base_timestamp = self.base_timestamp();
let records_input = self.into_raw_bytes();
let mut records: Vec<Record> = vec![];
Decoder::decode(&mut records, &mut Cursor::new(records_input), version)?;
let records = records
.into_iter()
.map(|inner_record| SmartModuleRecord {
inner_record,
base_offset,
base_timestamp,
})
.collect();
Ok(records)
}
pub fn try_from_records(
records: Vec<Record>,
version: Version,
) -> Result<Self, std::io::Error> {
let mut raw_bytes = Vec::new();
records.encode(&mut raw_bytes, version)?;
Ok(SmartModuleInput {
raw_bytes,
..Default::default()
})
}
}
impl Display for SmartModuleInput {
    /// Writes a one-line summary: the base offset, the base timestamp, and the
    /// byte lengths (not the contents) of the record and join buffers.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let record_len = self.raw_bytes.len();
        let join_len = self.join_record.len();

        write!(
            f,
            "SmartModuleInput {{ base_offset: {:?}, base_timestamp: {:?}, record_data: {:?}, join_data: {:#?} }}",
            self.base_offset, self.base_timestamp, record_len, join_len
        )
    }
}
/// Input for an aggregate SmartModule: a regular [`SmartModuleInput`] paired
/// with accumulator bytes.
///
/// Field order is significant for the derived `Encoder`/`Decoder`
/// implementations and must not be rearranged.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleAggregateInput {
    /// The underlying record input.
    pub base: SmartModuleInput,

    /// Accumulator as raw bytes; presumably the running aggregate value
    /// (TODO confirm with the aggregate runtime).
    pub accumulator: Vec<u8>,
}
/// Input for SmartModule initialisation: only the extra parameters.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInitInput {
    /// Parameters made available to the SmartModule.
    pub params: SmartModuleExtraParams,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Records encoded into a `SmartModuleInput` decode back to the same
    /// values in the same order.
    #[test]
    fn test_record_to_sm_input_and_back() {
        let expected_values: [&[u8]; 3] = [b"apple", b"fruit", b"banana"];
        let source_records = vec![
            Record::new("apple"),
            Record::new("fruit"),
            Record::new("banana"),
        ];

        let input = SmartModuleInput::try_from_records(source_records, SMARTMODULE_TIMESTAMPS_VERSION)
            .expect("records to input conversion failed");

        // `try_into_records` is deprecated but still part of the public API.
        #[allow(deprecated)]
        let decoded: Vec<Record> = input
            .try_into_records(SMARTMODULE_TIMESTAMPS_VERSION)
            .expect("input to records conversion failed");

        for (position, expected) in expected_values.iter().enumerate() {
            let actual: &[u8] = decoded[position].value.as_ref();
            assert_eq!(actual, *expected);
        }
    }

    /// The timestamp given to `new` is stored, and `set_base_timestamp`
    /// overwrites it; both the field and the getter reflect the value.
    #[test]
    fn sets_the_provided_value_as_timestamp() {
        let mut input = SmartModuleInput::new(vec![0, 1, 2, 3], 0, 0);

        assert_eq!(input.base_timestamp, 0);
        assert_eq!(input.base_timestamp(), 0);

        input.set_base_timestamp(1234);

        assert_eq!(input.base_timestamp, 1234);
        assert_eq!(input.base_timestamp(), 1234);
    }
}