pub fn new_lambda_channel<T: Send + 'static, U: Send + 'static, V: Clone + Send + 'static>(
input_capacity: Option<usize>,
output_capacity: Option<usize>,
shared_resource: V,
function: fn(&V, T) -> U,
) -> (Sender<T>, Receiver<U>, ThreadPool)Expand description
Creates two normal channels and connects them to a new ThreadPool to create a
multi-producer multi-consumer multi-threaded lambda-channel.
ยงExamples
use crossbeam_channel::RecvError;
use lambda_channel::new_lambda_channel;
fn fib(_: &Option<()>, n: i32) -> u64 {
if n <= 1 {
n as u64
} else {
fib(&None, n - 1) + fib(&None, n - 2)
}
}
let (s, r, _p) = new_lambda_channel(None, None, None, fib);
s.send(20).unwrap();
assert_eq!(r.recv(), Ok(6765));
s.send(10).unwrap();
drop(s);
assert_eq!(r.recv(), Ok(55));
assert_eq!(r.recv(), Err(RecvError));Examples found in repository?
examples/channel_example.rs (line 55)
41fn main() {
42 let clock = quanta::Clock::new();
43 let start = clock.now();
44
45 let mut map: HashMap<char, AtomicU64> = HashMap::new();
46 let mut all_alphanumeric: Vec<char> = Vec::new();
47 all_alphanumeric.extend('0'..='9');
48 all_alphanumeric.extend('a'..='z');
49 all_alphanumeric.extend('A'..='Z');
50 for char in all_alphanumeric {
51 map.insert(char, AtomicU64::new(0));
52 }
53 let char_counts = Arc::new(map);
54
55 let (tx, rx, thread_pool) = new_lambda_channel(None, None, char_counts.clone(), process_file);
56 thread_pool.set_pool_size(4).unwrap();
57
58 let files = vec![
59 "./a.txt", "./b.txt", "./c.txt", "./d.txt", "./e.txt", "./f.txt",
60 ];
61
62 thread::spawn(move || {
63 for file in files {
64 tx.send(file).unwrap();
65 }
66 });
67
68 while let Ok(msg) = rx.recv() {
69 if let Err(e) = msg {
70 println!("Failed to open file: {}", e);
71 }
72 }
73
74 let mut total_counts: HashMap<char, u64> = HashMap::new();
75 for (k, v) in char_counts.iter() {
76 total_counts.insert(*k, v.load(Ordering::Relaxed));
77 }
78
79 println!("Execution Time: {:?}", start.elapsed());
80 println!("{:?}", total_counts);
81}