1use std::fmt;
17use std::sync::Arc;
18
19use arrow::array::{
20 Array, Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
21 TimestampNanosecondArray, TimestampSecondArray,
22};
23use arrow::datatypes::{DataType, Schema, TimeUnit};
24use arrow::record_batch::RecordBatch;
25use arrow_cast::parse::string_to_datetime;
26use chrono::Utc;
27
/// How the raw values of the timestamp column are encoded.
///
/// Whatever the input encoding, the extractor reports timestamps as
/// milliseconds since the Unix epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampFormat {
    /// `Int64` column holding milliseconds since the Unix epoch (used as-is).
    UnixMillis,
    /// `Int64` column holding seconds since the Unix epoch (multiplied by 1000).
    UnixSeconds,
    /// `Int64` column holding microseconds since the Unix epoch (divided by 1000).
    UnixMicros,
    /// `Int64` column holding nanoseconds since the Unix epoch (divided by 1,000,000).
    UnixNanos,
    /// String column holding date-time text, parsed with
    /// `arrow_cast::parse::string_to_datetime` against UTC.
    Iso8601,
    /// Arrow `Timestamp` column of any time unit.
    ArrowNative,
}
44
45impl fmt::Display for TimestampFormat {
46 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47 match self {
48 TimestampFormat::UnixMillis => write!(f, "UnixMillis"),
49 TimestampFormat::UnixSeconds => write!(f, "UnixSeconds"),
50 TimestampFormat::UnixMicros => write!(f, "UnixMicros"),
51 TimestampFormat::UnixNanos => write!(f, "UnixNanos"),
52 TimestampFormat::Iso8601 => write!(f, "ISO8601"),
53 TimestampFormat::ArrowNative => write!(f, "ArrowNative"),
54 }
55 }
56}
57
/// Identifies which column of a batch carries the event timestamp.
#[derive(Debug, Clone)]
pub enum TimestampField {
    /// Look the column up by its field name in the batch schema.
    Name(String),
    /// Use the column at this zero-based position in the batch schema.
    Index(usize),
}
66
/// Which row (or aggregate of rows) of a batch supplies the event time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExtractionMode {
    /// Use the value in the first row; a null there is an error. This is the default.
    #[default]
    First,
    /// Use the value in the last row; a null there is an error.
    Last,
    /// Use the largest non-null value in the batch.
    Max,
    /// Use the smallest non-null value in the batch.
    Min,
}
80
/// Errors produced while resolving the timestamp column or extracting a value from it.
#[derive(Debug, thiserror::Error)]
pub enum EventTimeError {
    /// No field with the configured name exists in the schema.
    #[error("Column not found: {0}")]
    ColumnNotFound(String),

    /// The configured column index is past the end of the schema.
    #[error("Column index {index} out of bounds (batch has {num_columns} columns)")]
    IndexOutOfBounds {
        /// The index that was requested.
        index: usize,
        /// Number of columns actually present in the schema.
        num_columns: usize,
    },

    /// The column's Arrow data type cannot be read with the configured format.
    #[error("Incompatible type for format {format}: expected {expected}, found {found}")]
    IncompatibleType {
        /// The format the extractor was configured with.
        format: TimestampFormat,
        /// Description of the data type(s) the format requires.
        expected: String,
        /// Debug rendering of the data type that was found.
        found: String,
    },

    /// A string value could not be parsed as a date-time.
    #[error("Failed to parse timestamp '{value}': {reason}")]
    ParseError {
        /// The text that failed to parse.
        value: String,
        /// The parser's error message.
        reason: String,
    },

    /// The row needed for extraction holds a null (row 0 is reported when
    /// every row is null in `Max`/`Min` mode).
    #[error("Null timestamp at row {row}")]
    NullTimestamp {
        /// Zero-based row index of the null value.
        row: usize,
    },

    /// The batch has no rows, so there is nothing to extract.
    #[error("Cannot extract timestamp from empty batch")]
    EmptyBatch,
}
128
/// Extracts a single event-time value from the timestamp column of a `RecordBatch`.
#[derive(Debug)]
pub struct EventTimeExtractor {
    /// Which column holds the timestamp (by name or by position).
    field: TimestampField,
    /// How the column's values are encoded.
    format: TimestampFormat,
    /// Which row or aggregate of the batch is reported.
    mode: ExtractionMode,
    /// Column position remembered between `extract` calls; `None` until first resolved.
    cached_index: Option<usize>,
}
140
141impl EventTimeExtractor {
142 #[must_use]
146 pub fn from_column(name: &str, format: TimestampFormat) -> Self {
147 Self {
148 field: TimestampField::Name(name.to_string()),
149 format,
150 mode: ExtractionMode::default(),
151 cached_index: None,
152 }
153 }
154
155 #[must_use]
159 pub fn from_index(index: usize, format: TimestampFormat) -> Self {
160 Self {
161 field: TimestampField::Index(index),
162 format,
163 mode: ExtractionMode::default(),
164 cached_index: Some(index),
165 }
166 }
167
168 #[must_use]
170 pub fn with_mode(mut self, mode: ExtractionMode) -> Self {
171 self.mode = mode;
172 self
173 }
174
175 #[must_use]
177 pub fn format(&self) -> TimestampFormat {
178 self.format
179 }
180
181 #[must_use]
183 pub fn mode(&self) -> ExtractionMode {
184 self.mode
185 }
186
187 pub fn validate_schema(&self, schema: &Schema) -> Result<(), EventTimeError> {
193 let (index, data_type) = self.resolve_column(schema)?;
194 self.validate_type(data_type, index)?;
195 Ok(())
196 }
197
198 pub fn extract(&mut self, batch: &RecordBatch) -> Result<i64, EventTimeError> {
211 if batch.num_rows() == 0 {
212 return Err(EventTimeError::EmptyBatch);
213 }
214
215 let index = self.get_column_index(batch.schema().as_ref())?;
216 let column = batch.column(index);
217
218 self.extract_from_column(column)
219 }
220
221 fn get_column_index(&mut self, schema: &Schema) -> Result<usize, EventTimeError> {
223 if let Some(idx) = self.cached_index {
224 if idx < schema.fields().len() {
226 return Ok(idx);
227 }
228 }
229
230 let (index, _) = self.resolve_column(schema)?;
231 self.cached_index = Some(index);
232 Ok(index)
233 }
234
235 fn resolve_column<'a>(
237 &self,
238 schema: &'a Schema,
239 ) -> Result<(usize, &'a DataType), EventTimeError> {
240 match &self.field {
241 TimestampField::Name(name) => {
242 let index = schema
243 .index_of(name)
244 .map_err(|_| EventTimeError::ColumnNotFound(name.clone()))?;
245 let data_type = schema.field(index).data_type();
246 Ok((index, data_type))
247 }
248 TimestampField::Index(index) => {
249 if *index >= schema.fields().len() {
250 return Err(EventTimeError::IndexOutOfBounds {
251 index: *index,
252 num_columns: schema.fields().len(),
253 });
254 }
255 let data_type = schema.field(*index).data_type();
256 Ok((*index, data_type))
257 }
258 }
259 }
260
261 fn validate_type(&self, data_type: &DataType, _index: usize) -> Result<(), EventTimeError> {
263 match self.format {
264 TimestampFormat::UnixMillis
265 | TimestampFormat::UnixSeconds
266 | TimestampFormat::UnixMicros
267 | TimestampFormat::UnixNanos => {
268 if !matches!(data_type, DataType::Int64) {
269 return Err(EventTimeError::IncompatibleType {
270 format: self.format,
271 expected: "Int64".to_string(),
272 found: format!("{data_type:?}"),
273 });
274 }
275 }
276 TimestampFormat::Iso8601 => {
277 if !matches!(data_type, DataType::Utf8 | DataType::LargeUtf8) {
278 return Err(EventTimeError::IncompatibleType {
279 format: self.format,
280 expected: "Utf8 or LargeUtf8".to_string(),
281 found: format!("{data_type:?}"),
282 });
283 }
284 }
285 TimestampFormat::ArrowNative => {
286 if !matches!(data_type, DataType::Timestamp(_, _)) {
287 return Err(EventTimeError::IncompatibleType {
288 format: self.format,
289 expected: "Timestamp".to_string(),
290 found: format!("{data_type:?}"),
291 });
292 }
293 }
294 }
295 Ok(())
296 }
297
298 fn extract_from_column(&self, column: &Arc<dyn Array>) -> Result<i64, EventTimeError> {
300 match self.format {
301 TimestampFormat::UnixMillis => {
302 let array = column
303 .as_any()
304 .downcast_ref::<Int64Array>()
305 .ok_or_else(|| EventTimeError::IncompatibleType {
306 format: self.format,
307 expected: "Int64".to_string(),
308 found: format!("{:?}", column.data_type()),
309 })?;
310 self.extract_i64(array, |v| v)
311 }
312 TimestampFormat::UnixSeconds => {
313 let array = column
314 .as_any()
315 .downcast_ref::<Int64Array>()
316 .ok_or_else(|| EventTimeError::IncompatibleType {
317 format: self.format,
318 expected: "Int64".to_string(),
319 found: format!("{:?}", column.data_type()),
320 })?;
321 self.extract_i64(array, |v| v.saturating_mul(1000))
322 }
323 TimestampFormat::UnixMicros => {
324 let array = column
325 .as_any()
326 .downcast_ref::<Int64Array>()
327 .ok_or_else(|| EventTimeError::IncompatibleType {
328 format: self.format,
329 expected: "Int64".to_string(),
330 found: format!("{:?}", column.data_type()),
331 })?;
332 self.extract_i64(array, |v| v / 1000)
333 }
334 TimestampFormat::UnixNanos => {
335 let array = column
336 .as_any()
337 .downcast_ref::<Int64Array>()
338 .ok_or_else(|| EventTimeError::IncompatibleType {
339 format: self.format,
340 expected: "Int64".to_string(),
341 found: format!("{:?}", column.data_type()),
342 })?;
343 self.extract_i64(array, |v| v / 1_000_000)
344 }
345 TimestampFormat::Iso8601 => {
346 let array = column
347 .as_any()
348 .downcast_ref::<StringArray>()
349 .ok_or_else(|| EventTimeError::IncompatibleType {
350 format: self.format,
351 expected: "Utf8".to_string(),
352 found: format!("{:?}", column.data_type()),
353 })?;
354 self.extract_iso8601(array)
355 }
356 TimestampFormat::ArrowNative => self.extract_arrow_timestamp(column),
357 }
358 }
359
360 fn extract_i64<F>(&self, array: &Int64Array, convert: F) -> Result<i64, EventTimeError>
362 where
363 F: Fn(i64) -> i64,
364 {
365 match self.mode {
366 ExtractionMode::First => {
367 if array.is_null(0) {
368 Err(EventTimeError::NullTimestamp { row: 0 })
369 } else {
370 Ok(convert(array.value(0)))
371 }
372 }
373 ExtractionMode::Last => {
374 let last = array.len() - 1;
375 if array.is_null(last) {
376 Err(EventTimeError::NullTimestamp { row: last })
377 } else {
378 Ok(convert(array.value(last)))
379 }
380 }
381 ExtractionMode::Max => {
382 let mut max_val = i64::MIN;
383 let mut found = false;
384 for i in 0..array.len() {
385 if !array.is_null(i) {
386 found = true;
387 let v = convert(array.value(i));
388 if v > max_val {
389 max_val = v;
390 }
391 }
392 }
393 if found {
394 Ok(max_val)
395 } else {
396 Err(EventTimeError::NullTimestamp { row: 0 })
397 }
398 }
399 ExtractionMode::Min => {
400 let mut min_val = i64::MAX;
401 let mut found = false;
402 for i in 0..array.len() {
403 if !array.is_null(i) {
404 found = true;
405 let v = convert(array.value(i));
406 if v < min_val {
407 min_val = v;
408 }
409 }
410 }
411 if found {
412 Ok(min_val)
413 } else {
414 Err(EventTimeError::NullTimestamp { row: 0 })
415 }
416 }
417 }
418 }
419
420 fn extract_iso8601(&self, array: &StringArray) -> Result<i64, EventTimeError> {
422 let parse_value = |idx: usize| -> Result<i64, EventTimeError> {
423 if array.is_null(idx) {
424 return Err(EventTimeError::NullTimestamp { row: idx });
425 }
426 let s = array.value(idx);
427 let dt = string_to_datetime(&Utc, s).map_err(|e| EventTimeError::ParseError {
428 value: s.to_string(),
429 reason: e.to_string(),
430 })?;
431 Ok(dt.timestamp_millis())
432 };
433
434 match self.mode {
435 ExtractionMode::First => parse_value(0),
436 ExtractionMode::Last => parse_value(array.len() - 1),
437 ExtractionMode::Max => {
438 let mut max_val = i64::MIN;
439 let mut found = false;
440 for i in 0..array.len() {
441 if !array.is_null(i) {
442 let v = parse_value(i)?;
443 found = true;
444 if v > max_val {
445 max_val = v;
446 }
447 }
448 }
449 if found {
450 Ok(max_val)
451 } else {
452 Err(EventTimeError::NullTimestamp { row: 0 })
453 }
454 }
455 ExtractionMode::Min => {
456 let mut min_val = i64::MAX;
457 let mut found = false;
458 for i in 0..array.len() {
459 if !array.is_null(i) {
460 let v = parse_value(i)?;
461 found = true;
462 if v < min_val {
463 min_val = v;
464 }
465 }
466 }
467 if found {
468 Ok(min_val)
469 } else {
470 Err(EventTimeError::NullTimestamp { row: 0 })
471 }
472 }
473 }
474 }
475
476 fn extract_arrow_timestamp(&self, column: &Arc<dyn Array>) -> Result<i64, EventTimeError> {
478 match column.data_type() {
479 DataType::Timestamp(TimeUnit::Second, _) => {
480 let array = column
481 .as_any()
482 .downcast_ref::<TimestampSecondArray>()
483 .ok_or_else(|| EventTimeError::IncompatibleType {
484 format: self.format,
485 expected: "TimestampSecond".to_string(),
486 found: format!("{:?}", column.data_type()),
487 })?;
488 self.extract_ts_seconds(array)
489 }
490 DataType::Timestamp(TimeUnit::Millisecond, _) => {
491 let array = column
492 .as_any()
493 .downcast_ref::<TimestampMillisecondArray>()
494 .ok_or_else(|| EventTimeError::IncompatibleType {
495 format: self.format,
496 expected: "TimestampMillisecond".to_string(),
497 found: format!("{:?}", column.data_type()),
498 })?;
499 self.extract_ts_millis(array)
500 }
501 DataType::Timestamp(TimeUnit::Microsecond, _) => {
502 let array = column
503 .as_any()
504 .downcast_ref::<TimestampMicrosecondArray>()
505 .ok_or_else(|| EventTimeError::IncompatibleType {
506 format: self.format,
507 expected: "TimestampMicrosecond".to_string(),
508 found: format!("{:?}", column.data_type()),
509 })?;
510 self.extract_ts_micros(array)
511 }
512 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
513 let array = column
514 .as_any()
515 .downcast_ref::<TimestampNanosecondArray>()
516 .ok_or_else(|| EventTimeError::IncompatibleType {
517 format: self.format,
518 expected: "TimestampNanosecond".to_string(),
519 found: format!("{:?}", column.data_type()),
520 })?;
521 self.extract_ts_nanos(array)
522 }
523 _ => Err(EventTimeError::IncompatibleType {
524 format: self.format,
525 expected: "Timestamp".to_string(),
526 found: format!("{:?}", column.data_type()),
527 }),
528 }
529 }
530
531 fn extract_ts_seconds(&self, array: &TimestampSecondArray) -> Result<i64, EventTimeError> {
533 match self.mode {
534 ExtractionMode::First => {
535 if array.is_null(0) {
536 Err(EventTimeError::NullTimestamp { row: 0 })
537 } else {
538 Ok(array.value(0).saturating_mul(1000))
539 }
540 }
541 ExtractionMode::Last => {
542 let last = array.len() - 1;
543 if array.is_null(last) {
544 Err(EventTimeError::NullTimestamp { row: last })
545 } else {
546 Ok(array.value(last).saturating_mul(1000))
547 }
548 }
549 ExtractionMode::Max => {
550 let mut max_val = i64::MIN;
551 let mut found = false;
552 for i in 0..array.len() {
553 if !array.is_null(i) {
554 found = true;
555 let v = array.value(i).saturating_mul(1000);
556 if v > max_val {
557 max_val = v;
558 }
559 }
560 }
561 if found {
562 Ok(max_val)
563 } else {
564 Err(EventTimeError::NullTimestamp { row: 0 })
565 }
566 }
567 ExtractionMode::Min => {
568 let mut min_val = i64::MAX;
569 let mut found = false;
570 for i in 0..array.len() {
571 if !array.is_null(i) {
572 found = true;
573 let v = array.value(i).saturating_mul(1000);
574 if v < min_val {
575 min_val = v;
576 }
577 }
578 }
579 if found {
580 Ok(min_val)
581 } else {
582 Err(EventTimeError::NullTimestamp { row: 0 })
583 }
584 }
585 }
586 }
587
588 fn extract_ts_millis(&self, array: &TimestampMillisecondArray) -> Result<i64, EventTimeError> {
590 match self.mode {
591 ExtractionMode::First => {
592 if array.is_null(0) {
593 Err(EventTimeError::NullTimestamp { row: 0 })
594 } else {
595 Ok(array.value(0))
596 }
597 }
598 ExtractionMode::Last => {
599 let last = array.len() - 1;
600 if array.is_null(last) {
601 Err(EventTimeError::NullTimestamp { row: last })
602 } else {
603 Ok(array.value(last))
604 }
605 }
606 ExtractionMode::Max => {
607 let mut max_val = i64::MIN;
608 let mut found = false;
609 for i in 0..array.len() {
610 if !array.is_null(i) {
611 found = true;
612 let v = array.value(i);
613 if v > max_val {
614 max_val = v;
615 }
616 }
617 }
618 if found {
619 Ok(max_val)
620 } else {
621 Err(EventTimeError::NullTimestamp { row: 0 })
622 }
623 }
624 ExtractionMode::Min => {
625 let mut min_val = i64::MAX;
626 let mut found = false;
627 for i in 0..array.len() {
628 if !array.is_null(i) {
629 found = true;
630 let v = array.value(i);
631 if v < min_val {
632 min_val = v;
633 }
634 }
635 }
636 if found {
637 Ok(min_val)
638 } else {
639 Err(EventTimeError::NullTimestamp { row: 0 })
640 }
641 }
642 }
643 }
644
645 fn extract_ts_micros(&self, array: &TimestampMicrosecondArray) -> Result<i64, EventTimeError> {
647 match self.mode {
648 ExtractionMode::First => {
649 if array.is_null(0) {
650 Err(EventTimeError::NullTimestamp { row: 0 })
651 } else {
652 Ok(array.value(0) / 1000)
653 }
654 }
655 ExtractionMode::Last => {
656 let last = array.len() - 1;
657 if array.is_null(last) {
658 Err(EventTimeError::NullTimestamp { row: last })
659 } else {
660 Ok(array.value(last) / 1000)
661 }
662 }
663 ExtractionMode::Max => {
664 let mut max_val = i64::MIN;
665 let mut found = false;
666 for i in 0..array.len() {
667 if !array.is_null(i) {
668 found = true;
669 let v = array.value(i) / 1000;
670 if v > max_val {
671 max_val = v;
672 }
673 }
674 }
675 if found {
676 Ok(max_val)
677 } else {
678 Err(EventTimeError::NullTimestamp { row: 0 })
679 }
680 }
681 ExtractionMode::Min => {
682 let mut min_val = i64::MAX;
683 let mut found = false;
684 for i in 0..array.len() {
685 if !array.is_null(i) {
686 found = true;
687 let v = array.value(i) / 1000;
688 if v < min_val {
689 min_val = v;
690 }
691 }
692 }
693 if found {
694 Ok(min_val)
695 } else {
696 Err(EventTimeError::NullTimestamp { row: 0 })
697 }
698 }
699 }
700 }
701
702 fn extract_ts_nanos(&self, array: &TimestampNanosecondArray) -> Result<i64, EventTimeError> {
704 match self.mode {
705 ExtractionMode::First => {
706 if array.is_null(0) {
707 Err(EventTimeError::NullTimestamp { row: 0 })
708 } else {
709 Ok(array.value(0) / 1_000_000)
710 }
711 }
712 ExtractionMode::Last => {
713 let last = array.len() - 1;
714 if array.is_null(last) {
715 Err(EventTimeError::NullTimestamp { row: last })
716 } else {
717 Ok(array.value(last) / 1_000_000)
718 }
719 }
720 ExtractionMode::Max => {
721 let mut max_val = i64::MIN;
722 let mut found = false;
723 for i in 0..array.len() {
724 if !array.is_null(i) {
725 found = true;
726 let v = array.value(i) / 1_000_000;
727 if v > max_val {
728 max_val = v;
729 }
730 }
731 }
732 if found {
733 Ok(max_val)
734 } else {
735 Err(EventTimeError::NullTimestamp { row: 0 })
736 }
737 }
738 ExtractionMode::Min => {
739 let mut min_val = i64::MAX;
740 let mut found = false;
741 for i in 0..array.len() {
742 if !array.is_null(i) {
743 found = true;
744 let v = array.value(i) / 1_000_000;
745 if v < min_val {
746 min_val = v;
747 }
748 }
749 }
750 if found {
751 Ok(min_val)
752 } else {
753 Err(EventTimeError::NullTimestamp { row: 0 })
754 }
755 }
756 }
757 }
758}
759
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, Int64Builder, StringBuilder};
    use arrow::datatypes::Field;
    use std::sync::Arc;

    /// Wraps `array` in a one-column batch whose nullable field is called `name`.
    fn single_column_batch(name: &str, data_type: DataType, array: ArrayRef) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new(name, data_type, true)]));
        RecordBatch::try_new(schema, vec![array]).unwrap()
    }

    /// Builds a one-column `Int64` batch; `None` entries become nulls.
    fn make_int64_batch(name: &str, values: &[Option<i64>]) -> RecordBatch {
        let mut builder = Int64Builder::new();
        for value in values {
            builder.append_option(*value);
        }
        single_column_batch(name, DataType::Int64, Arc::new(builder.finish()))
    }

    /// Builds a one-column `Utf8` batch; `None` entries become nulls.
    fn make_string_batch(name: &str, values: &[Option<&str>]) -> RecordBatch {
        let mut builder = StringBuilder::new();
        for value in values {
            builder.append_option(*value);
        }
        single_column_batch(name, DataType::Utf8, Arc::new(builder.finish()))
    }

    /// Builds a one-column `Timestamp(Millisecond, None)` batch; `None` entries become nulls.
    fn make_timestamp_millis_batch(name: &str, values: &[Option<i64>]) -> RecordBatch {
        use arrow::array::TimestampMillisecondBuilder;

        let mut builder = TimestampMillisecondBuilder::new();
        for value in values {
            builder.append_option(*value);
        }
        single_column_batch(
            name,
            DataType::Timestamp(TimeUnit::Millisecond, None),
            Arc::new(builder.finish()),
        )
    }

    /// Millisecond input is returned unchanged.
    #[test]
    fn test_unix_millis_extraction() {
        let batch = make_int64_batch("event_time", &[Some(1_705_312_200_000)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 1_705_312_200_000);
    }

    /// Second input is scaled up to milliseconds.
    #[test]
    fn test_unix_seconds_extraction() {
        let batch = make_int64_batch("event_time", &[Some(1_705_312_200)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixSeconds);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 1_705_312_200_000);
    }

    /// Microsecond input is scaled down to milliseconds.
    #[test]
    fn test_unix_micros_extraction() {
        let batch = make_int64_batch("event_time", &[Some(1_705_312_200_000_000)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMicros);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 1_705_312_200_000);
    }

    /// Nanosecond input is scaled down to milliseconds.
    #[test]
    fn test_unix_nanos_extraction() {
        let batch = make_int64_batch("event_time", &[Some(1_705_312_200_000_000_000)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixNanos);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 1_705_312_200_000);
    }

    /// ISO 8601 text yields the same milliseconds as parsing it directly.
    #[test]
    fn test_iso8601_extraction() {
        let timestamp_str = "2024-01-15T10:30:00Z";
        let expected = string_to_datetime(&Utc, timestamp_str)
            .unwrap()
            .timestamp_millis();

        let batch = make_string_batch("event_time", &[Some(timestamp_str)]);
        let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::Iso8601);
        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, expected);
    }

    /// A native millisecond timestamp column is returned unchanged.
    #[test]
    fn test_arrow_native_millis_extraction() {
        let batch = make_timestamp_millis_batch("event_time", &[Some(1_705_312_200_000)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::ArrowNative);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 1_705_312_200_000);
    }

    /// `First` mode reports row 0.
    #[test]
    fn test_extraction_mode_first() {
        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::First);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 100);
    }

    /// `Last` mode reports the final row.
    #[test]
    fn test_extraction_mode_last() {
        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Last);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 150);
    }

    /// `Max` mode reports the largest value.
    #[test]
    fn test_extraction_mode_max() {
        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Max);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 200);
    }

    /// `Min` mode reports the smallest value.
    #[test]
    fn test_extraction_mode_min() {
        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Min);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 100);
    }

    /// `Max` mode skips null rows.
    #[test]
    fn test_max_with_nulls() {
        let batch = make_int64_batch("ts", &[Some(100), None, Some(200), Some(150)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Max);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 200);
    }

    /// `Min` mode skips null rows.
    #[test]
    fn test_min_with_nulls() {
        let batch = make_int64_batch("ts", &[Some(100), None, Some(200), Some(50)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Min);

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 50);
    }

    /// A missing column name is reported as `ColumnNotFound`.
    #[test]
    fn test_column_not_found() {
        let batch = make_int64_batch("other_column", &[Some(100)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);

        let outcome = extractor.extract(&batch);

        assert!(matches!(outcome, Err(EventTimeError::ColumnNotFound(_))));
    }

    /// An index past the last column is reported as `IndexOutOfBounds`.
    #[test]
    fn test_index_out_of_bounds() {
        let batch = make_int64_batch("ts", &[Some(100)]);
        let mut extractor = EventTimeExtractor::from_index(5, TimestampFormat::UnixMillis);

        let outcome = extractor.extract(&batch);

        assert!(matches!(
            outcome,
            Err(EventTimeError::IndexOutOfBounds { .. })
        ));
    }

    /// Reading a string column as Unix millis is reported as `IncompatibleType`.
    #[test]
    fn test_incompatible_type() {
        let batch = make_string_batch("ts", &[Some("not a number")]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);

        let outcome = extractor.extract(&batch);

        assert!(matches!(
            outcome,
            Err(EventTimeError::IncompatibleType { .. })
        ));
    }

    /// `First` mode rejects a null first row.
    #[test]
    fn test_null_timestamp_first() {
        let batch = make_int64_batch("ts", &[None, Some(100)]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::First);

        let outcome = extractor.extract(&batch);

        assert!(matches!(
            outcome,
            Err(EventTimeError::NullTimestamp { row: 0 })
        ));
    }

    /// `Last` mode rejects a null last row and reports its index.
    #[test]
    fn test_null_timestamp_last() {
        let batch = make_int64_batch("ts", &[Some(100), None]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Last);

        let outcome = extractor.extract(&batch);

        assert!(matches!(
            outcome,
            Err(EventTimeError::NullTimestamp { row: 1 })
        ));
    }

    /// A batch without rows is reported as `EmptyBatch`.
    #[test]
    fn test_empty_batch() {
        let batch = make_int64_batch("ts", &[]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);

        let outcome = extractor.extract(&batch);

        assert!(matches!(outcome, Err(EventTimeError::EmptyBatch)));
    }

    /// `Max` mode over an all-null column is reported as `NullTimestamp`.
    #[test]
    fn test_all_nulls_max() {
        let batch = make_int64_batch("ts", &[None, None, None]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Max);

        let outcome = extractor.extract(&batch);

        assert!(matches!(outcome, Err(EventTimeError::NullTimestamp { .. })));
    }

    /// Unparseable ISO 8601 text is reported as `ParseError`.
    #[test]
    fn test_parse_error_iso8601() {
        let batch = make_string_batch("ts", &[Some("not-a-timestamp")]);
        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::Iso8601);

        let outcome = extractor.extract(&batch);

        assert!(matches!(outcome, Err(EventTimeError::ParseError { .. })));
    }

    /// A name-based extractor caches the resolved index after the first extraction.
    #[test]
    fn test_column_index_caching() {
        let batch = make_int64_batch("event_time", &[Some(100)]);
        let mut extractor =
            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
        assert_eq!(extractor.cached_index, None);

        // First extraction resolves the name and populates the cache.
        let _ = extractor.extract(&batch).unwrap();
        assert_eq!(extractor.cached_index, Some(0));

        // Second extraction still yields the right value.
        let millis = extractor.extract(&batch).unwrap();
        assert_eq!(millis, 100);
    }

    /// An index-based extractor starts with its cache already populated.
    #[test]
    fn test_index_extractor_no_lookup() {
        let batch = make_int64_batch("ts", &[Some(100)]);
        let mut extractor = EventTimeExtractor::from_index(0, TimestampFormat::UnixMillis);
        assert_eq!(extractor.cached_index, Some(0));

        let millis = extractor.extract(&batch).unwrap();

        assert_eq!(millis, 100);
    }

    /// Schema validation succeeds for a matching column and type.
    #[test]
    fn test_validate_schema_success() {
        let schema = Schema::new(vec![Field::new("ts", DataType::Int64, true)]);
        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);

        assert!(extractor.validate_schema(&schema).is_ok());
    }

    /// Schema validation reports a missing column.
    #[test]
    fn test_validate_schema_column_not_found() {
        let schema = Schema::new(vec![Field::new("other", DataType::Int64, true)]);
        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);

        let outcome = extractor.validate_schema(&schema);

        assert!(matches!(outcome, Err(EventTimeError::ColumnNotFound(_))));
    }

    /// Schema validation reports a column whose type does not fit the format.
    #[test]
    fn test_validate_schema_incompatible_type() {
        let schema = Schema::new(vec![Field::new("ts", DataType::Utf8, true)]);
        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);

        let outcome = extractor.validate_schema(&schema);

        assert!(matches!(
            outcome,
            Err(EventTimeError::IncompatibleType { .. })
        ));
    }

    /// `format()` returns the format given at construction.
    #[test]
    fn test_format_accessor() {
        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMicros);

        assert_eq!(extractor.format(), TimestampFormat::UnixMicros);
    }

    /// `mode()` returns the mode set through `with_mode`.
    #[test]
    fn test_mode_accessor() {
        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
            .with_mode(ExtractionMode::Max);

        assert_eq!(extractor.mode(), ExtractionMode::Max);
    }
}