1use super::{
62 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
63 TimerKey,
64};
65use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
66use arrow_schema::{DataType, Field, Schema, SchemaRef};
67use fxhash::FxHashMap;
68use rkyv::{
69 rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
70};
71use smallvec::SmallVec;
72use std::collections::BTreeMap;
73use std::sync::atomic::{AtomicU64, Ordering};
74use std::sync::Arc;
75
/// Selects which notion of time decides the table version a stream event is
/// joined against.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinSemantics {
    /// Join each stream event with the table version valid at the event's own
    /// timestamp: the newest version whose timestamp is `<=` the event time.
    #[default]
    EventTime,

    /// Join each stream event with the newest table version currently held,
    /// independent of the event's timestamp.
    ProcessTime,
}
88
/// Describes whether rows of the table side may change after they were joined.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TableCharacteristics {
    /// Table rows are only ever added. Updates and deletes handed to
    /// `process_table_change` are ignored and no stream events are buffered.
    #[default]
    AppendOnly,

    /// Table rows may be updated or deleted. Matched stream events are
    /// buffered so their join results can be retracted and re-emitted.
    NonAppendOnly,
}
101
/// Decides what happens to stream events that find no valid table version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinType {
    /// Unmatched stream events produce no output.
    #[default]
    Inner,
    /// Unmatched stream events are emitted with every table column null.
    Left,
}

impl TemporalJoinType {
    /// Returns `true` when stream events without a match must still be
    /// emitted (null-padded), which is the case only for [`TemporalJoinType::Left`].
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            TemporalJoinType::Inner => false,
            TemporalJoinType::Left => true,
        }
    }
}
119
120#[derive(Debug, Clone)]
122pub struct TemporalJoinConfig {
123 pub stream_key_column: String,
125 pub table_key_column: String,
127 pub table_version_column: String,
129 pub semantics: TemporalJoinSemantics,
131 pub table_characteristics: TableCharacteristics,
133 pub join_type: TemporalJoinType,
135 pub operator_id: Option<String>,
137 pub max_versions_per_key: usize,
139}
140
141impl TemporalJoinConfig {
142 #[must_use]
144 pub fn builder() -> TemporalJoinConfigBuilder {
145 TemporalJoinConfigBuilder::default()
146 }
147}
148
149#[derive(Debug, Default)]
151pub struct TemporalJoinConfigBuilder {
152 stream_key_column: Option<String>,
153 table_key_column: Option<String>,
154 table_version_column: Option<String>,
155 semantics: Option<TemporalJoinSemantics>,
156 table_characteristics: Option<TableCharacteristics>,
157 join_type: Option<TemporalJoinType>,
158 operator_id: Option<String>,
159 max_versions_per_key: Option<usize>,
160}
161
162impl TemporalJoinConfigBuilder {
163 #[must_use]
165 pub fn stream_key_column(mut self, column: String) -> Self {
166 self.stream_key_column = Some(column);
167 self
168 }
169
170 #[must_use]
172 pub fn table_key_column(mut self, column: String) -> Self {
173 self.table_key_column = Some(column);
174 self
175 }
176
177 #[must_use]
179 pub fn table_version_column(mut self, column: String) -> Self {
180 self.table_version_column = Some(column);
181 self
182 }
183
184 #[must_use]
186 pub fn semantics(mut self, semantics: TemporalJoinSemantics) -> Self {
187 self.semantics = Some(semantics);
188 self
189 }
190
191 #[must_use]
193 pub fn table_characteristics(mut self, characteristics: TableCharacteristics) -> Self {
194 self.table_characteristics = Some(characteristics);
195 self
196 }
197
198 #[must_use]
200 pub fn join_type(mut self, join_type: TemporalJoinType) -> Self {
201 self.join_type = Some(join_type);
202 self
203 }
204
205 #[must_use]
207 pub fn operator_id(mut self, id: String) -> Self {
208 self.operator_id = Some(id);
209 self
210 }
211
212 #[must_use]
214 pub fn max_versions_per_key(mut self, max: usize) -> Self {
215 self.max_versions_per_key = Some(max);
216 self
217 }
218
219 #[must_use]
225 pub fn build(self) -> TemporalJoinConfig {
226 TemporalJoinConfig {
227 stream_key_column: self
228 .stream_key_column
229 .expect("stream_key_column is required"),
230 table_key_column: self.table_key_column.expect("table_key_column is required"),
231 table_version_column: self
232 .table_version_column
233 .expect("table_version_column is required"),
234 semantics: self.semantics.unwrap_or_default(),
235 table_characteristics: self.table_characteristics.unwrap_or_default(),
236 join_type: self.join_type.unwrap_or_default(),
237 operator_id: self.operator_id,
238 max_versions_per_key: self.max_versions_per_key.unwrap_or(0),
239 }
240 }
241}
242
/// Leading byte of every timer key created by this operator. `on_timer` only
/// acts on timers whose key starts with this byte.
const TEMPORAL_TIMER_PREFIX: u8 = 0x60;

/// Process-wide counter supplying `<n>` in auto-generated `temporal_join_<n>` ids.
static TEMPORAL_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
248
249#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
251pub struct TableRow {
252 pub version_timestamp: i64,
254 pub key_value: Vec<u8>,
256 pub data: Vec<u8>,
258}
259
260impl TableRow {
261 fn new(
263 version_timestamp: i64,
264 key_value: Vec<u8>,
265 batch: &RecordBatch,
266 ) -> Result<Self, OperatorError> {
267 let data = Self::serialize_batch(batch)?;
268 Ok(Self {
269 version_timestamp,
270 key_value,
271 data,
272 })
273 }
274
275 fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
277 let mut buf = Vec::new();
278 {
279 let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
280 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
281 writer
282 .write(batch)
283 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
284 writer
285 .finish()
286 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
287 }
288 Ok(buf)
289 }
290
291 fn deserialize_batch(data: &[u8]) -> Result<RecordBatch, OperatorError> {
293 let cursor = std::io::Cursor::new(data);
294 let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
295 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
296 reader
297 .next()
298 .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
299 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
300 }
301
302 pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
308 Self::deserialize_batch(&self.data)
309 }
310}
311
312#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
314pub struct JoinedEventRecord {
315 pub event_timestamp: i64,
317 pub event_data: Vec<u8>,
319 pub table_version: i64,
321 pub key_value: Vec<u8>,
323}
324
325#[derive(Debug, Clone, Default)]
327pub struct VersionedKeyState {
328 pub versions: BTreeMap<i64, SmallVec<[TableRow; 1]>>,
331 pub min_version: i64,
333 pub max_version: i64,
335}
336
337impl VersionedKeyState {
338 #[must_use]
340 pub fn new() -> Self {
341 Self {
342 versions: BTreeMap::new(),
343 min_version: i64::MAX,
344 max_version: i64::MIN,
345 }
346 }
347
348 pub fn insert(&mut self, row: TableRow) {
350 let version = row.version_timestamp;
351 self.versions.entry(version).or_default().push(row);
352 self.min_version = self.min_version.min(version);
353 self.max_version = self.max_version.max(version);
354 }
355
356 #[must_use]
358 pub fn len(&self) -> usize {
359 self.versions.values().map(SmallVec::len).sum()
360 }
361
362 #[must_use]
364 pub fn is_empty(&self) -> bool {
365 self.versions.is_empty()
366 }
367
368 #[must_use]
371 pub fn lookup_at_time(&self, timestamp: i64) -> Option<&TableRow> {
372 let (_, rows) = self.versions.range(..=timestamp).next_back()?;
373 rows.last()
374 }
375
376 #[must_use]
378 pub fn lookup_latest(&self) -> Option<&TableRow> {
379 let (_, rows) = self.versions.iter().next_back()?;
380 rows.last()
381 }
382
383 pub fn cleanup_before(&mut self, threshold: i64) {
385 self.versions = self.versions.split_off(&threshold);
386 self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
387 }
388
389 pub fn remove_version(&mut self, version: i64) -> Option<SmallVec<[TableRow; 1]>> {
391 let removed = self.versions.remove(&version);
392 if removed.is_some() {
393 self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
394 self.max_version = self
395 .versions
396 .keys()
397 .next_back()
398 .copied()
399 .unwrap_or(i64::MIN);
400 }
401 removed
402 }
403
404 pub fn limit_versions(&mut self, max_versions: usize) {
406 if max_versions == 0 || self.versions.len() <= max_versions {
407 return;
408 }
409
410 let to_remove = self.versions.len() - max_versions;
411 let keys_to_remove: Vec<_> = self.versions.keys().take(to_remove).copied().collect();
412 for key in keys_to_remove {
413 self.versions.remove(&key);
414 }
415 self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
416 }
417}
418
419#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
421struct SerializableVersionedKeyState {
422 versions: Vec<(i64, Vec<TableRow>)>,
423 min_version: i64,
424 max_version: i64,
425}
426
427impl From<&VersionedKeyState> for SerializableVersionedKeyState {
428 fn from(state: &VersionedKeyState) -> Self {
429 Self {
430 versions: state
431 .versions
432 .iter()
433 .map(|(ts, rows)| (*ts, rows.to_vec()))
434 .collect(),
435 min_version: state.min_version,
436 max_version: state.max_version,
437 }
438 }
439}
440
441impl From<SerializableVersionedKeyState> for VersionedKeyState {
442 fn from(state: SerializableVersionedKeyState) -> Self {
443 let mut versions = BTreeMap::new();
444 for (ts, rows) in state.versions {
445 versions.insert(ts, SmallVec::from_vec(rows));
446 }
447 Self {
448 versions,
449 min_version: state.min_version,
450 max_version: state.max_version,
451 }
452 }
453}
454
455#[derive(Debug, Clone)]
457pub enum TableChange {
458 Insert(TableRow),
460 Update {
462 old: TableRow,
464 new: TableRow,
466 },
467 Delete(TableRow),
469}
470
/// Counters describing the work performed by a temporal join operator.
#[derive(Debug, Clone, Default)]
pub struct TemporalJoinMetrics {
    /// Stream events processed.
    pub stream_events: u64,
    /// Table rows inserted.
    pub table_inserts: u64,
    /// Table updates applied.
    pub table_updates: u64,
    /// Table deletes applied.
    pub table_deletes: u64,
    /// Stream events that found a valid table version.
    pub matches: u64,
    /// Stream events that found no valid table version.
    pub unmatched: u64,
    /// Retraction outputs emitted.
    pub retractions: u64,
    /// Buffered stream records removed by cleanup.
    pub state_cleanups: u64,
}

impl TemporalJoinMetrics {
    /// Creates a set of counters that are all zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            stream_events: 0,
            table_inserts: 0,
            table_updates: 0,
            table_deletes: 0,
            matches: 0,
            unmatched: 0,
            retractions: 0,
            state_cleanups: 0,
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
504
505pub struct TemporalJoinOperator {
511 config: TemporalJoinConfig,
513 operator_id: String,
515 table_state: FxHashMap<Vec<u8>, VersionedKeyState>,
517 stream_state: FxHashMap<Vec<u8>, Vec<JoinedEventRecord>>,
520 watermark: i64,
522 metrics: TemporalJoinMetrics,
524 output_schema: Option<SchemaRef>,
526 stream_schema: Option<SchemaRef>,
528 table_schema: Option<SchemaRef>,
530}
531
532impl TemporalJoinOperator {
533 #[must_use]
535 pub fn new(config: TemporalJoinConfig) -> Self {
536 let operator_id = config.operator_id.clone().unwrap_or_else(|| {
537 let num = TEMPORAL_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
538 format!("temporal_join_{num}")
539 });
540
541 Self {
542 config,
543 operator_id,
544 table_state: FxHashMap::default(),
545 stream_state: FxHashMap::default(),
546 watermark: i64::MIN,
547 metrics: TemporalJoinMetrics::new(),
548 output_schema: None,
549 stream_schema: None,
550 table_schema: None,
551 }
552 }
553
554 #[must_use]
556 pub fn with_id(mut config: TemporalJoinConfig, operator_id: String) -> Self {
557 config.operator_id = Some(operator_id);
558 Self::new(config)
559 }
560
561 #[must_use]
563 pub fn config(&self) -> &TemporalJoinConfig {
564 &self.config
565 }
566
567 #[must_use]
569 pub fn metrics(&self) -> &TemporalJoinMetrics {
570 &self.metrics
571 }
572
573 pub fn reset_metrics(&mut self) {
575 self.metrics.reset();
576 }
577
578 #[must_use]
580 pub fn watermark(&self) -> i64 {
581 self.watermark
582 }
583
584 #[must_use]
586 pub fn table_state_size(&self) -> usize {
587 self.table_state.values().map(VersionedKeyState::len).sum()
588 }
589
590 #[must_use]
592 pub fn stream_state_size(&self) -> usize {
593 self.stream_state.values().map(Vec::len).sum()
594 }
595
596 pub fn process_stream(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
598 self.metrics.stream_events += 1;
599
600 if self.stream_schema.is_none() {
602 self.stream_schema = Some(event.data.schema());
603 self.update_output_schema();
604 }
605
606 let mut output = OutputVec::new();
607
608 let Some(key_value) = Self::extract_key(&event.data, &self.config.stream_key_column) else {
610 return output;
611 };
612
613 let lookup_ts = match self.config.semantics {
615 TemporalJoinSemantics::EventTime => event.timestamp,
616 TemporalJoinSemantics::ProcessTime => ctx.processing_time,
617 };
618
619 if let Some(table_row) = self.lookup_table(&key_value, lookup_ts) {
621 self.metrics.matches += 1;
622
623 if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
625 if let Ok(event_data) = TableRow::serialize_batch(&event.data) {
626 let record = JoinedEventRecord {
627 event_timestamp: event.timestamp,
628 event_data,
629 table_version: table_row.version_timestamp,
630 key_value: key_value.clone(),
631 };
632 self.stream_state.entry(key_value).or_default().push(record);
633 }
634 }
635
636 if let Some(joined) = self.create_joined_event(event, &table_row) {
638 output.push(Output::Event(joined));
639 }
640 } else {
641 self.metrics.unmatched += 1;
642 if self.config.join_type.emits_unmatched() {
643 if let Some(unmatched) = self.create_unmatched_event(event) {
644 output.push(Output::Event(unmatched));
645 }
646 }
647 }
648
649 output
650 }
651
652 pub fn process_table_insert(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
654 self.metrics.table_inserts += 1;
655
656 if self.table_schema.is_none() {
658 self.table_schema = Some(event.data.schema());
659 self.update_output_schema();
660 }
661
662 let Some(key_value) = Self::extract_key(&event.data, &self.config.table_key_column) else {
664 return OutputVec::new();
665 };
666
667 let version_ts = Self::extract_timestamp(&event.data, &self.config.table_version_column)
668 .unwrap_or(event.timestamp);
669
670 let Ok(row) = TableRow::new(version_ts, key_value.clone(), &event.data) else {
672 return OutputVec::new();
673 };
674
675 let key_state = self.table_state.entry(key_value).or_default();
676 key_state.insert(row);
677
678 if self.config.max_versions_per_key > 0 {
680 key_state.limit_versions(self.config.max_versions_per_key);
681 }
682
683 OutputVec::new()
684 }
685
686 pub fn process_table_change(
689 &mut self,
690 change: &TableChange,
691 _ctx: &mut OperatorContext,
692 ) -> OutputVec {
693 if self.config.table_characteristics != TableCharacteristics::NonAppendOnly {
694 if let TableChange::Insert(row) = change {
696 let key_state = self.table_state.entry(row.key_value.clone()).or_default();
697 key_state.insert(row.clone());
698 }
699 return OutputVec::new();
700 }
701
702 let mut output = OutputVec::new();
703
704 match change {
705 TableChange::Insert(row) => {
706 self.metrics.table_inserts += 1;
707 let key_state = self.table_state.entry(row.key_value.clone()).or_default();
708 key_state.insert(row.clone());
709 }
710 TableChange::Update { old, new } => {
711 self.metrics.table_updates += 1;
712
713 self.emit_retractions_for_version(
715 &old.key_value,
716 old.version_timestamp,
717 &mut output,
718 );
719
720 if let Some(key_state) = self.table_state.get_mut(&old.key_value) {
722 key_state.remove_version(old.version_timestamp);
723 }
724 let key_state = self.table_state.entry(new.key_value.clone()).or_default();
725 key_state.insert(new.clone());
726
727 self.rejoin_affected_events(&new.key_value, new.version_timestamp, &mut output);
729 }
730 TableChange::Delete(row) => {
731 self.metrics.table_deletes += 1;
732
733 self.emit_retractions_for_version(
735 &row.key_value,
736 row.version_timestamp,
737 &mut output,
738 );
739
740 if let Some(key_state) = self.table_state.get_mut(&row.key_value) {
742 key_state.remove_version(row.version_timestamp);
743 }
744 }
745 }
746
747 output
748 }
749
750 fn emit_retractions_for_version(&mut self, key: &[u8], version: i64, output: &mut OutputVec) {
752 let Some(records) = self.stream_state.get(key) else {
753 return;
754 };
755
756 for record in records {
757 if record.table_version == version {
758 if let Ok(event_batch) = TableRow::deserialize_batch(&record.event_data) {
760 let event = Event::new(record.event_timestamp, event_batch);
761
762 if let Some(key_state) = self.table_state.get(key) {
764 if let Some((_, rows)) = key_state.versions.get_key_value(&version) {
765 if let Some(table_row) = rows.last() {
766 if let Some(joined) = self.create_joined_event(&event, table_row) {
767 output.push(Output::LateEvent(joined));
770 self.metrics.retractions += 1;
771 }
772 }
773 }
774 }
775 }
776 }
777 }
778 }
779
780 fn rejoin_affected_events(&mut self, key: &[u8], new_version: i64, output: &mut OutputVec) {
782 let events_to_rejoin: Vec<(i64, Vec<u8>)> = {
784 let Some(records) = self.stream_state.get(key) else {
785 return;
786 };
787 let Some(key_state) = self.table_state.get(key) else {
788 return;
789 };
790
791 records
792 .iter()
793 .filter_map(|record| {
794 let lookup_ts = record.event_timestamp;
795 if let Some(new_row) = key_state.lookup_at_time(lookup_ts) {
796 if new_row.version_timestamp == new_version {
797 return Some((record.event_timestamp, record.event_data.clone()));
798 }
799 }
800 None
801 })
802 .collect()
803 };
804
805 if let Some(key_state) = self.table_state.get(key) {
807 for (event_ts, event_data) in &events_to_rejoin {
808 if let Ok(event_batch) = TableRow::deserialize_batch(event_data) {
809 let event = Event::new(*event_ts, event_batch);
810 if let Some(new_row) = key_state.lookup_at_time(*event_ts) {
811 if let Some(joined) = self.create_joined_event(&event, new_row) {
812 output.push(Output::Event(joined));
813 }
814 }
815 }
816 }
817 }
818
819 if let Some(records) = self.stream_state.get_mut(key) {
821 for record in records.iter_mut() {
822 if events_to_rejoin
823 .iter()
824 .any(|(ts, _)| *ts == record.event_timestamp)
825 {
826 record.table_version = new_version;
827 }
828 }
829 }
830 }
831
832 fn lookup_table(&self, key: &[u8], timestamp: i64) -> Option<TableRow> {
834 let key_state = self.table_state.get(key)?;
835
836 match self.config.semantics {
837 TemporalJoinSemantics::EventTime => key_state.lookup_at_time(timestamp).cloned(),
838 TemporalJoinSemantics::ProcessTime => key_state.lookup_latest().cloned(),
839 }
840 }
841
842 pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
844 self.watermark = watermark;
845
846 if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
848 self.cleanup_stream_state(watermark);
849 }
850
851 OutputVec::new()
852 }
853
854 fn cleanup_stream_state(&mut self, watermark: i64) {
856 let initial_count: usize = self.stream_state.values().map(Vec::len).sum();
857
858 for records in self.stream_state.values_mut() {
859 records.retain(|r| r.event_timestamp >= watermark);
860 }
861 self.stream_state.retain(|_, v| !v.is_empty());
862
863 let final_count: usize = self.stream_state.values().map(Vec::len).sum();
864 if final_count < initial_count {
865 self.metrics.state_cleanups += (initial_count - final_count) as u64;
866 }
867 }
868
869 fn extract_key(batch: &RecordBatch, column_name: &str) -> Option<Vec<u8>> {
871 let column_index = batch.schema().index_of(column_name).ok()?;
872 let column = batch.column(column_index);
873
874 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
875 if string_array.is_empty() || string_array.is_null(0) {
876 return None;
877 }
878 return Some(string_array.value(0).as_bytes().to_vec());
879 }
880
881 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
882 if int_array.is_empty() || int_array.is_null(0) {
883 return None;
884 }
885 return Some(int_array.value(0).to_le_bytes().to_vec());
886 }
887
888 None
889 }
890
891 fn extract_timestamp(batch: &RecordBatch, column_name: &str) -> Option<i64> {
893 let column_index = batch.schema().index_of(column_name).ok()?;
894 let column = batch.column(column_index);
895
896 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
897 if int_array.is_empty() || int_array.is_null(0) {
898 return None;
899 }
900 return Some(int_array.value(0));
901 }
902
903 None
904 }
905
906 #[allow(dead_code)]
909 fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
910 let mut key = TimerKey::new();
911 key.push(TEMPORAL_TIMER_PREFIX);
912 key.extend_from_slice(key_suffix);
913 key
914 }
915
916 fn update_output_schema(&mut self) {
918 if let (Some(stream), Some(table)) = (&self.stream_schema, &self.table_schema) {
919 let mut fields: Vec<Field> =
920 stream.fields().iter().map(|f| f.as_ref().clone()).collect();
921
922 for field in table.fields() {
924 let name = if stream.field_with_name(field.name()).is_ok() {
925 format!("table_{}", field.name())
926 } else {
927 field.name().clone()
928 };
929 fields.push(Field::new(
930 name,
931 field.data_type().clone(),
932 true, ));
934 }
935
936 self.output_schema = Some(Arc::new(Schema::new(fields)));
937 }
938 }
939
940 fn create_joined_event(&self, stream_event: &Event, table_row: &TableRow) -> Option<Event> {
942 let schema = self.output_schema.as_ref()?;
943 let table_batch = table_row.to_batch().ok()?;
944
945 let mut columns: Vec<ArrayRef> = stream_event.data.columns().to_vec();
946 for column in table_batch.columns() {
947 columns.push(Arc::clone(column));
948 }
949
950 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
951
952 Some(Event::new(stream_event.timestamp, joined_batch))
953 }
954
955 fn create_unmatched_event(&self, stream_event: &Event) -> Option<Event> {
957 let schema = self.output_schema.as_ref()?;
958 let table_schema = self.table_schema.as_ref()?;
959
960 let num_rows = stream_event.data.num_rows();
961 let mut columns: Vec<ArrayRef> = stream_event.data.columns().to_vec();
962
963 for field in table_schema.fields() {
965 columns.push(Self::create_null_array(field.data_type(), num_rows));
966 }
967
968 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
969
970 Some(Event::new(stream_event.timestamp, joined_batch))
971 }
972
973 fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
975 match data_type {
976 DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
977 DataType::Float64 => {
978 use arrow_array::Float64Array;
979 Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
980 }
981 _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
982 }
983 }
984}
985
986impl Operator for TemporalJoinOperator {
987 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
988 self.process_stream(event, ctx)
990 }
991
992 fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
993 if timer.key.first() == Some(&TEMPORAL_TIMER_PREFIX) {
994 self.cleanup_stream_state(timer.timestamp);
996 }
997 OutputVec::new()
998 }
999
1000 fn checkpoint(&self) -> OperatorState {
1001 let table_entries: Vec<(Vec<u8>, SerializableVersionedKeyState)> = self
1003 .table_state
1004 .iter()
1005 .map(|(k, v)| (k.clone(), v.into()))
1006 .collect();
1007
1008 let stream_entries: Vec<(Vec<u8>, Vec<JoinedEventRecord>)> = self
1010 .stream_state
1011 .iter()
1012 .map(|(k, v)| (k.clone(), v.clone()))
1013 .collect();
1014
1015 let checkpoint_data = (
1016 self.watermark,
1017 self.metrics.stream_events,
1018 self.metrics.table_inserts,
1019 self.metrics.matches,
1020 self.metrics.unmatched,
1021 self.metrics.retractions,
1022 table_entries,
1023 stream_entries,
1024 );
1025
1026 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
1027 .map(|v| v.to_vec())
1028 .unwrap_or_default();
1029
1030 OperatorState {
1031 operator_id: self.operator_id.clone(),
1032 data,
1033 }
1034 }
1035
1036 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
1037 type CheckpointData = (
1038 i64,
1039 u64,
1040 u64,
1041 u64,
1042 u64,
1043 u64,
1044 Vec<(Vec<u8>, SerializableVersionedKeyState)>,
1045 Vec<(Vec<u8>, Vec<JoinedEventRecord>)>,
1046 );
1047
1048 if state.operator_id != self.operator_id {
1049 return Err(OperatorError::StateAccessFailed(format!(
1050 "Operator ID mismatch: expected {}, got {}",
1051 self.operator_id, state.operator_id
1052 )));
1053 }
1054
1055 let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
1056 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
1057 let (
1058 watermark,
1059 stream_events,
1060 table_inserts,
1061 matches,
1062 unmatched,
1063 retractions,
1064 table_entries,
1065 stream_entries,
1066 ) = rkyv::deserialize::<CheckpointData, RkyvError>(archived)
1067 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
1068
1069 self.watermark = watermark;
1070 self.metrics.stream_events = stream_events;
1071 self.metrics.table_inserts = table_inserts;
1072 self.metrics.matches = matches;
1073 self.metrics.unmatched = unmatched;
1074 self.metrics.retractions = retractions;
1075
1076 self.table_state.clear();
1078 for (key, serializable) in table_entries {
1079 self.table_state.insert(key, serializable.into());
1080 }
1081
1082 self.stream_state.clear();
1084 for (key, records) in stream_entries {
1085 self.stream_state.insert(key, records);
1086 }
1087
1088 Ok(())
1089 }
1090}
1091
1092#[cfg(test)]
1093#[allow(clippy::cast_precision_loss)]
1094#[allow(clippy::unnecessary_to_owned)]
1095mod tests {
1096 use super::*;
1097 use crate::state::{InMemoryStore, StateStore};
1098 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
1099 use arrow_array::Float64Array;
1100 use arrow_schema::{DataType, Field, Schema};
1101
1102 fn create_order_event(timestamp: i64, currency: &str, amount: f64) -> Event {
1104 let schema = Arc::new(Schema::new(vec![
1105 Field::new("currency", DataType::Utf8, false),
1106 Field::new("amount", DataType::Float64, false),
1107 ]));
1108 let batch = RecordBatch::try_new(
1109 schema,
1110 vec![
1111 Arc::new(StringArray::from(vec![currency])),
1112 Arc::new(Float64Array::from(vec![amount])),
1113 ],
1114 )
1115 .unwrap();
1116 Event::new(timestamp, batch)
1117 }
1118
1119 fn create_rate_event(timestamp: i64, currency: &str, rate: f64, valid_from: i64) -> Event {
1121 let schema = Arc::new(Schema::new(vec![
1122 Field::new("currency", DataType::Utf8, false),
1123 Field::new("rate", DataType::Float64, false),
1124 Field::new("valid_from", DataType::Int64, false),
1125 ]));
1126 let batch = RecordBatch::try_new(
1127 schema,
1128 vec![
1129 Arc::new(StringArray::from(vec![currency])),
1130 Arc::new(Float64Array::from(vec![rate])),
1131 Arc::new(Int64Array::from(vec![valid_from])),
1132 ],
1133 )
1134 .unwrap();
1135 Event::new(timestamp, batch)
1136 }
1137
1138 fn create_test_context<'a>(
1139 timers: &'a mut TimerService,
1140 state: &'a mut dyn StateStore,
1141 watermark_gen: &'a mut dyn WatermarkGenerator,
1142 ) -> OperatorContext<'a> {
1143 OperatorContext {
1144 event_time: 0,
1145 processing_time: 0,
1146 timers,
1147 state,
1148 watermark_generator: watermark_gen,
1149 operator_index: 0,
1150 }
1151 }
1152
1153 #[test]
1154 fn test_temporal_join_semantics_default() {
1155 assert_eq!(
1156 TemporalJoinSemantics::default(),
1157 TemporalJoinSemantics::EventTime
1158 );
1159 }
1160
1161 #[test]
1162 fn test_table_characteristics_default() {
1163 assert_eq!(
1164 TableCharacteristics::default(),
1165 TableCharacteristics::AppendOnly
1166 );
1167 }
1168
1169 #[test]
1170 fn test_temporal_join_type_properties() {
1171 assert!(!TemporalJoinType::Inner.emits_unmatched());
1172 assert!(TemporalJoinType::Left.emits_unmatched());
1173 }
1174
1175 #[test]
1176 fn test_config_builder() {
1177 let config = TemporalJoinConfig::builder()
1178 .stream_key_column("currency".to_string())
1179 .table_key_column("currency".to_string())
1180 .table_version_column("valid_from".to_string())
1181 .semantics(TemporalJoinSemantics::EventTime)
1182 .table_characteristics(TableCharacteristics::AppendOnly)
1183 .join_type(TemporalJoinType::Left)
1184 .max_versions_per_key(100)
1185 .operator_id("test_temporal".to_string())
1186 .build();
1187
1188 assert_eq!(config.stream_key_column, "currency");
1189 assert_eq!(config.table_key_column, "currency");
1190 assert_eq!(config.table_version_column, "valid_from");
1191 assert_eq!(config.semantics, TemporalJoinSemantics::EventTime);
1192 assert_eq!(
1193 config.table_characteristics,
1194 TableCharacteristics::AppendOnly
1195 );
1196 assert_eq!(config.join_type, TemporalJoinType::Left);
1197 assert_eq!(config.max_versions_per_key, 100);
1198 }
1199
1200 #[test]
1201 fn test_event_time_temporal_join_basic() {
1202 let config = TemporalJoinConfig::builder()
1203 .stream_key_column("currency".to_string())
1204 .table_key_column("currency".to_string())
1205 .table_version_column("valid_from".to_string())
1206 .semantics(TemporalJoinSemantics::EventTime)
1207 .join_type(TemporalJoinType::Inner)
1208 .build();
1209
1210 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1211
1212 let mut timers = TimerService::new();
1213 let mut state = InMemoryStore::new();
1214 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1215
1216 let rate1 = create_rate_event(500, "USD", 1.1, 500);
1219 {
1220 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1221 operator.process_table_insert(&rate1, &mut ctx);
1222 }
1223
1224 let rate2 = create_rate_event(800, "USD", 1.2, 800);
1226 {
1227 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1228 operator.process_table_insert(&rate2, &mut ctx);
1229 }
1230
1231 let rate3 = create_rate_event(1200, "USD", 1.3, 1200);
1233 {
1234 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1235 operator.process_table_insert(&rate3, &mut ctx);
1236 }
1237
1238 let order = create_order_event(1000, "USD", 100.0);
1240 let outputs = {
1241 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1242 operator.process_stream(&order, &mut ctx)
1243 };
1244
1245 assert_eq!(
1246 outputs
1247 .iter()
1248 .filter(|o| matches!(o, Output::Event(_)))
1249 .count(),
1250 1
1251 );
1252 assert_eq!(operator.metrics().matches, 1);
1253
1254 if let Some(Output::Event(event)) = outputs.first() {
1256 assert_eq!(event.data.num_columns(), 5); }
1258 }
1259
1260 #[test]
1261 fn test_event_time_multiple_versions() {
1262 let config = TemporalJoinConfig::builder()
1263 .stream_key_column("currency".to_string())
1264 .table_key_column("currency".to_string())
1265 .table_version_column("valid_from".to_string())
1266 .semantics(TemporalJoinSemantics::EventTime)
1267 .join_type(TemporalJoinType::Inner)
1268 .build();
1269
1270 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1271
1272 let mut timers = TimerService::new();
1273 let mut state = InMemoryStore::new();
1274 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1275
1276 {
1278 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1279 operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
1280 operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
1281 operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
1282 }
1283
1284 let order1 = create_order_event(150, "USD", 100.0);
1286 let outputs1 = {
1287 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1288 operator.process_stream(&order1, &mut ctx)
1289 };
1290 assert_eq!(
1291 outputs1
1292 .iter()
1293 .filter(|o| matches!(o, Output::Event(_)))
1294 .count(),
1295 1
1296 );
1297
1298 let order2 = create_order_event(250, "USD", 100.0);
1300 let outputs2 = {
1301 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1302 operator.process_stream(&order2, &mut ctx)
1303 };
1304 assert_eq!(
1305 outputs2
1306 .iter()
1307 .filter(|o| matches!(o, Output::Event(_)))
1308 .count(),
1309 1
1310 );
1311
1312 let order3 = create_order_event(350, "USD", 100.0);
1314 let outputs3 = {
1315 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1316 operator.process_stream(&order3, &mut ctx)
1317 };
1318 assert_eq!(
1319 outputs3
1320 .iter()
1321 .filter(|o| matches!(o, Output::Event(_)))
1322 .count(),
1323 1
1324 );
1325
1326 assert_eq!(operator.metrics().matches, 3);
1327 }
1328
1329 #[test]
1330 fn test_no_match_before_first_version() {
1331 let config = TemporalJoinConfig::builder()
1332 .stream_key_column("currency".to_string())
1333 .table_key_column("currency".to_string())
1334 .table_version_column("valid_from".to_string())
1335 .semantics(TemporalJoinSemantics::EventTime)
1336 .join_type(TemporalJoinType::Inner)
1337 .build();
1338
1339 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1340
1341 let mut timers = TimerService::new();
1342 let mut state = InMemoryStore::new();
1343 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1344
1345 {
1347 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1348 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1349 }
1350
1351 let order = create_order_event(400, "USD", 100.0);
1353 let outputs = {
1354 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1355 operator.process_stream(&order, &mut ctx)
1356 };
1357
1358 assert_eq!(outputs.len(), 0);
1360 assert_eq!(operator.metrics().unmatched, 1);
1361 }
1362
1363 #[test]
1364 fn test_left_join_no_match() {
1365 let config = TemporalJoinConfig::builder()
1366 .stream_key_column("currency".to_string())
1367 .table_key_column("currency".to_string())
1368 .table_version_column("valid_from".to_string())
1369 .semantics(TemporalJoinSemantics::EventTime)
1370 .join_type(TemporalJoinType::Left)
1371 .build();
1372
1373 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1374
1375 let mut timers = TimerService::new();
1376 let mut state = InMemoryStore::new();
1377 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1378
1379 {
1381 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1382 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1383 }
1384 {
1385 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1386 operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
1387 }
1388
1389 let order = create_order_event(700, "EUR", 100.0);
1391 let outputs = {
1392 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1393 operator.process_stream(&order, &mut ctx)
1394 };
1395
1396 assert_eq!(
1398 outputs
1399 .iter()
1400 .filter(|o| matches!(o, Output::Event(_)))
1401 .count(),
1402 1
1403 );
1404 assert_eq!(operator.metrics().unmatched, 1);
1405
1406 if let Some(Output::Event(event)) = outputs.first() {
1407 assert_eq!(event.data.num_columns(), 5); }
1409 }
1410
1411 #[test]
1412 fn test_process_time_semantics() {
1413 let config = TemporalJoinConfig::builder()
1414 .stream_key_column("currency".to_string())
1415 .table_key_column("currency".to_string())
1416 .table_version_column("valid_from".to_string())
1417 .semantics(TemporalJoinSemantics::ProcessTime) .join_type(TemporalJoinType::Inner)
1419 .build();
1420
1421 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1422
1423 let mut timers = TimerService::new();
1424 let mut state = InMemoryStore::new();
1425 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1426
1427 {
1429 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1430 operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
1431 operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
1432 operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
1433 }
1434
1435 let order = create_order_event(150, "USD", 100.0);
1438 let outputs = {
1439 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1440 ctx.processing_time = 1000; operator.process_stream(&order, &mut ctx)
1442 };
1443
1444 assert_eq!(
1445 outputs
1446 .iter()
1447 .filter(|o| matches!(o, Output::Event(_)))
1448 .count(),
1449 1
1450 );
1451 }
1452
1453 #[test]
1454 fn test_append_only_no_stream_state() {
1455 let config = TemporalJoinConfig::builder()
1456 .stream_key_column("currency".to_string())
1457 .table_key_column("currency".to_string())
1458 .table_version_column("valid_from".to_string())
1459 .table_characteristics(TableCharacteristics::AppendOnly)
1460 .build();
1461
1462 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1463
1464 let mut timers = TimerService::new();
1465 let mut state = InMemoryStore::new();
1466 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1467
1468 {
1470 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1471 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1472 }
1473
1474 let order = create_order_event(600, "USD", 100.0);
1476 {
1477 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1478 operator.process_stream(&order, &mut ctx);
1479 }
1480
1481 assert_eq!(operator.stream_state_size(), 0);
1483 }
1484
1485 #[test]
1486 fn test_non_append_only_tracks_stream_state() {
1487 let config = TemporalJoinConfig::builder()
1488 .stream_key_column("currency".to_string())
1489 .table_key_column("currency".to_string())
1490 .table_version_column("valid_from".to_string())
1491 .table_characteristics(TableCharacteristics::NonAppendOnly)
1492 .build();
1493
1494 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1495
1496 let mut timers = TimerService::new();
1497 let mut state = InMemoryStore::new();
1498 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1499
1500 {
1502 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1503 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1504 }
1505
1506 let order = create_order_event(600, "USD", 100.0);
1508 {
1509 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1510 operator.process_stream(&order, &mut ctx);
1511 }
1512
1513 assert_eq!(operator.stream_state_size(), 1);
1515 }
1516
1517 #[test]
1518 fn test_table_delete_emits_retraction() {
1519 let config = TemporalJoinConfig::builder()
1520 .stream_key_column("currency".to_string())
1521 .table_key_column("currency".to_string())
1522 .table_version_column("valid_from".to_string())
1523 .table_characteristics(TableCharacteristics::NonAppendOnly)
1524 .build();
1525
1526 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1527
1528 let mut timers = TimerService::new();
1529 let mut state = InMemoryStore::new();
1530 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1531
1532 let rate = create_rate_event(500, "USD", 1.1, 500);
1534 {
1535 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1536 operator.process_table_insert(&rate, &mut ctx);
1537 }
1538
1539 let order = create_order_event(600, "USD", 100.0);
1540 {
1541 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1542 operator.process_stream(&order, &mut ctx);
1543 }
1544
1545 let table_row = TableRow::new(500, b"USD".to_vec(), &rate.data).unwrap();
1547
1548 let change = TableChange::Delete(table_row);
1550 let outputs = {
1551 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1552 operator.process_table_change(&change, &mut ctx)
1553 };
1554
1555 assert_eq!(operator.metrics().table_deletes, 1);
1557 assert!(
1558 operator.metrics().retractions >= 1
1559 || outputs.iter().any(|o| matches!(o, Output::LateEvent(_)))
1560 );
1561 }
1562
1563 #[test]
1564 fn test_multiple_keys() {
1565 let config = TemporalJoinConfig::builder()
1566 .stream_key_column("currency".to_string())
1567 .table_key_column("currency".to_string())
1568 .table_version_column("valid_from".to_string())
1569 .build();
1570
1571 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1572
1573 let mut timers = TimerService::new();
1574 let mut state = InMemoryStore::new();
1575 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1576
1577 {
1579 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1580 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1581 operator.process_table_insert(&create_rate_event(500, "EUR", 0.9, 500), &mut ctx);
1582 operator.process_table_insert(&create_rate_event(500, "GBP", 0.8, 500), &mut ctx);
1583 }
1584
1585 {
1587 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1588 let outputs1 =
1589 operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
1590 assert_eq!(
1591 outputs1
1592 .iter()
1593 .filter(|o| matches!(o, Output::Event(_)))
1594 .count(),
1595 1
1596 );
1597 }
1598
1599 {
1600 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1601 let outputs2 =
1602 operator.process_stream(&create_order_event(600, "EUR", 100.0), &mut ctx);
1603 assert_eq!(
1604 outputs2
1605 .iter()
1606 .filter(|o| matches!(o, Output::Event(_)))
1607 .count(),
1608 1
1609 );
1610 }
1611
1612 assert_eq!(operator.metrics().matches, 2);
1613 }
1614
1615 #[test]
1616 fn test_max_versions_per_key() {
1617 let config = TemporalJoinConfig::builder()
1618 .stream_key_column("currency".to_string())
1619 .table_key_column("currency".to_string())
1620 .table_version_column("valid_from".to_string())
1621 .max_versions_per_key(2) .build();
1623
1624 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1625
1626 let mut timers = TimerService::new();
1627 let mut state = InMemoryStore::new();
1628 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1629
1630 for i in 0..5 {
1632 let rate =
1633 create_rate_event(100 * (i + 1), "USD", 1.0 + (i as f64) * 0.1, 100 * (i + 1));
1634 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1635 operator.process_table_insert(&rate, &mut ctx);
1636 }
1637
1638 assert_eq!(operator.table_state_size(), 2);
1640
1641 let key_state = operator.table_state.get(&b"USD".to_vec()).unwrap();
1643 assert!(key_state.lookup_at_time(400).is_some());
1644 assert!(key_state.lookup_at_time(500).is_some());
1645 assert!(key_state.lookup_at_time(100).is_none());
1647 }
1648
1649 #[test]
1650 fn test_checkpoint_restore() {
1651 let config = TemporalJoinConfig::builder()
1652 .stream_key_column("currency".to_string())
1653 .table_key_column("currency".to_string())
1654 .table_version_column("valid_from".to_string())
1655 .table_characteristics(TableCharacteristics::NonAppendOnly)
1656 .build();
1657
1658 let mut operator =
1659 TemporalJoinOperator::with_id(config.clone(), "test_temporal".to_string());
1660
1661 let mut timers = TimerService::new();
1662 let mut state = InMemoryStore::new();
1663 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1664
1665 {
1667 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1668 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1669 operator.process_table_insert(&create_rate_event(600, "USD", 1.2, 600), &mut ctx);
1670 }
1671
1672 {
1673 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1674 operator.process_stream(&create_order_event(550, "USD", 100.0), &mut ctx);
1675 }
1676
1677 operator.watermark = 500;
1678 operator.metrics.matches = 10;
1679 operator.metrics.retractions = 2;
1680
1681 let checkpoint = operator.checkpoint();
1683
1684 let mut restored = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1686 restored.restore(checkpoint).unwrap();
1687
1688 assert_eq!(restored.watermark(), 500);
1690 assert_eq!(restored.metrics().matches, 10);
1691 assert_eq!(restored.metrics().retractions, 2);
1692 assert_eq!(restored.table_state_size(), 2);
1693 assert_eq!(restored.stream_state_size(), 1);
1694 }
1695
1696 #[test]
1697 fn test_schema_composition() {
1698 let config = TemporalJoinConfig::builder()
1699 .stream_key_column("currency".to_string())
1700 .table_key_column("currency".to_string())
1701 .table_version_column("valid_from".to_string())
1702 .build();
1703
1704 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1705
1706 let mut timers = TimerService::new();
1707 let mut state = InMemoryStore::new();
1708 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1709
1710 {
1712 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1713 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1714 }
1715
1716 let order = create_order_event(600, "USD", 100.0);
1718 let outputs = {
1719 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1720 operator.process_stream(&order, &mut ctx)
1721 };
1722
1723 assert_eq!(outputs.len(), 1);
1724
1725 if let Some(Output::Event(event)) = outputs.first() {
1726 let schema = event.data.schema();
1727
1728 assert!(schema.field_with_name("amount").is_ok());
1730
1731 assert!(schema.field_with_name("table_currency").is_ok());
1733 assert!(schema.field_with_name("rate").is_ok());
1734 assert!(schema.field_with_name("valid_from").is_ok());
1735 }
1736 }
1737
1738 #[test]
1739 fn test_metrics_tracking() {
1740 let config = TemporalJoinConfig::builder()
1741 .stream_key_column("currency".to_string())
1742 .table_key_column("currency".to_string())
1743 .table_version_column("valid_from".to_string())
1744 .build();
1745
1746 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1747
1748 let mut timers = TimerService::new();
1749 let mut state = InMemoryStore::new();
1750 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1751
1752 {
1754 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1755 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1756 operator.process_table_insert(&create_rate_event(600, "USD", 1.2, 600), &mut ctx);
1757 }
1758
1759 {
1760 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1761 operator.process_stream(&create_order_event(550, "USD", 100.0), &mut ctx);
1762 operator.process_stream(&create_order_event(650, "USD", 200.0), &mut ctx);
1763 }
1764
1765 assert_eq!(operator.metrics().table_inserts, 2);
1766 assert_eq!(operator.metrics().stream_events, 2);
1767 assert_eq!(operator.metrics().matches, 2);
1768 }
1769
1770 #[test]
1771 fn test_versioned_key_state_operations() {
1772 let mut key_state = VersionedKeyState::new();
1773 assert!(key_state.is_empty());
1774
1775 let row1 = TableRow {
1777 version_timestamp: 100,
1778 key_value: b"test".to_vec(),
1779 data: vec![],
1780 };
1781 let row2 = TableRow {
1782 version_timestamp: 200,
1783 key_value: b"test".to_vec(),
1784 data: vec![],
1785 };
1786 let row3 = TableRow {
1787 version_timestamp: 300,
1788 key_value: b"test".to_vec(),
1789 data: vec![],
1790 };
1791
1792 key_state.insert(row1);
1793 key_state.insert(row2);
1794 key_state.insert(row3);
1795
1796 assert_eq!(key_state.len(), 3);
1797 assert_eq!(key_state.min_version, 100);
1798 assert_eq!(key_state.max_version, 300);
1799
1800 assert!(key_state.lookup_at_time(50).is_none()); assert_eq!(
1803 key_state.lookup_at_time(150).unwrap().version_timestamp,
1804 100
1805 );
1806 assert_eq!(
1807 key_state.lookup_at_time(250).unwrap().version_timestamp,
1808 200
1809 );
1810 assert_eq!(
1811 key_state.lookup_at_time(350).unwrap().version_timestamp,
1812 300
1813 );
1814
1815 assert_eq!(key_state.lookup_latest().unwrap().version_timestamp, 300);
1817
1818 key_state.cleanup_before(200);
1820 assert_eq!(key_state.len(), 2);
1821 assert_eq!(key_state.min_version, 200);
1822
1823 key_state.remove_version(200);
1825 assert_eq!(key_state.len(), 1);
1826 }
1827
1828 #[test]
1829 fn test_version_limiting() {
1830 let mut key_state = VersionedKeyState::new();
1831
1832 for i in 0..10 {
1834 key_state.insert(TableRow {
1835 version_timestamp: 100 * (i + 1),
1836 key_value: b"test".to_vec(),
1837 data: vec![],
1838 });
1839 }
1840
1841 assert_eq!(key_state.len(), 10);
1842
1843 key_state.limit_versions(3);
1845 assert_eq!(key_state.len(), 3);
1846
1847 assert!(key_state.lookup_at_time(700).is_none());
1849 assert!(key_state.lookup_at_time(800).is_some());
1850 }
1851
1852 #[test]
1853 fn test_metrics_reset() {
1854 let mut metrics = TemporalJoinMetrics::new();
1855 metrics.stream_events = 100;
1856 metrics.matches = 50;
1857 metrics.retractions = 5;
1858
1859 metrics.reset();
1860
1861 assert_eq!(metrics.stream_events, 0);
1862 assert_eq!(metrics.matches, 0);
1863 assert_eq!(metrics.retractions, 0);
1864 }
1865
1866 #[test]
1867 fn test_table_row_serialization() {
1868 let schema = Arc::new(Schema::new(vec![
1869 Field::new("currency", DataType::Utf8, false),
1870 Field::new("rate", DataType::Float64, false),
1871 ]));
1872 let batch = RecordBatch::try_new(
1873 schema,
1874 vec![
1875 Arc::new(StringArray::from(vec!["USD"])),
1876 Arc::new(Float64Array::from(vec![1.25])),
1877 ],
1878 )
1879 .unwrap();
1880
1881 let row = TableRow::new(1000, b"USD".to_vec(), &batch).unwrap();
1882
1883 let restored_batch = row.to_batch().unwrap();
1885 assert_eq!(restored_batch.num_rows(), 1);
1886 assert_eq!(restored_batch.num_columns(), 2);
1887 }
1888
1889 #[test]
1890 fn test_stream_state_cleanup() {
1891 let config = TemporalJoinConfig::builder()
1892 .stream_key_column("currency".to_string())
1893 .table_key_column("currency".to_string())
1894 .table_version_column("valid_from".to_string())
1895 .table_characteristics(TableCharacteristics::NonAppendOnly)
1896 .build();
1897
1898 let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1899
1900 let mut timers = TimerService::new();
1901 let mut state = InMemoryStore::new();
1902 let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1903
1904 {
1906 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1907 operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1908 }
1909
1910 for i in 0..5 {
1912 let order = create_order_event(600 + i * 100, "USD", 100.0);
1913 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1914 operator.process_stream(&order, &mut ctx);
1915 }
1916
1917 assert_eq!(operator.stream_state_size(), 5);
1918
1919 {
1921 let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1922 operator.on_watermark(900, &mut ctx);
1923 }
1924
1925 assert!(operator.stream_state_size() < 5);
1927 assert!(operator.metrics().state_cleanups > 0);
1928 }
1929}