1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
/// Configuration for a stream-stream join operator.
#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    /// Key column taken from the left input.
    pub left_key: String,
    /// Key column taken from the right input.
    pub right_key: String,
    /// Time bound of the join (presumably the maximum time distance between
    /// matching rows; confirm against the operator that consumes this config).
    pub time_bound: Duration,
    /// Join variant to perform.
    pub join_type: StreamJoinType,
}
22
/// Configuration for a lookup join, matching a stream-side key against a
/// lookup-side key.
#[derive(Debug, Clone)]
pub struct LookupJoinConfig {
    /// Key column on the stream side.
    pub stream_key: String,
    /// Key column on the lookup side.
    pub lookup_key: String,
    /// Join variant to perform.
    pub join_type: LookupJoinType,
    /// Cache time-to-live (presumably how long looked-up rows stay cached;
    /// confirm against the lookup operator).
    pub cache_ttl: Duration,
}
35
/// Join variants available for stream-stream joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamJoinType {
    /// Inner join.
    Inner,
    /// Left join.
    Left,
    /// Right join.
    Right,
    /// Full join.
    Full,
    /// Left semi join.
    LeftSemi,
    /// Left anti join.
    LeftAnti,
    /// Right semi join.
    RightSemi,
    /// Right anti join.
    RightAnti,
}
56
/// Join variants available for lookup joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupJoinType {
    /// Inner join.
    Inner,
    /// Left join.
    Left,
}
65
/// Configuration for a temporal join between a stream and a versioned table.
///
/// `semantics` and `join_type` are carried as plain strings rather than enums.
#[derive(Debug, Clone)]
pub struct TemporalJoinTranslatorConfig {
    /// Name of the stream-side table.
    pub stream_table: String,
    /// Name of the table being joined against.
    pub table_name: String,
    /// Key column on the stream side.
    pub stream_key_column: String,
    /// Key column on the table side.
    pub table_key_column: String,
    /// Time column on the stream side.
    pub stream_time_column: String,
    /// Version column on the table side.
    pub table_version_column: String,
    /// Join semantics as free-form text, e.g. `"event_time"`.
    pub semantics: String,
    /// Join type as lowercase text, e.g. `"inner"` or `"left"`.
    pub join_type: String,
}
86
/// Configuration for an ASOF join between two tables.
#[derive(Debug, Clone)]
pub struct AsofJoinTranslatorConfig {
    /// Name of the left table.
    pub left_table: String,
    /// Name of the right table.
    pub right_table: String,
    /// Key column; a single name is stored and used for both sides.
    pub key_column: String,
    /// Time column on the left table.
    pub left_time_column: String,
    /// Time column on the right table.
    pub right_time_column: String,
    /// Direction of the ASOF match.
    pub direction: AsofSqlDirection,
    /// Optional tolerance; `None` when the query specified none (presumably the
    /// maximum allowed time gap between matched rows; confirm with the operator).
    pub tolerance: Option<Duration>,
    /// Join variant to perform.
    pub join_type: AsofSqlJoinType,
}
107
/// Join variants available for ASOF joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsofSqlJoinType {
    /// Inner join.
    Inner,
    /// Left join.
    Left,
}
116
/// Operator configuration for a single join, tagged by the kind of join.
#[derive(Debug, Clone)]
pub enum JoinOperatorConfig {
    /// Stream-stream join.
    StreamStream(StreamJoinConfig),
    /// Lookup join.
    Lookup(LookupJoinConfig),
    /// ASOF join.
    Asof(AsofJoinTranslatorConfig),
    /// Temporal join.
    Temporal(TemporalJoinTranslatorConfig),
}
129
130impl std::fmt::Display for StreamJoinType {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 StreamJoinType::Inner => write!(f, "INNER"),
134 StreamJoinType::Left => write!(f, "LEFT"),
135 StreamJoinType::Right => write!(f, "RIGHT"),
136 StreamJoinType::Full => write!(f, "FULL"),
137 StreamJoinType::LeftSemi => write!(f, "LEFT SEMI"),
138 StreamJoinType::LeftAnti => write!(f, "LEFT ANTI"),
139 StreamJoinType::RightSemi => write!(f, "RIGHT SEMI"),
140 StreamJoinType::RightAnti => write!(f, "RIGHT ANTI"),
141 }
142 }
143}
144
145impl std::fmt::Display for LookupJoinType {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 match self {
148 LookupJoinType::Inner => write!(f, "INNER"),
149 LookupJoinType::Left => write!(f, "LEFT"),
150 }
151 }
152}
153
154impl std::fmt::Display for AsofSqlJoinType {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 match self {
157 AsofSqlJoinType::Inner => write!(f, "INNER"),
158 AsofSqlJoinType::Left => write!(f, "LEFT"),
159 }
160 }
161}
162
163impl std::fmt::Display for StreamJoinConfig {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 write!(
166 f,
167 "{} JOIN ON left.{} = right.{} (bound: {}s)",
168 self.join_type,
169 self.left_key,
170 self.right_key,
171 self.time_bound.as_secs()
172 )
173 }
174}
175
176impl std::fmt::Display for LookupJoinConfig {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 write!(
179 f,
180 "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
181 self.join_type,
182 self.stream_key,
183 self.lookup_key,
184 self.cache_ttl.as_secs()
185 )
186 }
187}
188
189impl std::fmt::Display for AsofJoinTranslatorConfig {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 write!(
192 f,
193 "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
194 self.join_type,
195 self.left_table,
196 self.key_column,
197 self.right_table,
198 self.key_column,
199 self.direction,
200 self.left_table,
201 self.left_time_column,
202 self.right_table,
203 self.right_time_column,
204 )?;
205 if let Some(tol) = self.tolerance {
206 write!(f, ", tolerance: {}s", tol.as_secs())?;
207 }
208 write!(f, ")")
209 }
210}
211
212impl std::fmt::Display for TemporalJoinTranslatorConfig {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 write!(
215 f,
216 "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
217 self.join_type.to_uppercase(),
218 self.stream_key_column,
219 self.table_key_column,
220 self.table_version_column,
221 self.semantics,
222 )
223 }
224}
225
226impl std::fmt::Display for JoinOperatorConfig {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 match self {
229 JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
230 JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
231 JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
232 JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
233 }
234 }
235}
236
237impl JoinOperatorConfig {
238 #[must_use]
240 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
241 if analysis.is_temporal_join {
242 let join_type_str = match analysis.join_type {
243 JoinType::Left => "left",
244 _ => "inner",
245 };
246 let version_col = analysis.temporal_version_column.clone().unwrap_or_default();
247 return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
248 stream_table: analysis.left_table.clone(),
249 table_name: analysis.right_table.clone(),
250 stream_key_column: analysis.left_key_column.clone(),
251 table_key_column: analysis.right_key_column.clone(),
252 stream_time_column: version_col.clone(),
253 table_version_column: version_col,
254 semantics: "event_time".to_string(),
255 join_type: join_type_str.to_string(),
256 });
257 }
258
259 if analysis.is_asof_join {
260 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
261 left_table: analysis.left_table.clone(),
262 right_table: analysis.right_table.clone(),
263 key_column: analysis.left_key_column.clone(),
264 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
265 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
266 direction: analysis
267 .asof_direction
268 .unwrap_or(AsofSqlDirection::Backward),
269 tolerance: analysis.asof_tolerance,
270 join_type: AsofSqlJoinType::Left, });
272 }
273
274 if analysis.is_lookup_join {
275 JoinOperatorConfig::Lookup(LookupJoinConfig {
276 stream_key: analysis.left_key_column.clone(),
277 lookup_key: analysis.right_key_column.clone(),
278 join_type: match analysis.join_type {
279 JoinType::Inner => LookupJoinType::Inner,
280 _ => LookupJoinType::Left,
281 },
282 cache_ttl: Duration::from_secs(300), })
284 } else {
285 JoinOperatorConfig::StreamStream(StreamJoinConfig {
286 left_key: analysis.left_key_column.clone(),
287 right_key: analysis.right_key_column.clone(),
288 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
289 join_type: match analysis.join_type {
290 JoinType::Inner => StreamJoinType::Inner,
291 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
292 JoinType::Right => StreamJoinType::Right,
293 JoinType::Full => StreamJoinType::Full,
294 JoinType::LeftSemi => StreamJoinType::LeftSemi,
295 JoinType::LeftAnti => StreamJoinType::LeftAnti,
296 JoinType::RightSemi => StreamJoinType::RightSemi,
297 JoinType::RightAnti => StreamJoinType::RightAnti,
298 },
299 })
300 }
301 }
302
303 #[must_use]
305 pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
306 multi.joins.iter().map(Self::from_analysis).collect()
307 }
308
309 #[must_use]
311 pub fn is_stream_stream(&self) -> bool {
312 matches!(self, JoinOperatorConfig::StreamStream(_))
313 }
314
315 #[must_use]
317 pub fn is_lookup(&self) -> bool {
318 matches!(self, JoinOperatorConfig::Lookup(_))
319 }
320
321 #[must_use]
323 pub fn is_asof(&self) -> bool {
324 matches!(self, JoinOperatorConfig::Asof(_))
325 }
326
327 #[must_use]
329 pub fn is_temporal(&self) -> bool {
330 matches!(self, JoinOperatorConfig::Temporal(_))
331 }
332
333 #[must_use]
335 pub fn left_key(&self) -> &str {
336 match self {
337 JoinOperatorConfig::StreamStream(config) => &config.left_key,
338 JoinOperatorConfig::Lookup(config) => &config.stream_key,
339 JoinOperatorConfig::Asof(config) => &config.key_column,
340 JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
341 }
342 }
343
344 #[must_use]
346 pub fn right_key(&self) -> &str {
347 match self {
348 JoinOperatorConfig::StreamStream(config) => &config.right_key,
349 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
350 JoinOperatorConfig::Asof(config) => &config.key_column,
351 JoinOperatorConfig::Temporal(config) => &config.table_key_column,
352 }
353 }
354}
355
356impl StreamJoinConfig {
357 #[must_use]
359 pub fn new(
360 left_key: String,
361 right_key: String,
362 time_bound: Duration,
363 join_type: StreamJoinType,
364 ) -> Self {
365 Self {
366 left_key,
367 right_key,
368 time_bound,
369 join_type,
370 }
371 }
372
373 #[must_use]
375 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
376 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
377 }
378
379 #[must_use]
381 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
382 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
383 }
384}
385
386impl LookupJoinConfig {
387 #[must_use]
389 pub fn new(
390 stream_key: String,
391 lookup_key: String,
392 join_type: LookupJoinType,
393 cache_ttl: Duration,
394 ) -> Self {
395 Self {
396 stream_key,
397 lookup_key,
398 join_type,
399 cache_ttl,
400 }
401 }
402
403 #[must_use]
405 pub fn inner(stream_key: String, lookup_key: String) -> Self {
406 Self::new(
407 stream_key,
408 lookup_key,
409 LookupJoinType::Inner,
410 Duration::from_secs(300),
411 )
412 }
413
414 #[must_use]
416 pub fn left(stream_key: String, lookup_key: String) -> Self {
417 Self::new(
418 stream_key,
419 lookup_key,
420 LookupJoinType::Left,
421 Duration::from_secs(300),
422 )
423 }
424
425 #[must_use]
427 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
428 self.cache_ttl = ttl;
429 self
430 }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    // ---- analysis builders -------------------------------------------------

    /// Builds a lookup-join analysis from string slices.
    fn lookup(
        left_table: &str,
        right_table: &str,
        left_key: &str,
        right_key: &str,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::lookup(
            left_table.to_string(),
            right_table.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            join_type,
        )
    }

    /// Builds a stream-stream analysis with a time bound in whole seconds.
    fn stream_stream(
        left_table: &str,
        right_table: &str,
        left_key: &str,
        right_key: &str,
        bound_secs: u64,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::stream_stream(
            left_table.to_string(),
            right_table.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            Duration::from_secs(bound_secs),
            join_type,
        )
    }

    /// Builds an ASOF analysis that uses the same key column on both sides.
    fn asof(
        left_table: &str,
        right_table: &str,
        key: &str,
        direction: AsofSqlDirection,
        left_time: &str,
        right_time: &str,
        tolerance: Option<Duration>,
    ) -> JoinAnalysis {
        JoinAnalysis::asof(
            left_table.to_string(),
            right_table.to_string(),
            key.to_string(),
            key.to_string(),
            direction,
            left_time.to_string(),
            right_time.to_string(),
            tolerance,
        )
    }

    /// Builds the orders/products temporal analysis shared by the temporal tests.
    fn temporal(join_type: JoinType) -> JoinAnalysis {
        JoinAnalysis::temporal(
            "orders".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            "order_time".to_string(),
            join_type,
        )
    }

    /// Wraps joins and table names into a multi-join analysis.
    fn multi(joins: Vec<JoinAnalysis>, tables: &[&str]) -> MultiJoinAnalysis {
        MultiJoinAnalysis {
            joins,
            tables: tables.iter().map(|name| (*name).to_string()).collect(),
        }
    }

    // ---- variant unwrappers ------------------------------------------------
    // These panic on the wrong variant so a mismatch fails the test instead of
    // silently skipping the assertions.

    fn expect_stream_stream(config: JoinOperatorConfig) -> StreamJoinConfig {
        match config {
            JoinOperatorConfig::StreamStream(inner) => inner,
            other => panic!("Expected StreamStream config, got: {other}"),
        }
    }

    fn expect_asof(config: JoinOperatorConfig) -> AsofJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Asof(inner) => inner,
            other => panic!("Expected Asof config, got: {other}"),
        }
    }

    fn expect_temporal(config: JoinOperatorConfig) -> TemporalJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Temporal(inner) => inner,
            other => panic!("Expected Temporal config, got: {other}"),
        }
    }

    // ---- constructors ------------------------------------------------------

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    // ---- from_analysis -----------------------------------------------------

    #[test]
    fn test_from_analysis_lookup() {
        let analysis = lookup("orders", "customers", "customer_id", "id", JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = stream_stream(
            "orders",
            "payments",
            "order_id",
            "order_id",
            3600,
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        let stream_config = expect_stream_stream(config);
        assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
        assert_eq!(stream_config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Forward,
            "trade_ts",
            "quote_ts",
            Some(Duration::from_millis(5000)),
        );

        let asof_config = expect_asof(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(asof_config.direction, AsofSqlDirection::Forward);
        assert_eq!(asof_config.left_time_column, "trade_ts");
        assert_eq!(asof_config.right_time_column, "quote_ts");
        assert_eq!(asof_config.tolerance, Some(Duration::from_millis(5000)));
        assert_eq!(asof_config.key_column, "symbol");
        assert_eq!(asof_config.join_type, AsofSqlJoinType::Left);
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = asof("a", "b", "id", AsofSqlDirection::Backward, "ts", "ts", None);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = asof(
            "trades",
            "quotes",
            "sym",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    // ---- from_multi_analysis -----------------------------------------------

    #[test]
    fn test_from_multi_analysis_single() {
        let analysis = lookup("a", "b", "id", "id", JoinType::Inner);
        let multi = multi(vec![analysis], &["a", "b"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let j1 = lookup("a", "b", "id", "a_id", JoinType::Inner);
        let j2 = lookup("b", "c", "id", "b_id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["a", "b", "c"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
    }

    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let j1 = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );
        let j2 = lookup("quotes", "products", "product_id", "id", JoinType::Inner);
        let multi = multi(vec![j1, j2], &["trades", "quotes", "products"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let j1 = stream_stream("orders", "payments", "id", "order_id", 3600, JoinType::Inner);
        let j2 = lookup("payments", "customers", "cust_id", "id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["orders", "payments", "customers"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let j1 = lookup("a", "b", "k1", "k1", JoinType::Inner);
        let j2 = stream_stream("b", "c", "k2", "k2", 60, JoinType::Left);
        let j3 = lookup("c", "d", "k3", "k3", JoinType::Inner);
        let multi = multi(vec![j1, j2, j3], &["a", "b", "c", "d"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    // ---- join-type mapping -------------------------------------------------

    #[test]
    fn test_join_types() {
        let cases = [
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
        ];

        for (join_type, expected) in cases {
            let analysis = stream_stream("a", "b", "id", "id", 60, join_type);
            let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
            assert_eq!(config.join_type, expected);
        }
    }

    #[test]
    fn test_from_analysis_semi_anti() {
        let semi = stream_stream("a", "b", "id", "id", 60, JoinType::LeftSemi);
        let semi_config = expect_stream_stream(JoinOperatorConfig::from_analysis(&semi));
        assert_eq!(semi_config.join_type, StreamJoinType::LeftSemi);

        let anti = stream_stream("a", "b", "id", "id", 60, JoinType::RightAnti);
        let anti_config = expect_stream_stream(JoinOperatorConfig::from_analysis(&anti));
        assert_eq!(anti_config.join_type, StreamJoinType::RightAnti);
    }

    // ---- Display -----------------------------------------------------------

    #[test]
    fn test_display_stream_join() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        assert_eq!(
            format!("{config}"),
            "INNER JOIN ON left.order_id = right.order_id (bound: 3600s)"
        );
    }

    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            format!("{config}"),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    #[test]
    fn test_display_asof_join() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    #[test]
    fn test_display_join_types() {
        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
        assert_eq!(format!("{}", StreamJoinType::LeftSemi), "LEFT SEMI");
        assert_eq!(format!("{}", StreamJoinType::LeftAnti), "LEFT ANTI");
        assert_eq!(format!("{}", StreamJoinType::RightSemi), "RIGHT SEMI");
        assert_eq!(format!("{}", StreamJoinType::RightAnti), "RIGHT ANTI");
        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
    }

    #[test]
    fn test_display_temporal_join() {
        let config = JoinOperatorConfig::from_analysis(&temporal(JoinType::Inner));
        let s = format!("{config}");
        assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
        assert!(s.contains("order_time"), "got: {s}");
    }

    // ---- temporal ----------------------------------------------------------

    #[test]
    fn test_from_analysis_temporal() {
        let config = JoinOperatorConfig::from_analysis(&temporal(JoinType::Inner));
        assert!(config.is_temporal());
        assert!(!config.is_asof());
        assert!(!config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "product_id");
        assert_eq!(config.right_key(), "id");

        let temporal_config = expect_temporal(config);
        assert_eq!(temporal_config.table_version_column, "order_time");
        assert_eq!(temporal_config.semantics, "event_time");
        assert_eq!(temporal_config.join_type, "inner");
    }

    #[test]
    fn test_temporal_left_join() {
        let config = JoinOperatorConfig::from_analysis(&temporal(JoinType::Left));
        let temporal_config = expect_temporal(config);
        assert_eq!(temporal_config.join_type, "left");
    }
}