use crate::{
network::{
audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
metered::Network as MeteredNetwork,
},
storage::{
audited::Storage as AuditedStorage, memory::Storage as MemStorage,
metered::Storage as MeteredStorage,
},
telemetry::metrics::task::Label,
utils::signal::{Signal, Stopper},
Clock, Error, Handle, ListenerOf, METRICS_PREFIX,
};
use commonware_macros::select;
use commonware_utils::{hex, SystemTimeExt};
use futures::{
future::AbortHandle,
task::{waker_ref, ArcWake},
Future,
};
use governor::clock::{Clock as GClock, ReasonablyRealtime};
use prometheus_client::{
encoding::text::encode,
metrics::{counter::Counter, family::Family, gauge::Gauge},
registry::{Metric, Registry},
};
use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
use sha2::{Digest, Sha256};
use std::{
collections::{BinaryHeap, HashMap},
mem::replace,
net::SocketAddr,
pin::Pin,
sync::{Arc, Mutex, Weak},
task::{self, Poll, Waker},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::trace;
/// A map from byte-string keys to byte-string values.
///
/// Used as the value type of `Executor::partitions`, which is keyed by a
/// `String`. NOTE(review): presumably blob name -> blob contents within one
/// storage partition; confirm against the storage implementation.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
/// Metric handles owned by the deterministic runtime itself.
///
/// Every field is registered with the runtime registry in `Metrics::init`.
#[derive(Debug)]
struct Metrics {
// Registered as "tasks_spawned".
tasks_spawned: Family<Label, Counter>,
// Registered as "tasks_running".
tasks_running: Family<Label, Gauge>,
// Registered as "task_polls"; incremented once per task poll in `Runner::start`.
task_polls: Family<Label, Counter>,
// Registered as "bandwidth"; not updated anywhere in this file.
network_bandwidth: Counter,
}
impl Metrics {
pub fn init(registry: &mut Registry) -> Self {
let metrics = Self {
task_polls: Family::default(),
tasks_spawned: Family::default(),
tasks_running: Family::default(),
network_bandwidth: Counter::default(),
};
registry.register(
"tasks_spawned",
"Total number of tasks spawned",
metrics.tasks_spawned.clone(),
);
registry.register(
"tasks_running",
"Number of tasks currently running",
metrics.tasks_running.clone(),
);
registry.register(
"task_polls",
"Total number of task polls",
metrics.task_polls.clone(),
);
registry.register(
"bandwidth",
"Total amount of data sent over network",
metrics.network_bandwidth.clone(),
);
metrics
}
}
/// Accumulates a running hash over every event reported via `Auditor::event`.
pub struct Auditor {
// Current hash bytes; empty until the first event is recorded.
hash: Mutex<Vec<u8>>,
}
impl Default for Auditor {
fn default() -> Self {
Self {
hash: Vec::new().into(),
}
}
}
impl Auditor {
    /// Folds one event into the audit hash.
    ///
    /// The new state is the SHA-256 digest of, in order: the previous state,
    /// `label`, and whatever bytes `payload` feeds into the hasher.
    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
    where
        F: FnOnce(&mut Sha256),
    {
        // Hold the lock for the whole update so events are chained atomically.
        let mut state = self.hash.lock().unwrap();

        let mut hasher = Sha256::new();
        hasher.update(state.as_slice());
        hasher.update(label);
        payload(&mut hasher);

        *state = hasher.finalize().to_vec();
    }

    /// Returns the current audit hash, hex-encoded.
    pub fn state(&self) -> String {
        let state = self.hash.lock().unwrap();
        hex(state.as_slice())
    }
}
/// Configuration for the deterministic runtime.
///
/// Build one with [`Config::new`] (or `Config::default()`) and refine it with
/// the `with_*` methods, each of which consumes and returns the config.
#[derive(Clone, Debug)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    seed: u64,
    /// Amount the virtual clock advances after each scheduling pass.
    cycle: Duration,
    /// Optional limit on virtual time; `None` means no limit.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns the default configuration: seed `42`, a 1ms cycle and no timeout.
    pub fn new() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }

    /// Replaces the RNG seed.
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.seed = seed;
        self
    }

    /// Replaces the cycle duration.
    pub fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }

    /// Replaces the timeout (`None` disables it).
    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }

    /// Returns the RNG seed.
    pub fn seed(&self) -> u64 {
        self.seed
    }

    /// Returns the cycle duration.
    pub fn cycle(&self) -> Duration {
        self.cycle
    }

    /// Returns the timeout, if any.
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Validates the configuration.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero: with a zero
    /// cycle the virtual clock would not advance between passes, so the
    /// timeout could not be relied upon to fire.
    pub fn assert(&self) {
        assert!(
            !self.cycle.is_zero() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Equivalent to [`Config::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Shared state of one deterministic runtime instance.
///
/// A `Context` holds this behind an `Arc`; `Runner::start` drives it.
pub struct Executor {
// Metrics registry used by `Context::register` and `Context::encode`.
registry: Mutex<Registry>,
// Amount added to `time` after every scheduling pass.
cycle: Duration,
// Virtual time at or after which `Runner::start` panics with "runtime timeout"; `None` disables the check.
deadline: Option<SystemTime>,
// Runtime-owned metric handles.
metrics: Arc<Metrics>,
// Running hash of audited events.
auditor: Arc<Auditor>,
// Seeded RNG; shuffles the task queue and backs the `RngCore` impl on `Context`.
rng: Mutex<StdRng>,
// Current virtual time.
time: Mutex<SystemTime>,
// Queue of tasks waiting to be polled.
tasks: Arc<Tasks>,
// Pending alarms; the heap yields the earliest wake time first.
sleeping: Mutex<BinaryHeap<Alarm>>,
// Copied into the new executor by `Context::recover`; not otherwise accessed in this file.
partitions: Mutex<HashMap<String, Partition>>,
// Shutdown coordination used by `Context::stop` and `Context::stopped`.
shutdown: Mutex<Stopper>,
// Set to `true` once the root task has completed.
finished: Mutex<bool>,
// Set to `true` once `Context::recover` has been called on this executor.
recovered: Mutex<bool>,
}
/// What a `Runner` was constructed from.
enum State {
// A configuration; `Runner::start` builds a fresh `Context` from it.
Config(Config),
// An existing context (e.g. one returned by `Context::recover`); used as-is.
Context(Context),
}
/// Entry point for running a root future on the deterministic runtime.
pub struct Runner {
// Either the config to build a context from or a ready-made context.
state: State,
}
/// Builds a runner from a configuration (delegates to `Runner::new`, which validates it).
impl From<Config> for Runner {
fn from(cfg: Config) -> Self {
Self::new(cfg)
}
}
/// Builds a runner that reuses an existing context instead of creating one from a config.
impl From<Context> for Runner {
fn from(context: Context) -> Self {
Self {
state: State::Context(context),
}
}
}
impl Runner {
    /// Creates a runner from `cfg`.
    ///
    /// # Panics
    ///
    /// Panics if `cfg` fails `Config::assert`.
    pub fn new(cfg: Config) -> Self {
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Creates a runner with the default configuration and the given RNG seed.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Creates a runner with the default configuration and the given timeout.
    pub fn timed(timeout: Duration) -> Self {
        Self::new(Config::default().with_timeout(Some(timeout)))
    }
}
/// The default runner uses `Config::default()`.
impl Default for Runner {
fn default() -> Self {
Self::new(Config::default())
}
}
/// Drives a root future to completion on the deterministic executor.
impl crate::Runner for Runner {
type Context = Context;
/// Runs the future produced by `f` as the root task and returns its output.
///
/// Each pass of the loop: checks the deadline, drains the task queue,
/// shuffles it with the executor's RNG, polls every drained task, advances
/// virtual time by one `cycle`, optionally skips ahead to the next alarm, and
/// wakes every alarm that is due.
///
/// # Panics
///
/// - "runtime timeout" when virtual time reaches the configured deadline.
/// - "runtime stalled" when no task is queued and no alarm is pending.
/// - "executor time overflowed" if advancing virtual time overflows.
/// - "root already registered" if the context's task queue already has a root.
fn start<F, Fut>(self, f: F) -> Fut::Output
where
F: FnOnce(Self::Context) -> Fut,
Fut: Future,
{
// Use the supplied context if there is one, otherwise build it from the config.
let context = match self.state {
State::Config(config) => Context::new(config),
State::Context(context) => context,
};
// Keep a handle on the executor; `context` itself is moved into the root future.
let executor = context.executor.clone();
let mut root = Box::pin(f(context));
// Queue the root task so the first pass has something to poll.
Tasks::register_root(&executor.tasks);
let mut iter = 0;
loop {
// Enforce the deadline (if any) against virtual time.
{
let current = executor.time.lock().unwrap();
if let Some(deadline) = executor.deadline {
if *current >= deadline {
panic!("runtime timeout");
}
}
}
// Snapshot everything currently queued; tasks woken during this pass land in the next snapshot.
let mut tasks = executor.tasks.drain();
// Randomize poll order using the seeded RNG.
{
let mut rng = executor.rng.lock().unwrap();
tasks.shuffle(&mut *rng);
}
trace!(iter, tasks = tasks.len(), "starting loop");
for task in tasks {
// Record which task is about to be polled in the audit hash.
executor.auditor.event(b"process_task", |hasher| {
hasher.update(task.id.to_be_bytes());
hasher.update(task.label.name().as_bytes());
});
trace!(id = task.id, "processing task");
executor.metrics.task_polls.get_or_create(&task.label).inc();
// The task is its own waker: waking it pushes it back onto the queue.
let waker = waker_ref(&task);
let mut cx = task::Context::from_waker(&waker);
match &task.operation {
Operation::Root => {
// Root completion ends the run and yields its output.
if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
trace!(id = task.id, "task is complete");
*executor.finished.lock().unwrap() = true;
return output;
}
}
Operation::Work { future, completed } => {
// A finished task may still be queued by a late wake; never poll it again.
if *completed.lock().unwrap() {
trace!(id = task.id, "dropping already complete task");
continue;
}
let mut fut = future.lock().unwrap();
if fut.as_mut().poll(&mut cx).is_ready() {
trace!(id = task.id, "task is complete");
*completed.lock().unwrap() = true;
continue;
}
}
}
trace!(id = task.id, "task is still pending");
}
// Advance virtual time by one cycle after every pass.
let mut current;
{
let mut time = executor.time.lock().unwrap();
*time = time
.checked_add(executor.cycle)
.expect("executor time overflowed");
current = *time;
}
trace!(now = current.epoch_millis(), "time advanced");
// Nothing is runnable: jump straight to the earliest alarm if it lies in the future.
if executor.tasks.len() == 0 {
let mut skip = None;
{
let sleeping = executor.sleeping.lock().unwrap();
if let Some(next) = sleeping.peek() {
if next.time > current {
skip = Some(next.time);
}
}
}
if let Some(skip_time) = skip {
{
let mut time = executor.time.lock().unwrap();
*time = skip_time;
current = *time;
}
trace!(now = current.epoch_millis(), "time skipped");
}
}
// Collect due alarms under the lock, then wake them after releasing it.
let mut to_wake = Vec::new();
let mut remaining;
{
let mut sleeping = executor.sleeping.lock().unwrap();
while let Some(next) = sleeping.peek() {
if next.time <= current {
let sleeper = sleeping.pop().unwrap();
to_wake.push(sleeper.waker);
} else {
break;
}
}
remaining = sleeping.len();
}
for waker in to_wake {
waker.wake();
}
// With no queued task and no pending alarm, nothing can ever make progress.
remaining += executor.tasks.len();
if remaining == 0 {
panic!("runtime stalled");
}
iter += 1;
}
}
}
/// The work a `Task` represents.
enum Operation {
// The root future; it is owned by `Runner::start` rather than stored here.
Root,
// A spawned future together with a flag recording that it has finished.
Work {
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
completed: Mutex<bool>,
},
}
/// A unit of schedulable work tracked by the executor.
struct Task {
// Identifier assigned from `Tasks::increment`.
id: u128,
// Metrics label; also fed into the audit hash when the task is polled.
label: Label,
// Weak handle to the owning queue, used to re-queue the task when it is woken.
tasks: Weak<Tasks>,
// Root marker or the spawned future to poll.
operation: Operation,
}
/// Waking a task pushes it back onto its owning queue.
impl ArcWake for Task {
// If the queue no longer exists (the weak reference fails to upgrade), the wake is a no-op.
fn wake_by_ref(arc_self: &Arc<Self>) {
if let Some(tasks) = arc_self.tasks.upgrade() {
tasks.enqueue(arc_self.clone());
}
}
}
/// Queue of tasks waiting to be polled, plus task-id bookkeeping.
struct Tasks {
// Next task id to hand out.
counter: Mutex<u128>,
// Tasks waiting for the next scheduling pass.
queue: Mutex<Vec<Arc<Task>>>,
// Whether `register_root` has already been called.
root_registered: Mutex<bool>,
}
impl Tasks {
    /// Creates an empty queue with ids starting at zero and no root registered.
    fn new() -> Self {
        let counter = Mutex::new(0);
        let queue = Mutex::new(Vec::new());
        let root_registered = Mutex::new(false);
        Self {
            counter,
            queue,
            root_registered,
        }
    }

    /// Hands out the next task id.
    ///
    /// Panics with "task counter overflow" if the counter cannot be advanced.
    fn increment(&self) -> u128 {
        let mut next = self.counter.lock().unwrap();
        let id = *next;
        *next = id.checked_add(1).expect("task counter overflow");
        id
    }

    /// Queues the root task.
    ///
    /// Panics with "root already registered" when called more than once.
    fn register_root(arc_self: &Arc<Self>) {
        {
            let mut already = arc_self.root_registered.lock().unwrap();
            assert!(!*already, "root already registered");
            *already = true;
        }
        let root = Task {
            id: arc_self.increment(),
            label: Label::root(),
            tasks: Arc::downgrade(arc_self),
            operation: Operation::Root,
        };
        arc_self.enqueue(Arc::new(root));
    }

    /// Queues a spawned future under a fresh id with the given label.
    fn register_work(
        arc_self: &Arc<Self>,
        label: Label,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) {
        let work = Task {
            id: arc_self.increment(),
            label,
            tasks: Arc::downgrade(arc_self),
            operation: Operation::Work {
                future: Mutex::new(future),
                completed: Mutex::new(false),
            },
        };
        arc_self.enqueue(Arc::new(work));
    }

    /// Appends `task` to the queue.
    fn enqueue(&self, task: Arc<Task>) {
        self.queue.lock().unwrap().push(task);
    }

    /// Takes every queued task, leaving an empty queue with the same capacity
    /// reserved as the number of tasks taken.
    fn drain(&self) -> Vec<Arc<Task>> {
        let mut pending = self.queue.lock().unwrap();
        let fresh = Vec::with_capacity(pending.len());
        replace(&mut *pending, fresh)
    }

    /// Number of tasks currently queued.
    fn len(&self) -> usize {
        let pending = self.queue.lock().unwrap();
        pending.len()
    }
}
// Network stack used by `Context`: the deterministic network wrapped by the audited layer, wrapped by the metered layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Handle to the deterministic runtime that is passed to tasks.
///
/// Implements the spawner, clock, network, storage, metrics and RNG
/// interfaces on top of a shared `Executor`.
pub struct Context {
// Metrics label prefix accumulated through `with_label`; empty for an unlabeled context.
name: String,
// Set by the `*_ref` spawn methods so the same context cannot spawn twice.
spawned: bool,
// Shared runtime state.
executor: Arc<Executor>,
// Shared network implementation.
network: Arc<Network>,
// Storage implementation: in-memory storage wrapped by audited and metered layers.
storage: MeteredStorage<AuditedStorage<MemStorage>>,
// Abort handles pushed by `spawn_child` for children of the task owning this context.
children: Arc<Mutex<Vec<AbortHandle>>>,
}
/// The default context is built from `Config::default()`.
impl Default for Context {
fn default() -> Self {
Self::new(Config::default())
}
}
impl Context {
/// Creates a fresh runtime from `cfg`.
///
/// Virtual time starts at `UNIX_EPOCH`; if a timeout is configured the
/// deadline is `UNIX_EPOCH + timeout`. The RNG is seeded from `cfg.seed`.
///
/// Panics with "timeout overflowed" if the deadline cannot be represented.
pub fn new(cfg: Config) -> Self {
// Runtime metrics, storage metrics and network metrics all register under the METRICS_PREFIX sub-registry.
let mut registry = Registry::default();
let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
let metrics = Arc::new(Metrics::init(runtime_registry));
let start_time = UNIX_EPOCH;
let deadline = cfg
.timeout
.map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
// Storage and network share one auditor with the executor.
let auditor = Arc::new(Auditor::default());
let storage = MeteredStorage::new(
AuditedStorage::new(MemStorage::default(), auditor.clone()),
runtime_registry,
);
let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
let network = MeteredNetwork::new(network, runtime_registry);
let executor = Arc::new(Executor {
registry: Mutex::new(registry),
cycle: cfg.cycle,
deadline,
metrics: metrics.clone(),
auditor: auditor.clone(),
rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
time: Mutex::new(start_time),
tasks: Arc::new(Tasks::new()),
sleeping: Mutex::new(BinaryHeap::new()),
partitions: Mutex::new(HashMap::new()),
shutdown: Mutex::new(Stopper::default()),
finished: Mutex::new(false),
recovered: Mutex::new(false),
});
Context {
name: String::new(),
spawned: false,
executor: executor.clone(),
network: Arc::new(network),
storage,
children: Arc::new(Mutex::new(Vec::new())),
}
}
/// Builds a new runtime that continues from a finished one.
///
/// Carried over from the old executor: `cycle`, `deadline`, the auditor, the
/// RNG state, the virtual time and `partitions`; this context's `storage` is
/// moved into the result. Created fresh: the metrics registry and runtime
/// metrics, the network, the task queue, the alarm heap, the shutdown state
/// and the `finished`/`recovered` flags.
///
/// # Panics
///
/// - "execution is not finished" if the root task has not completed.
/// - "runtime has already been recovered" if this executor was recovered before.
pub fn recover(self) -> Self {
if !*self.executor.finished.lock().unwrap() {
panic!("execution is not finished");
}
// Mark the old executor as recovered so any clone of this context cannot recover it again.
{
let mut recovered = self.executor.recovered.lock().unwrap();
if *recovered {
panic!("runtime has already been recovered");
}
*recovered = true;
}
let mut registry = Registry::default();
let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
let metrics = Arc::new(Metrics::init(runtime_registry));
// Reuse the auditor so the audit hash continues across the recovery.
let auditor = self.executor.auditor.clone();
let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
let network = MeteredNetwork::new(network, runtime_registry);
let executor = Arc::new(Executor {
cycle: self.executor.cycle,
deadline: self.executor.deadline,
auditor: auditor.clone(),
rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
time: Mutex::new(*self.executor.time.lock().unwrap()),
partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
registry: Mutex::new(registry),
metrics: metrics.clone(),
tasks: Arc::new(Tasks::new()),
sleeping: Mutex::new(BinaryHeap::new()),
shutdown: Mutex::new(Stopper::default()),
finished: Mutex::new(false),
recovered: Mutex::new(false),
});
Self {
name: String::new(),
spawned: false,
executor,
network: Arc::new(network),
storage: self.storage,
children: Arc::new(Mutex::new(Vec::new())),
}
}
/// Returns the auditor that accumulates this runtime's event hash.
pub fn auditor(&self) -> &Auditor {
&self.executor.auditor
}
}
/// Cloning shares the executor, network, storage and children list with the original.
impl Clone for Context {
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
// A clone is always allowed to spawn, even if the original already has.
spawned: false,
executor: self.executor.clone(),
network: self.network.clone(),
storage: self.storage.clone(),
children: self.children.clone(),
}
}
}
/// Task spawning and shutdown signalling for the deterministic runtime.
///
/// Every spawn method registers the new work with `Tasks::register_work`, so
/// it is polled by the loop in `Runner::start`. NOTE(review): `spawn_metrics!`
/// and `Handle::init_*` are defined outside this file; the meaning of their
/// arguments (including the literal `false`) should be confirmed there.
impl crate::Spawner for Context {
/// Spawns the future produced by `f`, handing it this context.
///
/// Panics with "already spawned" if this context was already used by one of
/// the `*_ref` spawn methods.
fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
where
F: FnOnce(Self) -> Fut + Send + 'static,
Fut: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
assert!(!self.spawned, "already spawned");
let (label, gauge) = spawn_metrics!(self, future);
let executor = self.executor.clone();
// The spawned task starts with its own empty children list rather than sharing its parent's.
let children = Arc::new(Mutex::new(Vec::new()));
self.children = children.clone();
let future = f(self);
let (f, handle) = Handle::init_future(future, gauge, false, children);
Tasks::register_work(&executor.tasks, label, Box::pin(f));
handle
}
/// Marks this context as spawned and returns a closure that spawns a given future.
///
/// Panics with "already spawned" if called on a context that has already spawned.
fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
assert!(!self.spawned, "already spawned");
self.spawned = true;
let (label, gauge) = spawn_metrics!(self, future);
let executor = self.executor.clone();
move |f: F| {
// The task gets a fresh, empty children list.
let (f, handle) =
Handle::init_future(f, gauge, false, Arc::new(Mutex::new(Vec::new())));
Tasks::register_work(&executor.tasks, label, Box::pin(f));
handle
}
}
/// Spawns like `spawn`, and additionally records the child's abort handle
/// (when it has one) in this context's children list.
fn spawn_child<F, Fut, T>(self, f: F) -> Handle<T>
where
F: FnOnce(Self) -> Fut + Send + 'static,
Fut: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
// Capture the parent's list first: `spawn` replaces `self.children` with a new list.
let parent_children = self.children.clone();
let child_handle = self.spawn(f);
if let Some(abort_handle) = child_handle.abort_handle() {
parent_children.lock().unwrap().push(abort_handle);
}
child_handle
}
/// Spawns a synchronous closure as a task.
///
/// In this runtime the closure is wrapped in an async block and runs when
/// the task is polled; `dedicated` is only forwarded to `spawn_metrics!`.
///
/// Panics with "already spawned" if this context has already spawned.
fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
where
F: FnOnce(Self) -> T + Send + 'static,
T: Send + 'static,
{
assert!(!self.spawned, "already spawned");
let (label, gauge) = spawn_metrics!(self, blocking, dedicated);
let executor = self.executor.clone();
let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);
let f = async move { f() };
Tasks::register_work(&executor.tasks, label, Box::pin(f));
handle
}
/// Marks this context as spawned and returns a closure that spawns a given
/// synchronous closure as a task.
///
/// Panics with "already spawned" if called on a context that has already spawned.
fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
assert!(!self.spawned, "already spawned");
self.spawned = true;
let (label, gauge) = spawn_metrics!(self, blocking, dedicated);
let executor = self.executor.clone();
move |f: F| {
let (f, handle) = Handle::init_blocking(f, gauge, false);
let f = async move { f() };
Tasks::register_work(&executor.tasks, label, Box::pin(f));
handle
}
}
/// Signals shutdown with `value` and waits for the stop to resolve.
///
/// # Errors
///
/// - `Error::Closed` if the future returned by `Stopper::stop` resolves to an error.
/// - `Error::Timeout` if `timeout` is set and elapses (in virtual time) first.
async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
self.executor.auditor.event(b"stop", |hasher| {
hasher.update(value.to_be_bytes());
});
// Scope the lock so the guard is dropped before the `select!` below awaits.
let stop_resolved = {
let mut shutdown = self.executor.shutdown.lock().unwrap();
shutdown.stop(value)
};
// Without a timeout, race against a future that never completes.
let timeout_future = match timeout {
Some(duration) => futures::future::Either::Left(self.sleep(duration)),
None => futures::future::Either::Right(futures::future::pending()),
};
select! {
result = stop_resolved => {
result.map_err(|_| Error::Closed)?;
Ok(())
},
_ = timeout_future => {
Err(Error::Timeout)
}
}
}
/// Records a "stopped" audit event and returns the shutdown signal obtained from the `Stopper`.
fn stopped(&self) -> Signal {
self.executor.auditor.event(b"stopped", |_| {});
self.executor.shutdown.lock().unwrap().stopped()
}
}
impl crate::Metrics for Context {
    /// Returns a context whose label is this context's label extended with
    /// `label` (joined by `_`), sharing the same runtime.
    ///
    /// Panics with "using runtime label is not allowed" if the resulting label
    /// starts with `METRICS_PREFIX`.
    fn with_label(&self, label: &str) -> Self {
        let name = if self.name.is_empty() {
            label.to_string()
        } else {
            format!("{}_{}", self.name, label)
        };
        assert!(
            !name.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            name,
            spawned: false,
            executor: self.executor.clone(),
            network: self.network.clone(),
            storage: self.storage.clone(),
            children: self.children.clone(),
        }
    }

    /// Returns this context's current label.
    fn label(&self) -> String {
        self.name.clone()
    }

    /// Registers `metric` under `name`, prefixed with this context's label when
    /// the label is non-empty.
    ///
    /// The audit event records the unprefixed name and the help text.
    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        let (name, help) = (name.into(), help.into());
        self.executor.auditor.event(b"register", |hasher| {
            hasher.update(name.as_bytes());
            hasher.update(help.as_bytes());
        });
        let full_name = match self.name.as_str() {
            "" => name,
            prefix => format!("{}_{}", prefix, name),
        };
        let mut registry = self.executor.registry.lock().unwrap();
        registry.register(full_name, help, metric)
    }

    /// Records an "encode" audit event and returns the text encoding of every
    /// registered metric.
    fn encode(&self) -> String {
        self.executor.auditor.event(b"encode", |_| {});
        let registry = self.executor.registry.lock().unwrap();
        let mut out = String::new();
        encode(&mut out, &registry).expect("encoding failed");
        out
    }
}
/// Future returned by `Context::sleep` / `Context::sleep_until`.
struct Sleeper {
// Runtime whose virtual clock and alarm heap this sleeper uses.
executor: Arc<Executor>,
// Virtual time at which the sleeper completes.
time: SystemTime,
// Whether an alarm has already been pushed for this sleeper.
registered: bool,
}
/// A pending wake-up: `waker` should be woken once virtual time reaches `time`.
///
/// Equality and ordering consider only `time`, and the ordering is reversed so
/// that a max-heap (`BinaryHeap`) yields the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Earlier time compares as greater.
        self.time.cmp(&other.time).reverse()
    }
}
impl Future for Sleeper {
    type Output = ();

    /// Completes once the executor's virtual time has reached `self.time`.
    ///
    /// On the first pending poll an alarm carrying the current waker is pushed
    /// onto the executor's alarm heap; later polls do not push again.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let now = *self.executor.time.lock().unwrap();
        if now >= self.time {
            return Poll::Ready(());
        }
        if self.registered {
            return Poll::Pending;
        }
        self.registered = true;
        let alarm = Alarm {
            time: self.time,
            waker: cx.waker().clone(),
        };
        self.executor.sleeping.lock().unwrap().push(alarm);
        Poll::Pending
    }
}
/// Virtual-time clock backed by the executor's `time` field.
impl Clock for Context {
/// Returns the executor's current virtual time.
fn current(&self) -> SystemTime {
*self.executor.time.lock().unwrap()
}
/// Returns a future that completes once virtual time reaches `current() + duration`.
///
/// Panics with "overflow when setting wake time" if that instant cannot be represented.
fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
let deadline = self
.current()
.checked_add(duration)
.expect("overflow when setting wake time");
self.sleep_until(deadline)
}
/// Returns a future that completes once virtual time reaches `deadline`.
fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
// The alarm is only registered when the sleeper is first polled.
Sleeper {
executor: self.executor.clone(),
time: deadline,
registered: false,
}
}
}
/// `governor` clock implementation that reports the runtime's virtual time.
impl GClock for Context {
type Instant = SystemTime;
fn now(&self) -> Self::Instant {
self.current()
}
}
// Implements `governor`'s `ReasonablyRealtime` for `Context` with an empty body (no items overridden).
impl ReasonablyRealtime for Context {}
/// Network access; both operations are forwarded to the context's shared `Network`.
impl crate::Network for Context {
type Listener = ListenerOf<Network>;
/// Binds a listener to `socket` via the underlying network.
async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
self.network.bind(socket).await
}
/// Dials `socket` via the underlying network, returning the sink/stream pair it produces.
async fn dial(
&self,
socket: SocketAddr,
) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
self.network.dial(socket).await
}
}
impl RngCore for Context {
fn next_u32(&mut self) -> u32 {
self.executor.auditor.event(b"rand", |hasher| {
hasher.update(b"next_u32");
});
self.executor.rng.lock().unwrap().next_u32()
}
fn next_u64(&mut self) -> u64 {
self.executor.auditor.event(b"rand", |hasher| {
hasher.update(b"next_u64");
});
self.executor.rng.lock().unwrap().next_u64()
}
fn fill_bytes(&mut self, dest: &mut [u8]) {
self.executor.auditor.event(b"rand", |hasher| {
hasher.update(b"fill_bytes");
});
self.executor.rng.lock().unwrap().fill_bytes(dest)
}
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
self.executor.auditor.event(b"rand", |hasher| {
hasher.update(b"try_fill_bytes");
});
self.executor.rng.lock().unwrap().try_fill_bytes(dest)
}
}
// Marker impl with an empty body. NOTE(review): output comes from a `StdRng` seeded by `Config::seed`, so it is reproducible by design.
impl CryptoRng for Context {}
/// Storage access; every operation is forwarded to the context's `storage` field.
impl crate::Storage for Context {
type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
/// Opens blob `name` in `partition`; the `u64` is the blob length (the tests below compare it to the bytes written).
async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
self.storage.open(partition, name).await
}
/// Forwards to the underlying storage's `remove`. NOTE(review): `None` presumably removes the whole partition; confirm in the storage implementation.
async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
self.storage.remove(partition, name).await
}
/// Forwards to the underlying storage's `scan` for `partition`.
async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
self.storage.scan(partition).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
use commonware_macros::test_traced;
use futures::task::noop_waker;
// Runs the shared `run_tasks` helper (defined in `crate::utils`) with 5 as its first argument on a runner seeded with `seed`.
fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
let executor = deterministic::Runner::seeded(seed);
run_tasks(5, executor)
}
// Running twice with the same seed must produce identical output, for 1000 different seeds.
#[test]
fn test_same_seed_same_order() {
let mut outputs = Vec::new();
for seed in 0..1000 {
let output = run_with_seed(seed);
outputs.push(output);
}
for seed in 0..1000 {
let output = run_with_seed(seed);
assert_eq!(output, outputs[seed as usize]);
}
}
// Two different seeds are expected to produce different output.
#[test_traced("TRACE")]
fn test_different_seeds_different_order() {
let output1 = run_with_seed(12345);
let output2 = run_with_seed(54321);
assert_ne!(output1, output2);
}
// `Alarm`'s reversed ordering makes `BinaryHeap` pop the earliest time first, including duplicates.
#[test]
fn test_alarm_min_heap() {
let now = SystemTime::now();
let alarms = vec![
Alarm {
time: now + Duration::new(10, 0),
waker: noop_waker(),
},
Alarm {
time: now + Duration::new(5, 0),
waker: noop_waker(),
},
Alarm {
time: now + Duration::new(15, 0),
waker: noop_waker(),
},
Alarm {
time: now + Duration::new(5, 0),
waker: noop_waker(),
},
];
let mut heap = BinaryHeap::new();
for alarm in alarms {
heap.push(alarm);
}
let mut sorted_times = Vec::new();
while let Some(alarm) = heap.pop() {
sorted_times.push(alarm.time);
}
assert_eq!(
sorted_times,
vec![
now + Duration::new(5, 0),
now + Duration::new(5, 0),
now + Duration::new(10, 0),
now + Duration::new(15, 0),
]
);
}
// A root task that sleeps forever must hit the 10s virtual-time deadline.
#[test]
#[should_panic(expected = "runtime timeout")]
fn test_timeout() {
let executor = deterministic::Runner::timed(Duration::from_secs(10));
executor.start(|context| async move {
loop {
context.sleep(Duration::from_secs(1)).await;
}
});
}
// A zero cycle combined with a timeout is rejected when the runner is constructed.
#[test]
#[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
fn test_bad_timeout() {
let cfg = Config {
timeout: Some(Duration::default()),
cycle: Duration::default(),
..Config::default()
};
deterministic::Runner::new(cfg);
}
// Data that was written and synced is readable after `recover`, and the audit state is unchanged by `recover`.
#[test]
fn test_recover_synced_storage_persists() {
let executor1 = deterministic::Runner::default();
let partition = "test_partition";
let name = b"test_blob";
let data = b"Hello, world!";
let (context, state) = executor1.start(|context| async move {
let (blob, _) = context.open(partition, name).await.unwrap();
blob.write_at(Vec::from(data), 0).await.unwrap();
blob.sync().await.unwrap();
let state = context.auditor().state();
(context, state)
});
let recovered_context = context.recover();
assert_eq!(state, recovered_context.auditor().state());
let executor = Runner::from(recovered_context);
executor.start(|context| async move {
let (blob, len) = context.open(partition, name).await.unwrap();
assert_eq!(len, data.len() as u64);
let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
assert_eq!(read.as_ref(), data);
});
}
// Data written without `sync` is gone after `recover`: the reopened blob has length 0.
#[test]
fn test_recover_unsynced_storage_does_not_persist() {
let executor = deterministic::Runner::default();
let partition = "test_partition";
let name = b"test_blob";
let data = Vec::from("Hello, world!");
let context = executor.start(|context| async move {
let context = context.clone();
let (blob, _) = context.open(partition, name).await.unwrap();
blob.write_at(data, 0).await.unwrap();
context
});
let context = context.recover();
let executor = Runner::from(context);
executor.start(|context| async move {
let (_, len) = context.open(partition, name).await.unwrap();
assert_eq!(len, 0);
});
}
// Calling `recover` while the root task is still running must panic.
#[test]
#[should_panic(expected = "execution is not finished")]
fn test_recover_before_finish_panics() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
context.recover();
});
}
// A clone shares the executor, so recovering through the clone after the original must panic.
#[test]
#[should_panic(expected = "runtime has already been recovered")]
fn test_recover_twice_panics() {
let executor = deterministic::Runner::default();
let context = executor.start(|context| async move { context });
let cloned_context = context.clone();
context.recover();
cloned_context.recover();
}
// Virtual time starts exactly at the Unix epoch.
#[test]
fn test_default_time_zero() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
assert_eq!(
context.current().duration_since(UNIX_EPOCH).unwrap(),
Duration::ZERO
);
});
}
}