use crate::realtime::worker::progress::{ProgressHint, ProgressHintSender, SwitchProgressHintOk};
use super::*;
/// Data-less environment type used as `Worker::Environment` for [`SmokeTestWorker`].
struct SmokeTestEnvironment;
/// Test double that counts how often each worker callback is invoked.
///
/// The derived [`Default`] starts every counter at zero.
#[derive(Default)]
struct SmokeTestWorker {
    /// Number of `perform_work` calls after which the worker reports
    /// `Finishing` while the progress hint is `Continue`.
    expected_perform_work_invocations: usize,
    /// Number of `perform_work` calls observed so far.
    actual_perform_work_invocations: usize,
    /// Number of `start_working` calls observed so far.
    start_working_invocations: usize,
    /// Number of `finish_working` calls observed so far.
    finish_working_invocations: usize,
}
impl SmokeTestWorker {
    /// Creates a worker whose invocation counters all start at zero and that
    /// expects `expected_perform_work_invocations` calls to `perform_work`.
    fn new(expected_perform_work_invocations: usize) -> Self {
        Self {
            start_working_invocations: 0,
            finish_working_invocations: 0,
            actual_perform_work_invocations: 0,
            expected_perform_work_invocations,
        }
    }
}
/// Counts every callback invocation and derives the completion status of
/// `perform_work` from the current progress hint.
impl Worker for SmokeTestWorker {
    type Environment = SmokeTestEnvironment;

    /// Records the invocation and always succeeds.
    fn start_working(&mut self, _env: &mut Self::Environment) -> Result<()> {
        self.start_working_invocations += 1;
        Ok(())
    }

    /// Records the invocation and always succeeds.
    fn finish_working(&mut self, _env: &mut Self::Environment) -> Result<()> {
        self.finish_working_invocations += 1;
        Ok(())
    }

    /// Records the invocation and reports how the work cycle ended.
    ///
    /// `Suspend` and `Finish` hints are mapped directly to the corresponding
    /// status. On `Continue` the worker suspends until the expected number of
    /// invocations has been reached and then finishes.
    ///
    /// # Panics
    ///
    /// Panics if invoked with a `Continue` hint more often than expected.
    fn perform_work(
        &mut self,
        _env: &Self::Environment,
        progress_hint_rx: &ProgressHintReceiver,
    ) -> Result<CompletionStatus> {
        self.actual_perform_work_invocations += 1;
        let status = match progress_hint_rx.peek() {
            ProgressHint::Suspend => CompletionStatus::Suspending,
            ProgressHint::Finish => CompletionStatus::Finishing,
            ProgressHint::Continue => {
                assert!(
                    self.actual_perform_work_invocations <= self.expected_perform_work_invocations
                );
                // The assertion above rules out "greater than", so equality
                // means the last expected invocation has just happened.
                let was_last_expected_invocation = self.actual_perform_work_invocations
                    == self.expected_perform_work_invocations;
                if was_last_expected_invocation {
                    CompletionStatus::Finishing
                } else {
                    CompletionStatus::Suspending
                }
            }
        };
        Ok(status)
    }
}
/// Runs a worker thread to completion for every expected invocation count in
/// `1..10`, resuming it each time it reports `Suspending`.
///
/// For each run it verifies that `start_working` and `finish_working` were
/// called exactly once, that `perform_work` was called exactly the expected
/// number of times, and that the number of accepted resume requests is one
/// less than that.
#[test]
fn smoke_test() -> anyhow::Result<()> {
    for expected_perform_work_invocations in 1..10 {
        let worker = SmokeTestWorker::new(expected_perform_work_invocations);
        let progress_hint_rx = ProgressHintReceiver::default();
        let progress_hint_tx = ProgressHintSender::attach(&progress_hint_rx);
        let context = Context {
            progress_hint_rx,
            worker,
            environment: SmokeTestEnvironment,
        };
        let worker_thread = WorkerThread::spawn(context, ThreadScheduling::Default);
        let mut resume_accepted = 0;
        loop {
            match worker_thread.load_state() {
                State::Initial | State::Starting | State::Finishing | State::Running => (),
                State::Suspending => match progress_hint_tx.resume() {
                    Ok(SwitchProgressHintOk::Accepted { .. }) => {
                        resume_accepted += 1;
                    }
                    // Not accepted this time: keep polling and try again if
                    // the worker is still suspending.
                    Ok(SwitchProgressHintOk::Ignored) | Err(_) => (),
                },
                State::Terminating => {
                    break;
                }
            }
            // This loop polls the state without blocking. Yield the time
            // slice so the worker thread can be scheduled instead of
            // competing with a hot spin loop.
            std::thread::yield_now();
        }
        match worker_thread.join() {
            JoinedThread::Terminated(TerminatedThread {
                context:
                    Context {
                        progress_hint_rx: _,
                        worker,
                        environment: _,
                    },
                result,
            }) => {
                // Propagate any error returned by the worker before checking
                // the counters.
                result?;
                assert_eq!(1, worker.start_working_invocations);
                assert_eq!(1, worker.finish_working_invocations);
                assert_eq!(
                    expected_perform_work_invocations,
                    worker.actual_perform_work_invocations
                );
                // Every invocation except the last one ends in `Suspending`
                // and needs one accepted resume request to continue.
                assert_eq!(expected_perform_work_invocations, resume_accepted + 1);
            }
            JoinedThread::JoinError(err) => {
                return Err(anyhow::anyhow!("Failed to join worker thread: {:?}", err));
            }
        }
    }
    Ok(())
}
/// Requests suspension before the worker thread is spawned and then requests
/// finishing once the thread has reached the `Suspending` state.
///
/// The worker is expected to be started and finished exactly once and to
/// perform work twice: once under the `Suspend` hint and once under the
/// `Finish` hint.
#[test]
fn suspend_before_starting_and_finish_while_suspended() -> anyhow::Result<()> {
    let progress_hint_rx = ProgressHintReceiver::default();
    let progress_hint_tx = ProgressHintSender::attach(&progress_hint_rx);

    // Switch the hint before the thread exists so that the very first work
    // cycle already observes `Suspend`.
    assert!(matches!(
        progress_hint_tx.suspend(),
        Ok(SwitchProgressHintOk::Accepted {
            previous_state: ProgressHint::Continue,
        })
    ));

    // With an expectation of zero the worker's own assertion fails if it is
    // ever invoked while the hint is `Continue`.
    let expected_perform_work_invocations = 0;
    let worker_thread = WorkerThread::spawn(
        Context {
            progress_hint_rx,
            worker: SmokeTestWorker::new(expected_perform_work_invocations),
            environment: SmokeTestEnvironment,
        },
        ThreadScheduling::Default,
    );
    assert_eq!(State::Suspending, worker_thread.wait_until_not_running());

    assert!(matches!(
        progress_hint_tx.finish(),
        Ok(SwitchProgressHintOk::Accepted {
            previous_state: ProgressHint::Suspend,
        })
    ));

    let terminated_thread = match worker_thread.join() {
        JoinedThread::Terminated(terminated_thread) => terminated_thread,
        JoinedThread::JoinError(err) => {
            return Err(anyhow::anyhow!("Failed to join worker thread: {:?}", err));
        }
    };
    let TerminatedThread { context, result } = terminated_thread;
    // Propagate any error returned by the worker before checking the counters.
    result?;

    let worker = context.worker;
    assert_eq!(1, worker.start_working_invocations);
    assert_eq!(1, worker.finish_working_invocations);
    assert_eq!(2, worker.actual_perform_work_invocations);
    Ok(())
}