Skip to main content

timeseries_table_core/table/
scan.rs

1//! Range scan implementation for `TimeSeriesTable`.
2//!
3//! This module wires the public `scan_range` entry point to the underlying
4//! segment metadata and Parquet readers:
5//! - Pick candidate segments whose `[ts_min, ts_max]` intersects the requested
6//!   half-open window `[ts_start, ts_end)`.
7//! - Stream those segments in timestamp order, reading Parquet bytes from
8//!   storage and building `RecordBatch` readers over in-memory buffers.
9//! - Filter each batch by the time column with half-open semantics, converting
10//!   the requested bounds to the column’s Arrow timestamp unit while preserving
11//!   timezone metadata.
12//!
13//! The filtering path uses Arrow’s scalar comparison kernels to avoid
14//! allocating full-length bound arrays, and treats null timestamp values as
15//! “drop row” via `filter_record_batch`. The implementation assumes v0.1
16//! invariants (non-overlapping segments) so chronological ordering is a simple
17//! sort by `ts_min`.
18use std::path::Path;
19
20use arrow::array::Scalar;
21use arrow::array::{
22    Array, RecordBatchReader, TimestampMicrosecondArray, TimestampMillisecondArray,
23    TimestampNanosecondArray, TimestampSecondArray,
24};
25use arrow::compute::filter_record_batch;
26use arrow::compute::kernels::{boolean as boolean_kernels, cmp as cmp_kernels};
27use arrow::datatypes::{Field, TimeUnit};
28use arrow::{array::RecordBatch, datatypes::DataType};
29use bytes::Bytes;
30use chrono::{DateTime, Utc};
31use futures::{StreamExt, TryStreamExt};
32use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
33use snafu::prelude::*;
34
35use crate::metadata::segments::SegmentMeta;
36use crate::storage::{self, TableLocation};
37use crate::transaction_log::TableState;
38
39use super::error::{InvalidRangeSnafu, StorageSnafu, TableError};
40use super::{TimeSeriesScan, TimeSeriesTable};
41use crate::metadata::segments::cmp_segment_meta_by_time;
42
43fn segments_for_range(
44    state: &TableState,
45    ts_start: DateTime<Utc>,
46    ts_end: DateTime<Utc>,
47) -> Vec<SegmentMeta> {
48    state
49        .segments
50        .values()
51        .filter(|seg| {
52            // half-open query [ts_start, ts_end)
53            // intersection with segment's [ts_min, ts_max] (closed) is:
54            // seg.ts_max >= ts_start && seg.ts_min < ts_end
55            seg.ts_max >= ts_start && seg.ts_min < ts_end
56        })
57        .cloned()
58        .collect()
59}
60
/// Helper macro to filter a `RecordBatch` by a timestamp column for a
/// half-open time range `[start, end)`.
///
/// This macro is used by `read_segment_range` for all supported timestamp
/// units (`second`, `millisecond`, `microsecond`, `nanosecond`) and
/// encapsulates three non-obvious choices:
///
/// 1. **Half-open semantics**:
///    Rows are kept iff `start_bound <= ts < end_bound`, where
///    `start_bound`/`end_bound` are already converted to the same integer
///    time unit as the column (via `to_bounds_i64`).
///
/// 2. **Timezone preservation**:
///    Arrow timestamps carry an optional timezone in their `DataType`
///    (`Timestamp(unit, Option<tz>)`). The comparison kernels require the
///    types (including timezone) of both operands to match. To avoid
///    mismatches, we:
///       - read the timezone from the actual column’s `DataType`,
///       - build 1-element timestamp arrays for `start` and `end` with
///         the same unit and timezone,
///       - wrap those arrays as `Scalar<Timestamp…Array>`.
///
///    This ensures `ts_arr` and the scalar bounds have identical
///    `DataType`, so the Arrow `gt_eq` / `lt` kernels accept them.
///
/// 3. **Scalar-based vectorization (no full-length bound arrays)**:
///    Arrow’s compute kernels accept `Datum` operands, which can be
///    either arrays or scalars. When one side is a scalar, the kernel
///    *broadcasts* the single value across the length of the array without
///    materializing a repeated column. Using `Scalar::new` over a
///    1-element array gives us:
///       - vectorized, element-wise comparison over the whole batch, and
///       - minimal extra allocation (two tiny 1-element arrays),
///         instead of allocating full-length `[start; len]` / `[end; len]`
///         arrays.
///
/// The resulting `BooleanArray` mask is then passed to
/// `filter_record_batch`, which drops nulls in the mask (null => “do not
/// keep row”), matching the intended `null -> false` semantics for the
/// time column.
///
/// This macro returns `Result<(), TableError>` so callers can use `?` for
/// error propagation inside the match over different timestamp units.
macro_rules! filter_ts_batch {
    ($array_ty: ty,
    $batch:expr,
    $ts_idx:expr,
    $start_bound:expr,
    $end_bound:expr,
    $time_col:expr,
    $ts_field:expr,
    $out:expr
) => {{
        // 1) Downcast the column to the concrete timestamp array type.
        //    A downcast failure means the physical array does not match the
        //    unit the caller selected `$array_ty` for; `$ts_field` is used
        //    only to report the offending declared datatype in the error.
        let col = $batch.column($ts_idx);
        let ts_arr = col.as_any().downcast_ref::<$array_ty>().ok_or_else(|| {
            TableError::UnsupportedTimeType {
                column: $time_col.to_string(),
                datatype: $ts_field.data_type().clone(),
            }
        })?;

        // 2) Extract timezone from the array's DataType to ensure that our scalar matches and comparisons are compatible
        let tz_opt = match ts_arr.data_type() {
            DataType::Timestamp(_, tz_opt) => tz_opt.clone(),
            _ => None,
        };

        // 3) Build *1-element* arrays for the bounds, with matching timezone,
        //    then wrap them as Scalars. Arrow's comparison kernels operate on
        //    `Datum` (array or scalar) and will broadcast these scalar bounds
        //    across the whole `ts_arr` without allocating full-length repeated
        //    arrays.
        let start_arr = <$array_ty>::from(vec![$start_bound]).with_timezone_opt(tz_opt.clone());
        let end_arr = <$array_ty>::from(vec![$end_bound]).with_timezone_opt(tz_opt);

        // Wrap them as scalars (no repeated buffers)
        let start_scalar = Scalar::new(start_arr);
        let end_scalar = Scalar::new(end_arr);

        // 4) Vectorized comparisons:
        // ge_mask = (ts >= start)
        // lt_mask = (ts < end)
        let ge_mask = cmp_kernels::gt_eq(ts_arr, &start_scalar)
            .map_err(|source| TableError::Arrow { source })?;
        let lt_mask =
            cmp_kernels::lt(ts_arr, &end_scalar).map_err(|source| TableError::Arrow { source })?;

        // 5) Combine: keep rows where ts >= start AND ts < end.
        //    Kleene logic is not needed here; plain `and` keeps nulls null.
        let mask = boolean_kernels::and(&ge_mask, &lt_mask)
            .map_err(|source| TableError::Arrow { source })?;

        // Note on null semantics:
        // - If ts_arr[i] is null, both comparisons produce null in the mask.
        // Arrow's `filter_record_batch` treats null mask values as false,
        // excluding those rows from results.

        // 6) apply the mask to the whole batch
        let filtered =
            filter_record_batch(&$batch, &mask).map_err(|source| TableError::Arrow { source })?;

        // Skip fully-filtered batches so callers never see empty output.
        if filtered.num_rows() > 0 {
            $out.push(filtered);
        }

        Ok::<(), TableError>(())
    }};
}
169
170fn to_bounds_i64(
171    field: &Field,
172    column: &str,
173    ts_start: DateTime<Utc>,
174    ts_end: DateTime<Utc>,
175) -> Result<(i64, i64), TableError> {
176    let to_ns = |dt: DateTime<Utc>| {
177        dt.timestamp()
178            .checked_mul(1_000_000_000)
179            .and_then(|secs| secs.checked_add(dt.timestamp_subsec_nanos() as i64))
180            .ok_or_else(|| TableError::TimeConversionOverflow {
181                column: column.to_string(),
182                timestamp: dt,
183            })
184    };
185
186    match field.data_type() {
187        DataType::Timestamp(TimeUnit::Second, _) => Ok((ts_start.timestamp(), ts_end.timestamp())),
188
189        DataType::Timestamp(TimeUnit::Millisecond, _) => {
190            Ok((ts_start.timestamp_millis(), ts_end.timestamp_millis()))
191        }
192
193        DataType::Timestamp(TimeUnit::Microsecond, _) => {
194            Ok((ts_start.timestamp_micros(), ts_end.timestamp_micros()))
195        }
196
197        DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok((to_ns(ts_start)?, to_ns(ts_end)?)),
198
199        other => Err(TableError::UnsupportedTimeType {
200            column: column.to_string(),
201            datatype: other.clone(),
202        }),
203    }
204}
205
/// Read one segment's Parquet file and return only the batches (and rows)
/// that fall inside the half-open window `[ts_start, ts_end)`.
///
/// The whole file is buffered in memory before decoding (see step 2) —
/// acceptable for v0.1 segment sizes; NOTE(review): revisit if segments
/// grow large, since this is O(file size) memory per segment.
///
/// Errors:
/// - `Storage` if the bytes cannot be read from the table location.
/// - `ParquetRead` if the Parquet footer/reader cannot be built.
/// - `MissingTimeColumn` if `time_column` is absent from the file schema.
/// - `UnsupportedTimeType` / `TimeConversionOverflow` from bound conversion.
/// - `Arrow` for batch decode or filter failures.
async fn read_segment_range(
    location: &TableLocation,
    segment: &SegmentMeta,
    time_column: &str,
    ts_start: DateTime<Utc>,
    ts_end: DateTime<Utc>,
) -> Result<Vec<RecordBatch>, TableError> {
    // Segment paths are stored relative to the table root.
    let rel_path = Path::new(&segment.path);

    // 1) Use storage layer to get raw bytes.
    let bytes = storage::read_all_bytes(location.as_ref(), rel_path)
        .await
        .context(StorageSnafu)?;

    // 2) Build a reader over an in-memory cursor (`Bytes` is cheaply
    //    cloneable, which the Parquet builder requires of its input).
    let bytes = Bytes::from(bytes);
    let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
        .map_err(|source| TableError::ParquetRead { source })?;

    let reader = builder
        .build()
        .map_err(|source| TableError::ParquetRead { source })?;

    // 3) Locate the time column and compute numeric bounds in the column's
    //    own unit, so the per-batch comparisons are plain integer compares.
    let schema = reader.schema();
    let ts_idx = schema
        .index_of(time_column)
        .map_err(|_| TableError::MissingTimeColumn {
            column: time_column.to_string(),
        })?;

    let ts_field = schema.field(ts_idx);

    let (start_bound, end_bound) = to_bounds_i64(ts_field, time_column, ts_start, ts_end)?;

    let mut out = Vec::new();

    // 4) Iterate over batches and filter them. The unit match is outside the
    //    macro because each arm instantiates the macro with a different
    //    concrete Arrow array type; `filter_ts_batch!` only pushes non-empty
    //    filtered batches into `out`.
    for batch_res in reader {
        let batch = batch_res.map_err(|source| TableError::Arrow { source })?;

        match ts_field.data_type() {
            DataType::Timestamp(TimeUnit::Second, _) => {
                filter_ts_batch!(
                    TimestampSecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                filter_ts_batch!(
                    TimestampMillisecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                filter_ts_batch!(
                    TimestampMicrosecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                filter_ts_batch!(
                    TimestampNanosecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            // Unreachable in practice: to_bounds_i64 already rejected
            // non-timestamp types above; kept for exhaustiveness.
            other => {
                return Err(TableError::UnsupportedTimeType {
                    column: time_column.to_string(),
                    datatype: other.clone(),
                });
            }
        }
    }

    Ok(out)
}
307
308impl TimeSeriesTable {
309    /// Scan the time-series table for record batches overlapping `[ts_start, ts_end)`,
310    /// returning a stream of filtered batches from the segments covering that range.
311    pub async fn scan_range(
312        &self,
313        ts_start: DateTime<Utc>,
314        ts_end: DateTime<Utc>,
315    ) -> Result<TimeSeriesScan, TableError> {
316        if ts_start >= ts_end {
317            return InvalidRangeSnafu {
318                start: ts_start,
319                end: ts_end,
320            }
321            .fail();
322        }
323
324        let ts_column = self.index.timestamp_column.clone();
325
326        // 1) Pick candidate segments.
327        let mut candidates = segments_for_range(&self.state, ts_start, ts_end);
328
329        // 2) Sort by ts_min to ensure segments are processed in chronological order.
330        //    In v0.1 we assume non-overlapping segments, so sorting guarantees scan order.
331        //    Unstable is fine here; we only care about ordering by ts_min.
332        candidates.sort_unstable_by(cmp_segment_meta_by_time);
333
334        let location = self.location().clone();
335
336        // 3) Build stream: for each segment, read + filter
337        let stream = futures::stream::iter(candidates.into_iter())
338            .then(move |seg| {
339                let location = location.clone();
340                let ts_column = ts_column.clone();
341
342                async move {
343                    let batches =
344                        read_segment_range(&location, &seg, &ts_column, ts_start, ts_end).await?;
345
346                    Ok::<_, TableError>(futures::stream::iter(
347                        batches.into_iter().map(Ok::<_, TableError>),
348                    ))
349                }
350            })
351            .try_flatten();
352
353        Ok(Box::pin(stream))
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360    use crate::storage::TableLocation;
361    use crate::table::test_util::*;
362
363    use crate::metadata::logical_schema::LogicalTimestampUnit;
364    use crate::metadata::segments::{FileFormat, SegmentId};
365
366    use arrow::datatypes::TimeUnit as ArrowTimeUnit;
367
368    use chrono::{TimeZone, Utc};
369    use futures::StreamExt;
370
371    use tempfile::TempDir;
372
    // A segment file that lacks the configured time column must fail with
    // `MissingTimeColumn`, not a generic Arrow/Parquet error.
    #[tokio::test]
    async fn read_segment_range_errors_when_missing_time_column() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let rel = "data/no-ts.parquet";
        let path = tmp.path().join(rel);
        write_parquet_without_time_column(&path, &["A"], &[1.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-no-ts".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 0),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        };

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let err = read_segment_range(&location, &segment, "ts", start, end)
            .await
            .expect_err("missing ts column should error");

        assert!(matches!(err, TableError::MissingTimeColumn { .. }));
        Ok(())
    }
403
    // A time column stored as a plain integer (not an Arrow timestamp) must
    // be rejected with `UnsupportedTimeType`.
    #[tokio::test]
    async fn read_segment_range_errors_on_unsupported_time_type() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let rel = "data/int-ts.parquet";
        let path = tmp.path().join(rel);
        let ts_vals = [1_000_i64, 2_000];
        write_arrow_parquet_int_time(&path, &ts_vals, &["A", "B"], &[1.0, 2.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-int".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 1),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 2),
            row_count: ts_vals.len() as u64,
            file_size: None,
            coverage_path: None,
        };

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let err = read_segment_range(&location, &segment, "ts", start, end)
            .await
            .expect_err("unsupported time type should error");

        assert!(matches!(err, TableError::UnsupportedTimeType { .. }));
        Ok(())
    }
435
    // Bound conversion to nanoseconds must surface overflow as
    // `TimeConversionOverflow`: 9_223_372_037 s * 1e9 just exceeds i64::MAX.
    #[tokio::test]
    async fn read_segment_range_overflow_bounds_nanoseconds() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let rel = "data/nano-empty.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(&path, ArrowTimeUnit::Nanosecond, &[], &[], &[])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-nano-empty".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 0),
            row_count: 0,
            file_size: None,
            coverage_path: None,
        };

        let huge = Utc
            .timestamp_opt(9_223_372_037, 0)
            .single()
            .expect("overflow ts");
        let err = read_segment_range(&location, &segment, "ts", huge, huge)
            .await
            .expect_err("overflow during bound conversion should error");

        assert!(matches!(err, TableError::TimeConversionOverflow { .. }));
        Ok(())
    }
467
    // End-to-end: a window spanning two segments yields rows from both, in
    // chronological order, with rows outside [start, end) trimmed.
    #[tokio::test]
    async fn scan_range_filters_and_orders_across_segments() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel1 = "data/seg-scan-1.parquet";
        let path1 = tmp.path().join(rel1);
        write_test_parquet(
            &path1,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 10.0,
                },
                TestRow {
                    ts_millis: 2_000,
                    symbol: "A",
                    price: 20.0,
                },
            ],
        )?;

        let rel2 = "data/seg-scan-2.parquet";
        let path2 = tmp.path().join(rel2);
        write_test_parquet(
            &path2,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 61_000,
                    symbol: "A",
                    price: 30.0,
                },
                TestRow {
                    ts_millis: 62_000,
                    symbol: "A",
                    price: 40.0,
                },
            ],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-scan-1".to_string()), rel1, "ts")
            .await?;
        table
            .append_parquet_segment_with_id(SegmentId("seg-scan-2".to_string()), rel2, "ts")
            .await?;

        // Query spans both segments but excludes the last row of the second segment.
        let start = Utc.timestamp_millis_opt(1_500).single().expect("valid ts");
        let end = Utc.timestamp_millis_opt(61_500).single().expect("valid ts");

        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(
            rows,
            vec![
                (2_000, "A".to_string(), 20.0),
                (61_000, "A".to_string(), 30.0),
            ]
        );

        Ok(())
    }
538
    // Boundary semantics: a row exactly at `end` is excluded (half-open
    // window), and a window past all data yields no rows.
    #[tokio::test]
    async fn scan_range_exclusive_end_and_empty() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-boundary.parquet";
        let path = tmp.path().join(rel);
        write_test_parquet(
            &path,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 10.0,
                },
                TestRow {
                    ts_millis: 2_000,
                    symbol: "A",
                    price: 20.0,
                },
            ],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-boundary".to_string()), rel, "ts")
            .await?;

        // end == 2_000 must exclude the ts=2_000 row.
        let start = Utc.timestamp_millis_opt(1_000).single().expect("valid ts");
        let end = Utc.timestamp_millis_opt(2_000).single().expect("valid ts");
        let rows = collect_scan_rows(&table, start, end).await?;
        assert_eq!(rows, vec![(1_000, "A".to_string(), 10.0)]);

        let empty_start = Utc.timestamp_millis_opt(5_000).single().expect("valid ts");
        let empty_end = Utc.timestamp_millis_opt(6_000).single().expect("valid ts");
        let rows = collect_scan_rows(&table, empty_start, empty_end).await?;
        assert!(rows.is_empty());

        Ok(())
    }
582
    // A zero-width window (start == end) must be rejected as `InvalidRange`
    // before any segment I/O happens.
    #[tokio::test]
    async fn scan_range_rejects_invalid_range() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let table = TimeSeriesTable::create(location, meta).await?;

        let start = Utc.timestamp_millis_opt(1_000).single().expect("valid ts");
        let end = start;

        let result = table.scan_range(start, end).await;

        assert!(matches!(result, Err(TableError::InvalidRange { .. })));
        Ok(())
    }
598
    // Microsecond-unit columns: bounds with sub-second precision must be
    // converted to micros so only the middle row (ts=2s) survives.
    #[tokio::test]
    async fn scan_range_supports_microsecond_unit() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Micros);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-micros.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Microsecond,
            &[Some(1_000_000), Some(2_000_000), Some(3_000_000)],
            &["A", "A", "A"],
            &[1.0, 2.0, 3.0],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-micros".to_string()), rel, "ts")
            .await?;

        // Window [1.5s, 2.5s) brackets only the ts=2s row.
        let start = Utc
            .timestamp_opt(1, 500_000_000)
            .single()
            .expect("valid start");
        let end = Utc
            .timestamp_opt(2, 500_000_000)
            .single()
            .expect("valid end");
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(rows, vec![(2_000_000, "A".to_string(), 2.0)]);
        Ok(())
    }
633
    // Nanosecond-unit columns: the [1.25s, 1.75s) window selects only the
    // ts=1.5s row, exercising the checked ns bound conversion.
    #[tokio::test]
    async fn scan_range_supports_nanosecond_unit() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Nanos);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-nanos.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Nanosecond,
            &[
                Some(1_000_000_000),
                Some(1_500_000_000),
                Some(2_000_000_000),
            ],
            &["A", "A", "A"],
            &[1.0, 2.0, 3.0],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-nanos".to_string()), rel, "ts")
            .await?;

        let start = Utc
            .timestamp_opt(1, 250_000_000)
            .single()
            .expect("valid start");
        let end = Utc
            .timestamp_opt(1, 750_000_000)
            .single()
            .expect("valid end");
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(rows, vec![(1_500_000_000, "A".to_string(), 2.0)]);
        Ok(())
    }
672
    // Rows with a null timestamp must be dropped (null mask => row excluded),
    // while the non-null rows inside the window are kept.
    #[tokio::test]
    async fn scan_range_filters_null_timestamps() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Millis);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-null-ts.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Millisecond,
            &[Some(1_000), None, Some(2_000)],
            &["A", "A", "A"],
            &[1.0, 2.0, 3.0],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-null".to_string()), rel, "ts")
            .await?;

        let start = Utc.timestamp_millis_opt(500).single().unwrap();
        let end = Utc.timestamp_millis_opt(2_500).single().unwrap();
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(
            rows,
            vec![(1_000, "A".to_string(), 1.0), (2_000, "A".to_string(), 3.0)]
        );
        Ok(())
    }
704
    // A table with no segments at all yields an immediately-exhausted stream.
    #[tokio::test]
    async fn scan_range_empty_when_no_segments() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let table = TimeSeriesTable::create(location, meta).await?;

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let mut stream = table.scan_range(start, end).await?;
        assert!(stream.next().await.is_none());
        Ok(())
    }
719
    // A candidate segment whose Parquet file contains zero rows must produce
    // no stream items (empty filtered batches are skipped, not emitted).
    #[tokio::test]
    async fn scan_range_empty_for_zero_row_segment() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-empty.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(&path, ArrowTimeUnit::Millisecond, &[], &[], &[])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-empty".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 0),
            row_count: 0,
            file_size: None,
            coverage_path: None,
        };

        // Inject the segment directly into table state, bypassing append.
        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let mut stream = table.scan_range(start, end).await?;
        assert!(stream.next().await.is_none());
        Ok(())
    }
754
    // A segment where every timestamp is null must contribute nothing to the
    // scan: all mask values are null and the filtered batch is empty.
    #[tokio::test]
    async fn scan_range_all_null_time_filtered_out() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Millis);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-null-only.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Millisecond,
            &[None, None],
            &["A", "B"],
            &[1.0, 2.0],
        )?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-null-only".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 1),
            row_count: 2,
            file_size: None,
            coverage_path: None,
        };

        // Inject directly: append validation would not accept all-null times.
        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 0, 5);

        let mut stream = table.scan_range(start, end).await?;
        assert!(stream.next().await.is_none());
        Ok(())
    }
795
    // Per-segment failures surface as Err *items* on the stream, not as an
    // error from `scan_range` itself: a segment without the time column
    // yields `MissingTimeColumn` when the stream is polled.
    #[tokio::test]
    async fn scan_range_errors_on_missing_time_column_in_segment() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-scan-no-ts.parquet";
        let path = tmp.path().join(rel);
        write_parquet_without_time_column(&path, &["A"], &[1.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-scan-no-ts".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 1, 0),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        };

        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 2, 0);

        let mut stream = table.scan_range(start, end).await?;
        let err = stream.next().await.expect("expected error from scan");

        assert!(matches!(err, Err(TableError::MissingTimeColumn { .. })));
        Ok(())
    }
832
    // Like the missing-column case, an unsupported (integer) time column in
    // a segment surfaces as an Err item when the scan stream is polled.
    #[tokio::test]
    async fn scan_range_errors_on_unsupported_time_type_segment() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-scan-int-ts.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_int_time(&path, &[1_000], &["A"], &[1.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-scan-int".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 1),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 1),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        };

        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let mut stream = table.scan_range(start, end).await?;
        let err = stream.next().await.expect("expected error from scan");

        assert!(matches!(err, Err(TableError::UnsupportedTimeType { .. })));
        Ok(())
    }
869
870    #[tokio::test]
871    async fn scan_range_orders_segments_by_ts_min() -> TestResult {
872        let tmp = TempDir::new()?;
873        let location = TableLocation::local(tmp.path());
874        let meta = make_basic_table_meta();
875        let mut table = TimeSeriesTable::create(location, meta).await?;
876
877        let rel_b = "data/seg-overlap-b.parquet";
878        let path_b = tmp.path().join(rel_b);
879        write_test_parquet(
880            &path_b,
881            true,
882            false,
883            &[TestRow {
884                ts_millis: 120_000,
885                symbol: "A",
886                price: 2.0,
887            }],
888        )?;
889
890        let rel_a = "data/seg-overlap-a.parquet";
891        let path_a = tmp.path().join(rel_a);
892        write_test_parquet(
893            &path_a,
894            true,
895            false,
896            &[TestRow {
897                ts_millis: 60_000,
898                symbol: "A",
899                price: 1.0,
900            }],
901        )?;
902
903        // append in reverse ts_min order to ensure sort_by_key is exercised
904        table
905            .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel_b, "ts")
906            .await?;
907        table
908            .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel_a, "ts")
909            .await?;
910
911        let start = Utc.timestamp_millis_opt(50_000).single().unwrap();
912        let end = Utc.timestamp_millis_opt(150_000).single().unwrap();
913        let rows = collect_scan_rows(&table, start, end).await?;
914
915        assert_eq!(
916            rows,
917            vec![
918                (60_000, "A".to_string(), 1.0),
919                (120_000, "A".to_string(), 2.0)
920            ]
921        );
922        Ok(())
923    }
924
925    #[tokio::test]
926    async fn scan_range_skips_non_overlapping_segments() -> TestResult {
927        let tmp = TempDir::new()?;
928        let location = TableLocation::local(tmp.path());
929        let meta = make_basic_table_meta();
930        let mut table = TimeSeriesTable::create(location, meta).await?;
931
932        let rel1 = "data/seg-early.parquet";
933        let path1 = tmp.path().join(rel1);
934        write_test_parquet(
935            &path1,
936            true,
937            false,
938            &[TestRow {
939                ts_millis: 1_000,
940                symbol: "A",
941                price: 1.0,
942            }],
943        )?;
944
945        let rel2 = "data/seg-late.parquet";
946        let path2 = tmp.path().join(rel2);
947        write_test_parquet(
948            &path2,
949            true,
950            false,
951            &[TestRow {
952                ts_millis: 70_000,
953                symbol: "A",
954                price: 9.0,
955            }],
956        )?;
957
958        table
959            .append_parquet_segment_with_id(SegmentId("seg-early".to_string()), rel1, "ts")
960            .await?;
961        table
962            .append_parquet_segment_with_id(SegmentId("seg-late".to_string()), rel2, "ts")
963            .await?;
964
965        let start = Utc.timestamp_millis_opt(1_500).single().unwrap();
966        let end = Utc.timestamp_millis_opt(2_000).single().unwrap();
967        let rows = collect_scan_rows(&table, start, end).await?;
968
969        assert_eq!(rows, Vec::new());
970        Ok(())
971    }
972
973    #[tokio::test]
974    async fn scan_range_propagates_parquet_read_error() -> TestResult {
975        let tmp = TempDir::new()?;
976        let location = TableLocation::local(tmp.path());
977        let meta = make_basic_table_meta();
978        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
979
980        let rel = "data/seg-corrupt.parquet";
981        let path = tmp.path().join(rel);
982        write_test_parquet(
983            &path,
984            true,
985            false,
986            &[TestRow {
987                ts_millis: 1_000,
988                symbol: "A",
989                price: 1.0,
990            }],
991        )?;
992
993        table
994            .append_parquet_segment_with_id(SegmentId("seg-corrupt".to_string()), rel, "ts")
995            .await?;
996
997        // Corrupt the file after append so scan encounters a read failure.
998        let f = std::fs::OpenOptions::new().write(true).open(&path)?;
999        f.set_len(4)?;
1000
1001        let start = Utc.timestamp_millis_opt(0).single().unwrap();
1002        let end = Utc.timestamp_millis_opt(2_000).single().unwrap();
1003
1004        let mut stream = table.scan_range(start, end).await?;
1005        let err = stream.next().await.expect("first item should be error");
1006
1007        assert!(matches!(err, Err(TableError::ParquetRead { .. })));
1008        Ok(())
1009    }
1010}