1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
use std::fs::*;
use std::io::{self};
use std::path::{Path, PathBuf};
use crate::header::Header;
use crate::state::{QueueState, QueueStatePersistence};
use crate::sync::{FileGuard, SyncFollower};
use crate::version::check_queue_version;
use super::try_acquire_recv_lock;
use super::{segment_filename, HEADER_EOF};
/// An [`Iterator`] that iterates over the elements of the queue, until it hts
/// the end for the first time. Use this structure instead of
/// [`crate::Receiver`] if you just need to read the data stored in a queue.
///
/// Three good reasons for this are:
///
/// 1. The API is synchronous, since there is no need to wait for new elements.
/// 2. There is no transactional mechanism involved, since there is no need for
/// one and, because of this,
/// 3. Elements are not buffered in memory, as opposed to what
/// [`crate::Receiver::recv_batch`] does.
///
/// And you also get some extra percents of performance from a simpler
/// implementation. Don't pay for what you don't use!
pub struct QueueIter {
_file_guard: FileGuard,
base: PathBuf,
state: QueueState,
sync_follower: SyncFollower,
}
impl QueueIter {
/// Opens a queue for reading. The access will be exclusive, based on the
/// existence of the temporary file `recv.lock` inside the queue folder.
///
/// # Errors
///
/// This function will return an IO error if the queue is already in use for
/// receiving, which is indicated by a lock file. Also, any other IO error
/// encountered while opening will be sent.
///
/// # Panics
///
/// This function will panic if it is not able to set up the notification
/// handler to watch for file changes.
pub fn open<P: AsRef<Path>>(base: P) -> io::Result<QueueIter> {
// Guarantee that the queue exists:
create_dir_all(base.as_ref())?;
log::trace!("created queue directory");
// Versioning stuff (this should be lightning-fast. Therefore, shameless block):
check_queue_version(base.as_ref())?;
// Acquire guard and state:
let file_guard = try_acquire_recv_lock(base.as_ref())?;
let mut persistence = QueueStatePersistence::new();
let state = persistence.open(base.as_ref())?;
log::trace!("receiver lock acquired. Iter state now is {:?}", state);
// Put the needle on the groove (oh! the 70's):
let mut sync_follower = SyncFollower::open(segment_filename(base.as_ref(), state.segment))?;
sync_follower.seek(io::SeekFrom::Start(state.position))?;
log::trace!("last segment opened fo reading");
Ok(QueueIter {
_file_guard: file_guard,
state,
base: PathBuf::from(base.as_ref()),
sync_follower,
})
}
/// Puts the queue in another position in another segment. This forcibly
/// discards the old tail follower and fetches a fresh new one, so be
/// careful.
fn advance_segment(&mut self) -> io::Result<()> {
let current_segment = self.state.segment;
self.state.advance_segment();
let next_segment = self.state.segment;
log::debug!(
"advanced segment from {:?} to {:?}",
current_segment,
next_segment
);
log::debug!("opening segment {}", next_segment);
self.sync_follower = SyncFollower::open(segment_filename(&self.base, next_segment))?;
Ok(())
}
/// Reads the header. This operation is atomic.
fn read_header(&mut self) -> io::Result<Header> {
// Read header:
let mut header = [0; 4];
self.sync_follower.read_exact(&mut header)?;
// If the header is EOF, advance segment:
if header == HEADER_EOF {
log::trace!("got EOF header. Advancing...");
self.advance_segment()?;
// Re-read the header:
log::trace!("re-reading new header from new file");
self.sync_follower.read_exact(&mut header)?;
}
// Now, you set the header!
let decoded = Header::decode(header);
log::trace!("got header {:?} (read {} bytes)", header, decoded.len());
Ok(decoded)
}
/// Reads one element from the queue.
fn read_one(&mut self) -> io::Result<Vec<u8>> {
// Get the length:
let header = self.read_header()?;
// With the length, read the data:
let mut data = vec![0; header.len() as usize];
self.sync_follower
.read_exact(&mut data)
.expect("poisoned queue");
Ok(data)
}
}
impl Iterator for QueueIter {
type Item = io::Result<Vec<u8>>;
fn next(&mut self) -> Option<io::Result<Vec<u8>>> {
match self.read_one() {
Ok(item) => Some(Ok(item)),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
log::trace!("got interrupted by eof");
None
}
Err(err) => Some(Err(err)),
}
}
}