pub struct Transformer<I, O, E> { /* private fields */ }
An object that enables data mutation. After processing data, a Transformer can yield one of three variants:
- TransformerResult::Transformed - The data has been processed and can move along the rest of the Reflux network.
- TransformerResult::NeedsMoreWork - The data still needs additional processing. The data is sent back into the Transformer.
- TransformerResult::Error - There was an error in processing the data. This error is simply logged to stderr.
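The control flow implied by the three variants can be sketched without the crate itself. The following is a minimal, stable-Rust illustration only: `Outcome`, `step`, and `drive` are hypothetical names invented for this sketch and are not part of `reflux`, and the real Transformer runs its coroutine on a separate thread rather than in a simple loop.

```rust
// Hypothetical stand-in for the three outcomes described above.
// This is NOT the `reflux` type, only an illustration of the control flow.
enum Outcome<I, O, E> {
    Transformed(O),
    NeedsMoreWork(I),
    Error(E),
}

// One processing step (assumed logic): reject negatives, count up to 5.
fn step(value: i32) -> Outcome<i32, String, String> {
    if value < 0 {
        Outcome::Error(format!("negative input: {value}"))
    } else if value < 5 {
        Outcome::NeedsMoreWork(value + 1)
    } else {
        Outcome::Transformed(format!("The number is {value}"))
    }
}

// Drives one input to completion, feeding `NeedsMoreWork` values back in
// and logging errors to stderr.
fn drive(input: i32) -> Option<String> {
    let mut current = input;
    loop {
        match step(current) {
            Outcome::Transformed(out) => return Some(out),
            Outcome::NeedsMoreWork(next) => current = next,
            Outcome::Error(err) => {
                eprintln!("transformer error: {err}");
                return None;
            }
        }
    }
}

fn main() {
    assert_eq!(drive(0), Some("The number is 5".to_string()));
    assert_eq!(drive(-1), None);
}
```

The point of the sketch is the re-queue step: a `NeedsMoreWork` value becomes the next input to the same processing function, while only a `Transformed` value leaves the loop.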
Using a Transformer yields the following benefits:
- Mutation of data in a Reflux network.
§Example
#![feature(coroutines, coroutine_trait, stmt_expr_attributes)]
#![feature(unboxed_closures)]
use reflux::{Transformer, TransformerContext, TransformerResult};
use std::sync::{Arc, Mutex};
use std::cell::Cell;
use std::sync::atomic::{AtomicBool, Ordering};
use crossbeam_channel::{Receiver, Sender};
use reflux::add_routine;
use crossbeam_channel::unbounded;
use std::time::Duration;
use std::thread::sleep;
#[derive(Clone, Default)]
struct InnerContext {
    inc_val: i32
}
let ctx = InnerContext {
    inc_val: 1
};
let stop_flag = Arc::new(AtomicBool::new(false));
let (transformer, input, output, _): (Transformer<i32, String, String>, Sender<i32>, Receiver<String>, Receiver<String>) = Transformer::new(
    add_routine!(#[coroutine] |input: Arc<Mutex<Cell<TransformerContext<i32, InnerContext>>>>| {
        let data_cell = {
            input.lock().unwrap().take()
        };
        let mut data = data_cell.data.unwrap();
        while data < 5 {
            data += data_cell.globals.inc_val;
            yield TransformerResult::NeedsMoreWork(data);
        }
        yield TransformerResult::Completed(Some(format!("The number is {data}")));
    }), None, stop_flag.clone(), ctx, None);
input.send(0).unwrap();
let result = output.recv().unwrap();
stop_flag.store(true, Ordering::Relaxed);
transformer.join().unwrap();
assert_eq!(result, "The number is 5".to_string())
Implementations§
impl<I, O, E> Transformer<I, O, E>
pub fn new<Ctx, F, C>(
    transform_fn: F,
    pause_sig: Option<Arc<AtomicBool>>,
    stop_sig: Arc<AtomicBool>,
    context: Ctx,
    data_limit: Option<usize>,
) -> (Self, Sender<I>, Receiver<O>, Receiver<E>)
where
    F: Fn() -> C + Send + 'static,
    C: Coroutine<Arc<Mutex<Cell<TransformerContext<I, Ctx>>>>> + Send + 'static + Unpin,
    Ctx: Send + 'static + Clone,
    I: Send + 'static,
    O: Send + 'static,
    E: Send + 'static,
    TransformerResult<I, O, E>: Send + 'static + From<<C as Coroutine<Arc<Mutex<Cell<TransformerContext<I, Ctx>>>>>>::Yield>,
Creates a new Transformer object.
§Parameters
- transform_fn - A coroutine that will transform data. The use of the add_routine! macro is necessary when passing in a coroutine.
- pause_sig - A flag to signal the Transformer object to pause execution.
- stop_sig - A flag to signal the Transformer object to terminate execution.
- context - An object of immutable values for the transform_fn to use during computation.
§Returns
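A tuple of the Transformer object, a Sender<I> for feeding data in, a Receiver<O> for transformed output, and a Receiver<E>.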
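Since pause_sig and stop_sig are plain Arc<AtomicBool> flags, the way a worker thread might honor them can be sketched with the standard library alone. This is an assumption-laden illustration, not the crate's implementation: `spawn_worker`, `run_until_stopped`, and `run_paused` are hypothetical names, and the polling interval is arbitrary.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical worker: does one unit of "work" per iteration until `stop`
// is set, and idles while the optional `pause` flag is set.
fn spawn_worker(
    pause: Option<Arc<AtomicBool>>,
    stop: Arc<AtomicBool>,
    counter: Arc<AtomicUsize>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        while !stop.load(Ordering::Relaxed) {
            let paused = pause.as_ref().map_or(false, |p| p.load(Ordering::Relaxed));
            if !paused {
                counter.fetch_add(1, Ordering::Relaxed);
            }
            thread::sleep(Duration::from_millis(1));
        }
    })
}

// Runs a worker with no pause flag until it has made progress, then stops it.
fn run_until_stopped() -> usize {
    let stop = Arc::new(AtomicBool::new(false));
    let counter = Arc::new(AtomicUsize::new(0));
    let handle = spawn_worker(None, stop.clone(), counter.clone());
    while counter.load(Ordering::Relaxed) == 0 {
        thread::sleep(Duration::from_millis(1));
    }
    stop.store(true, Ordering::Relaxed);
    handle.join().unwrap();
    counter.load(Ordering::Relaxed)
}

// Runs a worker that is paused from the start; it should never do any work.
fn run_paused() -> usize {
    let pause = Arc::new(AtomicBool::new(true));
    let stop = Arc::new(AtomicBool::new(false));
    let counter = Arc::new(AtomicUsize::new(0));
    let handle = spawn_worker(Some(pause), stop.clone(), counter.clone());
    thread::sleep(Duration::from_millis(20));
    stop.store(true, Ordering::Relaxed);
    handle.join().unwrap();
    counter.load(Ordering::Relaxed)
}

fn main() {
    assert!(run_until_stopped() >= 1);
    assert_eq!(run_paused(), 0);
}
```

As in the example above, the caller keeps a clone of the stop flag, stores `true` into it, and then joins the worker so that shutdown is observed before the program continues.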
Auto Trait Implementations§
impl<I, O, E> Freeze for Transformer<I, O, E>
impl<I, O, E> !RefUnwindSafe for Transformer<I, O, E>
impl<I, O, E> Send for Transformer<I, O, E>
impl<I, O, E> Sync for Transformer<I, O, E>
impl<I, O, E> Unpin for Transformer<I, O, E>
impl<I, O, E> !UnwindSafe for Transformer<I, O, E>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.