pub use encoding::{
SmartModuleRuntimeError, SmartModuleInternalError, SmartModuleKind, SmartModuleInput,
SmartModuleAggregateInput, SmartModuleOutput, SmartModuleExtraParams,
SmartModuleAggregateOutput,
};
mod encoding {
use std::fmt::{self, Display};
use crate::Offset;
use crate::record::{Record, RecordData};
use fluvio_protocol::{Encoder, Decoder};
use std::collections::BTreeMap;
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleExtraParams {
inner: BTreeMap<String, String>,
}
impl From<BTreeMap<String, String>> for SmartModuleExtraParams {
fn from(inner: BTreeMap<String, String>) -> SmartModuleExtraParams {
SmartModuleExtraParams { inner }
}
}
impl SmartModuleExtraParams {
pub fn get(&self, key: &str) -> Option<&String> {
self.inner.get(key)
}
}
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInput {
pub base_offset: Offset,
pub record_data: Vec<u8>,
pub params: SmartModuleExtraParams,
#[fluvio(min_version = 16)]
pub join_record: Vec<u8>,
}
impl std::convert::TryFrom<Vec<Record>> for SmartModuleInput {
type Error = std::io::Error;
fn try_from(records: Vec<Record>) -> Result<Self, Self::Error> {
let mut record_data = Vec::new();
records.encode(&mut record_data, 0)?;
Ok(SmartModuleInput {
record_data,
..Default::default()
})
}
}
impl Display for SmartModuleInput {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"SmartModuleInput {{ base_offset: {:?}, record_data: {:?}, join_data: {:#?} }}",
self.base_offset,
self.record_data.len(),
self.join_record.len()
)
}
}
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleAggregateInput {
pub base: SmartModuleInput,
pub accumulator: Vec<u8>,
}
#[derive(Debug, Default, Encoder, Decoder)]
pub struct SmartModuleOutput {
pub successes: Vec<Record>,
pub error: Option<SmartModuleRuntimeError>,
}
#[derive(Debug, Default, Encoder, Decoder)]
pub struct SmartModuleAggregateOutput {
pub base: SmartModuleOutput,
#[fluvio(min_version = 16)]
pub accumulator: Vec<u8>,
}
#[repr(i32)]
#[derive(thiserror::Error, Debug, Clone, PartialEq, Encoder, Decoder)]
#[non_exhaustive]
#[fluvio(encode_discriminant)]
pub enum SmartModuleInternalError {
#[error("encountered unknown error during SmartModule processing")]
UnknownError = -1,
#[error("failed to decode SmartModule base input")]
DecodingBaseInput = -11,
#[error("failed to decode SmartModule record input")]
DecodingRecords = -22,
#[error("failed to encode SmartModule output")]
EncodingOutput = -33,
#[error("failed to parse SmartModule extra params")]
ParsingExtraParams = -44,
#[error("undefined right record in Join SmartModule")]
UndefinedRightRecord = -55,
#[error("Init params are not found")]
InitParamsNotFound = -60,
}
impl Default for SmartModuleInternalError {
fn default() -> Self {
Self::UnknownError
}
}
#[derive(thiserror::Error, Debug, Default, Clone, PartialEq, Encoder, Decoder)]
pub struct SmartModuleRuntimeError {
pub hint: String,
pub offset: Offset,
pub kind: SmartModuleKind,
pub record_key: Option<RecordData>,
pub record_value: RecordData,
}
impl SmartModuleRuntimeError {
pub fn new(
record: &Record,
base_offset: Offset,
kind: SmartModuleKind,
error: eyre::Error,
) -> Self {
let hint = format!("{:?}", error);
let offset = base_offset + record.preamble.offset_delta();
let record_key = record.key.clone();
let record_value = record.value.clone();
Self {
hint,
offset,
kind,
record_key,
record_value,
}
}
}
impl fmt::Display for SmartModuleRuntimeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let key = self
.record_key
.as_ref()
.map(display_record_data)
.unwrap_or_else(|| "NULL".to_string());
let value = display_record_data(&self.record_value);
write!(
f,
"{}\n\n\
SmartModule Info: \n \
Type: {}\n \
Offset: {}\n \
Key: {}\n \
Value: {}",
self.hint, self.kind, self.offset, key, value,
)
}
}
fn display_record_data(record: &RecordData) -> String {
match std::str::from_utf8(record.as_ref()) {
Ok(s) => s.to_string(),
_ => format!("Binary: {} bytes", record.as_ref().len()),
}
}
#[derive(Debug, Clone, PartialEq, Encoder, Decoder)]
pub enum SmartModuleKind {
Filter,
Map,
#[fluvio(min_version = 15)]
ArrayMap,
#[fluvio(min_version = 13)]
Aggregate,
#[fluvio(min_version = 16)]
FilterMap,
#[fluvio(min_version = 16)]
Join,
#[fluvio(min_version = 17)]
Generic,
}
impl Default for SmartModuleKind {
fn default() -> Self {
Self::Filter
}
}
impl fmt::Display for SmartModuleKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}
}