use std::collections::HashSet;
use super::{
computation::{ComputationId, ComputationRequirements},
events::DerivedDataEvent,
};
#[derive(Debug)]
pub struct ReadinessTracker {
current_block: Option<u64>,
ready_for_block: HashSet<ComputationId>,
ever_computed: HashSet<ComputationId>,
failed_for_block: HashSet<ComputationId>,
requirements: ComputationRequirements,
}
impl ReadinessTracker {
pub fn new(requirements: ComputationRequirements) -> Self {
Self {
current_block: None,
ready_for_block: HashSet::new(),
ever_computed: HashSet::new(),
failed_for_block: HashSet::new(),
requirements,
}
}
#[cfg(test)]
pub fn no_requirements() -> Self {
Self::new(ComputationRequirements::none())
}
pub fn handle_event(&mut self, event: &DerivedDataEvent) {
match event {
DerivedDataEvent::NewBlock { block } => {
self.on_new_block(*block);
}
DerivedDataEvent::ComputationComplete { computation_id, block, .. } => {
self.on_computation_complete(computation_id, *block);
}
DerivedDataEvent::ComputationFailed { computation_id, block } => {
self.on_computation_failed(computation_id, *block);
}
}
}
fn on_new_block(&mut self, block: u64) {
if self
.current_block
.is_none_or(|b| block > b)
{
self.current_block = Some(block);
self.ready_for_block.clear();
self.failed_for_block.clear();
}
}
fn on_computation_failed(&mut self, computation_id: ComputationId, block: u64) {
if self
.requirements
.require_fresh
.contains(computation_id) &&
self.current_block
.is_some_and(|b| block == b)
{
self.failed_for_block
.insert(computation_id);
}
}
fn on_computation_complete(&mut self, computation_id: ComputationId, block: u64) {
self.ever_computed
.insert(computation_id);
if self
.current_block
.is_some_and(|b| block < b)
{
return;
}
if self
.current_block
.is_none_or(|b| block > b)
{
self.on_new_block(block);
}
self.ready_for_block
.insert(computation_id);
}
pub fn is_blocked_for_current_block(&self) -> bool {
self.requirements
.require_fresh
.iter()
.any(|id| self.failed_for_block.contains(id))
}
pub fn is_ready(&self) -> bool {
let fresh_ready = self
.requirements
.require_fresh
.iter()
.all(|id| self.ready_for_block.contains(id));
let stale_ready = self
.requirements
.allow_stale
.iter()
.all(|id| self.ever_computed.contains(id));
fresh_ready && stale_ready
}
pub fn has_requirements(&self) -> bool {
self.requirements.has_requirements()
}
pub fn missing(&self) -> HashSet<ComputationId> {
let missing_fresh: HashSet<_> = self
.requirements
.require_fresh
.difference(&self.ready_for_block)
.copied()
.collect();
let missing_stale: HashSet<_> = self
.requirements
.allow_stale
.difference(&self.ever_computed)
.copied()
.collect();
missing_fresh
.union(&missing_stale)
.copied()
.collect()
}
#[cfg(test)]
pub fn current_block(&self) -> Option<u64> {
self.current_block
}
}
#[cfg(test)]
mod tests {
use super::*;
fn fresh_requirements(ids: &[&'static str]) -> ComputationRequirements {
ids.iter()
.fold(ComputationRequirements::none(), |req, id| {
req.require_fresh(id)
.expect("test ids should not conflict")
})
}
fn stale_requirements(ids: &[&'static str]) -> ComputationRequirements {
ids.iter()
.fold(ComputationRequirements::none(), |req, id| {
req.allow_stale(id)
.expect("test ids should not conflict")
})
}
#[test]
fn new_tracker_not_ready_with_fresh_requirements() {
let tracker = ReadinessTracker::new(fresh_requirements(&["token_prices"]));
assert!(!tracker.is_ready());
assert!(tracker.has_requirements());
assert_eq!(tracker.current_block(), None);
}
#[test]
fn new_tracker_ready_without_requirements() {
let tracker = ReadinessTracker::no_requirements();
assert!(tracker.is_ready());
assert!(!tracker.has_requirements());
}
#[test]
fn fresh_requirement_needs_current_block() {
let mut tracker = ReadinessTracker::new(fresh_requirements(&["spot_prices"]));
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 100,
failed_items: vec![],
});
assert!(tracker.is_ready());
assert_eq!(tracker.current_block(), Some(100));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 101 });
assert!(!tracker.is_ready()); assert_eq!(tracker.current_block(), Some(101));
}
#[test]
fn fresh_requirement_ignores_old_blocks() {
let mut tracker = ReadinessTracker::new(fresh_requirements(&["spot_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 99,
failed_items: vec![],
});
assert!(!tracker.is_ready());
}
#[test]
fn fresh_requirement_multiple_computations() {
let mut tracker =
ReadinessTracker::new(fresh_requirements(&["token_prices", "spot_prices"]));
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 100,
failed_items: vec![],
});
assert!(!tracker.is_ready());
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 100,
failed_items: vec![],
});
assert!(tracker.is_ready());
}
#[test]
fn fresh_requirement_newer_block_resets() {
let mut tracker =
ReadinessTracker::new(fresh_requirements(&["token_prices", "spot_prices"]));
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 100,
failed_items: vec![],
});
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 101,
failed_items: vec![],
});
assert!(!tracker.is_ready()); assert_eq!(tracker.current_block(), Some(101));
assert!(tracker
.ready_for_block
.contains(&"spot_prices"));
assert!(!tracker
.ready_for_block
.contains(&"token_prices"));
}
#[test]
fn new_tracker_not_ready_with_stale_requirements() {
let tracker = ReadinessTracker::new(stale_requirements(&["token_prices"]));
assert!(!tracker.is_ready());
assert!(tracker.has_requirements());
}
#[test]
fn stale_requirement_accepts_any_block() {
let mut tracker = ReadinessTracker::new(stale_requirements(&["token_prices"]));
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 100,
failed_items: vec![],
});
assert!(tracker.is_ready());
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 101 });
assert!(tracker.is_ready()); }
#[test]
fn stale_requirement_accepts_old_blocks() {
let mut tracker = ReadinessTracker::new(stale_requirements(&["token_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 99,
failed_items: vec![],
});
assert!(tracker.is_ready()); }
#[test]
fn stale_requirement_persists_across_blocks() {
let mut tracker = ReadinessTracker::new(stale_requirements(&["token_prices"]));
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 100,
failed_items: vec![],
});
assert!(tracker.is_ready());
for block in 101..=110 {
tracker.handle_event(&DerivedDataEvent::NewBlock { block });
assert!(tracker.is_ready()); }
assert!(tracker
.ever_computed
.contains(&"token_prices"));
}
#[test]
fn mixed_fresh_and_stale_requirements() {
let requirements = ComputationRequirements::none()
.require_fresh("spot_prices") .unwrap()
.allow_stale("token_prices") .unwrap();
let mut tracker = ReadinessTracker::new(requirements);
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 100,
failed_items: vec![],
});
assert!(!tracker.is_ready());
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 100,
failed_items: vec![],
});
assert!(tracker.is_ready());
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 101 });
assert!(!tracker.is_ready());
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 101,
failed_items: vec![],
});
assert!(tracker.is_ready()); }
#[test]
fn test_fresh_failure_blocks_current_block() {
let mut tracker = ReadinessTracker::new(fresh_requirements(&["spot_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationFailed {
computation_id: "spot_prices",
block: 100,
});
assert!(tracker.is_blocked_for_current_block());
}
#[test]
fn test_stale_failure_is_ignored() {
let mut tracker = ReadinessTracker::new(stale_requirements(&["token_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationFailed {
computation_id: "token_prices",
block: 100,
});
assert!(!tracker.is_blocked_for_current_block());
}
#[test]
fn test_failure_cleared_on_new_block() {
let mut tracker = ReadinessTracker::new(fresh_requirements(&["spot_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationFailed {
computation_id: "spot_prices",
block: 100,
});
assert!(tracker.is_blocked_for_current_block());
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 101 });
assert!(!tracker.is_blocked_for_current_block());
}
#[test]
fn test_old_block_failure_does_not_block() {
let mut tracker = ReadinessTracker::new(fresh_requirements(&["spot_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationFailed {
computation_id: "spot_prices",
block: 99,
});
assert!(!tracker.is_blocked_for_current_block());
}
#[test]
fn test_failure_recovers_on_next_block() {
let mut tracker = ReadinessTracker::new(fresh_requirements(&["spot_prices"]));
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 100 });
tracker.handle_event(&DerivedDataEvent::ComputationFailed {
computation_id: "spot_prices",
block: 100,
});
assert!(tracker.is_blocked_for_current_block());
assert!(!tracker.is_ready());
tracker.handle_event(&DerivedDataEvent::NewBlock { block: 101 });
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "spot_prices",
block: 101,
failed_items: vec![],
});
assert!(!tracker.is_blocked_for_current_block());
assert!(tracker.is_ready());
}
#[test]
fn missing_returns_unready_set() {
let requirements = ComputationRequirements::none()
.require_fresh("spot_prices")
.unwrap()
.allow_stale("token_prices")
.unwrap();
let mut tracker = ReadinessTracker::new(requirements);
let missing = tracker.missing();
assert_eq!(missing.len(), 2);
assert!(missing.contains(&"spot_prices"));
assert!(missing.contains(&"token_prices"));
tracker.handle_event(&DerivedDataEvent::ComputationComplete {
computation_id: "token_prices",
block: 100,
failed_items: vec![],
});
let missing = tracker.missing();
assert_eq!(missing.len(), 1);
assert!(missing.contains(&"spot_prices"));
assert!(!missing.contains(&"token_prices"));
}
}