1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
use derive_builder::Builder;
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;

/// Engine version returned by `SmartModuleConfig::version` when the
/// configuration carries no explicit version.
const DEFAULT_SMARTENGINE_VERSION: i16 = 17;

/// Initial seed data to be passed in; it will be sent back as part of the output.
///
/// The default value is [`SmartModuleInitialData::None`].
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum SmartModuleInitialData {
    /// No seed data.
    #[default]
    None,
    /// Seed data for an aggregate, carried as raw bytes in `accumulator`.
    Aggregate { accumulator: Vec<u8> },
}

impl SmartModuleInitialData {
    /// Wraps `accumulator` in the [`SmartModuleInitialData::Aggregate`] variant.
    pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
        Self::Aggregate { accumulator }
    }
}

/// SmartModule configuration
///
/// Instances are assembled through the `SmartModuleConfigBuilder` generated by
/// `#[derive(Builder)]`. Every field is marked `#[builder(default)]`, so each
/// one may be left unset when building.
#[derive(Builder)]
pub struct SmartModuleConfig {
    // Seed data; falls back to `SmartModuleInitialData::default()` when unset.
    // NOTE(review): `strip_option` is applied although the field is not an
    // `Option` — presumably a no-op here; confirm against derive_builder.
    #[builder(default, setter(strip_option))]
    pub(crate) initial_data: SmartModuleInitialData,
    // Extra key/value parameters; `SmartModuleConfigBuilder::param` adds
    // entries one at a time.
    #[builder(default)]
    pub(crate) params: SmartModuleExtraParams,
    // this will be deprecated in the future
    // When `None`, `SmartModuleConfig::version` falls back to
    // `DEFAULT_SMARTENGINE_VERSION`.
    #[builder(default, setter(into, strip_option))]
    pub(crate) version: Option<i16>,
    // Optional lookback setting; can also be changed after construction via
    // `SmartModuleConfig::set_lookback`.
    #[builder(default)]
    pub(crate) lookback: Option<Lookback>,
}

/// Lookback setting carried by `SmartModuleConfig`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lookback {
    /// Holds the `last` value of the lookback types this enum is converted from.
    ///
    /// NOTE(review): presumably the number of most recent records to look
    /// back over — confirm against the engine.
    Last(u64),
}

impl SmartModuleConfigBuilder {
    /// Adds a single `key`/`value` entry to the builder's extra parameters.
    ///
    /// The parameter set is created with its default value on first use, and
    /// entries added by earlier calls are kept. Returns the builder so calls
    /// can be chained.
    pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
        // Insert in place instead of taking the params out and storing them
        // back; this also removes the need for a rebound `mut` alias of `self`.
        self.params
            .get_or_insert_with(Default::default)
            .insert(key.into(), value.into());
        self
    }
}

impl SmartModuleConfig {
    /// Returns a default-initialized builder for this configuration.
    pub fn builder() -> SmartModuleConfigBuilder {
        Default::default()
    }

    /// Returns the configured engine version, or
    /// `DEFAULT_SMARTENGINE_VERSION` when none was set.
    pub(crate) fn version(&self) -> i16 {
        let configured = self.version;
        configured.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
    }

    /// Replaces the stored lookback setting; passing `None` clears it.
    pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
        self.lookback = lookback;
    }
}

#[cfg(feature = "transformation")]
impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
    /// Builds a configuration from a transformation step.
    ///
    /// The step's `with` entries become the extra parameters and its optional
    /// `lookback` is converted to [`Lookback`]. No initial data and no explicit
    /// version are set.
    fn from(step: crate::transformation::TransformationStep) -> Self {
        use std::collections::BTreeMap;

        // Collect into a string map first; the params type is then produced
        // from that map via `Into`.
        let entries: BTreeMap<String, String> = step
            .with
            .into_iter()
            .map(|(key, value)| (key, value.into()))
            .collect();
        let lookback = step.lookback.map(Lookback::from);

        Self {
            initial_data: SmartModuleInitialData::None,
            params: entries.into(),
            version: None,
            lookback,
        }
    }
}

#[cfg(feature = "transformation")]
impl From<crate::transformation::Lookback> for Lookback {
    /// Converts a transformation-level lookback into [`Lookback::Last`],
    /// carrying over its `last` value.
    fn from(value: crate::transformation::Lookback) -> Self {
        let crate::transformation::Lookback { last, .. } = value;
        Lookback::Last(last)
    }
}

impl From<fluvio_smartmodule::dataplane::smartmodule::Lookback> for Lookback {
    fn from(value: fluvio_smartmodule::dataplane::smartmodule::Lookback) -> Self {
        Self::Last(value.last)
    }
}

impl From<&fluvio_smartmodule::dataplane::smartmodule::Lookback> for Lookback {
    fn from(value: &fluvio_smartmodule::dataplane::smartmodule::Lookback) -> Self {
        Self::Last(value.last)
    }
}