use crate::{
parser::{redis_value_as_string, redis_value_as_vec},
FalkorDBError, FalkorResult,
};
use regex::Regex;
use std::{
cell::RefCell,
cmp::Ordering,
collections::{HashMap, VecDeque},
ops::Not,
rc::Rc,
};
#[derive(Debug)]
struct IntermediateOperation {
name: String,
args: Option<Vec<String>>,
records_produced: Option<i64>,
execution_time: Option<f64>,
depth: usize,
children: Vec<Rc<RefCell<IntermediateOperation>>>,
}
impl IntermediateOperation {
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Create New Operation", skip_all, level = "trace")
)]
fn new(
depth: usize,
operation_string: &str,
) -> FalkorResult<IntermediateOperation> {
let mut args = operation_string.split('|').collect::<VecDeque<_>>();
let name = args
.pop_front()
.ok_or(FalkorDBError::CorruptExecutionPlan)?
.trim();
let (records_produced, execution_time) = match args.pop_back() {
Some(last_arg) if last_arg.contains("Records produced") => (
Regex::new(r"Records produced: (\d+)")
.map_err(|err| {
FalkorDBError::ParsingError(format!("Error constructing regex: {err}"))
})?
.captures(last_arg.trim())
.and_then(|cap| cap.get(1))
.and_then(|m| m.as_str().parse().ok()),
Regex::new(r"Execution time: (\d+\.\d+) ms")
.map_err(|err| {
FalkorDBError::ParsingError(format!("Error constructing regex: {err}"))
})?
.captures(last_arg.trim())
.and_then(|cap| cap.get(1))
.and_then(|m| m.as_str().parse().ok()),
),
Some(last_arg) => {
args.push_back(last_arg);
(None, None)
}
None => (None, None),
};
Ok(Self {
name: name.to_string(),
args: args
.is_empty()
.not()
.then(|| args.into_iter().map(ToString::to_string).collect()),
records_produced,
execution_time,
depth,
children: vec![],
})
}
}
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Operation {
pub name: String,
pub args: Option<Vec<String>>,
pub records_produced: Option<i64>,
pub execution_time: Option<f64>,
pub children: Vec<Rc<Operation>>,
depth: usize,
}
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutionPlan {
string_representation: String,
plan: Vec<String>,
operations: HashMap<String, Vec<Rc<Operation>>>,
operation_tree: Rc<Operation>,
}
impl ExecutionPlan {
pub fn plan(&self) -> &[String] {
self.plan.as_slice()
}
pub fn operations(&self) -> &HashMap<String, Vec<Rc<Operation>>> {
&self.operations
}
pub fn operation_tree(&self) -> &Rc<Operation> {
&self.operation_tree
}
pub fn string_representation(&self) -> &str {
self.string_representation.as_str()
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Create Node", skip_all, level = "debug")
)]
fn create_node(
depth: usize,
operation_string: &str,
traversal_stack: &mut Vec<Rc<RefCell<IntermediateOperation>>>,
) -> FalkorResult<()> {
let new_node = Rc::new(RefCell::new(IntermediateOperation::new(
depth,
operation_string,
)?));
traversal_stack.push(Rc::clone(&new_node));
Ok(())
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Finalize Operation", skip_all, level = "debug")
)]
fn finalize_operation(
current_refcell: Rc<RefCell<IntermediateOperation>>
) -> FalkorResult<Rc<Operation>> {
let current_op = Rc::try_unwrap(current_refcell)
.map_err(|_| FalkorDBError::RefCountBooBoo)?
.into_inner();
let children_count = current_op.children.len();
Ok(Rc::new(Operation {
name: current_op.name,
args: current_op.args,
records_produced: current_op.records_produced,
execution_time: current_op.execution_time,
depth: current_op.depth,
children: current_op.children.into_iter().try_fold(
Vec::with_capacity(children_count),
|mut acc, child| {
acc.push(Self::finalize_operation(child)?);
Result::<_, FalkorDBError>::Ok(acc)
},
)?,
}))
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Parse Operation Tree To Map", skip_all, level = "trace")
)]
fn operations_map_from_tree(
current_branch: &Rc<Operation>,
map: &mut HashMap<String, Vec<Rc<Operation>>>,
) {
map.entry(current_branch.name.clone())
.or_default()
.push(Rc::clone(current_branch));
for child in ¤t_branch.children {
Self::operations_map_from_tree(child, map);
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Parse Execution Plan", skip_all, level = "info")
)]
pub(crate) fn parse(value: redis::Value) -> FalkorResult<Self> {
let redis_value_vec = redis_value_as_vec(value)?;
let mut string_representation = Vec::with_capacity(redis_value_vec.len() + 1);
let mut current_traversal_stack = vec![];
for node in redis_value_vec {
let node_string = redis_value_as_string(node)?;
let depth = node_string.matches(" ").count();
let node = node_string.trim();
let current_node = match current_traversal_stack.last().cloned() {
None => {
current_traversal_stack.push(Rc::new(RefCell::new(
IntermediateOperation::new(depth, node)?,
)));
string_representation.push(node_string);
continue;
}
Some(current_node) => current_node,
};
let current_depth = current_node.borrow().depth;
match depth.cmp(¤t_depth) {
Ordering::Less => {
let times_to_pop = (current_depth - depth) + 1;
if times_to_pop > current_traversal_stack.len() {
return Err(FalkorDBError::CorruptExecutionPlan);
}
for _ in 0..times_to_pop {
current_traversal_stack.pop();
}
Self::create_node(depth, node, &mut current_traversal_stack)?;
}
Ordering::Equal => {
current_traversal_stack.pop();
Self::create_node(depth, node, &mut current_traversal_stack)?;
}
Ordering::Greater => {
if depth - current_depth > 1 {
return Err(FalkorDBError::CorruptExecutionPlan);
}
let new_node = Rc::new(RefCell::new(IntermediateOperation::new(depth, node)?));
current_traversal_stack.push(Rc::clone(&new_node));
current_node.borrow_mut().children.push(new_node);
}
}
string_representation.push(node_string);
}
let root_node = current_traversal_stack
.into_iter()
.next()
.ok_or(FalkorDBError::CorruptExecutionPlan)?;
let operation_tree = Self::finalize_operation(root_node)?;
let mut operations = HashMap::new();
Self::operations_map_from_tree(&operation_tree, &mut operations);
Ok(ExecutionPlan {
string_representation: format!("\n{}", string_representation.join("\n")),
plan: string_representation,
operations,
operation_tree,
})
}
}