Skip to main content

laminar_sql/translator/
join_translator.rs

//! Join operator configuration builder.
//!
//! Translates parsed join analysis into operator configurations for
//! stream-stream, lookup, ASOF, and temporal joins.
5
6use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
10/// Configuration for stream-stream join operator
11#[derive(Debug, Clone)]
12pub struct StreamJoinConfig {
13    /// Left side key column
14    pub left_key: String,
15    /// Right side key column
16    pub right_key: String,
17    /// Left side time column for interval matching
18    pub left_time_column: String,
19    /// Right side time column for interval matching
20    pub right_time_column: String,
21    /// Left side table name
22    pub left_table: String,
23    /// Right side table name
24    pub right_table: String,
25    /// Time bound for joining (max time difference between events)
26    pub time_bound: Duration,
27    /// Join type
28    pub join_type: StreamJoinType,
29}
30
31/// Configuration for lookup join operator
32#[derive(Debug, Clone)]
33pub struct LookupJoinConfig {
34    /// Stream side key column
35    pub stream_key: String,
36    /// Lookup table key column
37    pub lookup_key: String,
38    /// Join type
39    pub join_type: LookupJoinType,
40    /// Cache TTL for lookup results
41    pub cache_ttl: Duration,
42}
43
/// Join variants available to stream-stream joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamJoinType {
    /// Inner join: a row needs a match on both sides.
    Inner,
    /// Left outer join: every left-side row is emitted.
    Left,
    /// Right outer join: every right-side row is emitted.
    Right,
    /// Full outer join: rows from both sides are always emitted.
    Full,
}
56
/// Join variants available to lookup joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupJoinType {
    /// Inner lookup join.
    ///
    /// NOTE(review): presumably a stream event is only emitted when the
    /// lookup finds a match — confirm against the operator implementation.
    Inner,
    /// Left lookup join: the stream event is always emitted, the lookup
    /// side is optional.
    Left,
}
65
/// Settings for a temporal join operator (`FOR SYSTEM_TIME AS OF`).
///
/// `semantics` and `join_type` are carried as plain strings rather than enums.
#[derive(Debug, Clone)]
pub struct TemporalJoinTranslatorConfig {
    /// Name of the stream (left) table.
    pub stream_table: String,
    /// Name of the versioned (right) table.
    pub table_name: String,
    /// Key column on the stream side.
    pub stream_key_column: String,
    /// Key column on the table side.
    pub table_key_column: String,
    /// Stream column supplying the lookup timestamp (the `FOR SYSTEM_TIME AS OF` column).
    pub stream_time_column: String,
    /// Version column on the table side (defaults to the same name as `stream_time_column`).
    pub table_version_column: String,
    /// Temporal semantics, either `"event_time"` or `"process_time"`.
    pub semantics: String,
    /// Join type, either `"inner"` or `"left"`.
    pub join_type: String,
}
86
87/// Configuration for ASOF join operator
88#[derive(Debug, Clone)]
89pub struct AsofJoinTranslatorConfig {
90    /// Left side table name
91    pub left_table: String,
92    /// Right side table name
93    pub right_table: String,
94    /// Key column for partitioning (e.g., symbol)
95    pub key_column: String,
96    /// Left side time column
97    pub left_time_column: String,
98    /// Right side time column
99    pub right_time_column: String,
100    /// Join direction (Backward, Forward, or Nearest)
101    pub direction: AsofSqlDirection,
102    /// Maximum allowed time difference
103    pub tolerance: Option<Duration>,
104    /// ASOF join type (Inner or Left)
105    pub join_type: AsofSqlJoinType,
106}
107
/// Join variants available to ASOF joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsofSqlJoinType {
    /// Inner join: a row needs a match on both sides.
    Inner,
    /// Left outer join: every left-side row is emitted.
    Left,
}
116
117/// Union type for join operator configurations
118#[derive(Debug, Clone)]
119pub enum JoinOperatorConfig {
120    /// Stream-stream join
121    StreamStream(StreamJoinConfig),
122    /// Lookup join
123    Lookup(LookupJoinConfig),
124    /// ASOF join
125    Asof(AsofJoinTranslatorConfig),
126    /// Temporal join (FOR SYSTEM_TIME AS OF)
127    Temporal(TemporalJoinTranslatorConfig),
128    /// Temporal probe join (multi-offset ASOF)
129    TemporalProbe(super::temporal_probe::TemporalProbeConfig),
130}
131
132impl std::fmt::Display for StreamJoinType {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        match self {
135            StreamJoinType::Inner => write!(f, "INNER"),
136            StreamJoinType::Left => write!(f, "LEFT"),
137            StreamJoinType::Right => write!(f, "RIGHT"),
138            StreamJoinType::Full => write!(f, "FULL"),
139        }
140    }
141}
142
143impl std::fmt::Display for LookupJoinType {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        match self {
146            LookupJoinType::Inner => write!(f, "INNER"),
147            LookupJoinType::Left => write!(f, "LEFT"),
148        }
149    }
150}
151
152impl std::fmt::Display for AsofSqlJoinType {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        match self {
155            AsofSqlJoinType::Inner => write!(f, "INNER"),
156            AsofSqlJoinType::Left => write!(f, "LEFT"),
157        }
158    }
159}
160
161impl std::fmt::Display for StreamJoinConfig {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(
164            f,
165            "{} JOIN ON {}.{} = {}.{} (bound: {}s, time: {} ~ {})",
166            self.join_type,
167            self.left_table,
168            self.left_key,
169            self.right_table,
170            self.right_key,
171            self.time_bound.as_secs(),
172            self.left_time_column,
173            self.right_time_column,
174        )
175    }
176}
177
178impl std::fmt::Display for LookupJoinConfig {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        write!(
181            f,
182            "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
183            self.join_type,
184            self.stream_key,
185            self.lookup_key,
186            self.cache_ttl.as_secs()
187        )
188    }
189}
190
191impl std::fmt::Display for AsofJoinTranslatorConfig {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        write!(
194            f,
195            "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
196            self.join_type,
197            self.left_table,
198            self.key_column,
199            self.right_table,
200            self.key_column,
201            self.direction,
202            self.left_table,
203            self.left_time_column,
204            self.right_table,
205            self.right_time_column,
206        )?;
207        if let Some(tol) = self.tolerance {
208            write!(f, ", tolerance: {}s", tol.as_secs())?;
209        }
210        write!(f, ")")
211    }
212}
213
214impl std::fmt::Display for TemporalJoinTranslatorConfig {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        write!(
217            f,
218            "{} TEMPORAL JOIN ON stream.{} = table.{} (version: {}, {})",
219            self.join_type.to_uppercase(),
220            self.stream_key_column,
221            self.table_key_column,
222            self.table_version_column,
223            self.semantics,
224        )
225    }
226}
227
228impl std::fmt::Display for JoinOperatorConfig {
229    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
230        match self {
231            JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
232            JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
233            JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
234            JoinOperatorConfig::Temporal(c) => write!(f, "{c}"),
235            JoinOperatorConfig::TemporalProbe(c) => write!(f, "{c}"),
236        }
237    }
238}
239
240impl JoinOperatorConfig {
241    /// Create from join analysis.
242    #[must_use]
243    pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
244        if analysis.is_temporal_join {
245            let join_type_str = match analysis.join_type {
246                JoinType::Left => "left",
247                _ => "inner",
248            };
249            let version_col = analysis.temporal_version_column.clone().unwrap_or_default();
250            return JoinOperatorConfig::Temporal(TemporalJoinTranslatorConfig {
251                stream_table: analysis.left_table.clone(),
252                table_name: analysis.right_table.clone(),
253                stream_key_column: analysis.left_key_column.clone(),
254                table_key_column: analysis.right_key_column.clone(),
255                stream_time_column: version_col.clone(),
256                table_version_column: version_col,
257                semantics: "event_time".to_string(),
258                join_type: join_type_str.to_string(),
259            });
260        }
261
262        if analysis.is_asof_join {
263            return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
264                left_table: analysis.left_table.clone(),
265                right_table: analysis.right_table.clone(),
266                key_column: analysis.left_key_column.clone(),
267                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
268                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
269                direction: analysis
270                    .asof_direction
271                    .unwrap_or(AsofSqlDirection::Backward),
272                tolerance: analysis.asof_tolerance,
273                join_type: if analysis.join_type == JoinType::Inner {
274                    AsofSqlJoinType::Inner
275                } else {
276                    AsofSqlJoinType::Left
277                },
278            });
279        }
280
281        if analysis.is_lookup_join {
282            JoinOperatorConfig::Lookup(LookupJoinConfig {
283                stream_key: analysis.left_key_column.clone(),
284                lookup_key: analysis.right_key_column.clone(),
285                join_type: match analysis.join_type {
286                    JoinType::Inner => LookupJoinType::Inner,
287                    _ => LookupJoinType::Left,
288                },
289                cache_ttl: Duration::from_secs(300), // Default 5 min
290            })
291        } else {
292            JoinOperatorConfig::StreamStream(StreamJoinConfig {
293                left_key: analysis.left_key_column.clone(),
294                right_key: analysis.right_key_column.clone(),
295                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
296                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
297                left_table: analysis.left_table.clone(),
298                right_table: analysis.right_table.clone(),
299                time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
300                join_type: match analysis.join_type {
301                    JoinType::Inner
302                    | JoinType::LeftSemi
303                    | JoinType::LeftAnti
304                    | JoinType::RightSemi
305                    | JoinType::RightAnti => StreamJoinType::Inner,
306                    JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
307                    JoinType::Right => StreamJoinType::Right,
308                    JoinType::Full => StreamJoinType::Full,
309                },
310            })
311        }
312    }
313
314    /// Create from a multi-join analysis, producing one config per join step.
315    #[must_use]
316    pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
317        multi.joins.iter().map(Self::from_analysis).collect()
318    }
319
320    /// Check if this is a stream-stream join.
321    #[must_use]
322    pub fn is_stream_stream(&self) -> bool {
323        matches!(self, JoinOperatorConfig::StreamStream(_))
324    }
325
326    /// Check if this is a lookup join.
327    #[must_use]
328    pub fn is_lookup(&self) -> bool {
329        matches!(self, JoinOperatorConfig::Lookup(_))
330    }
331
332    /// Check if this is an ASOF join.
333    #[must_use]
334    pub fn is_asof(&self) -> bool {
335        matches!(self, JoinOperatorConfig::Asof(_))
336    }
337
338    /// Check if this is a temporal join.
339    #[must_use]
340    pub fn is_temporal(&self) -> bool {
341        matches!(self, JoinOperatorConfig::Temporal(_))
342    }
343
344    /// Get the left key column name.
345    #[must_use]
346    pub fn left_key(&self) -> &str {
347        match self {
348            JoinOperatorConfig::StreamStream(config) => &config.left_key,
349            JoinOperatorConfig::Lookup(config) => &config.stream_key,
350            JoinOperatorConfig::Asof(config) => &config.key_column,
351            JoinOperatorConfig::Temporal(config) => &config.stream_key_column,
352            JoinOperatorConfig::TemporalProbe(config) => {
353                config.key_columns.first().map_or("", String::as_str)
354            }
355        }
356    }
357
358    /// Get the right key column name.
359    #[must_use]
360    pub fn right_key(&self) -> &str {
361        match self {
362            JoinOperatorConfig::StreamStream(config) => &config.right_key,
363            JoinOperatorConfig::Lookup(config) => &config.lookup_key,
364            JoinOperatorConfig::Asof(config) => &config.key_column,
365            JoinOperatorConfig::Temporal(config) => &config.table_key_column,
366            JoinOperatorConfig::TemporalProbe(config) => {
367                config.key_columns.first().map_or("", String::as_str)
368            }
369        }
370    }
371}
372
373impl StreamJoinConfig {
374    /// Create a new stream-stream join configuration.
375    #[must_use]
376    pub fn new(
377        left_key: String,
378        right_key: String,
379        time_bound: Duration,
380        join_type: StreamJoinType,
381    ) -> Self {
382        Self {
383            left_key,
384            right_key,
385            left_time_column: String::new(),
386            right_time_column: String::new(),
387            left_table: String::new(),
388            right_table: String::new(),
389            time_bound,
390            join_type,
391        }
392    }
393
394    /// Create an inner join configuration.
395    #[must_use]
396    pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
397        Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
398    }
399
400    /// Create a left join configuration.
401    #[must_use]
402    pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
403        Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
404    }
405}
406
407impl LookupJoinConfig {
408    /// Create a new lookup join configuration.
409    #[must_use]
410    pub fn new(
411        stream_key: String,
412        lookup_key: String,
413        join_type: LookupJoinType,
414        cache_ttl: Duration,
415    ) -> Self {
416        Self {
417            stream_key,
418            lookup_key,
419            join_type,
420            cache_ttl,
421        }
422    }
423
424    /// Create an inner lookup join configuration.
425    #[must_use]
426    pub fn inner(stream_key: String, lookup_key: String) -> Self {
427        Self::new(
428            stream_key,
429            lookup_key,
430            LookupJoinType::Inner,
431            Duration::from_secs(300),
432        )
433    }
434
435    /// Create a left lookup join configuration.
436    #[must_use]
437    pub fn left(stream_key: String, lookup_key: String) -> Self {
438        Self::new(
439            stream_key,
440            lookup_key,
441            LookupJoinType::Left,
442            Duration::from_secs(300),
443        )
444    }
445
446    /// Set the cache TTL.
447    #[must_use]
448    pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
449        self.cache_ttl = ttl;
450        self
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    // -- Fixture helpers --

    /// Lookup-join analysis built from string slices.
    fn lookup(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::lookup(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            join_type,
        )
    }

    /// Stream-stream join analysis with a time bound given in whole seconds.
    fn stream_stream(
        left: &str,
        right: &str,
        left_key: &str,
        right_key: &str,
        bound_secs: u64,
        join_type: JoinType,
    ) -> JoinAnalysis {
        JoinAnalysis::stream_stream(
            left.to_string(),
            right.to_string(),
            left_key.to_string(),
            right_key.to_string(),
            Duration::from_secs(bound_secs),
            join_type,
        )
    }

    /// ASOF join analysis that uses the same key column name on both sides.
    fn asof(
        left: &str,
        right: &str,
        key: &str,
        direction: AsofSqlDirection,
        left_time: &str,
        right_time: &str,
        tolerance: Option<Duration>,
    ) -> JoinAnalysis {
        JoinAnalysis::asof(
            left.to_string(),
            right.to_string(),
            key.to_string(),
            key.to_string(),
            direction,
            left_time.to_string(),
            right_time.to_string(),
            tolerance,
        )
    }

    /// Temporal join analysis of `orders` against `products` on `order_time`.
    fn orders_products_temporal(join_type: JoinType) -> JoinAnalysis {
        JoinAnalysis::temporal(
            "orders".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            "order_time".to_string(),
            join_type,
        )
    }

    /// Multi-join analysis over `joins` with the given table names.
    fn multi(joins: Vec<JoinAnalysis>, tables: &[&str]) -> MultiJoinAnalysis {
        MultiJoinAnalysis {
            joins,
            tables: tables.iter().map(|name| (*name).to_string()).collect(),
        }
    }

    /// Unwraps a stream-stream config, failing the test on any other variant.
    fn expect_stream_stream(config: JoinOperatorConfig) -> StreamJoinConfig {
        match config {
            JoinOperatorConfig::StreamStream(inner) => inner,
            other => panic!("Expected StreamStream config, got: {other:?}"),
        }
    }

    /// Unwraps an ASOF config, failing the test on any other variant.
    fn expect_asof(config: JoinOperatorConfig) -> AsofJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Asof(inner) => inner,
            other => panic!("Expected Asof config, got: {other:?}"),
        }
    }

    /// Unwraps a temporal config, failing the test on any other variant.
    fn expect_temporal(config: JoinOperatorConfig) -> TemporalJoinTranslatorConfig {
        match config {
            JoinOperatorConfig::Temporal(inner) => inner,
            other => panic!("Expected Temporal config, got: {other:?}"),
        }
    }

    // -- Direct constructors --

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    // -- Single-join translation --

    #[test]
    fn test_from_analysis_lookup() {
        let analysis = lookup("orders", "customers", "customer_id", "id", JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = stream_stream(
            "orders",
            "payments",
            "order_id",
            "order_id",
            3600,
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        let stream_config = expect_stream_stream(config);
        assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
        assert_eq!(stream_config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Forward,
            "trade_ts",
            "quote_ts",
            Some(Duration::from_millis(5000)),
        );

        let asof_config = expect_asof(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(asof_config.direction, AsofSqlDirection::Forward);
        assert_eq!(asof_config.left_time_column, "trade_ts");
        assert_eq!(asof_config.right_time_column, "quote_ts");
        assert_eq!(asof_config.tolerance, Some(Duration::from_millis(5000)));
        assert_eq!(asof_config.key_column, "symbol");
        assert_eq!(asof_config.join_type, AsofSqlJoinType::Left);
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = asof("a", "b", "id", AsofSqlDirection::Backward, "ts", "ts", None);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = asof(
            "trades",
            "quotes",
            "sym",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    // -- Multi-way join translator tests --

    #[test]
    fn test_from_multi_analysis_single() {
        let analysis = lookup("a", "b", "id", "id", JoinType::Inner);
        let multi = multi(vec![analysis], &["a", "b"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let j1 = lookup("a", "b", "id", "a_id", JoinType::Inner);
        let j2 = lookup("b", "c", "id", "b_id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["a", "b", "c"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
    }

    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let j1 = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            None,
        );
        let j2 = lookup("quotes", "products", "product_id", "id", JoinType::Inner);
        let multi = multi(vec![j1, j2], &["trades", "quotes", "products"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let j1 = stream_stream("orders", "payments", "id", "order_id", 3600, JoinType::Inner);
        let j2 = lookup("payments", "customers", "cust_id", "id", JoinType::Left);
        let multi = multi(vec![j1, j2], &["orders", "payments", "customers"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let j1 = lookup("a", "b", "k1", "k1", JoinType::Inner);
        let j2 = stream_stream("b", "c", "k2", "k2", 60, JoinType::Left);
        let j3 = lookup("c", "d", "k3", "k3", JoinType::Inner);
        let multi = multi(vec![j1, j2, j3], &["a", "b", "c", "d"]);

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    // -- Join type mapping --

    #[test]
    fn test_join_types() {
        // Outer join types must map one-to-one onto their stream-stream
        // counterparts. A wrong variant fails the test instead of skipping
        // the assertion.
        let cases = [
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
        ];

        for (join_type, expected) in cases {
            let analysis = stream_stream("a", "b", "id", "id", 60, join_type);
            let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&analysis));
            assert_eq!(config.join_type, expected);
        }
    }

    #[test]
    fn test_from_analysis_semi_anti_maps_to_inner() {
        // Semi/anti join types map to Inner for stream-stream joins
        // (semi/anti semantics require per-key state tracking which is
        // architecturally different from interval joins)
        let semi = stream_stream("a", "b", "id", "id", 60, JoinType::LeftSemi);
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&semi));
        assert_eq!(config.join_type, StreamJoinType::Inner);

        let anti = stream_stream("a", "b", "id", "id", 60, JoinType::RightAnti);
        let config = expect_stream_stream(JoinOperatorConfig::from_analysis(&anti));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    // -- Display --

    #[test]
    fn test_display_stream_join() {
        let mut config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        config.left_table = "orders".to_string();
        config.right_table = "payments".to_string();
        config.left_time_column = "ts".to_string();
        config.right_time_column = "ts".to_string();
        assert_eq!(
            format!("{config}"),
            "INNER JOIN ON orders.order_id = payments.order_id (bound: 3600s, time: ts ~ ts)"
        );
    }

    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            format!("{config}"),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    #[test]
    fn test_display_asof_join() {
        let analysis = asof(
            "trades",
            "quotes",
            "symbol",
            AsofSqlDirection::Backward,
            "ts",
            "ts",
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    #[test]
    fn test_display_join_types() {
        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
    }

    // -- Temporal joins --

    #[test]
    fn test_from_analysis_temporal() {
        let analysis = orders_products_temporal(JoinType::Inner);

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_temporal());
        assert!(!config.is_asof());
        assert!(!config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "product_id");
        assert_eq!(config.right_key(), "id");

        let tc = expect_temporal(config);
        assert_eq!(tc.table_version_column, "order_time");
        assert_eq!(tc.semantics, "event_time");
        assert_eq!(tc.join_type, "inner");
    }

    #[test]
    fn test_temporal_left_join() {
        let analysis = orders_products_temporal(JoinType::Left);

        let tc = expect_temporal(JoinOperatorConfig::from_analysis(&analysis));
        assert_eq!(tc.join_type, "left");
    }

    #[test]
    fn test_display_temporal_join() {
        let analysis = orders_products_temporal(JoinType::Inner);
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("TEMPORAL JOIN"), "got: {s}");
        assert!(s.contains("order_time"), "got: {s}");
    }
}