dsq_core/ops/
pipeline.rs

1use crate::error::Result;
2use crate::ops::aggregate::{group_by, group_by_agg, AggregationFunction};
3use crate::ops::basic::{filter_values, select_columns, sort_by_columns, SortOptions};
4use crate::ops::join::{join, JoinKeys, JoinOptions};
5use crate::Value;
6
7use super::Operation;
8
9/// A pipeline of operations that can be applied sequentially
10///
11/// # Examples
12///
13/// ```rust,ignore
14/// use dsq_core::ops::{OperationPipeline, basic::SortOptions};
15/// use dsq_core::value::Value;
16///
17/// let mut pipeline = OperationPipeline::new();
18/// pipeline
19///     .select(vec!["name".to_string(), "age".to_string()])
20///     .sort(vec![SortOptions::desc("age")])
21///     .head(10);
22///
23/// let result = pipeline.execute(&input_value).unwrap();
24/// ```
25pub struct OperationPipeline {
26    operations: Vec<Box<dyn Operation + Send + Sync>>,
27}
28
29impl OperationPipeline {
30    /// Create a new empty operation pipeline
31    #[must_use]
32    pub fn new() -> Self {
33        Self {
34            operations: Vec::new(),
35        }
36    }
37
38    /// Add a generic operation to the pipeline
39    #[must_use]
40    pub fn add_operation(mut self, op: Box<dyn Operation + Send + Sync>) -> Self {
41        self.operations.push(op);
42        self
43    }
44
45    /// Add a select columns operation
46    #[must_use]
47    pub fn select(self, columns: Vec<String>) -> Self {
48        self.add_operation(Box::new(SelectOperation { columns }))
49    }
50
51    /// Add a filter operation
52    #[must_use]
53    pub fn filter<F>(self, predicate: F) -> Self
54    where
55        F: Fn(&Value) -> Result<bool> + Send + Sync + 'static,
56    {
57        self.add_operation(Box::new(FilterOperation {
58            predicate: Box::new(predicate),
59        }))
60    }
61
62    /// Add a sort operation
63    #[must_use]
64    pub fn sort(self, options: Vec<SortOptions>) -> Self {
65        self.add_operation(Box::new(SortOperation { options }))
66    }
67
68    /// Add a head operation (take first N rows)
69    #[must_use]
70    pub fn head(self, n: usize) -> Self {
71        self.add_operation(Box::new(HeadOperation { n }))
72    }
73
74    /// Add a tail operation (take last N rows)
75    #[must_use]
76    pub fn tail(self, n: usize) -> Self {
77        self.add_operation(Box::new(TailOperation { n }))
78    }
79
80    /// Add a group by operation
81    #[must_use]
82    pub fn group_by(self, columns: Vec<String>) -> Self {
83        self.add_operation(Box::new(GroupByOperation { columns }))
84    }
85
86    /// Add an aggregation operation
87    #[must_use]
88    pub fn aggregate(
89        self,
90        group_columns: Vec<String>,
91        agg_functions: Vec<AggregationFunction>,
92    ) -> Self {
93        self.add_operation(Box::new(AggregateOperation {
94            group_columns,
95            agg_functions,
96        }))
97    }
98
99    /// Add a join operation
100    #[must_use]
101    pub fn join(self, right: Value, keys: JoinKeys, options: JoinOptions) -> Self {
102        self.add_operation(Box::new(JoinOperation {
103            right,
104            keys,
105            options,
106        }))
107    }
108
109    /// Execute the pipeline on a value
110    pub fn execute(&self, mut value: Value) -> Result<Value> {
111        for operation in &self.operations {
112            value = operation.apply(&value)?;
113        }
114        Ok(value)
115    }
116
117    /// Execute the pipeline on a value by reference, avoiding clones where possible
118    /// This is more efficient when the caller doesn't need to keep the original value
119    pub fn execute_mut(&self, value: &mut Value) -> Result<()> {
120        for operation in &self.operations {
121            *value = operation.apply(value)?;
122        }
123        Ok(())
124    }
125
126    /// Get the number of operations in the pipeline
127    #[must_use]
128    pub fn len(&self) -> usize {
129        self.operations.len()
130    }
131
132    /// Check if the pipeline is empty
133    #[must_use]
134    pub fn is_empty(&self) -> bool {
135        self.operations.is_empty()
136    }
137
138    /// Get descriptions of all operations in the pipeline
139    #[must_use]
140    pub fn describe(&self) -> Vec<String> {
141        self.operations.iter().map(|op| op.description()).collect()
142    }
143}
144
145impl Default for OperationPipeline {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
// Concrete operation implementations for the pipeline
152
153struct SelectOperation {
154    columns: Vec<String>,
155}
156
157impl Operation for SelectOperation {
158    fn apply(&self, value: &Value) -> Result<Value> {
159        select_columns(value, &self.columns)
160    }
161
162    fn description(&self) -> String {
163        format!("select columns: {}", self.columns.join(", "))
164    }
165}
166
167#[allow(clippy::type_complexity)]
168pub struct FilterOperation {
169    pub predicate: Box<dyn Fn(&Value) -> Result<bool> + Send + Sync>,
170}
171
172impl std::fmt::Debug for FilterOperation {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        f.debug_struct("FilterOperation").finish()
175    }
176}
177
178impl Operation for FilterOperation {
179    fn apply(&self, value: &Value) -> Result<Value> {
180        filter_values(value, &self.predicate)
181    }
182
183    fn description(&self) -> String {
184        "filter with custom predicate".to_string()
185    }
186}
187
188struct SortOperation {
189    options: Vec<SortOptions>,
190}
191
192impl Operation for SortOperation {
193    fn apply(&self, value: &Value) -> Result<Value> {
194        sort_by_columns(value, &self.options)
195    }
196
197    fn description(&self) -> String {
198        let sort_desc: Vec<String> = self
199            .options
200            .iter()
201            .map(|opt| {
202                format!(
203                    "{} {}",
204                    opt.column,
205                    if opt.descending { "desc" } else { "asc" }
206                )
207            })
208            .collect();
209        format!("sort by: {}", sort_desc.join(", "))
210    }
211}
212
213struct HeadOperation {
214    n: usize,
215}
216
217impl Operation for HeadOperation {
218    fn apply(&self, value: &Value) -> Result<Value> {
219        crate::ops::basic::head(value, self.n)
220    }
221
222    fn description(&self) -> String {
223        format!("head {}", self.n)
224    }
225}
226
227struct TailOperation {
228    n: usize,
229}
230
231impl Operation for TailOperation {
232    fn apply(&self, value: &Value) -> Result<Value> {
233        crate::ops::basic::tail(value, self.n)
234    }
235
236    fn description(&self) -> String {
237        format!("tail {}", self.n)
238    }
239}
240
241struct GroupByOperation {
242    columns: Vec<String>,
243}
244
245impl Operation for GroupByOperation {
246    fn apply(&self, value: &Value) -> Result<Value> {
247        group_by(value, &self.columns)
248    }
249
250    fn description(&self) -> String {
251        format!("group by: {}", self.columns.join(", "))
252    }
253}
254
255struct AggregateOperation {
256    group_columns: Vec<String>,
257    agg_functions: Vec<AggregationFunction>,
258}
259
260impl Operation for AggregateOperation {
261    fn apply(&self, value: &Value) -> Result<Value> {
262        group_by_agg(value, &self.group_columns, &self.agg_functions)
263    }
264
265    fn description(&self) -> String {
266        let agg_desc: Vec<String> = self
267            .agg_functions
268            .iter()
269            .map(super::aggregate::AggregationFunction::output_column_name)
270            .collect();
271        format!(
272            "aggregate by {} with functions: {}",
273            self.group_columns.join(", "),
274            agg_desc.join(", ")
275        )
276    }
277}
278
279struct JoinOperation {
280    right: Value,
281    keys: JoinKeys,
282    options: JoinOptions,
283}
284
285impl Operation for JoinOperation {
286    fn apply(&self, value: &Value) -> Result<Value> {
287        join(value, &self.right, &self.keys, &self.options)
288    }
289
290    fn description(&self) -> String {
291        format!(
292            "{} join on: {}",
293            self.options.join_type.as_str(),
294            self.keys.left_columns().join(", ")
295        )
296    }
297}
298
299/// Apply a series of operations to a value
300///
301/// This is a convenience function that creates a temporary pipeline
302/// and executes it.
303///
304/// # Examples
305///
306/// ```rust,ignore
307/// use dsq_core::ops::{apply_operations, basic::SortOptions};
308/// use dsq_core::value::Value;
309///
310/// let operations = vec![
311///     Box::new(SelectOperation { columns: vec!["name".to_string()] }),
312///     Box::new(SortOperation { options: vec![SortOptions::asc("name")] }),
313/// ];
314///
315/// let result = apply_operations(&input_value, operations).unwrap();
316/// ```
317pub fn apply_operations(
318    value: &Value,
319    operations: Vec<Box<dyn Operation + Send + Sync>>,
320) -> Result<Value> {
321    let mut pipeline = OperationPipeline::new();
322    for op in operations {
323        pipeline = pipeline.add_operation(op);
324    }
325    // Use execute which already clones internally if needed
326    pipeline.execute(value.clone())
327}
328
329/// Apply a series of operations to an owned value (consumes the value)
330///
331/// More efficient than `apply_operations` when you don't need to keep the original value.
332pub fn apply_operations_owned(
333    mut value: Value,
334    operations: Vec<Box<dyn Operation + Send + Sync>>,
335) -> Result<Value> {
336    let mut pipeline = OperationPipeline::new();
337    for op in operations {
338        pipeline = pipeline.add_operation(op);
339    }
340    pipeline.execute_mut(&mut value)?;
341    Ok(value)
342}
343
344/// Apply a series of operations to a value in place
345///
346/// This is more efficient than `apply_operations` when the caller doesn't need
347/// to preserve the original value.
348pub fn apply_operations_mut(
349    value: &mut Value,
350    operations: Vec<Box<dyn Operation + Send + Sync>>,
351) -> Result<()> {
352    let mut pipeline = OperationPipeline::new();
353    for op in operations {
354        pipeline = pipeline.add_operation(op);
355    }
356    pipeline.execute_mut(value)
357}