// timeseries_table_core/table/append.rs
1//! Append pipeline for `TimeSeriesTable`.
2//!
3//! This module contains the core append implementation plus the public
4//! wrappers. It is responsible for:
5//! - loading/deriving segment metadata and logical schema,
6//! - enforcing v0.1 schema rules (adopt on first append, otherwise exact match),
7//! - computing segment coverage, detecting overlaps, and writing coverage sidecars,
8//! - optimistic commit to the transaction log and in-memory state update.
9//!   Keep new append-time invariants here so the flow remains centralized.
10
11use std::path::Path;
12use std::time::Instant;
13
14use bytes::Bytes;
15
16use snafu::prelude::*;
17
18use crate::{
19    coverage::serde::coverage_to_bytes,
20    coverage::{
21        io::{CoverageError, write_coverage_sidecar_new_bytes},
22        layout::{
23            segment_coverage_id_v1, segment_coverage_path, table_coverage_id_v1,
24            table_snapshot_path,
25        },
26    },
27    formats::parquet::{
28        coverage::compute_segment_coverage_from_parquet_bytes, logical_schema_from_parquet_bytes,
29        segment_entity_identity_from_parquet_bytes, segment_meta_from_parquet_bytes_with_report,
30    },
31    metadata::schema_compat::ensure_schema_exact_match,
32    storage::{self, StorageError},
33    transaction_log::{
34        LogAction, SegmentId, TableState, segments::segment_id_v1,
35        table_state::TableCoveragePointer,
36    },
37};
38
39use super::{
40    TimeSeriesTable,
41    append_report::{AppendReport, AppendReportBuilder},
42    error::{
43        CoverageOverlapSnafu, EntityMismatchSnafu, ExistingSegmentMissingCoverageSnafu,
44        MissingCanonicalSchemaSnafu, SchemaCompatibilitySnafu, SegmentCoverageSnafu,
45        SegmentEntityIdentitySnafu, SegmentMetaSnafu, StorageSnafu, TableError,
46        TransactionLogSnafu,
47    },
48};
49
50fn ensure_existing_segments_have_coverage(state: &TableState) -> Result<(), TableError> {
51    for seg in state.segments.values() {
52        if seg.coverage_path.is_none() {
53            return ExistingSegmentMissingCoverageSnafu {
54                segment_id: seg.segment_id.clone(),
55            }
56            .fail();
57        }
58    }
59
60    Ok(())
61}
62
impl TimeSeriesTable {
    /// Core append implementation that operates on already-loaded Parquet bytes.
    ///
    /// This contains the full v0.1 append flow (schema adoption/enforcement,
    /// coverage computation + overlap detection, sidecar writes, OCC commit,
    /// and in-memory state update). The public
    /// `append_parquet_segment_with_id` wrapper is responsible only for
    /// fetching the bytes from storage before delegating here.
    ///
    /// Callers must ensure `data` corresponds to `relative_path`; the function
    /// does not re-read from storage.
    #[allow(dead_code)]
    async fn append_parquet_segment_with_id_and_bytes(
        &mut self,
        segment_id: SegmentId,
        relative_path: &str,
        time_column: &str,
        data: Bytes,
    ) -> Result<u64, TableError> {
        self.append_parquet_segment_with_id_and_bytes_inner(
            segment_id,
            relative_path,
            time_column,
            data,
            None,
        )
        .await
    }

    /// Full append pipeline over in-memory Parquet bytes, with optional
    /// step-by-step profiling.
    ///
    /// When `report` is `Some`, each numbered stage below pushes a timing
    /// entry (via `push_step`) into the `AppendReportBuilder`; when `None`
    /// the timings are measured but discarded, keeping a single code path.
    ///
    /// On success returns the new transaction-log version. On any error
    /// before the commit, neither the log nor in-memory state is modified
    /// (already-written coverage sidecars may be orphaned — see step 6).
    async fn append_parquet_segment_with_id_and_bytes_inner(
        &mut self,
        segment_id: SegmentId,
        relative_path: &str,
        time_column: &str,
        data: Bytes,
        mut report: Option<&mut AppendReportBuilder>,
    ) -> Result<u64, TableError> {
        let rel_path = Path::new(relative_path);
        // Snapshot the version up front: the OCC commit in step 7 must be
        // made against the version we validated everything under.
        let expected_version = self.state.version;
        let bucket_spec = self.index.bucket.clone();

        // 0) Coverage readiness checks.
        ensure_existing_segments_have_coverage(&self.state)?;

        // 1) Segment meta + schema.
        // `step_start` is intentionally re-shadowed before each stage so the
        // elapsed time reported for a step covers only that step.
        let step_start = Instant::now();
        let (mut segment_meta, meta_report) = segment_meta_from_parquet_bytes_with_report(
            rel_path,
            segment_id,
            time_column,
            data.clone(),
        )
        .context(SegmentMetaSnafu)?;
        if let Some(r) = report.as_mut() {
            let fields = vec![
                ("row_groups".to_string(), meta_report.row_groups.to_string()),
                ("row_count".to_string(), meta_report.row_count.to_string()),
                ("used_stats".to_string(), meta_report.used_stats.to_string()),
                (
                    "scanned_rows".to_string(),
                    meta_report.scanned_rows.to_string(),
                ),
            ];
            r.push_step("segment_meta", step_start.elapsed(), fields);
        }

        let step_start = Instant::now();
        let segment_schema =
            logical_schema_from_parquet_bytes(rel_path, data.clone()).context(SegmentMetaSnafu)?;
        if let Some(r) = report.as_mut() {
            r.push_step("logical_schema", step_start.elapsed(), Vec::new());
        }

        // 2) Schema behavior (return maybe_updated_meta, but do NOT build actions yet).
        //
        // - logical_schema == None && version == 1:
        //     first append after create() — adopt this segment’s schema.
        // - logical_schema == None && version != 1:
        //     table is in a bad state for v0.1 → error.
        // - logical_schema == Some(..):
        //     enforce “no schema evolution” via ensure_schema_exact_match.
        let maybe_table_schema = self.state.table_meta.logical_schema.as_ref();

        let mut maybe_updated_meta = match maybe_table_schema {
            None if expected_version == 1 => {
                let mut updated_meta = self.state.table_meta.clone();
                updated_meta.logical_schema = Some(segment_schema.clone());
                Some(updated_meta)
            }
            None => {
                return MissingCanonicalSchemaSnafu {
                    version: expected_version,
                }
                .fail();
            }
            Some(table_schema) => {
                ensure_schema_exact_match(table_schema, &segment_schema, &self.index)
                    .context(SchemaCompatibilitySnafu)?;
                None
            }
        };

        // 2.5) Entity identity enforcement / pinning (v0.1 single-entity-per-table)
        if !self.index.entity_columns.is_empty() {
            let step_start = Instant::now();
            let seg_ident = segment_entity_identity_from_parquet_bytes(
                data.clone(),
                rel_path,
                &self.index.entity_columns,
            )
            .context(SegmentEntityIdentitySnafu)?;
            if let Some(r) = report.as_mut() {
                r.push_step("entity_identity", step_start.elapsed(), Vec::new());
            }

            match &self.state.table_meta.entity_identity {
                Some(expected) => {
                    if expected != &seg_ident {
                        return EntityMismatchSnafu {
                            segment_path: relative_path.to_string(),
                            expected: expected.clone(),
                            found: seg_ident,
                        }
                        .fail();
                    }
                }
                None => {
                    // pin the first append that includes entity columns
                    // (reuses the meta clone from schema adoption if one
                    // already exists, so a single UpdateTableMeta is emitted)
                    let updated =
                        maybe_updated_meta.get_or_insert_with(|| self.state.table_meta.clone());
                    updated.entity_identity = Some(seg_ident);
                }
            }
        }

        // 3) Load current table snapshot coverage (or empty if first append).
        let step_start = Instant::now();
        let table_cov = self.load_table_snapshot_coverage_with_heal().await?;
        if let Some(r) = report.as_mut() {
            r.push_step("load_table_snapshot", step_start.elapsed(), Vec::new());
        }

        // 4) Compute segment coverage.
        let step_start = Instant::now();
        let segment_cov = compute_segment_coverage_from_parquet_bytes(
            rel_path,
            time_column,
            &bucket_spec,
            data.clone(),
        )
        .context(SegmentCoverageSnafu)?;
        if let Some(r) = report.as_mut() {
            r.push_step("segment_coverage", step_start.elapsed(), Vec::new());
        }

        // 5) Overlap detection.
        let step_start = Instant::now();
        let overlap = segment_cov.intersect(&table_cov);
        let overlap_count = overlap.cardinality();
        if overlap_count > 0 {
            // Surface one concrete bucket in the error to aid diagnosis.
            let example_bucket = overlap.present().iter().next();
            return CoverageOverlapSnafu {
                segment_path: relative_path.to_string(),
                overlap_count,
                example_bucket,
            }
            .fail();
        }
        if let Some(r) = report.as_mut() {
            r.push_step("overlap_check", step_start.elapsed(), Vec::new());
        }
        let seg_cov_bytes =
            coverage_to_bytes(&segment_cov).map_err(|source| TableError::CoverageSidecar {
                source: CoverageError::Serde { source },
            })?;

        // 6) Write sidecars BEFORE commit (orphan files OK on commit failure)
        let coverage_id = segment_coverage_id_v1(&bucket_spec, time_column, &seg_cov_bytes);
        let seg_cov_path =
            segment_coverage_path(&coverage_id).map_err(|source| TableError::CoverageSidecar {
                source: CoverageError::Layout { source },
            })?;
        let step_start = Instant::now();
        match write_coverage_sidecar_new_bytes(self.location(), &seg_cov_path, &seg_cov_bytes).await
        {
            Ok(()) => {}
            Err(CoverageError::Storage {
                source: StorageError::AlreadyExists { .. },
            }) => {
                // ok: same id implies same intended content
            }
            Err(e) => return Err(TableError::CoverageSidecar { source: e }),
        }
        if let Some(r) = report.as_mut() {
            r.push_step("write_segment_sidecar", step_start.elapsed(), Vec::new());
        }

        // The snapshot path is derived from the version we expect the commit
        // to produce; the assert after commit verifies this prediction held.
        let new_version_guess = expected_version + 1;

        let new_table_cov = table_cov.union(&segment_cov);

        let new_snap_cov_bytes =
            coverage_to_bytes(&new_table_cov).map_err(|source| TableError::CoverageSidecar {
                source: CoverageError::Serde { source },
            })?;
        let snapshot_id = table_coverage_id_v1(&bucket_spec, time_column, &new_snap_cov_bytes);

        let snapshot_path = table_snapshot_path(new_version_guess, &snapshot_id).map_err(|e| {
            TableError::CoverageSidecar {
                source: CoverageError::Layout { source: e },
            }
        })?;

        let step_start = Instant::now();
        match write_coverage_sidecar_new_bytes(self.location(), &snapshot_path, &new_snap_cov_bytes)
            .await
        {
            Ok(()) => {}
            Err(CoverageError::Storage {
                source: StorageError::AlreadyExists { .. },
            }) => {
                // ok: same id implies same intended content
            }
            Err(e) => return Err(TableError::CoverageSidecar { source: e }),
        }
        if let Some(r) = report.as_mut() {
            r.push_step("write_snapshot_sidecar", step_start.elapsed(), Vec::new());
        }

        // 7) Build actions and commit.
        segment_meta.coverage_path = Some(seg_cov_path.to_string_lossy().to_string());

        // Action order matters for replay: meta update (if any) precedes the
        // segment add and the coverage pointer update.
        let mut actions = Vec::new();
        if let Some(updated_meta) = maybe_updated_meta.clone() {
            actions.push(LogAction::UpdateTableMeta(updated_meta));
        }

        actions.push(LogAction::AddSegment(segment_meta.clone()));
        actions.push(LogAction::UpdateTableCoverage {
            bucket_spec: bucket_spec.clone(),
            coverage_path: snapshot_path.to_string_lossy().to_string(),
        });

        let step_start = Instant::now();
        let new_version = self
            .log
            .commit_with_expected_version(expected_version, actions)
            .await
            .context(TransactionLogSnafu)?;
        if let Some(r) = report.as_mut() {
            r.push_step("commit_log", step_start.elapsed(), Vec::new());
        }

        // OCC invariant: a successful commit_with_expected_version must return
        // the same "next" version we predicted when constructing `snapshot_path`.
        // If this ever diverges, it indicates a severe bug between snapshot path
        // construction and the transaction log implementation, so we panic rather
        // than continuing with an inconsistent in-memory state.
        assert_eq!(
            new_version, new_version_guess,
            "transaction log returned unexpected version: expected {}, got {}",
            new_version_guess, new_version
        );

        // 8) Update in-memory state.
        // Only reached after a successful commit, so state mirrors the log.
        let step_start = Instant::now();
        self.state.version = new_version;

        if let Some(updated_meta) = maybe_updated_meta {
            self.state.table_meta = updated_meta
        }

        self.state
            .segments
            .insert(segment_meta.segment_id.clone(), segment_meta);

        // Also update the snapshot pointer in state.
        self.state.table_coverage = Some(TableCoveragePointer {
            bucket_spec,
            coverage_path: snapshot_path.to_string_lossy().to_string(),
            version: new_version,
        });
        if let Some(r) = report.as_mut() {
            r.push_step("state_update", step_start.elapsed(), Vec::new());
        }

        Ok(new_version)
    }

    /// Append a new Parquet segment with a caller-provided `segment_id`, registering it in the transaction log.
    ///
    /// v0.1 behavior:
    /// - Build SegmentMeta from the Parquet file (ts_min, ts_max, row_count).
    /// - Derive the segment logical schema from the Parquet file.
    /// - If the table has no logical_schema yet, adopt this segment schema
    ///   as canonical and write an UpdateTableMeta + AddSegment commit.
    /// - Otherwise, enforce "no schema evolution" via schema_helpers.
    /// - Compute coverage for the segment and table; reject if coverage overlaps.
    /// - Write the segment coverage sidecar before committing (safe to orphan on failure).
    /// - Commit with OCC on the current version.
    /// - Update in-memory TableState on success.
    ///
    /// v0.1: duplicates (same segment_id/path) are allowed if their coverage
    /// does not overlap existing data; otherwise overlap is rejected.
    ///
    /// This wrapper reads the Parquet bytes from storage, then delegates to
    /// `append_parquet_segment_with_id_and_bytes` for the core logic.
    pub async fn append_parquet_segment_with_id(
        &mut self,
        segment_id: SegmentId,
        relative_path: &str,
        time_column: &str,
    ) -> Result<u64, TableError> {
        let rel_path = Path::new(relative_path);

        let bytes = storage::read_all_bytes(self.location().as_ref(), rel_path)
            .await
            .context(StorageSnafu)?;

        self.append_parquet_segment_with_id_and_bytes_inner(
            segment_id,
            relative_path,
            time_column,
            Bytes::from(bytes),
            None,
        )
        .await
    }

    /// Append a Parquet segment using a deterministic, content-derived `segment_id`.
    ///
    /// This wrapper reads the Parquet bytes from storage, derives `segment_id`
    /// via `segment_id_v1(relative_path, bytes)`, then delegates to
    /// `append_parquet_segment_with_id_and_bytes` for the core logic.
    /// Behavior (schema adoption/enforcement, coverage, OCC, state updates)
    /// matches `append_parquet_segment_with_id`.
    pub async fn append_parquet_segment(
        &mut self,
        relative_path: &str,
        time_column: &str,
    ) -> Result<u64, TableError> {
        let rel_path = Path::new(relative_path);
        let bytes = storage::read_all_bytes(self.location().as_ref(), rel_path)
            .await
            .context(StorageSnafu)?;
        let data = Bytes::from(bytes);

        let segment_id = segment_id_v1(relative_path, &data);
        self.append_parquet_segment_with_id_and_bytes_inner(
            segment_id,
            relative_path,
            time_column,
            data,
            None,
        )
        .await
    }

    /// Append a Parquet segment and return a profiling report.
    ///
    /// Same behavior as `append_parquet_segment`, but threads an
    /// `AppendReportBuilder` through the pipeline so each stage records its
    /// elapsed time; the storage read itself is timed here as
    /// `read_parquet_bytes` before delegating.
    pub async fn append_parquet_segment_with_report(
        &mut self,
        relative_path: &str,
        time_column: &str,
    ) -> Result<(u64, AppendReport), TableError> {
        let mut report = AppendReportBuilder::new();
        report.set_context("relative_path", relative_path);
        report.set_context("time_column", time_column);

        let rel_path = Path::new(relative_path);
        let read_start = Instant::now();
        let bytes = storage::read_all_bytes(self.location().as_ref(), rel_path)
            .await
            .context(StorageSnafu)?;
        report.push_step("read_parquet_bytes", read_start.elapsed(), Vec::new());

        report.set_context("bytes_len", bytes.len().to_string());
        let data = Bytes::from(bytes);

        let segment_id = segment_id_v1(relative_path, &data);
        report.set_context("segment_id", segment_id.0.clone());

        let version = self
            .append_parquet_segment_with_id_and_bytes_inner(
                segment_id,
                relative_path,
                time_column,
                data,
                Some(&mut report),
            )
            .await?;

        Ok((version, report.finish()))
    }
}
457
458#[cfg(test)]
459mod tests {
460    use super::super::test_util::*;
461    use super::*;
462    use crate::coverage::Coverage;
463    use crate::coverage::io::read_coverage_sidecar;
464    use crate::metadata::logical_schema::{LogicalDataType, LogicalTimestampUnit};
465    use crate::metadata::time_column::TimeColumnError;
466    use crate::storage::layout;
467    use crate::storage::{StorageLocation, TableLocation};
468    use crate::transaction_log::segments::{SegmentError, SegmentMetaError};
469    use crate::transaction_log::{
470        Commit, CommitError, TableKind, TableMeta, TimeBucket, TimeIndexSpec,
471    };
472    use std::collections::BTreeMap;
473    use tempfile::TempDir;
474
475    #[tokio::test]
476    async fn append_parquet_segment_with_id_missing_time_column_errors() -> TestResult {
477        let tmp = TempDir::new()?;
478        let location = TableLocation::local(tmp.path());
479        let meta = make_basic_table_meta();
480        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
481
482        let rel = "data/seg-no-ts.parquet";
483        let path = tmp.path().join(rel);
484        write_parquet_without_time_column(&path, &["A"], &[1.0])?;
485
486        let err = table
487            .append_parquet_segment_with_id(SegmentId("seg-no-ts".to_string()), rel, "ts")
488            .await
489            .expect_err("expected missing time column");
490
491        match err {
492            TableError::SegmentMeta { source } => {
493                assert!(matches!(
494                    source,
495                    SegmentError::Meta {
496                        source: SegmentMetaError::TimeColumn {
497                            source: TimeColumnError::Missing { .. },
498                            ..
499                        }
500                    },
501                ));
502            }
503            other => panic!("unexpected error: {other:?}"),
504        }
505
506        Ok(())
507    }
508
509    #[tokio::test]
510    async fn append_parquet_segment_with_id_updates_state_and_log() -> TestResult {
511        let tmp = TempDir::new()?;
512        let location = TableLocation::local(tmp.path());
513        let meta = make_basic_table_meta();
514
515        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
516
517        let rel_path = "data/seg1.parquet";
518        let abs_path = tmp.path().join(rel_path);
519        write_test_parquet(
520            &abs_path,
521            true,
522            false,
523            &[TestRow {
524                ts_millis: 1_000,
525                symbol: "A",
526                price: 10.0,
527            }],
528        )?;
529
530        let new_version = table
531            .append_parquet_segment_with_id(SegmentId("seg-1".to_string()), rel_path, "ts")
532            .await?;
533
534        assert_eq!(new_version, 2);
535        assert_eq!(table.state.version, 2);
536        let seg = table
537            .state
538            .segments
539            .get(&SegmentId("seg-1".to_string()))
540            .expect("segment present");
541        assert_eq!(seg.path, rel_path);
542        assert_eq!(seg.row_count, 1);
543        assert_eq!(seg.ts_min.timestamp_millis(), 1_000);
544        assert_eq!(seg.ts_max.timestamp_millis(), 1_000);
545
546        let commit_path = tmp.path().join(layout::commit_rel_path(2));
547        assert!(commit_path.is_file());
548        let current =
549            tokio::fs::read_to_string(tmp.path().join(layout::current_rel_path())).await?;
550        assert_eq!(current.trim(), "2");
551
552        let reopened = TimeSeriesTable::open(location).await?;
553        assert!(
554            reopened
555                .state
556                .segments
557                .contains_key(&SegmentId("seg-1".to_string()))
558        );
559        Ok(())
560    }
561
562    #[tokio::test]
563    async fn append_pins_entity_identity_and_commits_actions() -> TestResult {
564        let tmp = TempDir::new()?;
565        let location = TableLocation::local(tmp.path());
566        let meta = make_basic_table_meta();
567        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
568
569        let rel_path = "data/seg-entity-a.parquet";
570        let abs_path = tmp.path().join(rel_path);
571        write_test_parquet(
572            &abs_path,
573            true,
574            false,
575            &[TestRow {
576                ts_millis: 1_000,
577                symbol: "A",
578                price: 10.0,
579            }],
580        )?;
581
582        let version = table
583            .append_parquet_segment_with_id(SegmentId("seg-entity-a".to_string()), rel_path, "ts")
584            .await?;
585        assert_eq!(version, 2);
586
587        let expected_identity = BTreeMap::from([("symbol".to_string(), "A".to_string())]);
588        assert_eq!(
589            table.state.table_meta.entity_identity,
590            Some(expected_identity.clone())
591        );
592
593        let commit_path = tmp.path().join(layout::commit_rel_path(2));
594        let contents = tokio::fs::read_to_string(&commit_path).await?;
595        let commit: Commit = serde_json::from_str(&contents)?;
596
597        assert_eq!(commit.actions.len(), 3);
598        match &commit.actions[0] {
599            LogAction::UpdateTableMeta(meta) => {
600                assert_eq!(meta.entity_identity.as_ref(), Some(&expected_identity));
601            }
602            other => panic!("expected UpdateTableMeta, got {other:?}"),
603        }
604        assert!(matches!(commit.actions[1], LogAction::AddSegment(_)));
605        assert!(matches!(
606            commit.actions[2],
607            LogAction::UpdateTableCoverage { .. }
608        ));
609
610        Ok(())
611    }
612
613    #[tokio::test]
614    async fn append_allows_same_entity_identity() -> TestResult {
615        let tmp = TempDir::new()?;
616        let location = TableLocation::local(tmp.path());
617        let meta = make_basic_table_meta();
618        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
619
620        let rel_path1 = "data/seg-entity-a-1.parquet";
621        let abs_path1 = tmp.path().join(rel_path1);
622        write_test_parquet(
623            &abs_path1,
624            true,
625            false,
626            &[TestRow {
627                ts_millis: 1_000,
628                symbol: "A",
629                price: 10.0,
630            }],
631        )?;
632
633        table
634            .append_parquet_segment_with_id(
635                SegmentId("seg-entity-a-1".to_string()),
636                rel_path1,
637                "ts",
638            )
639            .await?;
640
641        let rel_path2 = "data/seg-entity-a-2.parquet";
642        let abs_path2 = tmp.path().join(rel_path2);
643        write_test_parquet(
644            &abs_path2,
645            true,
646            false,
647            &[TestRow {
648                ts_millis: 120_000,
649                symbol: "A",
650                price: 20.0,
651            }],
652        )?;
653
654        let version = table
655            .append_parquet_segment_with_id(
656                SegmentId("seg-entity-a-2".to_string()),
657                rel_path2,
658                "ts",
659            )
660            .await?;
661        assert_eq!(version, 3);
662
663        let expected_identity = BTreeMap::from([("symbol".to_string(), "A".to_string())]);
664        assert_eq!(
665            table.state.table_meta.entity_identity,
666            Some(expected_identity.clone())
667        );
668
669        let commit_path = tmp.path().join(layout::commit_rel_path(3));
670        let contents = tokio::fs::read_to_string(&commit_path).await?;
671        let commit: Commit = serde_json::from_str(&contents)?;
672        assert_eq!(commit.actions.len(), 2);
673        assert!(matches!(commit.actions[0], LogAction::AddSegment(_)));
674        assert!(matches!(
675            commit.actions[1],
676            LogAction::UpdateTableCoverage { .. }
677        ));
678
679        Ok(())
680    }
681
682    #[tokio::test]
683    async fn append_rejects_mismatched_entity_identity() -> TestResult {
684        let tmp = TempDir::new()?;
685        let location = TableLocation::local(tmp.path());
686        let meta = make_basic_table_meta();
687        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
688
689        let rel_path1 = "data/seg-entity-a.parquet";
690        let abs_path1 = tmp.path().join(rel_path1);
691        write_test_parquet(
692            &abs_path1,
693            true,
694            false,
695            &[TestRow {
696                ts_millis: 1_000,
697                symbol: "A",
698                price: 10.0,
699            }],
700        )?;
701        table
702            .append_parquet_segment_with_id(SegmentId("seg-entity-a".to_string()), rel_path1, "ts")
703            .await?;
704
705        let rel_path2 = "data/seg-entity-b.parquet";
706        let abs_path2 = tmp.path().join(rel_path2);
707        write_test_parquet(
708            &abs_path2,
709            true,
710            false,
711            &[TestRow {
712                ts_millis: 120_000,
713                symbol: "B",
714                price: 20.0,
715            }],
716        )?;
717
718        let err = table
719            .append_parquet_segment_with_id(SegmentId("seg-entity-b".to_string()), rel_path2, "ts")
720            .await
721            .expect_err("expected entity identity mismatch");
722
723        let expected_identity = BTreeMap::from([("symbol".to_string(), "A".to_string())]);
724        let found_identity = BTreeMap::from([("symbol".to_string(), "B".to_string())]);
725
726        match err {
727            TableError::EntityMismatch {
728                expected, found, ..
729            } => {
730                assert_eq!(expected, expected_identity);
731                assert_eq!(found, found_identity);
732            }
733            other => panic!("unexpected error: {other:?}"),
734        }
735
736        let commit_path = tmp.path().join(layout::commit_rel_path(3));
737        assert!(!commit_path.exists());
738
739        Ok(())
740    }
741
742    #[tokio::test]
743    async fn append_parquet_segment_with_id_adopts_schema_when_missing() -> TestResult {
744        let tmp = TempDir::new()?;
745        let location = TableLocation::local(tmp.path());
746
747        let index = TimeIndexSpec {
748            timestamp_column: "ts".to_string(),
749            entity_columns: vec![],
750            bucket: TimeBucket::Minutes(1),
751            timezone: None,
752        };
753        let meta = TableMeta {
754            kind: TableKind::TimeSeries(index),
755            logical_schema: None,
756            created_at: utc_datetime(2025, 1, 1, 0, 0, 0),
757            format_version: 1,
758            entity_identity: None,
759        };
760
761        let mut table = TimeSeriesTable::create(location, meta).await?;
762
763        let rel_path = "data/seg-adopt.parquet";
764        let abs_path = tmp.path().join(rel_path);
765        write_test_parquet(
766            &abs_path,
767            true,
768            false,
769            &[TestRow {
770                ts_millis: 5_000,
771                symbol: "B",
772                price: 20.0,
773            }],
774        )?;
775
776        let new_version = table
777            .append_parquet_segment_with_id(SegmentId("seg-adopt".to_string()), rel_path, "ts")
778            .await?;
779
780        assert_eq!(new_version, 2);
781        let schema = table
782            .state
783            .table_meta
784            .logical_schema
785            .as_ref()
786            .expect("schema adopted");
787        let names: Vec<_> = schema.columns().iter().map(|c| c.name.as_str()).collect();
788        assert_eq!(names, vec!["ts", "symbol", "price"]);
789        let ts_col = &schema.columns()[0];
790        assert_eq!(
791            ts_col.data_type,
792            LogicalDataType::Timestamp {
793                unit: LogicalTimestampUnit::Millis,
794                timezone: None,
795            }
796        );
797        Ok(())
798    }
799
800    #[tokio::test]
801    async fn append_parquet_segment_with_id_rejects_schema_mismatch() -> TestResult {
802        let tmp = TempDir::new()?;
803        let location = TableLocation::local(tmp.path());
804        let meta = make_basic_table_meta();
805        let mut table = TimeSeriesTable::create(location, meta).await?;
806
807        let rel_path = "data/seg-missing-symbol.parquet";
808        let abs_path = tmp.path().join(rel_path);
809        write_test_parquet(
810            &abs_path,
811            false,
812            false,
813            &[TestRow {
814                ts_millis: 10_000,
815                symbol: "C",
816                price: 30.0,
817            }],
818        )?;
819
820        let err = table
821            .append_parquet_segment_with_id(SegmentId("seg-bad".to_string()), rel_path, "ts")
822            .await
823            .expect_err("expected schema mismatch");
824
825        match err {
826            TableError::SchemaCompatibility { source } => {
827                assert!(matches!(
828                    source,
829                    crate::metadata::schema_compat::SchemaCompatibilityError::MissingColumn { .. }
830                ));
831            }
832            other => panic!("unexpected error: {other:?}"),
833        }
834        Ok(())
835    }
836
837    #[tokio::test]
838    async fn append_parquet_segment_with_id_allows_same_id_with_nonoverlapping_coverage()
839    -> TestResult {
840        let tmp = TempDir::new()?;
841        let location = TableLocation::local(tmp.path());
842        let meta = make_basic_table_meta();
843        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
844
845        let rel_path = "data/dup.parquet";
846        let abs_path = tmp.path().join(rel_path);
847
848        write_test_parquet(
849            &abs_path,
850            true,
851            false,
852            &[TestRow {
853                ts_millis: 1_000,
854                symbol: "A",
855                price: 10.0,
856            }],
857        )?;
858        let v2 = table
859            .append_parquet_segment_with_id(SegmentId("seg-dup".to_string()), rel_path, "ts")
860            .await?;
861        assert_eq!(v2, 2);
862        assert_eq!(table.state.version, 2);
863        assert_eq!(
864            table
865                .state
866                .segments
867                .get(&SegmentId("seg-dup".to_string()))
868                .unwrap()
869                .row_count,
870            1
871        );
872
873        // Overwrite the same path with different content and append again with the same segment ID.
874        write_test_parquet(
875            &abs_path,
876            true,
877            false,
878            &[
879                TestRow {
880                    ts_millis: 120_000,
881                    symbol: "A",
882                    price: 20.0,
883                },
884                TestRow {
885                    ts_millis: 121_000,
886                    symbol: "A",
887                    price: 30.0,
888                },
889            ],
890        )?;
891
892        let v3 = table
893            .append_parquet_segment_with_id(SegmentId("seg-dup".to_string()), rel_path, "ts")
894            .await?;
895        assert_eq!(v3, 3);
896        assert_eq!(table.state.version, 3);
897
898        let seg = table
899            .state
900            .segments
901            .get(&SegmentId("seg-dup".to_string()))
902            .expect("segment retained after duplicate append");
903        assert_eq!(seg.row_count, 2);
904        assert_eq!(seg.ts_min.timestamp_millis(), 120_000);
905        assert_eq!(seg.ts_max.timestamp_millis(), 121_000);
906
907        assert!(tmp.path().join(layout::commit_rel_path(3)).is_file());
908        Ok(())
909    }
910
    #[tokio::test]
    async fn append_parquet_segment_generates_id_and_updates_snapshot() -> TestResult {
        // End-to-end check of the auto-ID append path:
        // - segment IDs are derived via `segment_id_v1(rel_path, bytes)`,
        // - each appended segment records a coverage sidecar path,
        // - the table snapshot pointer tracks the latest version and its sidecar
        //   equals the union of the per-segment coverage.
        let tmp = TempDir::new()?;
        let location = TableLocation::local(tmp.path());
        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;

        let rel1 = "data/seg-auto-1.parquet";
        let rel2 = "data/seg-auto-2.parquet";
        let path1 = tmp.path().join(rel1);
        let path2 = tmp.path().join(rel2);

        // Two segments with disjoint time ranges (1-2s vs 120-121s) so both
        // appends pass the overlap check.
        write_test_parquet(
            &path1,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 1_000,
                    symbol: "A",
                    price: 10.0,
                },
                TestRow {
                    ts_millis: 2_000,
                    symbol: "A",
                    price: 20.0,
                },
            ],
        )?;
        write_test_parquet(
            &path2,
            true,
            false,
            &[
                TestRow {
                    ts_millis: 120_000,
                    symbol: "A",
                    price: 30.0,
                },
                TestRow {
                    ts_millis: 121_000,
                    symbol: "A",
                    price: 40.0,
                },
            ],
        )?;

        // Each successful append bumps the table version (create was version 1).
        let v2 = table.append_parquet_segment(rel1, "ts").await?;
        let v3 = table.append_parquet_segment(rel2, "ts").await?;
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);

        // Re-read the exact bytes that were appended so the derived IDs match.
        let data1 = Bytes::from(tokio::fs::read(&path1).await?);
        let data2 = Bytes::from(tokio::fs::read(&path2).await?);

        let expected_id1 = segment_id_v1(rel1, &data1);
        let expected_id2 = segment_id_v1(rel2, &data2);

        // The auto-generated IDs must be keyed into table state, and each
        // segment must carry a coverage sidecar path.
        let seg1 = table
            .state
            .segments
            .get(&expected_id1)
            .expect("segment 1 present");
        let seg2 = table
            .state
            .segments
            .get(&expected_id2)
            .expect("segment 2 present");
        assert!(seg1.coverage_path.is_some());
        assert!(seg2.coverage_path.is_some());

        let bucket_spec = table.index_spec().bucket.clone();

        // Recompute coverage independently from the raw bytes to build the
        // expected table-wide snapshot.
        let cov1 = compute_segment_coverage_from_parquet_bytes(
            Path::new(rel1),
            "ts",
            &bucket_spec,
            data1.clone(),
        )?;
        let cov2 = compute_segment_coverage_from_parquet_bytes(
            Path::new(rel2),
            "ts",
            &bucket_spec,
            data2.clone(),
        )?;
        let expected_snapshot = cov1.union(&cov2);

        // The snapshot pointer must reference the latest committed version and
        // the same bucket spec as the table index.
        let ptr = table
            .state
            .table_coverage
            .as_ref()
            .expect("table snapshot pointer present after append");
        assert_eq!(ptr.version, v3);
        assert_eq!(ptr.bucket_spec, bucket_spec);

        let snapshot_cov = read_coverage_sidecar(&location, Path::new(&ptr.coverage_path)).await?;

        assert_eq!(snapshot_cov.present(), expected_snapshot.present());
        Ok(())
    }
1010
1011    #[tokio::test]
1012    async fn append_parquet_segment_rejects_overlap() -> TestResult {
1013        let tmp = TempDir::new()?;
1014        let location = TableLocation::local(tmp.path());
1015        let mut table = TimeSeriesTable::create(location, make_basic_table_meta()).await?;
1016
1017        let rel1 = "data/seg-overlap-a.parquet";
1018        let rel2 = "data/seg-overlap-b.parquet";
1019        let path1 = tmp.path().join(rel1);
1020        let path2 = tmp.path().join(rel2);
1021
1022        write_test_parquet(
1023            &path1,
1024            true,
1025            false,
1026            &[TestRow {
1027                ts_millis: 1_000,
1028                symbol: "A",
1029                price: 10.0,
1030            }],
1031        )?;
1032        write_test_parquet(
1033            &path2,
1034            true,
1035            false,
1036            &[TestRow {
1037                ts_millis: 1_500,
1038                symbol: "A",
1039                price: 20.0,
1040            }],
1041        )?;
1042
1043        table.append_parquet_segment(rel1, "ts").await?;
1044
1045        let err = table
1046            .append_parquet_segment(rel2, "ts")
1047            .await
1048            .expect_err("overlapping append should fail");
1049
1050        assert!(matches!(err, TableError::CoverageOverlap { .. }));
1051        Ok(())
1052    }
1053
1054    #[tokio::test]
1055    async fn append_parquet_segment_snapshot_survives_reopen() -> TestResult {
1056        let tmp = TempDir::new()?;
1057        let location = TableLocation::local(tmp.path());
1058        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1059
1060        let rel1 = "data/seg-reopen-a.parquet";
1061        let rel2 = "data/seg-reopen-b.parquet";
1062        let path1 = tmp.path().join(rel1);
1063        let path2 = tmp.path().join(rel2);
1064
1065        write_test_parquet(
1066            &path1,
1067            true,
1068            false,
1069            &[TestRow {
1070                ts_millis: 1_000,
1071                symbol: "A",
1072                price: 10.0,
1073            }],
1074        )?;
1075        write_test_parquet(
1076            &path2,
1077            true,
1078            false,
1079            &[TestRow {
1080                ts_millis: 120_000,
1081                symbol: "A",
1082                price: 20.0,
1083            }],
1084        )?;
1085
1086        table.append_parquet_segment(rel1, "ts").await?;
1087        table.append_parquet_segment(rel2, "ts").await?;
1088
1089        let reopened = TimeSeriesTable::open(location.clone()).await?;
1090        let ptr = reopened
1091            .state()
1092            .table_coverage
1093            .as_ref()
1094            .expect("table snapshot pointer present after reopen");
1095
1096        let bucket_spec = reopened.index_spec().bucket.clone();
1097        assert_eq!(ptr.bucket_spec, bucket_spec);
1098
1099        let data1 = Bytes::from(tokio::fs::read(&path1).await?);
1100        let data2 = Bytes::from(tokio::fs::read(&path2).await?);
1101
1102        let cov1 = compute_segment_coverage_from_parquet_bytes(
1103            Path::new(rel1),
1104            "ts",
1105            &bucket_spec,
1106            data1,
1107        )?;
1108        let cov2 = compute_segment_coverage_from_parquet_bytes(
1109            Path::new(rel2),
1110            "ts",
1111            &bucket_spec,
1112            data2,
1113        )?;
1114        let expected = cov1.union(&cov2);
1115
1116        let snapshot_cov = read_coverage_sidecar(&location, Path::new(&ptr.coverage_path)).await?;
1117        assert_eq!(snapshot_cov.present(), expected.present());
1118        Ok(())
1119    }
1120
1121    #[tokio::test]
1122    async fn load_snapshot_recovers_when_missing_file() -> TestResult {
1123        let tmp = TempDir::new()?;
1124        let location = TableLocation::local(tmp.path());
1125        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1126
1127        // Append two segments so we have segment sidecars plus a table snapshot pointer.
1128        let rel1 = "data/seg-missing-snap-a.parquet";
1129        let rel2 = "data/seg-missing-snap-b.parquet";
1130        let path1 = tmp.path().join(rel1);
1131        let path2 = tmp.path().join(rel2);
1132        write_test_parquet(
1133            &path1,
1134            true,
1135            false,
1136            &[TestRow {
1137                ts_millis: 1_000,
1138                symbol: "A",
1139                price: 10.0,
1140            }],
1141        )?;
1142        write_test_parquet(
1143            &path2,
1144            true,
1145            false,
1146            &[TestRow {
1147                ts_millis: 120_000,
1148                symbol: "A",
1149                price: 20.0,
1150            }],
1151        )?;
1152
1153        table.append_parquet_segment(rel1, "ts").await?;
1154        table.append_parquet_segment(rel2, "ts").await?;
1155
1156        let state = table.state.clone();
1157        let ptr = state
1158            .table_coverage
1159            .as_ref()
1160            .expect("snapshot pointer present");
1161        let snapshot_abs = match &location.as_ref() {
1162            StorageLocation::Local(root) => root.join(&ptr.coverage_path),
1163        };
1164
1165        tokio::fs::remove_file(&snapshot_abs).await?;
1166
1167        let recovered = table.load_table_snapshot_coverage_with_heal().await?;
1168
1169        let mut expected = Coverage::empty();
1170        for seg in state.segments.values() {
1171            let cov_path = seg.coverage_path.as_ref().expect("coverage path");
1172            let cov = read_coverage_sidecar(&location, Path::new(cov_path)).await?;
1173            expected.union_inplace(&cov);
1174        }
1175
1176        assert_eq!(recovered.present(), expected.present());
1177        Ok(())
1178    }
1179
1180    #[tokio::test]
1181    async fn load_snapshot_recovers_when_corrupt_file() -> TestResult {
1182        let tmp = TempDir::new()?;
1183        let location = TableLocation::local(tmp.path());
1184        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1185
1186        let rel1 = "data/seg-corrupt-snap-a.parquet";
1187        let rel2 = "data/seg-corrupt-snap-b.parquet";
1188        let path1 = tmp.path().join(rel1);
1189        let path2 = tmp.path().join(rel2);
1190        write_test_parquet(
1191            &path1,
1192            true,
1193            false,
1194            &[TestRow {
1195                ts_millis: 1_000,
1196                symbol: "A",
1197                price: 10.0,
1198            }],
1199        )?;
1200        write_test_parquet(
1201            &path2,
1202            true,
1203            false,
1204            &[TestRow {
1205                ts_millis: 120_000,
1206                symbol: "A",
1207                price: 20.0,
1208            }],
1209        )?;
1210
1211        table.append_parquet_segment(rel1, "ts").await?;
1212        table.append_parquet_segment(rel2, "ts").await?;
1213
1214        let state = table.state.clone();
1215        let ptr = state
1216            .table_coverage
1217            .as_ref()
1218            .expect("snapshot pointer present");
1219        let snapshot_abs = match &location.as_ref() {
1220            StorageLocation::Local(root) => root.join(&ptr.coverage_path),
1221        };
1222
1223        tokio::fs::write(&snapshot_abs, b"garbage").await?;
1224
1225        let recovered = table.load_table_snapshot_coverage_with_heal().await?;
1226
1227        let mut expected = Coverage::empty();
1228        for seg in state.segments.values() {
1229            let cov_path = seg.coverage_path.as_ref().expect("coverage path");
1230            let cov = read_coverage_sidecar(&location, Path::new(cov_path)).await?;
1231            expected.union_inplace(&cov);
1232        }
1233
1234        assert_eq!(recovered.present(), expected.present());
1235        Ok(())
1236    }
1237
1238    #[tokio::test]
1239    async fn load_snapshot_errors_when_segment_missing_coverage_path() -> TestResult {
1240        let tmp = TempDir::new()?;
1241        let location = TableLocation::local(tmp.path());
1242        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1243
1244        let rel1 = "data/seg-missing-cov-path.parquet";
1245        let path1 = tmp.path().join(rel1);
1246        write_test_parquet(
1247            &path1,
1248            true,
1249            false,
1250            &[TestRow {
1251                ts_millis: 1_000,
1252                symbol: "A",
1253                price: 10.0,
1254            }],
1255        )?;
1256
1257        table.append_parquet_segment(rel1, "ts").await?;
1258
1259        let mut state = table.state.clone();
1260        state.table_coverage = None;
1261
1262        let seg_id = state
1263            .segments
1264            .keys()
1265            .next()
1266            .expect("segment present")
1267            .clone();
1268        state
1269            .segments
1270            .get_mut(&seg_id)
1271            .expect("segment present")
1272            .coverage_path = None;
1273
1274        // Overwrite table state with the modified snapshot missing coverage_path.
1275        table.state = state;
1276
1277        let err = table
1278            .load_table_snapshot_coverage_with_heal()
1279            .await
1280            .expect_err("missing coverage_path should error");
1281
1282        assert!(matches!(
1283            err,
1284            TableError::ExistingSegmentMissingCoverage { .. }
1285        ));
1286        Ok(())
1287    }
1288
1289    #[tokio::test]
1290    async fn load_snapshot_errors_when_segment_sidecar_corrupt() -> TestResult {
1291        let tmp = TempDir::new()?;
1292        let location = TableLocation::local(tmp.path());
1293        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1294
1295        let rel1 = "data/seg-corrupt-sidecar.parquet";
1296        let rel2 = "data/seg-corrupt-sidecar-ok.parquet";
1297        let path1 = tmp.path().join(rel1);
1298        let path2 = tmp.path().join(rel2);
1299        write_test_parquet(
1300            &path1,
1301            true,
1302            false,
1303            &[TestRow {
1304                ts_millis: 1_000,
1305                symbol: "A",
1306                price: 10.0,
1307            }],
1308        )?;
1309        write_test_parquet(
1310            &path2,
1311            true,
1312            false,
1313            &[TestRow {
1314                ts_millis: 120_000,
1315                symbol: "A",
1316                price: 20.0,
1317            }],
1318        )?;
1319
1320        table.append_parquet_segment(rel1, "ts").await?;
1321        table.append_parquet_segment(rel2, "ts").await?;
1322
1323        let mut state = table.state.clone();
1324        state.table_coverage = None;
1325        let (corrupt_seg_id, corrupt_cov_path) = state
1326            .segments
1327            .iter()
1328            .next()
1329            .map(|(id, meta)| {
1330                (
1331                    id.clone(),
1332                    meta.coverage_path.as_ref().expect("coverage path").clone(),
1333                )
1334            })
1335            .expect("at least one segment");
1336        table.state = state;
1337
1338        let corrupt_abs = match &location.as_ref() {
1339            StorageLocation::Local(root) => root.join(&corrupt_cov_path),
1340        };
1341        tokio::fs::write(&corrupt_abs, b"not a coverage bitmap").await?;
1342
1343        let err = table
1344            .load_table_snapshot_coverage_with_heal()
1345            .await
1346            .expect_err("corrupt sidecar should error");
1347
1348        match err {
1349            TableError::SegmentCoverageSidecarRead {
1350                segment_id,
1351                coverage_path,
1352                ..
1353            } => {
1354                assert_eq!(segment_id, corrupt_seg_id);
1355                assert_eq!(coverage_path, corrupt_cov_path);
1356            }
1357            other => panic!("unexpected error: {other:?}"),
1358        }
1359
1360        Ok(())
1361    }
1362
1363    #[tokio::test]
1364    async fn append_parquet_segment_with_id_conflict_returns_error() -> TestResult {
1365        let tmp = TempDir::new()?;
1366        let location = TableLocation::local(tmp.path());
1367        let meta = make_basic_table_meta();
1368        let mut table = TimeSeriesTable::create(location.clone(), meta).await?;
1369
1370        let rel_path = "data/conflict.parquet";
1371        let abs_path = tmp.path().join(rel_path);
1372        write_test_parquet(
1373            &abs_path,
1374            true,
1375            false,
1376            &[TestRow {
1377                ts_millis: 10_000,
1378                symbol: "X",
1379                price: 100.0,
1380            }],
1381        )?;
1382
1383        // Simulate an external writer advancing CURRENT to version 2 while this handle is at version 1.
1384        table
1385            .log
1386            .commit_with_expected_version(1, vec![])
1387            .await
1388            .expect("external commit succeeds");
1389
1390        let err = table
1391            .append_parquet_segment_with_id(SegmentId("seg-conflict".to_string()), rel_path, "ts")
1392            .await
1393            .expect_err("expected conflict due to stale version");
1394
1395        match err {
1396            TableError::TransactionLog { source } => {
1397                assert!(matches!(
1398                    source,
1399                    CommitError::Conflict {
1400                        expected: 1,
1401                        found: 2,
1402                        ..
1403                    }
1404                ));
1405            }
1406            other => panic!("unexpected error: {other:?}"),
1407        }
1408        Ok(())
1409    }
1410
1411    #[tokio::test]
1412    async fn append_fails_when_existing_segment_missing_coverage_path() -> TestResult {
1413        let tmp = TempDir::new()?;
1414        let location = TableLocation::local(tmp.path());
1415        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1416
1417        let rel1 = "data/seg-missing-cov.parquet";
1418        let rel2 = "data/seg-next.parquet";
1419        let path1 = tmp.path().join(rel1);
1420        let path2 = tmp.path().join(rel2);
1421
1422        write_test_parquet(
1423            &path1,
1424            true,
1425            false,
1426            &[TestRow {
1427                ts_millis: 1_000,
1428                symbol: "A",
1429                price: 10.0,
1430            }],
1431        )?;
1432        write_test_parquet(
1433            &path2,
1434            true,
1435            false,
1436            &[TestRow {
1437                ts_millis: 120_000,
1438                symbol: "A",
1439                price: 20.0,
1440            }],
1441        )?;
1442
1443        table
1444            .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel1, "ts")
1445            .await?;
1446
1447        // Simulate legacy/bad state: drop coverage_path on the existing segment.
1448        let seg = table
1449            .state
1450            .segments
1451            .get_mut(&SegmentId("seg-a".to_string()))
1452            .expect("segment present");
1453        seg.coverage_path = None;
1454
1455        let err = table
1456            .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel2, "ts")
1457            .await
1458            .expect_err("append should fail when existing segment lacks coverage");
1459
1460        assert!(matches!(
1461            err,
1462            TableError::ExistingSegmentMissingCoverage { .. }
1463        ));
1464        Ok(())
1465    }
1466
1467    #[tokio::test]
1468    // Unlike load_snapshot_recovers_when_missing_file (which exercises recovery when
1469    // the pointer exists but the snapshot file is gone), this covers the case where
1470    // the in-memory pointer itself is missing while segments exist, and append
1471    // must rebuild + rewrite the pointer as part of the append flow.
1472    async fn append_recovers_when_table_snapshot_pointer_missing() -> TestResult {
1473        let tmp = TempDir::new()?;
1474        let location = TableLocation::local(tmp.path());
1475        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1476
1477        let rel1 = "data/seg-no-pointer-a.parquet";
1478        let rel2 = "data/seg-no-pointer-b.parquet";
1479        let path1 = tmp.path().join(rel1);
1480        let path2 = tmp.path().join(rel2);
1481
1482        write_test_parquet(
1483            &path1,
1484            true,
1485            false,
1486            &[TestRow {
1487                ts_millis: 1_000,
1488                symbol: "A",
1489                price: 10.0,
1490            }],
1491        )?;
1492        write_test_parquet(
1493            &path2,
1494            true,
1495            false,
1496            &[TestRow {
1497                ts_millis: 120_000,
1498                symbol: "A",
1499                price: 20.0,
1500            }],
1501        )?;
1502
1503        table
1504            .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel1, "ts")
1505            .await?;
1506
1507        // Simulate missing snapshot pointer while segments exist.
1508        table.state.table_coverage = None;
1509
1510        table
1511            .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel2, "ts")
1512            .await?;
1513
1514        // Snapshot pointer should be restored after a successful append.
1515        let ptr = table
1516            .state
1517            .table_coverage
1518            .as_ref()
1519            .expect("snapshot pointer restored");
1520
1521        let cov = read_coverage_sidecar(&location, Path::new(&ptr.coverage_path)).await?;
1522
1523        let mut expected = Coverage::empty();
1524        for seg in table.state.segments.values() {
1525            let path = seg.coverage_path.as_ref().expect("coverage path");
1526            let seg_cov = read_coverage_sidecar(&location, Path::new(path)).await?;
1527            expected.union_inplace(&seg_cov);
1528        }
1529
1530        assert_eq!(cov.present(), expected.present());
1531        Ok(())
1532    }
1533
1534    #[tokio::test]
1535    async fn append_fails_when_table_snapshot_bucket_mismatches_index() -> TestResult {
1536        let tmp = TempDir::new()?;
1537        let location = TableLocation::local(tmp.path());
1538        let mut table = TimeSeriesTable::create(location.clone(), make_basic_table_meta()).await?;
1539
1540        let rel1 = "data/seg-bucket-a.parquet";
1541        let rel2 = "data/seg-bucket-b.parquet";
1542        let path1 = tmp.path().join(rel1);
1543        let path2 = tmp.path().join(rel2);
1544
1545        write_test_parquet(
1546            &path1,
1547            true,
1548            false,
1549            &[TestRow {
1550                ts_millis: 1_000,
1551                symbol: "A",
1552                price: 10.0,
1553            }],
1554        )?;
1555        write_test_parquet(
1556            &path2,
1557            true,
1558            false,
1559            &[TestRow {
1560                ts_millis: 120_000,
1561                symbol: "A",
1562                price: 20.0,
1563            }],
1564        )?;
1565
1566        table
1567            .append_parquet_segment_with_id(SegmentId("seg-a".to_string()), rel1, "ts")
1568            .await?;
1569
1570        // Tamper snapshot pointer to a mismatching bucket spec.
1571        let bad_bucket = TimeBucket::Hours(1);
1572        let ptr = table
1573            .state
1574            .table_coverage
1575            .as_ref()
1576            .expect("pointer present")
1577            .clone();
1578        table.state.table_coverage = Some(TableCoveragePointer {
1579            bucket_spec: bad_bucket.clone(),
1580            coverage_path: ptr.coverage_path.clone(),
1581            version: ptr.version,
1582        });
1583
1584        let err = table
1585            .append_parquet_segment_with_id(SegmentId("seg-b".to_string()), rel2, "ts")
1586            .await
1587            .expect_err("append should fail when snapshot bucket mismatches index");
1588
1589        assert!(matches!(
1590            err,
1591            TableError::TableCoverageBucketMismatch { .. }
1592        ));
1593        Ok(())
1594    }
1595}