use crate::api::Encodable;
use crate::api::Reader;
use crate::api::WriteError;
use crate::api::Writer;
use crate::core::ReadResult;
use crate::core::TryIter;
use crossbeam_utils::Backoff;
use parking_lot::Mutex;
use std::iter::FusedIterator;
use std::iter::Iterator;
use std::sync::Arc;
#[repr(transparent)]
pub struct RetryIter<'a, R: Reader> {
inner: TryIter<'a, R>,
}
impl<'a, R: Reader> From<TryIter<'a, R>> for RetryIter<'a, R> {
fn from(try_iter: TryIter<'a, R>) -> RetryIter<'a, R> {
RetryIter { inner: try_iter }
}
}
impl<'a, R: Reader> Iterator for RetryIter<'a, R> {
type Item = ReadResult<'a>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
let backoff = Backoff::new();
loop {
let res = self.inner.next();
match res {
Some(ReadResult::Nothing) => {
if backoff.is_completed() {
return res;
} else {
backoff.snooze();
}
}
Some(_) => return res,
None => return None,
}
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}
impl<'a, R: Reader> FusedIterator for RetryIter<'a, R> {}
#[repr(transparent)]
pub struct RetryWriter<W: Writer> {
mx_writer: Arc<Mutex<W>>,
}
impl<W: Writer> RetryWriter<W> {
#[inline]
pub fn new(mx_writer: Arc<Mutex<W>>) -> RetryWriter<W> {
RetryWriter { mx_writer }
}
}
impl<W: Writer> Writer for RetryWriter<W> {
#[inline]
fn write<E: Encodable>(&mut self, data: &E) -> Result<u32, WriteError> {
let backoff = Backoff::new();
loop {
let try_write = self.mx_writer.try_lock();
match try_write {
Some(mut writer) => {
return writer.write(data);
}
None => {
if backoff.is_completed() {
return Err(WriteError::Wait);
} else {
backoff.snooze();
}
}
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::api::*;
use crate::core::*;
use assert_matches::assert_matches as match_assert;
use tempdir::TempDir;
#[test]
fn retry_iter() {
let metadata = Metadata::new(100, 1000, 10000, 1000, 1000, TickUnit::Millis);
let test_tmp_dir = TempDir::new("kektest").unwrap();
let txt = "There are 10 kinds of people";
let mut msgs = txt.split_whitespace();
let mut writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
let mut reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
let mut retry_iter: RetryIter<ShmReader> = reader.try_iter().into();
match_assert!(retry_iter.size_hint(), (0, None));
match_assert!(retry_iter.next(), Some(ReadResult::Nothing));
for _i in 0..3 {
let to_wr = msgs.next().unwrap().as_bytes();
writer.write(&to_wr).unwrap();
}
for _i in 0..3 {
match_assert!(retry_iter.next(), Some(ReadResult::Record(_)));
}
match_assert!(retry_iter.next(), Some(ReadResult::Nothing));
std::mem::drop(writer);
match_assert!(retry_iter.next(), Some(ReadResult::Failed(ReadError::Closed)));
match_assert!(retry_iter.next(), None);
match_assert!(retry_iter.size_hint(), (0, Some(0)));
}
#[test]
fn retry_write() {
let metadata = Metadata::new(100, 1000, 10000, 1000, 1000, TickUnit::Millis);
let test_tmp_dir = TempDir::new("kektest").unwrap();
let writer = shm_writer(&test_tmp_dir.path(), &metadata, EncoderHandler::default()).unwrap();
let arc_mx = Arc::new(Mutex::new(writer));
let handles: Vec<std::thread::JoinHandle<()>> = (0..5)
.map(|i| (i, arc_mx.clone()))
.map(|(i, arc_mx)| {
std::thread::spawn(move || {
let to_wr = format!("Hello {}", &i);
let mut retry_w = RetryWriter::new(arc_mx);
for _i in 0..3 {
loop {
match retry_w.write(&to_wr) {
Ok(_) => break,
Err(WriteError::Wait) => (),
Err(_) => panic!("Write failure"),
};
}
}
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
let mut read_results = std::collections::HashMap::<&str, i32>::new();
let mut shm_reader = shm_reader(&test_tmp_dir.path(), 1000).unwrap();
let reader_iter = shm_reader.try_iter();
for msg in reader_iter {
match msg {
ReadResult::Record(data) => {
*read_results.entry(std::str::from_utf8(data).unwrap()).or_default() += 1;
}
_ => break,
}
}
assert_eq!(read_results.len(), 5);
for (key, value) in read_results {
dbg!(key, value);
assert_eq!(value, 3);
}
}
}