// lance_table/format/fragment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14
15use crate::format::pb;
16
17use lance_core::datatypes::Schema;
18use lance_core::error::Result;
19
/// Lance Data File
///
/// A data file is one piece of file storing data.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Relative path of the data file to dataset root.
    pub path: String,
    /// The ids of fields in this file.
    pub fields: Vec<i32>,
    /// The offsets of the fields listed in `fields`, empty in v1 files
    ///
    /// Note that -1 is a possibility and it indicates that the field has
    /// no top-level column in the file.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// The major version of the file format used to write this file.
    #[serde(default)]
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    #[serde(default)]
    pub file_minor_version: u32,

    /// The size of the file in bytes, if known.
    pub file_size_bytes: CachedFileSize,

    /// The base path of the datafile, when the datafile is outside the dataset.
    pub base_id: Option<u32>,
}
48
49impl DataFile {
50    pub fn new(
51        path: impl Into<String>,
52        fields: Vec<i32>,
53        column_indices: Vec<i32>,
54        file_major_version: u32,
55        file_minor_version: u32,
56        file_size_bytes: Option<NonZero<u64>>,
57    ) -> Self {
58        Self {
59            path: path.into(),
60            fields,
61            column_indices,
62            file_major_version,
63            file_minor_version,
64            file_size_bytes: file_size_bytes.into(),
65            base_id: None,
66        }
67    }
68
69    /// Create a new `DataFile` with the expectation that fields and column_indices will be set later
70    pub fn new_unstarted(
71        path: impl Into<String>,
72        file_major_version: u32,
73        file_minor_version: u32,
74    ) -> Self {
75        Self {
76            path: path.into(),
77            fields: vec![],
78            column_indices: vec![],
79            file_major_version,
80            file_minor_version,
81            file_size_bytes: Default::default(),
82            base_id: None,
83        }
84    }
85
86    pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
87        Self::new(
88            path,
89            fields,
90            vec![],
91            MAJOR_VERSION as u32,
92            MINOR_VERSION as u32,
93            None,
94        )
95    }
96
97    pub fn new_legacy(
98        path: impl Into<String>,
99        schema: &Schema,
100        file_size_bytes: Option<NonZero<u64>>,
101    ) -> Self {
102        let mut field_ids = schema.field_ids();
103        field_ids.sort();
104        Self::new(
105            path,
106            field_ids,
107            vec![],
108            MAJOR_VERSION as u32,
109            MINOR_VERSION as u32,
110            file_size_bytes,
111        )
112    }
113
114    pub fn schema(&self, full_schema: &Schema) -> Schema {
115        full_schema.project_by_ids(&self.fields, false)
116    }
117
118    pub fn is_legacy_file(&self) -> bool {
119        self.file_major_version == 0 && self.file_minor_version < 3
120    }
121
122    pub fn validate(&self, base_path: &Path) -> Result<()> {
123        if self.is_legacy_file() {
124            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
125                return Err(Error::corrupt_file(
126                    base_path.child(self.path.clone()),
127                    "contained unsorted or duplicate field ids",
128                    location!(),
129                ));
130            }
131        } else if self.fields.len() != self.column_indices.len() {
132            return Err(Error::corrupt_file(
133                base_path.child(self.path.clone()),
134                "contained an unequal number of fields / column_indices",
135                location!(),
136            ));
137        }
138        Ok(())
139    }
140}
141
142impl From<&DataFile> for pb::DataFile {
143    fn from(df: &DataFile) -> Self {
144        Self {
145            path: df.path.clone(),
146            fields: df.fields.clone(),
147            column_indices: df.column_indices.clone(),
148            file_major_version: df.file_major_version,
149            file_minor_version: df.file_minor_version,
150            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
151            base_id: df.base_id,
152        }
153    }
154}
155
156impl TryFrom<pb::DataFile> for DataFile {
157    type Error = Error;
158
159    fn try_from(proto: pb::DataFile) -> Result<Self> {
160        Ok(Self {
161            path: proto.path,
162            fields: proto.fields,
163            column_indices: proto.column_indices,
164            file_major_version: proto.file_major_version,
165            file_minor_version: proto.file_minor_version,
166            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
167            base_id: proto.base_id,
168        })
169    }
170}
171
/// Encoding used for a deletion file's set of deleted rows.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Stored as an Arrow array file (".arrow" suffix).
    Array,
    /// Stored as a binary bitmap file (".bin" suffix).
    Bitmap,
}
178
179impl DeletionFileType {
180    // TODO: pub(crate)
181    pub fn suffix(&self) -> &str {
182        match self {
183            Self::Array => "arrow",
184            Self::Bitmap => "bin",
185        }
186    }
187}
188
/// Metadata describing a file of deleted rows for a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    // NOTE(review): presumably the dataset version the deletions were computed
    // against — confirm with the writer code.
    pub read_version: u64,
    // Identifier for this deletion file (used alongside `read_version` when
    // building its path — TODO confirm).
    pub id: u64,
    /// How the deleted rows are encoded (Arrow array or bitmap).
    pub file_type: DeletionFileType,
    /// Number of deleted rows in this file. If None, this is unknown.
    pub num_deleted_rows: Option<usize>,
    /// The base path id, when the deletion file lives outside the dataset.
    pub base_id: Option<u32>,
}
198
199impl TryFrom<pb::DeletionFile> for DeletionFile {
200    type Error = Error;
201
202    fn try_from(value: pb::DeletionFile) -> Result<Self> {
203        let file_type = match value.file_type {
204            0 => DeletionFileType::Array,
205            1 => DeletionFileType::Bitmap,
206            _ => {
207                return Err(Error::NotSupported {
208                    source: "Unknown deletion file type".into(),
209                    location: location!(),
210                })
211            }
212        };
213        let num_deleted_rows = if value.num_deleted_rows == 0 {
214            None
215        } else {
216            Some(value.num_deleted_rows as usize)
217        };
218        Ok(Self {
219            read_version: value.read_version,
220            id: value.id,
221            file_type,
222            num_deleted_rows,
223            base_id: value.base_id,
224        })
225    }
226}
227
/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path of the file containing the referenced region.
    pub path: String,
    /// Offset within the file where the region starts.
    pub offset: u64,
    /// Size of the referenced region.
    pub size: u64,
}
235
/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// The serialized sequence is stored inline in the manifest.
    Inline(Vec<u8>),
    /// The sequence is stored in (a region of) an external file.
    External(ExternalFile),
}
242
243impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
244    type Error = Error;
245
246    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
247        match value {
248            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
249            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
250                Ok(Self::External(ExternalFile {
251                    path: file.path.clone(),
252                    offset: file.offset,
253                    size: file.size,
254                }))
255            }
256        }
257    }
258}
259
/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID
    pub id: u64,

    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted local row offsets.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Where the fragment's row id sequence is stored, if stable row ids are
    /// enabled (inline bytes or an external file).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Original number of rows in the fragment. If this is None, then it is
    /// unknown. This is only optional for legacy reasons. All new tables should
    /// have this set.
    pub physical_rows: Option<usize>,
}
285
286impl Fragment {
287    pub fn new(id: u64) -> Self {
288        Self {
289            id,
290            files: vec![],
291            deletion_file: None,
292            row_id_meta: None,
293            physical_rows: None,
294        }
295    }
296
297    pub fn num_rows(&self) -> Option<usize> {
298        match (self.physical_rows, &self.deletion_file) {
299            // Known fragment length, no deletion file.
300            (Some(len), None) => Some(len),
301            // Known fragment length, but don't know deletion file size.
302            (
303                Some(len),
304                Some(DeletionFile {
305                    num_deleted_rows: Some(num_deleted_rows),
306                    ..
307                }),
308            ) => Some(len - num_deleted_rows),
309            _ => None,
310        }
311    }
312
313    pub fn from_json(json: &str) -> Result<Self> {
314        let fragment: Self = serde_json::from_str(json)?;
315        Ok(fragment)
316    }
317
318    /// Create a `Fragment` with one DataFile
319    pub fn with_file_legacy(
320        id: u64,
321        path: &str,
322        schema: &Schema,
323        physical_rows: Option<usize>,
324    ) -> Self {
325        Self {
326            id,
327            files: vec![DataFile::new_legacy(path, schema, None)],
328            deletion_file: None,
329            physical_rows,
330            row_id_meta: None,
331        }
332    }
333
334    pub fn with_file(
335        mut self,
336        path: impl Into<String>,
337        field_ids: Vec<i32>,
338        column_indices: Vec<i32>,
339        version: &LanceFileVersion,
340        file_size_bytes: Option<NonZero<u64>>,
341    ) -> Self {
342        let (major, minor) = version.to_numbers();
343        let data_file = DataFile::new(
344            path,
345            field_ids,
346            column_indices,
347            major,
348            minor,
349            file_size_bytes,
350        );
351        self.files.push(data_file);
352        self
353    }
354
355    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
356        self.physical_rows = Some(physical_rows);
357        self
358    }
359
360    pub fn add_file(
361        &mut self,
362        path: impl Into<String>,
363        field_ids: Vec<i32>,
364        column_indices: Vec<i32>,
365        version: &LanceFileVersion,
366        file_size_bytes: Option<NonZero<u64>>,
367    ) {
368        let (major, minor) = version.to_numbers();
369        self.files.push(DataFile::new(
370            path,
371            field_ids,
372            column_indices,
373            major,
374            minor,
375            file_size_bytes,
376        ));
377    }
378
379    /// Add a new [`DataFile`] to this fragment.
380    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
381        self.files.push(DataFile::new_legacy(path, schema, None));
382    }
383
384    // True if this fragment is made up of legacy v1 files, false otherwise
385    pub fn has_legacy_files(&self) -> bool {
386        // If any file in a fragment is legacy then all files in the fragment must be
387        self.files[0].is_legacy_file()
388    }
389
390    // Helper method to infer the Lance version from a set of fragments
391    //
392    // Returns None if there are no data files
393    // Returns an error if the data files have different versions
394    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
395        // Otherwise we need to check the actual file versions
396        // Determine version from first file
397        let Some(sample_file) = fragments
398            .iter()
399            .find(|f| !f.files.is_empty())
400            .map(|f| &f.files[0])
401        else {
402            return Ok(None);
403        };
404        let file_version = LanceFileVersion::try_from_major_minor(
405            sample_file.file_major_version,
406            sample_file.file_minor_version,
407        )?;
408        // Ensure all files match
409        for frag in fragments {
410            for file in &frag.files {
411                let this_file_version = LanceFileVersion::try_from_major_minor(
412                    file.file_major_version,
413                    file.file_minor_version,
414                )?;
415                if file_version != this_file_version {
416                    return Err(Error::invalid_input(
417                        format!(
418                            "All data files must have the same version.  Detected both {} and {}",
419                            file_version, this_file_version
420                        ),
421                        location!(),
422                    ));
423                }
424            }
425        }
426        Ok(Some(file_version))
427    }
428}
429
430impl TryFrom<pb::DataFragment> for Fragment {
431    type Error = Error;
432
433    fn try_from(p: pb::DataFragment) -> Result<Self> {
434        let physical_rows = if p.physical_rows > 0 {
435            Some(p.physical_rows as usize)
436        } else {
437            None
438        };
439        Ok(Self {
440            id: p.id,
441            files: p
442                .files
443                .into_iter()
444                .map(DataFile::try_from)
445                .collect::<Result<_>>()?,
446            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
447            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
448            physical_rows,
449        })
450    }
451}
452
453impl From<&Fragment> for pb::DataFragment {
454    fn from(f: &Fragment) -> Self {
455        let deletion_file = f.deletion_file.as_ref().map(|f| {
456            let file_type = match f.file_type {
457                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
458                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
459            };
460            pb::DeletionFile {
461                read_version: f.read_version,
462                id: f.id,
463                file_type: file_type.into(),
464                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
465                base_id: f.base_id,
466            }
467        });
468
469        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
470            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
471            RowIdMeta::External(file) => {
472                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
473                    path: file.path.clone(),
474                    offset: file.offset,
475                    size: file.size,
476                })
477            }
478        });
479
480        Self {
481            id: f.id,
482            files: f.files.iter().map(pb::DataFile::from).collect(),
483            deletion_file,
484            row_id_sequence,
485            physical_rows: f.physical_rows.unwrap_or_default() as u64,
486        }
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    // A legacy fragment built from a schema should contain one data file
    // covering every field id, including parent and child fields of the struct.
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        // Field ids 0..=3: struct "s", children "si"/"sb", then "bool".
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }

    // Fragment -> protobuf -> Fragment must be lossless, with and without a
    // deletion file attached.
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        // Round-trip again without the deletion file.
        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    // Pins the exact JSON shape of a serialized fragment (field names, the
    // lowercase "array" file type, and null for unknown physical_rows), then
    // checks JSON round-trips back to an equal Fragment.
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [], 
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
}