new_lambda_channel

Function new_lambda_channel 

Source
pub fn new_lambda_channel<T: Send + 'static, U: Send + 'static, V: Clone + Send + 'static>(
    input_capacity: Option<usize>,
    output_capacity: Option<usize>,
    shared_resource: V,
    function: fn(&V, T) -> U,
) -> (Sender<T>, Receiver<U>, ThreadPool)
Expand description

Creates two normal channels and connects them to a new ThreadPool to create a multi-producer multi-consumer multi-threaded lambda-channel.

§Examples

use crossbeam_channel::RecvError;
use lambda_channel::new_lambda_channel;

/// Naive recursive Fibonacci, used as the worker function of the lambda-channel.
///
/// The first argument is the channel's shared resource; this function never
/// reads it. Any `n <= 1` is returned as-is through an `as u64` cast, larger
/// `n` is the sum of the two preceding values.
fn fib(_: &Option<()>, n: i32) -> u64 {
    match n {
        // Base case: hand the input straight back, cast to the return type.
        base if base <= 1 => base as u64,
        _ => {
            let one_back = fib(&None, n - 1);
            let two_back = fib(&None, n - 2);
            one_back + two_back
        }
    }
}

// Wire `fib` between an input and an output channel. Both capacities are
// passed as `None`, and the shared resource is `None` because `fib` ignores
// its first argument. The returned `ThreadPool` is bound to `_p`.
let (s, r, _p) = new_lambda_channel(None, None, None, fib);
// Send an input, then read the computed result from the output side.
s.send(20).unwrap();
assert_eq!(r.recv(), Ok(6765));

// Queue one more input, then drop the sender.
s.send(10).unwrap();
drop(s);
// The value sent before the drop is still delivered...
assert_eq!(r.recv(), Ok(55));
// ...and the next receive reports `RecvError` instead of a value.
assert_eq!(r.recv(), Err(RecvError));
Examples found in repository?
examples/channel_example.rs (line 55)
41fn main() {
42    let clock = quanta::Clock::new();
43    let start = clock.now();
44
45    let mut map: HashMap<char, AtomicU64> = HashMap::new();
46    let mut all_alphanumeric: Vec<char> = Vec::new();
47    all_alphanumeric.extend('0'..='9');
48    all_alphanumeric.extend('a'..='z');
49    all_alphanumeric.extend('A'..='Z');
50    for char in all_alphanumeric {
51        map.insert(char, AtomicU64::new(0));
52    }
53    let char_counts = Arc::new(map);
54
55    let (tx, rx, thread_pool) = new_lambda_channel(None, None, char_counts.clone(), process_file);
56    thread_pool.set_pool_size(4).unwrap();
57
58    let files = vec![
59        "./a.txt", "./b.txt", "./c.txt", "./d.txt", "./e.txt", "./f.txt",
60    ];
61
62    thread::spawn(move || {
63        for file in files {
64            tx.send(file).unwrap();
65        }
66    });
67
68    while let Ok(msg) = rx.recv() {
69        if let Err(e) = msg {
70            println!("Failed to open file: {}", e);
71        }
72    }
73
74    let mut total_counts: HashMap<char, u64> = HashMap::new();
75    for (k, v) in char_counts.iter() {
76        total_counts.insert(*k, v.load(Ordering::Relaxed));
77    }
78
79    println!("Execution Time: {:?}", start.elapsed());
80    println!("{:?}", total_counts);
81}