use byteorder::{BigEndian, ReadBytesExt};
use bytes::{BufMut, BytesMut};
use hashbrown::HashMap;
use std::io::{self, Cursor};
use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum RestoreRequiredError {
#[error("I/O error: {0}")]
Io(#[from] io::Error),
#[error("UTF-8 error: {0}")]
Utf8(#[from] std::string::FromUtf8Error),
#[error("Invalid failure type: {0}")]
InvalidFailureType(String),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureType {
NetworkRestore,
LogChecksum,
BtreeCorruption,
}
impl FailureType {
pub fn as_str(&self) -> &'static str {
match self {
FailureType::NetworkRestore => "NETWORK_RESTORE",
FailureType::LogChecksum => "LOG_CHECKSUM",
FailureType::BtreeCorruption => "BTREE_CORRUPTION",
}
}
pub fn parse(s: &str) -> Result<Self, RestoreRequiredError> {
match s {
"NETWORK_RESTORE" => Ok(FailureType::NetworkRestore),
"LOG_CHECKSUM" => Ok(FailureType::LogChecksum),
"BTREE_CORRUPTION" => Ok(FailureType::BtreeCorruption),
_ => Err(RestoreRequiredError::InvalidFailureType(s.to_string())),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RestoreRequired {
pub failure_type: FailureType,
pub timestamp: u64,
pub properties: HashMap<String, String>,
}
impl RestoreRequired {
pub fn new(
failure_type: FailureType,
properties: HashMap<String, String>,
) -> Self {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self { failure_type, timestamp, properties }
}
pub fn with_timestamp(
failure_type: FailureType,
timestamp: u64,
properties: HashMap<String, String>,
) -> Self {
Self { failure_type, timestamp, properties }
}
pub fn log_size(&self) -> usize {
let type_str = self.failure_type.as_str();
let mut size = 4 + type_str.len(); size += 8;
size += 4; for (key, value) in &self.properties {
size += 4 + key.len(); size += 4 + value.len(); }
size
}
pub fn write_to_log(&self, buf: &mut BytesMut) {
let type_str = self.failure_type.as_str();
buf.put_u32(type_str.len() as u32);
buf.extend_from_slice(type_str.as_bytes());
buf.put_u64(self.timestamp);
buf.put_u32(self.properties.len() as u32);
for (key, value) in &self.properties {
buf.put_u32(key.len() as u32);
buf.extend_from_slice(key.as_bytes());
buf.put_u32(value.len() as u32);
buf.extend_from_slice(value.as_bytes());
}
}
pub fn read_from_log(buf: &[u8]) -> Result<Self, RestoreRequiredError> {
let mut cursor = Cursor::new(buf);
let type_len = cursor.read_u32::<BigEndian>()? as usize;
let mut type_bytes = vec![0u8; type_len];
io::Read::read_exact(&mut cursor, &mut type_bytes)?;
let type_str = String::from_utf8(type_bytes)?;
let failure_type = FailureType::parse(&type_str)?;
let timestamp = cursor.read_u64::<BigEndian>()?;
let prop_count = cursor.read_u32::<BigEndian>()? as usize;
let mut properties = HashMap::new();
for _ in 0..prop_count {
let key_len = cursor.read_u32::<BigEndian>()? as usize;
let mut key_bytes = vec![0u8; key_len];
io::Read::read_exact(&mut cursor, &mut key_bytes)?;
let key = String::from_utf8(key_bytes)?;
let value_len = cursor.read_u32::<BigEndian>()? as usize;
let mut value_bytes = vec![0u8; value_len];
io::Read::read_exact(&mut cursor, &mut value_bytes)?;
let value = String::from_utf8(value_bytes)?;
properties.insert(key, value);
}
Ok(Self { failure_type, timestamp, properties })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_restore_required_roundtrip() {
let mut props = HashMap::new();
props.insert("error_file".to_string(), "0000001a.jdb".to_string());
props.insert("error_offset".to_string(), "12345".to_string());
let entry =
RestoreRequired::new(FailureType::LogChecksum, props.clone());
let mut buf = BytesMut::new();
entry.write_to_log(&mut buf);
let decoded = RestoreRequired::read_from_log(&buf).unwrap();
assert_eq!(entry.failure_type, decoded.failure_type);
assert_eq!(entry.properties, decoded.properties);
}
#[test]
fn test_all_failure_types() {
for failure_type in [
FailureType::NetworkRestore,
FailureType::LogChecksum,
FailureType::BtreeCorruption,
] {
let entry = RestoreRequired::new(failure_type, HashMap::new());
let mut buf = BytesMut::new();
entry.write_to_log(&mut buf);
let decoded = RestoreRequired::read_from_log(&buf).unwrap();
assert_eq!(failure_type, decoded.failure_type);
}
}
#[test]
fn test_failure_type_string_conversion() {
assert_eq!(FailureType::NetworkRestore.as_str(), "NETWORK_RESTORE");
assert_eq!(
FailureType::parse("NETWORK_RESTORE").unwrap(),
FailureType::NetworkRestore
);
}
#[test]
fn test_empty_properties() {
let entry =
RestoreRequired::new(FailureType::BtreeCorruption, HashMap::new());
let mut buf = BytesMut::new();
entry.write_to_log(&mut buf);
let decoded = RestoreRequired::read_from_log(&buf).unwrap();
assert!(decoded.properties.is_empty());
}
}