use std::fmt::Display;
use crate::storage::{
codec::SolEventCodec,
store::{EventStore, StoredFile},
};
use alloy::json_abi::Event;
use anyhow::Result;
use tracing::info;
/// A run of block-contiguous stored files that will be combined into one output file.
///
/// [`MergeGroup::range`] treats the first source as the start of the run and the last
/// source as its end, so sources are expected to be kept in ascending block order.
#[derive(Debug, Clone)]
pub struct MergeGroup {
    /// Source files, expected in ascending block order. Never empty when the group is
    /// built through [`MergeGroup::init`] / [`MergeGroup::append`].
    pub sources: Vec<StoredFile>,
}

impl MergeGroup {
    /// Starts a new group containing exactly `file`.
    pub fn init(file: StoredFile) -> Self {
        MergeGroup { sources: vec![file] }
    }

    /// Adds `file` to the end of the group.
    ///
    /// No contiguity check is performed here; callers are responsible for only
    /// appending the file that directly follows the group's current end block.
    pub fn append(&mut self, file: StoredFile) {
        self.sources.push(file);
    }

    /// Returns the `(start_block, end_block)` span covered by the group.
    ///
    /// Both bounds are inclusive: the start block of the first source and the end
    /// block of the last source.
    ///
    /// # Panics
    ///
    /// Panics if `sources` is empty. That cannot happen for groups built with
    /// [`MergeGroup::init`], but `sources` is public and can be cleared by callers.
    pub fn range(&self) -> (u64, u64) {
        const NON_EMPTY: &str = "MergeGroup invariant violated: group has no source files";
        let start = self.sources.first().expect(NON_EMPTY).block_start;
        let end = self.sources.last().expect(NON_EMPTY).block_end;
        (start, end)
    }

    /// Total size in bytes of all source files, as reported by their object metadata.
    pub fn size(&self) -> u64 {
        self.sources.iter().map(|f| f.meta.size).sum()
    }

    /// Name of the merged output file: both range bounds zero-padded to 12 digits,
    /// e.g. `000000000000-000000000200.parquet`.
    fn filename(&self) -> String {
        let (start, end) = self.range();
        format!("{:012}-{:012}.parquet", start, end)
    }

    /// A group is only worth merging when it combines more than one file.
    fn qualified(&self) -> bool {
        self.sources.len() > 1
    }
}
impl Display for MergeGroup {
    /// Writes a one-line summary of the group: block span, number of source files,
    /// combined size, and the name of the file the merge will produce.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (first_block, last_block) = self.range();
        let file_count = self.sources.len();
        let total_bytes = self.size();
        let output_name = self.filename();

        write!(
            f,
            "blocks {}-{} ({} files, {} bytes) -> {}",
            first_block, last_block, file_count, total_bytes, output_name
        )
    }
}
/// A compaction proposal: runs of contiguous, undersized files to merge, plus the
/// files that will be left untouched.
#[derive(Debug, Clone)]
pub struct MergePlan {
    /// Merge groups in the order they were formed. After [`MergePlan::finalize`],
    /// only groups with more than one file remain here.
    pub candidates: Vec<MergeGroup>,
    /// Size threshold in bytes. Files at or above it are skipped outright. A group
    /// stops accepting files once its total reaches it, so a finished group may
    /// exceed the threshold by up to one file.
    pub target_size_bytes: u64,
    /// Files that will not be merged: oversized files, plus (after finalizing) the
    /// members of single-file groups.
    pub skipped: Vec<StoredFile>,
}

impl MergePlan {
    /// Creates an empty plan that groups files up to `target_size_bytes`.
    pub fn new(target_size_bytes: u64) -> Self {
        MergePlan {
            candidates: vec![],
            target_size_bytes,
            skipped: vec![],
        }
    }

    /// Routes `file` into the plan.
    ///
    /// Files are expected to arrive in ascending block order. A file extends the
    /// most recent group only if that group is still below the target size and the
    /// file's `block_start` is exactly one past the group's end block. Otherwise it
    /// starts a new group.
    fn add(&mut self, file: StoredFile) {
        // Files already at or above the target gain nothing from merging.
        if file.meta.size >= self.target_size_bytes {
            self.skipped.push(file);
            return;
        }

        let target = self.target_size_bytes;
        match self.candidates.last_mut() {
            // `checked_add` avoids an overflow panic when a group ends at `u64::MAX`;
            // nothing can follow such a group anyway.
            Some(tail)
                if tail.size() < target
                    && tail.range().1.checked_add(1) == Some(file.block_start) =>
            {
                tail.append(file)
            }
            _ => self.candidates.push(MergeGroup::init(file)),
        }
    }

    /// Moves the files of every single-file group into `skipped` and keeps only the
    /// groups that actually merge something. Relative order is preserved in both lists.
    fn finalize(&mut self) {
        let (qualified, unqualified): (Vec<_>, Vec<_>) = std::mem::take(&mut self.candidates)
            .into_iter()
            .partition(MergeGroup::qualified);
        self.skipped
            .extend(unqualified.into_iter().flat_map(|group| group.sources));
        self.candidates = qualified;
    }
}
impl Display for MergePlan {
    /// Renders the plan as a list of merge groups, followed by the skipped files
    /// (only when there are any).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Merge Plan:")?;
        for group in &self.candidates {
            writeln!(f, " - {}", group)?;
        }

        if self.skipped.is_empty() {
            return Ok(());
        }

        writeln!(f, "Skipped files:")?;
        for file in &self.skipped {
            // `Display` must never panic. If the stored path has no final segment
            // (e.g. an empty location), identify the file by its block span instead.
            match file.path().filename() {
                Some(name) => writeln!(f, " - {} ({} bytes)", name, file.meta.size)?,
                None => writeln!(
                    f,
                    " - blocks {}-{} ({} bytes)",
                    file.block_start, file.block_end, file.meta.size
                )?,
            }
        }
        Ok(())
    }
}
/// Plans and executes the compaction of small event files in an [`EventStore`].
pub struct EventFileMerger {
    store: EventStore,
    /// Target size in bytes, used as [`MergePlan::target_size_bytes`].
    byte_size: u64,
}

impl EventFileMerger {
    /// Opens the event store at `base_uri` using a codec built from `event`.
    ///
    /// `byte_size` is the merge target size in bytes (see [`MergePlan`]).
    ///
    /// # Errors
    ///
    /// Returns an error if the codec cannot be built from `event` or the store cannot
    /// be opened at `base_uri`.
    pub fn new(base_uri: impl AsRef<str>, byte_size: u64, event: Event) -> Result<Self> {
        let codec = SolEventCodec::new(&event)?;
        let store = EventStore::from_uri(&base_uri, &codec)?;
        Ok(EventFileMerger { store, byte_size })
    }

    /// Lists the store's files and groups contiguous, undersized files into a
    /// finalized [`MergePlan`].
    ///
    /// # Errors
    ///
    /// Propagates any error from listing the store.
    pub async fn generate_plan(&self) -> Result<MergePlan> {
        let mut files: Vec<StoredFile> = self.store.list().await?.files;
        // `MergePlan::add` only extends a group with the file that immediately follows
        // it, so feed files in block order. Otherwise an out-of-order listing would
        // split runs that could have been merged.
        files.sort_by_key(|f| (f.block_start, f.block_end));

        let mut plan = MergePlan::new(self.byte_size);
        for file in files {
            plan.add(file);
        }
        plan.finalize();
        Ok(plan)
    }

    /// Merges each qualified group in `plan` into a single file named after its block
    /// range, one group at a time.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the store. Groups merged before the failure
    /// are not undone by this method.
    pub async fn execute_plan(&self, plan: MergePlan) -> Result<()> {
        if plan.candidates.is_empty() {
            info!("No files to merge...");
            return Ok(());
        }
        for group in &plan.candidates {
            // `candidates` is public, so a plan that was never finalized may still
            // contain single-file groups; those are not worth rewriting.
            if !group.qualified() {
                continue;
            }
            info!("Merging {}", group);
            self.store
                .merge_files(&group.sources, &group.filename())
                .await?;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use chrono::DateTime;
    use object_store::ObjectMeta;

    use super::*;

    /// Builds a `StoredFile` covering blocks `start..=end` with the given byte size.
    fn mock_file(start: u64, end: u64, size: u64) -> StoredFile {
        StoredFile {
            block_start: start,
            block_end: end,
            meta: ObjectMeta {
                size,
                last_modified: DateTime::default(),
                location: "".into(),
                e_tag: None,
                version: None,
            },
        }
    }

    /// Feeds `files` in order into a fresh plan with `target_size` and finalizes it.
    fn plan_for(target_size: u64, files: &[StoredFile]) -> MergePlan {
        let mut plan = MergePlan::new(target_size);
        for file in files {
            plan.add(file.clone());
        }
        plan.finalize();
        plan
    }

    #[test]
    fn test_merge_group() {
        let file1 = mock_file(0, 100, 500);
        let file2 = mock_file(101, 200, 600);

        let mut group = MergeGroup::init(file1);
        assert_eq!(group.range(), (0, 100));
        assert_eq!(group.size(), 500);
        assert!(!group.qualified());

        group.append(file2);
        assert_eq!(group.range(), (0, 200));
        assert_eq!(group.size(), 1100);
        assert!(group.qualified());
        assert_eq!(group.filename(), "000000000000-000000000200.parquet");
        assert_eq!(
            group.to_string(),
            "blocks 0-200 (2 files, 1100 bytes) -> 000000000000-000000000200.parquet"
        );
    }

    #[test]
    fn test_merge_plan() {
        let file1 = mock_file(0, 100, 500);
        let file2 = mock_file(101, 200, 400);
        let file3 = mock_file(201, 300, 1500);
        let file4 = mock_file(301, 400, 700);
        let file5 = mock_file(401, 500, 800);

        let plan = plan_for(
            1000,
            &[
                file1.clone(),
                file2.clone(),
                file3.clone(),
                file4.clone(),
                file5.clone(),
            ],
        );

        assert_eq!(plan.candidates.len(), 2);
        assert_eq!(plan.candidates[0].range(), (0, 200));
        assert_eq!(plan.candidates[1].range(), (301, 500));
        assert_eq!(plan.skipped.len(), 1);
        assert!(plan.skipped.contains(&file3));
    }

    #[test]
    fn test_merge_plan_splits_on_block_gap() {
        let file1 = mock_file(0, 100, 100);
        let file2 = mock_file(200, 300, 100);

        let plan = plan_for(1000, &[file1.clone(), file2.clone()]);

        // Each file ends up alone in its group, so neither is merged.
        assert!(plan.candidates.is_empty());
        assert_eq!(plan.skipped, vec![file1, file2]);
    }

    #[test]
    fn test_merge_plan_starts_new_group_when_tail_full() {
        let files = [
            mock_file(0, 100, 600),
            mock_file(101, 200, 500),
            mock_file(201, 300, 100),
            mock_file(301, 400, 100),
        ];

        let plan = plan_for(1000, &files);

        let ranges: Vec<_> = plan.candidates.iter().map(MergeGroup::range).collect();
        assert_eq!(ranges, vec![(0, 200), (201, 400)]);
        assert!(plan.skipped.is_empty());
    }

    #[test]
    fn test_empty_plan() {
        let plan = plan_for(1000, &[]);
        assert!(plan.candidates.is_empty());
        assert!(plan.skipped.is_empty());
    }
}