use bytes::{BufMut, BytesMut};
use noxu_util::{
lsn::{Lsn, NULL_LSN},
vlsn::{NULL_VLSN, Vlsn},
};
use std::io;
use thiserror::Error;
/// Errors produced while decoding an LN log entry from a byte slice.
#[derive(Debug, Error)]
pub enum LnLogEntryError {
/// An I/O-style failure. The slice readers in this file raise it with
/// [`io::ErrorKind::UnexpectedEof`] when the buffer ends before a field has
/// been fully read; any other [`io::Error`] converts into it through `From`.
#[error("I/O error: {0}")]
Io(#[from] io::Error),
}
fn read_u8_at(buf: &[u8], pos: &mut usize) -> Result<u8, LnLogEntryError> {
if *pos >= buf.len() {
return Err(LnLogEntryError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"read_u8",
)));
}
let v = buf[*pos];
*pos += 1;
Ok(v)
}
/// Reads a big-endian `u32` starting at `*pos` and moves `*pos` four bytes
/// forward.
///
/// The bounds check uses checked slicing rather than computing `*pos + 4`, so
/// an out-of-range `*pos` (even one close to `usize::MAX`) yields an error
/// instead of an arithmetic overflow or an indexing panic.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::UnexpectedEof`] error (message `"read_u32"`)
/// when fewer than four bytes are available at `*pos`. `*pos` is left
/// unchanged in that case.
fn read_u32_be_at(buf: &[u8], pos: &mut usize) -> Result<u32, LnLogEntryError> {
    let Some(bytes) = buf.get(*pos..).and_then(|rest| rest.first_chunk::<4>()) else {
        return Err(LnLogEntryError::Io(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "read_u32",
        )));
    };
    // Cannot overflow: four bytes were just proven to exist at `*pos`.
    *pos += 4;
    Ok(u32::from_be_bytes(*bytes))
}
/// Reads a big-endian `i32` starting at `*pos` and moves `*pos` four bytes
/// forward.
///
/// # Errors
///
/// Propagates the error returned by `read_u32_be_at` when the buffer is too
/// short.
fn read_i32_be_at(buf: &[u8], pos: &mut usize) -> Result<i32, LnLogEntryError> {
    // Same 32 bits, reinterpreted as two's-complement signed.
    read_u32_be_at(buf, pos).map(|raw| raw as i32)
}
/// Reads a big-endian `u64` starting at `*pos` and moves `*pos` eight bytes
/// forward.
///
/// The bounds check uses checked slicing rather than computing `*pos + 8`, so
/// an out-of-range `*pos` (even one close to `usize::MAX`) yields an error
/// instead of an arithmetic overflow or an indexing panic.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::UnexpectedEof`] error (message `"read_u64"`)
/// when fewer than eight bytes are available at `*pos`. `*pos` is left
/// unchanged in that case.
fn read_u64_be_at(buf: &[u8], pos: &mut usize) -> Result<u64, LnLogEntryError> {
    let Some(bytes) = buf.get(*pos..).and_then(|rest| rest.first_chunk::<8>()) else {
        return Err(LnLogEntryError::Io(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "read_u64",
        )));
    };
    // Cannot overflow: eight bytes were just proven to exist at `*pos`.
    *pos += 8;
    Ok(u64::from_be_bytes(*bytes))
}
/// Reads a big-endian `i64` starting at `*pos` and moves `*pos` eight bytes
/// forward.
///
/// # Errors
///
/// Propagates the error returned by `read_u64_be_at` when the buffer is too
/// short.
fn read_i64_be_at(buf: &[u8], pos: &mut usize) -> Result<i64, LnLogEntryError> {
    // Same 64 bits, reinterpreted as two's-complement signed.
    read_u64_be_at(buf, pos).map(|raw| raw as i64)
}
fn read_slice_at<'a>(
buf: &'a [u8],
pos: &mut usize,
len: usize,
) -> Result<&'a [u8], LnLogEntryError> {
let end = *pos + len;
if end > buf.len() {
return Err(LnLogEntryError::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"read_slice",
)));
}
let slice = &buf[*pos..end];
*pos = end;
Ok(slice)
}
/// Borrowed view of a decoded LN log entry, as produced by
/// `LnLogEntry::parse_from_slice`.
///
/// The byte-slice fields borrow from the buffer that was parsed (lifetime
/// `'a`); the parser does not copy payload bytes.
#[derive(Debug, Clone, Copy)]
pub struct LnEntryRef<'a> {
/// Database id, decoded as a big-endian `u64` right after the flags byte.
pub db_id: u64,
/// Transaction id; `Some` only when the entry was parsed as transactional.
pub txn_id: Option<i64>,
/// Abort LSN; `NULL_LSN` unless the entry was parsed as transactional and
/// the abort-LSN flag bit is set.
pub abort_lsn: Lsn,
/// State of the abort-known-deleted flag bit.
pub abort_known_deleted: bool,
/// Abort key bytes; `Some` only when the abort-key flag bit is set.
pub abort_key: Option<&'a [u8]>,
/// Abort data bytes; `Some` only when the abort-data flag bit is set.
pub abort_data: Option<&'a [u8]>,
/// Abort VLSN; `NULL_VLSN` when the abort-VLSN flag bit is clear.
pub abort_vlsn: Vlsn,
/// Abort expiration; `0` when the abort-expiration flag bit is clear.
pub abort_expiration: i32,
/// State of the embedded-LN flag bit.
pub embedded_ln: bool,
/// Record key bytes (always present in the encoding).
pub key: &'a [u8],
/// Record data bytes; `None` when the encoded data length is zero.
pub data: Option<&'a [u8]>,
/// Expiration; `0` when the expiration flag bit is clear.
pub expiration: i32,
}
/// The one-byte flag field that leads every encoded LN log entry.
///
/// Two bits carry boolean properties of the entry; the remaining six record
/// which optional fields follow in the encoding.
#[derive(Debug, Clone, Copy)]
struct LnFlags {
    bits: u8,
}

impl LnFlags {
    /// Abort-known-deleted property.
    const ABORT_KD_MASK: u8 = 0x01;
    /// Embedded-LN property.
    const EMBEDDED_LN_MASK: u8 = 0x02;
    /// An abort key is present.
    const HAVE_ABORT_KEY_MASK: u8 = 0x04;
    /// Abort data is present.
    const HAVE_ABORT_DATA_MASK: u8 = 0x08;
    /// An abort VLSN is present.
    const HAVE_ABORT_VLSN_MASK: u8 = 0x10;
    /// An abort LSN is present.
    const HAVE_ABORT_LSN_MASK: u8 = 0x20;
    /// An abort expiration is present.
    const HAVE_ABORT_EXPIRATION_MASK: u8 = 0x40;
    /// An expiration is present.
    const HAVE_EXPIRATION_MASK: u8 = 0x80;

    /// Creates a flag set with every bit clear.
    fn new() -> Self {
        Self { bits: 0 }
    }

    /// Wraps a raw flags byte read from an encoded entry.
    fn from_bits(bits: u8) -> Self {
        Self { bits }
    }

    /// Sets the bits in `mask` when `val` is `true` and clears them otherwise,
    /// so every setter leaves the flag equal to the value it was given.
    fn assign(&mut self, mask: u8, val: bool) {
        if val {
            self.bits |= mask;
        } else {
            self.bits &= !mask;
        }
    }

    /// Returns `true` when any bit of `mask` is set.
    fn contains(&self, mask: u8) -> bool {
        self.bits & mask != 0
    }

    fn set_abort_known_deleted(&mut self, val: bool) {
        self.assign(Self::ABORT_KD_MASK, val);
    }

    fn set_embedded_ln(&mut self, val: bool) {
        self.assign(Self::EMBEDDED_LN_MASK, val);
    }

    fn set_have_abort_key(&mut self, val: bool) {
        self.assign(Self::HAVE_ABORT_KEY_MASK, val);
    }

    fn set_have_abort_data(&mut self, val: bool) {
        self.assign(Self::HAVE_ABORT_DATA_MASK, val);
    }

    fn set_have_abort_vlsn(&mut self, val: bool) {
        self.assign(Self::HAVE_ABORT_VLSN_MASK, val);
    }

    fn set_have_abort_lsn(&mut self, val: bool) {
        self.assign(Self::HAVE_ABORT_LSN_MASK, val);
    }

    fn set_have_abort_expiration(&mut self, val: bool) {
        self.assign(Self::HAVE_ABORT_EXPIRATION_MASK, val);
    }

    fn set_have_expiration(&mut self, val: bool) {
        self.assign(Self::HAVE_EXPIRATION_MASK, val);
    }

    fn abort_known_deleted(&self) -> bool {
        self.contains(Self::ABORT_KD_MASK)
    }

    fn embedded_ln(&self) -> bool {
        self.contains(Self::EMBEDDED_LN_MASK)
    }

    fn have_abort_key(&self) -> bool {
        self.contains(Self::HAVE_ABORT_KEY_MASK)
    }

    fn have_abort_data(&self) -> bool {
        self.contains(Self::HAVE_ABORT_DATA_MASK)
    }

    fn have_abort_vlsn(&self) -> bool {
        self.contains(Self::HAVE_ABORT_VLSN_MASK)
    }

    fn have_abort_lsn(&self) -> bool {
        self.contains(Self::HAVE_ABORT_LSN_MASK)
    }

    fn have_abort_expiration(&self) -> bool {
        self.contains(Self::HAVE_ABORT_EXPIRATION_MASK)
    }

    fn have_expiration(&self) -> bool {
        self.contains(Self::HAVE_EXPIRATION_MASK)
    }
}
/// Owned LN log entry.
///
/// `write_to_log` encodes every field except `vlsn`; `read_from_log` rebuilds
/// an entry from those bytes.
#[derive(Debug, Clone)]
pub struct LnLogEntry {
/// Database id; always encoded as a big-endian `u64`.
pub db_id: u64,
/// Transaction id; `Some` makes the entry transactional.
pub txn_id: Option<i64>,
/// Abort LSN; encoded only for transactional entries and only when non-null.
pub abort_lsn: Lsn,
/// Abort-known-deleted property, stored as a flag bit.
pub abort_known_deleted: bool,
/// Optional abort key; encoded with a 4-byte length prefix when present.
pub abort_key: Option<Vec<u8>>,
/// Optional abort data; encoded with a 4-byte length prefix when present.
pub abort_data: Option<Vec<u8>>,
/// Abort VLSN; encoded only when non-null.
pub abort_vlsn: Vlsn,
/// Abort expiration; encoded only when non-zero.
pub abort_expiration: i32,
/// Embedded-LN property, stored as a flag bit.
pub embedded_ln: bool,
/// Record key; always encoded with a 4-byte length prefix.
pub key: Vec<u8>,
/// Record data; `None` is encoded as a zero length and marks the entry as
/// deleted (see `is_deleted`).
pub data: Option<Vec<u8>>,
/// Expiration; encoded only when non-zero.
pub expiration: i32,
/// VLSN of the entry. Not written by `write_to_log`; `read_from_log` sets it
/// to `NULL_VLSN`.
pub vlsn: Vlsn,
}
impl LnLogEntry {
#[expect(clippy::too_many_arguments)]
pub fn new(
db_id: u64,
txn_id: Option<i64>,
abort_lsn: Lsn,
abort_known_deleted: bool,
abort_key: Option<Vec<u8>>,
abort_data: Option<Vec<u8>>,
abort_vlsn: Vlsn,
abort_expiration: i32,
embedded_ln: bool,
key: Vec<u8>,
data: Option<Vec<u8>>,
expiration: i32,
vlsn: Vlsn,
) -> Self {
Self {
db_id,
txn_id,
abort_lsn,
abort_known_deleted,
abort_key,
abort_data,
abort_vlsn,
abort_expiration,
embedded_ln,
key,
data,
expiration,
vlsn,
}
}
/// Returns `true` when this entry carries a transaction id (`txn_id` is `Some`).
pub fn is_transactional(&self) -> bool {
self.txn_id.is_some()
}
/// Returns `true` when the entry has no data (`data` is `None`).
///
/// `parse_from_slice` maps a zero encoded data length to `None`, so an entry
/// written with `Some` of an empty vector reads back as deleted.
pub fn is_deleted(&self) -> bool {
self.data.is_none()
}
pub fn log_size(&self) -> usize {
let mut size = 1;
size += 8;
if self.is_transactional() {
if !self.abort_lsn.is_null() {
size += 8; }
size += 8; }
if let Some(ref k) = self.abort_key {
size += 4 + k.len();
}
if let Some(ref d) = self.abort_data {
size += 4 + d.len();
}
if !self.abort_vlsn.is_null() {
size += 8;
}
if self.abort_expiration != 0 {
size += 4;
}
if self.expiration != 0 {
size += 4;
}
if let Some(ref d) = self.data {
size += 4 + d.len();
} else {
size += 4; }
size += 4 + self.key.len();
size
}
pub fn write_to_log(&self, buf: &mut BytesMut) {
let mut flags = LnFlags::new();
flags.set_abort_known_deleted(self.abort_known_deleted);
flags.set_embedded_ln(self.embedded_ln);
flags.set_have_abort_key(self.abort_key.is_some());
flags.set_have_abort_data(self.abort_data.is_some());
flags.set_have_abort_vlsn(!self.abort_vlsn.is_null());
flags.set_have_abort_lsn(!self.abort_lsn.is_null());
flags.set_have_abort_expiration(self.abort_expiration != 0);
flags.set_have_expiration(self.expiration != 0);
buf.put_u8(flags.bits);
buf.put_u64(self.db_id);
if self.is_transactional() {
if !self.abort_lsn.is_null() {
buf.put_u64(self.abort_lsn.as_u64());
}
buf.put_i64(self.txn_id.unwrap());
}
if let Some(ref k) = self.abort_key {
buf.put_u32(k.len() as u32);
buf.extend_from_slice(k);
}
if let Some(ref d) = self.abort_data {
buf.put_u32(d.len() as u32);
buf.extend_from_slice(d);
}
if !self.abort_vlsn.is_null() {
buf.put_i64(self.abort_vlsn.sequence());
}
if self.abort_expiration != 0 {
buf.put_i32(self.abort_expiration);
}
if self.expiration != 0 {
buf.put_i32(self.expiration);
}
if let Some(ref d) = self.data {
buf.put_u32(d.len() as u32);
buf.extend_from_slice(d);
} else {
buf.put_u32(0);
}
buf.put_u32(self.key.len() as u32);
buf.extend_from_slice(&self.key);
}
/// Decodes an entry from `buf` without copying any payload bytes.
///
/// `is_transactional` tells the parser whether the transactional fields
/// (optional abort LSN, then txn id) follow the database id; the encoding
/// itself does not record this. Fields are read in the order `write_to_log`
/// emits them. Bytes after the key are not examined.
///
/// # Errors
///
/// Returns the error of the first field reader that runs out of bytes.
pub fn parse_from_slice<'a>(
    buf: &'a [u8],
    is_transactional: bool,
) -> Result<LnEntryRef<'a>, LnLogEntryError> {
    // Reads a 4-byte big-endian length followed by that many bytes.
    fn read_len_prefixed<'b>(
        buf: &'b [u8],
        cursor: &mut usize,
    ) -> Result<&'b [u8], LnLogEntryError> {
        let len = read_u32_be_at(buf, cursor)? as usize;
        read_slice_at(buf, cursor, len)
    }

    let mut cursor = 0usize;
    let flags = LnFlags::from_bits(read_u8_at(buf, &mut cursor)?);
    let db_id = read_u64_be_at(buf, &mut cursor)?;

    // The abort-LSN flag is consulted only for transactional entries.
    let mut txn_id = None;
    let mut abort_lsn = NULL_LSN;
    if is_transactional {
        if flags.have_abort_lsn() {
            abort_lsn = Lsn::from_u64(read_u64_be_at(buf, &mut cursor)?);
        }
        txn_id = Some(read_i64_be_at(buf, &mut cursor)?);
    }

    let abort_key = flags
        .have_abort_key()
        .then(|| read_len_prefixed(buf, &mut cursor))
        .transpose()?;
    let abort_data = flags
        .have_abort_data()
        .then(|| read_len_prefixed(buf, &mut cursor))
        .transpose()?;

    let mut abort_vlsn = NULL_VLSN;
    if flags.have_abort_vlsn() {
        abort_vlsn = Vlsn::new(read_i64_be_at(buf, &mut cursor)?);
    }

    let abort_expiration = flags
        .have_abort_expiration()
        .then(|| read_i32_be_at(buf, &mut cursor))
        .transpose()?
        .unwrap_or(0);
    let expiration = flags
        .have_expiration()
        .then(|| read_i32_be_at(buf, &mut cursor))
        .transpose()?
        .unwrap_or(0);

    // A zero data length means "no data" rather than an empty payload.
    let data = match read_u32_be_at(buf, &mut cursor)? as usize {
        0 => None,
        len => Some(read_slice_at(buf, &mut cursor, len)?),
    };
    let key = read_len_prefixed(buf, &mut cursor)?;

    Ok(LnEntryRef {
        db_id,
        txn_id,
        abort_lsn,
        abort_known_deleted: flags.abort_known_deleted(),
        abort_key,
        abort_data,
        abort_vlsn,
        abort_expiration,
        embedded_ln: flags.embedded_ln(),
        key,
        data,
        expiration,
    })
}
pub fn read_from_log(
buf: &[u8],
is_transactional: bool,
) -> Result<Self, LnLogEntryError> {
let r = Self::parse_from_slice(buf, is_transactional)?;
Ok(Self {
db_id: r.db_id,
txn_id: r.txn_id,
abort_lsn: r.abort_lsn,
abort_known_deleted: r.abort_known_deleted,
abort_key: r.abort_key.map(<[u8]>::to_vec),
abort_data: r.abort_data.map(<[u8]>::to_vec),
abort_vlsn: r.abort_vlsn,
abort_expiration: r.abort_expiration,
embedded_ln: r.embedded_ln,
key: r.key.to_vec(),
data: r.data.map(<[u8]>::to_vec),
expiration: r.expiration,
vlsn: NULL_VLSN, })
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `entry` with `write_to_log` and decodes the bytes again with
    /// `read_from_log`, panicking if decoding fails.
    fn roundtrip(entry: &LnLogEntry, transactional: bool) -> LnLogEntry {
        let mut encoded = BytesMut::new();
        entry.write_to_log(&mut encoded);
        LnLogEntry::read_from_log(&encoded, transactional).unwrap()
    }

    #[test]
    fn test_ln_log_entry_roundtrip_insert() {
        let original = LnLogEntry::new(
            100,
            Some(42),
            Lsn::new(1, 500),
            false,
            None,
            None,
            NULL_VLSN,
            0,
            true,
            b"mykey".to_vec(),
            Some(b"mydata".to_vec()),
            0,
            Vlsn::new(10),
        );

        let restored = roundtrip(&original, true);

        assert_eq!(original.db_id, restored.db_id);
        assert_eq!(original.txn_id, restored.txn_id);
        assert_eq!(original.key, restored.key);
        assert_eq!(original.data, restored.data);
        assert_eq!(original.embedded_ln, restored.embedded_ln);
    }

    #[test]
    fn test_ln_log_entry_roundtrip_delete() {
        // No data: the entry represents a deletion.
        let original = LnLogEntry::new(
            200,
            None,
            NULL_LSN,
            false,
            None,
            None,
            NULL_VLSN,
            0,
            false,
            b"deletedkey".to_vec(),
            None,
            0,
            NULL_VLSN,
        );

        let restored = roundtrip(&original, false);

        assert_eq!(original.db_id, restored.db_id);
        assert_eq!(original.key, restored.key);
        assert!(restored.is_deleted());
    }

    #[test]
    fn test_ln_log_entry_with_abort_info() {
        let original = LnLogEntry::new(
            300,
            Some(99),
            Lsn::new(5, 1000),
            true,
            Some(b"oldkey".to_vec()),
            Some(b"olddata".to_vec()),
            Vlsn::new(8),
            123,
            false,
            b"newkey".to_vec(),
            Some(b"newdata".to_vec()),
            456,
            Vlsn::new(20),
        );

        let restored = roundtrip(&original, true);

        assert_eq!(original.abort_lsn, restored.abort_lsn);
        assert_eq!(original.abort_known_deleted, restored.abort_known_deleted);
        assert_eq!(original.abort_key, restored.abort_key);
        assert_eq!(original.abort_data, restored.abort_data);
    }
}