use std::{
ops::ControlFlow,
sync::{
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, RwLock,
},
time::Instant,
};
use log::info;
use crate::{
backend::{workspace::Workspace, BackendSettings, BackendStats, FullMachine},
shared::{
snapshot::log::{MessageType, StepMessage, StepStatus},
BackendInfo, BackendStatus, Request, Response, StepSettings,
},
};
use super::extract_space_info;
struct WorkCommand {
request: Request,
send_to_server: SyncSender<Response>,
}
struct BackendWorker<M: FullMachine, D> {
workspace: Workspace<M, D>,
stats: Arc<RwLock<BackendStats>>,
settings: BackendSettings,
recv_from_server: Receiver<WorkCommand>,
}
impl<M: FullMachine, D> BackendWorker<M, D> {
fn new(
workspace: Workspace<M, D>,
stats: Arc<RwLock<BackendStats>>,
settings: BackendSettings,
recv_from_server: Receiver<WorkCommand>,
) -> Self {
Self {
workspace,
stats,
settings,
recv_from_server,
}
}
fn run(mut self) {
loop {
let worker_request = match self.recv_from_server.recv() {
Ok(ok) => ok,
Err(_) => {
break;
}
};
self.process_request(worker_request);
}
}
fn process_request(&mut self, work_command: WorkCommand) {
enum AsynchronousRequest {
Step(StepSettings),
}
let asynchronous_request = match work_command.request {
Request::InitialContent => None,
Request::GetContent => None,
Request::Query => {
let stats = self
.stats
.read()
.expect("Backend stats should not be poisoned");
let backend_info = backend_info(false, &stats);
let response = Response {
info: backend_info,
snapshot: None,
};
let _ = work_command.send_to_server.send(response);
return;
}
Request::Cancel => {
None
}
Request::Reset => {
self.workspace.framework.reset();
self.workspace.log.add_message(MessageType::Reset);
None
}
Request::Step(step_settings) => Some(AsynchronousRequest::Step(step_settings)),
Request::AddProperty(property) => {
match machine_check_machine::process_property::<M, _>(
self.workspace.framework.machine(),
&property,
self.workspace.property_macros(),
) {
Ok(property) => {
self.workspace.properties.push(property);
}
Err(err) => {
self.workspace.log.error(err.to_string());
}
};
None
}
Request::RemoveProperty(root_property_index) => {
self.workspace.properties.remove(root_property_index.0);
None
}
};
{
let mut stats = self
.stats
.write()
.expect("Backend stats should not be poisoned");
stats.should_cancel = false;
stats.space_info = extract_space_info(&mut self.workspace.framework);
let worker_busy = asynchronous_request.is_some();
let backend_info = backend_info(worker_busy, &stats);
let response = Response {
info: backend_info,
snapshot: Some(self.workspace.generate_snapshot(&self.settings)),
};
let _ = work_command.send_to_server.send(response);
}
match asynchronous_request {
Some(AsynchronousRequest::Step(step_settings)) => self.backend_step(step_settings),
None => {}
}
}
fn backend_step(&mut self, step_settings: StepSettings) {
info!("Starting stepping.");
let start_instant = Instant::now();
let mut num_refinements = 0;
let mut cancelled = false;
loop {
if let Some(max_refinements) = step_settings.max_refinements {
if num_refinements >= max_refinements {
break;
}
}
if self.update_stats_check_cancel() {
info!("Cancelling stepping.");
cancelled = true;
break;
}
if let ControlFlow::Break(_) = self
.workspace
.framework
.step_verification(&step_settings.selected_property)
{
break;
}
num_refinements += 1;
}
self.workspace.framework.make_compact();
let duration = start_instant.elapsed();
self.workspace
.log
.add_message(MessageType::Step(StepMessage {
status: if cancelled {
StepStatus::Cancelled
} else {
StepStatus::Completed
},
num_refinements,
duration,
}));
self.update_stats_check_cancel();
info!("Stepping done.");
}
fn update_stats_check_cancel(&mut self) -> bool {
let mut stats_guard = self
.stats
.write()
.expect("Backend stats should not be poisoned");
stats_guard.space_info = extract_space_info(&mut self.workspace.framework);
stats_guard.should_cancel
}
}
pub struct BackendSync {
stats: Arc<RwLock<BackendStats>>,
send_to_worker: SyncSender<WorkCommand>,
}
impl BackendSync {
pub fn new<M: FullMachine, D: Send + 'static>(
workspace: Workspace<M, D>,
stats: BackendStats,
settings: BackendSettings,
) -> BackendSync {
let stats = Arc::new(RwLock::new(stats));
let (send_to_worker, recv_from_server) = sync_channel(0);
let worker_stats = Arc::clone(&stats);
std::thread::Builder::new()
.name(String::from("backend worker"))
.spawn(|| BackendWorker::new(workspace, worker_stats, settings, recv_from_server).run())
.expect("Worker thread should be spawned");
BackendSync {
stats,
send_to_worker,
}
}
pub fn command(&self, request: Request) -> Response {
let is_cancel = matches!(request, Request::Cancel);
match self.try_execute_worker(request) {
Ok(ok) => ok,
Err(_) => {
let info = if is_cancel {
let mut stats = self.lock_stats_write();
stats.should_cancel = true;
backend_info(true, &stats)
} else {
let stats = self.lock_stats_read();
backend_info(true, &stats)
};
Response {
info,
snapshot: None,
}
}
}
}
fn try_execute_worker(&self, request: Request) -> Result<Response, ()> {
let (send_to_server, recv_from_worker) = sync_channel(1);
let is_initial_content_request = matches!(request, Request::InitialContent);
let worker_request = WorkCommand {
request,
send_to_server,
};
if is_initial_content_request {
if self.send_to_worker.send(worker_request).is_err() {
panic!("Backend worker should not disconnect (service sending)");
}
} else {
match self.send_to_worker.try_send(worker_request) {
Ok(_) => {}
Err(TrySendError::Full(_)) => {
return Err(());
}
Err(TrySendError::Disconnected(_)) => {
panic!("Backend worker should not disconnect (service sending)");
}
};
}
let response = recv_from_worker
.recv()
.expect("Backend worker should not disconnect (service receiving)");
Ok(response)
}
fn lock_stats_read(&self) -> std::sync::RwLockReadGuard<'_, BackendStats> {
self.stats
.read()
.expect("Backend stats should not be poisoned")
}
fn lock_stats_write(&self) -> std::sync::RwLockWriteGuard<'_, BackendStats> {
self.stats
.write()
.expect("Backend stats should not be poisoned")
}
}
fn backend_info(worker_busy: bool, stats: &BackendStats) -> BackendInfo {
let status = if worker_busy {
if stats.should_cancel {
BackendStatus::Cancelling
} else {
BackendStatus::Running
}
} else {
BackendStatus::Waiting
};
BackendInfo {
status,
space_info: stats.space_info.clone(),
}
}