use std::sync::Arc;
use std::sync::Weak;
use crate::evaluation::context::SideEffects;
use crate::evaluation::functions::LazyScalarFunction;
use crate::evaluation::functions::ValueAccumulator;
use crate::evaluation::variable_value::VariableValue;
use crate::evaluation::ExpressionEvaluationContext;
use crate::evaluation::ExpressionEvaluator;
use crate::evaluation::{FunctionError, FunctionEvaluationError};
use crate::interface::ResultIndex;
use crate::interface::ResultKey;
use crate::interface::ResultOwner;
use crate::interface::{FutureQueue, PushType};
use async_trait::async_trait;
use chrono::Duration;
use drasi_query_ast::ast;
/// Lazy scalar function that evaluates an expression under a time window.
///
/// The function takes two arguments: a window size and an expression. The
/// `LazyScalarFunction` implementation uses the collaborators below to
/// schedule a future re-evaluation for the anchor element and to remember,
/// per anchor element, that the window has already elapsed.
pub struct SlidingWindow {
/// Queue that receives an `Overwrite` push for the anchor element, due at
/// transaction time plus the window size.
future_queue: Arc<dyn FutureQueue>,
/// Index holding a per-element `ValueAccumulator::Signature` marker that is
/// set once the window has elapsed and cleared again on revert.
result_index: Arc<dyn ResultIndex>,
/// Evaluator used for both arguments; held weakly and upgraded on every call.
/// NOTE(review): presumably weak to avoid a reference cycle with the
/// evaluator that owns this function — confirm.
expression_evaluator: Weak<ExpressionEvaluator>,
}
impl SlidingWindow {
pub fn new(
future_queue: Arc<dyn FutureQueue>,
result_index: Arc<dyn ResultIndex>,
expression_evaluator: Weak<ExpressionEvaluator>,
) -> Self {
Self {
future_queue,
result_index,
expression_evaluator,
}
}
}
#[async_trait]
impl LazyScalarFunction for SlidingWindow {
async fn call(
&self,
context: &ExpressionEvaluationContext,
expression: &ast::FunctionExpression,
args: &Vec<ast::Expression>,
) -> Result<VariableValue, FunctionError> {
let result_owner = ResultOwner::Function(expression.position_in_query);
if args.len() != 2 {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::InvalidArgumentCount,
});
}
let expression_evaluator = match self.expression_evaluator.upgrade() {
Some(evaluator) => evaluator,
None => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::CorruptData,
})
}
};
let window_arg = match expression_evaluator
.evaluate_expression(context, &args[0])
.await
{
Ok(value) => value,
Err(e) => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::EvaluationError(Box::new(e)),
})
}
};
let expression_arg = &args[1];
let window_size = match &window_arg {
VariableValue::Duration(d) => *d.duration(),
VariableValue::Integer(n) => match n.as_i64() {
Some(ms) => Duration::milliseconds(ms),
None => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::OverflowError,
})
}
},
VariableValue::Null => return Ok(VariableValue::Null),
_ => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::InvalidArgument(0),
})
}
};
let input_signature = context.get_input_grouping_hash();
let due_time = context.get_transaction_time() + (window_size.num_milliseconds() as u64);
let expired = context.get_realtime() >= due_time;
if let Some(anchor_element) = context.get_anchor_element() {
let result_key = ResultKey::Element(anchor_element.get_reference().clone());
#[allow(clippy::single_match)]
match context.get_side_effects() {
SideEffects::Apply => {
if expired {
match self
.result_index
.set(
result_key,
result_owner.clone(),
Some(ValueAccumulator::Signature(1)),
)
.await
{
Ok(()) => (),
Err(e) => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::IndexError(e),
})
}
};
let mut new_context = context.clone();
new_context.set_side_effects(SideEffects::Snapshot);
return match expression_evaluator
.evaluate_expression(&new_context, expression_arg)
.await
{
Ok(value) => Ok(value),
Err(e) => Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::EvaluationError(Box::new(e)),
}),
};
}
let anchor_ref = anchor_element.get_reference().clone();
match self
.future_queue
.push(
PushType::Overwrite,
expression.position_in_query,
input_signature,
&anchor_ref,
context.get_transaction_time(),
due_time,
)
.await
{
Ok(_) => (),
Err(e) => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::IndexError(e),
})
}
};
}
SideEffects::RevertForUpdate | SideEffects::RevertForDelete => {
match self.result_index.get(&result_key, &result_owner).await {
Ok(Some(ValueAccumulator::Signature(_))) => {
match self.result_index.set(result_key, result_owner, None).await {
Ok(()) => (),
Err(e) => {
return Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::IndexError(e),
})
}
};
let mut new_context = context.clone();
new_context.set_side_effects(SideEffects::Snapshot);
return match expression_evaluator
.evaluate_expression(&new_context, expression_arg)
.await
{
Ok(value) => Ok(value),
Err(e) => Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::EvaluationError(Box::new(e)),
}),
};
}
_ => {}
}
}
_ => (),
}
}
match expression_evaluator
.evaluate_expression(context, expression_arg)
.await
{
Ok(value) => Ok(value),
Err(e) => Err(FunctionError {
function_name: expression.name.to_string(),
error: FunctionEvaluationError::EvaluationError(Box::new(e)),
}),
}
}
}