1use crate::error::Result;
2use crate::ops::aggregate::{group_by, group_by_agg, AggregationFunction};
3use crate::ops::basic::{filter_values, select_columns, sort_by_columns, SortOptions};
4use crate::ops::join::{join, JoinKeys, JoinOptions};
5use crate::Value;
6
7use super::Operation;
8
9pub struct OperationPipeline {
26 operations: Vec<Box<dyn Operation + Send + Sync>>,
27}
28
29impl OperationPipeline {
30 #[must_use]
32 pub fn new() -> Self {
33 Self {
34 operations: Vec::new(),
35 }
36 }
37
38 #[must_use]
40 pub fn add_operation(mut self, op: Box<dyn Operation + Send + Sync>) -> Self {
41 self.operations.push(op);
42 self
43 }
44
45 #[must_use]
47 pub fn select(self, columns: Vec<String>) -> Self {
48 self.add_operation(Box::new(SelectOperation { columns }))
49 }
50
51 #[must_use]
53 pub fn filter<F>(self, predicate: F) -> Self
54 where
55 F: Fn(&Value) -> Result<bool> + Send + Sync + 'static,
56 {
57 self.add_operation(Box::new(FilterOperation {
58 predicate: Box::new(predicate),
59 }))
60 }
61
62 #[must_use]
64 pub fn sort(self, options: Vec<SortOptions>) -> Self {
65 self.add_operation(Box::new(SortOperation { options }))
66 }
67
68 #[must_use]
70 pub fn head(self, n: usize) -> Self {
71 self.add_operation(Box::new(HeadOperation { n }))
72 }
73
74 #[must_use]
76 pub fn tail(self, n: usize) -> Self {
77 self.add_operation(Box::new(TailOperation { n }))
78 }
79
80 #[must_use]
82 pub fn group_by(self, columns: Vec<String>) -> Self {
83 self.add_operation(Box::new(GroupByOperation { columns }))
84 }
85
86 #[must_use]
88 pub fn aggregate(
89 self,
90 group_columns: Vec<String>,
91 agg_functions: Vec<AggregationFunction>,
92 ) -> Self {
93 self.add_operation(Box::new(AggregateOperation {
94 group_columns,
95 agg_functions,
96 }))
97 }
98
99 #[must_use]
101 pub fn join(self, right: Value, keys: JoinKeys, options: JoinOptions) -> Self {
102 self.add_operation(Box::new(JoinOperation {
103 right,
104 keys,
105 options,
106 }))
107 }
108
109 pub fn execute(&self, mut value: Value) -> Result<Value> {
111 for operation in &self.operations {
112 value = operation.apply(&value)?;
113 }
114 Ok(value)
115 }
116
117 pub fn execute_mut(&self, value: &mut Value) -> Result<()> {
120 for operation in &self.operations {
121 *value = operation.apply(value)?;
122 }
123 Ok(())
124 }
125
126 #[must_use]
128 pub fn len(&self) -> usize {
129 self.operations.len()
130 }
131
132 #[must_use]
134 pub fn is_empty(&self) -> bool {
135 self.operations.is_empty()
136 }
137
138 #[must_use]
140 pub fn describe(&self) -> Vec<String> {
141 self.operations.iter().map(|op| op.description()).collect()
142 }
143}
144
145impl Default for OperationPipeline {
146 fn default() -> Self {
147 Self::new()
148 }
149}
150
151struct SelectOperation {
154 columns: Vec<String>,
155}
156
157impl Operation for SelectOperation {
158 fn apply(&self, value: &Value) -> Result<Value> {
159 select_columns(value, &self.columns)
160 }
161
162 fn description(&self) -> String {
163 format!("select columns: {}", self.columns.join(", "))
164 }
165}
166
167#[allow(clippy::type_complexity)]
168pub struct FilterOperation {
169 pub predicate: Box<dyn Fn(&Value) -> Result<bool> + Send + Sync>,
170}
171
172impl std::fmt::Debug for FilterOperation {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 f.debug_struct("FilterOperation").finish()
175 }
176}
177
178impl Operation for FilterOperation {
179 fn apply(&self, value: &Value) -> Result<Value> {
180 filter_values(value, &self.predicate)
181 }
182
183 fn description(&self) -> String {
184 "filter with custom predicate".to_string()
185 }
186}
187
188struct SortOperation {
189 options: Vec<SortOptions>,
190}
191
192impl Operation for SortOperation {
193 fn apply(&self, value: &Value) -> Result<Value> {
194 sort_by_columns(value, &self.options)
195 }
196
197 fn description(&self) -> String {
198 let sort_desc: Vec<String> = self
199 .options
200 .iter()
201 .map(|opt| {
202 format!(
203 "{} {}",
204 opt.column,
205 if opt.descending { "desc" } else { "asc" }
206 )
207 })
208 .collect();
209 format!("sort by: {}", sort_desc.join(", "))
210 }
211}
212
213struct HeadOperation {
214 n: usize,
215}
216
217impl Operation for HeadOperation {
218 fn apply(&self, value: &Value) -> Result<Value> {
219 crate::ops::basic::head(value, self.n)
220 }
221
222 fn description(&self) -> String {
223 format!("head {}", self.n)
224 }
225}
226
227struct TailOperation {
228 n: usize,
229}
230
231impl Operation for TailOperation {
232 fn apply(&self, value: &Value) -> Result<Value> {
233 crate::ops::basic::tail(value, self.n)
234 }
235
236 fn description(&self) -> String {
237 format!("tail {}", self.n)
238 }
239}
240
241struct GroupByOperation {
242 columns: Vec<String>,
243}
244
245impl Operation for GroupByOperation {
246 fn apply(&self, value: &Value) -> Result<Value> {
247 group_by(value, &self.columns)
248 }
249
250 fn description(&self) -> String {
251 format!("group by: {}", self.columns.join(", "))
252 }
253}
254
255struct AggregateOperation {
256 group_columns: Vec<String>,
257 agg_functions: Vec<AggregationFunction>,
258}
259
260impl Operation for AggregateOperation {
261 fn apply(&self, value: &Value) -> Result<Value> {
262 group_by_agg(value, &self.group_columns, &self.agg_functions)
263 }
264
265 fn description(&self) -> String {
266 let agg_desc: Vec<String> = self
267 .agg_functions
268 .iter()
269 .map(super::aggregate::AggregationFunction::output_column_name)
270 .collect();
271 format!(
272 "aggregate by {} with functions: {}",
273 self.group_columns.join(", "),
274 agg_desc.join(", ")
275 )
276 }
277}
278
279struct JoinOperation {
280 right: Value,
281 keys: JoinKeys,
282 options: JoinOptions,
283}
284
285impl Operation for JoinOperation {
286 fn apply(&self, value: &Value) -> Result<Value> {
287 join(value, &self.right, &self.keys, &self.options)
288 }
289
290 fn description(&self) -> String {
291 format!(
292 "{} join on: {}",
293 self.options.join_type.as_str(),
294 self.keys.left_columns().join(", ")
295 )
296 }
297}
298
299pub fn apply_operations(
318 value: &Value,
319 operations: Vec<Box<dyn Operation + Send + Sync>>,
320) -> Result<Value> {
321 let mut pipeline = OperationPipeline::new();
322 for op in operations {
323 pipeline = pipeline.add_operation(op);
324 }
325 pipeline.execute(value.clone())
327}
328
329pub fn apply_operations_owned(
333 mut value: Value,
334 operations: Vec<Box<dyn Operation + Send + Sync>>,
335) -> Result<Value> {
336 let mut pipeline = OperationPipeline::new();
337 for op in operations {
338 pipeline = pipeline.add_operation(op);
339 }
340 pipeline.execute_mut(&mut value)?;
341 Ok(value)
342}
343
344pub fn apply_operations_mut(
349 value: &mut Value,
350 operations: Vec<Box<dyn Operation + Send + Sync>>,
351) -> Result<()> {
352 let mut pipeline = OperationPipeline::new();
353 for op in operations {
354 pipeline = pipeline.add_operation(op);
355 }
356 pipeline.execute_mut(value)
357}