Skip to main content

lance_table/format/
fragment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13
14use crate::format::pb;
15
16use crate::rowids::version::{
17    RowDatasetVersionMeta, created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb,
18};
19use lance_core::datatypes::Schema;
20use lance_core::error::Result;
21
22/// Lance Data File
23///
24/// A data file is one piece of file storing data.
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
26pub struct DataFile {
27    /// Relative path of the data file to dataset root.
28    pub path: String,
29    /// The ids of fields in this file.
30    pub fields: Vec<i32>,
31    /// The offsets of the fields listed in `fields`, empty in v1 files
32    ///
33    /// Note that -1 is a possibility and it indices that the field has
34    /// no top-level column in the file.
35    ///
36    /// Columns that lack a field id may still exist as extra entries in
37    /// `column_indices`; such columns are ignored by field-id–based projection.
38    /// For example, some fields, such as blob fields, occupy multiple
39    /// columns in the file but only have a single field id.
40    #[serde(default)]
41    pub column_indices: Vec<i32>,
42    /// The major version of the file format used to write this file.
43    #[serde(default)]
44    pub file_major_version: u32,
45    /// The minor version of the file format used to write this file.
46    #[serde(default)]
47    pub file_minor_version: u32,
48
49    /// The size of the file in bytes, if known.
50    pub file_size_bytes: CachedFileSize,
51
52    /// The base path of the datafile, when the datafile is outside the dataset.
53    pub base_id: Option<u32>,
54}
55
56impl DataFile {
57    pub fn new(
58        path: impl Into<String>,
59        fields: Vec<i32>,
60        column_indices: Vec<i32>,
61        file_major_version: u32,
62        file_minor_version: u32,
63        file_size_bytes: Option<NonZero<u64>>,
64        base_id: Option<u32>,
65    ) -> Self {
66        Self {
67            path: path.into(),
68            fields,
69            column_indices,
70            file_major_version,
71            file_minor_version,
72            file_size_bytes: file_size_bytes.into(),
73            base_id,
74        }
75    }
76
77    /// Create a new `DataFile` with the expectation that fields and column_indices will be set later
78    pub fn new_unstarted(
79        path: impl Into<String>,
80        file_major_version: u32,
81        file_minor_version: u32,
82    ) -> Self {
83        Self {
84            path: path.into(),
85            fields: vec![],
86            column_indices: vec![],
87            file_major_version,
88            file_minor_version,
89            file_size_bytes: Default::default(),
90            base_id: None,
91        }
92    }
93
94    pub fn new_legacy_from_fields(
95        path: impl Into<String>,
96        fields: Vec<i32>,
97        base_id: Option<u32>,
98    ) -> Self {
99        Self::new(
100            path,
101            fields,
102            vec![],
103            MAJOR_VERSION as u32,
104            MINOR_VERSION as u32,
105            None,
106            base_id,
107        )
108    }
109
110    pub fn new_legacy(
111        path: impl Into<String>,
112        schema: &Schema,
113        file_size_bytes: Option<NonZero<u64>>,
114        base_id: Option<u32>,
115    ) -> Self {
116        let mut field_ids = schema.field_ids();
117        field_ids.sort();
118        Self::new(
119            path,
120            field_ids,
121            vec![],
122            MAJOR_VERSION as u32,
123            MINOR_VERSION as u32,
124            file_size_bytes,
125            base_id,
126        )
127    }
128
129    pub fn schema(&self, full_schema: &Schema) -> Schema {
130        full_schema.project_by_ids(&self.fields, false)
131    }
132
133    pub fn is_legacy_file(&self) -> bool {
134        self.file_major_version == 0 && self.file_minor_version < 3
135    }
136
137    pub fn validate(&self, base_path: &Path) -> Result<()> {
138        if self.is_legacy_file() {
139            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
140                return Err(Error::corrupt_file(
141                    base_path.child(self.path.clone()),
142                    "contained unsorted or duplicate field ids",
143                ));
144            }
145        } else if self.column_indices.len() < self.fields.len() {
146            // Every recorded field id must have a column index, but not every column needs
147            // to be associated with a field id (extra columns are allowed).
148            return Err(Error::corrupt_file(
149                base_path.child(self.path.clone()),
150                "contained fewer column_indices than fields",
151            ));
152        }
153        Ok(())
154    }
155}
156
157impl From<&DataFile> for pb::DataFile {
158    fn from(df: &DataFile) -> Self {
159        Self {
160            path: df.path.clone(),
161            fields: df.fields.clone(),
162            column_indices: df.column_indices.clone(),
163            file_major_version: df.file_major_version,
164            file_minor_version: df.file_minor_version,
165            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
166            base_id: df.base_id,
167        }
168    }
169}
170
171impl TryFrom<pb::DataFile> for DataFile {
172    type Error = Error;
173
174    fn try_from(proto: pb::DataFile) -> Result<Self> {
175        Ok(Self {
176            path: proto.path,
177            fields: proto.fields,
178            column_indices: proto.column_indices,
179            file_major_version: proto.file_major_version,
180            file_minor_version: proto.file_minor_version,
181            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
182            base_id: proto.base_id,
183        })
184    }
185}
186
187#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
188#[serde(rename_all = "lowercase")]
189pub enum DeletionFileType {
190    Array,
191    Bitmap,
192}
193
194impl DeletionFileType {
195    // TODO: pub(crate)
196    pub fn suffix(&self) -> &str {
197        match self {
198            Self::Array => "arrow",
199            Self::Bitmap => "bin",
200        }
201    }
202}
203
204#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
205pub struct DeletionFile {
206    pub read_version: u64,
207    pub id: u64,
208    pub file_type: DeletionFileType,
209    /// Number of deleted rows in this file. If None, this is unknown.
210    pub num_deleted_rows: Option<usize>,
211    pub base_id: Option<u32>,
212}
213
214impl TryFrom<pb::DeletionFile> for DeletionFile {
215    type Error = Error;
216
217    fn try_from(value: pb::DeletionFile) -> Result<Self> {
218        let file_type = match value.file_type {
219            0 => DeletionFileType::Array,
220            1 => DeletionFileType::Bitmap,
221            _ => {
222                return Err(Error::not_supported_source(
223                    "Unknown deletion file type".into(),
224                ));
225            }
226        };
227        let num_deleted_rows = if value.num_deleted_rows == 0 {
228            None
229        } else {
230            Some(value.num_deleted_rows as usize)
231        };
232        Ok(Self {
233            read_version: value.read_version,
234            id: value.id,
235            file_type,
236            num_deleted_rows,
237            base_id: value.base_id,
238        })
239    }
240}
241
242/// A reference to a part of a file.
243#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
244pub struct ExternalFile {
245    pub path: String,
246    pub offset: u64,
247    pub size: u64,
248}
249
250/// Metadata about location of the row id sequence.
251#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
252pub enum RowIdMeta {
253    Inline(Vec<u8>),
254    External(ExternalFile),
255}
256
257impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
258    type Error = Error;
259
260    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
261        match value {
262            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
263            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
264                Ok(Self::External(ExternalFile {
265                    path: file.path.clone(),
266                    offset: file.offset,
267                    size: file.size,
268                }))
269            }
270        }
271    }
272}
273
274/// Data fragment.
275///
276/// A fragment is a set of files which represent the different columns of the same rows.
277/// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
278#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
279pub struct Fragment {
280    /// Fragment ID
281    pub id: u64,
282
283    /// Files within the fragment.
284    pub files: Vec<DataFile>,
285
286    /// Optional file with deleted local row offsets.
287    #[serde(skip_serializing_if = "Option::is_none")]
288    pub deletion_file: Option<DeletionFile>,
289
290    /// RowIndex
291    #[serde(skip_serializing_if = "Option::is_none")]
292    pub row_id_meta: Option<RowIdMeta>,
293
294    /// Original number of rows in the fragment. If this is None, then it is
295    /// unknown. This is only optional for legacy reasons. All new tables should
296    /// have this set.
297    pub physical_rows: Option<usize>,
298
299    /// Last updated at version metadata
300    #[serde(skip_serializing_if = "Option::is_none")]
301    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,
302
303    /// Created at version metadata
304    #[serde(skip_serializing_if = "Option::is_none")]
305    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
306}
307
308impl Fragment {
309    pub fn new(id: u64) -> Self {
310        Self {
311            id,
312            files: vec![],
313            deletion_file: None,
314            row_id_meta: None,
315            physical_rows: None,
316            last_updated_at_version_meta: None,
317            created_at_version_meta: None,
318        }
319    }
320
321    pub fn num_rows(&self) -> Option<usize> {
322        match (self.physical_rows, &self.deletion_file) {
323            // Known fragment length, no deletion file.
324            (Some(len), None) => Some(len),
325            // Known fragment length, but don't know deletion file size.
326            (
327                Some(len),
328                Some(DeletionFile {
329                    num_deleted_rows: Some(num_deleted_rows),
330                    ..
331                }),
332            ) => Some(len - num_deleted_rows),
333            _ => None,
334        }
335    }
336
337    pub fn from_json(json: &str) -> Result<Self> {
338        let fragment: Self = serde_json::from_str(json)?;
339        Ok(fragment)
340    }
341
342    /// Create a `Fragment` with one DataFile
343    pub fn with_file_legacy(
344        id: u64,
345        path: &str,
346        schema: &Schema,
347        physical_rows: Option<usize>,
348    ) -> Self {
349        Self {
350            id,
351            files: vec![DataFile::new_legacy(path, schema, None, None)],
352            deletion_file: None,
353            physical_rows,
354            row_id_meta: None,
355            last_updated_at_version_meta: None,
356            created_at_version_meta: None,
357        }
358    }
359
360    pub fn with_file(
361        mut self,
362        path: impl Into<String>,
363        field_ids: Vec<i32>,
364        column_indices: Vec<i32>,
365        version: &LanceFileVersion,
366        file_size_bytes: Option<NonZero<u64>>,
367    ) -> Self {
368        let (major, minor) = version.to_numbers();
369        let data_file = DataFile::new(
370            path,
371            field_ids,
372            column_indices,
373            major,
374            minor,
375            file_size_bytes,
376            None,
377        );
378        self.files.push(data_file);
379        self
380    }
381
382    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
383        self.physical_rows = Some(physical_rows);
384        self
385    }
386
387    pub fn add_file(
388        &mut self,
389        path: impl Into<String>,
390        field_ids: Vec<i32>,
391        column_indices: Vec<i32>,
392        version: &LanceFileVersion,
393        file_size_bytes: Option<NonZero<u64>>,
394    ) {
395        let (major, minor) = version.to_numbers();
396        self.files.push(DataFile::new(
397            path,
398            field_ids,
399            column_indices,
400            major,
401            minor,
402            file_size_bytes,
403            None,
404        ));
405    }
406
407    /// Add a new [`DataFile`] to this fragment.
408    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
409        self.files
410            .push(DataFile::new_legacy(path, schema, None, None));
411    }
412
413    // True if this fragment is made up of legacy v1 files, false otherwise
414    pub fn has_legacy_files(&self) -> bool {
415        // If any file in a fragment is legacy then all files in the fragment must be
416        self.files[0].is_legacy_file()
417    }
418
419    // Helper method to infer the Lance version from a set of fragments
420    //
421    // Returns None if there are no data files
422    // Returns an error if the data files have different versions
423    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
424        // Otherwise we need to check the actual file versions
425        // Determine version from first file
426        let Some(sample_file) = fragments
427            .iter()
428            .find(|f| !f.files.is_empty())
429            .map(|f| &f.files[0])
430        else {
431            return Ok(None);
432        };
433        let file_version = LanceFileVersion::try_from_major_minor(
434            sample_file.file_major_version,
435            sample_file.file_minor_version,
436        )?;
437        // Ensure all files match
438        for frag in fragments {
439            for file in &frag.files {
440                let this_file_version = LanceFileVersion::try_from_major_minor(
441                    file.file_major_version,
442                    file.file_minor_version,
443                )?;
444                if file_version != this_file_version {
445                    return Err(Error::invalid_input(format!(
446                        "All data files must have the same version.  Detected both {} and {}",
447                        file_version, this_file_version
448                    )));
449                }
450            }
451        }
452        Ok(Some(file_version))
453    }
454}
455
456impl TryFrom<pb::DataFragment> for Fragment {
457    type Error = Error;
458
459    fn try_from(p: pb::DataFragment) -> Result<Self> {
460        let physical_rows = if p.physical_rows > 0 {
461            Some(p.physical_rows as usize)
462        } else {
463            None
464        };
465        Ok(Self {
466            id: p.id,
467            files: p
468                .files
469                .into_iter()
470                .map(DataFile::try_from)
471                .collect::<Result<_>>()?,
472            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
473            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
474            physical_rows,
475            last_updated_at_version_meta: p
476                .last_updated_at_version_sequence
477                .map(RowDatasetVersionMeta::try_from)
478                .transpose()?,
479            created_at_version_meta: p
480                .created_at_version_sequence
481                .map(RowDatasetVersionMeta::try_from)
482                .transpose()?,
483        })
484    }
485}
486
487impl From<&Fragment> for pb::DataFragment {
488    fn from(f: &Fragment) -> Self {
489        let deletion_file = f.deletion_file.as_ref().map(|f| {
490            let file_type = match f.file_type {
491                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
492                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
493            };
494            pb::DeletionFile {
495                read_version: f.read_version,
496                id: f.id,
497                file_type: file_type.into(),
498                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
499                base_id: f.base_id,
500            }
501        });
502
503        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
504            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
505            RowIdMeta::External(file) => {
506                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
507                    path: file.path.clone(),
508                    offset: file.offset,
509                    size: file.size,
510                })
511            }
512        });
513        let last_updated_at_version_sequence =
514            last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
515        let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
516        Self {
517            id: f.id,
518            files: f.files.iter().map(pb::DataFile::from).collect(),
519            deletion_file,
520            row_id_sequence,
521            physical_rows: f.physical_rows.unwrap_or_default() as u64,
522            last_updated_at_version_sequence,
523            created_at_version_sequence,
524        }
525    }
526}
527
528#[cfg(test)]
529mod tests {
530    use super::*;
531    use arrow_schema::{
532        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
533    };
534    use object_store::path::Path;
535    use serde_json::{Value, json};
536
537    #[test]
538    fn test_new_fragment() {
539        let path = "foobar.lance";
540
541        let arrow_schema = ArrowSchema::new(vec![
542            ArrowField::new(
543                "s",
544                DataType::Struct(ArrowFields::from(vec![
545                    ArrowField::new("si", DataType::Int32, false),
546                    ArrowField::new("sb", DataType::Binary, true),
547                ])),
548                true,
549            ),
550            ArrowField::new("bool", DataType::Boolean, true),
551        ]);
552        let schema = Schema::try_from(&arrow_schema).unwrap();
553        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));
554
555        assert_eq!(123, fragment.id);
556        assert_eq!(
557            fragment.files,
558            vec![DataFile::new_legacy_from_fields(
559                path.to_string(),
560                vec![0, 1, 2, 3],
561                None,
562            )]
563        )
564    }
565
566    #[test]
567    fn test_roundtrip_fragment() {
568        let mut fragment = Fragment::new(123);
569        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
570        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
571        fragment.deletion_file = Some(DeletionFile {
572            read_version: 123,
573            id: 456,
574            file_type: DeletionFileType::Array,
575            num_deleted_rows: Some(10),
576            base_id: None,
577        });
578
579        let proto = pb::DataFragment::from(&fragment);
580        let fragment2 = Fragment::try_from(proto).unwrap();
581        assert_eq!(fragment, fragment2);
582
583        fragment.deletion_file = None;
584        let proto = pb::DataFragment::from(&fragment);
585        let fragment2 = Fragment::try_from(proto).unwrap();
586        assert_eq!(fragment, fragment2);
587    }
588
589    #[test]
590    fn test_to_json() {
591        let mut fragment = Fragment::new(123);
592        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
593        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
594        fragment.deletion_file = Some(DeletionFile {
595            read_version: 123,
596            id: 456,
597            file_type: DeletionFileType::Array,
598            num_deleted_rows: Some(10),
599            base_id: None,
600        });
601
602        let json = serde_json::to_string(&fragment).unwrap();
603
604        let value: Value = serde_json::from_str(&json).unwrap();
605        assert_eq!(
606            value,
607            json!({
608                "id": 123,
609                "files":[
610                    {"path": "foobar.lance", "fields": [0], "column_indices": [], 
611                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
612                     "file_size_bytes": null, "base_id": null }
613                ],
614                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
615                                  "num_deleted_rows": 10, "base_id": null},
616                "physical_rows": None::<usize>}),
617        );
618
619        let frag2 = Fragment::from_json(&json).unwrap();
620        assert_eq!(fragment, frag2);
621    }
622
623    #[test]
624    fn data_file_validate_allows_extra_columns() {
625        let data_file = DataFile {
626            path: "foo.lance".to_string(),
627            fields: vec![1, 2],
628            // One extra column without a field id mapping
629            column_indices: vec![0, 1, 2],
630            file_major_version: MAJOR_VERSION as u32,
631            file_minor_version: MINOR_VERSION as u32,
632            file_size_bytes: Default::default(),
633            base_id: None,
634        };
635
636        let base_path = Path::from("base");
637        data_file
638            .validate(&base_path)
639            .expect("validation should allow extra columns without field ids");
640    }
641}