use crate::error::Result;
use crate::ops::aggregate::{group_by, group_by_agg, AggregationFunction};
use crate::ops::basic::{filter_values, select_columns, sort_by_columns, SortOptions};
use crate::ops::join::{join, JoinKeys, JoinOptions};
use crate::Value;
use super::Operation;
/// Ordered sequence of boxed [`Operation`]s applied one after another.
///
/// Build a pipeline with [`OperationPipeline::new`] plus the chaining helpers (each consumes
/// and returns the pipeline), then run it with [`OperationPipeline::execute`] or
/// [`OperationPipeline::execute_mut`].
pub struct OperationPipeline {
    /// Steps in the order they are run by `execute` / `execute_mut`.
    operations: Vec<Box<dyn Operation + Send + Sync>>,
}
impl OperationPipeline {
    /// Creates a pipeline containing no operations.
    #[must_use]
    pub fn new() -> Self {
        let operations = Vec::new();
        Self { operations }
    }

    /// Appends `op` as the last step and returns the pipeline for further chaining.
    #[must_use]
    pub fn add_operation(self, op: Box<dyn Operation + Send + Sync>) -> Self {
        let mut pipeline = self;
        pipeline.operations.push(op);
        pipeline
    }

    /// Appends a step that calls `select_columns` with `columns`.
    #[must_use]
    pub fn select(self, columns: Vec<String>) -> Self {
        let step = SelectOperation { columns };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `filter_values` with `predicate`.
    #[must_use]
    pub fn filter<F>(self, predicate: F) -> Self
    where
        F: Fn(&Value) -> Result<bool> + Send + Sync + 'static,
    {
        let step = FilterOperation {
            predicate: Box::new(predicate),
        };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `sort_by_columns` with `options`.
    #[must_use]
    pub fn sort(self, options: Vec<SortOptions>) -> Self {
        let step = SortOperation { options };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `ops::basic::head` with `n`.
    #[must_use]
    pub fn head(self, n: usize) -> Self {
        let step = HeadOperation { n };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `ops::basic::tail` with `n`.
    #[must_use]
    pub fn tail(self, n: usize) -> Self {
        let step = TailOperation { n };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `group_by` with `columns`.
    #[must_use]
    pub fn group_by(self, columns: Vec<String>) -> Self {
        let step = GroupByOperation { columns };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `group_by_agg` with `group_columns` and `agg_functions`.
    #[must_use]
    pub fn aggregate(
        self,
        group_columns: Vec<String>,
        agg_functions: Vec<AggregationFunction>,
    ) -> Self {
        let step = AggregateOperation {
            group_columns,
            agg_functions,
        };
        self.add_operation(Box::new(step))
    }

    /// Appends a step that calls `join`, passing the running value first and `right` second.
    #[must_use]
    pub fn join(self, right: Value, keys: JoinKeys, options: JoinOptions) -> Self {
        let step = JoinOperation {
            right,
            keys,
            options,
        };
        self.add_operation(Box::new(step))
    }

    /// Runs every step in insertion order, feeding each one the previous step's output,
    /// and returns the final value.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any step; the remaining steps are not run.
    pub fn execute(&self, value: Value) -> Result<Value> {
        self.operations
            .iter()
            .try_fold(value, |current, operation| operation.apply(&current))
    }

    /// Runs every step in insertion order, replacing `*value` with each step's output.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any step and stops there. In that case `*value`
    /// still holds the output of the steps that had already succeeded, so it may be
    /// partially transformed.
    pub fn execute_mut(&self, value: &mut Value) -> Result<()> {
        self.operations
            .iter()
            .try_for_each(|operation| operation.apply(value).map(|next| *value = next))
    }

    /// Number of steps in the pipeline.
    #[must_use]
    pub fn len(&self) -> usize {
        self.operations.len()
    }

    /// `true` when the pipeline has no steps.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.operations.is_empty()
    }

    /// One `description()` string per step, in execution order.
    #[must_use]
    pub fn describe(&self) -> Vec<String> {
        let mut lines = Vec::with_capacity(self.operations.len());
        for operation in &self.operations {
            lines.push(operation.description());
        }
        lines
    }
}
impl Default for OperationPipeline {
fn default() -> Self {
Self::new()
}
}
/// Pipeline step that delegates to `select_columns` with a fixed list of column names.
struct SelectOperation {
    /// Column names forwarded to `select_columns`.
    columns: Vec<String>,
}
impl Operation for SelectOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        let wanted = &self.columns;
        select_columns(value, wanted)
    }
    fn description(&self) -> String {
        // Renders as e.g. "select columns: a, b".
        let mut text = String::from("select columns: ");
        text.push_str(&self.columns.join(", "));
        text
    }
}
/// Pipeline step that delegates to `filter_values` with a user-supplied predicate.
// The boxed closure type is spelled out inline because the field is public and this is the
// only place it appears; the allow silences `clippy::type_complexity` for that signature.
#[allow(clippy::type_complexity)]
pub struct FilterOperation {
    /// Predicate forwarded to `filter_values` on every `apply`.
    pub predicate: Box<dyn Fn(&Value) -> Result<bool> + Send + Sync>,
}
impl std::fmt::Debug for FilterOperation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Closures are not `Debug`, so `predicate` cannot be printed. `finish_non_exhaustive`
        // renders `FilterOperation { .. }`, making the omission explicit instead of silently
        // looking like a field-less struct.
        f.debug_struct("FilterOperation").finish_non_exhaustive()
    }
}
impl Operation for FilterOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        filter_values(value, &self.predicate)
    }
    fn description(&self) -> String {
        // The predicate is opaque, so only a generic description is possible.
        "filter with custom predicate".to_string()
    }
}
/// Pipeline step that delegates to `sort_by_columns` with a fixed list of sort keys.
struct SortOperation {
    /// Sort keys forwarded to `sort_by_columns`, in the given order.
    options: Vec<SortOptions>,
}
impl Operation for SortOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        let keys = &self.options;
        sort_by_columns(value, keys)
    }
    fn description(&self) -> String {
        // One "<column> asc|desc" fragment per key, e.g. "sort by: a asc, b desc".
        let mut parts = Vec::with_capacity(self.options.len());
        for opt in &self.options {
            parts.push(format!(
                "{} {}",
                opt.column,
                if opt.descending { "desc" } else { "asc" }
            ));
        }
        format!("sort by: {}", parts.join(", "))
    }
}
/// Pipeline step that delegates to `ops::basic::head` with a fixed count.
struct HeadOperation {
    /// Count forwarded to `ops::basic::head`.
    n: usize,
}
impl Operation for HeadOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        let count = self.n;
        crate::ops::basic::head(value, count)
    }
    fn description(&self) -> String {
        let mut text = String::from("head ");
        text.push_str(&self.n.to_string());
        text
    }
}
/// Pipeline step that delegates to `ops::basic::tail` with a fixed count.
struct TailOperation {
    /// Count forwarded to `ops::basic::tail`.
    n: usize,
}
impl Operation for TailOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        let count = self.n;
        crate::ops::basic::tail(value, count)
    }
    fn description(&self) -> String {
        let mut text = String::from("tail ");
        text.push_str(&self.n.to_string());
        text
    }
}
/// Pipeline step that delegates to `group_by` with a fixed list of key columns.
struct GroupByOperation {
    /// Key columns forwarded to `group_by`.
    columns: Vec<String>,
}
impl Operation for GroupByOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        let keys = &self.columns;
        group_by(value, keys)
    }
    fn description(&self) -> String {
        let mut text = String::from("group by: ");
        text.push_str(&self.columns.join(", "));
        text
    }
}
struct AggregateOperation {
group_columns: Vec<String>,
agg_functions: Vec<AggregationFunction>,
}
impl Operation for AggregateOperation {
fn apply(&self, value: &Value) -> Result<Value> {
group_by_agg(value, &self.group_columns, &self.agg_functions)
}
fn description(&self) -> String {
let agg_desc: Vec<String> = self
.agg_functions
.iter()
.map(super::aggregate::AggregationFunction::output_column_name)
.collect();
format!(
"aggregate by {} with functions: {}",
self.group_columns.join(", "),
agg_desc.join(", ")
)
}
}
/// Pipeline step that delegates to `join`, using the running value as the first input and a
/// fixed `right` value as the second.
struct JoinOperation {
    /// Second input passed to `join` on every `apply`.
    right: Value,
    /// Key specification forwarded to `join`.
    keys: JoinKeys,
    /// Join options forwarded to `join`; `join_type` is also used by `description`.
    options: JoinOptions,
}
impl Operation for JoinOperation {
    fn apply(&self, value: &Value) -> Result<Value> {
        let JoinOperation {
            right,
            keys,
            options,
        } = self;
        join(value, right, keys, options)
    }
    fn description(&self) -> String {
        // e.g. "<join type> join on: a, b" (only the left-side key columns are listed).
        let mut text = format!("{} join on: ", self.options.join_type.as_str());
        text.push_str(&self.keys.left_columns().join(", "));
        text
    }
}
/// Applies `operations` in order to a borrowed `value` and returns the final result.
///
/// Every operation only needs `&Value`, so the first one reads `value` directly. The input is
/// cloned only when `operations` is empty, because an owned `Value` must still be returned.
/// Previously the whole input was deep-cloned up front and a temporary pipeline was built.
///
/// # Errors
///
/// Returns the first error produced by any operation; later operations are not run.
pub fn apply_operations(
    value: &Value,
    operations: Vec<Box<dyn Operation + Send + Sync>>,
) -> Result<Value> {
    match operations.split_first() {
        // Nothing to run: the caller still expects an owned copy of the input.
        None => Ok(value.clone()),
        Some((first, rest)) => {
            let initial = first.apply(value)?;
            rest.iter()
                .try_fold(initial, |current, op| op.apply(&current))
        }
    }
}
/// Applies `operations` in order to an owned `value`, consuming it, and returns the result.
///
/// # Errors
///
/// Returns the first error produced by any operation; later operations are not run.
pub fn apply_operations_owned(
    value: Value,
    operations: Vec<Box<dyn Operation + Send + Sync>>,
) -> Result<Value> {
    operations
        .iter()
        .try_fold(value, |current, step| step.apply(&current))
}
/// Applies `operations` in order, replacing `*value` with each operation's output.
///
/// # Errors
///
/// Returns the first error produced by any operation and stops there. In that case `*value`
/// keeps the output of the operations that had already succeeded.
pub fn apply_operations_mut(
    value: &mut Value,
    operations: Vec<Box<dyn Operation + Send + Sync>>,
) -> Result<()> {
    for step in &operations {
        let next = step.apply(value)?;
        *value = next;
    }
    Ok(())
}