use std::collections::HashMap;
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use hdrhistogram::Histogram;
use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError};
use rune::termcolor::{ColorChoice, StandardStream};
use rune::{Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm};
use try_lock::TryLock;
use crate::error::LatteError;
use crate::{context, CassError, CassErrorKind, Context, SessionStats};
/// Wrapper around a shared borrow of a [`Context`].
///
/// Its `ToValue` implementation in this file converts the borrow into a Rune `Value`
/// so it can be passed as an argument to a script function.
struct SessionRef<'a> {
/// The borrowed context that gets exposed to the script.
context: &'a Context,
}
impl SessionRef<'_> {
pub fn new(context: &Context) -> SessionRef {
SessionRef { context }
}
}
impl ToValue for SessionRef<'_> {
    /// Converts the wrapped `Context` borrow into a Rune `Value` backed by a shared `AnyObj`.
    fn to_value(self) -> Result<Value, VmError> {
        // NOTE(review): `AnyObj::from_ref` is unsafe; presumably the produced value must not be
        // used once the wrapped borrow has ended — confirm against the rune documentation and
        // the call sites.
        let any = unsafe { AnyObj::from_ref(self.context) };
        let shared = Shared::new(any);
        Ok(Value::from(shared))
    }
}
/// Wrapper around an exclusive (mutable) borrow of a [`Context`].
///
/// Its `ToValue` implementation in this file converts the borrow into a Rune `Value`
/// so it can be passed as an argument to a script function.
struct ContextRefMut<'a> {
/// The mutably borrowed context that gets exposed to the script.
context: &'a mut Context,
}
impl ContextRefMut<'_> {
pub fn new(context: &mut Context) -> ContextRefMut {
ContextRefMut { context }
}
}
impl ToValue for ContextRefMut<'_> {
    /// Converts the wrapped mutable `Context` borrow into a Rune `Value` backed by a shared
    /// `AnyObj`.
    fn to_value(self) -> Result<Value, VmError> {
        // NOTE(review): `AnyObj::from_mut` is unsafe; presumably the produced value must not be
        // used once the wrapped borrow has ended — confirm against the rune documentation and
        // the call sites.
        let any = unsafe { AnyObj::from_mut(self.context) };
        let shared = Shared::new(any);
        Ok(Value::from(shared))
    }
}
/// Reference to a script function: its name together with the Rune hash computed from that name.
#[derive(Debug, Copy, Clone)]
pub struct FnRef {
/// Name of the function; also used when reporting errors from calling it.
name: &'static str,
/// Hash produced by `rune::Hash::type_hash([name])` in `FnRef::new`; used to look the function up.
hash: rune::Hash,
}
impl FnRef {
    /// Builds a reference to the function called `name`, computing its Rune hash up front.
    pub fn new(name: &'static str) -> FnRef {
        let hash = rune::Hash::type_hash([name]);
        Self { name, hash }
    }
}
/// Name of the script function checked by `Program::has_schema` and called by `Program::schema`.
pub const SCHEMA_FN: &str = "schema";
/// Name of the script function checked by `Program::has_prepare` and called by `Program::prepare`.
pub const PREPARE_FN: &str = "prepare";
/// Name of the script function checked by `Program::has_erase` and called by `Program::erase`.
pub const ERASE_FN: &str = "erase";
/// Name of the script function checked by `Program::has_load`.
pub const LOAD_FN: &str = "load";
/// Name of the script function checked by `Program::has_run`.
pub const RUN_FN: &str = "run";
/// A compiled script together with everything needed to run it.
///
/// All fields are behind `Arc`, so the derived `Clone` only bumps reference counts.
#[derive(Clone)]
pub struct Program {
/// The script sources; used when emitting VM errors in `async_call`.
sources: Arc<Sources>,
/// Rune runtime context the virtual machines are created with.
context: Arc<RuntimeContext>,
/// The compiled unit the virtual machines are created with.
unit: Arc<Unit>,
}
impl Program {
/// Compiles `source` into a [`Program`].
///
/// Registers the native modules that scripts can use (the `Context` methods, the error and
/// UUID display functions, the `latte` helpers and the `fs` helpers), loads the sources and
/// builds the Rune unit. Build diagnostics, if any, are written to stderr.
///
/// `params` is moved into the handler of the `latte::param` macro, which receives a
/// reference to the map on every expansion.
///
/// # Errors
///
/// Propagates (via `?`) failures from loading the sources, from emitting the diagnostics and
/// from the build itself.
///
/// # Panics
///
/// Panics if creating the default Rune context, registering any type, function or macro, or
/// installing any module fails.
pub fn new(source: Source, params: HashMap<String, String>) -> Result<Program, LatteError> {
    // Methods callable on the `Context` object.
    let mut context_module = Module::default();
    context_module.ty::<Context>().unwrap();
    context_module
        .async_inst_fn("execute", Context::execute)
        .unwrap();
    context_module
        .async_inst_fn("prepare", Context::prepare)
        .unwrap();
    context_module
        .async_inst_fn("execute_prepared", Context::execute_prepared)
        .unwrap();

    // String display support for `CassError`.
    let mut err_module = Module::default();
    err_module.ty::<CassError>().unwrap();
    err_module
        .inst_fn(rune::runtime::Protocol::STRING_DISPLAY, CassError::display)
        .unwrap();

    // String display support for `Uuid`.
    let mut uuid_module = Module::default();
    uuid_module.ty::<context::Uuid>().unwrap();
    uuid_module
        .inst_fn(
            rune::runtime::Protocol::STRING_DISPLAY,
            context::Uuid::display,
        )
        .unwrap();

    // Free functions, the `param` macro and numeric conversions of the `latte` crate.
    let mut latte_module = Module::with_crate("latte");
    latte_module.function(&["blob"], context::blob).unwrap();
    latte_module.function(&["hash"], context::hash).unwrap();
    latte_module.function(&["hash2"], context::hash2).unwrap();
    latte_module
        .function(&["hash_range"], context::hash_range)
        .unwrap();
    latte_module
        .function(&["hash_select"], context::hash_select)
        .unwrap();
    latte_module
        .function(&["uuid"], context::Uuid::new)
        .unwrap();
    latte_module.function(&["normal"], context::normal).unwrap();
    // The closure owns `params` and lends it to `context::param` on each macro expansion.
    latte_module
        .macro_(&["param"], move |ctx, ts| context::param(ctx, &params, ts))
        .unwrap();
    latte_module.inst_fn("to_i32", context::int_to_i32).unwrap();
    latte_module
        .inst_fn("to_i32", context::float_to_i32)
        .unwrap();
    latte_module.inst_fn("to_i16", context::int_to_i16).unwrap();
    latte_module
        .inst_fn("to_i16", context::float_to_i16)
        .unwrap();
    latte_module.inst_fn("to_i8", context::int_to_i8).unwrap();
    latte_module.inst_fn("to_i8", context::float_to_i8).unwrap();
    latte_module.inst_fn("clamp", context::clamp_float).unwrap();
    latte_module.inst_fn("clamp", context::clamp_int).unwrap();

    // File and resource reading helpers of the `fs` crate.
    let mut fs_module = Module::with_crate("fs");
    fs_module
        .function(&["read_lines"], context::read_lines)
        .unwrap();
    fs_module
        .function(
            &["read_resource_to_string"],
            context::read_resource_to_string,
        )
        .unwrap();
    fs_module
        .function(&["read_resource_lines"], context::read_resource_lines)
        .unwrap();

    let mut context = rune::Context::with_default_modules().unwrap();
    context.install(&context_module).unwrap();
    context.install(&err_module).unwrap();
    context.install(&uuid_module).unwrap();
    context.install(&latte_module).unwrap();
    context.install(&fs_module).unwrap();

    let mut options = rune::Options::default();
    options.debug_info(true);

    let mut diagnostics = Diagnostics::new();
    let mut sources = Self::load_sources(source)?;
    // Hand the options to the build so the `debug_info` setting actually takes effect.
    let unit = rune::prepare(&mut sources)
        .with_context(&context)
        .with_options(&options)
        .with_diagnostics(&mut diagnostics)
        .build();

    // Emit the diagnostics before checking the build result, so they are shown
    // even when the build failed.
    if !diagnostics.is_empty() {
        let mut writer = StandardStream::stderr(ColorChoice::Always);
        diagnostics.emit(&mut writer, &sources)?;
    }
    let unit = unit?;

    Ok(Program {
        sources: Arc::new(sources),
        context: Arc::new(context.runtime()),
        unit: Arc::new(unit),
    })
}
/// Builds the set of sources to compile.
///
/// When `source` has a path with a parent directory, `try_insert_lib_source` is first given
/// the chance to add a `lib.rn` from that directory; `source` itself is inserted afterwards.
fn load_sources(source: Source) -> Result<Sources, LatteError> {
    let mut all_sources = Sources::new();
    let script_dir = source.path().and_then(|script| script.parent());
    if let Some(dir) = script_dir {
        Self::try_insert_lib_source(dir, &mut all_sources)?;
    }
    all_sources.insert(source);
    Ok(all_sources)
}
/// Adds `<parent>/lib.rn` to `sources` if such a file exists.
///
/// Does nothing when the file is absent. A failure to read an existing file is reported as
/// `LatteError::ScriptRead` carrying the file path.
fn try_insert_lib_source(parent: &Path, sources: &mut Sources) -> Result<(), LatteError> {
    let lib_path = parent.join("lib.rn");
    if !lib_path.is_file() {
        return Ok(());
    }
    let lib_source = Source::from_path(&lib_path)
        .map_err(|io_err| LatteError::ScriptRead(lib_path.clone(), io_err))?;
    sources.insert(lib_source);
    Ok(())
}
/// Returns a copy of this program whose runtime context and unit are cloned into fresh
/// `Arc`s, so they are no longer shared with `self`. The sources stay shared.
fn unshare(&self) -> Program {
    let sources = Arc::clone(&self.sources);
    let context = Arc::new((*self.context).clone());
    let unit = Arc::new((*self.unit).clone());
    Program {
        sources,
        context,
        unit,
    }
}
fn vm(&self) -> Vm {
Vm::new(self.context.clone(), self.unit.clone())
}
/// Translates the value returned by a script function into a Rust `Result`.
///
/// * A value that is not a Rune `Result` is passed through unchanged.
/// * `Ok(v)` yields `v`.
/// * `Err` holding a `CassError` becomes `LatteError::Cassandra`.
/// * Any other `Err` becomes `LatteError::FunctionResult` with `function_name` and a textual
///   rendering of the error value.
///
/// # Panics
///
/// Panics if the script value cannot be taken, borrowed or downcast, or if invoking the
/// string-display protocol fails with a VM error.
fn convert_error(
    &self,
    function_name: &'static str,
    result: Value,
) -> Result<Value, LatteError> {
    let outcome = match result {
        Value::Result(shared) => shared.take().unwrap(),
        not_a_result => return Ok(not_a_result),
    };
    let failure = match outcome {
        Ok(value) => return Ok(value),
        Err(failure) => failure,
    };
    match failure {
        Value::Any(any) => {
            if any.borrow_ref().unwrap().type_hash() == CassError::type_hash() {
                let cass_error = any.take_downcast::<CassError>().unwrap();
                return Err(LatteError::Cassandra(cass_error));
            }
            let mut text = String::new();
            let mut scratch = String::new();
            let failure = Value::Any(any);
            // NOTE(review): the display call is made inside `Vm::with`, presumably because
            // it needs a VM in scope — confirm against the rune documentation.
            self.vm().with(|| {
                // Fall back to the Debug rendering when the value cannot be displayed.
                if failure
                    .string_display(&mut text, &mut scratch)
                    .unwrap()
                    .is_err()
                {
                    text = format!("{failure:?}")
                }
            });
            Err(LatteError::FunctionResult(function_name, text))
        }
        other => Err(LatteError::FunctionResult(
            function_name,
            format!("{other:?}"),
        )),
    }
}
/// Runs the script function `fun` with `args` on a fresh virtual machine and awaits its result.
///
/// A VM error is emitted to stderr (a failure to emit is ignored) and returned as
/// `LatteError::ScriptExecError`. The value returned by the function then goes through
/// `convert_error`.
pub async fn async_call(
    &self,
    fun: FnRef,
    args: impl Args + Send,
) -> Result<Value, LatteError> {
    let report = |vm_error: VmError| {
        let mut stderr = StandardStream::stderr(ColorChoice::Auto);
        // Printing the error is best-effort; the error itself is still returned.
        let _ = vm_error.emit(&mut stderr, &self.sources);
        LatteError::ScriptExecError(fun.name, vm_error)
    };
    // The VM is created and consumed within this one expression.
    let execution = self.vm().send_execute(fun.hash, args).map_err(report)?;
    let returned = execution.async_complete().await.map_err(report)?;
    self.convert_error(fun.name, returned)
}
/// Tells whether the compiled unit contains the function named by `PREPARE_FN`.
pub fn has_prepare(&self) -> bool {
    let prepare_fn = FnRef::new(PREPARE_FN);
    self.unit.function(prepare_fn.hash).is_some()
}
/// Tells whether the compiled unit contains the function named by `SCHEMA_FN`.
pub fn has_schema(&self) -> bool {
    let schema_fn = FnRef::new(SCHEMA_FN);
    self.unit.function(schema_fn.hash).is_some()
}
/// Tells whether the compiled unit contains the function named by `ERASE_FN`.
pub fn has_erase(&self) -> bool {
    let erase_fn = FnRef::new(ERASE_FN);
    self.unit.function(erase_fn.hash).is_some()
}
/// Tells whether the compiled unit contains the function named by `LOAD_FN`.
pub fn has_load(&self) -> bool {
    let load_fn = FnRef::new(LOAD_FN);
    self.unit.function(load_fn.hash).is_some()
}
/// Tells whether the compiled unit contains the function named by `RUN_FN`.
pub fn has_run(&self) -> bool {
    let run_fn = FnRef::new(RUN_FN);
    self.unit.function(run_fn.hash).is_some()
}
/// Calls the script function named by `PREPARE_FN`, passing it a mutable borrow of `context`.
///
/// The value returned by the function is discarded; errors from the call are returned.
pub async fn prepare(&mut self, context: &mut Context) -> Result<(), LatteError> {
    let args = (ContextRefMut::new(context),);
    self.async_call(FnRef::new(PREPARE_FN), args)
        .await
        .map(|_| ())
}
/// Calls the script function named by `SCHEMA_FN`, passing it a mutable borrow of `context`.
///
/// The value returned by the function is discarded; errors from the call are returned.
pub async fn schema(&mut self, context: &mut Context) -> Result<(), LatteError> {
    let args = (ContextRefMut::new(context),);
    self.async_call(FnRef::new(SCHEMA_FN), args)
        .await
        .map(|_| ())
}
/// Calls the script function named by `ERASE_FN`, passing it a mutable borrow of `context`.
///
/// The value returned by the function is discarded; errors from the call are returned.
pub async fn erase(&mut self, context: &mut Context) -> Result<(), LatteError> {
    let args = (ContextRefMut::new(context),);
    self.async_call(FnRef::new(ERASE_FN), args)
        .await
        .map(|_| ())
}
}
/// Call statistics of a function: how many calls completed and how long they took.
#[derive(Clone, Debug)]
pub struct FnStats {
/// Number of calls recorded through `operation_completed`.
pub call_count: u64,
/// Histogram of the recorded call durations, in nanoseconds.
pub call_times_ns: Histogram<u64>,
}
impl FnStats {
    /// Counts one completed call and records its `duration`, in nanoseconds, in the histogram.
    ///
    /// # Panics
    ///
    /// Panics if the histogram rejects the value.
    pub fn operation_completed(&mut self, duration: Duration) {
        // Durations outside 1..=u64::MAX nanoseconds are pulled to the nearest bound.
        let nanos = duration.as_nanos().min(u128::from(u64::MAX)).max(1) as u64;
        self.call_count += 1;
        self.call_times_ns.record(nanos).unwrap();
    }
}
impl Default for FnStats {
    /// Creates statistics with no recorded calls and an empty histogram created by
    /// `Histogram::new(3)`.
    fn default() -> Self {
        let call_times_ns = Histogram::new(3).unwrap();
        Self {
            call_count: 0,
            call_times_ns,
        }
    }
}
/// Statistics snapshot produced by `Workload::take_stats`.
pub struct WorkloadStats {
/// Start time of the period the snapshot covers.
pub start_time: Instant,
/// End time of the period, as passed to `take_stats`.
pub end_time: Instant,
/// Call count and call-time histogram of the workload function.
pub function_stats: FnStats,
/// Session statistics taken from the workload's `Context`.
pub session_stats: SessionStats,
}
/// Mutable measurement state of a `Workload`, kept behind its `TryLock`.
pub struct WorkloadState {
/// Start of the current measurement period; set by `reset` and advanced by `take_stats`.
start_time: Instant,
/// Function statistics accumulated since the last `reset` or `take_stats`.
fn_stats: FnStats,
}
impl Default for WorkloadState {
fn default() -> Self {
WorkloadState {
start_time: Instant::now(),
fn_stats: Default::default(),
}
}
}
/// A script function bound to the context and program it runs with, plus its statistics.
pub struct Workload {
/// Context passed to the function on every call.
context: Context,
/// Program that contains the function.
program: Program,
/// The function invoked by `run`.
function: FnRef,
/// Statistics state; accessed through `try_lock` from `&self` methods.
state: TryLock<WorkloadState>,
}
impl Workload {
/// Creates a workload that calls `function` of `program` with `context`, starting with
/// fresh statistics.
pub fn new(context: Context, program: Program, function: FnRef) -> Workload {
    let state = TryLock::new(WorkloadState::default());
    Workload {
        context,
        program,
        function,
        state,
    }
}
/// Creates an independent copy of this workload.
///
/// The context is cloned (which may fail, hence the `Result`), the program is unshared and
/// the statistics start fresh.
pub fn clone(&self) -> Result<Self, LatteError> {
    let context = self.context.clone()?;
    let program = self.program.unshare();
    let state = TryLock::new(WorkloadState::default());
    Ok(Workload {
        context,
        program,
        function: self.function,
        state,
    })
}
/// Calls the workload function once with the context and `cycle`, and records how long the
/// call took.
///
/// Returns `(cycle, end_time)` when the call succeeds. A Cassandra `Overloaded` error is
/// reported the same way as a success; every other error is returned to the caller.
///
/// # Panics
///
/// Panics if the statistics lock cannot be acquired or the duration cannot be recorded.
pub async fn run(&self, cycle: u64) -> Result<(u64, Instant), LatteError> {
    let start_time = Instant::now();
    let session = SessionRef::new(&self.context);
    let outcome = self
        .program
        .async_call(self.function, (session, cycle as i64))
        .await
        .map(|_| ());
    let end_time = Instant::now();

    // The call time is recorded regardless of how the call ended.
    let mut guard = self.state.try_lock().unwrap();
    guard.fn_stats.operation_completed(end_time - start_time);

    match outcome {
        Ok(()) | Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_, _)))) => {
            Ok((cycle, end_time))
        }
        Err(other) => Err(other),
    }
}
/// Returns a shared reference to the `Context` owned by this workload.
pub fn context(&self) -> &Context {
&self.context
}
pub fn reset(&self, start_time: Instant) {
let mut state = self.state.try_lock().unwrap();
state.fn_stats = FnStats::default();
state.start_time = start_time;
self.context.reset_session_stats();
}
/// Returns the statistics gathered since the current measurement period started and begins
/// a new period at `end_time`.
///
/// The returned snapshot spans from the stored start time to `end_time` and contains the
/// function statistics and the session statistics taken from the context. Afterwards the
/// stored start time is `end_time` and the function statistics are empty.
///
/// # Panics
///
/// Panics if the statistics lock cannot be acquired.
pub fn take_stats(&self, end_time: Instant) -> WorkloadStats {
    let mut state = self.state.try_lock().unwrap();
    WorkloadStats {
        // Swap in the new period start and hand out the old one.
        start_time: std::mem::replace(&mut state.start_time, end_time),
        end_time,
        // Move the collected stats out, leaving a default in place, instead of cloning the
        // histogram and then overwriting the original.
        function_stats: std::mem::take(&mut state.fn_stats),
        session_stats: self.context().take_session_stats(),
    }
}
}