stoolap/functions/window/
lead_lag.rs1use crate::core::{Result, Value};
18use crate::functions::{
19 FunctionDataType, FunctionInfo, FunctionSignature, FunctionType, WindowFunction,
20};
21
22#[derive(Default)]
27pub struct LeadFunction {
28 offset: usize,
29 default_value: Value,
30}
31
32impl LeadFunction {
33 pub fn new(offset: usize, default_value: Value) -> Self {
35 Self {
36 offset,
37 default_value,
38 }
39 }
40
41 pub fn with_offset(offset: usize) -> Self {
43 Self {
44 offset,
45 default_value: Value::null_unknown(),
46 }
47 }
48}
49
50impl WindowFunction for LeadFunction {
51 fn name(&self) -> &str {
52 "LEAD"
53 }
54
55 fn info(&self) -> FunctionInfo {
56 FunctionInfo::new(
57 "LEAD",
58 FunctionType::Window,
59 "Returns the value from a row that is offset rows after the current row",
60 FunctionSignature::new(
61 FunctionDataType::Any,
62 vec![
63 FunctionDataType::Any, FunctionDataType::Integer, FunctionDataType::Any, ],
67 1,
68 3,
69 ),
70 )
71 }
72
73 fn process(
74 &self,
75 partition: &[Value],
76 _order_by: &[Value],
77 current_row: usize,
78 ) -> Result<Value> {
79 let target_row = current_row + self.offset;
80
81 if target_row < partition.len() {
82 Ok(partition[target_row].clone())
83 } else {
84 Ok(self.default_value.clone())
85 }
86 }
87
88 fn clone_box(&self) -> Box<dyn WindowFunction> {
89 Box::new(LeadFunction::new(self.offset, self.default_value.clone()))
90 }
91}
92
93#[derive(Default)]
98pub struct LagFunction {
99 offset: usize,
100 default_value: Value,
101}
102
103impl LagFunction {
104 pub fn new(offset: usize, default_value: Value) -> Self {
106 Self {
107 offset,
108 default_value,
109 }
110 }
111
112 pub fn with_offset(offset: usize) -> Self {
114 Self {
115 offset,
116 default_value: Value::null_unknown(),
117 }
118 }
119}
120
121impl WindowFunction for LagFunction {
122 fn name(&self) -> &str {
123 "LAG"
124 }
125
126 fn info(&self) -> FunctionInfo {
127 FunctionInfo::new(
128 "LAG",
129 FunctionType::Window,
130 "Returns the value from a row that is offset rows before the current row",
131 FunctionSignature::new(
132 FunctionDataType::Any,
133 vec![
134 FunctionDataType::Any, FunctionDataType::Integer, FunctionDataType::Any, ],
138 1,
139 3,
140 ),
141 )
142 }
143
144 fn process(
145 &self,
146 partition: &[Value],
147 _order_by: &[Value],
148 current_row: usize,
149 ) -> Result<Value> {
150 if current_row >= self.offset {
151 let target_row = current_row - self.offset;
152 if target_row < partition.len() {
153 return Ok(partition[target_row].clone());
154 }
155 }
156
157 Ok(self.default_value.clone())
158 }
159
160 fn clone_box(&self) -> Box<dyn WindowFunction> {
161 Box::new(LagFunction::new(self.offset, self.default_value.clone()))
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168
169 #[test]
170 fn test_lead_basic() {
171 let f = LeadFunction::with_offset(1);
172 let partition = vec![
173 Value::Integer(10),
174 Value::Integer(20),
175 Value::Integer(30),
176 Value::Integer(40),
177 ];
178 let order_by = vec![];
179
180 assert_eq!(
181 f.process(&partition, &order_by, 0).unwrap(),
182 Value::Integer(20)
183 );
184 assert_eq!(
185 f.process(&partition, &order_by, 1).unwrap(),
186 Value::Integer(30)
187 );
188 assert_eq!(
189 f.process(&partition, &order_by, 2).unwrap(),
190 Value::Integer(40)
191 );
192 assert!(f.process(&partition, &order_by, 3).unwrap().is_null());
194 }
195
196 #[test]
197 fn test_lead_offset_2() {
198 let f = LeadFunction::with_offset(2);
199 let partition = vec![
200 Value::Integer(10),
201 Value::Integer(20),
202 Value::Integer(30),
203 Value::Integer(40),
204 ];
205 let order_by = vec![];
206
207 assert_eq!(
208 f.process(&partition, &order_by, 0).unwrap(),
209 Value::Integer(30)
210 );
211 assert_eq!(
212 f.process(&partition, &order_by, 1).unwrap(),
213 Value::Integer(40)
214 );
215 assert!(f.process(&partition, &order_by, 2).unwrap().is_null());
217 assert!(f.process(&partition, &order_by, 3).unwrap().is_null());
218 }
219
220 #[test]
221 fn test_lead_with_default() {
222 let f = LeadFunction::new(1, Value::Integer(-1));
223 let partition = vec![Value::Integer(10), Value::Integer(20)];
224 let order_by = vec![];
225
226 assert_eq!(
227 f.process(&partition, &order_by, 0).unwrap(),
228 Value::Integer(20)
229 );
230 assert_eq!(
232 f.process(&partition, &order_by, 1).unwrap(),
233 Value::Integer(-1)
234 );
235 }
236
237 #[test]
238 fn test_lag_basic() {
239 let f = LagFunction::with_offset(1);
240 let partition = vec![
241 Value::Integer(10),
242 Value::Integer(20),
243 Value::Integer(30),
244 Value::Integer(40),
245 ];
246 let order_by = vec![];
247
248 assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
250 assert_eq!(
251 f.process(&partition, &order_by, 1).unwrap(),
252 Value::Integer(10)
253 );
254 assert_eq!(
255 f.process(&partition, &order_by, 2).unwrap(),
256 Value::Integer(20)
257 );
258 assert_eq!(
259 f.process(&partition, &order_by, 3).unwrap(),
260 Value::Integer(30)
261 );
262 }
263
264 #[test]
265 fn test_lag_offset_2() {
266 let f = LagFunction::with_offset(2);
267 let partition = vec![
268 Value::Integer(10),
269 Value::Integer(20),
270 Value::Integer(30),
271 Value::Integer(40),
272 ];
273 let order_by = vec![];
274
275 assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
277 assert!(f.process(&partition, &order_by, 1).unwrap().is_null());
278 assert_eq!(
279 f.process(&partition, &order_by, 2).unwrap(),
280 Value::Integer(10)
281 );
282 assert_eq!(
283 f.process(&partition, &order_by, 3).unwrap(),
284 Value::Integer(20)
285 );
286 }
287
288 #[test]
289 fn test_lag_with_default() {
290 let f = LagFunction::new(1, Value::Integer(0));
291 let partition = vec![Value::Integer(10), Value::Integer(20)];
292 let order_by = vec![];
293
294 assert_eq!(
296 f.process(&partition, &order_by, 0).unwrap(),
297 Value::Integer(0)
298 );
299 assert_eq!(
300 f.process(&partition, &order_by, 1).unwrap(),
301 Value::Integer(10)
302 );
303 }
304
305 #[test]
306 fn test_lead_empty_partition() {
307 let f = LeadFunction::with_offset(1);
308 let partition = vec![];
309 let order_by = vec![];
310
311 assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
313 }
314
315 #[test]
316 fn test_lag_empty_partition() {
317 let f = LagFunction::with_offset(1);
318 let partition = vec![];
319 let order_by = vec![];
320
321 assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
323 }
324
325 #[test]
326 fn test_lead_strings() {
327 let f = LeadFunction::with_offset(1);
328 let partition = vec![
329 Value::text("apple"),
330 Value::text("banana"),
331 Value::text("cherry"),
332 ];
333 let order_by = vec![];
334
335 assert_eq!(
336 f.process(&partition, &order_by, 0).unwrap(),
337 Value::text("banana")
338 );
339 assert_eq!(
340 f.process(&partition, &order_by, 1).unwrap(),
341 Value::text("cherry")
342 );
343 assert!(f.process(&partition, &order_by, 2).unwrap().is_null());
344 }
345
346 #[test]
347 fn test_lag_strings() {
348 let f = LagFunction::with_offset(1);
349 let partition = vec![
350 Value::text("apple"),
351 Value::text("banana"),
352 Value::text("cherry"),
353 ];
354 let order_by = vec![];
355
356 assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
357 assert_eq!(
358 f.process(&partition, &order_by, 1).unwrap(),
359 Value::text("apple")
360 );
361 assert_eq!(
362 f.process(&partition, &order_by, 2).unwrap(),
363 Value::text("banana")
364 );
365 }
366}