dsq_functions/builtin/
buffer.rs

1use crate::FunctionRegistration;
2use dsq_shared::error::operation_error;
3use dsq_shared::value::Value;
4use dsq_shared::Result;
5use inventory;
6
7pub fn builtin_buffer(args: &[Value]) -> Result<Value> {
8    if args.is_empty() {
9        return Err(operation_error("buffer() expects at least 1 argument"));
10    }
11
12    let input = &args[0];
13    let batch_size = if args.len() > 1 {
14        match &args[1] {
15            Value::Int(size) if *size > 0 => Some(*size as usize),
16            Value::Int(_) => return Err(operation_error("buffer() batch size must be positive")),
17            _ => return Err(operation_error("buffer() batch size must be an integer")),
18        }
19    } else {
20        None
21    };
22
23    match input {
24        Value::Array(arr) => {
25            if arr.is_empty() {
26                return Ok(Value::Array(vec![]));
27            }
28
29            if let Some(size) = batch_size {
30                // Split into batches of the specified size
31                let mut batches = Vec::new();
32                for chunk in arr.chunks(size) {
33                    batches.push(Value::Array(chunk.to_vec()));
34                }
35                Ok(Value::Array(batches))
36            } else {
37                // No batch size specified, return all items as one batch
38                Ok(Value::Array(vec![Value::Array(arr.clone())]))
39            }
40        }
41        Value::DataFrame(df) => {
42            if df.height() == 0 {
43                return Ok(Value::Array(vec![]));
44            }
45
46            if let Some(size) = batch_size {
47                // Split DataFrame into batches
48                let mut batches = Vec::new();
49                let total_rows = df.height();
50
51                for start in (0..total_rows).step_by(size) {
52                    let end = (start + size).min(total_rows);
53                    let batch_df = df.slice(start as i64, end - start);
54                    batches.push(Value::DataFrame(batch_df));
55                }
56                Ok(Value::Array(batches))
57            } else {
58                // No batch size specified, return entire DataFrame as one batch
59                Ok(Value::Array(vec![Value::DataFrame(df.clone())]))
60            }
61        }
62        Value::Series(series) => {
63            if series.is_empty() {
64                return Ok(Value::Array(vec![]));
65            }
66
67            if let Some(size) = batch_size {
68                // Split Series into batches
69                let mut batches = Vec::new();
70                let total_len = series.len();
71
72                for start in (0..total_len).step_by(size) {
73                    let end = (start + size).min(total_len);
74                    let batch_series = series.slice(start as i64, end - start);
75                    batches.push(Value::Series(batch_series));
76                }
77                Ok(Value::Array(batches))
78            } else {
79                // No batch size specified, return entire Series as one batch
80                Ok(Value::Array(vec![Value::Series(series.clone())]))
81            }
82        }
83        _ => {
84            // For other types, wrap in an array as a single batch
85            if batch_size.is_some() {
86                // If batch size is specified but input is not a collection, return as single-item batches
87                Ok(Value::Array(vec![input.clone()]))
88            } else {
89                // No batch size, return as one batch
90                Ok(Value::Array(vec![input.clone()]))
91            }
92        }
93    }
94}
95
96inventory::submit! {
97    FunctionRegistration {
98        name: "buffer",
99        func: builtin_buffer,
100    }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use dsq_shared::value::Value;
    use polars::prelude::*;

    // Accessors below panic on an unexpected variant, so a wrong batch type fails the
    // test instead of silently skipping its assertions.

    /// Unwraps the outer array of batches returned by `buffer()`.
    fn batches_of(result: Value) -> Vec<Value> {
        match result {
            Value::Array(batches) => batches,
            other => panic!("Expected array of batches, got {:?}", other),
        }
    }

    /// Borrows the items of a batch that must be an array.
    fn array_items(batch: &Value) -> &[Value] {
        match batch {
            Value::Array(items) => items.as_slice(),
            other => panic!("Expected array batch, got {:?}", other),
        }
    }

    /// Row count of a batch that must be a DataFrame.
    fn frame_height(batch: &Value) -> usize {
        match batch {
            Value::DataFrame(df) => df.height(),
            other => panic!("Expected DataFrame batch, got {:?}", other),
        }
    }

    /// Length of a batch that must be a Series.
    fn series_len(batch: &Value) -> usize {
        match batch {
            Value::Series(series) => series.len(),
            other => panic!("Expected Series batch, got {:?}", other),
        }
    }

    /// Four-row DataFrame with `name` and `age` columns.
    fn sample_frame() -> DataFrame {
        let names = Series::new(
            PlSmallStr::from("name"),
            &["Alice", "Bob", "Charlie", "David"],
        )
        .into();
        let ages = Series::new(PlSmallStr::from("age"), &[25i64, 30, 35, 28]).into();
        DataFrame::new(vec![names, ages]).unwrap()
    }

    /// Five-element integer Series.
    fn sample_series() -> Series {
        Series::new(PlSmallStr::from("values"), &[1, 2, 3, 4, 5])
    }

    #[test]
    fn test_buffer_array_with_batch_size() {
        let arr: Vec<Value> = (1..=6).map(Value::Int).collect();
        let batches = batches_of(builtin_buffer(&[Value::Array(arr), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 3);
        assert_eq!(array_items(&batches[0]), [Value::Int(1), Value::Int(2)]);
        assert_eq!(array_items(&batches[1]), [Value::Int(3), Value::Int(4)]);
        assert_eq!(array_items(&batches[2]), [Value::Int(5), Value::Int(6)]);
    }

    #[test]
    fn test_buffer_array_without_batch_size() {
        let arr: Vec<Value> = (1..=3).map(Value::Int).collect();
        let batches = batches_of(builtin_buffer(&[Value::Array(arr)]).unwrap());

        assert_eq!(batches.len(), 1);
        assert_eq!(
            array_items(&batches[0]),
            [Value::Int(1), Value::Int(2), Value::Int(3)]
        );
    }

    #[test]
    fn test_buffer_array_uneven_batches() {
        let arr: Vec<Value> = (1..=5).map(Value::Int).collect();
        let batches = batches_of(builtin_buffer(&[Value::Array(arr), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 3);
        assert_eq!(array_items(&batches[0]), [Value::Int(1), Value::Int(2)]);
        assert_eq!(array_items(&batches[1]), [Value::Int(3), Value::Int(4)]);
        assert_eq!(array_items(&batches[2]), [Value::Int(5)]);
    }

    #[test]
    fn test_buffer_array_batch_size_larger_than_input() {
        let arr: Vec<Value> = (1..=3).map(Value::Int).collect();
        let batches =
            batches_of(builtin_buffer(&[Value::Array(arr), Value::Int(1_000_000)]).unwrap());

        assert_eq!(batches.len(), 1);
        assert_eq!(
            array_items(&batches[0]),
            [Value::Int(1), Value::Int(2), Value::Int(3)]
        );
    }

    #[test]
    fn test_buffer_empty_array() {
        let batches = batches_of(builtin_buffer(&[Value::Array(vec![])]).unwrap());
        assert!(batches.is_empty());

        let batches = batches_of(builtin_buffer(&[Value::Array(vec![]), Value::Int(2)]).unwrap());
        assert!(batches.is_empty());
    }

    #[test]
    fn test_buffer_dataframe_with_batch_size() {
        let batches =
            batches_of(builtin_buffer(&[Value::DataFrame(sample_frame()), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 2);
        assert_eq!(frame_height(&batches[0]), 2);
        assert_eq!(frame_height(&batches[1]), 2);
    }

    #[test]
    fn test_buffer_dataframe_uneven_batches() {
        let batches =
            batches_of(builtin_buffer(&[Value::DataFrame(sample_frame()), Value::Int(3)]).unwrap());

        assert_eq!(batches.len(), 2);
        assert_eq!(frame_height(&batches[0]), 3);
        assert_eq!(frame_height(&batches[1]), 1);
    }

    #[test]
    fn test_buffer_dataframe_without_batch_size() {
        let batches = batches_of(builtin_buffer(&[Value::DataFrame(sample_frame())]).unwrap());

        assert_eq!(batches.len(), 1);
        assert_eq!(frame_height(&batches[0]), 4);
    }

    #[test]
    fn test_buffer_empty_dataframe() {
        let empty = sample_frame().slice(0, 0);
        let batches = batches_of(builtin_buffer(&[Value::DataFrame(empty), Value::Int(2)]).unwrap());
        assert!(batches.is_empty());
    }

    #[test]
    fn test_buffer_series_with_batch_size() {
        let batches =
            batches_of(builtin_buffer(&[Value::Series(sample_series()), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 3);
        assert_eq!(series_len(&batches[0]), 2);
        assert_eq!(series_len(&batches[1]), 2);
        assert_eq!(series_len(&batches[2]), 1);
    }

    #[test]
    fn test_buffer_series_without_batch_size() {
        let batches = batches_of(builtin_buffer(&[Value::Series(sample_series())]).unwrap());

        assert_eq!(batches.len(), 1);
        assert_eq!(series_len(&batches[0]), 5);
    }

    #[test]
    fn test_buffer_empty_series() {
        let empty = sample_series().slice(0, 0);
        let batches = batches_of(builtin_buffer(&[Value::Series(empty), Value::Int(2)]).unwrap());
        assert!(batches.is_empty());
    }

    #[test]
    fn test_buffer_invalid_batch_size() {
        let arr = vec![Value::Int(1), Value::Int(2)];
        let result = builtin_buffer(&[Value::Array(arr), Value::Int(-1)]);
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("batch size must be positive"));
    }

    #[test]
    fn test_buffer_zero_batch_size() {
        let arr = vec![Value::Int(1), Value::Int(2)];
        let result = builtin_buffer(&[Value::Array(arr), Value::Int(0)]);
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("batch size must be positive"));
    }

    #[test]
    fn test_buffer_non_integer_batch_size() {
        let arr = vec![Value::Int(1), Value::Int(2)];
        let result = builtin_buffer(&[Value::Array(arr), Value::String("2".to_string())]);
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("batch size must be an integer"));
    }

    #[test]
    fn test_buffer_no_args() {
        let result = builtin_buffer(&[]);
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("expects at least 1 argument"));
    }

    #[test]
    fn test_buffer_single_value() {
        let batches = batches_of(builtin_buffer(&[Value::Int(42)]).unwrap());
        assert_eq!(batches, vec![Value::Int(42)]);
    }

    #[test]
    fn test_buffer_single_value_ignores_batch_size() {
        let batches = batches_of(builtin_buffer(&[Value::Int(42), Value::Int(3)]).unwrap());
        assert_eq!(batches, vec![Value::Int(42)]);
    }

    #[test]
    fn test_buffer_registered_via_inventory() {
        use crate::BuiltinRegistry;
        let registry = BuiltinRegistry::new();
        assert!(registry.functions.contains_key("buffer"));
    }
}
346}