#![allow(dead_code)]
#![feature(min_const_generics)]
mod consumer;
mod hashmap;
mod producer;
mod stack;
use consumer::Consumer;
use hashmap::*;
use crndm::default::*;
use producer::Producer;
use stack::*;
use std::env;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::thread;
// Pool/allocator type used throughout this program: `main` calls `P::open`,
// `P::transaction` and `P::used` through this alias.
type P = BuddyAlloc;
/// Prints the command-line usage summary (options and their defaults) to stdout.
///
/// `-p` takes a regular-expression pattern (it is wrapped in a capture group
/// by `main`), not a number, so the help text says `regex` for it.
fn help() {
    println!("usage: grep [OPTIONS] list-file");
    println!();
    println!("OPTIONS:");
    println!(" -p regex Search pattern (Default '\\w+')");
    println!(" -r num Number of reader threads (Default 1)");
    println!(" -c num Number of consumer threads (Default 1)");
    println!(" -f file Pool filename (Default ./wc.pool)");
    println!(" -C Continue from the previous run");
    println!(" -P Prepare only (do not run threads)");
    println!(" -h Display help");
    println!();
    println!("The input list-file should contain a list of files to read and count words.");
}
/// Program entry point.
///
/// 1. Parses command-line options (see `help`).
/// 2. Opens the persistent pool file (flags `O_CFNE | O_1GB`) with a `Root`
///    object holding the shared line stack, the word-count map, and the
///    persisted producer/consumer lists.
/// 3. Unless `-C` is given, clears the previous state inside a transaction and
///    creates at most `r` producers (each owning a contiguous slice of the
///    files named in the list-file) and `c` consumers.
/// 4. Unless `-P` is given, runs every producer and consumer on its own
///    thread, waits for the producers, asks the consumers to stop once the
///    work is drained, then prints the word counts sorted by word.
///
/// Invalid command-line input (missing option argument, non-integer count,
/// unknown option, unreadable list-file) aborts with a panic.
fn main() {
    let args: Vec<std::string::String> = env::args().collect();
    if args.len() < 2 {
        println!("usage: {} [OPTIONS] filename", args[0]);
        return;
    }
    let mut r = 1;
    let mut c = 1;
    let mut pool = "wc.pool".to_string();
    let mut filename = std::string::String::new();
    let mut i = 1;
    let mut cont = false;
    let mut prep = false;
    let mut pattern = "(\\w+)".to_string();
    while i < args.len() {
        let s = &args[i];
        if s == "-h" {
            help();
            return;
        } else if s == "-p" {
            if i == args.len() - 1 {
                panic!("-p requires an argument");
            }
            i += 1;
            pattern = format!("({})", args[i]);
        } else if s == "-r" {
            if i == args.len() - 1 {
                panic!("-r requires an argument");
            }
            i += 1;
            r = args[i].parse().expect("An integer expected");
            if r < 1 {
                panic!("Number of reader threads cannot be less than 1");
            }
        } else if s == "-c" {
            if i == args.len() - 1 {
                panic!("-c requires an argument");
            }
            i += 1;
            c = args[i].parse().expect("An integer expected");
            if c < 1 {
                panic!("Number of consumer threads cannot be less than 1");
            }
        } else if s == "-f" {
            if i == args.len() - 1 {
                panic!("-f requires an argument");
            }
            i += 1;
            pool = args[i].clone();
        } else if s == "-C" {
            cont = true;
        } else if s == "-P" {
            prep = true;
        } else if filename.is_empty() {
            filename = s.clone();
        } else {
            // Literal format string: `panic!(format!(..))` is the deprecated
            // non-literal panic form (a hard error in edition 2021).
            panic!("Unknown option `{}'", s);
        }
        i += 1;
    }

    // A fresh run (no -C) reads the list-file; without one, File::open("")
    // below would fail with an unhelpful "cannot open ``" panic.
    if !cont && filename.is_empty() {
        println!("usage: {} [OPTIONS] filename", args[0]);
        return;
    }

    struct Root {
        lines: Parc<PMutex<Stack<PString>>>,
        words: Parc<PMutex<HashMap<PString, u64>>>,
        producers: PRefCell<PVec<Parc<Producer>>>,
        consumers: PRefCell<PVec<Parc<Consumer>>>,
    }
    impl RootObj<P> for Root {
        fn init(j: &Journal) -> Self {
            Root {
                lines: Parc::new(PMutex::new(Stack::new(), j), j),
                words: Parc::new(PMutex::new(HashMap::new(j), j), j),
                producers: PRefCell::new(PVec::new(j), j),
                consumers: PRefCell::new(PVec::new(j), j),
            }
        }
    }

    let root = P::open::<Root>(&pool, O_CFNE | O_1GB).unwrap();
    P::transaction(|j| {
        let mut producers = root.producers.borrow_mut(j);
        let mut consumers = root.consumers.borrow_mut(j);
        if !cont {
            producers.clear();
            consumers.clear();
            root.lines.lock(j).clear();
            root.words.lock(j).clear(j);

            // Build the panic message only on failure (same text `expect` produced).
            let f = BufReader::new(
                File::open(&filename)
                    .unwrap_or_else(|e| panic!("cannot open `{}`: {:?}", &filename, e)),
            );
            let files: Vec<_> = f.lines().map(|line| line.unwrap()).collect();

            // Split the files into exactly min(r, len) non-empty contiguous
            // chunks whose sizes differ by at most one, so never more than `r`
            // reader threads are created. An empty list-file creates no
            // producers (and avoids a division by zero).
            if !files.is_empty() {
                let p = usize::min(r, files.len());
                let base = files.len() / p;
                let extra = files.len() % p;
                let mut start = 0;
                for k in 0..p {
                    let end = start + base + if k < extra { 1 } else { 0 };
                    producers.push(
                        Parc::new(
                            Producer::new(files[start..end].to_vec(), root.lines.pclone(j), j),
                            j,
                        ),
                        j,
                    );
                    start = end;
                }
            }

            for _ in 0..c {
                consumers.push(
                    Parc::new(
                        Consumer::new(&pattern, root.lines.pclone(j), root.words.pclone(j), j),
                        j,
                    ),
                    j,
                );
            }
        }
    })
    .unwrap();

    eprintln!(
        "Total remaining from previous run: {} ",
        P::transaction(|j| root.lines.lock(j).len()).unwrap()
    );

    if !prep {
        let producers = root.producers.borrow();
        let consumers = root.consumers.borrow();

        let mut p_threads = vec![];
        for producer in &*producers {
            let producer = producer.volatile();
            p_threads.push(thread::spawn(move || Producer::start(producer)));
        }
        let mut c_threads = vec![];
        for consumer in &*consumers {
            let consumer = consumer.volatile();
            c_threads.push(thread::spawn(move || Consumer::start(consumer)));
        }

        for thread in p_threads {
            thread.join().unwrap();
        }
        // All producers are done; let consumers exit once the work is drained.
        for consumer in &*consumers {
            consumer.stop_when_finished();
        }
        for thread in c_threads {
            thread.join().unwrap();
        }

        P::transaction(|j| {
            let words = root.words.lock(j);
            let mut vec = vec![];
            words.foreach(|word, freq| {
                vec.push((word.to_string(), freq.clone()));
            });
            vec.sort_by(|x, y| x.0.cmp(&y.0));
            for (word, freq) in vec {
                println!("{:>32}: {}", word, freq);
            }
        })
        .unwrap();
    }
    println!("Memory usage = {} bytes", P::used());
}