1use std::path::Path;
12use std::time::Instant;
13
14use bytes::Bytes;
15
16use snafu::prelude::*;
17
18use crate::{
19 coverage::serde::coverage_to_bytes,
20 coverage::{
21 io::{CoverageError, write_coverage_sidecar_new_bytes},
22 layout::{
23 segment_coverage_id_v1, segment_coverage_path, table_coverage_id_v1,
24 table_snapshot_path,
25 },
26 },
27 formats::parquet::{
28 coverage::compute_segment_coverage_from_parquet_bytes, logical_schema_from_parquet_bytes,
29 segment_entity_identity_from_parquet_bytes, segment_meta_from_parquet_bytes_with_report,
30 },
31 metadata::schema_compat::ensure_schema_exact_match,
32 storage::{self, StorageError},
33 transaction_log::{
34 LogAction, SegmentId, TableState, segments::segment_id_v1,
35 table_state::TableCoveragePointer,
36 },
37};
38
39use super::{
40 TimeSeriesTable,
41 append_report::{AppendReport, AppendReportBuilder},
42 error::{
43 CoverageOverlapSnafu, EntityMismatchSnafu, ExistingSegmentMissingCoverageSnafu,
44 MissingCanonicalSchemaSnafu, SchemaCompatibilitySnafu, SegmentCoverageSnafu,
45 SegmentEntityIdentitySnafu, SegmentMetaSnafu, StorageSnafu, TableError,
46 TransactionLogSnafu,
47 },
48};
49
50fn ensure_existing_segments_have_coverage(state: &TableState) -> Result<(), TableError> {
51 for seg in state.segments.values() {
52 if seg.coverage_path.is_none() {
53 return ExistingSegmentMissingCoverageSnafu {
54 segment_id: seg.segment_id.clone(),
55 }
56 .fail();
57 }
58 }
59
60 Ok(())
61}
62
impl TimeSeriesTable {
    /// Convenience wrapper around the inner append path that skips the
    /// step-timing report.
    #[allow(dead_code)]
    async fn append_parquet_segment_with_id_and_bytes(
        &mut self,
        segment_id: SegmentId,
        relative_path: &str,
        time_column: &str,
        data: Bytes,
    ) -> Result<u64, TableError> {
        self.append_parquet_segment_with_id_and_bytes_inner(
            segment_id,
            relative_path,
            time_column,
            data,
            None,
        )
        .await
    }

    /// Core append path: validates the segment against table invariants
    /// (schema, entity identity, coverage overlap), writes the segment and
    /// snapshot coverage sidecars, commits the transaction-log actions, and
    /// finally mirrors the committed result into the in-memory `state`.
    ///
    /// Returns the new table version on success. When `report` is `Some`,
    /// each stage's wall-clock duration is pushed into the builder.
    async fn append_parquet_segment_with_id_and_bytes_inner(
        &mut self,
        segment_id: SegmentId,
        relative_path: &str,
        time_column: &str,
        data: Bytes,
        mut report: Option<&mut AppendReportBuilder>,
    ) -> Result<u64, TableError> {
        let rel_path = Path::new(relative_path);
        // Version we expect to commit on top of; used for optimistic
        // concurrency in commit_with_expected_version below.
        let expected_version = self.state.version;
        let bucket_spec = self.index.bucket.clone();

        // Every pre-existing segment must have a coverage sidecar, otherwise
        // the overlap check below would be computed against incomplete data.
        ensure_existing_segments_have_coverage(&self.state)?;

        // Extract segment metadata (row counts, ts bounds) from the parquet
        // bytes; `data` is `Bytes`, so clones are cheap reference bumps.
        let step_start = Instant::now();
        let (mut segment_meta, meta_report) = segment_meta_from_parquet_bytes_with_report(
            rel_path,
            segment_id,
            time_column,
            data.clone(),
        )
        .context(SegmentMetaSnafu)?;
        if let Some(r) = report.as_mut() {
            let fields = vec![
                ("row_groups".to_string(), meta_report.row_groups.to_string()),
                ("row_count".to_string(), meta_report.row_count.to_string()),
                ("used_stats".to_string(), meta_report.used_stats.to_string()),
                (
                    "scanned_rows".to_string(),
                    meta_report.scanned_rows.to_string(),
                ),
            ];
            r.push_step("segment_meta", step_start.elapsed(), fields);
        }

        let step_start = Instant::now();
        let segment_schema =
            logical_schema_from_parquet_bytes(rel_path, data.clone()).context(SegmentMetaSnafu)?;
        if let Some(r) = report.as_mut() {
            r.push_step("logical_schema", step_start.elapsed(), Vec::new());
        }

        let maybe_table_schema = self.state.table_meta.logical_schema.as_ref();

        // Schema handling: a freshly-created table (version 1, no canonical
        // schema yet) adopts the first segment's schema; later versions
        // without a schema are an invariant violation; otherwise the segment
        // must match the canonical schema exactly.
        let mut maybe_updated_meta = match maybe_table_schema {
            None if expected_version == 1 => {
                let mut updated_meta = self.state.table_meta.clone();
                updated_meta.logical_schema = Some(segment_schema.clone());
                Some(updated_meta)
            }
            None => {
                return MissingCanonicalSchemaSnafu {
                    version: expected_version,
                }
                .fail();
            }
            Some(table_schema) => {
                ensure_schema_exact_match(table_schema, &segment_schema, &self.index)
                    .context(SchemaCompatibilitySnafu)?;
                None
            }
        };

        // Entity identity: the first appended segment pins the identity on
        // the table meta; later segments must match it exactly.
        if !self.index.entity_columns.is_empty() {
            let step_start = Instant::now();
            let seg_ident = segment_entity_identity_from_parquet_bytes(
                data.clone(),
                rel_path,
                &self.index.entity_columns,
            )
            .context(SegmentEntityIdentitySnafu)?;
            if let Some(r) = report.as_mut() {
                r.push_step("entity_identity", step_start.elapsed(), Vec::new());
            }

            match &self.state.table_meta.entity_identity {
                Some(expected) => {
                    if expected != &seg_ident {
                        return EntityMismatchSnafu {
                            segment_path: relative_path.to_string(),
                            expected: expected.clone(),
                            found: seg_ident,
                        }
                        .fail();
                    }
                }
                None => {
                    // Reuse the meta clone made for schema adoption if there
                    // is one, so a single UpdateTableMeta action is emitted.
                    let updated =
                        maybe_updated_meta.get_or_insert_with(|| self.state.table_meta.clone());
                    updated.entity_identity = Some(seg_ident);
                }
            }
        }

        // Current table-wide coverage snapshot (self-healing load: see
        // load_table_snapshot_coverage_with_heal).
        let step_start = Instant::now();
        let table_cov = self.load_table_snapshot_coverage_with_heal().await?;
        if let Some(r) = report.as_mut() {
            r.push_step("load_table_snapshot", step_start.elapsed(), Vec::new());
        }

        let step_start = Instant::now();
        let segment_cov = compute_segment_coverage_from_parquet_bytes(
            rel_path,
            time_column,
            &bucket_spec,
            data.clone(),
        )
        .context(SegmentCoverageSnafu)?;
        if let Some(r) = report.as_mut() {
            r.push_step("segment_coverage", step_start.elapsed(), Vec::new());
        }

        // Reject the append if the new segment covers any time bucket the
        // table already covers; report one example bucket for diagnostics.
        let step_start = Instant::now();
        let overlap = segment_cov.intersect(&table_cov);
        let overlap_count = overlap.cardinality();
        if overlap_count > 0 {
            let example_bucket = overlap.present().iter().next();
            return CoverageOverlapSnafu {
                segment_path: relative_path.to_string(),
                overlap_count,
                example_bucket,
            }
            .fail();
        }
        if let Some(r) = report.as_mut() {
            r.push_step("overlap_check", step_start.elapsed(), Vec::new());
        }
        let seg_cov_bytes =
            coverage_to_bytes(&segment_cov).map_err(|source| TableError::CoverageSidecar {
                source: CoverageError::Serde { source },
            })?;

        // Sidecar path is derived from (bucket_spec, time_column, bytes), so
        // identical content maps to an identical path.
        let coverage_id = segment_coverage_id_v1(&bucket_spec, time_column, &seg_cov_bytes);
        let seg_cov_path =
            segment_coverage_path(&coverage_id).map_err(|source| TableError::CoverageSidecar {
                source: CoverageError::Layout { source },
            })?;
        let step_start = Instant::now();
        match write_coverage_sidecar_new_bytes(self.location(), &seg_cov_path, &seg_cov_bytes).await
        {
            Ok(()) => {}
            Err(CoverageError::Storage {
                source: StorageError::AlreadyExists { .. },
            }) => {
                // The path encodes the content id, so AlreadyExists means the
                // same bytes are already on disk — safe to ignore (idempotent
                // retry / duplicate append).
            }
            Err(e) => return Err(TableError::CoverageSidecar { source: e }),
        }
        if let Some(r) = report.as_mut() {
            r.push_step("write_segment_sidecar", step_start.elapsed(), Vec::new());
        }

        // Predicted post-commit version; verified against the log's actual
        // answer after the commit below.
        let new_version_guess = expected_version + 1;

        let new_table_cov = table_cov.union(&segment_cov);

        let new_snap_cov_bytes =
            coverage_to_bytes(&new_table_cov).map_err(|source| TableError::CoverageSidecar {
                source: CoverageError::Serde { source },
            })?;
        let snapshot_id = table_coverage_id_v1(&bucket_spec, time_column, &new_snap_cov_bytes);

        let snapshot_path = table_snapshot_path(new_version_guess, &snapshot_id).map_err(|e| {
            TableError::CoverageSidecar {
                source: CoverageError::Layout { source: e },
            }
        })?;

        let step_start = Instant::now();
        match write_coverage_sidecar_new_bytes(self.location(), &snapshot_path, &new_snap_cov_bytes)
            .await
        {
            Ok(()) => {}
            Err(CoverageError::Storage {
                source: StorageError::AlreadyExists { .. },
            }) => {
                // Same content-addressing argument as the segment sidecar
                // above: an existing snapshot at this path holds these bytes.
            }
            Err(e) => return Err(TableError::CoverageSidecar { source: e }),
        }
        if let Some(r) = report.as_mut() {
            r.push_step("write_snapshot_sidecar", step_start.elapsed(), Vec::new());
        }

        // Record the sidecar location on the segment meta before logging it.
        segment_meta.coverage_path = Some(seg_cov_path.to_string_lossy().to_string());

        // Action order matters to consumers of the commit: meta update (if
        // any) first, then the segment, then the coverage pointer.
        let mut actions = Vec::new();
        if let Some(updated_meta) = maybe_updated_meta.clone() {
            actions.push(LogAction::UpdateTableMeta(updated_meta));
        }

        actions.push(LogAction::AddSegment(segment_meta.clone()));
        actions.push(LogAction::UpdateTableCoverage {
            bucket_spec: bucket_spec.clone(),
            coverage_path: snapshot_path.to_string_lossy().to_string(),
        });

        let step_start = Instant::now();
        let new_version = self
            .log
            .commit_with_expected_version(expected_version, actions)
            .await
            .context(TransactionLogSnafu)?;
        if let Some(r) = report.as_mut() {
            r.push_step("commit_log", step_start.elapsed(), Vec::new());
        }

        // The snapshot sidecar was written under new_version_guess; a
        // differing committed version would be an internal invariant
        // violation, hence a panic rather than a recoverable error.
        assert_eq!(
            new_version, new_version_guess,
            "transaction log returned unexpected version: expected {}, got {}",
            new_version_guess, new_version
        );

        // Mirror the committed actions into in-memory state.
        let step_start = Instant::now();
        self.state.version = new_version;

        if let Some(updated_meta) = maybe_updated_meta {
            self.state.table_meta = updated_meta
        }

        self.state
            .segments
            .insert(segment_meta.segment_id.clone(), segment_meta);

        self.state.table_coverage = Some(TableCoveragePointer {
            bucket_spec,
            coverage_path: snapshot_path.to_string_lossy().to_string(),
            version: new_version,
        });
        if let Some(r) = report.as_mut() {
            r.push_step("state_update", step_start.elapsed(), Vec::new());
        }

        Ok(new_version)
    }

    /// Appends a parquet segment with a caller-supplied id, reading the
    /// segment bytes from this table's storage location at `relative_path`.
    pub async fn append_parquet_segment_with_id(
        &mut self,
        segment_id: SegmentId,
        relative_path: &str,
        time_column: &str,
    ) -> Result<u64, TableError> {
        let rel_path = Path::new(relative_path);

        let bytes = storage::read_all_bytes(self.location().as_ref(), rel_path)
            .await
            .context(StorageSnafu)?;

        self.append_parquet_segment_with_id_and_bytes_inner(
            segment_id,
            relative_path,
            time_column,
            Bytes::from(bytes),
            None,
        )
        .await
    }

    /// Appends a parquet segment, deriving the segment id from the relative
    /// path and file contents via `segment_id_v1`.
    pub async fn append_parquet_segment(
        &mut self,
        relative_path: &str,
        time_column: &str,
    ) -> Result<u64, TableError> {
        let rel_path = Path::new(relative_path);
        let bytes = storage::read_all_bytes(self.location().as_ref(), rel_path)
            .await
            .context(StorageSnafu)?;
        let data = Bytes::from(bytes);

        let segment_id = segment_id_v1(relative_path, &data);
        self.append_parquet_segment_with_id_and_bytes_inner(
            segment_id,
            relative_path,
            time_column,
            data,
            None,
        )
        .await
    }

    /// Same as [`Self::append_parquet_segment`], but additionally returns an
    /// `AppendReport` with per-stage timings and context (path, byte length,
    /// derived segment id).
    pub async fn append_parquet_segment_with_report(
        &mut self,
        relative_path: &str,
        time_column: &str,
    ) -> Result<(u64, AppendReport), TableError> {
        let mut report = AppendReportBuilder::new();
        report.set_context("relative_path", relative_path);
        report.set_context("time_column", time_column);

        let rel_path = Path::new(relative_path);
        let read_start = Instant::now();
        let bytes = storage::read_all_bytes(self.location().as_ref(), rel_path)
            .await
            .context(StorageSnafu)?;
        report.push_step("read_parquet_bytes", read_start.elapsed(), Vec::new());

        report.set_context("bytes_len", bytes.len().to_string());
        let data = Bytes::from(bytes);

        let segment_id = segment_id_v1(relative_path, &data);
        report.set_context("segment_id", segment_id.0.clone());

        let version = self
            .append_parquet_segment_with_id_and_bytes_inner(
                segment_id,
                relative_path,
                time_column,
                data,
                Some(&mut report),
            )
            .await?;

        Ok((version, report.finish()))
    }
}
457
458#[cfg(test)]
459mod tests {
460 use super::super::test_util::*;
461 use super::*;
462 use crate::coverage::Coverage;
463 use crate::coverage::io::read_coverage_sidecar;
464 use crate::metadata::logical_schema::{LogicalDataType, LogicalTimestampUnit};
465 use crate::metadata::time_column::TimeColumnError;
466 use crate::storage::layout;
467 use crate::storage::{StorageLocation, TableLocation};
468 use crate::transaction_log::segments::{SegmentError, SegmentMetaError};
469 use crate::transaction_log::{
470 Commit, CommitError, TableKind, TableMeta, TimeBucket, TimeIndexSpec,
471 };
472 use std::collections::BTreeMap;
473 use tempfile::TempDir;
474
    /// Appending a parquet file without the configured time column must fail
    /// with SegmentMeta wrapping TimeColumnError::Missing.
    #[tokio::test]
    async fn append_parquet_segment_with_id_missing_time_column_errors() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel = "data/seg-no-ts.parquet";
        let path = tmp.path().join(rel);
        write_parquet_without_time_column(&path, &["A"], &[1.0])?;

        let err = table
            .append_parquet_segment_with_id(SegmentId("seg-no-ts".to_string()), rel, "ts")
            .await
            .expect_err("expected missing time column");

        match err {
            TableError::SegmentMeta { source } => {
                assert!(matches!(
                    source,
                    SegmentError::Meta {
                        source: SegmentMetaError::TimeColumn {
                            source: TimeColumnError::Missing { .. },
                            ..
                        }
                    },
                ));
            }
            other => panic!("unexpected error: {other:?}"),
        }

        Ok(())
    }
508
    /// A successful append bumps the version to 2, records the segment meta
    /// (path, row count, ts bounds) in state, writes the commit file and the
    /// `current` pointer, and survives a table reopen.
    #[tokio::test]
    async fn append_parquet_segment_with_id_updates_state_and_log() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();

        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel_path = "data/seg1.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_test_parquet(
            &abs_path,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;

        let new_version = table
            .append_parquet_segment_with_id(SegmentId("seg-1".to_string()), rel_path, "ts")
            .await?;

        assert_eq!(new_version, 2);
        assert_eq!(table.state.version, 2);
        let seg = table
            .state
            .segments
            .get(&SegmentId("seg-1".to_string()))
            .expect("segment present");
        assert_eq!(seg.path, rel_path);
        assert_eq!(seg.row_count, 1);
        assert_eq!(seg.ts_min.timestamp_millis(), 1_000);
        assert_eq!(seg.ts_max.timestamp_millis(), 1_000);

        // Commit 2 must be durable on disk and `current` must point at it.
        let commit_path = tmp.path().join(layout::commit_rel_path(2));
        assert!(commit_path.is_file());
        let current =
            tokio::fs::read_to_string(tmp.path().join(layout::current_rel_path())).await?;
        assert_eq!(current.trim(), "2");

        let reopened = TimeSeriesTable::open(location).await?;
        assert!(
            reopened
                .state
                .segments
                .contains_key(&SegmentId("seg-1".to_string()))
        );
        Ok(())
    }
561
    /// The first append pins the segment's entity identity on the table meta
    /// and the commit contains UpdateTableMeta, AddSegment and
    /// UpdateTableCoverage, in that order.
    #[tokio::test]
    async fn append_pins_entity_identity_and_commits_actions() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel_path = "data/seg-entity-a.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_test_parquet(
            &abs_path,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;

        let version = table
            .append_parquet_segment_with_id(SegmentId("seg-entity-a".to_string()), rel_path, "ts")
            .await?;
        assert_eq!(version, 2);

        let expected_identity = BTreeMap::from([("symbol".to_string(), "A".to_string())]);
        assert_eq!(
            table.state.table_meta.entity_identity,
            Some(expected_identity.clone())
        );

        // Inspect the raw commit file to verify the exact action sequence.
        let commit_path = tmp.path().join(layout::commit_rel_path(2));
        let contents = tokio::fs::read_to_string(&commit_path).await?;
        let commit: Commit = serde_json::from_str(&contents)?;

        assert_eq!(commit.actions.len(), 3);
        match &commit.actions[0] {
            LogAction::UpdateTableMeta(meta) => {
                assert_eq!(meta.entity_identity.as_ref(), Some(&expected_identity));
            }
            other => panic!("expected UpdateTableMeta, got {other:?}"),
        }
        assert!(matches!(commit.actions[1], LogAction::AddSegment(_)));
        assert!(matches!(
            commit.actions[2],
            LogAction::UpdateTableCoverage { .. }
        ));

        Ok(())
    }
612
    /// A second segment with the same entity identity appends cleanly; its
    /// commit omits UpdateTableMeta (identity already pinned) and contains
    /// only AddSegment + UpdateTableCoverage.
    #[tokio::test]
    async fn append_allows_same_entity_identity() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel_path1 = "data/seg-entity-a-1.parquet";
        let abs_path1 = tmp.path().join(rel_path1);
        write_test_parquet(
            &abs_path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;

        table
            .append_parquet_segment_with_id(
                SegmentId("seg-entity-a-1".to_string()),
                rel_path1,
                "ts",
            )
            .await?;

        // Second segment: same symbol, non-overlapping time range.
        let rel_path2 = "data/seg-entity-a-2.parquet";
        let abs_path2 = tmp.path().join(rel_path2);
        write_test_parquet(
            &abs_path2,
            true,
            false,
            &[TestRow {
                ts_millis: 120_000,
                symbol: "A",
                price: 20.0,
            }],
        )?;

        let version = table
            .append_parquet_segment_with_id(
                SegmentId("seg-entity-a-2".to_string()),
                rel_path2,
                "ts",
            )
            .await?;
        assert_eq!(version, 3);

        let expected_identity = BTreeMap::from([("symbol".to_string(), "A".to_string())]);
        assert_eq!(
            table.state.table_meta.entity_identity,
            Some(expected_identity.clone())
        );

        let commit_path = tmp.path().join(layout::commit_rel_path(3));
        let contents = tokio::fs::read_to_string(&commit_path).await?;
        let commit: Commit = serde_json::from_str(&contents)?;
        assert_eq!(commit.actions.len(), 2);
        assert!(matches!(commit.actions[0], LogAction::AddSegment(_)));
        assert!(matches!(
            commit.actions[1],
            LogAction::UpdateTableCoverage { .. }
        ));

        Ok(())
    }
681
    /// Appending a segment whose entity identity differs from the pinned one
    /// fails with EntityMismatch and leaves no commit behind.
    #[tokio::test]
    async fn append_rejects_mismatched_entity_identity() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel_path1 = "data/seg-entity-a.parquet";
        let abs_path1 = tmp.path().join(rel_path1);
        write_test_parquet(
            &abs_path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;
        table
            .append_parquet_segment_with_id(SegmentId("seg-entity-a".to_string()), rel_path1, "ts")
            .await?;

        // Second segment carries symbol "B" — a different identity.
        let rel_path2 = "data/seg-entity-b.parquet";
        let abs_path2 = tmp.path().join(rel_path2);
        write_test_parquet(
            &abs_path2,
            true,
            false,
            &[TestRow {
                ts_millis: 120_000,
                symbol: "B",
                price: 20.0,
            }],
        )?;

        let err = table
            .append_parquet_segment_with_id(SegmentId("seg-entity-b".to_string()), rel_path2, "ts")
            .await
            .expect_err("expected entity identity mismatch");

        let expected_identity = BTreeMap::from([("symbol".to_string(), "A".to_string())]);
        let found_identity = BTreeMap::from([("symbol".to_string(), "B".to_string())]);

        match err {
            TableError::EntityMismatch {
                expected, found, ..
            } => {
                assert_eq!(expected, expected_identity);
                assert_eq!(found, found_identity);
            }
            other => panic!("unexpected error: {other:?}"),
        }

        // The failed append must not have produced commit 3.
        let commit_path = tmp.path().join(layout::commit_rel_path(3));
        assert!(!commit_path.exists());

        Ok(())
    }
741
    /// A version-1 table created without a logical schema adopts the first
    /// appended segment's schema, including the timestamp column type.
    #[tokio::test]
    async fn append_parquet_segment_with_id_adopts_schema_when_missing() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());

        // Build table meta by hand (logical_schema: None) instead of using
        // make_basic_table_meta, to exercise the adoption path.
        let index = TimeIndexSpec {
            timestamp_column: "ts".to_string(),
            entity_columns: vec![],
            bucket: TimeBucket::Minutes(1),
            timezone: None,
        };
        let meta = TableMeta {
            kind: TableKind::TimeSeries(index),
            logical_schema: None,
            created_at: utc_datetime(2025, 1, 1, 0, 0, 0),
            format_version: 1,
            entity_identity: None,
        };

        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel_path = "data/seg-adopt.parquet";
        let abs_path = tmp.path().join(rel_path);
        write_test_parquet(
            &abs_path,
            true,
            false,
            &[TestRow {
                ts_millis: 5_000,
                symbol: "B",
                price: 20.0,
            }],
        )?;

        let new_version = table
            .append_parquet_segment_with_id(SegmentId("seg-adopt".to_string()), rel_path, "ts")
            .await?;

        assert_eq!(new_version, 2);
        let schema = table
            .state
            .table_meta
            .logical_schema
            .as_ref()
            .expect("schema adopted");
        let names: Vec<_> = schema.columns().iter().map(|c| c.name.as_str()).collect();
        assert_eq!(names, vec!["ts", "symbol", "price"]);
        let ts_col = &schema.columns()[0];
        assert_eq!(
            ts_col.data_type,
            LogicalDataType::Timestamp {
                unit: LogicalTimestampUnit::Millis,
                timezone: None,
            }
        );
        Ok(())
    }
799
    /// A segment missing a canonical-schema column is rejected with
    /// SchemaCompatibility / MissingColumn.
    #[tokio::test]
    async fn append_parquet_segment_with_id_rejects_schema_mismatch() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location, meta).await?;

        let rel_path = "data/seg-missing-symbol.parquet";
        let abs_path = tmp.path().join(rel_path);
        // `false` here writes the parquet without the symbol column.
        write_test_parquet(
            &abs_path,
            false,
            false,
            &[TestRow {
                ts_millis: 10_000,
                symbol: "C",
                price: 30.0,
            }],
        )?;

        let err = table
            .append_parquet_segment_with_id(SegmentId("seg-bad".to_string()), rel_path, "ts")
            .await
            .expect_err("expected schema mismatch");

        match err {
            TableError::SchemaCompatibility { source } => {
                assert!(matches!(
                    source,
                    crate::metadata::schema_compat::SchemaCompatibilityError::MissingColumn { .. }
                ));
            }
            other => panic!("unexpected error: {other:?}"),
        }
        Ok(())
    }
836
    /// Re-appending the same segment id with different, non-overlapping data
    /// succeeds and replaces the segment meta in state (map insert by id).
    #[tokio::test]
    async fn append_parquet_segment_with_id_allows_same_id_with_nonoverlapping_coverage()
    -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let meta = make_basic_table_meta();
        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;

        let rel_path = "data/dup.parquet";
        let abs_path = tmp.path().join(rel_path);

        write_test_parquet(
            &abs_path,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;
        let v2 = table
            .append_parquet_segment_with_id(SegmentId("seg-dup".to_string()), rel_path, "ts")
            .await?;
        assert_eq!(v2, 2);
        assert_eq!(table.state.version, 2);
        assert_eq!(
            table
                .state
                .segments
                .get(&SegmentId("seg-dup".to_string()))
                .unwrap()
                .row_count,
            1
        );

        // Overwrite the file with two rows in a later, non-overlapping
        // minute bucket, then append again under the same id.
        write_test_parquet(
            &abs_path,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 120_000,
                    symbol: "A",
                    price: 20.0,
                },
                TestRow {
                    ts_millis: 121_000,
                    symbol: "A",
                    price: 30.0,
                },
            ],
        )?;

        let v3 = table
            .append_parquet_segment_with_id(SegmentId("seg-dup".to_string()), rel_path, "ts")
            .await?;
        assert_eq!(v3, 3);
        assert_eq!(table.state.version, 3);

        // The map entry for "seg-dup" now reflects the second file.
        let seg = table
            .state
            .segments
            .get(&SegmentId("seg-dup".to_string()))
            .expect("segment retained after duplicate append");
        assert_eq!(seg.row_count, 2);
        assert_eq!(seg.ts_min.timestamp_millis(), 120_000);
        assert_eq!(seg.ts_max.timestamp_millis(), 121_000);

        assert!(tmp.path().join(layout::commit_rel_path(3)).is_file());
        Ok(())
    }
910
    /// `append_parquet_segment` derives ids via `segment_id_v1` and the
    /// persisted table snapshot equals the union of both segments' coverage.
    #[tokio::test]
    async fn append_parquet_segment_generates_id_and_updates_snapshot() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;

        let rel1 = "data/seg-auto-1.parquet";
        let rel2 = "data/seg-auto-2.parquet";
        let path1 = tmp.path().join(rel1);
        let path2 = tmp.path().join(rel2);

        write_test_parquet(
            &path1,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 10.0,
                },
                TestRow {
                    ts_millis: 2_000,
                    symbol: "A",
                    price: 20.0,
                },
            ],
        )?;
        write_test_parquet(
            &path2,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 120_000,
                    symbol: "A",
                    price: 30.0,
                },
                TestRow {
                    ts_millis: 121_000,
                    symbol: "A",
                    price: 40.0,
                },
            ],
        )?;

        let v2 = table.append_parquet_segment(rel1, "ts").await?;
        let v3 = table.append_parquet_segment(rel2, "ts").await?;
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);

        // Recompute the expected ids from the same inputs the table used.
        let data1 = Bytes::from(tokio::fs::read(&path1).await?);
        let data2 = Bytes::from(tokio::fs::read(&path2).await?);

        let expected_id1 = segment_id_v1(rel1, &data1);
        let expected_id2 = segment_id_v1(rel2, &data2);

        let seg1 = table
            .state
            .segments
            .get(&expected_id1)
            .expect("segment 1 present");
        let seg2 = table
            .state
            .segments
            .get(&expected_id2)
            .expect("segment 2 present");
        assert!(seg1.coverage_path.is_some());
        assert!(seg2.coverage_path.is_some());

        let bucket_spec = table.index_spec().bucket.clone();

        // Independently recompute each segment's coverage and compare the
        // union against the snapshot sidecar the pointer references.
        let cov1 = compute_segment_coverage_from_parquet_bytes(
            Path::new(rel1),
            "ts",
            &bucket_spec,
            data1.clone(),
        )?;
        let cov2 = compute_segment_coverage_from_parquet_bytes(
            Path::new(rel2),
            "ts",
            &bucket_spec,
            data2.clone(),
        )?;
        let expected_snapshot = cov1.union(&cov2);

        let ptr = table
            .state
            .table_coverage
            .as_ref()
            .expect("table snapshot pointer present after append");
        assert_eq!(ptr.version, v3);
        assert_eq!(ptr.bucket_spec, bucket_spec);

        let snapshot_cov = read_coverage_sidecar(&location, Path::new(&ptr.coverage_path)).await?;

        assert_eq!(snapshot_cov.present(), expected_snapshot.present());
        Ok(())
    }
1010
    /// Two segments whose rows fall in the same time bucket (1_000 ms and
    /// 1_500 ms with minute buckets) must collide with CoverageOverlap.
    #[tokio::test]
    async fn append_parquet_segment_rejects_overlap() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let mut table = TimeSeriesTable::create(location, make_basic_table_meta()).await?;

        let rel1 = "data/seg-overlap-a.parquet";
        let rel2 = "data/seg-overlap-b.parquet";
        let path1 = tmp.path().join(rel1);
        let path2 = tmp.path().join(rel2);

        write_test_parquet(
            &path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;
        write_test_parquet(
            &path2,
            true,
            false,
            &[TestRow {
                ts_millis: 1_500,
                symbol: "A",
                price: 20.0,
            }],
        )?;

        table.append_parquet_segment(rel1, "ts").await?;

        let err = table
            .append_parquet_segment(rel2, "ts")
            .await
            .expect_err("overlapping append should fail");

        assert!(matches!(err, TableError::CoverageOverlap { .. }));
        Ok(())
    }
1053
    /// After reopening the table from disk, the coverage pointer still
    /// resolves to a snapshot equal to the union of both segments' coverage.
    #[tokio::test]
    async fn append_parquet_segment_snapshot_survives_reopen() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;

        let rel1 = "data/seg-reopen-a.parquet";
        let rel2 = "data/seg-reopen-b.parquet";
        let path1 = tmp.path().join(rel1);
        let path2 = tmp.path().join(rel2);

        write_test_parquet(
            &path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;
        write_test_parquet(
            &path2,
            true,
            false,
            &[TestRow {
                ts_millis: 120_000,
                symbol: "A",
                price: 20.0,
            }],
        )?;

        table.append_parquet_segment(rel1, "ts").await?;
        table.append_parquet_segment(rel2, "ts").await?;

        // Fresh open from the same location — no in-memory state carried over.
        let reopened = TimeSeriesTable::open(location.clone()).await?;
        let ptr = reopened
            .state()
            .table_coverage
            .as_ref()
            .expect("table snapshot pointer present after reopen");

        let bucket_spec = reopened.index_spec().bucket.clone();
        assert_eq!(ptr.bucket_spec, bucket_spec);

        let data1 = Bytes::from(tokio::fs::read(&path1).await?);
        let data2 = Bytes::from(tokio::fs::read(&path2).await?);

        let cov1 = compute_segment_coverage_from_parquet_bytes(
            Path::new(rel1),
            "ts",
            &bucket_spec,
            data1,
        )?;
        let cov2 = compute_segment_coverage_from_parquet_bytes(
            Path::new(rel2),
            "ts",
            &bucket_spec,
            data2,
        )?;
        let expected = cov1.union(&cov2);

        let snapshot_cov = read_coverage_sidecar(&location, Path::new(&ptr.coverage_path)).await?;
        assert_eq!(snapshot_cov.present(), expected.present());
        Ok(())
    }
1120
    /// Deleting the snapshot sidecar on disk triggers the heal path:
    /// coverage is rebuilt from the per-segment sidecars.
    #[tokio::test]
    async fn load_snapshot_recovers_when_missing_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;

        let rel1 = "data/seg-missing-snap-a.parquet";
        let rel2 = "data/seg-missing-snap-b.parquet";
        let path1 = tmp.path().join(rel1);
        let path2 = tmp.path().join(rel2);
        write_test_parquet(
            &path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;
        write_test_parquet(
            &path2,
            true,
            false,
            &[TestRow {
                ts_millis: 120_000,
                symbol: "A",
                price: 20.0,
            }],
        )?;

        table.append_parquet_segment(rel1, "ts").await?;
        table.append_parquet_segment(rel2, "ts").await?;

        let state = table.state.clone();
        let ptr = state
            .table_coverage
            .as_ref()
            .expect("snapshot pointer present");
        let snapshot_abs = match &location.as_ref() {
            StorageLocation::Local(root) => root.join(&ptr.coverage_path),
        };

        // Simulate a lost snapshot file.
        tokio::fs::remove_file(&snapshot_abs).await?;

        let recovered = table.load_table_snapshot_coverage_with_heal().await?;

        // Expected result: union of every segment's own coverage sidecar.
        let mut expected = Coverage::empty();
        for seg in state.segments.values() {
            let cov_path = seg.coverage_path.as_ref().expect("coverage path");
            let cov = read_coverage_sidecar(&location, Path::new(cov_path)).await?;
            expected.union_inplace(&cov);
        }

        assert_eq!(recovered.present(), expected.present());
        Ok(())
    }
1179
    /// Corrupting the snapshot sidecar (unparseable bytes) also triggers the
    /// heal path: coverage is rebuilt from the per-segment sidecars.
    #[tokio::test]
    async fn load_snapshot_recovers_when_corrupt_file() -> TestResult {
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;

        let rel1 = "data/seg-corrupt-snap-a.parquet";
        let rel2 = "data/seg-corrupt-snap-b.parquet";
        let path1 = tmp.path().join(rel1);
        let path2 = tmp.path().join(rel2);
        write_test_parquet(
            &path1,
            true,
            false,
            &[TestRow {
                ts_millis: 1_000,
                symbol: "A",
                price: 10.0,
            }],
        )?;
        write_test_parquet(
            &path2,
            true,
            false,
            &[TestRow {
                ts_millis: 120_000,
                symbol: "A",
                price: 20.0,
            }],
        )?;

        table.append_parquet_segment(rel1, "ts").await?;
        table.append_parquet_segment(rel2, "ts").await?;

        let state = table.state.clone();
        let ptr = state
            .table_coverage
            .as_ref()
            .expect("snapshot pointer present");
        let snapshot_abs = match &location.as_ref() {
            StorageLocation::Local(root) => root.join(&ptr.coverage_path),
        };

        // Overwrite the snapshot with bytes that cannot deserialize.
        tokio::fs::write(&snapshot_abs, b"garbage").await?;

        let recovered = table.load_table_snapshot_coverage_with_heal().await?;

        let mut expected = Coverage::empty();
        for seg in state.segments.values() {
            let cov_path = seg.coverage_path.as_ref().expect("coverage path");
            let cov = read_coverage_sidecar(&location, Path::new(cov_path)).await?;
            expected.union_inplace(&cov);
        }

        assert_eq!(recovered.present(), expected.present());
        Ok(())
    }
1237
1238 #[tokio::test]
1239 async fn load_snapshot_errors_when_segment_missing_coverage_path() -> TestResult {
1240 let tmp = TempDir::new()?;
1241 let location = TableLocation::local(tmp.path());
1242 let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1243
1244 let rel1 = "data/seg-missing-cov-path.parquet";
1245 let path1 = tmp.path().join(rel1);
1246 write_test_parquet(
1247 &path1,
1248 true,
1249 false,
1250 &[TestRow {
1251 ts_millis: 1_000,
1252 symbol: "A",
1253 price: 10.0,
1254 }],
1255 )?;
1256
1257 table.append_parquet_segment(rel1, "ts").await?;
1258
1259 let mut state = table.state.clone();
1260 state.table_coverage = None;
1261
1262 let seg_id = state
1263 .segments
1264 .keys()
1265 .next()
1266 .expect("segment present")
1267 .clone();
1268 state
1269 .segments
1270 .get_mut(&seg_id)
1271 .expect("segment present")
1272 .coverage_path = None;
1273
1274 table.state = state;
1276
1277 let err = table
1278 .load_table_snapshot_coverage_with_heal()
1279 .await
1280 .expect_err("missing coverage_path should error");
1281
1282 assert!(matches!(
1283 err,
1284 TableError::ExistingSegmentMissingCoverage { .. }
1285 ));
1286 Ok(())
1287 }
1288
1289 #[tokio::test]
1290 async fn load_snapshot_errors_when_segment_sidecar_corrupt() -> TestResult {
1291 let tmp = TempDir::new()?;
1292 let location = TableLocation::local(tmp.path());
1293 let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1294
1295 let rel1 = "data/seg-corrupt-sidecar.parquet";
1296 let rel2 = "data/seg-corrupt-sidecar-ok.parquet";
1297 let path1 = tmp.path().join(rel1);
1298 let path2 = tmp.path().join(rel2);
1299 write_test_parquet(
1300 &path1,
1301 true,
1302 false,
1303 &[TestRow {
1304 ts_millis: 1_000,
1305 symbol: "A",
1306 price: 10.0,
1307 }],
1308 )?;
1309 write_test_parquet(
1310 &path2,
1311 true,
1312 false,
1313 &[TestRow {
1314 ts_millis: 120_000,
1315 symbol: "A",
1316 price: 20.0,
1317 }],
1318 )?;
1319
1320 table.append_parquet_segment(rel1, "ts").await?;
1321 table.append_parquet_segment(rel2, "ts").await?;
1322
1323 let mut state = table.state.clone();
1324 state.table_coverage = None;
1325 let (corrupt_seg_id, corrupt_cov_path) = state
1326 .segments
1327 .iter()
1328 .next()
1329 .map(|(id, meta)| {
1330 (
1331 id.clone(),
1332 meta.coverage_path.as_ref().expect("coverage path").clone(),
1333 )
1334 })
1335 .expect("at least one segment");
1336 table.state = state;
1337
1338 let corrupt_abs = match &location.as_ref() {
1339 StorageLocation::Local(root) => root.join(&corrupt_cov_path),
1340 };
1341 tokio::fs::write(&corrupt_abs, b"not a coverage bitmap").await?;
1342
1343 let err = table
1344 .load_table_snapshot_coverage_with_heal()
1345 .await
1346 .expect_err("corrupt sidecar should error");
1347
1348 match err {
1349 TableError::SegmentCoverageSidecarRead {
1350 segment_id,
1351 coverage_path,
1352 ..
1353 } => {
1354 assert_eq!(segment_id, corrupt_seg_id);
1355 assert_eq!(coverage_path, corrupt_cov_path);
1356 }
1357 other => panic!("unexpected error: {other:?}"),
1358 }
1359
1360 Ok(())
1361 }
1362
1363 #[tokio::test]
1364 async fn append_parquet_segment_with_id_conflict_returns_error() -> TestResult {
1365 let tmp = TempDir::new()?;
1366 let location = TableLocation::local(tmp.path());
1367 let meta = make_basic_table_meta();
1368 let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
1369
1370 let rel_path = "data/conflict.parquet";
1371 let abs_path = tmp.path().join(rel_path);
1372 write_test_parquet(
1373 &abs_path,
1374 true,
1375 false,
1376 &[TestRow {
1377 ts_millis: 10_000,
1378 symbol: "X",
1379 price: 100.0,
1380 }],
1381 )?;
1382
1383 table
1385 .log
1386 .commit_with_expected_version(1, vec![])
1387 .await
1388 .expect("external commit succeeds");
1389
1390 let err = table
1391 .append_parquet_segment_with_id(SegmentId("seg-conflict".to_string()), rel_path, "ts")
1392 .await
1393 .expect_err("expected conflict due to stale version");
1394
1395 match err {
1396 TableError::TransactionLog { source } => {
1397 assert!(matches!(
1398 source,
1399 CommitError::Conflict {
1400 expected: 1,
1401 found: 2,
1402 ..
1403 }
1404 ));
1405 }
1406 other => panic!("unexpected error: {other:?}"),
1407 }
1408 Ok(())
1409 }
1410
1411 #[tokio::test]
1412 async fn append_fails_when_existing_segment_missing_coverage_path() -> TestResult {
1413 let tmp = TempDir::new()?;
1414 let location = TableLocation::local(tmp.path());
1415 let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1416
1417 let rel1 = "data/seg-missing-cov.parquet";
1418 let rel2 = "data/seg-next.parquet";
1419 let path1 = tmp.path().join(rel1);
1420 let path2 = tmp.path().join(rel2);
1421
1422 write_test_parquet(
1423 &path1,
1424 true,
1425 false,
1426 &[TestRow {
1427 ts_millis: 1_000,
1428 symbol: "A",
1429 price: 10.0,
1430 }],
1431 )?;
1432 write_test_parquet(
1433 &path2,
1434 true,
1435 false,
1436 &[TestRow {
1437 ts_millis: 120_000,
1438 symbol: "A",
1439 price: 20.0,
1440 }],
1441 )?;
1442
1443 table
1444 .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel1, "ts")
1445 .await?;
1446
1447 let seg = table
1449 .state
1450 .segments
1451 .get_mut(&SegmentId("seg-a".to_string()))
1452 .expect("segment present");
1453 seg.coverage_path = None;
1454
1455 let err = table
1456 .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel2, "ts")
1457 .await
1458 .expect_err("append should fail when existing segment lacks coverage");
1459
1460 assert!(matches!(
1461 err,
1462 TableError::ExistingSegmentMissingCoverage { .. }
1463 ));
1464 Ok(())
1465 }
1466
1467 #[tokio::test]
1468 async fn append_recovers_when_table_snapshot_pointer_missing() -> TestResult {
1473 let tmp = TempDir::new()?;
1474 let location = TableLocation::local(tmp.path());
1475 let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1476
1477 let rel1 = "data/seg-no-pointer-a.parquet";
1478 let rel2 = "data/seg-no-pointer-b.parquet";
1479 let path1 = tmp.path().join(rel1);
1480 let path2 = tmp.path().join(rel2);
1481
1482 write_test_parquet(
1483 &path1,
1484 true,
1485 false,
1486 &[TestRow {
1487 ts_millis: 1_000,
1488 symbol: "A",
1489 price: 10.0,
1490 }],
1491 )?;
1492 write_test_parquet(
1493 &path2,
1494 true,
1495 false,
1496 &[TestRow {
1497 ts_millis: 120_000,
1498 symbol: "A",
1499 price: 20.0,
1500 }],
1501 )?;
1502
1503 table
1504 .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel1, "ts")
1505 .await?;
1506
1507 table.state.table_coverage = None;
1509
1510 table
1511 .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel2, "ts")
1512 .await?;
1513
1514 let ptr = table
1516 .state
1517 .table_coverage
1518 .as_ref()
1519 .expect("snapshot pointer restored");
1520
1521 let cov = read_coverage_sidecar(&location, Path::new(&ptr.coverage_path)).await?;
1522
1523 let mut expected = Coverage::empty();
1524 for seg in table.state.segments.values() {
1525 let path = seg.coverage_path.as_ref().expect("coverage path");
1526 let seg_cov = read_coverage_sidecar(&location, Path::new(path)).await?;
1527 expected.union_inplace(&seg_cov);
1528 }
1529
1530 assert_eq!(cov.present(), expected.present());
1531 Ok(())
1532 }
1533
1534 #[tokio::test]
1535 async fn append_fails_when_table_snapshot_bucket_mismatches_index() -> TestResult {
1536 let tmp = TempDir::new()?;
1537 let location = TableLocation::local(tmp.path());
1538 let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1539
1540 let rel1 = "data/seg-bucket-a.parquet";
1541 let rel2 = "data/seg-bucket-b.parquet";
1542 let path1 = tmp.path().join(rel1);
1543 let path2 = tmp.path().join(rel2);
1544
1545 write_test_parquet(
1546 &path1,
1547 true,
1548 false,
1549 &[TestRow {
1550 ts_millis: 1_000,
1551 symbol: "A",
1552 price: 10.0,
1553 }],
1554 )?;
1555 write_test_parquet(
1556 &path2,
1557 true,
1558 false,
1559 &[TestRow {
1560 ts_millis: 120_000,
1561 symbol: "A",
1562 price: 20.0,
1563 }],
1564 )?;
1565
1566 table
1567 .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel1, "ts")
1568 .await?;
1569
1570 let bad_bucket = TimeBucket::Hours(1);
1572 let ptr = table
1573 .state
1574 .table_coverage
1575 .as_ref()
1576 .expect("pointer present")
1577 .clone();
1578 table.state.table_coverage = Some(TableCoveragePointer {
1579 bucket_spec: bad_bucket.clone(),
1580 coverage_path: ptr.coverage_path.clone(),
1581 version: ptr.version,
1582 });
1583
1584 let err = table
1585 .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel2, "ts")
1586 .await
1587 .expect_err("append should fail when snapshot bucket mismatches index");
1588
1589 assert!(matches!(
1590 err,
1591 TableError::TableCoverageBucketMismatch { .. }
1592 ));
1593 Ok(())
1594 }
1595}