Skip to main content

laminar_sql/translator/
join_translator.rs

1//! Join operator configuration builder
2//!
3//! Translates parsed join analysis into operator configurations
4//! for stream-stream joins and lookup joins.
5
6use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10/// Configuration for stream-stream join operator
11#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13    /// Left side key column
14    pub left_key: String,
15    /// Right side key column
16    pub right_key: String,
17    /// Left side time column for interval matching
18    pub left_time_column: String,
19    /// Right side time column for interval matching
20    pub right_time_column: String,
21    /// Left side table name
22    pub left_table: String,
23    /// Right side table name
24    pub right_table: String,
25    /// Time bound for joining (max time difference between events)
26    pub time_bound: Duration,
27    /// Join type
28    pub join_type: StreamJoinType,
29}
30
31/// Configuration for lookup join operator
32#[derive(Debug, Clone)]
33pub struct LookupJoinConfig {
34    /// Stream side key column
35    pub stream_key: String,
36    /// Lookup table key column
37    pub lookup_key: String,
38    /// Join type
39    pub join_type: LookupJoinType,
40    /// Cache TTL for lookup results
41    pub cache_ttl: Duration,
42}
43
44/// Stream-stream join types
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum StreamJoinType {
47    /// Both sides required
48    Inner,
49    /// Left side always emitted
50    Left,
51    /// Right side always emitted
52    Right,
53    /// Both sides always emitted
54    Full,
55}
56
57/// Lookup join types
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum LookupJoinType {
60    /// Stream event required, lookup optional
61    Inner,
62    /// Stream event always emitted, lookup optional
63    Left,
64}
65
66/// Configuration for temporal join operator (FOR SYSTEM_TIME AS OF).
67#[derive(Debug, Clone)]
68pub struct TemporalJoinTranslatorConfig {
69    /// Stream (left) side table name
70    pub stream_table: String,
71    /// Table (right) side table name
72    pub table_name: String,
73    /// Stream side key column
74    pub stream_key_column: String,
75    /// Table side key column
76    pub table_key_column: String,
77    /// Stream-side column for lookup timestamp (from `FOR SYSTEM_TIME AS OF`)
78    pub stream_time_column: String,
79    /// Table-side version column (defaults to same as `stream_time_column`)
80    pub table_version_column: String,
81    /// Temporal semantics: "event_time" or "process_time"
82    pub semantics: String,
83    /// Join type: "inner" or "left"
84    pub join_type: String,
85}
86
87/// Configuration for ASOF join operator
88#[derive(Debug, Clone)]
89pub struct AsofJoinTranslatorConfig {
90    /// Left side table name
91    pub left_table: String,
92    /// Right side table name
93    pub right_table: String,
94    /// Key column for partitioning (e.g., symbol)
95    pub key_column: String,
96    /// Left side time column
97    pub left_time_column: String,
98    /// Right side time column
99    pub right_time_column: String,
100    /// Join direction (Backward, Forward, or Nearest)
101    pub direction: AsofSqlDirection,
102    /// Maximum allowed time difference
103    pub tolerance: Option<Duration>,
104    /// ASOF join type (Inner or Left)
105    pub join_type: AsofSqlJoinType,
106}
107
108/// ASOF join type
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum AsofSqlJoinType {
111    /// Both sides required
112    Inner,
113    /// Left side always emitted
114    Left,
115}
116
117/// Union type for join operator configurations
118#[derive(Debug, Clone)]
119pub enum JoinOperatorConfig {
120    /// Stream-stream join
121    StreamStream(StreamJoinConfig),
122    /// Lookup join
123    Lookup(LookupJoinConfig),
124    /// ASOF join
125    Asof(AsofJoinTranslatorConfig),
126    /// Temporal join (FOR SYSTEM_TIME AS OF)
127    Temporal(TemporalJoinTranslatorConfig),
128}
129
130impl std::fmt::Display for StreamJoinType {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            StreamJoinType::Inner => write!(f, "INNER"),
134            StreamJoinType::Left => write!(f, "LEFT"),
135            StreamJoinType::Right => write!(f, "RIGHT"),
136            StreamJoinType::Full => write!(f, "FULL"),
137        }
138    }
139}
140
141impl std::fmt::Display for LookupJoinType {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        match self {
144            LookupJoinType::Inner => write!(f, "INNER"),
145            LookupJoinType::Left => write!(f, "LEFT"),
146        }
147    }
148}
149
150impl std::fmt::Display for AsofSqlJoinType {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        match self {
153            AsofSqlJoinType::Inner => write!(f, "INNER"),
154            AsofSqlJoinType::Left => write!(f, "LEFT"),
155        }
156    }
157}
158
159impl std::fmt::Display for StreamJoinConfig {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(
162            f,
163            "{} JOIN ON {}.{} = {}.{} (bound: {}s, time: {} ~ {})",
164            self.join_type,
165            self.left_table,
166            self.left_key,
167            self.right_table,
168            self.right_key,
169            self.time_bound.as_secs(),
170            self.left_time_column,
171            self.right_time_column,
172        )
173    }
174}
175
176impl std::fmt::Display for LookupJoinConfig {
177    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178        write!(
179            f,
180            "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
181            self.join_type,
182            self.stream_key,
183            self.lookup_key,
184            self.cache_ttl.as_secs()
185        )
186    }
187}
188
189impl std::fmt::Display for AsofJoinTranslatorConfig {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        write!(
192            f,
193            "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
194            self.join_type,
195            self.left_table,
196            self.key_column,
197            self.right_table,
198            self.key_column,
199            self.direction,
200            self.left_table,
201            self.left_time_column,
202            self.right_table,
203            self.right_time_column,
204        )?;
205        if let Some(tol) = self.tolerance {
206            write!(f, ", tolerance: {}s", tol.as_secs())?;
207        }
208        write!(f, ")")
209    }
210}
211
212impl std::fmt::Display for TemporalJoinTranslatorConfig {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        write!(
215            f,
216            "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
217            self.join_type.to_uppercase(),
218            self.stream_key_column,
219            self.table_key_column,
220            self.table_version_column,
221            self.semantics,
222        )
223    }
224}
225
226impl std::fmt::Display for JoinOperatorConfig {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        match self {
229            JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
230            JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
231            JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
232            JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
233        }
234    }
235}
236
237impl JoinOperatorConfig {
238    /// Create from join analysis.
239    #[must_use]
240    pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
241        if analysis.is_temporal_join {
242            let join_type_str = match analysis.join_type {
243                JoinType::Left => "left",
244                _ => "inner",
245            };
246            let version_col = analysis.temporal_version_column.clone().unwrap_or_default();
247            return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
248                stream_table: analysis.left_table.clone(),
249                table_name: analysis.right_table.clone(),
250                stream_key_column: analysis.left_key_column.clone(),
251                table_key_column: analysis.right_key_column.clone(),
252                stream_time_column: version_col.clone(),
253                table_version_column: version_col,
254                semantics: "event_time".to_string(),
255                join_type: join_type_str.to_string(),
256            });
257        }
258
259        if analysis.is_asof_join {
260            return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
261                left_table: analysis.left_table.clone(),
262                right_table: analysis.right_table.clone(),
263                key_column: analysis.left_key_column.clone(),
264                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
265                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
266                direction: analysis
267                    .asof_direction
268                    .unwrap_or(AsofSqlDirection::Backward),
269                tolerance: analysis.asof_tolerance,
270                join_type: if analysis.join_type == JoinType::Inner {
271                    AsofSqlJoinType::Inner
272                } else {
273                    AsofSqlJoinType::Left
274                },
275            });
276        }
277
278        if analysis.is_lookup_join {
279            JoinOperatorConfig::Lookup(LookupJoinConfig {
280                stream_key: analysis.left_key_column.clone(),
281                lookup_key: analysis.right_key_column.clone(),
282                join_type: match analysis.join_type {
283                    JoinType::Inner => LookupJoinType::Inner,
284                    _ => LookupJoinType::Left,
285                },
286                cache_ttl: Duration::from_secs(300), // Default 5 min
287            })
288        } else {
289            JoinOperatorConfig::StreamStream(StreamJoinConfig {
290                left_key: analysis.left_key_column.clone(),
291                right_key: analysis.right_key_column.clone(),
292                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
293                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
294                left_table: analysis.left_table.clone(),
295                right_table: analysis.right_table.clone(),
296                time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
297                join_type: match analysis.join_type {
298                    JoinType::Inner
299                    | JoinType::LeftSemi
300                    | JoinType::LeftAnti
301                    | JoinType::RightSemi
302                    | JoinType::RightAnti => StreamJoinType::Inner,
303                    JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
304                    JoinType::Right => StreamJoinType::Right,
305                    JoinType::Full => StreamJoinType::Full,
306                },
307            })
308        }
309    }
310
311    /// Create from a multi-join analysis, producing one config per join step.
312    #[must_use]
313    pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
314        multi.joins.iter().map(Self::from_analysis).collect()
315    }
316
317    /// Check if this is a stream-stream join.
318    #[must_use]
319    pub fn is_stream_stream(&self) -> bool {
320        matches!(self, JoinOperatorConfig::StreamStream(_))
321    }
322
323    /// Check if this is a lookup join.
324    #[must_use]
325    pub fn is_lookup(&self) -> bool {
326        matches!(self, JoinOperatorConfig::Lookup(_))
327    }
328
329    /// Check if this is an ASOF join.
330    #[must_use]
331    pub fn is_asof(&self) -> bool {
332        matches!(self, JoinOperatorConfig::Asof(_))
333    }
334
335    /// Check if this is a temporal join.
336    #[must_use]
337    pub fn is_temporal(&self) -> bool {
338        matches!(self, JoinOperatorConfig::Temporal(_))
339    }
340
341    /// Get the left key column name.
342    #[must_use]
343    pub fn left_key(&self) -> &str {
344        match self {
345            JoinOperatorConfig::StreamStream(config) => &config.left_key,
346            JoinOperatorConfig::Lookup(config) => &config.stream_key,
347            JoinOperatorConfig::Asof(config) => &config.key_column,
348            JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
349        }
350    }
351
352    /// Get the right key column name.
353    #[must_use]
354    pub fn right_key(&self) -> &str {
355        match self {
356            JoinOperatorConfig::StreamStream(config) => &config.right_key,
357            JoinOperatorConfig::Lookup(config) => &config.lookup_key,
358            JoinOperatorConfig::Asof(config) => &config.key_column,
359            JoinOperatorConfig::Temporal(config) => &config.table_key_column,
360        }
361    }
362}
363
364impl StreamJoinConfig {
365    /// Create a new stream-stream join configuration.
366    #[must_use]
367    pub fn new(
368        left_key: String,
369        right_key: String,
370        time_bound: Duration,
371        join_type: StreamJoinType,
372    ) -> Self {
373        Self {
374            left_key,
375            right_key,
376            left_time_column: String::new(),
377            right_time_column: String::new(),
378            left_table: String::new(),
379            right_table: String::new(),
380            time_bound,
381            join_type,
382        }
383    }
384
385    /// Create an inner join configuration.
386    #[must_use]
387    pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
388        Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
389    }
390
391    /// Create a left join configuration.
392    #[must_use]
393    pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
394        Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
395    }
396}
397
398impl LookupJoinConfig {
399    /// Create a new lookup join configuration.
400    #[must_use]
401    pub fn new(
402        stream_key: String,
403        lookup_key: String,
404        join_type: LookupJoinType,
405        cache_ttl: Duration,
406    ) -> Self {
407        Self {
408            stream_key,
409            lookup_key,
410            join_type,
411            cache_ttl,
412        }
413    }
414
415    /// Create an inner lookup join configuration.
416    #[must_use]
417    pub fn inner(stream_key: String, lookup_key: String) -> Self {
418        Self::new(
419            stream_key,
420            lookup_key,
421            LookupJoinType::Inner,
422            Duration::from_secs(300),
423        )
424    }
425
426    /// Create a left lookup join configuration.
427    #[must_use]
428    pub fn left(stream_key: String, lookup_key: String) -> Self {
429        Self::new(
430            stream_key,
431            lookup_key,
432            LookupJoinType::Left,
433            Duration::from_secs(300),
434        )
435    }
436
437    /// Set the cache TTL.
438    #[must_use]
439    pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
440        self.cache_ttl = ttl;
441        self
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use super::*;
448
449    #[test]
450    fn test_stream_join_config() {
451        let config = StreamJoinConfig::inner(
452            "order_id".to_string(),
453            "order_id".to_string(),
454            Duration::from_secs(3600),
455        );
456
457        assert_eq!(config.left_key, "order_id");
458        assert_eq!(config.right_key, "order_id");
459        assert_eq!(config.time_bound, Duration::from_secs(3600));
460        assert_eq!(config.join_type, StreamJoinType::Inner);
461    }
462
463    #[test]
464    fn test_lookup_join_config() {
465        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
466            .with_cache_ttl(Duration::from_secs(600));
467
468        assert_eq!(config.stream_key, "customer_id");
469        assert_eq!(config.lookup_key, "id");
470        assert_eq!(config.cache_ttl, Duration::from_secs(600));
471        assert_eq!(config.join_type, LookupJoinType::Inner);
472    }
473
474    #[test]
475    fn test_from_analysis_lookup() {
476        let analysis = JoinAnalysis::lookup(
477            "orders".to_string(),
478            "customers".to_string(),
479            "customer_id".to_string(),
480            "id".to_string(),
481            JoinType::Inner,
482        );
483
484        let config = JoinOperatorConfig::from_analysis(&analysis);
485
486        assert!(config.is_lookup());
487        assert!(!config.is_stream_stream());
488        assert_eq!(config.left_key(), "customer_id");
489        assert_eq!(config.right_key(), "id");
490    }
491
492    #[test]
493    fn test_from_analysis_stream_stream() {
494        let analysis = JoinAnalysis::stream_stream(
495            "orders".to_string(),
496            "payments".to_string(),
497            "order_id".to_string(),
498            "order_id".to_string(),
499            Duration::from_secs(3600),
500            JoinType::Inner,
501        );
502
503        let config = JoinOperatorConfig::from_analysis(&analysis);
504
505        assert!(config.is_stream_stream());
506        assert!(!config.is_lookup());
507
508        if let JoinOperatorConfig::StreamStream(stream_config) = config {
509            assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
510            assert_eq!(stream_config.join_type, StreamJoinType::Inner);
511        }
512    }
513
514    #[test]
515    fn test_from_analysis_asof() {
516        let analysis = JoinAnalysis::asof(
517            "trades".to_string(),
518            "quotes".to_string(),
519            "symbol".to_string(),
520            "symbol".to_string(),
521            AsofSqlDirection::Backward,
522            "ts".to_string(),
523            "ts".to_string(),
524            Some(Duration::from_secs(5)),
525        );
526
527        let config = JoinOperatorConfig::from_analysis(&analysis);
528        assert!(config.is_asof());
529        assert!(!config.is_stream_stream());
530        assert!(!config.is_lookup());
531    }
532
533    #[test]
534    fn test_asof_config_fields() {
535        let analysis = JoinAnalysis::asof(
536            "trades".to_string(),
537            "quotes".to_string(),
538            "symbol".to_string(),
539            "symbol".to_string(),
540            AsofSqlDirection::Forward,
541            "trade_ts".to_string(),
542            "quote_ts".to_string(),
543            Some(Duration::from_millis(5000)),
544        );
545
546        let config = JoinOperatorConfig::from_analysis(&analysis);
547        if let JoinOperatorConfig::Asof(asof) = config {
548            assert_eq!(asof.direction, AsofSqlDirection::Forward);
549            assert_eq!(asof.left_time_column, "trade_ts");
550            assert_eq!(asof.right_time_column, "quote_ts");
551            assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
552            assert_eq!(asof.key_column, "symbol");
553            assert_eq!(asof.join_type, AsofSqlJoinType::Left);
554        } else {
555            panic!("Expected Asof config");
556        }
557    }
558
559    #[test]
560    fn test_asof_is_asof() {
561        let analysis = JoinAnalysis::asof(
562            "a".to_string(),
563            "b".to_string(),
564            "id".to_string(),
565            "id".to_string(),
566            AsofSqlDirection::Backward,
567            "ts".to_string(),
568            "ts".to_string(),
569            None,
570        );
571
572        let config = JoinOperatorConfig::from_analysis(&analysis);
573        assert!(config.is_asof());
574    }
575
576    #[test]
577    fn test_asof_key_accessors() {
578        let analysis = JoinAnalysis::asof(
579            "trades".to_string(),
580            "quotes".to_string(),
581            "sym".to_string(),
582            "sym".to_string(),
583            AsofSqlDirection::Backward,
584            "ts".to_string(),
585            "ts".to_string(),
586            None,
587        );
588
589        let config = JoinOperatorConfig::from_analysis(&analysis);
590        assert_eq!(config.left_key(), "sym");
591        assert_eq!(config.right_key(), "sym");
592    }
593
594    // -- Multi-way join translator tests --
595
596    #[test]
597    fn test_from_multi_analysis_single() {
598        let analysis = JoinAnalysis::lookup(
599            "a".to_string(),
600            "b".to_string(),
601            "id".to_string(),
602            "id".to_string(),
603            JoinType::Inner,
604        );
605        let multi = MultiJoinAnalysis {
606            joins: vec![analysis],
607            tables: vec!["a".to_string(), "b".to_string()],
608        };
609
610        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
611        assert_eq!(configs.len(), 1);
612        assert!(configs[0].is_lookup());
613    }
614
615    #[test]
616    fn test_from_multi_analysis_two_lookups() {
617        let j1 = JoinAnalysis::lookup(
618            "a".to_string(),
619            "b".to_string(),
620            "id".to_string(),
621            "a_id".to_string(),
622            JoinType::Inner,
623        );
624        let j2 = JoinAnalysis::lookup(
625            "b".to_string(),
626            "c".to_string(),
627            "id".to_string(),
628            "b_id".to_string(),
629            JoinType::Left,
630        );
631        let multi = MultiJoinAnalysis {
632            joins: vec![j1, j2],
633            tables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
634        };
635
636        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
637        assert_eq!(configs.len(), 2);
638        assert!(configs[0].is_lookup());
639        assert!(configs[1].is_lookup());
640        assert_eq!(configs[0].left_key(), "id");
641        assert_eq!(configs[1].left_key(), "id");
642    }
643
644    #[test]
645    fn test_from_multi_analysis_mixed_asof_lookup() {
646        let j1 = JoinAnalysis::asof(
647            "trades".to_string(),
648            "quotes".to_string(),
649            "symbol".to_string(),
650            "symbol".to_string(),
651            AsofSqlDirection::Backward,
652            "ts".to_string(),
653            "ts".to_string(),
654            None,
655        );
656        let j2 = JoinAnalysis::lookup(
657            "quotes".to_string(),
658            "products".to_string(),
659            "product_id".to_string(),
660            "id".to_string(),
661            JoinType::Inner,
662        );
663        let multi = MultiJoinAnalysis {
664            joins: vec![j1, j2],
665            tables: vec![
666                "trades".to_string(),
667                "quotes".to_string(),
668                "products".to_string(),
669            ],
670        };
671
672        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
673        assert_eq!(configs.len(), 2);
674        assert!(configs[0].is_asof());
675        assert!(configs[1].is_lookup());
676    }
677
678    #[test]
679    fn test_from_multi_analysis_stream_stream_and_lookup() {
680        let j1 = JoinAnalysis::stream_stream(
681            "orders".to_string(),
682            "payments".to_string(),
683            "id".to_string(),
684            "order_id".to_string(),
685            Duration::from_secs(3600),
686            JoinType::Inner,
687        );
688        let j2 = JoinAnalysis::lookup(
689            "payments".to_string(),
690            "customers".to_string(),
691            "cust_id".to_string(),
692            "id".to_string(),
693            JoinType::Left,
694        );
695        let multi = MultiJoinAnalysis {
696            joins: vec![j1, j2],
697            tables: vec![
698                "orders".to_string(),
699                "payments".to_string(),
700                "customers".to_string(),
701            ],
702        };
703
704        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
705        assert_eq!(configs.len(), 2);
706        assert!(configs[0].is_stream_stream());
707        assert!(configs[1].is_lookup());
708    }
709
710    #[test]
711    fn test_from_multi_analysis_order_preserved() {
712        let j1 = JoinAnalysis::lookup(
713            "a".to_string(),
714            "b".to_string(),
715            "k1".to_string(),
716            "k1".to_string(),
717            JoinType::Inner,
718        );
719        let j2 = JoinAnalysis::stream_stream(
720            "b".to_string(),
721            "c".to_string(),
722            "k2".to_string(),
723            "k2".to_string(),
724            Duration::from_secs(60),
725            JoinType::Left,
726        );
727        let j3 = JoinAnalysis::lookup(
728            "c".to_string(),
729            "d".to_string(),
730            "k3".to_string(),
731            "k3".to_string(),
732            JoinType::Inner,
733        );
734        let multi = MultiJoinAnalysis {
735            joins: vec![j1, j2, j3],
736            tables: vec![
737                "a".to_string(),
738                "b".to_string(),
739                "c".to_string(),
740                "d".to_string(),
741            ],
742        };
743
744        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
745        assert_eq!(configs.len(), 3);
746        assert!(configs[0].is_lookup());
747        assert!(configs[1].is_stream_stream());
748        assert!(configs[2].is_lookup());
749        assert_eq!(configs[0].left_key(), "k1");
750        assert_eq!(configs[1].left_key(), "k2");
751        assert_eq!(configs[2].left_key(), "k3");
752    }
753
754    #[test]
755    fn test_join_types() {
756        // Test all join types map correctly
757        let left_analysis = JoinAnalysis::stream_stream(
758            "a".to_string(),
759            "b".to_string(),
760            "id".to_string(),
761            "id".to_string(),
762            Duration::from_secs(60),
763            JoinType::Left,
764        );
765
766        if let JoinOperatorConfig::StreamStream(config) =
767            JoinOperatorConfig::from_analysis(&left_analysis)
768        {
769            assert_eq!(config.join_type, StreamJoinType::Left);
770        }
771
772        let right_analysis = JoinAnalysis::stream_stream(
773            "a".to_string(),
774            "b".to_string(),
775            "id".to_string(),
776            "id".to_string(),
777            Duration::from_secs(60),
778            JoinType::Right,
779        );
780
781        if let JoinOperatorConfig::StreamStream(config) =
782            JoinOperatorConfig::from_analysis(&right_analysis)
783        {
784            assert_eq!(config.join_type, StreamJoinType::Right);
785        }
786
787        let full_analysis = JoinAnalysis::stream_stream(
788            "a".to_string(),
789            "b".to_string(),
790            "id".to_string(),
791            "id".to_string(),
792            Duration::from_secs(60),
793            JoinType::Full,
794        );
795
796        if let JoinOperatorConfig::StreamStream(config) =
797            JoinOperatorConfig::from_analysis(&full_analysis)
798        {
799            assert_eq!(config.join_type, StreamJoinType::Full);
800        }
801    }
802
803    #[test]
804    fn test_display_stream_join() {
805        let mut config = StreamJoinConfig::inner(
806            "order_id".to_string(),
807            "order_id".to_string(),
808            Duration::from_secs(3600),
809        );
810        config.left_table = "orders".to_string();
811        config.right_table = "payments".to_string();
812        config.left_time_column = "ts".to_string();
813        config.right_time_column = "ts".to_string();
814        assert_eq!(
815            format!("{config}"),
816            "INNER JOIN ON orders.order_id = payments.order_id (bound: 3600s, time: ts ~ ts)"
817        );
818    }
819
820    #[test]
821    fn test_display_lookup_join() {
822        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
823        assert_eq!(
824            format!("{config}"),
825            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
826        );
827    }
828
829    #[test]
830    fn test_display_asof_join() {
831        let analysis = JoinAnalysis::asof(
832            "trades".to_string(),
833            "quotes".to_string(),
834            "symbol".to_string(),
835            "symbol".to_string(),
836            AsofSqlDirection::Backward,
837            "ts".to_string(),
838            "ts".to_string(),
839            Some(Duration::from_secs(5)),
840        );
841        let config = JoinOperatorConfig::from_analysis(&analysis);
842        let s = format!("{config}");
843        assert!(s.contains("ASOF JOIN"), "got: {s}");
844        assert!(s.contains("BACKWARD"), "got: {s}");
845        assert!(s.contains("tolerance: 5s"), "got: {s}");
846    }
847
848    #[test]
849    fn test_display_join_types() {
850        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
851        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
852        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
853        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
854        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
855        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
856        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
857        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
858    }
859
860    #[test]
861    fn test_from_analysis_semi_anti_maps_to_inner() {
862        // Semi/anti join types map to Inner for stream-stream joins
863        // (semi/anti semantics require per-key state tracking which is
864        // architecturally different from interval joins)
865        let semi = JoinAnalysis::stream_stream(
866            "a".to_string(),
867            "b".to_string(),
868            "id".to_string(),
869            "id".to_string(),
870            Duration::from_secs(60),
871            JoinType::LeftSemi,
872        );
873        if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&semi) {
874            assert_eq!(config.join_type, StreamJoinType::Inner);
875        } else {
876            panic!("Expected StreamStream config");
877        }
878
879        let anti = JoinAnalysis::stream_stream(
880            "a".to_string(),
881            "b".to_string(),
882            "id".to_string(),
883            "id".to_string(),
884            Duration::from_secs(60),
885            JoinType::RightAnti,
886        );
887        if let JoinOperatorConfig::StreamStream(config) = JoinOperatorConfig::from_analysis(&anti) {
888            assert_eq!(config.join_type, StreamJoinType::Inner);
889        } else {
890            panic!("Expected StreamStream config");
891        }
892    }
893
894    #[test]
895    fn test_from_analysis_temporal() {
896        let analysis = JoinAnalysis::temporal(
897            "orders".to_string(),
898            "products".to_string(),
899            "product_id".to_string(),
900            "id".to_string(),
901            "order_time".to_string(),
902            JoinType::Inner,
903        );
904
905        let config = JoinOperatorConfig::from_analysis(&analysis);
906        assert!(config.is_temporal());
907        assert!(!config.is_asof());
908        assert!(!config.is_lookup());
909        assert!(!config.is_stream_stream());
910        assert_eq!(config.left_key(), "product_id");
911        assert_eq!(config.right_key(), "id");
912
913        if let JoinOperatorConfig::Temporal(tc) = config {
914            assert_eq!(tc.table_version_column, "order_time");
915            assert_eq!(tc.semantics, "event_time");
916            assert_eq!(tc.join_type, "inner");
917        } else {
918            panic!("Expected Temporal config");
919        }
920    }
921
922    #[test]
923    fn test_temporal_left_join() {
924        let analysis = JoinAnalysis::temporal(
925            "orders".to_string(),
926            "products".to_string(),
927            "product_id".to_string(),
928            "id".to_string(),
929            "order_time".to_string(),
930            JoinType::Left,
931        );
932
933        let config = JoinOperatorConfig::from_analysis(&analysis);
934        if let JoinOperatorConfig::Temporal(tc) = config {
935            assert_eq!(tc.join_type, "left");
936        } else {
937            panic!("Expected Temporal config");
938        }
939    }
940
941    #[test]
942    fn test_display_temporal_join() {
943        let analysis = JoinAnalysis::temporal(
944            "orders".to_string(),
945            "products".to_string(),
946            "product_id".to_string(),
947            "id".to_string(),
948            "order_time".to_string(),
949            JoinType::Inner,
950        );
951        let config = JoinOperatorConfig::from_analysis(&analysis);
952        let s = format!("{config}");
953        assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
954        assert!(s.contains("order_time"), "got: {s}");
955    }
956}