use crate::api::ChannelError::AccessError;
use crate::api::{ChannelError, WriteError, Writer};
use crate::header::Header;
use crate::utils::{align, store_atomic_u64, CLOSE, REC_HEADER_LEN, WATERMARK};
use kekbit_codecs::codecs::DataFormat;
use kekbit_codecs::codecs::Encodable;
use log::{debug, error, info};
use memmap::MmapMut;
use std::cmp::min;
use std::io::Write;
use std::ptr::copy_nonoverlapping;
use std::result::Result;
use std::sync::atomic::Ordering;
/// An implementation of the [Writer](trait.Writer.html) which access a persistent channel through
/// memory mapping, and uses a specific [DataFormat](../codecs/trait.DataFormat.html). A `ShmWriter` must be created using the [shm_writer](fn.shm_writer.html) function.
/// Any `ShmWriter` exclusively holds the channel is bound to, and it is *not thread safe*.
/// If multiple threads must write into a channel they should be externally synchronized.
///
/// # Examples
///
/// ```
/// use kekbit_core::tick::TickUnit::Nanos;
/// use kekbit_core::shm::*;
/// use kekbit_core::header::Header;
/// use kekbit_core::api::Writer;
/// use kekbit_codecs::codecs::raw::RawBinDataFormat;
///
/// const FOREVER: u64 = 99_999_999_999;
/// let writer_id = 1850;
/// let channel_id = 42;
/// let capacity = 3000;
/// let max_msg_len = 100;
/// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
/// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
/// writer.heartbeat().unwrap();
/// ```
pub struct ShmWriter<D: DataFormat> {
header: Header,
data_ptr: *mut u8,
write_offset: u32,
mmap: MmapMut,
df: D,
write: KekWrite,
}
impl<D: DataFormat> ShmWriter<D> {
#[allow(clippy::cast_ptr_alignment)]
pub(super) fn new(mut mmap: MmapMut, df: D) -> Result<ShmWriter<D>, ChannelError> {
let buf = &mut mmap[..];
let header = Header::read(buf)?;
let header_ptr = buf.as_ptr() as *mut u64;
let head_len = header.len();
let data_ptr = unsafe { header_ptr.add(head_len) } as *mut u8;
let write = KekWrite::new(data_ptr, header.max_msg_len() as usize);
let mut writer = ShmWriter {
header,
data_ptr,
write_offset: 0,
mmap,
df,
write,
};
info!(
"Kekbit channel writer created. Size is {}MB. Max msg size {}KB",
writer.header.capacity() / 1_000_000,
writer.header.max_msg_len() / 1_000
);
//sent the very first original heart bear
match writer.heartbeat() {
Ok(_) => {
info!("Initial hearbeat successfully sent!");
Ok(writer)
}
Err(we) => Err(AccessError {
reason: format!("Initial heartbeat failed!. Reason {:?}", we),
}),
}
}
#[inline(always)]
fn write_metadata(&mut self, write_ptr: *mut u64, len: u64, aligned_rec_len: u32) {
unsafe {
//we should always have the 8 bytes required by WATERMARK as they are acounted in the Footer
store_atomic_u64(write_ptr.add(aligned_rec_len as usize), WATERMARK, Ordering::Release);
}
store_atomic_u64(write_ptr, len, Ordering::Release);
}
}
impl<D: DataFormat> Writer<D> for ShmWriter<D> {
/// Writes a message into the channel. This operation will encode the data directly into channel.
/// While this is a non blocking operation, only one write should be executed at any given time.
///
/// Returns the total amount of bytes wrote into the channel which includes, the size of the message,
/// the size of the message header and the amount of padding add to that message.
///
/// # Arguments
///
/// * `data` - The data which to encode and write into the channel.
///
/// # Errors
///
/// Two kinds of [failures](enum.WriteError.html) may occur. One if the encoding operation failed, the other if the channel
/// rejected the message for reasons such data is too large or no space is available in the channel.
///
/// # Examples
///
/// ```
/// use kekbit_core::tick::TickUnit::Nanos;
/// use kekbit_core::shm::*;
/// use kekbit_core::header::Header;
/// use kekbit_core::api::Writer;
/// use kekbit_codecs::codecs::raw::RawBinDataFormat;
///
/// const FOREVER: u64 = 99_999_999_999;
/// let writer_id = 1850;
/// let channel_id = 42;
/// let capacity = 30_000;
/// let max_msg_len = 100;
/// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
/// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
/// let msg = "There are 10 kinds of people: those who know binary and those who don't";
/// let msg_data = msg.as_bytes();
/// writer.write(&msg_data).unwrap();
/// ```
///
#[allow(clippy::cast_ptr_alignment)]
fn write(&mut self, data: &impl Encodable<D>) -> Result<u32, WriteError> {
let read_head_ptr = unsafe { self.data_ptr.add(self.write_offset as usize) };
let write_ptr = unsafe { read_head_ptr.add(REC_HEADER_LEN as usize) };
let available = self.available();
if available <= REC_HEADER_LEN {
return Err(WriteError::ChannelFull);
}
let len = min(self.header.max_msg_len(), available - REC_HEADER_LEN) as usize;
let write_res = data.encode_to(&self.df, self.write.reset(write_ptr, len));
match write_res {
Ok(0) => Err(WriteError::NoSpaceForRecord),
Ok(_) => {
if !self.write.failed {
let aligned_rec_len = align(self.write.total as u32 + REC_HEADER_LEN);
self.write_metadata(read_head_ptr as *mut u64, self.write.total as u64, aligned_rec_len >> 3);
self.write_offset += aligned_rec_len;
Ok(aligned_rec_len)
} else {
Err(WriteError::NoSpaceForRecord)
}
}
Err(io_err) => Err(WriteError::EncodingError(io_err)),
}
}
///Push a heartbeat message into the channel. Hearbeats are zero sized messages which do not need encoding.
///Reader should never activate callbacks for heartbeat messsages.
///
/// Returns RecordHeaderLen, 8 in the current version if the operation succeeds.
///
/// # Errors
///
/// If the operation fails a *ChannelFull* error will be returned, which signals that the channel will not accept any new messages.
///
#[allow(clippy::cast_ptr_alignment)]
#[inline]
fn heartbeat(&mut self) -> Result<u32, WriteError> {
let read_head_ptr = unsafe { self.data_ptr.add(self.write_offset as usize) };
let available = self.available();
if available <= REC_HEADER_LEN {
return Err(WriteError::ChannelFull);
}
let aligned_rec_len = REC_HEADER_LEN; //no need to align REC_HEADER)LEN must be align
self.write_metadata(read_head_ptr as *mut u64, 0u64, aligned_rec_len >> 3);
self.write_offset += aligned_rec_len;
Ok(aligned_rec_len)
}
/// Flushes the channel's outstanding memory map modifications to disk. Calling this method explicitly
/// it is not encouraged as flushing does occur automatically and comes with a performance penalty.
/// It should be used only if for various reasons a writer wants to persist the channel data to the disk
/// at a higher rate than is done automatically.
///
/// Returns Ok(()) if the operation succeeds.
///
/// # Errors
///
/// If flushing fails an I/O error is returned.
///
/// # Examples
///
/// ```
/// use kekbit_core::tick::TickUnit::Nanos;
/// use kekbit_core::shm::*;
/// use kekbit_core::header::Header;
/// use kekbit_core::api::Writer;
/// use kekbit_codecs::codecs::raw::RawBinDataFormat;
///
/// const FOREVER: u64 = 99_999_999_999;
/// let writer_id = 1850;
/// let channel_id = 42;
/// let capacity = 30_000;
/// let max_msg_len = 100;
/// let header = Header::new(writer_id, channel_id, capacity, max_msg_len, FOREVER, Nanos);
/// let test_tmp_dir = tempdir::TempDir::new("kektest").unwrap();
/// let mut writer = shm_writer(&test_tmp_dir.path(), &header, RawBinDataFormat).unwrap();
/// let msg = "There are 10 kinds of people: those who know binary and those who don't";
/// let msg_data = msg.as_bytes();
/// writer.write(&msg_data).unwrap();
/// writer.flush().unwrap();
/// ```
#[inline]
fn flush(&mut self) -> Result<(), std::io::Error> {
debug!("Flushing the channel");
self.mmap.flush()
}
}
impl<D: DataFormat> Drop for ShmWriter<D> {
/// Marks this channel as `closed`, flushes the changes to the disk, and removes the memory mapping.
fn drop(&mut self) {
let write_index = self.write_offset;
info!("Closing message queue..");
unsafe {
#[allow(clippy::cast_ptr_alignment)]
//we should always have the 8 bytes required by CLOSE as they are acounted in the Footer
let write_ptr = self.data_ptr.offset(write_index as isize) as *mut u64;
store_atomic_u64(write_ptr, CLOSE, Ordering::Release);
info!("Closing message sent")
}
self.write_offset = self.mmap.len() as u32;
if self.mmap.flush().is_ok() {
info!("All changes flushed");
} else {
error!("Flush Failed");
}
}
}
impl<D: DataFormat> ShmWriter<D> {
///Returns the amount of space in this channel still available for write.
#[inline]
pub fn available(&self) -> u32 {
(self.header.capacity() - self.write_offset) & 0xFFFF_FFF8 //rounded down to alignement
}
///Returns the amount of data written into this channel.
#[inline]
pub fn write_offset(&self) -> u32 {
self.write_offset
}
///Returns a reference to the [Header](struct.Header.html) associated with this channel.
#[inline]
pub fn header(&self) -> &Header {
&self.header
}
#[inline]
pub fn data_format(&self) -> &D {
&self.df
}
}
struct KekWrite {
write_ptr: *mut u8,
max_size: usize,
total: usize,
failed: bool,
}
impl KekWrite {
#[inline]
fn new(write_ptr: *mut u8, max_size: usize) -> Self {
KekWrite {
write_ptr,
max_size,
total: 0,
failed: false,
}
}
#[inline]
fn reset(&mut self, write_ptr: *mut u8, max_size: usize) -> &mut Self {
self.write_ptr = write_ptr;
self.max_size = max_size;
self.total = 0;
self.failed = false;
self
}
}
impl Write for KekWrite {
#[inline]
fn write(&mut self, data: &[u8]) -> Result<usize, std::io::Error> {
if self.failed {
return Ok(0);
}
let data_len = data.len();
if self.total + data_len > self.max_size {
self.failed |= true;
return Ok(0);
}
unsafe {
let crt_ptr = self.write_ptr.add(self.total as usize);
copy_nonoverlapping(data.as_ptr(), crt_ptr, data_len);
}
self.total += data_len;
Ok(data_len)
}
#[inline]
fn flush(&mut self) -> Result<(), std::io::Error> {
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_write() {
let mut raw_data: [u8; 1000] = [0; 1000];
let write_ptr = raw_data.as_mut_ptr();
let mut kw = KekWrite::new(write_ptr, 20);
kw.flush().unwrap(); //should never crash as it does nothing
let d1: [u8; 10] = [1; 10];
let r1 = kw.write(&d1).unwrap();
assert_eq!(kw.total, r1);
assert!(!kw.failed);
for i in 0..10 {
assert_eq!(raw_data[i], 1);
}
kw.flush().unwrap(); //should never crash as it does nothing
let r2 = kw.write(&d1).unwrap();
assert_eq!(kw.total, r1 + r2);
assert!(!kw.failed);
for i in 10..20 {
assert_eq!(raw_data[i], 1);
}
let r3 = kw.write(&d1).unwrap();
assert_eq!(0, r3);
assert!(kw.failed);
kw.reset(write_ptr, 15);
assert!(!kw.failed);
let d2: [u8; 10] = [2; 10];
let r4 = kw.write(&d2).unwrap();
assert_eq!(kw.total, r4);
assert!(!kw.failed);
for i in 0..10 {
assert_eq!(raw_data[i], 2);
}
assert_eq!(kw.total, 10);
let r5 = kw.write(&d2).unwrap();
assert_eq!(0, r5);
assert!(kw.failed);
assert_eq!(kw.total, 10);
//once it fails it will never recover, even if it has enough space
let r6 = kw.write(&d2[0..3]).unwrap();
assert_eq!(0, r6);
assert!(kw.failed);
assert_eq!(kw.total, 10);
}
}