Skip to main content

laminar_sql/translator/
join_translator.rs

//! Join operator configuration builder
//!
//! Translates parsed join analysis into operator configurations for
//! stream-stream, lookup, ASOF, and temporal joins.
5
6use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10/// Configuration for stream-stream join operator
11#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13    /// Left side key column
14    pub left_key: String,
15    /// Right side key column
16    pub right_key: String,
17    /// Left side time column for interval matching
18    pub left_time_column: String,
19    /// Right side time column for interval matching
20    pub right_time_column: String,
21    /// Left side table name
22    pub left_table: String,
23    /// Right side table name
24    pub right_table: String,
25    /// Time bound for joining (max time difference between events)
26    pub time_bound: Duration,
27    /// Join type
28    pub join_type: StreamJoinType,
29}
30
31/// Configuration for lookup join operator
32#[derive(Debug, Clone)]
33pub struct LookupJoinConfig {
34    /// Stream side key column
35    pub stream_key: String,
36    /// Lookup table key column
37    pub lookup_key: String,
38    /// Join type
39    pub join_type: LookupJoinType,
40    /// Cache TTL for lookup results
41    pub cache_ttl: Duration,
42}
43
44/// Stream-stream join types
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum StreamJoinType {
47    /// Both sides required
48    Inner,
49    /// Left side always emitted
50    Left,
51    /// Right side always emitted
52    Right,
53    /// Both sides always emitted
54    Full,
55    /// At most one output per left row (first match)
56    LeftSemi,
57    /// Unmatched left rows only (emitted when deadline passes)
58    LeftAnti,
59}
60
61/// Lookup join types
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum LookupJoinType {
64    /// Stream event required, lookup optional
65    Inner,
66    /// Stream event always emitted, lookup optional
67    Left,
68}
69
70/// Configuration for temporal join operator (FOR SYSTEM_TIME AS OF).
71#[derive(Debug, Clone)]
72pub struct TemporalJoinTranslatorConfig {
73    /// Stream (left) side table name
74    pub stream_table: String,
75    /// Table (right) side table name
76    pub table_name: String,
77    /// Stream side key column
78    pub stream_key_column: String,
79    /// Table side key column
80    pub table_key_column: String,
81    /// Stream-side column for lookup timestamp (from `FOR SYSTEM_TIME AS OF`)
82    pub stream_time_column: String,
83    /// Table-side version column. Empty string = auto-detect from table schema at registration.
84    pub table_version_column: String,
85    /// Temporal semantics: "event_time" or "process_time"
86    pub semantics: String,
87    /// Join type: "inner" or "left"
88    pub join_type: String,
89}
90
91/// Configuration for ASOF join operator
92#[derive(Debug, Clone)]
93pub struct AsofJoinTranslatorConfig {
94    /// Left side table name
95    pub left_table: String,
96    /// Right side table name
97    pub right_table: String,
98    /// Key column for partitioning (e.g., symbol)
99    pub key_column: String,
100    /// Left side time column
101    pub left_time_column: String,
102    /// Right side time column
103    pub right_time_column: String,
104    /// Join direction (Backward, Forward, or Nearest)
105    pub direction: AsofSqlDirection,
106    /// Maximum allowed time difference
107    pub tolerance: Option<Duration>,
108    /// ASOF join type (Inner or Left)
109    pub join_type: AsofSqlJoinType,
110}
111
112/// ASOF join type
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum AsofSqlJoinType {
115    /// Both sides required
116    Inner,
117    /// Left side always emitted
118    Left,
119}
120
121/// Union type for join operator configurations
122#[derive(Debug, Clone)]
123pub enum JoinOperatorConfig {
124    /// Stream-stream join
125    StreamStream(StreamJoinConfig),
126    /// Lookup join
127    Lookup(LookupJoinConfig),
128    /// ASOF join
129    Asof(AsofJoinTranslatorConfig),
130    /// Temporal join (FOR SYSTEM_TIME AS OF)
131    Temporal(TemporalJoinTranslatorConfig),
132    /// Temporal probe join (multi-offset ASOF)
133    TemporalProbe(super::temporal_probe::TemporalProbeConfig),
134}
135
136impl std::fmt::Display for StreamJoinType {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        match self {
139            StreamJoinType::Inner => write!(f, "INNER"),
140            StreamJoinType::Left => write!(f, "LEFT"),
141            StreamJoinType::Right => write!(f, "RIGHT"),
142            StreamJoinType::Full => write!(f, "FULL"),
143            StreamJoinType::LeftSemi => write!(f, "LEFT SEMI"),
144            StreamJoinType::LeftAnti => write!(f, "LEFT ANTI"),
145        }
146    }
147}
148
149impl std::fmt::Display for LookupJoinType {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        match self {
152            LookupJoinType::Inner => write!(f, "INNER"),
153            LookupJoinType::Left => write!(f, "LEFT"),
154        }
155    }
156}
157
158impl std::fmt::Display for AsofSqlJoinType {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        match self {
161            AsofSqlJoinType::Inner => write!(f, "INNER"),
162            AsofSqlJoinType::Left => write!(f, "LEFT"),
163        }
164    }
165}
166
167impl std::fmt::Display for StreamJoinConfig {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        write!(
170            f,
171            "{} JOIN ON {}.{} = {}.{} (bound: {}s, time: {} ~ {})",
172            self.join_type,
173            self.left_table,
174            self.left_key,
175            self.right_table,
176            self.right_key,
177            self.time_bound.as_secs(),
178            self.left_time_column,
179            self.right_time_column,
180        )
181    }
182}
183
184impl std::fmt::Display for LookupJoinConfig {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        write!(
187            f,
188            "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
189            self.join_type,
190            self.stream_key,
191            self.lookup_key,
192            self.cache_ttl.as_secs()
193        )
194    }
195}
196
197impl std::fmt::Display for AsofJoinTranslatorConfig {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        write!(
200            f,
201            "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
202            self.join_type,
203            self.left_table,
204            self.key_column,
205            self.right_table,
206            self.key_column,
207            self.direction,
208            self.left_table,
209            self.left_time_column,
210            self.right_table,
211            self.right_time_column,
212        )?;
213        if let Some(tol) = self.tolerance {
214            write!(f, ", tolerance: {}s", tol.as_secs())?;
215        }
216        write!(f, ")")
217    }
218}
219
220impl std::fmt::Display for TemporalJoinTranslatorConfig {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        write!(
223            f,
224            "{} TEMPORAL JOIN ON stream.{} = table.{} (AS OF: {}, {})",
225            self.join_type.to_uppercase(),
226            self.stream_key_column,
227            self.table_key_column,
228            self.stream_time_column,
229            self.semantics,
230        )
231    }
232}
233
234impl std::fmt::Display for JoinOperatorConfig {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        match self {
237            JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
238            JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
239            JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
240            JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
241            JoinOperatorConfig::TemporalProbe(c) => write!(f, "{c}"),
242        }
243    }
244}
245
246impl JoinOperatorConfig {
247    /// Create from join analysis.
248    #[must_use]
249    pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
250        if analysis.is_temporal_join {
251            let join_type_str = match analysis.join_type {
252                JoinType::Left => "left",
253                _ => "inner",
254            };
255            let stream_time_col = analysis.temporal_version_column.clone().unwrap_or_default();
256            // table_version_column left empty — resolved from table schema at registration.
257            return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
258                stream_table: analysis.left_table.clone(),
259                table_name: analysis.right_table.clone(),
260                stream_key_column: analysis.left_key_column.clone(),
261                table_key_column: analysis.right_key_column.clone(),
262                stream_time_column: stream_time_col,
263                table_version_column: String::new(),
264                semantics: "event_time".to_string(),
265                join_type: join_type_str.to_string(),
266            });
267        }
268
269        if analysis.is_asof_join {
270            return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
271                left_table: analysis.left_table.clone(),
272                right_table: analysis.right_table.clone(),
273                key_column: analysis.left_key_column.clone(),
274                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
275                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
276                direction: analysis
277                    .asof_direction
278                    .unwrap_or(AsofSqlDirection::Backward),
279                tolerance: analysis.asof_tolerance,
280                join_type: if analysis.join_type == JoinType::Inner {
281                    AsofSqlJoinType::Inner
282                } else {
283                    AsofSqlJoinType::Left
284                },
285            });
286        }
287
288        if analysis.is_lookup_join {
289            JoinOperatorConfig::Lookup(LookupJoinConfig {
290                stream_key: analysis.left_key_column.clone(),
291                lookup_key: analysis.right_key_column.clone(),
292                join_type: match analysis.join_type {
293                    JoinType::Inner => LookupJoinType::Inner,
294                    _ => LookupJoinType::Left,
295                },
296                cache_ttl: Duration::from_secs(300), // Default 5 min
297            })
298        } else {
299            JoinOperatorConfig::StreamStream(StreamJoinConfig {
300                left_key: analysis.left_key_column.clone(),
301                right_key: analysis.right_key_column.clone(),
302                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
303                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
304                left_table: analysis.left_table.clone(),
305                right_table: analysis.right_table.clone(),
306                time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
307                join_type: match analysis.join_type {
308                    // RightSemi/RightAnti map to Inner here but are rejected
309                    // at detection time in sql_analysis.rs before reaching execution.
310                    JoinType::Inner | JoinType::RightSemi | JoinType::RightAnti => {
311                        StreamJoinType::Inner
312                    }
313                    JoinType::LeftSemi => StreamJoinType::LeftSemi,
314                    JoinType::LeftAnti => StreamJoinType::LeftAnti,
315                    JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
316                    JoinType::Right => StreamJoinType::Right,
317                    JoinType::Full => StreamJoinType::Full,
318                },
319            })
320        }
321    }
322
323    /// Create from a multi-join analysis, producing one config per join step.
324    #[must_use]
325    pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
326        multi.joins.iter().map(Self::from_analysis).collect()
327    }
328
329    /// Check if this is a stream-stream join.
330    #[must_use]
331    pub fn is_stream_stream(&self) -> bool {
332        matches!(self, JoinOperatorConfig::StreamStream(_))
333    }
334
335    /// Check if this is a lookup join.
336    #[must_use]
337    pub fn is_lookup(&self) -> bool {
338        matches!(self, JoinOperatorConfig::Lookup(_))
339    }
340
341    /// Check if this is an ASOF join.
342    #[must_use]
343    pub fn is_asof(&self) -> bool {
344        matches!(self, JoinOperatorConfig::Asof(_))
345    }
346
347    /// Check if this is a temporal join.
348    #[must_use]
349    pub fn is_temporal(&self) -> bool {
350        matches!(self, JoinOperatorConfig::Temporal(_))
351    }
352
353    /// Get the left key column name.
354    #[must_use]
355    pub fn left_key(&self) -> &str {
356        match self {
357            JoinOperatorConfig::StreamStream(config) => &config.left_key,
358            JoinOperatorConfig::Lookup(config) => &config.stream_key,
359            JoinOperatorConfig::Asof(config) => &config.key_column,
360            JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
361            JoinOperatorConfig::TemporalProbe(config) => {
362                config.key_columns.first().map_or("", String::as_str)
363            }
364        }
365    }
366
367    /// Get the right key column name.
368    #[must_use]
369    pub fn right_key(&self) -> &str {
370        match self {
371            JoinOperatorConfig::StreamStream(config) => &config.right_key,
372            JoinOperatorConfig::Lookup(config) => &config.lookup_key,
373            JoinOperatorConfig::Asof(config) => &config.key_column,
374            JoinOperatorConfig::Temporal(config) => &config.table_key_column,
375            JoinOperatorConfig::TemporalProbe(config) => {
376                config.key_columns.first().map_or("", String::as_str)
377            }
378        }
379    }
380}
381
382impl StreamJoinConfig {
383    /// Create a new stream-stream join configuration.
384    #[must_use]
385    pub fn new(
386        left_key: String,
387        right_key: String,
388        time_bound: Duration,
389        join_type: StreamJoinType,
390    ) -> Self {
391        Self {
392            left_key,
393            right_key,
394            left_time_column: String::new(),
395            right_time_column: String::new(),
396            left_table: String::new(),
397            right_table: String::new(),
398            time_bound,
399            join_type,
400        }
401    }
402
403    /// Create an inner join configuration.
404    #[must_use]
405    pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
406        Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
407    }
408
409    /// Create a left join configuration.
410    #[must_use]
411    pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
412        Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
413    }
414}
415
416impl LookupJoinConfig {
417    /// Create a new lookup join configuration.
418    #[must_use]
419    pub fn new(
420        stream_key: String,
421        lookup_key: String,
422        join_type: LookupJoinType,
423        cache_ttl: Duration,
424    ) -> Self {
425        Self {
426            stream_key,
427            lookup_key,
428            join_type,
429            cache_ttl,
430        }
431    }
432
433    /// Create an inner lookup join configuration.
434    #[must_use]
435    pub fn inner(stream_key: String, lookup_key: String) -> Self {
436        Self::new(
437            stream_key,
438            lookup_key,
439            LookupJoinType::Inner,
440            Duration::from_secs(300),
441        )
442    }
443
444    /// Create a left lookup join configuration.
445    #[must_use]
446    pub fn left(stream_key: String, lookup_key: String) -> Self {
447        Self::new(
448            stream_key,
449            lookup_key,
450            LookupJoinType::Left,
451            Duration::from_secs(300),
452        )
453    }
454
455    /// Set the cache TTL.
456    #[must_use]
457    pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
458        self.cache_ttl = ttl;
459        self
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    // -- Fixture helpers --

    /// Builds a lookup-join analysis from string slices.
    fn lookup(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::lookup(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            join_type,
        )
    }

    /// Builds a stream-stream analysis with a time bound given in seconds.
    fn stream_stream(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        bound_secs: u64,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::stream_stream(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            Duration::from_secs(bound_secs),
            join_type,
        )
    }

    /// Builds an ASOF analysis that uses the same key column on both sides.
    fn asof(
        left: &str,
        right: &str,
        key: &str,
        direction: AsofSqlDirection,
        left_time: &str,
        right_time: &str,
        tolerance: Option<Duration>,
    ) -> JoinAnalysis {
        JoinAnalysis::asof(
            left.to_string(),
            right.to_string(),
            key.to_string(),
            key.to_string(),
            direction,
            left_time.to_string(),
            right_time.to_string(),
            tolerance,
        )
    }

    /// Builds the orders/products temporal analysis shared by the temporal tests.
    fn temporal(join_type: JoinType) -> JoinAnalysis {
        JoinAnalysis::temporal(
            "orders".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            "order_time".to_string(),
            join_type,
        )
    }

    /// Wraps join steps and table names into a multi-join analysis.
    fn multi(joins: Vec<JoinAnalysis>, tables: &[&str]) -> MultiJoinAnalysis {
        MultiJoinAnalysis {
            joins,
            tables: tables.iter().map(|t| (*t).to_string()).collect(),
        }
    }

    // The `expect_*` helpers panic on the wrong variant so that a test can
    // never pass by silently skipping its assertions.

    fn expect_stream_stream(config: JoinOperatorConfig) -> StreamJoinConfig {
        match config {
            JoinOperatorConfig::StreamStream(c) => c,
            other => panic!("Expected StreamStream config, got: {other}"),
        }
    }

    fn expect_asof(config: JoinOperatorConfig) -> AsofJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Asof(c) => c,
            other => panic!("Expected Asof config, got: {other}"),
        }
    }

    fn expect_temporal(config: JoinOperatorConfig) -> TemporalJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Temporal(c) => c,
            other => panic!("Expected Temporal config, got: {other}"),
        }
    }

    // -- Constructor tests --

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    // -- Single-join translator tests --

    #[test]
    fn test_from_analysis_lookup() {
        let analysis = lookup("orders", "customers", "customer_id", "id", JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = stream_stream(
            "orders",
            "payments",
            "order_id",
            "order_id",
            3600,
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        let stream_config = expect_stream_stream(config);
        assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
        assert_eq!(stream_config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Forward,
            "trade_ts",
            "quote_ts",
            Some(Duration::from_millis(5000)),
        );

        let asof = expect_asof(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(asof.direction, AsofSqlDirection::Forward);
        assert_eq!(asof.left_time_column, "trade_ts");
        assert_eq!(asof.right_time_column, "quote_ts");
        assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
        assert_eq!(asof.key_column, "symbol");
        assert_eq!(asof.join_type, AsofSqlJoinType::Left);
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = asof("a", "b", "id", AsofSqlDirection::Backward, "ts", "ts", None);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = asof(
            "trades",
            "quotes",
            "sym",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    // -- Multi-way join translator tests --

    #[test]
    fn test_from_multi_analysis_single() {
        let analysis = lookup("a", "b", "id", "id", JoinType::Inner);
        let multi = multi(vec![analysis], &["a", "b"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let j1 = lookup("a", "b", "id", "a_id", JoinType::Inner);
        let j2 = lookup("b", "c", "id", "b_id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["a", "b", "c"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
    }

    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let j1 = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );
        let j2 = lookup("quotes", "products", "product_id", "id", JoinType::Inner);
        let multi = multi(vec![j1, j2], &["trades", "quotes", "products"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let j1 = stream_stream("orders", "payments", "id", "order_id", 3600, JoinType::Inner);
        let j2 = lookup("payments", "customers", "cust_id", "id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["orders", "payments", "customers"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let j1 = lookup("a", "b", "k1", "k1", JoinType::Inner);
        let j2 = stream_stream("b", "c", "k2", "k2", 60, JoinType::Left);
        let j3 = lookup("c", "d", "k3", "k3", JoinType::Inner);
        let multi = multi(vec![j1, j2, j3], &["a", "b", "c", "d"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    // -- Join type mapping tests --

    #[test]
    fn test_join_types() {
        // Outer join types must map one-to-one onto the stream join types.
        let cases = [
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
        ];

        for (join_type, expected) in cases {
            let analysis = stream_stream("a", "b", "id", "id", 60, join_type);
            let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
            assert_eq!(config.join_type, expected);
        }
    }

    #[test]
    fn test_from_analysis_semi_anti() {
        let cases = [
            (JoinType::LeftSemi, StreamJoinType::LeftSemi),
            (JoinType::LeftAnti, StreamJoinType::LeftAnti),
            // RightSemi/RightAnti still map to Inner (not yet implemented)
            (JoinType::RightAnti, StreamJoinType::Inner),
        ];

        for (join_type, expected) in cases {
            let analysis = stream_stream("a", "b", "id", "id", 60, join_type);
            let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
            assert_eq!(config.join_type, expected);
        }
    }

    // -- Display tests --

    #[test]
    fn test_display_stream_join() {
        let mut config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        config.left_table = "orders".to_string();
        config.right_table = "payments".to_string();
        config.left_time_column = "ts".to_string();
        config.right_time_column = "ts".to_string();
        assert_eq!(
            format!("{config}"),
            "INNER JOIN ON orders.order_id = payments.order_id (bound: 3600s, time: ts ~ ts)"
        );
    }

    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            format!("{config}"),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    #[test]
    fn test_display_asof_join() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    #[test]
    fn test_display_join_types() {
        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
        assert_eq!(format!("{}", StreamJoinType::LeftSemi), "LEFT SEMI");
        assert_eq!(format!("{}", StreamJoinType::LeftAnti), "LEFT ANTI");
        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
    }

    // -- Temporal join tests --

    #[test]
    fn test_from_analysis_temporal() {
        let analysis = temporal(JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_temporal());
        assert!(!config.is_asof());
        assert!(!config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "product_id");
        assert_eq!(config.right_key(), "id");

        let tc = expect_temporal(config);
        assert!(tc.table_version_column.is_empty());
        assert_eq!(tc.stream_time_column, "order_time");
        assert_eq!(tc.semantics, "event_time");
        assert_eq!(tc.join_type, "inner");
    }

    #[test]
    fn test_temporal_left_join() {
        let analysis = temporal(JoinType::Left);

        let tc = expect_temporal(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(tc.join_type, "left");
    }

    #[test]
    fn test_display_temporal_join() {
        let analysis = temporal(JoinType::Inner);
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
        assert!(s.contains("order_time"), "got: {s}");
    }
}