1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13 pub left_key: String,
15 pub right_key: String,
17 pub left_time_column: String,
19 pub right_time_column: String,
21 pub left_table: String,
23 pub right_table: String,
25 pub time_bound: Duration,
27 pub join_type: StreamJoinType,
29}
30
31#[derive(Debug, Clone)]
33pub struct LookupJoinConfig {
34 pub stream_key: String,
36 pub lookup_key: String,
38 pub join_type: LookupJoinType,
40 pub cache_ttl: Duration,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum StreamJoinType {
47 Inner,
49 Left,
51 Right,
53 Full,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum LookupJoinType {
60 Inner,
62 Left,
64}
65
66#[derive(Debug, Clone)]
68pub struct TemporalJoinTranslatorConfig {
69 pub stream_table: String,
71 pub table_name: String,
73 pub stream_key_column: String,
75 pub table_key_column: String,
77 pub stream_time_column: String,
79 pub table_version_column: String,
81 pub semantics: String,
83 pub join_type: String,
85}
86
87#[derive(Debug, Clone)]
89pub struct AsofJoinTranslatorConfig {
90 pub left_table: String,
92 pub right_table: String,
94 pub key_column: String,
96 pub left_time_column: String,
98 pub right_time_column: String,
100 pub direction: AsofSqlDirection,
102 pub tolerance: Option<Duration>,
104 pub join_type: AsofSqlJoinType,
106}
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum AsofSqlJoinType {
111 Inner,
113 Left,
115}
116
117#[derive(Debug, Clone)]
119pub enum JoinOperatorConfig {
120 StreamStream(StreamJoinConfig),
122 Lookup(LookupJoinConfig),
124 Asof(AsofJoinTranslatorConfig),
126 Temporal(TemporalJoinTranslatorConfig),
128 TemporalProbe(super::temporal_probe::TemporalProbeConfig),
130}
131
132impl std::fmt::Display for StreamJoinType {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 match self {
135 StreamJoinType::Inner => write!(f, "INNER"),
136 StreamJoinType::Left => write!(f, "LEFT"),
137 StreamJoinType::Right => write!(f, "RIGHT"),
138 StreamJoinType::Full => write!(f, "FULL"),
139 }
140 }
141}
142
143impl std::fmt::Display for LookupJoinType {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 match self {
146 LookupJoinType::Inner => write!(f, "INNER"),
147 LookupJoinType::Left => write!(f, "LEFT"),
148 }
149 }
150}
151
152impl std::fmt::Display for AsofSqlJoinType {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 match self {
155 AsofSqlJoinType::Inner => write!(f, "INNER"),
156 AsofSqlJoinType::Left => write!(f, "LEFT"),
157 }
158 }
159}
160
161impl std::fmt::Display for StreamJoinConfig {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 write!(
164 f,
165 "{} JOIN ON {}.{} = {}.{} (bound: {}s, time: {} ~ {})",
166 self.join_type,
167 self.left_table,
168 self.left_key,
169 self.right_table,
170 self.right_key,
171 self.time_bound.as_secs(),
172 self.left_time_column,
173 self.right_time_column,
174 )
175 }
176}
177
178impl std::fmt::Display for LookupJoinConfig {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 write!(
181 f,
182 "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
183 self.join_type,
184 self.stream_key,
185 self.lookup_key,
186 self.cache_ttl.as_secs()
187 )
188 }
189}
190
191impl std::fmt::Display for AsofJoinTranslatorConfig {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 write!(
194 f,
195 "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
196 self.join_type,
197 self.left_table,
198 self.key_column,
199 self.right_table,
200 self.key_column,
201 self.direction,
202 self.left_table,
203 self.left_time_column,
204 self.right_table,
205 self.right_time_column,
206 )?;
207 if let Some(tol) = self.tolerance {
208 write!(f, ", tolerance: {}s", tol.as_secs())?;
209 }
210 write!(f, ")")
211 }
212}
213
214impl std::fmt::Display for TemporalJoinTranslatorConfig {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 write!(
217 f,
218 "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
219 self.join_type.to_uppercase(),
220 self.stream_key_column,
221 self.table_key_column,
222 self.table_version_column,
223 self.semantics,
224 )
225 }
226}
227
228impl std::fmt::Display for JoinOperatorConfig {
229 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230 match self {
231 JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
232 JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
233 JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
234 JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
235 JoinOperatorConfig::TemporalProbe(c) => write!(f, "{c}"),
236 }
237 }
238}
239
240impl JoinOperatorConfig {
241 #[must_use]
243 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
244 if analysis.is_temporal_join {
245 let join_type_str = match analysis.join_type {
246 JoinType::Left => "left",
247 _ => "inner",
248 };
249 let version_col = analysis.temporal_version_column.clone().unwrap_or_default();
250 return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
251 stream_table: analysis.left_table.clone(),
252 table_name: analysis.right_table.clone(),
253 stream_key_column: analysis.left_key_column.clone(),
254 table_key_column: analysis.right_key_column.clone(),
255 stream_time_column: version_col.clone(),
256 table_version_column: version_col,
257 semantics: "event_time".to_string(),
258 join_type: join_type_str.to_string(),
259 });
260 }
261
262 if analysis.is_asof_join {
263 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
264 left_table: analysis.left_table.clone(),
265 right_table: analysis.right_table.clone(),
266 key_column: analysis.left_key_column.clone(),
267 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
268 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
269 direction: analysis
270 .asof_direction
271 .unwrap_or(AsofSqlDirection::Backward),
272 tolerance: analysis.asof_tolerance,
273 join_type: if analysis.join_type == JoinType::Inner {
274 AsofSqlJoinType::Inner
275 } else {
276 AsofSqlJoinType::Left
277 },
278 });
279 }
280
281 if analysis.is_lookup_join {
282 JoinOperatorConfig::Lookup(LookupJoinConfig {
283 stream_key: analysis.left_key_column.clone(),
284 lookup_key: analysis.right_key_column.clone(),
285 join_type: match analysis.join_type {
286 JoinType::Inner => LookupJoinType::Inner,
287 _ => LookupJoinType::Left,
288 },
289 cache_ttl: Duration::from_secs(300), })
291 } else {
292 JoinOperatorConfig::StreamStream(StreamJoinConfig {
293 left_key: analysis.left_key_column.clone(),
294 right_key: analysis.right_key_column.clone(),
295 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
296 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
297 left_table: analysis.left_table.clone(),
298 right_table: analysis.right_table.clone(),
299 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
300 join_type: match analysis.join_type {
301 JoinType::Inner
302 | JoinType::LeftSemi
303 | JoinType::LeftAnti
304 | JoinType::RightSemi
305 | JoinType::RightAnti => StreamJoinType::Inner,
306 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
307 JoinType::Right => StreamJoinType::Right,
308 JoinType::Full => StreamJoinType::Full,
309 },
310 })
311 }
312 }
313
314 #[must_use]
316 pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
317 multi.joins.iter().map(Self::from_analysis).collect()
318 }
319
320 #[must_use]
322 pub fn is_stream_stream(&self) -> bool {
323 matches!(self, JoinOperatorConfig::StreamStream(_))
324 }
325
326 #[must_use]
328 pub fn is_lookup(&self) -> bool {
329 matches!(self, JoinOperatorConfig::Lookup(_))
330 }
331
332 #[must_use]
334 pub fn is_asof(&self) -> bool {
335 matches!(self, JoinOperatorConfig::Asof(_))
336 }
337
338 #[must_use]
340 pub fn is_temporal(&self) -> bool {
341 matches!(self, JoinOperatorConfig::Temporal(_))
342 }
343
344 #[must_use]
346 pub fn left_key(&self) -> &str {
347 match self {
348 JoinOperatorConfig::StreamStream(config) => &config.left_key,
349 JoinOperatorConfig::Lookup(config) => &config.stream_key,
350 JoinOperatorConfig::Asof(config) => &config.key_column,
351 JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
352 JoinOperatorConfig::TemporalProbe(config) => {
353 config.key_columns.first().map_or("", String::as_str)
354 }
355 }
356 }
357
358 #[must_use]
360 pub fn right_key(&self) -> &str {
361 match self {
362 JoinOperatorConfig::StreamStream(config) => &config.right_key,
363 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
364 JoinOperatorConfig::Asof(config) => &config.key_column,
365 JoinOperatorConfig::Temporal(config) => &config.table_key_column,
366 JoinOperatorConfig::TemporalProbe(config) => {
367 config.key_columns.first().map_or("", String::as_str)
368 }
369 }
370 }
371}
372
373impl StreamJoinConfig {
374 #[must_use]
376 pub fn new(
377 left_key: String,
378 right_key: String,
379 time_bound: Duration,
380 join_type: StreamJoinType,
381 ) -> Self {
382 Self {
383 left_key,
384 right_key,
385 left_time_column: String::new(),
386 right_time_column: String::new(),
387 left_table: String::new(),
388 right_table: String::new(),
389 time_bound,
390 join_type,
391 }
392 }
393
394 #[must_use]
396 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
397 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
398 }
399
400 #[must_use]
402 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
403 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
404 }
405}
406
407impl LookupJoinConfig {
408 #[must_use]
410 pub fn new(
411 stream_key: String,
412 lookup_key: String,
413 join_type: LookupJoinType,
414 cache_ttl: Duration,
415 ) -> Self {
416 Self {
417 stream_key,
418 lookup_key,
419 join_type,
420 cache_ttl,
421 }
422 }
423
424 #[must_use]
426 pub fn inner(stream_key: String, lookup_key: String) -> Self {
427 Self::new(
428 stream_key,
429 lookup_key,
430 LookupJoinType::Inner,
431 Duration::from_secs(300),
432 )
433 }
434
435 #[must_use]
437 pub fn left(stream_key: String, lookup_key: String) -> Self {
438 Self::new(
439 stream_key,
440 lookup_key,
441 LookupJoinType::Left,
442 Duration::from_secs(300),
443 )
444 }
445
446 #[must_use]
448 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
449 self.cache_ttl = ttl;
450 self
451 }
452}
453
454#[cfg(test)]
455mod tests {
456 use super::*;
457
458 #[test]
459 fn test_stream_join_config() {
460 let config = StreamJoinConfig::inner(
461 "order_id".to_string(),
462 "order_id".to_string(),
463 Duration::from_secs(3600),
464 );
465
466 assert_eq!(config.left_key, "order_id");
467 assert_eq!(config.right_key, "order_id");
468 assert_eq!(config.time_bound, Duration::from_secs(3600));
469 assert_eq!(config.join_type, StreamJoinType::Inner);
470 }
471
472 #[test]
473 fn test_lookup_join_config() {
474 let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
475 .with_cache_ttl(Duration::from_secs(600));
476
477 assert_eq!(config.stream_key, "customer_id");
478 assert_eq!(config.lookup_key, "id");
479 assert_eq!(config.cache_ttl, Duration::from_secs(600));
480 assert_eq!(config.join_type, LookupJoinType::Inner);
481 }
482
483 #[test]
484 fn test_from_analysis_lookup() {
485 let analysis = JoinAnalysis::lookup(
486 "orders".to_string(),
487 "customers".to_string(),
488 "customer_id".to_string(),
489 "id".to_string(),
490 JoinType::Inner,
491 );
492
493 let config = JoinOperatorConfig::from_analysis(&analysis);
494
495 assert!(config.is_lookup());
496 assert!(!config.is_stream_stream());
497 assert_eq!(config.left_key(), "customer_id");
498 assert_eq!(config.right_key(), "id");
499 }
500
501 #[test]
502 fn test_from_analysis_stream_stream() {
503 let analysis = JoinAnalysis::stream_stream(
504 "orders".to_string(),
505 "payments".to_string(),
506 "order_id".to_string(),
507 "order_id".to_string(),
508 Duration::from_secs(3600),
509 JoinType::Inner,
510 );
511
512 let config = JoinOperatorConfig::from_analysis(&analysis);
513
514 assert!(config.is_stream_stream());
515 assert!(!config.is_lookup());
516
517 if let JoinOperatorConfig::StreamStream(stream_config) = config {
518 assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
519 assert_eq!(stream_config.join_type, StreamJoinType::Inner);
520 }
521 }
522
523 #[test]
524 fn test_from_analysis_asof() {
525 let analysis = JoinAnalysis::asof(
526 "trades".to_string(),
527 "quotes".to_string(),
528 "symbol".to_string(),
529 "symbol".to_string(),
530 AsofSqlDirection::Backward,
531 "ts".to_string(),
532 "ts".to_string(),
533 Some(Duration::from_secs(5)),
534 );
535
536 let config = JoinOperatorConfig::from_analysis(&analysis);
537 assert!(config.is_asof());
538 assert!(!config.is_stream_stream());
539 assert!(!config.is_lookup());
540 }
541
542 #[test]
543 fn test_asof_config_fields() {
544 let analysis = JoinAnalysis::asof(
545 "trades".to_string(),
546 "quotes".to_string(),
547 "symbol".to_string(),
548 "symbol".to_string(),
549 AsofSqlDirection::Forward,
550 "trade_ts".to_string(),
551 "quote_ts".to_string(),
552 Some(Duration::from_millis(5000)),
553 );
554
555 let config = JoinOperatorConfig::from_analysis(&analysis);
556 if let JoinOperatorConfig::Asof(asof) = config {
557 assert_eq!(asof.direction, AsofSqlDirection::Forward);
558 assert_eq!(asof.left_time_column, "trade_ts");
559 assert_eq!(asof.right_time_column, "quote_ts");
560 assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
561 assert_eq!(asof.key_column, "symbol");
562 assert_eq!(asof.join_type, AsofSqlJoinType::Left);
563 } else {
564 panic!("Expected Asof config");
565 }
566 }
567
568 #[test]
569 fn test_asof_is_asof() {
570 let analysis = JoinAnalysis::asof(
571 "a".to_string(),
572 "b".to_string(),
573 "id".to_string(),
574 "id".to_string(),
575 AsofSqlDirection::Backward,
576 "ts".to_string(),
577 "ts".to_string(),
578 None,
579 );
580
581 let config = JoinOperatorConfig::from_analysis(&analysis);
582 assert!(config.is_asof());
583 }
584
585 #[test]
586 fn test_asof_key_accessors() {
587 let analysis = JoinAnalysis::asof(
588 "trades".to_string(),
589 "quotes".to_string(),
590 "sym".to_string(),
591 "sym".to_string(),
592 AsofSqlDirection::Backward,
593 "ts".to_string(),
594 "ts".to_string(),
595 None,
596 );
597
598 let config = JoinOperatorConfig::from_analysis(&analysis);
599 assert_eq!(config.left_key(), "sym");
600 assert_eq!(config.right_key(), "sym");
601 }
602
603 #[test]
606 fn test_from_multi_analysis_single() {
607 let analysis = JoinAnalysis::lookup(
608 "a".to_string(),
609 "b".to_string(),
610 "id".to_string(),
611 "id".to_string(),
612 JoinType::Inner,
613 );
614 let multi = MultiJoinAnalysis {
615 joins: vec![analysis],
616 tables: vec!["a".to_string(), "b".to_string()],
617 };
618
619 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
620 assert_eq!(configs.len(), 1);
621 assert!(configs[0].is_lookup());
622 }
623
624 #[test]
625 fn test_from_multi_analysis_two_lookups() {
626 let j1 = JoinAnalysis::lookup(
627 "a".to_string(),
628 "b".to_string(),
629 "id".to_string(),
630 "a_id".to_string(),
631 JoinType::Inner,
632 );
633 let j2 = JoinAnalysis::lookup(
634 "b".to_string(),
635 "c".to_string(),
636 "id".to_string(),
637 "b_id".to_string(),
638 JoinType::Left,
639 );
640 let multi = MultiJoinAnalysis {
641 joins: vec![j1, j2],
642 tables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
643 };
644
645 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
646 assert_eq!(configs.len(), 2);
647 assert!(configs[0].is_lookup());
648 assert!(configs[1].is_lookup());
649 assert_eq!(configs[0].left_key(), "id");
650 assert_eq!(configs[1].left_key(), "id");
651 }
652
653 #[test]
654 fn test_from_multi_analysis_mixed_asof_lookup() {
655 let j1 = JoinAnalysis::asof(
656 "trades".to_string(),
657 "quotes".to_string(),
658 "symbol".to_string(),
659 "symbol".to_string(),
660 AsofSqlDirection::Backward,
661 "ts".to_string(),
662 "ts".to_string(),
663 None,
664 );
665 let j2 = JoinAnalysis::lookup(
666 "quotes".to_string(),
667 "products".to_string(),
668 "product_id".to_string(),
669 "id".to_string(),
670 JoinType::Inner,
671 );
672 let multi = MultiJoinAnalysis {
673 joins: vec![j1, j2],
674 tables: vec![
675 "trades".to_string(),
676 "quotes".to_string(),
677 "products".to_string(),
678 ],
679 };
680
681 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
682 assert_eq!(configs.len(), 2);
683 assert!(configs[0].is_asof());
684 assert!(configs[1].is_lookup());
685 }
686
687 #[test]
688 fn test_from_multi_analysis_stream_stream_and_lookup() {
689 let j1 = JoinAnalysis::stream_stream(
690 "orders".to_string(),
691 "payments".to_string(),
692 "id".to_string(),
693 "order_id".to_string(),
694 Duration::from_secs(3600),
695 JoinType::Inner,
696 );
697 let j2 = JoinAnalysis::lookup(
698 "payments".to_string(),
699 "customers".to_string(),
700 "cust_id".to_string(),
701 "id".to_string(),
702 JoinType::Left,
703 );
704 let multi = MultiJoinAnalysis {
705 joins: vec![j1, j2],
706 tables: vec![
707 "orders".to_string(),
708 "payments".to_string(),
709 "customers".to_string(),
710 ],
711 };
712
713 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
714 assert_eq!(configs.len(), 2);
715 assert!(configs[0].is_stream_stream());
716 assert!(configs[1].is_lookup());
717 }
718
719 #[test]
720 fn test_from_multi_analysis_order_preserved() {
721 let j1 = JoinAnalysis::lookup(
722 "a".to_string(),
723 "b".to_string(),
724 "k1".to_string(),
725 "k1".to_string(),
726 JoinType::Inner,
727 );
728 let j2 = JoinAnalysis::stream_stream(
729 "b".to_string(),
730 "c".to_string(),
731 "k2".to_string(),
732 "k2".to_string(),
733 Duration::from_secs(60),
734 JoinType::Left,
735 );
736 let j3 = JoinAnalysis::lookup(
737 "c".to_string(),
738 "d".to_string(),
739 "k3".to_string(),
740 "k3".to_string(),
741 JoinType::Inner,
742 );
743 let multi = MultiJoinAnalysis {
744 joins: vec![j1, j2, j3],
745 tables: vec![
746 "a".to_string(),
747 "b".to_string(),
748 "c".to_string(),
749 "d".to_string(),
750 ],
751 };
752
753 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
754 assert_eq!(configs.len(), 3);
755 assert!(configs[0].is_lookup());
756 assert!(configs[1].is_stream_stream());
757 assert!(configs[2].is_lookup());
758 assert_eq!(configs[0].left_key(), "k1");
759 assert_eq!(configs[1].left_key(), "k2");
760 assert_eq!(configs[2].left_key(), "k3");
761 }
762
763 #[test]
764 fn test_join_types() {
765 let left_analysis = JoinAnalysis::stream_stream(
767 "a".to_string(),
768 "b".to_string(),
769 "id".to_string(),
770 "id".to_string(),
771 Duration::from_secs(60),
772 JoinType::Left,
773 );
774
775 if let JoinOperatorConfig::StreamStream(config) =
776 JoinOperatorConfig::from_analysis(&left_analysis)
777 {
778 assert_eq!(config.join_type, StreamJoinType::Left);
779 }
780
781 let right_analysis = JoinAnalysis::stream_stream(
782 "a".to_string(),
783 "b".to_string(),
784 "id".to_string(),
785 "id".to_string(),
786 Duration::from_secs(60),
787 JoinType::Right,
788 );
789
790 if let JoinOperatorConfig::StreamStream(config) =
791 JoinOperatorConfig::from_analysis(&right_analysis)
792 {
793 assert_eq!(config.join_type, StreamJoinType::Right);
794 }
795
796 let full_analysis = JoinAnalysis::stream_stream(
797 "a".to_string(),
798 "b".to_string(),
799 "id".to_string(),
800 "id".to_string(),
801 Duration::from_secs(60),
802 JoinType::Full,
803 );
804
805 if let JoinOperatorConfig::StreamStream(config) =
806 JoinOperatorConfig::from_analysis(&full_analysis)
807 {
808 assert_eq!(config.join_type, StreamJoinType::Full);
809 }
810 }
811
812 #[test]
813 fn test_display_stream_join() {
814 let mut config = StreamJoinConfig::inner(
815 "order_id".to_string(),
816 "order_id".to_string(),
817 Duration::from_secs(3600),
818 );
819 config.left_table = "orders".to_string();
820 config.right_table = "payments".to_string();
821 config.left_time_column = "ts".to_string();
822 config.right_time_column = "ts".to_string();
823 assert_eq!(
824 format!("{config}"),
825 "INNER JOIN ON orders.order_id = payments.order_id (bound: 3600s, time: ts ~ ts)"
826 );
827 }
828
829 #[test]
830 fn test_display_lookup_join() {
831 let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
832 assert_eq!(
833 format!("{config}"),
834 "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
835 );
836 }
837
838 #[test]
839 fn test_display_asof_join() {
840 let analysis = JoinAnalysis::asof(
841 "trades".to_string(),
842 "quotes".to_string(),
843 "symbol".to_string(),
844 "symbol".to_string(),
845 AsofSqlDirection::Backward,
846 "ts".to_string(),
847 "ts".to_string(),
848 Some(Duration::from_secs(5)),
849 );
850 let config = JoinOperatorConfig::from_analysis(&analysis);
851 let s = format!("{config}");
852 assert!(s.contains("ASOF JOIN"), "got: {s}");
853 assert!(s.contains("BACKWARD"), "got: {s}");
854 assert!(s.contains("tolerance: 5s"), "got: {s}");
855 }
856
857 #[test]
858 fn test_display_join_types() {
859 assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
860 assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
861 assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
862 assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
863 assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
864 assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
865 assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
866 assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
867 }
868
869 #[test]
870 fn test_from_analysis_semi_anti_maps_to_inner() {
871 let semi = JoinAnalysis::stream_stream(
875 "a".to_string(),
876 "b".to_string(),
877 "id".to_string(),
878 "id".to_string(),
879 Duration::from_secs(60),
880 JoinType::LeftSemi,
881 );
882 if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&semi) {
883 assert_eq!(config.join_type, StreamJoinType::Inner);
884 } else {
885 panic!("Expected StreamStream config");
886 }
887
888 let anti = JoinAnalysis::stream_stream(
889 "a".to_string(),
890 "b".to_string(),
891 "id".to_string(),
892 "id".to_string(),
893 Duration::from_secs(60),
894 JoinType::RightAnti,
895 );
896 if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&anti) {
897 assert_eq!(config.join_type, StreamJoinType::Inner);
898 } else {
899 panic!("Expected StreamStream config");
900 }
901 }
902
903 #[test]
904 fn test_from_analysis_temporal() {
905 let analysis = JoinAnalysis::temporal(
906 "orders".to_string(),
907 "products".to_string(),
908 "product_id".to_string(),
909 "id".to_string(),
910 "order_time".to_string(),
911 JoinType::Inner,
912 );
913
914 let config = JoinOperatorConfig::from_analysis(&analysis);
915 assert!(config.is_temporal());
916 assert!(!config.is_asof());
917 assert!(!config.is_lookup());
918 assert!(!config.is_stream_stream());
919 assert_eq!(config.left_key(), "product_id");
920 assert_eq!(config.right_key(), "id");
921
922 if let JoinOperatorConfig::Temporal(tc) = config {
923 assert_eq!(tc.table_version_column, "order_time");
924 assert_eq!(tc.semantics, "event_time");
925 assert_eq!(tc.join_type, "inner");
926 } else {
927 panic!("Expected Temporal config");
928 }
929 }
930
931 #[test]
932 fn test_temporal_left_join() {
933 let analysis = JoinAnalysis::temporal(
934 "orders".to_string(),
935 "products".to_string(),
936 "product_id".to_string(),
937 "id".to_string(),
938 "order_time".to_string(),
939 JoinType::Left,
940 );
941
942 let config = JoinOperatorConfig::from_analysis(&analysis);
943 if let JoinOperatorConfig::Temporal(tc) = config {
944 assert_eq!(tc.join_type, "left");
945 } else {
946 panic!("Expected Temporal config");
947 }
948 }
949
950 #[test]
951 fn test_display_temporal_join() {
952 let analysis = JoinAnalysis::temporal(
953 "orders".to_string(),
954 "products".to_string(),
955 "product_id".to_string(),
956 "id".to_string(),
957 "order_time".to_string(),
958 JoinType::Inner,
959 );
960 let config = JoinOperatorConfig::from_analysis(&analysis);
961 let s = format!("{config}");
962 assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
963 assert!(s.contains("order_time"), "got: {s}");
964 }
965}