Skip to main content

laminar_core/operator/
stream_join.rs

1//! # Stream-Stream Join Operators
2//!
3//! Implementation of time-bounded joins between two event streams.
4//!
5//! Stream-stream joins match events from two streams based on a join key
6//! and a time bound. Events are matched if they share a key and their
7//! timestamps fall within the specified time window.
8//!
9//! ## Join Types
10//!
11//! - **Inner**: Only emit matched pairs
12//! - **Left**: Emit all left events, with right match if exists
13//! - **Right**: Emit all right events, with left match if exists
14//! - **Full**: Emit all events, with matches where they exist
15//!
16//! ## Example
17//!
18//! ```rust,no_run
19//! use laminar_core::operator::stream_join::{
20//!     StreamJoinOperator, JoinType, JoinSide, StreamJoinConfig, JoinRowEncoding,
21//! };
22//! use std::time::Duration;
23//!
24//! // Basic join (backward compatible)
25//! let operator = StreamJoinOperator::new(
26//!     "order_id".to_string(),  // left key column
27//!     "order_id".to_string(),  // right key column
28//!     Duration::from_secs(3600), // 1 hour time bound
29//!     JoinType::Inner,
30//! );
31//!
32//! // Optimized join with CPU-friendly encoding (F057)
33//! let config = StreamJoinConfig::builder()
34//!     .left_key_column("order_id")
35//!     .right_key_column("order_id")
36//!     .time_bound(Duration::from_secs(3600))
37//!     .join_type(JoinType::Inner)
38//!     .row_encoding(JoinRowEncoding::CpuFriendly)  // 30-50% faster for memory-resident state
39//!     .asymmetric_compaction(true)                  // Skip compaction on finished sides
40//!     .per_key_tracking(true)                       // Aggressive cleanup for sparse keys
41//!     .build();
42//! let optimized_operator = StreamJoinOperator::from_config(config);
43//! ```
44//!
45//! ## SQL Syntax
46//!
47//! ```sql
48//! SELECT o.*, p.status
49//! FROM orders o
50//! JOIN payments p
51//!     ON o.order_id = p.order_id
52//!     AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR;
53//!
54//! -- Session variables for optimization (F057)
55//! SET streaming_join_row_encoding = 'cpu_friendly';
56//! SET streaming_join_asymmetric_compaction = true;
57//! ```
58//!
59//! ## State Management
60//!
61//! Events are stored in state with keys formatted as:
62//! - `sjl:<key_hash>:<timestamp>:<event_id>` for left events
63//! - `sjr:<key_hash>:<timestamp>:<event_id>` for right events
64//!
65//! State is automatically cleaned up when watermark passes
66//! `event_timestamp + time_bound`.
67//!
68//! ## Optimizations (F057)
69//!
70//! - **CPU-Friendly Encoding**: Inlines primitive values for faster access (30-50% improvement)
71//! - **Asymmetric Compaction**: Skips compaction on finished/idle sides
72//! - **Per-Key Tracking**: Aggressive cleanup for sparse key patterns
73//! - **Build-Side Pruning**: Early pruning based on probe-side watermark
74
75use super::{
76    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
77    TimerKey,
78};
79use crate::state::{StateStore, StateStoreExt};
80use arrow_array::{Array, ArrayRef, Float64Array, Int64Array, RecordBatch, StringArray};
81use arrow_schema::{DataType, Field, Schema, SchemaRef};
82use bytes::Bytes;
83use rkyv::{
84    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
85};
86use std::collections::HashMap;
87use std::sync::atomic::{AtomicU64, Ordering};
88use std::sync::Arc;
89use std::time::Duration;
90
/// Join type for stream-stream joins.
///
/// Defaults to [`JoinType::Inner`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinType {
    /// Inner join - only emit matched pairs.
    #[default]
    Inner,
    /// Left outer join - emit all left events, with right match if exists.
    Left,
    /// Right outer join - emit all right events, with left match if exists.
    Right,
    /// Full outer join - emit all events, with matches where they exist.
    Full,
}

impl JoinType {
    /// Returns true if unmatched left events should be emitted
    /// (i.e. this is a left or full outer join).
    #[must_use]
    pub fn emits_unmatched_left(&self) -> bool {
        match self {
            JoinType::Left | JoinType::Full => true,
            JoinType::Inner | JoinType::Right => false,
        }
    }

    /// Returns true if unmatched right events should be emitted
    /// (i.e. this is a right or full outer join).
    #[must_use]
    pub fn emits_unmatched_right(&self) -> bool {
        match self {
            JoinType::Right | JoinType::Full => true,
            JoinType::Inner | JoinType::Left => false,
        }
    }
}
118
/// Identifies which side of the join an event came from.
///
/// Also used by `StreamJoinConfig::build_side` to pin which side acts
/// as the build side for pruning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinSide {
    /// Left side of the join.
    Left,
    /// Right side of the join.
    Right,
}
127
// F057: Stream Join Optimizations

/// Row encoding strategy for join state (F057).
///
/// Controls how join rows are serialized for storage. The encoding choice
/// trades memory footprint against per-access CPU cost.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum JoinRowEncoding {
    /// Compact encoding using Arrow IPC format.
    ///
    /// - Smaller memory footprint
    /// - Higher CPU decode cost per access
    /// - Best when: state exceeds memory, disk spills frequent
    #[default]
    Compact,

    /// CPU-friendly encoding with inlined primitive values.
    ///
    /// - Larger memory footprint (~20-40% more)
    /// - Faster access (~30-50% improvement per `RisingWave` benchmarks)
    /// - Best when: state fits in memory, CPU-bound workloads
    CpuFriendly,
}

impl std::fmt::Display for JoinRowEncoding {
    /// Renders the canonical lowercase name used by session variables.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Compact => "compact",
            Self::CpuFriendly => "cpu_friendly",
        };
        f.write_str(label)
    }
}

impl std::str::FromStr for JoinRowEncoding {
    type Err = String;

    /// Parses an encoding name case-insensitively, accepting a few
    /// separator spellings for the CPU-friendly variant.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        if normalized == "compact" {
            return Ok(Self::Compact);
        }
        if matches!(
            normalized.as_str(),
            "cpu_friendly" | "cpufriendly" | "cpu-friendly"
        ) {
            return Ok(Self::CpuFriendly);
        }
        Err(format!(
            "Unknown encoding: {s}. Expected 'compact' or 'cpu_friendly'"
        ))
    }
}
174
/// Configuration for stream-stream joins (F057).
///
/// Provides fine-grained control over join behavior and optimizations.
/// Use the builder pattern for convenient construction; `Default`
/// supplies the baseline values noted on each field.
///
/// # Example
///
/// ```rust
/// use laminar_core::operator::stream_join::{StreamJoinConfig, JoinType, JoinRowEncoding};
/// use std::time::Duration;
///
/// let config = StreamJoinConfig::builder()
///     .left_key_column("order_id")
///     .right_key_column("order_id")
///     .time_bound(Duration::from_secs(3600))
///     .join_type(JoinType::Inner)
///     .row_encoding(JoinRowEncoding::CpuFriendly)
///     .build();
/// ```
#[derive(Debug, Clone)]
pub struct StreamJoinConfig {
    /// Left stream key column name. Default: empty (must be set by caller).
    pub left_key_column: String,
    /// Right stream key column name. Default: empty (must be set by caller).
    pub right_key_column: String,
    /// Time bound for matching (milliseconds). Default: 0.
    pub time_bound_ms: i64,
    /// Type of join to perform. Default: `JoinType::Inner`.
    pub join_type: JoinType,
    /// Operator ID for checkpointing. Default: `None`.
    pub operator_id: Option<String>,

    // F057 Optimizations
    /// Row encoding strategy. Default: `JoinRowEncoding::Compact`.
    pub row_encoding: JoinRowEncoding,
    /// Enable asymmetric compaction optimization. Default: enabled.
    pub asymmetric_compaction: bool,
    /// Threshold for considering a side "finished" (ms). Default: 60_000 (1 minute).
    pub idle_threshold_ms: i64,
    /// Enable per-key cleanup tracking. Default: enabled.
    pub per_key_tracking: bool,
    /// Threshold for idle key cleanup (ms). Default: 300_000 (5 minutes).
    pub key_idle_threshold_ms: i64,
    /// Enable build-side pruning. Default: enabled.
    pub build_side_pruning: bool,
    /// Which side to use as build side (None = auto-select based on statistics).
    pub build_side: Option<JoinSide>,
}
223
224impl Default for StreamJoinConfig {
225    fn default() -> Self {
226        Self {
227            left_key_column: String::new(),
228            right_key_column: String::new(),
229            time_bound_ms: 0,
230            join_type: JoinType::Inner,
231            operator_id: None,
232            row_encoding: JoinRowEncoding::Compact,
233            asymmetric_compaction: true,
234            idle_threshold_ms: 60_000, // 1 minute
235            per_key_tracking: true,
236            key_idle_threshold_ms: 300_000, // 5 minutes
237            build_side_pruning: true,
238            build_side: None,
239        }
240    }
241}
242
243impl StreamJoinConfig {
244    /// Creates a new configuration builder.
245    #[must_use]
246    pub fn builder() -> StreamJoinConfigBuilder {
247        StreamJoinConfigBuilder::default()
248    }
249}
250
/// Builder for `StreamJoinConfig`.
///
/// Obtain one via `StreamJoinConfig::builder()`; each setter consumes and
/// returns the builder so calls can be chained, ending with `build()`.
#[derive(Debug, Default)]
pub struct StreamJoinConfigBuilder {
    // Configuration being assembled; starts at `StreamJoinConfig::default()`.
    config: StreamJoinConfig,
}
256
257impl StreamJoinConfigBuilder {
258    /// Sets the left stream key column name.
259    #[must_use]
260    pub fn left_key_column(mut self, column: impl Into<String>) -> Self {
261        self.config.left_key_column = column.into();
262        self
263    }
264
265    /// Sets the right stream key column name.
266    #[must_use]
267    pub fn right_key_column(mut self, column: impl Into<String>) -> Self {
268        self.config.right_key_column = column.into();
269        self
270    }
271
272    /// Sets the time bound for matching events.
273    #[must_use]
274    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
275    pub fn time_bound(mut self, duration: Duration) -> Self {
276        self.config.time_bound_ms = duration.as_millis() as i64;
277        self
278    }
279
280    /// Sets the time bound in milliseconds.
281    #[must_use]
282    pub fn time_bound_ms(mut self, ms: i64) -> Self {
283        self.config.time_bound_ms = ms;
284        self
285    }
286
287    /// Sets the join type.
288    #[must_use]
289    pub fn join_type(mut self, join_type: JoinType) -> Self {
290        self.config.join_type = join_type;
291        self
292    }
293
294    /// Sets the operator ID for checkpointing.
295    #[must_use]
296    pub fn operator_id(mut self, id: impl Into<String>) -> Self {
297        self.config.operator_id = Some(id.into());
298        self
299    }
300
301    /// Sets the row encoding strategy (F057).
302    #[must_use]
303    pub fn row_encoding(mut self, encoding: JoinRowEncoding) -> Self {
304        self.config.row_encoding = encoding;
305        self
306    }
307
308    /// Enables or disables asymmetric compaction (F057).
309    #[must_use]
310    pub fn asymmetric_compaction(mut self, enabled: bool) -> Self {
311        self.config.asymmetric_compaction = enabled;
312        self
313    }
314
315    /// Sets the idle threshold for asymmetric compaction (F057).
316    #[must_use]
317    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
318    pub fn idle_threshold(mut self, duration: Duration) -> Self {
319        self.config.idle_threshold_ms = duration.as_millis() as i64;
320        self
321    }
322
323    /// Enables or disables per-key tracking (F057).
324    #[must_use]
325    pub fn per_key_tracking(mut self, enabled: bool) -> Self {
326        self.config.per_key_tracking = enabled;
327        self
328    }
329
330    /// Sets the key idle threshold for cleanup (F057).
331    #[must_use]
332    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
333    pub fn key_idle_threshold(mut self, duration: Duration) -> Self {
334        self.config.key_idle_threshold_ms = duration.as_millis() as i64;
335        self
336    }
337
338    /// Enables or disables build-side pruning (F057).
339    #[must_use]
340    pub fn build_side_pruning(mut self, enabled: bool) -> Self {
341        self.config.build_side_pruning = enabled;
342        self
343    }
344
345    /// Sets which side to use as the build side (F057).
346    #[must_use]
347    pub fn build_side(mut self, side: JoinSide) -> Self {
348        self.config.build_side = Some(side);
349        self
350    }
351
352    /// Builds the configuration.
353    #[must_use]
354    pub fn build(self) -> StreamJoinConfig {
355        self.config
356    }
357}
358
/// Per-side statistics for asymmetric optimization (F057).
#[derive(Debug, Clone, Default)]
pub struct SideStats {
    /// Total events received on this side.
    pub events_received: u64,
    /// Events in current tracking window.
    pub events_this_window: u64,
    /// Last event timestamp (processing time).
    pub last_event_time: i64,
    /// Estimated write rate (events/second).
    pub write_rate: f64,
    /// Window start time for rate calculation.
    window_start: i64,
}

impl SideStats {
    /// Creates new, all-zero side statistics.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records an event arrival and refreshes the write-rate estimate
    /// once at least a second has elapsed in the current window.
    #[allow(clippy::cast_precision_loss)]
    pub fn record_event(&mut self, processing_time: i64) {
        self.events_received += 1;
        self.events_this_window += 1;
        self.last_event_time = processing_time;

        // First sample: anchor the rate window and wait for more events.
        if self.window_start == 0 {
            self.window_start = processing_time;
            return;
        }

        let elapsed_ms = processing_time - self.window_start;
        if elapsed_ms >= 1000 {
            // Precision loss is acceptable for rate estimation
            self.write_rate = (self.events_this_window as f64 * 1000.0) / elapsed_ms as f64;
            self.events_this_window = 0;
            self.window_start = processing_time;
        }
    }

    /// Checks if this side is considered "finished" (no recent activity).
    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        // A side that never produced anything is "not started", not idle.
        if self.events_received == 0 {
            return false;
        }
        self.events_this_window == 0 && current_time - self.last_event_time > threshold_ms
    }
}
412
/// Per-key metadata for cleanup tracking (F057).
#[derive(Debug, Clone)]
pub struct KeyMetadata {
    /// Last event timestamp for this key (processing time).
    pub last_activity: i64,
    /// Number of events for this key.
    pub event_count: u64,
    /// Number of state entries for this key.
    pub state_entries: u64,
}

impl KeyMetadata {
    /// Creates metadata for a key that just received its first event:
    /// both counters start at one.
    #[must_use]
    pub fn new(processing_time: i64) -> Self {
        let mut meta = Self {
            last_activity: 0,
            event_count: 0,
            state_entries: 0,
        };
        meta.record_event(processing_time);
        meta
    }

    /// Records another event for this key, refreshing the activity
    /// timestamp and bumping both counters.
    pub fn record_event(&mut self, processing_time: i64) {
        self.last_activity = processing_time;
        self.event_count += 1;
        self.state_entries += 1;
    }

    /// Decrements the state entry count (called on cleanup), saturating
    /// at zero.
    pub fn decrement_entries(&mut self) {
        self.state_entries = self.state_entries.checked_sub(1).unwrap_or(0);
    }

    /// Checks if this key has been inactive for strictly longer than
    /// `threshold_ms`.
    #[must_use]
    pub fn is_idle(&self, current_time: i64, threshold_ms: i64) -> bool {
        let inactive_ms = current_time - self.last_activity;
        inactive_ms > threshold_ms
    }
}
453
/// State key prefix for left-side join state
/// (`sjl:<key_hash>:<timestamp>:<event_id>`, per the module docs).
const LEFT_STATE_PREFIX: &[u8; 4] = b"sjl:";
/// State key prefix for right-side join state
/// (`sjr:<key_hash>:<timestamp>:<event_id>`).
const RIGHT_STATE_PREFIX: &[u8; 4] = b"sjr:";

/// Timer key prefix for left-side cleanup.
const LEFT_TIMER_PREFIX: u8 = 0x10;
/// Timer key prefix for right-side cleanup.
const RIGHT_TIMER_PREFIX: u8 = 0x20;
/// Timer key prefix for unmatched event emission.
const UNMATCHED_TIMER_PREFIX: u8 = 0x30;

/// Process-wide counter for generating unique operator IDs.
static JOIN_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Process-wide counter for generating unique event IDs.
///
/// NOTE(review): this is a single global, so IDs are unique across all
/// join operators in the process, not merely "within an operator" —
/// confirm whether per-operator uniqueness was intended.
static EVENT_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
470
/// A stored join row containing serialized event data.
///
/// One `JoinRow` is persisted per buffered event; `data` holds the
/// event's `RecordBatch` serialized with the codec recorded in
/// `encoding` (see `with_encoding`).
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
pub struct JoinRow {
    /// Event timestamp in milliseconds.
    pub timestamp: i64,
    /// Serialized key value (as bytes).
    pub key_value: Vec<u8>,
    /// Serialized record batch data; decode via `deserialize_batch`
    /// with the `encoding` byte below.
    pub data: Vec<u8>,
    /// Whether this row has been matched (for outer joins).
    /// Initialized to `false` on creation.
    pub matched: bool,
    /// Encoding used for serialization (F057).
    /// 0 = Compact (Arrow IPC), 1 = `CpuFriendly`
    encoding: u8,
}
486
/// Magic bytes for CPU-friendly encoding format.
///
/// Written as the first four bytes of every CPU-friendly buffer so
/// `deserialize_batch` can distinguish it from Arrow IPC data.
const CPU_FRIENDLY_MAGIC: [u8; 4] = *b"CPUF";

/// Type tag for CPU-friendly encoding.
///
/// One tag byte is written per column; `Null` doubles as the fallback
/// for unsupported Arrow types (no column data follows it).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CpuFriendlyType {
    Null = 0,
    Int64 = 1,
    Float64 = 2,
    Utf8 = 3,
}
499
500impl JoinRow {
    /// Creates a new join row from an event and extracted key.
    /// Uses compact encoding by default.
    ///
    /// Test-only convenience wrapper around `Self::with_encoding`.
    #[cfg(test)]
    fn new(timestamp: i64, key_value: Vec<u8>, batch: &RecordBatch) -> Result<Self, OperatorError> {
        Self::with_encoding(timestamp, key_value, batch, JoinRowEncoding::Compact)
    }
507
508    /// Creates a new join row with specified encoding (F057).
509    fn with_encoding(
510        timestamp: i64,
511        key_value: Vec<u8>,
512        batch: &RecordBatch,
513        encoding: JoinRowEncoding,
514    ) -> Result<Self, OperatorError> {
515        let (data, encoding_byte) = match encoding {
516            JoinRowEncoding::Compact => (Self::serialize_compact(batch)?, 0),
517            JoinRowEncoding::CpuFriendly => (Self::serialize_cpu_friendly(batch)?, 1),
518        };
519        Ok(Self {
520            timestamp,
521            key_value,
522            data,
523            matched: false,
524            encoding: encoding_byte,
525        })
526    }
527
528    /// Serializes using compact Arrow IPC format.
529    fn serialize_compact(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
530        let mut buf = Vec::new();
531        {
532            let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut buf, &batch.schema())
533                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
534            writer
535                .write(batch)
536                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
537            writer
538                .finish()
539                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
540        }
541        Ok(buf)
542    }
543
    /// Serializes using CPU-friendly format (F057).
    ///
    /// Format (all integers little-endian):
    /// - Magic (4 bytes): "CPUF"
    /// - Num columns (2 bytes): u16
    /// - Num rows (4 bytes): u32
    /// - For each column:
    ///   - Name length (2 bytes): u16
    ///   - Name bytes (variable)
    ///   - Nullable (1 byte): 0 or 1
    ///   - Type tag (1 byte)
    ///   - Data (depends on type):
    ///     - Int64: validity bitmap + raw i64 values
    ///     - Float64: validity bitmap + raw f64 values
    ///     - Utf8: validity bitmap + offsets (u32) + data bytes
    ///     - Null (fallback for unsupported types): no data bytes
    ///
    /// Note: the nullable flag is written *before* the type tag;
    /// `deserialize_cpu_friendly` reads them in the same order.
    #[allow(clippy::cast_possible_truncation)] // Wire format uses u32 for row/column counts and offsets
    fn serialize_cpu_friendly(batch: &RecordBatch) -> Result<Vec<u8>, OperatorError> {
        let schema = batch.schema();
        let num_rows = batch.num_rows();
        let num_cols = batch.num_columns();

        // Estimate capacity: header + per-column overhead + value data.
        let mut buf = Vec::with_capacity(4 + 2 + 4 + num_cols * 64 + num_rows * num_cols * 8);

        // Header
        buf.extend_from_slice(&CPU_FRIENDLY_MAGIC);
        buf.extend_from_slice(&(num_cols as u16).to_le_bytes());
        buf.extend_from_slice(&(num_rows as u32).to_le_bytes());

        // Columns
        for (i, field) in schema.fields().iter().enumerate() {
            let column = batch.column(i);

            // Column name
            let name_bytes = field.name().as_bytes();
            buf.extend_from_slice(&(name_bytes.len() as u16).to_le_bytes());
            buf.extend_from_slice(name_bytes);

            // Nullable flag (precedes the type tag on the wire)
            buf.push(u8::from(field.is_nullable()));

            // Type tag, then type-specific data
            match field.data_type() {
                DataType::Int64 => {
                    buf.push(CpuFriendlyType::Int64 as u8);
                    Self::write_int64_column(&mut buf, column, num_rows)?;
                }
                DataType::Float64 => {
                    buf.push(CpuFriendlyType::Float64 as u8);
                    Self::write_float64_column(&mut buf, column, num_rows)?;
                }
                DataType::Utf8 => {
                    buf.push(CpuFriendlyType::Utf8 as u8);
                    Self::write_utf8_column(&mut buf, column, num_rows)?;
                }
                _ => {
                    // Fallback: encode as null for unsupported types
                    // (tag only — no data follows; the reader synthesizes
                    // an all-null column).
                    buf.push(CpuFriendlyType::Null as u8);
                }
            }
        }

        Ok(buf)
    }
608
    /// Writes an Int64 column in CPU-friendly format: a validity bitmap
    /// (1 bit per row, rounded up to whole bytes) followed by the raw
    /// 8-byte values.
    ///
    /// NOTE(review): values are copied in native byte order while
    /// `read_int64_column` decodes little-endian — correct on LE targets;
    /// confirm before supporting big-endian.
    ///
    /// NOTE(review): `nulls.buffer()` is sliced from byte 0 without
    /// accounting for a possible bit offset on sliced arrays — verify
    /// sliced inputs round-trip correctly.
    fn write_int64_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected Int64Array".into()))?;

        // Validity bitmap (1 bit per row, padded to bytes)
        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            // Copy validity buffer
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            // Pad if needed; 0xFF marks padded rows as "valid"
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            // All valid
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        // Raw values (8 bytes each)
        // SAFETY: Converting i64 slice to bytes for zero-copy serialization.
        // The pointer cast is safe because we're reinterpreting the same memory.
        let values = arr.values();
        let value_bytes =
            unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
        buf.extend_from_slice(value_bytes);

        Ok(())
    }
646
    /// Writes a Float64 column in CPU-friendly format: a validity bitmap
    /// (1 bit per row, rounded up to whole bytes) followed by the raw
    /// 8-byte values.
    ///
    /// NOTE(review): same caveats as `write_int64_column` — native-endian
    /// value copy vs LE reader, and null-buffer bit offset on sliced
    /// arrays.
    fn write_float64_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected Float64Array".into()))?;

        // Validity bitmap
        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            // Pad if needed; 0xFF marks padded rows as "valid"
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            // All valid
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        // Raw values (8 bytes each)
        // SAFETY: Converting f64 slice to bytes for zero-copy serialization.
        let values = arr.values();
        let value_bytes =
            unsafe { std::slice::from_raw_parts(values.as_ptr().cast::<u8>(), values.len() * 8) };
        buf.extend_from_slice(value_bytes);

        Ok(())
    }
680
    /// Writes a Utf8 column in CPU-friendly format: a validity bitmap,
    /// then one u32 offset per row plus one trailing offset, then the
    /// raw string bytes.
    ///
    /// NOTE(review): offsets are Arrow i32 cast to u32 (non-negative by
    /// Arrow's invariant, hence the `cast_sign_loss` allow), and
    /// `arr.values()` is the array's full data buffer — verify sliced
    /// arrays round-trip as intended.
    #[allow(clippy::cast_sign_loss)]
    fn write_utf8_column(
        buf: &mut Vec<u8>,
        column: &ArrayRef,
        num_rows: usize,
    ) -> Result<(), OperatorError> {
        let arr = column
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| OperatorError::SerializationFailed("Expected StringArray".into()))?;

        // Validity bitmap
        let validity_bytes = num_rows.div_ceil(8);
        if let Some(nulls) = arr.nulls() {
            let buffer = nulls.buffer();
            let slice = &buffer.as_slice()[..validity_bytes.min(buffer.len())];
            buf.extend_from_slice(slice);
            // Pad if needed; 0xFF marks padded rows as "valid"
            for _ in slice.len()..validity_bytes {
                buf.push(0xFF);
            }
        } else {
            // All valid
            buf.extend(std::iter::repeat_n(0xFF, validity_bytes));
        }

        // Offsets (u32 for each row + 1)
        // Note: Arrow string offsets are always non-negative
        let offsets = arr.offsets();
        for offset in offsets.iter() {
            buf.extend_from_slice(&(*offset as u32).to_le_bytes());
        }

        // String data
        let values = arr.values();
        buf.extend_from_slice(values.as_slice());

        Ok(())
    }
719
720    /// Deserializes a record batch from bytes.
721    fn deserialize_batch(data: &[u8], encoding: u8) -> Result<RecordBatch, OperatorError> {
722        if encoding == 1 && data.starts_with(&CPU_FRIENDLY_MAGIC) {
723            Self::deserialize_cpu_friendly(data)
724        } else {
725            Self::deserialize_compact(data)
726        }
727    }
728
729    /// Deserializes from compact Arrow IPC format.
730    fn deserialize_compact(data: &[u8]) -> Result<RecordBatch, OperatorError> {
731        let cursor = std::io::Cursor::new(data);
732        let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
733            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
734        reader
735            .next()
736            .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
737            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
738    }
739
    /// Deserializes from CPU-friendly format (F057).
    ///
    /// Parses the wire format produced by `serialize_cpu_friendly`:
    /// header (magic, column count, row count), then per column the name,
    /// nullable flag, type tag, and type-specific data, threading
    /// `offset` through each read.
    ///
    /// NOTE(review): the per-column validity bitmaps are skipped by the
    /// `read_*_column` helpers, so null slots come back as whatever raw
    /// bytes the writer emitted — null information is lost on round-trip
    /// for nullable columns. Confirm this is acceptable for join state.
    fn deserialize_cpu_friendly(data: &[u8]) -> Result<RecordBatch, OperatorError> {
        // Header is magic (4) + num_cols (2) + num_rows (4) = 10 bytes.
        if data.len() < 10 {
            return Err(OperatorError::SerializationFailed(
                "Buffer too short".into(),
            ));
        }

        // Parse header
        let num_cols = u16::from_le_bytes([data[4], data[5]]) as usize;
        let num_rows = u32::from_le_bytes([data[6], data[7], data[8], data[9]]) as usize;

        let mut offset = 10;
        let mut fields = Vec::with_capacity(num_cols);
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_cols);

        for _ in 0..num_cols {
            if offset + 2 > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated column header".into(),
                ));
            }

            // Read column name
            let name_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
            offset += 2;

            if offset + name_len > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated column name".into(),
                ));
            }
            let name = String::from_utf8_lossy(&data[offset..offset + name_len]).to_string();
            offset += name_len;

            if offset + 2 > data.len() {
                return Err(OperatorError::SerializationFailed(
                    "Truncated type info".into(),
                ));
            }

            // Read nullable then type — mirroring the writer's order
            let nullable = data[offset] != 0;
            offset += 1;
            let type_tag = data[offset];
            offset += 1;

            // Read column data based on type
            let validity_bytes = num_rows.div_ceil(8);

            match type_tag {
                t if t == CpuFriendlyType::Int64 as u8 => {
                    let (arr, new_offset) =
                        Self::read_int64_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Int64, nullable));
                    columns.push(Arc::new(arr));
                }
                t if t == CpuFriendlyType::Float64 as u8 => {
                    let (arr, new_offset) =
                        Self::read_float64_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Float64, nullable));
                    columns.push(Arc::new(arr));
                }
                t if t == CpuFriendlyType::Utf8 as u8 => {
                    let (arr, new_offset) =
                        Self::read_utf8_column(data, offset, num_rows, validity_bytes)?;
                    offset = new_offset;
                    fields.push(Field::new(&name, DataType::Utf8, nullable));
                    columns.push(Arc::new(arr));
                }
                _ => {
                    // Null/unsupported type: the writer emitted no data,
                    // so synthesize an all-null Int64 placeholder column.
                    fields.push(Field::new(&name, DataType::Int64, true));
                    columns.push(Arc::new(Int64Array::from(vec![None; num_rows])));
                }
            }
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, columns)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))
    }
824
825    /// Reads an Int64 column from CPU-friendly format.
826    fn read_int64_column(
827        data: &[u8],
828        offset: usize,
829        num_rows: usize,
830        validity_bytes: usize,
831    ) -> Result<(Int64Array, usize), OperatorError> {
832        let mut pos = offset;
833
834        // Skip validity bitmap (we don't reconstruct it for simplicity)
835        if pos + validity_bytes > data.len() {
836            return Err(OperatorError::SerializationFailed(
837                "Truncated validity".into(),
838            ));
839        }
840        pos += validity_bytes;
841
842        // Read values
843        let values_bytes = num_rows * 8;
844        if pos + values_bytes > data.len() {
845            return Err(OperatorError::SerializationFailed(
846                "Truncated int64 values".into(),
847            ));
848        }
849
850        let mut values = Vec::with_capacity(num_rows);
851        for i in 0..num_rows {
852            let start = pos + i * 8;
853            let bytes = [
854                data[start],
855                data[start + 1],
856                data[start + 2],
857                data[start + 3],
858                data[start + 4],
859                data[start + 5],
860                data[start + 6],
861                data[start + 7],
862            ];
863            values.push(i64::from_le_bytes(bytes));
864        }
865        pos += values_bytes;
866
867        Ok((Int64Array::from(values), pos))
868    }
869
870    /// Reads a Float64 column from CPU-friendly format.
871    fn read_float64_column(
872        data: &[u8],
873        offset: usize,
874        num_rows: usize,
875        validity_bytes: usize,
876    ) -> Result<(Float64Array, usize), OperatorError> {
877        let mut pos = offset;
878
879        // Skip validity bitmap
880        if pos + validity_bytes > data.len() {
881            return Err(OperatorError::SerializationFailed(
882                "Truncated validity".into(),
883            ));
884        }
885        pos += validity_bytes;
886
887        // Read values
888        let values_bytes = num_rows * 8;
889        if pos + values_bytes > data.len() {
890            return Err(OperatorError::SerializationFailed(
891                "Truncated float64 values".into(),
892            ));
893        }
894
895        let mut values = Vec::with_capacity(num_rows);
896        for i in 0..num_rows {
897            let start = pos + i * 8;
898            let bytes = [
899                data[start],
900                data[start + 1],
901                data[start + 2],
902                data[start + 3],
903                data[start + 4],
904                data[start + 5],
905                data[start + 6],
906                data[start + 7],
907            ];
908            values.push(f64::from_le_bytes(bytes));
909        }
910        pos += values_bytes;
911
912        Ok((Float64Array::from(values), pos))
913    }
914
915    /// Reads a Utf8 column from CPU-friendly format.
916    fn read_utf8_column(
917        data: &[u8],
918        offset: usize,
919        num_rows: usize,
920        validity_bytes: usize,
921    ) -> Result<(StringArray, usize), OperatorError> {
922        let mut pos = offset;
923
924        // Skip validity bitmap
925        if pos + validity_bytes > data.len() {
926            return Err(OperatorError::SerializationFailed(
927                "Truncated validity".into(),
928            ));
929        }
930        pos += validity_bytes;
931
932        // Read offsets
933        let offsets_bytes = (num_rows + 1) * 4;
934        if pos + offsets_bytes > data.len() {
935            return Err(OperatorError::SerializationFailed(
936                "Truncated offsets".into(),
937            ));
938        }
939
940        let mut offsets = Vec::with_capacity(num_rows + 1);
941        for i in 0..=num_rows {
942            let start = pos + i * 4;
943            let bytes = [
944                data[start],
945                data[start + 1],
946                data[start + 2],
947                data[start + 3],
948            ];
949            offsets.push(u32::from_le_bytes(bytes) as usize);
950        }
951        pos += offsets_bytes;
952
953        // Calculate data length and read string data
954        let data_len = offsets.last().copied().unwrap_or(0);
955        if pos + data_len > data.len() {
956            return Err(OperatorError::SerializationFailed(
957                "Truncated string data".into(),
958            ));
959        }
960
961        let string_data = &data[pos..pos + data_len];
962        pos += data_len;
963
964        // Build strings
965        let mut strings = Vec::with_capacity(num_rows);
966        for i in 0..num_rows {
967            let start = offsets[i];
968            let end = offsets[i + 1];
969            let s = String::from_utf8_lossy(&string_data[start..end]).to_string();
970            strings.push(s);
971        }
972
973        Ok((StringArray::from(strings), pos))
974    }
975
    /// Converts this join row back to a record batch.
    ///
    /// The decoder is selected by the row's stored encoding flag
    /// (see [`Self::encoding`]).
    ///
    /// # Errors
    ///
    /// Returns `OperatorError::SerializationFailed` if the batch data is invalid.
    pub fn to_batch(&self) -> Result<RecordBatch, OperatorError> {
        Self::deserialize_batch(&self.data, self.encoding)
    }
984
985    /// Returns the encoding used for this row.
986    #[must_use]
987    pub fn encoding(&self) -> JoinRowEncoding {
988        if self.encoding == 1 {
989            JoinRowEncoding::CpuFriendly
990        } else {
991            JoinRowEncoding::Compact
992        }
993    }
994}
995
/// Metrics for tracking join operations.
///
/// All counters are cumulative from construction (or the most recent
/// [`reset`](JoinMetrics::reset)).
#[derive(Debug, Clone, Default)]
pub struct JoinMetrics {
    /// Count of events consumed from the left input.
    pub left_events: u64,
    /// Count of events consumed from the right input.
    pub right_events: u64,
    /// Count of matched pairs produced.
    pub matches: u64,
    /// Unmatched left events emitted (left/full joins).
    pub unmatched_left: u64,
    /// Unmatched right events emitted (right/full joins).
    pub unmatched_right: u64,
    /// Events dropped for arriving too late.
    pub late_events: u64,
    /// State entries removed during cleanup.
    pub state_cleanups: u64,

    // F057 Optimization Metrics
    /// Rows serialized with the CPU-friendly layout.
    pub cpu_friendly_encodes: u64,
    /// Rows serialized with the compact layout.
    pub compact_encodes: u64,
    /// Compaction passes skipped by the asymmetric optimization.
    pub asymmetric_skips: u64,
    /// Idle key-metadata entries removed.
    pub idle_key_cleanups: u64,
    /// Build-side state entries pruned early.
    pub build_side_prunes: u64,
    /// Number of keys currently tracked (per-key tracking only).
    pub tracked_keys: u64,
}

impl JoinMetrics {
    /// Creates a fresh set of metrics with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        JoinMetrics::default()
    }

    /// Zeroes every counter in place.
    pub fn reset(&mut self) {
        *self = JoinMetrics::new();
    }
}
1041
/// Stream-stream join operator.
///
/// Joins events from two streams based on a key column and time bound.
/// Events are matched if they share a key value and their timestamps
/// are within the specified time window.
///
/// # State Management
///
/// Events from both sides are stored in state until they can no longer
/// produce matches (watermark passes `timestamp + time_bound`). State
/// is automatically cleaned up via timers.
///
/// # Performance Considerations
///
/// - State grows linearly with the number of events within the time window
/// - For high-cardinality joins, consider using shorter time bounds
/// - Inner joins use less state than outer joins (no unmatched tracking)
///
/// # Optimizations (F057)
///
/// - **CPU-Friendly Encoding**: Use `JoinRowEncoding::CpuFriendly` for 30-50% faster access
/// - **Asymmetric Compaction**: Automatically skips compaction on finished/idle sides
/// - **Per-Key Tracking**: Aggressive cleanup for sparse key patterns
/// - **Build-Side Pruning**: Early pruning based on probe-side watermark progress
pub struct StreamJoinOperator {
    /// Left stream key column name.
    left_key_column: String,
    /// Right stream key column name.
    right_key_column: String,
    /// Time bound for matching, in milliseconds (events match if within this duration).
    time_bound_ms: i64,
    /// Type of join to perform.
    join_type: JoinType,
    /// Operator ID for checkpointing.
    operator_id: String,
    /// Metrics for monitoring.
    metrics: JoinMetrics,
    /// Output schema (lazily initialized once both input schemas are known).
    output_schema: Option<SchemaRef>,
    /// Left schema (captured from first left event).
    left_schema: Option<SchemaRef>,
    /// Right schema (captured from first right event).
    right_schema: Option<SchemaRef>,

    // F057 Optimization Fields
    /// Row encoding strategy.
    row_encoding: JoinRowEncoding,
    /// Enable asymmetric compaction.
    asymmetric_compaction: bool,
    /// Idle threshold for asymmetric compaction (ms).
    idle_threshold_ms: i64,
    /// Enable per-key tracking.
    per_key_tracking: bool,
    /// Key idle threshold (ms).
    key_idle_threshold_ms: i64,
    /// Enable build-side pruning.
    build_side_pruning: bool,
    /// Configured build side. `None` selects a side by heuristic at runtime.
    build_side: Option<JoinSide>,
    /// Left-side statistics.
    left_stats: SideStats,
    /// Right-side statistics.
    right_stats: SideStats,
    /// Per-key metadata (`key_hash` -> metadata).
    key_metadata: HashMap<u64, KeyMetadata>,
    /// Left-side watermark (`i64::MIN` until the first left event is observed).
    left_watermark: i64,
    /// Right-side watermark (`i64::MIN` until the first right event is observed).
    right_watermark: i64,
    /// Reusable buffer for `prune_build_side` to avoid per-call allocation.
    prune_buffer: Vec<Bytes>,
}
1114
1115impl StreamJoinOperator {
1116    /// Creates a new stream join operator.
1117    ///
1118    /// # Arguments
1119    ///
1120    /// * `left_key_column` - Name of the key column in left stream events
1121    /// * `right_key_column` - Name of the key column in right stream events
1122    /// * `time_bound` - Maximum time difference for matching events
1123    /// * `join_type` - Type of join to perform
1124    #[must_use]
1125    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
1126    pub fn new(
1127        left_key_column: String,
1128        right_key_column: String,
1129        time_bound: Duration,
1130        join_type: JoinType,
1131    ) -> Self {
1132        let operator_num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
1133        Self {
1134            left_key_column,
1135            right_key_column,
1136            time_bound_ms: time_bound.as_millis() as i64,
1137            join_type,
1138            operator_id: format!("stream_join_{operator_num}"),
1139            metrics: JoinMetrics::new(),
1140            output_schema: None,
1141            left_schema: None,
1142            right_schema: None,
1143            // F057: Default optimizations
1144            row_encoding: JoinRowEncoding::Compact,
1145            asymmetric_compaction: true,
1146            idle_threshold_ms: 60_000,
1147            per_key_tracking: true,
1148            key_idle_threshold_ms: 300_000,
1149            build_side_pruning: true,
1150            build_side: None,
1151            left_stats: SideStats::new(),
1152            right_stats: SideStats::new(),
1153            key_metadata: HashMap::new(),
1154            left_watermark: i64::MIN,
1155            right_watermark: i64::MIN,
1156            prune_buffer: Vec::with_capacity(100),
1157        }
1158    }
1159
1160    /// Creates a new stream join operator with a custom operator ID.
1161    #[must_use]
1162    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
1163    pub fn with_id(
1164        left_key_column: String,
1165        right_key_column: String,
1166        time_bound: Duration,
1167        join_type: JoinType,
1168        operator_id: String,
1169    ) -> Self {
1170        Self {
1171            left_key_column,
1172            right_key_column,
1173            time_bound_ms: time_bound.as_millis() as i64,
1174            join_type,
1175            operator_id,
1176            metrics: JoinMetrics::new(),
1177            output_schema: None,
1178            left_schema: None,
1179            right_schema: None,
1180            // F057: Default optimizations
1181            row_encoding: JoinRowEncoding::Compact,
1182            asymmetric_compaction: true,
1183            idle_threshold_ms: 60_000,
1184            per_key_tracking: true,
1185            key_idle_threshold_ms: 300_000,
1186            build_side_pruning: true,
1187            build_side: None,
1188            left_stats: SideStats::new(),
1189            right_stats: SideStats::new(),
1190            key_metadata: HashMap::new(),
1191            left_watermark: i64::MIN,
1192            right_watermark: i64::MIN,
1193            prune_buffer: Vec::with_capacity(100),
1194        }
1195    }
1196
1197    /// Creates a new stream join operator from configuration (F057).
1198    ///
1199    /// This is the recommended constructor for production use, allowing
1200    /// fine-grained control over optimization settings.
1201    ///
1202    /// # Example
1203    ///
1204    /// ```rust
1205    /// use laminar_core::operator::stream_join::{
1206    ///     StreamJoinOperator, StreamJoinConfig, JoinType, JoinRowEncoding,
1207    /// };
1208    /// use std::time::Duration;
1209    ///
1210    /// let config = StreamJoinConfig::builder()
1211    ///     .left_key_column("order_id")
1212    ///     .right_key_column("order_id")
1213    ///     .time_bound(Duration::from_secs(3600))
1214    ///     .join_type(JoinType::Inner)
1215    ///     .row_encoding(JoinRowEncoding::CpuFriendly)
1216    ///     .build();
1217    ///
1218    /// let operator = StreamJoinOperator::from_config(config);
1219    /// ```
1220    #[must_use]
1221    pub fn from_config(config: StreamJoinConfig) -> Self {
1222        let operator_id = config.operator_id.unwrap_or_else(|| {
1223            let num = JOIN_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
1224            format!("stream_join_{num}")
1225        });
1226
1227        Self {
1228            left_key_column: config.left_key_column,
1229            right_key_column: config.right_key_column,
1230            time_bound_ms: config.time_bound_ms,
1231            join_type: config.join_type,
1232            operator_id,
1233            metrics: JoinMetrics::new(),
1234            output_schema: None,
1235            left_schema: None,
1236            right_schema: None,
1237            row_encoding: config.row_encoding,
1238            asymmetric_compaction: config.asymmetric_compaction,
1239            idle_threshold_ms: config.idle_threshold_ms,
1240            per_key_tracking: config.per_key_tracking,
1241            key_idle_threshold_ms: config.key_idle_threshold_ms,
1242            build_side_pruning: config.build_side_pruning,
1243            build_side: config.build_side,
1244            left_stats: SideStats::new(),
1245            right_stats: SideStats::new(),
1246            key_metadata: HashMap::new(),
1247            left_watermark: i64::MIN,
1248            right_watermark: i64::MIN,
1249            prune_buffer: Vec::with_capacity(100),
1250        }
1251    }
1252
    /// Returns the join type.
    #[must_use]
    pub fn join_type(&self) -> JoinType {
        self.join_type
    }

    /// Returns the time bound in milliseconds.
    #[must_use]
    pub fn time_bound_ms(&self) -> i64 {
        self.time_bound_ms
    }

    /// Returns the metrics accumulated so far.
    #[must_use]
    pub fn metrics(&self) -> &JoinMetrics {
        &self.metrics
    }

    /// Resets all metric counters to zero.
    pub fn reset_metrics(&mut self) {
        self.metrics.reset();
    }

    /// Returns the row encoding strategy (F057).
    #[must_use]
    pub fn row_encoding(&self) -> JoinRowEncoding {
        self.row_encoding
    }

    /// Returns whether asymmetric compaction is enabled (F057).
    #[must_use]
    pub fn asymmetric_compaction_enabled(&self) -> bool {
        self.asymmetric_compaction
    }

    /// Returns whether per-key tracking is enabled (F057).
    #[must_use]
    pub fn per_key_tracking_enabled(&self) -> bool {
        self.per_key_tracking
    }

    /// Returns the left-side statistics (F057).
    #[must_use]
    pub fn left_stats(&self) -> &SideStats {
        &self.left_stats
    }

    /// Returns the right-side statistics (F057).
    #[must_use]
    pub fn right_stats(&self) -> &SideStats {
        &self.right_stats
    }

    /// Returns the number of tracked keys (F057).
    ///
    /// Only meaningful when per-key tracking is enabled; otherwise the
    /// metadata map is never populated and this returns 0.
    #[must_use]
    pub fn tracked_key_count(&self) -> usize {
        self.key_metadata.len()
    }
1311
1312    /// Checks if a side is considered "finished" (idle) (F057).
1313    #[must_use]
1314    pub fn is_side_idle(&self, side: JoinSide, current_time: i64) -> bool {
1315        match side {
1316            JoinSide::Left => self
1317                .left_stats
1318                .is_idle(current_time, self.idle_threshold_ms),
1319            JoinSide::Right => self
1320                .right_stats
1321                .is_idle(current_time, self.idle_threshold_ms),
1322        }
1323    }
1324
1325    /// Determines the effective build side based on configuration or heuristics (F057).
1326    #[must_use]
1327    pub fn effective_build_side(&self) -> JoinSide {
1328        // Use configured build side if set
1329        if let Some(side) = self.build_side {
1330            return side;
1331        }
1332
1333        // Auto-select based on statistics: smaller side is typically better as build
1334        if self.left_stats.events_received < self.right_stats.events_received {
1335            JoinSide::Left
1336        } else {
1337            JoinSide::Right
1338        }
1339    }
1340
    /// Processes an event from either the left or right side.
    ///
    /// This is the main entry point for the join operator. Call this with
    /// the appropriate `JoinSide` to indicate which stream the event came from.
    /// Both side-specific handlers funnel into the shared `process_event` core.
    pub fn process_side(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        match side {
            JoinSide::Left => self.process_left(event, ctx),
            JoinSide::Right => self.process_right(event, ctx),
        }
    }
1356
1357    /// Processes a left-side event.
1358    fn process_left(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
1359        self.metrics.left_events += 1;
1360
1361        // F057: Track side statistics for asymmetric compaction
1362        self.left_stats.record_event(ctx.processing_time);
1363
1364        // Capture left schema on first event
1365        if self.left_schema.is_none() {
1366            self.left_schema = Some(event.data.schema());
1367            self.update_output_schema();
1368        }
1369
1370        self.process_event(event, JoinSide::Left, ctx)
1371    }
1372
1373    /// Processes a right-side event.
1374    fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
1375        self.metrics.right_events += 1;
1376
1377        // F057: Track side statistics for asymmetric compaction
1378        self.right_stats.record_event(ctx.processing_time);
1379
1380        // Capture right schema on first event
1381        if self.right_schema.is_none() {
1382            self.right_schema = Some(event.data.schema());
1383            self.update_output_schema();
1384        }
1385
1386        self.process_event(event, JoinSide::Right, ctx)
1387    }
1388
1389    /// Updates the output schema when both input schemas are known.
1390    fn update_output_schema(&mut self) {
1391        if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
1392            let mut fields: Vec<Field> = left.fields().iter().map(|f| f.as_ref().clone()).collect();
1393
1394            // Add right fields, prefixing duplicates
1395            for field in right.fields() {
1396                let name = if left.field_with_name(field.name()).is_ok() {
1397                    format!("right_{}", field.name())
1398                } else {
1399                    field.name().clone()
1400                };
1401                fields.push(Field::new(
1402                    name,
1403                    field.data_type().clone(),
1404                    true, // Nullable for outer joins
1405                ));
1406            }
1407
1408            self.output_schema = Some(Arc::new(Schema::new(fields)));
1409        }
1410    }
1411
    /// Core event processing logic, shared by both sides.
    ///
    /// Steps, in order: advance watermarks, drop late events, extract the
    /// join key, record per-key metadata, encode and store the row, register
    /// cleanup (and, for outer joins, unmatched-emission) timers, optionally
    /// prune the build side, probe the opposite side, emit joined events,
    /// and finally emit any watermark produced by this event.
    fn process_event(
        &mut self,
        event: &Event,
        side: JoinSide,
        ctx: &mut OperatorContext,
    ) -> OutputVec {
        let mut output = OutputVec::new();
        let event_time = event.timestamp;

        // Update watermark; any generated watermark is emitted at the end,
        // after the join results.
        let emitted_watermark = ctx.watermark_generator.on_event(event_time);

        // F057: Track per-side watermarks for build-side pruning
        match side {
            JoinSide::Left => self.left_watermark = self.left_watermark.max(event_time),
            JoinSide::Right => self.right_watermark = self.right_watermark.max(event_time),
        }

        // Check if event is too late: the watermark has already passed
        // event_time + time_bound, so it can no longer match anything.
        let current_wm = ctx.watermark_generator.current_watermark();
        if current_wm > i64::MIN && event_time + self.time_bound_ms < current_wm {
            self.metrics.late_events += 1;
            output.push(Output::LateEvent(event.clone()));
            return output;
        }

        // Extract join key from this side's configured key column
        let key_column = match side {
            JoinSide::Left => &self.left_key_column,
            JoinSide::Right => &self.right_key_column,
        };
        let Some(key_value) = Self::extract_key(&event.data, key_column) else {
            // Can't extract key, skip this event (no output, no error)
            return output;
        };

        // F057: Compute key hash for per-key tracking
        let key_hash = fxhash::hash64(&key_value);

        // F057: Track per-key metadata (activity timestamps per key hash)
        if self.per_key_tracking {
            self.key_metadata
                .entry(key_hash)
                .and_modify(|meta| meta.record_event(ctx.processing_time))
                .or_insert_with(|| KeyMetadata::new(ctx.processing_time));
            self.metrics.tracked_keys = self.key_metadata.len() as u64;
        }

        // Create join row with configured encoding (F057)
        let join_row = match JoinRow::with_encoding(
            event_time,
            key_value.clone(),
            &event.data,
            self.row_encoding,
        ) {
            Ok(row) => {
                // Track encoding metrics
                match self.row_encoding {
                    JoinRowEncoding::Compact => self.metrics.compact_encodes += 1,
                    JoinRowEncoding::CpuFriendly => self.metrics.cpu_friendly_encodes += 1,
                }
                row
            }
            // NOTE(review): an encoding failure silently drops the event —
            // no output, no dedicated metric. Confirm this is intended.
            Err(_) => return output,
        };

        // Store the event in state; on failure, drop the event silently.
        let state_key = Self::make_state_key(side, &key_value, event_time);
        if ctx.state.put_typed(&state_key, &join_row).is_err() {
            return output;
        }

        // Register cleanup timer for when this row leaves the join window
        let cleanup_time = event_time + self.time_bound_ms;
        let timer_key = Self::make_timer_key(side, &state_key);
        ctx.timers
            .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));

        // For outer joins, register unmatched emission timer
        if (side == JoinSide::Left && self.join_type.emits_unmatched_left())
            || (side == JoinSide::Right && self.join_type.emits_unmatched_right())
        {
            let unmatched_timer_key = Self::make_unmatched_timer_key(side, &state_key);
            ctx.timers.register_timer(
                cleanup_time,
                Some(unmatched_timer_key),
                Some(ctx.operator_index),
            );
        }

        // F057: Build-side pruning - prune entries that can no longer produce matches
        if self.build_side_pruning {
            self.prune_build_side(side, ctx);
        }

        // Probe the opposite side for matches
        let matches = self.probe_opposite_side(side, &key_value, event_time, ctx.state);

        // Emit join results
        for (other_row_key, mut other_row) in matches {
            self.metrics.matches += 1;

            // Mark this row as matched in state; put errors are deliberately
            // ignored (best-effort flag update).
            other_row.matched = true;
            let _ = ctx.state.put_typed(&other_row_key, &other_row);

            // Also mark our row as matched (re-read from state, since the
            // local `join_row` copy is only used for output construction)
            if let Ok(Some(mut our_row)) = ctx.state.get_typed::<JoinRow>(&state_key) {
                our_row.matched = true;
                let _ = ctx.state.put_typed(&state_key, &our_row);
            }

            // Create joined output; the output timestamp is the later of the
            // two matched events.
            if let Some(joined_event) = self.create_joined_event(
                side,
                &join_row,
                &other_row,
                std::cmp::max(event_time, other_row.timestamp),
            ) {
                output.push(Output::Event(joined_event));
            }
        }

        // Emit watermark if generated
        if let Some(wm) = emitted_watermark {
            output.push(Output::Watermark(wm.timestamp()));
        }

        output
    }
1543
1544    /// F057: Prunes build-side entries that cannot produce future matches.
1545    ///
1546    /// An entry can be pruned if its `timestamp + time_bound < probe_side_watermark`,
1547    /// meaning the probe side has advanced beyond any possible match window.
1548    fn prune_build_side(&mut self, current_side: JoinSide, ctx: &mut OperatorContext) {
1549        let build_side = self.effective_build_side();
1550
1551        // Only prune when processing from probe side
1552        if current_side == build_side {
1553            return;
1554        }
1555
1556        // Get probe side watermark
1557        let probe_watermark = match build_side {
1558            JoinSide::Left => self.right_watermark,
1559            JoinSide::Right => self.left_watermark,
1560        };
1561
1562        if probe_watermark == i64::MIN {
1563            return;
1564        }
1565
1566        // Calculate prune threshold
1567        let prune_threshold = probe_watermark - self.time_bound_ms;
1568        if prune_threshold == i64::MIN {
1569            return;
1570        }
1571
1572        // For inner joins, we can prune more aggressively
1573        if self.join_type == JoinType::Inner {
1574            let prefix = match build_side {
1575                JoinSide::Left => LEFT_STATE_PREFIX,
1576                JoinSide::Right => RIGHT_STATE_PREFIX,
1577            };
1578
1579            // Reuse prune_buffer to avoid per-call allocation
1580            self.prune_buffer.clear();
1581            let time_bound = self.time_bound_ms;
1582            for (key, value) in ctx.state.prefix_scan(prefix) {
1583                if self.prune_buffer.len() >= 100 {
1584                    break; // Limit per-event pruning to avoid latency spikes
1585                }
1586                // Try to get timestamp from the key (bytes 12-20)
1587                if key.len() >= 20 {
1588                    if let Ok(ts_bytes) = <[u8; 8]>::try_from(&key[12..20]) {
1589                        let timestamp = i64::from_be_bytes(ts_bytes);
1590                        if timestamp + time_bound < prune_threshold {
1591                            // Also verify via deserialization
1592                            if let Ok(row) =
1593                                rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
1594                                    .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
1595                            {
1596                                if row.timestamp + time_bound < prune_threshold {
1597                                    self.prune_buffer.push(key);
1598                                }
1599                            }
1600                        }
1601                    }
1602                }
1603            }
1604
1605            for key in &self.prune_buffer {
1606                if ctx.state.delete(key).is_ok() {
1607                    self.metrics.build_side_prunes += 1;
1608                }
1609            }
1610        }
1611    }
1612
1613    /// F057: Scans for idle keys and cleans them up aggressively.
1614    ///
1615    /// Called periodically (e.g., on timer) to identify keys with no recent
1616    /// activity and remove their state entries.
1617    pub fn scan_idle_keys(&mut self, ctx: &mut OperatorContext) {
1618        if !self.per_key_tracking {
1619            return;
1620        }
1621
1622        let threshold = ctx.processing_time - self.key_idle_threshold_ms;
1623
1624        // Find idle keys
1625        let idle_keys: Vec<u64> = self
1626            .key_metadata
1627            .iter()
1628            .filter(|(_, meta)| meta.last_activity < threshold && meta.state_entries == 0)
1629            .map(|(k, _)| *k)
1630            .collect();
1631
1632        // Remove idle key metadata
1633        for key_hash in idle_keys {
1634            self.key_metadata.remove(&key_hash);
1635            self.metrics.idle_key_cleanups += 1;
1636        }
1637
1638        self.metrics.tracked_keys = self.key_metadata.len() as u64;
1639    }
1640
1641    /// F057: Checks if compaction should be skipped for a side due to asymmetric optimization.
1642    #[must_use]
1643    pub fn should_skip_compaction(&self, side: JoinSide, current_time: i64) -> bool {
1644        if !self.asymmetric_compaction {
1645            return false;
1646        }
1647
1648        // Skip compaction if side is idle
1649        let is_idle = match side {
1650            JoinSide::Left => self
1651                .left_stats
1652                .is_idle(current_time, self.idle_threshold_ms),
1653            JoinSide::Right => self
1654                .right_stats
1655                .is_idle(current_time, self.idle_threshold_ms),
1656        };
1657
1658        if is_idle {
1659            // Note: metrics update happens in the caller
1660            true
1661        } else {
1662            false
1663        }
1664    }
1665
1666    /// Extracts the join key value from a record batch.
1667    fn extract_key(batch: &RecordBatch, column_name: &str) -> Option<Vec<u8>> {
1668        let column_index = batch.schema().index_of(column_name).ok()?;
1669        let column = batch.column(column_index);
1670
1671        // Handle different column types
1672        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
1673            if string_array.is_empty() || string_array.is_null(0) {
1674                return None;
1675            }
1676            return Some(string_array.value(0).as_bytes().to_vec());
1677        }
1678
1679        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
1680            if int_array.is_empty() || int_array.is_null(0) {
1681                return None;
1682            }
1683            return Some(int_array.value(0).to_le_bytes().to_vec());
1684        }
1685
1686        // For other types, use the raw bytes if available
1687        // This is a fallback - in practice, keys should be string or integer
1688        None
1689    }
1690
1691    /// Creates a state key for storing a join row.
1692    #[allow(clippy::cast_sign_loss)]
1693    fn make_state_key(side: JoinSide, key_value: &[u8], timestamp: i64) -> Vec<u8> {
1694        let prefix = match side {
1695            JoinSide::Left => LEFT_STATE_PREFIX,
1696            JoinSide::Right => RIGHT_STATE_PREFIX,
1697        };
1698
1699        let event_id = EVENT_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
1700
1701        // Key format: prefix (4) + key_hash (8) + timestamp (8) + event_id (8) = 28 bytes
1702        let mut key = Vec::with_capacity(28);
1703        key.extend_from_slice(prefix);
1704
1705        // Use FxHash for the key value
1706        let key_hash = fxhash::hash64(key_value);
1707        key.extend_from_slice(&key_hash.to_be_bytes());
1708        key.extend_from_slice(&timestamp.to_be_bytes());
1709        key.extend_from_slice(&event_id.to_be_bytes());
1710
1711        key
1712    }
1713
1714    /// Creates a timer key for cleanup.
1715    fn make_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
1716        let prefix = match side {
1717            JoinSide::Left => LEFT_TIMER_PREFIX,
1718            JoinSide::Right => RIGHT_TIMER_PREFIX,
1719        };
1720
1721        let mut key = TimerKey::new();
1722        key.push(prefix);
1723        key.extend_from_slice(state_key);
1724        key
1725    }
1726
1727    /// Creates a timer key for unmatched event emission.
1728    fn make_unmatched_timer_key(side: JoinSide, state_key: &[u8]) -> TimerKey {
1729        let side_byte = match side {
1730            JoinSide::Left => 0x01,
1731            JoinSide::Right => 0x02,
1732        };
1733
1734        let mut key = TimerKey::new();
1735        key.push(UNMATCHED_TIMER_PREFIX);
1736        key.push(side_byte);
1737        key.extend_from_slice(state_key);
1738        key
1739    }
1740
1741    /// Probes the opposite side for matching events.
1742    fn probe_opposite_side(
1743        &self,
1744        current_side: JoinSide,
1745        key_value: &[u8],
1746        timestamp: i64,
1747        state: &dyn StateStore,
1748    ) -> Vec<(Vec<u8>, JoinRow)> {
1749        let mut matches = Vec::new();
1750
1751        let prefix = match current_side {
1752            JoinSide::Left => RIGHT_STATE_PREFIX,
1753            JoinSide::Right => LEFT_STATE_PREFIX,
1754        };
1755
1756        // Build prefix for scanning: prefix + key_hash
1757        let key_hash = fxhash::hash64(key_value);
1758        let mut scan_prefix = Vec::with_capacity(12);
1759        scan_prefix.extend_from_slice(prefix);
1760        scan_prefix.extend_from_slice(&key_hash.to_be_bytes());
1761
1762        // Scan for matching keys
1763        for (state_key, value) in state.prefix_scan(&scan_prefix) {
1764            // Deserialize the join row
1765            let Ok(row) = rkyv::access::<rkyv::Archived<JoinRow>, RkyvError>(&value)
1766                .and_then(rkyv::deserialize::<JoinRow, RkyvError>)
1767            else {
1768                continue;
1769            };
1770
1771            // Check if timestamps are within time bound
1772            let time_diff = (timestamp - row.timestamp).abs();
1773            if time_diff <= self.time_bound_ms {
1774                // Verify key matches (in case of hash collision)
1775                if row.key_value == key_value {
1776                    matches.push((state_key.to_vec(), row));
1777                }
1778            }
1779        }
1780
1781        matches
1782    }
1783
1784    /// Creates a joined event from two matching rows.
1785    fn create_joined_event(
1786        &self,
1787        current_side: JoinSide,
1788        current_row: &JoinRow,
1789        other_row: &JoinRow,
1790        output_timestamp: i64,
1791    ) -> Option<Event> {
1792        let (left_row, right_row) = match current_side {
1793            JoinSide::Left => (current_row, other_row),
1794            JoinSide::Right => (other_row, current_row),
1795        };
1796
1797        let left_batch = left_row.to_batch().ok()?;
1798        let right_batch = right_row.to_batch().ok()?;
1799
1800        let joined_batch = self.concat_batches(&left_batch, &right_batch)?;
1801
1802        Some(Event::new(output_timestamp, joined_batch))
1803    }
1804
1805    /// Concatenates two batches horizontally.
1806    fn concat_batches(&self, left: &RecordBatch, right: &RecordBatch) -> Option<RecordBatch> {
1807        let schema = self.output_schema.as_ref()?;
1808
1809        let mut columns: Vec<ArrayRef> = left.columns().to_vec();
1810
1811        // Add right columns
1812        for column in right.columns() {
1813            columns.push(Arc::clone(column));
1814        }
1815
1816        RecordBatch::try_new(Arc::clone(schema), columns).ok()
1817    }
1818
1819    /// Creates an unmatched event for outer joins.
1820    fn create_unmatched_event(&self, side: JoinSide, row: &JoinRow) -> Option<Event> {
1821        let batch = row.to_batch().ok()?;
1822        let schema = self.output_schema.as_ref()?;
1823
1824        let num_rows = batch.num_rows();
1825        let mut columns: Vec<ArrayRef> = Vec::new();
1826
1827        match side {
1828            JoinSide::Left => {
1829                // Left columns are populated, right columns are null
1830                columns.extend(batch.columns().iter().cloned());
1831
1832                // Add null columns for right side
1833                if let Some(right_schema) = &self.right_schema {
1834                    for field in right_schema.fields() {
1835                        columns.push(Self::create_null_array(field.data_type(), num_rows));
1836                    }
1837                }
1838            }
1839            JoinSide::Right => {
1840                // Left columns are null, right columns are populated
1841                if let Some(left_schema) = &self.left_schema {
1842                    for field in left_schema.fields() {
1843                        columns.push(Self::create_null_array(field.data_type(), num_rows));
1844                    }
1845                }
1846
1847                columns.extend(batch.columns().iter().cloned());
1848            }
1849        }
1850
1851        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
1852
1853        Some(Event::new(row.timestamp, joined_batch))
1854    }
1855
1856    /// Creates a null array of the given type and length.
1857    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
1858        match data_type {
1859            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
1860            // Default to Int64 for all other types (add more specific cases as needed)
1861            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
1862        }
1863    }
1864
1865    /// Handles cleanup timer expiration.
1866    fn handle_cleanup_timer(
1867        &mut self,
1868        side: JoinSide,
1869        state_key: &[u8],
1870        ctx: &mut OperatorContext,
1871    ) -> OutputVec {
1872        let output = OutputVec::new();
1873
1874        // F057: Check asymmetric compaction - skip if side is idle
1875        if self.should_skip_compaction(side, ctx.processing_time) {
1876            self.metrics.asymmetric_skips += 1;
1877            // Don't skip the actual cleanup, but could be used for compaction
1878        }
1879
1880        // F057: Update per-key metadata before deleting
1881        if self.per_key_tracking && state_key.len() >= 12 {
1882            // Extract key hash from state key (bytes 4-12)
1883            if let Ok(hash_bytes) = state_key[4..12].try_into() {
1884                let key_hash = u64::from_be_bytes(hash_bytes);
1885                if let Some(meta) = self.key_metadata.get_mut(&key_hash) {
1886                    meta.decrement_entries();
1887                }
1888            }
1889        }
1890
1891        // Delete the state entry
1892        if ctx.state.delete(state_key).is_ok() {
1893            self.metrics.state_cleanups += 1;
1894        }
1895
1896        output
1897    }
1898
1899    /// Handles unmatched timer expiration for outer joins.
1900    fn handle_unmatched_timer(
1901        &mut self,
1902        side: JoinSide,
1903        state_key: &[u8],
1904        ctx: &mut OperatorContext,
1905    ) -> OutputVec {
1906        let mut output = OutputVec::new();
1907
1908        // Get the join row
1909        let Ok(Some(row)) = ctx.state.get_typed::<JoinRow>(state_key) else {
1910            return output;
1911        };
1912
1913        // Only emit if not matched
1914        if !row.matched {
1915            match side {
1916                JoinSide::Left if self.join_type.emits_unmatched_left() => {
1917                    self.metrics.unmatched_left += 1;
1918                    if let Some(event) = self.create_unmatched_event(side, &row) {
1919                        output.push(Output::Event(event));
1920                    }
1921                }
1922                JoinSide::Right if self.join_type.emits_unmatched_right() => {
1923                    self.metrics.unmatched_right += 1;
1924                    if let Some(event) = self.create_unmatched_event(side, &row) {
1925                        output.push(Output::Event(event));
1926                    }
1927                }
1928                _ => {}
1929            }
1930        }
1931
1932        output
1933    }
1934
1935    /// Parses a timer key to determine its type and extract the state key.
1936    fn parse_timer_key(key: &[u8]) -> Option<(TimerKeyType, JoinSide, Vec<u8>)> {
1937        if key.is_empty() {
1938            return None;
1939        }
1940
1941        match key[0] {
1942            LEFT_TIMER_PREFIX => {
1943                let state_key = key[1..].to_vec();
1944                Some((TimerKeyType::Cleanup, JoinSide::Left, state_key))
1945            }
1946            RIGHT_TIMER_PREFIX => {
1947                let state_key = key[1..].to_vec();
1948                Some((TimerKeyType::Cleanup, JoinSide::Right, state_key))
1949            }
1950            UNMATCHED_TIMER_PREFIX => {
1951                if key.len() < 2 {
1952                    return None;
1953                }
1954                let side = match key[1] {
1955                    0x01 => JoinSide::Left,
1956                    0x02 => JoinSide::Right,
1957                    _ => return None,
1958                };
1959                let state_key = key[2..].to_vec();
1960                Some((TimerKeyType::Unmatched, side, state_key))
1961            }
1962            _ => None,
1963        }
1964    }
1965}
1966
/// Type of timer key, recovered by `parse_timer_key` from the prefix
/// byte(s) of a registered timer's key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerKeyType {
    /// Cleanup timer - delete state entry.
    Cleanup,
    /// Unmatched timer - emit unmatched event for outer joins.
    Unmatched,
}
1975
impl Operator for StreamJoinOperator {
    /// Processes an event, routing it to the left input by default.
    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
        // Default to left side - in practice, users should call process_side directly
        self.process_left(event, ctx)
    }

    /// Dispatches an expired timer to the cleanup or unmatched handler,
    /// based on the type/side encoded in the timer key. Unknown or
    /// malformed keys are silently ignored.
    fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext) -> OutputVec {
        let Some((timer_type, side, state_key)) = Self::parse_timer_key(&timer.key) else {
            return OutputVec::new();
        };

        match timer_type {
            TimerKeyType::Cleanup => self.handle_cleanup_timer(side, &state_key, ctx),
            TimerKeyType::Unmatched => self.handle_unmatched_timer(side, &state_key, ctx),
        }
    }

    /// Serializes configuration, core metrics, F057 metrics, and side
    /// stats into an rkyv payload. The layout must stay in sync with the
    /// `CheckpointData` alias in `restore`.
    fn checkpoint(&self) -> OperatorState {
        // Checkpoint the metrics, configuration, and F057 state
        // Use nested tuples to stay within rkyv's tuple size limit (max 12)
        // Format: ((config), (core_metrics), (f057_metrics, side_stats))
        let checkpoint_data = (
            // Part 1: Configuration (3 elements)
            (
                self.left_key_column.clone(),
                self.right_key_column.clone(),
                self.time_bound_ms,
            ),
            // Part 2: Core metrics (3 elements)
            (
                self.metrics.left_events,
                self.metrics.right_events,
                self.metrics.matches,
            ),
            // Part 3: F057 metrics (5 elements)
            (
                self.metrics.cpu_friendly_encodes,
                self.metrics.compact_encodes,
                self.metrics.asymmetric_skips,
                self.metrics.idle_key_cleanups,
                self.metrics.build_side_prunes,
            ),
            // Part 4: F057 side stats (4 elements)
            (
                self.left_stats.events_received,
                self.right_stats.events_received,
                self.left_watermark,
                self.right_watermark,
            ),
        );

        // NOTE(review): a serialization failure degrades to an empty data
        // payload instead of an error - confirm this is intentional, since
        // restore would then fall through to the legacy-format error path.
        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
            .map(|v| v.to_vec())
            .unwrap_or_default();

        OperatorState {
            operator_id: self.operator_id.clone(),
            data,
        }
    }

    /// Restores metrics and F057 side stats from a checkpoint, accepting
    /// both the current nested-tuple layout and the legacy flat 6-tuple
    /// layout. Fails if the checkpoint belongs to a different operator id
    /// or if neither format deserializes.
    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
        // Extended checkpoint data type with F057 fields using nested tuples
        type CheckpointData = (
            (String, String, i64),     // config
            (u64, u64, u64),           // core metrics
            (u64, u64, u64, u64, u64), // F057 metrics
            (u64, u64, i64, i64),      // F057 side stats
        );
        // Legacy checkpoint type for backward compatibility
        type LegacyCheckpointData = (String, String, i64, u64, u64, u64);

        // Refuse state that belongs to a different operator instance.
        if state.operator_id != self.operator_id {
            return Err(OperatorError::StateAccessFailed(format!(
                "Operator ID mismatch: expected {}, got {}",
                self.operator_id, state.operator_id
            )));
        }

        // Try to restore full F057 checkpoint first
        if let Ok(archived) = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
        {
            if let Ok(data) = rkyv::deserialize::<CheckpointData, RkyvError>(archived) {
                // The config tuple is deliberately ignored: the operator
                // keeps its constructor-provided configuration rather than
                // adopting the checkpointed one.
                let (
                    _config,
                    (left_events, right_events, matches),
                    (
                        cpu_friendly_encodes,
                        compact_encodes,
                        asymmetric_skips,
                        idle_key_cleanups,
                        build_side_prunes,
                    ),
                    (left_received, right_received, left_wm, right_wm),
                ) = data;

                self.metrics.left_events = left_events;
                self.metrics.right_events = right_events;
                self.metrics.matches = matches;
                self.metrics.cpu_friendly_encodes = cpu_friendly_encodes;
                self.metrics.compact_encodes = compact_encodes;
                self.metrics.asymmetric_skips = asymmetric_skips;
                self.metrics.idle_key_cleanups = idle_key_cleanups;
                self.metrics.build_side_prunes = build_side_prunes;
                self.left_stats.events_received = left_received;
                self.right_stats.events_received = right_received;
                self.left_watermark = left_wm;
                self.right_watermark = right_wm;

                return Ok(());
            }
        }

        // Fall back to legacy checkpoint format
        // (restores only the three core metrics; F057 fields keep their
        // current values).
        let archived = rkyv::access::<rkyv::Archived<LegacyCheckpointData>, RkyvError>(&state.data)
            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
        let (_, _, _, left_events, right_events, matches) =
            rkyv::deserialize::<LegacyCheckpointData, RkyvError>(archived)
                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;

        self.metrics.left_events = left_events;
        self.metrics.right_events = right_events;
        self.metrics.matches = matches;

        Ok(())
    }
}
2103
2104#[cfg(test)]
2105#[allow(clippy::cast_possible_wrap)]
2106mod tests {
2107    use super::*;
2108    use crate::state::InMemoryStore;
2109    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
2110    use arrow_array::{Int64Array, StringArray};
2111    use arrow_schema::{DataType, Field, Schema};
2112
2113    fn create_order_event(timestamp: i64, order_id: &str, amount: i64) -> Event {
2114        let schema = Arc::new(Schema::new(vec![
2115            Field::new("order_id", DataType::Utf8, false),
2116            Field::new("amount", DataType::Int64, false),
2117        ]));
2118        let batch = RecordBatch::try_new(
2119            schema,
2120            vec![
2121                Arc::new(StringArray::from(vec![order_id])),
2122                Arc::new(Int64Array::from(vec![amount])),
2123            ],
2124        )
2125        .unwrap();
2126        Event::new(timestamp, batch)
2127    }
2128
2129    fn create_payment_event(timestamp: i64, order_id: &str, status: &str) -> Event {
2130        let schema = Arc::new(Schema::new(vec![
2131            Field::new("order_id", DataType::Utf8, false),
2132            Field::new("status", DataType::Utf8, false),
2133        ]));
2134        let batch = RecordBatch::try_new(
2135            schema,
2136            vec![
2137                Arc::new(StringArray::from(vec![order_id])),
2138                Arc::new(StringArray::from(vec![status])),
2139            ],
2140        )
2141        .unwrap();
2142        Event::new(timestamp, batch)
2143    }
2144
2145    fn create_test_context<'a>(
2146        timers: &'a mut TimerService,
2147        state: &'a mut dyn StateStore,
2148        watermark_gen: &'a mut dyn WatermarkGenerator,
2149    ) -> OperatorContext<'a> {
2150        OperatorContext {
2151            event_time: 0,
2152            processing_time: 0,
2153            timers,
2154            state,
2155            watermark_generator: watermark_gen,
2156            operator_index: 0,
2157        }
2158    }
2159
2160    #[test]
2161    fn test_join_type_properties() {
2162        assert!(!JoinType::Inner.emits_unmatched_left());
2163        assert!(!JoinType::Inner.emits_unmatched_right());
2164
2165        assert!(JoinType::Left.emits_unmatched_left());
2166        assert!(!JoinType::Left.emits_unmatched_right());
2167
2168        assert!(!JoinType::Right.emits_unmatched_left());
2169        assert!(JoinType::Right.emits_unmatched_right());
2170
2171        assert!(JoinType::Full.emits_unmatched_left());
2172        assert!(JoinType::Full.emits_unmatched_right());
2173    }
2174
2175    #[test]
2176    fn test_join_operator_creation() {
2177        let operator = StreamJoinOperator::new(
2178            "order_id".to_string(),
2179            "order_id".to_string(),
2180            Duration::from_secs(3600),
2181            JoinType::Inner,
2182        );
2183
2184        assert_eq!(operator.join_type(), JoinType::Inner);
2185        assert_eq!(operator.time_bound_ms(), 3_600_000);
2186    }
2187
2188    #[test]
2189    fn test_join_operator_with_id() {
2190        let operator = StreamJoinOperator::with_id(
2191            "order_id".to_string(),
2192            "order_id".to_string(),
2193            Duration::from_secs(3600),
2194            JoinType::Left,
2195            "test_join".to_string(),
2196        );
2197
2198        assert_eq!(operator.operator_id, "test_join");
2199        assert_eq!(operator.join_type(), JoinType::Left);
2200    }
2201
2202    #[test]
2203    fn test_inner_join_basic() {
2204        let mut operator = StreamJoinOperator::with_id(
2205            "order_id".to_string(),
2206            "order_id".to_string(),
2207            Duration::from_secs(3600),
2208            JoinType::Inner,
2209            "test_join".to_string(),
2210        );
2211
2212        let mut timers = TimerService::new();
2213        let mut state = InMemoryStore::new();
2214        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2215
2216        // Process left event (order)
2217        let order = create_order_event(1000, "order_1", 100);
2218        {
2219            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2220            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
2221            // No match yet, should produce no output
2222            assert!(
2223                outputs
2224                    .iter()
2225                    .filter(|o| matches!(o, Output::Event(_)))
2226                    .count()
2227                    == 0
2228            );
2229        }
2230
2231        // Process right event (payment) - should produce a match
2232        let payment = create_payment_event(2000, "order_1", "paid");
2233        {
2234            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2235            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2236            // Should have one match
2237            assert_eq!(
2238                outputs
2239                    .iter()
2240                    .filter(|o| matches!(o, Output::Event(_)))
2241                    .count(),
2242                1
2243            );
2244        }
2245
2246        assert_eq!(operator.metrics().matches, 1);
2247        assert_eq!(operator.metrics().left_events, 1);
2248        assert_eq!(operator.metrics().right_events, 1);
2249    }
2250
2251    #[test]
2252    fn test_inner_join_no_match_different_key() {
2253        let mut operator = StreamJoinOperator::with_id(
2254            "order_id".to_string(),
2255            "order_id".to_string(),
2256            Duration::from_secs(3600),
2257            JoinType::Inner,
2258            "test_join".to_string(),
2259        );
2260
2261        let mut timers = TimerService::new();
2262        let mut state = InMemoryStore::new();
2263        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2264
2265        // Process left event
2266        let order = create_order_event(1000, "order_1", 100);
2267        {
2268            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2269            operator.process_side(&order, JoinSide::Left, &mut ctx);
2270        }
2271
2272        // Process right event with different key
2273        let payment = create_payment_event(2000, "order_2", "paid");
2274        {
2275            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2276            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2277            // No match due to different key
2278            assert_eq!(
2279                outputs
2280                    .iter()
2281                    .filter(|o| matches!(o, Output::Event(_)))
2282                    .count(),
2283                0
2284            );
2285        }
2286
2287        assert_eq!(operator.metrics().matches, 0);
2288    }
2289
2290    #[test]
2291    fn test_inner_join_no_match_outside_time_bound() {
2292        let mut operator = StreamJoinOperator::with_id(
2293            "order_id".to_string(),
2294            "order_id".to_string(),
2295            Duration::from_secs(1), // 1 second time bound
2296            JoinType::Inner,
2297            "test_join".to_string(),
2298        );
2299
2300        let mut timers = TimerService::new();
2301        let mut state = InMemoryStore::new();
2302        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2303
2304        // Process left event at t=1000
2305        let order = create_order_event(1000, "order_1", 100);
2306        {
2307            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2308            operator.process_side(&order, JoinSide::Left, &mut ctx);
2309        }
2310
2311        // Process right event at t=5000 (4 seconds later, outside 1s bound)
2312        let payment = create_payment_event(5000, "order_1", "paid");
2313        {
2314            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2315            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2316            // No match due to time bound
2317            assert_eq!(
2318                outputs
2319                    .iter()
2320                    .filter(|o| matches!(o, Output::Event(_)))
2321                    .count(),
2322                0
2323            );
2324        }
2325
2326        assert_eq!(operator.metrics().matches, 0);
2327    }
2328
2329    #[test]
2330    fn test_join_multiple_matches() {
2331        let mut operator = StreamJoinOperator::with_id(
2332            "order_id".to_string(),
2333            "order_id".to_string(),
2334            Duration::from_secs(3600),
2335            JoinType::Inner,
2336            "test_join".to_string(),
2337        );
2338
2339        let mut timers = TimerService::new();
2340        let mut state = InMemoryStore::new();
2341        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2342
2343        // Process two left events with same key
2344        for ts in [1000, 2000] {
2345            let order = create_order_event(ts, "order_1", 100);
2346            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2347            operator.process_side(&order, JoinSide::Left, &mut ctx);
2348        }
2349
2350        // Process right event - should match both
2351        let payment = create_payment_event(1500, "order_1", "paid");
2352        {
2353            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2354            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2355            // Should have two matches
2356            assert_eq!(
2357                outputs
2358                    .iter()
2359                    .filter(|o| matches!(o, Output::Event(_)))
2360                    .count(),
2361                2
2362            );
2363        }
2364
2365        assert_eq!(operator.metrics().matches, 2);
2366    }
2367
2368    #[test]
2369    fn test_join_late_event() {
2370        let mut operator = StreamJoinOperator::with_id(
2371            "order_id".to_string(),
2372            "order_id".to_string(),
2373            Duration::from_secs(1),
2374            JoinType::Inner,
2375            "test_join".to_string(),
2376        );
2377
2378        let mut timers = TimerService::new();
2379        let mut state = InMemoryStore::new();
2380        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(0);
2381
2382        // Advance watermark significantly
2383        let future_order = create_order_event(10000, "order_2", 200);
2384        {
2385            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2386            operator.process_side(&future_order, JoinSide::Left, &mut ctx);
2387        }
2388
2389        // Process very late event
2390        let late_payment = create_payment_event(100, "order_1", "paid");
2391        {
2392            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2393            let outputs = operator.process_side(&late_payment, JoinSide::Right, &mut ctx);
2394            // Should be marked as late
2395            assert!(outputs.iter().any(|o| matches!(o, Output::LateEvent(_))));
2396        }
2397
2398        assert_eq!(operator.metrics().late_events, 1);
2399    }
2400
2401    #[test]
2402    fn test_join_row_serialization() {
2403        let schema = Arc::new(Schema::new(vec![
2404            Field::new("id", DataType::Utf8, false),
2405            Field::new("value", DataType::Int64, false),
2406        ]));
2407        let batch = RecordBatch::try_new(
2408            schema,
2409            vec![
2410                Arc::new(StringArray::from(vec!["test"])),
2411                Arc::new(Int64Array::from(vec![42])),
2412            ],
2413        )
2414        .unwrap();
2415
2416        let row = JoinRow::new(1000, b"key".to_vec(), &batch).unwrap();
2417
2418        // Verify we can deserialize back
2419        let restored_batch = row.to_batch().unwrap();
2420        assert_eq!(restored_batch.num_rows(), 1);
2421        assert_eq!(restored_batch.num_columns(), 2);
2422    }
2423
    /// Verifies that firing the registered cleanup timers removes the
    /// stored join state for a processed event.
    #[test]
    fn test_cleanup_timer() {
        let mut operator = StreamJoinOperator::with_id(
            "order_id".to_string(),
            "order_id".to_string(),
            Duration::from_secs(1),
            JoinType::Inner,
            "test_join".to_string(),
        );

        let mut timers = TimerService::new();
        let mut state = InMemoryStore::new();
        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

        // Process an event
        let order = create_order_event(1000, "order_1", 100);
        {
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.process_side(&order, JoinSide::Left, &mut ctx);
        }

        // State should have one entry
        assert!(state.len() > 0);
        let initial_state_len = state.len();

        // Get the registered timers and fire them
        // (event at t=1000 + 1s time bound => timers due by t=2001).
        let registered_timers = timers.poll_timers(2001); // After cleanup time
        assert!(!registered_timers.is_empty());

        // Fire the cleanup timer - convert TimerRegistration to Timer
        for timer_reg in registered_timers {
            let timer = Timer {
                key: timer_reg.key.unwrap_or_default(),
                timestamp: timer_reg.timestamp,
            };
            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
            operator.on_timer(timer, &mut ctx);
        }

        // Verify cleanup happened (state decreased)
        // NOTE(review): this disjunction passes if EITHER the store shrank
        // or the cleanup counter moved - consider tightening to assert both
        // if cleanup is expected to always delete state here.
        assert!(state.len() < initial_state_len || operator.metrics().state_cleanups > 0);
    }
2466
2467    #[test]
2468    fn test_checkpoint_restore() {
2469        let mut operator = StreamJoinOperator::with_id(
2470            "order_id".to_string(),
2471            "order_id".to_string(),
2472            Duration::from_secs(3600),
2473            JoinType::Inner,
2474            "test_join".to_string(),
2475        );
2476
2477        // Simulate some activity
2478        operator.metrics.left_events = 10;
2479        operator.metrics.right_events = 5;
2480        operator.metrics.matches = 3;
2481
2482        // Checkpoint
2483        let checkpoint = operator.checkpoint();
2484
2485        // Create new operator and restore
2486        let mut restored = StreamJoinOperator::with_id(
2487            "order_id".to_string(),
2488            "order_id".to_string(),
2489            Duration::from_secs(3600),
2490            JoinType::Inner,
2491            "test_join".to_string(),
2492        );
2493
2494        restored.restore(checkpoint).unwrap();
2495
2496        assert_eq!(restored.metrics().left_events, 10);
2497        assert_eq!(restored.metrics().right_events, 5);
2498        assert_eq!(restored.metrics().matches, 3);
2499    }
2500
2501    #[test]
2502    fn test_metrics_reset() {
2503        let mut operator = StreamJoinOperator::new(
2504            "order_id".to_string(),
2505            "order_id".to_string(),
2506            Duration::from_secs(3600),
2507            JoinType::Inner,
2508        );
2509
2510        operator.metrics.left_events = 10;
2511        operator.metrics.matches = 5;
2512
2513        operator.reset_metrics();
2514
2515        assert_eq!(operator.metrics().left_events, 0);
2516        assert_eq!(operator.metrics().matches, 0);
2517    }
2518
2519    #[test]
2520    fn test_bidirectional_join() {
2521        let mut operator = StreamJoinOperator::with_id(
2522            "order_id".to_string(),
2523            "order_id".to_string(),
2524            Duration::from_secs(3600),
2525            JoinType::Inner,
2526            "test_join".to_string(),
2527        );
2528
2529        let mut timers = TimerService::new();
2530        let mut state = InMemoryStore::new();
2531        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2532
2533        // Right event arrives first
2534        let payment = create_payment_event(1000, "order_1", "paid");
2535        {
2536            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2537            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2538            assert_eq!(
2539                outputs
2540                    .iter()
2541                    .filter(|o| matches!(o, Output::Event(_)))
2542                    .count(),
2543                0
2544            );
2545        }
2546
2547        // Left event arrives and matches
2548        let order = create_order_event(1500, "order_1", 100);
2549        {
2550            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2551            let outputs = operator.process_side(&order, JoinSide::Left, &mut ctx);
2552            assert_eq!(
2553                outputs
2554                    .iter()
2555                    .filter(|o| matches!(o, Output::Event(_)))
2556                    .count(),
2557                1
2558            );
2559        }
2560
2561        assert_eq!(operator.metrics().matches, 1);
2562    }
2563
2564    #[test]
2565    fn test_integer_key_join() {
2566        fn create_int_key_event(timestamp: i64, key: i64, value: i64) -> Event {
2567            let schema = Arc::new(Schema::new(vec![
2568                Field::new("key", DataType::Int64, false),
2569                Field::new("value", DataType::Int64, false),
2570            ]));
2571            let batch = RecordBatch::try_new(
2572                schema,
2573                vec![
2574                    Arc::new(Int64Array::from(vec![key])),
2575                    Arc::new(Int64Array::from(vec![value])),
2576                ],
2577            )
2578            .unwrap();
2579            Event::new(timestamp, batch)
2580        }
2581
2582        let mut operator = StreamJoinOperator::with_id(
2583            "key".to_string(),
2584            "key".to_string(),
2585            Duration::from_secs(3600),
2586            JoinType::Inner,
2587            "test_join".to_string(),
2588        );
2589
2590        let mut timers = TimerService::new();
2591        let mut state = InMemoryStore::new();
2592        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2593
2594        // Process left with integer key
2595        let left = create_int_key_event(1000, 42, 100);
2596        {
2597            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2598            operator.process_side(&left, JoinSide::Left, &mut ctx);
2599        }
2600
2601        // Process right with same integer key
2602        let right = create_int_key_event(1500, 42, 200);
2603        {
2604            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2605            let outputs = operator.process_side(&right, JoinSide::Right, &mut ctx);
2606            assert_eq!(
2607                outputs
2608                    .iter()
2609                    .filter(|o| matches!(o, Output::Event(_)))
2610                    .count(),
2611                1
2612            );
2613        }
2614
2615        assert_eq!(operator.metrics().matches, 1);
2616    }
2617
2618    // F057: Stream Join Optimization Tests
2619
2620    #[test]
2621    fn test_f057_join_row_encoding_enum() {
2622        assert_eq!(JoinRowEncoding::default(), JoinRowEncoding::Compact);
2623        assert_eq!(format!("{}", JoinRowEncoding::Compact), "compact");
2624        assert_eq!(format!("{}", JoinRowEncoding::CpuFriendly), "cpu_friendly");
2625
2626        assert_eq!(
2627            "compact".parse::<JoinRowEncoding>().unwrap(),
2628            JoinRowEncoding::Compact
2629        );
2630        assert_eq!(
2631            "cpu_friendly".parse::<JoinRowEncoding>().unwrap(),
2632            JoinRowEncoding::CpuFriendly
2633        );
2634        assert_eq!(
2635            "cpu-friendly".parse::<JoinRowEncoding>().unwrap(),
2636            JoinRowEncoding::CpuFriendly
2637        );
2638        assert!("invalid".parse::<JoinRowEncoding>().is_err());
2639    }
2640
    #[test]
    fn test_f057_config_builder() {
        // Every builder setter should land in its corresponding config field;
        // the assertions below map 1:1 to the setter calls above.
        let config = StreamJoinConfig::builder()
            .left_key_column("order_id")
            .right_key_column("payment_id")
            .time_bound(Duration::from_secs(3600))
            .join_type(JoinType::Left)
            .operator_id("test_join")
            .row_encoding(JoinRowEncoding::CpuFriendly)
            .asymmetric_compaction(true)
            .idle_threshold(Duration::from_secs(120))
            .per_key_tracking(true)
            .key_idle_threshold(Duration::from_secs(600))
            .build_side_pruning(true)
            .build_side(JoinSide::Left)
            .build();

        // Durations are stored as milliseconds; optional fields as Some(..).
        assert_eq!(config.left_key_column, "order_id");
        assert_eq!(config.right_key_column, "payment_id");
        assert_eq!(config.time_bound_ms, 3_600_000);
        assert_eq!(config.join_type, JoinType::Left);
        assert_eq!(config.operator_id, Some("test_join".to_string()));
        assert_eq!(config.row_encoding, JoinRowEncoding::CpuFriendly);
        assert!(config.asymmetric_compaction);
        assert_eq!(config.idle_threshold_ms, 120_000);
        assert!(config.per_key_tracking);
        assert_eq!(config.key_idle_threshold_ms, 600_000);
        assert!(config.build_side_pruning);
        assert_eq!(config.build_side, Some(JoinSide::Left));
    }
2671
2672    #[test]
2673    fn test_f057_from_config() {
2674        let config = StreamJoinConfig::builder()
2675            .left_key_column("key")
2676            .right_key_column("key")
2677            .time_bound(Duration::from_secs(60))
2678            .join_type(JoinType::Inner)
2679            .row_encoding(JoinRowEncoding::CpuFriendly)
2680            .build();
2681
2682        let operator = StreamJoinOperator::from_config(config);
2683
2684        assert_eq!(operator.row_encoding(), JoinRowEncoding::CpuFriendly);
2685        assert!(operator.asymmetric_compaction_enabled());
2686        assert!(operator.per_key_tracking_enabled());
2687    }
2688
2689    #[test]
2690    fn test_f057_cpu_friendly_encoding_roundtrip() {
2691        let schema = Arc::new(Schema::new(vec![
2692            Field::new("id", DataType::Utf8, false),
2693            Field::new("value", DataType::Int64, false),
2694            Field::new("price", DataType::Float64, false),
2695        ]));
2696        let batch = RecordBatch::try_new(
2697            schema,
2698            vec![
2699                Arc::new(StringArray::from(vec!["test_key"])),
2700                Arc::new(Int64Array::from(vec![42])),
2701                Arc::new(Float64Array::from(vec![99.99])),
2702            ],
2703        )
2704        .unwrap();
2705
2706        // Test CPU-friendly encoding
2707        let row =
2708            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
2709                .unwrap();
2710        assert_eq!(row.encoding(), JoinRowEncoding::CpuFriendly);
2711
2712        // Verify roundtrip
2713        let restored = row.to_batch().unwrap();
2714        assert_eq!(restored.num_rows(), 1);
2715        assert_eq!(restored.num_columns(), 3);
2716
2717        // Verify values
2718        let id_col = restored
2719            .column(0)
2720            .as_any()
2721            .downcast_ref::<StringArray>()
2722            .unwrap();
2723        assert_eq!(id_col.value(0), "test_key");
2724
2725        let value_col = restored
2726            .column(1)
2727            .as_any()
2728            .downcast_ref::<Int64Array>()
2729            .unwrap();
2730        assert_eq!(value_col.value(0), 42);
2731
2732        let price_col = restored
2733            .column(2)
2734            .as_any()
2735            .downcast_ref::<Float64Array>()
2736            .unwrap();
2737        assert!((price_col.value(0) - 99.99).abs() < 0.001);
2738    }
2739
2740    #[test]
2741    fn test_f057_compact_encoding_still_works() {
2742        let schema = Arc::new(Schema::new(vec![
2743            Field::new("id", DataType::Utf8, false),
2744            Field::new("value", DataType::Int64, false),
2745        ]));
2746        let batch = RecordBatch::try_new(
2747            schema,
2748            vec![
2749                Arc::new(StringArray::from(vec!["test"])),
2750                Arc::new(Int64Array::from(vec![100])),
2751            ],
2752        )
2753        .unwrap();
2754
2755        let row = JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::Compact)
2756            .unwrap();
2757        assert_eq!(row.encoding(), JoinRowEncoding::Compact);
2758
2759        let restored = row.to_batch().unwrap();
2760        assert_eq!(restored.num_rows(), 1);
2761    }
2762
2763    #[test]
2764    fn test_f057_side_stats_tracking() {
2765        let mut stats = SideStats::new();
2766        assert_eq!(stats.events_received, 0);
2767        assert!(!stats.is_idle(1000, 60_000)); // No events yet, not idle
2768
2769        // Record events
2770        stats.record_event(1000);
2771        assert_eq!(stats.events_received, 1);
2772        assert_eq!(stats.last_event_time, 1000);
2773
2774        stats.record_event(2000);
2775        assert_eq!(stats.events_received, 2);
2776        assert_eq!(stats.last_event_time, 2000);
2777
2778        // Check idle detection
2779        assert!(!stats.is_idle(2000, 60_000)); // Just received event
2780        assert!(!stats.is_idle(50_000, 60_000)); // Within threshold
2781
2782        // After threshold with no new events in window
2783        stats.events_this_window = 0;
2784        assert!(stats.is_idle(100_000, 60_000)); // Past threshold
2785    }
2786
2787    #[test]
2788    fn test_f057_key_metadata_tracking() {
2789        let mut meta = KeyMetadata::new(1000);
2790        assert_eq!(meta.last_activity, 1000);
2791        assert_eq!(meta.event_count, 1);
2792        assert_eq!(meta.state_entries, 1);
2793
2794        meta.record_event(2000);
2795        assert_eq!(meta.last_activity, 2000);
2796        assert_eq!(meta.event_count, 2);
2797        assert_eq!(meta.state_entries, 2);
2798
2799        meta.decrement_entries();
2800        assert_eq!(meta.state_entries, 1);
2801
2802        assert!(!meta.is_idle(2000, 60_000));
2803        assert!(meta.is_idle(100_000, 60_000));
2804    }
2805
2806    #[test]
2807    fn test_f057_per_key_tracking_in_operator() {
2808        let config = StreamJoinConfig::builder()
2809            .left_key_column("order_id")
2810            .right_key_column("order_id")
2811            .time_bound(Duration::from_secs(3600))
2812            .join_type(JoinType::Inner)
2813            .per_key_tracking(true)
2814            .build();
2815
2816        let mut operator = StreamJoinOperator::from_config(config);
2817        let mut timers = TimerService::new();
2818        let mut state = InMemoryStore::new();
2819        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2820
2821        // Process events with different keys
2822        for (i, key) in ["order_1", "order_2", "order_3"].iter().enumerate() {
2823            let event = create_order_event(1000 + i as i64 * 100, key, 100);
2824            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2825            operator.process_side(&event, JoinSide::Left, &mut ctx);
2826        }
2827
2828        // Verify key tracking
2829        assert_eq!(operator.tracked_key_count(), 3);
2830        assert_eq!(operator.metrics().tracked_keys, 3);
2831    }
2832
2833    #[test]
2834    fn test_f057_encoding_metrics() {
2835        // Test compact encoding
2836        let mut compact_op = StreamJoinOperator::from_config(
2837            StreamJoinConfig::builder()
2838                .left_key_column("order_id")
2839                .right_key_column("order_id")
2840                .time_bound(Duration::from_secs(3600))
2841                .join_type(JoinType::Inner)
2842                .row_encoding(JoinRowEncoding::Compact)
2843                .build(),
2844        );
2845
2846        let mut timers = TimerService::new();
2847        let mut state = InMemoryStore::new();
2848        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2849
2850        let event = create_order_event(1000, "order_1", 100);
2851        {
2852            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2853            compact_op.process_side(&event, JoinSide::Left, &mut ctx);
2854        }
2855        assert_eq!(compact_op.metrics().compact_encodes, 1);
2856        assert_eq!(compact_op.metrics().cpu_friendly_encodes, 0);
2857
2858        // Test CPU-friendly encoding
2859        let mut cpu_op = StreamJoinOperator::from_config(
2860            StreamJoinConfig::builder()
2861                .left_key_column("order_id")
2862                .right_key_column("order_id")
2863                .time_bound(Duration::from_secs(3600))
2864                .join_type(JoinType::Inner)
2865                .row_encoding(JoinRowEncoding::CpuFriendly)
2866                .build(),
2867        );
2868
2869        let mut state2 = InMemoryStore::new();
2870        {
2871            let mut ctx = create_test_context(&mut timers, &mut state2, &mut watermark_gen);
2872            cpu_op.process_side(&event, JoinSide::Left, &mut ctx);
2873        }
2874        assert_eq!(cpu_op.metrics().cpu_friendly_encodes, 1);
2875        assert_eq!(cpu_op.metrics().compact_encodes, 0);
2876    }
2877
2878    #[test]
2879    fn test_f057_asymmetric_compaction_detection() {
2880        let config = StreamJoinConfig::builder()
2881            .left_key_column("order_id")
2882            .right_key_column("order_id")
2883            .time_bound(Duration::from_secs(60))
2884            .join_type(JoinType::Inner)
2885            .asymmetric_compaction(true)
2886            .idle_threshold(Duration::from_secs(10)) // 10 seconds
2887            .build();
2888
2889        let mut operator = StreamJoinOperator::from_config(config);
2890        let mut timers = TimerService::new();
2891        let mut state = InMemoryStore::new();
2892        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2893
2894        // Process left events
2895        for i in 0..5 {
2896            let event = create_order_event(1000 + i * 100, "order_1", 100);
2897            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2898            ctx.processing_time = 1000 + i * 100;
2899            operator.process_side(&event, JoinSide::Left, &mut ctx);
2900        }
2901
2902        // Left side is not idle (just processed events)
2903        assert!(!operator.is_side_idle(JoinSide::Left, 1500));
2904
2905        // Right side has no events - but is_idle returns false when no events received
2906        assert!(!operator.is_side_idle(JoinSide::Right, 1500));
2907
2908        // Simulate time passing with no left events
2909        operator.left_stats.events_this_window = 0;
2910        assert!(operator.is_side_idle(JoinSide::Left, 100_000));
2911    }
2912
2913    #[test]
2914    fn test_f057_effective_build_side_selection() {
2915        // Test configured build side
2916        let config = StreamJoinConfig::builder()
2917            .left_key_column("key")
2918            .right_key_column("key")
2919            .time_bound(Duration::from_secs(60))
2920            .join_type(JoinType::Inner)
2921            .build_side(JoinSide::Right)
2922            .build();
2923
2924        let operator = StreamJoinOperator::from_config(config);
2925        assert_eq!(operator.effective_build_side(), JoinSide::Right);
2926
2927        // Test auto-selection (smaller side)
2928        let config2 = StreamJoinConfig::builder()
2929            .left_key_column("key")
2930            .right_key_column("key")
2931            .time_bound(Duration::from_secs(60))
2932            .join_type(JoinType::Inner)
2933            .build();
2934
2935        let mut operator2 = StreamJoinOperator::from_config(config2);
2936        operator2.left_stats.events_received = 100;
2937        operator2.right_stats.events_received = 1000;
2938
2939        // Left is smaller, so should be build side
2940        assert_eq!(operator2.effective_build_side(), JoinSide::Left);
2941    }
2942
2943    #[test]
2944    fn test_f057_join_with_cpu_friendly_encoding() {
2945        let config = StreamJoinConfig::builder()
2946            .left_key_column("order_id")
2947            .right_key_column("order_id")
2948            .time_bound(Duration::from_secs(3600))
2949            .join_type(JoinType::Inner)
2950            .row_encoding(JoinRowEncoding::CpuFriendly)
2951            .build();
2952
2953        let mut operator = StreamJoinOperator::from_config(config);
2954        let mut timers = TimerService::new();
2955        let mut state = InMemoryStore::new();
2956        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
2957
2958        // Process left event
2959        let order = create_order_event(1000, "order_1", 100);
2960        {
2961            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2962            operator.process_side(&order, JoinSide::Left, &mut ctx);
2963        }
2964
2965        // Process right event - should produce a match
2966        let payment = create_payment_event(2000, "order_1", "paid");
2967        {
2968            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
2969            let outputs = operator.process_side(&payment, JoinSide::Right, &mut ctx);
2970            assert_eq!(
2971                outputs
2972                    .iter()
2973                    .filter(|o| matches!(o, Output::Event(_)))
2974                    .count(),
2975                1
2976            );
2977        }
2978
2979        assert_eq!(operator.metrics().matches, 1);
2980        assert_eq!(operator.metrics().cpu_friendly_encodes, 2); // Both events encoded
2981    }
2982
2983    #[test]
2984    fn test_f057_checkpoint_restore_with_optimization_state() {
2985        let config = StreamJoinConfig::builder()
2986            .left_key_column("key")
2987            .right_key_column("key")
2988            .time_bound(Duration::from_secs(60))
2989            .join_type(JoinType::Inner)
2990            .operator_id("test_join")
2991            .build();
2992
2993        let mut operator = StreamJoinOperator::from_config(config);
2994
2995        // Simulate activity
2996        operator.metrics.left_events = 100;
2997        operator.metrics.right_events = 50;
2998        operator.metrics.matches = 25;
2999        operator.metrics.cpu_friendly_encodes = 10;
3000        operator.metrics.compact_encodes = 140;
3001        operator.metrics.asymmetric_skips = 5;
3002        operator.metrics.idle_key_cleanups = 3;
3003        operator.metrics.build_side_prunes = 2;
3004        operator.left_stats.events_received = 100;
3005        operator.right_stats.events_received = 50;
3006        operator.left_watermark = 5000;
3007        operator.right_watermark = 4000;
3008
3009        // Checkpoint
3010        let checkpoint = operator.checkpoint();
3011
3012        // Restore
3013        let config2 = StreamJoinConfig::builder()
3014            .left_key_column("key")
3015            .right_key_column("key")
3016            .time_bound(Duration::from_secs(60))
3017            .join_type(JoinType::Inner)
3018            .operator_id("test_join")
3019            .build();
3020
3021        let mut restored = StreamJoinOperator::from_config(config2);
3022        restored.restore(checkpoint).unwrap();
3023
3024        // Verify F057 state was restored
3025        assert_eq!(restored.metrics().left_events, 100);
3026        assert_eq!(restored.metrics().right_events, 50);
3027        assert_eq!(restored.metrics().matches, 25);
3028        assert_eq!(restored.metrics().cpu_friendly_encodes, 10);
3029        assert_eq!(restored.metrics().compact_encodes, 140);
3030        assert_eq!(restored.metrics().asymmetric_skips, 5);
3031        assert_eq!(restored.metrics().idle_key_cleanups, 3);
3032        assert_eq!(restored.metrics().build_side_prunes, 2);
3033        assert_eq!(restored.left_stats.events_received, 100);
3034        assert_eq!(restored.right_stats.events_received, 50);
3035        assert_eq!(restored.left_watermark, 5000);
3036        assert_eq!(restored.right_watermark, 4000);
3037    }
3038
3039    #[test]
3040    fn test_f057_should_skip_compaction() {
3041        let config = StreamJoinConfig::builder()
3042            .left_key_column("key")
3043            .right_key_column("key")
3044            .time_bound(Duration::from_secs(60))
3045            .join_type(JoinType::Inner)
3046            .asymmetric_compaction(true)
3047            .idle_threshold(Duration::from_secs(10))
3048            .build();
3049
3050        let mut operator = StreamJoinOperator::from_config(config);
3051
3052        // Record some left events
3053        operator.left_stats.record_event(1000);
3054        operator.left_stats.events_this_window = 0; // Simulate window rollover
3055
3056        // Should skip compaction for idle left side
3057        assert!(operator.should_skip_compaction(JoinSide::Left, 100_000));
3058
3059        // Should not skip when asymmetric compaction is disabled
3060        operator.asymmetric_compaction = false;
3061        assert!(!operator.should_skip_compaction(JoinSide::Left, 100_000));
3062    }
3063
3064    #[test]
3065    fn test_f057_multiple_rows_cpu_friendly() {
3066        // Test with multiple values in arrays
3067        let schema = Arc::new(Schema::new(vec![
3068            Field::new("id", DataType::Int64, false),
3069            Field::new("name", DataType::Utf8, false),
3070        ]));
3071
3072        // Single row (typical for streaming)
3073        let batch = RecordBatch::try_new(
3074            schema.clone(),
3075            vec![
3076                Arc::new(Int64Array::from(vec![1])),
3077                Arc::new(StringArray::from(vec!["Alice"])),
3078            ],
3079        )
3080        .unwrap();
3081
3082        let row =
3083            JoinRow::with_encoding(1000, b"key".to_vec(), &batch, JoinRowEncoding::CpuFriendly)
3084                .unwrap();
3085        let restored = row.to_batch().unwrap();
3086
3087        let id_col = restored
3088            .column(0)
3089            .as_any()
3090            .downcast_ref::<Int64Array>()
3091            .unwrap();
3092        let name_col = restored
3093            .column(1)
3094            .as_any()
3095            .downcast_ref::<StringArray>()
3096            .unwrap();
3097
3098        assert_eq!(id_col.value(0), 1);
3099        assert_eq!(name_col.value(0), "Alice");
3100    }
3101}