fluvio_smartengine/engine/
config.rs1use std::time::Duration;
2
3use derive_builder::Builder;
4
5use fluvio_protocol::Version;
6use fluvio_smartmodule::SMARTMODULE_TIMESTAMPS_VERSION;
7use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
8
/// Version that `SmartModuleConfig::version` falls back to when a config
/// carries no explicit version; currently an alias of
/// `SMARTMODULE_TIMESTAMPS_VERSION`.
pub const DEFAULT_SMARTENGINE_VERSION: Version = SMARTMODULE_TIMESTAMPS_VERSION;
10
/// Initial data carried by a `SmartModuleConfig` (its `initial_data` field).
///
/// `Default` is derived and yields [`SmartModuleInitialData::None`], matching
/// the previous hand-written implementation.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum SmartModuleInitialData {
    /// No initial data; this is the default variant.
    #[default]
    None,
    /// Raw accumulator bytes.
    // NOTE(review): presumably the starting state for an aggregate
    // SmartModule — confirm against the engine code that consumes it.
    Aggregate { accumulator: Vec<u8> },
}

impl SmartModuleInitialData {
    /// Wraps `accumulator` in the [`Aggregate`](Self::Aggregate) variant.
    pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
        Self::Aggregate { accumulator }
    }
}
30
/// Configuration for a SmartModule.
///
/// Instances are assembled through the derived `SmartModuleConfigBuilder`
/// (obtainable via `SmartModuleConfig::builder`). Every field is marked
/// `#[builder(default)]`, so none of them has to be set explicitly.
#[derive(Builder)]
pub struct SmartModuleConfig {
    /// Initial data for the module.
    #[builder(default, setter(strip_option))]
    pub(crate) initial_data: SmartModuleInitialData,
    /// Extra parameters; `SmartModuleConfigBuilder::param` adds single entries.
    #[builder(default)]
    pub(crate) params: SmartModuleExtraParams,
    /// Optional explicit version; `version()` substitutes
    /// `DEFAULT_SMARTENGINE_VERSION` when this is `None`.
    #[builder(default, setter(into, strip_option))]
    pub(crate) version: Option<i16>,
    /// Optional lookback setting; can be replaced later with `set_lookback`.
    #[builder(default)]
    pub(crate) lookback: Option<Lookback>,
}
44
/// Lookback setting stored on a `SmartModuleConfig`.
///
/// The `From` conversions in this file pick [`Lookback::Age`] when the source
/// value carries an age and [`Lookback::Last`] otherwise.
// NOTE(review): `last` presumably counts the most recent records to look back
// over, and `age` bounds how old they may be — confirm with the consumer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Lookback {
    /// Lookback described only by a `last` count.
    Last(u64),
    /// Lookback described by an age together with a `last` count.
    Age { age: Duration, last: u64 },
}
50
51impl SmartModuleConfigBuilder {
52 pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
54 let new = self;
55 let mut params = new.params.take().unwrap_or_default();
56 params.insert(key.into(), value.into());
57 new.params = Some(params);
58 new
59 }
60}
61
62impl SmartModuleConfig {
63 pub fn builder() -> SmartModuleConfigBuilder {
64 SmartModuleConfigBuilder::default()
65 }
66
67 pub(crate) fn version(&self) -> i16 {
68 self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
69 }
70
71 pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
72 self.lookback = lookback;
73 }
74}
75
/// Builds a config from a transformation step.
///
/// The step's `with` entries become the params (each value converted with
/// `Into`), and its optional lookback is converted to this module's
/// `Lookback`. No initial data and no explicit version are set.
#[cfg(feature = "transformation")]
impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
    fn from(step: crate::transformation::TransformationStep) -> Self {
        // Collect into a string map first; the params type is built from it.
        let entries: std::collections::BTreeMap<String, String> = step
            .with
            .into_iter()
            .map(|(key, raw)| (key, raw.into()))
            .collect();
        let lookback = step.lookback.map(Into::into);

        Self {
            initial_data: SmartModuleInitialData::None,
            params: entries.into(),
            version: None,
            lookback,
        }
    }
}
92
/// Converts a transformation lookback into the engine's `Lookback`:
/// `Age` when the source has an age, `Last` otherwise.
#[cfg(feature = "transformation")]
impl From<crate::transformation::Lookback> for Lookback {
    fn from(value: crate::transformation::Lookback) -> Self {
        if let Some(age) = value.age {
            Self::Age {
                age,
                last: value.last,
            }
        } else {
            Self::Last(value.last)
        }
    }
}
105
106impl From<&fluvio_smartmodule::dataplane::smartmodule::Lookback> for Lookback {
107 fn from(value: &fluvio_smartmodule::dataplane::smartmodule::Lookback) -> Self {
108 match value.age {
109 Some(age) => Self::Age {
110 age,
111 last: value.last,
112 },
113 None => Self::Last(value.last),
114 }
115 }
116}