use crossbeam::channel;
use hyperminhash::Sketch;
use std::{
env, io, process,
sync::{Arc, Mutex, atomic},
thread,
};
/// A lock-protected value shared between threads; used for each worker's state.
type Synced<T> = Arc<Mutex<T>>;
/// What a worker thread returns: its shared state on success, or the first error its
/// closure produced.
type AsyncResult<T, E> = Result<Synced<T>, E>;
/// A pool of worker threads that each fold items received over a channel into their own state.
#[derive(Debug)]
struct AsyncSink<T, E> {
    // One entry per worker: its shared state (read by `inspect`) and its thread handle
    // (consumed by `join`).
    t: Vec<(Synced<T>, thread::JoinHandle<AsyncResult<T, E>>)>,
}
impl<T, E> AsyncSink<T, E>
where
T: Send + 'static,
E: Send + 'static,
{
fn new<U, V, I>(num_threads: usize, init: U, f: V) -> (channel::Sender<I>, Self)
where
U: Send + Fn() -> T + 'static,
V: Send + Clone + Fn(&mut T, I) -> Result<(), E> + 'static,
I: Send + 'static,
{
assert!(num_threads > 0);
let (s, recv) = channel::bounded(num_threads + 1);
let t = (0..num_threads)
.map(|_| {
let state = Arc::new(Mutex::new(init()));
let t_state = state.clone();
let t_recv = recv.clone();
let t_f = f.clone();
(
state,
thread::spawn(move || {
for item in t_recv {
t_f(&mut t_state.lock().unwrap(), item)?;
}
Ok(t_state)
}),
)
})
.collect();
(s, AsyncSink { t })
}
pub fn with_default<V, I>(f: V) -> (channel::Sender<I>, Self)
where
V: Send + Clone + Fn(&mut T, I) -> Result<(), E> + 'static,
T: Default,
I: Send + 'static,
{
Self::new(8, Default::default, f)
}
pub fn join(self) -> thread::Result<Result<Vec<T>, E>> {
self.t
.into_iter()
.map(|t| t.1.join())
.map(|r| {
r.map(|sk| match sk {
Ok(sk) => match Arc::try_unwrap(sk) {
Ok(sk) => Ok(sk.into_inner().unwrap()),
Err(_) => unreachable!(),
},
Err(e) => Err(e),
})
})
.collect()
}
pub fn inspect<A, B>(&self, state: B, f: A) -> B
where
A: Fn(B, &T) -> B,
{
self.t
.iter()
.fold(state, |st, t| f(st, &t.0.lock().unwrap()))
}
}
/// Iterator returned by [`LineBuffered::line_buffered`].
///
/// Each item is one chunk produced by [`LineBuffered::read_lines`]. No line is split
/// across two items. Iteration ends when the reader reports end of input.
#[derive(Debug)]
struct LineBuf<B> {
    inner: B,
}

impl<B: LineBuffered> Iterator for LineBuf<B> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        // A fresh buffer per chunk: each item is handed out by value.
        let mut chunk = Vec::new();
        self.inner
            .read_lines(&mut chunk)
            .map(|n| if n == 0 { None } else { Some(chunk) })
            .transpose()
    }
}

/// Extension of [`io::BufRead`] for reading input in large, line-aligned chunks.
trait LineBuffered: io::BufRead {
    /// Appends the next chunk of input to `buf` and returns how many bytes were appended.
    ///
    /// The chunk is whatever the reader currently has buffered (filling the buffer first
    /// if it is empty). It is followed by the bytes up to and including the next `\n`. A
    /// chunk therefore ends on a line boundary unless the input ends first. `Ok(0)` means
    /// end of input.
    ///
    /// # Errors
    ///
    /// Propagates any error from `fill_buf` or `read_until`.
    fn read_lines(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
        let head_len = {
            let available = self.fill_buf()?;
            if available.is_empty() {
                return Ok(0);
            }
            // Extra headroom, presumably for the rest of the line appended below.
            buf.reserve(available.len() + 128);
            buf.extend_from_slice(available);
            available.len()
        };
        self.consume(head_len);
        let tail_len = self.read_until(b'\n', buf)?;
        Ok(head_len + tail_len)
    }

    /// Wraps this reader in an iterator over its line-aligned chunks.
    fn line_buffered(self) -> LineBuf<Self>
    where
        Self: Sized,
    {
        LineBuf { inner: self }
    }
}

// Every buffered reader gets the chunking methods for free.
impl<R> LineBuffered for R where R: io::BufRead {}
/// Splits `inp` into lines terminated by `\n`.
///
/// The `\n` is removed, together with one `\r` directly before it. A final line without a
/// `\n` is yielded unchanged, including any trailing `\r`. Empty input yields nothing, and
/// a trailing `\n` does not produce an extra empty line.
pub fn byte_lines(inp: &[u8]) -> impl Iterator<Item = &[u8]> {
    let mut rest = inp;
    std::iter::from_fn(move || {
        if rest.is_empty() {
            return None;
        }
        // Take everything up to and including the next `\n`, or all that is left.
        let cut = match memchr::memchr(b'\n', rest) {
            Some(pos) => pos + 1,
            None => rest.len(),
        };
        let (line, tail) = rest.split_at(cut);
        rest = tail;
        let line = match line.split_last() {
            Some((&b'\n', without_lf)) => match without_lf.split_last() {
                Some((&b'\r', without_crlf)) => without_crlf,
                _ => without_lf,
            },
            _ => line,
        };
        Some(line)
    })
}
/// Splits `inp` into lines using the same rules as [`byte_lines`].
///
/// A line ends at `\n`; the `\n` and one `\r` right before it are removed. A final line
/// without a `\n` is yielded unchanged, including any trailing `\r`. Empty input yields
/// nothing, and a trailing `\n` does not produce an extra empty line.
pub fn lines(inp: &str) -> impl Iterator<Item = &str> {
    // `split_inclusive` keeps the terminator. That tells us whether a trailing `\r`
    // belongs to a `\r\n` pair (strip it) or to an unterminated last line (keep it).
    // Working on `&str` directly avoids re-validating or unsafely reinterpreting bytes.
    inp.split_inclusive('\n')
        .map(|line| match line.strip_suffix('\n') {
            Some(line) => line.strip_suffix('\r').unwrap_or(line),
            None => line,
        })
}
/// Estimates how many distinct lines the file named by the first CLI argument contains.
///
/// The file is read in line-aligned chunks that are spread over the `AsyncSink` workers.
/// Each worker adds every line of its chunks to its own `Sketch`. A background thread
/// prints the merged running estimate roughly every 100 ms. The merged final estimate is
/// printed once all input has been processed.
///
/// Exits with status 1 if no file name is given. Returns an error if the file cannot be
/// opened or read. Panics if a chunk is not valid UTF-8 or a worker thread panicked.
fn main() -> io::Result<()> {
    let fname = match env::args_os().nth(1) {
        Some(fname) => fname,
        None => {
            eprintln!("Usage: cargo run --release --example parallel [FILENAME]");
            process::exit(1);
        }
    };
    // Open the input before spawning any threads, so a bad path fails without leaving
    // workers and the progress printer behind.
    let reader = io::BufReader::with_capacity(512 * 1024, std::fs::File::open(fname)?);

    let (sender, sink) = AsyncSink::with_default(|sk: &mut Sketch, items: Vec<u8>| {
        String::from_utf8(items).map(|s| lines(&s).for_each(|l| sk.add_bytes(l.as_bytes())))
    });

    let shall_stop = Arc::new(atomic::AtomicBool::new(false));
    let shall_stop_c = shall_stop.clone();
    let printer = thread::spawn(move || {
        use io::Write;
        let mut stdout = io::stdout();
        while !shall_stop_c.load(atomic::Ordering::Relaxed) {
            let now = std::time::Instant::now();
            if let Some(sk) = sink.inspect(None, |acc, sk| match acc {
                None => Some(sk.clone()),
                Some(mut merged) => {
                    merged.union(sk);
                    Some(merged)
                }
            }) {
                // Progress output is best-effort. A failed write (e.g. closed stdout) must
                // not panic this thread, because the sink is handed back as its result.
                let _ = write!(stdout, "\rCurrent: {:.0}", sk.cardinality());
                let _ = stdout.flush();
            }
            if let Some(remaining) =
                std::time::Duration::from_millis(100).checked_sub(now.elapsed())
            {
                thread::sleep(remaining);
            }
        }
        sink
    });

    // Feed chunks to the workers. Stop early on a read error. Also stop if every worker
    // has already exited: `send` then fails, and the worker's own error or panic is
    // reported by `sink.join()` below instead of an opaque `SendError`.
    let mut read_result: io::Result<()> = Ok(());
    for chunk in reader.line_buffered() {
        match chunk {
            Ok(chunk) => {
                if sender.send(chunk).is_err() {
                    break;
                }
            }
            Err(e) => {
                read_result = Err(e);
                break;
            }
        }
    }
    // Closing the channel lets the workers drain it and exit.
    drop(sender);
    shall_stop.store(true, atomic::Ordering::Relaxed);
    let sink = printer.join().expect("progress printer panicked");
    let results = sink.join();
    // Propagate an I/O error only after every thread has been shut down and joined.
    read_result?;

    let final_sketch = results
        .expect("Another thread panicked")
        .expect("utf decoding failed")
        .into_iter()
        .reduce(|mut merged, sk| {
            merged.union(&sk);
            merged
        })
        .expect("AsyncSink always runs at least one worker");
    println!("\nTotal: {:.0}", final_sketch.cardinality());
    Ok(())
}