Skip to main content

laminar_sql/translator/
temporal_probe.rs

1//! Configuration types for the temporal probe join operator.
2//!
3//! A temporal probe join matches each left event against the right stream at
4//! multiple fixed time offsets. For each left row, it produces N output rows
5//! (one per offset) with the ASOF-matched right value at `event_time + offset`.
6
7use std::fmt;
8
/// How the probe offsets of a temporal probe join are specified.
///
/// Either an evenly spaced range or an explicit list; every value is in
/// milliseconds.
#[derive(Debug, Clone)]
pub enum ProbeOffsetSpec {
    /// `RANGE FROM <start> TO <end> STEP <step>` (all in ms).
    Range {
        /// First offset of the range, in milliseconds (inclusive).
        start_ms: i64,
        /// Upper bound of the range, in milliseconds (inclusive).
        end_ms: i64,
        /// Distance between consecutive offsets, in milliseconds.
        step_ms: i64,
    },
    /// `LIST (<ms>, ...)`.
    List(Vec<i64>),
}

impl ProbeOffsetSpec {
    /// Materialise the spec as ascending, duplicate-free offsets in milliseconds.
    ///
    /// A `Range` yields `start_ms`, `start_ms + step_ms`, ... for every value
    /// that does not exceed `end_ms`; generation also stops when the next value
    /// would overflow `i64`. A range whose step is zero or negative, or whose
    /// `start_ms` is greater than `end_ms`, collapses to the single offset
    /// `start_ms`. A `List` is returned sorted with repeated values removed.
    #[must_use]
    pub fn expand(&self) -> Vec<i64> {
        let mut expanded: Vec<i64> = match *self {
            Self::Range { start_ms, end_ms, step_ms } => {
                let degenerate = step_ms <= 0 || start_ms > end_ms;
                if degenerate {
                    // Early exit: nothing to sort or dedup for a single value.
                    return vec![start_ms];
                }
                // Walk forward one step at a time until the bound is passed
                // or the addition no longer fits in an `i64`.
                std::iter::successors(Some(start_ms), |&prev| {
                    prev.checked_add(step_ms).filter(|&next| next <= end_ms)
                })
                .collect()
            }
            Self::List(ref values) => values.clone(),
        };
        expanded.sort_unstable();
        expanded.dedup();
        expanded
    }
}
56
/// Configuration for the temporal probe join operator.
///
/// Holds the names of the two inputs, the join key columns, the time column
/// on each side, and the already-expanded probe offsets.
#[derive(Debug, Clone)]
#[allow(missing_docs)]
pub struct TemporalProbeConfig {
    // Left input table name; printed first in the `Display` summary.
    pub left_table: String,
    // Right input table name; printed second in the `Display` summary.
    pub right_table: String,
    // Optional alias for the left input (presumably the SQL alias — confirm
    // against the translator that builds this config).
    pub left_alias: Option<String>,
    // Optional alias for the right input (same caveat as `left_alias`).
    pub right_alias: Option<String>,
    // Join key columns; printed as `on [a, b, ...]` in the `Display` summary.
    pub key_columns: Vec<String>,
    // Name of the time column on the left input (presumably the event time
    // the offsets are added to — confirm with the operator implementation).
    pub left_time_column: String,
    // Name of the time column on the right input.
    pub right_time_column: String,
    /// Sorted, deduplicated offsets in milliseconds.
    pub expanded_offsets_ms: Vec<i64>,
    /// Probe pseudo-table alias (the `AS p` in the SQL).
    pub probe_alias: String,
}
73
74impl TemporalProbeConfig {
75    /// Create a new config, expanding offsets immediately.
76    #[must_use]
77    #[allow(clippy::too_many_arguments)]
78    pub fn new(
79        left_table: String,
80        right_table: String,
81        left_alias: Option<String>,
82        right_alias: Option<String>,
83        key_columns: Vec<String>,
84        left_time_column: String,
85        right_time_column: String,
86        offsets: &ProbeOffsetSpec,
87        probe_alias: String,
88    ) -> Self {
89        let expanded_offsets_ms = offsets.expand();
90        Self {
91            left_table,
92            right_table,
93            left_alias,
94            right_alias,
95            key_columns,
96            left_time_column,
97            right_time_column,
98            expanded_offsets_ms,
99            probe_alias,
100        }
101    }
102
103    /// Maximum offset in milliseconds.
104    #[must_use]
105    pub fn max_offset_ms(&self) -> i64 {
106        self.expanded_offsets_ms.iter().copied().max().unwrap_or(0)
107    }
108
109    /// Minimum offset in milliseconds. Negative means look-back.
110    #[must_use]
111    pub fn min_offset_ms(&self) -> i64 {
112        self.expanded_offsets_ms.iter().copied().min().unwrap_or(0)
113    }
114}
115
116impl fmt::Display for TemporalProbeConfig {
117    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118        write!(
119            f,
120            "TemporalProbe({} × {} on [{}] offsets={})",
121            self.left_table,
122            self.right_table,
123            self.key_columns.join(", "),
124            self.expanded_offsets_ms.len()
125        )
126    }
127}
128
/// Parse a duration string like `0s`, `5s`, `100ms`, `1m`, `-5s` into milliseconds.
///
/// Surrounding whitespace is ignored. Supported suffixes: `us` (microseconds),
/// `ms` (milliseconds), `s` (seconds), `m` (minutes). A bare integer with no
/// suffix is taken as milliseconds. Microsecond values are truncated toward
/// zero, so `500us` yields `0`.
///
/// Returns `None` when the input is empty, when the numeric part is not a
/// valid `i64` (at most one leading `+` or `-` is accepted), or when the
/// result does not fit in an `i64`.
#[must_use]
pub fn parse_interval_to_ms(s: &str) -> Option<i64> {
    // (suffix, multiplier, divisor): milliseconds = value * multiplier / divisor.
    // Order matters: `us` and `ms` both end in `s`, so they are tried first.
    const UNITS: [(&str, i64, i64); 4] = [
        ("us", 1, 1000),
        ("ms", 1, 1),
        ("s", 1000, 1),
        ("m", 60_000, 1),
    ];

    let s = s.trim();
    let (number, multiplier, divisor) = UNITS
        .iter()
        .find_map(|&(suffix, multiplier, divisor)| {
            s.strip_suffix(suffix)
                .map(|number| (number, multiplier, divisor))
        })
        // No recognised suffix: the whole string is a millisecond count.
        .unwrap_or((s, 1, 1));

    // The sign is parsed together with the digits. Stripping it by hand and
    // negating afterwards accepted doubled signs such as `--5s` and could
    // overflow on negation; it also rejected `i64::MIN`.
    let value: i64 = number.parse().ok()?;
    let scaled = value.checked_mul(multiplier)?;
    Some(scaled / divisor)
}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Check that each `(input, expected_ms)` pair parses to `Some(expected_ms)`.
    fn assert_parses(cases: &[(&str, i64)]) {
        for &(input, expected_ms) in cases {
            assert_eq!(
                parse_interval_to_ms(input),
                Some(expected_ms),
                "input: {:?}",
                input
            );
        }
    }

    /// Expand a `RANGE FROM start_ms TO end_ms STEP step_ms` spec.
    fn expand_range(start_ms: i64, end_ms: i64, step_ms: i64) -> Vec<i64> {
        ProbeOffsetSpec::Range {
            start_ms,
            end_ms,
            step_ms,
        }
        .expand()
    }

    /// Build a `trades`/`prices` config keyed on `keys` with an explicit offset list.
    fn trades_prices_config(keys: &[&str], offsets_ms: Vec<i64>) -> TemporalProbeConfig {
        TemporalProbeConfig::new(
            "trades".into(),
            "prices".into(),
            None,
            None,
            keys.iter().map(|key| (*key).to_string()).collect(),
            "ts".into(),
            "ts".into(),
            &ProbeOffsetSpec::List(offsets_ms),
            "p".into(),
        )
    }

    #[test]
    fn test_parse_interval_seconds() {
        assert_parses(&[("0s", 0), ("1s", 1000), ("60s", 60_000), ("-5s", -5000)]);
    }

    #[test]
    fn test_parse_interval_millis() {
        assert_parses(&[("100ms", 100), ("0ms", 0)]);
    }

    #[test]
    fn test_parse_interval_minutes() {
        assert_parses(&[("1m", 60_000), ("5m", 300_000)]);
    }

    #[test]
    fn test_parse_interval_micros() {
        // Sub-millisecond values truncate to zero.
        assert_parses(&[("5000us", 5), ("500us", 0)]);
    }

    #[test]
    fn test_range_expand() {
        assert_eq!(
            expand_range(0, 5000, 1000),
            vec![0, 1000, 2000, 3000, 4000, 5000]
        );
    }

    #[test]
    fn test_range_expand_with_negative() {
        assert_eq!(
            expand_range(-2000, 2000, 1000),
            vec![-2000, -1000, 0, 1000, 2000]
        );
    }

    #[test]
    fn test_list_expand_dedup_sort() {
        let expanded = ProbeOffsetSpec::List(vec![5000, 1000, 0, 1000, -5000]).expand();
        assert_eq!(expanded, vec![-5000, 0, 1000, 5000]);
    }

    #[test]
    fn test_config_max_min_offset() {
        let config = trades_prices_config(&["symbol"], vec![-5000, 0, 1000, 5000, 30_000, 60_000]);
        assert_eq!(config.max_offset_ms(), 60_000);
        assert_eq!(config.min_offset_ms(), -5000);
    }

    #[test]
    fn test_composite_key_config() {
        let config = trades_prices_config(&["symbol", "venue"], vec![0]);
        assert_eq!(config.key_columns, vec!["symbol", "venue"]);
        assert_eq!(
            config.to_string(),
            "TemporalProbe(trades × prices on [symbol, venue] offsets=1)"
        );
    }
}