fluvio_smartengine/engine/
config.rs

1use std::time::Duration;
2
3use derive_builder::Builder;
4
5use fluvio_protocol::Version;
6use fluvio_smartmodule::SMARTMODULE_TIMESTAMPS_VERSION;
7use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
8
9pub const DEFAULT_SMARTENGINE_VERSION: Version = SMARTMODULE_TIMESTAMPS_VERSION;
10
11/// Initial seed data to passed, this will be send back as part of the output
12#[derive(Debug, Clone)]
13#[non_exhaustive]
14pub enum SmartModuleInitialData {
15    None,
16    Aggregate { accumulator: Vec<u8> },
17}
18
19impl SmartModuleInitialData {
20    pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
21        Self::Aggregate { accumulator }
22    }
23}
24
25impl Default for SmartModuleInitialData {
26    fn default() -> Self {
27        Self::None
28    }
29}
30
31/// SmartModule configuration
32#[derive(Builder)]
33pub struct SmartModuleConfig {
34    #[builder(default, setter(strip_option))]
35    pub(crate) initial_data: SmartModuleInitialData,
36    #[builder(default)]
37    pub(crate) params: SmartModuleExtraParams,
38    // this will be deprecated in the future
39    #[builder(default, setter(into, strip_option))]
40    pub(crate) version: Option<i16>,
41    #[builder(default)]
42    pub(crate) lookback: Option<Lookback>,
43    // into makes the field required
44    #[builder(setter(into))]
45    pub(crate) smartmodule_names: Vec<String>,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum Lookback {
50    Last(u64),
51    Age { age: Duration, last: u64 },
52}
53
54impl SmartModuleConfigBuilder {
55    /// add initial parameters
56    pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
57        let new = self;
58        let mut params = new.params.take().unwrap_or_default();
59        params.insert(key.into(), value.into());
60        new.params = Some(params);
61        new
62    }
63}
64
65impl SmartModuleConfig {
66    pub fn builder() -> SmartModuleConfigBuilder {
67        SmartModuleConfigBuilder::default()
68    }
69
70    pub(crate) fn version(&self) -> i16 {
71        self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
72    }
73
74    pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
75        self.lookback = lookback;
76    }
77}
78
79#[cfg(feature = "transformation")]
80impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
81    fn from(step: crate::transformation::TransformationStep) -> Self {
82        let names = step.uses.clone();
83        Self {
84            initial_data: SmartModuleInitialData::None,
85            params: step
86                .with
87                .into_iter()
88                .map(|(k, v)| (k, v.into()))
89                .collect::<std::collections::BTreeMap<String, String>>()
90                .into(),
91            version: None,
92            lookback: step.lookback.map(|l| l.into()),
93            smartmodule_names: vec![names],
94        }
95    }
96}
97
98#[cfg(feature = "transformation")]
99impl From<crate::transformation::Lookback> for Lookback {
100    fn from(value: crate::transformation::Lookback) -> Self {
101        match value.age {
102            Some(age) => Self::Age {
103                age,
104                last: value.last,
105            },
106            None => Self::Last(value.last),
107        }
108    }
109}
110
111impl From<&fluvio_smartmodule::dataplane::smartmodule::Lookback> for Lookback {
112    fn from(value: &fluvio_smartmodule::dataplane::smartmodule::Lookback) -> Self {
113        match value.age {
114            Some(age) => Self::Age {
115                age,
116                last: value.last,
117            },
118            None => Self::Last(value.last),
119        }
120    }
121}