Skip to main content

laminar_core/operator/
temporal_join.rs

1//! # Temporal Join Operators (F021)
2//!
3//! Join streaming events with versioned tables using point-in-time lookups.
4//! Temporal joins return the table value that was valid at the event's timestamp,
5//! enabling consistent enrichment with time-varying dimension data.
6//!
7//! ## Use Cases
8//!
9//! - Currency rate lookup at transaction time
10//! - Product price lookup at order time
11//! - User tier lookup at event time
12//! - Regulatory compliance (audit trail)
13//!
14//! ## Join Types
15//!
16//! - **Event-Time**: Deterministic lookup based on event timestamp (`FOR SYSTEM_TIME AS OF`)
17//! - **Process-Time**: Non-deterministic lookup based on processing time (latest value)
18//!
19//! ## Table Characteristics
20//!
21//! - **Append-Only**: Only inserts, no updates/deletes. Optimized: no stream-side state needed.
22//! - **Non-Append-Only**: Has updates/deletes. Requires state to track join results for retractions.
23//!
24//! ## Example
25//!
26//! ```rust,no_run
27//! use laminar_core::operator::temporal_join::{
28//!     TemporalJoinOperator, TemporalJoinConfig, TemporalJoinSemantics,
29//!     TableCharacteristics, TemporalJoinType,
30//! };
31//!
32//! // Join orders with currency rates valid at order time
33//! let config = TemporalJoinConfig::builder()
34//!     .stream_key_column("currency".to_string())
35//!     .table_key_column("currency".to_string())
36//!     .table_version_column("valid_from".to_string())
37//!     .semantics(TemporalJoinSemantics::EventTime)
38//!     .table_characteristics(TableCharacteristics::AppendOnly)
39//!     .join_type(TemporalJoinType::Inner)
40//!     .build();
41//!
42//! let operator = TemporalJoinOperator::new(config);
43//! ```
44//!
45//! ## SQL Syntax (Future)
46//!
47//! ```sql
48//! -- Event-time temporal join
49//! SELECT o.*, r.rate
50//! FROM orders o
51//! JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time r
52//!     ON o.currency = r.currency;
53//!
54//! -- Process-time temporal join (latest value)
55//! SELECT o.*, c.tier
56//! FROM orders o
57//! JOIN customers FOR SYSTEM_TIME AS OF PROCTIME() c
58//!     ON o.customer_id = c.id;
59//! ```
60
61use super::{
62    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
63    TimerKey,
64};
65use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
66use arrow_schema::{DataType, Field, Schema, SchemaRef};
67use fxhash::FxHashMap;
68use rkyv::{
69    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
70};
71use smallvec::SmallVec;
72use std::collections::BTreeMap;
73use std::sync::atomic::{AtomicU64, Ordering};
74use std::sync::Arc;
75
/// Type of temporal join semantics.
///
/// Selects which timestamp drives the table lookup for each stream event and
/// which lookup strategy is used. The default is
/// [`TemporalJoinSemantics::EventTime`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinSemantics {
    /// Event-time: Lookup value valid at event timestamp.
    /// Deterministic, requires versioned table.
    ///
    /// The lookup returns the row with the largest version timestamp that is
    /// less than or equal to the event's timestamp.
    #[default]
    EventTime,

    /// Process-time: Lookup current value at processing time.
    /// Non-deterministic, simpler but less predictable.
    ///
    /// The lookup returns the row with the highest version timestamp stored
    /// for the key, regardless of the event's own timestamp.
    ProcessTime,
}
88
/// Table update characteristics.
///
/// Decides whether the operator records which table version each stream event
/// was joined with, so that earlier results can be retracted when the table
/// changes. The default is [`TableCharacteristics::AppendOnly`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TableCharacteristics {
    /// Append-only: Only inserts, no updates/deletes.
    /// Optimization: No state needed on streaming side.
    #[default]
    AppendOnly,

    /// Non-append-only: Has updates and/or deletes.
    /// Requires state to track which rows were joined.
    ///
    /// That tracked state is trimmed as the watermark advances.
    NonAppendOnly,
}
101
/// Kind of temporal join the operator performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalJoinType {
    /// Inner join: a stream event produces output only when a table row matches.
    #[default]
    Inner,
    /// Left outer join: every stream event is emitted; unmatched events carry
    /// nulls for the table side.
    Left,
}

impl TemporalJoinType {
    /// Reports whether stream events without a table match should still be emitted.
    ///
    /// `true` for [`TemporalJoinType::Left`], `false` for [`TemporalJoinType::Inner`].
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        // Exhaustive on purpose: adding a join type must force a decision here.
        match *self {
            Self::Left => true,
            Self::Inner => false,
        }
    }
}
119
/// Configuration for a temporal join operator.
///
/// Usually assembled through [`TemporalJoinConfig::builder`]; all fields are
/// public, so the struct can also be constructed or adjusted directly.
#[derive(Debug, Clone)]
pub struct TemporalJoinConfig {
    /// Column name in the stream to use as lookup key.
    ///
    /// Only the first row of the batch is read, and only Utf8 (`StringArray`)
    /// and `Int64Array` columns are understood by the key extractor.
    pub stream_key_column: String,
    /// Column name in the table that matches the stream key.
    ///
    /// Read with the same extractor as `stream_key_column`.
    pub table_key_column: String,
    /// Column name for the version timestamp in the table.
    ///
    /// Read as an `Int64Array`; when the column is absent, of another type,
    /// empty, or null, table inserts fall back to the event's own timestamp.
    pub table_version_column: String,
    /// Join semantics (event-time or process-time).
    pub semantics: TemporalJoinSemantics,
    /// Table characteristics (append-only or non-append-only).
    pub table_characteristics: TableCharacteristics,
    /// Type of join to perform.
    pub join_type: TemporalJoinType,
    /// Operator ID for checkpointing.
    ///
    /// When `None`, the operator generates `temporal_join_<n>` from a
    /// process-wide counter at construction time.
    pub operator_id: Option<String>,
    /// Maximum number of versions to retain per key (0 = unlimited).
    ///
    /// The limit counts distinct version timestamps, not individual rows.
    pub max_versions_per_key: usize,
}

impl TemporalJoinConfig {
    /// Creates a new builder for temporal join configuration.
    ///
    /// The builder starts with every field unset; see
    /// [`TemporalJoinConfigBuilder::build`] for which fields are required.
    #[must_use]
    pub fn builder() -> TemporalJoinConfigBuilder {
        TemporalJoinConfigBuilder::default()
    }
}
148
/// Builder for [`TemporalJoinConfig`].
///
/// Every field starts as `None`. The three column names are mandatory at
/// `build()` time; the remaining fields fall back to their defaults.
#[derive(Debug, Default)]
pub struct TemporalJoinConfigBuilder {
    // Required: `build()` panics when any of these three is still `None`.
    stream_key_column: Option<String>,
    table_key_column: Option<String>,
    table_version_column: Option<String>,
    // Optional: `build()` substitutes the type's `Default` when unset.
    semantics: Option<TemporalJoinSemantics>,
    table_characteristics: Option<TableCharacteristics>,
    join_type: Option<TemporalJoinType>,
    // Optional: passed through unchanged (`None` stays `None`).
    operator_id: Option<String>,
    // Optional: `build()` uses 0 (unlimited) when unset.
    max_versions_per_key: Option<usize>,
}
161
162impl TemporalJoinConfigBuilder {
163    /// Sets the stream key column name.
164    #[must_use]
165    pub fn stream_key_column(mut self, column: String) -> Self {
166        self.stream_key_column = Some(column);
167        self
168    }
169
170    /// Sets the table key column name.
171    #[must_use]
172    pub fn table_key_column(mut self, column: String) -> Self {
173        self.table_key_column = Some(column);
174        self
175    }
176
177    /// Sets the table version column name.
178    #[must_use]
179    pub fn table_version_column(mut self, column: String) -> Self {
180        self.table_version_column = Some(column);
181        self
182    }
183
184    /// Sets the join semantics.
185    #[must_use]
186    pub fn semantics(mut self, semantics: TemporalJoinSemantics) -> Self {
187        self.semantics = Some(semantics);
188        self
189    }
190
191    /// Sets the table characteristics.
192    #[must_use]
193    pub fn table_characteristics(mut self, characteristics: TableCharacteristics) -> Self {
194        self.table_characteristics = Some(characteristics);
195        self
196    }
197
198    /// Sets the join type.
199    #[must_use]
200    pub fn join_type(mut self, join_type: TemporalJoinType) -> Self {
201        self.join_type = Some(join_type);
202        self
203    }
204
205    /// Sets a custom operator ID.
206    #[must_use]
207    pub fn operator_id(mut self, id: String) -> Self {
208        self.operator_id = Some(id);
209        self
210    }
211
212    /// Sets the maximum number of versions to retain per key.
213    #[must_use]
214    pub fn max_versions_per_key(mut self, max: usize) -> Self {
215        self.max_versions_per_key = Some(max);
216        self
217    }
218
219    /// Builds the configuration.
220    ///
221    /// # Panics
222    ///
223    /// Panics if required fields are not set.
224    #[must_use]
225    pub fn build(self) -> TemporalJoinConfig {
226        TemporalJoinConfig {
227            stream_key_column: self
228                .stream_key_column
229                .expect("stream_key_column is required"),
230            table_key_column: self.table_key_column.expect("table_key_column is required"),
231            table_version_column: self
232                .table_version_column
233                .expect("table_version_column is required"),
234            semantics: self.semantics.unwrap_or_default(),
235            table_characteristics: self.table_characteristics.unwrap_or_default(),
236            join_type: self.join_type.unwrap_or_default(),
237            operator_id: self.operator_id,
238            max_versions_per_key: self.max_versions_per_key.unwrap_or(0),
239        }
240    }
241}
242
/// Timer key prefix for version cleanup.
///
/// NOTE(review): no use of this constant is visible in the reviewed part of
/// the file; presumably it is the leading byte of timer keys built by the
/// cleanup-timer helper — confirm against that helper and check that the
/// value does not clash with prefixes used by other operators.
const TEMPORAL_TIMER_PREFIX: u8 = 0x60;
245
/// Static counter for generating unique operator IDs.
///
/// Incremented once for every operator constructed without an explicit
/// `operator_id`; the fetched value becomes the `<n>` in `temporal_join_<n>`.
/// The counter is process-wide and starts at 0.
static TEMPORAL_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
248
/// A stored table row for temporal joining.
///
/// The row keeps its payload in serialized form so it can be archived with
/// rkyv; use [`TableRow::to_batch`] to get the record batch back.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct TableRow {
    /// Version timestamp (when this row became valid).
    pub version_timestamp: i64,
    /// Serialized key value.
    ///
    /// For rows created by the operator this is the UTF-8 bytes of a string
    /// key or the little-endian bytes of an `i64` key.
    pub key_value: Vec<u8>,
    /// Serialized record batch data (Arrow IPC stream: schema plus one batch).
    pub data: Vec<u8>,
}
259
260impl TableRow {
261    /// Creates a new table row from a record batch.
262    fn new(
263        version_timestamp: i64,
264        key_value: Vec<u8>,
265        batch: &RecordBatch,
266    ) -> Result<Self, OperatorError> {
267        let data = Self::serialize_batch(batch)?;
268        Ok(Self {
269            version_timestamp,
270            key_value,
271            data,
272        })
273    }
274
275    /// Serializes a record batch to bytes.
276    fn serialize_batch(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
277        let mut buf = Vec::new();
278        {
279            let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
280                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
281            writer
282                .write(batch)
283                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
284            writer
285                .finish()
286                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
287        }
288        Ok(buf)
289    }
290
291    /// Deserializes a record batch from bytes.
292    fn deserialize_batch(data: &[u8]) -> Result<RecordBatch, OperatorError> {
293        let cursor = std::io::Cursor::new(data);
294        let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
295            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
296        reader
297            .next()
298            .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
299            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
300    }
301
302    /// Converts this row back to a record batch.
303    ///
304    /// # Errors
305    ///
306    /// Returns `OperatorError::SerializationFailed` if the batch data is invalid.
307    pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
308        Self::deserialize_batch(&self.data)
309    }
310}
311
/// Record of a joined event for retraction tracking (non-append-only tables).
///
/// One record is stored per matched stream event so the join result can be
/// rebuilt and retracted if the table version it used is later updated or
/// deleted. Records are dropped once the watermark passes their timestamp.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct JoinedEventRecord {
    /// Original stream event timestamp.
    pub event_timestamp: i64,
    /// Serialized stream event data (same encoding as `TableRow::data`).
    pub event_data: Vec<u8>,
    /// Table row version that was joined.
    pub table_version: i64,
    /// The key value used for joining.
    pub key_value: Vec<u8>,
}
324
325/// Per-key versioned table state.
326#[derive(Debug, Clone, Default)]
327pub struct VersionedKeyState {
328    /// Rows indexed by version timestamp.
329    /// Multiple rows at the same timestamp stored in a vector.
330    pub versions: BTreeMap<i64, SmallVec<[TableRow; 1]>>,
331    /// Minimum version timestamp.
332    pub min_version: i64,
333    /// Maximum version timestamp.
334    pub max_version: i64,
335}
336
337impl VersionedKeyState {
338    /// Creates a new empty key state.
339    #[must_use]
340    pub fn new() -> Self {
341        Self {
342            versions: BTreeMap::new(),
343            min_version: i64::MAX,
344            max_version: i64::MIN,
345        }
346    }
347
348    /// Inserts a table row.
349    pub fn insert(&mut self, row: TableRow) {
350        let version = row.version_timestamp;
351        self.versions.entry(version).or_default().push(row);
352        self.min_version = self.min_version.min(version);
353        self.max_version = self.max_version.max(version);
354    }
355
356    /// Returns the number of rows in this key's state.
357    #[must_use]
358    pub fn len(&self) -> usize {
359        self.versions.values().map(SmallVec::len).sum()
360    }
361
362    /// Returns true if this key has no rows.
363    #[must_use]
364    pub fn is_empty(&self) -> bool {
365        self.versions.is_empty()
366    }
367
368    /// Finds the row valid at the given timestamp.
369    /// Returns the row with the largest `version_ts` <= `event_ts`.
370    #[must_use]
371    pub fn lookup_at_time(&self, timestamp: i64) -> Option<&TableRow> {
372        let (_, rows) = self.versions.range(..=timestamp).next_back()?;
373        rows.last()
374    }
375
376    /// Finds the latest row (for process-time lookups).
377    #[must_use]
378    pub fn lookup_latest(&self) -> Option<&TableRow> {
379        let (_, rows) = self.versions.iter().next_back()?;
380        rows.last()
381    }
382
383    /// Removes versions before the given timestamp.
384    pub fn cleanup_before(&mut self, threshold: i64) {
385        self.versions = self.versions.split_off(&threshold);
386        self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
387    }
388
389    /// Removes a specific version (for non-append-only table deletes).
390    pub fn remove_version(&mut self, version: i64) -> Option<SmallVec<[TableRow; 1]>> {
391        let removed = self.versions.remove(&version);
392        if removed.is_some() {
393            self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
394            self.max_version = self
395                .versions
396                .keys()
397                .next_back()
398                .copied()
399                .unwrap_or(i64::MIN);
400        }
401        removed
402    }
403
404    /// Limits the number of versions (keeps most recent).
405    pub fn limit_versions(&mut self, max_versions: usize) {
406        if max_versions == 0 || self.versions.len() <= max_versions {
407            return;
408        }
409
410        let to_remove = self.versions.len() - max_versions;
411        let keys_to_remove: Vec<_> = self.versions.keys().take(to_remove).copied().collect();
412        for key in keys_to_remove {
413            self.versions.remove(&key);
414        }
415        self.min_version = self.versions.keys().next().copied().unwrap_or(i64::MAX);
416    }
417}
418
/// Serializable version of `VersionedKeyState` for checkpointing.
///
/// Mirrors `VersionedKeyState` field for field, replacing the ordered map of
/// small vectors with plain vectors that the rkyv derives can archive.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
struct SerializableVersionedKeyState {
    // (version timestamp, rows stored at that timestamp), in ascending
    // timestamp order when produced from a `VersionedKeyState`.
    versions: Vec<(i64, Vec<TableRow>)>,
    // Cached bounds copied verbatim from the live state.
    min_version: i64,
    max_version: i64,
}
426
427impl From<&VersionedKeyState> for SerializableVersionedKeyState {
428    fn from(state: &VersionedKeyState) -> Self {
429        Self {
430            versions: state
431                .versions
432                .iter()
433                .map(|(ts, rows)| (*ts, rows.to_vec()))
434                .collect(),
435            min_version: state.min_version,
436            max_version: state.max_version,
437        }
438    }
439}
440
441impl From<SerializableVersionedKeyState> for VersionedKeyState {
442    fn from(state: SerializableVersionedKeyState) -> Self {
443        let mut versions = BTreeMap::new();
444        for (ts, rows) in state.versions {
445            versions.insert(ts, SmallVec::from_vec(rows));
446        }
447        Self {
448            versions,
449            min_version: state.min_version,
450            max_version: state.max_version,
451        }
452    }
453}
454
/// Table change event for non-append-only tables.
///
/// When the operator is configured as append-only, only `Insert` is applied;
/// `Update` and `Delete` are ignored.
#[derive(Debug, Clone)]
pub enum TableChange {
    /// Insert a new row.
    Insert(TableRow),
    /// Update an existing row (provides old and new versions).
    ///
    /// The old row is located by its key and version timestamp; the new row
    /// may carry a different key and/or version timestamp.
    Update {
        /// The old row being replaced.
        old: TableRow,
        /// The new row.
        new: TableRow,
    },
    /// Delete a row.
    ///
    /// Removes everything stored under the row's key and version timestamp.
    Delete(TableRow),
}
470
/// Counters describing the work a temporal join operator has done.
///
/// All counters start at zero and only grow until [`TemporalJoinMetrics::reset`]
/// is called.
#[derive(Debug, Clone, Default)]
pub struct TemporalJoinMetrics {
    /// Number of stream events processed.
    pub stream_events: u64,
    /// Number of table inserts processed.
    pub table_inserts: u64,
    /// Number of table updates processed.
    pub table_updates: u64,
    /// Number of table deletes processed.
    pub table_deletes: u64,
    /// Number of stream events that found a matching table row.
    pub matches: u64,
    /// Number of stream events that found no matching table row.
    pub unmatched: u64,
    /// Number of retractions emitted.
    pub retractions: u64,
    /// Number of tracked stream-event records removed by watermark cleanup
    /// (a count of records, not of cleanup passes).
    pub state_cleanups: u64,
}

impl TemporalJoinMetrics {
    /// Creates a metrics value with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            stream_events: 0,
            table_inserts: 0,
            table_updates: 0,
            table_deletes: 0,
            matches: 0,
            unmatched: 0,
            retractions: 0,
            state_cleanups: 0,
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
504
/// Temporal join operator.
///
/// Performs point-in-time lookups against a versioned table. Supports both
/// append-only tables (optimized, no stream-side state) and non-append-only
/// tables (with retraction support).
///
/// Table rows are fed in through the table-side methods and kept per key;
/// stream events probe that state and produce joined (or, for left joins,
/// unmatched) output events.
pub struct TemporalJoinOperator {
    /// Configuration.
    config: TemporalJoinConfig,
    /// Operator ID (explicit from the config, or auto-generated).
    operator_id: String,
    /// Versioned table state: key -> versions.
    table_state: FxHashMap<Vec<u8>, VersionedKeyState>,
    /// Stream event state for retraction tracking (non-append-only only).
    /// key -> list of joined event records.
    stream_state: FxHashMap<Vec<u8>, Vec<JoinedEventRecord>>,
    /// Current watermark (`i64::MIN` until the first watermark arrives).
    watermark: i64,
    /// Metrics.
    metrics: TemporalJoinMetrics,
    /// Output schema (lazily initialized).
    output_schema: Option<SchemaRef>,
    /// Stream schema, captured from the first stream event processed.
    stream_schema: Option<SchemaRef>,
    /// Table schema, captured from the first table insert event processed.
    table_schema: Option<SchemaRef>,
}
531
532impl TemporalJoinOperator {
533    /// Creates a new temporal join operator.
534    #[must_use]
535    pub fn new(config: TemporalJoinConfig) -> Self {
536        let operator_id = config.operator_id.clone().unwrap_or_else(|| {
537            let num = TEMPORAL_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
538            format!("temporal_join_{num}")
539        });
540
541        Self {
542            config,
543            operator_id,
544            table_state: FxHashMap::default(),
545            stream_state: FxHashMap::default(),
546            watermark: i64::MIN,
547            metrics: TemporalJoinMetrics::new(),
548            output_schema: None,
549            stream_schema: None,
550            table_schema: None,
551        }
552    }
553
554    /// Creates a new temporal join operator with explicit ID.
555    #[must_use]
556    pub fn with_id(mut config: TemporalJoinConfig, operator_id: String) -> Self {
557        config.operator_id = Some(operator_id);
558        Self::new(config)
559    }
560
    /// Returns the configuration this operator was constructed with.
    #[must_use]
    pub fn config(&self) -> &TemporalJoinConfig {
        &self.config
    }
566
    /// Returns the metrics accumulated since construction or the last
    /// `reset_metrics` call.
    #[must_use]
    pub fn metrics(&self) -> &TemporalJoinMetrics {
        &self.metrics
    }
572
    /// Resets the metrics: every counter goes back to zero. Operator state
    /// (table rows, tracked stream events, watermark) is not touched.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }
577
    /// Returns the current watermark.
    ///
    /// This is `i64::MIN` until the first watermark has been handled, and
    /// afterwards the most recently received watermark value.
    #[must_use]
    pub fn watermark(&self) -> i64 {
        self.watermark
    }
583
584    /// Returns the total number of table rows in state.
585    #[must_use]
586    pub fn table_state_size(&self) -> usize {
587        self.table_state.values().map(VersionedKeyState::len).sum()
588    }
589
590    /// Returns the total number of tracked stream events (non-append-only only).
591    #[must_use]
592    pub fn stream_state_size(&self) -> usize {
593        self.stream_state.values().map(Vec::len).sum()
594    }
595
596    /// Processes a stream event (probe side).
597    pub fn process_stream(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
598        self.metrics.stream_events += 1;
599
600        // Capture stream schema
601        if self.stream_schema.is_none() {
602            self.stream_schema = Some(event.data.schema());
603            self.update_output_schema();
604        }
605
606        let mut output = OutputVec::new();
607
608        // Extract join key
609        let Some(key_value) = Self::extract_key(&event.data, &self.config.stream_key_column) else {
610            return output;
611        };
612
613        // Determine lookup timestamp
614        let lookup_ts = match self.config.semantics {
615            TemporalJoinSemantics::EventTime => event.timestamp,
616            TemporalJoinSemantics::ProcessTime => ctx.processing_time,
617        };
618
619        // Find matching table row
620        if let Some(table_row) = self.lookup_table(&key_value, lookup_ts) {
621            self.metrics.matches += 1;
622
623            // Track join for potential retraction (non-append-only)
624            if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
625                if let Ok(event_data) = TableRow::serialize_batch(&event.data) {
626                    let record = JoinedEventRecord {
627                        event_timestamp: event.timestamp,
628                        event_data,
629                        table_version: table_row.version_timestamp,
630                        key_value: key_value.clone(),
631                    };
632                    self.stream_state.entry(key_value).or_default().push(record);
633                }
634            }
635
636            // Create joined output
637            if let Some(joined) = self.create_joined_event(event, &table_row) {
638                output.push(Output::Event(joined));
639            }
640        } else {
641            self.metrics.unmatched += 1;
642            if self.config.join_type.emits_unmatched() {
643                if let Some(unmatched) = self.create_unmatched_event(event) {
644                    output.push(Output::Event(unmatched));
645                }
646            }
647        }
648
649        output
650    }
651
652    /// Processes a table insert (append-only or non-append-only).
653    pub fn process_table_insert(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
654        self.metrics.table_inserts += 1;
655
656        // Capture table schema
657        if self.table_schema.is_none() {
658            self.table_schema = Some(event.data.schema());
659            self.update_output_schema();
660        }
661
662        // Extract key and version
663        let Some(key_value) = Self::extract_key(&event.data, &self.config.table_key_column) else {
664            return OutputVec::new();
665        };
666
667        let version_ts = Self::extract_timestamp(&event.data, &self.config.table_version_column)
668            .unwrap_or(event.timestamp);
669
670        // Create and store table row
671        let Ok(row) = TableRow::new(version_ts, key_value.clone(), &event.data) else {
672            return OutputVec::new();
673        };
674
675        let key_state = self.table_state.entry(key_value).or_default();
676        key_state.insert(row);
677
678        // Limit versions if configured
679        if self.config.max_versions_per_key > 0 {
680            key_state.limit_versions(self.config.max_versions_per_key);
681        }
682
683        OutputVec::new()
684    }
685
686    /// Processes a table change (non-append-only tables).
687    /// Returns retractions and new join results for affected stream events.
688    pub fn process_table_change(
689        &mut self,
690        change: &TableChange,
691        _ctx: &mut OperatorContext,
692    ) -> OutputVec {
693        if self.config.table_characteristics != TableCharacteristics::NonAppendOnly {
694            // For append-only tables, only inserts are valid
695            if let TableChange::Insert(row) = change {
696                let key_state = self.table_state.entry(row.key_value.clone()).or_default();
697                key_state.insert(row.clone());
698            }
699            return OutputVec::new();
700        }
701
702        let mut output = OutputVec::new();
703
704        match change {
705            TableChange::Insert(row) => {
706                self.metrics.table_inserts += 1;
707                let key_state = self.table_state.entry(row.key_value.clone()).or_default();
708                key_state.insert(row.clone());
709            }
710            TableChange::Update { old, new } => {
711                self.metrics.table_updates += 1;
712
713                // Emit retractions for events that joined with old version
714                self.emit_retractions_for_version(
715                    &old.key_value,
716                    old.version_timestamp,
717                    &mut output,
718                );
719
720                // Update table state
721                if let Some(key_state) = self.table_state.get_mut(&old.key_value) {
722                    key_state.remove_version(old.version_timestamp);
723                }
724                let key_state = self.table_state.entry(new.key_value.clone()).or_default();
725                key_state.insert(new.clone());
726
727                // Re-emit joins for affected events with new version
728                self.rejoin_affected_events(&new.key_value, new.version_timestamp, &mut output);
729            }
730            TableChange::Delete(row) => {
731                self.metrics.table_deletes += 1;
732
733                // Emit retractions for events that joined with deleted version
734                self.emit_retractions_for_version(
735                    &row.key_value,
736                    row.version_timestamp,
737                    &mut output,
738                );
739
740                // Remove from table state
741                if let Some(key_state) = self.table_state.get_mut(&row.key_value) {
742                    key_state.remove_version(row.version_timestamp);
743                }
744            }
745        }
746
747        output
748    }
749
750    /// Emits retractions for all stream events that joined with a specific table version.
751    fn emit_retractions_for_version(&mut self, key: &[u8], version: i64, output: &mut OutputVec) {
752        let Some(records) = self.stream_state.get(key) else {
753            return;
754        };
755
756        for record in records {
757            if record.table_version == version {
758                // Emit retraction (reconstruct the joined event and mark as retraction)
759                if let Ok(event_batch) = TableRow::deserialize_batch(&record.event_data) {
760                    let event = Event::new(record.event_timestamp, event_batch);
761
762                    // Look up the old table row to reconstruct the join
763                    if let Some(key_state) = self.table_state.get(key) {
764                        if let Some((_, rows)) = key_state.versions.get_key_value(&version) {
765                            if let Some(table_row) = rows.last() {
766                                if let Some(joined) = self.create_joined_event(&event, table_row) {
767                                    // Emit as late event with retraction semantic
768                                    // In a full implementation, this would be Output::Retraction
769                                    output.push(Output::LateEvent(joined));
770                                    self.metrics.retractions += 1;
771                                }
772                            }
773                        }
774                    }
775                }
776            }
777        }
778    }
779
780    /// Re-joins affected stream events with a new table version.
781    fn rejoin_affected_events(&mut self, key: &[u8], new_version: i64, output: &mut OutputVec) {
782        // Collect events that need re-joining
783        let events_to_rejoin: Vec<(i64, Vec<u8>)> = {
784            let Some(records) = self.stream_state.get(key) else {
785                return;
786            };
787            let Some(key_state) = self.table_state.get(key) else {
788                return;
789            };
790
791            records
792                .iter()
793                .filter_map(|record| {
794                    let lookup_ts = record.event_timestamp;
795                    if let Some(new_row) = key_state.lookup_at_time(lookup_ts) {
796                        if new_row.version_timestamp == new_version {
797                            return Some((record.event_timestamp, record.event_data.clone()));
798                        }
799                    }
800                    None
801                })
802                .collect()
803        };
804
805        // Now emit the rejoined events
806        if let Some(key_state) = self.table_state.get(key) {
807            for (event_ts, event_data) in &events_to_rejoin {
808                if let Ok(event_batch) = TableRow::deserialize_batch(event_data) {
809                    let event = Event::new(*event_ts, event_batch);
810                    if let Some(new_row) = key_state.lookup_at_time(*event_ts) {
811                        if let Some(joined) = self.create_joined_event(&event, new_row) {
812                            output.push(Output::Event(joined));
813                        }
814                    }
815                }
816            }
817        }
818
819        // Update tracked versions
820        if let Some(records) = self.stream_state.get_mut(key) {
821            for record in records.iter_mut() {
822                if events_to_rejoin
823                    .iter()
824                    .any(|(ts, _)| *ts == record.event_timestamp)
825                {
826                    record.table_version = new_version;
827                }
828            }
829        }
830    }
831
832    /// Looks up a table row for the given key and timestamp.
833    fn lookup_table(&self, key: &[u8], timestamp: i64) -> Option<TableRow> {
834        let key_state = self.table_state.get(key)?;
835
836        match self.config.semantics {
837            TemporalJoinSemantics::EventTime => key_state.lookup_at_time(timestamp).cloned(),
838            TemporalJoinSemantics::ProcessTime => key_state.lookup_latest().cloned(),
839        }
840    }
841
842    /// Handles watermark updates and triggers cleanup.
843    pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
844        self.watermark = watermark;
845
846        // Cleanup old stream state for non-append-only
847        if self.config.table_characteristics == TableCharacteristics::NonAppendOnly {
848            self.cleanup_stream_state(watermark);
849        }
850
851        OutputVec::new()
852    }
853
854    /// Cleans up stream state for events that can no longer receive retractions.
855    fn cleanup_stream_state(&mut self, watermark: i64) {
856        let initial_count: usize = self.stream_state.values().map(Vec::len).sum();
857
858        for records in self.stream_state.values_mut() {
859            records.retain(|r| r.event_timestamp >= watermark);
860        }
861        self.stream_state.retain(|_, v| !v.is_empty());
862
863        let final_count: usize = self.stream_state.values().map(Vec::len).sum();
864        if final_count < initial_count {
865            self.metrics.state_cleanups += (initial_count - final_count) as u64;
866        }
867    }
868
869    /// Extracts the key value from a record batch.
870    fn extract_key(batch: &RecordBatch, column_name: &str) -> Option<Vec<u8>> {
871        let column_index = batch.schema().index_of(column_name).ok()?;
872        let column = batch.column(column_index);
873
874        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
875            if string_array.is_empty() || string_array.is_null(0) {
876                return None;
877            }
878            return Some(string_array.value(0).as_bytes().to_vec());
879        }
880
881        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
882            if int_array.is_empty() || int_array.is_null(0) {
883                return None;
884            }
885            return Some(int_array.value(0).to_le_bytes().to_vec());
886        }
887
888        None
889    }
890
891    /// Extracts a timestamp value from a record batch.
892    fn extract_timestamp(batch: &RecordBatch, column_name: &str) -> Option<i64> {
893        let column_index = batch.schema().index_of(column_name).ok()?;
894        let column = batch.column(column_index);
895
896        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
897            if int_array.is_empty() || int_array.is_null(0) {
898                return None;
899            }
900            return Some(int_array.value(0));
901        }
902
903        None
904    }
905
906    /// Creates a timer key for cleanup.
907    /// Reserved for future use when timer-based cleanup is implemented.
908    #[allow(dead_code)]
909    fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
910        let mut key = TimerKey::new();
911        key.push(TEMPORAL_TIMER_PREFIX);
912        key.extend_from_slice(key_suffix);
913        key
914    }
915
916    /// Updates the output schema when both input schemas are known.
917    fn update_output_schema(&mut self) {
918        if let (Some(stream), Some(table)) = (&self.stream_schema, &self.table_schema) {
919            let mut fields: Vec<Field> =
920                stream.fields().iter().map(|f| f.as_ref().clone()).collect();
921
922            // Add table fields, prefixing duplicates
923            for field in table.fields() {
924                let name = if stream.field_with_name(field.name()).is_ok() {
925                    format!("table_{}", field.name())
926                } else {
927                    field.name().clone()
928                };
929                fields.push(Field::new(
930                    name,
931                    field.data_type().clone(),
932                    true, // Nullable for outer joins
933                ));
934            }
935
936            self.output_schema = Some(Arc::new(Schema::new(fields)));
937        }
938    }
939
940    /// Creates a joined event from stream event and table row.
941    fn create_joined_event(&self, stream_event: &Event, table_row: &TableRow) -> Option<Event> {
942        let schema = self.output_schema.as_ref()?;
943        let table_batch = table_row.to_batch().ok()?;
944
945        let mut columns: Vec<ArrayRef> = stream_event.data.columns().to_vec();
946        for column in table_batch.columns() {
947            columns.push(Arc::clone(column));
948        }
949
950        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
951
952        Some(Event::new(stream_event.timestamp, joined_batch))
953    }
954
955    /// Creates an unmatched event for left outer joins (with null table columns).
956    fn create_unmatched_event(&self, stream_event: &Event) -> Option<Event> {
957        let schema = self.output_schema.as_ref()?;
958        let table_schema = self.table_schema.as_ref()?;
959
960        let num_rows = stream_event.data.num_rows();
961        let mut columns: Vec<ArrayRef> = stream_event.data.columns().to_vec();
962
963        // Add null columns for table side
964        for field in table_schema.fields() {
965            columns.push(Self::create_null_array(field.data_type(), num_rows));
966        }
967
968        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
969
970        Some(Event::new(stream_event.timestamp, joined_batch))
971    }
972
973    /// Creates a null array of the given type and length.
974    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
975        match data_type {
976            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
977            DataType::Float64 => {
978                use arrow_array::Float64Array;
979                Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
980            }
981            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
982        }
983    }
984}
985
986impl Operator for TemporalJoinOperator {
987    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
988        // Default to processing as stream event
989        self.process_stream(event, ctx)
990    }
991
992    fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
993        if timer.key.first() == Some(&TEMPORAL_TIMER_PREFIX) {
994            // Cleanup triggered
995            self.cleanup_stream_state(timer.timestamp);
996        }
997        OutputVec::new()
998    }
999
1000    fn checkpoint(&self) -> OperatorState {
1001        // Serialize table state
1002        let table_entries: Vec<(Vec<u8>, SerializableVersionedKeyState)> = self
1003            .table_state
1004            .iter()
1005            .map(|(k, v)| (k.clone(), v.into()))
1006            .collect();
1007
1008        // Serialize stream state (for non-append-only)
1009        let stream_entries: Vec<(Vec<u8>, Vec<JoinedEventRecord>)> = self
1010            .stream_state
1011            .iter()
1012            .map(|(k, v)| (k.clone(), v.clone()))
1013            .collect();
1014
1015        let checkpoint_data = (
1016            self.watermark,
1017            self.metrics.stream_events,
1018            self.metrics.table_inserts,
1019            self.metrics.matches,
1020            self.metrics.unmatched,
1021            self.metrics.retractions,
1022            table_entries,
1023            stream_entries,
1024        );
1025
1026        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
1027            .map(|v| v.to_vec())
1028            .unwrap_or_default();
1029
1030        OperatorState {
1031            operator_id: self.operator_id.clone(),
1032            data,
1033        }
1034    }
1035
1036    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
1037        type CheckpointData = (
1038            i64,
1039            u64,
1040            u64,
1041            u64,
1042            u64,
1043            u64,
1044            Vec<(Vec<u8>, SerializableVersionedKeyState)>,
1045            Vec<(Vec<u8>, Vec<JoinedEventRecord>)>,
1046        );
1047
1048        if state.operator_id != self.operator_id {
1049            return Err(OperatorError::StateAccessFailed(format!(
1050                "Operator ID mismatch: expected {}, got {}",
1051                self.operator_id, state.operator_id
1052            )));
1053        }
1054
1055        let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
1056            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
1057        let (
1058            watermark,
1059            stream_events,
1060            table_inserts,
1061            matches,
1062            unmatched,
1063            retractions,
1064            table_entries,
1065            stream_entries,
1066        ) = rkyv::deserialize::<CheckpointData, RkyvError>(archived)
1067            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
1068
1069        self.watermark = watermark;
1070        self.metrics.stream_events = stream_events;
1071        self.metrics.table_inserts = table_inserts;
1072        self.metrics.matches = matches;
1073        self.metrics.unmatched = unmatched;
1074        self.metrics.retractions = retractions;
1075
1076        // Restore table state
1077        self.table_state.clear();
1078        for (key, serializable) in table_entries {
1079            self.table_state.insert(key, serializable.into());
1080        }
1081
1082        // Restore stream state
1083        self.stream_state.clear();
1084        for (key, records) in stream_entries {
1085            self.stream_state.insert(key, records);
1086        }
1087
1088        Ok(())
1089    }
1090}
1091
1092#[cfg(test)]
1093#[allow(clippy::cast_precision_loss)]
1094#[allow(clippy::unnecessary_to_owned)]
1095mod tests {
1096    use super::*;
1097    use crate::state::{InMemoryStore, StateStore};
1098    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
1099    use arrow_array::Float64Array;
1100    use arrow_schema::{DataType, Field, Schema};
1101
1102    /// Creates an order event for testing.
1103    fn create_order_event(timestamp: i64, currency: &str, amount: f64) -> Event {
1104        let schema = Arc::new(Schema::new(vec![
1105            Field::new("currency", DataType::Utf8, false),
1106            Field::new("amount", DataType::Float64, false),
1107        ]));
1108        let batch = RecordBatch::try_new(
1109            schema,
1110            vec![
1111                Arc::new(StringArray::from(vec![currency])),
1112                Arc::new(Float64Array::from(vec![amount])),
1113            ],
1114        )
1115        .unwrap();
1116        Event::new(timestamp, batch)
1117    }
1118
1119    /// Creates a currency rate event for testing.
1120    fn create_rate_event(timestamp: i64, currency: &str, rate: f64, valid_from: i64) -> Event {
1121        let schema = Arc::new(Schema::new(vec![
1122            Field::new("currency", DataType::Utf8, false),
1123            Field::new("rate", DataType::Float64, false),
1124            Field::new("valid_from", DataType::Int64, false),
1125        ]));
1126        let batch = RecordBatch::try_new(
1127            schema,
1128            vec![
1129                Arc::new(StringArray::from(vec![currency])),
1130                Arc::new(Float64Array::from(vec![rate])),
1131                Arc::new(Int64Array::from(vec![valid_from])),
1132            ],
1133        )
1134        .unwrap();
1135        Event::new(timestamp, batch)
1136    }
1137
1138    fn create_test_context<'a>(
1139        timers: &'a mut TimerService,
1140        state: &'a mut dyn StateStore,
1141        watermark_gen: &'a mut dyn WatermarkGenerator,
1142    ) -> OperatorContext<'a> {
1143        OperatorContext {
1144            event_time: 0,
1145            processing_time: 0,
1146            timers,
1147            state,
1148            watermark_generator: watermark_gen,
1149            operator_index: 0,
1150        }
1151    }
1152
1153    #[test]
1154    fn test_temporal_join_semantics_default() {
1155        assert_eq!(
1156            TemporalJoinSemantics::default(),
1157            TemporalJoinSemantics::EventTime
1158        );
1159    }
1160
1161    #[test]
1162    fn test_table_characteristics_default() {
1163        assert_eq!(
1164            TableCharacteristics::default(),
1165            TableCharacteristics::AppendOnly
1166        );
1167    }
1168
1169    #[test]
1170    fn test_temporal_join_type_properties() {
1171        assert!(!TemporalJoinType::Inner.emits_unmatched());
1172        assert!(TemporalJoinType::Left.emits_unmatched());
1173    }
1174
1175    #[test]
1176    fn test_config_builder() {
1177        let config = TemporalJoinConfig::builder()
1178            .stream_key_column("currency".to_string())
1179            .table_key_column("currency".to_string())
1180            .table_version_column("valid_from".to_string())
1181            .semantics(TemporalJoinSemantics::EventTime)
1182            .table_characteristics(TableCharacteristics::AppendOnly)
1183            .join_type(TemporalJoinType::Left)
1184            .max_versions_per_key(100)
1185            .operator_id("test_temporal".to_string())
1186            .build();
1187
1188        assert_eq!(config.stream_key_column, "currency");
1189        assert_eq!(config.table_key_column, "currency");
1190        assert_eq!(config.table_version_column, "valid_from");
1191        assert_eq!(config.semantics, TemporalJoinSemantics::EventTime);
1192        assert_eq!(
1193            config.table_characteristics,
1194            TableCharacteristics::AppendOnly
1195        );
1196        assert_eq!(config.join_type, TemporalJoinType::Left);
1197        assert_eq!(config.max_versions_per_key, 100);
1198    }
1199
1200    #[test]
1201    fn test_event_time_temporal_join_basic() {
1202        let config = TemporalJoinConfig::builder()
1203            .stream_key_column("currency".to_string())
1204            .table_key_column("currency".to_string())
1205            .table_version_column("valid_from".to_string())
1206            .semantics(TemporalJoinSemantics::EventTime)
1207            .join_type(TemporalJoinType::Inner)
1208            .build();
1209
1210        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1211
1212        let mut timers = TimerService::new();
1213        let mut state = InMemoryStore::new();
1214        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1215
1216        // Insert rate versions
1217        // Rate 1.1 valid from t=500
1218        let rate1 = create_rate_event(500, "USD", 1.1, 500);
1219        {
1220            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1221            operator.process_table_insert(&rate1, &mut ctx);
1222        }
1223
1224        // Rate 1.2 valid from t=800
1225        let rate2 = create_rate_event(800, "USD", 1.2, 800);
1226        {
1227            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1228            operator.process_table_insert(&rate2, &mut ctx);
1229        }
1230
1231        // Rate 1.3 valid from t=1200
1232        let rate3 = create_rate_event(1200, "USD", 1.3, 1200);
1233        {
1234            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1235            operator.process_table_insert(&rate3, &mut ctx);
1236        }
1237
1238        // Order at t=1000 should join with rate 1.2 (valid from t=800)
1239        let order = create_order_event(1000, "USD", 100.0);
1240        let outputs = {
1241            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1242            operator.process_stream(&order, &mut ctx)
1243        };
1244
1245        assert_eq!(
1246            outputs
1247                .iter()
1248                .filter(|o| matches!(o, Output::Event(_)))
1249                .count(),
1250            1
1251        );
1252        assert_eq!(operator.metrics().matches, 1);
1253
1254        // Verify output has both order and rate columns
1255        if let Some(Output::Event(event)) = outputs.first() {
1256            assert_eq!(event.data.num_columns(), 5); // 2 order + 3 rate
1257        }
1258    }
1259
1260    #[test]
1261    fn test_event_time_multiple_versions() {
1262        let config = TemporalJoinConfig::builder()
1263            .stream_key_column("currency".to_string())
1264            .table_key_column("currency".to_string())
1265            .table_version_column("valid_from".to_string())
1266            .semantics(TemporalJoinSemantics::EventTime)
1267            .join_type(TemporalJoinType::Inner)
1268            .build();
1269
1270        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1271
1272        let mut timers = TimerService::new();
1273        let mut state = InMemoryStore::new();
1274        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1275
1276        // Insert multiple rate versions
1277        {
1278            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1279            operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
1280            operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
1281            operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
1282        }
1283
1284        // Order at t=150 should join with rate 1.0 (valid from t=100)
1285        let order1 = create_order_event(150, "USD", 100.0);
1286        let outputs1 = {
1287            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1288            operator.process_stream(&order1, &mut ctx)
1289        };
1290        assert_eq!(
1291            outputs1
1292                .iter()
1293                .filter(|o| matches!(o, Output::Event(_)))
1294                .count(),
1295            1
1296        );
1297
1298        // Order at t=250 should join with rate 1.1 (valid from t=200)
1299        let order2 = create_order_event(250, "USD", 100.0);
1300        let outputs2 = {
1301            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1302            operator.process_stream(&order2, &mut ctx)
1303        };
1304        assert_eq!(
1305            outputs2
1306                .iter()
1307                .filter(|o| matches!(o, Output::Event(_)))
1308                .count(),
1309            1
1310        );
1311
1312        // Order at t=350 should join with rate 1.2 (valid from t=300)
1313        let order3 = create_order_event(350, "USD", 100.0);
1314        let outputs3 = {
1315            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1316            operator.process_stream(&order3, &mut ctx)
1317        };
1318        assert_eq!(
1319            outputs3
1320                .iter()
1321                .filter(|o| matches!(o, Output::Event(_)))
1322                .count(),
1323            1
1324        );
1325
1326        assert_eq!(operator.metrics().matches, 3);
1327    }
1328
1329    #[test]
1330    fn test_no_match_before_first_version() {
1331        let config = TemporalJoinConfig::builder()
1332            .stream_key_column("currency".to_string())
1333            .table_key_column("currency".to_string())
1334            .table_version_column("valid_from".to_string())
1335            .semantics(TemporalJoinSemantics::EventTime)
1336            .join_type(TemporalJoinType::Inner)
1337            .build();
1338
1339        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1340
1341        let mut timers = TimerService::new();
1342        let mut state = InMemoryStore::new();
1343        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1344
1345        // Insert rate valid from t=500
1346        {
1347            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1348            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1349        }
1350
1351        // Order at t=400 (before any rate is valid)
1352        let order = create_order_event(400, "USD", 100.0);
1353        let outputs = {
1354            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1355            operator.process_stream(&order, &mut ctx)
1356        };
1357
1358        // Inner join - no output
1359        assert_eq!(outputs.len(), 0);
1360        assert_eq!(operator.metrics().unmatched, 1);
1361    }
1362
1363    #[test]
1364    fn test_left_join_no_match() {
1365        let config = TemporalJoinConfig::builder()
1366            .stream_key_column("currency".to_string())
1367            .table_key_column("currency".to_string())
1368            .table_version_column("valid_from".to_string())
1369            .semantics(TemporalJoinSemantics::EventTime)
1370            .join_type(TemporalJoinType::Left)
1371            .build();
1372
1373        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1374
1375        let mut timers = TimerService::new();
1376        let mut state = InMemoryStore::new();
1377        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1378
1379        // First, establish schemas by processing a matched event
1380        {
1381            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1382            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1383        }
1384        {
1385            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1386            operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
1387        }
1388
1389        // Order for different currency (no matching rate)
1390        let order = create_order_event(700, "EUR", 100.0);
1391        let outputs = {
1392            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1393            operator.process_stream(&order, &mut ctx)
1394        };
1395
1396        // Left join should emit with nulls
1397        assert_eq!(
1398            outputs
1399                .iter()
1400                .filter(|o| matches!(o, Output::Event(_)))
1401                .count(),
1402            1
1403        );
1404        assert_eq!(operator.metrics().unmatched, 1);
1405
1406        if let Some(Output::Event(event)) = outputs.first() {
1407            assert_eq!(event.data.num_columns(), 5); // Order cols + null rate cols
1408        }
1409    }
1410
1411    #[test]
1412    fn test_process_time_semantics() {
1413        let config = TemporalJoinConfig::builder()
1414            .stream_key_column("currency".to_string())
1415            .table_key_column("currency".to_string())
1416            .table_version_column("valid_from".to_string())
1417            .semantics(TemporalJoinSemantics::ProcessTime) // Process time
1418            .join_type(TemporalJoinType::Inner)
1419            .build();
1420
1421        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1422
1423        let mut timers = TimerService::new();
1424        let mut state = InMemoryStore::new();
1425        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1426
1427        // Insert rate versions
1428        {
1429            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1430            operator.process_table_insert(&create_rate_event(100, "USD", 1.0, 100), &mut ctx);
1431            operator.process_table_insert(&create_rate_event(200, "USD", 1.1, 200), &mut ctx);
1432            operator.process_table_insert(&create_rate_event(300, "USD", 1.2, 300), &mut ctx);
1433        }
1434
1435        // Process-time lookup always gets latest version
1436        // Order at t=150 should still get rate 1.2 (latest)
1437        let order = create_order_event(150, "USD", 100.0);
1438        let outputs = {
1439            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1440            ctx.processing_time = 1000; // Processing time doesn't matter for latest
1441            operator.process_stream(&order, &mut ctx)
1442        };
1443
1444        assert_eq!(
1445            outputs
1446                .iter()
1447                .filter(|o| matches!(o, Output::Event(_)))
1448                .count(),
1449            1
1450        );
1451    }
1452
1453    #[test]
1454    fn test_append_only_no_stream_state() {
1455        let config = TemporalJoinConfig::builder()
1456            .stream_key_column("currency".to_string())
1457            .table_key_column("currency".to_string())
1458            .table_version_column("valid_from".to_string())
1459            .table_characteristics(TableCharacteristics::AppendOnly)
1460            .build();
1461
1462        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1463
1464        let mut timers = TimerService::new();
1465        let mut state = InMemoryStore::new();
1466        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1467
1468        // Insert rate
1469        {
1470            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1471            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1472        }
1473
1474        // Process order
1475        let order = create_order_event(600, "USD", 100.0);
1476        {
1477            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1478            operator.process_stream(&order, &mut ctx);
1479        }
1480
1481        // Append-only should not track stream events
1482        assert_eq!(operator.stream_state_size(), 0);
1483    }
1484
1485    #[test]
1486    fn test_non_append_only_tracks_stream_state() {
1487        let config = TemporalJoinConfig::builder()
1488            .stream_key_column("currency".to_string())
1489            .table_key_column("currency".to_string())
1490            .table_version_column("valid_from".to_string())
1491            .table_characteristics(TableCharacteristics::NonAppendOnly)
1492            .build();
1493
1494        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1495
1496        let mut timers = TimerService::new();
1497        let mut state = InMemoryStore::new();
1498        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1499
1500        // Insert rate
1501        {
1502            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1503            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1504        }
1505
1506        // Process order
1507        let order = create_order_event(600, "USD", 100.0);
1508        {
1509            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1510            operator.process_stream(&order, &mut ctx);
1511        }
1512
1513        // Non-append-only should track stream events
1514        assert_eq!(operator.stream_state_size(), 1);
1515    }
1516
1517    #[test]
1518    fn test_table_delete_emits_retraction() {
1519        let config = TemporalJoinConfig::builder()
1520            .stream_key_column("currency".to_string())
1521            .table_key_column("currency".to_string())
1522            .table_version_column("valid_from".to_string())
1523            .table_characteristics(TableCharacteristics::NonAppendOnly)
1524            .build();
1525
1526        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1527
1528        let mut timers = TimerService::new();
1529        let mut state = InMemoryStore::new();
1530        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1531
1532        // Insert rate and process order
1533        let rate = create_rate_event(500, "USD", 1.1, 500);
1534        {
1535            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1536            operator.process_table_insert(&rate, &mut ctx);
1537        }
1538
1539        let order = create_order_event(600, "USD", 100.0);
1540        {
1541            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1542            operator.process_stream(&order, &mut ctx);
1543        }
1544
1545        // Create table row for delete
1546        let table_row = TableRow::new(500, b"USD".to_vec(), &rate.data).unwrap();
1547
1548        // Delete the rate
1549        let change = TableChange::Delete(table_row);
1550        let outputs = {
1551            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1552            operator.process_table_change(&change, &mut ctx)
1553        };
1554
1555        // Should emit retraction
1556        assert_eq!(operator.metrics().table_deletes, 1);
1557        assert!(
1558            operator.metrics().retractions >= 1
1559                || outputs.iter().any(|o| matches!(o, Output::LateEvent(_)))
1560        );
1561    }
1562
1563    #[test]
1564    fn test_multiple_keys() {
1565        let config = TemporalJoinConfig::builder()
1566            .stream_key_column("currency".to_string())
1567            .table_key_column("currency".to_string())
1568            .table_version_column("valid_from".to_string())
1569            .build();
1570
1571        let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());
1572
1573        let mut timers = TimerService::new();
1574        let mut state = InMemoryStore::new();
1575        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1576
1577        // Insert rates for different currencies
1578        {
1579            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1580            operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
1581            operator.process_table_insert(&create_rate_event(500, "EUR", 0.9, 500), &mut ctx);
1582            operator.process_table_insert(&create_rate_event(500, "GBP", 0.8, 500), &mut ctx);
1583        }
1584
1585        // Orders for different currencies
1586        {
1587            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1588            let outputs1 =
1589                operator.process_stream(&create_order_event(600, "USD", 100.0), &mut ctx);
1590            assert_eq!(
1591                outputs1
1592                    .iter()
1593                    .filter(|o| matches!(o, Output::Event(_)))
1594                    .count(),
1595                1
1596            );
1597        }
1598
1599        {
1600            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1601            let outputs2 =
1602                operator.process_stream(&create_order_event(600, "EUR", 100.0), &mut ctx);
1603            assert_eq!(
1604                outputs2
1605                    .iter()
1606                    .filter(|o| matches!(o, Output::Event(_)))
1607                    .count(),
1608                1
1609            );
1610        }
1611
1612        assert_eq!(operator.metrics().matches, 2);
1613    }
1614
/// Verifies that `max_versions_per_key` bounds the number of table versions
/// retained for a key, and that the most recent versions are the ones kept.
///
/// Five `USD` versions (`valid_from` = 100, 200, ..., 500) are inserted with a
/// limit of 2, so only the versions at 400 and 500 are expected to remain.
#[test]
fn test_max_versions_per_key() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .max_versions_per_key(2) // Only keep 2 versions
        .build();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Insert 5 versions
    for i in 0..5 {
        let rate =
            create_rate_event(100 * (i + 1), "USD", 1.0 + (i as f64) * 0.1, 100 * (i + 1));
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&rate, &mut ctx);
    }

    // Should only have 2 versions (most recent)
    assert_eq!(operator.table_state_size(), 2);

    let key_state = operator
        .table_state
        .get(&b"USD".to_vec())
        .expect("USD key should be present in table state");

    // Pin the exact surviving versions. A bare `is_some()` at t=400/t=500 would
    // also succeed if older versions (e.g. 200 and 300) had been retained,
    // because a lookup resolves to the latest version at or before the probe.
    assert_eq!(
        key_state.lookup_at_time(400).unwrap().version_timestamp,
        400
    );
    assert_eq!(
        key_state.lookup_at_time(500).unwrap().version_timestamp,
        500
    );

    // Earlier versions should be gone: nothing is valid before t=400.
    assert!(key_state.lookup_at_time(100).is_none());
    assert!(key_state.lookup_at_time(399).is_none());
}
1648
/// Round-trips operator state through `checkpoint()` and `restore()`.
///
/// The source operator (configured as `NonAppendOnly`) receives two table
/// versions and one stream event, then has its watermark and metrics set by
/// hand. A fresh operator restored from the checkpoint must report the same
/// watermark, metrics, and table/stream state sizes.
#[test]
fn test_checkpoint_restore() {
    let operator_id = "test_temporal".to_string();
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::NonAppendOnly)
        .build();

    let mut source_op = TemporalJoinOperator::with_id(config.clone(), operator_id.clone());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Table side: two USD versions inserted through one context.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        for (valid_from, rate) in [(500, 1.1), (600, 1.2)] {
            let rate_event = create_rate_event(valid_from, "USD", rate, valid_from);
            source_op.process_table_insert(&rate_event, &mut ctx);
        }
    }

    // Stream side: a single order, processed through a fresh context.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let order = create_order_event(550, "USD", 100.0);
        source_op.process_stream(&order, &mut ctx);
    }

    // Overwrite the bookkeeping fields with recognizable values.
    source_op.watermark = 500;
    source_op.metrics.matches = 10;
    source_op.metrics.retractions = 2;

    // Snapshot the source operator and load the snapshot into a new one.
    let snapshot = source_op.checkpoint();
    let mut restored = TemporalJoinOperator::with_id(config, operator_id);
    restored.restore(snapshot).unwrap();

    // Everything captured above must be visible on the restored operator.
    assert_eq!(restored.watermark(), 500);
    assert_eq!(restored.metrics().matches, 10);
    assert_eq!(restored.metrics().retractions, 2);
    assert_eq!(restored.table_state_size(), 2);
    assert_eq!(restored.stream_state_size(), 1);
}
1695
/// Verifies the schema of a joined output event.
///
/// One rate version and one matching order are processed. The single output
/// must be an `Output::Event` whose schema contains the stream column
/// `amount` plus the table columns `rate`, `valid_from`, and `table_currency`
/// (the table's `currency` column, prefixed because the name is duplicated).
#[test]
fn test_schema_composition() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .build();

    let mut operator = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Process table to capture schema
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_table_insert(&create_rate_event(500, "USD", 1.1, 500), &mut ctx);
    }

    // Process stream to capture schema and produce output
    let order = create_order_event(600, "USD", 100.0);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_stream(&order, &mut ctx)
    };

    assert_eq!(outputs.len(), 1);

    // Fail loudly if the output is not an event. An `if let` here would skip
    // every schema assertion below and let the test pass vacuously.
    let event = match outputs.first() {
        Some(Output::Event(event)) => event,
        _ => panic!("expected the single output to be a joined Output::Event"),
    };
    let schema = event.data.schema();

    // Check stream columns (order)
    assert!(schema.field_with_name("amount").is_ok());

    // Check table columns (rate) - currency is duplicated so prefixed
    assert!(schema.field_with_name("table_currency").is_ok());
    assert!(schema.field_with_name("rate").is_ok());
    assert!(schema.field_with_name("valid_from").is_ok());
}
1737
/// Checks that the operator's counters follow the work it performs: two table
/// inserts and two stream events, each of which finds a matching version.
#[test]
fn test_metrics_tracking() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .build();
    let mut join_op = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Table side: USD versions valid from t=500 and t=600.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let first_rate = create_rate_event(500, "USD", 1.1, 500);
        let second_rate = create_rate_event(600, "USD", 1.2, 600);
        join_op.process_table_insert(&first_rate, &mut ctx);
        join_op.process_table_insert(&second_rate, &mut ctx);
    }

    // Stream side: one order per version (t=550 and t=650).
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let early_order = create_order_event(550, "USD", 100.0);
        let late_order = create_order_event(650, "USD", 200.0);
        join_op.process_stream(&early_order, &mut ctx);
        join_op.process_stream(&late_order, &mut ctx);
    }

    let metrics = join_op.metrics();
    assert_eq!(metrics.table_inserts, 2);
    assert_eq!(metrics.stream_events, 2);
    assert_eq!(metrics.matches, 2);
}
1769
/// Exercises the `VersionedKeyState` API on its own: insertion, min/max
/// version tracking, point-in-time and latest lookups, `cleanup_before`, and
/// `remove_version`.
#[test]
fn test_versioned_key_state_operations() {
    let mut versions = VersionedKeyState::new();
    assert!(versions.is_empty());

    // Populate three versions of the same key in ascending order.
    for version_timestamp in [100, 200, 300] {
        versions.insert(TableRow {
            version_timestamp,
            key_value: b"test".to_vec(),
            data: vec![],
        });
    }

    assert_eq!(versions.len(), 3);
    assert_eq!(versions.min_version, 100);
    assert_eq!(versions.max_version, 300);

    // A probe earlier than the first version finds nothing.
    assert!(versions.lookup_at_time(50).is_none());

    // Any later probe resolves to the newest version at or before it.
    for (probe_time, expected_version) in [(150, 100), (250, 200), (350, 300)] {
        let hit = versions.lookup_at_time(probe_time).unwrap();
        assert_eq!(hit.version_timestamp, expected_version);
    }

    // The latest lookup returns the highest version.
    let latest = versions.lookup_latest().unwrap();
    assert_eq!(latest.version_timestamp, 300);

    // Cleaning up before 200 leaves the versions at 200 and 300.
    versions.cleanup_before(200);
    assert_eq!(versions.len(), 2);
    assert_eq!(versions.min_version, 200);

    // Removing one specific version leaves a single entry.
    versions.remove_version(200);
    assert_eq!(versions.len(), 1);
}
1827
/// Checks that `limit_versions` trims a key's history down to the requested
/// number of entries, keeping the newest ones.
#[test]
fn test_version_limiting() {
    let mut versions = VersionedKeyState::new();

    // Ten versions with timestamps 100, 200, ..., 1000.
    for multiplier in 1..=10 {
        let row = TableRow {
            version_timestamp: 100 * multiplier,
            key_value: b"test".to_vec(),
            data: vec![],
        };
        versions.insert(row);
    }
    assert_eq!(versions.len(), 10);

    // Trim the history to three entries.
    versions.limit_versions(3);
    assert_eq!(versions.len(), 3);

    // Survivors should be 800, 900 and 1000, so a probe at 700 finds nothing
    // while a probe at 800 finds a version.
    assert!(versions.lookup_at_time(700).is_none());
    assert!(versions.lookup_at_time(800).is_some());
}
1851
/// Checks that `TemporalJoinMetrics::reset` zeroes counters that were
/// previously set to non-zero values.
#[test]
fn test_metrics_reset() {
    let mut counters = TemporalJoinMetrics::new();

    // Dirty a few counters so the reset has something to clear.
    counters.retractions = 5;
    counters.matches = 50;
    counters.stream_events = 100;

    counters.reset();

    assert_eq!(
        (
            counters.stream_events,
            counters.matches,
            counters.retractions
        ),
        (0, 0, 0)
    );
}
1865
/// Checks that a single-row `RecordBatch` stored in a `TableRow` can be turned
/// back into a batch with the same row and column counts.
#[test]
fn test_table_row_serialization() {
    // One-row batch with schema (currency: Utf8, rate: Float64).
    let fields = vec![
        Field::new("currency", DataType::Utf8, false),
        Field::new("rate", DataType::Float64, false),
    ];
    let source_batch = RecordBatch::try_new(
        Arc::new(Schema::new(fields)),
        vec![
            Arc::new(StringArray::from(vec!["USD"])),
            Arc::new(Float64Array::from(vec![1.25])),
        ],
    )
    .unwrap();

    let table_row = TableRow::new(1000, b"USD".to_vec(), &source_batch).unwrap();

    // The batch rebuilt from the row must have the same shape as the source.
    let round_tripped = table_row.to_batch().unwrap();
    assert_eq!(
        (round_tripped.num_rows(), round_tripped.num_columns()),
        (1, 2)
    );
}
1888
/// Checks that advancing the watermark evicts tracked stream events.
///
/// With a `NonAppendOnly` table, five orders (t = 600, 700, ..., 1000) are
/// processed and all five are held in stream state. After `on_watermark(900)`
/// the stream state must have shrunk and the `state_cleanups` metric must be
/// non-zero.
#[test]
fn test_stream_state_cleanup() {
    let config = TemporalJoinConfig::builder()
        .stream_key_column("currency".to_string())
        .table_key_column("currency".to_string())
        .table_version_column("valid_from".to_string())
        .table_characteristics(TableCharacteristics::NonAppendOnly)
        .build();
    let mut join_op = TemporalJoinOperator::with_id(config, "test_temporal".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Seed the table with a single USD version valid from t=500.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        let rate = create_rate_event(500, "USD", 1.1, 500);
        join_op.process_table_insert(&rate, &mut ctx);
    }

    // Five orders spaced 100 apart starting at t=600, each with its own context.
    for offset in 0..5 {
        let event_time = 600 + offset * 100;
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        join_op.process_stream(&create_order_event(event_time, "USD", 100.0), &mut ctx);
    }
    assert_eq!(join_op.stream_state_size(), 5);

    // Move the watermark past some of the tracked events.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        join_op.on_watermark(900, &mut ctx);
    }

    // Some tracked events must have been dropped, and the drop must be counted.
    let remaining = join_op.stream_state_size();
    assert!(remaining < 5);
    assert!(join_op.metrics().state_cleanups > 0);
}
1929}