Skip to main content

laminar_core/time/
event_time.rs

1//! Event Time Extraction
2//!
3//! This module provides `EventTimeExtractor` for extracting timestamps from Arrow `RecordBatch`
4//! columns. It supports multiple timestamp formats and extraction modes for correct event-time
5//! processing in streaming windows.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use laminar_core::time::{EventTimeExtractor, TimestampFormat};
11//!
12//! let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
13//! let timestamp = extractor.extract(&batch)?;
14//! ```
15
16use std::fmt;
17use std::sync::Arc;
18
19use arrow::array::{
20    Array, Int64Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
21    TimestampNanosecondArray, TimestampSecondArray,
22};
23use arrow::datatypes::{DataType, Schema, TimeUnit};
24use arrow::record_batch::RecordBatch;
25use arrow_cast::parse::string_to_datetime;
26use chrono::Utc;
27
/// Timestamp format variants for extraction.
///
/// Tells the extractor how to interpret the raw values of the timestamp column.
/// Whatever the source format, the extracted result is expressed in milliseconds
/// since the Unix epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimestampFormat {
    /// Unix timestamp in milliseconds (i64) - used as-is
    UnixMillis,
    /// Unix timestamp in seconds (i64) - converted to millis
    UnixSeconds,
    /// Unix timestamp in microseconds (i64) - converted to millis
    UnixMicros,
    /// Unix timestamp in nanoseconds (i64) - converted to millis
    UnixNanos,
    /// ISO 8601 string format (e.g., "2024-01-15T10:30:00Z"), parsed into millis
    Iso8601,
    /// Auto-detect the time unit from the column's Arrow `Timestamp` data type
    ArrowNative,
}
44
45impl fmt::Display for TimestampFormat {
46    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
47        match self {
48            TimestampFormat::UnixMillis => write!(f, "UnixMillis"),
49            TimestampFormat::UnixSeconds => write!(f, "UnixSeconds"),
50            TimestampFormat::UnixMicros => write!(f, "UnixMicros"),
51            TimestampFormat::UnixNanos => write!(f, "UnixNanos"),
52            TimestampFormat::Iso8601 => write!(f, "ISO8601"),
53            TimestampFormat::ArrowNative => write!(f, "ArrowNative"),
54        }
55    }
56}
57
/// Column identifier for timestamp field.
///
/// Identifies which column of a batch holds the event timestamp, either by
/// name or by position.
#[derive(Debug, Clone)]
pub enum TimestampField {
    /// Column name; the resolved index is cached by the extractor after the first lookup
    Name(String),
    /// Column index (most efficient, no name lookup required)
    Index(usize),
}
66
/// Multi-row extraction strategy.
///
/// Decides which row(s) of a batch determine the single timestamp returned for
/// the whole batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExtractionMode {
    /// Extract from first row - O(1), default. A null first row is an error.
    #[default]
    First,
    /// Extract from last row - O(1). A null last row is an error.
    Last,
    /// Extract maximum timestamp - O(n). Null rows are skipped.
    Max,
    /// Extract minimum timestamp - O(n). Null rows are skipped.
    Min,
}
80
/// Errors that can occur during event time extraction.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum EventTimeError {
    /// Column not found in schema (carries the requested column name)
    #[error("Column not found: {0}")]
    ColumnNotFound(String),

    /// Column index out of bounds
    #[error("Column index {index} out of bounds (batch has {num_columns} columns)")]
    IndexOutOfBounds {
        /// Requested index
        index: usize,
        /// Number of columns in batch
        num_columns: usize,
    },

    /// Incompatible column type for format
    #[error("Incompatible type for format {format}: expected {expected}, found {found}")]
    IncompatibleType {
        /// Requested format
        format: TimestampFormat,
        /// Expected type (human-readable description)
        expected: String,
        /// Actual type found (debug rendering of the Arrow data type)
        found: String,
    },

    /// Failed to parse timestamp value
    #[error("Failed to parse timestamp '{value}': {reason}")]
    ParseError {
        /// The value that failed to parse
        value: String,
        /// Reason for failure, as reported by the parser
        reason: String,
    },

    /// Null timestamp encountered
    #[error("Null timestamp at row {row}")]
    NullTimestamp {
        /// Row index with null value (reported as 0 when every row scanned by
        /// `Max`/`Min` extraction is null)
        row: usize,
    },

    /// Empty batch provided
    #[error("Cannot extract timestamp from empty batch")]
    EmptyBatch,
}
128
/// Extracts event timestamps from Arrow `RecordBatch` columns.
///
/// Supports multiple timestamp formats and extraction modes for multi-row batches.
/// Uses internal caching for column index lookup to optimize repeated extractions.
#[derive(Debug)]
pub struct EventTimeExtractor {
    // Which column holds the timestamp (by name or by position).
    field: TimestampField,
    // How the column's values are interpreted.
    format: TimestampFormat,
    // Which row(s) of a batch determine the extracted timestamp.
    mode: ExtractionMode,
    // Resolved column index; `None` until a name-based lookup has succeeded,
    // pre-populated when the extractor is built from an index.
    cached_index: Option<usize>,
}
140
141impl EventTimeExtractor {
142    /// Creates an extractor that looks up a column by name.
143    ///
144    /// The column index is cached after the first extraction for efficiency.
145    #[must_use]
146    pub fn from_column(name: &str, format: TimestampFormat) -> Self {
147        Self {
148            field: TimestampField::Name(name.to_string()),
149            format,
150            mode: ExtractionMode::default(),
151            cached_index: None,
152        }
153    }
154
155    /// Creates an extractor that uses a column by index.
156    ///
157    /// This is the most efficient option as no lookup is required.
158    #[must_use]
159    pub fn from_index(index: usize, format: TimestampFormat) -> Self {
160        Self {
161            field: TimestampField::Index(index),
162            format,
163            mode: ExtractionMode::default(),
164            cached_index: Some(index),
165        }
166    }
167
168    /// Sets the extraction mode for multi-row batches.
169    #[must_use]
170    pub fn with_mode(mut self, mode: ExtractionMode) -> Self {
171        self.mode = mode;
172        self
173    }
174
175    /// Gets the configured format.
176    #[must_use]
177    pub fn format(&self) -> TimestampFormat {
178        self.format
179    }
180
181    /// Gets the configured mode.
182    #[must_use]
183    pub fn mode(&self) -> ExtractionMode {
184        self.mode
185    }
186
187    /// Validates that the schema contains a compatible timestamp column.
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if the column is not found or has an incompatible type.
192    pub fn validate_schema(&self, schema: &Schema) -> Result<(), EventTimeError> {
193        let (index, data_type) = self.resolve_column(schema)?;
194        self.validate_type(data_type, index)?;
195        Ok(())
196    }
197
198    /// Extracts the event timestamp from a batch.
199    ///
200    /// Returns the timestamp in milliseconds since Unix epoch.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if:
205    /// - The batch is empty
206    /// - The column is not found
207    /// - The column type is incompatible with the format
208    /// - The timestamp value is null
209    /// - The timestamp cannot be parsed
210    pub fn extract(&mut self, batch: &RecordBatch) -> Result<i64, EventTimeError> {
211        if batch.num_rows() == 0 {
212            return Err(EventTimeError::EmptyBatch);
213        }
214
215        let index = self.get_column_index(batch.schema().as_ref())?;
216        let column = batch.column(index);
217
218        self.extract_from_column(column)
219    }
220
221    /// Resolves the column index, using cache if available.
222    fn get_column_index(&mut self, schema: &Schema) -> Result<usize, EventTimeError> {
223        if let Some(idx) = self.cached_index {
224            // Validate the cached index is still valid
225            if idx < schema.fields().len() {
226                return Ok(idx);
227            }
228        }
229
230        let (index, _) = self.resolve_column(schema)?;
231        self.cached_index = Some(index);
232        Ok(index)
233    }
234
235    /// Resolves column name/index to actual index and data type.
236    fn resolve_column<'a>(
237        &self,
238        schema: &'a Schema,
239    ) -> Result<(usize, &'a DataType), EventTimeError> {
240        match &self.field {
241            TimestampField::Name(name) => {
242                let index = schema
243                    .index_of(name)
244                    .map_err(|_| EventTimeError::ColumnNotFound(name.clone()))?;
245                let data_type = schema.field(index).data_type();
246                Ok((index, data_type))
247            }
248            TimestampField::Index(index) => {
249                if *index >= schema.fields().len() {
250                    return Err(EventTimeError::IndexOutOfBounds {
251                        index: *index,
252                        num_columns: schema.fields().len(),
253                    });
254                }
255                let data_type = schema.field(*index).data_type();
256                Ok((*index, data_type))
257            }
258        }
259    }
260
261    /// Validates that the data type is compatible with the format.
262    fn validate_type(&self, data_type: &DataType, _index: usize) -> Result<(), EventTimeError> {
263        match self.format {
264            TimestampFormat::UnixMillis
265            | TimestampFormat::UnixSeconds
266            | TimestampFormat::UnixMicros
267            | TimestampFormat::UnixNanos => {
268                if !matches!(data_type, DataType::Int64) {
269                    return Err(EventTimeError::IncompatibleType {
270                        format: self.format,
271                        expected: "Int64".to_string(),
272                        found: format!("{data_type:?}"),
273                    });
274                }
275            }
276            TimestampFormat::Iso8601 => {
277                if !matches!(data_type, DataType::Utf8 | DataType::LargeUtf8) {
278                    return Err(EventTimeError::IncompatibleType {
279                        format: self.format,
280                        expected: "Utf8 or LargeUtf8".to_string(),
281                        found: format!("{data_type:?}"),
282                    });
283                }
284            }
285            TimestampFormat::ArrowNative => {
286                if !matches!(data_type, DataType::Timestamp(_, _)) {
287                    return Err(EventTimeError::IncompatibleType {
288                        format: self.format,
289                        expected: "Timestamp".to_string(),
290                        found: format!("{data_type:?}"),
291                    });
292                }
293            }
294        }
295        Ok(())
296    }
297
298    /// Extracts timestamp from a column array.
299    fn extract_from_column(&self, column: &Arc<dyn Array>) -> Result<i64, EventTimeError> {
300        match self.format {
301            TimestampFormat::UnixMillis => {
302                let array = column
303                    .as_any()
304                    .downcast_ref::<Int64Array>()
305                    .ok_or_else(|| EventTimeError::IncompatibleType {
306                        format: self.format,
307                        expected: "Int64".to_string(),
308                        found: format!("{:?}", column.data_type()),
309                    })?;
310                self.extract_i64(array, |v| v)
311            }
312            TimestampFormat::UnixSeconds => {
313                let array = column
314                    .as_any()
315                    .downcast_ref::<Int64Array>()
316                    .ok_or_else(|| EventTimeError::IncompatibleType {
317                        format: self.format,
318                        expected: "Int64".to_string(),
319                        found: format!("{:?}", column.data_type()),
320                    })?;
321                self.extract_i64(array, |v| v.saturating_mul(1000))
322            }
323            TimestampFormat::UnixMicros => {
324                let array = column
325                    .as_any()
326                    .downcast_ref::<Int64Array>()
327                    .ok_or_else(|| EventTimeError::IncompatibleType {
328                        format: self.format,
329                        expected: "Int64".to_string(),
330                        found: format!("{:?}", column.data_type()),
331                    })?;
332                self.extract_i64(array, |v| v / 1000)
333            }
334            TimestampFormat::UnixNanos => {
335                let array = column
336                    .as_any()
337                    .downcast_ref::<Int64Array>()
338                    .ok_or_else(|| EventTimeError::IncompatibleType {
339                        format: self.format,
340                        expected: "Int64".to_string(),
341                        found: format!("{:?}", column.data_type()),
342                    })?;
343                self.extract_i64(array, |v| v / 1_000_000)
344            }
345            TimestampFormat::Iso8601 => {
346                let array = column
347                    .as_any()
348                    .downcast_ref::<StringArray>()
349                    .ok_or_else(|| EventTimeError::IncompatibleType {
350                        format: self.format,
351                        expected: "Utf8".to_string(),
352                        found: format!("{:?}", column.data_type()),
353                    })?;
354                self.extract_iso8601(array)
355            }
356            TimestampFormat::ArrowNative => self.extract_arrow_timestamp(column),
357        }
358    }
359
360    /// Extracts from Int64 array with conversion function.
361    fn extract_i64<F>(&self, array: &Int64Array, convert: F) -> Result<i64, EventTimeError>
362    where
363        F: Fn(i64) -> i64,
364    {
365        match self.mode {
366            ExtractionMode::First => {
367                if array.is_null(0) {
368                    Err(EventTimeError::NullTimestamp { row: 0 })
369                } else {
370                    Ok(convert(array.value(0)))
371                }
372            }
373            ExtractionMode::Last => {
374                let last = array.len() - 1;
375                if array.is_null(last) {
376                    Err(EventTimeError::NullTimestamp { row: last })
377                } else {
378                    Ok(convert(array.value(last)))
379                }
380            }
381            ExtractionMode::Max => {
382                let mut max_val = i64::MIN;
383                let mut found = false;
384                for i in 0..array.len() {
385                    if !array.is_null(i) {
386                        found = true;
387                        let v = convert(array.value(i));
388                        if v > max_val {
389                            max_val = v;
390                        }
391                    }
392                }
393                if found {
394                    Ok(max_val)
395                } else {
396                    Err(EventTimeError::NullTimestamp { row: 0 })
397                }
398            }
399            ExtractionMode::Min => {
400                let mut min_val = i64::MAX;
401                let mut found = false;
402                for i in 0..array.len() {
403                    if !array.is_null(i) {
404                        found = true;
405                        let v = convert(array.value(i));
406                        if v < min_val {
407                            min_val = v;
408                        }
409                    }
410                }
411                if found {
412                    Ok(min_val)
413                } else {
414                    Err(EventTimeError::NullTimestamp { row: 0 })
415                }
416            }
417        }
418    }
419
420    /// Extracts from ISO 8601 string array.
421    fn extract_iso8601(&self, array: &StringArray) -> Result<i64, EventTimeError> {
422        let parse_value = |idx: usize| -> Result<i64, EventTimeError> {
423            if array.is_null(idx) {
424                return Err(EventTimeError::NullTimestamp { row: idx });
425            }
426            let s = array.value(idx);
427            let dt = string_to_datetime(&Utc, s).map_err(|e| EventTimeError::ParseError {
428                value: s.to_string(),
429                reason: e.to_string(),
430            })?;
431            Ok(dt.timestamp_millis())
432        };
433
434        match self.mode {
435            ExtractionMode::First => parse_value(0),
436            ExtractionMode::Last => parse_value(array.len() - 1),
437            ExtractionMode::Max => {
438                let mut max_val = i64::MIN;
439                let mut found = false;
440                for i in 0..array.len() {
441                    if !array.is_null(i) {
442                        let v = parse_value(i)?;
443                        found = true;
444                        if v > max_val {
445                            max_val = v;
446                        }
447                    }
448                }
449                if found {
450                    Ok(max_val)
451                } else {
452                    Err(EventTimeError::NullTimestamp { row: 0 })
453                }
454            }
455            ExtractionMode::Min => {
456                let mut min_val = i64::MAX;
457                let mut found = false;
458                for i in 0..array.len() {
459                    if !array.is_null(i) {
460                        let v = parse_value(i)?;
461                        found = true;
462                        if v < min_val {
463                            min_val = v;
464                        }
465                    }
466                }
467                if found {
468                    Ok(min_val)
469                } else {
470                    Err(EventTimeError::NullTimestamp { row: 0 })
471                }
472            }
473        }
474    }
475
476    /// Extracts from Arrow native Timestamp array.
477    fn extract_arrow_timestamp(&self, column: &Arc<dyn Array>) -> Result<i64, EventTimeError> {
478        match column.data_type() {
479            DataType::Timestamp(TimeUnit::Second, _) => {
480                let array = column
481                    .as_any()
482                    .downcast_ref::<TimestampSecondArray>()
483                    .ok_or_else(|| EventTimeError::IncompatibleType {
484                        format: self.format,
485                        expected: "TimestampSecond".to_string(),
486                        found: format!("{:?}", column.data_type()),
487                    })?;
488                self.extract_ts_seconds(array)
489            }
490            DataType::Timestamp(TimeUnit::Millisecond, _) => {
491                let array = column
492                    .as_any()
493                    .downcast_ref::<TimestampMillisecondArray>()
494                    .ok_or_else(|| EventTimeError::IncompatibleType {
495                        format: self.format,
496                        expected: "TimestampMillisecond".to_string(),
497                        found: format!("{:?}", column.data_type()),
498                    })?;
499                self.extract_ts_millis(array)
500            }
501            DataType::Timestamp(TimeUnit::Microsecond, _) => {
502                let array = column
503                    .as_any()
504                    .downcast_ref::<TimestampMicrosecondArray>()
505                    .ok_or_else(|| EventTimeError::IncompatibleType {
506                        format: self.format,
507                        expected: "TimestampMicrosecond".to_string(),
508                        found: format!("{:?}", column.data_type()),
509                    })?;
510                self.extract_ts_micros(array)
511            }
512            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
513                let array = column
514                    .as_any()
515                    .downcast_ref::<TimestampNanosecondArray>()
516                    .ok_or_else(|| EventTimeError::IncompatibleType {
517                        format: self.format,
518                        expected: "TimestampNanosecond".to_string(),
519                        found: format!("{:?}", column.data_type()),
520                    })?;
521                self.extract_ts_nanos(array)
522            }
523            _ => Err(EventTimeError::IncompatibleType {
524                format: self.format,
525                expected: "Timestamp".to_string(),
526                found: format!("{:?}", column.data_type()),
527            }),
528        }
529    }
530
531    /// Extracts from `TimestampSecondArray`.
532    fn extract_ts_seconds(&self, array: &TimestampSecondArray) -> Result<i64, EventTimeError> {
533        match self.mode {
534            ExtractionMode::First => {
535                if array.is_null(0) {
536                    Err(EventTimeError::NullTimestamp { row: 0 })
537                } else {
538                    Ok(array.value(0).saturating_mul(1000))
539                }
540            }
541            ExtractionMode::Last => {
542                let last = array.len() - 1;
543                if array.is_null(last) {
544                    Err(EventTimeError::NullTimestamp { row: last })
545                } else {
546                    Ok(array.value(last).saturating_mul(1000))
547                }
548            }
549            ExtractionMode::Max => {
550                let mut max_val = i64::MIN;
551                let mut found = false;
552                for i in 0..array.len() {
553                    if !array.is_null(i) {
554                        found = true;
555                        let v = array.value(i).saturating_mul(1000);
556                        if v > max_val {
557                            max_val = v;
558                        }
559                    }
560                }
561                if found {
562                    Ok(max_val)
563                } else {
564                    Err(EventTimeError::NullTimestamp { row: 0 })
565                }
566            }
567            ExtractionMode::Min => {
568                let mut min_val = i64::MAX;
569                let mut found = false;
570                for i in 0..array.len() {
571                    if !array.is_null(i) {
572                        found = true;
573                        let v = array.value(i).saturating_mul(1000);
574                        if v < min_val {
575                            min_val = v;
576                        }
577                    }
578                }
579                if found {
580                    Ok(min_val)
581                } else {
582                    Err(EventTimeError::NullTimestamp { row: 0 })
583                }
584            }
585        }
586    }
587
588    /// Extracts from `TimestampMillisecondArray`.
589    fn extract_ts_millis(&self, array: &TimestampMillisecondArray) -> Result<i64, EventTimeError> {
590        match self.mode {
591            ExtractionMode::First => {
592                if array.is_null(0) {
593                    Err(EventTimeError::NullTimestamp { row: 0 })
594                } else {
595                    Ok(array.value(0))
596                }
597            }
598            ExtractionMode::Last => {
599                let last = array.len() - 1;
600                if array.is_null(last) {
601                    Err(EventTimeError::NullTimestamp { row: last })
602                } else {
603                    Ok(array.value(last))
604                }
605            }
606            ExtractionMode::Max => {
607                let mut max_val = i64::MIN;
608                let mut found = false;
609                for i in 0..array.len() {
610                    if !array.is_null(i) {
611                        found = true;
612                        let v = array.value(i);
613                        if v > max_val {
614                            max_val = v;
615                        }
616                    }
617                }
618                if found {
619                    Ok(max_val)
620                } else {
621                    Err(EventTimeError::NullTimestamp { row: 0 })
622                }
623            }
624            ExtractionMode::Min => {
625                let mut min_val = i64::MAX;
626                let mut found = false;
627                for i in 0..array.len() {
628                    if !array.is_null(i) {
629                        found = true;
630                        let v = array.value(i);
631                        if v < min_val {
632                            min_val = v;
633                        }
634                    }
635                }
636                if found {
637                    Ok(min_val)
638                } else {
639                    Err(EventTimeError::NullTimestamp { row: 0 })
640                }
641            }
642        }
643    }
644
645    /// Extracts from `TimestampMicrosecondArray`.
646    fn extract_ts_micros(&self, array: &TimestampMicrosecondArray) -> Result<i64, EventTimeError> {
647        match self.mode {
648            ExtractionMode::First => {
649                if array.is_null(0) {
650                    Err(EventTimeError::NullTimestamp { row: 0 })
651                } else {
652                    Ok(array.value(0) / 1000)
653                }
654            }
655            ExtractionMode::Last => {
656                let last = array.len() - 1;
657                if array.is_null(last) {
658                    Err(EventTimeError::NullTimestamp { row: last })
659                } else {
660                    Ok(array.value(last) / 1000)
661                }
662            }
663            ExtractionMode::Max => {
664                let mut max_val = i64::MIN;
665                let mut found = false;
666                for i in 0..array.len() {
667                    if !array.is_null(i) {
668                        found = true;
669                        let v = array.value(i) / 1000;
670                        if v > max_val {
671                            max_val = v;
672                        }
673                    }
674                }
675                if found {
676                    Ok(max_val)
677                } else {
678                    Err(EventTimeError::NullTimestamp { row: 0 })
679                }
680            }
681            ExtractionMode::Min => {
682                let mut min_val = i64::MAX;
683                let mut found = false;
684                for i in 0..array.len() {
685                    if !array.is_null(i) {
686                        found = true;
687                        let v = array.value(i) / 1000;
688                        if v < min_val {
689                            min_val = v;
690                        }
691                    }
692                }
693                if found {
694                    Ok(min_val)
695                } else {
696                    Err(EventTimeError::NullTimestamp { row: 0 })
697                }
698            }
699        }
700    }
701
702    /// Extracts from `TimestampNanosecondArray`.
703    fn extract_ts_nanos(&self, array: &TimestampNanosecondArray) -> Result<i64, EventTimeError> {
704        match self.mode {
705            ExtractionMode::First => {
706                if array.is_null(0) {
707                    Err(EventTimeError::NullTimestamp { row: 0 })
708                } else {
709                    Ok(array.value(0) / 1_000_000)
710                }
711            }
712            ExtractionMode::Last => {
713                let last = array.len() - 1;
714                if array.is_null(last) {
715                    Err(EventTimeError::NullTimestamp { row: last })
716                } else {
717                    Ok(array.value(last) / 1_000_000)
718                }
719            }
720            ExtractionMode::Max => {
721                let mut max_val = i64::MIN;
722                let mut found = false;
723                for i in 0..array.len() {
724                    if !array.is_null(i) {
725                        found = true;
726                        let v = array.value(i) / 1_000_000;
727                        if v > max_val {
728                            max_val = v;
729                        }
730                    }
731                }
732                if found {
733                    Ok(max_val)
734                } else {
735                    Err(EventTimeError::NullTimestamp { row: 0 })
736                }
737            }
738            ExtractionMode::Min => {
739                let mut min_val = i64::MAX;
740                let mut found = false;
741                for i in 0..array.len() {
742                    if !array.is_null(i) {
743                        found = true;
744                        let v = array.value(i) / 1_000_000;
745                        if v < min_val {
746                            min_val = v;
747                        }
748                    }
749                }
750                if found {
751                    Ok(min_val)
752                } else {
753                    Err(EventTimeError::NullTimestamp { row: 0 })
754                }
755            }
756        }
757    }
758}
759
760#[cfg(test)]
761mod tests {
762    use super::*;
763    use arrow::array::{ArrayRef, Int64Builder, StringBuilder};
764    use arrow::datatypes::Field;
765    use std::sync::Arc;
766
767    fn make_int64_batch(name: &str, values: &[Option<i64>]) -> RecordBatch {
768        let mut builder = Int64Builder::new();
769        for v in values {
770            match v {
771                Some(val) => builder.append_value(*val),
772                None => builder.append_null(),
773            }
774        }
775        let array: ArrayRef = Arc::new(builder.finish());
776        let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int64, true)]));
777        RecordBatch::try_new(schema, vec![array]).unwrap()
778    }
779
780    fn make_string_batch(name: &str, values: &[Option<&str>]) -> RecordBatch {
781        let mut builder = StringBuilder::new();
782        for v in values {
783            match v {
784                Some(val) => builder.append_value(*val),
785                None => builder.append_null(),
786            }
787        }
788        let array: ArrayRef = Arc::new(builder.finish());
789        let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Utf8, true)]));
790        RecordBatch::try_new(schema, vec![array]).unwrap()
791    }
792
793    fn make_timestamp_millis_batch(name: &str, values: &[Option<i64>]) -> RecordBatch {
794        use arrow::array::TimestampMillisecondBuilder;
795        let mut builder = TimestampMillisecondBuilder::new();
796        for v in values {
797            match v {
798                Some(val) => builder.append_value(*val),
799                None => builder.append_null(),
800            }
801        }
802        let array: ArrayRef = Arc::new(builder.finish());
803        let schema = Arc::new(Schema::new(vec![Field::new(
804            name,
805            DataType::Timestamp(TimeUnit::Millisecond, None),
806            true,
807        )]));
808        RecordBatch::try_new(schema, vec![array]).unwrap()
809    }
810
811    #[test]
812    fn test_unix_millis_extraction() {
813        let batch = make_int64_batch("event_time", &[Some(1_705_312_200_000)]);
814        let mut extractor =
815            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
816        let ts = extractor.extract(&batch).unwrap();
817        assert_eq!(ts, 1_705_312_200_000);
818    }
819
820    #[test]
821    fn test_unix_seconds_extraction() {
822        let batch = make_int64_batch("event_time", &[Some(1_705_312_200)]);
823        let mut extractor =
824            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixSeconds);
825        let ts = extractor.extract(&batch).unwrap();
826        assert_eq!(ts, 1_705_312_200_000);
827    }
828
829    #[test]
830    fn test_unix_micros_extraction() {
831        let batch = make_int64_batch("event_time", &[Some(1_705_312_200_000_000)]);
832        let mut extractor =
833            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMicros);
834        let ts = extractor.extract(&batch).unwrap();
835        assert_eq!(ts, 1_705_312_200_000);
836    }
837
838    #[test]
839    fn test_unix_nanos_extraction() {
840        let batch = make_int64_batch("event_time", &[Some(1_705_312_200_000_000_000)]);
841        let mut extractor =
842            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixNanos);
843        let ts = extractor.extract(&batch).unwrap();
844        assert_eq!(ts, 1_705_312_200_000);
845    }
846
847    #[test]
848    fn test_iso8601_extraction() {
849        let timestamp_str = "2024-01-15T10:30:00Z";
850        let batch = make_string_batch("event_time", &[Some(timestamp_str)]);
851        let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::Iso8601);
852        let ts = extractor.extract(&batch).unwrap();
853
854        // Compute expected value using the same parsing method for consistency
855        let expected = string_to_datetime(&Utc, timestamp_str)
856            .unwrap()
857            .timestamp_millis();
858        assert_eq!(ts, expected);
859    }
860
861    #[test]
862    fn test_arrow_native_millis_extraction() {
863        let batch = make_timestamp_millis_batch("event_time", &[Some(1_705_312_200_000)]);
864        let mut extractor =
865            EventTimeExtractor::from_column("event_time", TimestampFormat::ArrowNative);
866        let ts = extractor.extract(&batch).unwrap();
867        assert_eq!(ts, 1_705_312_200_000);
868    }
869
870    #[test]
871    fn test_extraction_mode_first() {
872        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
873        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
874            .with_mode(ExtractionMode::First);
875        let ts = extractor.extract(&batch).unwrap();
876        assert_eq!(ts, 100);
877    }
878
879    #[test]
880    fn test_extraction_mode_last() {
881        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
882        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
883            .with_mode(ExtractionMode::Last);
884        let ts = extractor.extract(&batch).unwrap();
885        assert_eq!(ts, 150);
886    }
887
888    #[test]
889    fn test_extraction_mode_max() {
890        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
891        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
892            .with_mode(ExtractionMode::Max);
893        let ts = extractor.extract(&batch).unwrap();
894        assert_eq!(ts, 200);
895    }
896
897    #[test]
898    fn test_extraction_mode_min() {
899        let batch = make_int64_batch("ts", &[Some(100), Some(200), Some(150)]);
900        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
901            .with_mode(ExtractionMode::Min);
902        let ts = extractor.extract(&batch).unwrap();
903        assert_eq!(ts, 100);
904    }
905
906    #[test]
907    fn test_max_with_nulls() {
908        let batch = make_int64_batch("ts", &[Some(100), None, Some(200), Some(150)]);
909        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
910            .with_mode(ExtractionMode::Max);
911        let ts = extractor.extract(&batch).unwrap();
912        assert_eq!(ts, 200);
913    }
914
915    #[test]
916    fn test_min_with_nulls() {
917        let batch = make_int64_batch("ts", &[Some(100), None, Some(200), Some(50)]);
918        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
919            .with_mode(ExtractionMode::Min);
920        let ts = extractor.extract(&batch).unwrap();
921        assert_eq!(ts, 50);
922    }
923
924    #[test]
925    fn test_column_not_found() {
926        let batch = make_int64_batch("other_column", &[Some(100)]);
927        let mut extractor =
928            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
929        let result = extractor.extract(&batch);
930        assert!(matches!(result, Err(EventTimeError::ColumnNotFound(_))));
931    }
932
933    #[test]
934    fn test_index_out_of_bounds() {
935        let batch = make_int64_batch("ts", &[Some(100)]);
936        let mut extractor = EventTimeExtractor::from_index(5, TimestampFormat::UnixMillis);
937        let result = extractor.extract(&batch);
938        assert!(matches!(
939            result,
940            Err(EventTimeError::IndexOutOfBounds { .. })
941        ));
942    }
943
944    #[test]
945    fn test_incompatible_type() {
946        let batch = make_string_batch("ts", &[Some("not a number")]);
947        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);
948        let result = extractor.extract(&batch);
949        assert!(matches!(
950            result,
951            Err(EventTimeError::IncompatibleType { .. })
952        ));
953    }
954
955    #[test]
956    fn test_null_timestamp_first() {
957        let batch = make_int64_batch("ts", &[None, Some(100)]);
958        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
959            .with_mode(ExtractionMode::First);
960        let result = extractor.extract(&batch);
961        assert!(matches!(
962            result,
963            Err(EventTimeError::NullTimestamp { row: 0 })
964        ));
965    }
966
967    #[test]
968    fn test_null_timestamp_last() {
969        let batch = make_int64_batch("ts", &[Some(100), None]);
970        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
971            .with_mode(ExtractionMode::Last);
972        let result = extractor.extract(&batch);
973        assert!(matches!(
974            result,
975            Err(EventTimeError::NullTimestamp { row: 1 })
976        ));
977    }
978
979    #[test]
980    fn test_empty_batch() {
981        let batch = make_int64_batch("ts", &[]);
982        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);
983        let result = extractor.extract(&batch);
984        assert!(matches!(result, Err(EventTimeError::EmptyBatch)));
985    }
986
987    #[test]
988    fn test_all_nulls_max() {
989        let batch = make_int64_batch("ts", &[None, None, None]);
990        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
991            .with_mode(ExtractionMode::Max);
992        let result = extractor.extract(&batch);
993        assert!(matches!(result, Err(EventTimeError::NullTimestamp { .. })));
994    }
995
996    #[test]
997    fn test_parse_error_iso8601() {
998        let batch = make_string_batch("ts", &[Some("not-a-timestamp")]);
999        let mut extractor = EventTimeExtractor::from_column("ts", TimestampFormat::Iso8601);
1000        let result = extractor.extract(&batch);
1001        assert!(matches!(result, Err(EventTimeError::ParseError { .. })));
1002    }
1003
1004    #[test]
1005    fn test_column_index_caching() {
1006        let batch = make_int64_batch("event_time", &[Some(100)]);
1007        let mut extractor =
1008            EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
1009
1010        // First extraction should cache the index
1011        assert!(extractor.cached_index.is_none());
1012        let _ = extractor.extract(&batch).unwrap();
1013        assert_eq!(extractor.cached_index, Some(0));
1014
1015        // Second extraction should use cached index
1016        let ts = extractor.extract(&batch).unwrap();
1017        assert_eq!(ts, 100);
1018    }
1019
1020    #[test]
1021    fn test_index_extractor_no_lookup() {
1022        let batch = make_int64_batch("ts", &[Some(100)]);
1023        let mut extractor = EventTimeExtractor::from_index(0, TimestampFormat::UnixMillis);
1024
1025        // Index extractor should have cached index from start
1026        assert_eq!(extractor.cached_index, Some(0));
1027
1028        let ts = extractor.extract(&batch).unwrap();
1029        assert_eq!(ts, 100);
1030    }
1031
1032    #[test]
1033    fn test_validate_schema_success() {
1034        let schema = Schema::new(vec![Field::new("ts", DataType::Int64, true)]);
1035        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);
1036        assert!(extractor.validate_schema(&schema).is_ok());
1037    }
1038
1039    #[test]
1040    fn test_validate_schema_column_not_found() {
1041        let schema = Schema::new(vec![Field::new("other", DataType::Int64, true)]);
1042        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);
1043        let result = extractor.validate_schema(&schema);
1044        assert!(matches!(result, Err(EventTimeError::ColumnNotFound(_))));
1045    }
1046
1047    #[test]
1048    fn test_validate_schema_incompatible_type() {
1049        let schema = Schema::new(vec![Field::new("ts", DataType::Utf8, true)]);
1050        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis);
1051        let result = extractor.validate_schema(&schema);
1052        assert!(matches!(
1053            result,
1054            Err(EventTimeError::IncompatibleType { .. })
1055        ));
1056    }
1057
1058    #[test]
1059    fn test_format_accessor() {
1060        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMicros);
1061        assert_eq!(extractor.format(), TimestampFormat::UnixMicros);
1062    }
1063
1064    #[test]
1065    fn test_mode_accessor() {
1066        let extractor = EventTimeExtractor::from_column("ts", TimestampFormat::UnixMillis)
1067            .with_mode(ExtractionMode::Max);
1068        assert_eq!(extractor.mode(), ExtractionMode::Max);
1069    }
1070}