use crate::{
AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
};
use crate::SECTION_HEADER_COMPACT_SIZE;
use AllocatedSection::Section;
use bincode::config::standard;
use bincode::enc::EncoderImpl;
use bincode::enc::write::SliceWriter;
use bincode::error::EncodeError;
use bincode::{Encode, decode_from_slice, encode_into_slice};
use core::slice::from_raw_parts_mut;
use cu29_traits::{
CuError, CuResult, ObservedWriter, UnifiedLogType, abort_observed_encode,
begin_observed_encode, finish_observed_encode,
};
use memmap2::{Mmap, MmapMut};
use std::fs::{File, OpenOptions};
use std::io::Read;
use std::mem::ManuallyDrop;
use std::path::{Path, PathBuf};
use std::{io, mem};
/// Section storage backed by a borrowed window of a memory-mapped slab.
///
/// The `'static` slice is produced by `SlabEntry::add_section` from the slab's
/// mapping; the slab is responsible for keeping the mapping alive while
/// handles exist.
pub struct MmapSectionStorage {
    // Raw section bytes: header block first, then appended entries.
    buffer: &'static mut [u8],
    // Write cursor within `buffer` (next free byte).
    offset: usize,
    // Size of the reserved header block at the start of `buffer`.
    block_size: usize,
}
impl MmapSectionStorage {
    /// Wraps a raw slab slice; `block_size` is the size of the header block
    /// reserved at the start of `buffer`.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }
    /// Returns the start address of the backing buffer.
    ///
    /// Uses `as_ptr()` instead of `&self.buffer[0]` so a zero-length buffer
    /// yields the slice's (valid, possibly dangling) pointer instead of
    /// panicking on the index.
    pub fn buffer_ptr(&self) -> *const u8 {
        self.buffer.as_ptr()
    }
}
impl SectionStorage for MmapSectionStorage {
    /// Writes the section header at offset 0 and reserves the first
    /// `block_size` bytes for it; entries are appended after that block.
    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        self.post_update_header(header)?;
        self.offset = self.block_size;
        Ok(self.offset)
    }
    /// Re-encodes the header in place at the start of the buffer (e.g. after
    /// its `used` count changed). Does not touch the write cursor.
    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        encode_into_slice(header, &mut self.buffer[0..], standard())
    }
    /// Encodes one entry at the current offset and advances the cursor by the
    /// encoded size. The begin/finish/abort observed-encode calls bracket the
    /// encode so the observation stays balanced on both success and error.
    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
        begin_observed_encode();
        let result = (|| {
            let mut encoder = EncoderImpl::new(
                ObservedWriter::new(SliceWriter::new(&mut self.buffer[self.offset..])),
                standard(),
            );
            entry.encode(&mut encoder)?;
            Ok(encoder.into_writer().into_inner().bytes_written())
        })();
        let size = match result {
            Ok(size) => {
                // The observed byte count must agree with the slice writer's.
                debug_assert_eq!(size, finish_observed_encode());
                size
            }
            Err(err) => {
                abort_observed_encode();
                return Err(err);
            }
        };
        self.offset += size;
        Ok(size)
    }
    /// Nothing to flush at this level: writes land directly in the mapping.
    /// Durability is handled by the owning slab. Returns bytes written so far.
    fn flush(&mut self) -> CuResult<usize> {
        Ok(self.offset)
    }
}
/// A unified logger opened either for reading or writing, as decided by
/// `MmapUnifiedLoggerBuilder::build`.
pub enum MmapUnifiedLogger {
    Read(MmapUnifiedLoggerRead),
    Write(MmapUnifiedLoggerWrite),
}
/// Builder for `MmapUnifiedLogger`; write mode requires both `write` and
/// `create` plus a preallocated size.
pub struct MmapUnifiedLoggerBuilder {
    // Base path from which numbered slab file names are derived.
    file_base_name: Option<PathBuf>,
    // Per-slab file size in bytes (write mode only).
    preallocated_size: Option<usize>,
    write: bool,
    create: bool,
}
impl Default for MmapUnifiedLoggerBuilder {
fn default() -> Self {
Self::new()
}
}
impl MmapUnifiedLoggerBuilder {
    /// Starts from an empty configuration: no path, no size, read-only.
    pub fn new() -> Self {
        Self {
            file_base_name: None,
            preallocated_size: None,
            write: false,
            create: false,
        }
    }
    /// Sets the base path from which the numbered slab files are derived.
    pub fn file_base_name(mut self, file_path: &Path) -> Self {
        self.file_base_name = Some(file_path.to_path_buf());
        self
    }
    /// Sets the size, in bytes, each slab file is preallocated to.
    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
        self.preallocated_size = Some(preallocated_size);
        self
    }
    /// Requests write access.
    pub fn write(mut self, write: bool) -> Self {
        self.write = write;
        self
    }
    /// Requests creation of the log files.
    pub fn create(mut self, create: bool) -> Self {
        self.create = create;
        self
    }
    /// Builds a writer when both `write` and `create` are set, otherwise a
    /// reader over an existing log.
    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
        let page_size = page_size::get();
        if !(self.write && self.create) {
            // Read path: only the base file name is needed.
            let file_path = self.file_base_name.ok_or_else(|| {
                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
            })?;
            let reader = MmapUnifiedLoggerRead::new(&file_path)?;
            return Ok(MmapUnifiedLogger::Read(reader));
        }
        // Write path: both a path and a preallocated slab size are mandatory.
        let file_path = self.file_base_name.ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "File path is required for write mode",
            )
        })?;
        let preallocated_size = self.preallocated_size.ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "Preallocated size is required for write mode",
            )
        })?;
        let writer = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
        Ok(MmapUnifiedLogger::Write(writer))
    }
}
/// One memory-mapped log file ("slab") plus its write-side bookkeeping.
struct SlabEntry {
    // Backing file; trimmed to the used size on drop.
    file: File,
    // Writable mapping of `file`. ManuallyDrop so Drop can unmap it
    // explicitly before the file is truncated.
    mmap_buffer: ManuallyDrop<MmapMut>,
    // Next free byte offset within the mapping.
    current_global_position: usize,
    // Start offsets of sections handed out but not yet closed.
    sections_offsets_in_flight: Vec<usize>,
    // Everything before this offset has already been flushed.
    flushed_until_offset: usize,
    page_size: usize,
    // Offset of a provisional end-of-log marker, if one is currently written.
    temporary_end_marker: Option<usize>,
    // Test-only mirrors of close/flush activity for assertions.
    #[cfg(test)]
    closed_sections: Vec<(usize, usize)>,
    #[cfg(test)]
    flushed_ranges: Vec<(usize, usize)>,
    #[cfg(all(test, feature = "mmap-fsync"))]
    sync_call_count: usize,
}
impl Drop for SlabEntry {
    /// Flushes outstanding bytes, unmaps the buffer, then trims the
    /// preallocated file down to the bytes actually written.
    fn drop(&mut self) {
        // Push any unflushed tail of the slab out before unmapping.
        self.flush_until(self.current_global_position);
        // SAFETY: the mapping is dropped exactly once, here, and is not used
        // afterwards; it must go away before the file is truncated below.
        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
        if let Err(error) = self.file.set_len(self.current_global_position as u64) {
            eprintln!("Failed to trim datalogger file: {error}");
        }
        self.sync_file();
        if !self.sections_offsets_in_flight.is_empty() {
            // Message typo fixed ("not full flushed" -> "not fully flushed").
            eprintln!("Error: Slab not fully flushed.");
        }
    }
}
impl SlabEntry {
/// Maps `file` read-write and returns a fresh slab with the cursor at 0.
fn new(file: File, page_size: usize) -> io::Result<Self> {
    // SAFETY: per memmap2's contract, the file must not be truncated or
    // resized elsewhere while this mapping is alive; the mapping is owned by
    // this SlabEntry and unmapped in Drop before the file is resized.
    let mmap_buffer = ManuallyDrop::new(
        unsafe { MmapMut::map_mut(&file) }
            .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
    );
    Ok(Self {
        file,
        mmap_buffer,
        current_global_position: 0,
        sections_offsets_in_flight: Vec::with_capacity(16),
        flushed_until_offset: 0,
        page_size,
        temporary_end_marker: None,
        #[cfg(test)]
        closed_sections: Vec::new(),
        #[cfg(test)]
        flushed_ranges: Vec::new(),
        #[cfg(all(test, feature = "mmap-fsync"))]
        sync_call_count: 0,
    })
}
/// Asynchronously flushes `len` bytes of the mapping starting at `start`,
/// then (optionally, behind the `mmap-fsync` feature) fsyncs the file.
/// A zero-length range is a no-op.
fn flush_range(&mut self, start: usize, len: usize) {
    if len > 0 {
        self.mmap_buffer
            .flush_async_range(start, len)
            .expect("Failed to flush memory map");
        self.sync_file();
        #[cfg(test)]
        self.record_flushed_range(start, len);
    }
}
/// Fsyncs the backing file — but only when compiled with the `mmap-fsync`
/// feature; otherwise this is a no-op.
fn sync_file(&mut self) {
    #[cfg(feature = "mmap-fsync")]
    {
        self.file.sync_all().expect("Failed to fsync log file");
        // Test-only counter so tests can assert syncs actually happened.
        #[cfg(test)]
        {
            self.sync_call_count += 1;
        }
    }
}
/// Flushes the mapping from the last flushed offset up to `until_position`
/// and advances the watermark. Does nothing when already caught up or when
/// `until_position` is 0.
fn flush_until(&mut self, until_position: usize) {
    if until_position == 0 || until_position == self.flushed_until_offset {
        return;
    }
    let start = self.flushed_until_offset;
    self.flush_range(start, until_position - start);
    self.flushed_until_offset = until_position;
}
/// Retracts a provisional end-of-log marker: rewinds the write cursor to
/// where the marker started and pulls the flush watermark back if it had
/// passed that point.
fn clear_temporary_end_marker(&mut self) {
    let Some(marker_start) = self.temporary_end_marker.take() else {
        return;
    };
    self.current_global_position = marker_start;
    self.flushed_until_offset = self.flushed_until_offset.min(marker_start);
}
/// Writes an end-of-log section header at the next page boundary.
///
/// `temporary` is stored in the header's `is_open` flag: a temporary marker
/// gives readers a terminator while the log is still being written and is
/// later retracted by `clear_temporary_end_marker`. Errors when the slab has
/// no room left for the marker.
fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
    let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
    let marker_start = self.align_to_next_page(self.current_global_position);
    // The marker is just a bare compact header, no payload.
    let total_marker_size = block_size;
    let marker_end = marker_start + total_marker_size;
    if marker_end > self.mmap_buffer.len() {
        return Err("Not enough space to write end-of-log marker".into());
    }
    let header = SectionHeader {
        magic: SECTION_MAGIC,
        block_size: SECTION_HEADER_COMPACT_SIZE,
        entry_type: UnifiedLogType::LastEntry,
        offset_to_next_section: total_marker_size as u32,
        used: 0,
        is_open: temporary,
    };
    encode_into_slice(
        &header,
        &mut self.mmap_buffer
            [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
        standard(),
    )
    .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
    // Remember where the marker was so it can be retracted later.
    self.temporary_end_marker = Some(marker_start);
    self.current_global_position = marker_end;
    Ok(())
}
/// True when the section's buffer starts inside this slab's mapping.
fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
    let slab_start = self.mmap_buffer.as_ptr() as usize;
    let slab_end = slab_start + self.mmap_buffer.as_ref().len();
    let section_addr = section.get_storage().buffer_ptr() as usize;
    (slab_start..slab_end).contains(&section_addr)
}
/// Closes `section`: rewrites its header, removes it from the in-flight
/// list, and flushes whatever can safely be flushed.
///
/// Flush policy: when no section remains open, flush everything up to the
/// write cursor; otherwise flush the contiguous prefix up to the oldest
/// still-open section, plus this section's own range if it lies beyond that
/// watermark.
fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
    section
        .post_update_header()
        .expect("Failed to update section header");
    let storage = section.get_storage();
    let ptr = storage.buffer_ptr();
    // The section's buffer must point inside this slab's mapping.
    if ptr < self.mmap_buffer.as_ptr()
        || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
    {
        panic!("Invalid section buffer, not in the slab");
    }
    let base = self.mmap_buffer.as_ptr() as usize;
    let section_start = ptr as usize - base;
    let section_len = section.header.offset_to_next_section as usize;
    #[cfg(test)]
    self.record_closed_section(section_start, section_len);
    self.sections_offsets_in_flight
        .retain(|&x| x != section_start);
    if self.sections_offsets_in_flight.is_empty() {
        // No open section left: the whole written prefix is flushable.
        self.flush_until(self.current_global_position);
        return;
    }
    // Offsets are pushed in allocation order, so [0] is the oldest open
    // section; only the prefix before it may be flushed contiguously.
    let next_open_offset = self.sections_offsets_in_flight[0];
    if self.flushed_until_offset < next_open_offset {
        self.flush_until(next_open_offset);
    }
    // This section may sit past the watermark: flush just its own range.
    if section_start + section_len > self.flushed_until_offset {
        self.flush_range(section_start, section_len);
    }
}
/// Test bookkeeping: remembers a closed section's (start, len).
#[cfg(test)]
fn record_closed_section(&mut self, start: usize, len: usize) {
    self.closed_sections.push((start, len));
}
/// Test bookkeeping: inserts [start, start+len) into the ordered list of
/// flushed ranges, merging overlapping or touching neighbours.
#[cfg(test)]
fn record_flushed_range(&mut self, start: usize, len: usize) {
    let mut merged_start = start;
    let mut merged_end = start + len;
    let mut merged_ranges = Vec::with_capacity(self.flushed_ranges.len() + 1);
    let mut inserted = false;
    for (range_start, range_len) in self.flushed_ranges.drain(..) {
        let range_end = range_start + range_len;
        if range_end < merged_start {
            // Entirely before the new range: keep untouched.
            merged_ranges.push((range_start, range_len));
            continue;
        }
        if merged_end < range_start {
            // Entirely after: emit the merged range once, then the rest.
            if !inserted {
                merged_ranges.push((merged_start, merged_end - merged_start));
                inserted = true;
            }
            merged_ranges.push((range_start, range_len));
            continue;
        }
        // Overlapping or adjacent: grow the accumulated range.
        merged_start = merged_start.min(range_start);
        merged_end = merged_end.max(range_end);
    }
    if !inserted {
        merged_ranges.push((merged_start, merged_end - merged_start));
    }
    self.flushed_ranges = merged_ranges;
}
/// Test helper: bytes inside recorded closed sections that no recorded
/// flushed range covers yet.
#[cfg(test)]
fn pending_closed_bytes(&self) -> usize {
    let mut pending = 0;
    for (section_start, section_len) in &self.closed_sections {
        let section_end = section_start + section_len;
        // Sweep the ordered flushed ranges, summing the gaps they leave
        // inside this section.
        let mut cursor = *section_start;
        for (range_start, range_len) in &self.flushed_ranges {
            let range_end = range_start + range_len;
            if range_end <= cursor {
                continue;
            }
            if *range_start >= section_end {
                break;
            }
            if *range_start > cursor {
                pending += *range_start - cursor;
            }
            cursor = cursor.max(range_end);
            if cursor >= section_end {
                break;
            }
        }
        if cursor < section_end {
            pending += section_end - cursor;
        }
    }
    pending
}
/// Rounds `ptr` up to the next multiple of `page_size`.
/// The bit trick assumes `page_size` is a power of two (true for OS page
/// sizes as returned by `page_size::get`).
#[inline]
fn align_to_next_page(&self, ptr: usize) -> usize {
    (ptr + self.page_size - 1) & !(self.page_size - 1)
}
/// Carves a new section out of the slab at the next page boundary, or
/// returns `NoMoreSpace` when the page-aligned request no longer fits.
///
/// The header records the page-aligned size as `offset_to_next_section`,
/// while the handle's buffer spans only `requested_section_size` bytes; the
/// cursor advances by the requested size and is re-aligned on the next call,
/// so consecutive sections do not overlap.
fn add_section(
    &mut self,
    entry_type: UnifiedLogType,
    requested_section_size: usize,
) -> AllocatedSection<MmapSectionStorage> {
    self.current_global_position = self.align_to_next_page(self.current_global_position);
    let section_size = self.align_to_next_page(requested_section_size) as u32;
    if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
        return AllocatedSection::NoMoreSpace;
    }
    // Compact builds use the small fixed header; otherwise reserve a page.
    #[cfg(feature = "compact")]
    let block_size = SECTION_HEADER_COMPACT_SIZE;
    #[cfg(not(feature = "compact"))]
    let block_size = self.page_size as u16;
    let section_header = SectionHeader {
        magic: SECTION_MAGIC,
        block_size,
        entry_type,
        offset_to_next_section: section_size,
        used: 0u32,
        is_open: true,
    };
    self.sections_offsets_in_flight
        .push(self.current_global_position);
    let end_of_section = self.current_global_position + requested_section_size;
    let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
    // SAFETY: the slice is reborrowed as 'static so the handle can own it.
    // This assumes the mapping outlives every outstanding handle — the slab
    // is only dropped once its sections are closed. TODO confirm callers
    // uphold this invariant.
    let handle_buffer =
        unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
    let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
    self.current_global_position = end_of_section;
    Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
}
/// Test helper: bytes consumed in this slab so far.
#[cfg(test)]
fn used(&self) -> usize {
    self.current_global_position
}
}
/// Write side of the unified logger: the active ("front") slab plus older
/// slabs that still have open sections in flight.
pub struct MmapUnifiedLoggerWrite {
    front_slab: SlabEntry,
    back_slabs: Vec<SlabEntry>,
    base_file_path: PathBuf,
    // Size each slab file is preallocated to.
    slab_size: usize,
    // Zero-based numeric suffix of the front slab's file.
    front_slab_suffix: usize,
}
/// Derives the path of slab `slab_index` from the base path by inserting the
/// index between the file stem and the extension: `log.bin` -> `log_3.bin`.
///
/// Errors with `InvalidInput` when the base path has no file name, no
/// extension, an empty stem, or non-UTF-8 components.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    let invalid = |msg: &str| io::Error::new(io::ErrorKind::InvalidInput, msg);
    let stem = base_file_path
        .file_stem()
        .ok_or_else(|| invalid("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| invalid("Base file name is not valid UTF-8"))?;
    let extension = base_file_path
        .extension()
        .ok_or_else(|| invalid("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| invalid("Base file extension is not valid UTF-8"))?;
    if stem.is_empty() {
        return Err(invalid("Base file name is empty"));
    }
    let mut slab_path = base_file_path.to_path_buf();
    slab_path.set_file_name(format!("{stem}_{slab_index}.{extension}"));
    Ok(slab_path)
}
/// Creates (or truncates) the slab file for `slab_suffix` and preallocates
/// it to `slab_size` bytes so it can be memory-mapped at full size.
fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
    let file_path = build_slab_path(base_file_path, slab_suffix)?;
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(true);
    let file = options.open(&file_path).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!("Failed to open file {}: {e}", file_path.display()),
        )
    })?;
    // Reserve the whole slab up front; the file is trimmed again on drop.
    file.set_len(slab_size as u64).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!("Failed to set file length for {}: {e}", file_path.display()),
        )
    })?;
    Ok(file)
}
/// Removes an existing file or symlink at the base alias location so a fresh
/// alias can be created. A missing path is fine; a directory is an error.
fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
    // symlink_metadata inspects the link itself rather than its target.
    let meta = match std::fs::symlink_metadata(base_file_path) {
        Ok(meta) => meta,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(io::Error::new(
                e.kind(),
                format!(
                    "Failed to inspect existing base log alias {}: {e}",
                    base_file_path.display()
                ),
            ));
        }
    };
    if meta.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "Cannot create base log alias at {} because a directory already exists there",
                base_file_path.display()
            ),
        ));
    }
    std::fs::remove_file(base_file_path).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!(
                "Failed to remove existing base log alias {}: {e}",
                base_file_path.display()
            ),
        )
    })
}
/// Makes the base path an alias of the first slab file (`..._0.ext`) so
/// tooling can open the log by its base name.
///
/// Platform behavior: a relative symlink on Unix; a file symlink with a
/// hard-link fallback on Windows (symlink creation can require privileges);
/// a hard link elsewhere.
fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
    let first_slab_path = build_slab_path(base_file_path, 0)?;
    remove_existing_alias(base_file_path)?;
    #[cfg(unix)]
    {
        use std::os::unix::fs::symlink;
        // Link to the bare file name so the alias survives directory moves.
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        symlink(relative_target, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }
    #[cfg(windows)]
    {
        use std::os::windows::fs::symlink_file;
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        match symlink_file(relative_target, base_file_path) {
            Ok(()) => Ok(()),
            // Symlinks may be unavailable without elevation; fall back to a
            // hard link and report both errors if that fails too.
            Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
                |hard_link_err| {
                    io::Error::other(format!(
                        "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
                        base_file_path.display()
                    ))
                },
            ),
        }?;
        Ok(())
    }
    #[cfg(not(any(unix, windows)))]
    {
        std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }
}
impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
/// Allocates a section in the front slab, rolling over to a freshly created
/// slab when the current one is full.
///
/// Retracts the provisional end-of-log marker first, and writes a new
/// temporary one after the section so readers always find a terminator.
fn add_section(
    &mut self,
    entry_type: UnifiedLogType,
    requested_section_size: usize,
) -> CuResult<SectionHandle<MmapSectionStorage>> {
    // Drop fully-flushed back slabs, then reclaim the marker's space.
    self.garbage_collect_backslabs();
    self.front_slab.clear_temporary_end_marker();
    let maybe_section = self
        .front_slab
        .add_section(entry_type, requested_section_size);
    match maybe_section {
        AllocatedSection::NoMoreSpace => {
            // Front slab full: retire it to back_slabs and retry once.
            let new_slab = self.create_slab()?;
            self.back_slabs
                .push(mem::replace(&mut self.front_slab, new_slab));
            match self
                .front_slab
                .add_section(entry_type, requested_section_size)
            {
                AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
                Section(section) => {
                    self.place_end_marker(true)?;
                    Ok(section)
                }
            }
        }
        Section(section) => {
            self.place_end_marker(true)?;
            Ok(section)
        }
    }
}
/// Marks `section` closed and hands it to whichever slab owns its buffer —
/// one of the retired back slabs, or the front slab otherwise.
fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
    section.mark_closed();
    if let Some(owner) = self
        .back_slabs
        .iter_mut()
        .find(|slab| slab.is_it_my_section(section))
    {
        owner.flush_section(section);
        return;
    }
    self.front_slab.flush_section(section);
}
/// Reports space usage of the log.
fn status(&self) -> UnifiedLogStatus {
    UnifiedLogStatus {
        // Only tracks the front slab's cursor; retired back slabs are not
        // included in the used count.
        total_used_space: self.front_slab.current_global_position,
        // `front_slab_suffix` is the zero-based index of the current slab,
        // so the number of slab files created so far is suffix + 1. The
        // previous `slab_size * front_slab_suffix` reported 0 for a freshly
        // created log even though slab 0 was already preallocated.
        total_allocated_space: self.slab_size * (self.front_slab_suffix + 1),
    }
}
}
impl MmapUnifiedLoggerWrite {
/// Creates the file for the next slab and bumps the suffix counter — only
/// on success, so a failed creation can be retried with the same index.
fn next_slab(&mut self) -> io::Result<File> {
    let candidate_suffix = self.front_slab_suffix + 1;
    let file = make_slab_file(&self.base_file_path, self.slab_size, candidate_suffix)?;
    self.front_slab_suffix = candidate_suffix;
    Ok(file)
}
/// Creates slab 0, links the base alias to it, writes the main header, and
/// positions the write cursor at the first page boundary.
fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
    let file = make_slab_file(base_file_path, slab_size, 0)?;
    create_base_alias_link(base_file_path)?;
    let mut front_slab = SlabEntry::new(file, page_size)?;
    let main_header = MainHeader {
        magic: MAIN_MAGIC,
        first_section_offset: page_size as u16,
        page_size: page_size as u16,
    };
    let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
        .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
    // The main header must fit inside the first page: sections start at
    // offset `page_size` (see first_section_offset above).
    assert!(nb_bytes < page_size);
    front_slab.current_global_position = page_size;
    Ok(Self {
        front_slab,
        back_slabs: Vec::new(),
        base_file_path: base_file_path.to_path_buf(),
        slab_size,
        front_slab_suffix: 0,
    })
}
/// Drops retired slabs whose sections have all been closed; dropping a
/// `SlabEntry` flushes, unmaps, and trims its file (see `Drop for SlabEntry`).
fn garbage_collect_backslabs(&mut self) {
    // `retain` suffices: the predicate only reads the slab, so the former
    // `retain_mut` granted mutability it never used.
    self.back_slabs
        .retain(|slab| !slab.sections_offsets_in_flight.is_empty());
}
/// Writes an end-of-log marker in the front slab; if the slab has no room,
/// retires it and writes the marker in a brand-new slab instead.
fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
    // First attempt in the current front slab; its error is discarded
    // because "no space" is expected and handled by rolling over.
    if self.front_slab.write_end_marker(temporary).is_ok() {
        return Ok(());
    }
    let new_slab = self.create_slab()?;
    let retired = mem::replace(&mut self.front_slab, new_slab);
    self.back_slabs.push(retired);
    self.front_slab.write_end_marker(temporary)
}
/// Introspection helper: (front-slab write position, offsets of still-open
/// sections in the front slab, number of retained back slabs).
pub fn stats(&self) -> (usize, Vec<usize>, usize) {
    (
        self.front_slab.current_global_position,
        self.front_slab.sections_offsets_in_flight.clone(),
        self.back_slabs.len(),
    )
}
/// Creates and maps the next slab file, reusing the front slab's page size.
fn create_slab(&mut self) -> CuResult<SlabEntry> {
    let page_size = self.front_slab.page_size;
    let file = self
        .next_slab()
        .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
    SlabEntry::new(file, page_size)
        .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
}
}
impl Drop for MmapUnifiedLoggerWrite {
    /// Finalizes the log: replaces any provisional marker with a permanent
    /// end-of-log marker, flushes the front slab, and drops fully-closed
    /// back slabs (their own Drop flushes and trims their files).
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... ");
        // Retract the temporary marker so the final one takes its place.
        self.front_slab.clear_temporary_end_marker();
        if let Err(e) = self.place_end_marker(false) {
            panic!("Failed to flush the unified logger: {}", e);
        }
        self.front_slab
            .flush_until(self.front_slab.current_global_position);
        self.garbage_collect_backslabs();
        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed.");
    }
}
/// Opens and maps the slab file with the given index read-only.
///
/// For slab 0 the main header is decoded and validated; the returned prolog
/// is its `first_section_offset`. Other slabs return a prolog of 0 and no
/// header.
fn open_slab_index(
    base_file_path: &Path,
    slab_index: usize,
) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
    let file_path = build_slab_path(base_file_path, slab_index)?;
    let file = OpenOptions::new().read(true).open(&file_path).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!("Failed to open slab file {}: {e}", file_path.display()),
        )
    })?;
    // SAFETY: read-only mapping; memmap2 requires the file not be resized
    // while mapped.
    let mmap = unsafe { Mmap::map(&file) }
        .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
    if slab_index != 0 {
        return Ok((file, mmap, 0, None));
    }
    let (main_header, _bytes_read): (MainHeader, usize) =
        decode_from_slice(&mmap[..], standard()).map_err(|e| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Failed to decode main header: {e}"),
            )
        })?;
    if main_header.magic != MAIN_MAGIC {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Invalid magic number in main header",
        ));
    }
    let prolog = main_header.first_section_offset;
    Ok((file, mmap, prolog, Some(main_header)))
}
/// Read side of the unified logger: iterates sections across the numbered
/// slab files, keeping one slab mapped at a time.
pub struct MmapUnifiedLoggerRead {
    base_file_path: PathBuf,
    // Main header decoded from slab 0 at construction.
    main_header: MainHeader,
    current_mmap_buffer: Mmap,
    // Kept alive alongside its mapping.
    current_file: File,
    current_slab_index: usize,
    // Byte offset of the next section header within the current mapping.
    current_reading_position: usize,
}
/// A resumable read cursor: which slab and which byte offset within it.
/// Obtained from `MmapUnifiedLoggerRead::position`, consumed by `seek`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    pub slab_index: usize,
    pub offset: usize,
}
impl UnifiedLogRead for MmapUnifiedLoggerRead {
/// Skips forward until a section of `datalogtype` is found and returns its
/// payload, or `None` once an end-of-log marker is reached.
///
/// Transparently rolls over to the next slab file when the current mapping
/// is exhausted.
fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
    loop {
        if self.current_reading_position >= self.current_mmap_buffer.len() {
            self.next_slab().map_err(|e| {
                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
            })?;
        }
        let header_result = self.read_section_header();
        let header = header_result.map_err(|error| {
            CuError::new_with_cause(
                &format!(
                    "Could not read a sections header: {}/{}:{}",
                    self.base_file_path.as_os_str().to_string_lossy(),
                    self.current_slab_index,
                    self.current_reading_position,
                ),
                error,
            )
        })?;
        if header.entry_type == UnifiedLogType::LastEntry {
            return Ok(None);
        }
        if header.entry_type == datalogtype {
            let result = Some(self.read_section_content(&header)?);
            self.current_reading_position += header.offset_to_next_section as usize;
            return Ok(result);
        }
        // Not the requested type: step over the whole section.
        self.current_reading_position += header.offset_to_next_section as usize;
    }
}
/// Reads the next section regardless of type, returning its header and raw
/// payload, and advances the cursor past it. Rolls over to the next slab
/// when the current one is exhausted.
fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
    if self.current_reading_position >= self.current_mmap_buffer.len() {
        self.next_slab().map_err(|e| {
            CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
        })?;
    }
    let header = self.read_section_header().map_err(|error| {
        CuError::new_with_cause(
            &format!(
                "Could not read a sections header: {}/{}:{}",
                self.base_file_path.as_os_str().to_string_lossy(),
                self.current_slab_index,
                self.current_reading_position,
            ),
            error,
        )
    })?;
    let data = self.read_section_content(&header)?;
    self.current_reading_position += header.offset_to_next_section as usize;
    Ok((header, data))
}
}
impl MmapUnifiedLoggerRead {
/// Opens slab 0 of the log at `base_file_path` and positions the read
/// cursor just past the main header.
pub fn new(base_file_path: &Path) -> io::Result<Self> {
    let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
    let Some(main_header) = header else {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "Missing main header in slab 0",
        ));
    };
    Ok(Self {
        base_file_path: base_file_path.to_path_buf(),
        main_header,
        current_file: file,
        current_mmap_buffer: mmap,
        current_slab_index: 0,
        current_reading_position: prolog as usize,
    })
}
/// Current read cursor as a (slab, offset) pair, suitable for `seek`.
pub fn position(&self) -> LogPosition {
    LogPosition {
        slab_index: self.current_slab_index,
        offset: self.current_reading_position,
    }
}
/// Moves the read cursor to `pos`, remapping a different slab file if the
/// position lives in another slab.
pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
    if pos.slab_index == self.current_slab_index {
        // Same slab: only the offset changes.
        self.current_reading_position = pos.offset;
        return Ok(());
    }
    let (file, mmap, _prolog, _header) = open_slab_index(&self.base_file_path, pos.slab_index)
        .map_err(|e| {
            CuError::new_with_cause(
                &format!("Failed to open slab {} for seek", pos.slab_index),
                e,
            )
        })?;
    self.current_file = file;
    self.current_mmap_buffer = mmap;
    self.current_slab_index = pos.slab_index;
    self.current_reading_position = pos.offset;
    Ok(())
}
/// Advances to the next slab file and resets the cursor to its prolog.
/// The index is incremented before opening, so on failure it stays advanced
/// (same as the original behavior).
fn next_slab(&mut self) -> io::Result<()> {
    self.current_slab_index += 1;
    let (file, mmap, prolog, _header) =
        open_slab_index(&self.base_file_path, self.current_slab_index)?;
    self.current_mmap_buffer = mmap;
    self.current_file = file;
    self.current_reading_position = prolog as usize;
    Ok(())
}
/// The main header decoded from slab 0 at construction time.
pub fn raw_main_header(&self) -> &MainHeader {
    &self.main_header
}
/// Sums the `used` byte counts of all remaining sections of `datalogtype`
/// up to the end-of-log marker, advancing the read cursor past them.
pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
    let mut total = 0u64;
    loop {
        if self.current_reading_position >= self.current_mmap_buffer.len() {
            self.next_slab().map_err(|e| {
                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
            })?;
        }
        let header = self.read_section_header()?;
        if header.entry_type == UnifiedLogType::LastEntry {
            return Ok(total);
        }
        if header.entry_type == datalogtype {
            // Saturating add: a corrupt `used` must not panic the scan.
            total = total.saturating_add(header.used as u64);
        }
        self.current_reading_position += header.offset_to_next_section as usize;
    }
}
/// Copies the payload of the section at the current reading position:
/// `used` bytes starting just past the section's header block.
fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
    let start_of_data = self.current_reading_position + header.block_size as usize;
    let end_of_data = start_of_data + header.used as usize;
    // `to_vec` copies straight out of the mapping, replacing the previous
    // `vec![0; n]` + `copy_from_slice`, which zero-filled the buffer first.
    Ok(self.current_mmap_buffer[start_of_data..end_of_data].to_vec())
}
/// Decodes the `SectionHeader` at the current reading position and
/// validates its magic number. Does not advance the cursor.
fn read_section_header(&mut self) -> CuResult<SectionHeader> {
    let section_header: SectionHeader;
    (section_header, _) = decode_from_slice(
        &self.current_mmap_buffer[self.current_reading_position..],
        standard(),
    )
    .map_err(|e| {
        CuError::new_with_cause(
            &format!(
                "Could not read a sections header: {}/{}:{}",
                self.base_file_path.as_os_str().to_string_lossy(),
                self.current_slab_index,
                self.current_reading_position,
            ),
            e,
        )
    })?;
    if section_header.magic != SECTION_MAGIC {
        return Err("Invalid magic number in section header".into());
    }
    Ok(section_header)
}
}
/// `std::io::Read` adapter that concatenates the payloads of all sections of
/// one `UnifiedLogType` into a single byte stream.
pub struct UnifiedLoggerIOReader {
    logger: MmapUnifiedLoggerRead,
    log_type: UnifiedLogType,
    // Payload of the current section being served.
    buffer: Vec<u8>,
    // Read position within `buffer`.
    buffer_pos: usize,
}
impl UnifiedLoggerIOReader {
    /// Wraps `logger`, serving only sections of `log_type`.
    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
        Self {
            logger,
            log_type,
            buffer: Vec::new(),
            buffer_pos: 0,
        }
    }
    /// Loads the next matching section into the internal buffer.
    /// Returns Ok(false) at end of log; maps logger errors to io::Error.
    fn fill_buffer(&mut self) -> io::Result<bool> {
        let next_section = self
            .logger
            .read_next_section_type(self.log_type)
            .map_err(|e| io::Error::other(e.to_string()))?;
        match next_section {
            Some(section) => {
                self.buffer = section;
                self.buffer_pos = 0;
                Ok(true)
            }
            None => Ok(false),
        }
    }
}
impl Read for UnifiedLoggerIOReader {
    /// Serves bytes from the current section, pulling in the next matching
    /// section once the buffer is exhausted. Returns Ok(0) at end of log.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
            // No more sections of the requested type: EOF.
            return Ok(0);
        }
        // NOTE(review): if fill_buffer loaded an *empty* section this
        // reports EOF even though later sections may still exist — confirm
        // zero-length sections cannot occur here.
        if self.buffer_pos >= self.buffer.len() {
            return Ok(0);
        }
        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
        self.buffer_pos += len;
        Ok(len)
    }
}
#[cfg(feature = "std")]
#[cfg(test)]
mod tests {
use super::*;
use crate::stream_write;
use bincode::de::read::SliceReader;
use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
use cu29_traits::WriteStream;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;
// Slab sizes used by the tests below.
const LARGE_SLAB: usize = 100 * 1024;
const SMALL_SLAB: usize = 16 * 2 * 1024;
/// Builds a write-mode logger over `test.bin` inside `tmp_dir` with the
/// given slab size, wrapped in Arc<Mutex<..>> for use with `stream_write`.
fn make_a_logger(
    tmp_dir: &TempDir,
    slab_size: usize,
) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
    let file_path = tmp_dir.path().join("test.bin");
    let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
        .write(true)
        .create(true)
        .file_base_name(&file_path)
        .preallocated_size(slab_size)
        .build()
        .expect("Failed to create logger")
    else {
        panic!("Failed to create logger")
    };
    (Arc::new(Mutex::new(data_logger)), file_path)
}
/// After dropping the writer, the slab file (`test_0.bin`) must still exist
/// and the two sections must have used less than four pages.
#[test]
fn test_truncation_and_sections_creations() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let file_path = tmp_dir.path().join("test.bin");
    let _used = {
        let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(100000)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };
        logger
            .add_section(UnifiedLogType::StructuredLogLine, 1024)
            .unwrap();
        logger
            .add_section(UnifiedLogType::CopperList, 2048)
            .unwrap();
        let used = logger.front_slab.used();
        assert!(used < 4 * page_size::get());
        used
        // Writer dropped here: file gets truncated to the used size.
    };
    let _file = OpenOptions::new()
        .read(true)
        .open(tmp_dir.path().join("test_0.bin"))
        .expect("Could not reopen the file");
}
/// The base path must alias (symlink/hard link to) the first slab file and
/// read back byte-identical to it.
#[test]
fn test_base_alias_exists_and_matches_first_slab() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let file_path = tmp_dir.path().join("test.bin");
    let _logger = MmapUnifiedLoggerBuilder::new()
        .write(true)
        .create(true)
        .file_base_name(&file_path)
        .preallocated_size(LARGE_SLAB)
        .build()
        .expect("Failed to create logger");
    let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
    assert!(file_path.exists(), "base alias does not exist");
    assert!(first_slab.exists(), "first slab does not exist");
    let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
    let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
    assert_eq!(alias_bytes, slab_bytes);
}
/// Dropping the only open stream must close its section and flush the slab
/// up to the write cursor.
#[test]
fn test_one_section_self_cleaning() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
    {
        let _stream = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        // One section in flight while the stream is alive.
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
    }
    // Stream dropped: the section must have been closed.
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        0
    );
    let logger = logger.lock().unwrap();
    assert_eq!(
        logger.front_slab.flushed_until_offset,
        logger.front_slab.current_global_position
    );
}
/// While the writer is alive, a provisional end-of-log marker (LastEntry
/// with `is_open == true`) must exist after the last section.
#[test]
fn test_temporary_end_marker_is_created() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
    {
        let mut stream = stream_write::<u32, MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        )
        .unwrap();
        stream.log(&42u32).unwrap();
    }
    let logger_guard = logger.lock().unwrap();
    let slab = &logger_guard.front_slab;
    let marker_start = slab
        .temporary_end_marker
        .expect("temporary end-of-log marker missing");
    // Decode the marker straight out of the mapping and check its shape.
    let (eof_header, _) =
        decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
            .expect("Could not decode end-of-log marker header");
    assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
    assert!(eof_header.is_open);
    assert_eq!(eof_header.used, 0);
}
/// After the writer is dropped, reading to the end must find a LastEntry
/// marker with `is_open == false` (the permanent terminator).
#[test]
fn test_final_end_marker_is_not_temporary() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
    {
        let mut stream = stream_write::<u32, MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::CopperList,
            1024,
        )
        .unwrap();
        stream.log(&1u32).unwrap();
    }
    drop(logger);
    let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build reader")
    else {
        panic!("Failed to create reader");
    };
    // Walk every section until the end-of-log marker shows up.
    loop {
        let (header, _data) = reader
            .raw_read_section()
            .expect("Failed to read section while searching for EOF");
        if header.entry_type == UnifiedLogType::LastEntry {
            assert!(!header.is_open);
            break;
        }
    }
}
/// Two streams dropped in creation order: the in-flight count must step
/// 1 -> 2 -> 1 -> 0 and the slab must end up fully flushed.
#[test]
fn test_two_sections_self_cleaning_in_order() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
    let s1 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    );
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        1
    );
    let s2 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    );
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        2
    );
    drop(s2);
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        1
    );
    drop(s1);
    let lg = logger.lock().unwrap();
    assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
    assert_eq!(
        lg.front_slab.flushed_until_offset,
        lg.front_slab.current_global_position
    );
}
/// Same as the in-order test, but dropping the *older* stream first: the
/// flush watermark logic must still end fully flushed.
#[test]
fn test_two_sections_self_cleaning_out_of_order() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
    let s1 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    );
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        1
    );
    let s2 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    );
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        2
    );
    drop(s1);
    assert_eq!(
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len(),
        1
    );
    drop(s2);
    let lg = logger.lock().unwrap();
    assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
    assert_eq!(
        lg.front_slab.flushed_until_offset,
        lg.front_slab.current_global_position
    );
}
/// Closing a later section while an earlier one is still open must flush
/// that later section's bytes even though the contiguous watermark cannot
/// advance past the open section.
#[test]
fn test_closed_section_flushes_behind_open_earlier_section() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
    let s1 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    )
    .unwrap();
    {
        let mut s2 = stream_write::<u32, MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::CopperList,
            1024,
        )
        .unwrap();
        s2.log(&42u32).unwrap();
        // s2 dropped here while s1 (earlier) is still open.
    }
    let logger_guard = logger.lock().unwrap();
    assert_eq!(logger_guard.front_slab.sections_offsets_in_flight.len(), 1);
    // The contiguous watermark is held back by the still-open s1 ...
    assert!(
        logger_guard.front_slab.flushed_until_offset
            < logger_guard.front_slab.current_global_position
    );
    // ... yet no closed bytes may remain unflushed.
    assert_eq!(logger_guard.front_slab.pending_closed_bytes(), 0);
    drop(logger_guard);
    drop(s1);
}
#[test]
fn test_write_then_read_one_section() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
    // Write three u32 entries into a single StructuredLogLine section, then
    // close the stream (end of scope) and the logger so the file is finalized.
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
        stream.log(&1u32).unwrap();
        stream.log(&2u32).unwrap();
        stream.log(&3u32).unwrap();
    }
    drop(logger);
    // Reopen the same base file in read mode.
    let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build logger")
    else {
        panic!("Failed to build logger");
    };
    let section = dl
        .read_next_section_type(UnifiedLogType::StructuredLogLine)
        .expect("Failed to read section");
    assert!(section.is_some());
    let section = section.unwrap();
    // FIX: the reader must borrow the section bytes as `&section[..]`; the
    // original text had a garbled `§ion` (mangled `&sect` entity) here.
    let mut reader = SliceReader::new(&section[..]);
    // Entries decode back in write order.
    for expected in 1u32..=3 {
        let value: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(value, expected);
    }
}
#[cfg(feature = "mmap-fsync")]
#[test]
fn test_fsync_feature_syncs_on_section_flush() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
    // Write one entry and let the stream drop, which closes/flushes its section.
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
        stream.log(&1u32).unwrap();
    }
    // With the mmap-fsync feature enabled, the flush must have hit the disk
    // at least once (tracked by the slab's sync call counter).
    let guard = logger.lock().unwrap();
    assert!(
        guard.front_slab.sync_call_count > 0,
        "expected mmap-fsync to issue at least one sync_all call"
    );
}
/// Minimal stand-in for the copper-list lifecycle state used by the runtime —
/// just enough variants to exercise enum encode/decode round-trips in tests.
#[derive(Debug, Encode, Decode)]
enum CopperListStateMock {
    Free,
    ProcessingTasks,
    BeingSerialized,
}
/// Test mock of a copper list: a lifecycle state paired with an arbitrary
/// encodable payload, used to exercise structured logging round-trips.
#[derive(Encode, Decode)]
struct CopperList<P: bincode::enc::Encode> {
    state: CopperListStateMock,
    payload: P,
}
#[test]
fn test_copperlist_list_like_logging() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
    // Write two copper lists with tuple payloads into one CopperList section.
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
        let cl0 = CopperList {
            state: CopperListStateMock::Free,
            payload: (1u32, 2u32, 3u32),
        };
        let cl1 = CopperList {
            state: CopperListStateMock::ProcessingTasks,
            payload: (4u32, 5u32, 6u32),
        };
        stream.log(&cl0).unwrap();
        stream.log(&cl1).unwrap();
    }
    drop(logger);
    // Reopen in read mode and pull back the CopperList section.
    let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build logger")
    else {
        panic!("Failed to build logger");
    };
    let section = dl
        .read_next_section_type(UnifiedLogType::CopperList)
        .expect("Failed to read section");
    assert!(section.is_some());
    let section = section.unwrap();
    // FIX: the reader must borrow the section bytes as `&section[..]`; the
    // original text had a garbled `§ion` (mangled `&sect` entity) here.
    let mut reader = SliceReader::new(&section[..]);
    // Both entries decode back in write order with their payloads intact.
    let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
    let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
    assert_eq!(cl0.payload.1, 2);
    assert_eq!(cl1.payload.2, 6);
}
#[test]
fn test_multi_slab_end2end() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    // A small slab forces the 10_000 entries to spill across multiple slabs.
    let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
        let cl0 = CopperList {
            state: CopperListStateMock::Free,
            payload: (1u32, 2u32, 3u32),
        };
        for _ in 0..10000 {
            stream.log(&cl0).unwrap();
        }
    }
    drop(logger);
    let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build logger")
    else {
        panic!("Failed to build logger");
    };
    // Walk every CopperList section and count the entries that decode back.
    // Either an Err or a None from the reader ends the walk (as in the original
    // is_err()/is_none() ladder, collapsed into one let-else pattern).
    let mut total_readback = 0;
    loop {
        let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) else {
            break;
        };
        // FIX: borrow the section bytes as `&section[..]`; the original text had
        // a garbled `§ion` (mangled `&sect` entity) here.
        let mut reader = SliceReader::new(&section[..]);
        // Decode entries until the remaining bytes no longer form a valid list.
        loop {
            let entry: Result<CopperList<(u32, u32, u32)>, _> =
                decode_from_reader(&mut reader, standard());
            if entry.is_err() {
                break;
            }
            total_readback += 1;
        }
    }
    assert_eq!(total_readback, 10000);
}
}