Skip to main content

laminar_sql/translator/
join_translator.rs

1//! Join operator configuration builder
2//!
3//! Translates parsed join analysis into operator configurations
4//! for stream-stream joins and lookup joins.
5
6use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10/// Configuration for stream-stream join operator
11#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13    /// Left side key column
14    pub left_key: String,
15    /// Right side key column
16    pub right_key: String,
17    /// Time bound for joining (max time difference between events)
18    pub time_bound: Duration,
19    /// Join type
20    pub join_type: StreamJoinType,
21}
22
23/// Configuration for lookup join operator
24#[derive(Debug, Clone)]
25pub struct LookupJoinConfig {
26    /// Stream side key column
27    pub stream_key: String,
28    /// Lookup table key column
29    pub lookup_key: String,
30    /// Join type
31    pub join_type: LookupJoinType,
32    /// Cache TTL for lookup results
33    pub cache_ttl: Duration,
34}
35
/// Join semantics available to the stream-stream join operator.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamJoinType {
    /// A match on both sides is required.
    Inner,
    /// Every left row is emitted.
    Left,
    /// Every right row is emitted.
    Right,
    /// Rows from both sides are always emitted.
    Full,
    /// Left rows that have at least one match; only left columns are produced.
    LeftSemi,
    /// Left rows that have no match; only left columns are produced.
    LeftAnti,
    /// Right rows that have at least one match; only right columns are produced.
    RightSemi,
    /// Right rows that have no match; only right columns are produced.
    RightAnti,
}
56
/// Join semantics available to the lookup join operator.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LookupJoinType {
    /// Inner lookup join.
    ///
    /// NOTE(review): presumably stream events without a lookup match are
    /// dropped (standard inner-join semantics) — confirm against the operator.
    Inner,
    /// Left lookup join: the stream event is always emitted and the lookup
    /// match is optional.
    Left,
}
65
/// Configuration for the temporal join operator (`FOR SYSTEM_TIME AS OF`).
///
/// All fields are plain strings. Derives `PartialEq`/`Eq` so two
/// configurations can be compared as a whole.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TemporalJoinTranslatorConfig {
    /// Name of the stream (left) side table.
    pub stream_table: String,
    /// Name of the table (right) side.
    pub table_name: String,
    /// Key column on the stream side.
    pub stream_key_column: String,
    /// Key column on the table side.
    pub table_key_column: String,
    /// Stream-side column supplying the lookup timestamp
    /// (taken from `FOR SYSTEM_TIME AS OF`).
    pub stream_time_column: String,
    /// Table-side version column (defaults to the same value as
    /// `stream_time_column`).
    pub table_version_column: String,
    /// Temporal semantics: "event_time" or "process_time".
    pub semantics: String,
    /// Join type: "inner" or "left".
    pub join_type: String,
}
86
87/// Configuration for ASOF join operator
88#[derive(Debug, Clone)]
89pub struct AsofJoinTranslatorConfig {
90    /// Left side table name
91    pub left_table: String,
92    /// Right side table name
93    pub right_table: String,
94    /// Key column for partitioning (e.g., symbol)
95    pub key_column: String,
96    /// Left side time column
97    pub left_time_column: String,
98    /// Right side time column
99    pub right_time_column: String,
100    /// Join direction (Backward, Forward, or Nearest)
101    pub direction: AsofSqlDirection,
102    /// Maximum allowed time difference
103    pub tolerance: Option<Duration>,
104    /// ASOF join type (Inner or Left)
105    pub join_type: AsofSqlJoinType,
106}
107
/// Join semantics available to the ASOF join operator.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AsofSqlJoinType {
    /// A match on both sides is required.
    Inner,
    /// Every left row is emitted.
    Left,
}
116
117/// Union type for join operator configurations
118#[derive(Debug, Clone)]
119pub enum JoinOperatorConfig {
120    /// Stream-stream join
121    StreamStream(StreamJoinConfig),
122    /// Lookup join
123    Lookup(LookupJoinConfig),
124    /// ASOF join
125    Asof(AsofJoinTranslatorConfig),
126    /// Temporal join (FOR SYSTEM_TIME AS OF)
127    Temporal(TemporalJoinTranslatorConfig),
128}
129
130impl std::fmt::Display for StreamJoinType {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            StreamJoinType::Inner => write!(f, "INNER"),
134            StreamJoinType::Left => write!(f, "LEFT"),
135            StreamJoinType::Right => write!(f, "RIGHT"),
136            StreamJoinType::Full => write!(f, "FULL"),
137            StreamJoinType::LeftSemi => write!(f, "LEFT SEMI"),
138            StreamJoinType::LeftAnti => write!(f, "LEFT ANTI"),
139            StreamJoinType::RightSemi => write!(f, "RIGHT SEMI"),
140            StreamJoinType::RightAnti => write!(f, "RIGHT ANTI"),
141        }
142    }
143}
144
145impl std::fmt::Display for LookupJoinType {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        match self {
148            LookupJoinType::Inner => write!(f, "INNER"),
149            LookupJoinType::Left => write!(f, "LEFT"),
150        }
151    }
152}
153
154impl std::fmt::Display for AsofSqlJoinType {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        match self {
157            AsofSqlJoinType::Inner => write!(f, "INNER"),
158            AsofSqlJoinType::Left => write!(f, "LEFT"),
159        }
160    }
161}
162
163impl std::fmt::Display for StreamJoinConfig {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        write!(
166            f,
167            "{} JOIN ON left.{} = right.{} (bound: {}s)",
168            self.join_type,
169            self.left_key,
170            self.right_key,
171            self.time_bound.as_secs()
172        )
173    }
174}
175
176impl std::fmt::Display for LookupJoinConfig {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
181            self.join_type,
182            self.stream_key,
183            self.lookup_key,
184            self.cache_ttl.as_secs()
185        )
186    }
187}
188
189impl std::fmt::Display for AsofJoinTranslatorConfig {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        write!(
192            f,
193            "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
194            self.join_type,
195            self.left_table,
196            self.key_column,
197            self.right_table,
198            self.key_column,
199            self.direction,
200            self.left_table,
201            self.left_time_column,
202            self.right_table,
203            self.right_time_column,
204        )?;
205        if let Some(tol) = self.tolerance {
206            write!(f, ", tolerance: {}s", tol.as_secs())?;
207        }
208        write!(f, ")")
209    }
210}
211
212impl std::fmt::Display for TemporalJoinTranslatorConfig {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        write!(
215            f,
216            "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
217            self.join_type.to_uppercase(),
218            self.stream_key_column,
219            self.table_key_column,
220            self.table_version_column,
221            self.semantics,
222        )
223    }
224}
225
226impl std::fmt::Display for JoinOperatorConfig {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        match self {
229            JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
230            JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
231            JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
232            JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
233        }
234    }
235}
236
237impl JoinOperatorConfig {
238    /// Create from join analysis.
239    #[must_use]
240    pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
241        if analysis.is_temporal_join {
242            let join_type_str = match analysis.join_type {
243                JoinType::Left => "left",
244                _ => "inner",
245            };
246            let version_col = analysis.temporal_version_column.clone().unwrap_or_default();
247            return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
248                stream_table: analysis.left_table.clone(),
249                table_name: analysis.right_table.clone(),
250                stream_key_column: analysis.left_key_column.clone(),
251                table_key_column: analysis.right_key_column.clone(),
252                stream_time_column: version_col.clone(),
253                table_version_column: version_col,
254                semantics: "event_time".to_string(),
255                join_type: join_type_str.to_string(),
256            });
257        }
258
259        if analysis.is_asof_join {
260            return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
261                left_table: analysis.left_table.clone(),
262                right_table: analysis.right_table.clone(),
263                key_column: analysis.left_key_column.clone(),
264                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
265                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
266                direction: analysis
267                    .asof_direction
268                    .unwrap_or(AsofSqlDirection::Backward),
269                tolerance: analysis.asof_tolerance,
270                join_type: AsofSqlJoinType::Left, // ASOF is always left-style
271            });
272        }
273
274        if analysis.is_lookup_join {
275            JoinOperatorConfig::Lookup(LookupJoinConfig {
276                stream_key: analysis.left_key_column.clone(),
277                lookup_key: analysis.right_key_column.clone(),
278                join_type: match analysis.join_type {
279                    JoinType::Inner => LookupJoinType::Inner,
280                    _ => LookupJoinType::Left,
281                },
282                cache_ttl: Duration::from_secs(300), // Default 5 min
283            })
284        } else {
285            JoinOperatorConfig::StreamStream(StreamJoinConfig {
286                left_key: analysis.left_key_column.clone(),
287                right_key: analysis.right_key_column.clone(),
288                time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
289                join_type: match analysis.join_type {
290                    JoinType::Inner => StreamJoinType::Inner,
291                    JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
292                    JoinType::Right => StreamJoinType::Right,
293                    JoinType::Full => StreamJoinType::Full,
294                    JoinType::LeftSemi => StreamJoinType::LeftSemi,
295                    JoinType::LeftAnti => StreamJoinType::LeftAnti,
296                    JoinType::RightSemi => StreamJoinType::RightSemi,
297                    JoinType::RightAnti => StreamJoinType::RightAnti,
298                },
299            })
300        }
301    }
302
303    /// Create from a multi-join analysis, producing one config per join step.
304    #[must_use]
305    pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
306        multi.joins.iter().map(Self::from_analysis).collect()
307    }
308
309    /// Check if this is a stream-stream join.
310    #[must_use]
311    pub fn is_stream_stream(&self) -> bool {
312        matches!(self, JoinOperatorConfig::StreamStream(_))
313    }
314
315    /// Check if this is a lookup join.
316    #[must_use]
317    pub fn is_lookup(&self) -> bool {
318        matches!(self, JoinOperatorConfig::Lookup(_))
319    }
320
321    /// Check if this is an ASOF join.
322    #[must_use]
323    pub fn is_asof(&self) -> bool {
324        matches!(self, JoinOperatorConfig::Asof(_))
325    }
326
327    /// Check if this is a temporal join.
328    #[must_use]
329    pub fn is_temporal(&self) -> bool {
330        matches!(self, JoinOperatorConfig::Temporal(_))
331    }
332
333    /// Get the left key column name.
334    #[must_use]
335    pub fn left_key(&self) -> &str {
336        match self {
337            JoinOperatorConfig::StreamStream(config) => &config.left_key,
338            JoinOperatorConfig::Lookup(config) => &config.stream_key,
339            JoinOperatorConfig::Asof(config) => &config.key_column,
340            JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
341        }
342    }
343
344    /// Get the right key column name.
345    #[must_use]
346    pub fn right_key(&self) -> &str {
347        match self {
348            JoinOperatorConfig::StreamStream(config) => &config.right_key,
349            JoinOperatorConfig::Lookup(config) => &config.lookup_key,
350            JoinOperatorConfig::Asof(config) => &config.key_column,
351            JoinOperatorConfig::Temporal(config) => &config.table_key_column,
352        }
353    }
354}
355
356impl StreamJoinConfig {
357    /// Create a new stream-stream join configuration.
358    #[must_use]
359    pub fn new(
360        left_key: String,
361        right_key: String,
362        time_bound: Duration,
363        join_type: StreamJoinType,
364    ) -> Self {
365        Self {
366            left_key,
367            right_key,
368            time_bound,
369            join_type,
370        }
371    }
372
373    /// Create an inner join configuration.
374    #[must_use]
375    pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
376        Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
377    }
378
379    /// Create a left join configuration.
380    #[must_use]
381    pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
382        Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
383    }
384}
385
386impl LookupJoinConfig {
387    /// Create a new lookup join configuration.
388    #[must_use]
389    pub fn new(
390        stream_key: String,
391        lookup_key: String,
392        join_type: LookupJoinType,
393        cache_ttl: Duration,
394    ) -> Self {
395        Self {
396            stream_key,
397            lookup_key,
398            join_type,
399            cache_ttl,
400        }
401    }
402
403    /// Create an inner lookup join configuration.
404    #[must_use]
405    pub fn inner(stream_key: String, lookup_key: String) -> Self {
406        Self::new(
407            stream_key,
408            lookup_key,
409            LookupJoinType::Inner,
410            Duration::from_secs(300),
411        )
412    }
413
414    /// Create a left lookup join configuration.
415    #[must_use]
416    pub fn left(stream_key: String, lookup_key: String) -> Self {
417        Self::new(
418            stream_key,
419            lookup_key,
420            LookupJoinType::Left,
421            Duration::from_secs(300),
422        )
423    }
424
425    /// Set the cache TTL.
426    #[must_use]
427    pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
428        self.cache_ttl = ttl;
429        self
430    }
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    // -- Fixture helpers --

    /// Builds a lookup-join analysis from string slices.
    fn lookup(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::lookup(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            join_type,
        )
    }

    /// Builds a stream-stream join analysis from string slices.
    fn stream_stream(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        bound: Duration,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::stream_stream(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            bound,
            join_type,
        )
    }

    /// Builds an ASOF join analysis that uses the same key column on both sides.
    fn asof(
        left: &str,
        right: &str,
        key: &str,
        direction: AsofSqlDirection,
        left_time: &str,
        right_time: &str,
        tolerance: Option<Duration>,
    ) -> JoinAnalysis {
        JoinAnalysis::asof(
            left.to_string(),
            right.to_string(),
            key.to_string(),
            key.to_string(),
            direction,
            left_time.to_string(),
            right_time.to_string(),
            tolerance,
        )
    }

    /// Builds the `orders` / `products` temporal join analysis shared by the temporal tests.
    fn orders_products_temporal(join_type: JoinType) -> JoinAnalysis {
        JoinAnalysis::temporal(
            "orders".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            "order_time".to_string(),
            join_type,
        )
    }

    /// Wraps join steps and table names into a `MultiJoinAnalysis`.
    fn multi(joins: Vec<JoinAnalysis>, tables: &[&str]) -> MultiJoinAnalysis {
        MultiJoinAnalysis {
            joins,
            tables: tables.iter().map(|t| (*t).to_string()).collect(),
        }
    }

    /// Unwraps the stream-stream variant and fails the test on any other variant,
    /// so a wrong variant can never make a test pass silently.
    fn expect_stream_stream(config: JoinOperatorConfig) -> StreamJoinConfig {
        match config {
            JoinOperatorConfig::StreamStream(inner) => inner,
            other => panic!("Expected StreamStream config, got: {:?}", other),
        }
    }

    /// Unwraps the ASOF variant and fails the test on any other variant.
    fn expect_asof(config: JoinOperatorConfig) -> AsofJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Asof(inner) => inner,
            other => panic!("Expected Asof config, got: {:?}", other),
        }
    }

    /// Unwraps the temporal variant and fails the test on any other variant.
    fn expect_temporal(config: JoinOperatorConfig) -> TemporalJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Temporal(inner) => inner,
            other => panic!("Expected Temporal config, got: {:?}", other),
        }
    }

    // -- Direct constructor tests --

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    // -- Single-join translation tests --

    #[test]
    fn test_from_analysis_lookup() {
        let analysis = lookup("orders", "customers", "customer_id", "id", JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = stream_stream(
            "orders",
            "payments",
            "order_id",
            "order_id",
            Duration::from_secs(3600),
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        let stream_config = expect_stream_stream(config);
        assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
        assert_eq!(stream_config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Forward,
            "trade_ts",
            "quote_ts",
            Some(Duration::from_millis(5000)),
        );

        let asof_config = expect_asof(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(asof_config.direction, AsofSqlDirection::Forward);
        assert_eq!(asof_config.left_time_column, "trade_ts");
        assert_eq!(asof_config.right_time_column, "quote_ts");
        assert_eq!(asof_config.tolerance, Some(Duration::from_millis(5000)));
        assert_eq!(asof_config.key_column, "symbol");
        assert_eq!(asof_config.join_type, AsofSqlJoinType::Left);
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = asof("a", "b", "id", AsofSqlDirection::Backward, "ts", "ts", None);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = asof(
            "trades",
            "quotes",
            "sym",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    // -- Multi-way join translator tests --

    #[test]
    fn test_from_multi_analysis_single() {
        let analysis = lookup("a", "b", "id", "id", JoinType::Inner);
        let multi = multi(vec![analysis], &["a", "b"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let j1 = lookup("a", "b", "id", "a_id", JoinType::Inner);
        let j2 = lookup("b", "c", "id", "b_id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["a", "b", "c"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
    }

    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let j1 = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );
        let j2 = lookup("quotes", "products", "product_id", "id", JoinType::Inner);
        let multi = multi(vec![j1, j2], &["trades", "quotes", "products"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let j1 = stream_stream(
            "orders",
            "payments",
            "id",
            "order_id",
            Duration::from_secs(3600),
            JoinType::Inner,
        );
        let j2 = lookup("payments", "customers", "cust_id", "id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["orders", "payments", "customers"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let j1 = lookup("a", "b", "k1", "k1", JoinType::Inner);
        let j2 = stream_stream("b", "c", "k2", "k2", Duration::from_secs(60), JoinType::Left);
        let j3 = lookup("c", "d", "k3", "k3", JoinType::Inner);
        let multi = multi(vec![j1, j2, j3], &["a", "b", "c", "d"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    // -- Join type mapping tests --

    #[test]
    fn test_join_types() {
        // Each parsed join type must map to the stream join type of the same name.
        let cases = vec![
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
        ];

        for (parsed, expected) in cases {
            let analysis = stream_stream("a", "b", "id", "id", Duration::from_secs(60), parsed);
            let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
            assert_eq!(config.join_type, expected);
        }
    }

    #[test]
    fn test_from_analysis_semi_anti() {
        let semi = stream_stream(
            "a",
            "b",
            "id",
            "id",
            Duration::from_secs(60),
            JoinType::LeftSemi,
        );
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&semi));
        assert_eq!(config.join_type, StreamJoinType::LeftSemi);

        let anti = stream_stream(
            "a",
            "b",
            "id",
            "id",
            Duration::from_secs(60),
            JoinType::RightAnti,
        );
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&anti));
        assert_eq!(config.join_type, StreamJoinType::RightAnti);
    }

    // -- Display tests --

    #[test]
    fn test_display_stream_join() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        assert_eq!(
            format!("{config}"),
            "INNER JOIN ON left.order_id = right.order_id (bound: 3600s)"
        );
    }

    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            format!("{config}"),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    #[test]
    fn test_display_asof_join() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    #[test]
    fn test_display_join_types() {
        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
        assert_eq!(format!("{}", StreamJoinType::LeftSemi), "LEFT SEMI");
        assert_eq!(format!("{}", StreamJoinType::LeftAnti), "LEFT ANTI");
        assert_eq!(format!("{}", StreamJoinType::RightSemi), "RIGHT SEMI");
        assert_eq!(format!("{}", StreamJoinType::RightAnti), "RIGHT ANTI");
        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
    }

    // -- Temporal join tests --

    #[test]
    fn test_from_analysis_temporal() {
        let analysis = orders_products_temporal(JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_temporal());
        assert!(!config.is_asof());
        assert!(!config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "product_id");
        assert_eq!(config.right_key(), "id");

        let tc = expect_temporal(config);
        assert_eq!(tc.table_version_column, "order_time");
        assert_eq!(tc.semantics, "event_time");
        assert_eq!(tc.join_type, "inner");
    }

    #[test]
    fn test_temporal_left_join() {
        let analysis = orders_products_temporal(JoinType::Left);

        let tc = expect_temporal(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(tc.join_type, "left");
    }

    #[test]
    fn test_display_temporal_join() {
        let analysis = orders_products_temporal(JoinType::Inner);
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
        assert!(s.contains("order_time"), "got: {s}");
    }
}