use blake3::Hash;
use core::ops::Range;
use std::fs::{File, OpenOptions};
use std::io::{Error, Read, Seek, SeekFrom, Write};
use std::path::Path;
use memmap2::{MmapMut, MmapOptions};
/// Page granularity (4 KiB): file sizes are rounded up to a multiple of this,
/// and mappings are touched one byte per `PAGE_SIZE` step. The header is
/// expected to occupy exactly one page (asserted by the `header_size` test).
const PAGE_SIZE: usize = 4096;
/// Size in bytes of the on-disk [`Header`]; the data region starts at this offset.
const HEADER_SIZE: usize = core::mem::size_of::<Header>();
/// Marker written into every header by `Header::new` and verified when a file is opened.
const MAGIC: [u8; 8] = *b"PELIKAN!";
/// On-disk format version written into every header; files carrying any other
/// value are rejected with "file has incompatible version".
const VERSION: u64 = 0;
/// A contiguous, byte-addressable storage region that can be persisted.
///
/// Implementors expose their usable data region as a slice and decide what
/// `flush` means for them (e.g. syncing a mapping or writing out to a file).
// Only `len` is provided; clippy's `len_without_is_empty` lint is silenced
// because the trait deliberately offers no `is_empty` companion.
#[allow(clippy::len_without_is_empty)]
pub trait Datapool: Send {
    /// Borrows the usable data region.
    fn as_slice(&self) -> &[u8];

    /// Mutably borrows the usable data region.
    fn as_mut_slice(&mut self) -> &mut [u8];

    /// Persists the current contents according to the implementor's backing store.
    ///
    /// # Errors
    /// Propagates any I/O error raised while persisting.
    fn flush(&mut self) -> Result<(), std::io::Error>;

    /// Number of bytes in the data region (the length of [`Datapool::as_slice`]).
    fn len(&self) -> usize {
        let region = self.as_slice();
        <[u8]>::len(region)
    }
}
/// A [`Datapool`] backed by an anonymous (not file-backed) memory mapping.
pub struct Memory {
    /// The anonymous mapping holding the pool contents.
    mmap: MmapMut,
    /// Number of bytes exposed through the `Datapool` slices.
    size: usize,
}

impl Memory {
    /// Maps `size` bytes of anonymous memory (requesting pre-population) and
    /// writes a zero to the first byte of every `PAGE_SIZE` step.
    ///
    /// # Errors
    /// Returns any error reported while creating the anonymous mapping.
    pub fn create(size: usize) -> Result<Self, std::io::Error> {
        let mut mmap = MmapOptions::new().populate().len(size).map_anon()?;
        // One write per page, presumably to fault every page in before use.
        for page_start in (0..size).step_by(PAGE_SIZE) {
            mmap[page_start] = 0;
        }
        Ok(Memory { mmap, size })
    }
}
impl Datapool for Memory {
fn as_slice(&self) -> &[u8] {
&self.mmap[..self.size]
}
fn as_mut_slice(&mut self) -> &mut [u8] {
&mut self.mmap[..self.size]
}
fn flush(&mut self) -> Result<(), std::io::Error> {
self.mmap.flush()
}
}
/// Fixed-size header stored at offset 0 of every pool file.
///
/// `repr(C, packed)` pins the field order and removes inter-field padding so the
/// struct can be viewed as raw bytes (`Header::as_bytes`) and reinterpreted from
/// bytes read back from disk. The field order is part of the on-disk format, so
/// do not reorder or resize fields. `_pad` brings the total to `PAGE_SIZE`
/// bytes (asserted by the `header_size` test).
#[repr(C, packed)]
pub struct Header {
    /// BLAKE3 hash over this header (with this field zeroed) followed by the data region.
    checksum: [u8; 32],
    /// Must equal `MAGIC` for the file to be accepted.
    magic: [u8; 8],
    /// On-disk format version; must equal `VERSION`.
    version: u64,
    /// Coarse monotonic clock reading taken by `Header::new`.
    time_monotonic_s: clocksource::coarse::Instant,
    /// Coarse Unix clock reading taken by `Header::new`.
    time_unix_s: clocksource::coarse::UnixInstant,
    /// Precise monotonic clock reading taken by `Header::new`.
    time_monotonic_ns: clocksource::precise::Instant,
    /// Precise Unix clock reading taken by `Header::new`.
    time_unix_ns: clocksource::precise::UnixInstant,
    /// Caller-defined version; opening fails if it differs from the requested value.
    user_version: u64,
    /// Option bits; the code shown here only ever writes 0.
    options: u64,
    /// Zero padding up to one page.
    _pad: [u8; 4008],
}
impl Header {
fn new() -> Self {
Self {
checksum: [0; 32],
magic: MAGIC,
version: VERSION,
time_monotonic_s: clocksource::coarse::Instant::now(),
time_unix_s: clocksource::coarse::UnixInstant::now(),
time_monotonic_ns: clocksource::precise::Instant::now(),
time_unix_ns: clocksource::precise::UnixInstant::now(),
user_version: 0,
options: 0,
_pad: [0; 4008],
}
}
fn as_bytes(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts((self as *const Header) as *const u8, HEADER_SIZE) }
}
fn checksum(&self) -> &[u8; 32] {
&self.checksum
}
fn set_checksum(&mut self, hash: Hash) {
for (idx, byte) in hash.as_bytes()[0..32].iter().enumerate() {
self.checksum[idx] = *byte;
}
}
fn zero_checksum(&mut self) {
for byte in self.checksum.iter_mut() {
*byte = 0;
}
}
fn check(&self) -> Result<(), std::io::Error> {
self.check_magic()?;
self.check_version()
}
fn check_version(&self) -> Result<(), std::io::Error> {
if self.version != VERSION {
Err(Error::other("file has incompatible version"))
} else {
Ok(())
}
}
fn check_magic(&self) -> Result<(), std::io::Error> {
if self.magic[0..8] == MAGIC[0..8] {
Ok(())
} else {
Err(Error::other("header is not recognized"))
}
}
fn user_version(&self) -> u64 {
self.user_version
}
fn set_user_version(&mut self, user_version: u64) {
self.user_version = user_version;
}
pub fn options(&self) -> u64 {
self.options
}
}
/// A [`Datapool`] backed directly by a memory-mapped file.
///
/// File layout: a `HEADER_SIZE`-byte [`Header`] followed by the data region.
/// The file length is `HEADER_SIZE + data_size` rounded up to a whole number
/// of `PAGE_SIZE` pages.
pub struct MmapFile {
    /// Mapping of the entire file, header included.
    mmap: MmapMut,
    /// Byte range of the data region within `mmap` (`HEADER_SIZE..HEADER_SIZE + data_size`).
    data: Range<usize>,
    /// Caller-defined version written into the header on flush.
    user_version: u64,
}

impl MmapFile {
    /// Maps an existing pool file and validates its header and checksum.
    ///
    /// # Errors
    /// - the file cannot be opened or mapped;
    /// - its length is not `HEADER_SIZE + data_size` rounded up to whole pages
    ///   ("filesize mismatch");
    /// - the magic or format version is wrong;
    /// - the stored user version differs from `user_version` ("user version mismatch");
    /// - the stored checksum does not match the contents ("checksum mismatch").
    pub fn open<T: AsRef<Path>>(
        path: T,
        data_size: usize,
        user_version: u64,
    ) -> Result<Self, std::io::Error> {
        let total_size = (HEADER_SIZE + data_size).div_ceil(PAGE_SIZE) * PAGE_SIZE;
        let file = OpenOptions::new().read(true).write(true).open(path)?;
        if file.metadata()?.len() != total_size as u64 {
            return Err(Error::other("filesize mismatch"));
        }
        let data = HEADER_SIZE..HEADER_SIZE + data_size;

        // SAFETY: memmap2 requires that the file is not truncated or modified
        // by anything else while mapped; this type assumes exclusive use of it.
        let mmap = unsafe { MmapOptions::new().populate().map_mut(&file)? };

        // Validate a private copy of the header so the checksum field can be
        // zeroed for hashing without touching the file.
        let mut raw_header = [0u8; HEADER_SIZE];
        raw_header.copy_from_slice(&mmap[..HEADER_SIZE]);
        // SAFETY: `raw_header` is exactly `size_of::<Header>()` bytes; `Header`
        // is `repr(packed)` (alignment 1); the pointer comes from `as_mut_ptr`,
        // so writing through it is allowed; and `raw_header` is not otherwise
        // touched while `header` is live. Assumes every bit pattern is a valid
        // `Header` (all fields are integer-backed — TODO confirm for clocksource).
        let header = unsafe { &mut *(raw_header.as_mut_ptr() as *mut Header) };
        header.check()?;
        if header.user_version() != user_version {
            return Err(Error::other("user version mismatch"));
        }
        header.zero_checksum();

        // The checksum written by `flush` covers every byte after the header
        // (the mapping is a whole number of pages), so verify over that span
        // rather than just `data_size` bytes.
        let mut hasher = blake3::Hasher::new();
        hasher.update(header.as_bytes());
        hasher.update(&mmap[HEADER_SIZE..]);
        if mmap[..32] != hasher.finalize().as_bytes()[..] {
            return Err(Error::other("checksum mismatch"));
        }

        Ok(Self {
            mmap,
            data,
            user_version,
        })
    }

    /// Creates a new zero-filled pool file (failing if `path` already exists)
    /// and maps it.
    ///
    /// # Errors
    /// Returns any error from creating, sizing, mapping or flushing the file.
    pub fn create<T: AsRef<Path>>(
        path: T,
        data_size: usize,
        user_version: u64,
    ) -> Result<Self, std::io::Error> {
        let total_size = (HEADER_SIZE + data_size).div_ceil(PAGE_SIZE) * PAGE_SIZE;
        // Same data range as `open`, so `len()` is `data_size` either way.
        let data = HEADER_SIZE..HEADER_SIZE + data_size;

        let file = OpenOptions::new()
            .create_new(true)
            .read(true)
            .write(true)
            .open(path)?;
        file.set_len(total_size as u64)?;

        // SAFETY: see `open`; the file was just created exclusively by us.
        let mut mmap = unsafe { MmapOptions::new().populate().map_mut(&file)? };
        // Write the first byte of every page, presumably to fault pages in up front.
        for offset in (0..total_size).step_by(PAGE_SIZE) {
            mmap[offset] = 0;
        }
        mmap.flush()?;

        Ok(Self {
            mmap,
            data,
            user_version,
        })
    }

    /// Borrows the header stored at the start of the mapping.
    ///
    /// The header reflects the last `flush`; it is all zeros for a freshly
    /// created, never-flushed pool.
    pub fn header(&self) -> &Header {
        // SAFETY: the mapping is at least `HEADER_SIZE` bytes (its length is
        // `HEADER_SIZE + data_size` rounded up); `Header` is `repr(packed)`,
        // so any address is suitably aligned; and the returned reference is
        // tied to `&self`, so no `&mut self` write can alias it. Assumes every
        // bit pattern is a valid `Header` (integer-backed fields).
        unsafe { &*(self.mmap.as_ptr() as *const Header) }
    }

    /// Coarse monotonic time recorded at the last flush.
    pub fn time_monotonic_s(&self) -> clocksource::coarse::Instant {
        self.header().time_monotonic_s
    }

    /// Precise monotonic time recorded at the last flush.
    pub fn time_monotonic_ns(&self) -> clocksource::precise::Instant {
        self.header().time_monotonic_ns
    }

    /// Coarse Unix time recorded at the last flush.
    pub fn time_unix_s(&self) -> clocksource::coarse::UnixInstant {
        self.header().time_unix_s
    }

    /// Precise Unix time recorded at the last flush.
    pub fn time_unix_ns(&self) -> clocksource::precise::UnixInstant {
        self.header().time_unix_ns
    }
}
impl Datapool for MmapFile {
    /// The data region of the mapping (everything in `self.data`).
    fn as_slice(&self) -> &[u8] {
        &self.mmap[self.data.clone()]
    }

    /// Mutable view of the data region of the mapping.
    fn as_mut_slice(&mut self) -> &mut [u8] {
        &mut self.mmap[self.data.clone()]
    }

    /// Flushes the mapping, writes a fresh header whose checksum covers the
    /// header (checksum zeroed) plus every whole page after it, then flushes again.
    ///
    /// # Errors
    /// Propagates errors from either mapping flush.
    fn flush(&mut self) -> Result<(), std::io::Error> {
        // Push the data pages out before the header describing them is rewritten.
        self.mmap.flush()?;

        let mut header = Header::new();
        header.set_user_version(self.user_version);

        let mut hasher = blake3::Hasher::new();
        hasher.update(header.as_bytes());
        for page in self.mmap[HEADER_SIZE..].chunks_exact(PAGE_SIZE) {
            hasher.update(page);
        }
        header.set_checksum(hasher.finalize());

        self.mmap[..HEADER_SIZE].copy_from_slice(header.as_bytes());
        self.mmap.flush()
    }
}
pub struct FileBackedMemory {
memory: Memory,
header: Box<[u8]>,
file: File,
file_data: Range<usize>,
user_version: u64,
}
impl FileBackedMemory {
pub fn open<T: AsRef<Path>>(
path: T,
data_size: usize,
user_version: u64,
) -> Result<Self, std::io::Error> {
let pages = ((HEADER_SIZE + data_size) as f64 / PAGE_SIZE as f64).ceil() as usize;
let file_total_size = Range {
start: 0,
end: pages * PAGE_SIZE,
};
let file_data = Range {
start: HEADER_SIZE,
end: HEADER_SIZE + data_size,
};
let mut file = OpenOptions::new()
.create_new(false)
.read(true)
.write(true)
.open(path)?;
if file.metadata()?.len() != file_total_size.end as u64 {
return Err(Error::other("filesize mismatch"));
}
let data_pages = (file_data.end - file_data.start) / PAGE_SIZE;
let mut memory = Memory::create(data_size)?;
file.seek(SeekFrom::Start(0))?;
let mut header = [0; HEADER_SIZE];
loop {
if file.read(&mut header[0..PAGE_SIZE])? == PAGE_SIZE {
break;
}
file.seek(SeekFrom::Start(0))?;
}
let mut hasher = blake3::Hasher::new();
let header = unsafe { &mut *(header.as_ptr() as *mut Header) };
header.check()?;
if header.user_version() != user_version {
return Err(Error::other("user version mismatch"));
}
let file_checksum = header.checksum().to_owned();
header.zero_checksum();
hasher.update(header.as_bytes());
file.seek(SeekFrom::Start(file_data.start as u64))?;
for page in 0..data_pages {
loop {
let start = page * PAGE_SIZE;
let end = start + PAGE_SIZE;
if file.read(&mut memory.as_mut_slice()[start..end])? == PAGE_SIZE {
hasher.update(&memory.as_slice()[start..end]);
break;
}
file.seek(SeekFrom::Start((HEADER_SIZE + start) as u64))?;
}
}
let hash = hasher.finalize();
if file_checksum[0..32] != hash.as_bytes()[0..32] {
return Err(Error::other("checksum mismatch"));
}
Ok(Self {
memory,
header: header.as_bytes().to_owned().into_boxed_slice(),
file,
file_data,
user_version,
})
}
pub fn create<T: AsRef<Path>>(
path: T,
data_size: usize,
user_version: u64,
) -> Result<Self, std::io::Error> {
let pages = ((HEADER_SIZE + data_size) as f64 / PAGE_SIZE as f64).ceil() as usize;
let file_total_size = Range {
start: 0,
end: pages * PAGE_SIZE,
};
let file_data = Range {
start: HEADER_SIZE,
end: pages * PAGE_SIZE,
};
let mut file = OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.open(path)?;
file.set_len(file_total_size.end as u64)?;
for page in 0..pages {
loop {
if file.write(&[0; PAGE_SIZE])? == PAGE_SIZE {
break;
}
file.seek(SeekFrom::Start((page * PAGE_SIZE) as u64))?;
}
}
file.sync_all()?;
let memory = Memory::create(data_size)?;
Ok(Self {
memory,
header: vec![0; HEADER_SIZE].into_boxed_slice(),
file,
file_data,
user_version,
})
}
pub fn header(&self) -> &Header {
unsafe { &*(self.header.as_ptr() as *const Header) }
}
pub fn time_monotonic_s(&self) -> clocksource::coarse::Instant {
self.header().time_monotonic_s
}
pub fn time_monotonic_ns(&self) -> clocksource::precise::Instant {
self.header().time_monotonic_ns
}
pub fn time_unix_s(&self) -> clocksource::coarse::UnixInstant {
self.header().time_unix_s
}
pub fn time_unix_ns(&self) -> clocksource::precise::UnixInstant {
self.header().time_unix_ns
}
}
impl Datapool for FileBackedMemory {
fn as_slice(&self) -> &[u8] {
self.memory.as_slice()
}
fn as_mut_slice(&mut self) -> &mut [u8] {
self.memory.as_mut_slice()
}
fn flush(&mut self) -> Result<(), std::io::Error> {
let mut hasher = blake3::Hasher::new();
let mut header = Header::new();
header.set_user_version(self.user_version);
hasher.update(header.as_bytes());
let data_pages = (self.file_data.end - self.file_data.start) / PAGE_SIZE;
self.file.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
for page in 0..data_pages {
loop {
let start = page * PAGE_SIZE;
let end = start + PAGE_SIZE;
if self.file.write(&self.memory.as_slice()[start..end])? == PAGE_SIZE {
hasher.update(&self.memory.as_slice()[start..end]);
break;
}
self.file
.seek(SeekFrom::Start((HEADER_SIZE + start) as u64))?;
}
}
let hash = hasher.finalize();
header.set_checksum(hash);
self.file.seek(SeekFrom::Start(0))?;
loop {
if self.file.write(header.as_bytes())? == HEADER_SIZE {
break;
}
self.file.seek(SeekFrom::Start(0))?;
}
self.file.sync_all()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn header_size() {
        assert_eq!(std::mem::size_of::<Header>(), PAGE_SIZE);
    }

    #[test]
    fn memory_datapool() {
        let datapool = Memory::create(2 * PAGE_SIZE).expect("failed to create pool");
        assert_eq!(datapool.len(), 2 * PAGE_SIZE);
    }

    #[test]
    fn mmapfile_datapool() {
        // `tempdir` stays alive for the whole test and removes the directory
        // (and the pool file) when dropped, instead of leaking it via `keep()`.
        let tempdir = TempDir::new().expect("failed to generate tempdir");
        let path = tempdir.path().join("mmap_test.data");

        let magic_a = [0xDE, 0xCA, 0xFB, 0xAD];
        let magic_b = [0xBA, 0xDC, 0x0F, 0xFE, 0xEB, 0xAD, 0xCA, 0xFE];

        // create, flush empty, write magic_a, flush
        {
            let mut datapool =
                MmapFile::create(&path, 2 * PAGE_SIZE, 0).expect("failed to create pool");
            assert_eq!(datapool.len(), 2 * PAGE_SIZE);
            datapool.flush().expect("failed to flush");

            for (i, byte) in magic_a.iter().enumerate() {
                datapool.as_mut_slice()[i] = *byte;
            }
            datapool.flush().expect("failed to flush");
        }

        // reopen, check magic_a, overwrite with magic_b, flush
        {
            let mut datapool =
                MmapFile::open(&path, 2 * PAGE_SIZE, 0).expect("failed to open pool");
            assert_eq!(datapool.len(), 2 * PAGE_SIZE);
            assert_eq!(datapool.as_slice()[0..4], magic_a[0..4]);
            assert_eq!(datapool.as_slice()[4..8], [0; 4]);

            for (i, byte) in magic_b.iter().enumerate() {
                datapool.as_mut_slice()[i] = *byte;
            }
            datapool.flush().expect("failed to flush");
        }

        // reopen and check magic_b
        {
            let datapool = MmapFile::open(&path, 2 * PAGE_SIZE, 0).expect("failed to open pool");
            assert_eq!(datapool.len(), 2 * PAGE_SIZE);
            assert_eq!(datapool.as_slice()[0..8], magic_b[0..8]);
        }

        // a different user version must be rejected
        {
            assert!(MmapFile::open(&path, 2 * PAGE_SIZE, 1).is_err());
        }
    }

    #[test]
    fn filebackedmemory_datapool() {
        // See `mmapfile_datapool`: `tempdir` cleans up on drop.
        let tempdir = TempDir::new().expect("failed to generate tempdir");
        let path = tempdir.path().join("mmap_test.data");

        let magic_a = [0xDE, 0xCA, 0xFB, 0xAD];
        let magic_b = [0xBA, 0xDC, 0x0F, 0xFE, 0xEB, 0xAD, 0xCA, 0xFE];

        // create, flush empty, write magic_a, flush
        {
            let mut datapool =
                FileBackedMemory::create(&path, 2 * PAGE_SIZE, 0).expect("failed to create pool");
            assert_eq!(datapool.len(), 2 * PAGE_SIZE);
            datapool.flush().expect("failed to flush");

            for (i, byte) in magic_a.iter().enumerate() {
                datapool.as_mut_slice()[i] = *byte;
            }
            datapool.flush().expect("failed to flush");
        }

        // reopen, check magic_a, overwrite with magic_b, flush
        {
            let mut datapool =
                FileBackedMemory::open(&path, 2 * PAGE_SIZE, 0).expect("failed to open pool");
            assert_eq!(datapool.len(), 2 * PAGE_SIZE);
            assert_eq!(datapool.as_slice()[0..4], magic_a[0..4]);
            assert_eq!(datapool.as_slice()[4..8], [0; 4]);

            for (i, byte) in magic_b.iter().enumerate() {
                datapool.as_mut_slice()[i] = *byte;
            }
            datapool.flush().expect("failed to flush");
        }

        // reopen and check magic_b
        {
            let datapool =
                FileBackedMemory::open(&path, 2 * PAGE_SIZE, 0).expect("failed to open pool");
            assert_eq!(datapool.len(), 2 * PAGE_SIZE);
            assert_eq!(datapool.as_slice()[0..8], magic_b[0..8]);
        }

        // a different user version must be rejected
        {
            assert!(FileBackedMemory::open(&path, 2 * PAGE_SIZE, 1).is_err());
        }
    }
}