use crate::{Reader, RingBuffer, Writer};
use memmap2::{Mmap, MmapMut, MmapOptions};
use std::hint;
use std::ops::{Deref, DerefMut};
use std::path::Path;
/// A ring-buffer [`Writer`] bundled with the writable memory map it was built from.
///
/// Dereferences to [`Writer`], so writer methods can be called on this type directly.
pub struct MappedWriter {
// Declared before `mmap` on purpose: struct fields are dropped in declaration order,
// so the writer is gone before the mapping is released.
writer: Writer,
// Never read after construction (hence the lint allowance); it is held only so the
// mapping lives as long as this value does.
// NOTE(review): presumably `writer` keeps referring to the mapped bytes — confirm
// against `RingBuffer`/`Writer`.
#[allow(dead_code)]
mmap: MmapMut,
}
/// Lets a [`MappedWriter`] be used wherever a shared reference to its [`Writer`] is expected.
impl Deref for MappedWriter {
    type Target = Writer;

    /// Borrows the wrapped writer.
    fn deref(&self) -> &Writer {
        let Self { writer, .. } = self;
        writer
    }
}
impl DerefMut for MappedWriter {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.writer
}
}
impl MappedWriter {
    /// Creates a fresh ring-buffer file of `size` bytes at `path` and maps it for writing.
    ///
    /// Anything already stored at `path` is deleted first, and missing parent
    /// directories are created. The new file is resized to `size` and synced before it
    /// is mapped.
    ///
    /// NOTE(review): `size` presumably has to cover at least the ring-buffer header (the
    /// tests in this file use `HEADER_SIZE + 1024`) — confirm against `RingBuffer::new`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while removing the old file, creating the parent
    /// directories, creating, resizing or syncing the new file, or mapping it.
    pub fn new(path: impl AsRef<Path>, size: usize) -> std::io::Result<Self> {
        let path = path.as_ref();

        // Remove unconditionally and tolerate "not found" instead of probing with
        // `exists()` first: the probe is racy, and it reports `false` for a dangling
        // symlink, which would then make `create_new` below fail with `AlreadyExists`.
        match std::fs::remove_file(path) {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err),
        }

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(path)?;
        file.set_len(size as u64)?;
        file.sync_all()?;

        // SAFETY: `map_mut` is unsafe because the mapped bytes alias the file, which
        // other code or processes could modify or truncate behind our back.
        // NOTE(review): nothing here prevents that; soundness relies on every party
        // touching the file only through the ring-buffer protocol — confirm.
        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
        let writer = RingBuffer::new(mmap.as_ref()).into_writer();
        Ok(Self { writer, mmap })
    }

    /// Opens the existing ring-buffer file at `path` read-write and attaches a writer
    /// to it via `join_writer`, leaving the file's length and contents untouched.
    ///
    /// The test in this file shows that entries committed before the join stay readable
    /// afterwards.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while opening or mapping the file.
    pub fn join(path: impl AsRef<Path>) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .open(path)?;

        // SAFETY: same contract as in `new` — the file must not be modified or truncated
        // outside the ring-buffer protocol while it is mapped.
        // NOTE(review): not enforced here — confirm.
        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
        let writer = RingBuffer::new(mmap.as_ref()).join_writer();
        Ok(Self { writer, mmap })
    }

    /// Joins the file at `path` when it already exists with exactly `size` bytes;
    /// otherwise (missing, unreadable metadata, or a different length) recreates it
    /// through [`MappedWriter::new`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`MappedWriter::join`] or [`MappedWriter::new`] returns.
    pub fn join_or_create(path: impl AsRef<Path>, size: usize) -> std::io::Result<Self> {
        // A single `metadata` call answers both "does it exist?" and "how long is it?",
        // so the file cannot vanish between two separate checks. The length is compared
        // as `u64` to avoid narrowing the on-disk size.
        let existing_len = std::fs::metadata(&path).map(|meta| meta.len()).ok();
        if existing_len == Some(size as u64) {
            Self::join(path)
        } else {
            Self::new(path, size)
        }
    }
}
/// A ring-buffer [`Reader`] bundled with the read-only memory map it was built from.
///
/// Dereferences to [`Reader`], so reader methods can be called on this type directly.
pub struct MappedReader {
// Declared before `mmap` on purpose: struct fields are dropped in declaration order,
// so the reader is gone before the mapping is released.
reader: Reader,
// Never read after construction (hence the lint allowance); it is held only so the
// mapping lives as long as this value does.
// NOTE(review): presumably `reader` keeps referring to the mapped bytes — confirm
// against `RingBuffer`/`Reader`.
#[allow(dead_code)]
mmap: Mmap,
}
/// Lets a [`MappedReader`] be used wherever a shared reference to its [`Reader`] is expected.
impl Deref for MappedReader {
    type Target = Reader;

    /// Borrows the wrapped reader.
    fn deref(&self) -> &Reader {
        let Self { reader, .. } = self;
        reader
    }
}
impl DerefMut for MappedReader {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.reader
}
}
impl MappedReader {
    /// Opens the ring-buffer file at `path` read-only and attaches a reader to it.
    ///
    /// Busy-waits until the file reports a non-zero length before mapping it, and never
    /// gives up on its own: if the file stays empty this call does not return.
    /// NOTE(review): presumably this covers a reader that starts before the writer has
    /// sized the file — confirm.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while opening the file, querying its metadata, or
    /// mapping it.
    pub fn new(path: impl AsRef<Path>) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new().read(true).open(path)?;

        // Compare the raw `u64` length: narrowing it to `usize` first could turn a
        // non-empty length into 0 on 32-bit targets and keep this loop spinning.
        while file.metadata()?.len() == 0 {
            hint::spin_loop();
        }

        // SAFETY: `map` is unsafe because the mapped bytes alias the file, which other
        // code or processes could modify or truncate behind our back.
        // NOTE(review): nothing here prevents that; soundness relies on every party
        // touching the file only through the ring-buffer protocol — confirm.
        let mmap = unsafe { MmapOptions::new().map(&file)? };
        let reader = RingBuffer::new(mmap.as_ref()).into_reader();
        Ok(Self { reader, mmap })
    }

    /// Opens the ring-buffer file at `path` read-only and attaches a reader whose
    /// initial position is `position`.
    ///
    /// Unlike [`MappedReader::new`], this does not wait for the file to become
    /// non-empty; it maps whatever is there right away.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while opening or mapping the file.
    pub fn new_with_position(path: impl AsRef<Path>, position: usize) -> std::io::Result<Self> {
        let file = std::fs::OpenOptions::new().read(true).open(path)?;

        // SAFETY: same contract as in `new` — the file must not be modified or truncated
        // outside the ring-buffer protocol while it is mapped.
        // NOTE(review): not enforced here — confirm.
        let mmap = unsafe { MmapOptions::new().map(&file)? };
        let reader = RingBuffer::new(mmap.as_ref())
            .into_reader()
            .with_initial_position(position);
        Ok(Self { reader, mmap })
    }
}
#[cfg(test)]
mod tests {
    use crate::HEADER_SIZE;
    use crate::mmap::{MappedReader, MappedWriter};
    use tempfile::NamedTempFile;

    /// File size used by every test: the ring-buffer header plus 1024 bytes.
    const RING_BUFFER_SIZE: usize = HEADER_SIZE + 1024;

    /// A reader attached before any writes and one attached afterwards at position 0
    /// both see the two committed entries, in order.
    #[test]
    fn should_use_mapped_reader_and_writer() {
        let backing = NamedTempFile::new().unwrap();
        let writer = MappedWriter::new(&backing, RING_BUFFER_SIZE).unwrap();
        let reader = MappedReader::new(&backing).unwrap();

        for tag in [100, 101] {
            writer.claim_with_user_defined(32, true, tag).commit();
        }

        let mut messages = reader.read_batch().unwrap().into_iter();
        for expected in [100, 101] {
            assert_eq!(expected, messages.next().unwrap().unwrap().user_defined);
        }

        let late_reader = MappedReader::new_with_position(&backing, 0).unwrap();
        let mut messages = late_reader.read_batch().unwrap().into_iter();
        for expected in [100, 101] {
            assert_eq!(expected, messages.next().unwrap().unwrap().user_defined);
        }
    }

    /// A writer that joins an existing file appends after the entries committed by the
    /// previous writer instead of discarding them.
    #[test]
    fn should_use_writer_join() {
        let backing = NamedTempFile::new().unwrap();

        // The first writer lives only inside this scope, so it is dropped before joining.
        {
            let first_writer = MappedWriter::new(&backing, RING_BUFFER_SIZE).unwrap();
            for tag in [100, 101] {
                first_writer.claim_with_user_defined(32, true, tag).commit();
            }
        }

        let joined_writer = MappedWriter::join(&backing).unwrap();
        joined_writer.claim_with_user_defined(32, true, 102).commit();

        let reader = MappedReader::new_with_position(&backing, 0).unwrap();
        let mut messages = reader.read_batch().unwrap().into_iter();
        for expected in [100, 101, 102] {
            assert_eq!(expected, messages.next().unwrap().unwrap().user_defined);
        }
        assert!(messages.next().is_none());
    }
}