use super::utils::{align, load_atomic_u64, CLOSE, REC_HEADER_LEN, U64_SIZE, WATERMARK};
use super::Metadata;
use crate::api::ReadError::*;
use crate::api::{ChannelError, ReadError, Reader};
use crate::core::TickUnit;
use log::{error, info, warn};
use memmap::MmapMut;
use std::iter::FusedIterator;
use std::iter::Iterator;
use std::result::Result;
use std::sync::atomic::Ordering;
/// Sentinel stored in `TimeoutReader::expiration` while no timeout deadline is armed.
const END_OF_TIME: u64 = std::u64::MAX;
/// Reader over a memory-mapped channel.
///
/// Holds the parsed channel metadata, a raw pointer to the record area inside the mapping,
/// the current read offset and the first terminal error seen while reading.
#[derive(Debug)]
pub struct ShmReader {
/// Channel metadata parsed from the start of the mapping by `Metadata::read`.
metadata: Metadata,
/// Pointer into the mapping, past the metadata, from which record offsets are computed.
data_ptr: *const u8,
/// Byte offset, relative to `data_ptr`, of the next record to read.
read_index: u32,
/// First error recorded through `record_failure`; `None` while no failure was seen.
failure: Option<ReadError>,
/// Owns the mapping that `data_ptr` points into; never read directly in the visible code.
_mmap: MmapMut,
}
impl ShmReader {
    /// Creates a reader over an already opened memory mapping.
    ///
    /// The metadata is parsed from the beginning of the mapping via `Metadata::read`. The record
    /// area is taken to start `metadata.len()` 64-bit words past the start of the mapping.
    ///
    /// # Errors
    ///
    /// Propagates (converted to `ChannelError`) any error returned by `Metadata::read`.
    // The base pointer of the mapping is reinterpreted as `*mut u64` on purpose, so the offset
    // to the record area is computed in 64-bit words; the lint would flag that cast.
    #[allow(clippy::cast_ptr_alignment)]
    pub(super) fn new(mut mmap: MmapMut) -> Result<ShmReader, ChannelError> {
        let buf = &mut mmap[..];
        let metadata = Metadata::read(buf)?;
        let metadata_ptr = buf.as_ptr() as *mut u64;
        // NOTE(review): the offset is `metadata.len()` u64 words, i.e. `8 * len()` bytes.
        // Confirm `Metadata::len` is expressed in words and not in bytes.
        // SAFETY: assumes `Metadata::read` only succeeds when the mapping extends past the
        // metadata, so the computed pointer stays inside the mapping — TODO confirm.
        let data_ptr = unsafe { metadata_ptr.add(metadata.len() as usize) } as *const u8;
        info!("Kekbit Reader successfully created");
        Ok(ShmReader {
            metadata,
            data_ptr,
            read_index: 0,
            failure: None,
            _mmap: mmap,
        })
    }

    /// Returns the metadata associated with this channel.
    #[inline]
    pub fn metadata(&self) -> &Metadata {
        &self.metadata
    }

    /// Returns the current read position: the offset, relative to the start of the record
    /// area, of the next record to be read.
    #[inline]
    pub fn position(&self) -> u32 {
        self.read_index
    }

    /// Returns an iterator that performs one `try_read` per call to `next` and stops once the
    /// reader reports itself as exhausted.
    #[inline]
    pub fn try_iter(&mut self) -> TryIter<'_, Self> {
        TryIter { inner: self }
    }

    /// Remembers `failure` if no failure was recorded before, then hands it back so the caller
    /// can return it directly. An earlier recorded failure is never overwritten.
    #[inline]
    fn record_failure(&mut self, failure: ReadError) -> ReadError {
        if self.failure.is_none() {
            self.failure = Some(failure);
        }
        failure
    }
}
impl Reader for ShmReader {
    /// Attempts to read the record at the current position.
    ///
    /// The 64-bit word at the current position is loaded with `Acquire` ordering and
    /// interpreted as follows:
    ///
    /// * a value not larger than `metadata.max_msg_len()` is a record length: the read position
    ///   advances by the aligned size of header plus payload and `Ok(Some(payload))` is returned;
    /// * `WATERMARK` yields `Ok(None)` and leaves the position unchanged;
    /// * `CLOSE` yields `Err(Closed)`;
    /// * any other value yields `Err(Failed)`.
    ///
    /// The first `Closed`/`Failed` error is recorded and later reported by `exhausted`.
    ///
    /// NOTE(review): the lifetime `'a` of the returned slice is not tied to `self`; the slice
    /// points into the mapping owned by this reader, so callers must not keep it past the
    /// reader's lifetime.
    // The record area is a byte pointer that is reinterpreted as `*mut u64` to load the length
    // word; the lint would flag that cast.
    #[allow(clippy::cast_ptr_alignment)]
    fn try_read<'a>(&mut self) -> Result<Option<&'a [u8]>, ReadError> {
        let crt_index = self.read_index as usize;
        debug_assert!(crt_index + U64_SIZE < self.metadata.capacity() as usize);
        // SAFETY: assumes `crt_index` is inside the mapped record area and suitably aligned for
        // a `u64` load (positions only ever advance by `align(..)`-ed sizes) — TODO confirm
        // against the writer and `align`.
        let rec_len: u64 = unsafe { load_atomic_u64(self.data_ptr.add(crt_index) as *mut u64, Ordering::Acquire) };

        // Anything larger than the maximum message length is a marker, not a record length.
        if rec_len > self.metadata.max_msg_len() as u64 {
            return match rec_len {
                WATERMARK => Ok(None),
                CLOSE => {
                    info!("Producer closed channel");
                    Err(self.record_failure(Closed))
                }
                _ => {
                    error!(
                        "Channel corrupted. Unknown Marker {:#016X} at position {} ",
                        rec_len, self.read_index,
                    );
                    Err(self.record_failure(Failed))
                }
            };
        }

        let rec_size = align(REC_HEADER_LEN + rec_len as u32);
        debug_assert!((crt_index + rec_size as usize) < self.metadata.capacity() as usize);
        self.read_index += rec_size;
        debug_assert!(rec_len > 0);
        // SAFETY: assumes the writer stored `rec_len` payload bytes right after the record
        // header before publishing the length word — TODO confirm against the writer.
        unsafe {
            Ok(Some(std::slice::from_raw_parts(
                self.data_ptr.add(crt_index + REC_HEADER_LEN as usize),
                rec_len as usize,
            )))
        }
    }

    /// Returns the first failure recorded by `try_read`, or `None` if none was recorded.
    #[inline]
    fn exhausted(&self) -> Option<ReadError> {
        self.failure
    }
}
/// Wraps a `Reader` and reports a `Timeout` error when consecutive reads keep finding
/// nothing until an armed deadline has passed.
pub struct TimeoutReader<R: Reader> {
/// The wrapped reader that performs the actual reads.
inner: R,
/// Source of the current time, obtained through `nix_time()`.
tick: TickUnit,
/// Amount added to the current time when a deadline is armed.
to_interval: u64,
/// Armed deadline, or `END_OF_TIME` while no deadline is armed.
expiration: u64,
/// The `Timeout` error recorded once the deadline has passed; `None` until then.
expired: Option<ReadError>,
}
impl<R: Reader> TimeoutReader<R> {
    /// Wraps `reader` so that a prolonged absence of records is reported as a timeout.
    ///
    /// # Arguments
    ///
    /// * `reader` - the reader performing the actual reads.
    /// * `tick` - queried through `nix_time()` for the current time.
    /// * `timeout` - added to `tick.nix_time()` when a deadline is armed; presumably expressed
    ///   in the same unit as `tick` (confirm against `TickUnit`).
    ///
    /// The new reader starts with no deadline armed and no timeout recorded.
    #[inline]
    pub fn new(reader: R, tick: TickUnit, timeout: u64) -> TimeoutReader<R> {
        TimeoutReader {
            inner: reader,
            tick,
            to_interval: timeout,
            expiration: END_OF_TIME,
            expired: None,
        }
    }

    /// Returns an iterator that performs one `try_read` per call to `next` and stops once the
    /// reader reports itself as exhausted.
    #[inline]
    pub fn try_iter(&mut self) -> TryIter<'_, Self> {
        TryIter { inner: self }
    }
}
impl<R: Reader> Reader for TimeoutReader<R> {
    /// Reads through the wrapped reader while tracking how long nothing has been available.
    ///
    /// * If this reader is already exhausted, the recorded error is returned without reading.
    /// * Errors from the wrapped reader are propagated unchanged.
    /// * A successful read of a record disarms the deadline and returns the record.
    /// * The first empty read arms a deadline at `now + timeout`; an empty read at or after
    ///   an armed deadline records and returns `Timeout(deadline)`. Other empty reads return
    ///   `Ok(None)`.
    ///
    /// The deadline is computed with saturating arithmetic. A timeout so large that the sum
    /// would overflow saturates to `END_OF_TIME`, which is the "not armed" sentinel, so such a
    /// reader never times out instead of panicking or wrapping to a deadline in the past.
    #[inline]
    fn try_read<'b>(&mut self) -> Result<Option<&'b [u8]>, ReadError> {
        if let Some(err) = self.exhausted() {
            return Err(err);
        }

        if let Some(record) = self.inner.try_read()? {
            // Data arrived: the writer is alive, so drop any pending deadline.
            self.expiration = END_OF_TIME;
            return Ok(Some(record));
        }

        if self.expiration == END_OF_TIME {
            // First empty read in a row: start the clock.
            self.expiration = self.tick.nix_time().saturating_add(self.to_interval);
        } else if self.expiration <= self.tick.nix_time() {
            warn!("Writer timeout detected. Channel will be abandoned. No reads will be performed");
            let timeout = Timeout(self.expiration);
            self.expired = Some(timeout);
            return Err(timeout);
        }
        Ok(None)
    }

    /// Returns the wrapped reader's terminal error if it has one, otherwise the recorded
    /// timeout, otherwise `None`.
    #[inline]
    fn exhausted(&self) -> Option<ReadError> {
        self.inner.exhausted().or(self.expired)
    }
}
impl From<ShmReader> for TimeoutReader<ShmReader> {
    /// Builds a `TimeoutReader` around `reader`, taking both the tick unit and the timeout
    /// from the reader's own channel metadata.
    #[inline]
    fn from(reader: ShmReader) -> TimeoutReader<ShmReader> {
        // Copy both values out of the metadata before `reader` is moved into the wrapper.
        let (tick, timeout) = {
            let meta = reader.metadata();
            (meta.tick_unit(), meta.timeout())
        };
        TimeoutReader::new(reader, tick, timeout)
    }
}
/// Outcome of a single read attempt, as yielded by `TryIter`.
#[derive(Debug)]
pub enum ReadResult<'a> {
/// The read attempt returned a record; holds its bytes.
Record(&'a [u8]),
/// The read attempt succeeded but no record was available.
Nothing,
/// The read attempt returned an error; holds that error.
Failed(ReadError),
}
/// Iterator over a `Reader` that performs one `try_read` per call to `next` and ends once
/// the reader reports itself as exhausted.
#[repr(transparent)]
pub struct TryIter<'a, R: Reader> {
/// The reader being iterated, borrowed mutably for the lifetime of the iterator.
inner: &'a mut R,
}
impl<'a, R: Reader> Iterator for TryIter<'a, R> {
    type Item = ReadResult<'a>;

    /// Performs one read attempt on the underlying reader.
    ///
    /// Yields `None` once the reader is exhausted; otherwise wraps the outcome of `try_read`
    /// into a `ReadResult`. The read that fails is still yielded as `ReadResult::Failed`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.inner.exhausted().is_some() {
            return None;
        }
        let outcome = match self.inner.try_read() {
            Ok(Some(record)) => ReadResult::Record(record),
            Ok(None) => ReadResult::Nothing,
            Err(fault) => ReadResult::Failed(fault),
        };
        Some(outcome)
    }

    /// Reports an unbounded upper limit while the reader is usable, and exactly zero remaining
    /// items once it is exhausted.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        match self.inner.exhausted() {
            None => (0, None),
            Some(_) => (0, Some(0)),
        }
    }
}
// `next` returns `None` whenever `exhausted()` reports an error, so the iterator stays finished
// as long as the reader keeps reporting that error.
// NOTE(review): this relies on `Reader` implementations never clearing their exhausted state.
impl<'a, R: Reader> FusedIterator for TryIter<'a, R> {}