use std::collections::HashSet;
use std::io::BufRead;
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use crossbeam_channel::Sender;
use quick_xml::Error as XmlError;
use crate::error::Error;
#[cfg(feature = "threading")]
#[derive(Debug, PartialEq, Eq)]
enum State {
Started,
Reading,
Finished,
}
#[cfg(feature = "threading")]
pub struct Producer<B> {
reader: Option<B>,
threads: usize,
s_text: Sender<Option<Result<Vec<u8>, Error>>>,
alive: Arc<AtomicBool>,
handle: Option<std::thread::JoinHandle<()>>,
}
impl<B: BufRead + Send + 'static> Producer<B> {
pub(super) fn new(
reader: B,
threads: usize,
s_text: Sender<Option<Result<Vec<u8>, Error>>>,
) -> Self {
Self {
reader: Some(reader),
s_text,
threads,
handle: None,
alive: Arc::new(AtomicBool::new(false)),
}
}
pub fn start(&mut self) {
self.alive.store(true, Ordering::SeqCst);
let alive = self.alive.clone();
let threads = self.threads;
let s_text = self.s_text.clone();
let mut reader = self.reader.take().unwrap();
self.handle = Some(std::thread::spawn(move || {
let mut buffer = Vec::new();
let mut state = State::Started;
loop {
match state {
State::Started => match reader.read_until(b'>', &mut buffer) {
Ok(0) => {
state = State::Finished;
}
Ok(_) => {
let i = memchr::memrchr(b'<', &buffer).unwrap();
if buffer[i..].starts_with(b"<entry") {
state = State::Reading;
}
}
Err(e) => {
s_text.send(Some(Err(Error::from(e)))).ok();
state = State::Finished;
}
},
State::Reading => {
match reader.read_until(b'>', &mut buffer) {
Ok(_) if buffer.ends_with(&b"</entry>"[..]) => {
s_text.send(Some(Ok(buffer.as_slice().to_vec()))).ok();
state = State::Started;
buffer.clear();
}
Ok(0) => {
s_text
.send(Some(Err(Error::from(XmlError::UnexpectedEof(
String::from("entry"),
)))))
.ok();
state = State::Finished;
}
Err(e) => {
s_text.send(Some(Err(Error::from(e)))).ok();
state = State::Finished;
}
_ => (),
}
}
State::Finished => {
for _ in 0..threads {
s_text.send(None).ok();
}
alive.store(false, Ordering::SeqCst);
break;
}
}
}
}));
}
pub fn join(&mut self) -> std::thread::Result<()> {
if let Some(handle) = self.handle.take() {
handle.join()
} else {
Ok(())
}
}
pub fn is_alive(&self) -> bool {
self.alive.load(Ordering::SeqCst)
}
}