Function desync::pipe::pipe

source ·
pub fn pipe<Core, S, Output, OutputErr, ProcessFn>(
    desync: Arc<Desync<Core>>,
    stream: S,
    process: ProcessFn
) -> PipeStream<Output, OutputErr>where
    Core: 'static + Send,
    S: 'static + Send + Stream,
    S::Item: Send,
    S::Error: Send,
    Output: 'static + Send,
    OutputErr: 'static + Send,
    ProcessFn: 'static + Send + FnMut(&mut Core, Result<S::Item, S::Error>) -> Result<Output, OutputErr>,
Expand description

Pipes a stream into this object. Whenever an item becomes available on the stream, the processing function is called asynchronously with the item that was received. The return value is placed onto the output stream.

Unlike pipe_in, this keeps a strong reference to the Desync object so the processing will continue so long as the input stream has data and the output stream is not dropped.

The input stream will start executing and reading values immediately when this is called. Dropping the output stream will cause the pipe to be closed (the input stream will be dropped and no further processing will occur).

This example demonstrates how to create a simple demonstration pipe that takes hashset values and returns a stream indicating whether or not they were already included:

use futures::sync::mpsc;
use futures::executor;
 
let desync_hashset      = Arc::new(Desync::new(HashSet::new()));
let (sender, receiver)  = mpsc::channel::<String>(5);
 
let value_inserted = pipe(Arc::clone(&desync_hashset), receiver, 
    |hashset, value| { value.map(|value| (value.clone(), hashset.insert(value))) });
 
let mut sender = executor::spawn(sender);
sender.wait_send("Test".to_string());
sender.wait_send("Another value".to_string());
sender.wait_send("Test".to_string());
 
let mut value_inserted = executor::spawn(value_inserted);
assert!(value_inserted.wait_stream() == Some(Ok(("Test".to_string(), true))));
assert!(value_inserted.wait_stream() == Some(Ok(("Another value".to_string(), true))));
assert!(value_inserted.wait_stream() == Some(Ok(("Test".to_string(), false))));