1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType};
9
10#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13 pub left_key: String,
15 pub right_key: String,
17 pub time_bound: Duration,
19 pub join_type: StreamJoinType,
21}
22
23#[derive(Debug, Clone)]
25pub struct LookupJoinConfig {
26 pub stream_key: String,
28 pub lookup_key: String,
30 pub join_type: LookupJoinType,
32 pub cache_ttl: Duration,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum StreamJoinType {
39 Inner,
41 Left,
43 Right,
45 Full,
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub enum LookupJoinType {
52 Inner,
54 Left,
56}
57
58#[derive(Debug, Clone)]
60pub struct AsofJoinTranslatorConfig {
61 pub key_column: String,
63 pub left_time_column: String,
65 pub right_time_column: String,
67 pub direction: AsofSqlDirection,
69 pub tolerance: Option<Duration>,
71 pub join_type: AsofSqlJoinType,
73}
74
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
77pub enum AsofSqlJoinType {
78 Inner,
80 Left,
82}
83
84#[derive(Debug, Clone)]
86pub enum JoinOperatorConfig {
87 StreamStream(StreamJoinConfig),
89 Lookup(LookupJoinConfig),
91 Asof(AsofJoinTranslatorConfig),
93}
94
95impl JoinOperatorConfig {
96 #[must_use]
98 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
99 if analysis.is_asof_join {
100 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
101 key_column: analysis.left_key_column.clone(),
102 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
103 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
104 direction: analysis
105 .asof_direction
106 .unwrap_or(AsofSqlDirection::Backward),
107 tolerance: analysis.asof_tolerance,
108 join_type: AsofSqlJoinType::Left, });
110 }
111
112 if analysis.is_lookup_join {
113 JoinOperatorConfig::Lookup(LookupJoinConfig {
114 stream_key: analysis.left_key_column.clone(),
115 lookup_key: analysis.right_key_column.clone(),
116 join_type: match analysis.join_type {
117 JoinType::Inner => LookupJoinType::Inner,
118 _ => LookupJoinType::Left,
119 },
120 cache_ttl: Duration::from_secs(300), })
122 } else {
123 JoinOperatorConfig::StreamStream(StreamJoinConfig {
124 left_key: analysis.left_key_column.clone(),
125 right_key: analysis.right_key_column.clone(),
126 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
127 join_type: match analysis.join_type {
128 JoinType::Inner => StreamJoinType::Inner,
129 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
130 JoinType::Right => StreamJoinType::Right,
131 JoinType::Full => StreamJoinType::Full,
132 },
133 })
134 }
135 }
136
137 #[must_use]
139 pub fn is_stream_stream(&self) -> bool {
140 matches!(self, JoinOperatorConfig::StreamStream(_))
141 }
142
143 #[must_use]
145 pub fn is_lookup(&self) -> bool {
146 matches!(self, JoinOperatorConfig::Lookup(_))
147 }
148
149 #[must_use]
151 pub fn is_asof(&self) -> bool {
152 matches!(self, JoinOperatorConfig::Asof(_))
153 }
154
155 #[must_use]
157 pub fn left_key(&self) -> &str {
158 match self {
159 JoinOperatorConfig::StreamStream(config) => &config.left_key,
160 JoinOperatorConfig::Lookup(config) => &config.stream_key,
161 JoinOperatorConfig::Asof(config) => &config.key_column,
162 }
163 }
164
165 #[must_use]
167 pub fn right_key(&self) -> &str {
168 match self {
169 JoinOperatorConfig::StreamStream(config) => &config.right_key,
170 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
171 JoinOperatorConfig::Asof(config) => &config.key_column,
172 }
173 }
174}
175
176impl StreamJoinConfig {
177 #[must_use]
179 pub fn new(
180 left_key: String,
181 right_key: String,
182 time_bound: Duration,
183 join_type: StreamJoinType,
184 ) -> Self {
185 Self {
186 left_key,
187 right_key,
188 time_bound,
189 join_type,
190 }
191 }
192
193 #[must_use]
195 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
196 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
197 }
198
199 #[must_use]
201 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
202 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
203 }
204}
205
206impl LookupJoinConfig {
207 #[must_use]
209 pub fn new(
210 stream_key: String,
211 lookup_key: String,
212 join_type: LookupJoinType,
213 cache_ttl: Duration,
214 ) -> Self {
215 Self {
216 stream_key,
217 lookup_key,
218 join_type,
219 cache_ttl,
220 }
221 }
222
223 #[must_use]
225 pub fn inner(stream_key: String, lookup_key: String) -> Self {
226 Self::new(
227 stream_key,
228 lookup_key,
229 LookupJoinType::Inner,
230 Duration::from_secs(300),
231 )
232 }
233
234 #[must_use]
236 pub fn left(stream_key: String, lookup_key: String) -> Self {
237 Self::new(
238 stream_key,
239 lookup_key,
240 LookupJoinType::Left,
241 Duration::from_secs(300),
242 )
243 }
244
245 #[must_use]
247 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
248 self.cache_ttl = ttl;
249 self
250 }
251}
252
253#[cfg(test)]
254mod tests {
255 use super::*;
256
257 #[test]
258 fn test_stream_join_config() {
259 let config = StreamJoinConfig::inner(
260 "order_id".to_string(),
261 "order_id".to_string(),
262 Duration::from_secs(3600),
263 );
264
265 assert_eq!(config.left_key, "order_id");
266 assert_eq!(config.right_key, "order_id");
267 assert_eq!(config.time_bound, Duration::from_secs(3600));
268 assert_eq!(config.join_type, StreamJoinType::Inner);
269 }
270
271 #[test]
272 fn test_lookup_join_config() {
273 let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
274 .with_cache_ttl(Duration::from_secs(600));
275
276 assert_eq!(config.stream_key, "customer_id");
277 assert_eq!(config.lookup_key, "id");
278 assert_eq!(config.cache_ttl, Duration::from_secs(600));
279 assert_eq!(config.join_type, LookupJoinType::Inner);
280 }
281
282 #[test]
283 fn test_from_analysis_lookup() {
284 let analysis = JoinAnalysis::lookup(
285 "orders".to_string(),
286 "customers".to_string(),
287 "customer_id".to_string(),
288 "id".to_string(),
289 JoinType::Inner,
290 );
291
292 let config = JoinOperatorConfig::from_analysis(&analysis);
293
294 assert!(config.is_lookup());
295 assert!(!config.is_stream_stream());
296 assert_eq!(config.left_key(), "customer_id");
297 assert_eq!(config.right_key(), "id");
298 }
299
300 #[test]
301 fn test_from_analysis_stream_stream() {
302 let analysis = JoinAnalysis::stream_stream(
303 "orders".to_string(),
304 "payments".to_string(),
305 "order_id".to_string(),
306 "order_id".to_string(),
307 Duration::from_secs(3600),
308 JoinType::Inner,
309 );
310
311 let config = JoinOperatorConfig::from_analysis(&analysis);
312
313 assert!(config.is_stream_stream());
314 assert!(!config.is_lookup());
315
316 if let JoinOperatorConfig::StreamStream(stream_config) = config {
317 assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
318 assert_eq!(stream_config.join_type, StreamJoinType::Inner);
319 }
320 }
321
322 #[test]
323 fn test_from_analysis_asof() {
324 let analysis = JoinAnalysis::asof(
325 "trades".to_string(),
326 "quotes".to_string(),
327 "symbol".to_string(),
328 "symbol".to_string(),
329 AsofSqlDirection::Backward,
330 "ts".to_string(),
331 "ts".to_string(),
332 Some(Duration::from_secs(5)),
333 );
334
335 let config = JoinOperatorConfig::from_analysis(&analysis);
336 assert!(config.is_asof());
337 assert!(!config.is_stream_stream());
338 assert!(!config.is_lookup());
339 }
340
341 #[test]
342 fn test_asof_config_fields() {
343 let analysis = JoinAnalysis::asof(
344 "trades".to_string(),
345 "quotes".to_string(),
346 "symbol".to_string(),
347 "symbol".to_string(),
348 AsofSqlDirection::Forward,
349 "trade_ts".to_string(),
350 "quote_ts".to_string(),
351 Some(Duration::from_millis(5000)),
352 );
353
354 let config = JoinOperatorConfig::from_analysis(&analysis);
355 if let JoinOperatorConfig::Asof(asof) = config {
356 assert_eq!(asof.direction, AsofSqlDirection::Forward);
357 assert_eq!(asof.left_time_column, "trade_ts");
358 assert_eq!(asof.right_time_column, "quote_ts");
359 assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
360 assert_eq!(asof.key_column, "symbol");
361 assert_eq!(asof.join_type, AsofSqlJoinType::Left);
362 } else {
363 panic!("Expected Asof config");
364 }
365 }
366
367 #[test]
368 fn test_asof_is_asof() {
369 let analysis = JoinAnalysis::asof(
370 "a".to_string(),
371 "b".to_string(),
372 "id".to_string(),
373 "id".to_string(),
374 AsofSqlDirection::Backward,
375 "ts".to_string(),
376 "ts".to_string(),
377 None,
378 );
379
380 let config = JoinOperatorConfig::from_analysis(&analysis);
381 assert!(config.is_asof());
382 }
383
384 #[test]
385 fn test_asof_key_accessors() {
386 let analysis = JoinAnalysis::asof(
387 "trades".to_string(),
388 "quotes".to_string(),
389 "sym".to_string(),
390 "sym".to_string(),
391 AsofSqlDirection::Backward,
392 "ts".to_string(),
393 "ts".to_string(),
394 None,
395 );
396
397 let config = JoinOperatorConfig::from_analysis(&analysis);
398 assert_eq!(config.left_key(), "sym");
399 assert_eq!(config.right_key(), "sym");
400 }
401
402 #[test]
403 fn test_join_types() {
404 let left_analysis = JoinAnalysis::stream_stream(
406 "a".to_string(),
407 "b".to_string(),
408 "id".to_string(),
409 "id".to_string(),
410 Duration::from_secs(60),
411 JoinType::Left,
412 );
413
414 if let JoinOperatorConfig::StreamStream(config) =
415 JoinOperatorConfig::from_analysis(&left_analysis)
416 {
417 assert_eq!(config.join_type, StreamJoinType::Left);
418 }
419
420 let right_analysis = JoinAnalysis::stream_stream(
421 "a".to_string(),
422 "b".to_string(),
423 "id".to_string(),
424 "id".to_string(),
425 Duration::from_secs(60),
426 JoinType::Right,
427 );
428
429 if let JoinOperatorConfig::StreamStream(config) =
430 JoinOperatorConfig::from_analysis(&right_analysis)
431 {
432 assert_eq!(config.join_type, StreamJoinType::Right);
433 }
434
435 let full_analysis = JoinAnalysis::stream_stream(
436 "a".to_string(),
437 "b".to_string(),
438 "id".to_string(),
439 "id".to_string(),
440 Duration::from_secs(60),
441 JoinType::Full,
442 );
443
444 if let JoinOperatorConfig::StreamStream(config) =
445 JoinOperatorConfig::from_analysis(&full_analysis)
446 {
447 assert_eq!(config.join_type, StreamJoinType::Full);
448 }
449 }
450}