#![allow(unused_imports)]
use std::collections::HashMap;
use std::io::prelude::*;
use std::sync::Arc;
use std::sync::Condvar;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use rayon::prelude::*;
use uuid::Uuid;
use crate::compute::cache::ComputationResultCache;
use crate::compute::cache::ParsingCache;
use crate::compute::computation::Computation;
use crate::compute::computation::ComputationProgress;
use crate::compute::computation::ComputationStatus;
use crate::compute::computation::Value;
use crate::compute::state::State;
use crate::symbolic::core::Expr;
#[allow(dead_code)]
#[derive(Clone)]
pub struct ComputeEngine {
computations: Arc<RwLock<HashMap<String, Arc<Mutex<Computation>>>>>,
parsing_cache: Arc<ParsingCache>,
result_cache: Arc<ComputationResultCache>,
}
impl ComputeEngine {
#[must_use]
pub fn new() -> Self {
Self {
computations: Arc::new(RwLock::new(HashMap::new())),
parsing_cache: Arc::new(ParsingCache::new()),
result_cache: Arc::new(ComputationResultCache::new()),
}
}
pub fn parse_and_submit(
&self,
input: &str,
) -> Result<String, String> {
let expr = match self.parsing_cache.get(input) {
| Some(expr) => expr,
| None => {
match crate::input::parser::parse_expr(input) {
| Ok((_, expr)) => {
let expr = Arc::new(expr);
self.parsing_cache.set(input.to_string(), expr.clone());
expr
},
| Err(e) => return Err(e.to_string()),
}
},
};
Ok(self.submit(expr))
}
#[must_use]
pub fn get_status(
&self,
id: &str,
) -> Option<ComputationStatus> {
let computations = self.computations.read().expect(
"ComputeEngine \
computations lock \
poisoned",
);
computations.get(id).map(|comp| {
comp.lock()
.expect(
"Computation \
lock poisoned",
)
.status
.clone()
})
}
#[must_use]
pub fn get_progress(
&self,
id: &str,
) -> Option<ComputationProgress> {
let computations = self.computations.read().expect(
"ComputeEngine \
computations lock \
poisoned",
);
computations.get(id).map(|comp| {
comp.lock()
.expect(
"Computation \
lock poisoned",
)
.progress
.clone()
})
}
#[must_use]
pub fn get_result(
&self,
id: &str,
) -> Option<Value> {
let computations = self.computations.read().expect(
"ComputeEngine \
computations lock \
poisoned",
);
computations.get(id).and_then(|comp| {
comp.lock()
.expect(
"Computation \
lock poisoned",
)
.result
.clone()
})
}
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn submit(
&self,
expr: Arc<Expr>,
) -> String {
let id = Uuid::new_v4().to_string();
let pause = Arc::new((Mutex::new(false), Condvar::new()));
let computation = Arc::new(Mutex::new(Computation {
id: id.clone(),
expr,
status: ComputationStatus::Pending,
progress: ComputationProgress {
percentage: 0.0,
description: "Pending".to_string(),
},
result: None,
cancel_signal: Arc::new(AtomicBool::new(false)),
state: State {
intermediate_value: String::new(),
},
pause: pause.clone(),
}));
{
let mut computations = self.computations.write().expect(
"ComputeEngine \
computations \
lock poisoned",
);
computations.insert(id.clone(), computation.clone());
}
let _engine = self.clone();
let result_cache = self.result_cache.clone();
rayon::spawn(move || {
let (lock, cvar) = &*pause;
let mut comp_guard = computation.lock().expect(
"Computation \
lock poisoned",
);
comp_guard.status = ComputationStatus::Running;
for i in 0u8..100u8 {
let mut paused = lock.lock().expect(
"Pause lock \
poisoned",
);
while *paused {
comp_guard.status = ComputationStatus::Paused;
println!(
"Computation \
{} paused.",
comp_guard.id
);
paused = cvar.wait(paused).expect("Condition variable wait failed");
}
drop(paused);
comp_guard.status = ComputationStatus::Running;
if comp_guard.status == ComputationStatus::Failed("Cancelled".to_string()) {
println!("Computation {} cancelled.", comp_guard.id);
return;
}
std::thread::sleep(std::time::Duration::from_millis(50));
comp_guard.progress.percentage = f32::from(i);
comp_guard.progress.description = format!("{i}% complete");
}
comp_guard.status = ComputationStatus::Completed;
comp_guard.progress.percentage = 100.0;
comp_guard.progress.description = "Completed".to_string();
let result = "Result of the \
computation"
.to_string();
comp_guard.result = Some(result.clone());
result_cache.set(comp_guard.expr.clone(), result);
});
id
}
pub fn pause(
&self,
id: &str,
) {
let computation = self
.computations
.read()
.expect(
"ComputeEngine \
computations lock \
poisoned",
)
.get(id)
.cloned();
if let Some(computation) = computation {
let pause = {
let comp = computation.lock().expect(
"Computation \
lock poisoned",
);
comp.pause.clone()
};
{
let mut paused = pause.0.lock().expect(
"Pause lock \
poisoned",
);
*paused = true;
}
pause.1.notify_one();
}
}
pub fn resume(
&self,
id: &str,
) {
let computation = self
.computations
.read()
.expect(
"ComputeEngine \
computations lock \
poisoned",
)
.get(id)
.cloned();
if let Some(computation) = computation {
let pause = {
let comp = computation.lock().expect(
"Computation \
lock poisoned",
);
comp.pause.clone()
};
{
let mut paused = pause.0.lock().expect(
"Pause lock \
poisoned",
);
*paused = false;
}
pause.1.notify_one();
}
}
pub fn cancel(
&self,
id: &str,
) {
let computation = self
.computations
.read()
.expect(
"ComputeEngine \
computations lock \
poisoned",
)
.get(id)
.cloned();
if let Some(computation) = computation {
let pause = {
let mut comp = computation.lock().expect(
"Computation \
lock poisoned",
);
comp.status = ComputationStatus::Failed("Cancelled".to_string());
comp.pause.clone()
};
{
let mut paused = pause.0.lock().expect(
"Pause lock \
poisoned",
);
*paused = false;
}
pause.1.notify_one();
}
self.computations
.write()
.expect(
"ComputeEngine \
computations lock \
poisoned",
)
.remove(id);
}
}
impl Default for ComputeEngine {
fn default() -> Self {
Self::new()
}
}