fluvio_smartengine/engine/wasmtime/
engine.rs

1use std::fmt::{self, Debug};
2use std::future::Future;
3use std::collections::HashMap;
4
5use anyhow::Result;
6use fluvio_smartmodule::Record;
7use tracing::debug;
8use wasmtime::{Engine, Module};
9
10use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};
11
12use crate::SmartModuleConfig;
13use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};
14
15use super::init::SmartModuleInit;
16use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};
17
18use super::limiter::StoreResourceLimiter;
19use super::look_back::SmartModuleLookBack;
20use super::metrics::SmartModuleChainMetrics;
21use super::state::WasmState;
22use super::transforms::create_transform;
23
24// 1 GB
25const DEFAULT_STORE_MEMORY_LIMIT: usize = 1_000_000_000;
26
27// tracing target
28const TTGT_SMARTMODULE_CALL: &str = "fluvio_smartengine::smartmodule::call";
29
30#[derive(Clone)]
31pub struct SmartEngine(Engine);
32
33#[allow(clippy::new_without_default)]
34impl SmartEngine {
35    pub fn new() -> Self {
36        let mut config = wasmtime::Config::default();
37        config.consume_fuel(true);
38        Self(Engine::new(&config).expect("Config is static"))
39    }
40
41    pub(crate) fn new_state(&self, store_limiter: StoreResourceLimiter) -> WasmState {
42        WasmState::new(&self.0, store_limiter)
43    }
44}
45
46impl Debug for SmartEngine {
47    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48        write!(f, "SmartModuleEngine")
49    }
50}
51
52/// Building SmartModule
53pub struct SmartModuleChainBuilder {
54    smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
55    store_limiter: StoreResourceLimiter,
56}
57
58impl SmartModuleChainBuilder {
59    /// Add SmartModule with a single transform and init
60    pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec<u8>) {
61        self.smart_modules.push((config, bytes))
62    }
63
64    pub fn set_store_memory_limit(&mut self, max_memory_bytes: usize) {
65        self.store_limiter.set_memory_size(max_memory_bytes);
66    }
67
68    /// stop adding smartmodule and return SmartModuleChain that can be executed
69    pub fn initialize(self, engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
70        let mut instances = Vec::with_capacity(self.smart_modules.len());
71        let mut state = engine.new_state(self.store_limiter);
72        for (config, bytes) in self.smart_modules {
73            let module = Module::new(&engine.0, bytes)?;
74            let version = config.version();
75            let ctx = SmartModuleInstanceContext::instantiate(
76                &mut state,
77                module,
78                config.params,
79                version,
80                config.lookback,
81                &config.smartmodule_names,
82            )?;
83            let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
84            let look_back = SmartModuleLookBack::try_instantiate(&ctx, &mut state)?;
85            let transform = create_transform(&ctx, config.initial_data, &mut state)?;
86            let mut instance = SmartModuleInstance::new(ctx, init, look_back, transform, version);
87
88            instance.call_init(&mut state)?;
89            instances.push(instance);
90        }
91
92        Ok(SmartModuleChainInstance {
93            store: state,
94            instances,
95        })
96    }
97}
98
99impl Default for SmartModuleChainBuilder {
100    fn default() -> Self {
101        let mut store_limiter = StoreResourceLimiter::default();
102        store_limiter.set_memory_size(DEFAULT_STORE_MEMORY_LIMIT);
103        Self {
104            smart_modules: Default::default(),
105            store_limiter,
106        }
107    }
108}
109
110impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
111    fn from(pair: (SmartModuleConfig, T)) -> Self {
112        let mut result = Self::default();
113        result.add_smart_module(pair.0, pair.1.into());
114        result
115    }
116}
117
118/// SmartModule Chain Instance that can be executed
119pub struct SmartModuleChainInstance {
120    store: WasmState,
121    instances: Vec<SmartModuleInstance>,
122}
123
124impl Debug for SmartModuleChainInstance {
125    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126        write!(f, "SmartModuleChainInstance")
127    }
128}
129
130impl SmartModuleChainInstance {
131    #[cfg(test)]
132    pub(crate) fn instances(&self) -> &Vec<SmartModuleInstance> {
133        &self.instances
134    }
135
136    /// split the metrics among each smartmodule in the chain export
137    pub fn metrics_export(&self) -> HashMap<String, SmartModuleChainMetrics> {
138        let mut out = HashMap::<String, SmartModuleChainMetrics>::new();
139        for instance in self.instances.iter() {
140            let metrics = instance.metrics();
141            // depending on the number of smartmodules, split the metrics among each
142            // smartmodule in the output
143            let num_modules = metrics.smartmodule_names().len();
144
145            // fractional metric
146            let mfrac = SmartModuleChainMetrics::new(&[]);
147            mfrac.add_bytes_in(metrics.bytes_in() / num_modules as u64);
148            mfrac.add_records_out(metrics.records_out() / num_modules as u64);
149            mfrac.add_records_err(metrics.records_err() / num_modules as u64);
150            let frac_ms = metrics.cpu_ms() / num_modules as u64;
151            let frac_duration = std::time::Duration::from_millis(frac_ms);
152            mfrac.add_fuel_used(metrics.fuel_used() / num_modules as u64, frac_duration);
153            mfrac.add_invocation_count(metrics.invocation_count() / num_modules as u64);
154
155            for name in metrics.smartmodule_names() {
156                // if the name exists in the output, add the metrics to it
157                // otherwise, create a new entry
158                if let Some(existing_metrics) = out.get(name) {
159                    existing_metrics.append(&mfrac);
160                } else {
161                    let mfrac_w_name = SmartModuleChainMetrics::new(&[name.to_string()]);
162                    mfrac_w_name.append(&mfrac);
163                    out.insert(name.to_string(), mfrac_w_name);
164                }
165            }
166        }
167        self.metrics_reset();
168        out
169    }
170
171    pub fn metrics_reset(&self) {
172        for instance in self.instances.iter() {
173            instance.metrics().reset();
174        }
175    }
176
177    /// A single record is processed thru all smartmodules in the chain.
178    /// The output of one smartmodule is the input of the next smartmodule.
179    /// A single record may result in multiple records.
180    /// The output of the last smartmodule is added to the output of the chain.
181    pub fn process(&mut self, input: SmartModuleInput) -> Result<SmartModuleOutput> {
182        let raw_len = input.raw_bytes().len();
183        tracing::trace!(target = TTGT_SMARTMODULE_CALL, raw_len, "sm raw input");
184
185        let base_offset = input.base_offset();
186        let base_timestamp = input.base_timestamp();
187
188        if let Some((last, instances)) = self.instances.split_last_mut() {
189            let mut next_input = input;
190
191            for instance in instances {
192                self.store.top_up_fuel();
193                let output = instance.process(next_input, &mut self.store)?;
194                if let Some(ref smerr) = output.error {
195                    // encountered error, we stop processing and return partial output
196                    tracing::error!(err=?smerr);
197                    return Ok(output);
198                } else {
199                    next_input =
200                        SmartModuleInput::try_from_records(output.successes, instance.version())?;
201                    next_input.set_base_offset(base_offset);
202                    next_input.set_base_timestamp(base_timestamp);
203                }
204            }
205
206            self.store.top_up_fuel();
207            let output = last.process(next_input, &mut self.store)?;
208            if let Some(ref smerr) = output.error {
209                tracing::error!(err=?smerr);
210            }
211            let records_out = output.successes.len();
212            tracing::trace!(
213                target = TTGT_SMARTMODULE_CALL,
214                records_out,
215                "sm records out"
216            );
217            Ok(output)
218        } else {
219            #[allow(deprecated)]
220            let records = input.try_into_records(DEFAULT_SMARTENGINE_VERSION)?;
221
222            Ok(SmartModuleOutput::new(records))
223        }
224    }
225
226    pub async fn look_back<F, R>(&mut self, read_fn: F) -> Result<()>
227    where
228        R: Future<Output = Result<Vec<Record>>>,
229        F: Fn(Lookback) -> R,
230    {
231        debug!("look_back on chain with {} instances", self.instances.len());
232
233        for instance in self.instances.iter_mut() {
234            let metrics = instance.metrics();
235            if let Some(lookback) = instance.lookback() {
236                debug!("look_back on instance");
237                let records: Vec<Record> = read_fn(lookback).await?;
238                let input: SmartModuleInput =
239                    SmartModuleInput::try_from_records(records, instance.version())?;
240
241                let time = std::time::Instant::now();
242
243                metrics.add_bytes_in(input.raw_bytes().len() as u64);
244                self.store.top_up_fuel();
245
246                let result = instance.call_look_back(input, &mut self.store);
247                let fuel_used = self.store.get_used_fuel();
248
249                debug!(fuel_used, "fuel used");
250                metrics.add_fuel_used(fuel_used, time.elapsed());
251                metrics.add_invocation_count(1);
252                result?;
253            }
254        }
255
256        Ok(())
257    }
258}
259
260#[cfg(test)]
261mod test {
262
263    use crate::SmartModuleConfig;
264
265    #[test]
266    fn test_param() {
267        let config = SmartModuleConfig::builder()
268            .smartmodule_names(&["test".to_string()])
269            .param("key", "apple")
270            .build()
271            .unwrap();
272
273        assert_eq!(config.params.get("key"), Some(&"apple".to_string()));
274    }
275}
276
277#[cfg(test)]
278mod chaining_test {
279
280    use fluvio_protocol::record::Record;
281    use fluvio_protocol::link::smartmodule::SmartModuleLookbackRuntimeError;
282    use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;
283
284    use crate::engine::error::EngineError;
285    use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};
286
287    use super::super::{
288        SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
289    };
290
291    const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
292    const SM_MAP: &str = "fluvio_smartmodule_map";
293    const SM_FILTER_LOOK_BACK: &str = "fluvio_smartmodule_filter_lookback";
294
295    use super::super::fixture::read_wasm_module;
296
297    #[ignore]
298    #[test]
299    fn test_chain_filter_map() {
300        let engine = SmartEngine::new();
301        let mut chain_builder = SmartModuleChainBuilder::default();
302
303        let sm = read_wasm_module(SM_FILTER_INIT);
304        chain_builder.add_smart_module(
305            SmartModuleConfig::builder()
306                .smartmodule_names(&[sm.0])
307                .param("key", "a")
308                .build()
309                .unwrap(),
310            sm.1,
311        );
312
313        let sm = read_wasm_module(SM_MAP);
314        chain_builder.add_smart_module(
315            SmartModuleConfig::builder()
316                .smartmodule_names(&[sm.0])
317                .build()
318                .unwrap(),
319            sm.1,
320        );
321
322        let mut chain = chain_builder
323            .initialize(&engine)
324            .expect("failed to build chain");
325        assert_eq!(chain.instances().len(), 2);
326
327        let input = vec![Record::new("hello world")];
328        let output = chain
329            .process(
330                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
331                    .expect("input"),
332            )
333            .expect("process");
334        assert_eq!(output.successes.len(), 0); // no records passed
335
336        let input = vec![
337            Record::new("apple"),
338            Record::new("fruit"),
339            Record::new("banana"),
340        ];
341        let output = chain
342            .process(
343                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
344                    .expect("input"),
345            )
346            .expect("process");
347        assert_eq!(output.successes.len(), 2); // one record passed
348        assert_eq!(output.successes[0].value.as_ref(), b"APPLE");
349        assert_eq!(output.successes[1].value.as_ref(), b"BANANA");
350        let fuel_used = chain
351            .instances()
352            .iter()
353            .map(|i| i.metrics().fuel_used())
354            .sum::<u64>();
355        assert!(fuel_used > 0);
356        chain.store.top_up_fuel();
357        assert_eq!(chain.store.get_used_fuel(), 0);
358    }
359
360    const SM_AGGEGRATE: &str = "fluvio_smartmodule_aggregate";
361
362    #[ignore]
363    #[test]
364    fn test_chain_filter_aggregate() {
365        let engine = SmartEngine::new();
366        let mut chain_builder = SmartModuleChainBuilder::default();
367
368        let sm = read_wasm_module(SM_FILTER_INIT);
369        chain_builder.add_smart_module(
370            SmartModuleConfig::builder()
371                .smartmodule_names(&[sm.0])
372                .param("key", "a")
373                .build()
374                .unwrap(),
375            sm.1,
376        );
377
378        let sm = read_wasm_module(SM_AGGEGRATE);
379        chain_builder.add_smart_module(
380            SmartModuleConfig::builder()
381                .smartmodule_names(&[sm.0])
382                .initial_data(SmartModuleInitialData::with_aggregate(
383                    "zero".to_string().as_bytes().to_vec(),
384                ))
385                .build()
386                .unwrap(),
387            sm.1,
388        );
389
390        let mut chain = chain_builder
391            .initialize(&engine)
392            .expect("failed to build chain");
393        assert_eq!(chain.instances().len(), 2);
394
395        // when
396        let input = vec![
397            Record::new("apple"),
398            Record::new("fruit"),
399            Record::new("banana"),
400        ];
401        let output = chain
402            .process(
403                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
404                    .expect("input"),
405            )
406            .expect("process");
407        assert_eq!(output.successes.len(), 2); // one record passed
408        assert_eq!(output.successes[0].value().to_string(), "zeroapple");
409        assert_eq!(output.successes[1].value().to_string(), "zeroapplebanana");
410
411        let input = vec![Record::new("nothing")];
412        let output = chain
413            .process(
414                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
415                    .expect("input"),
416            )
417            .expect("process");
418        assert_eq!(output.successes.len(), 0); // one record passed
419
420        let input = vec![Record::new("elephant")];
421        let output = chain
422            .process(
423                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
424                    .expect("input"),
425            )
426            .expect("process");
427        assert_eq!(output.successes.len(), 1); // one record passed
428        assert_eq!(
429            output.successes[0].value().to_string(),
430            "zeroapplebananaelephant"
431        );
432    }
433
434    #[ignore]
435    #[test]
436    fn test_chain_filter_look_back() {
437        //given
438        let engine = SmartEngine::new();
439        let mut chain_builder = SmartModuleChainBuilder::default();
440
441        let sm = read_wasm_module(SM_FILTER_LOOK_BACK);
442        chain_builder.add_smart_module(
443            SmartModuleConfig::builder()
444                .smartmodule_names(&[sm.0])
445                .lookback(Some(Lookback::Last(1)))
446                .build()
447                .unwrap(),
448            sm.1,
449        );
450
451        let mut chain = chain_builder
452            .initialize(&engine)
453            .expect("failed to build chain");
454
455        // when
456        fluvio_future::task::run_block_on(chain.look_back(|lookback| {
457            assert_eq!(lookback, Lookback::Last(1));
458            async { Ok(vec![Record::new("2")]) }
459        }))
460        .expect("chain look_back");
461
462        // then
463        let input = vec![Record::new("1"), Record::new("2"), Record::new("3")];
464        let output = chain
465            .process(
466                SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION)
467                    .expect("input"),
468            )
469            .expect("process");
470        assert_eq!(output.successes.len(), 1); // one record passed
471        assert_eq!(output.successes[0].value().to_string(), "3");
472        let metrics = chain.metrics_export();
473        assert_eq!(metrics.len(), 1);
474        let module_metrics = metrics
475            .get("fluvio_smartmodule_filter_lookback")
476            .expect("module metrics");
477        assert!(module_metrics.fuel_used() > 0);
478        assert_eq!(module_metrics.invocation_count(), 2);
479    }
480
481    #[ignore]
482    #[test]
483    fn test_chain_filter_look_back_error_propagated() {
484        //given
485        let engine = SmartEngine::new();
486        let mut chain_builder = SmartModuleChainBuilder::default();
487
488        let sm = read_wasm_module(SM_FILTER_LOOK_BACK);
489        chain_builder.add_smart_module(
490            SmartModuleConfig::builder()
491                .smartmodule_names(&[sm.0])
492                .lookback(Some(Lookback::Last(1)))
493                .build()
494                .unwrap(),
495            sm.1,
496        );
497
498        let mut chain = chain_builder
499            .initialize(&engine)
500            .expect("failed to build chain");
501
502        // when
503        let res = fluvio_future::task::run_block_on(chain.look_back(|lookback| {
504            assert_eq!(lookback, Lookback::Last(1));
505            async { Ok(vec![Record::new("wrong str")]) }
506        }));
507
508        // then
509        assert!(res.is_err());
510        assert_eq!(
511            res.unwrap_err()
512                .downcast::<SmartModuleLookbackRuntimeError>()
513                .expect("downcasted"),
514            SmartModuleLookbackRuntimeError {
515                hint: "invalid digit found in string".to_string(),
516                offset: 0,
517                record_key: None,
518                record_value: "wrong str".to_string().into()
519            }
520        );
521        let sm_metrics = chain.metrics_export();
522        let metrics = sm_metrics.get(SM_FILTER_LOOK_BACK).expect("module metrics");
523        assert!(metrics.fuel_used() > 0);
524        assert_eq!(metrics.invocation_count(), 1);
525    }
526
527    #[test]
528    fn test_empty_chain() {
529        //given
530        let engine = SmartEngine::new();
531        let chain_builder = SmartModuleChainBuilder::default();
532        let mut chain = chain_builder
533            .initialize(&engine)
534            .expect("failed to build chain");
535
536        assert_eq!(chain.store.get_used_fuel(), 0);
537
538        let record = vec![Record::new("input")];
539        let input = SmartModuleInput::try_from_records(record, DEFAULT_SMARTENGINE_VERSION)
540            .expect("valid input record");
541
542        //when
543        let output = chain.process(input).expect("process failed");
544
545        //then
546        assert_eq!(output.successes.len(), 1);
547        assert_eq!(output.successes[0].value().to_string(), "input");
548    }
549
550    #[ignore]
551    #[test]
552    fn test_unsufficient_memory_to_instantiate() {
553        //given
554        let engine = SmartEngine::new();
555        let mut chain_builder = SmartModuleChainBuilder::default();
556        let max_memory = 1_000; // 1 kb
557
558        let sm = read_wasm_module(SM_FILTER_LOOK_BACK);
559        chain_builder.add_smart_module(
560            SmartModuleConfig::builder()
561                .smartmodule_names(&[sm.0])
562                .lookback(Some(Lookback::Last(1)))
563                .build()
564                .unwrap(),
565            sm.1,
566        );
567        chain_builder.set_store_memory_limit(max_memory);
568
569        let res = chain_builder.initialize(&engine);
570
571        // then
572        assert!(res.is_err());
573        let err = res
574            .unwrap_err()
575            .downcast::<EngineError>()
576            .expect("EngineError expected");
577        assert!(matches!(
578            err,
579            EngineError::StoreMemoryExceeded {
580                current: _,
581                requested: _,
582                max
583            }
584            if max == max_memory
585        ))
586    }
587
588    #[ignore]
589    #[test]
590    fn test_look_back_unsufficient_memory() {
591        //given
592        let engine = SmartEngine::new();
593        let mut chain_builder = SmartModuleChainBuilder::default();
594        let max_memory = 1_000_000 * 2; // 2mb
595
596        let sm = read_wasm_module(SM_FILTER_LOOK_BACK);
597        chain_builder.add_smart_module(
598            SmartModuleConfig::builder()
599                .smartmodule_names(&[sm.0])
600                .lookback(Some(Lookback::Last(1000)))
601                .build()
602                .unwrap(),
603            sm.1,
604        );
605        chain_builder.set_store_memory_limit(max_memory);
606
607        let mut chain = chain_builder
608            .initialize(&engine)
609            .expect("failed to build chain");
610
611        // when
612        let res = fluvio_future::task::run_block_on(chain.look_back(|_| {
613            let res = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
614            async { Ok(res) }
615        }));
616
617        // then
618        assert!(res.is_err());
619        let err = res
620            .unwrap_err()
621            .downcast::<EngineError>()
622            .expect("EngineError expected");
623        assert!(matches!(
624            err,
625            EngineError::StoreMemoryExceeded {
626                current: _,
627                requested: _,
628                max
629            }
630            if max == max_memory
631        ))
632    }
633
634    #[ignore]
635    #[test]
636    fn test_process_unsufficient_memory() {
637        //given
638        let engine = SmartEngine::new();
639        let mut chain_builder = SmartModuleChainBuilder::default();
640        let max_memory = 1_000_000 * 2; // 2mb
641
642        let sm = read_wasm_module(SM_FILTER_LOOK_BACK);
643        chain_builder.add_smart_module(
644            SmartModuleConfig::builder()
645                .smartmodule_names(&[sm.0])
646                .build()
647                .unwrap(),
648            sm.1,
649        );
650        chain_builder.set_store_memory_limit(max_memory);
651
652        let mut chain = chain_builder
653            .initialize(&engine)
654            .expect("failed to build chain");
655
656        // when
657        let input: Vec<Record> = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
658        let res = chain.process(
659            SmartModuleInput::try_from_records(input, DEFAULT_SMARTENGINE_VERSION).expect("input"),
660        );
661
662        // then
663        assert!(res.is_err());
664        let err = res
665            .unwrap_err()
666            .downcast::<EngineError>()
667            .expect("EngineError expected");
668        assert!(matches!(
669            err,
670            EngineError::StoreMemoryExceeded {
671                current: _,
672                requested: _,
673                max
674            }
675            if max == max_memory
676        ))
677    }
678}