use std::collections::HashMap;
use std::time::Duration;
use tokio::time::Instant;
/// Returns the number of nanoseconds that have elapsed since this function was
/// first called in the current process.
///
/// The reference point is captured lazily on the first call, so that first
/// call reports a value close to zero. `Instant` resolves to whatever the
/// file imports under that name (`tokio::time::Instant` in the visible
/// `use` lines).
pub(crate) fn now_nanos() -> u64 {
    // Process-wide reference instant, initialized exactly once.
    static EPOCH: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
    let since_epoch = EPOCH.get_or_init(Instant::now).elapsed();
    since_epoch.as_nanos() as u64
}
/// One-second threshold.
/// NOTE(review): not referenced in the visible code; by its name, presumably the
/// duration past which an initialization is reported as slow — confirm at the use site.
pub(crate) const SLOW_INIT_THRESHOLD: Duration = Duration::from_secs(1);
/// Thirty-second threshold.
/// NOTE(review): not referenced in the visible code; presumably the `max_age` handed to
/// `ContractInitTracker::cleanup_stale_initializations` — confirm at the call site.
pub(crate) const STALE_INIT_THRESHOLD: Duration = Duration::from_secs(30);
/// Maximum number of operations held in one contract's queue; once a queue holds this
/// many, `ContractInitTracker::check_and_maybe_queue` returns `InitCheckResult::QueueFull`.
pub(crate) const MAX_QUEUED_OPS_PER_CONTRACT: usize = 100;
/// Maximum number of entries the tracker holds at once; at this count,
/// `ContractInitTracker::start_initialization` fails with
/// `InitTrackerError::TooManyConcurrentInits`.
pub(crate) const MAX_CONCURRENT_INITIALIZATIONS: usize = 50;
use either::Either;
use freenet_stdlib::prelude::*;
/// Errors reported by `ContractInitTracker`.
#[derive(Debug, thiserror::Error)]
pub(crate) enum InitTrackerError {
/// `start_initialization` was called while the tracker already held `limit` entries.
#[error("too many concurrent contract initializations (limit: {limit})")]
TooManyConcurrentInits { limit: usize },
}
/// Outcome of `ContractInitTracker::check_and_maybe_queue`.
#[derive(Debug)]
pub(crate) enum InitCheckResult {
/// The tracker has no entry for the key; nothing was queued.
NotInitializing,
/// The operation was appended to the key's queue; `queue_size` is the queue
/// length after the append.
Queued { queue_size: usize },
/// The key's queue already held `MAX_QUEUED_OPS_PER_CONTRACT` operations;
/// nothing was queued.
QueueFull,
/// The call was made with `has_code == true` while the key was being
/// tracked; nothing was queued.
PutDuringInit,
}
/// An operation that was held back while its contract was being initialized.
#[derive(Debug)]
pub(crate) struct QueuedOperation {
/// The payload: either a `WrappedState` (`Left`) or a `StateDelta` (`Right`).
pub update: Either<WrappedState, StateDelta<'static>>,
/// The related contracts supplied together with the update.
pub related_contracts: RelatedContracts<'static>,
/// The `now_nanos` value passed to `check_and_maybe_queue` when this
/// operation was queued.
pub queued_at_nanos: u64,
}
/// Returned by `ContractInitTracker::complete_initialization` for a tracked key.
#[derive(Debug)]
pub(crate) struct InitCompletionInfo {
/// Operations that were queued for the key, in the order they were pushed.
pub queued_ops: Vec<QueuedOperation>,
/// Completion timestamp minus start timestamp (saturating at zero).
pub init_duration: Duration,
}
/// Describes one entry evicted by `ContractInitTracker::cleanup_stale_initializations`.
#[derive(Debug)]
pub(crate) struct StaleInitInfo {
/// Key of the evicted entry.
pub key: ContractKey,
/// How long the entry had been tracked when it was evicted.
pub age: Duration,
/// Number of queued operations that were discarded with the entry.
pub dropped_ops: usize,
}
/// Tracks which contracts are currently being initialized and holds the
/// operations queued for each of them in the meantime.
///
/// Every time-dependent method takes the current time as an explicit
/// `now_nanos: u64` argument; the tracker never reads a clock itself.
#[derive(Debug)]
pub(crate) struct ContractInitTracker {
// One entry per contract whose initialization has started but has not yet
// been completed, failed, or evicted as stale.
states: HashMap<ContractKey, InitState>,
}
/// Per-contract bookkeeping stored inside `ContractInitTracker`.
#[derive(Debug)]
struct InitState {
// Operations queued so far, in push order.
queued_ops: Vec<QueuedOperation>,
// The `now_nanos` value passed to `start_initialization`.
started_at_nanos: u64,
}
impl Default for ContractInitTracker {
fn default() -> Self {
Self::new()
}
}
impl ContractInitTracker {
    /// Creates an empty tracker: no contract is considered to be initializing.
    pub fn new() -> Self {
        Self {
            states: HashMap::new(),
        }
    }

    /// Checks whether `key` is being initialized and, if so, queues the
    /// operation when that is allowed.
    ///
    /// `update` and `related_contracts` are taken by value and are dropped
    /// whenever the result is anything other than [`InitCheckResult::Queued`].
    ///
    /// Outcomes, in the order they are evaluated:
    /// * `key` is not tracked -> [`InitCheckResult::NotInitializing`];
    /// * `has_code` is `true` -> [`InitCheckResult::PutDuringInit`];
    /// * the queue already holds [`MAX_QUEUED_OPS_PER_CONTRACT`] operations ->
    ///   [`InitCheckResult::QueueFull`];
    /// * otherwise the operation is appended, stamped with `now_nanos`, and
    ///   [`InitCheckResult::Queued`] carries the new queue length.
    pub fn check_and_maybe_queue(
        &mut self,
        key: &ContractKey,
        has_code: bool,
        update: Either<WrappedState, StateDelta<'static>>,
        related_contracts: RelatedContracts<'static>,
        now_nanos: u64,
    ) -> InitCheckResult {
        let Some(state) = self.states.get_mut(key) else {
            return InitCheckResult::NotInitializing;
        };
        // Evaluated before the capacity check, so a call with `has_code` set
        // reports `PutDuringInit` even when the queue is full.
        if has_code {
            return InitCheckResult::PutDuringInit;
        }
        if state.queued_ops.len() >= MAX_QUEUED_OPS_PER_CONTRACT {
            return InitCheckResult::QueueFull;
        }
        state.queued_ops.push(QueuedOperation {
            update,
            related_contracts,
            queued_at_nanos: now_nanos,
        });
        InitCheckResult::Queued {
            queue_size: state.queued_ops.len(),
        }
    }

    /// Returns `true` while the tracker holds an entry for `key`.
    // Within the visible code this is only called from the unit tests.
    #[allow(dead_code)]
    pub fn is_initializing(&self, key: &ContractKey) -> bool {
        self.states.contains_key(key)
    }

    /// Starts tracking `key`, recording `now_nanos` as its start time and
    /// giving it an empty queue.
    ///
    /// If `key` is already tracked, its entry is replaced: the start time is
    /// reset and any operations queued so far are dropped.
    ///
    /// # Errors
    ///
    /// Returns [`InitTrackerError::TooManyConcurrentInits`] when the tracker
    /// already holds [`MAX_CONCURRENT_INITIALIZATIONS`] entries. The check runs
    /// before the insert and does not look at whether `key` is already tracked.
    pub fn start_initialization(
        &mut self,
        key: ContractKey,
        now_nanos: u64,
    ) -> Result<(), InitTrackerError> {
        if self.states.len() >= MAX_CONCURRENT_INITIALIZATIONS {
            return Err(InitTrackerError::TooManyConcurrentInits {
                limit: MAX_CONCURRENT_INITIALIZATIONS,
            });
        }
        self.states.insert(
            key,
            InitState {
                queued_ops: Vec::new(),
                started_at_nanos: now_nanos,
            },
        );
        Ok(())
    }

    /// Stops tracking `key` and hands back everything that was queued for it.
    ///
    /// Returns `None` when `key` is not tracked. `init_duration` is
    /// `now_nanos` minus the recorded start time, saturating at zero if
    /// `now_nanos` is the smaller of the two.
    pub fn complete_initialization(
        &mut self,
        key: &ContractKey,
        now_nanos: u64,
    ) -> Option<InitCompletionInfo> {
        self.states.remove(key).map(|state| {
            let elapsed_nanos = now_nanos.saturating_sub(state.started_at_nanos);
            InitCompletionInfo {
                queued_ops: state.queued_ops,
                init_duration: Duration::from_nanos(elapsed_nanos),
            }
        })
    }

    /// Stops tracking `key` and discards its queued operations.
    ///
    /// Returns the number of operations that were discarded, or `None` when
    /// `key` is not tracked.
    pub fn fail_initialization(&mut self, key: &ContractKey) -> Option<usize> {
        self.states.remove(key).map(|state| state.queued_ops.len())
    }

    /// Number of operations currently queued for `key`; `0` when `key` is not
    /// tracked.
    // Within the visible code this is only called from the unit tests.
    #[allow(dead_code)]
    pub fn queued_count(&self, key: &ContractKey) -> usize {
        self.states
            .get(key)
            .map(|s| s.queued_ops.len())
            .unwrap_or(0)
    }

    /// Evicts every entry whose age is strictly greater than `max_age` and
    /// reports what was evicted.
    ///
    /// An entry's age is `now_nanos` minus its start time (saturating at
    /// zero). Queued operations of evicted entries are dropped; their count is
    /// reported in [`StaleInitInfo::dropped_ops`]. The order of the returned
    /// vector follows `HashMap` iteration order and is therefore unspecified.
    pub fn cleanup_stale_initializations(
        &mut self,
        max_age: Duration,
        now_nanos: u64,
    ) -> Vec<StaleInitInfo> {
        // `as_nanos` yields a `u128`. Clamp before narrowing so that a very
        // large `max_age` means "never stale" instead of wrapping around to a
        // small threshold (a plain `as u64` turns e.g. 2^55 seconds into 0).
        let max_age_nanos = max_age.as_nanos().min(u128::from(u64::MAX)) as u64;

        let mut evicted = Vec::new();
        self.states.retain(|key, state| {
            let age_nanos = now_nanos.saturating_sub(state.started_at_nanos);
            if age_nanos <= max_age_nanos {
                return true;
            }
            evicted.push(StaleInitInfo {
                key: *key,
                age: Duration::from_nanos(age_nanos),
                dropped_ops: state.queued_ops.len(),
            });
            false
        });
        evicted
    }

    /// Number of contracts currently tracked.
    // Within the visible code this is only called from the unit tests.
    #[allow(dead_code)]
    pub fn initializing_count(&self) -> usize {
        self.states.len()
    }

    /// How long `op` has been waiting as of `now_nanos`, saturating at zero
    /// when `now_nanos` precedes the time the operation was queued.
    pub fn queue_wait_duration(op: &QueuedOperation, now_nanos: u64) -> Duration {
        Duration::from_nanos(now_nanos.saturating_sub(op.queued_at_nanos))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ContractKey` from the given code bytes and the fixed
    /// parameters `[5, 6, 7, 8]`.
    fn make_contract_key_with_code(code_bytes: &[u8]) -> ContractKey {
        let code = ContractCode::from(code_bytes.to_vec());
        let params = Parameters::from(vec![5, 6, 7, 8]);
        ContractKey::from_params_and_code(&params, &code)
    }

    /// The key shared by most tests: code `[1, 2, 3, 4]`, parameters `[5, 6, 7, 8]`.
    fn make_test_key() -> ContractKey {
        make_contract_key_with_code(&[1, 2, 3, 4])
    }

    /// Wraps raw bytes in a `WrappedState`.
    fn make_test_state(data: &[u8]) -> WrappedState {
        WrappedState::new(data.to_vec())
    }

    /// An untracked key is reported as `NotInitializing`.
    #[test]
    fn test_not_initializing_returns_not_initializing() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        let state = make_test_state(&[1, 2, 3]);
        let result = tracker.check_and_maybe_queue(
            &key,
            false,
            Either::Left(state),
            RelatedContracts::default(),
            1_000_000,
        );
        assert!(matches!(result, InitCheckResult::NotInitializing));
    }

    /// `has_code == true` on a tracked key yields `PutDuringInit`.
    #[test]
    fn test_put_during_init_returns_error() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        let state = make_test_state(&[1, 2, 3]);
        let result = tracker.check_and_maybe_queue(
            &key,
            true, // has_code
            Either::Left(state),
            RelatedContracts::default(),
            2_000_000,
        );
        assert!(matches!(result, InitCheckResult::PutDuringInit));
    }

    /// `has_code == false` on a tracked key queues the operation.
    #[test]
    fn test_update_during_init_is_queued() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        let state = make_test_state(&[1, 2, 3]);
        let result = tracker.check_and_maybe_queue(
            &key,
            false, // has_code
            Either::Left(state),
            RelatedContracts::default(),
            2_000_000,
        );
        assert!(matches!(result, InitCheckResult::Queued { queue_size: 1 }));
        assert_eq!(tracker.queued_count(&key), 1);
    }

    /// Each queued operation bumps the reported queue size by one.
    #[test]
    fn test_multiple_updates_queued() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        for i in 0..3 {
            let state = make_test_state(&[i]);
            let now = 2_000_000 + (i as u64) * 1_000_000;
            let result = tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                now,
            );
            assert!(matches!(
                result,
                InitCheckResult::Queued { queue_size } if queue_size == (i as usize + 1)
            ));
        }
        assert_eq!(tracker.queued_count(&key), 3);
    }

    /// Completing hands back the queued operations and the elapsed time.
    #[test]
    fn test_complete_initialization_returns_queued_ops() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        for i in 0..2 {
            let state = make_test_state(&[i]);
            tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                2_000_000 + (i as u64) * 1_000_000,
            );
        }
        let completion = tracker.complete_initialization(&key, 10_000_000).unwrap();
        assert_eq!(completion.queued_ops.len(), 2);
        assert_eq!(completion.init_duration, Duration::from_nanos(9_000_000));
        assert!(!tracker.is_initializing(&key));
    }

    /// Failing reports how many queued operations were discarded.
    #[test]
    fn test_fail_initialization_drops_queued_ops() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        for i in 0..3 {
            let state = make_test_state(&[i]);
            tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                2_000_000 + (i as u64) * 1_000_000,
            );
        }
        let dropped_count = tracker.fail_initialization(&key).unwrap();
        assert_eq!(dropped_count, 3);
        assert!(!tracker.is_initializing(&key));
    }

    /// Completing an untracked key returns `None`.
    #[test]
    fn test_complete_nonexistent_returns_none() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        assert!(tracker.complete_initialization(&key, 1_000_000).is_none());
    }

    /// Failing an untracked key returns `None`.
    #[test]
    fn test_fail_nonexistent_returns_none() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        assert!(tracker.fail_initialization(&key).is_none());
    }

    /// `is_initializing` follows the start/complete lifecycle.
    #[test]
    fn test_is_initializing() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        assert!(!tracker.is_initializing(&key));
        tracker.start_initialization(key, 1_000_000).unwrap();
        assert!(tracker.is_initializing(&key));
        tracker.complete_initialization(&key, 2_000_000);
        assert!(!tracker.is_initializing(&key));
    }

    /// Delta updates (`Either::Right`) are queued just like full states.
    #[test]
    fn test_delta_update_can_be_queued() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        let delta = StateDelta::from(vec![10, 20, 30]);
        let result = tracker.check_and_maybe_queue(
            &key,
            false,
            Either::Right(delta),
            RelatedContracts::default(),
            2_000_000,
        );
        assert!(matches!(result, InitCheckResult::Queued { queue_size: 1 }));
        let completion = tracker.complete_initialization(&key, 3_000_000).unwrap();
        assert!(matches!(completion.queued_ops[0].update, Either::Right(_)));
    }

    /// An entry older than `max_age` is evicted together with its queue.
    #[test]
    fn test_cleanup_stale_removes_old_entries() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 0).unwrap();
        let state = make_test_state(&[1, 2, 3]);
        tracker.check_and_maybe_queue(
            &key,
            false,
            Either::Left(state),
            RelatedContracts::default(),
            1_000_000,
        );
        // 31 s after the start, with a 30 s limit.
        let stale = tracker.cleanup_stale_initializations(Duration::from_secs(30), 31_000_000_000);
        assert_eq!(stale.len(), 1);
        assert_eq!(stale[0].key, key);
        assert_eq!(stale[0].dropped_ops, 1);
        assert!(!tracker.is_initializing(&key));
    }

    /// An entry younger than `max_age` is left alone.
    #[test]
    fn test_cleanup_stale_keeps_fresh_entries() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        let start_nanos = 100_000_000_000;
        tracker.start_initialization(key, start_nanos).unwrap();
        let stale = tracker
            .cleanup_stale_initializations(Duration::from_secs(3600), start_nanos + 1_000_000_000);
        assert!(stale.is_empty());
        assert!(tracker.is_initializing(&key));
    }

    /// With a zero `max_age`, every entry with a positive age is evicted.
    #[test]
    fn test_cleanup_stale_multiple_contracts() {
        let mut tracker = ContractInitTracker::new();
        let key1 = make_contract_key_with_code(&[1]);
        let key2 = make_contract_key_with_code(&[2]);
        let key3 = make_contract_key_with_code(&[3]);
        tracker.start_initialization(key1, 0).unwrap();
        tracker.start_initialization(key2, 1_000_000).unwrap();
        tracker.start_initialization(key3, 2_000_000).unwrap();
        assert_eq!(tracker.initializing_count(), 3);
        let stale = tracker.cleanup_stale_initializations(Duration::ZERO, 10_000_000);
        assert_eq!(stale.len(), 3);
        assert_eq!(tracker.initializing_count(), 0);
    }

    /// `initializing_count` follows starts and completions.
    #[test]
    fn test_initializing_count() {
        let mut tracker = ContractInitTracker::new();
        assert_eq!(tracker.initializing_count(), 0);
        let key1 = make_contract_key_with_code(&[1]);
        let key2 = make_contract_key_with_code(&[2]);
        tracker.start_initialization(key1, 1_000_000).unwrap();
        assert_eq!(tracker.initializing_count(), 1);
        tracker.start_initialization(key2, 2_000_000).unwrap();
        assert_eq!(tracker.initializing_count(), 2);
        tracker.complete_initialization(&key1, 3_000_000);
        assert_eq!(tracker.initializing_count(), 1);
    }

    /// Wait time is the difference between "now" and the queue timestamp.
    #[test]
    fn test_queue_wait_duration() {
        let op = QueuedOperation {
            update: Either::Left(make_test_state(&[1])),
            related_contracts: RelatedContracts::default(),
            queued_at_nanos: 5_000_000_000,
        };
        let wait = ContractInitTracker::queue_wait_duration(&op, 8_000_000_000);
        assert_eq!(wait, Duration::from_secs(3));
    }

    /// The reported init duration depends only on the timestamps passed in.
    #[test]
    fn test_init_duration_deterministic() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        let start = 1_000_000_000;
        let end = 1_500_000_000;
        tracker.start_initialization(key, start).unwrap();
        let info = tracker.complete_initialization(&key, end).unwrap();
        assert_eq!(info.init_duration, Duration::from_millis(500));
    }

    /// Replaying the same sequence with the same timestamps gives the same result.
    #[test]
    fn test_fully_deterministic_sequence() {
        let run = || {
            let mut tracker = ContractInitTracker::new();
            let key = make_test_key();
            tracker.start_initialization(key, 100).unwrap();
            let state = make_test_state(&[42]);
            tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                200,
            );
            let stale = tracker.cleanup_stale_initializations(Duration::from_secs(1), 300);
            assert!(stale.is_empty(), "should not be stale yet");
            let completion = tracker.complete_initialization(&key, 400).unwrap();
            (completion.init_duration, completion.queued_ops.len())
        };
        let (dur1, ops1) = run();
        let (dur2, ops2) = run();
        assert_eq!(dur1, dur2);
        assert_eq!(ops1, ops2);
        assert_eq!(dur1, Duration::from_nanos(300));
    }

    /// Stale cleanup is reproducible for identical explicit timestamps.
    #[test]
    fn test_deterministic_stale_cleanup_with_explicit_time() {
        let run = |start: u64, queue_time: u64, cleanup_time: u64, max_age_secs: u64| {
            let mut tracker = ContractInitTracker::new();
            let key = make_test_key();
            tracker.start_initialization(key, start).unwrap();
            let state = make_test_state(&[99]);
            tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                queue_time,
            );
            let stale = tracker
                .cleanup_stale_initializations(Duration::from_secs(max_age_secs), cleanup_time);
            (stale.len(), tracker.is_initializing(&key))
        };

        // 5 s old with a 10 s limit: kept.
        let (stale1, init1) = run(0, 1_000_000_000, 5_000_000_000, 10);
        let (stale2, init2) = run(0, 1_000_000_000, 5_000_000_000, 10);
        assert_eq!((stale1, init1), (0, true));
        assert_eq!((stale1, init1), (stale2, init2));

        // 15 s old with a 10 s limit: evicted.
        let (stale3, init3) = run(0, 1_000_000_000, 15_000_000_000, 10);
        let (stale4, init4) = run(0, 1_000_000_000, 15_000_000_000, 10);
        assert_eq!((stale3, init3), (1, false));
        assert_eq!((stale3, init3), (stale4, init4));
    }

    /// `queue_wait_duration` is a pure function and saturates at zero.
    #[test]
    fn test_queue_wait_duration_is_pure() {
        let make_op = |queued_at: u64| QueuedOperation {
            update: Either::Left(make_test_state(&[1])),
            related_contracts: RelatedContracts::default(),
            queued_at_nanos: queued_at,
        };
        let op = make_op(1_000_000_000);
        let d1 = ContractInitTracker::queue_wait_duration(&op, 3_000_000_000);
        let d2 = ContractInitTracker::queue_wait_duration(&op, 3_000_000_000);
        assert_eq!(d1, d2);
        assert_eq!(d1, Duration::from_secs(2));
        // "Now" earlier than the queue timestamp saturates to zero.
        let d3 = ContractInitTracker::queue_wait_duration(&op, 500_000_000);
        assert_eq!(d3, Duration::ZERO);
    }

    /// The per-contract queue accepts exactly `MAX_QUEUED_OPS_PER_CONTRACT`
    /// operations and rejects the next one without growing.
    #[test]
    fn test_queue_overflow() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        tracker.start_initialization(key, 1_000_000).unwrap();
        for i in 0..MAX_QUEUED_OPS_PER_CONTRACT {
            let state = make_test_state(&[i as u8]);
            let result = tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                2_000_000 + i as u64,
            );
            assert!(
                matches!(result, InitCheckResult::Queued { .. }),
                "Expected Queued at index {i}, got {:?}",
                result
            );
        }
        assert_eq!(tracker.queued_count(&key), MAX_QUEUED_OPS_PER_CONTRACT);
        let state = make_test_state(&[255]);
        let result = tracker.check_and_maybe_queue(
            &key,
            false,
            Either::Left(state),
            RelatedContracts::default(),
            3_000_000,
        );
        assert!(
            matches!(result, InitCheckResult::QueueFull),
            "Expected QueueFull after limit, got {:?}",
            result
        );
        assert_eq!(tracker.queued_count(&key), MAX_QUEUED_OPS_PER_CONTRACT);
    }

    /// The tracker accepts exactly `MAX_CONCURRENT_INITIALIZATIONS` distinct
    /// keys and rejects the next one without growing.
    #[test]
    fn test_concurrent_init_limit() {
        let mut tracker = ContractInitTracker::new();
        for i in 0..MAX_CONCURRENT_INITIALIZATIONS {
            // Vary both code and parameters so every key is distinct.
            let code = ContractCode::from(vec![i as u8, (i >> 8) as u8, 0, 1]);
            let params = Parameters::from(vec![0, 0, 0, i as u8]);
            let key = ContractKey::from_params_and_code(&params, &code);
            tracker
                .start_initialization(key, 1_000_000 + i as u64)
                .unwrap_or_else(|_| panic!("Expected Ok at index {i}"));
        }
        assert_eq!(tracker.initializing_count(), MAX_CONCURRENT_INITIALIZATIONS);
        let overflow_code = ContractCode::from(vec![99, 99, 99, 99]);
        let overflow_params = Parameters::from(vec![99, 99, 99, 99]);
        let overflow_key = ContractKey::from_params_and_code(&overflow_params, &overflow_code);
        let result = tracker.start_initialization(overflow_key, 9_999_999);
        assert!(
            matches!(result, Err(InitTrackerError::TooManyConcurrentInits { .. })),
            "Expected TooManyConcurrentInits, got {:?}",
            result
        );
        assert_eq!(tracker.initializing_count(), MAX_CONCURRENT_INITIALIZATIONS);
    }

    /// Queueing fewer operations than the limit works end to end.
    #[test]
    fn test_normal_ops_within_limit_work() {
        let mut tracker = ContractInitTracker::new();
        let key = make_test_key();
        const N: usize = 10;
        // Compile-time guard: this test only makes sense below the limit.
        const { assert!(N < MAX_QUEUED_OPS_PER_CONTRACT) };
        tracker.start_initialization(key, 1_000_000).unwrap();
        for i in 0..N {
            let state = make_test_state(&[i as u8]);
            let result = tracker.check_and_maybe_queue(
                &key,
                false,
                Either::Left(state),
                RelatedContracts::default(),
                2_000_000 + i as u64,
            );
            assert!(
                matches!(result, InitCheckResult::Queued { queue_size } if queue_size == i + 1),
                "Expected Queued {{ queue_size: {} }}, got {:?}",
                i + 1,
                result
            );
        }
        assert_eq!(tracker.queued_count(&key), N);
        let completion = tracker.complete_initialization(&key, 10_000_000).unwrap();
        assert_eq!(completion.queued_ops.len(), N);
        assert!(!tracker.is_initializing(&key));
    }
}