dsq_functions/builtin/
buffer.rs1use crate::FunctionRegistration;
2use dsq_shared::error::operation_error;
3use dsq_shared::value::Value;
4use dsq_shared::Result;
5use inventory;
6
7pub fn builtin_buffer(args: &[Value]) -> Result<Value> {
8 if args.is_empty() {
9 return Err(operation_error("buffer() expects at least 1 argument"));
10 }
11
12 let input = &args[0];
13 let batch_size = if args.len() > 1 {
14 match &args[1] {
15 Value::Int(size) if *size > 0 => Some(*size as usize),
16 Value::Int(_) => return Err(operation_error("buffer() batch size must be positive")),
17 _ => return Err(operation_error("buffer() batch size must be an integer")),
18 }
19 } else {
20 None
21 };
22
23 match input {
24 Value::Array(arr) => {
25 if arr.is_empty() {
26 return Ok(Value::Array(vec![]));
27 }
28
29 if let Some(size) = batch_size {
30 let mut batches = Vec::new();
32 for chunk in arr.chunks(size) {
33 batches.push(Value::Array(chunk.to_vec()));
34 }
35 Ok(Value::Array(batches))
36 } else {
37 Ok(Value::Array(vec![Value::Array(arr.clone())]))
39 }
40 }
41 Value::DataFrame(df) => {
42 if df.height() == 0 {
43 return Ok(Value::Array(vec![]));
44 }
45
46 if let Some(size) = batch_size {
47 let mut batches = Vec::new();
49 let total_rows = df.height();
50
51 for start in (0..total_rows).step_by(size) {
52 let end = (start + size).min(total_rows);
53 let batch_df = df.slice(start as i64, end - start);
54 batches.push(Value::DataFrame(batch_df));
55 }
56 Ok(Value::Array(batches))
57 } else {
58 Ok(Value::Array(vec![Value::DataFrame(df.clone())]))
60 }
61 }
62 Value::Series(series) => {
63 if series.is_empty() {
64 return Ok(Value::Array(vec![]));
65 }
66
67 if let Some(size) = batch_size {
68 let mut batches = Vec::new();
70 let total_len = series.len();
71
72 for start in (0..total_len).step_by(size) {
73 let end = (start + size).min(total_len);
74 let batch_series = series.slice(start as i64, end - start);
75 batches.push(Value::Series(batch_series));
76 }
77 Ok(Value::Array(batches))
78 } else {
79 Ok(Value::Array(vec![Value::Series(series.clone())]))
81 }
82 }
83 _ => {
84 if batch_size.is_some() {
86 Ok(Value::Array(vec![input.clone()]))
88 } else {
89 Ok(Value::Array(vec![input.clone()]))
91 }
92 }
93 }
94}
95
96inventory::submit! {
97 FunctionRegistration {
98 name: "buffer",
99 func: builtin_buffer,
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use dsq_shared::value::Value;
    use polars::prelude::*;

    /// Unwraps the outer `Value::Array` that `buffer()` returns on success.
    fn batches_of(result: Value) -> Vec<Value> {
        match result {
            Value::Array(batches) => batches,
            _ => panic!("Expected array of batches"),
        }
    }

    /// Asserts that `batch` is an array batch holding exactly `expected`, element by element.
    /// Fails (instead of silently passing) when the batch has the wrong variant.
    fn assert_array_batch(batch: &Value, expected: &[Value]) {
        match batch {
            Value::Array(items) => {
                assert_eq!(items.len(), expected.len());
                for (actual, wanted) in items.iter().zip(expected) {
                    assert_eq!(actual, wanted);
                }
            }
            _ => panic!("Expected array batch"),
        }
    }

    /// Asserts that `batch` is a DataFrame batch with `rows` rows.
    fn assert_frame_batch(batch: &Value, rows: usize) {
        match batch {
            Value::DataFrame(df) => assert_eq!(df.height(), rows),
            _ => panic!("Expected DataFrame batch"),
        }
    }

    /// Asserts that `batch` is a Series batch with `len` elements.
    fn assert_series_batch(batch: &Value, len: usize) {
        match batch {
            Value::Series(series) => assert_eq!(series.len(), len),
            _ => panic!("Expected Series batch"),
        }
    }

    /// Asserts that `result` is an error whose message contains `needle`.
    fn assert_error_contains<T, E: std::fmt::Display>(
        result: std::result::Result<T, E>,
        needle: &str,
    ) {
        match result {
            Ok(_) => panic!("Expected an error containing {:?}", needle),
            Err(err) => assert!(err.to_string().contains(needle)),
        }
    }

    #[test]
    fn test_buffer_array_with_batch_size() {
        let input: Vec<Value> = (1..=6).map(Value::Int).collect();
        let batches = batches_of(builtin_buffer(&[Value::Array(input), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 3);
        assert_array_batch(&batches[0], &[Value::Int(1), Value::Int(2)]);
        assert_array_batch(&batches[1], &[Value::Int(3), Value::Int(4)]);
        assert_array_batch(&batches[2], &[Value::Int(5), Value::Int(6)]);
    }

    #[test]
    fn test_buffer_array_without_batch_size() {
        let input: Vec<Value> = (1..=3).map(Value::Int).collect();
        let batches = batches_of(builtin_buffer(&[Value::Array(input)]).unwrap());

        assert_eq!(batches.len(), 1);
        assert_array_batch(
            &batches[0],
            &[Value::Int(1), Value::Int(2), Value::Int(3)],
        );
    }

    #[test]
    fn test_buffer_array_uneven_batches() {
        let input: Vec<Value> = (1..=5).map(Value::Int).collect();
        let batches = batches_of(builtin_buffer(&[Value::Array(input), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 3);
        assert_array_batch(&batches[0], &[Value::Int(1), Value::Int(2)]);
        assert_array_batch(&batches[1], &[Value::Int(3), Value::Int(4)]);
        // The trailing batch holds the single leftover element.
        assert_array_batch(&batches[2], &[Value::Int(5)]);
    }

    #[test]
    fn test_buffer_empty_array() {
        let batches = batches_of(builtin_buffer(&[Value::Array(vec![])]).unwrap());
        assert_eq!(batches.len(), 0);
    }

    #[test]
    fn test_buffer_empty_array_with_batch_size() {
        let batches = batches_of(builtin_buffer(&[Value::Array(vec![]), Value::Int(3)]).unwrap());
        assert_eq!(batches.len(), 0);
    }

    #[test]
    fn test_buffer_dataframe_with_batch_size() {
        let names = Series::new(
            PlSmallStr::from("name"),
            &["Alice", "Bob", "Charlie", "David"],
        )
        .into();
        let ages = Series::new(PlSmallStr::from("age"), &[25i64, 30, 35, 28]).into();
        let df = DataFrame::new(vec![names, ages]).unwrap();

        let batches = batches_of(builtin_buffer(&[Value::DataFrame(df), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 2);
        assert_frame_batch(&batches[0], 2);
        assert_frame_batch(&batches[1], 2);
    }

    #[test]
    fn test_buffer_dataframe_without_batch_size() {
        let names = Series::new(PlSmallStr::from("name"), &["Alice", "Bob"]).into();
        let df = DataFrame::new(vec![names]).unwrap();

        let batches = batches_of(builtin_buffer(&[Value::DataFrame(df)]).unwrap());

        assert_eq!(batches.len(), 1);
        assert_frame_batch(&batches[0], 2);
    }

    #[test]
    fn test_buffer_series_with_batch_size() {
        let series = Series::new(PlSmallStr::from("values"), &[1, 2, 3, 4, 5]);
        let batches =
            batches_of(builtin_buffer(&[Value::Series(series), Value::Int(2)]).unwrap());

        assert_eq!(batches.len(), 3);
        assert_series_batch(&batches[0], 2);
        assert_series_batch(&batches[1], 2);
        assert_series_batch(&batches[2], 1);
    }

    #[test]
    fn test_buffer_invalid_batch_size() {
        let arr = vec![Value::Int(1), Value::Int(2)];
        let result = builtin_buffer(&[Value::Array(arr), Value::Int(-1)]);
        assert_error_contains(result, "batch size must be positive");
    }

    #[test]
    fn test_buffer_zero_batch_size() {
        let arr = vec![Value::Int(1), Value::Int(2)];
        let result = builtin_buffer(&[Value::Array(arr), Value::Int(0)]);
        assert_error_contains(result, "batch size must be positive");
    }

    #[test]
    fn test_buffer_non_integer_batch_size() {
        let arr = vec![Value::Int(1), Value::Int(2)];
        let result = builtin_buffer(&[Value::Array(arr), Value::String("2".to_string())]);
        assert_error_contains(result, "batch size must be an integer");
    }

    #[test]
    fn test_buffer_no_args() {
        let result = builtin_buffer(&[]);
        assert_error_contains(result, "expects at least 1 argument");
    }

    #[test]
    fn test_buffer_single_value() {
        let batches = batches_of(builtin_buffer(&[Value::Int(42)]).unwrap());
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0], Value::Int(42));
    }

    #[test]
    fn test_buffer_single_value_with_batch_size() {
        // A value that cannot be split is wrapped as one batch even when a size is given.
        let batches = batches_of(builtin_buffer(&[Value::Int(42), Value::Int(3)]).unwrap());
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0], Value::Int(42));
    }

    #[test]
    fn test_buffer_registered_via_inventory() {
        use crate::BuiltinRegistry;
        let registry = BuiltinRegistry::new();
        assert!(registry.functions.contains_key("buffer"));
    }
}