use std::sync::Arc;
use anyhow::Result;
use arrow::{
array::{AsArray, RecordBatch, UInt64Array},
compute::kernels::cmp::{gt_eq, lt_eq},
datatypes::{Schema, UInt64Type},
};
use async_stream::stream;
use futures_util::{Stream, StreamExt, TryStreamExt};
use object_store::{ObjectStore, ObjectStoreScheme, aws::AmazonS3Builder, parse_url, path::Path};
use parquet::{
arrow::{
AsyncArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask,
arrow_reader::{ArrowPredicate, ArrowPredicateFn, RowFilter},
async_reader::ParquetObjectReader,
async_writer::ParquetObjectWriter,
},
basic::{Compression, ZstdLevel},
errors::ParquetError,
file::properties::{WriterProperties, WriterVersion},
schema::types::SchemaDescriptor,
};
use reqwest::Url;
use thiserror::Error;
use crate::storage::{codec::SolEventCodec, store::directory::IntegrityStatus};
pub mod directory;
pub use directory::{StoreDirectory, StoreIntegrityReport, StoredFile};
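/// Errors surfaced by [`EventStore`] operations.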
#[derive(Debug, Clone, Error)]
pub enum EventStoreError {
#[error("Integrity Error: {0}")]
IntegrityError(StoreIntegrityReport),
}
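/// A Parquet-backed archive of decoded events, stored as block-range
/// partitioned files (`{start:012}-{end:012}.parquet`) under a per-event
/// directory in an [`ObjectStore`].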
pub struct EventStore {
store: Arc<dyn ObjectStore>,
path: Path,
schema: SchemaDescriptor,
arrow_schema: Arc<Schema>,
writer_properties: Option<WriterProperties>,
}
impl EventStore {
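/// Creates a store rooted at `path`, namespaced by the codec's event id.
/// Files are written as Parquet v2 with ZSTD compression and dictionary
/// encoding disabled.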
pub fn new(store: Arc<dyn ObjectStore>, path: Path, codec: &SolEventCodec) -> Result<Self> {
Ok(Self {
store,
path: path.child(codec.event_id()),
arrow_schema: codec.schema.clone(),
schema: codec.parquet_schema(),
writer_properties: Some(
WriterProperties::builder()
.set_writer_version(WriterVersion::PARQUET_2_0)
.set_dictionary_enabled(false)
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.build(),
),
})
}
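/// Builds a store from a URI such as `file:///data` or `s3://bucket/prefix`.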
pub fn from_uri(base_uri: impl AsRef<str>, codec: &SolEventCodec) -> Result<Self> {
let (store, path) = parse_store_uri(base_uri)?;
Self::new(Arc::new(store), path, codec)
}
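/// Returns the overall `(first, last)` block range covered by the
/// sanitized archive.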
pub async fn stored_range(&self) -> Result<(u64, u64)> {
self.list_sanitized().await.map(|dir| dir.stored_range())
}
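/// Lists the archive files whose block ranges overlap `[from_block, to_block]`.
/// Unset bounds default to `0` and `u64::MAX` respectively.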
async fn range_archive_files(
&self,
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<Vec<StoredFile>> {
let from_block = from_block.unwrap_or(0);
let to_block = to_block.unwrap_or(u64::MAX);
let files = self
.list_sanitized()
.await?
.files
.into_iter()
.filter(|file| file.overlaps_range(from_block, to_block))
.collect();
Ok(files)
}
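/// Lists all files under the store path, skipping any whose names do not
/// parse as block-range partitions.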
pub async fn list(&self) -> Result<StoreDirectory> {
let mut files = Vec::new();
let mut entries = self.store.list(Some(&self.path));
while let Some(meta) = entries.next().await.transpose()? {
if let Some(filename) = meta.location.filename() {
match StoredFile::new(meta.clone()) {
Ok(file) => files.push(file),
Err(_) => {
tracing::warn!("Skipping file with invalid name: {}", filename);
}
}
}
}
files.sort();
Ok(files.into())
}
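/// Like [`Self::list`], but fails if the directory cannot be sanitized
/// into a consistent file set.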
pub async fn list_sanitized(&self) -> Result<StoreDirectory> {
self.list().await?.sanitize().map_err(|err| err.into())
}
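/// Streams every record batch in the archive, in file order.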
pub async fn read_all(&self) -> Result<impl Stream<Item = RecordBatch>> {
let files = self.list_sanitized().await?.files;
self.read_chunks(files, None, None).await
}
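/// Streams record batches whose block numbers fall within the given
/// (inclusive) range.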
pub async fn read_range(
&self,
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<impl Stream<Item = RecordBatch>> {
let files = self.range_archive_files(from_block, to_block).await?;
self.read_chunks(files, from_block, to_block).await
}
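/// Streams batches from `files`, skipping files outside
/// `[from_block, to_block]` and applying row filters to boundary files
/// that only partially overlap the range.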
async fn read_chunks(
&self,
files: Vec<StoredFile>,
from_block: Option<u64>,
to_block: Option<u64>,
) -> Result<impl Stream<Item = RecordBatch>> {
let from_block = from_block.unwrap_or(0);
let to_block = to_block.unwrap_or(u64::MAX);
Ok(stream! {
for file in files {
tracing::trace!("Reading file: {}", file.path());
if !file.overlaps_range(from_block, to_block) {
continue;
}
let filter = if file.block_start < from_block || file.block_end > to_block {
let mut predicates: Vec<Box<dyn ArrowPredicate>> = Vec::new();
if file.block_start < from_block {
predicates.push(from_block_predicate(&self.schema, from_block));
}
if file.block_end > to_block {
predicates.push(to_block_predicate(&self.schema, to_block));
}
Some(RowFilter::new(predicates))
} else {
None
};
let batches = self.read_chunk(file.path(), filter).await;
match batches {
Ok(batches) => {
for batch in batches {
yield batch;
}
}
Err(e) => {
// The stream yields plain `RecordBatch`es, so a read failure cannot be
// surfaced as an item; fail fast instead of silently truncating the stream.
panic!("Error reading file {}: {}", file.path(), e);
}
}
}
})
}
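/// Reads a single Parquet file, buffering all of its batches in memory.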
async fn read_chunk(&self, path: &Path, filter: Option<RowFilter>) -> Result<Vec<RecordBatch>> {
let batches = self
.stream_chunk_batches(path, filter)
.await?
.try_collect::<Vec<_>>()
.await?;
Ok(batches)
}
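/// Opens `path` as a Parquet record batch stream with a 100k-row batch
/// size, optionally pushing down a row filter. The column index is
/// preloaded only when a filter is present.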
async fn stream_chunk_batches(
&self,
path: &Path,
filter: Option<RowFilter>,
) -> Result<impl Stream<Item = Result<RecordBatch, ParquetError>>> {
let reader = ParquetObjectReader::new(self.store.clone(), path.clone())
.with_preload_column_index(filter.is_some());
let mut builder = ParquetRecordBatchStreamBuilder::new(reader)
.await?
.with_batch_size(100_000);
if let Some(f) = filter {
builder = builder.with_row_filter(f);
}
let stream = builder.build()?;
Ok(stream)
}
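/// Writes a batch as a new `{start:012}-{end:012}.parquet` partition.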
pub async fn write_records(&self, block_range: (u64, u64), records: RecordBatch) -> Result<()> {
let path = self.path.child(format!(
"{:012}-{:012}.parquet",
block_range.0, block_range.1
));
let writer = ParquetObjectWriter::new(self.store.clone(), path);
let mut arrow_writer =
AsyncArrowWriter::try_new(writer, records.schema(), self.writer_properties.clone())?;
arrow_writer.write(&records).await?;
arrow_writer.flush().await?;
arrow_writer.finish().await?;
Ok(())
}
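/// Rewrites `files` into a single partition named `filename`, then deletes
/// the originals. Flushing after each batch closes a row group, bounding
/// memory at the cost of one row group per source batch.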
pub(super) async fn merge_files(&self, files: &[StoredFile], filename: &str) -> Result<()> {
let path = self.path.child(filename);
let writer = ParquetObjectWriter::new(self.store.clone(), path);
let mut arrow_writer = AsyncArrowWriter::try_new(
writer,
self.arrow_schema.clone(),
self.writer_properties.clone(),
)?;
for file in files {
let batches = self.read_chunk(file.path(), None).await?;
for batch in batches {
arrow_writer.write(&batch).await?;
arrow_writer.flush().await?;
}
}
arrow_writer.finish().await?;
self.remove_files(files).await?;
Ok(())
}
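/// Checks archive integrity and, when repairable, deletes orphaned files.
/// Returns [`EventStoreError::IntegrityError`] when the archive cannot be
/// repaired.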
pub async fn repair(&self) -> Result<()> {
let list = self.list().await?;
let integrity = list.integrity_report();
match integrity.status() {
IntegrityStatus::Intact => {
tracing::info!("Archive is valid, no repairs needed");
Ok(())
}
IntegrityStatus::Repairable => {
tracing::info!(
"Archive has integrity issues, attempting repair: {}",
integrity
);
self.remove_files(&integrity.orphans).await?;
Ok(())
}
IntegrityStatus::Unrepairable => Err(EventStoreError::IntegrityError(integrity).into()),
}
}
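/// Deletes the given files from the backing store.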
async fn remove_files(&self, files: &[StoredFile]) -> Result<()> {
for file in files {
self.store.delete(file.path()).await?;
}
Ok(())
}
}
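/// Row-filter predicate keeping rows whose first (block number) column is
/// `>= block`. The projection mask restricts decoding to that leaf column,
/// so the predicate batch contains only the block number column.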
fn from_block_predicate(schema: &SchemaDescriptor, block: u64) -> Box<dyn ArrowPredicate> {
let projection_mask = ProjectionMask::leaves(schema, [0]);
let predicate = move |batch: RecordBatch| {
let scalar_0 = UInt64Array::new_scalar(block);
let column = batch.column(0).as_primitive::<UInt64Type>();
gt_eq(column, &scalar_0)
};
Box::new(ArrowPredicateFn::new(projection_mask, predicate))
}
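/// Row-filter predicate keeping rows whose first (block number) column is
/// `<= block`.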
fn to_block_predicate(schema: &SchemaDescriptor, block: u64) -> Box<dyn ArrowPredicate> {
let projection_mask = ProjectionMask::leaves(schema, [0]);
let predicate = move |batch: RecordBatch| {
let scalar_0 = UInt64Array::new_scalar(block);
let column = batch.column(0).as_primitive::<UInt64Type>();
lt_eq(column, &scalar_0)
};
Box::new(ArrowPredicateFn::new(projection_mask, predicate))
}
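/// Resolves a URI into an [`ObjectStore`] and base path. S3 URIs are built
/// from environment credentials via [`AmazonS3Builder::from_env`]; all
/// other schemes fall back to [`parse_url`].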
pub fn parse_store_uri(uri: impl AsRef<str>) -> Result<(Box<dyn ObjectStore>, Path)> {
let url = Url::parse(uri.as_ref())?;
if let Ok((scheme, _)) = ObjectStoreScheme::parse(&url)
&& scheme == ObjectStoreScheme::AmazonS3
{
let builder = AmazonS3Builder::from_env().with_url(url.clone()).build()?;
return Ok((Box::new(builder), Path::from(url.path())));
}
parse_url(&url).map_err(|e| e.into())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use alloy::json_abi::Event;
use anyhow::Result;
use arrow::{
array::{AsArray, RecordBatch},
datatypes::UInt64Type,
};
use futures_util::StreamExt;
use crate::storage::codec::SolEventCodec;
async fn create_archive_with_data(path: &str) -> Result<super::EventStore> {
let _ = std::fs::remove_dir_all(path);
let uri = format!("file://{path}");
let codec = SolEventCodec::new(&Event::parse("event Hello()")?)?;
let archive = super::EventStore::from_uri(&uri, &codec).expect("Failed to create archive");
let batch1 = RecordBatch::try_new(
codec.schema.clone(),
vec![
Arc::new(arrow::array::UInt64Array::from(vec![1, 1, 2])),
Arc::new(arrow::array::UInt32Array::from(vec![1, 1, 2])),
Arc::new(arrow::array::StringArray::from(vec!["0x0", "0x1", "0x2"])),
],
)
.expect("batch creation failed");
let batch2 = RecordBatch::try_new(
codec.schema.clone(),
vec![
Arc::new(arrow::array::UInt64Array::from(vec![3, 3, 4])),
Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 3])),
Arc::new(arrow::array::StringArray::from(vec!["0x0", "0x1", "0x2"])),
],
)
.expect("batch creation failed");
archive
.write_records((1, 2), batch1)
.await
.expect("Failed to write event range");
archive
.write_records((3, 4), batch2)
.await
.expect("Failed to write event range");
Ok(archive)
}
#[tokio::test]
async fn test_lifecycle() -> Result<()> {
let archive = create_archive_with_data("/tmp/test_archive/test_event").await?;
assert_eq!(
archive.path.to_string(),
"tmp/test_archive/test_event/hello-0xbcdfe0d5"
);
let dir = archive.list().await.expect("Failed to list archive files");
assert!(!dir.files.is_empty(), "No archive files found");
assert!(
dir.files.iter().any(|f| f
.path()
.to_string()
.contains("000000000001-000000000002.parquet")),
"Expected file not found"
);
let range = archive
.stored_range()
.await
.expect("Failed to get stored range");
assert_eq!(range, (1, 4), "Stored range does not match expected value");
let range_files = archive
.range_archive_files(Some(1), Some(4))
.await
.expect("Failed to get range archive files");
assert_eq!(
range_files.len(),
2,
"Expected both files to overlap range (1, 4)"
);
let range_files = archive
.range_archive_files(Some(4), Some(5))
.await
.expect("Failed to get range archive files");
assert_eq!(
range_files.len(),
1,
"Expected exactly one file to overlap range (4, 5)"
);
let range_files = archive
.range_archive_files(None, Some(1))
.await
.expect("Failed to get range archive files");
assert_eq!(
range_files.len(),
1,
"Expected exactly one file to overlap range (.., 1)"
);
let range = archive.read_range(Some(1), Some(4)).await?;
let batches: Vec<RecordBatch> = range.collect().await;
assert_eq!(batches.len(), 2, "Expected 2 batches in range");
assert_eq!(batches[0].num_rows(), 3, "First batch row count mismatch");
assert_eq!(batches[1].num_rows(), 3, "Second batch row count mismatch");
let range = archive.read_range(Some(4), Some(4)).await?;
let batches: Vec<RecordBatch> = range.collect().await;
assert_eq!(batches.len(), 1, "Expected 1 batch in range");
assert_eq!(batches[0].num_rows(), 1, "First batch row count mismatch");
let range = archive.read_range(Some(1), Some(1)).await?;
let batches: Vec<RecordBatch> = range.collect().await;
assert_eq!(batches.len(), 1, "Expected 1 batch in range");
assert_eq!(batches[0].num_rows(), 2, "First batch row count mismatch");
Ok(())
}
#[tokio::test]
async fn test_merge_files() -> Result<()> {
let archive = create_archive_with_data("/tmp/test_archive/test_merge").await?;
let files = archive
.list()
.await
.expect("Failed to list archive files")
.files;
assert_eq!(files.len(), 2, "Expected 2 files before merge");
archive
.merge_files(&files, "000000000001-000000000004.parquet")
.await
.expect("Failed to merge files");
let merged_files = archive
.list()
.await
.expect("Failed to list archive files after merge")
.files;
assert_eq!(merged_files.len(), 1, "Expected 1 file after merge");
assert!(
merged_files.iter().any(|f| f
.path()
.to_string()
.contains("000000000001-000000000004.parquet")),
"Merged file not found"
);
let range = archive.read_all().await?;
let batches: Vec<RecordBatch> = range.collect().await;
let columns = batches
.iter()
.flat_map(|b| {
b.column(0)
.as_primitive::<UInt64Type>()
.into_iter()
.collect::<Vec<Option<u64>>>()
})
.flatten()
.collect::<Vec<_>>();
assert_eq!(columns.len(), 6);
assert_eq!(columns, vec![1, 1, 2, 3, 3, 4]);
Ok(())
}
#[tokio::test]
async fn test_contiguous_file_set_and_orphan_detection() -> Result<()> {
// TODO: populate an archive with a contiguous file set plus an orphan
// and assert on the resulting `IntegrityStatus`.
Ok(())
}
}