pub use crate::storage::faulty::Config as FaultConfig;
use crate::{
network::{
audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
metered::Network as MeteredNetwork,
},
storage::{
audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
memory::Storage as MemStorage, metered::Storage as MeteredStorage,
},
telemetry::metrics::task::Label,
utils::{
add_attribute,
signal::{Signal, Stopper},
supervision::Tree,
Panicker, Registry, ScopeGuard,
},
validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
};
#[cfg(feature = "external")]
use crate::{Blocker, Pacer};
use commonware_codec::Encode;
use commonware_macros::select;
use commonware_parallel::ThreadPool;
use commonware_utils::{
hex,
sync::{Mutex, RwLock},
time::SYSTEM_TIME_PRECISION,
SystemTimeExt,
};
#[cfg(feature = "external")]
use futures::task::noop_waker;
use futures::{
future::Either,
task::{waker, ArcWake},
Future,
};
use governor::clock::{Clock as GClock, ReasonablyRealtime};
#[cfg(feature = "external")]
use pin_project::pin_project;
use prometheus_client::{
metrics::{counter::Counter, family::Family, gauge::Gauge},
registry::{Metric, Registry as PrometheusRegistry},
};
use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
use rand_core::CryptoRngCore;
use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
use sha2::{Digest as _, Sha256};
use std::{
borrow::Cow,
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
mem::{replace, take},
net::{IpAddr, SocketAddr},
num::NonZeroUsize,
panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
pin::Pin,
sync::{Arc, Weak},
task::{self, Poll, Waker},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::{info_span, trace, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
/// Runtime-level metrics tracked by the deterministic executor and registered
/// under the runtime's Prometheus registry (see [`Metrics::init`]).
#[derive(Debug)]
struct Metrics {
    /// Total number of scheduler loop iterations executed.
    iterations: Counter,
    /// Total tasks spawned, partitioned by task label.
    tasks_spawned: Family<Label, Counter>,
    /// Tasks currently running, partitioned by task label.
    tasks_running: Family<Label, Gauge>,
    /// Total number of individual task polls, partitioned by task label.
    task_polls: Family<Label, Counter>,
    /// Total bytes sent over the simulated network (registered as "bandwidth").
    network_bandwidth: Counter,
}
impl Metrics {
    /// Create the runtime metrics and register each one (with a short help
    /// string) on the provided registry. Registration order matches the
    /// original: iterations, tasks_spawned, tasks_running, task_polls,
    /// then bandwidth.
    pub fn init(registry: &mut PrometheusRegistry) -> Self {
        let iterations = Counter::default();
        let tasks_spawned = Family::default();
        let tasks_running = Family::default();
        let task_polls = Family::default();
        let network_bandwidth = Counter::default();
        registry.register(
            "iterations",
            "Total number of iterations",
            iterations.clone(),
        );
        registry.register(
            "tasks_spawned",
            "Total number of tasks spawned",
            tasks_spawned.clone(),
        );
        registry.register(
            "tasks_running",
            "Number of tasks currently running",
            tasks_running.clone(),
        );
        registry.register(
            "task_polls",
            "Total number of task polls",
            task_polls.clone(),
        );
        // Note: the metric is exposed as "bandwidth", not "network_bandwidth".
        registry.register(
            "bandwidth",
            "Total amount of data sent over network",
            network_bandwidth.clone(),
        );
        Self {
            iterations,
            tasks_spawned,
            tasks_running,
            task_polls,
            network_bandwidth,
        }
    }
}
/// A 32-byte SHA-256 digest used to fingerprint runtime activity.
type Digest = [u8; 32];
/// Records a running hash of every audited runtime event (task scheduling,
/// RNG use, network/storage activity, etc.) so two executions can be
/// compared for determinism by comparing their digests.
pub struct Auditor {
    /// Current cumulative digest; each event folds into it (see `event`).
    digest: Mutex<Digest>,
}
impl Default for Auditor {
fn default() -> Self {
Self {
digest: Digest::default().into(),
}
}
}
impl Auditor {
    /// Fold an event into the running digest: the new digest is
    /// SHA-256(old digest || label || payload-provided bytes).
    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
    where
        F: FnOnce(&mut Sha256),
    {
        let mut hasher = Sha256::new();
        let mut state = self.digest.lock();
        hasher.update(state.as_ref());
        hasher.update(label);
        payload(&mut hasher);
        *state = hasher.finalize().into();
    }
    /// Return the current digest as a hex string.
    pub fn state(&self) -> String {
        hex(self.digest.lock().as_ref())
    }
}
/// A boxed, sendable cryptographic RNG used to seed the deterministic runtime.
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
/// Configuration for the deterministic runtime (see [`Config::new`] for defaults).
pub struct Config {
    /// Source of all runtime randomness (scheduling, faults, user RNG calls).
    rng: BoxDynRng,
    /// Amount of simulated time advanced per scheduler iteration.
    cycle: Duration,
    /// Simulated wall-clock time at startup.
    start_time: SystemTime,
    /// Optional simulated-time budget; exceeding it panics ("runtime timeout").
    timeout: Option<Duration>,
    /// Whether spawned-task panics are caught instead of aborting the run.
    catch_panics: bool,
    /// Fault-injection configuration for the faulty storage layer.
    storage_fault_cfg: FaultConfig,
    /// Buffer-pool configuration for the simulated network.
    network_buffer_pool_cfg: BufferPoolConfig,
    /// Buffer-pool configuration for the simulated storage.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
impl Config {
    /// Create a configuration with defaults: seed 42, 1ms cycle, start at the
    /// unix epoch, no timeout, panics not caught, default fault config.
    ///
    /// Under miri, buffer pools are capped (32 per class) to bound memory.
    pub fn new() -> Self {
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                let network_buffer_pool_cfg = BufferPoolConfig::for_network()
                    .with_max_per_class(commonware_utils::NZUsize!(32))
                    .with_thread_cache_disabled();
                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
                    .with_max_per_class(commonware_utils::NZUsize!(32))
                    .with_thread_cache_disabled();
            } else {
                let network_buffer_pool_cfg =
                    BufferPoolConfig::for_network().with_thread_cache_disabled();
                let storage_buffer_pool_cfg =
                    BufferPoolConfig::for_storage().with_thread_cache_disabled();
            }
        }
        Self {
            rng: Box::new(StdRng::seed_from_u64(42)),
            cycle: Duration::from_millis(1),
            start_time: UNIX_EPOCH,
            timeout: None,
            catch_panics: false,
            storage_fault_cfg: FaultConfig::default(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        }
    }
    /// Replace the RNG with a `StdRng` seeded from `seed`.
    pub fn with_seed(self, seed: u64) -> Self {
        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
    }
    /// Replace the RNG with an arbitrary boxed CSPRNG.
    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
        self.rng = rng;
        self
    }
    /// Set the simulated time advanced per scheduler iteration.
    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Set the simulated start time.
    pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = start_time;
        self
    }
    /// Set (or clear) the simulated-time timeout.
    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }
    /// Set whether task panics are caught rather than propagated.
    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
        self.catch_panics = catch_panics;
        self
    }
    /// Override the network buffer-pool configuration.
    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.network_buffer_pool_cfg = cfg;
        self
    }
    /// Override the storage buffer-pool configuration.
    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.storage_buffer_pool_cfg = cfg;
        self
    }
    /// Override the storage fault-injection configuration.
    pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
        self.storage_fault_cfg = faults;
        self
    }
    /// Simulated time advanced per iteration.
    pub const fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Simulated start time.
    pub const fn start_time(&self) -> SystemTime {
        self.start_time
    }
    /// Simulated-time timeout, if any.
    pub const fn timeout(&self) -> Option<Duration> {
        self.timeout
    }
    /// Whether task panics are caught.
    pub const fn catch_panics(&self) -> bool {
        self.catch_panics
    }
    /// Network buffer-pool configuration.
    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.network_buffer_pool_cfg
    }
    /// Storage buffer-pool configuration.
    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.storage_buffer_pool_cfg
    }
    /// Validate invariants, panicking on violation.
    ///
    /// Checked in order (tests rely on these exact messages and ordering):
    /// a zero cycle is only allowed without a timeout, the cycle must be at
    /// least the system time precision, and the start time must not precede
    /// the unix epoch.
    pub fn assert(&self) {
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
        assert!(
            self.cycle >= SYSTEM_TIME_PRECISION,
            "cycle duration must be greater than or equal to system time precision"
        );
        assert!(
            self.start_time >= UNIX_EPOCH,
            "start time must be greater than or equal to unix epoch"
        );
    }
}
impl Default for Config {
fn default() -> Self {
Self::new()
}
}
/// Key identifying a registered metric: prefixed name plus attribute pairs.
type MetricKey = (String, Vec<(String, String)>);
/// Shared state backing a deterministic runtime execution.
pub struct Executor {
    /// Prometheus registry for all runtime and user metrics.
    registry: Mutex<Registry>,
    /// Set of already-registered metric keys, used to reject duplicates.
    registered_metrics: Mutex<HashSet<MetricKey>>,
    /// Simulated time advanced per scheduler iteration.
    cycle: Duration,
    /// Simulated time at which the run panics with "runtime timeout".
    deadline: Option<SystemTime>,
    /// Runtime metrics (iterations, polls, etc.).
    metrics: Arc<Metrics>,
    /// Determinism auditor fed by all runtime events.
    auditor: Arc<Auditor>,
    /// Seeded RNG driving scheduling order and user randomness.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Task registry and ready queue.
    tasks: Arc<Tasks>,
    /// Min-heap of pending sleepers ordered by wake time (see `Alarm`).
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Shutdown signal shared with `stop`/`stopped`.
    shutdown: Mutex<Stopper>,
    /// Panic-handling policy for spawned tasks.
    panicker: Panicker,
    /// Simulated DNS table used by the resolver.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
impl Executor {
    /// Advance simulated time by one cycle and return the new time.
    ///
    /// Panics if simulated time overflows. With the "external" feature the
    /// host thread also sleeps for one cycle so simulated time tracks real
    /// time.
    fn advance_time(&self) -> SystemTime {
        #[cfg(feature = "external")]
        std::thread::sleep(self.cycle);
        let mut time = self.time.lock();
        *time = time
            .checked_add(self.cycle)
            .expect("executor time overflowed");
        let now = *time;
        trace!(now = now.epoch_millis(), "time advanced");
        now
    }
    /// If no task is ready, jump simulated time forward to the earliest
    /// pending alarm (if it is in the future) and return the new time;
    /// otherwise return `current` unchanged. Disabled under the "external"
    /// feature, where time must track real time.
    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return current;
        }
        let mut skip_until = None;
        {
            // Peek under a short-lived lock; the time lock is taken after.
            let sleeping = self.sleeping.lock();
            if let Some(next) = sleeping.peek() {
                if next.time > current {
                    skip_until = Some(next.time);
                }
            }
        }
        skip_until.map_or(current, |deadline| {
            let mut time = self.time.lock();
            *time = deadline;
            let now = *time;
            trace!(now = now.epoch_millis(), "time skipped");
            now
        })
    }
    /// Wake every sleeper whose deadline is at or before `current`.
    /// The heap is a min-heap on wake time, so popping stops at the first
    /// still-future alarm.
    fn wake_ready_sleepers(&self, current: SystemTime) {
        let mut sleeping = self.sleeping.lock();
        while let Some(next) = sleeping.peek() {
            if next.time <= current {
                let sleeper = sleeping.pop().unwrap();
                sleeper.waker.wake();
            } else {
                break;
            }
        }
    }
    /// Panic with "runtime stalled" when no task is ready to run (nothing
    /// can ever make progress again). No-op under the "external" feature,
    /// where external events may still arrive.
    fn assert_liveness(&self) {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return;
        }
        panic!("runtime stalled");
    }
}
/// State captured when a run completes via `Runner::start_and_recover`,
/// sufficient to resume a follow-up run with the same auditor, RNG, time,
/// storage contents, and DNS table.
pub struct Checkpoint {
    cycle: Duration,
    deadline: Option<SystemTime>,
    auditor: Arc<Auditor>,
    rng: Arc<Mutex<BoxDynRng>>,
    time: Mutex<SystemTime>,
    /// Storage is carried over as-is so synced data persists across runs.
    storage: Arc<Storage>,
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    catch_panics: bool,
    network_buffer_pool_cfg: BufferPoolConfig,
    storage_buffer_pool_cfg: BufferPoolConfig,
}
impl Checkpoint {
    /// The auditor carried across the checkpoint (its digest includes all
    /// events from the previous run).
    pub fn auditor(&self) -> Arc<Auditor> {
        self.auditor.clone()
    }
}
/// How a `Runner` was created: from a fresh `Config` or a prior `Checkpoint`.
#[allow(clippy::large_enum_variant)]
enum State {
    Config(Config),
    Checkpoint(Checkpoint),
}
/// Entry point for executing futures on the deterministic runtime.
pub struct Runner {
    state: State,
}
impl From<Config> for Runner {
    /// Validates the config (may panic) — equivalent to `Runner::new`.
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}
impl From<Checkpoint> for Runner {
    /// Resume from a checkpoint; no validation needed (the original config
    /// was validated when the first run started).
    fn from(checkpoint: Checkpoint) -> Self {
        Self {
            state: State::Checkpoint(checkpoint),
        }
    }
}
impl Runner {
    /// Create a runner from a config, panicking if the config is invalid.
    pub fn new(cfg: Config) -> Self {
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }
    /// Convenience: default config with the given RNG seed.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }
    /// Convenience: default config with a simulated-time timeout.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }
    /// Run `f` to completion on the deterministic scheduler and return its
    /// output together with a [`Checkpoint`] that can seed a follow-up run.
    ///
    /// The scheduler loop: check the deadline, drain the ready queue,
    /// shuffle it with the seeded RNG (this is where scheduling
    /// nondeterminism is made reproducible), poll each task, then advance
    /// simulated time, skip idle time, wake sleepers, and assert liveness.
    ///
    /// # Panics
    /// Re-raises any panic from the loop (e.g. "runtime timeout",
    /// "runtime stalled", or an uncaught task panic) — but only after
    /// tearing down tasks/sleepers so resources are released first.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };
        // Captured up front: `context` is consumed by `f` below, and these
        // are needed to build the checkpoint afterwards.
        let storage = context.storage.clone();
        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();
        // The root future also completes early if a caught panic interrupts it.
        let mut root = Box::pin(panicked.interrupt(f(context)));
        Tasks::register_root(&executor.tasks);
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            {
                let current = executor.time.lock();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Drop the time lock before unwinding.
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }
            let mut queue = executor.tasks.drain();
            if queue.len() > 1 {
                // Seeded shuffle: randomized yet reproducible task order.
                let mut rng = executor.rng.lock();
                queue.shuffle(&mut *rng);
            }
            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // The task may have been removed since it was queued.
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };
                // Every poll is audited so scheduling order affects the digest.
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");
                // Waker holds only a Weak ref so a leaked waker cannot keep
                // the task registry alive.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);
                match &task.mode {
                    Mode::Root => {
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        let mut fut_opt = future.lock();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");
                            executor.tasks.remove(id);
                            continue;
                        };
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");
                            executor.tasks.remove(id);
                            // Drop the future eagerly so its resources free now.
                            *fut_opt = None;
                            continue;
                        }
                    }
                }
                trace!(id, "task is still pending");
            }
            if let Some(output) = output {
                break output;
            }
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();
            executor.metrics.iterations.inc();
        }));
        // Teardown runs whether the loop finished or panicked: clear alarms
        // and drop every outstanding task future before deciding the outcome.
        executor.sleeping.lock().clear();
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock() = None;
        }
        drop(root);
        // All wakers hold Weak refs; none should remain after teardown.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };
        // Teardown released every other strong ref, so unwrapping is safe.
        let executor = Arc::into_inner(executor).expect("executor still has strong references");
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        };
        (output, checkpoint)
    }
}
impl Default for Runner {
fn default() -> Self {
Self::new(Config::default())
}
}
impl crate::Runner for Runner {
    type Context = Context;
    /// Run `f` to completion, discarding the recovery checkpoint.
    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future,
    {
        self.start_and_recover(f).0
    }
}
/// How a task is polled by the scheduler loop.
enum Mode {
    /// The root future passed to the runner (polled directly by the loop).
    Root,
    /// A spawned task; `None` once complete so the future drops eagerly.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}
/// A scheduled unit of work tracked by [`Tasks`].
struct Task {
    /// Monotonically increasing id assigned at registration.
    id: u128,
    /// Metrics/audit label for the task.
    label: Label,
    mode: Mode,
}
/// Waker that re-queues its task when woken. Holds a Weak ref so a leaked
/// waker cannot keep the task registry (and thus the executor) alive.
struct TaskWaker {
    id: u128,
    tasks: Weak<Tasks>,
}
impl ArcWake for TaskWaker {
    /// Re-queue the task if the registry still exists; waking after the
    /// executor is torn down is a silent no-op.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        let Some(tasks) = arc_self.tasks.upgrade() else {
            return;
        };
        tasks.queue(arc_self.id);
    }
}
/// Registry of all live tasks plus the queue of task ids ready to poll.
struct Tasks {
    /// Next task id to assign (monotonic; panics on overflow).
    counter: Mutex<u128>,
    /// Ids queued for the next scheduler iteration (may contain duplicates
    /// or ids of since-removed tasks; the loop tolerates both).
    ready: Mutex<Vec<u128>>,
    /// All registered, not-yet-removed tasks keyed by id.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
impl Tasks {
    /// Empty registry with ids starting at zero.
    const fn new() -> Self {
        Self {
            counter: Mutex::new(0),
            ready: Mutex::new(Vec::new()),
            running: Mutex::new(BTreeMap::new()),
        }
    }
    /// Reserve and return the next task id.
    fn increment(&self) -> u128 {
        let mut counter = self.counter.lock();
        let id = *counter;
        *counter = id.checked_add(1).expect("task counter overflow");
        id
    }
    /// Register the root task (the future handed to the runner).
    fn register_root(arc_self: &Arc<Self>) {
        let id = arc_self.increment();
        arc_self.register(
            id,
            Arc::new(Task {
                id,
                label: Label::root(),
                mode: Mode::Root,
            }),
        );
    }
    /// Register a spawned task wrapping the given future.
    fn register_work(
        arc_self: &Arc<Self>,
        label: Label,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) {
        let id = arc_self.increment();
        arc_self.register(
            id,
            Arc::new(Task {
                id,
                label,
                mode: Mode::Work(Mutex::new(Some(future))),
            }),
        );
    }
    /// Insert the task into the running map and mark it ready to poll.
    fn register(&self, id: u128, task: Arc<Task>) {
        self.running.lock().insert(id, task);
        self.queue(id);
    }
    /// Mark a task ready for the next scheduler iteration.
    fn queue(&self, id: u128) {
        self.ready.lock().push(id);
    }
    /// Take the entire ready queue, leaving an empty queue with the same
    /// capacity hint behind.
    fn drain(&self) -> Vec<u128> {
        let mut ready = self.ready.lock();
        let fresh = Vec::with_capacity(ready.len());
        replace(&mut *ready, fresh)
    }
    /// Number of ids currently queued.
    fn ready(&self) -> usize {
        self.ready.lock().len()
    }
    /// Look up a task by id, if still running.
    fn get(&self, id: u128) -> Option<Arc<Task>> {
        self.running.lock().get(&id).cloned()
    }
    /// Drop a task from the running map.
    fn remove(&self, id: u128) {
        self.running.lock().remove(&id);
    }
    /// Empty the registry, returning every previously running task so the
    /// caller can drop their futures.
    fn clear(&self) -> Vec<Arc<Task>> {
        self.ready.lock().clear();
        let drained = {
            let mut running = self.running.lock();
            take(&mut *running)
        };
        drained.into_values().collect()
    }
}
/// The full simulated network stack: metered over audited over deterministic.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// The full simulated storage stack: metered over audited over faulty over
/// in-memory.
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;
/// A handle into the deterministic runtime passed to user futures; provides
/// spawning, clock, network, storage, metrics, and RNG capabilities.
pub struct Context {
    /// Metrics label prefix (extended by `with_label`).
    name: String,
    /// Metric attributes attached via `with_attribute`.
    attributes: Vec<(String, String)>,
    /// Optional metrics scope; removed from the registry when dropped.
    scope: Option<Arc<ScopeGuard>>,
    /// Weak ref to the executor so contexts cannot outlive the run.
    executor: Weak<Executor>,
    network: Arc<Network>,
    storage: Arc<Storage>,
    network_buffer_pool: BufferPool,
    storage_buffer_pool: BufferPool,
    /// Node in the supervision tree (aborting a parent aborts children).
    tree: Arc<Tree>,
    /// Execution mode hint for the next spawn (reset after each spawn).
    execution: Execution,
    /// Whether the next spawned task is wrapped in a tracing span.
    traced: bool,
}
impl Clone for Context {
    /// Cloning creates a *child* node in the supervision tree (not a copy of
    /// this node) and resets the per-spawn `execution`/`traced` hints; all
    /// other handles are shared.
    fn clone(&self) -> Self {
        let (child, _) = Tree::child(&self.tree);
        Self {
            name: self.name.clone(),
            attributes: self.attributes.clone(),
            scope: self.scope.clone(),
            executor: self.executor.clone(),
            network: self.network.clone(),
            storage: self.storage.clone(),
            network_buffer_pool: self.network_buffer_pool.clone(),
            storage_buffer_pool: self.storage_buffer_pool.clone(),
            tree: child,
            execution: Execution::default(),
            traced: false,
        }
    }
}
impl Context {
    /// Build a fresh executor + root context from a config.
    ///
    /// Wires up: metrics registry (under METRICS_PREFIX), deadline from
    /// start_time + timeout, the storage stack
    /// Metered(Audited(Faulty(Mem))), the network stack
    /// Metered(Audited(Deterministic)), and the panicker.
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = cfg.start_time;
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());
        let rng = Arc::new(Mutex::new(cfg.rng));
        let network_buffer_pool = BufferPool::new(
            cfg.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            cfg.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
        // Faulty storage shares the executor RNG so injected faults are
        // deterministic under the seed.
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(
                    MemStorage::new(storage_buffer_pool.clone()),
                    rng.clone(),
                    storage_fault_config,
                ),
                auditor.clone(),
            ),
            runtime_registry,
        );
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);
        let (panicker, panicked) = Panicker::new(cfg.catch_panics);
        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                // The context holds only a Weak ref; the runner owns the Arc.
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }
    /// Rebuild an executor + root context from a checkpoint.
    ///
    /// Fresh: registry, metrics, network, buffer pools, task/sleep state.
    /// Carried over: auditor, rng, time, dns, and the storage stack (so
    /// previously synced data persists).
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));
        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);
        let network_buffer_pool = BufferPool::new(
            checkpoint.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            checkpoint.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );
        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
        let executor = Arc::new(Executor {
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }
    /// Upgrade the weak executor ref; panics if used after the run ended.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
    /// Runtime metrics handle.
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }
    /// The determinism auditor for this run.
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }
    /// Digest of all storage operations (from the audited storage layer).
    pub fn storage_audit(&self) -> Digest {
        self.storage.inner().inner().inner().audit()
    }
    /// Live handle to the storage fault-injection config (mutable at runtime).
    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }
    /// Register (`Some`) or unregister (`None`) simulated DNS entries for
    /// `host`. The call is audited.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });
        let mut dns = executor.dns.lock();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
impl crate::Spawner for Context {
    /// Mark the next spawn to run in dedicated mode.
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }
    /// Mark the next spawn to run in shared mode (optionally blocking).
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }
    /// Spawn `f` as a new task, returning a handle to its output.
    ///
    /// Consumes this context (the task receives it, re-rooted as a child in
    /// the supervision tree). The `execution`/`traced` hints are one-shot
    /// and reset here. If the parent subtree was already aborted, returns a
    /// closed handle without running `f`.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let (label, metric) = spawn_metrics!(self);
        let parent = Arc::clone(&self.tree);
        let traced = self.traced;
        // One-shot hints: do not leak into the context handed to the task.
        self.execution = Execution::default();
        self.traced = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;
        let executor = self.executor();
        // Optionally wrap the task in a root tracing span carrying this
        // context's attributes.
        let future = if traced {
            let span = info_span!(parent: None, "task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            Either::Left(f(self).instrument(span))
        } else {
            Either::Right(f(self))
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));
        // Register the aborter with the parent so aborting the parent
        // subtree aborts this task too.
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }
        handle
    }
    /// Signal shutdown with `value` and wait for it to resolve, bounded by
    /// an optional (simulated-time) timeout. The call is audited.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // Take the shutdown lock only long enough to trigger the stop.
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock();
            shutdown.stop(value)
        };
        // No timeout => wait forever on a pending future.
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }
    /// A signal that resolves when shutdown is triggered. The call is audited.
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        let stopped = executor.shutdown.lock().stopped();
        stopped
    }
}
impl crate::ThreadPooler for Context {
    /// Build a rayon thread pool whose worker threads are spawned as
    /// dedicated runtime tasks (so the runtime tracks and labels them).
    fn create_thread_pool(
        &self,
        concurrency: NonZeroUsize,
    ) -> Result<ThreadPool, ThreadPoolBuildError> {
        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
        // Reuse the current thread as a worker unless we are already inside
        // a rayon pool (nested use_current_thread is not allowed).
        if rayon::current_thread_index().is_none() {
            builder = builder.use_current_thread()
        }
        builder
            .spawn_handler(move |thread| {
                self.with_label("rayon_thread")
                    .dedicated()
                    .spawn(move |_| async move { thread.run() });
                Ok(())
            })
            .build()
            .map(Arc::new)
    }
}
impl crate::Metrics for Context {
    /// The current label prefix for metrics registered via this context.
    fn label(&self) -> String {
        self.name.clone()
    }
    /// Derive a context whose label is `"{current}_{label}"` (or just
    /// `label` at the root). Panics if the result would collide with the
    /// reserved runtime prefix.
    fn with_label(&self, label: &str) -> Self {
        validate_label(label);
        let name = {
            let prefix = self.name.clone();
            if prefix.is_empty() {
                label.to_string()
            } else {
                format!("{prefix}_{label}")
            }
        };
        assert!(
            !name.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            name,
            ..self.clone()
        }
    }
    /// Derive a context with an additional metric attribute. Panics on a
    /// duplicate key.
    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
        validate_label(key);
        let mut attributes = self.attributes.clone();
        assert!(
            add_attribute(&mut attributes, key, value),
            "duplicate attribute key: {key}"
        );
        Self {
            attributes,
            ..self.clone()
        }
    }
    /// Derive a context with its own metrics scope; the scope is removed
    /// from the registry when the last guard is dropped. Idempotent: if a
    /// scope already exists, returns a plain clone. The call is audited.
    fn with_scope(&self) -> Self {
        let executor = self.executor();
        executor.auditor.event(b"with_scope", |_| {});
        if self.scope.is_some() {
            return self.clone();
        }
        // The guard holds a Weak ref: if the executor is already gone at
        // drop time, there is no registry left to clean up.
        let weak = self.executor.clone();
        let scope_id = executor.registry.lock().create_scope();
        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
            if let Some(exec) = weak.upgrade() {
                exec.registry.lock().remove_scope(id);
            }
        }));
        Self {
            scope: Some(guard),
            ..self.clone()
        }
    }
    /// Derive a context whose next spawned task is wrapped in a tracing span.
    fn with_span(&self) -> Self {
        Self {
            traced: true,
            ..self.clone()
        }
    }
    /// Register a metric under this context's label prefix, attributes, and
    /// scope. Audited; panics on a duplicate (name, attributes) pair.
    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        let name = name.into();
        let help = help.into();
        let executor = self.executor();
        executor.auditor.event(b"register", |hasher| {
            hasher.update(name.as_bytes());
            hasher.update(help.as_bytes());
            for (k, v) in &self.attributes {
                hasher.update(k.as_bytes());
                hasher.update(v.as_bytes());
            }
        });
        let prefixed_name = {
            let prefix = &self.name;
            if prefix.is_empty() {
                name
            } else {
                format!("{}_{}", *prefix, name)
            }
        };
        let metric_key = (prefixed_name.clone(), self.attributes.clone());
        let is_new = executor.registered_metrics.lock().insert(metric_key);
        assert!(
            is_new,
            "duplicate metric: {} with attributes {:?}",
            prefixed_name, self.attributes
        );
        let mut registry = executor.registry.lock();
        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
        // Attach each attribute as a nested registry label.
        let sub_registry = self
            .attributes
            .iter()
            .fold(scoped, |reg, (k, v): &(String, String)| {
                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
            });
        sub_registry.register(prefixed_name, help, metric);
    }
    /// Encode the full registry in Prometheus text format. The call is audited.
    fn encode(&self) -> String {
        let executor = self.executor();
        executor.auditor.event(b"encode", |_| {});
        let encoded = executor.registry.lock().encode();
        encoded
    }
}
/// Future returned by `Clock::sleep`/`sleep_until`: resolves once the
/// executor's simulated time reaches `time`.
struct Sleeper {
    executor: Weak<Executor>,
    /// Simulated wake time.
    time: SystemTime,
    /// Whether this sleeper has already pushed an `Alarm` onto the
    /// executor's heap (so it registers at most once).
    registered: bool,
}
impl Sleeper {
    /// Upgrade the weak executor ref; panics if polled after the run ended.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }
}
/// Heap entry pairing a simulated wake time with the waker to invoke.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}
impl PartialEq for Alarm {
    /// Alarms compare by wake time only; the waker is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.time.eq(&other.time)
    }
}
impl Eq for Alarm {}
impl PartialOrd for Alarm {
    /// Delegates to the total order below.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for Alarm {
    /// Reversed time order so that `BinaryHeap` (a max-heap) pops the
    /// *earliest* alarm first, i.e. behaves as a min-heap on wake time.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
impl Future for Sleeper {
    type Output = ();
    /// Ready once simulated time has reached the wake time; otherwise push
    /// an alarm (once) so the scheduler wakes this task when time catches up.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let executor = self.executor();
        {
            // Scope the time lock so it is released before touching the heap.
            let current_time = *executor.time.lock();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        // Register the alarm only on the first pending poll; later polls
        // reuse it (the stored waker wakes the same task).
        if !self.registered {
            self.registered = true;
            executor.sleeping.lock().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}
impl Clock for Context {
    /// The executor's current simulated time.
    fn current(&self) -> SystemTime {
        let executor = self.executor();
        let now = *executor.time.lock();
        now
    }
    /// Sleep for `duration` of simulated time; panics on time overflow.
    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
        let wake_at = self
            .current()
            .checked_add(duration)
            .expect("overflow when setting wake time");
        self.sleep_until(wake_at)
    }
    /// Sleep until the given simulated deadline (ready immediately if the
    /// deadline has already passed).
    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
        Sleeper {
            time: deadline,
            executor: self.executor.clone(),
            registered: false,
        }
    }
}
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
executor: Weak<Executor>,
target: SystemTime,
#[pin]
future: F,
ready: Option<F::Output>,
started: bool,
registered: bool,
}
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
F: Future + Send,
{
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if !*this.started {
*this.started = true;
let waker = noop_waker();
let mut cx_noop = task::Context::from_waker(&waker);
if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
*this.ready = Some(value);
}
}
let executor = this.executor.upgrade().expect("executor already dropped");
let current_time = *executor.time.lock();
if current_time < *this.target {
if !*this.registered {
*this.registered = true;
executor.sleeping.lock().push(Alarm {
time: *this.target,
waker: cx.waker().clone(),
});
}
return Poll::Pending;
}
if let Some(value) = this.ready.take() {
return Poll::Ready(value);
}
let blocker = Blocker::new();
loop {
let waker = waker(blocker.clone());
let mut cx_block = task::Context::from_waker(&waker);
match this.future.as_mut().poll(&mut cx_block) {
Poll::Ready(value) => {
break Poll::Ready(value);
}
Poll::Pending => blocker.wait(),
}
}
}
}
#[cfg(feature = "external")]
impl Pacer for Context {
fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
where
F: Future<Output = T> + Send + 'a,
T: Send + 'a,
{
let target = self
.executor()
.time
.lock()
.checked_add(latency)
.expect("overflow when setting wake time");
Waiter {
executor: self.executor.clone(),
target,
future,
ready: None,
started: false,
registered: false,
}
}
}
/// Expose the simulated clock to the `governor` rate limiter.
impl GClock for Context {
    type Instant = SystemTime;
    fn now(&self) -> Self::Instant {
        self.current()
    }
}
/// Marker: governor may treat the simulated clock as a realtime clock.
impl ReasonablyRealtime for Context {}
/// Delegate networking to the simulated (metered + audited) network stack.
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }
    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
impl crate::Resolver for Context {
    /// Look up `host` in the simulated DNS table (see `resolver_register`).
    /// Errors with `ResolveFailed` for unknown hosts. The lookup result is
    /// audited (the dns lock is released before hashing).
    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
        let executor = self.executor();
        let dns = executor.dns.lock();
        let result = dns.get(host).cloned();
        drop(dns);
        executor.auditor.event(b"resolve", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(result.encode());
        });
        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
    }
}
/// Deterministic randomness: every call is audited (with the method name)
/// and then served by the executor's seeded RNG.
impl RngCore for Context {
    fn next_u32(&mut self) -> u32 {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"next_u32");
        });
        executor.rng.lock().next_u32()
    }
    fn next_u64(&mut self) -> u64 {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"next_u64");
        });
        executor.rng.lock().next_u64()
    }
    fn fill_bytes(&mut self, dest: &mut [u8]) {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"fill_bytes");
        });
        executor.rng.lock().fill_bytes(dest);
    }
    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
        let executor = self.executor();
        executor.auditor.event(b"rand", |hasher| {
            hasher.update(b"try_fill_bytes");
        });
        executor.rng.lock().try_fill_bytes(dest)
    }
}
/// The underlying RNG is a CSPRNG (`BoxDynRng: CryptoRngCore`).
impl CryptoRng for Context {}
/// Delegate storage to the simulated (metered + audited + faulty) stack.
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: std::ops::RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        self.storage.open_versioned(partition, name, versions).await
    }
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
/// Expose the runtime's shared buffer pools.
impl crate::BufferPooler for Context {
    fn network_buffer_pool(&self) -> &crate::BufferPool {
        &self.network_buffer_pool
    }
    fn storage_buffer_pool(&self) -> &crate::BufferPool {
        &self.storage_buffer_pool
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(feature = "external")]
use crate::FutureExt;
#[cfg(feature = "external")]
use crate::Spawner;
use crate::{deterministic, reschedule, Blob, Metrics, Resolver, Runner as _, Storage};
use commonware_macros::test_traced;
#[cfg(feature = "external")]
use commonware_utils::channel::mpsc;
use commonware_utils::channel::oneshot;
#[cfg(not(feature = "external"))]
use futures::future::pending;
#[cfg(not(feature = "external"))]
use futures::stream::StreamExt as _;
#[cfg(feature = "external")]
use futures::StreamExt;
use futures::{stream::FuturesUnordered, task::noop_waker};
/// Test helper: yield to the scheduler five times, then return `i`.
async fn task(i: usize) -> usize {
    let mut yields_left = 5;
    while yields_left > 0 {
        reschedule().await;
        yields_left -= 1;
    }
    i
}
/// Spawn `tasks` yielding tasks on `runner`, wait for all of them, and
/// return the auditor digest plus the completion-order outputs.
///
/// Fix: the original iterated `0..=tasks - 1`, which underflows (and panics
/// in debug builds) when `tasks == 0`; `0..tasks` is identical for
/// `tasks >= 1` and correctly spawns nothing for zero.
fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
    runner.start(|context| async move {
        let mut handles = FuturesUnordered::new();
        for i in 0..tasks {
            handles.push(context.clone().spawn(move |_| task(i)));
        }
        let mut outputs = Vec::new();
        while let Some(result) = handles.next().await {
            outputs.push(result.unwrap());
        }
        assert_eq!(outputs.len(), tasks);
        (context.auditor().state(), outputs)
    })
}
/// Run five yielding tasks on a runner seeded with `seed`.
fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
    run_tasks(5, deterministic::Runner::seeded(seed))
}
#[test]
fn test_same_seed_same_order() {
    // Running the same seed twice must reproduce both the auditor digest
    // and the task completion order exactly.
    let expected: Vec<_> = (0..1000).map(run_with_seed).collect();
    for (seed, want) in expected.iter().enumerate() {
        assert_eq!(&run_with_seed(seed as u64), want);
    }
}
#[test_traced("TRACE")]
fn test_different_seeds_different_order() {
let output1 = run_with_seed(12345);
let output2 = run_with_seed(54321);
assert_ne!(output1, output2);
}
#[test]
fn test_alarm_min_heap() {
    // Alarm's reversed Ord turns BinaryHeap into a min-heap: popping must
    // yield alarms in ascending wake-time order, duplicates included.
    let now = SystemTime::now();
    let offsets = [10u64, 5, 15, 5];
    let mut heap: BinaryHeap<Alarm> = offsets
        .iter()
        .map(|&secs| Alarm {
            time: now + Duration::new(secs, 0),
            waker: noop_waker(),
        })
        .collect();
    let mut sorted_times = Vec::new();
    while let Some(alarm) = heap.pop() {
        sorted_times.push(alarm.time);
    }
    assert_eq!(
        sorted_times,
        vec![
            now + Duration::new(5, 0),
            now + Duration::new(5, 0),
            now + Duration::new(10, 0),
            now + Duration::new(15, 0),
        ]
    );
}
#[test]
#[should_panic(expected = "runtime timeout")]
fn test_timeout() {
    // A runner with a 10s budget must abort a task that sleeps forever.
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|context| async move {
        loop {
            context.sleep(Duration::from_secs(1)).await;
        }
    });
}
#[test]
#[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
fn test_bad_timeout() {
    // A timeout can never fire if the cycle never advances, so construction
    // must reject a zero cycle paired with a timeout.
    deterministic::Runner::new(Config {
        timeout: Some(Duration::default()),
        cycle: Duration::default(),
        ..Config::default()
    });
}
#[test]
#[should_panic(
    expected = "cycle duration must be greater than or equal to system time precision"
)]
fn test_bad_cycle() {
    // A cycle one nanosecond below the clock's precision is unrepresentable
    // and must be rejected at construction.
    deterministic::Runner::new(Config {
        cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
        ..Config::default()
    });
}
#[test]
fn test_recover_synced_storage_persists() {
    let partition = "test_partition";
    let name = b"test_blob";
    let data = b"Hello, world!";
    // Write and sync a blob, then capture a runtime checkpoint.
    let first = deterministic::Runner::default();
    let (state, checkpoint) = first.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(0, data).await.unwrap();
        blob.sync().await.unwrap();
        context.auditor().state()
    });
    assert_eq!(state, checkpoint.auditor.state());
    // Synced data must be visible after recovering from the checkpoint.
    Runner::from(checkpoint).start(|context| async move {
        let (blob, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, data.len() as u64);
        let read = blob.read_at(0, data.len()).await.unwrap();
        assert_eq!(read.coalesce(), data);
    });
}
#[test]
#[should_panic(expected = "goodbye")]
fn test_recover_panic_handling() {
    // A runtime recovered from a checkpoint must still surface task panics.
    let (_, checkpoint) =
        deterministic::Runner::default().start_and_recover(|_| async move {
            reschedule().await;
        });
    Runner::from(checkpoint).start(|_| async move {
        panic!("goodbye");
    });
}
#[test]
fn test_recover_unsynced_storage_does_not_persist() {
    let executor = deterministic::Runner::default();
    let partition = "test_partition";
    let name = b"test_blob";
    let data = b"Hello, world!";
    // Write WITHOUT syncing: the data must be discarded on recovery.
    // (Removed a redundant `context.clone()` — the closure already owns
    // `context`, so the clone was dead.)
    let (_, checkpoint) = executor.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(0, data).await.unwrap();
    });
    let executor = Runner::from(checkpoint);
    executor.start(|context| async move {
        let (_, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, 0);
    });
}
#[test]
fn test_recover_dns_mappings_persist() {
    let host = "example.com";
    let addrs = vec![
        IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
        IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
    ];
    // Register the mapping, then checkpoint the runtime.
    let (state, checkpoint) = deterministic::Runner::default().start_and_recover({
        let registered = addrs.clone();
        |context| async move {
            context.resolver_register(host, Some(registered));
            context.auditor().state()
        }
    });
    assert_eq!(state, checkpoint.auditor.state());
    // The DNS mapping must survive recovery.
    Runner::from(checkpoint).start(move |context| async move {
        let resolved = context.resolve(host).await.unwrap();
        assert_eq!(resolved, addrs);
    });
}
#[test]
fn test_recover_time_persists() {
    let nap = Duration::from_secs(10);
    // Advance simulated time, then checkpoint.
    let (before, checkpoint) =
        deterministic::Runner::default().start_and_recover(|context| async move {
            context.sleep(nap).await;
            context.current()
        });
    assert_eq!(before.duration_since(UNIX_EPOCH).unwrap(), nap);
    // The recovered runtime resumes from the checkpointed clock, not zero.
    Runner::from(checkpoint).start(move |context| async move {
        assert_eq!(context.current(), before);
        context.sleep(nap).await;
        assert_eq!(
            context.current().duration_since(UNIX_EPOCH).unwrap(),
            nap * 2
        );
    });
}
#[test]
#[should_panic(expected = "executor still has weak references")]
fn test_context_return() {
    // Smuggling the context out of `start` leaves a live reference to the
    // executor, which it detects and reports.
    let leaked = deterministic::Runner::default().start(|context| async move { context });
    drop(leaked);
}
#[test]
fn test_default_time_zero() {
    // The default runner boots its simulated clock at the Unix epoch.
    deterministic::Runner::default().start(|context| async move {
        assert_eq!(
            context.current().duration_since(UNIX_EPOCH).unwrap(),
            Duration::ZERO
        );
    });
}
#[test]
fn test_start_time() {
    // Default configuration starts at the Unix epoch.
    deterministic::Runner::default().start(|context| async move {
        assert_eq!(context.current(), UNIX_EPOCH);
    });
    // An explicitly configured start time is honored exactly.
    let start_time = UNIX_EPOCH + Duration::from_secs(100);
    let runner = deterministic::Runner::new(Config::default().with_start_time(start_time));
    runner.start(move |context| async move {
        assert_eq!(context.current(), start_time);
    });
}
#[test]
#[should_panic(expected = "start time must be greater than or equal to unix epoch")]
fn test_bad_start_time() {
    // Start times before the Unix epoch are rejected at construction.
    let cfg = Config::default().with_start_time(UNIX_EPOCH - Duration::from_secs(1));
    deterministic::Runner::new(cfg);
}
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_stall() {
    // Without the external feature, a future that never wakes means the
    // runtime can make no progress and must report a stall.
    deterministic::Runner::default().start(|_| async move {
        pending::<()>().await;
    });
}
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_external_simulated() {
    // Without the external feature, a wakeup delivered from a real OS thread
    // is invisible to the simulated runtime, so waiting on it stalls.
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        tx.send(()).unwrap();
    });
    deterministic::Runner::default().start(|_| async move {
        rx.await.unwrap();
    });
}
#[cfg(feature = "external")]
#[test]
fn test_external_realtime() {
    // With the external feature, a wakeup from a real OS thread is observed
    // and the runtime completes normally.
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        tx.send(()).unwrap();
    });
    deterministic::Runner::default().start(|_| async move {
        rx.await.unwrap();
    });
}
#[cfg(feature = "external")]
#[test]
// Verifies that `pace` decouples real elapsed time from simulated time:
// an external event can take real time to arrive while the simulated clock
// advances only by the requested pacing duration.
fn test_external_realtime_variable() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let start_real = SystemTime::now();
let start_sim = context.current();
let (first_tx, first_rx) = oneshot::channel();
let (second_tx, second_rx) = oneshot::channel();
let (results_tx, mut results_rx) = mpsc::channel(2);
let first_wait = Duration::from_secs(1);
// External thread that fires after a real one-second delay.
std::thread::spawn(move || {
std::thread::sleep(first_wait);
first_tx.send(()).unwrap();
});
// External thread that fires (effectively) immediately.
std::thread::spawn(move || {
std::thread::sleep(Duration::ZERO);
second_tx.send(()).unwrap();
});
// First task: paces with ZERO, so real time passes (> first_wait) while
// simulated time is expected to stay below first_wait.
let first = context.clone().spawn({
let results_tx = results_tx.clone();
move |context| async move {
first_rx.pace(&context, Duration::ZERO).await.unwrap();
let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
assert!(elapsed_real > first_wait);
let elapsed_sim = context.current().duration_since(start_sim).unwrap();
assert!(elapsed_sim < first_wait);
results_tx.send(1).await.unwrap();
}
});
// Second task: paces with first_wait, so both real and simulated time
// must advance by at least first_wait before it completes.
let second = context.clone().spawn(move |context| async move {
second_rx.pace(&context, first_wait).await.unwrap();
let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
assert!(elapsed_real >= first_wait);
let elapsed_sim = context.current().duration_since(start_sim).unwrap();
assert!(elapsed_sim >= first_wait);
results_tx.send(2).await.unwrap();
});
// Await in reverse spawn order; completion order is still checked below.
second.await.unwrap();
first.await.unwrap();
let mut results = Vec::new();
for _ in 0..2 {
results.push(results_rx.recv().await.unwrap());
}
// The ZERO-paced task should have reported before the paced one.
assert_eq!(results, vec![1, 2]);
});
}
#[cfg(not(feature = "external"))]
#[test]
fn test_simulated_skip() {
    deterministic::Runner::default().start(|context| async move {
        // In pure simulation, sleeping should jump the clock forward rather
        // than spin the scheduler.
        context.sleep(Duration::from_secs(1)).await;
        let encoded = context.encode();
        let iterations = encoded
            .lines()
            .find_map(|line| {
                line.strip_prefix("runtime_iterations_total ")
                    .and_then(|value| value.trim().parse::<u64>().ok())
            })
            .expect("missing runtime_iterations_total metric");
        // Only a handful of iterations should have been needed.
        assert!(iterations < 10);
    });
}
#[cfg(feature = "external")]
#[test]
fn test_realtime_no_skip() {
    deterministic::Runner::default().start(|context| async move {
        // With the external feature, the runtime must keep iterating through
        // the sleep instead of skipping time.
        context.sleep(Duration::from_secs(1)).await;
        let encoded = context.encode();
        let iterations = encoded
            .lines()
            .find_map(|line| {
                line.strip_prefix("runtime_iterations_total ")
                    .and_then(|value| value.trim().parse::<u64>().ok())
            })
            .expect("missing runtime_iterations_total metric");
        // Many iterations indicate real polling rather than a skip.
        assert!(iterations > 500);
    });
}
#[test]
#[should_panic(expected = "label must start with [a-zA-Z]")]
fn test_metrics_label_empty() {
    // An empty label has no valid leading character and must be rejected.
    deterministic::Runner::default().start(|context| async move {
        context.with_label("");
    });
}
#[test]
#[should_panic(expected = "label must start with [a-zA-Z]")]
fn test_metrics_label_invalid_first_char() {
    // Labels may not begin with a digit.
    deterministic::Runner::default().start(|context| async move {
        context.with_label("1invalid");
    });
}
#[test]
#[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
fn test_metrics_label_invalid_char() {
    // Hyphens are outside the allowed label alphabet.
    deterministic::Runner::default().start(|context| async move {
        context.with_label("invalid-label");
    });
}
#[test]
#[should_panic(expected = "using runtime label is not allowed")]
fn test_metrics_label_reserved_prefix() {
    // The runtime's own metrics prefix is reserved and rejected.
    deterministic::Runner::default().start(|context| async move {
        context.with_label(METRICS_PREFIX);
    });
}
#[test]
#[should_panic(expected = "duplicate attribute key: epoch")]
fn test_metrics_duplicate_attribute_panics() {
    // Re-using an attribute key on the same context must panic.
    deterministic::Runner::default().start(|context| async move {
        let labeled = context.with_label("test").with_attribute("epoch", "old");
        let _ = labeled.with_attribute("epoch", "new");
    });
}
#[test]
fn test_storage_fault_injection_and_recovery() {
    // Configure the runtime so every sync fails.
    let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
        sync_rate: Some(1.0),
        ..Default::default()
    });
    let runner = deterministic::Runner::new(cfg);
    let (result, checkpoint) = runner.start_and_recover(|ctx| async move {
        let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
        blob.write_at(0, b"data".to_vec()).await.unwrap();
        blob.sync().await
    });
    assert!(result.is_err());
    // Recover with faults disabled: the unsynced write is gone and fresh
    // writes persist normally.
    deterministic::Runner::from(checkpoint).start(|ctx| async move {
        *ctx.storage_fault_config().write() = FaultConfig::default();
        let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
        assert_eq!(len, 0, "unsynced data should be lost after recovery");
        blob.write_at(0, b"recovered".to_vec()).await.unwrap();
        blob.sync()
            .await
            .expect("sync should succeed with faults disabled");
        let read_buf = blob.read_at(0, 9).await.unwrap();
        assert_eq!(read_buf.coalesce(), b"recovered");
    });
}
#[test]
fn test_storage_fault_dynamic_config() {
    deterministic::Runner::default().start(|ctx| async move {
        let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();
        blob.write_at(0, b"initial".to_vec()).await.unwrap();
        blob.sync().await.expect("initial sync should succeed");
        // Enable fault injection at runtime: syncs must now fail...
        let faults = ctx.storage_fault_config();
        faults.write().sync_rate = Some(1.0);
        blob.write_at(0, b"updated".to_vec()).await.unwrap();
        let result = blob.sync().await;
        assert!(result.is_err(), "sync should fail with faults enabled");
        // ...and disabling it again restores normal syncs.
        faults.write().sync_rate = Some(0.0);
        blob.sync()
            .await
            .expect("sync should succeed with faults disabled");
    });
}
#[test]
fn test_storage_fault_determinism() {
    /// Opens 20 blobs under a 50% open-failure rate and records which succeed.
    fn run_with_seed(seed: u64) -> Vec<bool> {
        let cfg = deterministic::Config::default()
            .with_seed(seed)
            .with_storage_fault_config(FaultConfig {
                open_rate: Some(0.5),
                ..Default::default()
            });
        deterministic::Runner::new(cfg).start(|ctx| async move {
            let mut outcomes = Vec::with_capacity(20);
            for i in 0..20 {
                let name = format!("blob{i}");
                let attempt = ctx.open("test_determinism", name.as_bytes()).await;
                outcomes.push(attempt.is_ok());
            }
            outcomes
        })
    }
    let results1 = run_with_seed(12345);
    let results2 = run_with_seed(12345);
    assert_eq!(
        results1, results2,
        "same seed should produce same failure pattern"
    );
    let results3 = run_with_seed(99999);
    assert_ne!(
        results1, results3,
        "different seeds should produce different patterns"
    );
}
#[test]
// Verifies that fault injection is deterministic even across concurrently
// scheduled tasks: the same seed must yield the same per-task success counts.
fn test_storage_fault_determinism_multi_task() {
// Spawns 5 tasks that each attempt 4 open/write/sync sequences under
// probabilistic faults, returning each task's success tally.
fn run_with_seed(seed: u64) -> Vec<u32> {
let cfg = deterministic::Config::default()
.with_seed(seed)
.with_storage_fault_config(FaultConfig {
open_rate: Some(0.5),
write_rate: Some(0.3),
sync_rate: Some(0.2),
..Default::default()
});
let runner = deterministic::Runner::new(cfg);
runner.start(|ctx| async move {
let mut handles = Vec::new();
for i in 0..5 {
let ctx = ctx.clone();
handles.push(ctx.spawn(move |ctx| async move {
let mut successes = 0u32;
for j in 0..4 {
let name = format!("task{i}_blob{j}");
// Count one success per operation that does not fault.
if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
successes += 1;
if blob.write_at(0, b"data".to_vec()).await.is_ok() {
successes += 1;
}
if blob.sync().await.is_ok() {
successes += 1;
}
}
}
successes
}));
}
// Collect per-task tallies in spawn order.
let mut results = Vec::new();
for handle in handles {
results.push(handle.await.unwrap());
}
results
})
}
let results1 = run_with_seed(42);
let results2 = run_with_seed(42);
assert_eq!(
results1, results2,
"same seed should produce same multi-task pattern"
);
let results3 = run_with_seed(99999);
assert_ne!(
results1, results3,
"different seeds should produce different patterns"
);
}
}