use alloy_chains::NamedChain;
use alloy_primitives::BlockNumber;
use alloy_provider::Provider;
use chrono::{DateTime, Datelike, NaiveDate, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, OnceCell};
use tracing::{debug, info};
use crate::blocks::cache::{BlockWindowCache, CacheKey, DiskCache};
use crate::errors::{BlockWindowError, RpcError};
use crate::tracing::spans;
use crate::types::config::BlockCount;
/// Default time-to-live (30 seconds) for the memoized chain head.
///
/// Every constructor of `BlockWindowCalculator` in this file starts with this
/// value; `with_head_ttl` overrides it.
pub const DEFAULT_HEAD_TTL: Duration = Duration::from_secs(30);
/// Whole seconds relative to the Unix epoch, stored as a signed 64-bit integer.
///
/// Equality and ordering are those of the wrapped `i64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UnixTimestamp(pub i64);

impl UnixTimestamp {
    /// Builds a timestamp from a UTC datetime using its whole-second Unix time.
    pub fn from_datetime(dt: DateTime<Utc>) -> Self {
        Self(dt.timestamp())
    }

    /// Builds a timestamp from an unsigned second count.
    ///
    /// Values above `i64::MAX` saturate to `i64::MAX`. A plain `as` cast would
    /// wrap them to a negative number, which would then sort *before* every
    /// real timestamp and mislead the ordering-based searches in this file.
    pub fn from_u64(ts: u64) -> Self {
        Self(i64::try_from(ts).unwrap_or(i64::MAX))
    }

    /// Returns the timestamp as an unsigned second count.
    ///
    /// Negative (pre-epoch) timestamps clamp to `0` instead of wrapping to a
    /// value near `u64::MAX`.
    pub fn as_u64(&self) -> u64 {
        u64::try_from(self.0).unwrap_or(0)
    }

    /// Returns the timestamp one second earlier.
    ///
    /// Saturates at `i64::MIN` rather than overflowing.
    pub fn pred(&self) -> Self {
        Self(self.0.saturating_sub(1))
    }
}
/// Formats the timestamp as its raw number of seconds.
impl std::fmt::Display for UnixTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate to the inner integer so formatter flags (width, fill,
        // alignment, sign) are honored; `write!(f, "{}", ..)` would drop them.
        // Output without flags is unchanged.
        std::fmt::Display::fmt(&self.0, f)
    }
}
/// A contiguous block range together with the timestamp interval it was
/// resolved for.
///
/// `get_daily_window_with` produces one of these per (chain, UTC date).
/// `DailyBlockWindow::new` enforces `start_block <= end_block` and
/// `start_ts < end_ts_exclusive`; constructing the struct literally bypasses
/// those checks.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DailyBlockWindow {
/// First block of the window (inclusive, see `block_count`).
pub start_block: BlockNumber,
/// Last block of the window (inclusive, see `block_count`).
pub end_block: BlockNumber,
/// Start of the timestamp interval.
pub start_ts: UnixTimestamp,
/// End of the timestamp interval; must be strictly greater than `start_ts`.
pub end_ts_exclusive: UnixTimestamp,
}
impl DailyBlockWindow {
    /// Validates the bounds and assembles a window.
    ///
    /// # Errors
    ///
    /// * `BlockWindowError::invalid_range` when `end_block` is below
    ///   `start_block` (checked first).
    /// * `BlockWindowError::invalid_timestamp_range` when `end_ts_exclusive`
    ///   is not strictly after `start_ts`.
    pub fn new(
        start_block: BlockNumber,
        end_block: BlockNumber,
        start_ts: UnixTimestamp,
        end_ts_exclusive: UnixTimestamp,
    ) -> Result<Self, BlockWindowError> {
        let blocks_ordered = start_block <= end_block;
        if !blocks_ordered {
            return Err(BlockWindowError::invalid_range(start_block, end_block));
        }

        let timestamps_ordered = start_ts.0 < end_ts_exclusive.0;
        if !timestamps_ordered {
            return Err(BlockWindowError::invalid_timestamp_range(
                start_ts,
                end_ts_exclusive,
            ));
        }

        let window = Self {
            start_block,
            end_block,
            start_ts,
            end_ts_exclusive,
        };
        Ok(window)
    }

    /// Number of blocks in the window, counting both endpoints.
    ///
    /// Both arithmetic steps saturate, so neither an inverted range nor a
    /// range spanning the full `u64` domain can overflow.
    pub fn block_count(&self) -> BlockCount {
        let distance = self.end_block.saturating_sub(self.start_block);
        BlockCount::new(distance.saturating_add(1))
    }
}
/// Resolves timestamp ranges and calendar days to block ranges using a
/// provider `P`, a pluggable window cache, and a memo of the chain bounds.
pub struct BlockWindowCalculator<P> {
// Source of block headers and of the latest block number.
provider: P,
// Cache of already-computed daily windows; consulted before any provider call
// in `get_daily_window`.
cache: Box<dyn BlockWindowCache>,
// Memoized genesis timestamp and TTL-bounded chain head.
bounds_memo: ChainBoundsMemo,
// When true, `get_daily_window` fetches the head's timestamp together with its
// number, which is what allows a computed window to be inserted into the cache.
// Only `with_disk_cache` sets this to true.
eager_head_ts: bool,
}
/// Memo of the two chain bounds: the genesis timestamp (stored once) and the
/// chain head (reused until it is older than `head_ttl`).
struct ChainBoundsMemo {
// Genesis timestamp; initialized at most once, failed fetches are not stored.
genesis: OnceCell<UnixTimestamp>,
// Most recently fetched head, or `None` before the first fetch. The async
// mutex is held while a refresh is in flight.
head: Mutex<Option<HeadEntry>>,
// Maximum age at which a stored head entry is still reused.
head_ttl: Duration,
}
/// One memoized observation of the chain head.
#[derive(Clone, Copy)]
struct HeadEntry {
// Instant at which this entry was stored; compared against the TTL.
fetched_at: Instant,
// Head block number at fetch time.
latest_block: BlockNumber,
// Head block timestamp. `None` for a partial entry written by
// `get_or_fetch_latest_block`, which fetches only the block number.
latest_ts: Option<UnixTimestamp>,
}
impl ChainBoundsMemo {
    /// Creates an empty memo whose head entries are reused for `head_ttl`.
    fn new(head_ttl: Duration) -> Self {
        let genesis = OnceCell::new();
        let head = Mutex::new(None);
        Self {
            genesis,
            head,
            head_ttl,
        }
    }

    /// Returns the memoized genesis timestamp, running `fetch` only if none
    /// has been stored yet.
    ///
    /// A failing `fetch` leaves the cell empty, so a later call retries.
    async fn get_or_fetch_genesis<F, Fut>(
        &self,
        fetch: F,
    ) -> Result<UnixTimestamp, BlockWindowError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
    {
        let memoized = self.genesis.get_or_try_init(fetch).await?;
        Ok(*memoized)
    }

    /// Returns the head block number and timestamp.
    ///
    /// A stored entry is reused only when it is younger than the TTL *and*
    /// carries a timestamp; a fresh but partial entry is replaced by the
    /// result of `fetch`. The lock is held for the duration of `fetch`.
    async fn get_or_fetch_head<F, Fut>(
        &self,
        fetch: F,
    ) -> Result<(BlockNumber, UnixTimestamp), BlockWindowError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<(BlockNumber, UnixTimestamp), BlockWindowError>>,
    {
        let mut slot = self.head.lock().await;

        // `HeadEntry` is `Copy`, so inspect a copy of the stored option.
        let fresh_full = (*slot)
            .filter(|entry| entry.fetched_at.elapsed() < self.head_ttl)
            .and_then(|entry| entry.latest_ts.map(|ts| (entry.latest_block, ts)));
        if let Some(hit) = fresh_full {
            return Ok(hit);
        }

        let fetched = fetch().await?;
        *slot = Some(HeadEntry {
            fetched_at: Instant::now(),
            latest_block: fetched.0,
            latest_ts: Some(fetched.1),
        });
        Ok(fetched)
    }

    /// Returns the head block number and, if a full entry is memoized, its
    /// timestamp.
    ///
    /// Any entry younger than the TTL is reused as-is. Otherwise `fetch`
    /// supplies just the block number and a partial entry (no timestamp) is
    /// stored.
    async fn get_or_fetch_latest_block<F, Fut>(
        &self,
        fetch: F,
    ) -> Result<(BlockNumber, Option<UnixTimestamp>), BlockWindowError>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<BlockNumber, BlockWindowError>>,
    {
        let mut slot = self.head.lock().await;

        let fresh = (*slot).filter(|entry| entry.fetched_at.elapsed() < self.head_ttl);
        if let Some(entry) = fresh {
            return Ok((entry.latest_block, entry.latest_ts));
        }

        let latest_block = fetch().await?;
        *slot = Some(HeadEntry {
            fetched_at: Instant::now(),
            latest_block,
            latest_ts: None,
        });
        Ok((latest_block, None))
    }
}
impl<P: Provider> BlockWindowCalculator<P> {
pub fn new(provider: P, cache: Box<dyn BlockWindowCache>) -> Self {
Self {
provider,
cache,
bounds_memo: ChainBoundsMemo::new(DEFAULT_HEAD_TTL),
eager_head_ts: false,
}
}
pub fn with_head_ttl(mut self, ttl: Duration) -> Self {
self.bounds_memo.head_ttl = ttl;
self
}
pub fn with_disk_cache(
provider: P,
cache_path: impl AsRef<Path>,
) -> Result<Self, BlockWindowError> {
let cache = DiskCache::new(cache_path.as_ref()).validate()?;
Ok(Self {
provider,
cache: Box::new(cache),
bounds_memo: ChainBoundsMemo::new(DEFAULT_HEAD_TTL),
eager_head_ts: true,
})
}
pub fn with_memory_cache(provider: P) -> Self {
use crate::blocks::cache::MemoryCache;
Self::new(provider, Box::new(MemoryCache::new()))
}
pub fn without_cache(provider: P) -> Self {
use crate::blocks::cache::NoOpCache;
Self::new(provider, Box::new(NoOpCache))
}
pub async fn cache_stats(&self) -> crate::blocks::cache::CacheStats {
self.cache.stats().await
}
async fn get_block_timestamp(
&self,
block_number: BlockNumber,
) -> Result<UnixTimestamp, BlockWindowError> {
let span = spans::get_block_timestamp(block_number);
let _guard = span.enter();
let block = self
.provider
.get_block_by_number(block_number.into())
.await
.map_err(|e| RpcError::get_block_failed(block_number, e))?
.ok_or_else(|| RpcError::BlockNotFound { block_number })?;
Ok(UnixTimestamp::from_u64(block.header.timestamp))
}
pub async fn block_range_for_timestamps(
&self,
start_ts: UnixTimestamp,
end_ts: UnixTimestamp,
) -> Result<(BlockNumber, BlockNumber), BlockWindowError> {
let span = spans::block_range_for_timestamps(start_ts.as_u64(), end_ts.as_u64());
let _guard = span.enter();
block_range_for_timestamps_with(
&self.bounds_memo,
start_ts,
end_ts,
|n| self.get_block_timestamp(n),
|| async {
self.provider
.get_block_number()
.await
.map_err(RpcError::get_block_number_failed)
.map_err(BlockWindowError::from)
},
)
.await
}
pub async fn get_daily_window(
&self,
chain: NamedChain,
date: NaiveDate,
) -> Result<DailyBlockWindow, BlockWindowError> {
let span = spans::get_daily_window(chain, date);
let _guard = span.enter();
get_daily_window_with(
&self.bounds_memo,
self.cache.as_ref(),
self.eager_head_ts,
chain,
date,
|n| self.get_block_timestamp(n),
|| async {
self.provider
.get_block_number()
.await
.map_err(RpcError::get_block_number_failed)
.map_err(BlockWindowError::from)
},
)
.await
}
}
/// Binary-searches blocks `0..=latest_block` for the lowest block whose
/// timestamp is at or after `target_ts`.
///
/// Assumes timestamps are non-decreasing in block number. When no probed block
/// qualifies, `latest_block` is returned.
///
/// # Errors
///
/// Propagates the first error returned by `fetch_ts`.
async fn find_first_at_or_after_with<F, Fut>(
    target_ts: UnixTimestamp,
    latest_block: BlockNumber,
    mut fetch_ts: F,
) -> Result<BlockNumber, BlockWindowError>
where
    F: FnMut(BlockNumber) -> Fut,
    Fut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
{
    let mut lo = 0u64;
    let mut hi = latest_block;
    let mut result = latest_block;
    while lo <= hi {
        // `lo + (hi - lo) / 2` cannot overflow, unlike `(lo + hi) / 2` when
        // both bounds are in the upper half of the `u64` range.
        let mid = lo + (hi - lo) / 2;
        let ts = fetch_ts(mid).await?;
        if ts >= target_ts {
            result = mid;
            // Nothing lies below block 0: the search is done.
            let Some(below) = mid.checked_sub(1) else {
                break;
            };
            hi = below;
        } else {
            // Nothing lies above `u64::MAX`: the search is done.
            let Some(above) = mid.checked_add(1) else {
                break;
            };
            lo = above;
        }
    }
    debug!(target_ts = %target_ts, result, "Found first block at or after timestamp");
    Ok(result)
}
/// Binary-searches blocks `0..=latest_block` for the highest block whose
/// timestamp is at or before `target_ts`.
///
/// Assumes timestamps are non-decreasing in block number. When no probed block
/// qualifies, `0` is returned.
///
/// # Errors
///
/// Propagates the first error returned by `fetch_ts`.
async fn find_last_at_or_before_with<F, Fut>(
    target_ts: UnixTimestamp,
    latest_block: BlockNumber,
    mut fetch_ts: F,
) -> Result<BlockNumber, BlockWindowError>
where
    F: FnMut(BlockNumber) -> Fut,
    Fut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
{
    let mut lo = 0u64;
    let mut hi = latest_block;
    let mut result = 0u64;
    while lo <= hi {
        // `lo + (hi - lo) / 2` cannot overflow, unlike `(lo + hi) / 2` when
        // both bounds are in the upper half of the `u64` range.
        let mid = lo + (hi - lo) / 2;
        let ts = fetch_ts(mid).await?;
        if ts <= target_ts {
            result = mid;
            // Nothing lies above `u64::MAX`: the search is done.
            let Some(above) = mid.checked_add(1) else {
                break;
            };
            lo = above;
        } else {
            // Nothing lies below block 0: the search is done.
            let Some(below) = mid.checked_sub(1) else {
                break;
            };
            hi = below;
        }
    }
    debug!(target_ts = %target_ts, result, "Found last block at or before timestamp");
    Ok(result)
}
/// Maps `[start_ts, end_ts]` to a `(start_block, end_block)` pair, given the
/// already-known genesis and head bounds of the chain.
///
/// * A range entirely after the head yields `(latest_block, latest_block)`.
/// * A range entirely before genesis yields `(0, 0)`.
/// * Otherwise each side is clamped to the chain bounds or located by binary
///   search. A range that falls between two consecutive blocks comes back
///   with `start_block > end_block`.
///
/// The caller must ensure `start_ts <= end_ts` (checked only in debug builds).
///
/// # Errors
///
/// Returns `BlockWindowError::non_monotonic_timestamps` when the genesis
/// timestamp is later than the head timestamp on a chain with more than one
/// block, and propagates any error from `fetch_ts`.
async fn compute_block_range_given_bounds<F, Fut>(
    start_ts: UnixTimestamp,
    end_ts: UnixTimestamp,
    latest_block: BlockNumber,
    genesis_ts: UnixTimestamp,
    latest_ts: UnixTimestamp,
    mut fetch_ts: F,
) -> Result<(BlockNumber, BlockNumber), BlockWindowError>
where
    F: FnMut(BlockNumber) -> Fut,
    Fut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
{
    debug_assert!(
        start_ts <= end_ts,
        "caller must validate start_ts <= end_ts"
    );

    let bounds_inverted = latest_block > 0 && genesis_ts > latest_ts;
    if bounds_inverted {
        return Err(BlockWindowError::non_monotonic_timestamps(
            0,
            latest_block,
            genesis_ts,
            latest_ts,
        ));
    }

    // Whole range lies past the head.
    if start_ts > latest_ts {
        return Ok((latest_block, latest_block));
    }
    // Whole range lies before genesis.
    if end_ts < genesis_ts {
        return Ok((0, 0));
    }

    // Lower bound first, then upper bound, so probes are issued in that order.
    let lower = if start_ts > genesis_ts {
        find_first_at_or_after_with(start_ts, latest_block, &mut fetch_ts).await?
    } else {
        0
    };
    let upper = if end_ts < latest_ts {
        find_last_at_or_before_with(end_ts, latest_block, &mut fetch_ts).await?
    } else {
        latest_block
    };

    Ok((lower, upper))
}
/// Resolves `[start_ts, end_ts]` to a block range using injected fetchers and
/// the shared bounds memo.
///
/// `fetch_ts` returns the timestamp of a block number;
/// `fetch_latest_block_number` returns the current head number and is invoked
/// at most once, only when the memoized head cannot be reused.
///
/// # Errors
///
/// Returns `BlockWindowError::invalid_timestamp_range` when `start_ts > end_ts`
/// (before any fetch), and propagates errors from the fetchers and from
/// `compute_block_range_given_bounds`.
async fn block_range_for_timestamps_with<F, FtFut, G, GnFut>(
bounds_memo: &ChainBoundsMemo,
start_ts: UnixTimestamp,
end_ts: UnixTimestamp,
mut fetch_ts: F,
fetch_latest_block_number: G,
) -> Result<(BlockNumber, BlockNumber), BlockWindowError>
where
F: FnMut(BlockNumber) -> FtFut,
FtFut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
G: FnOnce() -> GnFut,
GnFut: std::future::Future<Output = Result<BlockNumber, BlockWindowError>>,
{
// Reject inverted ranges up front so no fetch is issued for them.
if start_ts > end_ts {
return Err(BlockWindowError::invalid_timestamp_range(start_ts, end_ts));
}
// Genesis timestamp: block 0 is fetched only if the memo is still empty.
let genesis_ts = bounds_memo.get_or_fetch_genesis(|| fetch_ts(0)).await?;
// Head number + timestamp: the closure runs only when the memo has no fresh
// full entry.
let (latest_block, latest_ts) = bounds_memo
.get_or_fetch_head(|| async {
let latest_block = fetch_latest_block_number().await?;
// On a single-block chain the head is genesis, so its timestamp is reused
// instead of fetching block 0 a second time.
let latest_ts = if latest_block == 0 {
genesis_ts
} else {
fetch_ts(latest_block).await?
};
Ok((latest_block, latest_ts))
})
.await?;
info!(
start_ts = %start_ts,
end_ts = %end_ts,
latest_block,
"Resolving timestamp range to block range"
);
// Clamp against the bounds and binary-search whichever side falls inside the
// chain's history.
let (start_block, end_block) = compute_block_range_given_bounds(
start_ts,
end_ts,
latest_block,
genesis_ts,
latest_ts,
fetch_ts,
)
.await?;
info!(
start_ts = %start_ts,
end_ts = %end_ts,
start_block,
end_block,
"Resolved timestamp range to block range"
);
Ok((start_block, end_block))
}
/// Computes (or retrieves from `cache`) the block window for the UTC calendar
/// day `date` on `chain`, using injected fetchers and the shared bounds memo.
///
/// The day is the half-open interval from 00:00:00 UTC on `date` to 00:00:00
/// UTC on the following day. With `eager_head_ts` set, the head's timestamp is
/// obtained together with its number; otherwise only the number is obtained
/// and the timestamp is known only if the memo already holds a full entry.
///
/// A computed window is inserted into `cache` only when the head timestamp is
/// known and the last second of the day is strictly before it; in every other
/// case the skip is recorded on the cache and the window is returned uncached.
///
/// # Errors
///
/// Fails when the date cannot be turned into timestamps, when a fetcher
/// fails, or when `DailyBlockWindow::new` rejects the resolved bounds. A
/// failed cache insert is logged at debug level and does not fail the call.
async fn get_daily_window_with<F, FtFut, G, GnFut>(
bounds_memo: &ChainBoundsMemo,
cache: &dyn BlockWindowCache,
eager_head_ts: bool,
chain: NamedChain,
date: NaiveDate,
mut fetch_ts: F,
fetch_latest_block_number: G,
) -> Result<DailyBlockWindow, BlockWindowError>
where
F: FnMut(BlockNumber) -> FtFut,
FtFut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
G: FnOnce() -> GnFut,
GnFut: std::future::Future<Output = Result<BlockNumber, BlockWindowError>>,
{
// Cache hit: return immediately without touching the memo or the fetchers.
let key = CacheKey::new(chain, date);
if let Some(window) = cache.get(&key).await {
info!(
chain = %chain,
date = %date,
cache = %cache.name(),
cached = true,
"Retrieved daily block window from cache"
);
return Ok(window);
}
// Day bounds: midnight UTC on `date` up to (exclusive) midnight UTC one day later.
let start_dt = Utc
.with_ymd_and_hms(date.year(), date.month(), date.day(), 0, 0, 0)
.single()
.ok_or_else(|| BlockWindowError::invalid_date_conversion(date))?;
let end_dt = start_dt
.checked_add_signed(chrono::TimeDelta::days(1))
.ok_or_else(|| BlockWindowError::date_arithmetic_overflow(date))?;
let start_ts = UnixTimestamp::from_datetime(start_dt);
let end_ts_exclusive = UnixTimestamp::from_datetime(end_dt);
// Eager mode resolves head number and timestamp; lazy mode resolves only the
// number and surfaces a timestamp only if a fresh full entry is memoized.
let (latest_block, memoized_latest_ts) = if eager_head_ts {
let (latest_block, latest_ts) = bounds_memo
.get_or_fetch_head(|| async {
let latest_block = fetch_latest_block_number().await?;
let latest_ts = fetch_ts(latest_block).await?;
Ok((latest_block, latest_ts))
})
.await?;
(latest_block, Some(latest_ts))
} else {
bounds_memo
.get_or_fetch_latest_block(fetch_latest_block_number)
.await?
};
info!(
chain = %chain,
date = %date,
start_ts = %start_ts,
end_ts_exclusive = %end_ts_exclusive,
latest_block,
"Computing daily block window"
);
// Cacheable only when the head timestamp is known and the day's last second
// is strictly before it.
let safe_to_cache =
memoized_latest_ts.is_some_and(|latest_ts| end_ts_exclusive.pred() < latest_ts);
// Known head that is older than the start of the day: return a window pinned
// to the head block without searching.
let window = if memoized_latest_ts.is_some_and(|latest_ts| start_ts > latest_ts) {
debug!(
chain = %chain,
date = %date,
start_ts = %start_ts,
latest_ts = ?memoized_latest_ts,
"Date past chain tip — returning empty window at tip without caching"
);
DailyBlockWindow::new(latest_block, latest_block, start_ts, end_ts_exclusive)?
} else {
// Search for the first block at/after the day's start and the last block
// at/before the day's final second.
let start_block =
find_first_at_or_after_with(start_ts, latest_block, &mut fetch_ts).await?;
let end_block =
find_last_at_or_before_with(end_ts_exclusive.pred(), latest_block, &mut fetch_ts)
.await?;
DailyBlockWindow::new(start_block, end_block, start_ts, end_ts_exclusive)?
};
info!(
chain = %chain,
date = %date,
start_block = window.start_block,
end_block = window.end_block,
block_count = window.block_count().as_u64(),
cache = %cache.name(),
cached = safe_to_cache,
"Computed daily block window"
);
if safe_to_cache {
// Best effort: a failed insert is logged and the window is still returned.
if let Err(e) = cache.insert(key, window.clone()).await {
debug!(error = %e, "Failed to cache block window (continuing anyway)");
}
} else {
cache.record_skip_insert().await;
debug!(
chain = %chain,
date = %date,
latest_ts = ?memoized_latest_ts,
"Skipping cache insert: cannot confirm the window is safe to \
persist. Either the day touches or extends past the chain tip, \
or the head's timestamp is unknown (partial memo from \
get_or_fetch_latest_block). The (chain, date) cache key cannot \
disambiguate which head was current"
);
}
Ok(window)
}
#[cfg(test)]
mod tests {
use super::*;
use alloy_provider::ProviderBuilder;
fn dummy_provider() -> impl Provider {
ProviderBuilder::new().connect_http("http://localhost:1".parse().unwrap())
}
#[tokio::test]
async fn block_range_for_timestamps_rejects_inverted_range() {
let calculator = BlockWindowCalculator::without_cache(dummy_provider());
let err = calculator
.block_range_for_timestamps(UnixTimestamp(2000), UnixTimestamp(1000))
.await
.unwrap_err();
assert!(
matches!(err, BlockWindowError::InvalidTimestampRange { .. }),
"expected InvalidTimestampRange, got: {err:?}"
);
}
#[tokio::test]
async fn with_disk_cache_opts_into_eager_head_ts() {
let temp_dir = tempfile::TempDir::new().unwrap();
let cache_path = temp_dir.path().join("cache.json");
let calc = BlockWindowCalculator::with_disk_cache(dummy_provider(), &cache_path).unwrap();
assert!(
calc.eager_head_ts,
"with_disk_cache must opt into the eager head-ts fetch path"
);
let cache = crate::blocks::cache::DiskCache::new(&cache_path)
.validate()
.unwrap();
let calc_new = BlockWindowCalculator::new(dummy_provider(), Box::new(cache));
assert!(
!calc_new.eager_head_ts,
"`new` must preserve the conservative default — only the named \
`with_disk_cache` constructor opts into eager head-ts"
);
let calc_mem = BlockWindowCalculator::with_memory_cache(dummy_provider());
assert!(
!calc_mem.eager_head_ts,
"`with_memory_cache` keeps the conservative default"
);
let calc_none = BlockWindowCalculator::without_cache(dummy_provider());
assert!(
!calc_none.eager_head_ts,
"`without_cache` keeps the conservative default"
);
}
#[test]
fn test_cache_key_display() {
let key = CacheKey::new(
NamedChain::Arbitrum,
NaiveDate::from_ymd_opt(2025, 10, 10).unwrap(),
);
let serialized = key.to_string();
assert_eq!(serialized, "42161:2025-10-10");
}
#[test]
fn test_daily_block_window_validation() {
let start_ts = UnixTimestamp(1728518400);
let end_ts = UnixTimestamp(1728604800);
let window = DailyBlockWindow::new(1000, 2000, start_ts, end_ts);
assert!(window.is_ok());
assert_eq!(window.unwrap().block_count().as_u64(), 1001);
let invalid = DailyBlockWindow::new(2000, 1000, start_ts, end_ts);
assert!(invalid.is_err());
let invalid = DailyBlockWindow::new(1000, 2000, end_ts, start_ts);
assert!(invalid.is_err());
}
#[test]
fn test_block_window_edge_cases() {
let single = DailyBlockWindow {
start_block: 1000,
end_block: 1000,
start_ts: UnixTimestamp(1697328000),
end_ts_exclusive: UnixTimestamp(1697414400),
};
assert_eq!(single.block_count().as_u64(), 1);
let large = DailyBlockWindow {
start_block: 100_000_000,
end_block: 100_040_000,
start_ts: UnixTimestamp(1697328000),
end_ts_exclusive: UnixTimestamp(1697414400),
};
assert_eq!(large.block_count().as_u64(), 40_001);
let window = DailyBlockWindow {
start_block: 1000,
end_block: 2000,
start_ts: UnixTimestamp(1697328000),
end_ts_exclusive: UnixTimestamp(1697414400),
};
assert_eq!(window.block_count().as_u64(), 1001);
}
#[test]
fn test_block_window_validation_errors() {
let start_ts = UnixTimestamp(1728518400);
let end_ts = UnixTimestamp(1728604800);
let result = DailyBlockWindow::new(2000, 1000, start_ts, end_ts);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Invalid block range"));
let result = DailyBlockWindow::new(1000, 2000, start_ts, start_ts);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Invalid timestamp range"));
let result = DailyBlockWindow::new(1000, 2000, end_ts, start_ts);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Invalid timestamp range"));
}
#[test]
fn test_block_window_zero_values() {
let start_ts = UnixTimestamp(1728518400);
let end_ts = UnixTimestamp(1728604800);
let window = DailyBlockWindow::new(0, 100, start_ts, end_ts);
assert!(window.is_ok());
assert_eq!(window.unwrap().block_count().as_u64(), 101);
let window = DailyBlockWindow::new(0, 0, start_ts, end_ts);
assert!(window.is_ok());
assert_eq!(window.unwrap().block_count().as_u64(), 1);
}
#[test]
fn test_block_window_large_values() {
let start_ts = UnixTimestamp(1728518400);
let end_ts = UnixTimestamp(1728604800);
let window = DailyBlockWindow::new(100_000_000, 100_040_000, start_ts, end_ts);
assert!(window.is_ok());
assert_eq!(window.unwrap().block_count().as_u64(), 40_001);
let window = DailyBlockWindow::new(1_000_000_000, 1_001_000_000, start_ts, end_ts);
assert!(window.is_ok());
assert_eq!(window.unwrap().block_count().as_u64(), 1_000_001);
}
#[test]
fn test_block_window_count_overflow_protection() {
let start_ts = UnixTimestamp(1728518400);
let end_ts = UnixTimestamp(1728604800);
let window = DailyBlockWindow::new(u64::MAX - 100, u64::MAX, start_ts, end_ts);
assert!(window.is_ok());
let count = window.unwrap().block_count();
assert_eq!(count.as_u64(), 101);
}
fn fetcher_from(
timestamps: Vec<i64>,
) -> impl FnMut(
BlockNumber,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>>,
> {
move |n: BlockNumber| {
let ts = timestamps[n as usize];
Box::pin(async move { Ok(UnixTimestamp(ts)) })
}
}
async fn compute_block_range_with<F, Fut>(
start_ts: UnixTimestamp,
end_ts: UnixTimestamp,
latest_block: BlockNumber,
mut fetch_ts: F,
) -> Result<(BlockNumber, BlockNumber), BlockWindowError>
where
F: FnMut(BlockNumber) -> Fut,
Fut: std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>,
{
let genesis_ts = fetch_ts(0).await?;
let latest_ts = if latest_block == 0 {
genesis_ts
} else {
fetch_ts(latest_block).await?
};
compute_block_range_given_bounds(
start_ts,
end_ts,
latest_block,
genesis_ts,
latest_ts,
fetch_ts,
)
.await
}
#[tokio::test]
async fn block_range_target_inside_chain_history() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = (timestamps.len() - 1) as BlockNumber;
let (start, end) = compute_block_range_with(
UnixTimestamp(1150),
UnixTimestamp(1350),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, 2);
assert_eq!(end, 3);
}
#[tokio::test]
async fn block_range_exact_boundary_match() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = 4;
let (start, end) = compute_block_range_with(
UnixTimestamp(1100),
UnixTimestamp(1300),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, 1);
assert_eq!(end, 3);
}
#[tokio::test]
async fn block_range_target_before_genesis_returns_zero() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = 4;
let (start, end) = compute_block_range_with(
UnixTimestamp(500),
UnixTimestamp(900),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, 0);
assert_eq!(end, 0);
}
#[tokio::test]
async fn block_range_start_before_genesis_clamps_to_zero() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = 4;
let (start, end) = compute_block_range_with(
UnixTimestamp(500),
UnixTimestamp(1250),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, 0);
assert_eq!(end, 2);
}
#[tokio::test]
async fn block_range_target_after_latest_returns_latest() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = 4;
let (start, end) = compute_block_range_with(
UnixTimestamp(2000),
UnixTimestamp(3000),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, latest_block);
assert_eq!(end, latest_block);
}
#[tokio::test]
async fn block_range_end_after_latest_clamps_to_latest() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = 4;
let (start, end) = compute_block_range_with(
UnixTimestamp(1250),
UnixTimestamp(9999),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, 3);
assert_eq!(end, latest_block);
}
#[tokio::test]
async fn block_range_between_consecutive_blocks_returns_inverted() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest_block: BlockNumber = 4;
let (start, end) = compute_block_range_with(
UnixTimestamp(1150),
UnixTimestamp(1180),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!(start, 2);
assert_eq!(end, 1);
assert!(start > end, "empty window should yield inverted range");
}
#[tokio::test]
async fn block_range_non_monotonic_chain_errors() {
let timestamps = vec![5000, 4000, 3000, 2000, 1000];
let latest_block: BlockNumber = 4;
let err = compute_block_range_with(
UnixTimestamp(2500),
UnixTimestamp(4500),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap_err();
assert!(matches!(
err,
BlockWindowError::NonMonotonicTimestamps { .. }
));
let msg = err.to_string();
assert!(
msg.contains("Non-monotonic"),
"expected non-monotonic message, got: {msg}"
);
}
#[tokio::test]
async fn block_range_single_block_chain() {
let timestamps = vec![1500];
let latest_block: BlockNumber = 0;
let (start, end) = compute_block_range_with(
UnixTimestamp(1000),
UnixTimestamp(2000),
latest_block,
fetcher_from(timestamps.clone()),
)
.await
.unwrap();
assert_eq!((start, end), (0, 0));
let (start, end) = compute_block_range_with(
UnixTimestamp(3000),
UnixTimestamp(4000),
latest_block,
fetcher_from(timestamps.clone()),
)
.await
.unwrap();
assert_eq!((start, end), (0, 0));
let (start, end) = compute_block_range_with(
UnixTimestamp(500),
UnixTimestamp(800),
latest_block,
fetcher_from(timestamps),
)
.await
.unwrap();
assert_eq!((start, end), (0, 0));
}
#[tokio::test]
async fn find_first_at_or_after_target_inside_history() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let result = find_first_at_or_after_with(UnixTimestamp(1150), 4, fetcher_from(timestamps))
.await
.unwrap();
assert_eq!(result, 2);
}
#[tokio::test]
async fn find_first_at_or_after_returns_latest_when_target_past_head() {
let timestamps = vec![1000, 1100, 1200];
let result = find_first_at_or_after_with(UnixTimestamp(5000), 2, fetcher_from(timestamps))
.await
.unwrap();
assert_eq!(result, 2);
}
#[tokio::test]
async fn find_first_at_or_after_returns_zero_when_target_before_genesis() {
let timestamps = vec![1000, 1100, 1200];
let result = find_first_at_or_after_with(UnixTimestamp(500), 2, fetcher_from(timestamps))
.await
.unwrap();
assert_eq!(result, 0);
}
#[tokio::test]
async fn find_last_at_or_before_target_inside_history() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let result = find_last_at_or_before_with(UnixTimestamp(1250), 4, fetcher_from(timestamps))
.await
.unwrap();
assert_eq!(result, 2);
}
#[tokio::test]
async fn find_last_at_or_before_returns_latest_when_target_past_head() {
let timestamps = vec![1000, 1100, 1200];
let result = find_last_at_or_before_with(UnixTimestamp(5000), 2, fetcher_from(timestamps))
.await
.unwrap();
assert_eq!(result, 2);
}
#[tokio::test]
async fn find_last_at_or_before_returns_zero_when_target_before_genesis() {
let timestamps = vec![1000, 1100, 1200];
let result = find_last_at_or_before_with(UnixTimestamp(500), 2, fetcher_from(timestamps))
.await
.unwrap();
assert_eq!(result, 0);
}
mod bounds_memo {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[tokio::test]
async fn genesis_fetched_only_once() {
let memo = ChainBoundsMemo::new(Duration::from_secs(60));
let counter = Arc::new(AtomicUsize::new(0));
let c1 = counter.clone();
let v1 = memo
.get_or_fetch_genesis(|| async move {
c1.fetch_add(1, Ordering::SeqCst);
Ok(UnixTimestamp(1000))
})
.await
.unwrap();
let c2 = counter.clone();
let v2 = memo
.get_or_fetch_genesis(|| async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok(UnixTimestamp(9999))
})
.await
.unwrap();
assert_eq!(v1, UnixTimestamp(1000));
assert_eq!(v2, UnixTimestamp(1000));
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"second call must reuse the memoized genesis"
);
}
#[tokio::test]
async fn genesis_fetch_error_is_not_memoized() {
let memo = ChainBoundsMemo::new(Duration::from_secs(60));
let counter = Arc::new(AtomicUsize::new(0));
let c1 = counter.clone();
let err = memo
.get_or_fetch_genesis(|| async move {
c1.fetch_add(1, Ordering::SeqCst);
Err(BlockWindowError::invalid_timestamp_range(
UnixTimestamp(2),
UnixTimestamp(1),
))
})
.await;
assert!(err.is_err());
let c2 = counter.clone();
let v = memo
.get_or_fetch_genesis(|| async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok(UnixTimestamp(1000))
})
.await
.unwrap();
assert_eq!(v, UnixTimestamp(1000));
assert_eq!(counter.load(Ordering::SeqCst), 2);
}
#[tokio::test]
async fn head_reused_within_ttl() {
let memo = ChainBoundsMemo::new(Duration::from_secs(60));
let counter = Arc::new(AtomicUsize::new(0));
let c1 = counter.clone();
let (b1, t1) = memo
.get_or_fetch_head(|| async move {
c1.fetch_add(1, Ordering::SeqCst);
Ok((100, UnixTimestamp(5000)))
})
.await
.unwrap();
let c2 = counter.clone();
let (b2, t2) = memo
.get_or_fetch_head(|| async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok((200, UnixTimestamp(9999)))
})
.await
.unwrap();
assert_eq!((b1, t1), (100, UnixTimestamp(5000)));
assert_eq!((b2, t2), (100, UnixTimestamp(5000)));
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"second call within TTL must reuse the memoized head"
);
}
#[tokio::test]
async fn with_head_ttl_preserves_memo() {
let mut calc = BlockWindowCalculator::without_cache(dummy_provider());
calc.bounds_memo
.genesis
.set(UnixTimestamp(1234))
.expect("genesis OnceCell starts empty");
calc = calc.with_head_ttl(Duration::from_secs(120));
assert_eq!(
calc.bounds_memo.genesis.get().copied(),
Some(UnixTimestamp(1234)),
"with_head_ttl must preserve the memoized genesis"
);
assert_eq!(calc.bounds_memo.head_ttl, Duration::from_secs(120));
}
#[tokio::test]
async fn latest_block_reused_within_ttl() {
let memo = ChainBoundsMemo::new(Duration::from_secs(60));
let counter = Arc::new(AtomicUsize::new(0));
let c1 = counter.clone();
let (b1, t1) = memo
.get_or_fetch_latest_block(|| async move {
c1.fetch_add(1, Ordering::SeqCst);
Ok(100)
})
.await
.unwrap();
let c2 = counter.clone();
let (b2, t2) = memo
.get_or_fetch_latest_block(|| async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok(200)
})
.await
.unwrap();
assert_eq!(b1, 100);
assert_eq!(b2, 100);
assert_eq!(
t1, None,
"partial entry from get_or_fetch_latest_block leaves latest_ts unset"
);
assert_eq!(
t2, None,
"second call must reuse the partial entry, still without a timestamp"
);
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"second call within TTL must reuse the memoized block number"
);
}
#[tokio::test]
async fn get_or_fetch_latest_block_reuses_full_entry() {
let memo = ChainBoundsMemo::new(Duration::from_secs(60));
let head_counter = Arc::new(AtomicUsize::new(0));
let block_counter = Arc::new(AtomicUsize::new(0));
let hc = head_counter.clone();
memo.get_or_fetch_head(|| async move {
hc.fetch_add(1, Ordering::SeqCst);
Ok((100, UnixTimestamp(5000)))
})
.await
.unwrap();
let bc = block_counter.clone();
let (b, t) = memo
.get_or_fetch_latest_block(|| async move {
bc.fetch_add(1, Ordering::SeqCst);
Ok(999)
})
.await
.unwrap();
assert_eq!(b, 100);
assert_eq!(
t,
Some(UnixTimestamp(5000)),
"full entry must surface its memoized timestamp at zero RPC cost"
);
assert_eq!(head_counter.load(Ordering::SeqCst), 1);
assert_eq!(
block_counter.load(Ordering::SeqCst),
0,
"block-only fetcher must not fire when a full entry is fresh"
);
}
#[tokio::test]
async fn get_or_fetch_head_upgrades_partial_entry() {
let memo = ChainBoundsMemo::new(Duration::from_secs(60));
let block_counter = Arc::new(AtomicUsize::new(0));
let head_counter = Arc::new(AtomicUsize::new(0));
let bc = block_counter.clone();
memo.get_or_fetch_latest_block(|| async move {
bc.fetch_add(1, Ordering::SeqCst);
Ok(100)
})
.await
.unwrap();
let hc = head_counter.clone();
let (b, t) = memo
.get_or_fetch_head(|| async move {
hc.fetch_add(1, Ordering::SeqCst);
Ok((101, UnixTimestamp(6000)))
})
.await
.unwrap();
assert_eq!((b, t), (101, UnixTimestamp(6000)));
assert_eq!(block_counter.load(Ordering::SeqCst), 1);
assert_eq!(
head_counter.load(Ordering::SeqCst),
1,
"full-head fetcher must fire to upgrade a partial entry"
);
}
#[tokio::test]
async fn head_refetched_when_ttl_zero() {
let memo = ChainBoundsMemo::new(Duration::ZERO);
let counter = Arc::new(AtomicUsize::new(0));
let c1 = counter.clone();
let _ = memo
.get_or_fetch_head(|| async move {
c1.fetch_add(1, Ordering::SeqCst);
Ok((100, UnixTimestamp(5000)))
})
.await
.unwrap();
let c2 = counter.clone();
let (b, t) = memo
.get_or_fetch_head(|| async move {
c2.fetch_add(1, Ordering::SeqCst);
Ok((200, UnixTimestamp(6000)))
})
.await
.unwrap();
assert_eq!((b, t), (200, UnixTimestamp(6000)));
assert_eq!(
counter.load(Ordering::SeqCst),
2,
"TTL=ZERO must skip memoization"
);
}
}
mod wiring {
use super::*;
use crate::blocks::cache::NoOpCache;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
type BoxedTsFut = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<UnixTimestamp, BlockWindowError>>>,
>;
type BoxedHeadFut = std::pin::Pin<
Box<dyn std::future::Future<Output = Result<BlockNumber, BlockWindowError>>>,
>;
fn counting_fetch_ts(
timestamps: Vec<i64>,
log: Arc<StdMutex<Vec<BlockNumber>>>,
) -> impl FnMut(BlockNumber) -> BoxedTsFut {
move |n: BlockNumber| {
let ts = timestamps[n as usize];
let log = log.clone();
Box::pin(async move {
log.lock().expect("test log mutex poisoned").push(n);
Ok(UnixTimestamp(ts))
})
}
}
fn counting_fetch_head(
latest: BlockNumber,
counter: Arc<AtomicUsize>,
) -> impl FnOnce() -> BoxedHeadFut {
move || {
Box::pin(async move {
counter.fetch_add(1, Ordering::SeqCst);
Ok(latest)
})
}
}
#[tokio::test]
async fn bounds_memoized_across_two_calls_within_ttl() {
let timestamps = vec![1000, 1100, 1200, 1300, 1400];
let latest: BlockNumber = 4;
let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);
let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
let head_counter = Arc::new(AtomicUsize::new(0));
let (s1, e1) = block_range_for_timestamps_with(
&bounds_memo,
UnixTimestamp(500),
UnixTimestamp(9999),
counting_fetch_ts(timestamps.clone(), log.clone()),
counting_fetch_head(latest, head_counter.clone()),
)
.await
.unwrap();
assert_eq!((s1, e1), (0, latest));
let (s2, e2) = block_range_for_timestamps_with(
&bounds_memo,
UnixTimestamp(2000),
UnixTimestamp(3000),
counting_fetch_ts(timestamps, log.clone()),
counting_fetch_head(latest, head_counter.clone()),
)
.await
.unwrap();
assert_eq!((s2, e2), (latest, latest));
let calls = log.lock().unwrap();
assert_eq!(
calls.as_slice(),
&[0u64, latest],
"memo must collapse genesis+head fetches across calls within TTL (got: {calls:?})"
);
assert_eq!(
head_counter.load(Ordering::SeqCst),
1,
"head block number must be fetched once within TTL"
);
}
/// On a chain whose head is block 0, the range lookup must probe block 0's
/// timestamp a single time rather than once as "genesis" and again as "head".
#[tokio::test]
async fn single_block_chain_reuses_genesis_for_head() {
    let latest: BlockNumber = 0;
    let only_block_ts = vec![1500];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let range = block_range_for_timestamps_with(
        &bounds_memo,
        UnixTimestamp(1000),
        UnixTimestamp(2000),
        counting_fetch_ts(only_block_ts, Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!(range, (0, 0));

    let calls = log.lock().unwrap();
    assert_eq!(
        calls.as_slice(),
        &[0u64],
        "block 0 must be fetched exactly once for a single-block chain (calls: {calls:?})"
    );
    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(head_fetches, 1);
}
/// Resolves the same calendar day twice through `get_daily_window_with`
/// (using `NoOpCache`) against one memo and checks that the head fetcher
/// only ran for the first resolution.
#[tokio::test]
async fn daily_window_head_memoized_across_two_calls_within_ttl() {
    let latest: BlockNumber = 4;
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    // Blocks 1..=3 are the ones the assertions below expect inside the day.
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_745_000,
        1_735_775_000,
        1_735_776_100,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = NoOpCache;
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let first = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps.clone(), Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((first.start_block, first.end_block), (1, 3));

    let second = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((second.start_block, second.end_block), (1, 3));

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(
        head_fetches, 1,
        "eth_blockNumber must fire exactly once across two cache misses within TTL"
    );
}
/// Warms the memo through `block_range_for_timestamps_with`, then resolves a
/// daily window against the same memo and checks that the head fetcher was
/// not invoked a second time.
#[tokio::test]
async fn daily_window_reuses_head_populated_by_block_range_for_timestamps() {
    let latest: BlockNumber = 4;
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_745_000,
        1_735_775_000,
        1_735_776_100,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = NoOpCache;
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    // Warm-up call; only its side effect on the memo matters here.
    let _warm_up = block_range_for_timestamps_with(
        &bounds_memo,
        UnixTimestamp(1_735_689_600),
        UnixTimestamp(1_735_775_999),
        counting_fetch_ts(block_timestamps.clone(), Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();

    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 3));

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(
        head_fetches, 1,
        "get_daily_window must reuse the head memoized by block_range_for_timestamps"
    );
}
/// Seeds the memo's head slot by hand (block 4) and hands the function a head
/// fetcher that would report 99; the fetcher must stay untouched and the
/// window must be computed against the seeded head.
#[tokio::test]
async fn daily_window_uses_memoized_latest_block_in_binary_search() {
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_745_000,
        1_735_775_000,
        1_735_776_100,
    ];
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);
    {
        // Scope the guard so the lock is released before the call under test.
        let mut head_slot = bounds_memo.head.lock().await;
        *head_slot = Some(HeadEntry {
            fetched_at: Instant::now(),
            latest_block: 4,
            latest_ts: Some(UnixTimestamp(1_735_776_100)),
        });
    }
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let cache = NoOpCache;
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(99, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 3));

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(
        head_fetches, 0,
        "pre-populated head must short-circuit the fetcher"
    );
}
/// Requests a timestamp span that sits strictly inside the chain's history
/// and checks the resolved range plus the number of head-number fetches.
#[tokio::test]
async fn interior_window_threads_memoized_latest_ts_through_binary_search() {
    let latest: BlockNumber = 4;
    let block_timestamps = vec![1000, 1100, 1200, 1300, 1400];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    // Only block 2 (ts 1200) has a timestamp within [1150, 1250].
    let range = block_range_for_timestamps_with(
        &bounds_memo,
        UnixTimestamp(1150),
        UnixTimestamp(1250),
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!(range, (2, 2));

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(head_fetches, 1);
}
/// Builds a timestamp fetcher for tests that fails for exactly one block.
///
/// For `failing_block` the returned future resolves to a
/// `RpcError::BlockNotFound` converted into `BlockWindowError`. For any other
/// block `n` it resolves to `timestamps[n]`; the lookup happens when the
/// closure is called and panics if `n` is out of range.
fn fetch_ts_failing_at(
    timestamps: Vec<i64>,
    failing_block: BlockNumber,
) -> impl FnMut(BlockNumber) -> BoxedTsFut {
    move |n: BlockNumber| -> BoxedTsFut {
        if n != failing_block {
            let ts = timestamps[n as usize];
            return Box::pin(async move { Ok(UnixTimestamp(ts)) });
        }
        Box::pin(async move {
            let not_found = RpcError::BlockNotFound { block_number: n };
            Err(BlockWindowError::from(not_found))
        })
    }
}
/// With `eager_head_ts` off, a timestamp fetcher that errors for the head
/// block must not prevent resolving a day that lies inside chain history.
#[tokio::test]
async fn daily_window_survives_transient_head_ts_failure() {
    let latest: BlockNumber = 4;
    let day = NaiveDate::from_ymd_opt(2025, 1, 15).unwrap();
    // Blocks 1 and 2 are the ones the assertion below expects inside the day.
    let block_timestamps = vec![
        1_736_899_100,
        1_736_899_300,
        1_736_950_000,
        1_736_990_000,
        1_736_995_000,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let cache = NoOpCache;
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let resolved = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        // Any timestamp probe of the head block returns an error.
        fetch_ts_failing_at(block_timestamps, latest),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await;
    let window = resolved.expect(
        "historical-date daily-window must succeed even when \
         fetch_ts(head) errors — the binary search never needs \
         the head's timestamp for an interior date",
    );
    assert_eq!((window.start_block, window.end_block), (1, 2));

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(
        head_fetches, 1,
        "eth_blockNumber must fire exactly once on cold memo"
    );
}
/// Builds a timestamp fetcher whose future panics as soon as it is polled.
///
/// Tests pass it where no timestamp probe is expected; calling the closure
/// alone does not panic, only awaiting the returned future does.
fn fetch_ts_unreachable() -> impl FnMut(BlockNumber) -> BoxedTsFut {
    let tripwire = |n: BlockNumber| -> BoxedTsFut {
        Box::pin(async move {
            panic!(
                "binary search must not run when the date is past the memoized \
                 chain tip — fetch_ts({n}) should never be called"
            )
        })
    };
    tripwire
}
/// Asks for a day (2026-01-01) that starts after the seeded head timestamp.
/// The call must return the `(latest, latest)` sentinel without probing any
/// timestamp or fetching the head, and must record a skipped cache insert.
#[tokio::test]
async fn daily_window_past_tip_skips_binary_search_and_does_not_cache() {
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);
    {
        // Scope the guard so the lock is released before the call under test.
        let mut head_slot = bounds_memo.head.lock().await;
        *head_slot = Some(HeadEntry {
            fetched_at: Instant::now(),
            latest_block: 100,
            latest_ts: Some(UnixTimestamp(1_000_000_000)),
        });
    }
    let day = NaiveDate::from_ymd_opt(2026, 1, 1).unwrap();
    let head_counter = Arc::new(AtomicUsize::new(0));
    let cache = crate::blocks::cache::MemoryCache::new();

    let resolved = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        // Panics if any timestamp probe is awaited.
        fetch_ts_unreachable(),
        counting_fetch_head(100, Arc::clone(&head_counter)),
    )
    .await;
    let window = resolved.expect("past-tip window must return the empty sentinel, not error");
    assert_eq!(
        (window.start_block, window.end_block),
        (100, 100),
        "past-tip window collapses to (latest, latest) — same sentinel \
         compute_block_range_given_bounds returns for the matching case"
    );

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(
        head_fetches, 0,
        "memoized head must short-circuit the eth_blockNumber fetch too"
    );

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "past-tip window must not be cached — the (chain, date) key has \
         no notion of which head was current and would shadow the correct \
         window once the chain catches up"
    );
    assert_eq!(
        stats.skip_inserts, 1,
        "past-tip skip must increment skip_inserts so operators can \
         distinguish the deliberate skip from a broken insert"
    );
    assert_eq!(
        stats.misses, 1,
        "the preceding cache.get still counts as a miss — the counter \
         records the skip in addition to, not instead of, the miss"
    );
}
/// Seeds a head whose timestamp falls inside the requested day. The search
/// must still probe blocks and return a window ending at the head, but the
/// result must not be stored in the cache.
#[tokio::test]
async fn daily_window_touching_tip_runs_search_but_does_not_cache() {
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_720_000,
        1_735_745_000,
        1_735_750_000,
    ];
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);
    {
        // Scope the guard so the lock is released before the call under test.
        let mut head_slot = bounds_memo.head.lock().await;
        *head_slot = Some(HeadEntry {
            fetched_at: Instant::now(),
            latest_block: 4,
            latest_ts: Some(UnixTimestamp(1_735_750_000)),
        });
    }
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = crate::blocks::cache::MemoryCache::new();

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(4, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 4));

    let probed_any_block = !log.lock().unwrap().is_empty();
    assert!(
        probed_any_block,
        "binary search must still probe blocks when the day starts \
         inside chain history"
    );

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "tip-adjacent window must not be cached — more blocks may land \
         in the day's range and the cached entry would go stale silently"
    );
    assert_eq!(
        stats.skip_inserts, 1,
        "tip-touching skip must increment skip_inserts — the binary \
         search ran but the result was deliberately not persisted"
    );
}
/// Starts from an unseeded memo with `eager_head_ts` off and resolves a day
/// that ends before the head's timestamp. The window is still returned, but
/// the cache must stay empty and record one skipped insert.
#[tokio::test]
async fn daily_window_cold_memo_does_not_cache_historical_date() {
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_745_000,
        1_735_775_000,
        1_735_776_100,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = crate::blocks::cache::MemoryCache::new();
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(4, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 3));

    let probed_any_block = !log.lock().unwrap().is_empty();
    assert!(
        probed_any_block,
        "binary search must still run on the cold-memo path — the \
         caller gets a usable window even when caching is refused"
    );

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "cold memo (no latest_ts) must refuse to cache — the same \
         (chain, date) key may produce a different window once \
         latest_ts is available, and we cannot distinguish a \
         genuinely-historical day from one whose head ts is \
         inside the day"
    );
    assert_eq!(
        stats.skip_inserts, 1,
        "cold-memo cache skip is deliberate and must increment \
         skip_inserts so the count surfaces in operator metrics"
    );
}
/// Starts from an unseeded memo with `eager_head_ts` off and resolves a day
/// that contains the head block's timestamp. The window ends at the head and
/// must not be stored in the cache.
#[tokio::test]
async fn daily_window_cold_memo_does_not_cache_when_head_ts_inside_day() {
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_720_000,
        1_735_745_000,
        1_735_750_000,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = crate::blocks::cache::MemoryCache::new();
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(4, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 4));

    let probed_any_block = !log.lock().unwrap().is_empty();
    assert!(
        probed_any_block,
        "binary search must still run — the caller asked for the \
         day and is entitled to the best-effort window"
    );

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "tip-touching window must not be cached — future blocks \
         will land inside the day's range and the persisted entry \
         would shadow the correct window indefinitely (the disk \
         cache has no TTL by default)"
    );
    assert_eq!(
        stats.skip_inserts, 1,
        "cold-memo tip-touching skip is deliberate and must \
         increment skip_inserts"
    );
}
/// Fence-post case: the seeded head timestamp (1_735_689_600) is the same
/// value the head block reports, and the expected window is `(2, 2)`. The
/// search must still run and the result must not be cached.
#[tokio::test]
async fn daily_window_start_ts_equals_latest_ts_runs_search_and_does_not_cache() {
    let block_timestamps = vec![1_735_689_400, 1_735_689_500, 1_735_689_600];
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);
    {
        // Scope the guard so the lock is released before the call under test.
        let mut head_slot = bounds_memo.head.lock().await;
        *head_slot = Some(HeadEntry {
            fetched_at: Instant::now(),
            latest_block: 2,
            latest_ts: Some(UnixTimestamp(1_735_689_600)),
        });
    }
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = crate::blocks::cache::MemoryCache::new();

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        false,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(2, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (2, 2));

    let probed_any_block = !log.lock().unwrap().is_empty();
    assert!(
        probed_any_block,
        "binary search must still run when start_ts == latest_ts — \
         the past-tip short-circuit fires only on strictly past dates"
    );

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "day whose start coincides with the chain tip is still partial \
         and must not be cached"
    );
    assert_eq!(
        stats.skip_inserts, 1,
        "fence-post tip-touching skip must still increment skip_inserts"
    );
}
/// With `eager_head_ts` on and an unseeded memo, a day that ends before the
/// head's timestamp must be resolved and stored in the cache, using a single
/// head-number fetch.
#[tokio::test]
async fn daily_window_eager_head_ts_caches_historical_date() {
    let latest: BlockNumber = 4;
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_745_000,
        1_735_775_000,
        1_735_776_100,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = crate::blocks::cache::MemoryCache::new();
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        true,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 3));

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 1,
        "eager_head_ts=true on a fully-historical date must persist \
         the window — this is the #18 regression test. Without this, \
         `with_disk_cache` consumers that only call `get_daily_window` \
         see an empty cache file across restarts."
    );
    assert_eq!(
        stats.skip_inserts, 0,
        "historical-day cache insert must succeed under eager_head_ts"
    );

    let head_fetches = head_counter.load(Ordering::SeqCst);
    assert_eq!(
        head_fetches, 1,
        "eth_blockNumber must fire exactly once on cold memo"
    );
}
/// With `eager_head_ts` on, a day that contains the head block's timestamp
/// must still be returned without being stored in the cache.
#[tokio::test]
async fn daily_window_eager_head_ts_still_skips_tip_touching() {
    let latest: BlockNumber = 4;
    let day = NaiveDate::from_ymd_opt(2025, 1, 1).unwrap();
    let block_timestamps = vec![
        1_735_689_500,
        1_735_689_700,
        1_735_720_000,
        1_735_745_000,
        1_735_750_000,
    ];
    let head_counter = Arc::new(AtomicUsize::new(0));
    let log = Arc::new(StdMutex::new(Vec::<BlockNumber>::new()));
    let cache = crate::blocks::cache::MemoryCache::new();
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);

    let window = get_daily_window_with(
        &bounds_memo,
        &cache,
        true,
        NamedChain::Arbitrum,
        day,
        counting_fetch_ts(block_timestamps, Arc::clone(&log)),
        counting_fetch_head(latest, Arc::clone(&head_counter)),
    )
    .await
    .unwrap();
    assert_eq!((window.start_block, window.end_block), (1, 4));

    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "eager_head_ts=true must not weaken the tip-touching gate — \
         a day whose end extends past the head still depends on \
         future chain state and must not be persisted"
    );
    assert_eq!(stats.skip_inserts, 1);
}
/// With `eager_head_ts` on, an error from the head block's timestamp probe
/// must surface to the caller as `RpcError::BlockNotFound` instead of being
/// swallowed, and the failed call must not leave an entry in the cache.
#[tokio::test]
async fn daily_window_eager_head_ts_propagates_transient_head_failure() {
    let timestamps = vec![
        1_736_899_100,
        1_736_899_300,
        1_736_950_000,
        1_736_990_000,
        1_736_995_000,
    ];
    let latest: BlockNumber = 4;
    let bounds_memo = ChainBoundsMemo::new(DEFAULT_HEAD_TTL);
    let cache = crate::blocks::cache::MemoryCache::new();
    let date = NaiveDate::from_ymd_opt(2025, 1, 15).unwrap();
    let head_counter = Arc::new(AtomicUsize::new(0));
    let err = get_daily_window_with(
        &bounds_memo,
        &cache,
        true,
        NamedChain::Arbitrum,
        date,
        // Any timestamp probe of the head block returns an error.
        fetch_ts_failing_at(timestamps, latest),
        counting_fetch_head(latest, head_counter.clone()),
    )
    .await
    .expect_err(
        "eager_head_ts=true must propagate transient head-ts \
         fetch errors — the cold-memo `false` path is the only \
         one that hides them",
    );
    assert!(
        matches!(err, BlockWindowError::Rpc(RpcError::BlockNotFound { .. })),
        "expected BlockNotFound from the failing head probe, got: {err:?}"
    );
    // The call produced no window, so nothing may have been persisted; a
    // real `MemoryCache` is used here precisely so this can be observed.
    let stats = cache.stats().await;
    assert_eq!(
        stats.entries, 0,
        "a failed eager head-ts probe must not leave a window in the cache"
    );
}
}
}