Skip to main content

laminar_core/operator/
asof_join.rs

1//! # ASOF Join Operators
2//!
3//! Implementation of temporal proximity joins that match events based on
4//! closest timestamp rather than exact equality.
5//!
6//! ASOF joins are essential for financial and time-series applications where
7//! you need to enrich events with the most recent prior data (e.g., enriching
8//! trades with the most recent quote).
9//!
10//! ## Join Directions
11//!
12//! - **Backward**: Match with the most recent prior event (default for finance)
13//! - **Forward**: Match with the next future event
14//! - **Nearest**: Match with the closest event by absolute time difference
15//!
16//! ## Example
17//!
18//! ```rust,no_run
19//! use laminar_core::operator::asof_join::{
20//!     AsofJoinOperator, AsofJoinConfig, AsofDirection, AsofJoinType,
21//! };
22//! use std::time::Duration;
23//!
24//! // Join trades with the most recent quote within 5 seconds
25//! let config = AsofJoinConfig {
26//!     key_column: "symbol".to_string(),
27//!     left_time_column: "trade_time".to_string(),
28//!     right_time_column: "quote_time".to_string(),
29//!     direction: AsofDirection::Backward,
30//!     tolerance: Some(Duration::from_secs(5)),
31//!     join_type: AsofJoinType::Inner,
32//!     operator_id: Some("trade_quote_join".to_string()),
33//! };
34//!
35//! let operator = AsofJoinOperator::new(config);
36//! ```
37//!
38//! ## SQL Syntax (Future)
39//!
40//! ```sql
41//! SELECT t.*, q.bid, q.ask
42//! FROM trades t
43//! ASOF JOIN quotes q
44//!     ON t.symbol = q.symbol
45//!     AND t.trade_time >= q.quote_time
46//!     AND t.trade_time - q.quote_time <= INTERVAL '5' SECOND;
47//! ```
48//!
49//! ## State Management
50//!
51//! Right-side events are stored in per-key `BTreeMap` structures for O(log n)
52//! temporal lookups. State is cleaned up based on watermark progress.
53
54use super::{
55    Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
56    TimerKey,
57};
58use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
59use arrow_schema::{DataType, Field, Schema, SchemaRef};
60use fxhash::FxHashMap;
61use rkyv::{
62    rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
63};
64use smallvec::SmallVec;
65use std::collections::BTreeMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67use std::sync::Arc;
68use std::time::Duration;
69
/// Which side of the left event's timestamp an ASOF join searches for its partner.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AsofDirection {
    /// Take the latest right event at or before the left timestamp
    /// (`right_ts <= left_ts`). This is the default and the usual choice in
    /// finance (e.g. trade enriched with the prevailing quote).
    #[default]
    Backward,
    /// Take the earliest right event at or after the left timestamp
    /// (`right_ts >= left_ts`).
    Forward,
    /// Take whichever right event is closest in absolute time difference.
    Nearest,
}
82
/// How an ASOF join treats left events that find no right-side partner.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AsofJoinType {
    /// Inner join (default): left events without a match produce no output.
    #[default]
    Inner,
    /// Left outer join: every left event is emitted; unmatched ones carry
    /// null right-side columns.
    Left,
}

impl AsofJoinType {
    /// Reports whether left events without a match are still emitted.
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            Self::Left => true,
            Self::Inner => false,
        }
    }
}
100
101/// Configuration for an ASOF join operator.
102#[derive(Debug, Clone)]
103pub struct AsofJoinConfig {
104    /// Column name used as the join key (must match in both streams).
105    pub key_column: String,
106    /// Column name for the timestamp in left stream events.
107    pub left_time_column: String,
108    /// Column name for the timestamp in right stream events.
109    pub right_time_column: String,
110    /// Direction for temporal matching.
111    pub direction: AsofDirection,
112    /// Maximum time difference for matching (None = unlimited).
113    pub tolerance: Option<Duration>,
114    /// Type of join to perform.
115    pub join_type: AsofJoinType,
116    /// Operator ID for checkpointing.
117    pub operator_id: Option<String>,
118}
119
120impl AsofJoinConfig {
121    /// Creates a new builder for ASOF join configuration.
122    #[must_use]
123    pub fn builder() -> AsofJoinConfigBuilder {
124        AsofJoinConfigBuilder::default()
125    }
126
127    /// Returns the tolerance in milliseconds, or `i64::MAX` if unlimited.
128    #[must_use]
129    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
130    pub fn tolerance_ms(&self) -> i64 {
131        self.tolerance.map_or(i64::MAX, |d| d.as_millis() as i64)
132    }
133}
134
135/// Builder for [`AsofJoinConfig`].
136#[derive(Debug, Default)]
137pub struct AsofJoinConfigBuilder {
138    key_column: Option<String>,
139    left_time_column: Option<String>,
140    right_time_column: Option<String>,
141    direction: Option<AsofDirection>,
142    tolerance: Option<Duration>,
143    join_type: Option<AsofJoinType>,
144    operator_id: Option<String>,
145}
146
147impl AsofJoinConfigBuilder {
148    /// Sets the join key column name.
149    #[must_use]
150    pub fn key_column(mut self, column: String) -> Self {
151        self.key_column = Some(column);
152        self
153    }
154
155    /// Sets the left timestamp column name.
156    #[must_use]
157    pub fn left_time_column(mut self, column: String) -> Self {
158        self.left_time_column = Some(column);
159        self
160    }
161
162    /// Sets the right timestamp column name.
163    #[must_use]
164    pub fn right_time_column(mut self, column: String) -> Self {
165        self.right_time_column = Some(column);
166        self
167    }
168
169    /// Sets the ASOF direction.
170    #[must_use]
171    pub fn direction(mut self, direction: AsofDirection) -> Self {
172        self.direction = Some(direction);
173        self
174    }
175
176    /// Sets the tolerance for matching.
177    #[must_use]
178    pub fn tolerance(mut self, tolerance: Duration) -> Self {
179        self.tolerance = Some(tolerance);
180        self
181    }
182
183    /// Sets the join type.
184    #[must_use]
185    pub fn join_type(mut self, join_type: AsofJoinType) -> Self {
186        self.join_type = Some(join_type);
187        self
188    }
189
190    /// Sets a custom operator ID.
191    #[must_use]
192    pub fn operator_id(mut self, id: String) -> Self {
193        self.operator_id = Some(id);
194        self
195    }
196
197    /// Builds the configuration.
198    ///
199    /// # Errors
200    ///
201    /// Returns `OperatorError::ConfigError` if required fields
202    /// (`key_column`, `left_time_column`, `right_time_column`) are not set.
203    pub fn build(self) -> Result<AsofJoinConfig, OperatorError> {
204        Ok(AsofJoinConfig {
205            key_column: self
206                .key_column
207                .ok_or_else(|| OperatorError::ConfigError("key_column is required".into()))?,
208            left_time_column: self
209                .left_time_column
210                .ok_or_else(|| OperatorError::ConfigError("left_time_column is required".into()))?,
211            right_time_column: self.right_time_column.ok_or_else(|| {
212                OperatorError::ConfigError("right_time_column is required".into())
213            })?,
214            direction: self.direction.unwrap_or_default(),
215            tolerance: self.tolerance,
216            join_type: self.join_type.unwrap_or_default(),
217            operator_id: self.operator_id,
218        })
219    }
220}
221
/// Leading byte of every cleanup-timer key this operator registers; `on_timer`
/// checks it to recognise its own timers.
const ASOF_TIMER_PREFIX: u8 = 0x50;

/// Process-wide sequence number used to build default operator IDs
/// (`asof_join_0`, `asof_join_1`, ...).
static ASOF_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);
227
228/// Stack-allocated key buffer for join keys.
229/// 24 bytes covers most string symbol names and all numeric keys.
230type AsofKey = SmallVec<[u8; 24]>;
231
232/// A stored right-side event for ASOF matching.
233///
234/// Stores `Arc<RecordBatch>` directly for zero-copy access on the hot path.
235/// IPC serialization is only used during checkpoint/restore (Ring 1).
236#[derive(Debug, Clone)]
237pub struct AsofRow {
238    /// Event timestamp in milliseconds.
239    pub timestamp: i64,
240    /// The record batch data (zero-copy via Arc).
241    batch: Arc<RecordBatch>,
242}
243
244impl AsofRow {
245    /// Creates a new ASOF row from an event.
246    ///
247    /// Cost: O(1) — just an atomic increment on the Arc.
248    fn new(timestamp: i64, batch: &Arc<RecordBatch>) -> Self {
249        Self {
250            timestamp,
251            batch: Arc::clone(batch),
252        }
253    }
254
255    /// Returns a reference to the record batch.
256    #[must_use]
257    pub fn batch(&self) -> &RecordBatch {
258        &self.batch
259    }
260}
261
262/// Serializable version of [`AsofRow`] for checkpointing.
263/// IPC serialization lives here, not on the hot path.
264#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
265struct SerializableAsofRow {
266    timestamp: i64,
267    data: Vec<u8>,
268}
269
270impl SerializableAsofRow {
271    /// Serializes an `AsofRow` to IPC bytes for checkpointing.
272    fn from_row(row: &AsofRow) -> Result<Self, OperatorError> {
273        let mut buf = Vec::new();
274        {
275            let mut writer =
276                arrow_ipc::writer::StreamWriter::try_new(&mut buf, &row.batch.schema())
277                    .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
278            writer
279                .write(&row.batch)
280                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
281            writer
282                .finish()
283                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
284        }
285        Ok(Self {
286            timestamp: row.timestamp,
287            data: buf,
288        })
289    }
290
291    /// Deserializes IPC bytes back to an `AsofRow` during restore.
292    fn to_row(&self) -> Result<AsofRow, OperatorError> {
293        let cursor = std::io::Cursor::new(&self.data);
294        let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
295            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
296        let batch = reader
297            .next()
298            .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
299            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
300        Ok(AsofRow {
301            timestamp: self.timestamp,
302            batch: Arc::new(batch),
303        })
304    }
305}
306
307/// Per-key state for ASOF joining.
308///
309/// Uses `BTreeMap` for O(log n) range queries on timestamps.
310#[derive(Debug, Clone, Default)]
311pub struct KeyState {
312    /// Events indexed by timestamp for efficient range queries.
313    /// Multiple events at the same timestamp are stored in a vector.
314    pub events: BTreeMap<i64, SmallVec<[AsofRow; 1]>>,
315    /// Minimum timestamp in this key's state.
316    pub min_timestamp: i64,
317    /// Maximum timestamp in this key's state.
318    pub max_timestamp: i64,
319}
320
321impl KeyState {
322    /// Creates a new empty key state.
323    #[must_use]
324    pub fn new() -> Self {
325        Self {
326            events: BTreeMap::new(),
327            min_timestamp: i64::MAX,
328            max_timestamp: i64::MIN,
329        }
330    }
331
332    /// Inserts an event into the state.
333    pub fn insert(&mut self, row: AsofRow) {
334        let ts = row.timestamp;
335        self.events.entry(ts).or_default().push(row);
336        self.min_timestamp = self.min_timestamp.min(ts);
337        self.max_timestamp = self.max_timestamp.max(ts);
338    }
339
340    /// Returns the number of events in this key's state.
341    #[must_use]
342    pub fn len(&self) -> usize {
343        self.events.values().map(SmallVec::len).sum()
344    }
345
346    /// Returns true if this key has no events.
347    #[must_use]
348    pub fn is_empty(&self) -> bool {
349        self.events.is_empty()
350    }
351
352    /// Removes events with timestamps before the given threshold.
353    pub fn cleanup_before(&mut self, threshold: i64) {
354        self.events = self.events.split_off(&threshold);
355        self.min_timestamp = self.events.keys().next().copied().unwrap_or(i64::MAX);
356    }
357}
358
359/// Serializable version of `KeyState` for checkpointing.
360#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
361struct SerializableKeyState {
362    events: Vec<(i64, Vec<SerializableAsofRow>)>,
363    min_timestamp: i64,
364    max_timestamp: i64,
365}
366
367impl SerializableKeyState {
368    /// Converts a `KeyState` to serializable form for checkpointing.
369    fn from_key_state(state: &KeyState) -> Result<Self, OperatorError> {
370        let events = state
371            .events
372            .iter()
373            .map(|(ts, rows)| {
374                let ser_rows: Result<Vec<_>, _> =
375                    rows.iter().map(SerializableAsofRow::from_row).collect();
376                ser_rows.map(|r| (*ts, r))
377            })
378            .collect::<Result<Vec<_>, _>>()?;
379        Ok(Self {
380            events,
381            min_timestamp: state.min_timestamp,
382            max_timestamp: state.max_timestamp,
383        })
384    }
385
386    /// Converts serialized form back to a `KeyState` during restore.
387    fn to_key_state(&self) -> Result<KeyState, OperatorError> {
388        let mut events = BTreeMap::new();
389        for (ts, rows) in &self.events {
390            let asof_rows: Result<SmallVec<[AsofRow; 1]>, _> =
391                rows.iter().map(SerializableAsofRow::to_row).collect();
392            events.insert(*ts, asof_rows?);
393        }
394        Ok(KeyState {
395            events,
396            min_timestamp: self.min_timestamp,
397            max_timestamp: self.max_timestamp,
398        })
399    }
400}
401
/// Counters describing the work done by an ASOF join operator.
#[derive(Debug, Clone, Default)]
pub struct AsofJoinMetrics {
    /// Left (probe-side) events received.
    pub left_events: u64,
    /// Right (build-side) events received.
    pub right_events: u64,
    /// Left events that found a right-side partner.
    pub matches: u64,
    /// Left events emitted without a partner (left outer join only).
    pub unmatched_left: u64,
    /// Matches that fell within the configured tolerance.
    pub within_tolerance: u64,
    /// Candidates rejected because they exceeded the tolerance.
    ///
    /// NOTE(review): no code in this module visibly increments this counter — confirm.
    pub outside_tolerance: u64,
    /// Right-side events that arrived behind the current watermark. They are
    /// counted but still stored.
    pub late_events: u64,
    /// Right-side events removed from state by cleanup.
    pub state_cleanups: u64,
}

impl AsofJoinMetrics {
    /// Returns a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
435
436/// ASOF join operator.
437///
438/// Performs temporal proximity joins between two event streams. Left events
439/// probe the right-side state for the closest matching timestamp.
440///
441/// # State Management
442///
443/// Right-side events are stored in memory in per-key `BTreeMap` structures.
444/// State is persisted to the state store on checkpoint and cleaned up
445/// based on watermark progress.
446///
447/// # Performance Characteristics
448///
449/// - Matching: O(log n) per left event (`BTreeMap` range query)
450/// - State size: Bounded by right-side events within tolerance + watermark lag
451/// - Memory: Linear in right-side event count per key
452pub struct AsofJoinOperator {
453    /// Configuration.
454    config: AsofJoinConfig,
455    /// Operator ID.
456    operator_id: String,
457    /// Per-key right-side state.
458    right_state: FxHashMap<AsofKey, KeyState>,
459    /// Current watermark.
460    watermark: i64,
461    /// Metrics.
462    metrics: AsofJoinMetrics,
463    /// Output schema (lazily initialized).
464    output_schema: Option<SchemaRef>,
465    /// Left schema (captured from first left event).
466    left_schema: Option<SchemaRef>,
467    /// Right schema (captured from first right event).
468    right_schema: Option<SchemaRef>,
469    /// Cached column index for join key in left schema.
470    left_key_index: Option<usize>,
471    /// Cached column index for join key in right schema.
472    right_key_index: Option<usize>,
473}
474
475impl AsofJoinOperator {
476    /// Creates a new ASOF join operator.
477    #[must_use]
478    pub fn new(config: AsofJoinConfig) -> Self {
479        let operator_id = config.operator_id.clone().unwrap_or_else(|| {
480            let num = ASOF_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
481            format!("asof_join_{num}")
482        });
483
484        Self {
485            config,
486            operator_id,
487            right_state: FxHashMap::default(),
488            watermark: i64::MIN,
489            metrics: AsofJoinMetrics::new(),
490            output_schema: None,
491            left_schema: None,
492            right_schema: None,
493            left_key_index: None,
494            right_key_index: None,
495        }
496    }
497
498    /// Creates a new ASOF join operator with explicit ID.
499    #[must_use]
500    pub fn with_id(mut config: AsofJoinConfig, operator_id: String) -> Self {
501        config.operator_id = Some(operator_id);
502        Self::new(config)
503    }
504
505    /// Returns the configuration.
506    #[must_use]
507    pub fn config(&self) -> &AsofJoinConfig {
508        &self.config
509    }
510
511    /// Returns the metrics.
512    #[must_use]
513    pub fn metrics(&self) -> &AsofJoinMetrics {
514        &self.metrics
515    }
516
517    /// Resets the metrics.
518    pub fn reset_metrics(&mut self) {
519        self.metrics.reset();
520    }
521
522    /// Returns the current watermark.
523    #[must_use]
524    pub fn watermark(&self) -> i64 {
525        self.watermark
526    }
527
528    /// Returns the total number of right-side events in state.
529    #[must_use]
530    pub fn state_size(&self) -> usize {
531        self.right_state.values().map(KeyState::len).sum()
532    }
533
534    /// Processes a left-side event (probe side).
535    pub fn process_left(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
536        self.metrics.left_events += 1;
537
538        // Capture left schema on first event
539        if self.left_schema.is_none() {
540            self.left_schema = Some(event.data.schema());
541            self.update_output_schema();
542        }
543
544        let mut output = OutputVec::new();
545
546        // Extract join key
547        let Some(key_value) = Self::extract_key(
548            &event.data,
549            &self.config.key_column,
550            &mut self.left_key_index,
551        ) else {
552            return output;
553        };
554
555        // Extract timestamp from left event
556        let left_timestamp = event.timestamp;
557
558        // Find matching right event
559        let match_result = self.find_match(&key_value, left_timestamp);
560
561        match match_result {
562            Some(matched_row) => {
563                self.metrics.matches += 1;
564                self.metrics.within_tolerance += 1;
565
566                // Create joined output
567                if let Some(joined) = self.create_joined_event(event, &matched_row) {
568                    output.push(Output::Event(joined));
569                }
570            }
571            None => {
572                if self.config.join_type.emits_unmatched() {
573                    self.metrics.unmatched_left += 1;
574                    if let Some(unmatched) = self.create_unmatched_event(event) {
575                        output.push(Output::Event(unmatched));
576                    }
577                }
578            }
579        }
580
581        output
582    }
583
584    /// Processes a right-side event (build side).
585    pub fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
586        self.metrics.right_events += 1;
587
588        // Capture right schema on first event
589        if self.right_schema.is_none() {
590            self.right_schema = Some(event.data.schema());
591            self.update_output_schema();
592        }
593
594        let output = OutputVec::new();
595
596        // Extract join key
597        let Some(key_value) = Self::extract_key(
598            &event.data,
599            &self.config.key_column,
600            &mut self.right_key_index,
601        ) else {
602            return output;
603        };
604
605        // Check if event is too late
606        if self.watermark > i64::MIN && event.timestamp < self.watermark {
607            self.metrics.late_events += 1;
608            // Still store it as it might match future left events for Backward direction
609        }
610
611        // Create row and store in state (key is stored in HashMap, not in row)
612        let row = AsofRow::new(event.timestamp, &event.data);
613
614        // Calculate cleanup time before borrowing state
615        let cleanup_time = self.calculate_cleanup_time(event.timestamp);
616
617        let key_state = self.right_state.entry(key_value).or_default();
618        key_state.insert(row);
619
620        // Register cleanup timer based on tolerance and direction
621        let timer_key = Self::make_cleanup_timer_key(&key_state.max_timestamp.to_be_bytes());
622        ctx.timers
623            .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));
624
625        output
626    }
627
628    /// Finds a matching right-side event for the given left timestamp.
629    fn find_match(&self, key: &[u8], left_timestamp: i64) -> Option<AsofRow> {
630        let key_state = self.right_state.get(key)?;
631
632        match self.config.direction {
633            AsofDirection::Backward => self.find_backward_match(key_state, left_timestamp),
634            AsofDirection::Forward => self.find_forward_match(key_state, left_timestamp),
635            AsofDirection::Nearest => self.find_nearest_match(key_state, left_timestamp),
636        }
637    }
638
639    /// Finds the most recent prior event (timestamp <= `left_timestamp`).
640    fn find_backward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
641        // Use range(..=left_timestamp).last() to find the most recent prior event
642        let (ts, rows) = key_state.events.range(..=left_timestamp).next_back()?;
643
644        // Check tolerance
645        let diff = left_timestamp - ts;
646        if diff > self.config.tolerance_ms() {
647            return None;
648        }
649
650        // Return the last row at this timestamp (most recent)
651        rows.last().cloned()
652    }
653
654    /// Finds the next future event (timestamp >= `left_timestamp`).
655    fn find_forward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
656        // Use range(left_timestamp..).first() to find the next future event
657        let (ts, rows) = key_state.events.range(left_timestamp..).next()?;
658
659        // Check tolerance
660        let diff = ts - left_timestamp;
661        if diff > self.config.tolerance_ms() {
662            return None;
663        }
664
665        // Return the first row at this timestamp
666        rows.first().cloned()
667    }
668
669    /// Finds the closest event by absolute time difference.
670    fn find_nearest_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
671        let before = key_state.events.range(..=left_timestamp).next_back();
672        let after = key_state.events.range(left_timestamp..).next();
673
674        let candidate = match (before, after) {
675            (Some((ts_before, rows_before)), Some((ts_after, rows_after))) => {
676                let diff_before = left_timestamp - ts_before;
677                let diff_after = ts_after - left_timestamp;
678                if diff_before <= diff_after {
679                    Some((diff_before, rows_before.last()?.clone()))
680                } else {
681                    Some((diff_after, rows_after.first()?.clone()))
682                }
683            }
684            (Some((ts, rows)), None) => {
685                let diff = left_timestamp - ts;
686                Some((diff, rows.last()?.clone()))
687            }
688            (None, Some((ts, rows))) => {
689                let diff = ts - left_timestamp;
690                Some((diff, rows.first()?.clone()))
691            }
692            (None, None) => None,
693        };
694
695        let (diff, row) = candidate?;
696
697        // Check tolerance
698        if diff > self.config.tolerance_ms() {
699            return None;
700        }
701
702        Some(row)
703    }
704
705    /// Calculates when state for a given timestamp can be cleaned up.
706    fn calculate_cleanup_time(&self, timestamp: i64) -> i64 {
707        let tolerance_ms = self.config.tolerance_ms();
708        match self.config.direction {
709            // For Backward and Nearest, we need to keep state longer
710            // because future left events may match with these right events
711            AsofDirection::Backward | AsofDirection::Nearest => {
712                if tolerance_ms == i64::MAX {
713                    i64::MAX
714                } else {
715                    timestamp.saturating_add(tolerance_ms)
716                }
717            }
718            // For Forward, we can clean up more aggressively
719            AsofDirection::Forward => timestamp,
720        }
721    }
722
723    /// Handles watermark updates and triggers state cleanup.
724    pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
725        self.watermark = watermark;
726        self.cleanup_state(watermark);
727        OutputVec::new()
728    }
729
730    /// Cleans up state that can no longer produce matches.
731    fn cleanup_state(&mut self, watermark: i64) {
732        let tolerance_ms = self.config.tolerance_ms();
733
734        let threshold = match self.config.direction {
735            AsofDirection::Backward | AsofDirection::Nearest => {
736                if tolerance_ms == i64::MAX {
737                    i64::MIN // Never clean up
738                } else {
739                    watermark.saturating_sub(tolerance_ms)
740                }
741            }
742            AsofDirection::Forward => watermark,
743        };
744
745        if threshold == i64::MIN {
746            return;
747        }
748
749        let initial_count: usize = self.right_state.values().map(KeyState::len).sum();
750
751        for key_state in self.right_state.values_mut() {
752            key_state.cleanup_before(threshold);
753        }
754
755        // Remove empty key states
756        self.right_state.retain(|_, v| !v.is_empty());
757
758        let final_count: usize = self.right_state.values().map(KeyState::len).sum();
759        if final_count < initial_count {
760            self.metrics.state_cleanups += (initial_count - final_count) as u64;
761        }
762    }
763
764    /// Extracts the join key value from a record batch.
765    ///
766    /// Uses a cached column index to avoid O(n) schema lookups after the first call.
767    fn extract_key(
768        batch: &RecordBatch,
769        column_name: &str,
770        cached_index: &mut Option<usize>,
771    ) -> Option<AsofKey> {
772        let column_index = if let Some(idx) = *cached_index {
773            idx
774        } else {
775            let idx = batch.schema().index_of(column_name).ok()?;
776            *cached_index = Some(idx);
777            idx
778        };
779        let column = batch.column(column_index);
780
781        if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
782            if string_array.is_empty() || string_array.is_null(0) {
783                return None;
784            }
785            return Some(AsofKey::from_slice(string_array.value(0).as_bytes()));
786        }
787
788        if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
789            if int_array.is_empty() || int_array.is_null(0) {
790                return None;
791            }
792            return Some(AsofKey::from_slice(&int_array.value(0).to_le_bytes()));
793        }
794
795        None
796    }
797
798    /// Creates a timer key for state cleanup.
799    fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
800        let mut key = TimerKey::new();
801        key.push(ASOF_TIMER_PREFIX);
802        key.extend_from_slice(key_suffix);
803        key
804    }
805
806    /// Updates the output schema when both input schemas are known.
807    fn update_output_schema(&mut self) {
808        if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
809            let mut fields: Vec<Field> = left.fields().iter().map(|f| f.as_ref().clone()).collect();
810
811            // Add right fields, prefixing duplicates
812            for field in right.fields() {
813                let name = if left.field_with_name(field.name()).is_ok() {
814                    format!("right_{}", field.name())
815                } else {
816                    field.name().clone()
817                };
818                fields.push(Field::new(
819                    name,
820                    field.data_type().clone(),
821                    true, // Nullable for outer joins
822                ));
823            }
824
825            self.output_schema = Some(Arc::new(Schema::new(fields)));
826        }
827    }
828
829    /// Creates a joined event from left event and matched right row.
830    fn create_joined_event(&self, left_event: &Event, right_row: &AsofRow) -> Option<Event> {
831        let schema = self.output_schema.as_ref()?;
832
833        let mut columns: Vec<ArrayRef> = left_event.data.columns().to_vec();
834        for column in right_row.batch().columns() {
835            columns.push(Arc::clone(column));
836        }
837
838        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
839
840        Some(Event::new(left_event.timestamp, joined_batch))
841    }
842
843    /// Creates an unmatched event for left outer joins (with null right columns).
844    fn create_unmatched_event(&self, left_event: &Event) -> Option<Event> {
845        let schema = self.output_schema.as_ref()?;
846        let right_schema = self.right_schema.as_ref()?;
847
848        let num_rows = left_event.data.num_rows();
849        let mut columns: Vec<ArrayRef> = left_event.data.columns().to_vec();
850
851        // Add null columns for right side
852        for field in right_schema.fields() {
853            columns.push(Self::create_null_array(field.data_type(), num_rows));
854        }
855
856        let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
857
858        Some(Event::new(left_event.timestamp, joined_batch))
859    }
860
861    /// Creates a null array of the given type and length.
862    fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
863        match data_type {
864            DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
865            DataType::Float64 => {
866                use arrow_array::Float64Array;
867                Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
868            }
869            // Default to Int64 for numeric and other types
870            _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
871        }
872    }
873}
874
875impl Operator for AsofJoinOperator {
876    fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
877        // Default to processing as left event
878        self.process_left(event, ctx)
879    }
880
881    fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
882        // Timers trigger state cleanup
883        if timer.key.first() == Some(&ASOF_TIMER_PREFIX) {
884            self.cleanup_state(timer.timestamp);
885        }
886        OutputVec::new()
887    }
888
889    fn checkpoint(&self) -> OperatorState {
890        // Serialize metrics and state summary
891        let state_entries: Vec<(Vec<u8>, SerializableKeyState)> = self
892            .right_state
893            .iter()
894            .filter_map(|(k, v)| {
895                SerializableKeyState::from_key_state(v)
896                    .ok()
897                    .map(|s| (k.to_vec(), s))
898            })
899            .collect();
900
901        let checkpoint_data = (
902            self.watermark,
903            self.metrics.left_events,
904            self.metrics.right_events,
905            self.metrics.matches,
906            self.metrics.unmatched_left,
907            state_entries,
908        );
909
910        let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
911            .map(|v| v.to_vec())
912            .unwrap_or_default();
913
914        OperatorState {
915            operator_id: self.operator_id.clone(),
916            data,
917        }
918    }
919
920    fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
921        type CheckpointData = (
922            i64,
923            u64,
924            u64,
925            u64,
926            u64,
927            Vec<(Vec<u8>, SerializableKeyState)>,
928        );
929
930        if state.operator_id != self.operator_id {
931            return Err(OperatorError::StateAccessFailed(format!(
932                "Operator ID mismatch: expected {}, got {}",
933                self.operator_id, state.operator_id
934            )));
935        }
936
937        let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
938            .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
939        let (watermark, left_events, right_events, matches, unmatched_left, state_entries) =
940            rkyv::deserialize::<CheckpointData, RkyvError>(archived)
941                .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
942
943        self.watermark = watermark;
944        self.metrics.left_events = left_events;
945        self.metrics.right_events = right_events;
946        self.metrics.matches = matches;
947        self.metrics.unmatched_left = unmatched_left;
948
949        // Restore state
950        self.right_state.clear();
951        for (key, serializable) in state_entries {
952            let key_state = serializable.to_key_state()?;
953            self.right_state
954                .insert(AsofKey::from_slice(&key), key_state);
955        }
956
957        Ok(())
958    }
959}
960
961#[cfg(test)]
962mod tests {
963    use super::*;
964    use crate::state::{InMemoryStore, StateStore};
965    use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
966    use arrow_array::Float64Array;
967    use arrow_schema::{DataType, Field, Schema};
968
969    /// Creates a trade event for testing.
970    fn create_trade_event(timestamp: i64, symbol: &str, price: f64) -> Event {
971        let schema = Arc::new(Schema::new(vec![
972            Field::new("symbol", DataType::Utf8, false),
973            Field::new("price", DataType::Float64, false),
974        ]));
975        let batch = RecordBatch::try_new(
976            schema,
977            vec![
978                Arc::new(StringArray::from(vec![symbol])),
979                Arc::new(Float64Array::from(vec![price])),
980            ],
981        )
982        .unwrap();
983        Event::new(timestamp, batch)
984    }
985
986    /// Creates a quote event for testing.
987    fn create_quote_event(timestamp: i64, symbol: &str, bid: f64, ask: f64) -> Event {
988        let schema = Arc::new(Schema::new(vec![
989            Field::new("symbol", DataType::Utf8, false),
990            Field::new("bid", DataType::Float64, false),
991            Field::new("ask", DataType::Float64, false),
992        ]));
993        let batch = RecordBatch::try_new(
994            schema,
995            vec![
996                Arc::new(StringArray::from(vec![symbol])),
997                Arc::new(Float64Array::from(vec![bid])),
998                Arc::new(Float64Array::from(vec![ask])),
999            ],
1000        )
1001        .unwrap();
1002        Event::new(timestamp, batch)
1003    }
1004
1005    fn create_test_context<'a>(
1006        timers: &'a mut TimerService,
1007        state: &'a mut dyn StateStore,
1008        watermark_gen: &'a mut dyn WatermarkGenerator,
1009    ) -> OperatorContext<'a> {
1010        OperatorContext {
1011            event_time: 0,
1012            processing_time: 0,
1013            timers,
1014            state,
1015            watermark_generator: watermark_gen,
1016            operator_index: 0,
1017        }
1018    }
1019
1020    #[test]
1021    fn test_asof_direction_default() {
1022        assert_eq!(AsofDirection::default(), AsofDirection::Backward);
1023    }
1024
1025    #[test]
1026    fn test_asof_join_type_properties() {
1027        assert!(!AsofJoinType::Inner.emits_unmatched());
1028        assert!(AsofJoinType::Left.emits_unmatched());
1029    }
1030
1031    #[test]
1032    fn test_config_builder() {
1033        let config = AsofJoinConfig::builder()
1034            .key_column("symbol".to_string())
1035            .left_time_column("trade_time".to_string())
1036            .right_time_column("quote_time".to_string())
1037            .direction(AsofDirection::Backward)
1038            .tolerance(Duration::from_secs(5))
1039            .join_type(AsofJoinType::Left)
1040            .operator_id("test_op".to_string())
1041            .build()
1042            .unwrap();
1043
1044        assert_eq!(config.key_column, "symbol");
1045        assert_eq!(config.left_time_column, "trade_time");
1046        assert_eq!(config.right_time_column, "quote_time");
1047        assert_eq!(config.direction, AsofDirection::Backward);
1048        assert_eq!(config.tolerance, Some(Duration::from_secs(5)));
1049        assert_eq!(config.join_type, AsofJoinType::Left);
1050        assert_eq!(config.tolerance_ms(), 5000);
1051    }
1052
1053    #[test]
1054    fn test_backward_asof_basic() {
1055        let config = AsofJoinConfig::builder()
1056            .key_column("symbol".to_string())
1057            .left_time_column("trade_time".to_string())
1058            .right_time_column("quote_time".to_string())
1059            .direction(AsofDirection::Backward)
1060            .tolerance(Duration::from_secs(10))
1061            .join_type(AsofJoinType::Inner)
1062            .build()
1063            .unwrap();
1064
1065        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1066
1067        let mut timers = TimerService::new();
1068        let mut state = InMemoryStore::new();
1069        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1070
1071        // Store quote at t=900
1072        let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
1073        {
1074            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1075            operator.process_right(&quote, &mut ctx);
1076        }
1077
1078        // Store quote at t=950
1079        let quote2 = create_quote_event(950, "AAPL", 152.0, 153.0);
1080        {
1081            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1082            operator.process_right(&quote2, &mut ctx);
1083        }
1084
1085        // Trade at t=1000 should match quote at t=950 (most recent prior)
1086        let trade = create_trade_event(1000, "AAPL", 152.5);
1087        let outputs = {
1088            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1089            operator.process_left(&trade, &mut ctx)
1090        };
1091
1092        assert_eq!(
1093            outputs
1094                .iter()
1095                .filter(|o| matches!(o, Output::Event(_)))
1096                .count(),
1097            1
1098        );
1099        assert_eq!(operator.metrics().matches, 1);
1100
1101        // Verify output has both trade and quote columns
1102        if let Some(Output::Event(event)) = outputs.first() {
1103            assert_eq!(event.data.num_columns(), 5); // 2 trade + 3 quote
1104        }
1105    }
1106
1107    #[test]
1108    fn test_forward_asof_basic() {
1109        let config = AsofJoinConfig::builder()
1110            .key_column("symbol".to_string())
1111            .left_time_column("trade_time".to_string())
1112            .right_time_column("quote_time".to_string())
1113            .direction(AsofDirection::Forward)
1114            .tolerance(Duration::from_secs(10))
1115            .join_type(AsofJoinType::Inner)
1116            .build()
1117            .unwrap();
1118
1119        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1120
1121        let mut timers = TimerService::new();
1122        let mut state = InMemoryStore::new();
1123        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1124
1125        // Store quotes after the trade time
1126        let quote1 = create_quote_event(1050, "AAPL", 150.0, 151.0);
1127        let quote2 = create_quote_event(1100, "AAPL", 152.0, 153.0);
1128        {
1129            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1130            operator.process_right(&quote1, &mut ctx);
1131            operator.process_right(&quote2, &mut ctx);
1132        }
1133
1134        // Trade at t=1000 should match quote at t=1050 (next future)
1135        let trade = create_trade_event(1000, "AAPL", 150.5);
1136        let outputs = {
1137            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1138            operator.process_left(&trade, &mut ctx)
1139        };
1140
1141        assert_eq!(
1142            outputs
1143                .iter()
1144                .filter(|o| matches!(o, Output::Event(_)))
1145                .count(),
1146            1
1147        );
1148        assert_eq!(operator.metrics().matches, 1);
1149    }
1150
1151    #[test]
1152    fn test_nearest_asof() {
1153        let config = AsofJoinConfig::builder()
1154            .key_column("symbol".to_string())
1155            .left_time_column("trade_time".to_string())
1156            .right_time_column("quote_time".to_string())
1157            .direction(AsofDirection::Nearest)
1158            .tolerance(Duration::from_secs(10))
1159            .join_type(AsofJoinType::Inner)
1160            .build()
1161            .unwrap();
1162
1163        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1164
1165        let mut timers = TimerService::new();
1166        let mut state = InMemoryStore::new();
1167        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1168
1169        // Store quotes before and after
1170        let quote_before = create_quote_event(990, "AAPL", 150.0, 151.0);
1171        let quote_after = create_quote_event(1020, "AAPL", 152.0, 153.0);
1172        {
1173            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1174            operator.process_right(&quote_before, &mut ctx);
1175            operator.process_right(&quote_after, &mut ctx);
1176        }
1177
1178        // Trade at t=1000 - quote_before is 10ms away, quote_after is 20ms away
1179        // Should match quote_before (closer)
1180        let trade = create_trade_event(1000, "AAPL", 150.5);
1181        let outputs = {
1182            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1183            operator.process_left(&trade, &mut ctx)
1184        };
1185
1186        assert_eq!(
1187            outputs
1188                .iter()
1189                .filter(|o| matches!(o, Output::Event(_)))
1190                .count(),
1191            1
1192        );
1193    }
1194
1195    #[test]
1196    fn test_tolerance_exceeded() {
1197        let config = AsofJoinConfig::builder()
1198            .key_column("symbol".to_string())
1199            .left_time_column("trade_time".to_string())
1200            .right_time_column("quote_time".to_string())
1201            .direction(AsofDirection::Backward)
1202            .tolerance(Duration::from_millis(50)) // 50ms tolerance
1203            .join_type(AsofJoinType::Inner)
1204            .build()
1205            .unwrap();
1206
1207        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1208
1209        let mut timers = TimerService::new();
1210        let mut state = InMemoryStore::new();
1211        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1212
1213        // Store quote at t=900
1214        let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
1215        {
1216            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1217            operator.process_right(&quote, &mut ctx);
1218        }
1219
1220        // Trade at t=1000 - 100ms after quote, exceeds 50ms tolerance
1221        let trade = create_trade_event(1000, "AAPL", 150.5);
1222        let outputs = {
1223            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1224            operator.process_left(&trade, &mut ctx)
1225        };
1226
1227        // No match due to tolerance exceeded
1228        assert_eq!(outputs.len(), 0);
1229        assert_eq!(operator.metrics().matches, 0);
1230    }
1231
1232    #[test]
1233    fn test_tolerance_within() {
1234        let config = AsofJoinConfig::builder()
1235            .key_column("symbol".to_string())
1236            .left_time_column("trade_time".to_string())
1237            .right_time_column("quote_time".to_string())
1238            .direction(AsofDirection::Backward)
1239            .tolerance(Duration::from_millis(100)) // 100ms tolerance
1240            .join_type(AsofJoinType::Inner)
1241            .build()
1242            .unwrap();
1243
1244        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1245
1246        let mut timers = TimerService::new();
1247        let mut state = InMemoryStore::new();
1248        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1249
1250        // Store quote at t=950
1251        let quote = create_quote_event(950, "AAPL", 150.0, 151.0);
1252        {
1253            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1254            operator.process_right(&quote, &mut ctx);
1255        }
1256
1257        // Trade at t=1000 - 50ms after quote, within 100ms tolerance
1258        let trade = create_trade_event(1000, "AAPL", 150.5);
1259        let outputs = {
1260            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1261            operator.process_left(&trade, &mut ctx)
1262        };
1263
1264        assert_eq!(
1265            outputs
1266                .iter()
1267                .filter(|o| matches!(o, Output::Event(_)))
1268                .count(),
1269            1
1270        );
1271        assert_eq!(operator.metrics().within_tolerance, 1);
1272    }
1273
1274    #[test]
1275    fn test_no_match_empty_state() {
1276        let config = AsofJoinConfig::builder()
1277            .key_column("symbol".to_string())
1278            .left_time_column("trade_time".to_string())
1279            .right_time_column("quote_time".to_string())
1280            .direction(AsofDirection::Backward)
1281            .join_type(AsofJoinType::Inner)
1282            .build()
1283            .unwrap();
1284
1285        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1286
1287        let mut timers = TimerService::new();
1288        let mut state = InMemoryStore::new();
1289        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1290
1291        // Trade with no quotes in state
1292        let trade = create_trade_event(1000, "AAPL", 150.5);
1293        let outputs = {
1294            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1295            operator.process_left(&trade, &mut ctx)
1296        };
1297
1298        assert_eq!(outputs.len(), 0);
1299        assert_eq!(operator.metrics().matches, 0);
1300    }
1301
1302    #[test]
1303    fn test_multiple_keys() {
1304        let config = AsofJoinConfig::builder()
1305            .key_column("symbol".to_string())
1306            .left_time_column("trade_time".to_string())
1307            .right_time_column("quote_time".to_string())
1308            .direction(AsofDirection::Backward)
1309            .tolerance(Duration::from_secs(10))
1310            .join_type(AsofJoinType::Inner)
1311            .build()
1312            .unwrap();
1313
1314        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1315
1316        let mut timers = TimerService::new();
1317        let mut state = InMemoryStore::new();
1318        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1319
1320        // Store quotes for different symbols
1321        {
1322            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1323            operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
1324            operator.process_right(&create_quote_event(960, "GOOG", 2800.0, 2801.0), &mut ctx);
1325        }
1326
1327        // Trade for AAPL should match AAPL quote, not GOOG
1328        let trade = create_trade_event(1000, "AAPL", 150.5);
1329        let outputs = {
1330            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1331            operator.process_left(&trade, &mut ctx)
1332        };
1333
1334        assert_eq!(
1335            outputs
1336                .iter()
1337                .filter(|o| matches!(o, Output::Event(_)))
1338                .count(),
1339            1
1340        );
1341
1342        // Trade for GOOG should match GOOG quote
1343        let trade2 = create_trade_event(1000, "GOOG", 2800.5);
1344        let outputs2 = {
1345            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1346            operator.process_left(&trade2, &mut ctx)
1347        };
1348
1349        assert_eq!(
1350            outputs2
1351                .iter()
1352                .filter(|o| matches!(o, Output::Event(_)))
1353                .count(),
1354            1
1355        );
1356        assert_eq!(operator.metrics().matches, 2);
1357    }
1358
1359    #[test]
1360    fn test_multiple_events_same_timestamp() {
1361        let config = AsofJoinConfig::builder()
1362            .key_column("symbol".to_string())
1363            .left_time_column("trade_time".to_string())
1364            .right_time_column("quote_time".to_string())
1365            .direction(AsofDirection::Backward)
1366            .tolerance(Duration::from_secs(10))
1367            .join_type(AsofJoinType::Inner)
1368            .build()
1369            .unwrap();
1370
1371        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1372
1373        let mut timers = TimerService::new();
1374        let mut state = InMemoryStore::new();
1375        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1376
1377        // Store multiple quotes at the same timestamp
1378        {
1379            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1380            operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
1381            operator.process_right(&create_quote_event(950, "AAPL", 150.5, 151.5), &mut ctx);
1382            // Same ts
1383        }
1384
1385        // Trade should match (last quote at that timestamp for Backward)
1386        let trade = create_trade_event(1000, "AAPL", 150.5);
1387        let outputs = {
1388            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1389            operator.process_left(&trade, &mut ctx)
1390        };
1391
1392        assert_eq!(
1393            outputs
1394                .iter()
1395                .filter(|o| matches!(o, Output::Event(_)))
1396                .count(),
1397            1
1398        );
1399    }
1400
1401    #[test]
1402    fn test_left_outer_join() {
1403        let config = AsofJoinConfig::builder()
1404            .key_column("symbol".to_string())
1405            .left_time_column("trade_time".to_string())
1406            .right_time_column("quote_time".to_string())
1407            .direction(AsofDirection::Backward)
1408            .tolerance(Duration::from_millis(50))
1409            .join_type(AsofJoinType::Left) // Left outer join
1410            .build()
1411            .unwrap();
1412
1413        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1414
1415        let mut timers = TimerService::new();
1416        let mut state = InMemoryStore::new();
1417        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1418
1419        // First, process a matched event to establish right schema
1420        {
1421            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1422            operator.process_right(&create_quote_event(990, "AAPL", 150.0, 151.0), &mut ctx);
1423        }
1424        {
1425            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1426            operator.process_left(&create_trade_event(1000, "AAPL", 150.5), &mut ctx);
1427        }
1428
1429        // Now process trade with no matching quote (different symbol)
1430        let trade = create_trade_event(2000, "GOOG", 2800.5);
1431        let outputs = {
1432            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1433            operator.process_left(&trade, &mut ctx)
1434        };
1435
1436        // Left join should emit with nulls
1437        assert_eq!(
1438            outputs
1439                .iter()
1440                .filter(|o| matches!(o, Output::Event(_)))
1441                .count(),
1442            1
1443        );
1444        assert_eq!(operator.metrics().unmatched_left, 1);
1445
1446        if let Some(Output::Event(event)) = outputs.first() {
1447            // Should have both left and right columns (with nulls)
1448            assert_eq!(event.data.num_columns(), 5);
1449        }
1450    }
1451
1452    #[test]
1453    fn test_inner_join_no_output() {
1454        let config = AsofJoinConfig::builder()
1455            .key_column("symbol".to_string())
1456            .left_time_column("trade_time".to_string())
1457            .right_time_column("quote_time".to_string())
1458            .direction(AsofDirection::Backward)
1459            .tolerance(Duration::from_millis(50))
1460            .join_type(AsofJoinType::Inner)
1461            .build()
1462            .unwrap();
1463
1464        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1465
1466        let mut timers = TimerService::new();
1467        let mut state = InMemoryStore::new();
1468        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1469
1470        // Trade with no matching quote
1471        let trade = create_trade_event(1000, "AAPL", 150.5);
1472        let outputs = {
1473            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1474            operator.process_left(&trade, &mut ctx)
1475        };
1476
1477        // Inner join should emit nothing when no match
1478        assert_eq!(outputs.len(), 0);
1479    }
1480
1481    #[test]
1482    fn test_state_cleanup() {
1483        let config = AsofJoinConfig::builder()
1484            .key_column("symbol".to_string())
1485            .left_time_column("trade_time".to_string())
1486            .right_time_column("quote_time".to_string())
1487            .direction(AsofDirection::Backward)
1488            .tolerance(Duration::from_millis(100))
1489            .join_type(AsofJoinType::Inner)
1490            .build()
1491            .unwrap();
1492
1493        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1494
1495        let mut timers = TimerService::new();
1496        let mut state = InMemoryStore::new();
1497        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1498
1499        // Store quotes
1500        {
1501            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1502            operator.process_right(&create_quote_event(900, "AAPL", 150.0, 151.0), &mut ctx);
1503            operator.process_right(&create_quote_event(950, "AAPL", 152.0, 153.0), &mut ctx);
1504        }
1505
1506        assert_eq!(operator.state_size(), 2);
1507
1508        // Advance watermark past old quotes
1509        {
1510            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1511            operator.on_watermark(1100, &mut ctx);
1512        }
1513
1514        // Old quotes should be cleaned up
1515        assert!(operator.state_size() < 2 || operator.metrics().state_cleanups > 0);
1516    }
1517
1518    #[test]
1519    fn test_late_event_still_stored() {
1520        let config = AsofJoinConfig::builder()
1521            .key_column("symbol".to_string())
1522            .left_time_column("trade_time".to_string())
1523            .right_time_column("quote_time".to_string())
1524            .direction(AsofDirection::Backward)
1525            .tolerance(Duration::from_secs(10))
1526            .join_type(AsofJoinType::Inner)
1527            .build()
1528            .unwrap();
1529
1530        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1531
1532        let mut timers = TimerService::new();
1533        let mut state = InMemoryStore::new();
1534        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1535
1536        // Set watermark
1537        operator.watermark = 1000;
1538
1539        // Process late quote
1540        let quote = create_quote_event(500, "AAPL", 150.0, 151.0);
1541        {
1542            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1543            operator.process_right(&quote, &mut ctx);
1544        }
1545
1546        assert_eq!(operator.metrics().late_events, 1);
1547        // Late events are still stored (for Backward joins)
1548        assert_eq!(operator.state_size(), 1);
1549    }
1550
1551    #[test]
1552    fn test_checkpoint_restore() {
1553        let config = AsofJoinConfig::builder()
1554            .key_column("symbol".to_string())
1555            .left_time_column("trade_time".to_string())
1556            .right_time_column("quote_time".to_string())
1557            .direction(AsofDirection::Backward)
1558            .tolerance(Duration::from_secs(10))
1559            .join_type(AsofJoinType::Inner)
1560            .build()
1561            .unwrap();
1562
1563        let mut operator = AsofJoinOperator::with_id(config.clone(), "test_asof".to_string());
1564
1565        let mut timers = TimerService::new();
1566        let mut state = InMemoryStore::new();
1567        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1568
1569        // Add some state
1570        {
1571            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1572            operator.process_right(&create_quote_event(900, "AAPL", 150.0, 151.0), &mut ctx);
1573            operator.process_right(&create_quote_event(950, "AAPL", 152.0, 153.0), &mut ctx);
1574        }
1575
1576        // Record metrics
1577        operator.metrics.left_events = 10;
1578        operator.metrics.matches = 5;
1579        operator.watermark = 800;
1580
1581        // Checkpoint
1582        let checkpoint = operator.checkpoint();
1583
1584        // Restore to new operator
1585        let mut restored = AsofJoinOperator::with_id(config, "test_asof".to_string());
1586        restored.restore(checkpoint).unwrap();
1587
1588        // Verify state restored
1589        assert_eq!(restored.metrics().left_events, 10);
1590        assert_eq!(restored.metrics().matches, 5);
1591        assert_eq!(restored.watermark(), 800);
1592        assert_eq!(restored.state_size(), 2);
1593    }
1594
1595    #[test]
1596    fn test_schema_composition() {
1597        let config = AsofJoinConfig::builder()
1598            .key_column("symbol".to_string())
1599            .left_time_column("trade_time".to_string())
1600            .right_time_column("quote_time".to_string())
1601            .direction(AsofDirection::Backward)
1602            .tolerance(Duration::from_secs(10))
1603            .join_type(AsofJoinType::Inner)
1604            .build()
1605            .unwrap();
1606
1607        let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());
1608
1609        let mut timers = TimerService::new();
1610        let mut state = InMemoryStore::new();
1611        let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);
1612
1613        // Process right to capture schema
1614        {
1615            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1616            operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
1617        }
1618
1619        // Process left to capture schema and produce output
1620        let trade = create_trade_event(1000, "AAPL", 150.5);
1621        let outputs = {
1622            let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
1623            operator.process_left(&trade, &mut ctx)
1624        };
1625
1626        assert_eq!(outputs.len(), 1);
1627
1628        if let Some(Output::Event(event)) = outputs.first() {
1629            let schema = event.data.schema();
1630
1631            // Check left columns (trade)
1632            assert!(schema.field_with_name("price").is_ok());
1633
1634            // Check right columns (quote) - symbol is duplicated, so prefixed
1635            assert!(schema.field_with_name("right_symbol").is_ok());
1636            assert!(schema.field_with_name("bid").is_ok());
1637            assert!(schema.field_with_name("ask").is_ok());
1638        }
1639    }
1640
#[test]
fn test_metrics_tracking() {
    // Backward inner join with a 10s tolerance: both trades below sit well
    // inside that window relative to the stored quotes.
    let config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut operator = AsofJoinOperator::with_id(config, String::from("test_asof"));

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Right side: two quotes for the same key.
    let quotes = [
        create_quote_event(900, "AAPL", 150.0, 151.0),
        create_quote_event(950, "AAPL", 152.0, 153.0),
    ];
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        for quote in &quotes {
            operator.process_right(quote, &mut ctx);
        }
    }

    // Left side: two trades, each of which should find a prior quote.
    let trades = [
        create_trade_event(1000, "AAPL", 150.5),
        create_trade_event(1100, "AAPL", 151.5),
    ];
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        for trade in &trades {
            operator.process_left(trade, &mut ctx);
        }
    }

    let stats = operator.metrics();
    assert_eq!(stats.right_events, 2);
    assert_eq!(stats.left_events, 2);
    assert_eq!(stats.matches, 2);
    assert_eq!(stats.within_tolerance, 2);
}
1677
#[test]
fn test_key_state_operations() {
    let mut key_state = KeyState::new();
    assert!(key_state.is_empty());

    // The payload is irrelevant here; only the row timestamps are exercised,
    // so every row shares one empty batch.
    let payload = Arc::new(RecordBatch::new_empty(Arc::new(Schema::empty())));
    for ts in [100, 200] {
        key_state.insert(AsofRow::new(ts, &payload));
    }

    assert_eq!(key_state.len(), 2);
    assert_eq!(key_state.min_timestamp, 100);
    assert_eq!(key_state.max_timestamp, 200);

    // Dropping everything older than 150 leaves only the t=200 row, and the
    // tracked minimum must advance with it.
    key_state.cleanup_before(150);
    assert_eq!(key_state.len(), 1);
    assert_eq!(key_state.min_timestamp, 200);
}
1700
#[test]
fn test_asof_row_serialization() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ]));
    let batch = Arc::new(
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["AAPL"])),
                Arc::new(Float64Array::from(vec![150.5])),
            ],
        )
        .unwrap(),
    );

    let row = AsofRow::new(1000, &batch);

    // Verify round-trip through serializable form (checkpoint path)
    let serializable = SerializableAsofRow::from_row(&row).unwrap();
    let restored = serializable.to_row().unwrap();

    assert_eq!(restored.timestamp, 1000);
    assert_eq!(restored.batch().num_rows(), 1);
    assert_eq!(restored.batch().num_columns(), 2);

    // Matching shape alone does not prove a faithful round-trip. Column names
    // and the actual cell values must also survive, otherwise a restored
    // checkpoint could silently feed wrong data into joins.
    let original_schema = batch.schema();
    let restored_schema = restored.batch().schema();
    for (idx, field) in original_schema.fields().iter().enumerate() {
        assert_eq!(restored_schema.field(idx).name(), field.name());
        assert_eq!(restored.batch().column(idx), batch.column(idx));
    }
}
1727
#[test]
fn test_metrics_reset() {
    // Dirty a couple of counters, then reset them.
    let mut m = AsofJoinMetrics::new();
    m.left_events = 100;
    m.matches = 50;

    m.reset();

    // Both counters that were bumped must be back at zero.
    assert_eq!((m.left_events, m.matches), (0, 0));
}
1739
#[test]
fn test_unlimited_tolerance() {
    // Omitting `.tolerance(..)` means there is no bound on the time gap.
    let config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    // An unset tolerance is reported as the largest representable window.
    assert_eq!(config.tolerance_ms(), i64::MAX);

    let mut operator = AsofJoinOperator::with_id(config, String::from("test_asof"));

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Seed the right side with a quote far in the past.
    let stale_quote = create_quote_event(100, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&stale_quote, &mut ctx);
    }

    // A trade ~1,000,000 ms later must still pick up that quote.
    let late_trade = create_trade_event(1_000_000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&late_trade, &mut ctx)
    };

    let joined_events = outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(joined_events, 1);
}
1781}