// laminar_sql/translator/join_translator.rs

//! Join operator configuration builder
//!
//! Translates parsed join analysis into operator configurations
//! for stream-stream, lookup, ASOF, and temporal joins.
5
6use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10/// Configuration for stream-stream join operator
11#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13    /// Left side key column
14    pub left_key: String,
15    /// Right side key column
16    pub right_key: String,
17    /// Time bound for joining (max time difference between events)
18    pub time_bound: Duration,
19    /// Join type
20    pub join_type: StreamJoinType,
21}
22
23/// Configuration for lookup join operator
24#[derive(Debug, Clone)]
25pub struct LookupJoinConfig {
26    /// Stream side key column
27    pub stream_key: String,
28    /// Lookup table key column
29    pub lookup_key: String,
30    /// Join type
31    pub join_type: LookupJoinType,
32    /// Cache TTL for lookup results
33    pub cache_ttl: Duration,
34}
35
/// The join variants available for stream-stream joins.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamJoinType {
    /// Inner join: a row is required on both sides.
    Inner,
    /// Left outer join: the left side is always emitted.
    Left,
    /// Right outer join: the right side is always emitted.
    Right,
    /// Full outer join: both sides are always emitted.
    Full,
    /// Left semi join: left rows having at least one match (left columns only).
    LeftSemi,
    /// Left anti join: left rows having no match (left columns only).
    LeftAnti,
    /// Right semi join: right rows having at least one match (right columns only).
    RightSemi,
    /// Right anti join: right rows having no match (right columns only).
    RightAnti,
}
56
/// The join variants available for lookup joins.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LookupJoinType {
    /// Inner lookup join.
    // NOTE(review): presumably a stream event is emitted only when the lookup
    // finds a match — confirm against the operator implementation.
    Inner,
    /// Left lookup join: the stream event is always emitted, the lookup side is optional.
    Left,
}
65
/// Settings for a temporal join operator (`FOR SYSTEM_TIME AS OF`).
#[derive(Clone, Debug)]
pub struct TemporalJoinTranslatorConfig {
    /// Key column on the stream side.
    pub stream_key_column: String,
    /// Key column on the table side.
    pub table_key_column: String,
    /// Version column taken from the `FOR SYSTEM_TIME AS OF` clause.
    pub table_version_column: String,
    /// Temporal semantics, as a string: "event_time" or "process_time".
    pub semantics: String,
    /// Join type, as a string: "inner" or "left".
    pub join_type: String,
}
80
81/// Configuration for ASOF join operator
82#[derive(Debug, Clone)]
83pub struct AsofJoinTranslatorConfig {
84    /// Left side table name
85    pub left_table: String,
86    /// Right side table name
87    pub right_table: String,
88    /// Key column for partitioning (e.g., symbol)
89    pub key_column: String,
90    /// Left side time column
91    pub left_time_column: String,
92    /// Right side time column
93    pub right_time_column: String,
94    /// Join direction (Backward or Forward)
95    pub direction: AsofSqlDirection,
96    /// Maximum allowed time difference
97    pub tolerance: Option<Duration>,
98    /// ASOF join type (Inner or Left)
99    pub join_type: AsofSqlJoinType,
100}
101
/// The join variants available for ASOF joins.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AsofSqlJoinType {
    /// Inner join: a row is required on both sides.
    Inner,
    /// Left join: the left side is always emitted.
    Left,
}
110
111/// Union type for join operator configurations
112#[derive(Debug, Clone)]
113pub enum JoinOperatorConfig {
114    /// Stream-stream join
115    StreamStream(StreamJoinConfig),
116    /// Lookup join
117    Lookup(LookupJoinConfig),
118    /// ASOF join
119    Asof(AsofJoinTranslatorConfig),
120    /// Temporal join (FOR SYSTEM_TIME AS OF)
121    Temporal(TemporalJoinTranslatorConfig),
122}
123
124impl std::fmt::Display for StreamJoinType {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match self {
127            StreamJoinType::Inner => write!(f, "INNER"),
128            StreamJoinType::Left => write!(f, "LEFT"),
129            StreamJoinType::Right => write!(f, "RIGHT"),
130            StreamJoinType::Full => write!(f, "FULL"),
131            StreamJoinType::LeftSemi => write!(f, "LEFT SEMI"),
132            StreamJoinType::LeftAnti => write!(f, "LEFT ANTI"),
133            StreamJoinType::RightSemi => write!(f, "RIGHT SEMI"),
134            StreamJoinType::RightAnti => write!(f, "RIGHT ANTI"),
135        }
136    }
137}
138
139impl std::fmt::Display for LookupJoinType {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        match self {
142            LookupJoinType::Inner => write!(f, "INNER"),
143            LookupJoinType::Left => write!(f, "LEFT"),
144        }
145    }
146}
147
148impl std::fmt::Display for AsofSqlJoinType {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        match self {
151            AsofSqlJoinType::Inner => write!(f, "INNER"),
152            AsofSqlJoinType::Left => write!(f, "LEFT"),
153        }
154    }
155}
156
157impl std::fmt::Display for StreamJoinConfig {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        write!(
160            f,
161            "{} JOIN ON left.{} = right.{} (bound: {}s)",
162            self.join_type,
163            self.left_key,
164            self.right_key,
165            self.time_bound.as_secs()
166        )
167    }
168}
169
170impl std::fmt::Display for LookupJoinConfig {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        write!(
173            f,
174            "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
175            self.join_type,
176            self.stream_key,
177            self.lookup_key,
178            self.cache_ttl.as_secs()
179        )
180    }
181}
182
183impl std::fmt::Display for AsofJoinTranslatorConfig {
184    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185        write!(
186            f,
187            "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
188            self.join_type,
189            self.left_table,
190            self.key_column,
191            self.right_table,
192            self.key_column,
193            self.direction,
194            self.left_table,
195            self.left_time_column,
196            self.right_table,
197            self.right_time_column,
198        )?;
199        if let Some(tol) = self.tolerance {
200            write!(f, ", tolerance: {}s", tol.as_secs())?;
201        }
202        write!(f, ")")
203    }
204}
205
206impl std::fmt::Display for TemporalJoinTranslatorConfig {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        write!(
209            f,
210            "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
211            self.join_type.to_uppercase(),
212            self.stream_key_column,
213            self.table_key_column,
214            self.table_version_column,
215            self.semantics,
216        )
217    }
218}
219
220impl std::fmt::Display for JoinOperatorConfig {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        match self {
223            JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
224            JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
225            JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
226            JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
227        }
228    }
229}
230
231impl JoinOperatorConfig {
232    /// Create from join analysis.
233    #[must_use]
234    pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
235        if analysis.is_temporal_join {
236            let join_type_str = match analysis.join_type {
237                JoinType::Left => "left",
238                _ => "inner",
239            };
240            return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
241                stream_key_column: analysis.left_key_column.clone(),
242                table_key_column: analysis.right_key_column.clone(),
243                table_version_column: analysis.temporal_version_column.clone().unwrap_or_default(),
244                semantics: "event_time".to_string(),
245                join_type: join_type_str.to_string(),
246            });
247        }
248
249        if analysis.is_asof_join {
250            return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
251                left_table: analysis.left_table.clone(),
252                right_table: analysis.right_table.clone(),
253                key_column: analysis.left_key_column.clone(),
254                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
255                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
256                direction: analysis
257                    .asof_direction
258                    .unwrap_or(AsofSqlDirection::Backward),
259                tolerance: analysis.asof_tolerance,
260                join_type: AsofSqlJoinType::Left, // ASOF is always left-style
261            });
262        }
263
264        if analysis.is_lookup_join {
265            JoinOperatorConfig::Lookup(LookupJoinConfig {
266                stream_key: analysis.left_key_column.clone(),
267                lookup_key: analysis.right_key_column.clone(),
268                join_type: match analysis.join_type {
269                    JoinType::Inner => LookupJoinType::Inner,
270                    _ => LookupJoinType::Left,
271                },
272                cache_ttl: Duration::from_secs(300), // Default 5 min
273            })
274        } else {
275            JoinOperatorConfig::StreamStream(StreamJoinConfig {
276                left_key: analysis.left_key_column.clone(),
277                right_key: analysis.right_key_column.clone(),
278                time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
279                join_type: match analysis.join_type {
280                    JoinType::Inner => StreamJoinType::Inner,
281                    JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
282                    JoinType::Right => StreamJoinType::Right,
283                    JoinType::Full => StreamJoinType::Full,
284                    JoinType::LeftSemi => StreamJoinType::LeftSemi,
285                    JoinType::LeftAnti => StreamJoinType::LeftAnti,
286                    JoinType::RightSemi => StreamJoinType::RightSemi,
287                    JoinType::RightAnti => StreamJoinType::RightAnti,
288                },
289            })
290        }
291    }
292
293    /// Create from a multi-join analysis, producing one config per join step.
294    #[must_use]
295    pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
296        multi.joins.iter().map(Self::from_analysis).collect()
297    }
298
299    /// Check if this is a stream-stream join.
300    #[must_use]
301    pub fn is_stream_stream(&self) -> bool {
302        matches!(self, JoinOperatorConfig::StreamStream(_))
303    }
304
305    /// Check if this is a lookup join.
306    #[must_use]
307    pub fn is_lookup(&self) -> bool {
308        matches!(self, JoinOperatorConfig::Lookup(_))
309    }
310
311    /// Check if this is an ASOF join.
312    #[must_use]
313    pub fn is_asof(&self) -> bool {
314        matches!(self, JoinOperatorConfig::Asof(_))
315    }
316
317    /// Check if this is a temporal join.
318    #[must_use]
319    pub fn is_temporal(&self) -> bool {
320        matches!(self, JoinOperatorConfig::Temporal(_))
321    }
322
323    /// Get the left key column name.
324    #[must_use]
325    pub fn left_key(&self) -> &str {
326        match self {
327            JoinOperatorConfig::StreamStream(config) => &config.left_key,
328            JoinOperatorConfig::Lookup(config) => &config.stream_key,
329            JoinOperatorConfig::Asof(config) => &config.key_column,
330            JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
331        }
332    }
333
334    /// Get the right key column name.
335    #[must_use]
336    pub fn right_key(&self) -> &str {
337        match self {
338            JoinOperatorConfig::StreamStream(config) => &config.right_key,
339            JoinOperatorConfig::Lookup(config) => &config.lookup_key,
340            JoinOperatorConfig::Asof(config) => &config.key_column,
341            JoinOperatorConfig::Temporal(config) => &config.table_key_column,
342        }
343    }
344}
345
346impl StreamJoinConfig {
347    /// Create a new stream-stream join configuration.
348    #[must_use]
349    pub fn new(
350        left_key: String,
351        right_key: String,
352        time_bound: Duration,
353        join_type: StreamJoinType,
354    ) -> Self {
355        Self {
356            left_key,
357            right_key,
358            time_bound,
359            join_type,
360        }
361    }
362
363    /// Create an inner join configuration.
364    #[must_use]
365    pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
366        Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
367    }
368
369    /// Create a left join configuration.
370    #[must_use]
371    pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
372        Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
373    }
374}
375
376impl LookupJoinConfig {
377    /// Create a new lookup join configuration.
378    #[must_use]
379    pub fn new(
380        stream_key: String,
381        lookup_key: String,
382        join_type: LookupJoinType,
383        cache_ttl: Duration,
384    ) -> Self {
385        Self {
386            stream_key,
387            lookup_key,
388            join_type,
389            cache_ttl,
390        }
391    }
392
393    /// Create an inner lookup join configuration.
394    #[must_use]
395    pub fn inner(stream_key: String, lookup_key: String) -> Self {
396        Self::new(
397            stream_key,
398            lookup_key,
399            LookupJoinType::Inner,
400            Duration::from_secs(300),
401        )
402    }
403
404    /// Create a left lookup join configuration.
405    #[must_use]
406    pub fn left(stream_key: String, lookup_key: String) -> Self {
407        Self::new(
408            stream_key,
409            lookup_key,
410            LookupJoinType::Left,
411            Duration::from_secs(300),
412        )
413    }
414
415    /// Set the cache TTL.
416    #[must_use]
417    pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
418        self.cache_ttl = ttl;
419        self
420    }
421}
422
#[cfg(test)]
mod tests {
    use super::*;

    // -- Fixture helpers: thin `&str` wrappers over the `JoinAnalysis` constructors --

    /// Lookup-join analysis built from string slices.
    fn lookup_analysis(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::lookup(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            join_type,
        )
    }

    /// Stream-stream analysis with the time bound given in whole seconds.
    fn stream_analysis(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        bound_secs: u64,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::stream_stream(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            Duration::from_secs(bound_secs),
            join_type,
        )
    }

    /// ASOF analysis that uses the same key column name on both sides.
    fn asof_analysis(
        tables: (&str, &str),
        key: &str,
        direction: AsofSqlDirection,
        time_columns: (&str, &str),
        tolerance: Option<Duration>,
    ) -> JoinAnalysis {
        JoinAnalysis::asof(
            tables.0.to_string(),
            tables.1.to_string(),
            key.to_string(),
            key.to_string(),
            direction,
            time_columns.0.to_string(),
            time_columns.1.to_string(),
            tolerance,
        )
    }

    /// Temporal analysis of `orders` against `products`, versioned by `order_time`.
    fn temporal_analysis(join_type: JoinType) -> JoinAnalysis {
        JoinAnalysis::temporal(
            "orders".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            "order_time".to_string(),
            join_type,
        )
    }

    /// Multi-join analysis over the given join steps and table names.
    fn multi_analysis(joins: Vec<JoinAnalysis>, tables: &[&str]) -> MultiJoinAnalysis {
        MultiJoinAnalysis {
            joins,
            tables: tables.iter().map(|table| (*table).to_string()).collect(),
        }
    }

    /// Unwrap a stream-stream config, failing the test on any other variant.
    fn expect_stream_stream(config: JoinOperatorConfig) -> StreamJoinConfig {
        match config {
            JoinOperatorConfig::StreamStream(stream) => stream,
            other => panic!("Expected StreamStream config, got: {other}"),
        }
    }

    /// Unwrap an ASOF config, failing the test on any other variant.
    fn expect_asof(config: JoinOperatorConfig) -> AsofJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Asof(asof) => asof,
            other => panic!("Expected Asof config, got: {other}"),
        }
    }

    /// Unwrap a temporal config, failing the test on any other variant.
    fn expect_temporal(config: JoinOperatorConfig) -> TemporalJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Temporal(temporal) => temporal,
            other => panic!("Expected Temporal config, got: {other}"),
        }
    }

    // -- Direct constructor tests --

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    // -- Single-join translation tests --

    #[test]
    fn test_from_analysis_lookup() {
        let analysis =
            lookup_analysis("orders", "customers", "customer_id", "id", JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = stream_analysis(
            "orders",
            "payments",
            "order_id",
            "order_id",
            3600,
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        let stream_config = expect_stream_stream(config);
        assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
        assert_eq!(stream_config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = asof_analysis(
            ("trades", "quotes"),
            "symbol",
            AsofSqlDirection::Backward,
            ("ts", "ts"),
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = asof_analysis(
            ("trades", "quotes"),
            "symbol",
            AsofSqlDirection::Forward,
            ("trade_ts", "quote_ts"),
            Some(Duration::from_millis(5000)),
        );

        let asof = expect_asof(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(asof.direction, AsofSqlDirection::Forward);
        assert_eq!(asof.left_time_column, "trade_ts");
        assert_eq!(asof.right_time_column, "quote_ts");
        assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
        assert_eq!(asof.key_column, "symbol");
        assert_eq!(asof.join_type, AsofSqlJoinType::Left);
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = asof_analysis(
            ("a", "b"),
            "id",
            AsofSqlDirection::Backward,
            ("ts", "ts"),
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = asof_analysis(
            ("trades", "quotes"),
            "sym",
            AsofSqlDirection::Backward,
            ("ts", "ts"),
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    // -- Multi-way join translator tests --

    #[test]
    fn test_from_multi_analysis_single() {
        let multi = multi_analysis(
            vec![lookup_analysis("a", "b", "id", "id", JoinType::Inner)],
            &["a", "b"],
        );

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let j1 = lookup_analysis("a", "b", "id", "a_id", JoinType::Inner);
        let j2 = lookup_analysis("b", "c", "id", "b_id", JoinType::Left);
        let multi = multi_analysis(vec![j1, j2], &["a", "b", "c"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
    }

    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let j1 = asof_analysis(
            ("trades", "quotes"),
            "symbol",
            AsofSqlDirection::Backward,
            ("ts", "ts"),
            None,
        );
        let j2 = lookup_analysis("quotes", "products", "product_id", "id", JoinType::Inner);
        let multi = multi_analysis(vec![j1, j2], &["trades", "quotes", "products"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let j1 = stream_analysis("orders", "payments", "id", "order_id", 3600, JoinType::Inner);
        let j2 = lookup_analysis("payments", "customers", "cust_id", "id", JoinType::Left);
        let multi = multi_analysis(vec![j1, j2], &["orders", "payments", "customers"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let j1 = lookup_analysis("a", "b", "k1", "k1", JoinType::Inner);
        let j2 = stream_analysis("b", "c", "k2", "k2", 60, JoinType::Left);
        let j3 = lookup_analysis("c", "d", "k3", "k3", JoinType::Inner);
        let multi = multi_analysis(vec![j1, j2, j3], &["a", "b", "c", "d"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    #[test]
    fn test_join_types() {
        // Every stream-stream join type must map to its counterpart. A config
        // of the wrong variant fails the test instead of being skipped.
        let cases = [
            (JoinType::Inner, StreamJoinType::Inner),
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
            (JoinType::LeftSemi, StreamJoinType::LeftSemi),
            (JoinType::LeftAnti, StreamJoinType::LeftAnti),
            (JoinType::RightSemi, StreamJoinType::RightSemi),
            (JoinType::RightAnti, StreamJoinType::RightAnti),
        ];

        for (input, expected) in cases {
            let analysis = stream_analysis("a", "b", "id", "id", 60, input);
            let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
            assert_eq!(config.join_type, expected);
        }
    }

    // -- Display tests --

    #[test]
    fn test_display_stream_join() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        assert_eq!(
            config.to_string(),
            "INNER JOIN ON left.order_id = right.order_id (bound: 3600s)"
        );
    }

    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            config.to_string(),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    #[test]
    fn test_display_asof_join() {
        let analysis = asof_analysis(
            ("trades", "quotes"),
            "symbol",
            AsofSqlDirection::Backward,
            ("ts", "ts"),
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = config.to_string();
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    #[test]
    fn test_display_join_types() {
        assert_eq!(StreamJoinType::Inner.to_string(), "INNER");
        assert_eq!(StreamJoinType::Left.to_string(), "LEFT");
        assert_eq!(StreamJoinType::Right.to_string(), "RIGHT");
        assert_eq!(StreamJoinType::Full.to_string(), "FULL");
        assert_eq!(StreamJoinType::LeftSemi.to_string(), "LEFT SEMI");
        assert_eq!(StreamJoinType::LeftAnti.to_string(), "LEFT ANTI");
        assert_eq!(StreamJoinType::RightSemi.to_string(), "RIGHT SEMI");
        assert_eq!(StreamJoinType::RightAnti.to_string(), "RIGHT ANTI");
        assert_eq!(LookupJoinType::Inner.to_string(), "INNER");
        assert_eq!(LookupJoinType::Left.to_string(), "LEFT");
        assert_eq!(AsofSqlJoinType::Inner.to_string(), "INNER");
        assert_eq!(AsofSqlJoinType::Left.to_string(), "LEFT");
    }

    #[test]
    fn test_from_analysis_semi_anti() {
        let semi = stream_analysis("a", "b", "id", "id", 60, JoinType::LeftSemi);
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&semi));
        assert_eq!(config.join_type, StreamJoinType::LeftSemi);

        let anti = stream_analysis("a", "b", "id", "id", 60, JoinType::RightAnti);
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&anti));
        assert_eq!(config.join_type, StreamJoinType::RightAnti);
    }

    // -- Temporal join tests --

    #[test]
    fn test_from_analysis_temporal() {
        let analysis = temporal_analysis(JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_temporal());
        assert!(!config.is_asof());
        assert!(!config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "product_id");
        assert_eq!(config.right_key(), "id");

        let tc = expect_temporal(config);
        assert_eq!(tc.table_version_column, "order_time");
        assert_eq!(tc.semantics, "event_time");
        assert_eq!(tc.join_type, "inner");
    }

    #[test]
    fn test_temporal_left_join() {
        let analysis = temporal_analysis(JoinType::Left);

        let tc = expect_temporal(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(tc.join_type, "left");
    }

    #[test]
    fn test_display_temporal_join() {
        let analysis = temporal_analysis(JoinType::Inner);
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = config.to_string();
        assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
        assert!(s.contains("order_time"), "got: {s}");
    }
}
931}