1use std::fmt::{self, Debug};
2use std::future::Future;
3
4use anyhow::Result;
5use fluvio_smartmodule::Record;
6use tracing::debug;
7use wasmtime::{Engine, Module};
8
9use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};
10
11use crate::SmartModuleConfig;
12use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};
13
14use super::init::SmartModuleInit;
15use super::instance::{SmartModuleInstance, SmartModuleInstanceContext};
16
17use super::limiter::StoreResourceLimiter;
18use super::look_back::SmartModuleLookBack;
19use super::metrics::SmartModuleChainMetrics;
20use super::state::WasmState;
21use super::transforms::create_transform;
22
/// Memory limit, in bytes, that a [`SmartModuleChainBuilder`] applies to its store
/// limiter unless `set_store_memory_limit` overrides it (10^9 bytes, i.e. 1 GB).
const DEFAULT_STORE_MEMORY_LIMIT: usize = 1_000_000_000;
25
26#[derive(Clone)]
27pub struct SmartEngine(Engine);
28
29#[allow(clippy::new_without_default)]
30impl SmartEngine {
31 pub fn new() -> Self {
32 let mut config = wasmtime::Config::default();
33 config.consume_fuel(true);
34 Self(Engine::new(&config).expect("Config is static"))
35 }
36
37 pub(crate) fn new_state(&self, store_limiter: StoreResourceLimiter) -> WasmState {
38 WasmState::new(&self.0, store_limiter)
39 }
40}
41
42impl Debug for SmartEngine {
43 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
44 write!(f, "SmartModuleEngine")
45 }
46}
47
48pub struct SmartModuleChainBuilder {
50 smart_modules: Vec<(SmartModuleConfig, Vec<u8>)>,
51 store_limiter: StoreResourceLimiter,
52}
53
54impl SmartModuleChainBuilder {
55 pub fn add_smart_module(&mut self, config: SmartModuleConfig, bytes: Vec<u8>) {
57 self.smart_modules.push((config, bytes))
58 }
59
60 pub fn set_store_memory_limit(&mut self, max_memory_bytes: usize) {
61 self.store_limiter.set_memory_size(max_memory_bytes);
62 }
63
64 pub fn initialize(self, engine: &SmartEngine) -> Result<SmartModuleChainInstance> {
66 let mut instances = Vec::with_capacity(self.smart_modules.len());
67 let mut state = engine.new_state(self.store_limiter);
68 for (config, bytes) in self.smart_modules {
69 let module = Module::new(&engine.0, bytes)?;
70 let version = config.version();
71 let ctx = SmartModuleInstanceContext::instantiate(
72 &mut state,
73 module,
74 config.params,
75 version,
76 config.lookback,
77 )?;
78 let init = SmartModuleInit::try_instantiate(&ctx, &mut state)?;
79 let look_back = SmartModuleLookBack::try_instantiate(&ctx, &mut state)?;
80 let transform = create_transform(&ctx, config.initial_data, &mut state)?;
81 let mut instance = SmartModuleInstance::new(ctx, init, look_back, transform, version);
82
83 instance.call_init(&mut state)?;
84 instances.push(instance);
85 }
86
87 Ok(SmartModuleChainInstance {
88 store: state,
89 instances,
90 })
91 }
92}
93
94impl Default for SmartModuleChainBuilder {
95 fn default() -> Self {
96 let mut store_limiter = StoreResourceLimiter::default();
97 store_limiter.set_memory_size(DEFAULT_STORE_MEMORY_LIMIT);
98 Self {
99 smart_modules: Default::default(),
100 store_limiter,
101 }
102 }
103}
104
105impl<T: Into<Vec<u8>>> From<(SmartModuleConfig, T)> for SmartModuleChainBuilder {
106 fn from(pair: (SmartModuleConfig, T)) -> Self {
107 let mut result = Self::default();
108 result.add_smart_module(pair.0, pair.1.into());
109 result
110 }
111}
112
113pub struct SmartModuleChainInstance {
115 store: WasmState,
116 instances: Vec<SmartModuleInstance>,
117}
118
119impl Debug for SmartModuleChainInstance {
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 write!(f, "SmartModuleChainInstance")
122 }
123}
124
125impl SmartModuleChainInstance {
126 #[cfg(test)]
127 pub(crate) fn instances(&self) -> &Vec<SmartModuleInstance> {
128 &self.instances
129 }
130
131 pub fn process(
136 &mut self,
137 input: SmartModuleInput,
138 metric: &SmartModuleChainMetrics,
139 ) -> Result<SmartModuleOutput> {
140 let raw_len = input.raw_bytes().len();
141 debug!(raw_len, "sm raw input");
142 metric.add_bytes_in(raw_len as u64);
143
144 let base_offset = input.base_offset();
145 let base_timestamp = input.base_timestamp();
146
147 if let Some((last, instances)) = self.instances.split_last_mut() {
148 let mut next_input = input;
149
150 for instance in instances {
151 let time = std::time::Instant::now();
154 self.store.top_up_fuel();
155 let output = instance.process(next_input, &mut self.store)?;
156 let fuel_used = self.store.get_used_fuel();
157 debug!(fuel_used, "fuel used");
158 metric.add_fuel_used(fuel_used, time.elapsed());
159
160 if let Some(ref smerr) = output.error {
161 tracing::error!(err=?smerr);
163 return Ok(output);
164 } else {
165 next_input =
166 SmartModuleInput::try_from_records(output.successes, instance.version())?;
167 next_input.set_base_offset(base_offset);
168 next_input.set_base_timestamp(base_timestamp);
169 }
170 }
171
172 let time = std::time::Instant::now();
173 self.store.top_up_fuel();
174 let output = last.process(next_input, &mut self.store)?;
175 if let Some(ref smerr) = output.error {
176 tracing::error!(err=?smerr);
177 }
178 let fuel_used = self.store.get_used_fuel();
179 debug!(fuel_used, "fuel used");
180 metric.add_fuel_used(fuel_used, time.elapsed());
181 let records_out = output.successes.len();
182 metric.add_records_out(records_out as u64);
183 debug!(records_out, "sm records out");
184 Ok(output)
185 } else {
186 #[allow(deprecated)]
187 let records = input.try_into_records(DEFAULT_SMARTENGINE_VERSION)?;
188
189 Ok(SmartModuleOutput::new(records))
190 }
191 }
192
193 pub async fn look_back<F, R>(
194 &mut self,
195 read_fn: F,
196 metrics: &SmartModuleChainMetrics,
197 ) -> Result<()>
198 where
199 R: Future<Output = Result<Vec<Record>>>,
200 F: Fn(Lookback) -> R,
201 {
202 debug!("look_back on chain with {} instances", self.instances.len());
203
204 for instance in self.instances.iter_mut() {
205 if let Some(lookback) = instance.lookback() {
206 debug!("look_back on instance");
207 let records: Vec<Record> = read_fn(lookback).await?;
208 let input: SmartModuleInput =
209 SmartModuleInput::try_from_records(records, instance.version())?;
210
211 let time = std::time::Instant::now();
212 metrics.add_bytes_in(input.raw_bytes().len() as u64);
213 self.store.top_up_fuel();
214
215 let result = instance.call_look_back(input, &mut self.store);
216 let fuel_used = self.store.get_used_fuel();
217
218 debug!(fuel_used, "fuel used");
219 metrics.add_fuel_used(fuel_used, time.elapsed());
220 result?;
221 }
222 }
223
224 Ok(())
225 }
226}
227
#[cfg(test)]
mod test {

    use crate::SmartModuleConfig;

    /// A parameter set on the config builder can be read back from the built config.
    #[test]
    fn test_param() {
        let built = SmartModuleConfig::builder()
            .param("key", "apple")
            .build()
            .unwrap();

        let expected = "apple".to_string();
        assert_eq!(built.params.get("key"), Some(&expected));
    }
}
243
#[cfg(test)]
mod chaining_test {

    use fluvio_protocol::record::Record;
    use fluvio_protocol::link::smartmodule::SmartModuleLookbackRuntimeError;
    use fluvio_smartmodule::dataplane::smartmodule::{SmartModuleInput, SmartModuleOutput};

    use crate::engine::error::EngineError;
    use crate::engine::config::{Lookback, DEFAULT_SMARTENGINE_VERSION};

    use super::SmartModuleChainInstance;
    use super::super::{
        SmartEngine, SmartModuleChainBuilder, SmartModuleConfig, SmartModuleInitialData,
        metrics::SmartModuleChainMetrics,
    };
    use super::super::fixture::read_wasm_module;

    const SM_FILTER_INIT: &str = "fluvio_smartmodule_filter_init";
    const SM_MAP: &str = "fluvio_smartmodule_map";
    const SM_FILTER_LOOK_BACK: &str = "fluvio_smartmodule_filter_lookback";
    const SM_AGGREGATE: &str = "fluvio_smartmodule_aggregate";

    /// Encodes `records` at the default engine version and runs them through
    /// `chain`, panicking if either step fails.
    fn process_records(
        chain: &mut SmartModuleChainInstance,
        records: Vec<Record>,
        metrics: &SmartModuleChainMetrics,
    ) -> SmartModuleOutput {
        let input = SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION)
            .expect("input");
        chain.process(input, metrics).expect("process")
    }

    /// Asserts that `err` is an `EngineError::StoreMemoryExceeded` whose `max`
    /// equals `max_memory`.
    fn assert_store_memory_exceeded(err: anyhow::Error, max_memory: usize) {
        let err = err
            .downcast::<EngineError>()
            .expect("EngineError expected");
        assert!(matches!(
            err,
            EngineError::StoreMemoryExceeded { max, .. } if max == max_memory
        ))
    }

    #[ignore]
    #[test]
    fn test_chain_filter_map() {
        let engine = SmartEngine::new();
        let metrics = SmartModuleChainMetrics::default();
        let mut builder = SmartModuleChainBuilder::default();

        builder.add_smart_module(
            SmartModuleConfig::builder()
                .param("key", "a")
                .build()
                .unwrap(),
            read_wasm_module(SM_FILTER_INIT),
        );
        builder.add_smart_module(
            SmartModuleConfig::builder().build().unwrap(),
            read_wasm_module(SM_MAP),
        );

        let mut chain = builder
            .initialize(&engine)
            .expect("failed to build chain");
        assert_eq!(chain.instances().len(), 2);

        // First batch: nothing is expected to come out of the chain.
        let output = process_records(&mut chain, vec![Record::new("hello world")], &metrics);
        assert_eq!(output.successes.len(), 0);

        // Second batch: two of the three records are expected, upper-cased.
        let batch = vec![
            Record::new("apple"),
            Record::new("fruit"),
            Record::new("banana"),
        ];
        let output = process_records(&mut chain, batch, &metrics);
        assert_eq!(output.successes.len(), 2);
        assert_eq!(output.successes[0].value.as_ref(), b"APPLE");
        assert_eq!(output.successes[1].value.as_ref(), b"BANANA");
        assert!(metrics.fuel_used() > 0);

        // Topping up must bring the used-fuel reading back to zero.
        chain.store.top_up_fuel();
        assert_eq!(chain.store.get_used_fuel(), 0);
    }

    #[ignore]
    #[test]
    fn test_chain_filter_aggregate() {
        let engine = SmartEngine::new();
        let metrics = SmartModuleChainMetrics::default();
        let mut builder = SmartModuleChainBuilder::default();

        builder.add_smart_module(
            SmartModuleConfig::builder()
                .param("key", "a")
                .build()
                .unwrap(),
            read_wasm_module(SM_FILTER_INIT),
        );
        builder.add_smart_module(
            SmartModuleConfig::builder()
                .initial_data(SmartModuleInitialData::with_aggregate(b"zero".to_vec()))
                .build()
                .unwrap(),
            read_wasm_module(SM_AGGREGATE),
        );

        let mut chain = builder
            .initialize(&engine)
            .expect("failed to build chain");
        assert_eq!(chain.instances().len(), 2);

        let batch = vec![
            Record::new("apple"),
            Record::new("fruit"),
            Record::new("banana"),
        ];
        let output = process_records(&mut chain, batch, &metrics);
        assert_eq!(output.successes.len(), 2);
        assert_eq!(output.successes[0].value().to_string(), "zeroapple");
        assert_eq!(output.successes[1].value().to_string(), "zeroapplebanana");

        let output = process_records(&mut chain, vec![Record::new("nothing")], &metrics);
        assert_eq!(output.successes.len(), 0);

        // The accumulated value must carry over from the earlier batches.
        let output = process_records(&mut chain, vec![Record::new("elephant")], &metrics);
        assert_eq!(output.successes.len(), 1);
        assert_eq!(
            output.successes[0].value().to_string(),
            "zeroapplebananaelephant"
        );
    }

    #[ignore]
    #[test]
    fn test_chain_filter_look_back() {
        let engine = SmartEngine::new();
        let metrics = SmartModuleChainMetrics::default();
        let mut builder = SmartModuleChainBuilder::default();

        builder.add_smart_module(
            SmartModuleConfig::builder()
                .lookback(Some(Lookback::Last(1)))
                .build()
                .unwrap(),
            read_wasm_module(SM_FILTER_LOOK_BACK),
        );

        let mut chain = builder
            .initialize(&engine)
            .expect("failed to build chain");

        let look_back = chain.look_back(
            |lookback| {
                assert_eq!(lookback, Lookback::Last(1));
                async { Ok(vec![Record::new("2")]) }
            },
            &metrics,
        );
        fluvio_future::task::run_block_on(look_back).expect("chain look_back");

        let batch = vec![Record::new("1"), Record::new("2"), Record::new("3")];
        let output = process_records(&mut chain, batch, &metrics);
        assert_eq!(output.successes.len(), 1);
        assert_eq!(output.successes[0].value().to_string(), "3");
        assert!(metrics.fuel_used() > 0);
        // One look-back call plus one process call.
        assert_eq!(metrics.invocation_count(), 2);
    }

    #[ignore]
    #[test]
    fn test_chain_filter_look_back_error_propagated() {
        let engine = SmartEngine::new();
        let metrics = SmartModuleChainMetrics::default();
        let mut builder = SmartModuleChainBuilder::default();

        builder.add_smart_module(
            SmartModuleConfig::builder()
                .lookback(Some(Lookback::Last(1)))
                .build()
                .unwrap(),
            read_wasm_module(SM_FILTER_LOOK_BACK),
        );

        let mut chain = builder
            .initialize(&engine)
            .expect("failed to build chain");

        let look_back = chain.look_back(
            |lookback| {
                assert_eq!(lookback, Lookback::Last(1));
                async { Ok(vec![Record::new("wrong str")]) }
            },
            &metrics,
        );
        let res = fluvio_future::task::run_block_on(look_back);

        assert!(res.is_err());
        let actual = res
            .unwrap_err()
            .downcast::<SmartModuleLookbackRuntimeError>()
            .expect("downcasted");
        let expected = SmartModuleLookbackRuntimeError {
            hint: "invalid digit found in string".to_string(),
            offset: 0,
            record_key: None,
            record_value: "wrong str".to_string().into(),
        };
        assert_eq!(actual, expected);

        // The failed look-back call must still be accounted for.
        assert!(metrics.fuel_used() > 0);
        assert_eq!(metrics.invocation_count(), 1);
    }

    #[test]
    fn test_empty_chain() {
        let engine = SmartEngine::new();
        let mut chain = SmartModuleChainBuilder::default()
            .initialize(&engine)
            .expect("failed to build chain");

        assert_eq!(chain.store.get_used_fuel(), 0);

        let records = vec![Record::new("input")];
        let input = SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION)
            .expect("valid input record");
        let metrics = SmartModuleChainMetrics::default();
        let output = chain.process(input, &metrics).expect("process failed");

        // With no SmartModule in the chain the record passes through unchanged.
        assert_eq!(output.successes.len(), 1);
        assert_eq!(output.successes[0].value().to_string(), "input");
    }

    #[ignore]
    #[test]
    fn test_unsufficient_memory_to_instantiate() {
        let engine = SmartEngine::new();
        let mut builder = SmartModuleChainBuilder::default();
        let max_memory = 1_000;

        builder.add_smart_module(
            SmartModuleConfig::builder()
                .lookback(Some(Lookback::Last(1)))
                .build()
                .unwrap(),
            read_wasm_module(SM_FILTER_LOOK_BACK),
        );
        builder.set_store_memory_limit(max_memory);

        let res = builder.initialize(&engine);

        assert!(res.is_err());
        assert_store_memory_exceeded(res.unwrap_err(), max_memory);
    }

    #[ignore]
    #[test]
    fn test_look_back_unsufficient_memory() {
        let engine = SmartEngine::new();
        let metrics = SmartModuleChainMetrics::default();
        let mut builder = SmartModuleChainBuilder::default();
        let max_memory = 2_000_000;

        builder.add_smart_module(
            SmartModuleConfig::builder()
                .lookback(Some(Lookback::Last(1000)))
                .build()
                .unwrap(),
            read_wasm_module(SM_FILTER_LOOK_BACK),
        );
        builder.set_store_memory_limit(max_memory);

        let mut chain = builder
            .initialize(&engine)
            .expect("failed to build chain");

        let look_back = chain.look_back(
            |_| {
                // 1000 records of 1000 zero bytes each.
                let records = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
                async { Ok(records) }
            },
            &metrics,
        );
        let res = fluvio_future::task::run_block_on(look_back);

        assert!(res.is_err());
        assert_store_memory_exceeded(res.unwrap_err(), max_memory);
    }

    #[ignore]
    #[test]
    fn test_process_unsufficient_memory() {
        let engine = SmartEngine::new();
        let metrics = SmartModuleChainMetrics::default();
        let mut builder = SmartModuleChainBuilder::default();
        let max_memory = 2_000_000;

        builder.add_smart_module(
            SmartModuleConfig::builder().build().unwrap(),
            read_wasm_module(SM_FILTER_LOOK_BACK),
        );
        builder.set_store_memory_limit(max_memory);

        let mut chain = builder
            .initialize(&engine)
            .expect("failed to build chain");

        // 1000 records of 1000 zero bytes each.
        let records: Vec<Record> = (0..1000).map(|_| Record::new([0u8; 1_000])).collect();
        let input = SmartModuleInput::try_from_records(records, DEFAULT_SMARTENGINE_VERSION)
            .expect("input");
        let res = chain.process(input, &metrics);

        assert!(res.is_err());
        assert_store_memory_exceeded(res.unwrap_err(), max_memory);
    }
}