use std::sync::Arc;
use alloy::json_abi::Event;
use alloy::rpc::types::Filter;
use anyhow::Result;
use futures_util::stream::StreamExt;
use tempfile::TempDir;
use ethl::archive::extract::EventExtractor;
use ethl::rpc::config::ProviderSettings;
use ethl::storage::{
codec::SolEventCodec,
store::{EventStore, parse_store_uri},
};
#[tokio::test]
#[ntest::timeout(10000)]
async fn test_backfill_and_extract() -> Result<()> {
let temp_dir = TempDir::new()?;
let temp_path = temp_dir.path().to_string_lossy().to_string();
let file_uri = format!("file://{}", temp_path);
let codec = SolEventCodec::new(&Event::parse(
"event Transfer(address indexed from, address indexed to, uint256 value)",
)?)?;
let settings =
ProviderSettings::http(option_env!("TEST_RPC_URL").unwrap_or("https://0xrpc.io/base"))?;
let filter = Filter::new().from_block(34762687).to_block(34762688);
let (store, path) = parse_store_uri(&file_uri)?;
let mut indexer = EventExtractor::new(
Arc::new(store),
path,
10000,
settings,
vec![codec.event.clone()],
filter,
)?;
indexer.extract().await?;
assert!(
std::path::Path::new(&format!(
"{}/transfer-0xddf252ad/000034762687-000034762688.parquet",
temp_path
))
.exists()
);
let store = EventStore::from_uri(&file_uri, &codec)?;
let stream = store.read_all().await?;
let data = stream.collect::<Vec<_>>().await;
assert!(!data.is_empty());
assert_eq!(1, data.len());
assert_eq!(1510, data[0].num_rows());
Ok(())
}