use std::fmt::Display;
use anyhow::Result;
use object_store::{ObjectMeta, path::Path};
use crate::storage::store::EventStoreError;
/// Overall verdict derived from a [`StoreIntegrityReport`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IntegrityStatus {
    /// No gaps, overlaps or orphaned files were found.
    Intact,
    /// Only orphaned files (ranges nested inside another file) were found;
    /// dropping them leaves the covered block range unchanged.
    Repairable,
    /// Gaps or overlaps were found; `StoreDirectory::sanitize` rejects the store.
    Unrepairable,
}
/// Findings of an integrity check over a [`StoreDirectory`].
#[derive(Debug, Clone)]
pub struct StoreIntegrityReport {
    /// Inclusive block ranges `(from, to)` missing between stored files.
    pub gaps: Vec<(u64, u64)>,
    /// Files whose block range is fully contained in another file's range.
    pub orphans: Vec<StoredFile>,
    /// Inclusive block ranges `(from, to)` stored in more than one file.
    pub overlaps: Vec<(u64, u64)>,
}

impl StoreIntegrityReport {
    /// Classifies the report.
    ///
    /// Any gap or overlap makes the store [`IntegrityStatus::Unrepairable`];
    /// otherwise orphans alone make it [`IntegrityStatus::Repairable`]; with no
    /// findings at all it is [`IntegrityStatus::Intact`].
    pub fn status(&self) -> IntegrityStatus {
        let structurally_broken = !(self.gaps.is_empty() && self.overlaps.is_empty());
        match (structurally_broken, self.orphans.is_empty()) {
            (true, _) => IntegrityStatus::Unrepairable,
            (false, false) => IntegrityStatus::Repairable,
            (false, true) => IntegrityStatus::Intact,
        }
    }
}
impl Display for StoreIntegrityReport {
    /// Renders a human-readable summary: a one-line verdict, followed for
    /// non-intact stores by one ` - ...` line per orphaned file, gap or overlap.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.status() {
            IntegrityStatus::Intact => write!(f, "Store integrity: intact"),
            IntegrityStatus::Repairable => {
                // `writeln!` (not `write!`) so the first orphan starts on its own
                // line instead of being appended to the header.
                writeln!(f, "Store integrity: repairable")?;
                for file in &self.orphans {
                    writeln!(
                        f,
                        " - {} (blocks {} to {})",
                        file.path().filename().unwrap_or_default(),
                        file.block_start,
                        file.block_end
                    )?;
                }
                Ok(())
            }
            IntegrityStatus::Unrepairable => {
                writeln!(f, "Store integrity: unrepairable")?;
                if !self.gaps.is_empty() {
                    writeln!(f, "Gaps in stored files:")?;
                    for (start, end) in &self.gaps {
                        writeln!(f, " - blocks {} to {}", start, end)?;
                    }
                }
                if !self.overlaps.is_empty() {
                    writeln!(f, "Overlapping files:")?;
                    for (start, end) in &self.overlaps {
                        writeln!(f, " - blocks {} to {}", start, end)?;
                    }
                }
                Ok(())
            }
        }
    }
}
/// A stored file named `<block_start>-<block_end>[.parquet]`, covering the
/// inclusive block range `block_start..=block_end`.
///
/// NOTE: equality and ordering (see the `PartialEq`/`Ord` impls below) look at
/// `block_start` only, so two files with different ends or paths can compare
/// equal. Compare `path()` or `range()` when file identity matters.
#[derive(Debug, Clone, Eq)]
pub struct StoredFile {
    /// First block contained in the file (inclusive).
    pub block_start: u64,
    /// Last block contained in the file (inclusive).
    pub block_end: u64,
    /// Object-store metadata; `meta.location` is the file's path.
    pub meta: ObjectMeta,
}

impl StoredFile {
    /// Builds a `StoredFile` by parsing the block range out of the file name.
    ///
    /// The file name must be `<start>-<end>`, optionally followed by a single
    /// `.parquet` suffix, where both bounds are decimal `u64`s (leading zeros
    /// allowed), e.g. `000000000100-000000000200.parquet`.
    ///
    /// # Errors
    /// Returns an error if the path has no file name, the name does not contain
    /// exactly one `-`, or either bound does not parse as a `u64`.
    pub fn new(meta: ObjectMeta) -> Result<Self> {
        // A path without a file name is reported as an error rather than a panic.
        let filename = meta
            .location
            .filename()
            .ok_or_else(|| anyhow::anyhow!("Invalid file name format"))?;
        let (start, end) = filename
            .split_once('-')
            .filter(|(_, end)| !end.contains('-'))
            .ok_or_else(|| anyhow::anyhow!("Invalid file name format"))?;
        // Only strip a trailing extension; `replace` would also remove
        // ".parquet" from the middle of the name.
        let end = end.strip_suffix(".parquet").unwrap_or(end);
        let block_start = start.parse()?;
        let block_end = end.parse()?;
        Ok(Self {
            block_start,
            block_end,
            meta,
        })
    }

    /// Path of the file inside the object store.
    pub fn path(&self) -> &Path {
        &self.meta.location
    }

    /// Inclusive block range `(block_start, block_end)` covered by the file.
    pub fn range(&self) -> (u64, u64) {
        (self.block_start, self.block_end)
    }

    /// Returns `true` if the inclusive range `start..=end` shares at least one
    /// block with this file.
    pub fn overlaps_range(&self, start: u64, end: u64) -> bool {
        self.block_start <= end && start <= self.block_end
    }
}

/// Files are equal when they start at the same block; `block_end` and `meta`
/// are ignored.
impl PartialEq for StoredFile {
    fn eq(&self, other: &Self) -> bool {
        self.block_start == other.block_start
    }
}

/// Orders files by `block_start` only, consistent with `PartialEq`.
impl Ord for StoredFile {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.block_start.cmp(&other.block_start)
    }
}

impl PartialOrd for StoredFile {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// A set of [`StoredFile`]s whose block ranges are checked together for
/// orphans, gaps and overlaps.
pub struct StoreDirectory {
    /// The files, in the order they were supplied.
    pub files: Vec<StoredFile>,
}

impl From<Vec<StoredFile>> for StoreDirectory {
    /// Wraps `files` as-is; no sorting or validation happens here.
    fn from(files: Vec<StoredFile>) -> Self {
        Self { files }
    }
}
impl StoreDirectory {
pub fn stored_range(&self) -> (u64, u64) {
if self.files.is_empty() {
return (0, 0);
}
(
self.files[0].block_start,
self.files[self.files.len() - 1].block_end,
)
}
pub fn sanitize(self) -> Result<Self, EventStoreError> {
let integrity = self.integrity_report();
match integrity.status() {
IntegrityStatus::Unrepairable => Err(EventStoreError::IntegrityError(integrity)),
IntegrityStatus::Repairable => {
let files = self
.files
.into_iter()
.filter(|f| !integrity.orphans.contains(f))
.collect::<Vec<_>>();
Ok(StoreDirectory { files })
}
IntegrityStatus::Intact => {
Ok(self)
}
}
}
fn orphans(&self) -> Vec<StoredFile> {
let mut orphans = Vec::new();
if self.files.is_empty() {
return orphans;
}
for file in &self.files {
for other in &self.files {
if file != other
&& file.block_start >= other.block_start
&& file.block_end <= other.block_end
{
orphans.push(file.clone());
}
}
}
orphans
}
fn gaps(&self) -> Vec<(u64, u64)> {
self.files
.windows(2)
.filter(|w| w[1].block_start - 1 > w[0].block_end)
.map(|w| (w[0].block_end + 1, w[1].block_start - 1))
.collect::<Vec<_>>()
}
fn overlaps(&self) -> Vec<(u64, u64)> {
self.files
.windows(2)
.filter(|w| w[0].block_end >= w[1].block_start)
.map(|w| (w[1].block_start, w[0].block_end))
.collect::<Vec<_>>()
}
pub fn integrity_report(&self) -> StoreIntegrityReport {
let orphans = self.orphans();
let target = if orphans.is_empty() {
self
} else {
let files = self
.files
.iter()
.filter(|f| !orphans.contains(f))
.cloned()
.collect::<Vec<_>>();
&StoreDirectory { files }
};
StoreIntegrityReport {
gaps: target.gaps(),
orphans,
overlaps: target.overlaps(),
}
}
}
#[cfg(test)]
mod tests {
    use crate::storage::store::{StoreDirectory, StoredFile};
    use anyhow::Result;
    use chrono::DateTime;
    use object_store::{ObjectMeta, path::Path};

    /// Builds a `StoredFile` at `location` with placeholder metadata.
    fn mock_file(location: &str) -> Result<StoredFile> {
        StoredFile::new(ObjectMeta {
            location: Path::parse(location)?,
            last_modified: DateTime::default(),
            size: 0,
            e_tag: None,
            version: None,
        })
    }

    #[test]
    pub fn test_name_parsing() -> Result<()> {
        let file = mock_file("000000000100-000000000200.parquet")?;
        assert_eq!(file.block_start, 100);
        assert_eq!(file.block_end, 200);
        assert!(mock_file("invalid-name.parquet").is_err());
        Ok(())
    }

    #[test]
    pub fn test_overlap() -> Result<()> {
        let file = mock_file("000000000100-000000000200.parquet")?;
        assert_eq!(file.block_start, 100);
        assert_eq!(file.block_end, 200);
        assert!(file.overlaps_range(98, 101));
        assert!(file.overlaps_range(98, 201));
        assert!(file.overlaps_range(140, 160));
        assert!(file.overlaps_range(198, 201));
        assert!(!file.overlaps_range(201, 2000));
        assert!(!file.overlaps_range(97, 98));
        Ok(())
    }

    #[test]
    fn test_integrity_overlap() -> Result<()> {
        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000011-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
        ]
        .into();
        assert!(files.overlaps().is_empty(), "Expected no overlaps");

        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000010-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
            mock_file("000000000025-000000000032.parquet")?,
        ]
        .into();
        assert_eq!(
            files.overlaps(),
            vec![(10, 10), (25, 30)],
            "Expected overlaps (10,10) and (25,30)"
        );
        Ok(())
    }

    #[test]
    fn test_integrity_gap() -> Result<()> {
        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000011-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
        ]
        .into();
        assert!(files.gaps().is_empty(), "Expected no gaps");

        let files: StoreDirectory = vec![
            mock_file("000000000001-000000000010.parquet")?,
            mock_file("000000000012-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
            mock_file("000000000210-000000000300.parquet")?,
        ]
        .into();
        assert_eq!(
            files.gaps(),
            vec![(11, 11), (31, 209)],
            "Expected gaps (11,11) and (31,209)"
        );
        Ok(())
    }

    #[test]
    fn test_integrity_orphan() -> Result<()> {
        let dir: StoreDirectory = vec![
            mock_file("000000000001-000000000030.parquet")?,
            mock_file("000000000031-000000000040.parquet")?,
            mock_file("000000000041-000000000050.parquet")?,
        ]
        .into();
        assert!(dir.orphans().is_empty(), "Expected no orphans");

        let dir: StoreDirectory = vec![
            mock_file("000000000001-000000000030.parquet")?,
            mock_file("000000000015-000000000020.parquet")?,
            mock_file("000000000025-000000000030.parquet")?,
        ]
        .into();
        let orphan_ranges: Vec<(u64, u64)> = dir.orphans().iter().map(StoredFile::range).collect();
        assert_eq!(
            orphan_ranges,
            vec![(15, 20), (25, 30)],
            "Expected orphans (15,20) and (25,30)"
        );
        Ok(())
    }

    #[test]
    fn test_sanitize() -> Result<()> {
        let dir: StoreDirectory = vec![
            mock_file("000000000001-000000000030.parquet")?,
            mock_file("000000000015-000000000020.parquet")?,
            mock_file("000000000025-000000000030.parquet")?,
        ]
        .into();
        assert_eq!(
            dir.sanitize()?.stored_range(),
            (1, 30),
            "Orphans should be ignored"
        );

        let dir: StoreDirectory = vec![
            mock_file("000000000015-000000000020.parquet")?,
            mock_file("000000000025-000000000030.parquet")?,
        ]
        .into();
        assert!(dir.sanitize().is_err(), "Expected integrity error");

        let dir: StoreDirectory = vec![
            mock_file("000000000015-000000000020.parquet")?,
            mock_file("000000000020-000000000030.parquet")?,
        ]
        .into();
        assert!(dir.sanitize().is_err(), "Expected integrity error");

        let dir: StoreDirectory = vec![
            mock_file("000000000015-000000000020.parquet")?,
            mock_file("000000000021-000000000030.parquet")?,
        ]
        .into();
        assert!(dir.sanitize().is_ok());
        Ok(())
    }
}