channel_example/
channel_example.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::{BufRead, BufReader};
4use std::path::Path;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7use std::{char, thread};
8
9use lambda_channel::new_lambda_channel;
10
/// Counts the ASCII alphanumeric characters in the file at `filepath` and adds
/// the per-character totals to the shared `total_counter`.
///
/// The file is scanned as raw bytes rather than as UTF-8 lines, so content
/// that is not valid UTF-8 no longer cuts the count short. Every ASCII
/// alphanumeric is a single byte, and the bytes of multi-byte UTF-8 sequences
/// are never ASCII, so valid UTF-8 files yield the same counts as a
/// character-wise scan.
///
/// Counts are accumulated locally and published with one `fetch_add` per
/// distinct character seen. Characters without an entry in `total_counter`
/// are ignored.
///
/// # Errors
///
/// Returns `Err` holding `filepath` when the file cannot be opened. A read
/// error after a successful open ends the scan early: whatever was counted up
/// to that point is still published and `Ok(())` is returned.
fn process_file(
    total_counter: &Arc<HashMap<char, AtomicU64>>,
    filepath: &str,
) -> Result<(), String> {
    let file = File::open(Path::new(filepath)).map_err(|_| filepath.to_string())?;
    let mut reader = BufReader::new(file);

    // One slot per ASCII code point; only alphanumeric slots are ever touched.
    let mut local_counter = [0u64; 128];
    // Reused for every line so the scan allocates once, not once per line.
    let mut buf: Vec<u8> = Vec::new();

    loop {
        buf.clear();
        match reader.read_until(b'\n', &mut buf) {
            // End of file, or a read error: stop and publish what we have.
            Ok(0) | Err(_) => break,
            Ok(_) => {
                for &byte in &buf {
                    if byte.is_ascii_alphanumeric() {
                        local_counter[usize::from(byte)] += 1;
                    }
                }
            }
        }
    }

    // Touch the shared atomics only for characters that actually occurred.
    for (byte, &count) in (0u8..).zip(local_counter.iter()) {
        if count == 0 {
            continue;
        }
        if let Some(atomic_v) = total_counter.get(&(byte as char)) {
            atomic_v.fetch_add(count, Ordering::Relaxed);
        }
    }

    Ok(())
}
40
41fn main() {
42    let clock = quanta::Clock::new();
43    let start = clock.now();
44
45    let mut map: HashMap<char, AtomicU64> = HashMap::new();
46    let mut all_alphanumeric: Vec<char> = Vec::new();
47    all_alphanumeric.extend('0'..='9');
48    all_alphanumeric.extend('a'..='z');
49    all_alphanumeric.extend('A'..='Z');
50    for char in all_alphanumeric {
51        map.insert(char, AtomicU64::new(0));
52    }
53    let char_counts = Arc::new(map);
54
55    let (tx, rx, thread_pool) = new_lambda_channel(None, None, char_counts.clone(), process_file);
56    thread_pool.set_pool_size(4).unwrap();
57
58    let files = vec![
59        "./a.txt", "./b.txt", "./c.txt", "./d.txt", "./e.txt", "./f.txt",
60    ];
61
62    thread::spawn(move || {
63        for file in files {
64            tx.send(file).unwrap();
65        }
66    });
67
68    while let Ok(msg) = rx.recv() {
69        if let Err(e) = msg {
70            println!("Failed to open file: {}", e);
71        }
72    }
73
74    let mut total_counts: HashMap<char, u64> = HashMap::new();
75    for (k, v) in char_counts.iter() {
76        total_counts.insert(*k, v.load(Ordering::Relaxed));
77    }
78
79    println!("Execution Time: {:?}", start.elapsed());
80    println!("{:?}", total_counts);
81}