fluvio_smartengine/engine/
config.rs1use std::time::Duration;
2
3use derive_builder::Builder;
4
5use fluvio_protocol::Version;
6use fluvio_smartmodule::SMARTMODULE_TIMESTAMPS_VERSION;
7use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
8
9pub const DEFAULT_SMARTENGINE_VERSION: Version = SMARTMODULE_TIMESTAMPS_VERSION;
10
/// Initial data handed to a SmartModule through `SmartModuleConfig::initial_data`.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum SmartModuleInitialData {
    /// No initial data. This is the [`Default`] value.
    #[default]
    None,
    /// Initial accumulator bytes.
    // NOTE(review): presumably the seed value for an aggregate SmartModule — confirm.
    Aggregate { accumulator: Vec<u8> },
}

impl SmartModuleInitialData {
    /// Builds the [`Aggregate`](Self::Aggregate) variant that owns `accumulator`.
    pub fn with_aggregate(accumulator: Vec<u8>) -> Self {
        Self::Aggregate { accumulator }
    }
}
30
31#[derive(Builder)]
33pub struct SmartModuleConfig {
34 #[builder(default, setter(strip_option))]
35 pub(crate) initial_data: SmartModuleInitialData,
36 #[builder(default)]
37 pub(crate) params: SmartModuleExtraParams,
38 #[builder(default, setter(into, strip_option))]
40 pub(crate) version: Option<i16>,
41 #[builder(default)]
42 pub(crate) lookback: Option<Lookback>,
43 #[builder(setter(into))]
45 pub(crate) smartmodule_names: Vec<String>,
46}
47
/// Lookback setting stored in `SmartModuleConfig::lookback`.
// NOTE(review): `last` is presumably a record count and `age` a maximum record
// age — confirm against the engine code that consumes this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lookback {
    /// Carries only a `last` value.
    Last(u64),
    /// Carries an `age` duration together with a `last` value.
    Age { age: Duration, last: u64 },
}
53
54impl SmartModuleConfigBuilder {
55 pub fn param(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
57 let new = self;
58 let mut params = new.params.take().unwrap_or_default();
59 params.insert(key.into(), value.into());
60 new.params = Some(params);
61 new
62 }
63}
64
65impl SmartModuleConfig {
66 pub fn builder() -> SmartModuleConfigBuilder {
67 SmartModuleConfigBuilder::default()
68 }
69
70 pub(crate) fn version(&self) -> i16 {
71 self.version.unwrap_or(DEFAULT_SMARTENGINE_VERSION)
72 }
73
74 pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
75 self.lookback = lookback;
76 }
77}
78
/// Builds a config from a transformation step.
///
/// * `step.uses` becomes the only entry of `smartmodule_names`.
/// * Every `step.with` entry has its value converted to a `String` and is
///   stored in `params`.
/// * `step.lookback`, when present, is converted into this module's
///   `Lookback`.
/// * `initial_data` is `SmartModuleInitialData::None` and `version` is left
///   unset.
#[cfg(feature = "transformation")]
impl From<crate::transformation::TransformationStep> for SmartModuleConfig {
    fn from(step: crate::transformation::TransformationStep) -> Self {
        Self {
            initial_data: SmartModuleInitialData::None,
            params: step
                .with
                .into_iter()
                .map(|(key, value)| (key, value.into()))
                .collect::<std::collections::BTreeMap<String, String>>()
                .into(),
            version: None,
            lookback: step.lookback.map(Into::into),
            // `step` is owned and `uses` is not read again, so the name is
            // moved into the vector rather than cloned.
            smartmodule_names: vec![step.uses],
        }
    }
}
97
/// Converts the transformation-level lookback into the engine's `Lookback`:
/// a value with an `age` maps to `Lookback::Age`, otherwise to
/// `Lookback::Last`. `last` is carried over in both cases.
#[cfg(feature = "transformation")]
impl From<crate::transformation::Lookback> for Lookback {
    fn from(value: crate::transformation::Lookback) -> Self {
        let last = value.last;
        if let Some(age) = value.age {
            Self::Age { age, last }
        } else {
            Self::Last(last)
        }
    }
}
110
111impl From<&fluvio_smartmodule::dataplane::smartmodule::Lookback> for Lookback {
112 fn from(value: &fluvio_smartmodule::dataplane::smartmodule::Lookback) -> Self {
113 match value.age {
114 Some(age) => Self::Age {
115 age,
116 last: value.last,
117 },
118 None => Self::Last(value.last),
119 }
120 }
121}