use super::super::{
meta_publisher::MetaPublisher,
meta_subscriber::MetaSubscriber,
meta_topic::MetaTopic,
};
use std::{
fs::{OpenOptions, File},
sync::{
Arc,
atomic::{
AtomicUsize,
Ordering::Relaxed,
}
},
fmt::Debug,
num::NonZeroU32,
};
use memmap::{
MmapOptions,
MmapMut,
};
/// Topic whose slots live in a memory-mapped file.
///
/// Holds the opened file, the read/write mapping created over it and two views
/// into that mapping: the [MMapContents] header and the slice of slots.
/// NOTE(review): `'a` is not tied to any borrow visible in this file -- both
/// references are produced from raw pointers into `mmap_handle`; presumably
/// they must not outlive it -- confirm.
#[derive(Debug)]
pub struct MMapMeta<'a, SlotType: 'a> {
/// Path of the backing file, as received by `new()`
mmap_file_path: String,
/// Handle of the opened backing file
mmap_file: File,
/// The mutable memory mapping created over `mmap_file`
mmap_handle: MmapMut,
/// The start of the mapping, interpreted as the [MMapContents] header
mmap_contents: &'a mut MMapContents<SlotType>,
/// Slice over the slots, starting at `mmap_contents.first_buffer_element`
buffer: &'a mut [SlotType],
}
/// Layout of the data placed at the beginning of the mmapped file.
///
/// `#[repr(C)]` keeps the fields in declaration order: three counters followed
/// by the first slot. The file is sized to hold further slots right after
/// `first_buffer_element`, which is how the slots slice is built.
#[repr(C)]
#[derive(Debug)]
struct MMapContents<SlotType> {
/// Incremented by publishers to reserve the slot they will write to
publisher_tail: AtomicUsize,
/// Advanced by publishers after their slot is written; consumers only read slots below it
consumer_tail: AtomicUsize,
/// Number of slots in the buffer (set from `max_slots` when the topic is created)
slice_length: AtomicUsize,
/// The first slot of the buffer -- its address is used as the start of the slots slice
first_buffer_element: SlotType,
}
impl<'a, SlotType: 'a + Debug> MMapMeta<'a, SlotType> {

    /// Builds a slice covering all `slice_length` slots, starting at the first
    /// buffer element stored in the mmapped header.
    ///
    /// Every call hands out another `&'a mut` view over the very same memory:
    /// callers in this file only read through it (subscribers), but the aliasing
    /// is not enforced by the type system.
    fn buffer_as_slice_mut(&self) -> &'a mut [SlotType] {
        let slice_length = self.mmap_contents.slice_length.load(Relaxed);
        // the pointer is obtained without materializing a `&mut Self` out of `&self`
        let first_slot = &self.mmap_contents.first_buffer_element as *const SlotType as *mut SlotType;
        // SAFETY: `new()` sized the mapping so that `slice_length` slots follow
        //         `first_buffer_element` contiguously.
        unsafe { std::slice::from_raw_parts_mut(first_slot, slice_length) }
    }

    /// Returns a subscriber that will see only the events already published at the
    /// moment of this call -- from slot 0 up to (excluding) the current `consumer_tail`.
    /// Mirrors the first subscriber returned by
    /// [Self::subscribe_to_separated_old_and_new_events()].
    pub fn subscribe_to_old_events_only(self: &Arc<Self>) -> MMapMetaFixedSubscriber<'a, SlotType> {
        // `Acquire`: the slots below the observed tail are about to be read by the subscriber
        let fixed_tail = self.mmap_contents.consumer_tail.load(std::sync::atomic::Ordering::Acquire);
        MMapMetaFixedSubscriber {
            head: AtomicUsize::new(0),
            buffer: self.buffer_as_slice_mut(),
            fixed_tail,
        }
    }

    /// Returns a subscriber positioned at the current `consumer_tail`: it will see
    /// only the events published after this call.
    pub fn subscribe_to_new_events_only(self: &Arc<Self>) -> MMapMetaDynamicSubscriber<'a, SlotType> {
        let first_element_slot_id = self.mmap_contents.consumer_tail.load(Relaxed);
        MMapMetaDynamicSubscriber {
            head: AtomicUsize::new(first_element_slot_id),
            buffer: self.buffer_as_slice_mut(),
            meta_mmap_log_topic: Arc::clone(self),
        }
    }

    /// Returns two subscribers split at the current `consumer_tail`:
    ///   1. a fixed one, covering slots `0..tail` (the already published events);
    ///   2. a dynamic one, starting at `tail` (the events yet to be published).
    pub fn subscribe_to_separated_old_and_new_events(self: &Arc<Self>) -> (MMapMetaFixedSubscriber<'a, SlotType>, MMapMetaDynamicSubscriber<'a, SlotType>) {
        // `Acquire`: the fixed subscriber reads the slots below `tail` without
        // consulting the atomic again
        let tail = self.mmap_contents.consumer_tail.load(std::sync::atomic::Ordering::Acquire);
        (
            MMapMetaFixedSubscriber {
                head: AtomicUsize::new(0),
                buffer: self.buffer_as_slice_mut(),
                fixed_tail: tail,
            },
            MMapMetaDynamicSubscriber {
                head: AtomicUsize::new(tail),
                buffer: self.buffer_as_slice_mut(),
                meta_mmap_log_topic: Arc::clone(self),
            },
        )
    }

    /// Returns a single subscriber starting at slot 0: it will see the already
    /// published events followed by the ones yet to be published.
    pub fn subscribe_to_joined_old_and_new_events(self: &Arc<Self>) -> MMapMetaDynamicSubscriber<'a, SlotType> {
        MMapMetaDynamicSubscriber {
            head: AtomicUsize::new(0),
            buffer: self.buffer_as_slice_mut(),
            meta_mmap_log_topic: Arc::clone(self),
        }
    }
}
impl<'a, SlotType: 'a + Debug> MetaTopic<'a, SlotType> for MMapMeta<'a, SlotType> {

    /// Creates (or truncates) the file at `mmap_file_path`, sizes it to hold the
    /// [MMapContents] header plus `max_slots` slots, maps it in read/write mode and
    /// returns the topic with all counters zeroed.
    ///
    /// # Errors
    /// Returns an error if:
    ///   - `max_slots` is zero (the header always embeds the first slot);
    ///   - the resulting file length overflows `u64` or doesn't fit in `usize`;
    ///   - the file cannot be opened, resized or mmapped.
    fn new<IntoString: Into<String>>(mmap_file_path: IntoString, max_slots: u64) -> Result<Arc<Self>, Box<dyn std::error::Error>> {
        let mmap_file_path = mmap_file_path.into();

        // validate the sizes before touching the filesystem
        if max_slots == 0 {
            return Err(format!("mmapped log topic '{mmap_file_path}': `max_slots` must be at least 1").into());
        }
        let header_size = std::mem::size_of::<MMapContents<SlotType>>() as u64;
        let slot_size = std::mem::size_of::<SlotType>() as u64;
        // the header already contains the first slot, hence `max_slots - 1`
        let mmap_file_len = (max_slots - 1)
            .checked_mul(slot_size)
            .and_then(|extra_slots_size| extra_slots_size.checked_add(header_size))
            .ok_or_else(|| format!("mmapped log topic '{mmap_file_path}': the file length needed for {max_slots} slots overflows"))?;
        let mmap_len = usize::try_from(mmap_file_len)
            .map_err(|err| format!("mmapped log topic '{mmap_file_path}': a mapping of {mmap_file_len} bytes is not addressable on this platform: {:?}", err))?;
        let slice_length = usize::try_from(max_slots)
            .map_err(|err| format!("mmapped log topic '{mmap_file_path}': {max_slots} slots are not addressable on this platform: {:?}", err))?;

        let mmap_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&mmap_file_path)
            .map_err(|err| format!("Could not open file '{mmap_file_path}' (that would be used to mmap a `log_topic` buffer): {:?}", err))?;
        // shrinking to 0 discards any previous contents; growing afterwards yields a sparse file
        mmap_file.set_len(0)
            .map_err(|err| format!("mmapped log topic '{mmap_file_path}': Could not set the (sparsed) length of the mentioned mmapped file (after opening it) to 0: {:?}", err))?;
        mmap_file.set_len(mmap_file_len)
            .map_err(|err| format!("mmapped log topic '{mmap_file_path}': Could not set the (sparsed) length of the mentioned mmapped file (after opening it) to {mmap_file_len}: {:?}", err))?;

        // SAFETY: the file was just opened in RW mode and sized to `mmap_file_len` bytes.
        //         NOTE(review): nothing here prevents other processes from changing the file
        //         while it is mapped.
        let mut mmap_handle = unsafe {
            MmapOptions::new()
                .len(mmap_len)
                .map_mut(&mmap_file)
        }.map_err(|err| format!("Couldn't mmap the (already openned) file '{mmap_file_path}' in RW mode, for use as the backing storage of a `log_topic` buffer: {:?}", err))?;

        let contents_ptr = mmap_handle.as_mut_ptr() as *mut MMapContents<SlotType>;
        // SAFETY: the mapping is at least `size_of::<MMapContents<SlotType>>()` bytes long
        //         (that is `header_size`, part of `mmap_file_len`), so zeroing one header is in bounds
        unsafe { std::ptr::write_bytes(contents_ptr, 0, 1); }
        // SAFETY: same bounds as above; the counters are valid when zeroed.
        //         NOTE(review): assumes the start of the mapping is suitably aligned for
        //         `MMapContents<SlotType>` -- confirm for over-aligned slot types.
        let mmap_contents: &mut MMapContents<SlotType> = unsafe { &mut *contents_ptr };
        mmap_contents.slice_length.store(slice_length, Relaxed);
        mmap_contents.publisher_tail.store(0, Relaxed);
        mmap_contents.consumer_tail.store(0, Relaxed);

        let buffer_slice_ptr = &mut mmap_contents.first_buffer_element as *mut SlotType;
        Ok(Arc::new(Self {
            mmap_file_path,
            mmap_file,
            mmap_handle,
            mmap_contents,
            // SAFETY: `mmap_file_len` accounts for `slice_length` slots starting at `first_buffer_element`
            buffer: unsafe { std::slice::from_raw_parts_mut(buffer_slice_ptr, slice_length) },
        }))
    }
}
impl<'a, SlotType: 'a + Debug> MetaPublisher<'a, SlotType> for MMapMeta<'a, SlotType> {

    /// Reserves the next free slot, lets `setter` fill it in and then makes it
    /// visible to consumers by advancing `consumer_tail`.
    ///
    /// Returns the number of published elements so far (the slot index + 1), or
    /// `None` -- without calling `setter` -- if the buffer has no free slots left
    /// (or if that number would not fit in an `u32`).
    #[inline(always)]
    fn publish<F: FnOnce(&mut SlotType)>(&self, setter: F) -> Option<NonZeroU32> {
        let capacity = self.mmap_contents.slice_length.load(Relaxed);
        // only reserve a slot if it lies inside the buffer: once `publisher_tail`
        // reaches the capacity, it is left untouched
        let tail = self.mmap_contents.publisher_tail
            .fetch_update(Relaxed, Relaxed, |tail| {
                if tail < capacity && tail < u32::MAX as usize {
                    Some(tail + 1)
                } else {
                    None
                }
            })
            .ok()?;
        // SAFETY: `tail < capacity`, which is the length `buffer` was built with, and the
        //         reservation above gives this call exclusive write access to the slot.
        //         The pointer is taken from the slice so no `&mut Self` is forged out of `&self`.
        let slot = unsafe { &mut *(self.buffer.as_ptr() as *mut SlotType).add(tail) };
        setter(slot);
        // publications become visible in reservation order: wait for the previous slots, then
        // advance the consumer's tail -- `Release` makes the slot contents written above visible
        // to whoever observes the new tail with `Acquire`
        while self.mmap_contents.consumer_tail.compare_exchange_weak(tail, tail+1, std::sync::atomic::Ordering::Release, Relaxed).is_err() {
            std::hint::spin_loop();
        }
        // cannot overflow: `tail < u32::MAX` was checked when reserving
        NonZeroU32::new(1 + tail as u32)
    }

    /// Same as [Self::publish()], but moving `item` into the reserved slot.
    #[inline(always)]
    fn publish_movable(&self, item: SlotType) -> Option<NonZeroU32> {
        self.publish(|slot| *slot = item)
    }

    /// Not implemented yet
    fn leak_slot(&self) -> Option<(&'a mut SlotType, u32)> {
        todo!()
    }

    /// Not implemented yet
    fn publish_leaked_ref(&'a self, _slot: &'a SlotType) -> Option<NonZeroU32> {
        todo!()
    }

    /// Not implemented yet
    fn publish_leaked_id(&'a self, _slot_id: u32) -> Option<NonZeroU32> {
        todo!()
    }

    /// Not implemented yet
    fn unleak_slot_ref(&'a self, _slot: &'a mut SlotType) {
        todo!()
    }

    /// Not implemented yet
    fn unleak_slot_id(&'a self, _slot_id: u32) {
        todo!()
    }

    /// Returns the current value of `publisher_tail` -- the number of slots
    /// reserved by publishers so far.
    #[inline(always)]
    fn available_elements_count(&self) -> usize {
        self.mmap_contents.publisher_tail.load(Relaxed)
    }

    /// Returns the number of slots the buffer was created with.
    #[inline(always)]
    fn max_size(&self) -> usize {
        self.mmap_contents.slice_length.load(Relaxed)
    }

    /// Not implemented yet
    fn debug_info(&self) -> String {
        todo!()
    }
}
/// Wraps either of the two subscriber types defined in this file, so both may be
/// handled through a single type.
pub enum MMapMetaSubscriber<'a, SlotType: 'a + Debug> {
/// A subscriber whose tail is read from the topic on each operation
Dynamic (MMapMetaDynamicSubscriber<'a, SlotType>),
/// A subscriber whose tail was frozen when it was created
Fixed (MMapMetaFixedSubscriber<'a, SlotType>),
}
impl<'a, SlotType: 'a + Debug> MMapMetaSubscriber<'a, SlotType> {

    /// Forwards to the wrapped subscriber's `remaining_elements_count()`,
    /// whichever variant `self` happens to be.
    #[inline(always)]
    pub fn remaining_elements_count(&self) -> usize {
        match *self {
            Self::Fixed(ref fixed_subscriber) => fixed_subscriber.remaining_elements_count(),
            Self::Dynamic(ref dynamic_subscriber) => dynamic_subscriber.remaining_elements_count(),
        }
    }
}
/// Subscriber that follows the topic as it grows: its upper bound is the topic's
/// `consumer_tail`, read again on every operation.
pub struct MMapMetaDynamicSubscriber<'a, SlotType: 'a> {
/// Index of the next slot this subscriber will consume
head: AtomicUsize,
/// View over the topic's slots
buffer: &'a mut [SlotType],
/// The topic this subscriber reads from -- used to query the current `consumer_tail`
meta_mmap_log_topic: Arc<MMapMeta<'a, SlotType>>,
}
impl<'a, SlotType: 'a + Debug> MetaSubscriber<'a, SlotType> for MMapMetaDynamicSubscriber<'a, SlotType> {

    /// Consumes the next element, if there is one published beyond this subscriber's `head`.
    ///
    /// When an element is available, `report_len_after_dequeueing_fn` is called with
    /// `tail - head` (as read before handing the slot out) and the result of
    /// `getter_fn(slot)` is returned.\
    /// Otherwise, `head` is restored, `report_empty_fn` is called (its return value
    /// is ignored) and `None` is returned.
    fn consume<GetterReturnType: 'a,
               GetterFn:                   FnOnce(&'a SlotType) -> GetterReturnType,
               ReportEmptyFn:              Fn() -> bool,
               ReportLenAfterDequeueingFn: FnOnce(i32)>
              (&self,
               getter_fn:                      GetterFn,
               report_empty_fn:                ReportEmptyFn,
               report_len_after_dequeueing_fn: ReportLenAfterDequeueingFn)
              -> Option<GetterReturnType> {

        // optimistically claim the slot at `head`...
        let head = self.head.fetch_add(1, Relaxed);
        // `Acquire`: slots below the observed tail must be visible before they are read
        let tail = self.meta_mmap_log_topic.mmap_contents.consumer_tail.load(std::sync::atomic::Ordering::Acquire);
        if head >= tail {
            // ... and give it back if nothing was published there yet. Concurrent claims
            // are undone in reverse order, hence the spinning
            while self.head.compare_exchange_weak(head+1, head, Relaxed, Relaxed).is_err() {
                std::hint::spin_loop();
            }
            report_empty_fn();
            return None;
        }

        // SAFETY: `head < tail` and the tail only covers slots inside the buffer.
        //         Going through a raw pointer yields the `'a` lifetime the getter
        //         requires, without forging a `&mut Self` out of `&self`.
        let slot_ref: &'a SlotType = unsafe { &*self.buffer.as_ptr().add(head) };
        report_len_after_dequeueing_fn((tail - head) as i32);
        Some(getter_fn(slot_ref))
    }

    /// Not implemented yet
    fn consume_leaking(&'a self) -> Option<(&'a SlotType, u32)> {
        todo!()
    }

    /// Not implemented yet
    fn release_leaked_ref(&'a self, _slot: &'a SlotType) {
        todo!()
    }

    /// Not implemented yet
    fn release_leaked_id(&'a self, _slot_id: u32) {
        todo!()
    }

    /// Returns how many published elements are still to be consumed by this subscriber.
    #[inline(always)]
    fn remaining_elements_count(&self) -> usize {
        // saturating: a concurrent `consume()` may leave `head` transiently above the tail
        self.meta_mmap_log_topic.mmap_contents.consumer_tail.load(Relaxed)
            .saturating_sub(self.head.load(Relaxed))
    }

    /// Not implemented yet
    unsafe fn peek_remaining(&self) -> Vec<&SlotType> {
        todo!()
    }
}
/// Subscriber bounded by a tail frozen at creation time: it never goes past
/// `fixed_tail`, no matter how much the topic grows afterwards.
pub struct MMapMetaFixedSubscriber<'a, SlotType: 'a> {
/// Index of the next slot this subscriber will consume
head: AtomicUsize,
/// View over the topic's slots
buffer: &'a mut [SlotType],
/// Exclusive upper bound for `head`
fixed_tail: usize,
}
impl<'a, SlotType: 'a + Debug> MetaSubscriber<'a, SlotType> for MMapMetaFixedSubscriber<'a, SlotType> {

    /// Consumes the next element, if `head` didn't reach `fixed_tail` yet.
    ///
    /// When an element is available, `report_len_after_dequeueing_fn` is called with
    /// `fixed_tail - head` (as read before handing the slot out) and the result of
    /// `getter_fn(slot)` is returned.\
    /// Otherwise, `head` is restored, `report_empty_fn` is called (its return value
    /// is ignored) and `None` is returned.
    fn consume<GetterReturnType: 'a,
               GetterFn:                   FnOnce(&'a SlotType) -> GetterReturnType,
               ReportEmptyFn:              Fn() -> bool,
               ReportLenAfterDequeueingFn: FnOnce(i32)>
              (&self,
               getter_fn:                      GetterFn,
               report_empty_fn:                ReportEmptyFn,
               report_len_after_dequeueing_fn: ReportLenAfterDequeueingFn)
              -> Option<GetterReturnType> {

        // optimistically claim the slot at `head`...
        let head = self.head.fetch_add(1, Relaxed);
        if head >= self.fixed_tail {
            // ... and give it back if it is past the frozen tail. Concurrent claims
            // are undone in reverse order, hence the spinning
            while self.head.compare_exchange_weak(head+1, head, Relaxed, Relaxed).is_err() {
                std::hint::spin_loop();
            }
            report_empty_fn();
            return None;
        }

        // SAFETY: `head < fixed_tail`; NOTE(review): relies on `fixed_tail` never exceeding
        //         `buffer.len()` -- confirm at the construction sites.
        //         Going through a raw pointer yields the `'a` lifetime the getter
        //         requires, without forging a `&mut Self` out of `&self`.
        let slot_ref: &'a SlotType = unsafe { &*self.buffer.as_ptr().add(head) };
        report_len_after_dequeueing_fn((self.fixed_tail - head) as i32);
        Some(getter_fn(slot_ref))
    }

    /// Not implemented yet
    fn consume_leaking(&'a self) -> Option<(&'a SlotType, u32)> {
        todo!()
    }

    /// Not implemented yet
    fn release_leaked_ref(&'a self, _slot: &'a SlotType) {
        todo!()
    }

    /// Not implemented yet
    fn release_leaked_id(&'a self, _slot_id: u32) {
        todo!()
    }

    /// Returns how many elements are still to be consumed before reaching `fixed_tail`.
    #[inline(always)]
    fn remaining_elements_count(&self) -> usize {
        // saturating: a concurrent `consume()` may leave `head` transiently above `fixed_tail`
        self.fixed_tail.saturating_sub(self.head.load(Relaxed))
    }

    /// Not implemented yet
    unsafe fn peek_remaining(&self) -> Vec<&SlotType> {
        todo!()
    }
}
/// Unit tests for the mmap-backed topic: each test creates its own backing file under `/tmp`
#[cfg(any(test,doc))]
mod tests {
use super::*;
/// Publishes two elements, one at a time, checking what each of three subscribers
/// (two "joined old & new", one "new only") is able to consume after each publication
#[cfg_attr(not(doc),test)]
fn happy_path() {
#[derive(Debug,PartialEq)]
struct MyData {
name: [u8; 254],
name_len: u8,
age: u8,
}
let meta_log_topic = MMapMeta::<MyData>::new("/tmp/happy_path.test.mmap", 4096)
.expect("Instantiating the meta log topic");
let expected_name = "zertyz";
let expected_age = 42;
// first element: the name bytes, zero-padded up to the array length
let mut iter = expected_name.as_bytes().iter();
let my_data = MyData {
name: [0; 254].map(|_| *iter.next().unwrap_or(&0)),
name_len: expected_name.len() as u8,
age: expected_age,
};
let publishing_result = meta_log_topic.publish(|slot| *slot = my_data);
assert!(publishing_result.is_some(), "Publishing failed");
// `consumer_1` starts at slot 0, so it must see the element published above
let consumer_1 = meta_log_topic.subscribe_to_joined_old_and_new_events();
let observed_slot_1: &MyData = consumer_1.consume(|slot| unsafe { &*(slot as *const MyData) },
|| false,
|_| {})
.expect("Consuming the element from `consumer_1`");
let observed_name = unsafe { std::slice::from_raw_parts(observed_slot_1.name.as_ptr(), observed_slot_1.name_len as usize) };
let observed_name = String::from_utf8_lossy(observed_name);
assert_eq!(&observed_name, expected_name, "Name doesn't match");
assert_eq!(observed_slot_1.age, expected_age, "Age doesn't match");
// a second subscriber starting at slot 0 must be handed the very same slot
let consumer_2 = meta_log_topic.subscribe_to_joined_old_and_new_events();
let observed_slot_2: &MyData = consumer_2.consume(|slot| unsafe {&*(slot as *const MyData)},
|| false,
|_| {})
.expect("Consuming the element from `consumer_2`");
assert_eq!(observed_slot_2 as *const MyData, observed_slot_1 as *const MyData, "These references should point to the same address");
// `consumer_3` is created after the publication and asks for new events only: nothing to consume yet
let consumer_3 = meta_log_topic.subscribe_to_new_events_only();
let observed_slot_3 = consumer_3.consume(|slot| unsafe {&*(slot as *const MyData)},
|| false,
|_| {});
assert_eq!(None, observed_slot_3, "`consumer_3` was told not to retrieve old elements...");
// second element: upper-cased name and `100 - expected_age` as the age
let mut iter = expected_name.as_bytes().iter().map(|c| c.to_ascii_uppercase());
let my_data = MyData {
name: [0; 254].map(|_| iter.next().unwrap_or(0)),
name_len: expected_name.len() as u8,
age: 100 - expected_age,
};
let publishing_result = meta_log_topic.publish(|slot| *slot = my_data);
assert!(publishing_result.is_some(), "Publishing of a second element failed");
// now all three subscribers must be handed the second slot
let observed_slot_1: &MyData = consumer_1.consume(|slot| unsafe {&*(slot as *const MyData)},
|| false,
|_| {})
.expect("Consuming yet another element from `consumer_1`");
let observed_name = unsafe { std::slice::from_raw_parts(observed_slot_1.name.as_ptr(), observed_slot_1.name_len as usize) };
let observed_name = String::from_utf8_lossy(observed_name);
assert_eq!(*observed_name, expected_name.to_ascii_uppercase(), "Name doesn't match");
assert_eq!(100 - observed_slot_1.age, expected_age, "Age doesn't match");
let observed_slot_2: &MyData = consumer_2.consume(|slot| unsafe {&*(slot as *const MyData)},
|| false,
|_| {})
.expect("Consuming yet another element from `consumer_2`");
assert_eq!(observed_slot_2 as *const MyData, observed_slot_1 as *const MyData, "These references should point to the same address");
let observed_slot_3: &MyData = consumer_3.consume(|slot| unsafe {&*(slot as *const MyData)},
|| false,
|_| {})
.expect("Consuming the element from `consumer_3`");
assert_eq!(observed_slot_3 as *const MyData, observed_slot_1 as *const MyData, "These references should point to the same address");
}
/// Publishes `N_ELEMENTS` elements into a topic created with far more slots than that
/// and then consumes them all, checking each one against the expected generated value
#[cfg_attr(not(doc),test)]
fn indefinite_growth() {
const N_ELEMENTS: u64 = 2 * 1024 * 1024;
#[derive(Debug, PartialEq)]
struct MyData {
id: u64,
year_of_birth: u16,
year_of_death: u16,
known_number_of_descendents: u32,
}
let meta_log_topic = MMapMeta::<MyData>::new("/tmp/indefinite_growth.test.mmap", 1024 * 1024 * 1024)
.expect("Instantiating the meta log topic");
// subscribed before anything is published, so "new events" covers every element below
let consumer = meta_log_topic.subscribe_to_new_events_only();
for i in 0..N_ELEMENTS {
let publishing_result = meta_log_topic.publish(|slot| *slot = create_expected_element(i));
assert!(publishing_result.is_some(), "Publishing of event #{i} failed");
}
for i in 0..N_ELEMENTS {
let expected_element = create_expected_element(i);
let observed_element = consumer.consume(|slot| unsafe {&*(slot as *const MyData)},
|| false,
|_| {})
.expect("Consuming an element");
assert_eq!(observed_element, &expected_element, "Element #{i} doesn't match");
}
// deterministic generator: the same `i` always yields the same element
fn create_expected_element(i: u64) -> MyData {
let year_of_birth = 1 + i as u16 % 2023;
let year_of_death = year_of_birth + (i % 120) as u16;
MyData {
id: i,
year_of_birth,
year_of_death,
known_number_of_descendents: ( (i % year_of_birth as u64) * (year_of_death - year_of_birth) as u64 ) as u32,
}
}
}
/// Publishes and consumes a single `u128`, explicitly dropping the subscriber at the end
/// while the reference obtained from it was already used
#[cfg_attr(not(doc),test)]
fn safe_lifetimes<'a>() {
const EXPECTED_ELEMENT: u128 = 1928384756;
let meta_log_topic = MMapMeta::<u128>::new("/tmp/safe_lifetimes.test.mmap", 1024 * 1024 * 1024)
.expect("Instantiating the meta log topic");
let consumer = meta_log_topic.subscribe_to_new_events_only();
meta_log_topic.publish(|slot| *slot = EXPECTED_ELEMENT);
let observed_element = consumer.consume(|slot| unsafe {&*(slot as *const u128)},
|| false,
|_| {})
.expect("Consuming an element");
assert_eq!(observed_element, &EXPECTED_ELEMENT, "Dequeued element doesn't match");
drop(consumer);
}
}