use libc;
use memmap2::{Mmap, MmapMut, RemapOptions};
use std::fmt::{Debug, Formatter};
use std::fs::{File, OpenOptions};
use std::io::Read;
use std::path::{Path, PathBuf};
use std::slice::from_raw_parts_mut;
use std::sync::{Arc, Mutex};
use std::{io, mem};
use bincode::config::standard;
use bincode::decode_from_slice;
use bincode::encode_into_slice;
use bincode::error::EncodeError;
use bincode::{Decode, Encode};
use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
// Magic number identifying a unified-log file; written at the start of the main header.
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];
// Magic number prefixing every section header within the log.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
/// Header written once at the very beginning of the log file.
///
/// Field order is part of the on-disk format (bincode encodes in declaration
/// order), so it must not be changed.
#[derive(Encode, Decode, Debug)]
struct MainHeader {
    /// Must equal `MAIN_MAGIC` for the file to be considered valid.
    magic: [u8; 4],
    /// Byte offset of the first section; set to the page size at creation.
    first_section_offset: u16,
    /// System page size recorded when the file was created.
    page_size: u16,
}
/// Header prefixing every section in the log.
///
/// Field order is part of the on-disk format; do not reorder.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    /// Must equal `SECTION_MAGIC`.
    magic: [u8; 2],
    /// Kind of entries stored in this section.
    entry_type: UnifiedLogType,
    /// Allocated size of the section in bytes (page aligned).
    section_size: u32,
    /// Number of bytes actually written into the section.
    filled_size: u32,
}

// Upper bound reserved for the encoded SectionHeader.
// NOTE(review): `mem::size_of` is the in-memory size, not the bincode-encoded
// size; the `+ 3` is presumably slack for encoding overhead — confirm it
// always covers the worst case.
const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize;

impl Default for SectionHeader {
    /// An empty, zero-sized header; its `Empty` type marks an inert handle.
    fn default() -> Self {
        SectionHeader {
            magic: SECTION_MAGIC,
            entry_type: UnifiedLogType::Empty,
            section_size: 0,
            filled_size: 0,
        }
    }
}
/// A write stream that appends encoded objects into sections of the
/// memory-mapped unified log.
struct MmapStream {
    /// Kind of entries this stream writes.
    entry_type: UnifiedLogType,
    /// Shared handle to the logger owning the memory map.
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Section currently being filled.
    current_section: SectionHandle,
    /// Total number of bytes written through this stream so far.
    current_position: usize,
    /// Size requested for every new section this stream allocates.
    minimum_allocation_amount: usize,
}
impl MmapStream {
fn new(
entry_type: UnifiedLogType,
parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
minimum_allocation_amount: usize,
) -> Self {
let section = parent_logger
.lock()
.unwrap()
.add_section(entry_type, minimum_allocation_amount);
Self {
entry_type,
parent_logger,
current_section: section,
current_position: 0,
minimum_allocation_amount,
}
}
}
impl Debug for MmapStream {
    /// Summarizes the stream; deliberately omits the parent logger and the
    /// section handle, which are not `Debug`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}",
            self.entry_type, self.current_position, self.minimum_allocation_amount
        )
    }
}
impl<E: Encode> WriteStream<E> for MmapStream {
    /// Encodes `obj` into the current section.
    ///
    /// When the section runs out of room (`EncodeError::UnexpectedEnd`), the
    /// exhausted section is released back to the logger, a fresh section of
    /// `minimum_allocation_amount` bytes is allocated, and the encode is
    /// retried once (panicking if even the fresh section cannot hold it).
    ///
    /// # Errors
    /// Any encode failure other than running out of space is surfaced as a
    /// `CuError`.
    fn log(&mut self, obj: &E) -> CuResult<()> {
        let dst = self.current_section.get_user_buffer();
        match encode_into_slice(obj, dst, standard()) {
            Ok(nb_bytes) => {
                self.current_position += nb_bytes;
                self.current_section.used += nb_bytes as u32;
                Ok(())
            }
            Err(EncodeError::UnexpectedEnd) => {
                let mut logger_guard = self.parent_logger.lock().unwrap();
                // BUG FIX: the exhausted section used to be replaced without
                // being unlocked, so its offset stayed in `sections_in_flight`
                // forever (only MmapStream::drop unlocked the *current*
                // section), permanently stalling the incremental flushes.
                logger_guard.unlock_section(&mut self.current_section);
                self.current_section =
                    logger_guard.add_section(self.entry_type, self.minimum_allocation_amount);
                let nb_bytes = encode_into_slice(
                    obj,
                    self.current_section.get_user_buffer(),
                    standard(),
                )
                .expect(
                    "Failed to encode object in a newly minted section. Unrecoverable failure.",
                );
                self.current_position += nb_bytes;
                self.current_section.used += nb_bytes as u32;
                Ok(())
            }
            Err(e) => Err(
                <&str as Into<CuError>>::into("Unexpected error while encoding object.")
                    .add_cause(e.to_string().as_str()),
            ),
        }
    }
}
impl Drop for MmapStream {
    /// Releases the in-flight section back to the logger so its mapped range
    /// can be flushed to disk.
    fn drop(&mut self) {
        let mut guard = self.parent_logger.lock().unwrap();
        guard.unlock_section(&mut self.current_section);
        // Swap in an inert default handle; the real handle is finalized here
        // (declared after `guard`, so it drops while the logger is still locked).
        let _finished = mem::take(&mut self.current_section);
    }
}
/// Creates a [`WriteStream`] producing `entry_type` entries backed by `logger`.
///
/// `minimum_allocation_amount` is the size requested for each section the
/// stream allocates as it fills up.
pub fn stream_write<E: Encode>(
    logger: Arc<Mutex<UnifiedLoggerWrite>>,
    entry_type: UnifiedLogType,
    minimum_allocation_amount: usize,
) -> impl WriteStream<E> {
    // `logger` is already owned here; the previous `logger.clone()` was a
    // redundant Arc refcount bump.
    MmapStream::new(entry_type, logger, minimum_allocation_amount)
}
/// Default size preallocated for a fresh log file: 1 GiB.
const DEFAULT_LOGGER_SIZE: usize = 1024 * 1024 * 1024;

/// A unified logger opened either for reading or for writing.
pub enum UnifiedLogger {
    /// Read-only view over an existing log file.
    Read(UnifiedLoggerRead),
    /// Writable, memory-mapped log file.
    Write(UnifiedLoggerWrite),
}
/// Builder for [`UnifiedLogger`].
pub struct UnifiedLoggerBuilder {
    /// Path of the log file to open or create (required).
    file_path: Option<PathBuf>,
    /// Size to preallocate on creation; `DEFAULT_LOGGER_SIZE` when unset.
    preallocated_size: Option<usize>,
    /// Open the file for writing.
    write: bool,
    /// Create the file; only valid together with `write`.
    create: bool,
}
impl UnifiedLoggerBuilder {
    /// Creates a builder with read-only, no-create defaults.
    pub fn new() -> Self {
        Self {
            file_path: None,
            preallocated_size: None,
            write: false,
            create: false,
        }
    }

    /// Sets the path of the log file (required).
    pub fn file_path(mut self, file_path: &Path) -> Self {
        self.file_path = Some(file_path.to_path_buf());
        self
    }

    /// Sets the size preallocated when a new file is created.
    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
        self.preallocated_size = Some(preallocated_size);
        self
    }

    /// Opens the file for writing.
    pub fn write(mut self, write: bool) -> Self {
        self.write = write;
        self
    }

    /// Creates the file if it does not exist (requires `write`).
    pub fn create(mut self, create: bool) -> Self {
        self.create = create;
        self
    }

    /// Opens (or creates) the log file and returns a read or write logger.
    ///
    /// # Errors
    /// Returns an `io::Error` when the options are inconsistent, the file
    /// cannot be opened/resized/mapped, or — on the read path — the main
    /// header is corrupted.
    pub fn build(self) -> io::Result<UnifiedLogger> {
        if self.create && !self.write {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Cannot create a read-only file",
            ));
        }
        let file_path = self
            .file_path
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "File path is required"))?;
        let mut options = OpenOptions::new();
        let mut options = options.read(true);
        if self.write {
            options = options.write(true);
            if self.create {
                options = options.create(true);
            } else {
                options = options.append(true);
            }
        }
        let file = options.open(file_path)?;
        let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize };
        if self.write && self.create {
            // Fresh file: preallocate, map it mutably, write the main header.
            file.set_len(self.preallocated_size.unwrap_or(DEFAULT_LOGGER_SIZE) as u64)?;
            let mut mmap = unsafe { MmapMut::map_mut(&file) }?;
            let main_header = MainHeader {
                magic: MAIN_MAGIC,
                first_section_offset: page_size as u16,
                page_size: page_size as u16,
            };
            let nb_bytes = encode_into_slice(&main_header, &mut mmap[..], standard())
                .expect("Failed to encode main header");
            // The first section starts one page in, so the header must fit
            // within the first page.
            assert!(nb_bytes < page_size);
            Ok(UnifiedLogger::Write(UnifiedLoggerWrite {
                file,
                mmap_buffer: mmap,
                page_size,
                current_global_position: page_size,
                sections_in_flight: Vec::with_capacity(16),
                flushed_until: 0,
            }))
        } else {
            // Existing file: map read-only and validate the main header.
            let mmap = unsafe { Mmap::map(&file) }?;
            // BUG FIX: decoding used to `expect`, panicking on a corrupted or
            // truncated (user-provided) file; surface a proper io::Error.
            let (main_header, _read): (MainHeader, usize) =
                decode_from_slice(&mmap[..], standard()).map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("Failed to decode main header: {e}"),
                    )
                })?;
            if main_header.magic != MAIN_MAGIC {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Invalid magic number in main header",
                ));
            }
            Ok(UnifiedLogger::Read(UnifiedLoggerRead {
                file,
                mmap_buffer: mmap,
                reading_position: main_header.first_section_offset as usize,
                page_size: main_header.page_size as usize,
            }))
        }
    }
}
/// Read-only view over an existing unified log file.
pub struct UnifiedLoggerRead {
    // Kept alive so the mapping stays backed by an open file handle.
    #[allow(dead_code)]
    file: File,
    mmap_buffer: Mmap,
    // Byte offset of the next section header to read.
    reading_position: usize,
    // Page size recorded in the main header at write time.
    #[allow(dead_code)]
    page_size: usize,
}
/// Writable, memory-mapped unified log file.
pub struct UnifiedLoggerWrite {
    file: File,
    mmap_buffer: MmapMut,
    // System page size; sections are aligned to it.
    page_size: usize,
    // Byte offset of the end of the last allocated section.
    current_global_position: usize,
    // Start offsets of sections handed out and not yet unlocked.
    sections_in_flight: Vec<usize>,
    // Everything before this offset has been queued for flush to disk.
    flushed_until: usize,
}
/// Handle over one section of the write mmap while it is being filled.
///
/// `buffer` covers the whole section, header included; user data starts at
/// `MAX_HEADER_SIZE`.
pub struct SectionHandle {
    section_header: SectionHeader,
    // NOTE(review): the 'static lifetime is a promise, not a fact — this slice
    // aliases the logger's mmap and is only valid while the mapping lives and
    // never moves; confirm upstream guarantees.
    buffer: &'static mut [u8],
    used: u32,
}

impl Default for SectionHandle {
    /// An inert handle (Empty type, empty buffer) whose Drop is a no-op.
    fn default() -> Self {
        SectionHandle {
            section_header: SectionHeader::default(),
            buffer: &mut [],
            used: 0,
        }
    }
}
impl SectionHandle {
    /// Wraps a section buffer whose header has already been written at offset 0.
    ///
    /// # Panics
    /// Panics when the buffer is too small to hold a header, or does not start
    /// with `SECTION_MAGIC`.
    pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
        // BUG FIX: validate the length *before* indexing; previously a buffer
        // shorter than 2 bytes panicked on the magic check itself instead of
        // producing the informative size message.
        if buffer.len() < MAX_HEADER_SIZE {
            panic!(
                "Invalid section buffer, too small: {}, it needs to be > {}",
                buffer.len(),
                MAX_HEADER_SIZE
            );
        }
        if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
            panic!("Invalid section buffer, magic number not found");
        }
        Self {
            section_header,
            buffer,
            used: 0,
        }
    }

    /// Remaining writable user area: everything past the reserved header space
    /// and the bytes already consumed.
    pub fn get_user_buffer(&mut self) -> &mut [u8] {
        &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
    }
}
impl Drop for SectionHandle {
    /// Writes the final header (with the real `filled_size`) back into the
    /// section's buffer when the handle goes away.
    fn drop(&mut self) {
        // Inert handles (Default, or never written to) leave the mmap untouched.
        let untouched =
            self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0;
        if untouched {
            return;
        }
        self.section_header.filled_size = self.used;
        encode_into_slice(&self.section_header, &mut self.buffer, standard())
            .expect("Failed to encode section header");
    }
}
impl UnifiedLoggerWrite {
    /// Grows the backing file and remaps in place so at least `size` bytes are
    /// addressable. (Renamed from the `unsure_size` typo; the only caller is
    /// `add_section` below.)
    fn ensure_size(&mut self, size: usize) -> io::Result<()> {
        if size > self.mmap_buffer.len() {
            // may_move(false): the mapping must stay at the same address,
            // because live SectionHandles hold raw slices into it.
            let ropts = RemapOptions::default().may_move(false);
            self.file
                .set_len(size as u64)
                .expect("Failed to extend file");
            unsafe { self.mmap_buffer.remap(size, ropts) }?;
        }
        Ok(())
    }

    /// Asynchronously flushes the mapped bytes between the last flushed
    /// position and `position`, then advances the watermark.
    fn flush_until(&mut self, position: usize) {
        self.mmap_buffer
            .flush_async_range(self.flushed_until, position)
            .expect("Failed to flush memory map");
        self.flushed_until = position;
    }

    /// Flushes everything written so far.
    fn flush(&mut self) {
        self.flush_until(self.current_global_position);
    }

    /// Removes `section` from the in-flight list, then flushes as far as the
    /// earliest still-open section allows.
    fn unlock_section(&mut self, section: &mut SectionHandle) {
        let base = self.mmap_buffer.as_mut_ptr() as usize;
        let section_buffer_addr = section.buffer.as_mut_ptr() as usize;
        self.sections_in_flight
            .retain(|&x| x != section_buffer_addr - base);
        if self.sections_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        // Never flush past the start of the oldest section still in flight.
        if self.flushed_until < self.sections_in_flight[0] {
            self.flush_until(self.sections_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next page boundary (assumes page_size is a
    /// power of two).
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// Reserves a new page-aligned section of at least `requested_section_size`
    /// bytes, writes its header into the map, and returns a handle over it.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> SectionHandle {
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;
        self.ensure_size(self.current_global_position + requested_section_size + self.page_size)
            .expect("Failed to resize memory map");
        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            entry_type,
            section_size,
            filled_size: 0u32,
        };
        let nb_bytes = encode_into_slice(
            &section_header,
            &mut self.mmap_buffer[self.current_global_position..],
            standard(),
        )
        .expect("Failed to encode section header");
        assert!(nb_bytes < self.page_size);
        self.sections_in_flight.push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
        // SAFETY: detaches the slice's lifetime from `&mut self` so the handle
        // can outlive this borrow. The mapping never moves (remap uses
        // may_move(false)) and sections never overlap, so this section's bytes
        // are only reachable through the returned handle.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
        self.current_global_position = end_of_section;
        // Removed a leftover debug println! that spammed stdout on every
        // section allocation.
        SectionHandle::create(section_header, handle_buffer)
    }

    /// Total capacity currently mapped.
    #[allow(dead_code)]
    #[inline]
    fn allocated_len(&self) -> usize {
        self.mmap_buffer.len()
    }

    /// Bytes of the file used so far.
    #[allow(dead_code)]
    #[inline]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
impl Drop for UnifiedLoggerWrite {
    fn drop(&mut self) {
        // Append a terminal LastEntry section so readers know where the log ends.
        let section = self.add_section(UnifiedLogType::LastEntry, 80);
        drop(section);
        // Trim the preallocated file down to what was actually used.
        self.file
            .set_len(self.current_global_position as u64)
            .expect("Failed to trim datalogger file");
        // NOTE(review): flushing after truncation relies on the flushed range
        // ending within the new file length — confirm this ordering is intended.
        self.flush();
    }
}
impl UnifiedLoggerRead {
pub fn read_next_section_type(
&mut self,
datalogtype: UnifiedLogType,
) -> CuResult<Option<Vec<u8>>> {
loop {
let header_result = self.read_section_header();
if let Err(error) = header_result {
return Err(CuError::new_with_cause(
"Could not read a sections header",
error,
));
};
let header = header_result.unwrap();
if header.entry_type == UnifiedLogType::LastEntry {
return Ok(None);
}
if header.entry_type == datalogtype {
let result = Some(self.read_section_content(&header)?);
self.reading_position += header.section_size as usize;
return Ok(result);
}
self.reading_position += header.section_size as usize;
if self.reading_position >= self.mmap_buffer.len() {
return Err("Corrupted Log, past end of file".into());
}
}
}
pub fn read_section(&mut self) -> CuResult<Vec<u8>> {
let read_result = self.read_section_header();
if let Err(error) = read_result {
return Err(CuError::new_with_cause(
"Could not read a sections header",
error,
));
};
self.read_section_content(&read_result.unwrap())
}
fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
let mut section = vec![0; header.filled_size as usize];
let start_of_data = self.reading_position + MAX_HEADER_SIZE;
section.copy_from_slice(
&self.mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
);
Ok(section)
}
fn read_section_header(&mut self) -> CuResult<SectionHeader> {
let section_header: SectionHeader;
(section_header, _) =
decode_from_slice(&self.mmap_buffer[self.reading_position..], standard())
.expect("Failed to decode section header");
if section_header.magic != SECTION_MAGIC {
return Err("Invalid magic number in section header".into());
}
Ok(section_header)
}
}
/// Adapter exposing the sections of one `UnifiedLogType` as a contiguous
/// `std::io::Read` byte stream.
pub struct UnifiedLoggerIOReader {
    logger: UnifiedLoggerRead,
    log_type: UnifiedLogType,
    // Content of the section currently being consumed.
    buffer: Vec<u8>,
    // Read cursor within `buffer`.
    buffer_pos: usize,
}
impl UnifiedLoggerIOReader {
    /// Wraps `logger`, exposing only sections of `log_type`.
    pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
        Self {
            logger,
            log_type,
            buffer: Vec::new(),
            buffer_pos: 0,
        }
    }

    /// Loads the next matching section into the internal buffer.
    ///
    /// Returns `Ok(false)` once the end of the log is reached.
    fn fill_buffer(&mut self) -> io::Result<bool> {
        match self.logger.read_next_section_type(self.log_type) {
            Ok(Some(section)) => {
                self.buffer = section;
                self.buffer_pos = 0;
                Ok(true)
            }
            Ok(None) => Ok(false),
            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
        }
    }
}
impl Read for UnifiedLoggerIOReader {
    /// Serves bytes from the buffered section, pulling in the next matching
    /// section when exhausted; returns `Ok(0)` at end of log.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let exhausted = self.buffer_pos >= self.buffer.len();
        if exhausted && !self.fill_buffer()? {
            return Ok(0);
        }
        // A freshly loaded section can still be empty.
        if self.buffer_pos >= self.buffer.len() {
            return Ok(0);
        }
        let remaining = self.buffer.len() - self.buffer_pos;
        let len = std::cmp::min(buf.len(), remaining);
        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
        self.buffer_pos += len;
        Ok(len)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::decode_from_reader;
    use std::io::BufReader;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Creates a writable logger over a fresh 100 kB file inside `tmp_dir`.
    fn make_a_logger(tmp_dir: &TempDir) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(data_logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_path(&file_path)
            .preallocated_size(100000)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };
        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// The logger trims the preallocated file on drop and two small sections
    /// stay within a handful of pages.
    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_path(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.used();
            // Two small page-aligned sections plus the header fit in 4 pages.
            assert!(used < 4 * 4096);
            used
        };
        // Reopening verifies the file was left in a sane state after drop.
        let _file = OpenOptions::new()
            .read(true)
            .open(file_path)
            .expect("Could not reopen the file");
    }

    /// Dropping the only open stream must unlock its section and flush
    /// everything written so far.
    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
        }
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 0);
        let logger = logger.lock().unwrap();
        assert_eq!(logger.flushed_until, logger.current_global_position);
    }

    /// Two streams dropped in creation order: in-flight accounting and
    /// flushing catch up step by step.
    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 2);
        drop(s2);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.sections_in_flight.len(), 0);
        assert_eq!(lg.flushed_until, lg.current_global_position);
    }

    /// Dropping streams out of order must still converge to fully flushed.
    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 2);
        drop(s1);
        assert_eq!(logger.lock().unwrap().sections_in_flight.len(), 1);
        drop(s2);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.sections_in_flight.len(), 0);
        assert_eq!(lg.flushed_until, lg.current_global_position);
    }

    /// Round-trip: write three u32 values, reopen read-only, decode them back.
    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        // Drop the writer so the file is finalized and trimmed.
        drop(logger);
        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
            .file_path(&f.to_path_buf())
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();
        let mut reader = BufReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    // Minimal stand-in for the real CopperList state machine.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    // Generic payload wrapper mirroring the production CopperList shape.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P,
    }

    /// Round-trip of two structured CopperList entries through one section.
    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);
        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
            .file_path(&f.to_path_buf())
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();
        let mut reader = BufReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    /// Checks that a section handle survives the mmap being grown/remapped.
    #[test]
    #[ignore = "will be fixed later"]
    fn test_mmap_resize() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_path(&file_path)
            .preallocated_size(8000)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };
        let handler = logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
        handler.buffer[0] = 42;
        // Force many remaps of the backing file past the initial 8000 bytes.
        for _ in 0..100 {
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
        }
        println!("AF: {}", handler.buffer[0]);
    }
}