Skip to main content

laminar_sql/translator/
join_translator.rs

1//! Join operator configuration builder
2//!
3//! Translates parsed join analysis into operator configurations
4//! for stream-stream joins and lookup joins.
5
6use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType};
9
/// Configuration for stream-stream join operator
///
/// Built either through the constructors on this type or by
/// [`JoinOperatorConfig::from_analysis`], which selects this configuration
/// when the analysis is flagged as neither an ASOF join nor a lookup join.
#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    /// Left side key column
    pub left_key: String,
    /// Right side key column
    pub right_key: String,
    /// Time bound for joining (max time difference between events).
    ///
    /// `JoinOperatorConfig::from_analysis` falls back to 3600 seconds when the
    /// analysis carries no time bound.
    pub time_bound: Duration,
    /// Join type
    pub join_type: StreamJoinType,
}
22
/// Configuration for lookup join operator
///
/// Built either through the constructors on this type or by
/// [`JoinOperatorConfig::from_analysis`] when the analysis is flagged as a
/// lookup join (and not as an ASOF join, which is checked first).
#[derive(Debug, Clone)]
pub struct LookupJoinConfig {
    /// Stream side key column
    pub stream_key: String,
    /// Lookup table key column
    pub lookup_key: String,
    /// Join type
    pub join_type: LookupJoinType,
    /// Cache TTL for lookup results.
    ///
    /// The `inner` / `left` constructors and
    /// `JoinOperatorConfig::from_analysis` all use 300 seconds; use
    /// `with_cache_ttl` to override it.
    pub cache_ttl: Duration,
}
35
/// Stream-stream join types
///
/// `JoinOperatorConfig::from_analysis` derives this from the parser's
/// `JoinType` when it builds a stream-stream configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamJoinType {
    /// Both sides required
    Inner,
    /// Left side always emitted.
    ///
    /// Also the result for `JoinType::AsOf` when the analysis reaches the
    /// stream-stream path (i.e. it is not flagged as an ASOF join).
    Left,
    /// Right side always emitted
    Right,
    /// Both sides always emitted
    Full,
}
48
/// Lookup join types
///
/// `JoinOperatorConfig::from_analysis` maps `JoinType::Inner` to
/// [`LookupJoinType::Inner`] and every other parser join type to
/// [`LookupJoinType::Left`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupJoinType {
    /// Inner lookup join.
    ///
    /// NOTE(review): presumably a stream event is emitted only when the lookup
    /// finds a match (lookup required, unlike `Left`) — confirm against the
    /// lookup operator implementation.
    Inner,
    /// Stream event always emitted, lookup optional
    Left,
}
57
/// Configuration for ASOF join operator
///
/// Built by [`JoinOperatorConfig::from_analysis`] when the analysis is flagged
/// as an ASOF join; that check takes precedence over the lookup and
/// stream-stream paths.
#[derive(Debug, Clone)]
pub struct AsofJoinTranslatorConfig {
    /// Key column for partitioning (e.g., symbol).
    ///
    /// `from_analysis` fills this from the analysis' left key column only, and
    /// both `JoinOperatorConfig::left_key` and `JoinOperatorConfig::right_key`
    /// return it.
    /// NOTE(review): a right-side key with a different name is not carried
    /// over — confirm that ASOF joins always use identically named keys.
    pub key_column: String,
    /// Left side time column (empty when the analysis provides none)
    pub left_time_column: String,
    /// Right side time column (empty when the analysis provides none)
    pub right_time_column: String,
    /// Join direction (Backward or Forward).
    ///
    /// `from_analysis` uses `Backward` when the analysis has no direction.
    pub direction: AsofSqlDirection,
    /// Maximum allowed time difference; `None` is passed through unchanged
    /// from the analysis
    pub tolerance: Option<Duration>,
    /// ASOF join type (Inner or Left).
    ///
    /// `from_analysis` currently always sets `Left`.
    pub join_type: AsofSqlJoinType,
}
74
/// ASOF join type
///
/// `JoinOperatorConfig::from_analysis` only ever produces
/// [`AsofSqlJoinType::Left`]; `Inner` is available to code that builds an
/// `AsofJoinTranslatorConfig` by hand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsofSqlJoinType {
    /// Both sides required
    Inner,
    /// Left side always emitted
    Left,
}
83
/// Union type for join operator configurations
///
/// `from_analysis` picks the variant in this order: `Asof` when the analysis
/// is flagged as an ASOF join, otherwise `Lookup` when it is flagged as a
/// lookup join, otherwise `StreamStream`.
#[derive(Debug, Clone)]
pub enum JoinOperatorConfig {
    /// Stream-stream join
    StreamStream(StreamJoinConfig),
    /// Lookup join
    Lookup(LookupJoinConfig),
    /// ASOF join
    Asof(AsofJoinTranslatorConfig),
}
94
95impl JoinOperatorConfig {
96    /// Create from join analysis.
97    #[must_use]
98    pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
99        if analysis.is_asof_join {
100            return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
101                key_column: analysis.left_key_column.clone(),
102                left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
103                right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
104                direction: analysis
105                    .asof_direction
106                    .unwrap_or(AsofSqlDirection::Backward),
107                tolerance: analysis.asof_tolerance,
108                join_type: AsofSqlJoinType::Left, // ASOF is always left-style
109            });
110        }
111
112        if analysis.is_lookup_join {
113            JoinOperatorConfig::Lookup(LookupJoinConfig {
114                stream_key: analysis.left_key_column.clone(),
115                lookup_key: analysis.right_key_column.clone(),
116                join_type: match analysis.join_type {
117                    JoinType::Inner => LookupJoinType::Inner,
118                    _ => LookupJoinType::Left,
119                },
120                cache_ttl: Duration::from_secs(300), // Default 5 min
121            })
122        } else {
123            JoinOperatorConfig::StreamStream(StreamJoinConfig {
124                left_key: analysis.left_key_column.clone(),
125                right_key: analysis.right_key_column.clone(),
126                time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
127                join_type: match analysis.join_type {
128                    JoinType::Inner => StreamJoinType::Inner,
129                    JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
130                    JoinType::Right => StreamJoinType::Right,
131                    JoinType::Full => StreamJoinType::Full,
132                },
133            })
134        }
135    }
136
137    /// Check if this is a stream-stream join.
138    #[must_use]
139    pub fn is_stream_stream(&self) -> bool {
140        matches!(self, JoinOperatorConfig::StreamStream(_))
141    }
142
143    /// Check if this is a lookup join.
144    #[must_use]
145    pub fn is_lookup(&self) -> bool {
146        matches!(self, JoinOperatorConfig::Lookup(_))
147    }
148
149    /// Check if this is an ASOF join.
150    #[must_use]
151    pub fn is_asof(&self) -> bool {
152        matches!(self, JoinOperatorConfig::Asof(_))
153    }
154
155    /// Get the left key column name.
156    #[must_use]
157    pub fn left_key(&self) -> &str {
158        match self {
159            JoinOperatorConfig::StreamStream(config) => &config.left_key,
160            JoinOperatorConfig::Lookup(config) => &config.stream_key,
161            JoinOperatorConfig::Asof(config) => &config.key_column,
162        }
163    }
164
165    /// Get the right key column name.
166    #[must_use]
167    pub fn right_key(&self) -> &str {
168        match self {
169            JoinOperatorConfig::StreamStream(config) => &config.right_key,
170            JoinOperatorConfig::Lookup(config) => &config.lookup_key,
171            JoinOperatorConfig::Asof(config) => &config.key_column,
172        }
173    }
174}
175
176impl StreamJoinConfig {
177    /// Create a new stream-stream join configuration.
178    #[must_use]
179    pub fn new(
180        left_key: String,
181        right_key: String,
182        time_bound: Duration,
183        join_type: StreamJoinType,
184    ) -> Self {
185        Self {
186            left_key,
187            right_key,
188            time_bound,
189            join_type,
190        }
191    }
192
193    /// Create an inner join configuration.
194    #[must_use]
195    pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
196        Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
197    }
198
199    /// Create a left join configuration.
200    #[must_use]
201    pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
202        Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
203    }
204}
205
206impl LookupJoinConfig {
207    /// Create a new lookup join configuration.
208    #[must_use]
209    pub fn new(
210        stream_key: String,
211        lookup_key: String,
212        join_type: LookupJoinType,
213        cache_ttl: Duration,
214    ) -> Self {
215        Self {
216            stream_key,
217            lookup_key,
218            join_type,
219            cache_ttl,
220        }
221    }
222
223    /// Create an inner lookup join configuration.
224    #[must_use]
225    pub fn inner(stream_key: String, lookup_key: String) -> Self {
226        Self::new(
227            stream_key,
228            lookup_key,
229            LookupJoinType::Inner,
230            Duration::from_secs(300),
231        )
232    }
233
234    /// Create a left lookup join configuration.
235    #[must_use]
236    pub fn left(stream_key: String, lookup_key: String) -> Self {
237        Self::new(
238            stream_key,
239            lookup_key,
240            LookupJoinType::Left,
241            Duration::from_secs(300),
242        )
243    }
244
245    /// Set the cache TTL.
246    #[must_use]
247    pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
248        self.cache_ttl = ttl;
249        self
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Translates a stream-stream analysis carrying `join_type` and returns the
    /// join type of the resulting operator configuration.
    ///
    /// Panics when the translation yields anything other than a stream-stream
    /// configuration, so a wrong operator kind fails the calling test instead
    /// of silently skipping its assertions.
    fn translated_stream_join_type(join_type: JoinType) -> StreamJoinType {
        let analysis = JoinAnalysis::stream_stream(
            "a".to_string(),
            "b".to_string(),
            "id".to_string(),
            "id".to_string(),
            Duration::from_secs(60),
            join_type,
        );

        match JoinOperatorConfig::from_analysis(&analysis) {
            JoinOperatorConfig::StreamStream(config) => config.join_type,
            other => panic!("Expected StreamStream config, got {:?}", other),
        }
    }

    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    #[test]
    fn test_from_analysis_lookup() {
        let analysis = JoinAnalysis::lookup(
            "orders".to_string(),
            "customers".to_string(),
            "customer_id".to_string(),
            "id".to_string(),
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = JoinAnalysis::stream_stream(
            "orders".to_string(),
            "payments".to_string(),
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        if let JoinOperatorConfig::StreamStream(stream_config) = config {
            assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
            assert_eq!(stream_config.join_type, StreamJoinType::Inner);
        } else {
            panic!("Expected StreamStream config");
        }
    }

    #[test]
    fn test_from_analysis_asof() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "symbol".to_string(),
            "symbol".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    #[test]
    fn test_asof_config_fields() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "symbol".to_string(),
            "symbol".to_string(),
            AsofSqlDirection::Forward,
            "trade_ts".to_string(),
            "quote_ts".to_string(),
            Some(Duration::from_millis(5000)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        if let JoinOperatorConfig::Asof(asof) = config {
            assert_eq!(asof.direction, AsofSqlDirection::Forward);
            assert_eq!(asof.left_time_column, "trade_ts");
            assert_eq!(asof.right_time_column, "quote_ts");
            assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
            assert_eq!(asof.key_column, "symbol");
            assert_eq!(asof.join_type, AsofSqlJoinType::Left);
        } else {
            panic!("Expected Asof config");
        }
    }

    #[test]
    fn test_asof_is_asof() {
        let analysis = JoinAnalysis::asof(
            "a".to_string(),
            "b".to_string(),
            "id".to_string(),
            "id".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    #[test]
    fn test_asof_key_accessors() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "sym".to_string(),
            "sym".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    #[test]
    fn test_join_types() {
        // Each outer join type must survive translation unchanged; the helper
        // panics if the result is not a stream-stream configuration.
        assert_eq!(
            translated_stream_join_type(JoinType::Left),
            StreamJoinType::Left
        );
        assert_eq!(
            translated_stream_join_type(JoinType::Right),
            StreamJoinType::Right
        );
        assert_eq!(
            translated_stream_join_type(JoinType::Full),
            StreamJoinType::Full
        );
    }
}