1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13 pub left_key: String,
15 pub right_key: String,
17 pub left_time_column: String,
19 pub right_time_column: String,
21 pub left_table: String,
23 pub right_table: String,
25 pub time_bound: Duration,
27 pub join_type: StreamJoinType,
29}
30
31#[derive(Debug, Clone)]
33pub struct LookupJoinConfig {
34 pub stream_key: String,
36 pub lookup_key: String,
38 pub join_type: LookupJoinType,
40 pub cache_ttl: Duration,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum StreamJoinType {
47 Inner,
49 Left,
51 Right,
53 Full,
55 LeftSemi,
57 LeftAnti,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum LookupJoinType {
64 Inner,
66 Left,
68}
69
70#[derive(Debug, Clone)]
72pub struct TemporalJoinTranslatorConfig {
73 pub stream_table: String,
75 pub table_name: String,
77 pub stream_key_column: String,
79 pub table_key_column: String,
81 pub stream_time_column: String,
83 pub table_version_column: String,
85 pub semantics: String,
87 pub join_type: String,
89}
90
91#[derive(Debug, Clone)]
93pub struct AsofJoinTranslatorConfig {
94 pub left_table: String,
96 pub right_table: String,
98 pub key_column: String,
100 pub left_time_column: String,
102 pub right_time_column: String,
104 pub direction: AsofSqlDirection,
106 pub tolerance: Option<Duration>,
108 pub join_type: AsofSqlJoinType,
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum AsofSqlJoinType {
115 Inner,
117 Left,
119}
120
121#[derive(Debug, Clone)]
123pub enum JoinOperatorConfig {
124 StreamStream(StreamJoinConfig),
126 Lookup(LookupJoinConfig),
128 Asof(AsofJoinTranslatorConfig),
130 Temporal(TemporalJoinTranslatorConfig),
132 TemporalProbe(super::temporal_probe::TemporalProbeConfig),
134}
135
136impl std::fmt::Display for StreamJoinType {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 match self {
139 StreamJoinType::Inner => write!(f, "INNER"),
140 StreamJoinType::Left => write!(f, "LEFT"),
141 StreamJoinType::Right => write!(f, "RIGHT"),
142 StreamJoinType::Full => write!(f, "FULL"),
143 StreamJoinType::LeftSemi => write!(f, "LEFT SEMI"),
144 StreamJoinType::LeftAnti => write!(f, "LEFT ANTI"),
145 }
146 }
147}
148
149impl std::fmt::Display for LookupJoinType {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 match self {
152 LookupJoinType::Inner => write!(f, "INNER"),
153 LookupJoinType::Left => write!(f, "LEFT"),
154 }
155 }
156}
157
158impl std::fmt::Display for AsofSqlJoinType {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 match self {
161 AsofSqlJoinType::Inner => write!(f, "INNER"),
162 AsofSqlJoinType::Left => write!(f, "LEFT"),
163 }
164 }
165}
166
167impl std::fmt::Display for StreamJoinConfig {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 write!(
170 f,
171 "{} JOIN ON {}.{} = {}.{} (bound: {}s, time: {} ~ {})",
172 self.join_type,
173 self.left_table,
174 self.left_key,
175 self.right_table,
176 self.right_key,
177 self.time_bound.as_secs(),
178 self.left_time_column,
179 self.right_time_column,
180 )
181 }
182}
183
184impl std::fmt::Display for LookupJoinConfig {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 write!(
187 f,
188 "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
189 self.join_type,
190 self.stream_key,
191 self.lookup_key,
192 self.cache_ttl.as_secs()
193 )
194 }
195}
196
197impl std::fmt::Display for AsofJoinTranslatorConfig {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 write!(
200 f,
201 "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
202 self.join_type,
203 self.left_table,
204 self.key_column,
205 self.right_table,
206 self.key_column,
207 self.direction,
208 self.left_table,
209 self.left_time_column,
210 self.right_table,
211 self.right_time_column,
212 )?;
213 if let Some(tol) = self.tolerance {
214 write!(f, ", tolerance: {}s", tol.as_secs())?;
215 }
216 write!(f, ")")
217 }
218}
219
220impl std::fmt::Display for TemporalJoinTranslatorConfig {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 write!(
223 f,
224 "{} TEMPORAL JOIN ON stream.{} = table.{} (AS OF: {}, {})",
225 self.join_type.to_uppercase(),
226 self.stream_key_column,
227 self.table_key_column,
228 self.stream_time_column,
229 self.semantics,
230 )
231 }
232}
233
234impl std::fmt::Display for JoinOperatorConfig {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 match self {
237 JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
238 JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
239 JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
240 JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
241 JoinOperatorConfig::TemporalProbe(c) => write!(f, "{c}"),
242 }
243 }
244}
245
246impl JoinOperatorConfig {
247 #[must_use]
249 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
250 if analysis.is_temporal_join {
251 let join_type_str = match analysis.join_type {
252 JoinType::Left => "left",
253 _ => "inner",
254 };
255 let stream_time_col = analysis.temporal_version_column.clone().unwrap_or_default();
256 return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
258 stream_table: analysis.left_table.clone(),
259 table_name: analysis.right_table.clone(),
260 stream_key_column: analysis.left_key_column.clone(),
261 table_key_column: analysis.right_key_column.clone(),
262 stream_time_column: stream_time_col,
263 table_version_column: String::new(),
264 semantics: "event_time".to_string(),
265 join_type: join_type_str.to_string(),
266 });
267 }
268
269 if analysis.is_asof_join {
270 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
271 left_table: analysis.left_table.clone(),
272 right_table: analysis.right_table.clone(),
273 key_column: analysis.left_key_column.clone(),
274 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
275 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
276 direction: analysis
277 .asof_direction
278 .unwrap_or(AsofSqlDirection::Backward),
279 tolerance: analysis.asof_tolerance,
280 join_type: if analysis.join_type == JoinType::Inner {
281 AsofSqlJoinType::Inner
282 } else {
283 AsofSqlJoinType::Left
284 },
285 });
286 }
287
288 if analysis.is_lookup_join {
289 JoinOperatorConfig::Lookup(LookupJoinConfig {
290 stream_key: analysis.left_key_column.clone(),
291 lookup_key: analysis.right_key_column.clone(),
292 join_type: match analysis.join_type {
293 JoinType::Inner => LookupJoinType::Inner,
294 _ => LookupJoinType::Left,
295 },
296 cache_ttl: Duration::from_secs(300), })
298 } else {
299 JoinOperatorConfig::StreamStream(StreamJoinConfig {
300 left_key: analysis.left_key_column.clone(),
301 right_key: analysis.right_key_column.clone(),
302 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
303 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
304 left_table: analysis.left_table.clone(),
305 right_table: analysis.right_table.clone(),
306 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
307 join_type: match analysis.join_type {
308 JoinType::Inner | JoinType::RightSemi | JoinType::RightAnti => {
311 StreamJoinType::Inner
312 }
313 JoinType::LeftSemi => StreamJoinType::LeftSemi,
314 JoinType::LeftAnti => StreamJoinType::LeftAnti,
315 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
316 JoinType::Right => StreamJoinType::Right,
317 JoinType::Full => StreamJoinType::Full,
318 },
319 })
320 }
321 }
322
323 #[must_use]
325 pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
326 multi.joins.iter().map(Self::from_analysis).collect()
327 }
328
329 #[must_use]
331 pub fn is_stream_stream(&self) -> bool {
332 matches!(self, JoinOperatorConfig::StreamStream(_))
333 }
334
335 #[must_use]
337 pub fn is_lookup(&self) -> bool {
338 matches!(self, JoinOperatorConfig::Lookup(_))
339 }
340
341 #[must_use]
343 pub fn is_asof(&self) -> bool {
344 matches!(self, JoinOperatorConfig::Asof(_))
345 }
346
347 #[must_use]
349 pub fn is_temporal(&self) -> bool {
350 matches!(self, JoinOperatorConfig::Temporal(_))
351 }
352
353 #[must_use]
355 pub fn left_key(&self) -> &str {
356 match self {
357 JoinOperatorConfig::StreamStream(config) => &config.left_key,
358 JoinOperatorConfig::Lookup(config) => &config.stream_key,
359 JoinOperatorConfig::Asof(config) => &config.key_column,
360 JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
361 JoinOperatorConfig::TemporalProbe(config) => {
362 config.key_columns.first().map_or("", String::as_str)
363 }
364 }
365 }
366
367 #[must_use]
369 pub fn right_key(&self) -> &str {
370 match self {
371 JoinOperatorConfig::StreamStream(config) => &config.right_key,
372 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
373 JoinOperatorConfig::Asof(config) => &config.key_column,
374 JoinOperatorConfig::Temporal(config) => &config.table_key_column,
375 JoinOperatorConfig::TemporalProbe(config) => {
376 config.key_columns.first().map_or("", String::as_str)
377 }
378 }
379 }
380}
381
382impl StreamJoinConfig {
383 #[must_use]
385 pub fn new(
386 left_key: String,
387 right_key: String,
388 time_bound: Duration,
389 join_type: StreamJoinType,
390 ) -> Self {
391 Self {
392 left_key,
393 right_key,
394 left_time_column: String::new(),
395 right_time_column: String::new(),
396 left_table: String::new(),
397 right_table: String::new(),
398 time_bound,
399 join_type,
400 }
401 }
402
403 #[must_use]
405 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
406 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
407 }
408
409 #[must_use]
411 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
412 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
413 }
414}
415
416impl LookupJoinConfig {
417 #[must_use]
419 pub fn new(
420 stream_key: String,
421 lookup_key: String,
422 join_type: LookupJoinType,
423 cache_ttl: Duration,
424 ) -> Self {
425 Self {
426 stream_key,
427 lookup_key,
428 join_type,
429 cache_ttl,
430 }
431 }
432
433 #[must_use]
435 pub fn inner(stream_key: String, lookup_key: String) -> Self {
436 Self::new(
437 stream_key,
438 lookup_key,
439 LookupJoinType::Inner,
440 Duration::from_secs(300),
441 )
442 }
443
444 #[must_use]
446 pub fn left(stream_key: String, lookup_key: String) -> Self {
447 Self::new(
448 stream_key,
449 lookup_key,
450 LookupJoinType::Left,
451 Duration::from_secs(300),
452 )
453 }
454
455 #[must_use]
457 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
458 self.cache_ttl = ttl;
459 self
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use super::*;
466
467 #[test]
468 fn test_stream_join_config() {
469 let config = StreamJoinConfig::inner(
470 "order_id".to_string(),
471 "order_id".to_string(),
472 Duration::from_secs(3600),
473 );
474
475 assert_eq!(config.left_key, "order_id");
476 assert_eq!(config.right_key, "order_id");
477 assert_eq!(config.time_bound, Duration::from_secs(3600));
478 assert_eq!(config.join_type, StreamJoinType::Inner);
479 }
480
481 #[test]
482 fn test_lookup_join_config() {
483 let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
484 .with_cache_ttl(Duration::from_secs(600));
485
486 assert_eq!(config.stream_key, "customer_id");
487 assert_eq!(config.lookup_key, "id");
488 assert_eq!(config.cache_ttl, Duration::from_secs(600));
489 assert_eq!(config.join_type, LookupJoinType::Inner);
490 }
491
492 #[test]
493 fn test_from_analysis_lookup() {
494 let analysis = JoinAnalysis::lookup(
495 "orders".to_string(),
496 "customers".to_string(),
497 "customer_id".to_string(),
498 "id".to_string(),
499 JoinType::Inner,
500 );
501
502 let config = JoinOperatorConfig::from_analysis(&analysis);
503
504 assert!(config.is_lookup());
505 assert!(!config.is_stream_stream());
506 assert_eq!(config.left_key(), "customer_id");
507 assert_eq!(config.right_key(), "id");
508 }
509
510 #[test]
511 fn test_from_analysis_stream_stream() {
512 let analysis = JoinAnalysis::stream_stream(
513 "orders".to_string(),
514 "payments".to_string(),
515 "order_id".to_string(),
516 "order_id".to_string(),
517 Duration::from_secs(3600),
518 JoinType::Inner,
519 );
520
521 let config = JoinOperatorConfig::from_analysis(&analysis);
522
523 assert!(config.is_stream_stream());
524 assert!(!config.is_lookup());
525
526 if let JoinOperatorConfig::StreamStream(stream_config) = config {
527 assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
528 assert_eq!(stream_config.join_type, StreamJoinType::Inner);
529 }
530 }
531
532 #[test]
533 fn test_from_analysis_asof() {
534 let analysis = JoinAnalysis::asof(
535 "trades".to_string(),
536 "quotes".to_string(),
537 "symbol".to_string(),
538 "symbol".to_string(),
539 AsofSqlDirection::Backward,
540 "ts".to_string(),
541 "ts".to_string(),
542 Some(Duration::from_secs(5)),
543 );
544
545 let config = JoinOperatorConfig::from_analysis(&analysis);
546 assert!(config.is_asof());
547 assert!(!config.is_stream_stream());
548 assert!(!config.is_lookup());
549 }
550
551 #[test]
552 fn test_asof_config_fields() {
553 let analysis = JoinAnalysis::asof(
554 "trades".to_string(),
555 "quotes".to_string(),
556 "symbol".to_string(),
557 "symbol".to_string(),
558 AsofSqlDirection::Forward,
559 "trade_ts".to_string(),
560 "quote_ts".to_string(),
561 Some(Duration::from_millis(5000)),
562 );
563
564 let config = JoinOperatorConfig::from_analysis(&analysis);
565 if let JoinOperatorConfig::Asof(asof) = config {
566 assert_eq!(asof.direction, AsofSqlDirection::Forward);
567 assert_eq!(asof.left_time_column, "trade_ts");
568 assert_eq!(asof.right_time_column, "quote_ts");
569 assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
570 assert_eq!(asof.key_column, "symbol");
571 assert_eq!(asof.join_type, AsofSqlJoinType::Left);
572 } else {
573 panic!("Expected Asof config");
574 }
575 }
576
577 #[test]
578 fn test_asof_is_asof() {
579 let analysis = JoinAnalysis::asof(
580 "a".to_string(),
581 "b".to_string(),
582 "id".to_string(),
583 "id".to_string(),
584 AsofSqlDirection::Backward,
585 "ts".to_string(),
586 "ts".to_string(),
587 None,
588 );
589
590 let config = JoinOperatorConfig::from_analysis(&analysis);
591 assert!(config.is_asof());
592 }
593
594 #[test]
595 fn test_asof_key_accessors() {
596 let analysis = JoinAnalysis::asof(
597 "trades".to_string(),
598 "quotes".to_string(),
599 "sym".to_string(),
600 "sym".to_string(),
601 AsofSqlDirection::Backward,
602 "ts".to_string(),
603 "ts".to_string(),
604 None,
605 );
606
607 let config = JoinOperatorConfig::from_analysis(&analysis);
608 assert_eq!(config.left_key(), "sym");
609 assert_eq!(config.right_key(), "sym");
610 }
611
612 #[test]
615 fn test_from_multi_analysis_single() {
616 let analysis = JoinAnalysis::lookup(
617 "a".to_string(),
618 "b".to_string(),
619 "id".to_string(),
620 "id".to_string(),
621 JoinType::Inner,
622 );
623 let multi = MultiJoinAnalysis {
624 joins: vec![analysis],
625 tables: vec!["a".to_string(), "b".to_string()],
626 };
627
628 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
629 assert_eq!(configs.len(), 1);
630 assert!(configs[0].is_lookup());
631 }
632
633 #[test]
634 fn test_from_multi_analysis_two_lookups() {
635 let j1 = JoinAnalysis::lookup(
636 "a".to_string(),
637 "b".to_string(),
638 "id".to_string(),
639 "a_id".to_string(),
640 JoinType::Inner,
641 );
642 let j2 = JoinAnalysis::lookup(
643 "b".to_string(),
644 "c".to_string(),
645 "id".to_string(),
646 "b_id".to_string(),
647 JoinType::Left,
648 );
649 let multi = MultiJoinAnalysis {
650 joins: vec![j1, j2],
651 tables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
652 };
653
654 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
655 assert_eq!(configs.len(), 2);
656 assert!(configs[0].is_lookup());
657 assert!(configs[1].is_lookup());
658 assert_eq!(configs[0].left_key(), "id");
659 assert_eq!(configs[1].left_key(), "id");
660 }
661
662 #[test]
663 fn test_from_multi_analysis_mixed_asof_lookup() {
664 let j1 = JoinAnalysis::asof(
665 "trades".to_string(),
666 "quotes".to_string(),
667 "symbol".to_string(),
668 "symbol".to_string(),
669 AsofSqlDirection::Backward,
670 "ts".to_string(),
671 "ts".to_string(),
672 None,
673 );
674 let j2 = JoinAnalysis::lookup(
675 "quotes".to_string(),
676 "products".to_string(),
677 "product_id".to_string(),
678 "id".to_string(),
679 JoinType::Inner,
680 );
681 let multi = MultiJoinAnalysis {
682 joins: vec![j1, j2],
683 tables: vec![
684 "trades".to_string(),
685 "quotes".to_string(),
686 "products".to_string(),
687 ],
688 };
689
690 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
691 assert_eq!(configs.len(), 2);
692 assert!(configs[0].is_asof());
693 assert!(configs[1].is_lookup());
694 }
695
696 #[test]
697 fn test_from_multi_analysis_stream_stream_and_lookup() {
698 let j1 = JoinAnalysis::stream_stream(
699 "orders".to_string(),
700 "payments".to_string(),
701 "id".to_string(),
702 "order_id".to_string(),
703 Duration::from_secs(3600),
704 JoinType::Inner,
705 );
706 let j2 = JoinAnalysis::lookup(
707 "payments".to_string(),
708 "customers".to_string(),
709 "cust_id".to_string(),
710 "id".to_string(),
711 JoinType::Left,
712 );
713 let multi = MultiJoinAnalysis {
714 joins: vec![j1, j2],
715 tables: vec![
716 "orders".to_string(),
717 "payments".to_string(),
718 "customers".to_string(),
719 ],
720 };
721
722 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
723 assert_eq!(configs.len(), 2);
724 assert!(configs[0].is_stream_stream());
725 assert!(configs[1].is_lookup());
726 }
727
728 #[test]
729 fn test_from_multi_analysis_order_preserved() {
730 let j1 = JoinAnalysis::lookup(
731 "a".to_string(),
732 "b".to_string(),
733 "k1".to_string(),
734 "k1".to_string(),
735 JoinType::Inner,
736 );
737 let j2 = JoinAnalysis::stream_stream(
738 "b".to_string(),
739 "c".to_string(),
740 "k2".to_string(),
741 "k2".to_string(),
742 Duration::from_secs(60),
743 JoinType::Left,
744 );
745 let j3 = JoinAnalysis::lookup(
746 "c".to_string(),
747 "d".to_string(),
748 "k3".to_string(),
749 "k3".to_string(),
750 JoinType::Inner,
751 );
752 let multi = MultiJoinAnalysis {
753 joins: vec![j1, j2, j3],
754 tables: vec![
755 "a".to_string(),
756 "b".to_string(),
757 "c".to_string(),
758 "d".to_string(),
759 ],
760 };
761
762 let configs = JoinOperatorConfig::from_multi_analysis(&multi);
763 assert_eq!(configs.len(), 3);
764 assert!(configs[0].is_lookup());
765 assert!(configs[1].is_stream_stream());
766 assert!(configs[2].is_lookup());
767 assert_eq!(configs[0].left_key(), "k1");
768 assert_eq!(configs[1].left_key(), "k2");
769 assert_eq!(configs[2].left_key(), "k3");
770 }
771
772 #[test]
773 fn test_join_types() {
774 let left_analysis = JoinAnalysis::stream_stream(
776 "a".to_string(),
777 "b".to_string(),
778 "id".to_string(),
779 "id".to_string(),
780 Duration::from_secs(60),
781 JoinType::Left,
782 );
783
784 if let JoinOperatorConfig::StreamStream(config) =
785 JoinOperatorConfig::from_analysis(&left_analysis)
786 {
787 assert_eq!(config.join_type, StreamJoinType::Left);
788 }
789
790 let right_analysis = JoinAnalysis::stream_stream(
791 "a".to_string(),
792 "b".to_string(),
793 "id".to_string(),
794 "id".to_string(),
795 Duration::from_secs(60),
796 JoinType::Right,
797 );
798
799 if let JoinOperatorConfig::StreamStream(config) =
800 JoinOperatorConfig::from_analysis(&right_analysis)
801 {
802 assert_eq!(config.join_type, StreamJoinType::Right);
803 }
804
805 let full_analysis = JoinAnalysis::stream_stream(
806 "a".to_string(),
807 "b".to_string(),
808 "id".to_string(),
809 "id".to_string(),
810 Duration::from_secs(60),
811 JoinType::Full,
812 );
813
814 if let JoinOperatorConfig::StreamStream(config) =
815 JoinOperatorConfig::from_analysis(&full_analysis)
816 {
817 assert_eq!(config.join_type, StreamJoinType::Full);
818 }
819 }
820
821 #[test]
822 fn test_display_stream_join() {
823 let mut config = StreamJoinConfig::inner(
824 "order_id".to_string(),
825 "order_id".to_string(),
826 Duration::from_secs(3600),
827 );
828 config.left_table = "orders".to_string();
829 config.right_table = "payments".to_string();
830 config.left_time_column = "ts".to_string();
831 config.right_time_column = "ts".to_string();
832 assert_eq!(
833 format!("{config}"),
834 "INNER JOIN ON orders.order_id = payments.order_id (bound: 3600s, time: ts ~ ts)"
835 );
836 }
837
838 #[test]
839 fn test_display_lookup_join() {
840 let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
841 assert_eq!(
842 format!("{config}"),
843 "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
844 );
845 }
846
847 #[test]
848 fn test_display_asof_join() {
849 let analysis = JoinAnalysis::asof(
850 "trades".to_string(),
851 "quotes".to_string(),
852 "symbol".to_string(),
853 "symbol".to_string(),
854 AsofSqlDirection::Backward,
855 "ts".to_string(),
856 "ts".to_string(),
857 Some(Duration::from_secs(5)),
858 );
859 let config = JoinOperatorConfig::from_analysis(&analysis);
860 let s = format!("{config}");
861 assert!(s.contains("ASOF JOIN"), "got: {s}");
862 assert!(s.contains("BACKWARD"), "got: {s}");
863 assert!(s.contains("tolerance: 5s"), "got: {s}");
864 }
865
866 #[test]
867 fn test_display_join_types() {
868 assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
869 assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
870 assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
871 assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
872 assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
873 assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
874 assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
875 assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
876 }
877
878 #[test]
879 fn test_from_analysis_semi_anti() {
880 let semi = JoinAnalysis::stream_stream(
881 "a".to_string(),
882 "b".to_string(),
883 "id".to_string(),
884 "id".to_string(),
885 Duration::from_secs(60),
886 JoinType::LeftSemi,
887 );
888 if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&semi) {
889 assert_eq!(config.join_type, StreamJoinType::LeftSemi);
890 } else {
891 panic!("Expected StreamStream config");
892 }
893
894 let anti = JoinAnalysis::stream_stream(
895 "a".to_string(),
896 "b".to_string(),
897 "id".to_string(),
898 "id".to_string(),
899 Duration::from_secs(60),
900 JoinType::LeftAnti,
901 );
902 if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&anti) {
903 assert_eq!(config.join_type, StreamJoinType::LeftAnti);
904 } else {
905 panic!("Expected StreamStream config");
906 }
907
908 let right_anti = JoinAnalysis::stream_stream(
910 "a".to_string(),
911 "b".to_string(),
912 "id".to_string(),
913 "id".to_string(),
914 Duration::from_secs(60),
915 JoinType::RightAnti,
916 );
917 if let JoinOperatorConfig::StreamStream(config) =
918 JoinOperatorConfig::from_analysis(&right_anti)
919 {
920 assert_eq!(config.join_type, StreamJoinType::Inner);
921 } else {
922 panic!("Expected StreamStream config");
923 }
924 }
925
926 #[test]
927 fn test_from_analysis_temporal() {
928 let analysis = JoinAnalysis::temporal(
929 "orders".to_string(),
930 "products".to_string(),
931 "product_id".to_string(),
932 "id".to_string(),
933 "order_time".to_string(),
934 JoinType::Inner,
935 );
936
937 let config = JoinOperatorConfig::from_analysis(&analysis);
938 assert!(config.is_temporal());
939 assert!(!config.is_asof());
940 assert!(!config.is_lookup());
941 assert!(!config.is_stream_stream());
942 assert_eq!(config.left_key(), "product_id");
943 assert_eq!(config.right_key(), "id");
944
945 if let JoinOperatorConfig::Temporal(tc) = config {
946 assert!(tc.table_version_column.is_empty());
947 assert_eq!(tc.stream_time_column, "order_time");
948 assert_eq!(tc.semantics, "event_time");
949 assert_eq!(tc.join_type, "inner");
950 } else {
951 panic!("Expected Temporal config");
952 }
953 }
954
955 #[test]
956 fn test_temporal_left_join() {
957 let analysis = JoinAnalysis::temporal(
958 "orders".to_string(),
959 "products".to_string(),
960 "product_id".to_string(),
961 "id".to_string(),
962 "order_time".to_string(),
963 JoinType::Left,
964 );
965
966 let config = JoinOperatorConfig::from_analysis(&analysis);
967 if let JoinOperatorConfig::Temporal(tc) = config {
968 assert_eq!(tc.join_type, "left");
969 } else {
970 panic!("Expected Temporal config");
971 }
972 }
973
974 #[test]
975 fn test_display_temporal_join() {
976 let analysis = JoinAnalysis::temporal(
977 "orders".to_string(),
978 "products".to_string(),
979 "product_id".to_string(),
980 "id".to_string(),
981 "order_time".to_string(),
982 JoinType::Inner,
983 );
984 let config = JoinOperatorConfig::from_analysis(&analysis);
985 let s = format!("{config}");
986 assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
987 assert!(s.contains("order_time"), "got: {s}");
988 }
989}