use crate::{read_u64, safe_write, write_u64, Address, GrowFailed, Memory, Storable};
use std::borrow::Cow;
use std::cell::RefCell;
use std::fmt;
use std::marker::PhantomData;
use std::thread::LocalKey;
#[cfg(test)]
mod tests;
/// Magic bytes identifying the index memory of a log ("GLI").
pub const INDEX_MAGIC: &[u8; 3] = b"GLI";
/// Magic bytes identifying the data memory of a log ("GLD").
pub const DATA_MAGIC: &[u8; 3] = b"GLD";
// The only on-disk layout version this implementation reads and writes.
const LAYOUT_VERSION: u8 = 1;
// Size of the V1 header: 3 magic bytes + 1 version byte.
const HEADER_V1_SIZE: u64 = 4;
// Extra bytes reserved after the V1 header for future layout extensions.
const RESERVED_HEADER_SIZE: u64 = 28;
// Offset at which the actual contents (entry counter / payloads) begin.
const HEADER_OFFSET: u64 = HEADER_V1_SIZE + RESERVED_HEADER_SIZE;
/// In-memory representation of the 4-byte header stored at the start of
/// both the index and the data memory.
struct HeaderV1 {
    // Either `INDEX_MAGIC` or `DATA_MAGIC`, depending on the memory.
    magic: [u8; 3],
    // Layout version; currently always `LAYOUT_VERSION`.
    version: u8,
}
/// Errors that can occur when recovering a log from existing memories.
#[derive(Debug, PartialEq, Eq)]
pub enum InitError {
    /// The data memory header declares a layout version this code does not support.
    IncompatibleDataVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The index memory header declares a layout version this code does not support.
    IncompatibleIndexVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The data memory is valid but the index memory does not carry the index magic.
    InvalidIndex,
}
impl fmt::Display for InitError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InitError::IncompatibleDataVersion {
last_supported_version,
decoded_version,
} => write!(
f,
"Incompatible data version: last supported version is {}, but decoded version is {}",
last_supported_version, decoded_version
),
InitError::IncompatibleIndexVersion {
last_supported_version,
decoded_version,
} => write!(
f,
"Incompatible index version: last supported version is {}, but decoded version is {}",
last_supported_version, decoded_version
),
InitError::InvalidIndex => write!(f, "Invalid index"),
}
}
}
/// Errors that can occur while appending an entry to the log.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteError {
    /// One of the underlying memories refused to grow by `delta` pages
    /// when it was `current_size` pages large.
    GrowFailed { current_size: u64, delta: u64 },
}
impl From<GrowFailed> for WriteError {
fn from(
GrowFailed {
current_size,
delta,
}: GrowFailed,
) -> Self {
Self::GrowFailed {
current_size,
delta,
}
}
}
/// Error returned when reading an entry index that is out of bounds.
#[derive(Debug, PartialEq, Eq)]
pub struct NoSuchEntry;
/// An append-only log of `T` values backed by two memories:
///
/// * `index_memory` — a header, the entry counter, and one cumulative
///   byte offset (u64) per entry;
/// * `data_memory` — a header followed by the concatenated entry payloads.
pub struct Log<T: Storable, INDEX: Memory, DATA: Memory> {
    index_memory: INDEX,
    data_memory: DATA,
    // The log stores `T` values by serialization only; no `T` is held directly.
    _marker: PhantomData<T>,
}
impl<T: Storable, INDEX: Memory, DATA: Memory> Log<T, INDEX, DATA> {
    /// Creates a new empty log in the given memories.
    ///
    /// Note: this overwrites any previous contents — both memories are
    /// stamped with fresh V1 headers and the entry counter is reset to zero.
    pub fn new(index_memory: INDEX, data_memory: DATA) -> Self {
        let log = Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        };
        Self::write_header(
            &log.index_memory,
            &HeaderV1 {
                magic: *INDEX_MAGIC,
                version: LAYOUT_VERSION,
            },
        );
        Self::write_header(
            &log.data_memory,
            &HeaderV1 {
                magic: *DATA_MAGIC,
                version: LAYOUT_VERSION,
            },
        );
        // Entry counter, stored right after the reserved header area.
        write_u64(&log.index_memory, Address::from(HEADER_OFFSET), 0);
        log
    }

    /// Initializes a log from the given memories, recovering an existing
    /// log if the data memory carries a valid data header, and creating a
    /// fresh log otherwise (empty data memory or unrecognized data magic).
    ///
    /// # Errors
    /// * `IncompatibleDataVersion` — data header version differs from `LAYOUT_VERSION`.
    /// * `InvalidIndex` — valid data memory, but the index magic does not match.
    /// * `IncompatibleIndexVersion` — index header version differs from `LAYOUT_VERSION`.
    pub fn init(index_memory: INDEX, data_memory: DATA) -> Result<Self, InitError> {
        if data_memory.size() == 0 {
            // Nothing stored yet: start from scratch.
            return Ok(Self::new(index_memory, data_memory));
        }
        let data_header = Self::read_header(&data_memory);
        if &data_header.magic != DATA_MAGIC {
            // The data memory holds something else; treat it as uninitialized.
            return Ok(Self::new(index_memory, data_memory));
        }
        if data_header.version != LAYOUT_VERSION {
            return Err(InitError::IncompatibleDataVersion {
                last_supported_version: LAYOUT_VERSION,
                decoded_version: data_header.version,
            });
        }
        let index_header = Self::read_header(&index_memory);
        if &index_header.magic != INDEX_MAGIC {
            return Err(InitError::InvalidIndex);
        }
        if index_header.version != LAYOUT_VERSION {
            return Err(InitError::IncompatibleIndexVersion {
                last_supported_version: LAYOUT_VERSION,
                decoded_version: index_header.version,
            });
        }
        // Debug builds additionally verify the index invariant (offsets are
        // non-decreasing) before trusting the recovered state.
        #[cfg(debug_assertions)]
        {
            assert_eq!(Ok(()), Self::validate_v1_index(&index_memory));
        }
        Ok(Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        })
    }

    /// Writes the magic and version bytes at the start of `memory`, growing
    /// it to at least one page first if it is currently empty.
    fn write_header(memory: &impl Memory, header: &HeaderV1) {
        if memory.size() < 1 {
            assert!(
                memory.grow(1) != -1,
                "failed to allocate the first memory page"
            );
        }
        memory.write(0, &header.magic);
        memory.write(3, &[header.version]);
    }

    /// Reads the magic (bytes 0..3) and version (byte 3) from `memory`.
    fn read_header(memory: &impl Memory) -> HeaderV1 {
        let mut magic = [0u8; 3];
        let mut version = [0u8; 1];
        memory.read(0, &mut magic);
        memory.read(3, &mut version);
        HeaderV1 {
            magic,
            version: version[0],
        }
    }

    /// Debug-only sanity check: the cumulative entry offsets stored in the
    /// index memory must be non-decreasing.
    #[cfg(debug_assertions)]
    fn validate_v1_index(memory: &INDEX) -> Result<(), String> {
        let num_entries = read_u64(memory, Address::from(HEADER_OFFSET));
        if num_entries == 0 {
            return Ok(());
        }
        // First offset slot sits 8 bytes after the entry counter.
        let mut prev_entry = read_u64(memory, Address::from(HEADER_OFFSET + 8));
        for i in 1..num_entries {
            let entry = read_u64(memory, Address::from(HEADER_OFFSET + 8 + i * 8));
            if entry < prev_entry {
                return Err(format!("invalid entry I[{i}]: {entry} < {prev_entry}"));
            }
            prev_entry = entry;
        }
        Ok(())
    }

    /// Consumes the log and returns the underlying memories.
    pub fn into_memories(self) -> (INDEX, DATA) {
        (self.index_memory, self.data_memory)
    }

    /// Returns `true` iff the log contains no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of bytes the index memory uses: header area,
    /// entry counter, and one u64 offset slot per entry.
    pub fn index_size_bytes(&self) -> u64 {
        let num_entries = read_u64(&self.index_memory, Address::from(HEADER_OFFSET));
        self.index_entry_offset(num_entries).get()
    }

    /// Returns the number of bytes the data memory uses: header area plus
    /// all entry payloads.
    pub fn data_size_bytes(&self) -> u64 {
        self.log_size_bytes() + HEADER_OFFSET
    }

    /// Returns the total size of all entry payloads in bytes, i.e. the
    /// cumulative offset recorded for the last entry (0 when empty).
    pub fn log_size_bytes(&self) -> u64 {
        let num_entries = self.len();
        if num_entries == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(num_entries - 1))
        }
    }

    /// Returns the number of entries in the log.
    pub fn len(&self) -> u64 {
        read_u64(&self.index_memory, Address::from(HEADER_OFFSET))
    }

    /// Returns the decoded entry at `idx`, or `None` if `idx` is out of bounds.
    pub fn get(&self, idx: u64) -> Option<T> {
        let mut buf = vec![];
        self.read_entry(idx, &mut buf).ok()?;
        Some(T::from_bytes(Cow::Owned(buf)))
    }

    /// Returns an iterator over all entries, decoding them through one
    /// reused read buffer.
    pub fn iter(&self) -> Iter<'_, T, INDEX, DATA> {
        Iter {
            log: self,
            buf: vec![],
            pos: 0,
        }
    }

    /// Reads the raw bytes of entry `idx` into `buf`, resizing `buf` to the
    /// exact entry length.
    ///
    /// # Errors
    /// Returns `Err(NoSuchEntry)` if `idx` is out of bounds.
    pub fn read_entry(&self, idx: u64, buf: &mut Vec<u8>) -> Result<(), NoSuchEntry> {
        let (offset, len) = self.entry_meta(idx).ok_or(NoSuchEntry)?;
        buf.resize(len, 0);
        self.data_memory.read(HEADER_OFFSET + offset, buf);
        Ok(())
    }

    /// Appends `item` to the log and returns its index.
    ///
    /// The payload is written first, then the new cumulative offset, and the
    /// entry counter is bumped only after both writes succeed.
    ///
    /// # Errors
    /// Returns `WriteError::GrowFailed` if either memory cannot grow to fit
    /// the new entry.
    pub fn append(&self, item: &T) -> Result<u64, WriteError> {
        let idx = self.len();
        // Cumulative end offset of the previous entry == start of this one.
        let data_offset = if idx == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(idx - 1))
        };
        let bytes = item.to_bytes();
        let new_offset = data_offset
            .checked_add(bytes.len() as u64)
            .expect("address overflow");
        let entry_offset = HEADER_OFFSET
            .checked_add(data_offset)
            .expect("address overflow");
        debug_assert!(new_offset >= data_offset);
        safe_write(&self.data_memory, entry_offset, &bytes[..])?;
        safe_write(
            &self.index_memory,
            self.index_entry_offset(idx).get(),
            &new_offset.to_le_bytes(),
        )?;
        // Publish the entry by bumping the counter last.
        write_u64(&self.index_memory, Address::from(HEADER_OFFSET), idx + 1);
        // In debug builds, verify the entry round-trips.
        debug_assert_eq!(self.get(idx).unwrap().to_bytes(), bytes);
        Ok(idx)
    }

    /// Returns `(payload offset, payload length)` for entry `idx`, or `None`
    /// if `idx` is out of bounds. Lengths are derived from consecutive
    /// cumulative offsets stored in the index.
    fn entry_meta(&self, idx: u64) -> Option<(u64, usize)> {
        if self.len() <= idx {
            return None;
        }
        if idx == 0 {
            Some((
                0,
                read_u64(&self.index_memory, self.index_entry_offset(0)) as usize,
            ))
        } else {
            let offset = read_u64(&self.index_memory, self.index_entry_offset(idx - 1));
            let next = read_u64(&self.index_memory, self.index_entry_offset(idx));
            debug_assert!(offset <= next);
            Some((offset, (next - offset) as usize))
        }
    }

    /// Address of the `idx`-th cumulative-offset slot in the index memory:
    /// header area + 8 bytes of entry counter + 8 bytes per preceding slot.
    fn index_entry_offset(&self, idx: u64) -> Address {
        Address::from(
            HEADER_OFFSET + std::mem::size_of::<u64>() as u64 + idx * (std::mem::size_of::<u64>() as u64),
        )
    }
}
/// Iterator over the entries of a [`Log`], borrowing the log for its
/// lifetime and reusing a single read buffer across entries.
pub struct Iter<'a, T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    log: &'a Log<T, I, D>,
    // Scratch buffer reused for every `read_entry` call.
    buf: Vec<u8>,
    // Index of the next entry to yield.
    pos: u64,
}
impl<T, I, D> Iterator for Iter<'_, T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    type Item = T;

    /// Reads the entry at the cursor into the reusable buffer and decodes
    /// it; yields `None` once the cursor passes the last entry.
    fn next(&mut self) -> Option<T> {
        if self.log.read_entry(self.pos, &mut self.buf).is_err() {
            return None;
        }
        self.pos = self.pos.saturating_add(1);
        Some(T::from_bytes(Cow::Borrowed(&self.buf)))
    }

    /// Lower bound is the number of remaining entries; no upper bound is
    /// claimed because the log may grow while iterating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.log.len().saturating_sub(self.pos);
        (remaining as usize, None)
    }

    /// Exact remaining-entry count, computed from the index rather than by
    /// consuming the iterator.
    ///
    /// # Panics
    /// Panics if the remaining count does not fit into `usize`.
    fn count(self) -> usize {
        let n = self.log.len().saturating_sub(self.pos);
        usize::try_from(n)
            .unwrap_or_else(|_| panic!("The number of items in the log {n} does not fit into usize"))
    }

    /// Skips `n` entries in O(1) by advancing the cursor, then yields the next.
    fn nth(&mut self, n: usize) -> Option<T> {
        self.pos = self.pos.saturating_add(n as u64);
        self.next()
    }
}
/// Returns an iterator over the entries of a [`Log`] stored in a
/// thread-local `RefCell`, borrowing the cell only for the duration of
/// each individual read.
pub fn iter_thread_local<T, I, D>(
    local_key: &'static LocalKey<RefCell<Log<T, I, D>>>,
) -> ThreadLocalRefIterator<T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    // Start at the first entry with an empty scratch buffer.
    ThreadLocalRefIterator {
        log: local_key,
        buf: Vec::new(),
        pos: 0,
    }
}
/// Iterator over the entries of a [`Log`] held in a thread-local
/// `RefCell`; borrows the cell per read instead of holding a borrow for
/// the iterator's whole lifetime.
pub struct ThreadLocalRefIterator<T, I, D>
where
    T: Storable + 'static,
    I: Memory + 'static,
    D: Memory + 'static,
{
    log: &'static LocalKey<RefCell<Log<T, I, D>>>,
    // Scratch buffer reused for every `read_entry` call.
    buf: Vec<u8>,
    // Index of the next entry to yield.
    pos: u64,
}
impl<T, I, D> Iterator for ThreadLocalRefIterator<T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    type Item = T;

    /// Decodes the next entry, borrowing the log through the thread-local
    /// key only for the duration of the read.
    fn next(&mut self) -> Option<T> {
        self.log.with(
            |log| match log.borrow().read_entry(self.pos, &mut self.buf) {
                Ok(()) => {
                    self.pos = self.pos.saturating_add(1);
                    Some(T::from_bytes(Cow::Borrowed(&self.buf)))
                }
                Err(NoSuchEntry) => None,
            },
        )
    }

    /// Lower bound is the number of remaining entries; no upper bound is
    /// claimed because the log may grow while iterating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let count = self.log.with(|cell| cell.borrow().len());
        (count.saturating_sub(self.pos) as usize, None)
    }

    /// Exact remaining-entry count, computed from the index rather than by
    /// consuming the iterator.
    ///
    /// # Panics
    /// Panics if the remaining count does not fit into `usize`.
    fn count(self) -> usize {
        // Fix: previously this returned `size_hint().0`, whose `as usize`
        // cast silently truncates on targets where `usize` is narrower than
        // `u64`. Guard against overflow the same way `Iter::count` does.
        let n = self
            .log
            .with(|cell| cell.borrow().len())
            .saturating_sub(self.pos);
        if n > usize::MAX as u64 {
            panic!("The number of items in the log {n} does not fit into usize");
        }
        n as usize
    }

    /// Skips `n` entries in O(1) by advancing the cursor, then yields the next.
    fn nth(&mut self, n: usize) -> Option<T> {
        self.pos = self.pos.saturating_add(n as u64);
        self.next()
    }
}