1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13 pub left_key: String,
15 pub right_key: String,
17 pub left_time_column: String,
19 pub right_time_column: String,
21 pub left_table: String,
23 pub right_table: String,
25 pub time_bound: Duration,
27 pub join_type: StreamJoinType,
29}
30
31#[derive(Debug, Clone)]
33pub struct LookupJoinConfig {
34 pub stream_key: String,
36 pub lookup_key: String,
38 pub join_type: LookupJoinType,
40 pub cache_ttl: Duration,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum StreamJoinType {
47 Inner,
49 Left,
51 Right,
53 Full,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum LookupJoinType {
60 Inner,
62 Left,
64}
65
66#[derive(Debug, Clone)]
68pub struct TemporalJoinTranslatorConfig {
69 pub stream_table: String,
71 pub table_name: String,
73 pub stream_key_column: String,
75 pub table_key_column: String,
77 pub stream_time_column: String,
79 pub table_version_column: String,
81 pub semantics: String,
83 pub join_type: String,
85}
86
87#[derive(Debug, Clone)]
89pub struct AsofJoinTranslatorConfig {
90 pub left_table: String,
92 pub right_table: String,
94 pub key_column: String,
96 pub left_time_column: String,
98 pub right_time_column: String,
100 pub direction: AsofSqlDirection,
102 pub tolerance: Option<Duration>,
104 pub join_type: AsofSqlJoinType,
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum AsofSqlJoinType {
111 Inner,
113 Left,
115}
116
117#[derive(Debug, Clone)]
119pub enum JoinOperatorConfig {
120 StreamStream(StreamJoinConfig),
122 Lookup(LookupJoinConfig),
124 Asof(AsofJoinTranslatorConfig),
126 Temporal(TemporalJoinTranslatorConfig),
128}
129
130impl std::fmt::Display for StreamJoinType {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 StreamJoinType::Inner => write!(f, "INNER"),
134 StreamJoinType::Left => write!(f, "LEFT"),
135 StreamJoinType::Right => write!(f, "RIGHT"),
136 StreamJoinType::Full => write!(f, "FULL"),
137 }
138 }
139}
140
141impl std::fmt::Display for LookupJoinType {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 match self {
144 LookupJoinType::Inner => write!(f, "INNER"),
145 LookupJoinType::Left => write!(f, "LEFT"),
146 }
147 }
148}
149
150impl std::fmt::Display for AsofSqlJoinType {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 match self {
153 AsofSqlJoinType::Inner => write!(f, "INNER"),
154 AsofSqlJoinType::Left => write!(f, "LEFT"),
155 }
156 }
157}
158
159impl std::fmt::Display for StreamJoinConfig {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 write!(
162 f,
163 "{} JOIN ON {}.{} = {}.{} (bound: {}s, time: {} ~ {})",
164 self.join_type,
165 self.left_table,
166 self.left_key,
167 self.right_table,
168 self.right_key,
169 self.time_bound.as_secs(),
170 self.left_time_column,
171 self.right_time_column,
172 )
173 }
174}
175
176impl std::fmt::Display for LookupJoinConfig {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 write!(
179 f,
180 "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
181 self.join_type,
182 self.stream_key,
183 self.lookup_key,
184 self.cache_ttl.as_secs()
185 )
186 }
187}
188
189impl std::fmt::Display for AsofJoinTranslatorConfig {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 write!(
192 f,
193 "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
194 self.join_type,
195 self.left_table,
196 self.key_column,
197 self.right_table,
198 self.key_column,
199 self.direction,
200 self.left_table,
201 self.left_time_column,
202 self.right_table,
203 self.right_time_column,
204 )?;
205 if let Some(tol) = self.tolerance {
206 write!(f, ", tolerance: {}s", tol.as_secs())?;
207 }
208 write!(f, ")")
209 }
210}
211
212impl std::fmt::Display for TemporalJoinTranslatorConfig {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 write!(
215 f,
216 "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
217 self.join_type.to_uppercase(),
218 self.stream_key_column,
219 self.table_key_column,
220 self.table_version_column,
221 self.semantics,
222 )
223 }
224}
225
226impl std::fmt::Display for JoinOperatorConfig {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 match self {
229 JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
230 JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
231 JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
232 JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
233 }
234 }
235}
236
237impl JoinOperatorConfig {
238 #[must_use]
240 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
241 if analysis.is_temporal_join {
242 let join_type_str = match analysis.join_type {
243 JoinType::Left => "left",
244 _ => "inner",
245 };
246 let version_col = analysis.temporal_version_column.clone().unwrap_or_default();
247 return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
248 stream_table: analysis.left_table.clone(),
249 table_name: analysis.right_table.clone(),
250 stream_key_column: analysis.left_key_column.clone(),
251 table_key_column: analysis.right_key_column.clone(),
252 stream_time_column: version_col.clone(),
253 table_version_column: version_col,
254 semantics: "event_time".to_string(),
255 join_type: join_type_str.to_string(),
256 });
257 }
258
259 if analysis.is_asof_join {
260 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
261 left_table: analysis.left_table.clone(),
262 right_table: analysis.right_table.clone(),
263 key_column: analysis.left_key_column.clone(),
264 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
265 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
266 direction: analysis
267 .asof_direction
268 .unwrap_or(AsofSqlDirection::Backward),
269 tolerance: analysis.asof_tolerance,
270 join_type: if analysis.join_type == JoinType::Inner {
271 AsofSqlJoinType::Inner
272 } else {
273 AsofSqlJoinType::Left
274 },
275 });
276 }
277
278 if analysis.is_lookup_join {
279 JoinOperatorConfig::Lookup(LookupJoinConfig {
280 stream_key: analysis.left_key_column.clone(),
281 lookup_key: analysis.right_key_column.clone(),
282 join_type: match analysis.join_type {
283 JoinType::Inner => LookupJoinType::Inner,
284 _ => LookupJoinType::Left,
285 },
286 cache_ttl: Duration::from_secs(300), })
288 } else {
289 JoinOperatorConfig::StreamStream(StreamJoinConfig {
290 left_key: analysis.left_key_column.clone(),
291 right_key: analysis.right_key_column.clone(),
292 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
293 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
294 left_table: analysis.left_table.clone(),
295 right_table: analysis.right_table.clone(),
296 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
297 join_type: match analysis.join_type {
298 JoinType::Inner
299 | JoinType::LeftSemi
300 | JoinType::LeftAnti
301 | JoinType::RightSemi
302 | JoinType::RightAnti => StreamJoinType::Inner,
303 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
304 JoinType::Right => StreamJoinType::Right,
305 JoinType::Full => StreamJoinType::Full,
306 },
307 })
308 }
309 }
310
311 #[must_use]
313 pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
314 multi.joins.iter().map(Self::from_analysis).collect()
315 }
316
317 #[must_use]
319 pub fn is_stream_stream(&self) -> bool {
320 matches!(self, JoinOperatorConfig::StreamStream(_))
321 }
322
323 #[must_use]
325 pub fn is_lookup(&self) -> bool {
326 matches!(self, JoinOperatorConfig::Lookup(_))
327 }
328
329 #[must_use]
331 pub fn is_asof(&self) -> bool {
332 matches!(self, JoinOperatorConfig::Asof(_))
333 }
334
335 #[must_use]
337 pub fn is_temporal(&self) -> bool {
338 matches!(self, JoinOperatorConfig::Temporal(_))
339 }
340
341 #[must_use]
343 pub fn left_key(&self) -> &str {
344 match self {
345 JoinOperatorConfig::StreamStream(config) => &config.left_key,
346 JoinOperatorConfig::Lookup(config) => &config.stream_key,
347 JoinOperatorConfig::Asof(config) => &config.key_column,
348 JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
349 }
350 }
351
352 #[must_use]
354 pub fn right_key(&self) -> &str {
355 match self {
356 JoinOperatorConfig::StreamStream(config) => &config.right_key,
357 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
358 JoinOperatorConfig::Asof(config) => &config.key_column,
359 JoinOperatorConfig::Temporal(config) => &config.table_key_column,
360 }
361 }
362}
363
364impl StreamJoinConfig {
365 #[must_use]
367 pub fn new(
368 left_key: String,
369 right_key: String,
370 time_bound: Duration,
371 join_type: StreamJoinType,
372 ) -> Self {
373 Self {
374 left_key,
375 right_key,
376 left_time_column: String::new(),
377 right_time_column: String::new(),
378 left_table: String::new(),
379 right_table: String::new(),
380 time_bound,
381 join_type,
382 }
383 }
384
385 #[must_use]
387 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
388 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
389 }
390
391 #[must_use]
393 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
394 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
395 }
396}
397
398impl LookupJoinConfig {
399 #[must_use]
401 pub fn new(
402 stream_key: String,
403 lookup_key: String,
404 join_type: LookupJoinType,
405 cache_ttl: Duration,
406 ) -> Self {
407 Self {
408 stream_key,
409 lookup_key,
410 join_type,
411 cache_ttl,
412 }
413 }
414
415 #[must_use]
417 pub fn inner(stream_key: String, lookup_key: String) -> Self {
418 Self::new(
419 stream_key,
420 lookup_key,
421 LookupJoinType::Inner,
422 Duration::from_secs(300),
423 )
424 }
425
426 #[must_use]
428 pub fn left(stream_key: String, lookup_key: String) -> Self {
429 Self::new(
430 stream_key,
431 lookup_key,
432 LookupJoinType::Left,
433 Duration::from_secs(300),
434 )
435 }
436
437 #[must_use]
439 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
440 self.cache_ttl = ttl;
441 self
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use super::*;
448
449 #[test]
450 fn test_stream_join_config() {
451 let config = StreamJoinConfig::inner(
452 "order_id".to_string(),
453 "order_id".to_string(),
454 Duration::from_secs(3600),
455 );
456
457 assert_eq!(config.left_key, "order_id");
458 assert_eq!(config.right_key, "order_id");
459 assert_eq!(config.time_bound, Duration::from_secs(3600));
460 assert_eq!(config.join_type, StreamJoinType::Inner);
461 }
462
463 #[test]
464 fn test_lookup_join_config() {
465 let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
466 .with_cache_ttl(Duration::from_secs(600));
467
468 assert_eq!(config.stream_key, "customer_id");
469 assert_eq!(config.lookup_key, "id");
470 assert_eq!(config.cache_ttl, Duration::from_secs(600));
471 assert_eq!(config.join_type, LookupJoinType::Inner);
472 }
473
474 #[test]
475 fn test_from_analysis_lookup() {
476 let analysis = JoinAnalysis::lookup(
477 "orders".to_string(),
478 "customers".to_string(),
479 "customer_id".to_string(),
480 "id".to_string(),
481 JoinType::Inner,
482 );
483
484 let config = JoinOperatorConfig::from_analysis(&analysis);
485
486 assert!(config.is_lookup());
487 assert!(!config.is_stream_stream());
488 assert_eq!(config.left_key(), "customer_id");
489 assert_eq!(config.right_key(), "id");
490 }
491
492 #[test]
493 fn test_from_analysis_stream_stream() {
494 let analysis = JoinAnalysis::stream_stream(
495 "orders".to_string(),
496 "payments".to_string(),
497 "order_id".to_string(),
498 "order_id".to_string(),
499 Duration::from_secs(3600),
500 JoinType::Inner,
501 );
502
503 let config = JoinOperatorConfig::from_analysis(&analysis);
504
505 assert!(config.is_stream_stream());
506 assert!(!config.is_lookup());
507
508 if let JoinOperatorConfig::StreamStream(stream_config) = config {
509 assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
510 assert_eq!(stream_config.join_type, StreamJoinType::Inner);
511 }
512 }
513
514 #[test]
515 fn test_from_analysis_asof() {
516 let analysis = JoinAnalysis::asof(
517 "trades".to_string(),
518 "quotes".to_string(),
519 "symbol".to_string(),
520 "symbol".to_string(),
521 AsofSqlDirection::Backward,
522 "ts".to_string(),
523 "ts".to_string(),
524 Some(Duration::from_secs(5)),
525 );
526
527 let config = JoinOperatorConfig::from_analysis(&analysis);
528 assert!(config.is_asof());
529 assert!(!config.is_stream_stream());
530 assert!(!config.is_lookup());
531 }
532
533 #[test]
534 fn test_asof_config_fields() {
535 let analysis = JoinAnalysis::asof(
536 "trades".to_string(),
537 "quotes".to_string(),
538 "symbol".to_string(),
539 "symbol".to_string(),
540 AsofSqlDirection::Forward,
541 "trade_ts".to_string(),
542 "quote_ts".to_string(),
543 Some(Duration::from_millis(5000)),
544 );
545
546 let config = JoinOperatorConfig::from_analysis(&analysis);
547 if let JoinOperatorConfig::Asof(asof) = config {
548 assert_eq!(asof.direction, AsofSqlDirection::Forward);
549 assert_eq!(asof.left_time_column, "trade_ts");
550 assert_eq!(asof.right_time_column, "quote_ts");
551 assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
552 assert_eq!(asof.key_column, "symbol");
553 assert_eq!(asof.join_type, AsofSqlJoinType::Left);
554 } else {
555 panic!("Expected Asof config");
556 }
557 }
558
559 #[test]
560 fn test_asof_is_asof() {
561 let analysis = JoinAnalysis::asof(
562 "a".to_string(),
563 "b".to_string(),
564 "id".to_string(),
565 "id".to_string(),
566 AsofSqlDirection::Backward,
567 "ts".to_string(),
568 "ts".to_string(),
569 None,
570 );
571
572 let config = JoinOperatorConfig::from_analysis(&analysis);
573 assert!(config.is_asof());
574 }
575
576 #[test]
577 fn test_asof_key_accessors() {
578 let analysis = JoinAnalysis::asof(
579 "trades".to_string(),
580 "quotes".to_string(),
581 "sym".to_string(),
582 "sym".to_string(),
583 AsofSqlDirection::Backward,
584 "ts".to_string(),
585 "ts".to_string(),
586 None,
587 );
588
589 let config = JoinOperatorConfig::from_analysis(&analysis);
590 assert_eq!(config.left_key(), "sym");
591 assert_eq!(config.right_key(), "sym");
592 }
593
594 #[test]
597 fn test_from_multi_analysis_single() {
598 let analysis = JoinAnalysis::lookup(
599 "a".to_string(),
600 "b".to_string(),
601 "id".to_string(),
602 "id".to_string(),
603 JoinType::Inner,
604 );
605 let multi = MultiJoinAnalysis {
606 joins: vec![analysis],
607 tables: vec!["a".to_string(), "b".to_string()],
608 };
609
610 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
611 assert_eq!(configs.len(), 1);
612 assert!(configs[0].is_lookup());
613 }
614
615 #[test]
616 fn test_from_multi_analysis_two_lookups() {
617 let j1 = JoinAnalysis::lookup(
618 "a".to_string(),
619 "b".to_string(),
620 "id".to_string(),
621 "a_id".to_string(),
622 JoinType::Inner,
623 );
624 let j2 = JoinAnalysis::lookup(
625 "b".to_string(),
626 "c".to_string(),
627 "id".to_string(),
628 "b_id".to_string(),
629 JoinType::Left,
630 );
631 let multi = MultiJoinAnalysis {
632 joins: vec![j1, j2],
633 tables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
634 };
635
636 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
637 assert_eq!(configs.len(), 2);
638 assert!(configs[0].is_lookup());
639 assert!(configs[1].is_lookup());
640 assert_eq!(configs[0].left_key(), "id");
641 assert_eq!(configs[1].left_key(), "id");
642 }
643
644 #[test]
645 fn test_from_multi_analysis_mixed_asof_lookup() {
646 let j1 = JoinAnalysis::asof(
647 "trades".to_string(),
648 "quotes".to_string(),
649 "symbol".to_string(),
650 "symbol".to_string(),
651 AsofSqlDirection::Backward,
652 "ts".to_string(),
653 "ts".to_string(),
654 None,
655 );
656 let j2 = JoinAnalysis::lookup(
657 "quotes".to_string(),
658 "products".to_string(),
659 "product_id".to_string(),
660 "id".to_string(),
661 JoinType::Inner,
662 );
663 let multi = MultiJoinAnalysis {
664 joins: vec![j1, j2],
665 tables: vec![
666 "trades".to_string(),
667 "quotes".to_string(),
668 "products".to_string(),
669 ],
670 };
671
672 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
673 assert_eq!(configs.len(), 2);
674 assert!(configs[0].is_asof());
675 assert!(configs[1].is_lookup());
676 }
677
678 #[test]
679 fn test_from_multi_analysis_stream_stream_and_lookup() {
680 let j1 = JoinAnalysis::stream_stream(
681 "orders".to_string(),
682 "payments".to_string(),
683 "id".to_string(),
684 "order_id".to_string(),
685 Duration::from_secs(3600),
686 JoinType::Inner,
687 );
688 let j2 = JoinAnalysis::lookup(
689 "payments".to_string(),
690 "customers".to_string(),
691 "cust_id".to_string(),
692 "id".to_string(),
693 JoinType::Left,
694 );
695 let multi = MultiJoinAnalysis {
696 joins: vec![j1, j2],
697 tables: vec![
698 "orders".to_string(),
699 "payments".to_string(),
700 "customers".to_string(),
701 ],
702 };
703
704 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
705 assert_eq!(configs.len(), 2);
706 assert!(configs[0].is_stream_stream());
707 assert!(configs[1].is_lookup());
708 }
709
710 #[test]
711 fn test_from_multi_analysis_order_preserved() {
712 let j1 = JoinAnalysis::lookup(
713 "a".to_string(),
714 "b".to_string(),
715 "k1".to_string(),
716 "k1".to_string(),
717 JoinType::Inner,
718 );
719 let j2 = JoinAnalysis::stream_stream(
720 "b".to_string(),
721 "c".to_string(),
722 "k2".to_string(),
723 "k2".to_string(),
724 Duration::from_secs(60),
725 JoinType::Left,
726 );
727 let j3 = JoinAnalysis::lookup(
728 "c".to_string(),
729 "d".to_string(),
730 "k3".to_string(),
731 "k3".to_string(),
732 JoinType::Inner,
733 );
734 let multi = MultiJoinAnalysis {
735 joins: vec![j1, j2, j3],
736 tables: vec![
737 "a".to_string(),
738 "b".to_string(),
739 "c".to_string(),
740 "d".to_string(),
741 ],
742 };
743
744 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
745 assert_eq!(configs.len(), 3);
746 assert!(configs[0].is_lookup());
747 assert!(configs[1].is_stream_stream());
748 assert!(configs[2].is_lookup());
749 assert_eq!(configs[0].left_key(), "k1");
750 assert_eq!(configs[1].left_key(), "k2");
751 assert_eq!(configs[2].left_key(), "k3");
752 }
753
754 #[test]
755 fn test_join_types() {
756 let left_analysis = JoinAnalysis::stream_stream(
758 "a".to_string(),
759 "b".to_string(),
760 "id".to_string(),
761 "id".to_string(),
762 Duration::from_secs(60),
763 JoinType::Left,
764 );
765
766 if let JoinOperatorConfig::StreamStream(config) =
767 JoinOperatorConfig::from_analysis(&left_analysis)
768 {
769 assert_eq!(config.join_type, StreamJoinType::Left);
770 }
771
772 let right_analysis = JoinAnalysis::stream_stream(
773 "a".to_string(),
774 "b".to_string(),
775 "id".to_string(),
776 "id".to_string(),
777 Duration::from_secs(60),
778 JoinType::Right,
779 );
780
781 if let JoinOperatorConfig::StreamStream(config) =
782 JoinOperatorConfig::from_analysis(&right_analysis)
783 {
784 assert_eq!(config.join_type, StreamJoinType::Right);
785 }
786
787 let full_analysis = JoinAnalysis::stream_stream(
788 "a".to_string(),
789 "b".to_string(),
790 "id".to_string(),
791 "id".to_string(),
792 Duration::from_secs(60),
793 JoinType::Full,
794 );
795
796 if let JoinOperatorConfig::StreamStream(config) =
797 JoinOperatorConfig::from_analysis(&full_analysis)
798 {
799 assert_eq!(config.join_type, StreamJoinType::Full);
800 }
801 }
802
803 #[test]
804 fn test_display_stream_join() {
805 let mut config = StreamJoinConfig::inner(
806 "order_id".to_string(),
807 "order_id".to_string(),
808 Duration::from_secs(3600),
809 );
810 config.left_table = "orders".to_string();
811 config.right_table = "payments".to_string();
812 config.left_time_column = "ts".to_string();
813 config.right_time_column = "ts".to_string();
814 assert_eq!(
815 format!("{config}"),
816 "INNER JOIN ON orders.order_id = payments.order_id (bound: 3600s, time: ts ~ ts)"
817 );
818 }
819
820 #[test]
821 fn test_display_lookup_join() {
822 let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
823 assert_eq!(
824 format!("{config}"),
825 "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
826 );
827 }
828
829 #[test]
830 fn test_display_asof_join() {
831 let analysis = JoinAnalysis::asof(
832 "trades".to_string(),
833 "quotes".to_string(),
834 "symbol".to_string(),
835 "symbol".to_string(),
836 AsofSqlDirection::Backward,
837 "ts".to_string(),
838 "ts".to_string(),
839 Some(Duration::from_secs(5)),
840 );
841 let config = JoinOperatorConfig::from_analysis(&analysis);
842 let s = format!("{config}");
843 assert!(s.contains("ASOF JOIN"), "got: {s}");
844 assert!(s.contains("BACKWARD"), "got: {s}");
845 assert!(s.contains("tolerance: 5s"), "got: {s}");
846 }
847
848 #[test]
849 fn test_display_join_types() {
850 assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
851 assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
852 assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
853 assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
854 assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
855 assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
856 assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
857 assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
858 }
859
860 #[test]
861 fn test_from_analysis_semi_anti_maps_to_inner() {
862 let semi = JoinAnalysis::stream_stream(
866 "a".to_string(),
867 "b".to_string(),
868 "id".to_string(),
869 "id".to_string(),
870 Duration::from_secs(60),
871 JoinType::LeftSemi,
872 );
873 if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&semi) {
874 assert_eq!(config.join_type, StreamJoinType::Inner);
875 } else {
876 panic!("Expected StreamStream config");
877 }
878
879 let anti = JoinAnalysis::stream_stream(
880 "a".to_string(),
881 "b".to_string(),
882 "id".to_string(),
883 "id".to_string(),
884 Duration::from_secs(60),
885 JoinType::RightAnti,
886 );
887 if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&anti) {
888 assert_eq!(config.join_type, StreamJoinType::Inner);
889 } else {
890 panic!("Expected StreamStream config");
891 }
892 }
893
894 #[test]
895 fn test_from_analysis_temporal() {
896 let analysis = JoinAnalysis::temporal(
897 "orders".to_string(),
898 "products".to_string(),
899 "product_id".to_string(),
900 "id".to_string(),
901 "order_time".to_string(),
902 JoinType::Inner,
903 );
904
905 let config = JoinOperatorConfig::from_analysis(&analysis);
906 assert!(config.is_temporal());
907 assert!(!config.is_asof());
908 assert!(!config.is_lookup());
909 assert!(!config.is_stream_stream());
910 assert_eq!(config.left_key(), "product_id");
911 assert_eq!(config.right_key(), "id");
912
913 if let JoinOperatorConfig::Temporal(tc) = config {
914 assert_eq!(tc.table_version_column, "order_time");
915 assert_eq!(tc.semantics, "event_time");
916 assert_eq!(tc.join_type, "inner");
917 } else {
918 panic!("Expected Temporal config");
919 }
920 }
921
922 #[test]
923 fn test_temporal_left_join() {
924 let analysis = JoinAnalysis::temporal(
925 "orders".to_string(),
926 "products".to_string(),
927 "product_id".to_string(),
928 "id".to_string(),
929 "order_time".to_string(),
930 JoinType::Left,
931 );
932
933 let config = JoinOperatorConfig::from_analysis(&analysis);
934 if let JoinOperatorConfig::Temporal(tc) = config {
935 assert_eq!(tc.join_type, "left");
936 } else {
937 panic!("Expected Temporal config");
938 }
939 }
940
941 #[test]
942 fn test_display_temporal_join() {
943 let analysis = JoinAnalysis::temporal(
944 "orders".to_string(),
945 "products".to_string(),
946 "product_id".to_string(),
947 "id".to_string(),
948 "order_time".to_string(),
949 JoinType::Inner,
950 );
951 let config = JoinOperatorConfig::from_analysis(&analysis);
952 let s = format!("{config}");
953 assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
954 assert!(s.contains("order_time"), "got: {s}");
955 }
956}