Skip to main content

stoolap/functions/window/
lead_lag.rs

1// Copyright 2025 Stoolap Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! LEAD and LAG window functions
16
17use crate::core::{Result, Value};
18use crate::functions::{
19    FunctionDataType, FunctionInfo, FunctionSignature, FunctionType, WindowFunction,
20};
21
22/// LEAD window function
23///
24/// Returns the value from a row that is `offset` rows after the current row
25/// within the partition. If there is no such row, returns the default value.
26#[derive(Default)]
27pub struct LeadFunction {
28    offset: usize,
29    default_value: Value,
30}
31
32impl LeadFunction {
33    /// Create a new LEAD function with the specified offset and default value
34    pub fn new(offset: usize, default_value: Value) -> Self {
35        Self {
36            offset,
37            default_value,
38        }
39    }
40
41    /// Create a new LEAD function with offset 1 and NULL default
42    pub fn with_offset(offset: usize) -> Self {
43        Self {
44            offset,
45            default_value: Value::null_unknown(),
46        }
47    }
48}
49
50impl WindowFunction for LeadFunction {
51    fn name(&self) -> &str {
52        "LEAD"
53    }
54
55    fn info(&self) -> FunctionInfo {
56        FunctionInfo::new(
57            "LEAD",
58            FunctionType::Window,
59            "Returns the value from a row that is offset rows after the current row",
60            FunctionSignature::new(
61                FunctionDataType::Any,
62                vec![
63                    FunctionDataType::Any,     // column
64                    FunctionDataType::Integer, // offset
65                    FunctionDataType::Any,     // default
66                ],
67                1,
68                3,
69            ),
70        )
71    }
72
73    fn process(
74        &self,
75        partition: &[Value],
76        _order_by: &[Value],
77        current_row: usize,
78    ) -> Result<Value> {
79        let target_row = current_row + self.offset;
80
81        if target_row < partition.len() {
82            Ok(partition[target_row].clone())
83        } else {
84            Ok(self.default_value.clone())
85        }
86    }
87
88    fn clone_box(&self) -> Box<dyn WindowFunction> {
89        Box::new(LeadFunction::new(self.offset, self.default_value.clone()))
90    }
91}
92
93/// LAG window function
94///
95/// Returns the value from a row that is `offset` rows before the current row
96/// within the partition. If there is no such row, returns the default value.
97#[derive(Default)]
98pub struct LagFunction {
99    offset: usize,
100    default_value: Value,
101}
102
103impl LagFunction {
104    /// Create a new LAG function with the specified offset and default value
105    pub fn new(offset: usize, default_value: Value) -> Self {
106        Self {
107            offset,
108            default_value,
109        }
110    }
111
112    /// Create a new LAG function with offset 1 and NULL default
113    pub fn with_offset(offset: usize) -> Self {
114        Self {
115            offset,
116            default_value: Value::null_unknown(),
117        }
118    }
119}
120
121impl WindowFunction for LagFunction {
122    fn name(&self) -> &str {
123        "LAG"
124    }
125
126    fn info(&self) -> FunctionInfo {
127        FunctionInfo::new(
128            "LAG",
129            FunctionType::Window,
130            "Returns the value from a row that is offset rows before the current row",
131            FunctionSignature::new(
132                FunctionDataType::Any,
133                vec![
134                    FunctionDataType::Any,     // column
135                    FunctionDataType::Integer, // offset
136                    FunctionDataType::Any,     // default
137                ],
138                1,
139                3,
140            ),
141        )
142    }
143
144    fn process(
145        &self,
146        partition: &[Value],
147        _order_by: &[Value],
148        current_row: usize,
149    ) -> Result<Value> {
150        if current_row >= self.offset {
151            let target_row = current_row - self.offset;
152            if target_row < partition.len() {
153                return Ok(partition[target_row].clone());
154            }
155        }
156
157        Ok(self.default_value.clone())
158    }
159
160    fn clone_box(&self) -> Box<dyn WindowFunction> {
161        Box::new(LagFunction::new(self.offset, self.default_value.clone()))
162    }
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168
169    #[test]
170    fn test_lead_basic() {
171        let f = LeadFunction::with_offset(1);
172        let partition = vec![
173            Value::Integer(10),
174            Value::Integer(20),
175            Value::Integer(30),
176            Value::Integer(40),
177        ];
178        let order_by = vec![];
179
180        assert_eq!(
181            f.process(&partition, &order_by, 0).unwrap(),
182            Value::Integer(20)
183        );
184        assert_eq!(
185            f.process(&partition, &order_by, 1).unwrap(),
186            Value::Integer(30)
187        );
188        assert_eq!(
189            f.process(&partition, &order_by, 2).unwrap(),
190            Value::Integer(40)
191        );
192        // Last row has no next row, returns NULL
193        assert!(f.process(&partition, &order_by, 3).unwrap().is_null());
194    }
195
196    #[test]
197    fn test_lead_offset_2() {
198        let f = LeadFunction::with_offset(2);
199        let partition = vec![
200            Value::Integer(10),
201            Value::Integer(20),
202            Value::Integer(30),
203            Value::Integer(40),
204        ];
205        let order_by = vec![];
206
207        assert_eq!(
208            f.process(&partition, &order_by, 0).unwrap(),
209            Value::Integer(30)
210        );
211        assert_eq!(
212            f.process(&partition, &order_by, 1).unwrap(),
213            Value::Integer(40)
214        );
215        // No value 2 rows ahead
216        assert!(f.process(&partition, &order_by, 2).unwrap().is_null());
217        assert!(f.process(&partition, &order_by, 3).unwrap().is_null());
218    }
219
220    #[test]
221    fn test_lead_with_default() {
222        let f = LeadFunction::new(1, Value::Integer(-1));
223        let partition = vec![Value::Integer(10), Value::Integer(20)];
224        let order_by = vec![];
225
226        assert_eq!(
227            f.process(&partition, &order_by, 0).unwrap(),
228            Value::Integer(20)
229        );
230        // Last row returns default value
231        assert_eq!(
232            f.process(&partition, &order_by, 1).unwrap(),
233            Value::Integer(-1)
234        );
235    }
236
237    #[test]
238    fn test_lag_basic() {
239        let f = LagFunction::with_offset(1);
240        let partition = vec![
241            Value::Integer(10),
242            Value::Integer(20),
243            Value::Integer(30),
244            Value::Integer(40),
245        ];
246        let order_by = vec![];
247
248        // First row has no previous row, returns NULL
249        assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
250        assert_eq!(
251            f.process(&partition, &order_by, 1).unwrap(),
252            Value::Integer(10)
253        );
254        assert_eq!(
255            f.process(&partition, &order_by, 2).unwrap(),
256            Value::Integer(20)
257        );
258        assert_eq!(
259            f.process(&partition, &order_by, 3).unwrap(),
260            Value::Integer(30)
261        );
262    }
263
264    #[test]
265    fn test_lag_offset_2() {
266        let f = LagFunction::with_offset(2);
267        let partition = vec![
268            Value::Integer(10),
269            Value::Integer(20),
270            Value::Integer(30),
271            Value::Integer(40),
272        ];
273        let order_by = vec![];
274
275        // No value 2 rows behind
276        assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
277        assert!(f.process(&partition, &order_by, 1).unwrap().is_null());
278        assert_eq!(
279            f.process(&partition, &order_by, 2).unwrap(),
280            Value::Integer(10)
281        );
282        assert_eq!(
283            f.process(&partition, &order_by, 3).unwrap(),
284            Value::Integer(20)
285        );
286    }
287
288    #[test]
289    fn test_lag_with_default() {
290        let f = LagFunction::new(1, Value::Integer(0));
291        let partition = vec![Value::Integer(10), Value::Integer(20)];
292        let order_by = vec![];
293
294        // First row returns default value
295        assert_eq!(
296            f.process(&partition, &order_by, 0).unwrap(),
297            Value::Integer(0)
298        );
299        assert_eq!(
300            f.process(&partition, &order_by, 1).unwrap(),
301            Value::Integer(10)
302        );
303    }
304
305    #[test]
306    fn test_lead_empty_partition() {
307        let f = LeadFunction::with_offset(1);
308        let partition = vec![];
309        let order_by = vec![];
310
311        // Returns NULL for empty partition
312        assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
313    }
314
315    #[test]
316    fn test_lag_empty_partition() {
317        let f = LagFunction::with_offset(1);
318        let partition = vec![];
319        let order_by = vec![];
320
321        // Returns NULL for empty partition
322        assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
323    }
324
325    #[test]
326    fn test_lead_strings() {
327        let f = LeadFunction::with_offset(1);
328        let partition = vec![
329            Value::text("apple"),
330            Value::text("banana"),
331            Value::text("cherry"),
332        ];
333        let order_by = vec![];
334
335        assert_eq!(
336            f.process(&partition, &order_by, 0).unwrap(),
337            Value::text("banana")
338        );
339        assert_eq!(
340            f.process(&partition, &order_by, 1).unwrap(),
341            Value::text("cherry")
342        );
343        assert!(f.process(&partition, &order_by, 2).unwrap().is_null());
344    }
345
346    #[test]
347    fn test_lag_strings() {
348        let f = LagFunction::with_offset(1);
349        let partition = vec![
350            Value::text("apple"),
351            Value::text("banana"),
352            Value::text("cherry"),
353        ];
354        let order_by = vec![];
355
356        assert!(f.process(&partition, &order_by, 0).unwrap().is_null());
357        assert_eq!(
358            f.process(&partition, &order_by, 1).unwrap(),
359            Value::text("apple")
360        );
361        assert_eq!(
362            f.process(&partition, &order_by, 2).unwrap(),
363            Value::text("banana")
364        );
365    }
366}