use futures::future;
use futures::FutureExt;
use std::collections::VecDeque;
use std::fs::*;
use std::future::Future;
use std::io::{self};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use crate::error::TryRecvError;
use crate::header::Header;
use crate::state::QueueState;
use crate::state::QueueStatePersistence;
use crate::sync::{FileGuard, TailFollower};
use crate::version::check_queue_version;
use super::{segment_filename, HEADER_EOF};
pub(crate) fn recv_lock_filename<P: AsRef<Path>>(base: P) -> PathBuf {
base.as_ref().join("recv.lock")
}
pub(crate) fn try_acquire_recv_lock<P: AsRef<Path>>(base: P) -> io::Result<FileGuard> {
FileGuard::try_lock(recv_lock_filename(base.as_ref()))?.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
format!(
"queue `{}` receiver side already in use",
base.as_ref().to_string_lossy()
),
)
})
}
pub(crate) async fn acquire_recv_lock<P: AsRef<Path>>(base: P) -> io::Result<FileGuard> {
FileGuard::lock(recv_lock_filename(base.as_ref())).await
}
pub struct ReceiverBuilder {
save_every_nth: Option<usize>,
save_every: Option<Duration>,
}
impl Default for ReceiverBuilder {
fn default() -> ReceiverBuilder {
ReceiverBuilder {
save_every_nth: Some(250),
save_every: Some(Duration::from_millis(350)),
}
}
}
impl ReceiverBuilder {
pub fn new() -> ReceiverBuilder {
ReceiverBuilder::default()
}
pub fn save_every_nth(mut self, nth: Option<usize>) -> ReceiverBuilder {
self.save_every_nth = nth;
self
}
pub fn save_every(mut self, duration: Option<Duration>) -> ReceiverBuilder {
self.save_every = duration;
self
}
pub fn open<P: AsRef<Path>>(self, base: P) -> io::Result<Receiver> {
create_dir_all(base.as_ref())?;
log::trace!("created queue directory");
check_queue_version(base.as_ref())?;
let file_guard = try_acquire_recv_lock(base.as_ref())?;
let mut persistence = QueueStatePersistence::new();
let state = persistence.open(base.as_ref())?;
log::trace!("receiver lock acquired. Receiver state now is {:?}", state);
let mut tail_follower =
TailFollower::open(&segment_filename(base.as_ref(), state.segment))?;
tail_follower.seek(io::SeekFrom::Start(state.position))?;
log::trace!("last segment opened fo reading");
Ok(Receiver {
_file_guard: file_guard,
tail_follower,
maybe_header: None,
state,
initial_state: state,
base: PathBuf::from(base.as_ref()),
persistence,
read_and_unused: VecDeque::new(),
save_every: self.save_every,
save_every_nth: self.save_every_nth,
n_reads: 0,
last_saved_at: Instant::now(),
})
}
}
pub struct Receiver {
base: PathBuf,
_file_guard: FileGuard,
tail_follower: TailFollower,
maybe_header: Option<[u8; 4]>,
state: QueueState,
initial_state: QueueState,
persistence: QueueStatePersistence,
read_and_unused: VecDeque<Vec<u8>>,
save_every_nth: Option<usize>,
save_every: Option<Duration>,
n_reads: usize,
last_saved_at: Instant,
}
impl Receiver {
pub fn open<P: AsRef<Path>>(base: P) -> io::Result<Receiver> {
ReceiverBuilder::default().open(base)
}
fn begin(&mut self) {
log::debug!("begin transaction in {:?} at {:?}", self.base, self.state);
}
fn go_to(&mut self, state: QueueState) -> io::Result<()> {
let different_segment = self.state.segment != state.segment;
log::debug!("going from {:?} to {:?}", self.state, state);
self.state = state;
if different_segment {
log::debug!("opening segment {}", self.state.segment);
self.tail_follower =
TailFollower::open(&segment_filename(&self.base, self.state.segment))?;
}
self.tail_follower
.seek(io::SeekFrom::Start(state.position))?;
Ok(())
}
fn end(&mut self) -> io::Result<()> {
assert!(
self.state.segment >= self.initial_state.segment,
"advanced to a past position. Initial was {:?}; current is {:?}",
self.initial_state,
self.state
);
for segment_id in self.initial_state.segment..self.state.segment {
log::debug!("removing segment {} from {:?}", segment_id, self.base);
remove_file(segment_filename(&self.base, segment_id))?;
}
log::debug!(
"end transaction in {:?} at {:?} (from {:?})",
self.base,
self.state,
self.initial_state
);
assert!(
self.read_and_unused.is_empty(),
"There were read and unused items at the end of transaction. Read and unused queue: {:?}",
self.read_and_unused
);
self.initial_state = self.state;
self.maybe_save()?;
Ok(())
}
async fn read_header(&mut self) -> io::Result<Header> {
if let Some(header) = self.maybe_header {
return Ok(Header::decode(header));
}
let mut header = [0; 4];
self.tail_follower.read_exact(&mut header).await?;
if header == HEADER_EOF {
log::trace!("got EOF header. Advancing...");
let mut new_state = self.state.clone();
new_state.advance_segment();
self.go_to(new_state)?;
log::trace!("re-reading new header from new file");
self.tail_follower.read_exact(&mut header).await?;
}
self.maybe_header = Some(header.clone());
let decoded = Header::decode(header);
self.state.advance_position(4);
log::trace!("got header {:?} (read {} bytes)", header, decoded.len());
Ok(decoded)
}
async fn read_one(&mut self) -> io::Result<()> {
let header = self.read_header().await?;
let mut data = vec![0; header.len() as usize];
if header.len() > 0 {
self.tail_follower
.read_exact(&mut data)
.await
.expect("poisoned queue");
}
self.state.advance_position(data.len() as u64);
self.maybe_header = None;
self.read_and_unused.push_back(data);
self.n_reads += 1;
Ok(())
}
async fn read_one_timeout<F>(&mut self, timeout: F) -> io::Result<bool>
where
F: Future<Output = ()> + Unpin,
{
match future::select(Box::pin(self.read_one()), timeout).await {
future::Either::Left((read_one, _)) => read_one.map(|_| true),
future::Either::Right((_, _)) => Ok(false),
}
}
fn drain(&mut self, n: usize) -> Vec<Vec<u8>> {
let mut data = Vec::with_capacity(n);
if n > 0 {
while let Some(element) = self.read_and_unused.pop_front() {
data.push(element);
if data.len() == n {
break;
}
}
}
data
}
pub fn save(&mut self) -> io::Result<()> {
self.persistence.save(&self.initial_state) }
fn maybe_save(&mut self) -> io::Result<()> {
if let Some(save_every_nth) = self.save_every_nth {
if self.n_reads % save_every_nth == 0 {
self.save()?;
}
} else if let Some(save_every) = self.save_every {
if self.last_saved_at.elapsed() >= save_every {
self.save()?;
}
}
Ok(())
}
pub async fn recv(&mut self) -> io::Result<RecvGuard<'_, Vec<u8>>> {
self.begin();
let data = if let Some(data) = self.read_and_unused.pop_front() {
data
} else {
self.read_one().await?;
self.read_and_unused
.pop_front()
.expect("guaranteed to yield an element")
};
Ok(RecvGuard {
receiver: self,
item: Some(data),
was_finished: false,
})
}
pub fn try_recv(&mut self) -> Result<RecvGuard<'_, Vec<u8>>, TryRecvError> {
TryRecvError::result_from_option(self.recv().now_or_never())
}
pub async fn recv_timeout<F>(
&mut self,
timeout: F,
) -> io::Result<Option<RecvGuard<'_, Vec<u8>>>>
where
F: Future<Output = ()> + Unpin,
{
self.begin();
let data = if let Some(data) = self.read_and_unused.pop_front() {
data
} else {
if self.read_one_timeout(timeout).await? {
self.read_and_unused
.pop_front()
.expect("guaranteed to yield an element")
} else {
return Ok(None);
}
};
Ok(Some(RecvGuard {
receiver: self,
item: Some(data),
was_finished: false,
}))
}
pub async fn recv_batch(&mut self, n: usize) -> io::Result<RecvGuard<'_, Vec<Vec<u8>>>> {
self.begin();
if n > self.read_and_unused.len() {
for _ in 0..(n - self.read_and_unused.len()) {
self.read_one().await?;
}
}
let data = self.drain(n);
Ok(RecvGuard {
receiver: self,
item: Some(data),
was_finished: false,
})
}
pub async fn recv_batch_up_to(&mut self, n: usize) -> io::Result<RecvGuard<'_, Vec<Vec<u8>>>> {
self.begin();
while self.read_and_unused.len() < n {
if self.read_one().now_or_never().is_none() {
break;
}
}
if n > 0 {
if self.read_and_unused.is_empty() {
self.read_one().await?;
}
}
let data = self.drain(n);
Ok(RecvGuard {
receiver: self,
item: Some(data),
was_finished: false,
})
}
pub fn try_recv_batch(
&mut self,
n: usize,
) -> Result<RecvGuard<'_, Vec<Vec<u8>>>, TryRecvError> {
TryRecvError::result_from_option(self.recv_batch(n).now_or_never())
}
pub fn try_recv_batch_up_to(
&mut self,
n: usize,
) -> Result<RecvGuard<'_, Vec<Vec<u8>>>, TryRecvError> {
TryRecvError::result_from_option(self.recv_batch_up_to(n).now_or_never())
}
pub async fn recv_batch_timeout<F>(
&mut self,
n: usize,
mut timeout: F,
) -> io::Result<RecvGuard<'_, Vec<Vec<u8>>>>
where
F: Future<Output = ()> + Unpin,
{
self.begin();
let mut n_read = 0;
if n > self.read_and_unused.len() {
for _ in 0..(n - self.read_and_unused.len()) {
if !self.read_one_timeout(&mut timeout).await? {
break;
} else {
n_read += 1;
}
}
}
let data = self.drain(n_read);
Ok(RecvGuard {
receiver: self,
item: Some(data),
was_finished: false,
})
}
pub async fn recv_until<P, Fut>(
&mut self,
mut predicate: P,
) -> io::Result<RecvGuard<'_, Vec<Vec<u8>>>>
where
P: FnMut(Option<&[u8]>) -> Fut,
Fut: std::future::Future<Output = bool>,
{
self.begin();
let mut n_read = 0;
predicate(None).await;
loop {
if n_read == self.read_and_unused.len() {
self.read_one().await?;
}
let item_ref = &self.read_and_unused[n_read];
if !predicate(Some(item_ref)).await {
n_read += 1;
} else {
break;
}
}
let data = self.drain(n_read);
Ok(RecvGuard {
receiver: self,
item: Some(data),
was_finished: false,
})
}
pub fn try_recv_until<P, Fut>(
&mut self,
mut predicate: P,
) -> Result<RecvGuard<'_, Vec<Vec<u8>>>, TryRecvError>
where
P: FnMut(Option<&[u8]>) -> bool,
{
TryRecvError::result_from_option(
self.recv_until(move |el| {
let outcome = predicate(el);
async move { outcome }
})
.now_or_never(),
)
}
}
impl Drop for Receiver {
fn drop(&mut self) {
if let Err(err) = self.save() {
log::error!(
"(probably) could not save queue state during `Drop`: {}",
err
);
}
}
}
pub struct RecvGuard<'a, T> {
receiver: &'a mut Receiver,
item: Option<T>,
was_finished: bool,
}
impl<'a, T> Drop for RecvGuard<'a, T> {
fn drop(&mut self) {
if !self.was_finished {
if let Err(err) = self.rollback_mut() {
log::error!("unable to rollback on drop: {}", err);
}
}
}
}
impl<'a, T> Deref for RecvGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
self.item.as_ref().expect("unreachable")
}
}
impl<'a, T> DerefMut for RecvGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
self.item.as_mut().expect("unreachable")
}
}
impl<'a, T> RecvGuard<'a, T> {
pub fn try_into_inner(mut self) -> io::Result<T> {
let item = self.item.take().expect("unreachable");
self.commit()?;
Ok(item)
}
pub fn commit(mut self) -> io::Result<()> {
self.receiver.end()?;
self.was_finished = true;
Ok(())
}
fn rollback_mut(&mut self) -> io::Result<()> {
self.receiver.go_to(self.receiver.initial_state)?;
self.receiver.end()?;
self.was_finished = true;
Ok(())
}
pub fn rollback(mut self) -> io::Result<()> {
self.rollback_mut()
}
}