channel_example/
channel_example.rs1use std::collections::HashMap;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4use std::path::Path;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::{char, thread};
8
9use lambda_channel::new_lambda_channel;
10
11fn process_file(
12 total_counter: &Arc<HashMap<char, AtomicU64>>,
13 filepath: &str,
14) -> Result<(), String> {
15 let path = Path::new(filepath);
16 if let Ok(file) = File::open(path) {
17 let lines = BufReader::new(file).lines();
18
19 let mut local_counter: HashMap<char, u64> = HashMap::new();
20
21 for line in lines.map_while(Result::ok) {
22 for c in line.chars() {
23 if c.is_ascii_alphanumeric() {
24 *local_counter.entry(c).or_insert(0) += 1;
25 }
26 }
27 }
28
29 for (k, v) in local_counter {
30 if let Some(atomic_v) = total_counter.get(&k) {
31 atomic_v.fetch_add(v, Ordering::Relaxed);
32 }
33 }
34
35 Ok(())
36 } else {
37 Err(filepath.to_string())
38 }
39}
40
41fn main() {
42 let clock = quanta::Clock::new();
43 let start = clock.now();
44
45 let mut map: HashMap<char, AtomicU64> = HashMap::new();
46 let mut all_alphanumeric: Vec<char> = Vec::new();
47 all_alphanumeric.extend('0'..='9');
48 all_alphanumeric.extend('a'..='z');
49 all_alphanumeric.extend('A'..='Z');
50 for char in all_alphanumeric {
51 map.insert(char, AtomicU64::new(0));
52 }
53 let char_counts = Arc::new(map);
54
55 let (tx, rx, thread_pool) = new_lambda_channel(None, None, char_counts.clone(), process_file);
56 thread_pool.set_pool_size(4).unwrap();
57
58 let files = vec![
59 "./a.txt", "./b.txt", "./c.txt", "./d.txt", "./e.txt", "./f.txt",
60 ];
61
62 thread::spawn(move || {
63 for file in files {
64 tx.send(file).unwrap();
65 }
66 });
67
68 while let Ok(msg) = rx.recv() {
69 if let Err(e) = msg {
70 println!("Failed to open file: {}", e);
71 }
72 }
73
74 let mut total_counts: HashMap<char, u64> = HashMap::new();
75 for (k, v) in char_counts.iter() {
76 total_counts.insert(*k, v.load(Ordering::Relaxed));
77 }
78
79 println!("Execution Time: {:?}", start.elapsed());
80 println!("{:?}", total_counts);
81}