fluvio_smartengine/engine/wasmtime/
engine.rs

1use std::fmt::{self, Debug};
2use std::future::Future;
3
4use anyhow::Result;
5use fluvio_smartmodule::Record;
6use tracing::debug;
7use wasmtime::{Engine, Module};
8
9use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};
10
11use crate::SmartModuleConfig;
12use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};
13
14use super::init::SmartModuleInit;
15use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};
16
17use super::limiter::StoreResourceLimiter;
18use super::look_back::SmartModuleLookBack;
19use super::metrics::SmartModuleChainMetrics;
20use super::state::WasmState;
21use super::transforms::create_transform;
22
23// 1 GB
24const DEFAULT_STORE_MEMORY_LIMIT: usize = 1_000_000_000;
25
26#[derive(Clone)]
27pub struct SmartEngine(Engine);
28
29#[allow(clippy::new_without_default)]
30impl SmartEngine {
31    pub fn new() -> Self {
32        let mut config = wasmtime::Config::default();
33        config.consume_fuel(true);
34        Self(Engine::new(&config).expect("Config is static"))
35    }
36
37    pub(crate) fn new_state(&self, store_limiter: StoreResourceLimiter) -> WasmState {
38        WasmState::new(&self.0, store_limiter)
39    }
40}
41
42impl Debug for SmartEngine {
43    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44        write!(f, "SmartModuleEngine")
45    }
46}
47
48/// Building SmartModule
49pub struct SmartModuleChainBuilder {
50    smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
51    store_limiter: StoreResourceLimiter,
52}
53
54impl SmartModuleChainBuilder {
55    /// Add SmartModule with a single transform and init
56    pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec<u8>) {
57        self.smart_modules.push((config, bytes))
58    }
59
60    pub fn set_store_memory_limit(&mut self, max_memory_bytes: usize) {
61        self.store_limiter.set_memory_size(max_memory_bytes);
62    }
63
64    /// stop adding smartmodule and return SmartModuleChain that can be executed
65    pub fn initialize(self, engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
66        let mut instances = Vec::with_capacity(self.smart_modules.len());
67        let mut state = engine.new_state(self.store_limiter);
68        for (config, bytes) in self.smart_modules {
69            let module = Module::new(&engine.0, bytes)?;
70            let version = config.version();
71            let ctx = SmartModuleInstanceContext::instantiate(
72                &mut state,
73                module,
74                config.params,
75                version,
76                config.lookback,
77            )?;
78            let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
79            let look_back = SmartModuleLookBack::try_instantiate(&ctx, &mut state)?;
80            let transform = create_transform(&ctx, config.initial_data, &mut state)?;
81            let mut instance = SmartModuleInstance::new(ctx, init, look_back, transform, version);
82
83            instance.call_init(&mut state)?;
84            instances.push(instance);
85        }
86
87        Ok(SmartModuleChainInstance {
88            store: state,
89            instances,
90        })
91    }
92}
93
94impl Default for SmartModuleChainBuilder {
95    fn default() -> Self {
96        let mut store_limiter = StoreResourceLimiter::default();
97        store_limiter.set_memory_size(DEFAULT_STORE_MEMORY_LIMIT);
98        Self {
99            smart_modules: Default::default(),
100            store_limiter,
101        }
102    }
103}
104
105impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
106    fn from(pair: (SmartModuleConfig, T)) -> Self {
107        let mut result = Self::default();
108        result.add_smart_module(pair.0, pair.1.into());
109        result
110    }
111}
112
113/// SmartModule Chain Instance that can be executed
114pub struct SmartModuleChainInstance {
115    store: WasmState,
116    instances: Vec<SmartModuleInstance>,
117}
118
119impl Debug for SmartModuleChainInstance {
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        write!(f, "SmartModuleChainInstance")
122    }
123}
124
125impl SmartModuleChainInstance {
126    #[cfg(test)]
127    pub(crate) fn instances(&self) -> &Vec<SmartModuleInstance> {
128        &self.instances
129    }
130
131    /// A single record is processed thru all smartmodules in the chain.
132    /// The output of one smartmodule is the input of the next smartmodule.
133    /// A single record may result in multiple records.
134    /// The output of the last smartmodule is added to the output of the chain.
135    pub fn process(
136        &mut self,
137        input: SmartModuleInput,
138        metric: &SmartModuleChainMetrics,
139    ) -> Result<SmartModuleOutput> {
140        let raw_len = input.raw_bytes().len();
141        debug!(raw_len, "sm raw input");
142        metric.add_bytes_in(raw_len as u64);
143
144        let base_offset = input.base_offset();
145        let base_timestamp = input.base_timestamp();
146
147        if let Some((last, instances)) = self.instances.split_last_mut() {
148            let mut next_input = input;
149
150            for instance in instances {
151                // pass raw inputs to transform instance
152                // each raw input may result in multiple records
153                let time = std::time::Instant::now();
154                self.store.top_up_fuel();
155                let output = instance.process(next_input, &mut self.store)?;
156                let fuel_used = self.store.get_used_fuel();
157                debug!(fuel_used, "fuel used");
158                metric.add_fuel_used(fuel_used, time.elapsed());
159
160                if let Some(ref smerr) = output.error {
161                    // encountered error, we stop processing and return partial output
162                    tracing::error!(err=?smerr);
163                    return Ok(output);
164                } else {
165                    next_input =
166                        SmartModuleInput::try_from_records(output.successes, instance.version())?;
167                    next_input.set_base_offset(base_offset);
168                    next_input.set_base_timestamp(base_timestamp);
169                }
170            }
171
172            let time = std::time::Instant::now();
173            self.store.top_up_fuel();
174            let output = last.process(next_input, &mut self.store)?;
175            if let Some(ref smerr) = output.error {
176                tracing::error!(err=?smerr);
177            }
178            let fuel_used = self.store.get_used_fuel();
179            debug!(fuel_used, "fuel used");
180            metric.add_fuel_used(fuel_used, time.elapsed());
181            let records_out = output.successes.len();
182            metric.add_records_out(records_out as u64);
183            debug!(records_out, "sm records out");
184            Ok(output)
185        } else {
186            #[allow(deprecated)]
187            let records = input.try_into_records(DEFAULT_SMARTENGINE_VERSION)?;
188
189            Ok(SmartModuleOutput::new(records))
190        }
191    }
192
193    pub async fn look_back<F, R>(
194        &mut self,
195        read_fn: F,
196        metrics: &SmartModuleChainMetrics,
197    ) -> Result<()>
198    where
199        R: Future<Output = Result<Vec<Record>>>,
200        F: Fn(Lookback) -> R,
201    {
202        debug!("look_back on chain with {} instances", self.instances.len());
203
204        for instance in self.instances.iter_mut() {
205            if let Some(lookback) = instance.lookback() {
206                debug!("look_back on instance");
207                let records: Vec<Record> = read_fn(lookback).await?;
208                let input: SmartModuleInput =
209                    SmartModuleInput::try_from_records(records, instance.version())?;
210
211                let time = std::time::Instant::now();
212                metrics.add_bytes_in(input.raw_bytes().len() as u64);
213                self.store.top_up_fuel();
214
215                let result = instance.call_look_back(input, &mut self.store);
216                let fuel_used = self.store.get_used_fuel();
217
218                debug!(fuel_used, "fuel used");
219                metrics.add_fuel_used(fuel_used, time.elapsed());
220                result?;
221            }
222        }
223
224        Ok(())
225    }
226}
227
228#[cfg(test)]
229mod test {
230
231    use crate::SmartModuleConfig;
232
233    #[test]
234    fn test_param() {
235        let config = SmartModuleConfig::builder()
236            .param("key", "apple")
237            .build()
238            .unwrap();
239
240        assert_eq!(config.params.get("key"), Some(&"apple".to_string()));
241    }
242}
243
244#[cfg(test)]
245mod chaining_test {
246
247    use fluvio_protocol::record::Record;
248    use fluvio_protocol::link::smartmodule::SmartModuleLookbackRuntimeError;
249    use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;
250
251    use crate::engine::error::EngineError;
252    use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};
253
254    use super::super::{
255        SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
256        metrics::SmartModuleChainMetrics,
257    };
258
259    const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
260    const SM_MAP: &str = "fluvio_smartmodule_map";
261    const SM_FILTER_LOOK_BACK: &str = "fluvio_smartmodule_filter_lookback";
262
263    use super::super::fixture::read_wasm_module;
264
265    #[ignore]
266    #[test]
267    fn test_chain_filter_map() {
268        let engine = SmartEngine::new();
269        let mut chain_builder = SmartModuleChainBuilder::default();
270        let metrics = SmartModuleChainMetrics::default();
271
272        chain_builder.add_smart_module(
273            SmartModuleConfig::builder()
274                .param("key", "a")
275                .build()
276                .unwrap(),
277            read_wasm_module(SM_FILTER_INIT),
278        );
279
280        chain_builder.add_smart_module(
281            SmartModuleConfig::builder().build().unwrap(),
282            read_wasm_module(SM_MAP),
283        );
284
285        let mut chain = chain_builder
286            .initialize(&engine)
287            .expect("failed to build chain");
288        assert_eq!(chain.instances().len(), 2);
289
290        let input = vec![Record::new("hello world")];
291        let output = chain
292            .process(
293                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
294                    .expect("input"),
295                &metrics,
296            )
297            .expect("process");
298        assert_eq!(output.successes.len(), 0); // no records passed
299
300        let input = vec![
301            Record::new("apple"),
302            Record::new("fruit"),
303            Record::new("banana"),
304        ];
305        let output = chain
306            .process(
307                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
308                    .expect("input"),
309                &metrics,
310            )
311            .expect("process");
312        assert_eq!(output.successes.len(), 2); // one record passed
313        assert_eq!(output.successes[0].value.as_ref(), b"APPLE");
314        assert_eq!(output.successes[1].value.as_ref(), b"BANANA");
315        assert!(metrics.fuel_used() > 0);
316        chain.store.top_up_fuel();
317        assert_eq!(chain.store.get_used_fuel(), 0);
318    }
319
320    const SM_AGGEGRATE: &str = "fluvio_smartmodule_aggregate";
321
322    #[ignore]
323    #[test]
324    fn test_chain_filter_aggregate() {
325        let engine = SmartEngine::new();
326        let mut chain_builder = SmartModuleChainBuilder::default();
327        let metrics = SmartModuleChainMetrics::default();
328
329        chain_builder.add_smart_module(
330            SmartModuleConfig::builder()
331                .param("key", "a")
332                .build()
333                .unwrap(),
334            read_wasm_module(SM_FILTER_INIT),
335        );
336
337        chain_builder.add_smart_module(
338            SmartModuleConfig::builder()
339                .initial_data(SmartModuleInitialData::with_aggregate(
340                    "zero".to_string().as_bytes().to_vec(),
341                ))
342                .build()
343                .unwrap(),
344            read_wasm_module(SM_AGGEGRATE),
345        );
346
347        let mut chain = chain_builder
348            .initialize(&engine)
349            .expect("failed to build chain");
350        assert_eq!(chain.instances().len(), 2);
351
352        let input = vec![
353            Record::new("apple"),
354            Record::new("fruit"),
355            Record::new("banana"),
356        ];
357        let output = chain
358            .process(
359                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
360                    .expect("input"),
361                &metrics,
362            )
363            .expect("process");
364        assert_eq!(output.successes.len(), 2); // one record passed
365        assert_eq!(output.successes[0].value().to_string(), "zeroapple");
366        assert_eq!(output.successes[1].value().to_string(), "zeroapplebanana");
367
368        let input = vec![Record::new("nothing")];
369        let output = chain
370            .process(
371                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
372                    .expect("input"),
373                &metrics,
374            )
375            .expect("process");
376        assert_eq!(output.successes.len(), 0); // one record passed
377
378        let input = vec![Record::new("elephant")];
379        let output = chain
380            .process(
381                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
382                    .expect("input"),
383                &metrics,
384            )
385            .expect("process");
386        assert_eq!(output.successes.len(), 1); // one record passed
387        assert_eq!(
388            output.successes[0].value().to_string(),
389            "zeroapplebananaelephant"
390        );
391    }
392
393    #[ignore]
394    #[test]
395    fn test_chain_filter_look_back() {
396        //given
397        let engine = SmartEngine::new();
398        let mut chain_builder = SmartModuleChainBuilder::default();
399        let metrics = SmartModuleChainMetrics::default();
400
401        chain_builder.add_smart_module(
402            SmartModuleConfig::builder()
403                .lookback(Some(Lookback::Last(1)))
404                .build()
405                .unwrap(),
406            read_wasm_module(SM_FILTER_LOOK_BACK),
407        );
408
409        let mut chain = chain_builder
410            .initialize(&engine)
411            .expect("failed to build chain");
412
413        // when
414        fluvio_future::task::run_block_on(chain.look_back(
415            |lookback| {
416                assert_eq!(lookback, Lookback::Last(1));
417                async { Ok(vec![Record::new("2")]) }
418            },
419            &metrics,
420        ))
421        .expect("chain look_back");
422
423        // then
424        let input = vec![Record::new("1"), Record::new("2"), Record::new("3")];
425        let output = chain
426            .process(
427                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
428                    .expect("input"),
429                &metrics,
430            )
431            .expect("process");
432        assert_eq!(output.successes.len(), 1); // one record passed
433        assert_eq!(output.successes[0].value().to_string(), "3");
434        assert!(metrics.fuel_used() > 0);
435        assert_eq!(metrics.invocation_count(), 2);
436    }
437
438    #[ignore]
439    #[test]
440    fn test_chain_filter_look_back_error_propagated() {
441        //given
442        let engine = SmartEngine::new();
443        let mut chain_builder = SmartModuleChainBuilder::default();
444        let metrics = SmartModuleChainMetrics::default();
445
446        chain_builder.add_smart_module(
447            SmartModuleConfig::builder()
448                .lookback(Some(Lookback::Last(1)))
449                .build()
450                .unwrap(),
451            read_wasm_module(SM_FILTER_LOOK_BACK),
452        );
453
454        let mut chain = chain_builder
455            .initialize(&engine)
456            .expect("failed to build chain");
457
458        // when
459        let res = fluvio_future::task::run_block_on(chain.look_back(
460            |lookback| {
461                assert_eq!(lookback, Lookback::Last(1));
462                async { Ok(vec![Record::new("wrong str")]) }
463            },
464            &metrics,
465        ));
466
467        // then
468        assert!(res.is_err());
469        assert_eq!(
470            res.unwrap_err()
471                .downcast::<SmartModuleLookbackRuntimeError>()
472                .expect("downcasted"),
473            SmartModuleLookbackRuntimeError {
474                hint: "invalid digit found in string".to_string(),
475                offset: 0,
476                record_key: None,
477                record_value: "wrong str".to_string().into()
478            }
479        );
480        assert!(metrics.fuel_used() > 0);
481        assert_eq!(metrics.invocation_count(), 1);
482    }
483
484    #[test]
485    fn test_empty_chain() {
486        //given
487        let engine = SmartEngine::new();
488        let chain_builder = SmartModuleChainBuilder::default();
489        let mut chain = chain_builder
490            .initialize(&engine)
491            .expect("failed to build chain");
492
493        assert_eq!(chain.store.get_used_fuel(), 0);
494
495        let record = vec![Record::new("input")];
496        let input = SmartModuleInput::try_from_records(record, DEFAULT_SMARTENGINE_VERSION)
497            .expect("valid input record");
498        let metrics = SmartModuleChainMetrics::default();
499        //when
500        let output = chain.process(input, &metrics).expect("process failed");
501
502        //then
503        assert_eq!(output.successes.len(), 1);
504        assert_eq!(output.successes[0].value().to_string(), "input");
505    }
506
507    #[ignore]
508    #[test]
509    fn test_unsufficient_memory_to_instantiate() {
510        //given
511        let engine = SmartEngine::new();
512        let mut chain_builder = SmartModuleChainBuilder::default();
513        let max_memory = 1_000; // 1 kb
514
515        chain_builder.add_smart_module(
516            SmartModuleConfig::builder()
517                .lookback(Some(Lookback::Last(1)))
518                .build()
519                .unwrap(),
520            read_wasm_module(SM_FILTER_LOOK_BACK),
521        );
522        chain_builder.set_store_memory_limit(max_memory);
523
524        // when
525        let res = chain_builder.initialize(&engine);
526
527        // then
528        assert!(res.is_err());
529        let err = res
530            .unwrap_err()
531            .downcast::<EngineError>()
532            .expect("EngineError expected");
533        assert!(matches!(
534            err,
535            EngineError::StoreMemoryExceeded {
536                current: _,
537                requested: _,
538                max
539            }
540            if max == max_memory
541        ))
542    }
543
544    #[ignore]
545    #[test]
546    fn test_look_back_unsufficient_memory() {
547        //given
548        let engine = SmartEngine::new();
549        let mut chain_builder = SmartModuleChainBuilder::default();
550        let metrics = SmartModuleChainMetrics::default();
551        let max_memory = 1_000_000 * 2; // 2mb
552
553        chain_builder.add_smart_module(
554            SmartModuleConfig::builder()
555                .lookback(Some(Lookback::Last(1000)))
556                .build()
557                .unwrap(),
558            read_wasm_module(SM_FILTER_LOOK_BACK),
559        );
560        chain_builder.set_store_memory_limit(max_memory);
561
562        let mut chain = chain_builder
563            .initialize(&engine)
564            .expect("failed to build chain");
565
566        // when
567        let res = fluvio_future::task::run_block_on(chain.look_back(
568            |_| {
569                let res = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
570                async { Ok(res) }
571            },
572            &metrics,
573        ));
574
575        // then
576        assert!(res.is_err());
577        let err = res
578            .unwrap_err()
579            .downcast::<EngineError>()
580            .expect("EngineError expected");
581        assert!(matches!(
582            err,
583            EngineError::StoreMemoryExceeded {
584                current: _,
585                requested: _,
586                max
587            }
588            if max == max_memory
589        ))
590    }
591
592    #[ignore]
593    #[test]
594    fn test_process_unsufficient_memory() {
595        //given
596        let engine = SmartEngine::new();
597        let mut chain_builder = SmartModuleChainBuilder::default();
598        let metrics = SmartModuleChainMetrics::default();
599        let max_memory = 1_000_000 * 2; // 2mb
600
601        chain_builder.add_smart_module(
602            SmartModuleConfig::builder().build().unwrap(),
603            read_wasm_module(SM_FILTER_LOOK_BACK),
604        );
605        chain_builder.set_store_memory_limit(max_memory);
606
607        let mut chain = chain_builder
608            .initialize(&engine)
609            .expect("failed to build chain");
610
611        // when
612        let input: Vec<Record> = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
613        let res = chain.process(
614            SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION).expect("input"),
615            &metrics,
616        );
617
618        // then
619        assert!(res.is_err());
620        let err = res
621            .unwrap_err()
622            .downcast::<EngineError>()
623            .expect("EngineError expected");
624        assert!(matches!(
625            err,
626            EngineError::StoreMemoryExceeded {
627                current: _,
628                requested: _,
629                max
630            }
631            if max == max_memory
632        ))
633    }
634}