1use std::time::Duration;
7
8use crate::parser::join_parser::{AsofSqlDirection, JoinAnalysis, JoinType, MultiJoinAnalysis};
9
/// Configuration for a stream-to-stream join operator.
///
/// Built directly through [`StreamJoinConfig::new`] and its shortcuts, or by
/// [`JoinOperatorConfig::from_analysis`] when the analyzed join is neither an
/// ASOF join nor a lookup join.
#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    /// Join key column on the left input (rendered as `left.<key>`).
    pub left_key: String,
    /// Join key column on the right input (rendered as `right.<key>`).
    pub right_key: String,
    /// Time bound of the join. `from_analysis` falls back to 3600 seconds when
    /// the analysis carries no bound.
    // NOTE(review): presumably the maximum timestamp distance between matched
    // rows — confirm against the join operator.
    pub time_bound: Duration,
    /// Join flavour (inner, left, right or full).
    pub join_type: StreamJoinType,
}
22
/// Configuration for a lookup join between a stream and a lookup source.
///
/// Built through [`LookupJoinConfig::new`] and its shortcuts, or by
/// [`JoinOperatorConfig::from_analysis`] when the analysis is flagged as a
/// lookup join.
#[derive(Debug, Clone)]
pub struct LookupJoinConfig {
    /// Key column on the stream side (rendered as `stream.<key>`).
    pub stream_key: String,
    /// Key column on the lookup side (rendered as `lookup.<key>`).
    pub lookup_key: String,
    /// Join flavour (inner or left).
    pub join_type: LookupJoinType,
    /// Cache time-to-live. The shortcut constructors and `from_analysis` use
    /// 300 seconds.
    // NOTE(review): presumably how long a fetched lookup row may be reused —
    // confirm against the lookup operator.
    pub cache_ttl: Duration,
}
35
/// Join flavours available for stream-to-stream joins.
///
/// The `Display` impl renders each variant as its upper-case SQL keyword.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamJoinType {
    /// Inner join; displayed as `INNER`.
    Inner,
    /// Left join; displayed as `LEFT`.
    Left,
    /// Right join; displayed as `RIGHT`.
    Right,
    /// Full join; displayed as `FULL`.
    Full,
}
48
/// Join flavours available for lookup joins.
///
/// `JoinOperatorConfig::from_analysis` maps every SQL join type other than
/// inner onto [`LookupJoinType::Left`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupJoinType {
    /// Inner join; displayed as `INNER`.
    Inner,
    /// Left join; displayed as `LEFT`.
    Left,
}
57
/// Configuration for an ASOF join, as produced by
/// [`JoinOperatorConfig::from_analysis`] for analyses flagged as ASOF joins.
#[derive(Debug, Clone)]
pub struct AsofJoinTranslatorConfig {
    /// Name of the left table.
    pub left_table: String,
    /// Name of the right table.
    pub right_table: String,
    /// Key column shared by both sides. `from_analysis` fills it from the
    /// analysis' left key column, and both key accessors on
    /// `JoinOperatorConfig` return it.
    pub key_column: String,
    /// Time column on the left table. `from_analysis` stores an empty string
    /// when the analysis has none.
    pub left_time_column: String,
    /// Time column on the right table. `from_analysis` stores an empty string
    /// when the analysis has none.
    pub right_time_column: String,
    /// Match direction taken from the parser. `from_analysis` defaults to
    /// `AsofSqlDirection::Backward` when the analysis has none.
    pub direction: AsofSqlDirection,
    /// Optional tolerance; `None` omits the tolerance from the `Display` output.
    // NOTE(review): presumably the maximum allowed time distance between the
    // two time columns — confirm against the ASOF operator.
    pub tolerance: Option<Duration>,
    /// Join flavour. `from_analysis` always sets this to `AsofSqlJoinType::Left`.
    pub join_type: AsofSqlJoinType,
}
78
/// Join flavours available for ASOF joins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AsofSqlJoinType {
    /// Inner join; displayed as `INNER`.
    Inner,
    /// Left join; displayed as `LEFT`.
    Left,
}
87
/// Operator configuration for one join, tagged by the kind of join operator.
///
/// Usually obtained from `JoinOperatorConfig::from_analysis` or
/// `JoinOperatorConfig::from_multi_analysis`.
#[derive(Debug, Clone)]
pub enum JoinOperatorConfig {
    /// Stream-to-stream join.
    StreamStream(StreamJoinConfig),
    /// Lookup join between a stream and a lookup source.
    Lookup(LookupJoinConfig),
    /// ASOF join.
    Asof(AsofJoinTranslatorConfig),
}
98
99impl std::fmt::Display for StreamJoinType {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 StreamJoinType::Inner => write!(f, "INNER"),
103 StreamJoinType::Left => write!(f, "LEFT"),
104 StreamJoinType::Right => write!(f, "RIGHT"),
105 StreamJoinType::Full => write!(f, "FULL"),
106 }
107 }
108}
109
110impl std::fmt::Display for LookupJoinType {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 LookupJoinType::Inner => write!(f, "INNER"),
114 LookupJoinType::Left => write!(f, "LEFT"),
115 }
116 }
117}
118
119impl std::fmt::Display for AsofSqlJoinType {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 match self {
122 AsofSqlJoinType::Inner => write!(f, "INNER"),
123 AsofSqlJoinType::Left => write!(f, "LEFT"),
124 }
125 }
126}
127
128impl std::fmt::Display for StreamJoinConfig {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 write!(
131 f,
132 "{} JOIN ON left.{} = right.{} (bound: {}s)",
133 self.join_type,
134 self.left_key,
135 self.right_key,
136 self.time_bound.as_secs()
137 )
138 }
139}
140
141impl std::fmt::Display for LookupJoinConfig {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 write!(
144 f,
145 "{} LOOKUP JOIN ON stream.{} = lookup.{} (cache_ttl: {}s)",
146 self.join_type,
147 self.stream_key,
148 self.lookup_key,
149 self.cache_ttl.as_secs()
150 )
151 }
152}
153
154impl std::fmt::Display for AsofJoinTranslatorConfig {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 write!(
157 f,
158 "{} ASOF JOIN {}.{} = {}.{} ({}, {}.{} ~ {}.{}",
159 self.join_type,
160 self.left_table,
161 self.key_column,
162 self.right_table,
163 self.key_column,
164 self.direction,
165 self.left_table,
166 self.left_time_column,
167 self.right_table,
168 self.right_time_column,
169 )?;
170 if let Some(tol) = self.tolerance {
171 write!(f, ", tolerance: {}s", tol.as_secs())?;
172 }
173 write!(f, ")")
174 }
175}
176
177impl std::fmt::Display for JoinOperatorConfig {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 match self {
180 JoinOperatorConfig::StreamStream(c) => write!(f, "{c}"),
181 JoinOperatorConfig::Lookup(c) => write!(f, "{c}"),
182 JoinOperatorConfig::Asof(c) => write!(f, "{c}"),
183 }
184 }
185}
186
187impl JoinOperatorConfig {
188 #[must_use]
190 pub fn from_analysis(analysis: &JoinAnalysis) -> Self {
191 if analysis.is_asof_join {
192 return JoinOperatorConfig::Asof(AsofJoinTranslatorConfig {
193 left_table: analysis.left_table.clone(),
194 right_table: analysis.right_table.clone(),
195 key_column: analysis.left_key_column.clone(),
196 left_time_column: analysis.left_time_column.clone().unwrap_or_default(),
197 right_time_column: analysis.right_time_column.clone().unwrap_or_default(),
198 direction: analysis
199 .asof_direction
200 .unwrap_or(AsofSqlDirection::Backward),
201 tolerance: analysis.asof_tolerance,
202 join_type: AsofSqlJoinType::Left, });
204 }
205
206 if analysis.is_lookup_join {
207 JoinOperatorConfig::Lookup(LookupJoinConfig {
208 stream_key: analysis.left_key_column.clone(),
209 lookup_key: analysis.right_key_column.clone(),
210 join_type: match analysis.join_type {
211 JoinType::Inner => LookupJoinType::Inner,
212 _ => LookupJoinType::Left,
213 },
214 cache_ttl: Duration::from_secs(300), })
216 } else {
217 JoinOperatorConfig::StreamStream(StreamJoinConfig {
218 left_key: analysis.left_key_column.clone(),
219 right_key: analysis.right_key_column.clone(),
220 time_bound: analysis.time_bound.unwrap_or(Duration::from_secs(3600)),
221 join_type: match analysis.join_type {
222 JoinType::Inner => StreamJoinType::Inner,
223 JoinType::Left | JoinType::AsOf => StreamJoinType::Left,
224 JoinType::Right => StreamJoinType::Right,
225 JoinType::Full => StreamJoinType::Full,
226 },
227 })
228 }
229 }
230
231 #[must_use]
233 pub fn from_multi_analysis(multi: &MultiJoinAnalysis) -> Vec<Self> {
234 multi.joins.iter().map(Self::from_analysis).collect()
235 }
236
237 #[must_use]
239 pub fn is_stream_stream(&self) -> bool {
240 matches!(self, JoinOperatorConfig::StreamStream(_))
241 }
242
243 #[must_use]
245 pub fn is_lookup(&self) -> bool {
246 matches!(self, JoinOperatorConfig::Lookup(_))
247 }
248
249 #[must_use]
251 pub fn is_asof(&self) -> bool {
252 matches!(self, JoinOperatorConfig::Asof(_))
253 }
254
255 #[must_use]
257 pub fn left_key(&self) -> &str {
258 match self {
259 JoinOperatorConfig::StreamStream(config) => &config.left_key,
260 JoinOperatorConfig::Lookup(config) => &config.stream_key,
261 JoinOperatorConfig::Asof(config) => &config.key_column,
262 }
263 }
264
265 #[must_use]
267 pub fn right_key(&self) -> &str {
268 match self {
269 JoinOperatorConfig::StreamStream(config) => &config.right_key,
270 JoinOperatorConfig::Lookup(config) => &config.lookup_key,
271 JoinOperatorConfig::Asof(config) => &config.key_column,
272 }
273 }
274}
275
276impl StreamJoinConfig {
277 #[must_use]
279 pub fn new(
280 left_key: String,
281 right_key: String,
282 time_bound: Duration,
283 join_type: StreamJoinType,
284 ) -> Self {
285 Self {
286 left_key,
287 right_key,
288 time_bound,
289 join_type,
290 }
291 }
292
293 #[must_use]
295 pub fn inner(left_key: String, right_key: String, time_bound: Duration) -> Self {
296 Self::new(left_key, right_key, time_bound, StreamJoinType::Inner)
297 }
298
299 #[must_use]
301 pub fn left(left_key: String, right_key: String, time_bound: Duration) -> Self {
302 Self::new(left_key, right_key, time_bound, StreamJoinType::Left)
303 }
304}
305
306impl LookupJoinConfig {
307 #[must_use]
309 pub fn new(
310 stream_key: String,
311 lookup_key: String,
312 join_type: LookupJoinType,
313 cache_ttl: Duration,
314 ) -> Self {
315 Self {
316 stream_key,
317 lookup_key,
318 join_type,
319 cache_ttl,
320 }
321 }
322
323 #[must_use]
325 pub fn inner(stream_key: String, lookup_key: String) -> Self {
326 Self::new(
327 stream_key,
328 lookup_key,
329 LookupJoinType::Inner,
330 Duration::from_secs(300),
331 )
332 }
333
334 #[must_use]
336 pub fn left(stream_key: String, lookup_key: String) -> Self {
337 Self::new(
338 stream_key,
339 lookup_key,
340 LookupJoinType::Left,
341 Duration::from_secs(300),
342 )
343 }
344
345 #[must_use]
347 pub fn with_cache_ttl(mut self, ttl: Duration) -> Self {
348 self.cache_ttl = ttl;
349 self
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// `StreamJoinConfig::inner` stores its arguments and selects the inner join type.
    #[test]
    fn test_stream_join_config() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );

        assert_eq!(config.left_key, "order_id");
        assert_eq!(config.right_key, "order_id");
        assert_eq!(config.time_bound, Duration::from_secs(3600));
        assert_eq!(config.join_type, StreamJoinType::Inner);
    }

    /// `with_cache_ttl` overrides the default TTL and leaves the other fields alone.
    #[test]
    fn test_lookup_join_config() {
        let config = LookupJoinConfig::inner("customer_id".to_string(), "id".to_string())
            .with_cache_ttl(Duration::from_secs(600));

        assert_eq!(config.stream_key, "customer_id");
        assert_eq!(config.lookup_key, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(600));
        assert_eq!(config.join_type, LookupJoinType::Inner);
    }

    /// A lookup analysis becomes a lookup config with the analysis' keys.
    #[test]
    fn test_from_analysis_lookup() {
        let analysis = JoinAnalysis::lookup(
            "orders".to_string(),
            "customers".to_string(),
            "customer_id".to_string(),
            "id".to_string(),
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_lookup());
        assert!(!config.is_stream_stream());
        assert_eq!(config.left_key(), "customer_id");
        assert_eq!(config.right_key(), "id");
    }

    /// A stream-stream analysis keeps its time bound and join type.
    #[test]
    fn test_from_analysis_stream_stream() {
        let analysis = JoinAnalysis::stream_stream(
            "orders".to_string(),
            "payments".to_string(),
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);

        assert!(config.is_stream_stream());
        assert!(!config.is_lookup());

        if let JoinOperatorConfig::StreamStream(stream_config) = config {
            assert_eq!(stream_config.time_bound, Duration::from_secs(3600));
            assert_eq!(stream_config.join_type, StreamJoinType::Inner);
        } else {
            panic!("Expected StreamStream config");
        }
    }

    /// An ASOF analysis becomes an ASOF config and nothing else.
    #[test]
    fn test_from_analysis_asof() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "symbol".to_string(),
            "symbol".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            Some(Duration::from_secs(5)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
        assert!(!config.is_stream_stream());
        assert!(!config.is_lookup());
    }

    /// Every ASOF field is carried over from the analysis.
    #[test]
    fn test_asof_config_fields() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "symbol".to_string(),
            "symbol".to_string(),
            AsofSqlDirection::Forward,
            "trade_ts".to_string(),
            "quote_ts".to_string(),
            Some(Duration::from_millis(5000)),
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        if let JoinOperatorConfig::Asof(asof) = config {
            assert_eq!(asof.direction, AsofSqlDirection::Forward);
            assert_eq!(asof.left_time_column, "trade_ts");
            assert_eq!(asof.right_time_column, "quote_ts");
            assert_eq!(asof.tolerance, Some(Duration::from_millis(5000)));
            assert_eq!(asof.key_column, "symbol");
            assert_eq!(asof.join_type, AsofSqlJoinType::Left);
        } else {
            panic!("Expected Asof config");
        }
    }

    /// `is_asof` holds for an ASOF analysis without tolerance.
    #[test]
    fn test_asof_is_asof() {
        let analysis = JoinAnalysis::asof(
            "a".to_string(),
            "b".to_string(),
            "id".to_string(),
            "id".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert!(config.is_asof());
    }

    /// Both key accessors return the shared key column for ASOF configs.
    #[test]
    fn test_asof_key_accessors() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "sym".to_string(),
            "sym".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            None,
        );

        let config = JoinOperatorConfig::from_analysis(&analysis);
        assert_eq!(config.left_key(), "sym");
        assert_eq!(config.right_key(), "sym");
    }

    /// A multi-join analysis with a single join yields a single config.
    #[test]
    fn test_from_multi_analysis_single() {
        let analysis = JoinAnalysis::lookup(
            "a".to_string(),
            "b".to_string(),
            "id".to_string(),
            "id".to_string(),
            JoinType::Inner,
        );
        let multi = MultiJoinAnalysis {
            joins: vec![analysis],
            tables: vec!["a".to_string(), "b".to_string()],
        };

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 1);
        assert!(configs[0].is_lookup());
    }

    /// Two chained lookup joins yield two lookup configs.
    #[test]
    fn test_from_multi_analysis_two_lookups() {
        let j1 = JoinAnalysis::lookup(
            "a".to_string(),
            "b".to_string(),
            "id".to_string(),
            "a_id".to_string(),
            JoinType::Inner,
        );
        let j2 = JoinAnalysis::lookup(
            "b".to_string(),
            "c".to_string(),
            "id".to_string(),
            "b_id".to_string(),
            JoinType::Left,
        );
        let multi = MultiJoinAnalysis {
            joins: vec![j1, j2],
            tables: vec!["a".to_string(), "b".to_string(), "c".to_string()],
        };

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_lookup());
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[1].left_key(), "id");
    }

    /// An ASOF join followed by a lookup join keeps both kinds.
    #[test]
    fn test_from_multi_analysis_mixed_asof_lookup() {
        let j1 = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "symbol".to_string(),
            "symbol".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            None,
        );
        let j2 = JoinAnalysis::lookup(
            "quotes".to_string(),
            "products".to_string(),
            "product_id".to_string(),
            "id".to_string(),
            JoinType::Inner,
        );
        let multi = MultiJoinAnalysis {
            joins: vec![j1, j2],
            tables: vec![
                "trades".to_string(),
                "quotes".to_string(),
                "products".to_string(),
            ],
        };

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_asof());
        assert!(configs[1].is_lookup());
    }

    /// A stream-stream join followed by a lookup join keeps both kinds.
    #[test]
    fn test_from_multi_analysis_stream_stream_and_lookup() {
        let j1 = JoinAnalysis::stream_stream(
            "orders".to_string(),
            "payments".to_string(),
            "id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
        );
        let j2 = JoinAnalysis::lookup(
            "payments".to_string(),
            "customers".to_string(),
            "cust_id".to_string(),
            "id".to_string(),
            JoinType::Left,
        );
        let multi = MultiJoinAnalysis {
            joins: vec![j1, j2],
            tables: vec![
                "orders".to_string(),
                "payments".to_string(),
                "customers".to_string(),
            ],
        };

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    /// Configs come back in the same order as the analyzed joins.
    #[test]
    fn test_from_multi_analysis_order_preserved() {
        let j1 = JoinAnalysis::lookup(
            "a".to_string(),
            "b".to_string(),
            "k1".to_string(),
            "k1".to_string(),
            JoinType::Inner,
        );
        let j2 = JoinAnalysis::stream_stream(
            "b".to_string(),
            "c".to_string(),
            "k2".to_string(),
            "k2".to_string(),
            Duration::from_secs(60),
            JoinType::Left,
        );
        let j3 = JoinAnalysis::lookup(
            "c".to_string(),
            "d".to_string(),
            "k3".to_string(),
            "k3".to_string(),
            JoinType::Inner,
        );
        let multi = MultiJoinAnalysis {
            joins: vec![j1, j2, j3],
            tables: vec![
                "a".to_string(),
                "b".to_string(),
                "c".to_string(),
                "d".to_string(),
            ],
        };

        let configs = JoinOperatorConfig::from_multi_analysis(&multi);
        assert_eq!(configs.len(), 3);
        assert!(configs[0].is_lookup());
        assert!(configs[1].is_stream_stream());
        assert!(configs[2].is_lookup());
        assert_eq!(configs[0].left_key(), "k1");
        assert_eq!(configs[1].left_key(), "k2");
        assert_eq!(configs[2].left_key(), "k3");
    }

    /// Left, right and full SQL joins map onto the matching stream join type.
    ///
    /// A config of the wrong kind fails the test instead of being silently
    /// skipped.
    #[test]
    fn test_join_types() {
        let cases = [
            (JoinType::Left, StreamJoinType::Left),
            (JoinType::Right, StreamJoinType::Right),
            (JoinType::Full, StreamJoinType::Full),
        ];

        for (sql_type, expected) in cases {
            let analysis = JoinAnalysis::stream_stream(
                "a".to_string(),
                "b".to_string(),
                "id".to_string(),
                "id".to_string(),
                Duration::from_secs(60),
                sql_type,
            );

            match JoinOperatorConfig::from_analysis(&analysis) {
                JoinOperatorConfig::StreamStream(config) => {
                    assert_eq!(config.join_type, expected);
                }
                other => panic!("Expected StreamStream config, got: {other}"),
            }
        }
    }

    /// Exact `Display` output of a stream-stream config.
    #[test]
    fn test_display_stream_join() {
        let config = StreamJoinConfig::inner(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
        );
        assert_eq!(
            format!("{config}"),
            "INNER JOIN ON left.order_id = right.order_id (bound: 3600s)"
        );
    }

    /// Exact `Display` output of a lookup config.
    #[test]
    fn test_display_lookup_join() {
        let config = LookupJoinConfig::left("cust_id".to_string(), "id".to_string());
        assert_eq!(
            format!("{config}"),
            "LEFT LOOKUP JOIN ON stream.cust_id = lookup.id (cache_ttl: 300s)"
        );
    }

    /// The ASOF `Display` output names the join kind, direction and tolerance.
    #[test]
    fn test_display_asof_join() {
        let analysis = JoinAnalysis::asof(
            "trades".to_string(),
            "quotes".to_string(),
            "symbol".to_string(),
            "symbol".to_string(),
            AsofSqlDirection::Backward,
            "ts".to_string(),
            "ts".to_string(),
            Some(Duration::from_secs(5)),
        );
        let config = JoinOperatorConfig::from_analysis(&analysis);
        let s = format!("{config}");
        assert!(s.contains("ASOF JOIN"), "got: {s}");
        assert!(s.contains("BACKWARD"), "got: {s}");
        assert!(s.contains("tolerance: 5s"), "got: {s}");
    }

    /// Every join-type enum renders as its upper-case SQL keyword.
    #[test]
    fn test_display_join_types() {
        assert_eq!(format!("{}", StreamJoinType::Inner), "INNER");
        assert_eq!(format!("{}", StreamJoinType::Left), "LEFT");
        assert_eq!(format!("{}", StreamJoinType::Right), "RIGHT");
        assert_eq!(format!("{}", StreamJoinType::Full), "FULL");
        assert_eq!(format!("{}", LookupJoinType::Inner), "INNER");
        assert_eq!(format!("{}", LookupJoinType::Left), "LEFT");
        assert_eq!(format!("{}", AsofSqlJoinType::Inner), "INNER");
        assert_eq!(format!("{}", AsofSqlJoinType::Left), "LEFT");
    }
}