1use std::path::Path;
19
20use arrow::array::Scalar;
21use arrow::array::{
22 Array, RecordBatchReader, TimestampMicrosecondArray, TimestampMillisecondArray,
23 TimestampNanosecondArray, TimestampSecondArray,
24};
25use arrow::compute::filter_record_batch;
26use arrow::compute::kernels::{boolean as boolean_kernels, cmp as cmp_kernels};
27use arrow::datatypes::{Field, TimeUnit};
28use arrow::{array::RecordBatch, datatypes::DataType};
29use bytes::Bytes;
30use chrono::{DateTime, Utc};
31use futures::{StreamExt, TryStreamExt};
32use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
33use snafu::prelude::*;
34
35use crate::metadata::segments::SegmentMeta;
36use crate::storage::{self, TableLocation};
37use crate::transaction_log::TableState;
38
39use super::error::{InvalidRangeSnafu, StorageSnafu, TableError};
40use super::{TimeSeriesScan, TimeSeriesTable};
41use crate::metadata::segments::cmp_segment_meta_by_time;
42
43fn segments_for_range(
44 state: &TableState,
45 ts_start: DateTime<Utc>,
46 ts_end: DateTime<Utc>,
47) -> Vec<SegmentMeta> {
48 state
49 .segments
50 .values()
51 .filter(|seg| {
52 seg.ts_max >= ts_start && seg.ts_min < ts_end
56 })
57 .cloned()
58 .collect()
59}
60
/// Filters `$batch` down to rows whose timestamp column (index `$ts_idx`,
/// concrete Arrow type `$array_ty`) lies in the half-open range
/// `[$start_bound, $end_bound)`, pushing the result onto `$out` when any
/// rows survive.
///
/// The bound scalars are built with the column's own timezone so the
/// comparison kernels see identical data types on both sides. Null
/// timestamps compare to null and are dropped by `filter_record_batch`
/// (null mask slots are treated as "do not keep").
///
/// Expands to an expression of type `Result<(), TableError>` so call sites
/// can use `?`.
macro_rules! filter_ts_batch {
    ($array_ty: ty,
    $batch:expr,
    $ts_idx:expr,
    $start_bound:expr,
    $end_bound:expr,
    $time_col:expr,
    $ts_field:expr,
    $out:expr
) => {{
        let col = $batch.column($ts_idx);
        // A failed downcast means the declared unit and the concrete array
        // type disagree; report it as an unsupported time type.
        let ts_arr = col.as_any().downcast_ref::<$array_ty>().ok_or_else(|| {
            TableError::UnsupportedTimeType {
                column: $time_col.to_string(),
                datatype: $ts_field.data_type().clone(),
            }
        })?;

        // Carry the column's timezone onto the bound scalars; the kernels
        // reject mismatched timestamp types.
        let tz_opt = match ts_arr.data_type() {
            DataType::Timestamp(_, tz_opt) => tz_opt.clone(),
            _ => None,
        };

        let start_arr = <$array_ty>::from(vec![$start_bound]).with_timezone_opt(tz_opt.clone());
        let end_arr = <$array_ty>::from(vec![$end_bound]).with_timezone_opt(tz_opt);

        let start_scalar = Scalar::new(start_arr);
        let end_scalar = Scalar::new(end_arr);

        // Inclusive start, exclusive end.
        let ge_mask = cmp_kernels::gt_eq(ts_arr, &start_scalar)
            .map_err(|source| TableError::Arrow { source })?;
        let lt_mask =
            cmp_kernels::lt(ts_arr, &end_scalar).map_err(|source| TableError::Arrow { source })?;

        // Fix: the conjunction previously referenced `<_mask`, which does not
        // compile; combine the two masks with the boolean AND kernel.
        let mask = boolean_kernels::and(&ge_mask, &lt_mask)
            .map_err(|source| TableError::Arrow { source })?;

        let filtered =
            filter_record_batch(&$batch, &mask).map_err(|source| TableError::Arrow { source })?;

        if filtered.num_rows() > 0 {
            $out.push(filtered);
        }

        Ok::<(), TableError>(())
    }};
}
169
170fn to_bounds_i64(
171 field: &Field,
172 column: &str,
173 ts_start: DateTime<Utc>,
174 ts_end: DateTime<Utc>,
175) -> Result<(i64, i64), TableError> {
176 let to_ns = |dt: DateTime<Utc>| {
177 dt.timestamp()
178 .checked_mul(1_000_000_000)
179 .and_then(|secs| secs.checked_add(dt.timestamp_subsec_nanos() as i64))
180 .ok_or_else(|| TableError::TimeConversionOverflow {
181 column: column.to_string(),
182 timestamp: dt,
183 })
184 };
185
186 match field.data_type() {
187 DataType::Timestamp(TimeUnit::Second, _) => Ok((ts_start.timestamp(), ts_end.timestamp())),
188
189 DataType::Timestamp(TimeUnit::Millisecond, _) => {
190 Ok((ts_start.timestamp_millis(), ts_end.timestamp_millis()))
191 }
192
193 DataType::Timestamp(TimeUnit::Microsecond, _) => {
194 Ok((ts_start.timestamp_micros(), ts_end.timestamp_micros()))
195 }
196
197 DataType::Timestamp(TimeUnit::Nanosecond, _) => Ok((to_ns(ts_start)?, to_ns(ts_end)?)),
198
199 other => Err(TableError::UnsupportedTimeType {
200 column: column.to_string(),
201 datatype: other.clone(),
202 }),
203 }
204}
205
/// Reads one Parquet segment from `location` and returns the record batches
/// whose `time_column` values fall in the half-open range `[ts_start, ts_end)`.
///
/// The segment is fetched fully into memory before decoding
/// (NOTE(review): assumes segments are small enough to buffer — confirm).
/// Batches that become empty after filtering are dropped.
///
/// # Errors
/// - storage errors while reading the segment bytes
/// - `TableError::ParquetRead` for invalid Parquet data
/// - `TableError::MissingTimeColumn` when `time_column` is absent
/// - `TableError::UnsupportedTimeType` for non-timestamp time columns
/// - `TableError::TimeConversionOverflow` for out-of-range nanosecond bounds
async fn read_segment_range(
    location: &TableLocation,
    segment: &SegmentMeta,
    time_column: &str,
    ts_start: DateTime<Utc>,
    ts_end: DateTime<Utc>,
) -> Result<Vec<RecordBatch>, TableError> {
    let rel_path = Path::new(&segment.path);

    let bytes = storage::read_all_bytes(location.as_ref(), rel_path)
        .await
        .context(StorageSnafu)?;

    let bytes = Bytes::from(bytes);
    let builder = ParquetRecordBatchReaderBuilder::try_new(bytes)
        .map_err(|source| TableError::ParquetRead { source })?;

    let reader = builder
        .build()
        .map_err(|source| TableError::ParquetRead { source })?;

    // Locate the time column in the file's own schema; a segment may have
    // been written without it, which is a caller-visible error.
    let schema = reader.schema();
    let ts_idx = schema
        .index_of(time_column)
        .map_err(|_| TableError::MissingTimeColumn {
            column: time_column.to_string(),
        })?;

    let ts_field = schema.field(ts_idx);

    // Convert the chrono bounds once, in the column's native unit.
    let (start_bound, end_bound) = to_bounds_i64(ts_field, time_column, ts_start, ts_end)?;

    let mut out = Vec::new();

    for batch_res in reader {
        let batch = batch_res.map_err(|source| TableError::Arrow { source })?;

        // Dispatch on the concrete timestamp array type; the macro performs
        // the range filtering and pushes non-empty batches into `out`.
        match ts_field.data_type() {
            DataType::Timestamp(TimeUnit::Second, _) => {
                filter_ts_batch!(
                    TimestampSecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            DataType::Timestamp(TimeUnit::Millisecond, _) => {
                filter_ts_batch!(
                    TimestampMillisecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            DataType::Timestamp(TimeUnit::Microsecond, _) => {
                filter_ts_batch!(
                    TimestampMicrosecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
                filter_ts_batch!(
                    TimestampNanosecondArray,
                    batch,
                    ts_idx,
                    start_bound,
                    end_bound,
                    time_column,
                    ts_field,
                    out
                )?;
            }
            other => {
                return Err(TableError::UnsupportedTimeType {
                    column: time_column.to_string(),
                    datatype: other.clone(),
                });
            }
        }
    }

    Ok(out)
}
307
308impl TimeSeriesTable {
309 pub async fn scan_range(
312 &self,
313 ts_start: DateTime<Utc>,
314 ts_end: DateTime<Utc>,
315 ) -> Result<TimeSeriesScan, TableError> {
316 if ts_start >= ts_end {
317 return InvalidRangeSnafu {
318 start: ts_start,
319 end: ts_end,
320 }
321 .fail();
322 }
323
324 let ts_column = self.index.timestamp_column.clone();
325
326 let mut candidates = segments_for_range(&self.state, ts_start, ts_end);
328
329 candidates.sort_unstable_by(cmp_segment_meta_by_time);
333
334 let location = self.location().clone();
335
336 let stream = futures::stream::iter(candidates.into_iter())
338 .then(move |seg| {
339 let location = location.clone();
340 let ts_column = ts_column.clone();
341
342 async move {
343 let batches =
344 read_segment_range(&location, &seg, &ts_column, ts_start, ts_end).await?;
345
346 Ok::<_, TableError>(futures::stream::iter(
347 batches.into_iter().map(Ok::<_, TableError>),
348 ))
349 }
350 })
351 .try_flatten();
352
353 Ok(Box::pin(stream))
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::TableLocation;
    use crate::table::test_util::*;

    use crate::metadata::logical_schema::LogicalTimestampUnit;
    use crate::metadata::segments::{FileFormat, SegmentId};

    use arrow::datatypes::TimeUnit as ArrowTimeUnit;

    use chrono::{TimeZone, Utc};
    use futures::StreamExt;

    use tempfile::TempDir;

    // A segment whose Parquet file lacks the configured time column must
    // surface MissingTimeColumn when read directly.
    #[tokio::test]
    async fn read_segment_range_errors_when_missing_time_column() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let rel = "data/no-ts.parquet";
        let path = tmp.path().join(rel);
        write_parquet_without_time_column(&path, &["A"], &[1.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-no-ts".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 0),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        };

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let err = read_segment_range(&location, &segment, "ts", start, end)
            .await
            .expect_err("missing ts column should error");

        assert!(matches!(err, TableError::MissingTimeColumn { .. }));
        Ok(())
    }

    // An Int64 (non-timestamp) time column must surface UnsupportedTimeType.
    #[tokio::test]
    async fn read_segment_range_errors_on_unsupported_time_type() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let rel = "data/int-ts.parquet";
        let path = tmp.path().join(rel);
        let ts_vals = [1_000_i64, 2_000];
        write_arrow_parquet_int_time(&path, &ts_vals, &["A", "B"], &[1.0, 2.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-int".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 1),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 2),
            row_count: ts_vals.len() as u64,
            file_size: None,
            coverage_path: None,
        };

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let err = read_segment_range(&location, &segment, "ts", start, end)
            .await
            .expect_err("unsupported time type should error");

        assert!(matches!(err, TableError::UnsupportedTimeType { .. }));
        Ok(())
    }

    // A bound past the i64-nanosecond range must fail conversion before any
    // row is read (the file itself is empty).
    #[tokio::test]
    async fn read_segment_range_overflow_bounds_nanoseconds() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        let rel = "data/nano-empty.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(&path, ArrowTimeUnit::Nanosecond, &[], &[], &[])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-nano-empty".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 0),
            row_count: 0,
            file_size: None,
            coverage_path: None,
        };

        // 9_223_372_037 s * 1e9 ns just exceeds i64::MAX.
        let huge = Utc
            .timestamp_opt(9_223_372_037, 0)
            .single()
            .expect("overflow ts");
        let err = read_segment_range(&location, &segment, "ts", huge, huge)
            .await
            .expect_err("overflow during bound conversion should error");

        assert!(matches!(err, TableError::TimeConversionOverflow { .. }));
        Ok(())
    }

    // End-to-end: rows outside the range are dropped and surviving rows from
    // two segments come back in time order.
    #[tokio::test]
    async fn scan_range_filters_and_orders_across_segments() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel1 = "data/seg-scan-1.parquet";
        let path1 = tmp.path().join(rel1);
        write_test_parquet(
            &path1,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 10.0,
                },
                TestRow {
                    ts_millis: 2_000,
                    symbol: "A",
                    price: 20.0,
                },
            ],
        )?;

        let rel2 = "data/seg-scan-2.parquet";
        let path2 = tmp.path().join(rel2);
        write_test_parquet(
            &path2,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 61_000,
                    symbol: "A",
                    price: 30.0,
                },
                TestRow {
                    ts_millis: 62_000,
                    symbol: "A",
                    price: 40.0,
                },
            ],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-scan-1".to_string()), rel1, "ts")
            .await?;
        table
            .append_parquet_segment_with_id(SegmentId("seg-scan-2".to_string()), rel2, "ts")
            .await?;

        // Range straddles the two segments, excluding the first and last rows.
        let start = Utc.timestamp_millis_opt(1_500).single().expect("valid ts");
        let end = Utc.timestamp_millis_opt(61_500).single().expect("valid ts");

        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(
            rows,
            vec![
                (2_000, "A".to_string(), 20.0),
                (61_000, "A".to_string(), 30.0),
            ]
        );

        Ok(())
    }

    // The end bound is exclusive; a fully out-of-range window yields nothing.
    #[tokio::test]
    async fn scan_range_exclusive_end_and_empty() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-boundary.parquet";
        let path = tmp.path().join(rel);
        write_test_parquet(
            &path,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 10.0,
                },
                TestRow {
                    ts_millis: 2_000,
                    symbol: "A",
                    price: 20.0,
                },
            ],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-boundary".to_string()), rel, "ts")
            .await?;

        // Row at exactly `end` (2_000) must be excluded.
        let start = Utc.timestamp_millis_opt(1_000).single().expect("valid ts");
        let end = Utc.timestamp_millis_opt(2_000).single().expect("valid ts");
        let rows = collect_scan_rows(&table, start, end).await?;
        assert_eq!(rows, vec![(1_000, "A".to_string(), 10.0)]);

        let empty_start = Utc.timestamp_millis_opt(5_000).single().expect("valid ts");
        let empty_end = Utc.timestamp_millis_opt(6_000).single().expect("valid ts");
        let rows = collect_scan_rows(&table, empty_start, empty_end).await?;
        assert!(rows.is_empty());

        Ok(())
    }

    // start == end is an invalid (empty) range and must be rejected up front.
    #[tokio::test]
    async fn scan_range_rejects_invalid_range() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let table = TimeSeriesTable::create(location, meta).await?;

        let start = Utc.timestamp_millis_opt(1_000).single().expect("valid ts");
        let end = start;

        let result = table.scan_range(start, end).await;

        assert!(matches!(result, Err(TableError::InvalidRange { .. })));
        Ok(())
    }

    // Microsecond-unit columns are filtered at microsecond resolution.
    #[tokio::test]
    async fn scan_range_supports_microsecond_unit() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Micros);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-micros.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Microsecond,
            &[Some(1_000_000), Some(2_000_000), Some(3_000_000)],
            &["A", "A", "A"],
            &[1.0, 2.0, 3.0],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-micros".to_string()), rel, "ts")
            .await?;

        // [1.5s, 2.5s) selects only the 2-second row.
        let start = Utc
            .timestamp_opt(1, 500_000_000)
            .single()
            .expect("valid start");
        let end = Utc
            .timestamp_opt(2, 500_000_000)
            .single()
            .expect("valid end");
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(rows, vec![(2_000_000, "A".to_string(), 2.0)]);
        Ok(())
    }

    // Nanosecond-unit columns are filtered at nanosecond resolution.
    #[tokio::test]
    async fn scan_range_supports_nanosecond_unit() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Nanos);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-nanos.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Nanosecond,
            &[
                Some(1_000_000_000),
                Some(1_500_000_000),
                Some(2_000_000_000),
            ],
            &["A", "A", "A"],
            &[1.0, 2.0, 3.0],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-nanos".to_string()), rel, "ts")
            .await?;

        // [1.25s, 1.75s) selects only the 1.5-second row.
        let start = Utc
            .timestamp_opt(1, 250_000_000)
            .single()
            .expect("valid start");
        let end = Utc
            .timestamp_opt(1, 750_000_000)
            .single()
            .expect("valid end");
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(rows, vec![(1_500_000_000, "A".to_string(), 2.0)]);
        Ok(())
    }

    // Rows with a null timestamp must be dropped by the range filter.
    #[tokio::test]
    async fn scan_range_filters_null_timestamps() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Millis);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-null-ts.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Millisecond,
            &[Some(1_000), None, Some(2_000)],
            &["A", "A", "A"],
            &[1.0, 2.0, 3.0],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-null".to_string()), rel, "ts")
            .await?;

        let start = Utc.timestamp_millis_opt(500).single().unwrap();
        let end = Utc.timestamp_millis_opt(2_500).single().unwrap();
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(
            rows,
            vec![(1_000, "A".to_string(), 1.0), (2_000, "A".to_string(), 3.0)]
        );
        Ok(())
    }

    // A freshly created table has no segments: the stream ends immediately.
    #[tokio::test]
    async fn scan_range_empty_when_no_segments() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let table = TimeSeriesTable::create(location, meta).await?;

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let mut stream = table.scan_range(start, end).await?;
        assert!(stream.next().await.is_none());
        Ok(())
    }

    // A zero-row segment that overlaps the range produces no batches
    // (empty filtered batches are dropped, not emitted).
    #[tokio::test]
    async fn scan_range_empty_for_zero_row_segment() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-empty.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(&path, ArrowTimeUnit::Millisecond, &[], &[], &[])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-empty".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 0),
            row_count: 0,
            file_size: None,
            coverage_path: None,
        };

        // Inject the segment directly into table state, bypassing append.
        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let mut stream = table.scan_range(start, end).await?;
        assert!(stream.next().await.is_none());
        Ok(())
    }

    // A segment whose timestamps are all null yields nothing.
    #[tokio::test]
    async fn scan_range_all_null_time_filtered_out() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_table_meta_with_unit(LogicalTimestampUnit::Millis);
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-null-only.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_with_unit(
            &path,
            ArrowTimeUnit::Millisecond,
            &[None, None],
            &["A", "B"],
            &[1.0, 2.0],
        )?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-null-only".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 1),
            row_count: 2,
            file_size: None,
            coverage_path: None,
        };

        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 0, 5);

        let mut stream = table.scan_range(start, end).await?;
        assert!(stream.next().await.is_none());
        Ok(())
    }

    // MissingTimeColumn is surfaced through the scan stream, not at setup.
    #[tokio::test]
    async fn scan_range_errors_on_missing_time_column_in_segment() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-scan-no-ts.parquet";
        let path = tmp.path().join(rel);
        write_parquet_without_time_column(&path, &["A"], &[1.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-scan-no-ts".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 0),
            ts_max: utc_datetime(2024, 1, 1, 0, 1, 0),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        };

        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 2, 0);

        let mut stream = table.scan_range(start, end).await?;
        let err = stream.next().await.expect("expected error from scan");

        assert!(matches!(err, Err(TableError::MissingTimeColumn { .. })));
        Ok(())
    }

    // UnsupportedTimeType is likewise surfaced through the scan stream.
    #[tokio::test]
    async fn scan_range_errors_on_unsupported_time_type_segment() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel = "data/seg-scan-int-ts.parquet";
        let path = tmp.path().join(rel);
        write_arrow_parquet_int_time(&path, &[1_000], &["A"], &[1.0])?;

        let segment = SegmentMeta {
            segment_id: SegmentId("seg-scan-int".to_string()),
            path: rel.to_string(),
            format: FileFormat::Parquet,
            ts_min: utc_datetime(2024, 1, 1, 0, 0, 1),
            ts_max: utc_datetime(2024, 1, 1, 0, 0, 1),
            row_count: 1,
            file_size: None,
            coverage_path: None,
        };

        table
            .state
            .segments
            .insert(segment.segment_id.clone(), segment);

        let start = utc_datetime(2024, 1, 1, 0, 0, 0);
        let end = utc_datetime(2024, 1, 1, 0, 1, 0);

        let mut stream = table.scan_range(start, end).await?;
        let err = stream.next().await.expect("expected error from scan");

        assert!(matches!(err, Err(TableError::UnsupportedTimeType { .. })));
        Ok(())
    }

    // Segments appended out of time order must still be scanned in ts_min
    // order (seg-b is appended first but covers the later window).
    #[tokio::test]
    async fn scan_range_orders_segments_by_ts_min() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel_b = "data/seg-overlap-b.parquet";
        let path_b = tmp.path().join(rel_b);
        write_test_parquet(
            &path_b,
            true,
            false,
            &[TestRow {
                ts_millis: 120_000,
                symbol: "A",
                price: 2.0,
            }],
        )?;

        let rel_a = "data/seg-overlap-a.parquet";
        let path_a = tmp.path().join(rel_a);
        write_test_parquet(
            &path_a,
            true,
            false,
            &[TestRow {
                ts_millis: 60_000,
                symbol: "A",
                price: 1.0,
            }],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel_b, "ts")
            .await?;
        table
            .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel_a, "ts")
            .await?;

        let start = Utc.timestamp_millis_opt(50_000).single().unwrap();
        let end = Utc.timestamp_millis_opt(150_000).single().unwrap();
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(
            rows,
            vec![
                (60_000, "A".to_string(), 1.0),
                (120_000, "A".to_string(), 2.0)
            ]
        );
        Ok(())
    }

    // A range overlapping neither segment's metadata span yields no rows.
    #[tokio::test]
    async fn scan_range_skips_non_overlapping_segments() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel1 = "data/seg-early.parquet";
        let path1 = tmp.path().join(rel1);
        write_test_parquet(
            &path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 1.0,
            }],
        )?;

        let rel2 = "data/seg-late.parquet";
        let path2 = tmp.path().join(rel2);
        write_test_parquet(
            &path2,
            true,
            false,
            &[TestRow {
                ts_millis: 70_000,
                symbol: "A",
                price: 9.0,
            }],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-early".to_string()), rel1, "ts")
            .await?;
        table
            .append_parquet_segment_with_id(SegmentId("seg-late".to_string()), rel2, "ts")
            .await?;

        let start = Utc.timestamp_millis_opt(1_500).single().unwrap();
        let end = Utc.timestamp_millis_opt(2_000).single().unwrap();
        let rows = collect_scan_rows(&table, start, end).await?;

        assert_eq!(rows, Vec::new());
        Ok(())
    }

    // Corrupting the segment file after append must surface ParquetRead
    // through the scan stream.
    #[tokio::test]
    async fn scan_range_propagates_parquet_read_error() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel = "data/seg-corrupt.parquet";
        let path = tmp.path().join(rel);
        write_test_parquet(
            &path,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 1.0,
            }],
        )?;

        table
            .append_parquet_segment_with_id(SegmentId("seg-corrupt".to_string()), rel, "ts")
            .await?;

        // Truncate the file to 4 bytes so the Parquet footer is gone.
        let f = std::fs::OpenOptions::new().write(true).open(&path)?;
        f.set_len(4)?;

        let start = Utc.timestamp_millis_opt(0).single().unwrap();
        let end = Utc.timestamp_millis_opt(2_000).single().unwrap();

        let mut stream = table.scan_range(start, end).await?;
        let err = stream.next().await.expect("first item should be error");

        assert!(matches!(err, Err(TableError::ParquetRead { .. })));
        Ok(())
    }
}