use super::{
    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
    TimerKey,
};
use crate::state::{StateStore, StateStoreExt};
use arrow_array::{Array, ArrayRef, Float64Array, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use bytes::Bytes;
use rkyv::{
    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinType {
    #[default]
    Inner,
    Left,
    Right,
    Full,
}

impl JoinType {
    #[must_use]
    pub fn emits_unmatched_left(&self) -> bool {
        matches!(self, JoinType::Left | JoinType::Full)
    }

    #[must_use]
    pub fn emits_unmatched_right(&self) -> bool {
        matches!(self, JoinType::Right | JoinType::Full)
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinSide {
    Left,
    Right,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinRowEncoding {
    #[default]
    Compact,

    CpuFriendly,
}

impl std::fmt::Display for JoinRowEncoding {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Compact => write!(f, "compact"),
            Self::CpuFriendly => write!(f, "cpu_friendly"),
        }
    }
}

impl std::str::FromStr for JoinRowEncoding {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "compact" => Ok(Self::Compact),
            "cpu_friendly" | "cpufriendly" | "cpu-friendly" => Ok(Self::CpuFriendly),
            _ => Err(format!(
                "Unknown encoding: {s}. Expected 'compact' or 'cpu_friendly'"
            )),
        }
    }
}

#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    pub left_key_column: String,
    pub right_key_column: String,
    pub time_bound_ms: i64,
    pub join_type: JoinType,
    pub operator_id: Option<String>,

    pub row_encoding: JoinRowEncoding,
    pub asymmetric_compaction: bool,
    pub idle_threshold_ms: i64,
    pub per_key_tracking: bool,
    pub key_idle_threshold_ms: i64,
    pub build_side_pruning: bool,
    pub build_side: Option<JoinSide>,
}

impl Default for StreamJoinConfig {
    fn default() -> Self {
        Self {
            left_key_column: String::new(),
            right_key_column: String::new(),
            time_bound_ms: 0,
            join_type: JoinType::Inner,
            operator_id: None,
            row_encoding: JoinRowEncoding::Compact,
            asymmetric_compaction: true,
            idle_threshold_ms: 60_000,
            per_key_tracking: true,
            key_idle_threshold_ms: 300_000,
            build_side_pruning: true,
            build_side: None,
        }
    }
}

impl StreamJoinConfig {
    #[must_use]
    pub fn builder() -> StreamJoinConfigBuilder {
        StreamJoinConfigBuilder::default()
    }
}

#[derive(Debug, Default)]
pub struct StreamJoinConfigBuilder {
    config: StreamJoinConfig,
}

impl StreamJoinConfigBuilder {
    #[must_use]
    pub fn left_key_column(mut self, column: impl Into<String>) -> Self {
        self.config.left_key_column = column.into();
        self
    }

    #[must_use]
    pub fn right_key_column(mut self, column: impl Into<String>) -> Self {
        self.config.right_key_column = column.into();
        self
    }

    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn time_bound(mut self, duration: Duration) -> Self {
        self.config.time_bound_ms = duration.as_millis() as i64;
        self
    }

    #[must_use]
    pub fn time_bound_ms(mut self, ms: i64) -> Self {
        self.config.time_bound_ms = ms;
        self
    }

    #[must_use]
    pub fn join_type(mut self, join_type: JoinType) -> Self {
        self.config.join_type = join_type;
        self
    }

    #[must_use]
    pub fn operator_id(mut self, id: impl Into<String>) -> Self {
        self.config.operator_id = Some(id.into());
        self
    }

    #[must_use]
    pub fn row_encoding(mut self, encoding: JoinRowEncoding) -> Self {
        self.config.row_encoding = encoding;
        self
    }

    #[must_use]
    pub fn asymmetric_compaction(mut self, enabled: bool) -> Self {
        self.config.asymmetric_compaction = enabled;
        self
    }

    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn idle_threshold(mut self, duration: Duration) -> Self {
        self.config.idle_threshold_ms = duration.as_millis() as i64;
        self
    }

    #[must_use]
    pub fn per_key_tracking(mut self, enabled: bool) -> Self {
        self.config.per_key_tracking = enabled;
        self
    }

    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn key_idle_threshold(mut self, duration: Duration) -> Self {
        self.config.key_idle_threshold_ms = duration.as_millis() as i64;
        self
    }

    #[must_use]
    pub fn build_side_pruning(mut self, enabled: bool) -> Self {
        self.config.build_side_pruning = enabled;
        self
    }

    #[must_use]
    pub fn build_side(mut self, side: JoinSide) -> Self {
        self.config.build_side = Some(side);
        self
    }

    #[must_use]
    pub fn build(self) -> StreamJoinConfig {
        self.config
    }
}

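// Illustrative usage (a sketch, not an authoritative example from the project docs):
// the builder above feeds `StreamJoinOperator::from_config` further down in this file.
// Column names and bounds below are arbitrary placeholder values.
//
//     let config = StreamJoinConfig::builder()
//         .left_key_column("order_id")
//         .right_key_column("order_id")
//         .time_bound(Duration::from_secs(3600))
//         .join_type(JoinType::Left)
//         .row_encoding(JoinRowEncoding::CpuFriendly)
//         .build();
//     let operator = StreamJoinOperator::from_config(config);
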
#[derive(Debug, Clone, Default)]
pub struct SideStats {
    pub events_received: u64,
    pub events_this_window: u64,
    pub last_event_time: i64,
    pub write_rate: f64,
    window_start: i64,
}

impl SideStats {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    #[allow(clippy::cast_precision_loss)]
    pub fn record_event(&mut self, processing_time: i64) {
        self.events_received += 1;
        self.events_this_window += 1;
        self.last_event_time = processing_time;

        if self.window_start == 0 {
            self.window_start = processing_time;
        } else {
            let elapsed_ms = processing_time - self.window_start;
            if elapsed_ms >= 1000 {
                self.write_rate = (self.events_this_window as f64 * 1000.0) / elapsed_ms as f64;
                self.events_this_window = 0;
                self.window_start = processing_time;
            }
        }
    }

    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        if self.events_received == 0 {
            return false;
        }
        let time_since_last = current_time - self.last_event_time;
        time_since_last > threshold_ms && self.events_this_window == 0
    }
}

#[derive(Debug, Clone)]
pub struct KeyMetadata {
    pub last_activity: i64,
    pub event_count: u64,
    pub state_entries: u64,
}

impl KeyMetadata {
    #[must_use]
    pub fn new(processing_time: i64) -> Self {
        Self {
            last_activity: processing_time,
            event_count: 1,
            state_entries: 1,
        }
    }

    pub fn record_event(&mut self, processing_time: i64) {
        self.last_activity = processing_time;
        self.event_count += 1;
        self.state_entries += 1;
    }

    pub fn decrement_entries(&mut self) {
        self.state_entries = self.state_entries.saturating_sub(1);
    }

    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        current_time - self.last_activity > threshold_ms
    }
}

const LEFT_STATE_PREFIX: &[u8; 4] = b"sjl:";
const RIGHT_STATE_PREFIX: &[u8; 4] = b"sjr:";

const LEFT_TIMER_PREFIX: u8 = 0x10;
const RIGHT_TIMER_PREFIX: u8 = 0x20;
const UNMATCHED_TIMER_PREFIX: u8 = 0x30;

static JOIN_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct JoinRow {
    pub timestamp: i64,
    pub key_value: Vec<u8>,
    pub data: Vec<u8>,
    pub matched: bool,
    encoding: u8,
}

const CPU_FRIENDLY_MAGIC: [u8; 4] = *b"CPUF";

#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CpuFriendlyType {
    Null = 0,
    Int64 = 1,
    Float64 = 2,
    Utf8 = 3,
}

impl JoinRow {
    #[cfg(test)]
    fn new(timestamp: i64, key_value: Vec<u8>, batch: &RecordBatch) -> Result<Self, OperatorError> {
        Self::with_encoding(timestamp, key_value, batch, JoinRowEncoding::Compact)
    }

    fn with_encoding(
        timestamp: i64,
        key_value: Vec<u8>,
        batch: &RecordBatch,
        encoding: JoinRowEncoding,
    ) -> Result<Self, OperatorError> {
        let (data, encoding_byte) = match encoding {
            JoinRowEncoding::Compact => (Self::serialize_compact(batch)?, 0),
            JoinRowEncoding::CpuFriendly => (Self::serialize_cpu_friendly(batch)?, 1),
        };
        Ok(Self {
            timestamp,
            key_value,
            data,
            matched: false,
            encoding: encoding_byte,
        })
    }

    fn serialize_compact(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
        let mut buf = Vec::new();
        {
            let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
            writer
                .write(batch)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
            writer
                .finish()
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        }
        Ok(buf)
    }

    #[allow(clippy::cast_possible_truncation)]
    fn serialize_cpu_friendly(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
        let schema = batch.schema();
        let num_rows = batch.num_rows();
        let num_cols = batch.num_columns();

        let mut buf = Vec::with_capacity(4 + 2 + 4 + num_cols * 64 + num_rows * num_cols * 8);

        buf.extend_from_slice(&CPU_FRIENDLY_MAGIC);
        buf.extend_from_slice(&(num_cols as u16).to_le_bytes());
        buf.extend_from_slice(&(num_rows as u32).to_le_bytes());

        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);

            let name_bytes = field.name().as_bytes();
            buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
            buf.extend_from_slice(name_bytes);

            buf.push(u8::from(field.is_nullable()));

            match field.data_type() {
                DataType::Int64 => {
                    buf.push(CpuFriendlyType::Int64 as u8);
                    Self::write_int64_column(&mut buf, column, num_rows)?;
                }
                DataType::Float64 => {
                    buf.push(CpuFriendlyType::Float64 as u8);
                    Self::write_float64_column(&mut buf, column, num_rows)?;
                }
                DataType::Utf8 => {
                    buf.push(CpuFriendlyType::Utf8 as u8);
                    Self::write_utf8_column(&mut buf, column, num_rows)?;
                }
                _ => {
                    buf.push(CpuFriendlyType::Null as u8);
                }
            }
        }

        Ok(buf)
    }

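    // On-wire layout produced by `serialize_cpu_friendly` above (a descriptive sketch
    // derived from the code, for reference when reading the column writers below):
    //
    //   [0..4)   magic bytes b"CPUF"
    //   [4..6)   number of columns, u16 little-endian
    //   [6..10)  number of rows, u32 little-endian
    //   then, per column:
    //     [u16 LE name length][name bytes][u8 nullable flag][u8 type tag]
    //     Int64 / Float64 payload: [ceil(rows / 8) validity bytes][rows * 8 value bytes]
    //     Utf8 payload:            [ceil(rows / 8) validity bytes]
    //                              [(rows + 1) * 4 LE offset bytes][string data bytes]
    //   Unsupported types are tagged `CpuFriendlyType::Null` and carry no payload.
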
    fn write_int64_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected Int64Array".into()))?;

        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        let values = arr.values();
        let value_bytes =
            unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
        buf.extend_from_slice(value_bytes);

        Ok(())
    }

    fn write_float64_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected Float64Array".into()))?;

        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        let values = arr.values();
        let value_bytes =
            unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
        buf.extend_from_slice(value_bytes);

        Ok(())
    }

    #[allow(clippy::cast_sign_loss)]
    fn write_utf8_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected StringArray".into()))?;

        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        let offsets = arr.offsets();
        for offset in offsets.iter() {
            buf.extend_from_slice(&(*offset as u32).to_le_bytes());
        }

        let values = arr.values();
        buf.extend_from_slice(values.as_slice());

        Ok(())
    }

    fn deserialize_batch(data: &[u8], encoding: u8) -> Result<RecordBatch, OperatorError> {
        if encoding == 1 && data.starts_with(&CPU_FRIENDLY_MAGIC) {
            Self::deserialize_cpu_friendly(data)
        } else {
            Self::deserialize_compact(data)
        }
    }

    fn deserialize_compact(data: &[u8]) -> Result<RecordBatch, OperatorError> {
        let cursor = std::io::Cursor::new(data);
        let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        reader
            .next()
            .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
    }

    fn deserialize_cpu_friendly(data: &[u8]) -> Result<RecordBatch, OperatorError> {
        if data.len() < 10 {
            return Err(OperatorError::SerializationFailed(
                "Buffer too short".into(),
            ));
        }

        let num_cols = u16::from_le_bytes([data[4], data[5]]) as usize;
        let num_rows = u32::from_le_bytes([data[6], data[7], data[8], data[9]]) as usize;

        let mut offset = 10;
        let mut fields = Vec::with_capacity(num_cols);
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_cols);

        for _ in 0..num_cols {
            if offset + 2 > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated column header".into(),
                ));
            }

            let name_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
            offset += 2;

            if offset + name_len > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated column name".into(),
                ));
            }
            let name = String::from_utf8_lossy(&data[offset..offset + name_len]).to_string();
            offset += name_len;

            if offset + 2 > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated type info".into(),
                ));
            }

            let nullable = data[offset] != 0;
            offset += 1;
            let type_tag = data[offset];
            offset += 1;

            let validity_bytes = num_rows.div_ceil(8);

            match type_tag {
                t if t == CpuFriendlyType::Int64 as u8 => {
                    let (arr, new_offset) =
                        Self::read_int64_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Int64, nullable));
                    columns.push(Arc::new(arr));
                }
                t if t == CpuFriendlyType::Float64 as u8 => {
                    let (arr, new_offset) =
                        Self::read_float64_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Float64, nullable));
                    columns.push(Arc::new(arr));
                }
                t if t == CpuFriendlyType::Utf8 as u8 => {
                    let (arr, new_offset) =
                        Self::read_utf8_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Utf8, nullable));
                    columns.push(Arc::new(arr));
                }
                _ => {
                    fields.push(Field::new(&name, DataType::Int64, true));
                    columns.push(Arc::new(Int64Array::from(vec![None; num_rows])));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, columns)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
    }

    fn read_int64_column(
        data: &[u8],
        offset: usize,
        num_rows: usize,
        validity_bytes: usize,
    ) -> Result<(Int64Array, usize), OperatorError> {
        let mut pos = offset;

        if pos + validity_bytes > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated validity".into(),
            ));
        }
        pos += validity_bytes;

        let values_bytes = num_rows * 8;
        if pos + values_bytes > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated int64 values".into(),
            ));
        }

        let mut values = Vec::with_capacity(num_rows);
        for i in 0..num_rows {
            let start = pos + i * 8;
            let bytes = [
                data[start],
                data[start + 1],
                data[start + 2],
                data[start + 3],
                data[start + 4],
                data[start + 5],
                data[start + 6],
                data[start + 7],
            ];
            values.push(i64::from_le_bytes(bytes));
        }
        pos += values_bytes;

        Ok((Int64Array::from(values), pos))
    }

    fn read_float64_column(
        data: &[u8],
        offset: usize,
        num_rows: usize,
        validity_bytes: usize,
    ) -> Result<(Float64Array, usize), OperatorError> {
        let mut pos = offset;

        if pos + validity_bytes > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated validity".into(),
            ));
        }
        pos += validity_bytes;

        let values_bytes = num_rows * 8;
        if pos + values_bytes > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated float64 values".into(),
            ));
        }

        let mut values = Vec::with_capacity(num_rows);
        for i in 0..num_rows {
            let start = pos + i * 8;
            let bytes = [
                data[start],
                data[start + 1],
                data[start + 2],
                data[start + 3],
                data[start + 4],
                data[start + 5],
                data[start + 6],
                data[start + 7],
            ];
            values.push(f64::from_le_bytes(bytes));
        }
        pos += values_bytes;

        Ok((Float64Array::from(values), pos))
    }

    fn read_utf8_column(
        data: &[u8],
        offset: usize,
        num_rows: usize,
        validity_bytes: usize,
    ) -> Result<(StringArray, usize), OperatorError> {
        let mut pos = offset;

        if pos + validity_bytes > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated validity".into(),
            ));
        }
        pos += validity_bytes;

        let offsets_bytes = (num_rows + 1) * 4;
        if pos + offsets_bytes > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated offsets".into(),
            ));
        }

        let mut offsets = Vec::with_capacity(num_rows + 1);
        for i in 0..=num_rows {
            let start = pos + i * 4;
            let bytes = [
                data[start],
                data[start + 1],
                data[start + 2],
                data[start + 3],
            ];
            offsets.push(u32::from_le_bytes(bytes) as usize);
        }
        pos += offsets_bytes;

        let data_len = offsets.last().copied().unwrap_or(0);
        if pos + data_len > data.len() {
            return Err(OperatorError::SerializationFailed(
                "Truncated string data".into(),
            ));
        }

        let string_data = &data[pos..pos + data_len];
        pos += data_len;

        let mut strings = Vec::with_capacity(num_rows);
        for i in 0..num_rows {
            let start = offsets[i];
            let end = offsets[i + 1];
            let s = String::from_utf8_lossy(&string_data[start..end]).to_string();
            strings.push(s);
        }

        Ok((StringArray::from(strings), pos))
    }

    pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
        Self::deserialize_batch(&self.data, self.encoding)
    }

    #[must_use]
    pub fn encoding(&self) -> JoinRowEncoding {
        if self.encoding == 1 {
            JoinRowEncoding::CpuFriendly
        } else {
            JoinRowEncoding::Compact
        }
    }
}

#[derive(Debug, Clone, Default)]
pub struct JoinMetrics {
    pub left_events: u64,
    pub right_events: u64,
    pub matches: u64,
    pub unmatched_left: u64,
    pub unmatched_right: u64,
    pub late_events: u64,
    pub state_cleanups: u64,

    pub cpu_friendly_encodes: u64,
    pub compact_encodes: u64,
    pub asymmetric_skips: u64,
    pub idle_key_cleanups: u64,
    pub build_side_prunes: u64,
    pub tracked_keys: u64,
}

impl JoinMetrics {
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    pub fn reset(&mut self) {
        *self = Self::default();
    }
}

pub struct StreamJoinOperator {
    left_key_column: String,
    right_key_column: String,
    time_bound_ms: i64,
    join_type: JoinType,
    operator_id: String,
    metrics: JoinMetrics,
    output_schema: Option<SchemaRef>,
    left_schema: Option<SchemaRef>,
    right_schema: Option<SchemaRef>,

    row_encoding: JoinRowEncoding,
    asymmetric_compaction: bool,
    idle_threshold_ms: i64,
    per_key_tracking: bool,
    key_idle_threshold_ms: i64,
    build_side_pruning: bool,
    build_side: Option<JoinSide>,
    left_stats: SideStats,
    right_stats: SideStats,
    key_metadata: HashMap<u64, KeyMetadata>,
    left_watermark: i64,
    right_watermark: i64,
    prune_buffer: Vec<Bytes>,
}

impl StreamJoinOperator {
    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn new(
        left_key_column: String,
        right_key_column: String,
        time_bound: Duration,
        join_type: JoinType,
    ) -> Self {
        let operator_num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
        Self {
            left_key_column,
            right_key_column,
            time_bound_ms: time_bound.as_millis() as i64,
            join_type,
            operator_id: format!("stream_join_{operator_num}"),
            metrics: JoinMetrics::new(),
            output_schema: None,
            left_schema: None,
            right_schema: None,
            row_encoding: JoinRowEncoding::Compact,
            asymmetric_compaction: true,
            idle_threshold_ms: 60_000,
            per_key_tracking: true,
            key_idle_threshold_ms: 300_000,
            build_side_pruning: true,
            build_side: None,
            left_stats: SideStats::new(),
            right_stats: SideStats::new(),
            key_metadata: HashMap::new(),
            left_watermark: i64::MIN,
            right_watermark: i64::MIN,
            prune_buffer: Vec::with_capacity(100),
        }
    }

    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn with_id(
        left_key_column: String,
        right_key_column: String,
        time_bound: Duration,
        join_type: JoinType,
        operator_id: String,
    ) -> Self {
        Self {
            left_key_column,
            right_key_column,
            time_bound_ms: time_bound.as_millis() as i64,
            join_type,
            operator_id,
            metrics: JoinMetrics::new(),
            output_schema: None,
            left_schema: None,
            right_schema: None,
            row_encoding: JoinRowEncoding::Compact,
            asymmetric_compaction: true,
            idle_threshold_ms: 60_000,
            per_key_tracking: true,
            key_idle_threshold_ms: 300_000,
            build_side_pruning: true,
            build_side: None,
            left_stats: SideStats::new(),
            right_stats: SideStats::new(),
            key_metadata: HashMap::new(),
            left_watermark: i64::MIN,
            right_watermark: i64::MIN,
            prune_buffer: Vec::with_capacity(100),
        }
    }

    #[must_use]
    pub fn from_config(config: StreamJoinConfig) -> Self {
        let operator_id = config.operator_id.unwrap_or_else(|| {
            let num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
            format!("stream_join_{num}")
        });

        Self {
            left_key_column: config.left_key_column,
            right_key_column: config.right_key_column,
            time_bound_ms: config.time_bound_ms,
            join_type: config.join_type,
            operator_id,
            metrics: JoinMetrics::new(),
            output_schema: None,
            left_schema: None,
            right_schema: None,
            row_encoding: config.row_encoding,
            asymmetric_compaction: config.asymmetric_compaction,
            idle_threshold_ms: config.idle_threshold_ms,
            per_key_tracking: config.per_key_tracking,
            key_idle_threshold_ms: config.key_idle_threshold_ms,
            build_side_pruning: config.build_side_pruning,
            build_side: config.build_side,
            left_stats: SideStats::new(),
            right_stats: SideStats::new(),
            key_metadata: HashMap::new(),
            left_watermark: i64::MIN,
            right_watermark: i64::MIN,
            prune_buffer: Vec::with_capacity(100),
        }
    }

    #[must_use]
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }

    #[must_use]
    pub fn time_bound_ms(&self) -> i64 {
        self.time_bound_ms
    }

    #[must_use]
    pub fn metrics(&self) -> &JoinMetrics {
        &self.metrics
    }

    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }

    #[must_use]
    pub fn row_encoding(&self) -> JoinRowEncoding {
        self.row_encoding
    }

    #[must_use]
    pub fn asymmetric_compaction_enabled(&self) -> bool {
        self.asymmetric_compaction
    }

    #[must_use]
    pub fn per_key_tracking_enabled(&self) -> bool {
        self.per_key_tracking
    }

    #[must_use]
    pub fn left_stats(&self) -> &SideStats {
        &self.left_stats
    }

    #[must_use]
    pub fn right_stats(&self) -> &SideStats {
        &self.right_stats
    }

    #[must_use]
    pub fn tracked_key_count(&self) -> usize {
        self.key_metadata.len()
    }

    #[must_use]
    pub fn is_side_idle(&self, side: JoinSide, current_time: i64) -> bool {
        match side {
            JoinSide::Left => self
                .left_stats
                .is_idle(current_time, self.idle_threshold_ms),
            JoinSide::Right => self
                .right_stats
                .is_idle(current_time, self.idle_threshold_ms),
        }
    }

    #[must_use]
    pub fn effective_build_side(&self) -> JoinSide {
        if let Some(side) = self.build_side {
            return side;
        }

        if self.left_stats.events_received < self.right_stats.events_received {
            JoinSide::Left
        } else {
            JoinSide::Right
        }
    }

    pub fn process_side(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        match side {
            JoinSide::Left => self.process_left(event, ctx),
            JoinSide::Right => self.process_right(event, ctx),
        }
    }

    fn process_left(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.left_events += 1;

        self.left_stats.record_event(ctx.processing_time);

        if self.left_schema.is_none() {
            self.left_schema = Some(event.data.schema());
            self.update_output_schema();
        }

        self.process_event(event, JoinSide::Left, ctx)
    }

    fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.right_events += 1;

        self.right_stats.record_event(ctx.processing_time);

        if self.right_schema.is_none() {
            self.right_schema = Some(event.data.schema());
            self.update_output_schema();
        }

        self.process_event(event, JoinSide::Right, ctx)
    }

    fn update_output_schema(&mut self) {
        if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
            let mut fields: Vec<Field> = left.fields().iter().map(|f| f.as_ref().clone()).collect();

            for field in right.fields() {
                let name = if left.field_with_name(field.name()).is_ok() {
                    format!("right_{}", field.name())
                } else {
                    field.name().clone()
                };
                fields.push(Field::new(
                    name,
                    field.data_type().clone(),
                    true,
                ));
            }

            self.output_schema = Some(Arc::new(Schema::new(fields)));
        }
    }

    fn process_event(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();
        let event_time = event.timestamp;

        let emitted_watermark = ctx.watermark_generator.on_event(event_time);

        match side {
            JoinSide::Left => self.left_watermark = self.left_watermark.max(event_time),
            JoinSide::Right => self.right_watermark = self.right_watermark.max(event_time),
        }

        let current_wm = ctx.watermark_generator.current_watermark();
        if current_wm > i64::MIN && event_time + self.time_bound_ms < current_wm {
            self.metrics.late_events += 1;
            output.push(Output::LateEvent(event.clone()));
            return output;
        }

        let key_column = match side {
            JoinSide::Left => &self.left_key_column,
            JoinSide::Right => &self.right_key_column,
        };
        let Some(key_value) = Self::extract_key(&event.data, key_column) else {
            return output;
        };

        let key_hash = fxhash::hash64(&key_value);

        if self.per_key_tracking {
            self.key_metadata
                .entry(key_hash)
                .and_modify(|meta| meta.record_event(ctx.processing_time))
                .or_insert_with(|| KeyMetadata::new(ctx.processing_time));
            self.metrics.tracked_keys = self.key_metadata.len() as u64;
        }

        let join_row = match JoinRow::with_encoding(
            event_time,
            key_value.clone(),
            &event.data,
            self.row_encoding,
        ) {
            Ok(row) => {
                match self.row_encoding {
                    JoinRowEncoding::Compact => self.metrics.compact_encodes += 1,
                    JoinRowEncoding::CpuFriendly => self.metrics.cpu_friendly_encodes += 1,
                }
                row
            }
            Err(_) => return output,
        };

        let state_key = Self::make_state_key(side, &key_value, event_time);
        if ctx.state.put_typed(&state_key, &join_row).is_err() {
            return output;
        }

        let cleanup_time = event_time + self.time_bound_ms;
        let timer_key = Self::make_timer_key(side, &state_key);
        ctx.timers
            .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));

        if (side == JoinSide::Left && self.join_type.emits_unmatched_left())
            || (side == JoinSide::Right && self.join_type.emits_unmatched_right())
        {
            let unmatched_timer_key = Self::make_unmatched_timer_key(side, &state_key);
            ctx.timers.register_timer(
                cleanup_time,
                Some(unmatched_timer_key),
                Some(ctx.operator_index),
            );
        }

        if self.build_side_pruning {
            self.prune_build_side(side, ctx);
        }

        let matches = self.probe_opposite_side(side, &key_value, event_time, ctx.state);

        for (other_row_key, mut other_row) in matches {
            self.metrics.matches += 1;

            other_row.matched = true;
            let _ = ctx.state.put_typed(&other_row_key, &other_row);

            if let Ok(Some(mut our_row)) = ctx.state.get_typed::<JoinRow>(&state_key) {
                our_row.matched = true;
                let _ = ctx.state.put_typed(&state_key, &our_row);
            }

            if let Some(joined_event) = self.create_joined_event(
                side,
                &join_row,
                &other_row,
                std::cmp::max(event_time, other_row.timestamp),
            ) {
                output.push(Output::Event(joined_event));
            }
        }

        if let Some(wm) = emitted_watermark {
            output.push(Output::Watermark(wm.timestamp()));
        }

        output
    }

    fn prune_build_side(&mut self, current_side: JoinSide, ctx: &mut OperatorContext) {
        let build_side = self.effective_build_side();

        if current_side == build_side {
            return;
        }

        let probe_watermark = match build_side {
            JoinSide::Left => self.right_watermark,
            JoinSide::Right => self.left_watermark,
        };

        if probe_watermark == i64::MIN {
            return;
        }

        let prune_threshold = probe_watermark - self.time_bound_ms;
        if prune_threshold == i64::MIN {
            return;
        }

        if self.join_type == JoinType::Inner {
            let prefix = match build_side {
                JoinSide::Left => LEFT_STATE_PREFIX,
                JoinSide::Right => RIGHT_STATE_PREFIX,
            };

            self.prune_buffer.clear();
            let time_bound = self.time_bound_ms;
            for (key, value) in ctx.state.prefix_scan(prefix) {
                if self.prune_buffer.len() >= 100 {
                    break;
                }
                if key.len() >= 20 {
                    if let Ok(ts_bytes) = <[u8; 8]>::try_from(&key[12..20]) {
                        let timestamp = i64::from_be_bytes(ts_bytes);
                        if timestamp + time_bound < prune_threshold {
                            if let Ok(row) =
                                rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
                                    .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
                            {
                                if row.timestamp + time_bound < prune_threshold {
                                    self.prune_buffer.push(key);
                                }
                            }
                        }
                    }
                }
            }

            for key in &self.prune_buffer {
                if ctx.state.delete(key).is_ok() {
                    self.metrics.build_side_prunes += 1;
                }
            }
        }
    }

    pub fn scan_idle_keys(&mut self, ctx: &mut OperatorContext) {
        if !self.per_key_tracking {
            return;
        }

        let threshold = ctx.processing_time - self.key_idle_threshold_ms;

        let idle_keys: Vec<u64> = self
            .key_metadata
            .iter()
            .filter(|(_, meta)| meta.last_activity < threshold && meta.state_entries == 0)
            .map(|(k, _)| *k)
            .collect();

        for key_hash in idle_keys {
            self.key_metadata.remove(&key_hash);
            self.metrics.idle_key_cleanups += 1;
        }

        self.metrics.tracked_keys = self.key_metadata.len() as u64;
    }

    #[must_use]
    pub fn should_skip_compaction(&self, side: JoinSide, current_time: i64) -> bool {
        if !self.asymmetric_compaction {
            return false;
        }

        match side {
            JoinSide::Left => self
                .left_stats
                .is_idle(current_time, self.idle_threshold_ms),
            JoinSide::Right => self
                .right_stats
                .is_idle(current_time, self.idle_threshold_ms),
        }
    }

    fn extract_key(batch: &RecordBatch, column_name: &str) -> Option<Vec<u8>> {
        let column_index = batch.schema().index_of(column_name).ok()?;
        let column = batch.column(column_index);

        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
            if string_array.is_empty() || string_array.is_null(0) {
                return None;
            }
            return Some(string_array.value(0).as_bytes().to_vec());
        }

        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
            if int_array.is_empty() || int_array.is_null(0) {
                return None;
            }
            return Some(int_array.value(0).to_le_bytes().to_vec());
        }

        None
    }

    #[allow(clippy::cast_sign_loss)]
    fn make_state_key(side: JoinSide, key_value: &[u8], timestamp: i64) -> Vec<u8> {
        let prefix = match side {
            JoinSide::Left => LEFT_STATE_PREFIX,
            JoinSide::Right => RIGHT_STATE_PREFIX,
        };

        let event_id = EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);

        let mut key = Vec::with_capacity(28);
        key.extend_from_slice(prefix);

        let key_hash = fxhash::hash64(key_value);
        key.extend_from_slice(&key_hash.to_be_bytes());
        key.extend_from_slice(&timestamp.to_be_bytes());
        key.extend_from_slice(&event_id.to_be_bytes());

        key
    }

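    // State keys produced above are fixed-width (28 bytes), a layout the pruning and
    // cleanup paths elsewhere in this file rely on (descriptive note, derived from the
    // code):
    //   [0..4)   side prefix (b"sjl:" or b"sjr:")
    //   [4..12)  fxhash64 of the join key, big-endian
    //   [12..20) event timestamp, big-endian
    //   [20..28) monotonically increasing event id, big-endian
    // `probe_opposite_side` prefix-scans on the first 12 bytes; `prune_build_side` and
    // `handle_cleanup_timer` read the timestamp and hash back out of the same offsets.
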
    fn make_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
        let prefix = match side {
            JoinSide::Left => LEFT_TIMER_PREFIX,
            JoinSide::Right => RIGHT_TIMER_PREFIX,
        };

        let mut key = TimerKey::new();
        key.push(prefix);
        key.extend_from_slice(state_key);
        key
    }

    fn make_unmatched_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
        let side_byte = match side {
            JoinSide::Left => 0x01,
            JoinSide::Right => 0x02,
        };

        let mut key = TimerKey::new();
        key.push(UNMATCHED_TIMER_PREFIX);
        key.push(side_byte);
        key.extend_from_slice(state_key);
        key
    }

    fn probe_opposite_side(
        &self,
        current_side: JoinSide,
        key_value: &[u8],
        timestamp: i64,
        state: &dyn StateStore,
    ) -> Vec<(Vec<u8>, JoinRow)> {
        let mut matches = Vec::new();

        let prefix = match current_side {
            JoinSide::Left => RIGHT_STATE_PREFIX,
            JoinSide::Right => LEFT_STATE_PREFIX,
        };

        let key_hash = fxhash::hash64(key_value);
        let mut scan_prefix = Vec::with_capacity(12);
        scan_prefix.extend_from_slice(prefix);
        scan_prefix.extend_from_slice(&key_hash.to_be_bytes());

        for (state_key, value) in state.prefix_scan(&scan_prefix) {
            let Ok(row) = rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
                .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
            else {
                continue;
            };

            let time_diff = (timestamp - row.timestamp).abs();
            if time_diff <= self.time_bound_ms && row.key_value == key_value {
                matches.push((state_key.to_vec(), row));
            }
        }

        matches
    }

    fn create_joined_event(
        &self,
        current_side: JoinSide,
        current_row: &JoinRow,
        other_row: &JoinRow,
        output_timestamp: i64,
    ) -> Option<Event> {
        let (left_row, right_row) = match current_side {
            JoinSide::Left => (current_row, other_row),
            JoinSide::Right => (other_row, current_row),
        };

        let left_batch = left_row.to_batch().ok()?;
        let right_batch = right_row.to_batch().ok()?;

        let joined_batch = self.concat_batches(&left_batch, &right_batch)?;

        Some(Event::new(output_timestamp, joined_batch))
    }

    fn concat_batches(&self, left: &RecordBatch, right: &RecordBatch) -> Option<RecordBatch> {
        let schema = self.output_schema.as_ref()?;

        let mut columns: Vec<ArrayRef> = left.columns().to_vec();

        for column in right.columns() {
            columns.push(Arc::clone(column));
        }

        RecordBatch::try_new(Arc::clone(schema), columns).ok()
    }

    fn create_unmatched_event(&self, side: JoinSide, row: &JoinRow) -> Option<Event> {
        let batch = row.to_batch().ok()?;
        let schema = self.output_schema.as_ref()?;

        let num_rows = batch.num_rows();
        let mut columns: Vec<ArrayRef> = Vec::new();

        match side {
            JoinSide::Left => {
                columns.extend(batch.columns().iter().cloned());

                if let Some(right_schema) = &self.right_schema {
                    for field in right_schema.fields() {
                        columns.push(Self::create_null_array(field.data_type(), num_rows));
                    }
                }
            }
            JoinSide::Right => {
                if let Some(left_schema) = &self.left_schema {
                    for field in left_schema.fields() {
                        columns.push(Self::create_null_array(field.data_type(), num_rows));
                    }
                }

                columns.extend(batch.columns().iter().cloned());
            }
        }

        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;

        Some(Event::new(row.timestamp, joined_batch))
    }

    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
        match data_type {
            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
        }
    }

    fn handle_cleanup_timer(
        &mut self,
        side: JoinSide,
        state_key: &[u8],
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let output = OutputVec::new();

        if self.should_skip_compaction(side, ctx.processing_time) {
            self.metrics.asymmetric_skips += 1;
        }

        if self.per_key_tracking && state_key.len() >= 12 {
            if let Ok(hash_bytes) = state_key[4..12].try_into() {
                let key_hash = u64::from_be_bytes(hash_bytes);
                if let Some(meta) = self.key_metadata.get_mut(&key_hash) {
                    meta.decrement_entries();
                }
            }
        }

        if ctx.state.delete(state_key).is_ok() {
            self.metrics.state_cleanups += 1;
        }

        output
    }

    fn handle_unmatched_timer(
        &mut self,
        side: JoinSide,
        state_key: &[u8],
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();

        let Ok(Some(row)) = ctx.state.get_typed::<JoinRow>(state_key) else {
            return output;
        };

        if !row.matched {
            match side {
                JoinSide::Left if self.join_type.emits_unmatched_left() => {
                    self.metrics.unmatched_left += 1;
                    if let Some(event) = self.create_unmatched_event(side, &row) {
                        output.push(Output::Event(event));
                    }
                }
                JoinSide::Right if self.join_type.emits_unmatched_right() => {
                    self.metrics.unmatched_right += 1;
                    if let Some(event) = self.create_unmatched_event(side, &row) {
                        output.push(Output::Event(event));
                    }
                }
                _ => {}
            }
        }

        output
    }

    fn parse_timer_key(key: &[u8]) -> Option<(TimerKeyType, JoinSide, Vec<u8>)> {
        if key.is_empty() {
            return None;
        }

        match key[0] {
            LEFT_TIMER_PREFIX => {
                let state_key = key[1..].to_vec();
                Some((TimerKeyType::Cleanup, JoinSide::Left, state_key))
            }
            RIGHT_TIMER_PREFIX => {
                let state_key = key[1..].to_vec();
                Some((TimerKeyType::Cleanup, JoinSide::Right, state_key))
            }
            UNMATCHED_TIMER_PREFIX => {
                if key.len() < 2 {
                    return None;
                }
                let side = match key[1] {
                    0x01 => JoinSide::Left,
                    0x02 => JoinSide::Right,
                    _ => return None,
                };
                let state_key = key[2..].to_vec();
                Some((TimerKeyType::Unmatched, side, state_key))
            }
            _ => None,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerKeyType {
    Cleanup,
    Unmatched,
}

impl Operator for StreamJoinOperator {
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.process_left(event, ctx)
    }

    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
        let Some((timer_type, side, state_key)) = Self::parse_timer_key(&timer.key) else {
            return OutputVec::new();
        };

        match timer_type {
            TimerKeyType::Cleanup => self.handle_cleanup_timer(side, &state_key, ctx),
            TimerKeyType::Unmatched => self.handle_unmatched_timer(side, &state_key, ctx),
        }
    }

    fn checkpoint(&self) -> OperatorState {
        let checkpoint_data = (
            (
                self.left_key_column.clone(),
                self.right_key_column.clone(),
                self.time_bound_ms,
            ),
            (
                self.metrics.left_events,
                self.metrics.right_events,
                self.metrics.matches,
            ),
            (
                self.metrics.cpu_friendly_encodes,
                self.metrics.compact_encodes,
                self.metrics.asymmetric_skips,
                self.metrics.idle_key_cleanups,
                self.metrics.build_side_prunes,
            ),
            (
                self.left_stats.events_received,
                self.right_stats.events_received,
                self.left_watermark,
                self.right_watermark,
            ),
        );

        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        type CheckpointData = (
            (String, String, i64),
            (u64, u64, u64),
            (u64, u64, u64, u64, u64),
            (u64, u64, i64, i64),
        );
        type LegacyCheckpointData = (String, String, i64, u64, u64, u64);

        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        if let Ok(archived) =
            rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
        {
            if let Ok(data) = rkyv::deserialize::<CheckpointData, RkyvError>(archived) {
                let (
                    _config,
                    (left_events, right_events, matches),
                    (
                        cpu_friendly_encodes,
                        compact_encodes,
                        asymmetric_skips,
                        idle_key_cleanups,
                        build_side_prunes,
                    ),
                    (left_received, right_received, left_wm, right_wm),
                ) = data;

                self.metrics.left_events = left_events;
                self.metrics.right_events = right_events;
                self.metrics.matches = matches;
                self.metrics.cpu_friendly_encodes = cpu_friendly_encodes;
                self.metrics.compact_encodes = compact_encodes;
                self.metrics.asymmetric_skips = asymmetric_skips;
                self.metrics.idle_key_cleanups = idle_key_cleanups;
                self.metrics.build_side_prunes = build_side_prunes;
                self.left_stats.events_received = left_received;
                self.right_stats.events_received = right_received;
                self.left_watermark = left_wm;
                self.right_watermark = right_wm;

                return Ok(());
            }
        }

        let archived = rkyv::access::<rkyv::Archived<LegacyCheckpointData>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let (_, _, _, left_events, right_events, matches) =
            rkyv::deserialize::<LegacyCheckpointData, RkyvError>(archived)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.metrics.left_events = left_events;
        self.metrics.right_events = right_events;
        self.metrics.matches = matches;

        Ok(())
    }
}

#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use crate::state::InMemoryStore;
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
    use arrow_array::{Int64Array, StringArray};
    use arrow_schema::{DataType, Field, Schema};

    fn create_order_event(timestamp: i64, order_id: &str, amount: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Utf8, false),
            Field::new("amount", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![order_id])),
                Arc::new(Int64Array::from(vec![amount])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    fn create_payment_event(timestamp: i64, order_id: &str, status: &str) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("order_id", DataType::Utf8, false),
            Field::new("status", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![order_id])),
                Arc::new(StringArray::from(vec![status])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn StateStore,
        watermark_gen: &'a mut dyn WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 0,
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    #[test]
    fn test_join_type_properties() {
        assert!(!JoinType::Inner.emits_unmatched_left());
        assert!(!JoinType::Inner.emits_unmatched_right());

        assert!(JoinType::Left.emits_unmatched_left());
        assert!(!JoinType::Left.emits_unmatched_right());

        assert!(!JoinType::Right.emits_unmatched_left());
        assert!(JoinType::Right.emits_unmatched_right());

        assert!(JoinType::Full.emits_unmatched_left());
        assert!(JoinType::Full.emits_unmatched_right());
    }

    #[test]
    fn test_join_operator_creation() {
        let operator = StreamJoinOperator::new(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
        );

        assert_eq!(operator.join_type(), JoinType::Inner);
        assert_eq!(operator.time_bound_ms(), 3_600_000);
    }

    #[test]
    fn test_join_operator_with_id() {
        let operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Left,
            "test_join".to_string(),
        );

        assert_eq!(operator.operator_id, "test_join");
        assert_eq!(operator.join_type(), JoinType::Left);
    }

    #[test]
    fn test_inner_join_basic() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
            assert!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count()
                    == 0
            );
        }

        let payment = create_payment_event(2000, "order_1", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        assert_eq!(operator.metrics().matches, 1);
        assert_eq!(operator.metrics().left_events, 1);
        assert_eq!(operator.metrics().right_events, 1);
    }

    #[test]
    fn test_inner_join_no_match_different_key() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&order, JoinSide::Left, &mut ctx);
        }

        let payment = create_payment_event(2000, "order_2", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                0
            );
        }

        assert_eq!(operator.metrics().matches, 0);
    }

    #[test]
    fn test_inner_join_no_match_outside_time_bound() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(1),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&order, JoinSide::Left, &mut ctx);
        }

        let payment = create_payment_event(5000, "order_1", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                0
            );
        }

        assert_eq!(operator.metrics().matches, 0);
    }

    #[test]
    fn test_join_multiple_matches() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        for ts in [1000, 2000] {
            let order = create_order_event(ts, "order_1", 100);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&order, JoinSide::Left, &mut ctx);
        }

        let payment = create_payment_event(1500, "order_1", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                2
            );
        }

        assert_eq!(operator.metrics().matches, 2);
    }

    #[test]
    fn test_join_late_event() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(1),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);

        let future_order = create_order_event(10000, "order_2", 200);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&future_order, JoinSide::Left, &mut ctx);
        }

        let late_payment = create_payment_event(100, "order_1", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&late_payment, JoinSide::Right, &mut ctx);
            assert!(outputs.iter().any(|o| matches!(o, Output::LateEvent(_))));
        }

        assert_eq!(operator.metrics().late_events, 1);
    }

    #[test]
    fn test_join_row_serialization() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("value", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["test"])),
                Arc::new(Int64Array::from(vec![42])),
            ],
        )
        .unwrap();

        let row = JoinRow::new(1000, b"key".to_vec(), &batch).unwrap();

        let restored_batch = row.to_batch().unwrap();
        assert_eq!(restored_batch.num_rows(), 1);
        assert_eq!(restored_batch.num_columns(), 2);
    }

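    // Sketch test added for illustration: round-trips a row through the CPU-friendly
    // encoding using only items defined in this file. It assumes a little-endian target,
    // which the raw value copy in `write_int64_column` also assumes.
    #[test]
    fn test_join_row_cpu_friendly_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("value", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["test"])),
                Arc::new(Int64Array::from(vec![42])),
            ],
        )
        .unwrap();

        let row =
            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
                .unwrap();
        assert_eq!(row.encoding(), JoinRowEncoding::CpuFriendly);

        let restored = row.to_batch().unwrap();
        assert_eq!(restored.num_rows(), 1);
        assert_eq!(restored.num_columns(), 2);

        let ids = restored
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(ids.value(0), "test");

        let values = restored
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(values.value(0), 42);
    }
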
    #[test]
    fn test_cleanup_timer() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(1),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&order, JoinSide::Left, &mut ctx);
        }

        assert!(state.len() > 0);
        let initial_state_len = state.len();

        let registered_timers = timers.poll_timers(2001);
        assert!(!registered_timers.is_empty());

        for timer_reg in registered_timers {
            let timer = Timer {
                key: timer_reg.key.unwrap_or_default(),
                timestamp: timer_reg.timestamp,
            };
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.on_timer(timer, &mut ctx);
        }

        assert!(state.len() < initial_state_len || operator.metrics().state_cleanups > 0);
    }

2467 #[test]
2468 fn test_checkpoint_restore() {
2469 let mut operator = StreamJoinOperator::with_id(
2470 "order_id".to_string(),
2471 "order_id".to_string(),
2472 Duration::from_secs(3600),
2473 JoinType::Inner,
2474 "test_join".to_string(),
2475 );
2476
2477 operator.metrics.left_events = 10;
2479 operator.metrics.right_events = 5;
2480 operator.metrics.matches = 3;
2481
2482 let checkpoint = operator.checkpoint();
2484
2485 let mut restored = StreamJoinOperator::with_id(
2487 "order_id".to_string(),
2488 "order_id".to_string(),
2489 Duration::from_secs(3600),
2490 JoinType::Inner,
2491 "test_join".to_string(),
2492 );
2493
2494 restored.restore(checkpoint).unwrap();
2495
2496 assert_eq!(restored.metrics().left_events, 10);
2497 assert_eq!(restored.metrics().right_events, 5);
2498 assert_eq!(restored.metrics().matches, 3);
2499 }
2500
    #[test]
    fn test_metrics_reset() {
        let mut operator = StreamJoinOperator::new(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
        );

        operator.metrics.left_events = 10;
        operator.metrics.matches = 5;

        operator.reset_metrics();

        assert_eq!(operator.metrics().left_events, 0);
        assert_eq!(operator.metrics().matches, 0);
    }

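    // Arrival order should not matter: a right-then-left sequence still produces exactly one joined output.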
    #[test]
    fn test_bidirectional_join() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // The right side arrives first: nothing to match yet, so no joined output.
        let payment = create_payment_event(1000, "order_1", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                0
            );
        }

        // The matching left event then arrives and joins against the buffered payment.
        let order = create_order_event(1500, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        assert_eq!(operator.metrics().matches, 1);
    }

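    // Join keys are not limited to strings; Int64 key columns match the same way.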
    #[test]
    fn test_integer_key_join() {
        fn create_int_key_event(timestamp: i64, key: i64, value: i64) -> Event {
            let schema = Arc::new(Schema::new(vec![
                Field::new("key", DataType::Int64, false),
                Field::new("value", DataType::Int64, false),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int64Array::from(vec![key])),
                    Arc::new(Int64Array::from(vec![value])),
                ],
            )
            .unwrap();
            Event::new(timestamp, batch)
        }

        let mut operator = StreamJoinOperator::with_id(
            "key".to_string(),
            "key".to_string(),
            Duration::from_secs(3600),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let left = create_int_key_event(1000, 42, 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&left, JoinSide::Left, &mut ctx);
        }

        let right = create_int_key_event(1500, 42, 200);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&right, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        assert_eq!(operator.metrics().matches, 1);
    }

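    // Display/FromStr round-trip for JoinRowEncoding, including the hyphenated alias.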
    #[test]
    fn test_f057_join_row_encoding_enum() {
        assert_eq!(JoinRowEncoding::default(), JoinRowEncoding::Compact);
        assert_eq!(format!("{}", JoinRowEncoding::Compact), "compact");
        assert_eq!(format!("{}", JoinRowEncoding::CpuFriendly), "cpu_friendly");

        assert_eq!(
            "compact".parse::<JoinRowEncoding>().unwrap(),
            JoinRowEncoding::Compact
        );
        assert_eq!(
            "cpu_friendly".parse::<JoinRowEncoding>().unwrap(),
            JoinRowEncoding::CpuFriendly
        );
        assert_eq!(
            "cpu-friendly".parse::<JoinRowEncoding>().unwrap(),
            JoinRowEncoding::CpuFriendly
        );
        assert!("invalid".parse::<JoinRowEncoding>().is_err());
    }

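    // The builder should populate every field exposed on StreamJoinConfig.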
    #[test]
    fn test_f057_config_builder() {
        let config = StreamJoinConfig::builder()
            .left_key_column("order_id")
            .right_key_column("payment_id")
            .time_bound(Duration::from_secs(3600))
            .join_type(JoinType::Left)
            .operator_id("test_join")
            .row_encoding(JoinRowEncoding::CpuFriendly)
            .asymmetric_compaction(true)
            .idle_threshold(Duration::from_secs(120))
            .per_key_tracking(true)
            .key_idle_threshold(Duration::from_secs(600))
            .build_side_pruning(true)
            .build_side(JoinSide::Left)
            .build();

        assert_eq!(config.left_key_column, "order_id");
        assert_eq!(config.right_key_column, "payment_id");
        assert_eq!(config.time_bound_ms, 3_600_000);
        assert_eq!(config.join_type, JoinType::Left);
        assert_eq!(config.operator_id, Some("test_join".to_string()));
        assert_eq!(config.row_encoding, JoinRowEncoding::CpuFriendly);
        assert!(config.asymmetric_compaction);
        assert_eq!(config.idle_threshold_ms, 120_000);
        assert!(config.per_key_tracking);
        assert_eq!(config.key_idle_threshold_ms, 600_000);
        assert!(config.build_side_pruning);
        assert_eq!(config.build_side, Some(JoinSide::Left));
    }

    #[test]
    fn test_f057_from_config() {
        let config = StreamJoinConfig::builder()
            .left_key_column("key")
            .right_key_column("key")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .row_encoding(JoinRowEncoding::CpuFriendly)
            .build();

        let operator = StreamJoinOperator::from_config(config);

        assert_eq!(operator.row_encoding(), JoinRowEncoding::CpuFriendly);
        assert!(operator.asymmetric_compaction_enabled());
        assert!(operator.per_key_tracking_enabled());
    }

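    // CPU-friendly encoding must reproduce string, integer, and float columns (floats within tolerance).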
    #[test]
    fn test_f057_cpu_friendly_encoding_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("value", DataType::Int64, false),
            Field::new("price", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["test_key"])),
                Arc::new(Int64Array::from(vec![42])),
                Arc::new(Float64Array::from(vec![99.99])),
            ],
        )
        .unwrap();

        let row =
            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
                .unwrap();
        assert_eq!(row.encoding(), JoinRowEncoding::CpuFriendly);

        let restored = row.to_batch().unwrap();
        assert_eq!(restored.num_rows(), 1);
        assert_eq!(restored.num_columns(), 3);

        let id_col = restored
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(id_col.value(0), "test_key");

        let value_col = restored
            .column(1)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(value_col.value(0), 42);

        let price_col = restored
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!((price_col.value(0) - 99.99).abs() < 0.001);
    }

    #[test]
    fn test_f057_compact_encoding_still_works() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("value", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["test"])),
                Arc::new(Int64Array::from(vec![100])),
            ],
        )
        .unwrap();

        let row = JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::Compact)
            .unwrap();
        assert_eq!(row.encoding(), JoinRowEncoding::Compact);

        let restored = row.to_batch().unwrap();
        assert_eq!(restored.num_rows(), 1);
    }

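    // SideStats: recent events keep a side live; it only reads as idle once the window count is cleared and the threshold has passed.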
    #[test]
    fn test_f057_side_stats_tracking() {
        let mut stats = SideStats::new();
        assert_eq!(stats.events_received, 0);
        // A freshly created SideStats does not report idle at t=1000 with a 60s threshold.
        assert!(!stats.is_idle(1000, 60_000));

        stats.record_event(1000);
        assert_eq!(stats.events_received, 1);
        assert_eq!(stats.last_event_time, 1000);

        stats.record_event(2000);
        assert_eq!(stats.events_received, 2);
        assert_eq!(stats.last_event_time, 2000);

        // Still within the 60s idle threshold of the last recorded event.
        assert!(!stats.is_idle(2000, 60_000));
        assert!(!stats.is_idle(50_000, 60_000));

        // With the window counter cleared and the threshold exceeded, the side reads as idle.
        stats.events_this_window = 0;
        assert!(stats.is_idle(100_000, 60_000));
    }

    #[test]
    fn test_f057_key_metadata_tracking() {
        let mut meta = KeyMetadata::new(1000);
        assert_eq!(meta.last_activity, 1000);
        assert_eq!(meta.event_count, 1);
        assert_eq!(meta.state_entries, 1);

        meta.record_event(2000);
        assert_eq!(meta.last_activity, 2000);
        assert_eq!(meta.event_count, 2);
        assert_eq!(meta.state_entries, 2);

        meta.decrement_entries();
        assert_eq!(meta.state_entries, 1);

        assert!(!meta.is_idle(2000, 60_000));
        assert!(meta.is_idle(100_000, 60_000));
    }

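    // With per-key tracking enabled, each distinct key is tracked and surfaced through the metrics.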
    #[test]
    fn test_f057_per_key_tracking_in_operator() {
        let config = StreamJoinConfig::builder()
            .left_key_column("order_id")
            .right_key_column("order_id")
            .time_bound(Duration::from_secs(3600))
            .join_type(JoinType::Inner)
            .per_key_tracking(true)
            .build();

        let mut operator = StreamJoinOperator::from_config(config);
        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        for (i, key) in ["order_1", "order_2", "order_3"].iter().enumerate() {
            let event = create_order_event(1000 + i as i64 * 100, key, 100);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&event, JoinSide::Left, &mut ctx);
        }

        assert_eq!(operator.tracked_key_count(), 3);
        assert_eq!(operator.metrics().tracked_keys, 3);
    }

    #[test]
    fn test_f057_encoding_metrics() {
        // Compact encoding: storing a row should bump compact_encodes only.
        let mut compact_op = StreamJoinOperator::from_config(
            StreamJoinConfig::builder()
                .left_key_column("order_id")
                .right_key_column("order_id")
                .time_bound(Duration::from_secs(3600))
                .join_type(JoinType::Inner)
                .row_encoding(JoinRowEncoding::Compact)
                .build(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let event = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            compact_op.process_side(&event, JoinSide::Left, &mut ctx);
        }
        assert_eq!(compact_op.metrics().compact_encodes, 1);
        assert_eq!(compact_op.metrics().cpu_friendly_encodes, 0);

        // CPU-friendly encoding: the same event should bump cpu_friendly_encodes instead.
        let mut cpu_op = StreamJoinOperator::from_config(
            StreamJoinConfig::builder()
                .left_key_column("order_id")
                .right_key_column("order_id")
                .time_bound(Duration::from_secs(3600))
                .join_type(JoinType::Inner)
                .row_encoding(JoinRowEncoding::CpuFriendly)
                .build(),
        );

        let mut state2 = InMemoryStore::new();
        {
            let mut ctx = create_test_context(&mut timers, &mut state2, &mut watermark_gen);
            cpu_op.process_side(&event, JoinSide::Left, &mut ctx);
        }
        assert_eq!(cpu_op.metrics().cpu_friendly_encodes, 1);
        assert_eq!(cpu_op.metrics().compact_encodes, 0);
    }

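    // Per-side idleness as consumed by asymmetric compaction: an active side is not idle, a drained one is.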
    #[test]
    fn test_f057_asymmetric_compaction_detection() {
        let config = StreamJoinConfig::builder()
            .left_key_column("order_id")
            .right_key_column("order_id")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .asymmetric_compaction(true)
            .idle_threshold(Duration::from_secs(10))
            .build();

        let mut operator = StreamJoinOperator::from_config(config);
        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Feed a steady trickle of left-side events.
        for i in 0..5 {
            let event = create_order_event(1000 + i * 100, "order_1", 100);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = 1000 + i * 100;
            operator.process_side(&event, JoinSide::Left, &mut ctx);
        }

        // Neither side reads as idle shortly after the burst of left events.
        assert!(!operator.is_side_idle(JoinSide::Left, 1500));

        assert!(!operator.is_side_idle(JoinSide::Right, 1500));

        // Clearing the per-window counter with the threshold long exceeded marks the left side idle.
        operator.left_stats.events_this_window = 0;

        assert!(operator.is_side_idle(JoinSide::Left, 100_000));
    }

    #[test]
    fn test_f057_effective_build_side_selection() {
        // An explicitly configured build side is honored as-is.
        let config = StreamJoinConfig::builder()
            .left_key_column("key")
            .right_key_column("key")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .build_side(JoinSide::Right)
            .build();

        let operator = StreamJoinOperator::from_config(config);
        assert_eq!(operator.effective_build_side(), JoinSide::Right);

        // Without an explicit choice, the side that has seen fewer events becomes the build side.
        let config2 = StreamJoinConfig::builder()
            .left_key_column("key")
            .right_key_column("key")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .build();

        let mut operator2 = StreamJoinOperator::from_config(config2);
        operator2.left_stats.events_received = 100;
        operator2.right_stats.events_received = 1000;

        assert_eq!(operator2.effective_build_side(), JoinSide::Left);
    }

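    // End-to-end inner join with CPU-friendly row encoding: one match, and an encode recorded per processed event.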
    #[test]
    fn test_f057_join_with_cpu_friendly_encoding() {
        let config = StreamJoinConfig::builder()
            .left_key_column("order_id")
            .right_key_column("order_id")
            .time_bound(Duration::from_secs(3600))
            .join_type(JoinType::Inner)
            .row_encoding(JoinRowEncoding::CpuFriendly)
            .build();

        let mut operator = StreamJoinOperator::from_config(config);
        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&order, JoinSide::Left, &mut ctx);
        }

        let payment = create_payment_event(2000, "order_1", "paid");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        assert_eq!(operator.metrics().matches, 1);
        // Two events processed, two CPU-friendly encodes recorded.
        assert_eq!(operator.metrics().cpu_friendly_encodes, 2);
    }

    #[test]
    fn test_f057_checkpoint_restore_with_optimization_state() {
        let config = StreamJoinConfig::builder()
            .left_key_column("key")
            .right_key_column("key")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .operator_id("test_join")
            .build();

        let mut operator = StreamJoinOperator::from_config(config);

        // Seed metrics, per-side stats, and watermarks before checkpointing.
        operator.metrics.left_events = 100;
        operator.metrics.right_events = 50;
        operator.metrics.matches = 25;
        operator.metrics.cpu_friendly_encodes = 10;
        operator.metrics.compact_encodes = 140;
        operator.metrics.asymmetric_skips = 5;
        operator.metrics.idle_key_cleanups = 3;
        operator.metrics.build_side_prunes = 2;
        operator.left_stats.events_received = 100;
        operator.right_stats.events_received = 50;
        operator.left_watermark = 5000;
        operator.right_watermark = 4000;

        let checkpoint = operator.checkpoint();

        // Restore into a fresh operator built from an identical config.
        let config2 = StreamJoinConfig::builder()
            .left_key_column("key")
            .right_key_column("key")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .operator_id("test_join")
            .build();

        let mut restored = StreamJoinOperator::from_config(config2);
        restored.restore(checkpoint).unwrap();

        assert_eq!(restored.metrics().left_events, 100);
        assert_eq!(restored.metrics().right_events, 50);
        assert_eq!(restored.metrics().matches, 25);
        assert_eq!(restored.metrics().cpu_friendly_encodes, 10);
        assert_eq!(restored.metrics().compact_encodes, 140);
        assert_eq!(restored.metrics().asymmetric_skips, 5);
        assert_eq!(restored.metrics().idle_key_cleanups, 3);
        assert_eq!(restored.metrics().build_side_prunes, 2);
        assert_eq!(restored.left_stats.events_received, 100);
        assert_eq!(restored.right_stats.events_received, 50);
        assert_eq!(restored.left_watermark, 5000);
        assert_eq!(restored.right_watermark, 4000);
    }

    #[test]
    fn test_f057_should_skip_compaction() {
        let config = StreamJoinConfig::builder()
            .left_key_column("key")
            .right_key_column("key")
            .time_bound(Duration::from_secs(60))
            .join_type(JoinType::Inner)
            .asymmetric_compaction(true)
            .idle_threshold(Duration::from_secs(10))
            .build();

        let mut operator = StreamJoinOperator::from_config(config);

        // An idle left side (last event far past the 10s threshold, empty window counter) may skip compaction.
        operator.left_stats.record_event(1000);
        operator.left_stats.events_this_window = 0;
        assert!(operator.should_skip_compaction(JoinSide::Left, 100_000));

        // Disabling asymmetric compaction turns the skip off again.
        operator.asymmetric_compaction = false;
        assert!(!operator.should_skip_compaction(JoinSide::Left, 100_000));
    }

    #[test]
    fn test_f057_multiple_rows_cpu_friendly() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int64Array::from(vec![1])),
                Arc::new(StringArray::from(vec!["Alice"])),
            ],
        )
        .unwrap();

        let row =
            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
                .unwrap();
        let restored = row.to_batch().unwrap();

        let id_col = restored
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let name_col = restored
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();

        assert_eq!(id_col.value(0), 1);
        assert_eq!(name_col.value(0), "Alice");
    }
}