use std::fs::*;
use std::io::{self, Write};
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use crate::error::TrySendError;
use crate::header::Header;
use crate::state::QueueState;
use crate::sync::{DeletionEvent, FileGuard};
use crate::version::check_queue_version;
use super::{segment_filename, HEADER_EOF};
/// Returns the path of the lock file guarding the sender side of the
/// queue rooted at `base`.
pub(crate) fn send_lock_filename<P: AsRef<Path>>(base: P) -> PathBuf {
    let mut lock_path = base.as_ref().to_path_buf();
    lock_path.push("send.lock");
    lock_path
}
/// Attempts to take the sender-side file lock for the queue at `base`
/// without blocking.
///
/// Returns an `Other`-kind I/O error when another sender already holds
/// the lock; propagates any underlying I/O error from the lock attempt.
pub(crate) fn try_acquire_send_lock<P: AsRef<Path>>(base: P) -> io::Result<FileGuard> {
    let base = base.as_ref();
    match FileGuard::try_lock(send_lock_filename(base))? {
        Some(guard) => Ok(guard),
        None => Err(io::Error::new(
            io::ErrorKind::Other,
            format!(
                "queue `{}` sender side already in use",
                base.to_string_lossy()
            ),
        )),
    }
}
/// Takes the sender-side lock for the queue at `base`, waiting
/// asynchronously until it becomes available.
pub(crate) async fn acquire_send_lock<P: AsRef<Path>>(base: P) -> io::Result<FileGuard> {
    let lock_path = send_lock_filename(base.as_ref());
    FileGuard::lock(lock_path).await
}
/// Aggregate on-disk size of a queue directory, counting only segment
/// (`*.q`) files.
//
// Derives added: a plain two-field data carrier should be printable,
// copyable and comparable for free.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub(crate) struct QueueSize {
    /// Total size in bytes of all segment files.
    pub(crate) in_bytes: u64,
    /// Number of segment files present.
    pub(crate) in_segments: u64,
}
/// Scans the queue directory at `base` and totals the size and count of
/// its segment (`*.q`) files.
///
/// # Errors
///
/// Propagates any I/O error from reading the directory or the metadata
/// of an entry.
pub(crate) fn get_queue_size<P: AsRef<Path>>(base: P) -> io::Result<QueueSize> {
    let mut size = QueueSize {
        in_bytes: 0,
        in_segments: 0,
    };
    for entry in read_dir(base.as_ref())? {
        let entry = entry?;
        let is_segment = entry
            .path()
            .extension()
            .map_or(false, |extension| extension == "q");
        if is_segment {
            size.in_bytes += entry.metadata()?.len();
            size.in_segments += 1;
        }
    }
    Ok(size)
}
/// Builder for the sending side of a queue, configuring the segment
/// rollover size and an optional cap on the queue's total on-disk size.
//
// Derives added: a public configuration type should be debuggable and
// cloneable; both fields already are.
#[derive(Debug, Clone)]
pub struct SenderBuilder {
    /// Size in bytes past which the current segment is capped off and a
    /// new one is started.
    segment_size: NonZeroU64,
    /// Optional maximum total on-disk size of the queue; `None` means
    /// unbounded.
    max_queue_size: Option<NonZeroU64>,
}
impl Default for SenderBuilder {
    /// Default configuration: 4 MiB segments and no overall size limit.
    fn default() -> SenderBuilder {
        // 4 MiB is trivially non-zero, so this `expect` can never fire.
        let four_mib = NonZeroU64::new(4 * 1024 * 1024).expect("impossible");
        SenderBuilder {
            segment_size: four_mib,
            max_queue_size: None,
        }
    }
}
impl SenderBuilder {
    /// Creates a builder with the default configuration.
    pub fn new() -> SenderBuilder {
        Default::default()
    }

    /// Sets the segment rollover size in bytes.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn segment_size(mut self, size: u64) -> SenderBuilder {
        self.segment_size = NonZeroU64::new(size).expect("got segment_size=0");
        self
    }

    /// Sets (or clears, with `None`) the maximum total queue size in bytes.
    ///
    /// # Panics
    ///
    /// Panics if `size` is `Some(0)`.
    pub fn max_queue_size(mut self, size: Option<u64>) -> SenderBuilder {
        self.max_queue_size =
            size.map(|bytes| NonZeroU64::new(bytes).expect("got max_queue_size=0"));
        self
    }

    /// Opens the queue at `base` for sending, creating the directory if
    /// needed and acquiring the exclusive send lock.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be created, the on-disk queue version
    /// is incompatible, another sender already holds the send lock, or the
    /// current segment file cannot be opened.
    pub fn open<P: AsRef<Path>>(self, base: P) -> io::Result<Sender> {
        let base = base.as_ref();
        create_dir_all(base)?;
        log::trace!("created queue directory");
        check_queue_version(base)?;
        // The lock must be held before the sender state is inferred.
        let file_guard = try_acquire_send_lock(base)?;
        let state = QueueState::for_send_metadata(base)?;
        log::trace!("sender lock acquired. Sender state now is {:?}", state);
        let segment = OpenOptions::new()
            .create(true)
            .append(true)
            .open(segment_filename(base, state.segment))?;
        log::trace!("last segment opened for appending");
        Ok(Sender {
            segment_size: self.segment_size,
            max_queue_size: self.max_queue_size,
            _file_guard: file_guard,
            file: io::BufWriter::new(segment),
            state,
            deletion_stream: None,
            base: base.to_path_buf(),
        })
    }
}
/// The sending side of a queue: appends length-prefixed items to segment
/// files under a base directory.
pub struct Sender {
    /// Size in bytes past which the current segment is capped off.
    segment_size: NonZeroU64,
    /// Optional cap on the total on-disk size of the queue.
    max_queue_size: Option<NonZeroU64>,
    /// Held for the lifetime of the sender so no second sender can open
    /// the same queue.
    _file_guard: FileGuard,
    /// Buffered writer over the currently active segment file.
    file: io::BufWriter<File>,
    /// Tracks the current segment number and write position.
    state: QueueState,
    // `deletion_stream` is created lazily on the first full-queue wait;
    // `base` is the queue's root directory.
    deletion_stream: Option<DeletionEvent>, base: PathBuf,
}
impl Sender {
    /// Opens the queue at `base` for sending, using the default builder
    /// settings.
    pub fn open<P: AsRef<Path>>(base: P) -> io::Result<Sender> {
        SenderBuilder::default().open(base)
    }

    #[deprecated(
        since = "0.5.0",
        note = "the sender state is now always inferred. There is no need to save anything"
    )]
    pub fn save(&mut self) -> io::Result<()> {
        Ok(())
    }

    /// Appends one item to the current segment as a 4-byte length header
    /// followed by the payload, returning the total number of bytes written.
    ///
    /// # Panics
    ///
    /// Panics if `data` is longer than `u32::MAX` bytes, since the header
    /// stores the length as a `u32`.
    fn write(&mut self, data: &[u8]) -> io::Result<u64> {
        let len = data.len();
        // BUG FIX: the header encodes the length as `u32`, but the old
        // check was against `u64::MAX` (vacuous on 64-bit targets), so
        // oversized items would have their length silently truncated by
        // the `as u32` cast below.
        assert!(
            len <= std::u32::MAX as usize,
            "item length {} does not fit in the u32 segment header",
            len
        );
        let header = Header::new(len as u32).encode();
        self.file.write_all(&header)?;
        self.file.write_all(data)?;
        Ok(4 + len as u64)
    }

    /// Whether the write position has run past the configured segment size.
    fn is_past_end(&self) -> bool {
        self.state.position > self.segment_size.get()
    }

    /// Caps off the current segment with an EOF header and opens the next
    /// one. Returns `Ok(false)` without doing anything when the queue has
    /// reached `max_queue_size` (refusal only happens while more than one
    /// segment exists).
    #[must_use = "you need to always check if a segment was created or not!"]
    fn try_cap_off_and_move(&mut self) -> io::Result<bool> {
        if let Some(max_queue_size) = self.max_queue_size {
            let queue_size = get_queue_size(&self.base)?;
            if queue_size.in_bytes >= max_queue_size.get() && queue_size.in_segments > 1 {
                log::trace!(
                    "oops! Directory size is {}, but max queue size is {}",
                    queue_size.in_bytes,
                    max_queue_size.get()
                );
                return Ok(false);
            }
        }
        log::trace!("there is enough space for a new segment. Let's cap off and move on!");
        // BUG FIX: `write` may perform a partial write; `write_all`
        // guarantees the whole EOF marker lands in the segment.
        self.file.write_all(&HEADER_EOF)?;
        self.file.flush()?;
        // The buffer is empty after the flush, so the inner file can be
        // swapped for the next segment in place.
        *self.file.get_mut() = OpenOptions::new()
            .create(true)
            .append(true)
            .open(segment_filename(&self.base, self.state.advance_segment()))?;
        Ok(true)
    }

    /// Rolls to a new segment if the current one is past its end. Hands the
    /// item back on success; returns it inside `QueueFull` when the size cap
    /// prevents the move.
    fn maybe_cap_off_and_move<T>(&mut self, item: T) -> Result<T, TrySendError<T>> {
        if self.is_past_end() {
            log::trace!("is past the segment end. Trying to cap off and move");
            if !self.try_cap_off_and_move()? {
                log::trace!(
                    "could not cap off and move. The queue `{:?}` is full",
                    self.base
                );
                return Err(TrySendError::QueueFull {
                    item,
                    base: self.base.clone(),
                });
            }
        }
        Ok(item)
    }

    /// Returns the deletion-event stream, creating it lazily on first use.
    fn deletion_stream(&mut self) -> &mut DeletionEvent {
        // Borrow `base` through a local so the closure only touches a field
        // disjoint from `deletion_stream`.
        let base = &self.base;
        self.deletion_stream
            .get_or_insert_with(|| DeletionEvent::new(base))
    }

    /// Tries to append one item and flush it, failing with `QueueFull`
    /// when the size cap prevents rolling to a new segment.
    pub fn try_send<D: AsRef<[u8]>>(&mut self, data: D) -> Result<(), TrySendError<D>> {
        let data = self.maybe_cap_off_and_move(data)?;
        let written = self.write(data.as_ref())?;
        self.file.flush()?;
        self.state.advance_position(written);
        Ok(())
    }

    /// Appends one item, asynchronously waiting for a deletion event
    /// (segments being consumed) whenever the queue is full.
    pub async fn send<D: AsRef<[u8]>>(&mut self, mut data: D) -> io::Result<()> {
        loop {
            match self.try_send(data) {
                Ok(()) => break Ok(()),
                Err(TrySendError::Io(err)) => break Err(err),
                Err(TrySendError::QueueFull { item, .. }) => {
                    // Take the item back, wait for space, then retry.
                    data = item;
                    self.deletion_stream().await
                }
            }
        }
    }

    /// Tries to append a whole batch of items and flush them. Capacity is
    /// checked once, before any item is written, so `QueueFull` returns
    /// the untouched iterator.
    pub fn try_send_batch<I>(&mut self, it: I) -> Result<(), TrySendError<I>>
    where
        I: IntoIterator,
        I::Item: AsRef<[u8]>,
    {
        let it = self.maybe_cap_off_and_move(it)?;
        let mut written = 0;
        for item in it {
            written += self.write(item.as_ref())?;
        }
        self.file.flush()?;
        self.state.advance_position(written);
        Ok(())
    }

    /// Appends a batch of items, asynchronously waiting for deletion
    /// events whenever the queue is full.
    pub async fn send_batch<I>(&mut self, mut it: I) -> io::Result<()>
    where
        I: IntoIterator,
        I::Item: AsRef<[u8]>,
    {
        loop {
            match self.try_send_batch(it) {
                Ok(()) => break Ok(()),
                Err(TrySendError::Io(err)) => break Err(err),
                Err(TrySendError::QueueFull { item, .. }) => {
                    it = item;
                    self.deletion_stream().await
                }
            }
        }
    }
}