fluvio_smartengine/engine/wasmtime/
engine.rs1use std::fmt::{self, Debug};
2use std::future::Future;
3use std::collections::HashMap;
4
5use anyhow::Result;
6use fluvio_smartmodule::Record;
7use tracing::debug;
8use wasmtime::{Engine, Module};
9
10use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};
11
12use crate::SmartModuleConfig;
13use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};
14
15use super::init::SmartModuleInit;
16use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};
17
18use super::limiter::StoreResourceLimiter;
19use super::look_back::SmartModuleLookBack;
20use super::metrics::SmartModuleChainMetrics;
21use super::state::WasmState;
22use super::transforms::create_transform;
23
/// Memory ceiling, in bytes, installed on the store limiter of a freshly
/// defaulted chain builder.
const DEFAULT_STORE_MEMORY_LIMIT: usize = 1_000_000_000;

/// Name attached to the trace events emitted around SmartModule calls.
const TTGT_SMARTMODULE_CALL: &str = "fluvio_smartengine::smartmodule::call";
29
30#[derive(Clone)]
31pub struct SmartEngine(Engine);
32
33#[allow(clippy::new_without_default)]
34impl SmartEngine {
35 pub fn new() -> Self {
36 let mut config = wasmtime::Config::default();
37 config.consume_fuel(true);
38 Self(Engine::new(&config).expect("Config is static"))
39 }
40
41 pub(crate) fn new_state(&self, store_limiter: StoreResourceLimiter) -> WasmState {
42 WasmState::new(&self.0, store_limiter)
43 }
44}
45
46impl Debug for SmartEngine {
47 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48 write!(f, "SmartModuleEngine")
49 }
50}
51
52pub struct SmartModuleChainBuilder {
54 smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
55 store_limiter: StoreResourceLimiter,
56}
57
58impl SmartModuleChainBuilder {
59 pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec<u8>) {
61 self.smart_modules.push((config, bytes))
62 }
63
64 pub fn set_store_memory_limit(&mut self, max_memory_bytes: usize) {
65 self.store_limiter.set_memory_size(max_memory_bytes);
66 }
67
68 pub fn initialize(self, engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
70 let mut instances = Vec::with_capacity(self.smart_modules.len());
71 let mut state = engine.new_state(self.store_limiter);
72 for (config, bytes) in self.smart_modules {
73 let module = Module::new(&engine.0, bytes)?;
74 let version = config.version();
75 let ctx = SmartModuleInstanceContext::instantiate(
76 &mut state,
77 module,
78 config.params,
79 version,
80 config.lookback,
81 &config.smartmodule_names,
82 )?;
83 let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
84 let look_back = SmartModuleLookBack::try_instantiate(&ctx, &mut state)?;
85 let transform = create_transform(&ctx, config.initial_data, &mut state)?;
86 let mut instance = SmartModuleInstance::new(ctx, init, look_back, transform, version);
87
88 instance.call_init(&mut state)?;
89 instances.push(instance);
90 }
91
92 Ok(SmartModuleChainInstance {
93 store: state,
94 instances,
95 })
96 }
97}
98
99impl Default for SmartModuleChainBuilder {
100 fn default() -> Self {
101 let mut store_limiter = StoreResourceLimiter::default();
102 store_limiter.set_memory_size(DEFAULT_STORE_MEMORY_LIMIT);
103 Self {
104 smart_modules: Default::default(),
105 store_limiter,
106 }
107 }
108}
109
110impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
111 fn from(pair: (SmartModuleConfig, T)) -> Self {
112 let mut result = Self::default();
113 result.add_smart_module(pair.0, pair.1.into());
114 result
115 }
116}
117
118pub struct SmartModuleChainInstance {
120 store: WasmState,
121 instances: Vec<SmartModuleInstance>,
122}
123
124impl Debug for SmartModuleChainInstance {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 write!(f, "SmartModuleChainInstance")
127 }
128}
129
130impl SmartModuleChainInstance {
131 #[cfg(test)]
132 pub(crate) fn instances(&self) -> &Vec<SmartModuleInstance> {
133 &self.instances
134 }
135
136 pub fn metrics_export(&self) -> HashMap<String, SmartModuleChainMetrics> {
138 let mut out = HashMap::<String, SmartModuleChainMetrics>::new();
139 for instance in self.instances.iter() {
140 let metrics = instance.metrics();
141 let num_modules = metrics.smartmodule_names().len();
144
145 let mfrac = SmartModuleChainMetrics::new(&[]);
147 mfrac.add_bytes_in(metrics.bytes_in() / num_modules as u64);
148 mfrac.add_records_out(metrics.records_out() / num_modules as u64);
149 mfrac.add_records_err(metrics.records_err() / num_modules as u64);
150 let frac_ms = metrics.cpu_ms() / num_modules as u64;
151 let frac_duration = std::time::Duration::from_millis(frac_ms);
152 mfrac.add_fuel_used(metrics.fuel_used() / num_modules as u64, frac_duration);
153 mfrac.add_invocation_count(metrics.invocation_count() / num_modules as u64);
154
155 for name in metrics.smartmodule_names() {
156 if let Some(existing_metrics) = out.get(name) {
159 existing_metrics.append(&mfrac);
160 } else {
161 let mfrac_w_name = SmartModuleChainMetrics::new(&[name.to_string()]);
162 mfrac_w_name.append(&mfrac);
163 out.insert(name.to_string(), mfrac_w_name);
164 }
165 }
166 }
167 self.metrics_reset();
168 out
169 }
170
171 pub fn metrics_reset(&self) {
172 for instance in self.instances.iter() {
173 instance.metrics().reset();
174 }
175 }
176
177 pub fn process(&mut self, input: SmartModuleInput) -> Result<SmartModuleOutput> {
182 let raw_len = input.raw_bytes().len();
183 tracing::trace!(target = TTGT_SMARTMODULE_CALL, raw_len, "sm raw input");
184
185 let base_offset = input.base_offset();
186 let base_timestamp = input.base_timestamp();
187
188 if let Some((last, instances)) = self.instances.split_last_mut() {
189 let mut next_input = input;
190
191 for instance in instances {
192 self.store.top_up_fuel();
193 let output = instance.process(next_input, &mut self.store)?;
194 if let Some(ref smerr) = output.error {
195 tracing::error!(err=?smerr);
197 return Ok(output);
198 } else {
199 next_input =
200 SmartModuleInput::try_from_records(output.successes, instance.version())?;
201 next_input.set_base_offset(base_offset);
202 next_input.set_base_timestamp(base_timestamp);
203 }
204 }
205
206 self.store.top_up_fuel();
207 let output = last.process(next_input, &mut self.store)?;
208 if let Some(ref smerr) = output.error {
209 tracing::error!(err=?smerr);
210 }
211 let records_out = output.successes.len();
212 tracing::trace!(
213 target = TTGT_SMARTMODULE_CALL,
214 records_out,
215 "sm records out"
216 );
217 Ok(output)
218 } else {
219 #[allow(deprecated)]
220 let records = input.try_into_records(DEFAULT_SMARTENGINE_VERSION)?;
221
222 Ok(SmartModuleOutput::new(records))
223 }
224 }
225
226 pub async fn look_back<F, R>(&mut self, read_fn: F) -> Result<()>
227 where
228 R: Future<Output = Result<Vec<Record>>>,
229 F: Fn(Lookback) -> R,
230 {
231 debug!("look_back on chain with {} instances", self.instances.len());
232
233 for instance in self.instances.iter_mut() {
234 let metrics = instance.metrics();
235 if let Some(lookback) = instance.lookback() {
236 debug!("look_back on instance");
237 let records: Vec<Record> = read_fn(lookback).await?;
238 let input: SmartModuleInput =
239 SmartModuleInput::try_from_records(records, instance.version())?;
240
241 let time = std::time::Instant::now();
242
243 metrics.add_bytes_in(input.raw_bytes().len() as u64);
244 self.store.top_up_fuel();
245
246 let result = instance.call_look_back(input, &mut self.store);
247 let fuel_used = self.store.get_used_fuel();
248
249 debug!(fuel_used, "fuel used");
250 metrics.add_fuel_used(fuel_used, time.elapsed());
251 metrics.add_invocation_count(1);
252 result?;
253 }
254 }
255
256 Ok(())
257 }
258}
259
#[cfg(test)]
mod test {

    use crate::SmartModuleConfig;

    /// A parameter set on the config builder is readable from the built config.
    #[test]
    fn test_param() {
        let names = ["test".to_string()];
        let config = SmartModuleConfig::builder()
            .smartmodule_names(&names)
            .param("key", "apple")
            .build()
            .unwrap();

        let expected = "apple".to_string();
        assert_eq!(config.params.get("key"), Some(&expected));
    }
}
276
#[cfg(test)]
mod chaining_test {

    use fluvio_protocol::record::Record;
    use fluvio_protocol::link::smartmodule::SmartModuleLookbackRuntimeError;
    use fluvio_smartmodule::dataplane::smartmodule::SmartModuleInput;

    use crate::engine::error::EngineError;
    use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};

    use super::super::{
        SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
    };

    use super::super::fixture::read_wasm_module;

    // Names of the WASM fixtures loaded through `read_wasm_module`.
    const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
    const SM_MAP: &str = "fluvio_smartmodule_map";
    const SM_FILTER_LOOK_BACK: &str = "fluvio_smartmodule_filter_lookback";
    const SM_AGGREGATE: &str = "fluvio_smartmodule_aggregate";

    /// Encodes `records` as a `SmartModuleInput` using the default engine version.
    fn input_of(records: Vec<Record>) -> SmartModuleInput {
        SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION).expect("input")
    }

    /// Asserts that `res` failed with `EngineError::StoreMemoryExceeded` whose
    /// `max` equals `expected_max`.
    fn assert_store_memory_exceeded<T: std::fmt::Debug>(
        res: anyhow::Result<T>,
        expected_max: usize,
    ) {
        assert!(res.is_err());
        let err = res
            .unwrap_err()
            .downcast::<EngineError>()
            .expect("EngineError expected");
        assert!(matches!(
            err,
            EngineError::StoreMemoryExceeded {
                current: _,
                requested: _,
                max
            }
            if max == expected_max
        ))
    }

    /// Filter followed by map: only matching records survive and are mapped.
    #[ignore]
    #[test]
    fn test_chain_filter_map() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();

        let (filter_name, filter_wasm) = read_wasm_module(SM_FILTER_INIT);
        let filter_config = SmartModuleConfig::builder()
            .smartmodule_names(&[filter_name])
            .param("key", "a")
            .build()
            .unwrap();
        chain_builder.add_smart_module(filter_config, filter_wasm);

        let (map_name, map_wasm) = read_wasm_module(SM_MAP);
        let map_config = SmartModuleConfig::builder()
            .smartmodule_names(&[map_name])
            .build()
            .unwrap();
        chain_builder.add_smart_module(map_config, map_wasm);

        let mut chain = chain_builder
            .initialize(&engine)
            .expect("failed to build chain");
        assert_eq!(chain.instances().len(), 2);

        let output = chain
            .process(input_of(vec![Record::new("hello world")]))
            .expect("process");
        assert_eq!(output.successes.len(), 0);

        let fruits = vec![
            Record::new("apple"),
            Record::new("fruit"),
            Record::new("banana"),
        ];
        let output = chain.process(input_of(fruits)).expect("process");
        assert_eq!(output.successes.len(), 2);
        assert_eq!(output.successes[0].value.as_ref(), b"APPLE");
        assert_eq!(output.successes[1].value.as_ref(), b"BANANA");

        let fuel_used: u64 = chain
            .instances()
            .iter()
            .map(|i| i.metrics().fuel_used())
            .sum();
        assert!(fuel_used > 0);

        chain.store.top_up_fuel();
        assert_eq!(chain.store.get_used_fuel(), 0);
    }

    /// Filter followed by aggregate: the accumulator persists across calls.
    #[ignore]
    #[test]
    fn test_chain_filter_aggregate() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();

        let (filter_name, filter_wasm) = read_wasm_module(SM_FILTER_INIT);
        let filter_config = SmartModuleConfig::builder()
            .smartmodule_names(&[filter_name])
            .param("key", "a")
            .build()
            .unwrap();
        chain_builder.add_smart_module(filter_config, filter_wasm);

        let (aggregate_name, aggregate_wasm) = read_wasm_module(SM_AGGREGATE);
        let aggregate_config = SmartModuleConfig::builder()
            .smartmodule_names(&[aggregate_name])
            .initial_data(SmartModuleInitialData::with_aggregate(
                "zero".to_string().as_bytes().to_vec(),
            ))
            .build()
            .unwrap();
        chain_builder.add_smart_module(aggregate_config, aggregate_wasm);

        let mut chain = chain_builder
            .initialize(&engine)
            .expect("failed to build chain");
        assert_eq!(chain.instances().len(), 2);

        let fruits = vec![
            Record::new("apple"),
            Record::new("fruit"),
            Record::new("banana"),
        ];
        let output = chain.process(input_of(fruits)).expect("process");
        assert_eq!(output.successes.len(), 2);
        assert_eq!(output.successes[0].value().to_string(), "zeroapple");
        assert_eq!(output.successes[1].value().to_string(), "zeroapplebanana");

        let output = chain
            .process(input_of(vec![Record::new("nothing")]))
            .expect("process");
        assert_eq!(output.successes.len(), 0);

        let output = chain
            .process(input_of(vec![Record::new("elephant")]))
            .expect("process");
        assert_eq!(output.successes.len(), 1);
        assert_eq!(
            output.successes[0].value().to_string(),
            "zeroapplebananaelephant"
        );
    }

    /// Look-back seeds the filter state before regular processing.
    #[ignore]
    #[test]
    fn test_chain_filter_look_back() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();

        let (sm_name, sm_wasm) = read_wasm_module(SM_FILTER_LOOK_BACK);
        let sm_config = SmartModuleConfig::builder()
            .smartmodule_names(&[sm_name])
            .lookback(Some(Lookback::Last(1)))
            .build()
            .unwrap();
        chain_builder.add_smart_module(sm_config, sm_wasm);

        let mut chain = chain_builder
            .initialize(&engine)
            .expect("failed to build chain");

        fluvio_future::task::run_block_on(chain.look_back(|lookback| {
            assert_eq!(lookback, Lookback::Last(1));
            async { Ok(vec![Record::new("2")]) }
        }))
        .expect("chain look_back");

        let numbers = vec![Record::new("1"), Record::new("2"), Record::new("3")];
        let output = chain.process(input_of(numbers)).expect("process");
        assert_eq!(output.successes.len(), 1);
        assert_eq!(output.successes[0].value().to_string(), "3");

        let metrics = chain.metrics_export();
        assert_eq!(metrics.len(), 1);
        let module_metrics = metrics.get(SM_FILTER_LOOK_BACK).expect("module metrics");
        assert!(module_metrics.fuel_used() > 0);
        assert_eq!(module_metrics.invocation_count(), 2);
    }

    /// A failing look-back call surfaces its runtime error and is still counted.
    #[ignore]
    #[test]
    fn test_chain_filter_look_back_error_propagated() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();

        let (sm_name, sm_wasm) = read_wasm_module(SM_FILTER_LOOK_BACK);
        let sm_config = SmartModuleConfig::builder()
            .smartmodule_names(&[sm_name])
            .lookback(Some(Lookback::Last(1)))
            .build()
            .unwrap();
        chain_builder.add_smart_module(sm_config, sm_wasm);

        let mut chain = chain_builder
            .initialize(&engine)
            .expect("failed to build chain");

        let res = fluvio_future::task::run_block_on(chain.look_back(|lookback| {
            assert_eq!(lookback, Lookback::Last(1));
            async { Ok(vec![Record::new("wrong str")]) }
        }));

        assert!(res.is_err());
        let actual = res
            .unwrap_err()
            .downcast::<SmartModuleLookbackRuntimeError>()
            .expect("downcasted");
        let expected = SmartModuleLookbackRuntimeError {
            hint: "invalid digit found in string".to_string(),
            offset: 0,
            record_key: None,
            record_value: "wrong str".to_string().into(),
        };
        assert_eq!(actual, expected);

        let sm_metrics = chain.metrics_export();
        let metrics = sm_metrics.get(SM_FILTER_LOOK_BACK).expect("module metrics");
        assert!(metrics.fuel_used() > 0);
        assert_eq!(metrics.invocation_count(), 1);
    }

    /// An empty chain returns its input records untouched and burns no fuel.
    #[test]
    fn test_empty_chain() {
        let engine = SmartEngine::new();
        let mut chain = SmartModuleChainBuilder::default()
            .initialize(&engine)
            .expect("failed to build chain");

        assert_eq!(chain.store.get_used_fuel(), 0);

        let records = vec![Record::new("input")];
        let input = SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION)
            .expect("valid input record");

        let output = chain.process(input).expect("process failed");

        assert_eq!(output.successes.len(), 1);
        assert_eq!(output.successes[0].value().to_string(), "input");
    }

    /// A store limit too small for the module makes `initialize` fail.
    #[ignore]
    #[test]
    fn test_unsufficient_memory_to_instantiate() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();
        let max_memory = 1_000;

        let (sm_name, sm_wasm) = read_wasm_module(SM_FILTER_LOOK_BACK);
        let sm_config = SmartModuleConfig::builder()
            .smartmodule_names(&[sm_name])
            .lookback(Some(Lookback::Last(1)))
            .build()
            .unwrap();
        chain_builder.add_smart_module(sm_config, sm_wasm);
        chain_builder.set_store_memory_limit(max_memory);

        let res = chain_builder.initialize(&engine);

        assert_store_memory_exceeded(res, max_memory);
    }

    /// A look-back payload that exceeds the store limit fails with a memory error.
    #[ignore]
    #[test]
    fn test_look_back_unsufficient_memory() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();
        let max_memory = 1_000_000 * 2;

        let (sm_name, sm_wasm) = read_wasm_module(SM_FILTER_LOOK_BACK);
        let sm_config = SmartModuleConfig::builder()
            .smartmodule_names(&[sm_name])
            .lookback(Some(Lookback::Last(1000)))
            .build()
            .unwrap();
        chain_builder.add_smart_module(sm_config, sm_wasm);
        chain_builder.set_store_memory_limit(max_memory);

        let mut chain = chain_builder
            .initialize(&engine)
            .expect("failed to build chain");

        let res = fluvio_future::task::run_block_on(chain.look_back(|_| {
            let payload = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
            async { Ok(payload) }
        }));

        assert_store_memory_exceeded(res, max_memory);
    }

    /// A process payload that exceeds the store limit fails with a memory error.
    #[ignore]
    #[test]
    fn test_process_unsufficient_memory() {
        let engine = SmartEngine::new();
        let mut chain_builder = SmartModuleChainBuilder::default();
        let max_memory = 1_000_000 * 2;

        let (sm_name, sm_wasm) = read_wasm_module(SM_FILTER_LOOK_BACK);
        let sm_config = SmartModuleConfig::builder()
            .smartmodule_names(&[sm_name])
            .build()
            .unwrap();
        chain_builder.add_smart_module(sm_config, sm_wasm);
        chain_builder.set_store_memory_limit(max_memory);

        let mut chain = chain_builder
            .initialize(&engine)
            .expect("failed to build chain");

        let payload: Vec<Record> = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
        let res = chain.process(input_of(payload));

        assert_store_memory_exceeded(res, max_memory);
    }
}