use std::fs::*;
use std::io::{self};
use std::path::{Path, PathBuf};
use crate::header::Header;
use crate::state::{QueueState, QueueStatePersistence};
use crate::sync::{FileGuard, SyncFollower};
use crate::version::check_queue_version;
use super::try_acquire_recv_lock;
use super::{segment_filename, HEADER_EOF};
pub struct QueueIter {
_file_guard: FileGuard,
base: PathBuf,
state: QueueState,
sync_follower: SyncFollower,
}
impl QueueIter {
pub fn open<P: AsRef<Path>>(base: P) -> io::Result<QueueIter> {
create_dir_all(base.as_ref())?;
log::trace!("created queue directory");
check_queue_version(base.as_ref())?;
let file_guard = try_acquire_recv_lock(base.as_ref())?;
let mut persistence = QueueStatePersistence::new();
let state = persistence.open(base.as_ref())?;
log::trace!("receiver lock acquired. Iter state now is {:?}", state);
let mut sync_follower = SyncFollower::open(segment_filename(base.as_ref(), state.segment))?;
sync_follower.seek(io::SeekFrom::Start(state.position))?;
log::trace!("last segment opened fo reading");
Ok(QueueIter {
_file_guard: file_guard,
state,
base: PathBuf::from(base.as_ref()),
sync_follower,
})
}
fn advance_segment(&mut self) -> io::Result<()> {
let current_segment = self.state.segment;
self.state.advance_segment();
let next_segment = self.state.segment;
log::debug!(
"advanced segment from {:?} to {:?}",
current_segment,
next_segment
);
log::debug!("opening segment {}", next_segment);
self.sync_follower = SyncFollower::open(segment_filename(&self.base, next_segment))?;
Ok(())
}
fn read_header(&mut self) -> io::Result<Header> {
let mut header = [0; 4];
self.sync_follower.read_exact(&mut header)?;
if header == HEADER_EOF {
log::trace!("got EOF header. Advancing...");
self.advance_segment()?;
log::trace!("re-reading new header from new file");
self.sync_follower.read_exact(&mut header)?;
}
let decoded = Header::decode(header);
log::trace!("got header {:?} (read {} bytes)", header, decoded.len());
Ok(decoded)
}
fn read_one(&mut self) -> io::Result<Vec<u8>> {
let header = self.read_header()?;
let mut data = vec![0; header.len() as usize];
self.sync_follower
.read_exact(&mut data)
.expect("poisoned queue");
Ok(data)
}
}
impl Iterator for QueueIter {
type Item = io::Result<Vec<u8>>;
fn next(&mut self) -> Option<io::Result<Vec<u8>>> {
match self.read_one() {
Ok(item) => Some(Ok(item)),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
log::trace!("got interrupted by eof");
None
}
Err(err) => Some(Err(err)),
}
}
}