use super::{DbOperationType, LnLogEntry};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, BytesMut};
use noxu_util::{lsn::Lsn, vlsn::Vlsn};
use std::io::{self, Cursor};
use thiserror::Error;
/// Failures that can surface while decoding a `NameLnLogEntry` from raw log
/// bytes.
///
/// Each variant wraps the error of the layer that failed and is created
/// automatically through `?` thanks to the `#[from]` conversions.
#[derive(Debug, Error)]
pub enum NameLnLogEntryError {
    /// Reading a fixed-width field or the config payload from the buffer
    /// failed.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),

    /// The operation-type byte could not be decoded into a `DbOperationType`.
    #[error("Invalid database operation type: {0}")]
    InvalidOpType(#[from] super::db_operation_type::DbOperationTypeError),

    /// The embedded `LnLogEntry` at the start of the buffer could not be
    /// decoded.
    #[error("LN log entry error: {0}")]
    LnEntry(#[from] super::ln_log_entry::LnLogEntryError),
}
/// Log entry describing an operation on a database name record.
///
/// On the wire the entry is the embedded [`LnLogEntry`] followed by the
/// operation type and then the operation-specific trailing fields described
/// on the members below.
#[derive(Debug, Clone)]
pub struct NameLnLogEntry {
    /// The underlying LN entry; it is always serialized first.
    pub ln_entry: LnLogEntry,

    /// Which database operation this entry records. It is serialized directly
    /// after `ln_entry` and decides which trailing fields are present.
    pub operation_type: DbOperationType,

    /// Opaque configuration bytes. Only serialized for operation types where
    /// `DbOperationType::is_write_config_type()` is true, as a `u32` length
    /// followed by the raw bytes.
    pub replicated_create_config: Option<Vec<u8>>,

    /// Database id recorded as a `u64`, serialized only when `operation_type`
    /// is `DbOperationType::Truncate`.
    pub truncate_old_db_id: Option<u64>,
}
impl NameLnLogEntry {
pub fn new(
ln_entry: LnLogEntry,
operation_type: DbOperationType,
replicated_create_config: Option<Vec<u8>>,
truncate_old_db_id: Option<u64>,
) -> Self {
Self {
ln_entry,
operation_type,
replicated_create_config,
truncate_old_db_id,
}
}
pub fn new_create(
db_id: u64,
txn_id: Option<i64>,
abort_lsn: Lsn,
key: Vec<u8>,
data: Vec<u8>,
config: Vec<u8>,
) -> Self {
let ln_entry = LnLogEntry::new(
db_id,
txn_id,
abort_lsn,
false, None, None, Vlsn::new(0),
0, false,
key,
Some(data),
0, Vlsn::new(0),
);
Self {
ln_entry,
operation_type: DbOperationType::Create,
replicated_create_config: Some(config),
truncate_old_db_id: None,
}
}
pub fn new_remove(
db_id: u64,
txn_id: Option<i64>,
abort_lsn: Lsn,
key: Vec<u8>,
) -> Self {
let ln_entry = LnLogEntry::new(
db_id,
txn_id,
abort_lsn,
false,
None,
None,
Vlsn::new(0),
0,
false,
key,
None, 0,
Vlsn::new(0),
);
Self {
ln_entry,
operation_type: DbOperationType::Remove,
replicated_create_config: None,
truncate_old_db_id: None,
}
}
pub fn new_truncate(
db_id: u64,
txn_id: Option<i64>,
abort_lsn: Lsn,
key: Vec<u8>,
data: Vec<u8>,
old_db_id: u64,
) -> Self {
let ln_entry = LnLogEntry::new(
db_id,
txn_id,
abort_lsn,
false,
None,
None,
Vlsn::new(0),
0,
false,
key,
Some(data),
0,
Vlsn::new(0),
);
Self {
ln_entry,
operation_type: DbOperationType::Truncate,
replicated_create_config: None,
truncate_old_db_id: Some(old_db_id),
}
}
pub fn log_size(&self) -> usize {
let mut size = self.ln_entry.log_size();
size += DbOperationType::log_size();
if self.operation_type.is_write_config_type()
&& let Some(ref config) = self.replicated_create_config
{
size += 4 + config.len();
}
if self.operation_type == DbOperationType::Truncate {
size += 8; }
size
}
pub fn write_to_log(&self, buf: &mut BytesMut) {
self.ln_entry.write_to_log(buf);
self.operation_type.write_to_log(buf);
if self.operation_type.is_write_config_type()
&& let Some(ref config) = self.replicated_create_config
{
buf.put_u32(config.len() as u32);
buf.extend_from_slice(config);
}
if self.operation_type == DbOperationType::Truncate {
buf.put_u64(self.truncate_old_db_id.unwrap_or(0));
}
}
pub fn read_from_log(
buf: &[u8],
is_transactional: bool,
) -> Result<Self, NameLnLogEntryError> {
let ln_entry = LnLogEntry::read_from_log(buf, is_transactional)?;
let mut temp_buf = BytesMut::new();
ln_entry.write_to_log(&mut temp_buf);
let ln_size = temp_buf.len();
let mut cursor = Cursor::new(&buf[ln_size..]);
let op_byte_buf = &buf[ln_size..ln_size + 1];
let operation_type = DbOperationType::read_from_log(op_byte_buf)?;
cursor.set_position(cursor.position() + 1);
let replicated_create_config = if operation_type.is_write_config_type()
{
let config_len = cursor.read_u32::<BigEndian>()? as usize;
let mut config = vec![0u8; config_len];
io::Read::read_exact(&mut cursor, &mut config)?;
Some(config)
} else {
None
};
let truncate_old_db_id = if operation_type == DbOperationType::Truncate
{
Some(cursor.read_u64::<BigEndian>()?)
} else {
None
};
Ok(Self {
ln_entry,
operation_type,
replicated_create_config,
truncate_old_db_id,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_util::lsn::NULL_LSN;

    /// Builds an LN with no transaction id and `NULL_LSN` as abort LSN; all
    /// remaining `LnLogEntry::new` arguments use the same fixed values the
    /// production constructors pass.
    fn plain_ln(db_id: u64, key: &[u8], data: Option<Vec<u8>>) -> LnLogEntry {
        LnLogEntry::new(
            db_id,
            None,
            NULL_LSN,
            false,
            None,
            None,
            Vlsn::new(0),
            0,
            false,
            key.to_vec(),
            data,
            0,
            Vlsn::new(0),
        )
    }

    /// Serializes `entry` into a fresh buffer.
    fn encode(entry: &NameLnLogEntry) -> BytesMut {
        let mut buf = BytesMut::new();
        entry.write_to_log(&mut buf);
        buf
    }

    /// Serializes `entry` and decodes it again, panicking on decode failure.
    fn roundtrip(
        entry: &NameLnLogEntry,
        is_transactional: bool,
    ) -> NameLnLogEntry {
        let buf = encode(entry);
        NameLnLogEntry::read_from_log(&buf, is_transactional).unwrap()
    }

    #[test]
    fn test_name_ln_create_roundtrip() {
        let entry = NameLnLogEntry::new_create(
            100,
            Some(42),
            NULL_LSN,
            b"mydb".to_vec(),
            b"name_ln_data".to_vec(),
            b"db_config_bytes".to_vec(),
        );

        let decoded = roundtrip(&entry, true);

        assert_eq!(entry.operation_type, decoded.operation_type);
        assert_eq!(entry.ln_entry.db_id, decoded.ln_entry.db_id);
        assert_eq!(entry.ln_entry.key, decoded.ln_entry.key);
        assert_eq!(
            entry.replicated_create_config,
            decoded.replicated_create_config
        );
    }

    #[test]
    fn test_name_ln_remove_roundtrip() {
        let entry =
            NameLnLogEntry::new_remove(200, None, NULL_LSN, b"olddb".to_vec());

        let decoded = roundtrip(&entry, false);

        assert_eq!(entry.operation_type, DbOperationType::Remove);
        assert_eq!(decoded.operation_type, DbOperationType::Remove);
        assert!(decoded.ln_entry.is_deleted());
    }

    #[test]
    fn test_name_ln_truncate_roundtrip() {
        let entry = NameLnLogEntry::new_truncate(
            300,
            Some(99),
            Lsn::new(1, 500),
            b"truncdb".to_vec(),
            b"new_mapping".to_vec(),
            999,
        );

        let decoded = roundtrip(&entry, true);

        assert_eq!(entry.operation_type, DbOperationType::Truncate);
        // Check the decoded value too, so the test covers the read path and
        // not just the constructor.
        assert_eq!(decoded.operation_type, DbOperationType::Truncate);
        assert_eq!(decoded.truncate_old_db_id, Some(999));
    }

    #[test]
    fn test_new_constructor() {
        let entry = NameLnLogEntry::new(
            plain_ln(1, b"key", Some(b"data".to_vec())),
            DbOperationType::Rename,
            None,
            None,
        );

        assert_eq!(entry.operation_type, DbOperationType::Rename);
        assert!(entry.replicated_create_config.is_none());
        assert!(entry.truncate_old_db_id.is_none());
    }

    #[test]
    fn test_log_size_remove_entry() {
        let entry =
            NameLnLogEntry::new_remove(10, None, NULL_LSN, b"db".to_vec());

        assert!(entry.log_size() > 1);
    }

    #[test]
    fn test_log_size_truncate_entry() {
        let make_base_ln = || plain_ln(20, b"db", Some(b"data".to_vec()));
        let entry_trunc = NameLnLogEntry::new(
            make_base_ln(),
            DbOperationType::Truncate,
            None,
            Some(42),
        );
        let entry_remove = NameLnLogEntry::new(
            make_base_ln(),
            DbOperationType::Remove,
            None,
            None,
        );

        // Truncate adds exactly the 8-byte old database id.
        assert_eq!(entry_trunc.log_size(), entry_remove.log_size() + 8);
    }

    #[test]
    fn test_log_size_create_entry_includes_config() {
        let config = b"some_config_bytes".to_vec();
        let make_base_ln = || plain_ln(30, b"db", Some(b"data".to_vec()));
        let entry_create = NameLnLogEntry::new(
            make_base_ln(),
            DbOperationType::Create,
            Some(config.clone()),
            None,
        );
        let entry_remove = NameLnLogEntry::new(
            make_base_ln(),
            DbOperationType::Remove,
            None,
            None,
        );

        // Create adds a 4-byte length prefix plus the config bytes.
        assert_eq!(
            entry_create.log_size(),
            entry_remove.log_size() + 4 + config.len()
        );
    }

    #[test]
    fn test_rename_operation_roundtrip() {
        let entry = NameLnLogEntry::new(
            plain_ln(5, b"mydb", Some(b"newname".to_vec())),
            DbOperationType::Rename,
            None,
            None,
        );

        let decoded = roundtrip(&entry, false);

        assert_eq!(decoded.operation_type, DbOperationType::Rename);
        assert!(decoded.replicated_create_config.is_none());
        assert!(decoded.truncate_old_db_id.is_none());
    }

    #[test]
    fn test_update_config_operation_roundtrip() {
        let config_bytes = b"updated_config".to_vec();
        let entry = NameLnLogEntry::new(
            plain_ln(6, b"cfgdb", Some(b"value".to_vec())),
            DbOperationType::UpdateConfig,
            Some(config_bytes.clone()),
            None,
        );

        let decoded = roundtrip(&entry, false);

        assert_eq!(decoded.operation_type, DbOperationType::UpdateConfig);
        assert_eq!(decoded.replicated_create_config, Some(config_bytes));
        assert!(decoded.truncate_old_db_id.is_none());
    }

    #[test]
    fn test_none_operation_roundtrip() {
        let entry = NameLnLogEntry::new(
            plain_ln(7, b"nonedb", None),
            DbOperationType::None,
            None,
            None,
        );

        let decoded = roundtrip(&entry, false);

        assert_eq!(decoded.operation_type, DbOperationType::None);
        assert!(decoded.replicated_create_config.is_none());
        assert!(decoded.truncate_old_db_id.is_none());
    }

    #[test]
    fn test_name_ln_log_entry_debug() {
        let entry =
            NameLnLogEntry::new_remove(1, None, NULL_LSN, b"x".to_vec());

        let rendered = format!("{:?}", entry);

        assert!(rendered.contains("NameLnLogEntry"));
    }

    #[test]
    fn test_name_ln_log_entry_clone() {
        let original =
            NameLnLogEntry::new_remove(1, None, NULL_LSN, b"y".to_vec());

        let cloned = original.clone();

        assert_eq!(original.operation_type, cloned.operation_type);
        assert_eq!(original.ln_entry.db_id, cloned.ln_entry.db_id);
    }

    #[test]
    fn test_new_create_sets_fields() {
        let entry = NameLnLogEntry::new_create(
            42,
            Some(10),
            NULL_LSN,
            b"createdb".to_vec(),
            b"data".to_vec(),
            b"cfg".to_vec(),
        );

        assert_eq!(entry.operation_type, DbOperationType::Create);
        assert_eq!(entry.ln_entry.db_id, 42);
        assert!(entry.replicated_create_config.is_some());
        assert_eq!(entry.replicated_create_config.unwrap(), b"cfg");
        assert!(entry.truncate_old_db_id.is_none());
    }

    #[test]
    fn test_new_remove_sets_fields() {
        let entry =
            NameLnLogEntry::new_remove(77, Some(5), NULL_LSN, b"rdb".to_vec());

        assert_eq!(entry.operation_type, DbOperationType::Remove);
        assert_eq!(entry.ln_entry.db_id, 77);
        assert!(entry.replicated_create_config.is_none());
        assert!(entry.truncate_old_db_id.is_none());
    }

    #[test]
    fn test_new_truncate_sets_fields() {
        let entry = NameLnLogEntry::new_truncate(
            88,
            None,
            NULL_LSN,
            b"tdb".to_vec(),
            b"val".to_vec(),
            12345,
        );

        assert_eq!(entry.operation_type, DbOperationType::Truncate);
        assert_eq!(entry.ln_entry.db_id, 88);
        assert!(entry.replicated_create_config.is_none());
        assert_eq!(entry.truncate_old_db_id, Some(12345));
    }

    #[test]
    fn test_write_to_log_creates_nonempty_buffer() {
        let entry =
            NameLnLogEntry::new_remove(100, None, NULL_LSN, b"testdb".to_vec());

        let buf = encode(&entry);

        assert!(!buf.is_empty());
        assert_eq!(buf.len(), entry.log_size());
    }
}