fluvio_smartengine/engine/
config.rs

1use std::time::Duration;
2
3use derive_builder::Builder;
4
5use fluvio_protocol::Version;
6use fluvio_smartmodule::SMARTMODULE_TIMESTAMPS_VERSION;
7use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
8
9pub const DEFAULT_SMARTENGINE_VERSION: Version = SMARTMODULE_TIMESTAMPS_VERSION;
10
/// Initial seed data to be passed in; it will be sent back as part of the output.
///
/// `Default` yields [`SmartModuleInitialData::None`].
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum SmartModuleInitialData {
    /// No seed data. This is the default variant.
    #[default]
    None,
    /// Seed data for an aggregate, carried as raw accumulator bytes.
    Aggregate { accumulator: Vec<u8> },
}

impl SmartModuleInitialData {
    /// Wraps `accumulator` in the [`Aggregate`](Self::Aggregate) variant.
    pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
        Self::Aggregate { accumulator }
    }
}
30
31/// SmartModule configuration
32#[derive(Builder)]
33pub struct SmartModuleConfig {
34    #[builder(default, setter(strip_option))]
35    pub(crate) initial_data: SmartModuleInitialData,
36    #[builder(default)]
37    pub(crate) params: SmartModuleExtraParams,
38    // this will be deprecated in the future
39    #[builder(default, setter(into, strip_option))]
40    pub(crate) version: Option<i16>,
41    #[builder(default)]
42    pub(crate) lookback: Option<Lookback>,
43}
44
/// Lookback setting stored in a `SmartModuleConfig`.
///
/// NOTE(review): the exact meaning of `last` (presumably a number of most
/// recent records) is not visible in this file; confirm against the engine.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Lookback {
    /// Only a `last` count is given (no age constraint).
    Last(u64),
    /// Both an `age` duration and a `last` count are given.
    Age { age: Duration, last: u64 },
}
50
51impl SmartModuleConfigBuilder {
52    /// add initial parameters
53    pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
54        let new = self;
55        let mut params = new.params.take().unwrap_or_default();
56        params.insert(key.into(), value.into());
57        new.params = Some(params);
58        new
59    }
60}
61
62impl SmartModuleConfig {
63    pub fn builder() -> SmartModuleConfigBuilder {
64        SmartModuleConfigBuilder::default()
65    }
66
67    pub(crate) fn version(&self) -> i16 {
68        self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
69    }
70
71    pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
72        self.lookback = lookback;
73    }
74}
75
#[cfg(feature = "transformation")]
impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
    /// Builds a config from a transformation step.
    ///
    /// The step's `with` entries become the params (each value converted via
    /// `Into`), its `lookback` is converted if present, and no initial data or
    /// explicit version is set.
    fn from(step: crate::transformation::TransformationStep) -> Self {
        let entries: std::collections::BTreeMap<String, String> = step
            .with
            .into_iter()
            .map(|(key, value)| (key, value.into()))
            .collect();

        Self {
            initial_data: SmartModuleInitialData::None,
            params: entries.into(),
            version: None,
            lookback: step.lookback.map(Lookback::from),
        }
    }
}
92
#[cfg(feature = "transformation")]
impl From<crate::transformation::Lookback> for Lookback {
    /// Maps a transformation-level lookback onto the engine enum:
    /// `Age` when an age is present, `Last` otherwise.
    fn from(value: crate::transformation::Lookback) -> Self {
        let last = value.last;
        if let Some(age) = value.age {
            Self::Age { age, last }
        } else {
            Self::Last(last)
        }
    }
}
105
106impl From<&fluvio_smartmodule::dataplane::smartmodule::Lookback> for Lookback {
107    fn from(value: &fluvio_smartmodule::dataplane::smartmodule::Lookback) -> Self {
108        match value.age {
109            Some(age) => Self::Age {
110                age,
111                last: value.last,
112            },
113            None => Self::Last(value.last),
114        }
115    }
116}