1use std::fmt;
8
9#[derive(Debug, Clone)]
11pub enum ProbeOffsetSpec {
12 Range {
14 start_ms: i64,
16 end_ms: i64,
18 step_ms: i64,
20 },
21 List(Vec<i64>),
23}
24
25impl ProbeOffsetSpec {
26 #[must_use]
28 pub fn expand(&self) -> Vec<i64> {
29 let mut offsets = match self {
30 ProbeOffsetSpec::Range {
31 start_ms,
32 end_ms,
33 step_ms,
34 } => {
35 if *step_ms <= 0 || *start_ms > *end_ms {
36 return vec![*start_ms];
37 }
38 let mut v = Vec::new();
39 let mut cur = *start_ms;
40 loop {
41 v.push(cur);
42 match cur.checked_add(*step_ms) {
43 Some(next) if next <= *end_ms => cur = next,
44 _ => break,
45 }
46 }
47 v
48 }
49 ProbeOffsetSpec::List(list) => list.clone(),
50 };
51 offsets.sort_unstable();
52 offsets.dedup();
53 offsets
54 }
55}
56
57#[derive(Debug, Clone)]
59#[allow(missing_docs)]
60pub struct TemporalProbeConfig {
61 pub left_table: String,
62 pub right_table: String,
63 pub left_alias: Option<String>,
64 pub right_alias: Option<String>,
65 pub key_columns: Vec<String>,
66 pub left_time_column: String,
67 pub right_time_column: String,
68 pub expanded_offsets_ms: Vec<i64>,
70 pub probe_alias: String,
72}
73
74impl TemporalProbeConfig {
75 #[must_use]
77 #[allow(clippy::too_many_arguments)]
78 pub fn new(
79 left_table: String,
80 right_table: String,
81 left_alias: Option<String>,
82 right_alias: Option<String>,
83 key_columns: Vec<String>,
84 left_time_column: String,
85 right_time_column: String,
86 offsets: &ProbeOffsetSpec,
87 probe_alias: String,
88 ) -> Self {
89 let expanded_offsets_ms = offsets.expand();
90 Self {
91 left_table,
92 right_table,
93 left_alias,
94 right_alias,
95 key_columns,
96 left_time_column,
97 right_time_column,
98 expanded_offsets_ms,
99 probe_alias,
100 }
101 }
102
103 #[must_use]
105 pub fn max_offset_ms(&self) -> i64 {
106 self.expanded_offsets_ms.iter().copied().max().unwrap_or(0)
107 }
108
109 #[must_use]
111 pub fn min_offset_ms(&self) -> i64 {
112 self.expanded_offsets_ms.iter().copied().min().unwrap_or(0)
113 }
114}
115
116impl fmt::Display for TemporalProbeConfig {
117 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
118 write!(
119 f,
120 "TemporalProbe({} × {} on [{}] offsets={})",
121 self.left_table,
122 self.right_table,
123 self.key_columns.join(", "),
124 self.expanded_offsets_ms.len()
125 )
126 }
127}
128
129#[must_use]
133pub fn parse_interval_to_ms(s: &str) -> Option<i64> {
134 let s = s.trim();
135 if s.is_empty() {
136 return None;
137 }
138
139 let (negative, s) = if let Some(rest) = s.strip_prefix('-') {
140 (true, rest)
141 } else {
142 (false, s)
143 };
144
145 let (num_str, unit) = if let Some(n) = s.strip_suffix("us") {
146 (n, "us")
147 } else if let Some(n) = s.strip_suffix("ms") {
148 (n, "ms")
149 } else if let Some(n) = s.strip_suffix('s') {
150 (n, "s")
151 } else if let Some(n) = s.strip_suffix('m') {
152 (n, "m")
153 } else {
154 return s.parse::<i64>().ok().map(|v| if negative { -v } else { v });
155 };
156
157 let num: i64 = num_str.parse().ok()?;
158 let ms = match unit {
159 "us" => num / 1000,
160 "ms" => num,
161 "s" => num.checked_mul(1000)?,
162 "m" => num.checked_mul(60_000)?,
163 _ => return None,
164 };
165
166 Some(if negative { -ms } else { ms })
167}
168
169#[cfg(test)]
170mod tests {
171 use super::*;
172
173 #[test]
174 fn test_parse_interval_seconds() {
175 assert_eq!(parse_interval_to_ms("0s"), Some(0));
176 assert_eq!(parse_interval_to_ms("1s"), Some(1000));
177 assert_eq!(parse_interval_to_ms("60s"), Some(60_000));
178 assert_eq!(parse_interval_to_ms("-5s"), Some(-5000));
179 }
180
181 #[test]
182 fn test_parse_interval_millis() {
183 assert_eq!(parse_interval_to_ms("100ms"), Some(100));
184 assert_eq!(parse_interval_to_ms("0ms"), Some(0));
185 }
186
187 #[test]
188 fn test_parse_interval_minutes() {
189 assert_eq!(parse_interval_to_ms("1m"), Some(60_000));
190 assert_eq!(parse_interval_to_ms("5m"), Some(300_000));
191 }
192
193 #[test]
194 fn test_parse_interval_micros() {
195 assert_eq!(parse_interval_to_ms("5000us"), Some(5));
196 assert_eq!(parse_interval_to_ms("500us"), Some(0));
197 }
198
199 #[test]
200 fn test_range_expand() {
201 let spec = ProbeOffsetSpec::Range {
202 start_ms: 0,
203 end_ms: 5000,
204 step_ms: 1000,
205 };
206 assert_eq!(spec.expand(), vec![0, 1000, 2000, 3000, 4000, 5000]);
207 }
208
209 #[test]
210 fn test_range_expand_with_negative() {
211 let spec = ProbeOffsetSpec::Range {
212 start_ms: -2000,
213 end_ms: 2000,
214 step_ms: 1000,
215 };
216 assert_eq!(spec.expand(), vec![-2000, -1000, 0, 1000, 2000]);
217 }
218
219 #[test]
220 fn test_list_expand_dedup_sort() {
221 let spec = ProbeOffsetSpec::List(vec![5000, 1000, 0, 1000, -5000]);
222 assert_eq!(spec.expand(), vec![-5000, 0, 1000, 5000]);
223 }
224
225 #[test]
226 fn test_config_max_min_offset() {
227 let config = TemporalProbeConfig::new(
228 "trades".into(),
229 "prices".into(),
230 None,
231 None,
232 vec!["symbol".into()],
233 "ts".into(),
234 "ts".into(),
235 &ProbeOffsetSpec::List(vec![-5000, 0, 1000, 5000, 30_000, 60_000]),
236 "p".into(),
237 );
238 assert_eq!(config.max_offset_ms(), 60_000);
239 assert_eq!(config.min_offset_ms(), -5000);
240 }
241
242 #[test]
243 fn test_composite_key_config() {
244 let config = TemporalProbeConfig::new(
245 "trades".into(),
246 "prices".into(),
247 None,
248 None,
249 vec!["symbol".into(), "venue".into()],
250 "ts".into(),
251 "ts".into(),
252 &ProbeOffsetSpec::List(vec![0]),
253 "p".into(),
254 );
255 assert_eq!(config.key_columns, vec!["symbol", "venue"]);
256 assert_eq!(
257 config.to_string(),
258 "TemporalProbe(trades × prices on [symbol, venue] offsets=1)"
259 );
260 }
261}