use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, BytesMut};
use noxu_util::{lsn::Lsn, vlsn::Vlsn};
use std::io::{self, Cursor};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum RollbackStartEntryError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RollbackStartEntry {
pub matchpoint_vlsn: Vlsn,
pub matchpoint_lsn: Lsn,
pub active_txn_ids: Vec<i64>,
}
impl RollbackStartEntry {
pub fn new(
matchpoint_vlsn: Vlsn,
matchpoint_lsn: Lsn,
mut active_txn_ids: Vec<i64>,
) -> Self {
active_txn_ids.sort_unstable();
active_txn_ids.dedup();
Self { matchpoint_vlsn, matchpoint_lsn, active_txn_ids }
}
pub fn log_size(&self) -> usize {
8 + 8 + 4 + 8 * self.active_txn_ids.len() }
pub fn write_to_log(&self, buf: &mut BytesMut) {
buf.put_i64(self.matchpoint_vlsn.sequence());
buf.put_u64(self.matchpoint_lsn.as_u64());
buf.put_u32(self.active_txn_ids.len() as u32);
for &id in &self.active_txn_ids {
buf.put_i64(id);
}
}
pub fn read_from_log(buf: &[u8]) -> Result<Self, RollbackStartEntryError> {
let mut cursor = Cursor::new(buf);
let matchpoint_vlsn = Vlsn::new(cursor.read_i64::<BigEndian>()?);
let matchpoint_lsn = Lsn::from_u64(cursor.read_u64::<BigEndian>()?);
let count = cursor.read_u32::<BigEndian>()? as usize;
let mut active_txn_ids = Vec::with_capacity(count);
for _ in 0..count {
active_txn_ids.push(cursor.read_i64::<BigEndian>()?);
}
Ok(Self { matchpoint_vlsn, matchpoint_lsn, active_txn_ids })
}
}
#[cfg(test)]
mod tests {
use super::*;
use noxu_util::lsn::{Lsn, NULL_LSN};
use noxu_util::vlsn::{NULL_VLSN, Vlsn};
#[test]
fn test_rollback_start_roundtrip() {
let entry = RollbackStartEntry::new(
Vlsn::new(99),
Lsn::new(7, 4000),
vec![10, 20, 30],
);
let mut buf = BytesMut::new();
entry.write_to_log(&mut buf);
let decoded = RollbackStartEntry::read_from_log(&buf).unwrap();
assert_eq!(entry, decoded);
assert_eq!(decoded.matchpoint_vlsn, Vlsn::new(99));
assert_eq!(decoded.matchpoint_lsn, Lsn::new(7, 4000));
assert_eq!(decoded.active_txn_ids, vec![10, 20, 30]);
}
#[test]
fn test_rollback_start_empty_txn_set() {
let entry = RollbackStartEntry::new(NULL_VLSN, NULL_LSN, Vec::new());
let mut buf = BytesMut::new();
entry.write_to_log(&mut buf);
let decoded = RollbackStartEntry::read_from_log(&buf).unwrap();
assert_eq!(entry, decoded);
assert!(decoded.active_txn_ids.is_empty());
}
#[test]
fn test_active_txn_ids_sorted_and_deduped() {
let entry = RollbackStartEntry::new(
Vlsn::new(1),
Lsn::new(1, 100),
vec![30, 10, 20, 10],
);
assert_eq!(entry.active_txn_ids, vec![10, 20, 30]);
}
#[test]
fn test_log_size() {
let entry = RollbackStartEntry::new(NULL_VLSN, NULL_LSN, vec![1, 2, 3]);
assert_eq!(entry.log_size(), 44);
assert_eq!(entry.log_size(), buf_size(&entry));
}
#[test]
fn test_log_size_empty() {
let entry = RollbackStartEntry::new(NULL_VLSN, NULL_LSN, vec![]);
assert_eq!(entry.log_size(), 20);
assert_eq!(entry.log_size(), buf_size(&entry));
}
fn buf_size(entry: &RollbackStartEntry) -> usize {
let mut buf = BytesMut::new();
entry.write_to_log(&mut buf);
buf.len()
}
}