1use super::{
55 Event, Operator, OperatorContext, OperatorError, OperatorState, Output, OutputVec, Timer,
56 TimerKey,
57};
58use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch, StringArray};
59use arrow_schema::{DataType, Field, Schema, SchemaRef};
60use fxhash::FxHashMap;
61use rkyv::{
62 rancor::Error as RkyvError, Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize,
63};
64use smallvec::SmallVec;
65use std::collections::BTreeMap;
66use std::sync::atomic::{AtomicU64, Ordering};
67use std::sync::Arc;
68use std::time::Duration;
69
/// Direction in which the right side is searched for a row to pair with a left event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AsofDirection {
    /// Pick the latest right row whose timestamp is at or before the left timestamp (default).
    #[default]
    Backward,
    /// Pick the earliest right row whose timestamp is at or after the left timestamp.
    Forward,
    /// Pick whichever neighbouring right row is closest in time to the left timestamp.
    Nearest,
}
82
/// Join semantics applied when a left event finds no matching right row.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AsofJoinType {
    /// Left events without a match produce no output (default).
    #[default]
    Inner,
    /// Left events without a match are still emitted.
    Left,
}

impl AsofJoinType {
    /// Reports whether this join type emits left events that found no match.
    ///
    /// Only [`AsofJoinType::Left`] does.
    #[must_use]
    pub fn emits_unmatched(&self) -> bool {
        match self {
            Self::Left => true,
            Self::Inner => false,
        }
    }
}
100
/// Configuration of an ASOF join operator.
#[derive(Debug, Clone)]
pub struct AsofJoinConfig {
    /// Name of the column holding the join key on both inputs.
    pub key_column: String,
    /// Configured name of the left-side time column.
    pub left_time_column: String,
    /// Configured name of the right-side time column.
    pub right_time_column: String,
    /// Direction in which the right side is searched for a match.
    pub direction: AsofDirection,
    /// Maximum timestamp distance between a left event and its match; `None` means unbounded.
    pub tolerance: Option<Duration>,
    /// Whether unmatched left events are emitted.
    pub join_type: AsofJoinType,
    /// Explicit operator id; when `None` the operator generates one on construction.
    pub operator_id: Option<String>,
}
119
120impl AsofJoinConfig {
121 #[must_use]
123 pub fn builder() -> AsofJoinConfigBuilder {
124 AsofJoinConfigBuilder::default()
125 }
126
127 #[must_use]
129 #[allow(clippy::cast_possible_truncation)] pub fn tolerance_ms(&self) -> i64 {
131 self.tolerance.map_or(i64::MAX, |d| d.as_millis() as i64)
132 }
133}
134
/// Builder for [`AsofJoinConfig`]; every field starts unset.
#[derive(Debug, Default)]
pub struct AsofJoinConfigBuilder {
    // Required by `build`.
    key_column: Option<String>,
    // Required by `build`.
    left_time_column: Option<String>,
    // Required by `build`.
    right_time_column: Option<String>,
    // Optional; `build` falls back to the enum default.
    direction: Option<AsofDirection>,
    // Optional; passed through unchanged.
    tolerance: Option<Duration>,
    // Optional; `build` falls back to the enum default.
    join_type: Option<AsofJoinType>,
    // Optional; passed through unchanged.
    operator_id: Option<String>,
}
146
147impl AsofJoinConfigBuilder {
148 #[must_use]
150 pub fn key_column(mut self, column: String) -> Self {
151 self.key_column = Some(column);
152 self
153 }
154
155 #[must_use]
157 pub fn left_time_column(mut self, column: String) -> Self {
158 self.left_time_column = Some(column);
159 self
160 }
161
162 #[must_use]
164 pub fn right_time_column(mut self, column: String) -> Self {
165 self.right_time_column = Some(column);
166 self
167 }
168
169 #[must_use]
171 pub fn direction(mut self, direction: AsofDirection) -> Self {
172 self.direction = Some(direction);
173 self
174 }
175
176 #[must_use]
178 pub fn tolerance(mut self, tolerance: Duration) -> Self {
179 self.tolerance = Some(tolerance);
180 self
181 }
182
183 #[must_use]
185 pub fn join_type(mut self, join_type: AsofJoinType) -> Self {
186 self.join_type = Some(join_type);
187 self
188 }
189
190 #[must_use]
192 pub fn operator_id(mut self, id: String) -> Self {
193 self.operator_id = Some(id);
194 self
195 }
196
197 pub fn build(self) -> Result<AsofJoinConfig, OperatorError> {
204 Ok(AsofJoinConfig {
205 key_column: self
206 .key_column
207 .ok_or_else(|| OperatorError::ConfigError("key_column is required".into()))?,
208 left_time_column: self
209 .left_time_column
210 .ok_or_else(|| OperatorError::ConfigError("left_time_column is required".into()))?,
211 right_time_column: self.right_time_column.ok_or_else(|| {
212 OperatorError::ConfigError("right_time_column is required".into())
213 })?,
214 direction: self.direction.unwrap_or_default(),
215 tolerance: self.tolerance,
216 join_type: self.join_type.unwrap_or_default(),
217 operator_id: self.operator_id,
218 })
219 }
220}
221
/// First byte of every cleanup timer key built by this operator; `on_timer`
/// checks it to recognise its own timers.
const ASOF_TIMER_PREFIX: u8 = 0x50;

/// Counter used to generate `asof_join_{n}` ids for operators created without
/// an explicit id.
static ASOF_OPERATOR_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Raw bytes of a join key, backed by a `SmallVec` with 24 bytes of inline capacity.
type AsofKey = SmallVec<[u8; 24]>;
231
/// A buffered right-side event: its timestamp plus a shared handle to its batch.
#[derive(Debug, Clone)]
pub struct AsofRow {
    /// Event timestamp used for matching.
    pub timestamp: i64,
    // Shared batch; cloning an `AsofRow` only bumps the reference count.
    batch: Arc<RecordBatch>,
}
243
244impl AsofRow {
245 fn new(timestamp: i64, batch: &Arc<RecordBatch>) -> Self {
249 Self {
250 timestamp,
251 batch: Arc::clone(batch),
252 }
253 }
254
255 #[must_use]
257 pub fn batch(&self) -> &RecordBatch {
258 &self.batch
259 }
260}
261
/// Checkpoint form of an [`AsofRow`]: the timestamp plus the batch encoded as
/// an Arrow IPC stream.
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
struct SerializableAsofRow {
    // Copied from `AsofRow::timestamp`.
    timestamp: i64,
    // Arrow IPC stream bytes containing the row's single record batch.
    data: Vec<u8>,
}
269
270impl SerializableAsofRow {
271 fn from_row(row: &AsofRow) -> Result<Self, OperatorError> {
273 let mut buf = Vec::new();
274 {
275 let mut writer =
276 arrow_ipc::writer::StreamWriter::try_new(&mut buf, &row.batch.schema())
277 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
278 writer
279 .write(&row.batch)
280 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
281 writer
282 .finish()
283 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
284 }
285 Ok(Self {
286 timestamp: row.timestamp,
287 data: buf,
288 })
289 }
290
291 fn to_row(&self) -> Result<AsofRow, OperatorError> {
293 let cursor = std::io::Cursor::new(&self.data);
294 let mut reader = arrow_ipc::reader::StreamReader::try_new(cursor, None)
295 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
296 let batch = reader
297 .next()
298 .ok_or_else(|| OperatorError::SerializationFailed("Empty batch data".to_string()))?
299 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
300 Ok(AsofRow {
301 timestamp: self.timestamp,
302 batch: Arc::new(batch),
303 })
304 }
305}
306
307#[derive(Debug, Clone, Default)]
311pub struct KeyState {
312 pub events: BTreeMap<i64, SmallVec<[AsofRow; 1]>>,
315 pub min_timestamp: i64,
317 pub max_timestamp: i64,
319}
320
321impl KeyState {
322 #[must_use]
324 pub fn new() -> Self {
325 Self {
326 events: BTreeMap::new(),
327 min_timestamp: i64::MAX,
328 max_timestamp: i64::MIN,
329 }
330 }
331
332 pub fn insert(&mut self, row: AsofRow) {
334 let ts = row.timestamp;
335 self.events.entry(ts).or_default().push(row);
336 self.min_timestamp = self.min_timestamp.min(ts);
337 self.max_timestamp = self.max_timestamp.max(ts);
338 }
339
340 #[must_use]
342 pub fn len(&self) -> usize {
343 self.events.values().map(SmallVec::len).sum()
344 }
345
346 #[must_use]
348 pub fn is_empty(&self) -> bool {
349 self.events.is_empty()
350 }
351
352 pub fn cleanup_before(&mut self, threshold: i64) {
354 self.events = self.events.split_off(&threshold);
355 self.min_timestamp = self.events.keys().next().copied().unwrap_or(i64::MAX);
356 }
357}
358
/// Checkpoint form of a [`KeyState`].
#[derive(Debug, Clone, Archive, RkyvSerialize, RkyvDeserialize)]
struct SerializableKeyState {
    // `(timestamp, rows)` pairs in the iteration order of `KeyState::events`.
    events: Vec<(i64, Vec<SerializableAsofRow>)>,
    // Copied from `KeyState::min_timestamp`.
    min_timestamp: i64,
    // Copied from `KeyState::max_timestamp`.
    max_timestamp: i64,
}
366
367impl SerializableKeyState {
368 fn from_key_state(state: &KeyState) -> Result<Self, OperatorError> {
370 let events = state
371 .events
372 .iter()
373 .map(|(ts, rows)| {
374 let ser_rows: Result<Vec<_>, _> =
375 rows.iter().map(SerializableAsofRow::from_row).collect();
376 ser_rows.map(|r| (*ts, r))
377 })
378 .collect::<Result<Vec<_>, _>>()?;
379 Ok(Self {
380 events,
381 min_timestamp: state.min_timestamp,
382 max_timestamp: state.max_timestamp,
383 })
384 }
385
386 fn to_key_state(&self) -> Result<KeyState, OperatorError> {
388 let mut events = BTreeMap::new();
389 for (ts, rows) in &self.events {
390 let asof_rows: Result<SmallVec<[AsofRow; 1]>, _> =
391 rows.iter().map(SerializableAsofRow::to_row).collect();
392 events.insert(*ts, asof_rows?);
393 }
394 Ok(KeyState {
395 events,
396 min_timestamp: self.min_timestamp,
397 max_timestamp: self.max_timestamp,
398 })
399 }
400}
401
/// Counters describing the work an ASOF join operator has done.
#[derive(Debug, Clone, Default)]
pub struct AsofJoinMetrics {
    /// Left events received.
    pub left_events: u64,
    /// Right events received.
    pub right_events: u64,
    /// Left events for which a right row was found.
    pub matches: u64,
    /// Left events without a match, counted only when the join type emits them.
    pub unmatched_left: u64,
    /// Matches that fell within the tolerance.
    pub within_tolerance: u64,
    /// Candidates rejected for exceeding the tolerance.
    pub outside_tolerance: u64,
    /// Right events that arrived with a timestamp below the watermark.
    pub late_events: u64,
    /// Rows removed by state cleanup.
    pub state_cleanups: u64,
}

impl AsofJoinMetrics {
    /// Creates a metrics set with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets every counter back to zero.
    pub fn reset(&mut self) {
        // `take` leaves the default (all-zero) value behind; the old counters are dropped.
        drop(std::mem::take(self));
    }
}
435
/// Streaming ASOF join: buffers right-side rows per key and pairs each left
/// event with one of them according to the configured direction and tolerance.
pub struct AsofJoinOperator {
    // Join configuration supplied at construction.
    config: AsofJoinConfig,
    // Id written into checkpoints and verified on restore.
    operator_id: String,
    // Buffered right-side rows, one `KeyState` per join key.
    right_state: FxHashMap<AsofKey, KeyState>,
    // Last watermark received or restored; `i64::MIN` until then.
    watermark: i64,
    // Running counters.
    metrics: AsofJoinMetrics,
    // Left fields followed by right fields; built once both schemas are known.
    output_schema: Option<SchemaRef>,
    // Schema of the first left event seen.
    left_schema: Option<SchemaRef>,
    // Schema of the first right event seen.
    right_schema: Option<SchemaRef>,
    // Cached index of the key column in left batches.
    left_key_index: Option<usize>,
    // Cached index of the key column in right batches.
    right_key_index: Option<usize>,
}
474
475impl AsofJoinOperator {
476 #[must_use]
478 pub fn new(config: AsofJoinConfig) -> Self {
479 let operator_id = config.operator_id.clone().unwrap_or_else(|| {
480 let num = ASOF_OPERATOR_COUNTER.fetch_add(1, Ordering::Relaxed);
481 format!("asof_join_{num}")
482 });
483
484 Self {
485 config,
486 operator_id,
487 right_state: FxHashMap::default(),
488 watermark: i64::MIN,
489 metrics: AsofJoinMetrics::new(),
490 output_schema: None,
491 left_schema: None,
492 right_schema: None,
493 left_key_index: None,
494 right_key_index: None,
495 }
496 }
497
498 #[must_use]
500 pub fn with_id(mut config: AsofJoinConfig, operator_id: String) -> Self {
501 config.operator_id = Some(operator_id);
502 Self::new(config)
503 }
504
505 #[must_use]
507 pub fn config(&self) -> &AsofJoinConfig {
508 &self.config
509 }
510
511 #[must_use]
513 pub fn metrics(&self) -> &AsofJoinMetrics {
514 &self.metrics
515 }
516
517 pub fn reset_metrics(&mut self) {
519 self.metrics.reset();
520 }
521
522 #[must_use]
524 pub fn watermark(&self) -> i64 {
525 self.watermark
526 }
527
528 #[must_use]
530 pub fn state_size(&self) -> usize {
531 self.right_state.values().map(KeyState::len).sum()
532 }
533
534 pub fn process_left(&mut self, event: &Event, _ctx: &mut OperatorContext) -> OutputVec {
536 self.metrics.left_events += 1;
537
538 if self.left_schema.is_none() {
540 self.left_schema = Some(event.data.schema());
541 self.update_output_schema();
542 }
543
544 let mut output = OutputVec::new();
545
546 let Some(key_value) = Self::extract_key(
548 &event.data,
549 &self.config.key_column,
550 &mut self.left_key_index,
551 ) else {
552 return output;
553 };
554
555 let left_timestamp = event.timestamp;
557
558 let match_result = self.find_match(&key_value, left_timestamp);
560
561 match match_result {
562 Some(matched_row) => {
563 self.metrics.matches += 1;
564 self.metrics.within_tolerance += 1;
565
566 if let Some(joined) = self.create_joined_event(event, &matched_row) {
568 output.push(Output::Event(joined));
569 }
570 }
571 None => {
572 if self.config.join_type.emits_unmatched() {
573 self.metrics.unmatched_left += 1;
574 if let Some(unmatched) = self.create_unmatched_event(event) {
575 output.push(Output::Event(unmatched));
576 }
577 }
578 }
579 }
580
581 output
582 }
583
584 pub fn process_right(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
586 self.metrics.right_events += 1;
587
588 if self.right_schema.is_none() {
590 self.right_schema = Some(event.data.schema());
591 self.update_output_schema();
592 }
593
594 let output = OutputVec::new();
595
596 let Some(key_value) = Self::extract_key(
598 &event.data,
599 &self.config.key_column,
600 &mut self.right_key_index,
601 ) else {
602 return output;
603 };
604
605 if self.watermark > i64::MIN && event.timestamp < self.watermark {
607 self.metrics.late_events += 1;
608 }
610
611 let row = AsofRow::new(event.timestamp, &event.data);
613
614 let cleanup_time = self.calculate_cleanup_time(event.timestamp);
616
617 let key_state = self.right_state.entry(key_value).or_default();
618 key_state.insert(row);
619
620 let timer_key = Self::make_cleanup_timer_key(&key_state.max_timestamp.to_be_bytes());
622 ctx.timers
623 .register_timer(cleanup_time, Some(timer_key), Some(ctx.operator_index));
624
625 output
626 }
627
628 fn find_match(&self, key: &[u8], left_timestamp: i64) -> Option<AsofRow> {
630 let key_state = self.right_state.get(key)?;
631
632 match self.config.direction {
633 AsofDirection::Backward => self.find_backward_match(key_state, left_timestamp),
634 AsofDirection::Forward => self.find_forward_match(key_state, left_timestamp),
635 AsofDirection::Nearest => self.find_nearest_match(key_state, left_timestamp),
636 }
637 }
638
639 fn find_backward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
641 let (ts, rows) = key_state.events.range(..=left_timestamp).next_back()?;
643
644 let diff = left_timestamp - ts;
646 if diff > self.config.tolerance_ms() {
647 return None;
648 }
649
650 rows.last().cloned()
652 }
653
654 fn find_forward_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
656 let (ts, rows) = key_state.events.range(left_timestamp..).next()?;
658
659 let diff = ts - left_timestamp;
661 if diff > self.config.tolerance_ms() {
662 return None;
663 }
664
665 rows.first().cloned()
667 }
668
669 fn find_nearest_match(&self, key_state: &KeyState, left_timestamp: i64) -> Option<AsofRow> {
671 let before = key_state.events.range(..=left_timestamp).next_back();
672 let after = key_state.events.range(left_timestamp..).next();
673
674 let candidate = match (before, after) {
675 (Some((ts_before, rows_before)), Some((ts_after, rows_after))) => {
676 let diff_before = left_timestamp - ts_before;
677 let diff_after = ts_after - left_timestamp;
678 if diff_before <= diff_after {
679 Some((diff_before, rows_before.last()?.clone()))
680 } else {
681 Some((diff_after, rows_after.first()?.clone()))
682 }
683 }
684 (Some((ts, rows)), None) => {
685 let diff = left_timestamp - ts;
686 Some((diff, rows.last()?.clone()))
687 }
688 (None, Some((ts, rows))) => {
689 let diff = ts - left_timestamp;
690 Some((diff, rows.first()?.clone()))
691 }
692 (None, None) => None,
693 };
694
695 let (diff, row) = candidate?;
696
697 if diff > self.config.tolerance_ms() {
699 return None;
700 }
701
702 Some(row)
703 }
704
705 fn calculate_cleanup_time(&self, timestamp: i64) -> i64 {
707 let tolerance_ms = self.config.tolerance_ms();
708 match self.config.direction {
709 AsofDirection::Backward | AsofDirection::Nearest => {
712 if tolerance_ms == i64::MAX {
713 i64::MAX
714 } else {
715 timestamp.saturating_add(tolerance_ms)
716 }
717 }
718 AsofDirection::Forward => timestamp,
720 }
721 }
722
723 pub fn on_watermark(&mut self, watermark: i64, _ctx: &mut OperatorContext) -> OutputVec {
725 self.watermark = watermark;
726 self.cleanup_state(watermark);
727 OutputVec::new()
728 }
729
730 fn cleanup_state(&mut self, watermark: i64) {
732 let tolerance_ms = self.config.tolerance_ms();
733
734 let threshold = match self.config.direction {
735 AsofDirection::Backward | AsofDirection::Nearest => {
736 if tolerance_ms == i64::MAX {
737 i64::MIN } else {
739 watermark.saturating_sub(tolerance_ms)
740 }
741 }
742 AsofDirection::Forward => watermark,
743 };
744
745 if threshold == i64::MIN {
746 return;
747 }
748
749 let initial_count: usize = self.right_state.values().map(KeyState::len).sum();
750
751 for key_state in self.right_state.values_mut() {
752 key_state.cleanup_before(threshold);
753 }
754
755 self.right_state.retain(|_, v| !v.is_empty());
757
758 let final_count: usize = self.right_state.values().map(KeyState::len).sum();
759 if final_count < initial_count {
760 self.metrics.state_cleanups += (initial_count - final_count) as u64;
761 }
762 }
763
764 fn extract_key(
768 batch: &RecordBatch,
769 column_name: &str,
770 cached_index: &mut Option<usize>,
771 ) -> Option<AsofKey> {
772 let column_index = if let Some(idx) = *cached_index {
773 idx
774 } else {
775 let idx = batch.schema().index_of(column_name).ok()?;
776 *cached_index = Some(idx);
777 idx
778 };
779 let column = batch.column(column_index);
780
781 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
782 if string_array.is_empty() || string_array.is_null(0) {
783 return None;
784 }
785 return Some(AsofKey::from_slice(string_array.value(0).as_bytes()));
786 }
787
788 if let Some(int_array) = column.as_any().downcast_ref::<Int64Array>() {
789 if int_array.is_empty() || int_array.is_null(0) {
790 return None;
791 }
792 return Some(AsofKey::from_slice(&int_array.value(0).to_le_bytes()));
793 }
794
795 None
796 }
797
798 fn make_cleanup_timer_key(key_suffix: &[u8]) -> TimerKey {
800 let mut key = TimerKey::new();
801 key.push(ASOF_TIMER_PREFIX);
802 key.extend_from_slice(key_suffix);
803 key
804 }
805
806 fn update_output_schema(&mut self) {
808 if let (Some(left), Some(right)) = (&self.left_schema, &self.right_schema) {
809 let mut fields: Vec<Field> = left.fields().iter().map(|f| f.as_ref().clone()).collect();
810
811 for field in right.fields() {
813 let name = if left.field_with_name(field.name()).is_ok() {
814 format!("right_{}", field.name())
815 } else {
816 field.name().clone()
817 };
818 fields.push(Field::new(
819 name,
820 field.data_type().clone(),
821 true, ));
823 }
824
825 self.output_schema = Some(Arc::new(Schema::new(fields)));
826 }
827 }
828
829 fn create_joined_event(&self, left_event: &Event, right_row: &AsofRow) -> Option<Event> {
831 let schema = self.output_schema.as_ref()?;
832
833 let mut columns: Vec<ArrayRef> = left_event.data.columns().to_vec();
834 for column in right_row.batch().columns() {
835 columns.push(Arc::clone(column));
836 }
837
838 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
839
840 Some(Event::new(left_event.timestamp, joined_batch))
841 }
842
843 fn create_unmatched_event(&self, left_event: &Event) -> Option<Event> {
845 let schema = self.output_schema.as_ref()?;
846 let right_schema = self.right_schema.as_ref()?;
847
848 let num_rows = left_event.data.num_rows();
849 let mut columns: Vec<ArrayRef> = left_event.data.columns().to_vec();
850
851 for field in right_schema.fields() {
853 columns.push(Self::create_null_array(field.data_type(), num_rows));
854 }
855
856 let joined_batch = RecordBatch::try_new(Arc::clone(schema), columns).ok()?;
857
858 Some(Event::new(left_event.timestamp, joined_batch))
859 }
860
861 fn create_null_array(data_type: &DataType, num_rows: usize) -> ArrayRef {
863 match data_type {
864 DataType::Utf8 => Arc::new(StringArray::from(vec![None::<&str>; num_rows])) as ArrayRef,
865 DataType::Float64 => {
866 use arrow_array::Float64Array;
867 Arc::new(Float64Array::from(vec![None; num_rows])) as ArrayRef
868 }
869 _ => Arc::new(Int64Array::from(vec![None; num_rows])) as ArrayRef,
871 }
872 }
873}
874
875impl Operator for AsofJoinOperator {
876 fn process(&mut self, event: &Event, ctx: &mut OperatorContext) -> OutputVec {
877 self.process_left(event, ctx)
879 }
880
881 fn on_timer(&mut self, timer: Timer, _ctx: &mut OperatorContext) -> OutputVec {
882 if timer.key.first() == Some(&ASOF_TIMER_PREFIX) {
884 self.cleanup_state(timer.timestamp);
885 }
886 OutputVec::new()
887 }
888
889 fn checkpoint(&self) -> OperatorState {
890 let state_entries: Vec<(Vec<u8>, SerializableKeyState)> = self
892 .right_state
893 .iter()
894 .filter_map(|(k, v)| {
895 SerializableKeyState::from_key_state(v)
896 .ok()
897 .map(|s| (k.to_vec(), s))
898 })
899 .collect();
900
901 let checkpoint_data = (
902 self.watermark,
903 self.metrics.left_events,
904 self.metrics.right_events,
905 self.metrics.matches,
906 self.metrics.unmatched_left,
907 state_entries,
908 );
909
910 let data = rkyv::to_bytes::<RkyvError>(&checkpoint_data)
911 .map(|v| v.to_vec())
912 .unwrap_or_default();
913
914 OperatorState {
915 operator_id: self.operator_id.clone(),
916 data,
917 }
918 }
919
920 fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError> {
921 type CheckpointData = (
922 i64,
923 u64,
924 u64,
925 u64,
926 u64,
927 Vec<(Vec<u8>, SerializableKeyState)>,
928 );
929
930 if state.operator_id != self.operator_id {
931 return Err(OperatorError::StateAccessFailed(format!(
932 "Operator ID mismatch: expected {}, got {}",
933 self.operator_id, state.operator_id
934 )));
935 }
936
937 let archived = rkyv::access::<rkyv::Archived<CheckpointData>, RkyvError>(&state.data)
938 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
939 let (watermark, left_events, right_events, matches, unmatched_left, state_entries) =
940 rkyv::deserialize::<CheckpointData, RkyvError>(archived)
941 .map_err(|e| OperatorError::SerializationFailed(e.to_string()))?;
942
943 self.watermark = watermark;
944 self.metrics.left_events = left_events;
945 self.metrics.right_events = right_events;
946 self.metrics.matches = matches;
947 self.metrics.unmatched_left = unmatched_left;
948
949 self.right_state.clear();
951 for (key, serializable) in state_entries {
952 let key_state = serializable.to_key_state()?;
953 self.right_state
954 .insert(AsofKey::from_slice(&key), key_state);
955 }
956
957 Ok(())
958 }
959}
960
961#[cfg(test)]
962mod tests {
963 use super::*;
964 use crate::state::{InMemoryStore, StateStore};
965 use crate::time::{BoundedOutOfOrdernessGenerator, TimerService, WatermarkGenerator};
966 use arrow_array::Float64Array;
967 use arrow_schema::{DataType, Field, Schema};
968
969 fn create_trade_event(timestamp: i64, symbol: &str, price: f64) -> Event {
971 let schema = Arc::new(Schema::new(vec![
972 Field::new("symbol", DataType::Utf8, false),
973 Field::new("price", DataType::Float64, false),
974 ]));
975 let batch = RecordBatch::try_new(
976 schema,
977 vec![
978 Arc::new(StringArray::from(vec![symbol])),
979 Arc::new(Float64Array::from(vec![price])),
980 ],
981 )
982 .unwrap();
983 Event::new(timestamp, batch)
984 }
985
986 fn create_quote_event(timestamp: i64, symbol: &str, bid: f64, ask: f64) -> Event {
988 let schema = Arc::new(Schema::new(vec![
989 Field::new("symbol", DataType::Utf8, false),
990 Field::new("bid", DataType::Float64, false),
991 Field::new("ask", DataType::Float64, false),
992 ]));
993 let batch = RecordBatch::try_new(
994 schema,
995 vec![
996 Arc::new(StringArray::from(vec![symbol])),
997 Arc::new(Float64Array::from(vec![bid])),
998 Arc::new(Float64Array::from(vec![ask])),
999 ],
1000 )
1001 .unwrap();
1002 Event::new(timestamp, batch)
1003 }
1004
1005 fn create_test_context<'a>(
1006 timers: &'a mut TimerService,
1007 state: &'a mut dyn StateStore,
1008 watermark_gen: &'a mut dyn WatermarkGenerator,
1009 ) -> OperatorContext<'a> {
1010 OperatorContext {
1011 event_time: 0,
1012 processing_time: 0,
1013 timers,
1014 state,
1015 watermark_generator: watermark_gen,
1016 operator_index: 0,
1017 }
1018 }
1019
/// The default direction is `Backward`.
#[test]
fn test_asof_direction_default() {
    let direction = AsofDirection::default();
    assert_eq!(direction, AsofDirection::Backward);
}
1024
/// Only the left join type emits unmatched rows.
#[test]
fn test_asof_join_type_properties() {
    let inner_emits = AsofJoinType::Inner.emits_unmatched();
    let left_emits = AsofJoinType::Left.emits_unmatched();
    assert!(!inner_emits);
    assert!(left_emits);
}
1030
/// Every builder setter is reflected in the built configuration.
#[test]
fn test_config_builder() {
    let builder = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(5))
        .join_type(AsofJoinType::Left)
        .operator_id("test_op".to_string());
    let built = builder.build().unwrap();

    assert_eq!(built.key_column, "symbol");
    assert_eq!(built.left_time_column, "trade_time");
    assert_eq!(built.right_time_column, "quote_time");
    assert_eq!(built.direction, AsofDirection::Backward);
    assert_eq!(built.tolerance, Some(Duration::from_secs(5)));
    assert_eq!(built.join_type, AsofJoinType::Left);
    assert_eq!(built.tolerance_ms(), 5000);
}
1052
/// Backward join: a trade at t=1000 matches one of the earlier quotes and the
/// joined row carries 2 left + 3 right columns.
#[test]
fn test_backward_asof_basic() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote, &mut ctx);
    }

    let quote2 = create_quote_event(950, "AAPL", 152.0, 153.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote2, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 152.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().matches, 1);

    if let Some(Output::Event(event)) = outputs.first() {
        assert_eq!(event.data.num_columns(), 5);
    }
}
1106
/// Forward join: a trade at t=1000 matches a quote that arrives later in event time.
#[test]
fn test_forward_asof_basic() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Forward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let quote1 = create_quote_event(1050, "AAPL", 150.0, 151.0);
    let quote2 = create_quote_event(1100, "AAPL", 152.0, 153.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote1, &mut ctx);
        operator.process_right(&quote2, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().matches, 1);
}
1150
/// Nearest join: with quotes on both sides of the trade, exactly one joined
/// event is produced.
#[test]
fn test_nearest_asof() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Nearest)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let quote_before = create_quote_event(990, "AAPL", 150.0, 151.0);
    let quote_after = create_quote_event(1020, "AAPL", 152.0, 153.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote_before, &mut ctx);
        operator.process_right(&quote_after, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
}
1194
/// A quote 100 ms older than the trade is rejected under a 50 ms tolerance.
#[test]
fn test_tolerance_exceeded() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(50))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let quote = create_quote_event(900, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(outputs.len(), 0);
    assert_eq!(operator.metrics().matches, 0);
}
1231
/// A quote 50 ms older than the trade is accepted under a 100 ms tolerance.
#[test]
fn test_tolerance_within() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(100))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    let quote = create_quote_event(950, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&quote, &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(
        outputs
            .iter()
            .filter(|o| matches!(o, Output::Event(_)))
            .count(),
        1
    );
    assert_eq!(operator.metrics().within_tolerance, 1);
}
1273
/// With no right-side state at all, an inner join emits nothing for a trade.
#[test]
fn test_no_match_empty_state() {
    let join_config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut op = AsofJoinOperator::with_id(join_config, "test_asof".to_string());

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let emitted = {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        op.process_left(&trade, &mut ctx)
    };

    assert_eq!(emitted.len(), 0);
    assert_eq!(op.metrics().matches, 0);
}
1301
/// Quotes are buffered per key: trades for AAPL and GOOG each find their own quote.
#[test]
fn test_multiple_keys() {
    let join_config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut op = AsofJoinOperator::with_id(join_config, "test_asof".to_string());

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let aapl_quote = create_quote_event(950, "AAPL", 150.0, 151.0);
        op.process_right(&aapl_quote, &mut ctx);
        let goog_quote = create_quote_event(960, "GOOG", 2800.0, 2801.0);
        op.process_right(&goog_quote, &mut ctx);
    }

    let aapl_trade = create_trade_event(1000, "AAPL", 150.5);
    let aapl_outputs = {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        op.process_left(&aapl_trade, &mut ctx)
    };
    let aapl_events = aapl_outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(aapl_events, 1);

    let goog_trade = create_trade_event(1000, "GOOG", 2800.5);
    let goog_outputs = {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        op.process_left(&goog_trade, &mut ctx)
    };
    let goog_events = goog_outputs
        .iter()
        .filter(|o| matches!(o, Output::Event(_)))
        .count();
    assert_eq!(goog_events, 1);

    assert_eq!(op.metrics().matches, 2);
}
1358
/// Two quotes for the same key at the same timestamp still yield exactly one
/// joined event for a later trade.
#[test]
fn test_multiple_events_same_timestamp() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Both quotes share timestamp 950 and the key "AAPL".
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let first_quote = create_quote_event(950, "AAPL", 150.0, 151.0);
        join.process_right(&first_quote, &mut ctx);
        let second_quote = create_quote_event(950, "AAPL", 150.5, 151.5);
        join.process_right(&second_quote, &mut ctx);
    }

    let incoming_trade = create_trade_event(1000, "AAPL", 150.5);
    let emitted = {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join.process_left(&incoming_trade, &mut ctx)
    };

    let event_count = emitted
        .iter()
        .filter(|out| matches!(out, Output::Event(_)))
        .count();
    assert_eq!(event_count, 1);
}
1400
/// A `Left` join emits a row for a left event that has no right-side match.
///
/// A GOOG trade arrives while only an AAPL quote is buffered. The test
/// expects exactly one event output, `unmatched_left == 1`, and a 5-column
/// output batch (presumably the left columns plus null-filled right columns;
/// confirm against the operator's output schema construction).
#[test]
fn test_left_outer_join() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(50))
        .join_type(AsofJoinType::Left)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Feed one quote and one matching trade for AAPL before the unmatched trade.
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&create_quote_event(990, "AAPL", 150.0, 151.0), &mut ctx);
    }
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&create_trade_event(1000, "AAPL", 150.5), &mut ctx);
    }

    // No GOOG quote exists, so this trade cannot be matched.
    let trade = create_trade_event(2000, "GOOG", 2800.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    // Collect the event outputs so the column check below always runs. The
    // previous `if let` on `outputs.first()` silently skipped the assertion
    // whenever the first output was not an event.
    let events: Vec<_> = outputs
        .iter()
        .filter_map(|o| match o {
            Output::Event(event) => Some(event),
            _ => None,
        })
        .collect();

    assert_eq!(events.len(), 1);
    assert_eq!(operator.metrics().unmatched_left, 1);
    assert_eq!(events[0].data.num_columns(), 5);
}
1451
/// An inner join emits nothing for a left event when no right-side row has
/// been seen.
#[test]
fn test_inner_join_no_output() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(50))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    let unmatched_trade = create_trade_event(1000, "AAPL", 150.5);
    let emitted = {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join.process_left(&unmatched_trade, &mut ctx)
    };

    assert!(emitted.is_empty());
}
1480
/// After two quotes are buffered, advancing the watermark to 1100 must either
/// shrink the buffered state or bump the `state_cleanups` metric.
#[test]
fn test_state_cleanup() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_millis(100))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Buffer two quotes for the same key.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let older_quote = create_quote_event(900, "AAPL", 150.0, 151.0);
        join.process_right(&older_quote, &mut ctx);
        let newer_quote = create_quote_event(950, "AAPL", 152.0, 153.0);
        join.process_right(&newer_quote, &mut ctx);
    }
    assert_eq!(join.state_size(), 2);

    // Push the watermark past both quotes.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join.on_watermark(1100, &mut ctx);
    }

    let state_shrunk = join.state_size() < 2;
    let cleanup_recorded = join.metrics().state_cleanups > 0;
    assert!(state_shrunk || cleanup_recorded);
}
1517
/// A right-side event older than the operator's watermark is counted as late
/// but is still kept in state.
///
/// The watermark is forced to 1000, then a quote with timestamp 500 is
/// processed. The test expects `late_events == 1` and `state_size() == 1`.
#[test]
fn test_late_event_still_stored() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Set the watermark field directly so the next quote is behind it.
    operator.watermark = 1000;

    let quote = create_quote_event(500, "AAPL", 150.0, 151.0);
    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        // Pass the quote by reference; the argument had been corrupted into
        // an unterminated string literal.
        operator.process_right(&quote, &mut ctx);
    }

    assert_eq!(operator.metrics().late_events, 1);
    assert_eq!(operator.state_size(), 1);
}
1550
/// Checkpointing an operator and restoring into a fresh one preserves the
/// metrics, the watermark and the number of buffered rows.
#[test]
fn test_checkpoint_restore() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut source_op = AsofJoinOperator::with_id(join_config.clone(), String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Buffer two right-side rows so the snapshot has state to carry.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let older_quote = create_quote_event(900, "AAPL", 150.0, 151.0);
        source_op.process_right(&older_quote, &mut ctx);
        let newer_quote = create_quote_event(950, "AAPL", 152.0, 153.0);
        source_op.process_right(&newer_quote, &mut ctx);
    }

    // Set recognisable values directly on the operator's fields.
    source_op.metrics.left_events = 10;
    source_op.metrics.matches = 5;
    source_op.watermark = 800;

    let snapshot = source_op.checkpoint();

    // Restore into a brand-new operator built from the same config.
    let mut target_op = AsofJoinOperator::with_id(join_config, String::from("test_asof"));
    target_op.restore(snapshot).unwrap();

    assert_eq!(target_op.metrics().left_events, 10);
    assert_eq!(target_op.metrics().matches, 5);
    assert_eq!(target_op.watermark(), 800);
    assert_eq!(target_op.state_size(), 2);
}
1594
/// The joined output batch exposes the expected left and right columns.
///
/// After one quote and one trade for the same key, the single output must be
/// an `Output::Event` whose schema contains `price`, `right_symbol`, `bid`
/// and `ask`.
#[test]
fn test_schema_composition() {
    let config = AsofJoinConfig::builder()
        .key_column("symbol".to_string())
        .left_time_column("trade_time".to_string())
        .right_time_column("quote_time".to_string())
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    let mut operator = AsofJoinOperator::with_id(config, "test_asof".to_string());

    let mut timers = TimerService::new();
    let mut state = InMemoryStore::new();
    let mut watermark_gen = BoundedOutOfOrdernessGenerator::new(100);

    {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_right(&create_quote_event(950, "AAPL", 150.0, 151.0), &mut ctx);
    }

    let trade = create_trade_event(1000, "AAPL", 150.5);
    let outputs = {
        let mut ctx = create_test_context(&mut timers, &mut state, &mut watermark_gen);
        operator.process_left(&trade, &mut ctx)
    };

    assert_eq!(outputs.len(), 1);

    // Fail loudly if the single output is not an event. The previous `if let`
    // skipped every schema assertion in that case and the test still passed.
    let event = match outputs.first() {
        Some(Output::Event(event)) => event,
        _ => panic!("expected the single output to be an Output::Event"),
    };
    let schema = event.data.schema();

    // Column contributed by the left (trade) side.
    assert!(schema.field_with_name("price").is_ok());

    // Columns contributed by the right (quote) side. The test expects the
    // right-hand key column under the name `right_symbol`.
    assert!(schema.field_with_name("right_symbol").is_ok());
    assert!(schema.field_with_name("bid").is_ok());
    assert!(schema.field_with_name("ask").is_ok());
}
1640
/// Processing two quotes followed by two trades updates the event, match and
/// within-tolerance counters to 2 each.
#[test]
fn test_metrics_tracking() {
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .tolerance(Duration::from_secs(10))
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();
    let mut join = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Right side: two quotes.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let older_quote = create_quote_event(900, "AAPL", 150.0, 151.0);
        join.process_right(&older_quote, &mut ctx);
        let newer_quote = create_quote_event(950, "AAPL", 152.0, 153.0);
        join.process_right(&newer_quote, &mut ctx);
    }

    // Left side: two trades.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let first_trade = create_trade_event(1000, "AAPL", 150.5);
        join.process_left(&first_trade, &mut ctx);
        let second_trade = create_trade_event(1100, "AAPL", 151.5);
        join.process_left(&second_trade, &mut ctx);
    }

    assert_eq!(join.metrics().right_events, 2);
    assert_eq!(join.metrics().left_events, 2);
    assert_eq!(join.metrics().matches, 2);
    assert_eq!(join.metrics().within_tolerance, 2);
}
1677
/// Exercises `KeyState` directly: insertion tracks length and min/max
/// timestamps, and `cleanup_before(150)` leaves only the row at 200.
#[test]
fn test_key_state_operations() {
    let mut per_key = KeyState::new();
    assert!(per_key.is_empty());

    // Rows only need a timestamp here, so an empty batch is enough.
    let placeholder_batch = Arc::new(RecordBatch::new_empty(Arc::new(Schema::empty())));
    let early_row = AsofRow::new(100, &placeholder_batch);
    let late_row = AsofRow::new(200, &placeholder_batch);
    per_key.insert(early_row);
    per_key.insert(late_row);

    assert_eq!(per_key.len(), 2);
    assert_eq!(per_key.min_timestamp, 100);
    assert_eq!(per_key.max_timestamp, 200);

    per_key.cleanup_before(150);

    assert_eq!(per_key.len(), 1);
    assert_eq!(per_key.min_timestamp, 200);
}
1700
/// Round-trips an `AsofRow` through `SerializableAsofRow` and checks that the
/// batch shape and the timestamp survive.
#[test]
fn test_asof_row_serialization() {
    let fields = vec![
        Field::new("symbol", DataType::Utf8, false),
        Field::new("value", DataType::Float64, false),
    ];
    let row_schema = Arc::new(Schema::new(fields));

    // One-row batch: a single symbol and a single value.
    let single_row_batch = Arc::new(
        RecordBatch::try_new(
            row_schema,
            vec![
                Arc::new(StringArray::from(vec!["AAPL"])),
                Arc::new(Float64Array::from(vec![150.5])),
            ],
        )
        .unwrap(),
    );

    let original = AsofRow::new(1000, &single_row_batch);

    let encoded = SerializableAsofRow::from_row(&original).unwrap();
    let decoded = encoded.to_row().unwrap();

    assert_eq!(decoded.batch().num_rows(), 1);
    assert_eq!(decoded.batch().num_columns(), 2);
    assert_eq!(decoded.timestamp, 1000);
}
1727
/// `AsofJoinMetrics::reset` sets previously populated counters back to zero.
#[test]
fn test_metrics_reset() {
    let mut counters = AsofJoinMetrics::new();

    // Populate two counters with non-zero values.
    counters.left_events = 100;
    counters.matches = 50;

    counters.reset();

    assert_eq!(counters.left_events, 0);
    assert_eq!(counters.matches, 0);
}
1739
/// With no tolerance configured, `tolerance_ms()` reports `i64::MAX` and a
/// trade still joins a quote that is far older than it.
#[test]
fn test_unlimited_tolerance() {
    // No `.tolerance(..)` call here: the tolerance is left unset.
    let join_config = AsofJoinConfig::builder()
        .key_column(String::from("symbol"))
        .left_time_column(String::from("trade_time"))
        .right_time_column(String::from("quote_time"))
        .direction(AsofDirection::Backward)
        .join_type(AsofJoinType::Inner)
        .build()
        .unwrap();

    assert_eq!(join_config.tolerance_ms(), i64::MAX);

    let mut join = AsofJoinOperator::with_id(join_config, String::from("test_asof"));

    let mut timer_service = TimerService::new();
    let mut store = InMemoryStore::new();
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(100);

    // Quote at t = 100.
    {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        let old_quote = create_quote_event(100, "AAPL", 150.0, 151.0);
        join.process_right(&old_quote, &mut ctx);
    }

    // Trade at t = 1_000_000, far after the quote.
    let distant_trade = create_trade_event(1_000_000, "AAPL", 150.5);
    let emitted = {
        let mut ctx = create_test_context(&mut timer_service, &mut store, &mut wm_gen);
        join.process_left(&distant_trade, &mut ctx)
    };

    let event_count = emitted
        .iter()
        .filter(|out| matches!(out, Output::Event(_)))
        .count();
    assert_eq!(event_count, 1);
}
1781}