
laminar_core/operator/lookup_join.rs

//! # Lookup Join Operators
//!
//! Join streaming events with external reference tables (dimension tables).
//!
//! Lookup joins enrich streaming data with information from slowly-changing
//! external tables. Unlike stream-stream joins (F019), lookup joins:
//! - Have one streaming side (events) and one static/slowly-changing side (table)
//! - Use caching to avoid repeated lookups for the same key
//! - Support TTL-based cache expiration for slowly-changing data
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                   LookupJoinOperator                        │
//! │                                                             │
//! │  ┌─────────┐    ┌─────────┐    ┌─────────┐                 │
//! │  │ Event   │───▶│ Extract │───▶│ Cache   │──┐              │
//! │  │ Stream  │    │   Key   │    │ Lookup  │  │ hit          │
//! │  └─────────┘    └─────────┘    └────┬────┘  │              │
//! │                                      │miss  ▼              │
//! │                              ┌───────┴──────────┐          │
//! │                              │  Table Loader    │          │
//! │                              │  (async lookup)  │          │
//! │                              └───────┬──────────┘          │
//! │                                      │                     │
//! │                                      ▼                     │
//! │                              ┌───────────────┐             │
//! │                              │ Update Cache  │             │
//! │                              └───────┬───────┘             │
//! │                                      │                     │
//! │                                      ▼                     │
//! │  ┌──────────────────────────────────────────────────────┐ │
//! │  │                    Join & Emit                        │ │
//! │  │  - Inner: only emit if lookup found                   │ │
//! │  │  - Left: emit all events (null for missing lookups)   │ │
//! │  └──────────────────────────────────────────────────────┘ │
//! └─────────────────────────────────────────────────────────────┘
//! ```
//!
//! ## Example
//!
//! ```rust,no_run
//! use laminar_core::operator::lookup_join::{
//!     LookupJoinOperator, LookupJoinConfig, LookupJoinType,
//! };
//! use std::time::Duration;
//!
//! // Create a lookup join that enriches orders with customer data
//! let config = LookupJoinConfig::builder()
//!     .stream_key_column("customer_id".to_string())
//!     .lookup_key_column("id".to_string())
//!     .cache_ttl(Duration::from_secs(300))  // 5 minute cache
//!     .join_type(LookupJoinType::Left)
//!     .build()
//!     .unwrap();
//!
//! let operator = LookupJoinOperator::new(config);
//! ```
//!
//! ## SQL Syntax
//!
//! ```sql
//! SELECT o.*, c.name, c.tier
//! FROM orders o
//! JOIN customers c  -- Reference table
//!     ON o.customer_id = c.id;
//! ```
//!
//! ## Cache Management
//!
//! The operator maintains an internal cache in the state store:
//! - Cache entries are keyed by `lkc:<key_hash>` (layout sketched below)
//! - Each entry stores the lookup result and insertion timestamp
//! - TTL-based expiration using processing time (not event time)
//! - Timer-based cleanup of expired entries
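//!
//! A sketch of that key layout, mirroring the private `make_cache_key`
//! helper in this module:
//!
//! ```rust,ignore
//! // 4-byte prefix "lkc:" followed by the big-endian fxhash64 of the key
//! let key_hash = fxhash::hash64(key);
//! let mut cache_key = Vec::with_capacity(12);
//! cache_key.extend_from_slice(b"lkc:");
//! cache_key.extend_from_slice(&key_hash.to_be_bytes());
//! ```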
//!
//! ## Performance Considerations
//!
//! - First lookup for a key incurs the async table loader's latency
//! - Subsequent lookups within the TTL are sub-microsecond (served from the state store)
//! - Cache size is bounded only by the number of unique keys in the stream
//! - For high-cardinality keys, consider a shorter TTL to limit memory
//!   (see the sketch below)
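//!
//! A sketch of such a configuration (`session_id` is an illustrative
//! high-cardinality column, not part of this module):
//!
//! ```rust,ignore
//! let config = LookupJoinConfig::builder()
//!     .stream_key_column("session_id".to_string())
//!     .lookup_key_column("id".to_string())
//!     .cache_ttl(Duration::from_secs(30)) // short TTL caps entry lifetime
//!     .build()
//!     .unwrap();
//! ```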

use super::{
    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
    TimerKey,
};
use crate::state::StateStoreExt;
use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use fxhash::FxHashMap;
use rkyv::{
    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Type of lookup join to perform.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LookupJoinType {
    /// Inner join - only emit events where lookup succeeds.
    #[default]
    Inner,
    /// Left outer join - emit all events, with nulls for failed lookups.
    Left,
}

impl LookupJoinType {
    /// Returns true if failed lookups should still emit output.
    #[must_use]
    pub fn emits_on_miss(&self) -> bool {
        matches!(self, LookupJoinType::Left)
    }
}

/// Configuration for a lookup join operator.
#[derive(Debug, Clone)]
pub struct LookupJoinConfig {
    /// Column name in the stream to use as lookup key.
    pub stream_key_column: String,
    /// Column name in the lookup table that matches the stream key.
    pub lookup_key_column: String,
    /// Time-to-live for cached lookup results.
    pub cache_ttl: Duration,
    /// Type of join to perform.
    pub join_type: LookupJoinType,
    /// Maximum number of entries to cache (0 = unlimited).
    pub max_cache_size: usize,
    /// Operator ID for checkpointing.
    pub operator_id: Option<String>,
}

impl LookupJoinConfig {
    /// Creates a new builder for lookup join configuration.
    #[must_use]
    pub fn builder() -> LookupJoinConfigBuilder {
        LookupJoinConfigBuilder::default()
    }
}

/// Builder for [`LookupJoinConfig`].
#[derive(Debug, Default)]
pub struct LookupJoinConfigBuilder {
    stream_key_column: Option<String>,
    lookup_key_column: Option<String>,
    cache_ttl: Option<Duration>,
    join_type: Option<LookupJoinType>,
    max_cache_size: Option<usize>,
    operator_id: Option<String>,
}

impl LookupJoinConfigBuilder {
    /// Sets the stream key column name.
    #[must_use]
    pub fn stream_key_column(mut self, column: String) -> Self {
        self.stream_key_column = Some(column);
        self
    }

    /// Sets the lookup key column name.
    #[must_use]
    pub fn lookup_key_column(mut self, column: String) -> Self {
        self.lookup_key_column = Some(column);
        self
    }

    /// Sets the cache TTL.
    #[must_use]
    pub fn cache_ttl(mut self, ttl: Duration) -> Self {
        self.cache_ttl = Some(ttl);
        self
    }

    /// Sets the join type.
    #[must_use]
    pub fn join_type(mut self, join_type: LookupJoinType) -> Self {
        self.join_type = Some(join_type);
        self
    }

    /// Sets the maximum cache size (0 = unlimited).
    #[must_use]
    pub fn max_cache_size(mut self, size: usize) -> Self {
        self.max_cache_size = Some(size);
        self
    }

    /// Sets a custom operator ID.
    #[must_use]
    pub fn operator_id(mut self, id: String) -> Self {
        self.operator_id = Some(id);
        self
    }

    /// Builds the configuration.
    ///
    /// # Errors
    ///
    /// Returns `OperatorError::ConfigError` if required fields
    /// (`stream_key_column`, `lookup_key_column`) are not set.
    pub fn build(self) -> Result<LookupJoinConfig, OperatorError> {
        Ok(LookupJoinConfig {
            stream_key_column: self.stream_key_column.ok_or_else(|| {
                OperatorError::ConfigError("stream_key_column is required".into())
            })?,
            lookup_key_column: self.lookup_key_column.ok_or_else(|| {
                OperatorError::ConfigError("lookup_key_column is required".into())
            })?,
            cache_ttl: self.cache_ttl.unwrap_or(Duration::from_secs(300)),
            join_type: self.join_type.unwrap_or_default(),
            max_cache_size: self.max_cache_size.unwrap_or(0),
            operator_id: self.operator_id,
        })
    }
}

/// State key prefix for cache entries.
const CACHE_STATE_PREFIX: &[u8; 4] = b"lkc:";

/// Timer key prefix for cache TTL expiration.
const CACHE_TIMER_PREFIX: u8 = 0x40;

/// Static counter for generating unique operator IDs.
static LOOKUP_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A cached lookup entry stored in state.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct CacheEntry {
    /// When this entry was inserted (processing time in microseconds).
    pub inserted_at: i64,
    /// Whether the lookup found a result.
    pub found: bool,
    /// Serialized lookup result (Arrow IPC format).
    pub data: Vec<u8>,
}

impl CacheEntry {
    /// Creates a new cache entry with a found result.
    fn found(inserted_at: i64, batch: &RecordBatch) -> Result<Self, OperatorError> {
        let data = Self::serialize_batch(batch)?;
        Ok(Self {
            inserted_at,
            found: true,
            data,
        })
    }

    /// Creates a new cache entry for a not-found result.
    fn not_found(inserted_at: i64) -> Self {
        Self {
            inserted_at,
            found: false,
            data: Vec::new(),
        }
    }

    /// Checks if this entry has expired.
    fn is_expired(&self, now: i64, ttl_us: i64) -> bool {
        now - self.inserted_at > ttl_us
    }

    /// Serializes a record batch to bytes.
    fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
        let mut buf = Vec::new();
        {
            let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
            writer
                .write(batch)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
            writer
                .finish()
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        }
        Ok(buf)
    }

    /// Deserializes a record batch from bytes.
    fn deserialize_batch(data: &[u8]) -> Result<RecordBatch, OperatorError> {
        let cursor = std::io::Cursor::new(data);
        let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        reader
            .next()
            .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
    }

    /// Returns the cached batch if found.
    ///
    /// # Errors
    ///
    /// Returns `OperatorError::SerializationFailed` if the batch data is invalid.
    pub fn to_batch(&self) -> Result<Option<RecordBatch>, OperatorError> {
        if self.found {
            Ok(Some(Self::deserialize_batch(&self.data)?))
        } else {
            Ok(None)
        }
    }
}

/// Metrics for tracking lookup join operations.
#[derive(Debug, Clone, Default)]
pub struct LookupJoinMetrics {
    /// Number of events processed.
    pub events_processed: u64,
    /// Number of cache hits.
    pub cache_hits: u64,
    /// Number of cache misses (lookup required).
    pub cache_misses: u64,
    /// Number of successful lookups.
    pub lookups_found: u64,
    /// Number of lookups that returned not found.
    pub lookups_not_found: u64,
    /// Number of lookup errors.
    pub lookup_errors: u64,
    /// Number of joined events emitted.
    pub events_emitted: u64,
    /// Number of events dropped (inner join with no match).
    pub events_dropped: u64,
    /// Number of cache entries expired.
    pub cache_expirations: u64,
}

impl LookupJoinMetrics {
    /// Creates new metrics.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Resets all counters.
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}

/// Result of a synchronous lookup operation.
///
/// Since the `Operator` trait is synchronous but `TableLoader` is async,
/// the lookup join must handle lookups differently. This type indicates
/// whether a lookup could be resolved from the cache.
#[derive(Debug, Clone)]
pub enum SyncLookupResult {
    /// Cache hit - result is available.
    CacheHit(Option<RecordBatch>),
    /// Cache miss - lookup is pending (must be provided externally).
    CacheMiss,
}

/// Lookup join operator that enriches streaming events with external table data.
///
/// This operator caches lookup results in the state store to minimize
/// external lookups. The cache uses processing-time-based TTL for expiration.
///
/// # Async Lookup Handling
///
/// Since the [`Operator`] trait is synchronous, async table lookups must be
/// handled externally. Use [`LookupJoinOperator::pending_lookups`] to get
/// keys that need to be looked up, and [`LookupJoinOperator::provide_lookup`]
/// to provide the results.
///
/// For simple synchronous use cases, use `LookupJoinOperator::process_with_lookup`,
/// which accepts a closure for lookups.
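///
/// A sketch of that external-lookup loop (`table_loader` is a hypothetical
/// async loader, not part of this module):
///
/// ```rust,ignore
/// let joined = operator.process(&event, &mut ctx);
/// for key in operator.pending_lookups().to_vec() {
///     let result = table_loader.lookup(&key).await; // hypothetical call
///     let more = operator.provide_lookup(&key, result.as_ref(), &mut ctx);
///     // forward `more` downstream alongside `joined`
/// }
/// ```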
pub struct LookupJoinOperator {
    /// Configuration.
    config: LookupJoinConfig,
    /// Operator ID.
    operator_id: String,
    /// Metrics.
    metrics: LookupJoinMetrics,
    /// Output schema (lazily initialized).
    output_schema: Option<SchemaRef>,
    /// Stream schema (captured from first event).
    stream_schema: Option<SchemaRef>,
    /// Lookup table schema (captured from first lookup result).
    lookup_schema: Option<SchemaRef>,
    /// Cache TTL in microseconds.
    cache_ttl_us: i64,
    /// Keys pending external lookup.
    pending_keys: Vec<Vec<u8>>,
    /// Events waiting for pending lookups.
    pending_events: Vec<(Event, Vec<u8>)>,
    /// In-memory cache of deserialized batches to avoid repeated Arrow IPC deserialization.
    /// Maps cache key bytes to the deserialized `RecordBatch`.
    batch_cache: FxHashMap<Vec<u8>, Option<RecordBatch>>,
}

impl LookupJoinOperator {
    /// Creates a new lookup join operator.
    #[must_use]
    #[allow(clippy::cast_possible_truncation)] // Duration.as_micros() fits i64 for practical values
    pub fn new(config: LookupJoinConfig) -> Self {
        let operator_id = config.operator_id.clone().unwrap_or_else(|| {
            let num = LOOKUP_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
            format!("lookup_join_{num}")
        });

        let cache_ttl_us = config.cache_ttl.as_micros() as i64;

        Self {
            config,
            operator_id,
            metrics: LookupJoinMetrics::new(),
            output_schema: None,
            stream_schema: None,
            lookup_schema: None,
            cache_ttl_us,
            pending_keys: Vec::new(),
            pending_events: Vec::new(),
            batch_cache: FxHashMap::default(),
        }
    }

    /// Creates a new lookup join operator with explicit ID.
    #[must_use]
    pub fn with_id(mut config: LookupJoinConfig, operator_id: String) -> Self {
        config.operator_id = Some(operator_id);
        Self::new(config)
    }

    /// Returns the configuration.
    #[must_use]
    pub fn config(&self) -> &LookupJoinConfig {
        &self.config
    }

    /// Returns the metrics.
    #[must_use]
    pub fn metrics(&self) -> &LookupJoinMetrics {
        &self.metrics
    }

    /// Resets the metrics.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }

    /// Returns keys that need external lookup.
    ///
    /// After processing events, check this for keys that weren't in the cache.
    /// Provide lookup results via [`provide_lookup`](Self::provide_lookup).
    #[must_use]
    pub fn pending_lookups(&self) -> &[Vec<u8>] {
        &self.pending_keys
    }

    /// Provides a lookup result for a pending key.
    ///
    /// Call this after performing the async lookup to complete processing
    /// of pending events.
    ///
    /// # Returns
    ///
    /// Output events that can now be emitted after the lookup result is available.
    pub fn provide_lookup(
        &mut self,
        key: &[u8],
        result: Option<&RecordBatch>,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();

        // Update cache
        let cache_key = Self::make_cache_key(key);
        let entry = if let Some(batch) = result {
            // Capture lookup schema on first result
            if self.lookup_schema.is_none() {
                self.lookup_schema = Some(batch.schema());
                self.update_output_schema();
            }
            self.metrics.lookups_found += 1;
            let Ok(e) = CacheEntry::found(ctx.processing_time, batch) else {
                self.metrics.lookup_errors += 1;
                return output;
            };
            e
        } else {
            self.metrics.lookups_not_found += 1;
            CacheEntry::not_found(ctx.processing_time)
        };

        // Store in cache
        if ctx.state.put_typed(&cache_key, &entry).is_err() {
            return output;
        }

        // Populate in-memory batch cache
        self.batch_cache.insert(cache_key.clone(), result.cloned());

        // Register TTL timer
        let expiry_time = ctx.processing_time + self.cache_ttl_us;
        let timer_key = Self::make_timer_key(&cache_key);
        ctx.timers
            .register_timer(expiry_time, Some(timer_key), Some(ctx.operator_index));

        // Process pending events for this key
        let events_to_process: Vec<_> = self
            .pending_events
            .iter()
            .filter(|(_, k)| k == key)
            .map(|(e, _)| e.clone())
            .collect();

        self.pending_events.retain(|(_, k)| k != key);
        self.pending_keys.retain(|k| k != key);

        for event in events_to_process {
            if let Some(joined) = self.create_joined_event(&event, result) {
                self.metrics.events_emitted += 1;
                output.push(Output::Event(joined));
            } else if self.config.join_type.emits_on_miss() {
                // Left join: emit with nulls
                if let Some(joined) = self.create_unmatched_event(&event) {
                    self.metrics.events_emitted += 1;
                    output.push(Output::Event(joined));
                }
            } else {
                self.metrics.events_dropped += 1;
            }
        }

        output
    }

    /// Processes an event with a synchronous lookup function.
    ///
    /// This is a convenience method for cases where lookups can be done
    /// synchronously (e.g., from an in-memory table).
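    ///
    /// A minimal sketch against an in-memory table (mirrors the unit tests
    /// below; `table` is assumed to be a `HashMap<Vec<u8>, RecordBatch>`):
    ///
    /// ```rust,ignore
    /// let outputs = operator.process_with_lookup(&event, &mut ctx, |key| {
    ///     table.get(key).cloned()
    /// });
    /// ```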
    pub fn process_with_lookup<F>(
        &mut self,
        event: &Event,
        ctx: &mut OperatorContext,
        lookup_fn: F,
    ) -> OutputVec
    where
        F: FnOnce(&[u8]) -> Option<RecordBatch>,
    {
        self.metrics.events_processed += 1;

        // Capture stream schema
        if self.stream_schema.is_none() {
            self.stream_schema = Some(event.data.schema());
        }

        // Extract join key
        let Some(key) = Self::extract_key(&event.data, &self.config.stream_key_column) else {
            return OutputVec::new();
        };

        // Check cache first
        let cache_key = Self::make_cache_key(&key);
        if let Some(cached) = self.lookup_cache(&cache_key, ctx) {
            self.metrics.cache_hits += 1;
            return self.emit_result(event, cached.as_ref(), ctx);
        }

        // Cache miss - perform lookup
        self.metrics.cache_misses += 1;
        let result = lookup_fn(&key);

        // Update cache
        let entry = if let Some(ref batch) = result {
            if self.lookup_schema.is_none() {
                self.lookup_schema = Some(batch.schema());
                self.update_output_schema();
            }
            self.metrics.lookups_found += 1;
            let Ok(e) = CacheEntry::found(ctx.processing_time, batch) else {
                self.metrics.lookup_errors += 1;
                return OutputVec::new();
            };
            e
        } else {
            self.metrics.lookups_not_found += 1;
            CacheEntry::not_found(ctx.processing_time)
        };

        if ctx.state.put_typed(&cache_key, &entry).is_ok() {
            // Populate in-memory batch cache
            self.batch_cache.insert(cache_key.clone(), result.clone());

            // Register TTL timer
            let expiry_time = ctx.processing_time + self.cache_ttl_us;
            let timer_key = Self::make_timer_key(&cache_key);
            ctx.timers
                .register_timer(expiry_time, Some(timer_key), Some(ctx.operator_index));
        }

        self.emit_result(event, result.as_ref(), ctx)
    }

    /// Looks up a key in the cache.
    ///
    /// Returns:
    /// - `None` if cache miss (key not found or expired)
    /// - `Some(None)` if cached "not found" result
    /// - `Some(Some(batch))` if cached found result
    ///
    /// Uses an in-memory batch cache to avoid repeated Arrow IPC deserialization
    /// when the same cache key is looked up multiple times.
    #[allow(clippy::option_option)]
    fn lookup_cache(
        &mut self,
        cache_key: &[u8],
        ctx: &OperatorContext,
    ) -> Option<Option<RecordBatch>> {
        let entry: CacheEntry = ctx.state.get_typed(cache_key).ok()??;

        // Check if expired
        if entry.is_expired(ctx.processing_time, self.cache_ttl_us) {
            self.batch_cache.remove(cache_key);
            return None;
        }

        // Check in-memory batch cache first
        if let Some(cached) = self.batch_cache.get(cache_key) {
            return Some(cached.clone());
        }

        // Deserialize and cache the batch
        let result = entry.to_batch().ok()?;
        self.batch_cache.insert(cache_key.to_vec(), result.clone());
        Some(result)
    }

    /// Emits the join result for an event.
    fn emit_result(
        &mut self,
        event: &Event,
        lookup_result: Option<&RecordBatch>,
        _ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();

        if let Some(lookup_batch) = lookup_result {
            // Found - create joined output
            if let Some(joined) = self.create_joined_event(event, Some(lookup_batch)) {
                self.metrics.events_emitted += 1;
                output.push(Output::Event(joined));
            }
        } else if self.config.join_type.emits_on_miss() {
            // Left join - emit with nulls
            if let Some(joined) = self.create_unmatched_event(event) {
                self.metrics.events_emitted += 1;
                output.push(Output::Event(joined));
            }
        } else {
            // Inner join - drop event
            self.metrics.events_dropped += 1;
        }

        output
    }

    /// Extracts the join key value from a record batch.
    fn extract_key(batch: &RecordBatch, column_name: &str) -> Option<Vec<u8>> {
        let column_index = batch.schema().index_of(column_name).ok()?;
        let column = batch.column(column_index);

        // Handle different column types
        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
            if string_array.is_empty() || string_array.is_null(0) {
                return None;
            }
            return Some(string_array.value(0).as_bytes().to_vec());
        }

        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
            if int_array.is_empty() || int_array.is_null(0) {
                return None;
            }
            return Some(int_array.value(0).to_le_bytes().to_vec());
        }

        None
    }

    /// Creates a cache key from a lookup key.
    fn make_cache_key(key: &[u8]) -> Vec<u8> {
        let key_hash = fxhash::hash64(key);
        let mut cache_key = Vec::with_capacity(12);
        cache_key.extend_from_slice(CACHE_STATE_PREFIX);
        cache_key.extend_from_slice(&key_hash.to_be_bytes());
        cache_key
    }

    /// Creates a timer key for cache TTL expiration.
    fn make_timer_key(cache_key: &[u8]) -> TimerKey {
        let mut key = TimerKey::new();
        key.push(CACHE_TIMER_PREFIX);
        key.extend_from_slice(cache_key);
        key
    }

    /// Parses a timer key to extract the cache key.
    fn parse_timer_key(key: &[u8]) -> Option<Vec<u8>> {
        if key.is_empty() || key[0] != CACHE_TIMER_PREFIX {
            return None;
        }
        Some(key[1..].to_vec())
    }

    /// Updates the output schema when both input schemas are known.
    fn update_output_schema(&mut self) {
        if let (Some(stream), Some(lookup)) = (&self.stream_schema, &self.lookup_schema) {
            let mut fields: Vec<Field> =
                stream.fields().iter().map(|f| f.as_ref().clone()).collect();

            // Add lookup fields, prefixing duplicates
            for field in lookup.fields() {
                let name = if stream.field_with_name(field.name()).is_ok() {
                    format!("lookup_{}", field.name())
                } else {
                    field.name().clone()
                };
                fields.push(Field::new(
                    name,
                    field.data_type().clone(),
                    true, // Nullable for left joins
                ));
            }

            self.output_schema = Some(Arc::new(Schema::new(fields)));
        }
    }

    /// Creates a joined event from stream event and lookup result.
    fn create_joined_event(&self, event: &Event, lookup: Option<&RecordBatch>) -> Option<Event> {
        let lookup_batch = lookup?;
        let schema = self.output_schema.as_ref()?;

        let mut columns: Vec<ArrayRef> = event.data.columns().to_vec();
        for column in lookup_batch.columns() {
            columns.push(Arc::clone(column));
        }

        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;

        Some(Event::new(event.timestamp, joined_batch))
    }

    /// Creates an unmatched event for left joins (with null lookup columns).
    fn create_unmatched_event(&self, event: &Event) -> Option<Event> {
        let schema = self.output_schema.as_ref()?;
        let lookup_schema = self.lookup_schema.as_ref()?;

        let num_rows = event.data.num_rows();
        let mut columns: Vec<ArrayRef> = event.data.columns().to_vec();

        // Add null columns for lookup side
        for field in lookup_schema.fields() {
            columns.push(Self::create_null_array(field.data_type(), num_rows));
        }

        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;

        Some(Event::new(event.timestamp, joined_batch))
    }
    /// Creates a null array of the given type and length.
    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
        match data_type {
            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
            // Fallback: Int64 nulls for all other types; `RecordBatch::try_new`
            // will reject the batch if the schema expects a different type.
            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
        }
    }

    /// Handles cache TTL timer expiration.
    fn handle_cache_expiry(&mut self, cache_key: &[u8], ctx: &mut OperatorContext) -> OutputVec {
        if ctx.state.delete(cache_key).is_ok() {
            self.batch_cache.remove(cache_key);
            self.metrics.cache_expirations += 1;
        }
        OutputVec::new()
    }
}

impl Operator for LookupJoinOperator {
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        self.metrics.events_processed += 1;

        // Capture stream schema
        if self.stream_schema.is_none() {
            self.stream_schema = Some(event.data.schema());
        }

        // Extract join key
        let Some(key) = Self::extract_key(&event.data, &self.config.stream_key_column) else {
            return OutputVec::new();
        };

        // Check cache
        let cache_key = Self::make_cache_key(&key);
        if let Some(cached) = self.lookup_cache(&cache_key, ctx) {
            self.metrics.cache_hits += 1;
            return self.emit_result(event, cached.as_ref(), ctx);
        }

        // Cache miss - add to pending
        self.metrics.cache_misses += 1;
        if !self.pending_keys.contains(&key) {
            self.pending_keys.push(key.clone());
        }
        self.pending_events.push((event.clone(), key));

        OutputVec::new()
    }

    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
        if let Some(cache_key) = Self::parse_timer_key(&timer.key) {
            return self.handle_cache_expiry(&cache_key, ctx);
        }
        OutputVec::new()
    }

    fn checkpoint(&self) -> OperatorState {
        let checkpoint_data = (
            self.config.stream_key_column.clone(),
            self.config.lookup_key_column.clone(),
            self.metrics.events_processed,
            self.metrics.cache_hits,
            self.metrics.cache_misses,
            self.metrics.lookups_found,
            self.metrics.lookups_not_found,
        );

        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        type CheckpointData = (String, String, u64, u64, u64, u64, u64);

        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let (_, _, events_processed, cache_hits, cache_misses, lookups_found, lookups_not_found) =
            rkyv::deserialize::<CheckpointData, RkyvError>(archived)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.metrics.events_processed = events_processed;
        self.metrics.cache_hits = cache_hits;
        self.metrics.cache_misses = cache_misses;
        self.metrics.lookups_found = lookups_found;
        self.metrics.lookups_not_found = lookups_not_found;
        self.batch_cache.clear();

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::{InMemoryStore, StateStore};
    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;

    fn create_order_event(timestamp: i64, customer_id: &str, amount: i64) -> Event {
        let schema = Arc::new(Schema::new(vec![
            Field::new("customer_id", DataType::Utf8, false),
            Field::new("amount", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![customer_id])),
                Arc::new(Int64Array::from(vec![amount])),
            ],
        )
        .unwrap();
        Event::new(timestamp, batch)
    }

    fn create_customer_batch(id: &str, name: &str, tier: &str) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("tier", DataType::Utf8, false),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![id])),
                Arc::new(StringArray::from(vec![name])),
                Arc::new(StringArray::from(vec![tier])),
            ],
        )
        .unwrap()
    }

    fn create_test_context<'a>(
        timers: &'a mut TimerService,
        state: &'a mut dyn StateStore,
        watermark_gen: &'a mut dyn WatermarkGenerator,
    ) -> OperatorContext<'a> {
        OperatorContext {
            event_time: 0,
            processing_time: 1_000_000, // 1 second in microseconds
            timers,
            state,
            watermark_generator: watermark_gen,
            operator_index: 0,
        }
    }

    fn create_lookup_table() -> HashMap<Vec<u8>, RecordBatch> {
        let mut table = HashMap::new();
        table.insert(
            b"cust_1".to_vec(),
            create_customer_batch("cust_1", "Alice", "gold"),
        );
        table.insert(
            b"cust_2".to_vec(),
            create_customer_batch("cust_2", "Bob", "silver"),
        );
        table.insert(
            b"cust_3".to_vec(),
            create_customer_batch("cust_3", "Charlie", "bronze"),
        );
        table
    }

    #[test]
    fn test_lookup_join_type_properties() {
        assert!(!LookupJoinType::Inner.emits_on_miss());
        assert!(LookupJoinType::Left.emits_on_miss());
    }

    #[test]
    fn test_config_builder() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(60))
            .join_type(LookupJoinType::Left)
            .max_cache_size(1000)
            .operator_id("test_op".to_string())
            .build()
            .unwrap();

        assert_eq!(config.stream_key_column, "customer_id");
        assert_eq!(config.lookup_key_column, "id");
        assert_eq!(config.cache_ttl, Duration::from_secs(60));
        assert_eq!(config.join_type, LookupJoinType::Left);
        assert_eq!(config.max_cache_size, 1000);
        assert_eq!(config.operator_id, Some("test_op".to_string()));
    }

    #[test]
    fn test_inner_join_basic() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(300))
            .join_type(LookupJoinType::Inner)
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Process order for existing customer
        let order = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

        let outputs =
            operator.process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());

        // Should emit joined event
        assert_eq!(
            outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .count(),
            1
        );

        // Check output schema has both stream and lookup columns
        if let Some(Output::Event(event)) = outputs.first() {
            assert_eq!(event.data.num_columns(), 5); // 2 stream + 3 lookup
        }

        assert_eq!(operator.metrics().events_processed, 1);
        assert_eq!(operator.metrics().events_emitted, 1);
        assert_eq!(operator.metrics().lookups_found, 1);
    }

    #[test]
    fn test_inner_join_no_match() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .join_type(LookupJoinType::Inner)
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Process order for non-existing customer
        let order = create_order_event(1000, "cust_999", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

        let outputs =
            operator.process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());

        // Inner join should emit nothing for missing lookup
        assert_eq!(outputs.len(), 0);
        assert_eq!(operator.metrics().events_dropped, 1);
        assert_eq!(operator.metrics().lookups_not_found, 1);
    }

    #[test]
    fn test_left_join_no_match() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .join_type(LookupJoinType::Left)
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // First, process an event that matches to establish lookup schema
        let order1 = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let lookup_table = create_lookup_table();
        operator.process_with_lookup(&order1, &mut ctx, |key| lookup_table.get(key).cloned());

        // Now process order for non-existing customer
        let order2 = create_order_event(2000, "cust_999", 200);
        ctx.processing_time = 2_000_000;
        let outputs = operator.process_with_lookup(&order2, &mut ctx, |_| None);

        // Left join should emit event with nulls
        assert_eq!(
            outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .count(),
            1
        );

        if let Some(Output::Event(event)) = outputs.first() {
            assert_eq!(event.data.num_columns(), 5); // Stream cols + null lookup cols
        }
    }

    #[test]
    fn test_cache_hit() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(300))
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // First lookup - cache miss
        let order1 = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_with_lookup(&order1, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 1);
        assert_eq!(operator.metrics().cache_hits, 0);

        // Second lookup - cache hit
        let order2 = create_order_event(2000, "cust_1", 200);
        ctx.processing_time = 2_000_000;
        operator.process_with_lookup(&order2, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 1); // Still 1
        assert_eq!(operator.metrics().cache_hits, 1); // Now 1
    }

    #[test]
    fn test_cache_expiry() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(1)) // 1 second TTL
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // First lookup at t=1s
        let order1 = create_order_event(1000, "cust_1", 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        ctx.processing_time = 1_000_000; // 1 second in microseconds
        operator.process_with_lookup(&order1, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 1);

        // Second lookup at t=1.5s - should still hit cache
        let order2 = create_order_event(2000, "cust_1", 200);
        ctx.processing_time = 1_500_000;
        operator.process_with_lookup(&order2, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_hits, 1);

        // Third lookup at t=3s - cache should have expired
        let order3 = create_order_event(3000, "cust_1", 300);
        ctx.processing_time = 3_000_000;
        operator.process_with_lookup(&order3, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(operator.metrics().cache_misses, 2); // New miss
    }

    #[test]
    fn test_cache_timer_cleanup() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(1))
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Perform lookup
        let order = create_order_event(1000, "cust_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = 1_000_000;
            operator.process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());
        }

        // State should have cache entry
        assert!(state.len() > 0);

        // Fire timer after TTL
        let registered_timers = timers.poll_timers(2_000_001);
        assert!(!registered_timers.is_empty());

        for timer_reg in registered_timers {
            let timer = Timer {
                key: timer_reg.key.unwrap_or_default(),
                timestamp: timer_reg.timestamp,
            };
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = timer_reg.timestamp;
            operator.on_timer(timer, &mut ctx);
        }

        assert_eq!(operator.metrics().cache_expirations, 1);
    }

    #[test]
    fn test_checkpoint_restore() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config.clone(), "test_join".to_string());

        // Simulate activity
        operator.metrics.events_processed = 100;
        operator.metrics.cache_hits = 80;
        operator.metrics.cache_misses = 20;
        operator.metrics.lookups_found = 15;
        operator.metrics.lookups_not_found = 5;

        // Checkpoint
        let checkpoint = operator.checkpoint();

        // Restore to new operator
        let mut restored = LookupJoinOperator::with_id(config, "test_join".to_string());
        restored.restore(checkpoint).unwrap();

        assert_eq!(restored.metrics().events_processed, 100);
        assert_eq!(restored.metrics().cache_hits, 80);
        assert_eq!(restored.metrics().cache_misses, 20);
        assert_eq!(restored.metrics().lookups_found, 15);
        assert_eq!(restored.metrics().lookups_not_found, 5);
    }

    #[test]
    fn test_integer_key_lookup() {
        fn create_int_key_event(timestamp: i64, key: i64, value: i64) -> Event {
            let schema = Arc::new(Schema::new(vec![
                Field::new("key", DataType::Int64, false),
                Field::new("value", DataType::Int64, false),
            ]));
            let batch = RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int64Array::from(vec![key])),
                    Arc::new(Int64Array::from(vec![value])),
                ],
            )
            .unwrap();
            Event::new(timestamp, batch)
        }

        fn create_int_key_lookup(key: i64, data: &str) -> RecordBatch {
            let schema = Arc::new(Schema::new(vec![
                Field::new("key", DataType::Int64, false),
                Field::new("data", DataType::Utf8, false),
            ]));
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int64Array::from(vec![key])),
                    Arc::new(StringArray::from(vec![data])),
                ],
            )
            .unwrap()
        }

        let config = LookupJoinConfig::builder()
            .stream_key_column("key".to_string())
            .lookup_key_column("key".to_string())
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());

        let mut lookup_table: HashMap<Vec<u8>, RecordBatch> = HashMap::new();
        lookup_table.insert(
            42i64.to_le_bytes().to_vec(),
            create_int_key_lookup(42, "matched"),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        let event = create_int_key_event(1000, 42, 100);
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);

        let outputs =
            operator.process_with_lookup(&event, &mut ctx, |key| lookup_table.get(key).cloned());

        assert_eq!(
            outputs
                .iter()
                .filter(|o| matches!(o, Output::Event(_)))
                .count(),
            1
        );
        assert_eq!(operator.metrics().lookups_found, 1);
    }

    #[test]
    fn test_async_lookup_flow() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Process event - should go to pending
        let order = create_order_event(1000, "cust_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.process(&order, &mut ctx);
            assert!(outputs.is_empty()); // No output yet
        }

        // Check pending lookups
        assert_eq!(operator.pending_lookups().len(), 1);
        assert_eq!(operator.pending_lookups()[0], b"cust_1");

        // Provide lookup result
        let lookup_result = create_customer_batch("cust_1", "Alice", "gold");
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            let outputs = operator.provide_lookup(b"cust_1", Some(&lookup_result), &mut ctx);

            // Now should emit
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        // Pending should be cleared
        assert!(operator.pending_lookups().is_empty());
    }

    #[test]
    fn test_cache_entry_serialization() {
        let batch = create_customer_batch("test", "Test", "gold");
        let entry = CacheEntry::found(1_000_000, &batch).unwrap();

        assert!(entry.found);
        assert_eq!(entry.inserted_at, 1_000_000);

        // Verify round-trip
        let restored = entry.to_batch().unwrap().unwrap();
        assert_eq!(restored.num_rows(), 1);
        assert_eq!(restored.num_columns(), 3);
    }

    #[test]
    fn test_cache_entry_expiry() {
        let batch = create_customer_batch("test", "Test", "gold");
        let entry = CacheEntry::found(1_000_000, &batch).unwrap();

        // Not expired at t=1.5s with 1s TTL
        assert!(!entry.is_expired(1_500_000, 1_000_000));

        // Expired at t=2.5s with 1s TTL
        assert!(entry.is_expired(2_500_000, 1_000_000));
    }

    #[test]
    fn test_not_found_cache_entry() {
        let entry = CacheEntry::not_found(1_000_000);

        assert!(!entry.found);
        assert!(entry.data.is_empty());
        assert!(entry.to_batch().unwrap().is_none());
    }

    #[test]
    fn test_metrics_reset() {
        let mut metrics = LookupJoinMetrics::new();
        metrics.events_processed = 100;
        metrics.cache_hits = 50;

        metrics.reset();

        assert_eq!(metrics.events_processed, 0);
        assert_eq!(metrics.cache_hits, 0);
    }

    #[test]
    fn test_multiple_events_same_key() {
        let config = LookupJoinConfig::builder()
            .stream_key_column("customer_id".to_string())
            .lookup_key_column("id".to_string())
            .cache_ttl(Duration::from_secs(300))
            .build()
            .unwrap();

        let mut operator = LookupJoinOperator::with_id(config, "test_join".to_string());
        let lookup_table = create_lookup_table();

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Process multiple events with same key
        for i in 0..5 {
            let order = create_order_event(1000 + i * 100, "cust_1", 100 + i);
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            ctx.processing_time = 1_000_000 + i * 100_000;
            let outputs = operator
                .process_with_lookup(&order, &mut ctx, |key| lookup_table.get(key).cloned());
            assert_eq!(
                outputs
                    .iter()
                    .filter(|o| matches!(o, Output::Event(_)))
                    .count(),
                1
            );
        }

        // Should have 1 miss and 4 hits
        assert_eq!(operator.metrics().cache_misses, 1);
        assert_eq!(operator.metrics().cache_hits, 4);
        assert_eq!(operator.metrics().events_emitted, 5);
    }
}