// lance_table/format/fragment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14
15use crate::format::pb;
16
17use crate::rowids::version::{
18    created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb, RowDatasetVersionMeta,
19};
20use lance_core::datatypes::Schema;
21use lance_core::error::Result;
22
/// Lance Data File
///
/// A data file is one piece of file storing data.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Relative path of the data file to dataset root.
    pub path: String,
    /// The ids of fields in this file.
    pub fields: Vec<i32>,
    /// The offsets of the fields listed in `fields`, empty in v1 files
    ///
    /// Note that -1 is a possibility and it indicates that the field has
    /// no top-level column in the file.
    ///
    /// Columns that lack a field id may still exist as extra entries in
    /// `column_indices`; such columns are ignored by field-id–based projection.
    /// For example, some fields, such as blob fields, occupy multiple
    /// columns in the file but only have a single field id.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// The major version of the file format used to write this file.
    ///
    /// Defaults to 0 when deserializing metadata that predates this field.
    #[serde(default)]
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    ///
    /// Defaults to 0 when deserializing metadata that predates this field.
    #[serde(default)]
    pub file_minor_version: u32,

    /// The size of the file in bytes, if known.
    pub file_size_bytes: CachedFileSize,

    /// The base path of the datafile, when the datafile is outside the dataset.
    pub base_id: Option<u32>,
}
56
57impl DataFile {
58    pub fn new(
59        path: impl Into<String>,
60        fields: Vec<i32>,
61        column_indices: Vec<i32>,
62        file_major_version: u32,
63        file_minor_version: u32,
64        file_size_bytes: Option<NonZero<u64>>,
65        base_id: Option<u32>,
66    ) -> Self {
67        Self {
68            path: path.into(),
69            fields,
70            column_indices,
71            file_major_version,
72            file_minor_version,
73            file_size_bytes: file_size_bytes.into(),
74            base_id,
75        }
76    }
77
78    /// Create a new `DataFile` with the expectation that fields and column_indices will be set later
79    pub fn new_unstarted(
80        path: impl Into<String>,
81        file_major_version: u32,
82        file_minor_version: u32,
83    ) -> Self {
84        Self {
85            path: path.into(),
86            fields: vec![],
87            column_indices: vec![],
88            file_major_version,
89            file_minor_version,
90            file_size_bytes: Default::default(),
91            base_id: None,
92        }
93    }
94
95    pub fn new_legacy_from_fields(
96        path: impl Into<String>,
97        fields: Vec<i32>,
98        base_id: Option<u32>,
99    ) -> Self {
100        Self::new(
101            path,
102            fields,
103            vec![],
104            MAJOR_VERSION as u32,
105            MINOR_VERSION as u32,
106            None,
107            base_id,
108        )
109    }
110
111    pub fn new_legacy(
112        path: impl Into<String>,
113        schema: &Schema,
114        file_size_bytes: Option<NonZero<u64>>,
115        base_id: Option<u32>,
116    ) -> Self {
117        let mut field_ids = schema.field_ids();
118        field_ids.sort();
119        Self::new(
120            path,
121            field_ids,
122            vec![],
123            MAJOR_VERSION as u32,
124            MINOR_VERSION as u32,
125            file_size_bytes,
126            base_id,
127        )
128    }
129
130    pub fn schema(&self, full_schema: &Schema) -> Schema {
131        full_schema.project_by_ids(&self.fields, false)
132    }
133
134    pub fn is_legacy_file(&self) -> bool {
135        self.file_major_version == 0 && self.file_minor_version < 3
136    }
137
138    pub fn validate(&self, base_path: &Path) -> Result<()> {
139        if self.is_legacy_file() {
140            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
141                return Err(Error::corrupt_file(
142                    base_path.child(self.path.clone()),
143                    "contained unsorted or duplicate field ids",
144                    location!(),
145                ));
146            }
147        } else if self.column_indices.len() < self.fields.len() {
148            // Every recorded field id must have a column index, but not every column needs
149            // to be associated with a field id (extra columns are allowed).
150            return Err(Error::corrupt_file(
151                base_path.child(self.path.clone()),
152                "contained fewer column_indices than fields",
153                location!(),
154            ));
155        }
156        Ok(())
157    }
158}
159
160impl From<&DataFile> for pb::DataFile {
161    fn from(df: &DataFile) -> Self {
162        Self {
163            path: df.path.clone(),
164            fields: df.fields.clone(),
165            column_indices: df.column_indices.clone(),
166            file_major_version: df.file_major_version,
167            file_minor_version: df.file_minor_version,
168            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
169            base_id: df.base_id,
170        }
171    }
172}
173
174impl TryFrom<pb::DataFile> for DataFile {
175    type Error = Error;
176
177    fn try_from(proto: pb::DataFile) -> Result<Self> {
178        Ok(Self {
179            path: proto.path,
180            fields: proto.fields,
181            column_indices: proto.column_indices,
182            file_major_version: proto.file_major_version,
183            file_minor_version: proto.file_minor_version,
184            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
185            base_id: proto.base_id,
186        })
187    }
188}
189
/// Encoding used for the deleted-row data in a [`DeletionFile`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Row offsets stored as an Arrow array (file suffix `arrow`).
    Array,
    /// Row offsets stored as a bitmap (file suffix `bin`).
    Bitmap,
}
196
197impl DeletionFileType {
198    // TODO: pub(crate)
199    pub fn suffix(&self) -> &str {
200        match self {
201            Self::Array => "arrow",
202            Self::Bitmap => "bin",
203        }
204    }
205}
206
/// Metadata describing a file that records rows deleted from a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// Dataset version this deletion file was created against.
    /// NOTE(review): presumably used to locate the file on disk — confirm.
    pub read_version: u64,
    /// Identifier distinguishing deletion files with the same read version.
    pub id: u64,
    /// Encoding of the deleted-row data (see [`DeletionFileType`]).
    pub file_type: DeletionFileType,
    /// Number of deleted rows in this file. If None, this is unknown.
    pub num_deleted_rows: Option<usize>,
    /// Base path id when the file lives outside the dataset; mirrors
    /// `DataFile::base_id`.
    pub base_id: Option<u32>,
}
216
217impl TryFrom<pb::DeletionFile> for DeletionFile {
218    type Error = Error;
219
220    fn try_from(value: pb::DeletionFile) -> Result<Self> {
221        let file_type = match value.file_type {
222            0 => DeletionFileType::Array,
223            1 => DeletionFileType::Bitmap,
224            _ => {
225                return Err(Error::NotSupported {
226                    source: "Unknown deletion file type".into(),
227                    location: location!(),
228                })
229            }
230        };
231        let num_deleted_rows = if value.num_deleted_rows == 0 {
232            None
233        } else {
234            Some(value.num_deleted_rows as usize)
235        };
236        Ok(Self {
237            read_version: value.read_version,
238            id: value.id,
239            file_type,
240            num_deleted_rows,
241            base_id: value.base_id,
242        })
243    }
244}
245
/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path of the referenced file.
    pub path: String,
    /// Offset at which the referenced region begins.
    /// NOTE(review): presumably a byte offset — confirm against writers.
    pub offset: u64,
    /// Size of the referenced region, in the same unit as `offset`.
    pub size: u64,
}
253
/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// Sequence serialized directly inside the fragment metadata.
    Inline(Vec<u8>),
    /// Sequence stored in (a region of) a separate file.
    External(ExternalFile),
}
260
261impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
262    type Error = Error;
263
264    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
265        match value {
266            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
267            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
268                Ok(Self::External(ExternalFile {
269                    path: file.path.clone(),
270                    offset: file.offset,
271                    size: file.size,
272                }))
273            }
274        }
275    }
276}
277
/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If a column exists in the schema, but the related file does not exist, treat this column as `nulls`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID
    pub id: u64,

    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted local row offsets.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Location of this fragment's row id sequence, if any (see [`RowIdMeta`]).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Original number of rows in the fragment. If this is None, then it is
    /// unknown. This is only optional for legacy reasons. All new tables should
    /// have this set.
    pub physical_rows: Option<usize>,

    /// Last updated at version metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,

    /// Created at version metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
}
311
312impl Fragment {
313    pub fn new(id: u64) -> Self {
314        Self {
315            id,
316            files: vec![],
317            deletion_file: None,
318            row_id_meta: None,
319            physical_rows: None,
320            last_updated_at_version_meta: None,
321            created_at_version_meta: None,
322        }
323    }
324
325    pub fn num_rows(&self) -> Option<usize> {
326        match (self.physical_rows, &self.deletion_file) {
327            // Known fragment length, no deletion file.
328            (Some(len), None) => Some(len),
329            // Known fragment length, but don't know deletion file size.
330            (
331                Some(len),
332                Some(DeletionFile {
333                    num_deleted_rows: Some(num_deleted_rows),
334                    ..
335                }),
336            ) => Some(len - num_deleted_rows),
337            _ => None,
338        }
339    }
340
341    pub fn from_json(json: &str) -> Result<Self> {
342        let fragment: Self = serde_json::from_str(json)?;
343        Ok(fragment)
344    }
345
346    /// Create a `Fragment` with one DataFile
347    pub fn with_file_legacy(
348        id: u64,
349        path: &str,
350        schema: &Schema,
351        physical_rows: Option<usize>,
352    ) -> Self {
353        Self {
354            id,
355            files: vec![DataFile::new_legacy(path, schema, None, None)],
356            deletion_file: None,
357            physical_rows,
358            row_id_meta: None,
359            last_updated_at_version_meta: None,
360            created_at_version_meta: None,
361        }
362    }
363
364    pub fn with_file(
365        mut self,
366        path: impl Into<String>,
367        field_ids: Vec<i32>,
368        column_indices: Vec<i32>,
369        version: &LanceFileVersion,
370        file_size_bytes: Option<NonZero<u64>>,
371    ) -> Self {
372        let (major, minor) = version.to_numbers();
373        let data_file = DataFile::new(
374            path,
375            field_ids,
376            column_indices,
377            major,
378            minor,
379            file_size_bytes,
380            None,
381        );
382        self.files.push(data_file);
383        self
384    }
385
386    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
387        self.physical_rows = Some(physical_rows);
388        self
389    }
390
391    pub fn add_file(
392        &mut self,
393        path: impl Into<String>,
394        field_ids: Vec<i32>,
395        column_indices: Vec<i32>,
396        version: &LanceFileVersion,
397        file_size_bytes: Option<NonZero<u64>>,
398    ) {
399        let (major, minor) = version.to_numbers();
400        self.files.push(DataFile::new(
401            path,
402            field_ids,
403            column_indices,
404            major,
405            minor,
406            file_size_bytes,
407            None,
408        ));
409    }
410
411    /// Add a new [`DataFile`] to this fragment.
412    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
413        self.files
414            .push(DataFile::new_legacy(path, schema, None, None));
415    }
416
417    // True if this fragment is made up of legacy v1 files, false otherwise
418    pub fn has_legacy_files(&self) -> bool {
419        // If any file in a fragment is legacy then all files in the fragment must be
420        self.files[0].is_legacy_file()
421    }
422
423    // Helper method to infer the Lance version from a set of fragments
424    //
425    // Returns None if there are no data files
426    // Returns an error if the data files have different versions
427    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
428        // Otherwise we need to check the actual file versions
429        // Determine version from first file
430        let Some(sample_file) = fragments
431            .iter()
432            .find(|f| !f.files.is_empty())
433            .map(|f| &f.files[0])
434        else {
435            return Ok(None);
436        };
437        let file_version = LanceFileVersion::try_from_major_minor(
438            sample_file.file_major_version,
439            sample_file.file_minor_version,
440        )?;
441        // Ensure all files match
442        for frag in fragments {
443            for file in &frag.files {
444                let this_file_version = LanceFileVersion::try_from_major_minor(
445                    file.file_major_version,
446                    file.file_minor_version,
447                )?;
448                if file_version != this_file_version {
449                    return Err(Error::invalid_input(
450                        format!(
451                            "All data files must have the same version.  Detected both {} and {}",
452                            file_version, this_file_version
453                        ),
454                        location!(),
455                    ));
456                }
457            }
458        }
459        Ok(Some(file_version))
460    }
461}
462
463impl TryFrom<pb::DataFragment> for Fragment {
464    type Error = Error;
465
466    fn try_from(p: pb::DataFragment) -> Result<Self> {
467        let physical_rows = if p.physical_rows > 0 {
468            Some(p.physical_rows as usize)
469        } else {
470            None
471        };
472        Ok(Self {
473            id: p.id,
474            files: p
475                .files
476                .into_iter()
477                .map(DataFile::try_from)
478                .collect::<Result<_>>()?,
479            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
480            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
481            physical_rows,
482            last_updated_at_version_meta: p
483                .last_updated_at_version_sequence
484                .map(RowDatasetVersionMeta::try_from)
485                .transpose()?,
486            created_at_version_meta: p
487                .created_at_version_sequence
488                .map(RowDatasetVersionMeta::try_from)
489                .transpose()?,
490        })
491    }
492}
493
494impl From<&Fragment> for pb::DataFragment {
495    fn from(f: &Fragment) -> Self {
496        let deletion_file = f.deletion_file.as_ref().map(|f| {
497            let file_type = match f.file_type {
498                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
499                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
500            };
501            pb::DeletionFile {
502                read_version: f.read_version,
503                id: f.id,
504                file_type: file_type.into(),
505                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
506                base_id: f.base_id,
507            }
508        });
509
510        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
511            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
512            RowIdMeta::External(file) => {
513                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
514                    path: file.path.clone(),
515                    offset: file.offset,
516                    size: file.size,
517                })
518            }
519        });
520        let last_updated_at_version_sequence =
521            last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
522        let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
523        Self {
524            id: f.id,
525            files: f.files.iter().map(pb::DataFile::from).collect(),
526            deletion_file,
527            row_id_sequence,
528            physical_rows: f.physical_rows.unwrap_or_default() as u64,
529            last_updated_at_version_sequence,
530            created_at_version_sequence,
531        }
532    }
533}
534
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use object_store::path::Path;
    use serde_json::{json, Value};

    /// A legacy fragment built from a schema should contain a single v1 data
    /// file listing every field id in the schema (including nested fields).
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        // Field ids 0..=3 cover the struct, its two children, and "bool".
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
                None,
            )]
        )
    }

    /// A fragment must survive a protobuf round trip unchanged, both with
    /// and without a deletion file attached.
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        // Same round trip with the deletion file removed.
        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    /// The JSON serialization must match the documented wire shape exactly,
    /// and deserializing it must reproduce the original fragment.
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }

    /// `DataFile::validate` accepts non-legacy files that have more column
    /// indices than field ids (extra columns without a field id mapping).
    #[test]
    fn data_file_validate_allows_extra_columns() {
        let data_file = DataFile {
            path: "foo.lance".to_string(),
            fields: vec![1, 2],
            // One extra column without a field id mapping
            column_indices: vec![0, 1, 2],
            file_major_version: MAJOR_VERSION as u32,
            file_minor_version: MINOR_VERSION as u32,
            file_size_bytes: Default::default(),
            base_id: None,
        };

        let base_path = Path::from("base");
        data_file
            .validate(&base_path)
            .expect("validation should allow extra columns without field ids");
    }
}