1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13 pub left_key: String,
15 pub right_key: String,
17 pub time_bound: Duration,
19 pub join_type: StreamJoinType,
21}
22
23#[derive(Debug, Clone)]
25pub struct LookupJoinConfig {
26 pub stream_key: String,
28 pub lookup_key: String,
30 pub join_type: LookupJoinType,
32 pub cache_ttl: Duration,
34}
35
/// Join flavours available for stream-stream joins.
///
/// Each variant's `Display` rendering is given in its description.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamJoinType {
    /// Inner join, displayed as `INNER`.
    Inner,
    /// Left join, displayed as `LEFT`.
    Left,
    /// Right join, displayed as `RIGHT`.
    Right,
    /// Full join, displayed as `FULL`.
    Full,
    /// Left semi join, displayed as `LEFT SEMI`.
    LeftSemi,
    /// Left anti join, displayed as `LEFT ANTI`.
    LeftAnti,
    /// Right semi join, displayed as `RIGHT SEMI`.
    RightSemi,
    /// Right anti join, displayed as `RIGHT ANTI`.
    RightAnti,
}
56
/// Join flavours available for lookup joins.
///
/// [`JoinOperatorConfig::from_analysis`] maps an inner join to
/// [`LookupJoinType::Inner`] and every other join type to
/// [`LookupJoinType::Left`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupJoinType {
    /// Inner join, displayed as `INNER`.
    Inner,
    /// Left join, displayed as `LEFT`.
    Left,
}
65
/// Settings for a temporal join between a stream and a versioned table.
///
/// [`JoinOperatorConfig::from_analysis`] produces this configuration when the
/// analysis is flagged as a temporal join.
#[derive(Debug, Clone)]
pub struct TemporalJoinTranslatorConfig {
    /// Name of the join key column on the stream side.
    pub stream_key_column: String,
    /// Name of the join key column on the table side.
    pub table_key_column: String,
    /// Name of the table's version column; `from_analysis` stores an empty
    /// string when the analysis does not provide one.
    pub table_version_column: String,
    /// Semantics label; `from_analysis` always stores `"event_time"`.
    pub semantics: String,
    /// Join type label; `from_analysis` stores `"left"` for left joins and
    /// `"inner"` for everything else.
    pub join_type: String,
}
80
81#[derive(Debug, Clone)]
83pub struct AsofJoinTranslatorConfig {
84 pub left_table: String,
86 pub right_table: String,
88 pub key_column: String,
90 pub left_time_column: String,
92 pub right_time_column: String,
94 pub direction: AsofSqlDirection,
96 pub tolerance: Option<Duration>,
98 pub join_type: AsofSqlJoinType,
100}
101
/// Join flavours available for ASOF joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsofSqlJoinType {
    /// Inner join, displayed as `INNER`.
    Inner,
    /// Left join, displayed as `LEFT`.
    Left,
}
110
111#[derive(Debug, Clone)]
113pub enum JoinOperatorConfig {
114 StreamStream(StreamJoinConfig),
116 Lookup(LookupJoinConfig),
118 Asof(AsofJoinTranslatorConfig),
120 Temporal(TemporalJoinTranslatorConfig),
122}
123
124impl std::fmt::Display for StreamJoinType {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 match self {
127 StreamJoinType::Inner => write!(f, "INNER"),
128 StreamJoinType::Left => write!(f, "LEFT"),
129 StreamJoinType::Right => write!(f, "RIGHT"),
130 StreamJoinType::Full => write!(f, "FULL"),
131 StreamJoinType::LeftSemi => write!(f, "LEFT SEMI"),
132 StreamJoinType::LeftAnti => write!(f, "LEFT ANTI"),
133 StreamJoinType::RightSemi => write!(f, "RIGHT SEMI"),
134 StreamJoinType::RightAnti => write!(f, "RIGHT ANTI"),
135 }
136 }
137}
138
139impl std::fmt::Display for LookupJoinType {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 match self {
142 LookupJoinType::Inner => write!(f, "INNER"),
143 LookupJoinType::Left => write!(f, "LEFT"),
144 }
145 }
146}
147
148impl std::fmt::Display for AsofSqlJoinType {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 match self {
151 AsofSqlJoinType::Inner => write!(f, "INNER"),
152 AsofSqlJoinType::Left => write!(f, "LEFT"),
153 }
154 }
155}
156
157impl std::fmt::Display for StreamJoinConfig {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 write!(
160 f,
161 "{} JOIN ON left.{} = right.{} (bound: {}s)",
162 self.join_type,
163 self.left_key,
164 self.right_key,
165 self.time_bound.as_secs()
166 )
167 }
168}
169
170impl std::fmt::Display for LookupJoinConfig {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 write!(
173 f,
174 "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
175 self.join_type,
176 self.stream_key,
177 self.lookup_key,
178 self.cache_ttl.as_secs()
179 )
180 }
181}
182
183impl std::fmt::Display for AsofJoinTranslatorConfig {
184 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 write!(
186 f,
187 "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
188 self.join_type,
189 self.left_table,
190 self.key_column,
191 self.right_table,
192 self.key_column,
193 self.direction,
194 self.left_table,
195 self.left_time_column,
196 self.right_table,
197 self.right_time_column,
198 )?;
199 if let Some(tol) = self.tolerance {
200 write!(f, ", tolerance: {}s", tol.as_secs())?;
201 }
202 write!(f, ")")
203 }
204}
205
206impl std::fmt::Display for TemporalJoinTranslatorConfig {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 write!(
209 f,
210 "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
211 self.join_type.to_uppercase(),
212 self.stream_key_column,
213 self.table_key_column,
214 self.table_version_column,
215 self.semantics,
216 )
217 }
218}
219
220impl std::fmt::Display for JoinOperatorConfig {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 match self {
223 JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
224 JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
225 JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
226 JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
227 }
228 }
229}
230
231impl JoinOperatorConfig {
232 #[must_use]
234 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
235 if analysis.is_temporal_join {
236 let join_type_str = match analysis.join_type {
237 JoinType::Left => "left",
238 _ => "inner",
239 };
240 return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
241 stream_key_column: analysis.left_key_column.clone(),
242 table_key_column: analysis.right_key_column.clone(),
243 table_version_column: analysis.temporal_version_column.clone().unwrap_or_default(),
244 semantics: "event_time".to_string(),
245 join_type: join_type_str.to_string(),
246 });
247 }
248
249 if analysis.is_asof_join {
250 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
251 left_table: analysis.left_table.clone(),
252 right_table: analysis.right_table.clone(),
253 key_column: analysis.left_key_column.clone(),
254 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
255 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
256 direction: analysis
257 .asof_direction
258 .unwrap_or(AsofSqlDirection::Backward),
259 tolerance: analysis.asof_tolerance,
260 join_type: AsofSqlJoinType::Left, });
262 }
263
264 if analysis.is_lookup_join {
265 JoinOperatorConfig::Lookup(LookupJoinConfig {
266 stream_key: analysis.left_key_column.clone(),
267 lookup_key: analysis.right_key_column.clone(),
268 join_type: match analysis.join_type {
269 JoinType::Inner => LookupJoinType::Inner,
270 _ => LookupJoinType::Left,
271 },
272 cache_ttl: Duration::from_secs(300), })
274 } else {
275 JoinOperatorConfig::StreamStream(StreamJoinConfig {
276 left_key: analysis.left_key_column.clone(),
277 right_key: analysis.right_key_column.clone(),
278 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
279 join_type: match analysis.join_type {
280 JoinType::Inner => StreamJoinType::Inner,
281 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
282 JoinType::Right => StreamJoinType::Right,
283 JoinType::Full => StreamJoinType::Full,
284 JoinType::LeftSemi => StreamJoinType::LeftSemi,
285 JoinType::LeftAnti => StreamJoinType::LeftAnti,
286 JoinType::RightSemi => StreamJoinType::RightSemi,
287 JoinType::RightAnti => StreamJoinType::RightAnti,
288 },
289 })
290 }
291 }
292
293 #[must_use]
295 pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
296 multi.joins.iter().map(Self::from_analysis).collect()
297 }
298
299 #[must_use]
301 pub fn is_stream_stream(&self) -> bool {
302 matches!(self, JoinOperatorConfig::StreamStream(_))
303 }
304
305 #[must_use]
307 pub fn is_lookup(&self) -> bool {
308 matches!(self, JoinOperatorConfig::Lookup(_))
309 }
310
311 #[must_use]
313 pub fn is_asof(&self) -> bool {
314 matches!(self, JoinOperatorConfig::Asof(_))
315 }
316
317 #[must_use]
319 pub fn is_temporal(&self) -> bool {
320 matches!(self, JoinOperatorConfig::Temporal(_))
321 }
322
323 #[must_use]
325 pub fn left_key(&self) -> &str {
326 match self {
327 JoinOperatorConfig::StreamStream(config) => &config.left_key,
328 JoinOperatorConfig::Lookup(config) => &config.stream_key,
329 JoinOperatorConfig::Asof(config) => &config.key_column,
330 JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
331 }
332 }
333
334 #[must_use]
336 pub fn right_key(&self) -> &str {
337 match self {
338 JoinOperatorConfig::StreamStream(config) => &config.right_key,
339 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
340 JoinOperatorConfig::Asof(config) => &config.key_column,
341 JoinOperatorConfig::Temporal(config) => &config.table_key_column,
342 }
343 }
344}
345
346impl StreamJoinConfig {
347 #[must_use]
349 pub fn new(
350 left_key: String,
351 right_key: String,
352 time_bound: Duration,
353 join_type: StreamJoinType,
354 ) -> Self {
355 Self {
356 left_key,
357 right_key,
358 time_bound,
359 join_type,
360 }
361 }
362
363 #[must_use]
365 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
366 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
367 }
368
369 #[must_use]
371 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
372 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
373 }
374}
375
376impl LookupJoinConfig {
377 #[must_use]
379 pub fn new(
380 stream_key: String,
381 lookup_key: String,
382 join_type: LookupJoinType,
383 cache_ttl: Duration,
384 ) -> Self {
385 Self {
386 stream_key,
387 lookup_key,
388 join_type,
389 cache_ttl,
390 }
391 }
392
393 #[must_use]
395 pub fn inner(stream_key: String, lookup_key: String) -> Self {
396 Self::new(
397 stream_key,
398 lookup_key,
399 LookupJoinType::Inner,
400 Duration::from_secs(300),
401 )
402 }
403
404 #[must_use]
406 pub fn left(stream_key: String, lookup_key: String) -> Self {
407 Self::new(
408 stream_key,
409 lookup_key,
410 LookupJoinType::Left,
411 Duration::from_secs(300),
412 )
413 }
414
415 #[must_use]
417 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
418 self.cache_ttl = ttl;
419 self
420 }
421}
422
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Fixture builders: thin wrappers that take `&str` so the tests stay
    // focused on the values that matter.
    // ---------------------------------------------------------------------

    /// Builds a lookup-join analysis.
    fn lookup_analysis(
        left_table: &str,
        right_table: &str,
        left_key: &str,
        right_key: &str,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::lookup(
            left_table.to_string(),
            right_table.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            join_type,
        )
    }

    /// Builds a stream-stream analysis with a time bound given in seconds.
    fn stream_analysis(
        left_table: &str,
        right_table: &str,
        left_key: &str,
        right_key: &str,
        bound_secs: u64,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::stream_stream(
            left_table.to_string(),
            right_table.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            Duration::from_secs(bound_secs),
            join_type,
        )
    }

    /// Builds an ASOF analysis that uses the same key column on both sides.
    fn asof_analysis(
        left_table: &str,
        right_table: &str,
        key: &str,
        direction: AsofSqlDirection,
        left_time: &str,
        right_time: &str,
        tolerance: Option<Duration>,
    ) -> JoinAnalysis {
        JoinAnalysis::asof(
            left_table.to_string(),
            right_table.to_string(),
            key.to_string(),
            key.to_string(),
            direction,
            left_time.to_string(),
            right_time.to_string(),
            tolerance,
        )
    }

    /// Builds the orders/products temporal analysis shared by several tests.
    fn temporal_analysis(join_type: JoinType) -> JoinAnalysis {
        JoinAnalysis::temporal(
            "orders".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            "order_time".to_string(),
            join_type,
        )
    }

    /// Wraps joins and table names into a multi-join analysis.
    fn multi_analysis(joins: Vec<JoinAnalysis>, tables: &[&str]) -> MultiJoinAnalysis {
        MultiJoinAnalysis {
            joins,
            tables: tables.iter().map(|table| (*table).to_string()).collect(),
        }
    }

    // ---------------------------------------------------------------------
    // Variant unwrappers: fail loudly instead of silently skipping the
    // assertions when `from_analysis` returns an unexpected variant.
    // ---------------------------------------------------------------------

    fn expect_stream_stream(config: JoinOperatorConfig) -> StreamJoinConfig {
        match config {
            JoinOperatorConfig::StreamStream(inner) => inner,
            other => panic!("Expected StreamStream config, got: {:?}", other),
        }
    }

    fn expect_asof(config: JoinOperatorConfig) -> AsofJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Asof(inner) => inner,
            other => panic!("Expected Asof config, got: {:?}", other),
        }
    }

    fn expect_temporal(config: JoinOperatorConfig) -> TemporalJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Temporal(inner) => inner,
            other => panic!("Expected Temporal config, got: {:?}", other),
        }
    }

    /// Asserts that a stream-stream analysis with `join_type` yields `expected`.
    fn assert_stream_join_type(join_type: JoinType, expected: StreamJoinType) {
        let analysis = stream_analysis("a", "b", "id", "id", 60, join_type);
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(config.join_type, expected);
    }

    // ---------------------------------------------------------------------
    // Constructors
    // ---------------------------------------------------------------------

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    // ---------------------------------------------------------------------
    // from_analysis
    // ---------------------------------------------------------------------

    #[test]
    fn test_from_analysis_lookup() {
        let analysis = lookup_analysis("orders", "customers", "customer_id", "id", JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = stream_analysis(
            "orders",
            "payments",
            "order_id",
            "order_id",
            3600,
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        let stream_config = expect_stream_stream(config);
        assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
        assert_eq!(stream_config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = asof_analysis(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = asof_analysis(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Forward,
            "trade_ts",
            "quote_ts",
            Some(Duration::from_millis(5000)),
        );

        let asof = expect_asof(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(asof.direction, AsofSqlDirection::Forward);
        assert_eq!(asof.left_time_column, "trade_ts");
        assert_eq!(asof.right_time_column, "quote_ts");
        assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
        assert_eq!(asof.key_column, "symbol");
        assert_eq!(asof.join_type, AsofSqlJoinType::Left);
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = asof_analysis("a", "b", "id", AsofSqlDirection::Backward, "ts", "ts", None);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = asof_analysis(
            "trades",
            "quotes",
            "sym",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    #[test]
    fn test_join_types() {
        let cases = vec![
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
        ];
        for (join_type, expected) in cases {
            assert_stream_join_type(join_type, expected);
        }
    }

    #[test]
    fn test_from_analysis_semi_anti() {
        let cases = vec![
            (JoinType::LeftSemi, StreamJoinType::LeftSemi),
            (JoinType::LeftAnti, StreamJoinType::LeftAnti),
            (JoinType::RightSemi, StreamJoinType::RightSemi),
            (JoinType::RightAnti, StreamJoinType::RightAnti),
        ];
        for (join_type, expected) in cases {
            assert_stream_join_type(join_type, expected);
        }
    }

    #[test]
    fn test_from_analysis_temporal() {
        let analysis = temporal_analysis(JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_temporal());
        assert!(!config.is_asof());
        assert!(!config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "product_id");
        assert_eq!(config.right_key(), "id");

        let tc = expect_temporal(config);
        assert_eq!(tc.table_version_column, "order_time");
        assert_eq!(tc.semantics, "event_time");
        assert_eq!(tc.join_type, "inner");
    }

    #[test]
    fn test_temporal_left_join() {
        let analysis = temporal_analysis(JoinType::Left);

        let tc = expect_temporal(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(tc.join_type, "left");
    }

    // ---------------------------------------------------------------------
    // from_multi_analysis
    // ---------------------------------------------------------------------

    #[test]
    fn test_from_multi_analysis_single() {
        let multi = multi_analysis(
            vec![lookup_analysis("a", "b", "id", "id", JoinType::Inner)],
            &["a", "b"],
        );

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let multi = multi_analysis(
            vec![
                lookup_analysis("a", "b", "id", "a_id", JoinType::Inner),
                lookup_analysis("b", "c", "id", "b_id", JoinType::Left),
            ],
            &["a", "b", "c"],
        );

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
        assert_eq!(configs[0].right_key(), "a_id");
        assert_eq!(configs[1].right_key(), "b_id");
    }

    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let multi = multi_analysis(
            vec![
                asof_analysis(
                    "trades",
                    "quotes",
                    "symbol",
                    AsofSqlDirection::Backward,
                    "ts",
                    "ts",
                    None,
                ),
                lookup_analysis("quotes", "products", "product_id", "id", JoinType::Inner),
            ],
            &["trades", "quotes", "products"],
        );

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let multi = multi_analysis(
            vec![
                stream_analysis("orders", "payments", "id", "order_id", 3600, JoinType::Inner),
                lookup_analysis("payments", "customers", "cust_id", "id", JoinType::Left),
            ],
            &["orders", "payments", "customers"],
        );

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let multi = multi_analysis(
            vec![
                lookup_analysis("a", "b", "k1", "k1", JoinType::Inner),
                stream_analysis("b", "c", "k2", "k2", 60, JoinType::Left),
                lookup_analysis("c", "d", "k3", "k3", JoinType::Inner),
            ],
            &["a", "b", "c", "d"],
        );

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    // ---------------------------------------------------------------------
    // Display
    // ---------------------------------------------------------------------

    #[test]
    fn test_display_stream_join() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        assert_eq!(
            format!("{config}"),
            "INNER JOIN ON left.order_id = right.order_id (bound: 3600s)"
        );
    }

    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            format!("{config}"),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    #[test]
    fn test_display_asof_join() {
        let analysis = asof_analysis(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    #[test]
    fn test_display_join_types() {
        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
        assert_eq!(format!("{}", StreamJoinType::LeftSemi), "LEFT SEMI");
        assert_eq!(format!("{}", StreamJoinType::LeftAnti), "LEFT ANTI");
        assert_eq!(format!("{}", StreamJoinType::RightSemi), "RIGHT SEMI");
        assert_eq!(format!("{}", StreamJoinType::RightAnti), "RIGHT ANTI");
        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
    }

    #[test]
    fn test_display_temporal_join() {
        let analysis = temporal_analysis(JoinType::Inner);
        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(
            format!("{config}"),
            "INNER TEMPORAL JOIN ON stream.product_id = table.id (version: order_time, event_time)"
        );
    }
}