1use super::{
90 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
91 TimerKey,
92};
93use crate::state::StateStoreExt;
94use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
95use arrow_schema::{DataType, Field, Schema, SchemaRef};
96use fxhash::FxHashMap;
97use rkyv::{
98 rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
99};
100use std::sync::atomic::{AtomicU64, Ordering};
101use std::sync::Arc;
102use std::time::Duration;
103
/// Join semantics applied when a stream event's key has no match on the
/// lookup side.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LookupJoinType {
    /// Only emit events whose key found a match; misses are dropped.
    #[default]
    Inner,
    /// Emit every event; unmatched events get null-padded lookup columns.
    Left,
}

impl LookupJoinType {
    /// Whether an event whose lookup missed should still be emitted.
    #[must_use]
    pub fn emits_on_miss(&self) -> bool {
        match self {
            LookupJoinType::Left => true,
            LookupJoinType::Inner => false,
        }
    }
}
121
/// Configuration for a [`LookupJoinOperator`].
#[derive(Debug, Clone)]
pub struct LookupJoinConfig {
    /// Column in the incoming stream batch whose first-row value is the join key.
    pub stream_key_column: String,
    /// Column on the lookup side holding the join key.
    // NOTE(review): this field is stored but never read by the operator logic in
    // this file — presumably consumed by the external lookup source; confirm.
    pub lookup_key_column: String,
    /// How long a cached lookup result (positive or negative) stays valid.
    pub cache_ttl: Duration,
    /// Inner vs. left join behavior on lookup misses.
    pub join_type: LookupJoinType,
    /// Cache capacity limit.
    // NOTE(review): not enforced anywhere in this file — confirm intended semantics.
    pub max_cache_size: usize,
    /// Explicit operator id; when `None`, an id is auto-generated from a
    /// process-wide counter.
    pub operator_id: Option<String>,
}
138
139impl LookupJoinConfig {
140 #[must_use]
142 pub fn builder() -> LookupJoinConfigBuilder {
143 LookupJoinConfigBuilder::default()
144 }
145}
146
/// Fluent builder for [`LookupJoinConfig`].
///
/// `stream_key_column` and `lookup_key_column` are required; every other field
/// receives a default in [`LookupJoinConfigBuilder::build`].
#[derive(Debug, Default)]
pub struct LookupJoinConfigBuilder {
    stream_key_column: Option<String>,
    lookup_key_column: Option<String>,
    cache_ttl: Option<Duration>,
    join_type: Option<LookupJoinType>,
    max_cache_size: Option<usize>,
    operator_id: Option<String>,
}
157
158impl LookupJoinConfigBuilder {
159 #[must_use]
161 pub fn stream_key_column(mut self, column: String) -> Self {
162 self.stream_key_column = Some(column);
163 self
164 }
165
166 #[must_use]
168 pub fn lookup_key_column(mut self, column: String) -> Self {
169 self.lookup_key_column = Some(column);
170 self
171 }
172
173 #[must_use]
175 pub fn cache_ttl(mut self, ttl: Duration) -> Self {
176 self.cache_ttl = Some(ttl);
177 self
178 }
179
180 #[must_use]
182 pub fn join_type(mut self, join_type: LookupJoinType) -> Self {
183 self.join_type = Some(join_type);
184 self
185 }
186
187 #[must_use]
189 pub fn max_cache_size(mut self, size: usize) -> Self {
190 self.max_cache_size = Some(size);
191 self
192 }
193
194 #[must_use]
196 pub fn operator_id(mut self, id: String) -> Self {
197 self.operator_id = Some(id);
198 self
199 }
200
201 pub fn build(self) -> Result<LookupJoinConfig, OperatorError> {
208 Ok(LookupJoinConfig {
209 stream_key_column: self.stream_key_column.ok_or_else(|| {
210 OperatorError::ConfigError("stream_key_column is required".into())
211 })?,
212 lookup_key_column: self.lookup_key_column.ok_or_else(|| {
213 OperatorError::ConfigError("lookup_key_column is required".into())
214 })?,
215 cache_ttl: self.cache_ttl.unwrap_or(Duration::from_secs(300)),
216 join_type: self.join_type.unwrap_or_default(),
217 max_cache_size: self.max_cache_size.unwrap_or(0),
218 operator_id: self.operator_id,
219 })
220 }
221}
222
/// Key-space prefix for lookup-cache entries in the operator's state store.
const CACHE_STATE_PREFIX: &[u8; 4] = b"lkc:";

/// Leading byte of timer keys that identify cache-entry expiration timers.
const CACHE_TIMER_PREFIX: u8 = 0x40;

/// Process-wide counter used to derive unique operator ids when none is configured.
static LOOKUP_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
231
/// A cached lookup result, persisted in the operator's state store via rkyv.
///
/// A "negative" entry (`found == false`) caches the fact that a key had no
/// match, so repeated misses don't re-query the lookup source until the TTL
/// elapses.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct CacheEntry {
    /// Processing-time timestamp when the entry was inserted.
    // NOTE(review): microsecond units assumed from the `cache_ttl_us` comparisons — confirm.
    pub inserted_at: i64,
    /// Whether the lookup produced a batch (`data` is empty when false).
    pub found: bool,
    /// Arrow IPC stream bytes of the looked-up batch; empty for misses.
    pub data: Vec<u8>,
}
242
243impl CacheEntry {
244 fn found(inserted_at: i64, batch: &RecordBatch) -> Result<Self, OperatorError> {
246 let data = Self::serialize_batch(batch)?;
247 Ok(Self {
248 inserted_at,
249 found: true,
250 data,
251 })
252 }
253
254 fn not_found(inserted_at: i64) -> Self {
256 Self {
257 inserted_at,
258 found: false,
259 data: Vec::new(),
260 }
261 }
262
263 fn is_expired(&self, now: i64, ttl_us: i64) -> bool {
265 now - self.inserted_at > ttl_us
266 }
267
268 fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
270 let mut buf = Vec::new();
271 {
272 let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
273 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
274 writer
275 .write(batch)
276 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
277 writer
278 .finish()
279 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
280 }
281 Ok(buf)
282 }
283
284 fn deserialize_batch(data: &[u8]) -> Result<RecordBatch, OperatorError> {
286 let cursor = std::io::Cursor::new(data);
287 let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
288 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
289 reader
290 .next()
291 .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
292 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
293 }
294
295 pub fn to_batch(&self) -> Result<Option<RecordBatch>, OperatorError> {
301 if self.found {
302 Ok(Some(Self::deserialize_batch(&self.data)?))
303 } else {
304 Ok(None)
305 }
306 }
307}
308
/// Cumulative activity counters for a lookup-join operator.
///
/// All counters start at zero and grow until [`LookupJoinMetrics::reset`].
#[derive(Debug, Clone, Default)]
pub struct LookupJoinMetrics {
    /// Events seen by either processing path.
    pub events_processed: u64,
    /// Lookups satisfied from the cache.
    pub cache_hits: u64,
    /// Lookups not satisfied from the cache.
    pub cache_misses: u64,
    /// Lookups that returned a batch.
    pub lookups_found: u64,
    /// Lookups that returned nothing.
    pub lookups_not_found: u64,
    /// Lookup results that failed to serialize into the cache.
    pub lookup_errors: u64,
    /// Joined events pushed downstream.
    pub events_emitted: u64,
    /// Events dropped because an inner join found no match.
    pub events_dropped: u64,
    /// Cache entries removed by expiry timers.
    pub cache_expirations: u64,
}

impl LookupJoinMetrics {
    /// Creates a zeroed metrics set.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Zeroes every counter.
    pub fn reset(&mut self) {
        *self = LookupJoinMetrics::new();
    }
}
344
/// Outcome of probing the lookup cache synchronously.
#[derive(Debug, Clone)]
pub enum SyncLookupResult {
    /// Key was cached: `Some(batch)` for a positive entry, `None` for a
    /// cached negative (not-found) entry.
    CacheHit(Option<RecordBatch>),
    /// Key not in the cache; the caller still needs to perform the lookup.
    CacheMiss,
}
357
/// Stream-to-lookup-table join operator with a TTL'd result cache.
///
/// Supports two usage modes:
/// * synchronous — [`LookupJoinOperator::process_with_lookup`] performs the
///   lookup inline via a caller-provided closure;
/// * asynchronous — [`Operator::process`] buffers events for unknown keys and
///   the caller later delivers results via
///   [`LookupJoinOperator::provide_lookup`].
pub struct LookupJoinOperator {
    /// Join configuration (key columns, TTL, join type).
    config: LookupJoinConfig,
    /// Unique id; must match on checkpoint restore.
    operator_id: String,
    /// Cumulative activity counters.
    metrics: LookupJoinMetrics,
    /// Combined stream + lookup schema; built once both sides have been seen.
    output_schema: Option<SchemaRef>,
    /// Schema captured from the first stream event.
    stream_schema: Option<SchemaRef>,
    /// Schema captured from the first positive lookup result.
    lookup_schema: Option<SchemaRef>,
    /// Cache TTL in microseconds, pre-converted from `config.cache_ttl`.
    cache_ttl_us: i64,
    /// Keys awaiting an async lookup result (deduplicated).
    pending_keys: Vec<Vec<u8>>,
    /// Events buffered until their key's lookup result arrives, paired with
    /// the extracted key bytes.
    pending_events: Vec<(Event, Vec<u8>)>,
    /// In-memory deserialized view of cached batches; the durable copy lives
    /// in the state store as serialized `CacheEntry` values.
    batch_cache: FxHashMap<Vec<u8>, Option<RecordBatch>>,
}
395
impl LookupJoinOperator {
    /// Creates an operator from `config`, auto-generating an operator id
    /// (`lookup_join_<n>`) when none is configured.
    #[must_use]
    #[allow(clippy::cast_possible_truncation)]
    pub fn new(config: LookupJoinConfig) -> Self {
        let operator_id = config.operator_id.clone().unwrap_or_else(|| {
            // Process-wide counter keeps auto-generated ids unique.
            let num = LOOKUP_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
            format!("lookup_join_{num}")
        });

        // Pre-convert the TTL so hot paths compare plain integers.
        let cache_ttl_us = config.cache_ttl.as_micros() as i64;

        Self {
            config,
            operator_id,
            metrics: LookupJoinMetrics::new(),
            output_schema: None,
            stream_schema: None,
            lookup_schema: None,
            cache_ttl_us,
            pending_keys: Vec::new(),
            pending_events: Vec::new(),
            batch_cache: FxHashMap::default(),
        }
    }

    /// Creates an operator with an explicit id, overriding `config.operator_id`.
    #[must_use]
    pub fn with_id(mut config: LookupJoinConfig, operator_id: String) -> Self {
        config.operator_id = Some(operator_id);
        Self::new(config)
    }

    /// Returns the active configuration.
    #[must_use]
    pub fn config(&self) -> &LookupJoinConfig {
        &self.config
    }

    /// Returns the current metrics counters.
    #[must_use]
    pub fn metrics(&self) -> &LookupJoinMetrics {
        &self.metrics
    }

    /// Zeroes all metrics counters.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }

    /// Keys queued by the async path that still need a result delivered via
    /// [`LookupJoinOperator::provide_lookup`].
    #[must_use]
    pub fn pending_lookups(&self) -> &[Vec<u8>] {
        &self.pending_keys
    }

    /// Delivers an async lookup result for `key`, caches it, and flushes all
    /// buffered events that were waiting on that key.
    ///
    /// `result == None` records a negative (not-found) entry. Returns the
    /// joined events produced from the flushed backlog.
    pub fn provide_lookup(
        &mut self,
        key: &[u8],
        result: Option<&RecordBatch>,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();

        let cache_key = Self::make_cache_key(key);
        let entry = if let Some(batch) = result {
            // First positive result fixes the lookup-side schema and lets the
            // joined output schema be built.
            if self.lookup_schema.is_none() {
                self.lookup_schema = Some(batch.schema());
                self.update_output_schema();
            }
            self.metrics.lookups_found += 1;
            let Ok(e) = CacheEntry::found(ctx.processing_time, batch) else {
                // Serialization failed: count the error and bail. Events stay
                // pending so a later retry can still satisfy them.
                self.metrics.lookup_errors += 1;
                return output;
            };
            e
        } else {
            self.metrics.lookups_not_found += 1;
            CacheEntry::not_found(ctx.processing_time)
        };

        if ctx.state.put_typed(&cache_key, &entry).is_err() {
            // State write failed; leave the backlog untouched rather than
            // emit results that were never cached.
            return output;
        }

        self.batch_cache.insert(cache_key.clone(), result.cloned());

        // Schedule eviction of this cache entry when the TTL elapses.
        let expiry_time = ctx.processing_time + self.cache_ttl_us;
        let timer_key = Self::make_timer_key(&cache_key);
        ctx.timers
            .register_timer(expiry_time, Some(timer_key), Some(ctx.operator_index));

        // Snapshot the backlog for this key, then clear its pending records.
        let events_to_process: Vec<_> = self
            .pending_events
            .iter()
            .filter(|(_, k)| k == key)
            .map(|(e, _)| e.clone())
            .collect();

        self.pending_events.retain(|(_, k)| k != key);
        self.pending_keys.retain(|k| k != key);

        for event in events_to_process {
            if let Some(joined) = self.create_joined_event(&event, result) {
                self.metrics.events_emitted += 1;
                output.push(Output::Event(joined));
            } else if self.config.join_type.emits_on_miss() {
                // Left join: pad the missing lookup columns with nulls.
                if let Some(joined) = self.create_unmatched_event(&event) {
                    self.metrics.events_emitted += 1;
                    output.push(Output::Event(joined));
                }
            } else {
                // Inner join drops unmatched events.
                self.metrics.events_dropped += 1;
            }
        }

        output
    }

    /// Synchronous processing path: on a cache miss, `lookup_fn` is invoked
    /// inline with the extracted key bytes to fetch the lookup-side batch.
    ///
    /// The result (positive or negative) is cached with a TTL expiry timer
    /// before the joined output is emitted.
    pub fn process_with_lookup<F>(
        &mut self,
        event: &Event,
        ctx: &mut OperatorContext,
        lookup_fn: F,
    ) -> OutputVec
    where
        F: FnOnce(&[u8]) -> Option<RecordBatch>,
    {
        self.metrics.events_processed += 1;

        // Capture the stream schema from the first event seen.
        if self.stream_schema.is_none() {
            self.stream_schema = Some(event.data.schema());
        }

        // Events without an extractable key are skipped silently
        // (note: not counted in `events_dropped`).
        let Some(key) = Self::extract_key(&event.data, &self.config.stream_key_column) else {
            return OutputVec::new();
        };

        let cache_key = Self::make_cache_key(&key);
        if let Some(cached) = self.lookup_cache(&cache_key, ctx) {
            self.metrics.cache_hits += 1;
            return self.emit_result(event, cached.as_ref(), ctx);
        }

        self.metrics.cache_misses += 1;
        let result = lookup_fn(&key);

        let entry = if let Some(ref batch) = result {
            if self.lookup_schema.is_none() {
                self.lookup_schema = Some(batch.schema());
                self.update_output_schema();
            }
            self.metrics.lookups_found += 1;
            let Ok(e) = CacheEntry::found(ctx.processing_time, batch) else {
                self.metrics.lookup_errors += 1;
                return OutputVec::new();
            };
            e
        } else {
            self.metrics.lookups_not_found += 1;
            CacheEntry::not_found(ctx.processing_time)
        };

        // Caching is best-effort: if the state write fails the result is
        // still emitted, just not cached (and no expiry timer is set).
        if ctx.state.put_typed(&cache_key, &entry).is_ok() {
            self.batch_cache.insert(cache_key.clone(), result.clone());

            let expiry_time = ctx.processing_time + self.cache_ttl_us;
            let timer_key = Self::make_timer_key(&cache_key);
            ctx.timers
                .register_timer(expiry_time, Some(timer_key), Some(ctx.operator_index));
        }

        self.emit_result(event, result.as_ref(), ctx)
    }

    /// Probes the cache for `cache_key`.
    ///
    /// Returns `None` on a miss or expired entry, `Some(None)` for a cached
    /// negative result, and `Some(Some(batch))` for a cached positive result.
    #[allow(clippy::option_option)]
    fn lookup_cache(
        &mut self,
        cache_key: &[u8],
        ctx: &OperatorContext,
    ) -> Option<Option<RecordBatch>> {
        let entry: CacheEntry = ctx.state.get_typed(cache_key).ok()??;

        if entry.is_expired(ctx.processing_time, self.cache_ttl_us) {
            // Drop only the in-memory copy here; the durable entry is
            // reclaimed by its expiry timer.
            self.batch_cache.remove(cache_key);
            return None;
        }

        // Fast path: batch already deserialized.
        if let Some(cached) = self.batch_cache.get(cache_key) {
            return Some(cached.clone());
        }

        // Slow path: deserialize from the state-store entry and memoize.
        let result = entry.to_batch().ok()?;
        self.batch_cache.insert(cache_key.to_vec(), result.clone());
        Some(result)
    }

    /// Emits the joined (matched), null-padded (left-join miss), or no
    /// (inner-join miss) output for one event, updating metrics accordingly.
    fn emit_result(
        &mut self,
        event: &Event,
        lookup_result: Option<&RecordBatch>,
        _ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();

        if let Some(lookup_batch) = lookup_result {
            if let Some(joined) = self.create_joined_event(event, Some(lookup_batch)) {
                self.metrics.events_emitted += 1;
                output.push(Output::Event(joined));
            }
        } else if self.config.join_type.emits_on_miss() {
            if let Some(joined) = self.create_unmatched_event(event) {
                self.metrics.events_emitted += 1;
                output.push(Output::Event(joined));
            }
        } else {
            self.metrics.events_dropped += 1;
        }

        output
    }

    /// Extracts the join key from the FIRST row of `column_name` as raw bytes.
    ///
    /// Supports Utf8 (string bytes) and Int64 (little-endian bytes) columns;
    /// any other type, a missing column, an empty batch, or a null first
    /// value yields `None`.
    fn extract_key(batch: &RecordBatch, column_name: &str) -> Option<Vec<u8>> {
        let column_index = batch.schema().index_of(column_name).ok()?;
        let column = batch.column(column_index);

        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
            if string_array.is_empty() || string_array.is_null(0) {
                return None;
            }
            return Some(string_array.value(0).as_bytes().to_vec());
        }

        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
            if int_array.is_empty() || int_array.is_null(0) {
                return None;
            }
            return Some(int_array.value(0).to_le_bytes().to_vec());
        }

        None
    }

    /// Builds the state-store key: `"lkc:"` + big-endian fxhash64 of the key.
    // NOTE(review): distinct keys with colliding 64-bit hashes would share a
    // cache entry — confirm collisions are considered negligible here.
    fn make_cache_key(key: &[u8]) -> Vec<u8> {
        let key_hash = fxhash::hash64(key);
        let mut cache_key = Vec::with_capacity(12); // 4-byte prefix + 8-byte hash
        cache_key.extend_from_slice(CACHE_STATE_PREFIX);
        cache_key.extend_from_slice(&key_hash.to_be_bytes());
        cache_key
    }

    /// Builds an expiry-timer key: `CACHE_TIMER_PREFIX` byte + cache key.
    fn make_timer_key(cache_key: &[u8]) -> TimerKey {
        let mut key = TimerKey::new();
        key.push(CACHE_TIMER_PREFIX);
        key.extend_from_slice(cache_key);
        key
    }

    /// Inverse of [`Self::make_timer_key`]: strips the prefix byte and returns
    /// the embedded cache key, or `None` if this is not an expiry-timer key.
    fn parse_timer_key(key: &[u8]) -> Option<Vec<u8>> {
        if key.is_empty() || key[0] != CACHE_TIMER_PREFIX {
            return None;
        }
        Some(key[1..].to_vec())
    }

    /// Rebuilds `output_schema` as stream fields followed by lookup fields.
    ///
    /// Lookup fields whose names clash with a stream field are renamed with a
    /// `lookup_` prefix; all lookup-side fields are made nullable so left-join
    /// null padding is valid. No-op until both schemas are known.
    fn update_output_schema(&mut self) {
        if let (Some(stream), Some(lookup)) = (&self.stream_schema, &self.lookup_schema) {
            let mut fields: Vec<Field> =
                stream.fields().iter().map(|f| f.as_ref().clone()).collect();

            for field in lookup.fields() {
                let name = if stream.field_with_name(field.name()).is_ok() {
                    format!("lookup_{}", field.name())
                } else {
                    field.name().clone()
                };
                fields.push(Field::new(
                    name,
                    field.data_type().clone(),
                    true, // nullable: misses are padded with nulls
                ));
            }

            self.output_schema = Some(Arc::new(Schema::new(fields)));
        }
    }

    /// Joins one event with a lookup batch by appending the lookup columns to
    /// the event's columns under `output_schema`.
    ///
    /// Returns `None` when there is no lookup batch, the output schema is not
    /// built yet, or the columns don't satisfy the schema
    /// (`RecordBatch::try_new` fails — e.g. mismatched row counts).
    fn create_joined_event(&self, event: &Event, lookup: Option<&RecordBatch>) -> Option<Event> {
        let lookup_batch = lookup?;
        let schema = self.output_schema.as_ref()?;

        let mut columns: Vec<ArrayRef> = event.data.columns().to_vec();
        for column in lookup_batch.columns() {
            columns.push(Arc::clone(column));
        }

        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;

        Some(Event::new(event.timestamp, joined_batch))
    }

    /// Builds a left-join output for an unmatched event: stream columns plus
    /// an all-null column for every lookup-side field.
    fn create_unmatched_event(&self, event: &Event) -> Option<Event> {
        let schema = self.output_schema.as_ref()?;
        let lookup_schema = self.lookup_schema.as_ref()?;

        let num_rows = event.data.num_rows();
        let mut columns: Vec<ArrayRef> = event.data.columns().to_vec();

        for field in lookup_schema.fields() {
            columns.push(Self::create_null_array(field.data_type(), num_rows));
        }

        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;

        Some(Event::new(event.timestamp, joined_batch))
    }

    /// Builds an all-null array of `num_rows` for left-join padding.
    // NOTE(review): only Utf8 gets a matching null array; every other type
    // falls back to a null Int64Array, which may not match the declared field
    // type — confirm this is acceptable for the schemas in use.
    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
        match data_type {
            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
        }
    }

    /// Expiry-timer callback: evicts the cache entry from the state store and
    /// the in-memory batch cache. Emits no output.
    fn handle_cache_expiry(&mut self, cache_key: &[u8], ctx: &mut OperatorContext) -> OutputVec {
        if ctx.state.delete(cache_key).is_ok() {
            self.batch_cache.remove(cache_key);
            self.metrics.cache_expirations += 1;
        }
        OutputVec::new()
    }
}
786
impl Operator for LookupJoinOperator {
    /// Async processing path: a cache hit joins immediately; a miss buffers
    /// the event in `pending_events` (and its key in `pending_keys`,
    /// deduplicated) and emits nothing until
    /// [`LookupJoinOperator::provide_lookup`] delivers the result.
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.events_processed += 1;

        // Capture the stream schema from the first event seen.
        if self.stream_schema.is_none() {
            self.stream_schema = Some(event.data.schema());
        }

        // Events without an extractable key are skipped silently.
        let Some(key) = Self::extract_key(&event.data, &self.config.stream_key_column) else {
            return OutputVec::new();
        };

        let cache_key = Self::make_cache_key(&key);
        if let Some(cached) = self.lookup_cache(&cache_key, ctx) {
            self.metrics.cache_hits += 1;
            return self.emit_result(event, cached.as_ref(), ctx);
        }

        self.metrics.cache_misses += 1;
        // Queue the key once, but remember every event waiting on it.
        if !self.pending_keys.contains(&key) {
            self.pending_keys.push(key.clone());
        }
        self.pending_events.push((event.clone(), key));

        OutputVec::new()
    }

    /// Routes cache-expiry timers to `handle_cache_expiry`; other timer keys
    /// are ignored.
    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
        if let Some(cache_key) = Self::parse_timer_key(&timer.key) {
            return self.handle_cache_expiry(&cache_key, ctx);
        }
        OutputVec::new()
    }

    /// Snapshots the key-column config plus the core counters with rkyv.
    ///
    /// Cached batches and pending events are deliberately not checkpointed;
    /// a serialization failure yields an empty payload rather than an error.
    fn checkpoint(&self) -> OperatorState {
        let checkpoint_data = (
            self.config.stream_key_column.clone(),
            self.config.lookup_key_column.clone(),
            self.metrics.events_processed,
            self.metrics.cache_hits,
            self.metrics.cache_misses,
            self.metrics.lookups_found,
            self.metrics.lookups_not_found,
        );

        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    /// Restores the checkpointed counters after verifying the operator id.
    ///
    /// # Errors
    /// `StateAccessFailed` on operator-id mismatch; `SerializationFailed`
    /// when the rkyv payload cannot be accessed or deserialized.
    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        // Must mirror the tuple layout written by `checkpoint`.
        type CheckpointData = (String, String, u64, u64, u64, u64, u64);

        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let (_, _, events_processed, cache_hits, cache_misses, lookups_found, lookups_not_found) =
            rkyv::deserialize::<CheckpointData, RkyvError>(archived)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.metrics.events_processed = events_processed;
        self.metrics.cache_hits = cache_hits;
        self.metrics.cache_misses = cache_misses;
        self.metrics.lookups_found = lookups_found;
        self.metrics.lookups_not_found = lookups_not_found;
        // Batches are not checkpointed; drop any stale in-memory copies.
        self.batch_cache.clear();

        Ok(())
    }
}
872
#[cfg(test)]
mod tests {
    //! Tests covering the sync (`process_with_lookup`) and async
    //! (`process` + `provide_lookup`) paths, cache hit/miss/expiry behavior,
    //! timer-driven cleanup, checkpoint/restore, and key extraction.
    use super::*;
    use crate::state::{InMemoryStore, StateStore};
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;

    /// Builds a one-row "order" stream event: (customer_id: Utf8, amount: Int64).
    fn create_order_event(timestamp: i64, customer_id: &str, amount: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("customer_id", DataType::Utf8, false),
            Field::new("amount", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![customer_id])),
                Arc::new(Int64Array::from(vec![amount])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    /// Builds a one-row lookup-side "customer" batch: (id, name, tier), all Utf8.
    fn create_customer_batch(id: &str, name: &str, tier: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
                Arc::new(StringArray::from(vec![tier])),
            ],
        )
        .unwrap()
    }

    /// Assembles an `OperatorContext` over the given services with a fixed
    /// base processing time of 1_000_000.
    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn StateStore,
        watermark_gen: &'a mut dyn WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 1_000_000, // fixed base processing time
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    /// Three-customer lookup table keyed by the raw id bytes.
    fn create_lookup_table() -> HashMap<Vec<u8>, RecordBatch> {
        let mut table = HashMap::new();
        table.insert(
            b"cust_1".to_vec(),
            create_customer_batch("cust_1", "Alice", "gold"),
        );
        table.insert(
            b"cust_2".to_vec(),
            create_customer_batch("cust_2", "Bob", "silver"),
        );
        table.insert(
            b"cust_3".to_vec(),
            create_customer_batch("cust_3", "Charlie", "bronze"),
        );
        table
    }

    #[test]
    fn test_lookup_join_type_properties() {
        assert!(!LookupJoinType::Inner.emits_on_miss());
        assert!(LookupJoinType::Left.emits_on_miss());
    }

    #[test]
    fn test_config_builder() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(60))
            .join_type(LookupJoinType::Left)
            .max_cache_size(1000)
            .operator_id("test_op".to_string())
            .build()
            .unwrap();

        assert_eq!(config.stream_key_column, "customer_id");
        assert_eq!(config.lookup_key_column, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(60));
        assert_eq!(config.join_type, LookupJoinType::Left);
        assert_eq!(config.max_cache_size, 1000);
        assert_eq!(config.operator_id, Some("test_op".to_string()));
    }

    #[test]
    fn test_inner_join_basic() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(300))
            .join_type(LookupJoinType::Inner)
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let order = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

        let outputs =
            operator.process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(
            outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .count(),
            1
        );

        if let Some(Output::Event(event)) = outputs.first() {
            // 2 stream columns + 3 lookup columns
            assert_eq!(event.data.num_columns(), 5);
        }

        assert_eq!(operator.metrics().events_processed, 1);
        assert_eq!(operator.metrics().events_emitted, 1);
        assert_eq!(operator.metrics().lookups_found, 1);
    }

    #[test]
    fn test_inner_join_no_match() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .join_type(LookupJoinType::Inner)
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Key not present in the lookup table.
        let order = create_order_event(1000, "cust_999", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

        let outputs =
            operator.process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());

        // Inner join drops the unmatched event.
        assert_eq!(outputs.len(), 0);
        assert_eq!(operator.metrics().events_dropped, 1);
        assert_eq!(operator.metrics().lookups_not_found, 1);
    }

    #[test]
    fn test_left_join_no_match() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .join_type(LookupJoinType::Left)
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // First a matched event so the lookup schema (and output schema) is known.
        let order1 = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let lookup_table = create_lookup_table();
        operator.process_with_lookup(&order1, &mut ctx, |key| lookup_table.get(key).cloned());

        // Then an unmatched event: left join must still emit it.
        let order2 = create_order_event(2000, "cust_999", 200);
        ctx.processing_time = 2_000_000;
        let outputs = operator.process_with_lookup(&order2, &mut ctx, |_| None);

        assert_eq!(
            outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .count(),
            1
        );

        if let Some(Output::Event(event)) = outputs.first() {
            // Lookup columns present but null-padded.
            assert_eq!(event.data.num_columns(), 5);
        }
    }

    #[test]
    fn test_cache_hit() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(300))
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // First event populates the cache.
        let order1 = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_with_lookup(&order1, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 1);
        assert_eq!(operator.metrics().cache_hits, 0);

        // Second event on the same key is served from the cache.
        let order2 = create_order_event(2000, "cust_1", 200);
        ctx.processing_time = 2_000_000;
        operator.process_with_lookup(&order2, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 1); // still only the first miss
        assert_eq!(operator.metrics().cache_hits, 1);
    }

    #[test]
    fn test_cache_expiry() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(1)) // short TTL to exercise expiry
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Populate the cache at t = 1s.
        let order1 = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        ctx.processing_time = 1_000_000;
        operator.process_with_lookup(&order1, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 1);

        // At t = 1.5s the entry is still fresh (within the 1s TTL).
        let order2 = create_order_event(2000, "cust_1", 200);
        ctx.processing_time = 1_500_000;
        operator.process_with_lookup(&order2, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_hits, 1);

        // At t = 3s the entry has expired, so the lookup is repeated.
        let order3 = create_order_event(3000, "cust_1", 300);
        ctx.processing_time = 3_000_000;
        operator.process_with_lookup(&order3, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 2); // expired entry counts as a miss
    }

    #[test]
    fn test_cache_timer_cleanup() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(1))
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Populate the cache; this registers an expiry timer.
        let order = create_order_event(1000, "cust_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = 1_000_000;
            operator.process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());
        }

        // The cache entry was written to the state store.
        assert!(state.len() > 0);

        // Advance past the TTL and fire the registered expiry timers.
        let registered_timers = timers.poll_timers(2_000_001);
        assert!(!registered_timers.is_empty());

        for timer_reg in registered_timers {
            let timer = Timer {
                key: timer_reg.key.unwrap_or_default(),
                timestamp: timer_reg.timestamp,
            };
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = timer_reg.timestamp;
            operator.on_timer(timer, &mut ctx);
        }

        assert_eq!(operator.metrics().cache_expirations, 1);
    }

    #[test]
    fn test_checkpoint_restore() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config.clone(), "test_join".to_string());

        // Seed the counters that checkpoint() is expected to persist.
        operator.metrics.events_processed = 100;
        operator.metrics.cache_hits = 80;
        operator.metrics.cache_misses = 20;
        operator.metrics.lookups_found = 15;
        operator.metrics.lookups_not_found = 5;

        let checkpoint = operator.checkpoint();

        // A fresh operator with the SAME id can restore the snapshot.
        let mut restored = LookupJoinOperator::with_id(config, "test_join".to_string());
        restored.restore(checkpoint).unwrap();

        assert_eq!(restored.metrics().events_processed, 100);
        assert_eq!(restored.metrics().cache_hits, 80);
        assert_eq!(restored.metrics().cache_misses, 20);
        assert_eq!(restored.metrics().lookups_found, 15);
        assert_eq!(restored.metrics().lookups_not_found, 5);
    }

    #[test]
    fn test_integer_key_lookup() {
        /// One-row stream event keyed by an Int64 column.
        fn create_int_key_event(timestamp: i64, key: i64, value: i64) -> Event {
            let schema = Arc::new(Schema::new(vec![
                Field::new("key", DataType::Int64, false),
                Field::new("value", DataType::Int64, false),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int64Array::from(vec![key])),
                    Arc::new(Int64Array::from(vec![value])),
                ],
            )
            .unwrap();
            Event::new(timestamp, batch)
        }

        /// One-row lookup batch keyed by an Int64 column.
        fn create_int_key_lookup(key: i64, data: &str) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("key", DataType::Int64, false),
                Field::new("data", DataType::Utf8, false),
            ]));
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int64Array::from(vec![key])),
                    Arc::new(StringArray::from(vec![data])),
                ],
            )
            .unwrap()
        }

        let config = LookupJoinConfig::builder()
            .stream_key_column("key".to_string())
            .lookup_key_column("key".to_string())
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());

        // Int64 keys are extracted as little-endian bytes.
        let mut lookup_table: HashMap<Vec<u8>, RecordBatch> = HashMap::new();
        lookup_table.insert(
            42i64.to_le_bytes().to_vec(),
            create_int_key_lookup(42, "matched"),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let event = create_int_key_event(1000, 42, 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

        let outputs =
            operator.process_with_lookup(&event, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(
            outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .count(),
            1
        );
        assert_eq!(operator.metrics().lookups_found, 1);
    }

    #[test]
    fn test_async_lookup_flow() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Async path: process() buffers the event and emits nothing yet.
        let order = create_order_event(1000, "cust_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process(&order, &mut ctx);
            assert!(outputs.is_empty()); // no output until the lookup arrives
        }

        // The key is now queued for an external lookup.
        assert_eq!(operator.pending_lookups().len(), 1);
        assert_eq!(operator.pending_lookups()[0], b"cust_1");

        // Delivering the result flushes the buffered event as a joined output.
        let lookup_result = create_customer_batch("cust_1", "Alice", "gold");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.provide_lookup(b"cust_1", Some(&lookup_result), &mut ctx);

            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        // The backlog for that key is cleared.
        assert!(operator.pending_lookups().is_empty());
    }

    #[test]
    fn test_cache_entry_serialization() {
        let batch = create_customer_batch("test", "Test", "gold");
        let entry = CacheEntry::found(1_000_000, &batch).unwrap();

        assert!(entry.found);
        assert_eq!(entry.inserted_at, 1_000_000);

        // Round-trips through Arrow IPC bytes.
        let restored = entry.to_batch().unwrap().unwrap();
        assert_eq!(restored.num_rows(), 1);
        assert_eq!(restored.num_columns(), 3);
    }

    #[test]
    fn test_cache_entry_expiry() {
        let batch = create_customer_batch("test", "Test", "gold");
        let entry = CacheEntry::found(1_000_000, &batch).unwrap();

        // Within the TTL window.
        assert!(!entry.is_expired(1_500_000, 1_000_000));
        // Past the TTL window.
        assert!(entry.is_expired(2_500_000, 1_000_000));
    }

    #[test]
    fn test_not_found_cache_entry() {
        let entry = CacheEntry::not_found(1_000_000);

        assert!(!entry.found);
        assert!(entry.data.is_empty());
        assert!(entry.to_batch().unwrap().is_none());
    }

    #[test]
    fn test_metrics_reset() {
        let mut metrics = LookupJoinMetrics::new();
        metrics.events_processed = 100;
        metrics.cache_hits = 50;

        metrics.reset();

        assert_eq!(metrics.events_processed, 0);
        assert_eq!(metrics.cache_hits, 0);
    }

    #[test]
    fn test_multiple_events_same_key() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(300))
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Five events on the same key: first misses, the rest hit the cache.
        for i in 0..5 {
            let order = create_order_event(1000 + i * 100, "cust_1", 100 + i);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = 1_000_000 + i * 100_000;
            let outputs = operator
                .process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        // Only the first event performed a real lookup.
        assert_eq!(operator.metrics().cache_misses, 1);
        assert_eq!(operator.metrics().cache_hits, 4);
        assert_eq!(operator.metrics().events_emitted, 5);
    }
}