lance_table/format/fragment.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::num::NonZero;

use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
use lance_io::utils::CachedFileSize;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use snafu::location;

use crate::format::pb;

use crate::rowids::version::{
    created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb, RowDatasetVersionMeta,
};
use lance_core::datatypes::Schema;
use lance_core::error::Result;

/// Lance Data File
///
/// A data file is a single file that stores data for a subset of the columns in a fragment.
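///
/// # Example
///
/// A minimal sketch of describing a non-legacy data file; the path, field ids,
/// and version numbers are illustrative, and the `lance_table::format`
/// re-export path is assumed:
///
/// ```
/// use lance_table::format::DataFile;
///
/// let file = DataFile::new("data/0.lance", vec![0, 1], vec![0, 1], 2, 0, None, None);
/// assert!(!file.is_legacy_file());
/// assert_eq!(file.fields.len(), file.column_indices.len());
/// ```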
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Path of the data file, relative to the dataset root.
    pub path: String,
    /// The ids of the fields in this file.
    pub fields: Vec<i32>,
    /// The offsets (column indices) of the fields listed in `fields`; empty in v1 files.
    ///
    /// Note that -1 is a valid value and indicates that the field has
    /// no top-level column in the file.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// The major version of the file format used to write this file.
    #[serde(default)]
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    #[serde(default)]
    pub file_minor_version: u32,

    /// The size of the file in bytes, if known.
    pub file_size_bytes: CachedFileSize,

    /// The id of the base path for this data file, set when the data file is stored
    /// outside the dataset root.
    pub base_id: Option<u32>,
}

impl DataFile {
    pub fn new(
        path: impl Into<String>,
        fields: Vec<i32>,
        column_indices: Vec<i32>,
        file_major_version: u32,
        file_minor_version: u32,
        file_size_bytes: Option<NonZero<u64>>,
        base_id: Option<u32>,
    ) -> Self {
        Self {
            path: path.into(),
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
            file_size_bytes: file_size_bytes.into(),
            base_id,
        }
    }

    /// Create a new `DataFile` with the expectation that `fields` and `column_indices` will be set later.
    pub fn new_unstarted(
        path: impl Into<String>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields: vec![],
            column_indices: vec![],
            file_major_version,
            file_minor_version,
            file_size_bytes: Default::default(),
            base_id: None,
        }
    }

    /// Create a legacy (v1) `DataFile` from an explicit list of field ids.
    pub fn new_legacy_from_fields(
        path: impl Into<String>,
        fields: Vec<i32>,
        base_id: Option<u32>,
    ) -> Self {
        Self::new(
            path,
            fields,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            None,
            base_id,
        )
    }

    /// Create a legacy (v1) `DataFile` covering all fields of `schema`.
    pub fn new_legacy(
        path: impl Into<String>,
        schema: &Schema,
        file_size_bytes: Option<NonZero<u64>>,
        base_id: Option<u32>,
    ) -> Self {
        let mut field_ids = schema.field_ids();
        field_ids.sort();
        Self::new(
            path,
            field_ids,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            file_size_bytes,
            base_id,
        )
    }

    /// Project the full dataset schema down to the fields stored in this file.
    pub fn schema(&self, full_schema: &Schema) -> Schema {
        full_schema.project_by_ids(&self.fields, false)
    }

    /// True if this file was written with the legacy (v1) file format.
    pub fn is_legacy_file(&self) -> bool {
        self.file_major_version == 0 && self.file_minor_version < 3
    }

    /// Check that the data file metadata is internally consistent.
    pub fn validate(&self, base_path: &Path) -> Result<()> {
        if self.is_legacy_file() {
            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
                return Err(Error::corrupt_file(
                    base_path.child(self.path.clone()),
                    "contained unsorted or duplicate field ids",
                    location!(),
                ));
            }
        } else if self.fields.len() != self.column_indices.len() {
            return Err(Error::corrupt_file(
                base_path.child(self.path.clone()),
                "contained an unequal number of fields / column_indices",
                location!(),
            ));
        }
        Ok(())
    }
}

impl From<&DataFile> for pb::DataFile {
    fn from(df: &DataFile) -> Self {
        Self {
            path: df.path.clone(),
            fields: df.fields.clone(),
            column_indices: df.column_indices.clone(),
            file_major_version: df.file_major_version,
            file_minor_version: df.file_minor_version,
            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
            base_id: df.base_id,
        }
    }
}

impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    fn try_from(proto: pb::DataFile) -> Result<Self> {
        Ok(Self {
            path: proto.path,
            fields: proto.fields,
            column_indices: proto.column_indices,
            file_major_version: proto.file_major_version,
            file_minor_version: proto.file_minor_version,
            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
            base_id: proto.base_id,
        })
    }
}

/// The encoding used for a deletion file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    Array,
    Bitmap,
}

impl DeletionFileType {
    // TODO: pub(crate)
    /// The file extension used for this deletion file type.
    pub fn suffix(&self) -> &str {
        match self {
            Self::Array => "arrow",
            Self::Bitmap => "bin",
        }
    }
}

/// A file recording which rows of a fragment have been deleted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    pub read_version: u64,
    pub id: u64,
    pub file_type: DeletionFileType,
    /// Number of deleted rows in this file. If None, this is unknown.
    pub num_deleted_rows: Option<usize>,
    pub base_id: Option<u32>,
}

impl TryFrom<pb::DeletionFile> for DeletionFile {
    type Error = Error;

    fn try_from(value: pb::DeletionFile) -> Result<Self> {
        let file_type = match value.file_type {
            0 => DeletionFileType::Array,
            1 => DeletionFileType::Bitmap,
            _ => {
                return Err(Error::NotSupported {
                    source: "Unknown deletion file type".into(),
                    location: location!(),
                })
            }
        };
        let num_deleted_rows = if value.num_deleted_rows == 0 {
            None
        } else {
            Some(value.num_deleted_rows as usize)
        };
        Ok(Self {
            read_version: value.read_version,
            id: value.id,
            file_type,
            num_deleted_rows,
            base_id: value.base_id,
        })
    }
}

/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    pub path: String,
    pub offset: u64,
    pub size: u64,
}

/// Metadata about the location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    Inline(Vec<u8>),
    External(ExternalFile),
}

impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
    type Error = Error;

    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
        match value {
            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
                Ok(Self::External(ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }
}

/// Data fragment.
///
/// A fragment is a set of files that together store the different columns of the same set of rows.
/// If a column exists in the schema but none of the fragment's files contain it, the column is
/// treated as all `null`s.
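///
/// # Example
///
/// A minimal sketch of an empty fragment description; the id and row count are
/// illustrative, and the `lance_table::format` re-export path is assumed:
///
/// ```
/// use lance_table::format::Fragment;
///
/// let fragment = Fragment::new(42).with_physical_rows(100);
/// assert!(fragment.files.is_empty());
/// assert_eq!(fragment.num_rows(), Some(100));
/// ```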
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID
    pub id: u64,

    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted local row offsets.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Metadata describing where the fragment's row id sequence is stored.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Original number of rows in the fragment. If this is None, then it is
    /// unknown. This is only optional for legacy reasons. All new tables should
    /// have this set.
    pub physical_rows: Option<usize>,

    /// Last updated at version metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,

    /// Created at version metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
}

impl Fragment {
    pub fn new(id: u64) -> Self {
        Self {
            id,
            files: vec![],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }
    }

    /// The number of rows in the fragment, after accounting for deletions, if known.
    pub fn num_rows(&self) -> Option<usize> {
        match (self.physical_rows, &self.deletion_file) {
            // Known fragment length, no deletion file.
            (Some(len), None) => Some(len),
            // Known fragment length and known number of deleted rows.
            (
                Some(len),
                Some(DeletionFile {
                    num_deleted_rows: Some(num_deleted_rows),
                    ..
                }),
            ) => Some(len - num_deleted_rows),
            // Otherwise the row count is unknown.
            _ => None,
        }
    }

    /// Deserialize a `Fragment` from its JSON representation.
    pub fn from_json(json: &str) -> Result<Self> {
        let fragment: Self = serde_json::from_str(json)?;
        Ok(fragment)
    }

    /// Create a `Fragment` with a single legacy [`DataFile`] covering all fields in `schema`.
    pub fn with_file_legacy(
        id: u64,
        path: &str,
        schema: &Schema,
        physical_rows: Option<usize>,
    ) -> Self {
        Self {
            id,
            files: vec![DataFile::new_legacy(path, schema, None, None)],
            deletion_file: None,
            physical_rows,
            row_id_meta: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }
    }

    /// Add a [`DataFile`] written with the given file version, returning the updated fragment.
    pub fn with_file(
        mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) -> Self {
        let (major, minor) = version.to_numbers();
        let data_file = DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
            None,
        );
        self.files.push(data_file);
        self
    }

    /// Set the physical row count, returning the updated fragment.
    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
        self.physical_rows = Some(physical_rows);
        self
    }

    /// Add a [`DataFile`] written with the given file version to this fragment.
    pub fn add_file(
        &mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) {
        let (major, minor) = version.to_numbers();
        self.files.push(DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
            None,
        ));
    }

    /// Add a new legacy [`DataFile`] to this fragment.
    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
        self.files
            .push(DataFile::new_legacy(path, schema, None, None));
    }

    /// True if this fragment is made up of legacy v1 files, false otherwise.
    pub fn has_legacy_files(&self) -> bool {
        // If any file in a fragment is legacy then all files in the fragment must be legacy too.
        self.files[0].is_legacy_file()
    }

    /// Infer the Lance file version from a set of fragments.
    ///
    /// Returns `None` if there are no data files.
    /// Returns an error if the data files have different versions.
    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
        // Determine the version from the first data file we can find.
        let Some(sample_file) = fragments
            .iter()
            .find(|f| !f.files.is_empty())
            .map(|f| &f.files[0])
        else {
            return Ok(None);
        };
        let file_version = LanceFileVersion::try_from_major_minor(
            sample_file.file_major_version,
            sample_file.file_minor_version,
        )?;
        // Ensure all files match
        for frag in fragments {
            for file in &frag.files {
                let this_file_version = LanceFileVersion::try_from_major_minor(
                    file.file_major_version,
                    file.file_minor_version,
                )?;
                if file_version != this_file_version {
                    return Err(Error::invalid_input(
                        format!(
                            "All data files must have the same version.  Detected both {} and {}",
                            file_version, this_file_version
                        ),
                        location!(),
                    ));
                }
            }
        }
        Ok(Some(file_version))
    }
}

impl TryFrom<pb::DataFragment> for Fragment {
    type Error = Error;

    fn try_from(p: pb::DataFragment) -> Result<Self> {
        let physical_rows = if p.physical_rows > 0 {
            Some(p.physical_rows as usize)
        } else {
            None
        };
        Ok(Self {
            id: p.id,
            files: p
                .files
                .into_iter()
                .map(DataFile::try_from)
                .collect::<Result<_>>()?,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
            last_updated_at_version_meta: p
                .last_updated_at_version_sequence
                .map(RowDatasetVersionMeta::try_from)
                .transpose()?,
            created_at_version_meta: p
                .created_at_version_sequence
                .map(RowDatasetVersionMeta::try_from)
                .transpose()?,
        })
    }
}

impl From<&Fragment> for pb::DataFragment {
    fn from(f: &Fragment) -> Self {
        let deletion_file = f.deletion_file.as_ref().map(|df| {
            let file_type = match df.file_type {
                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
            };
            pb::DeletionFile {
                read_version: df.read_version,
                id: df.id,
                file_type: file_type.into(),
                num_deleted_rows: df.num_deleted_rows.unwrap_or_default() as u64,
                base_id: df.base_id,
            }
        });

        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
            RowIdMeta::External(file) => {
                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                })
            }
        });
        let last_updated_at_version_sequence =
            last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
        let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
        Self {
            id: f.id,
            files: f.files.iter().map(pb::DataFile::from).collect(),
            deletion_file,
            row_id_sequence,
            physical_rows: f.physical_rows.unwrap_or_default() as u64,
            last_updated_at_version_sequence,
            created_at_version_sequence,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
                None,
            )]
        )
    }
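
    // A sketch of `DataFile::validate`: non-legacy files must pair each field id
    // with a column index, and legacy files must list their field ids in sorted
    // order. The paths and version numbers used here are illustrative.
    #[test]
    fn test_data_file_validate() {
        let base = Path::from("dataset");

        let good = DataFile::new("data.lance", vec![0, 1], vec![0, 1], 2, 0, None, None);
        assert!(!good.is_legacy_file());
        assert!(good.validate(&base).is_ok());

        // Mismatched fields / column_indices lengths are reported as corruption.
        let bad = DataFile::new("data.lance", vec![0, 1], vec![0], 2, 0, None, None);
        assert!(bad.validate(&base).is_err());

        // Legacy files must have sorted, unique field ids.
        let legacy = DataFile::new_legacy_from_fields("data.lance", vec![1, 0], None);
        assert!(legacy.is_legacy_file());
        assert!(legacy.validate(&base).is_err());
    }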

    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }
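
    // A sketch of how `Fragment::num_rows` combines the physical row count with
    // deletion metadata; the counts used here are illustrative.
    #[test]
    fn test_num_rows_accounts_for_deletions() {
        // Unknown physical rows: the row count is unknown.
        assert_eq!(Fragment::new(1).num_rows(), None);

        // Known physical rows, no deletions.
        let mut fragment = Fragment::new(1).with_physical_rows(100);
        assert_eq!(fragment.num_rows(), Some(100));

        // Known physical rows and a known number of deleted rows.
        fragment.deletion_file = Some(DeletionFile {
            read_version: 1,
            id: 0,
            file_type: DeletionFileType::Bitmap,
            num_deleted_rows: Some(25),
            base_id: None,
        });
        assert_eq!(fragment.num_rows(), Some(75));

        // Deletion file present but deleted-row count unknown: row count is unknown.
        fragment.deletion_file.as_mut().unwrap().num_deleted_rows = None;
        assert_eq!(fragment.num_rows(), None);
    }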

    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files": [
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>
            }),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
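
    // A sketch of `Fragment::try_infer_version`: fragments without data files
    // infer no version, matching files infer one, and mixed file versions are
    // rejected. The paths and version numbers here are illustrative.
    #[test]
    fn test_try_infer_version() {
        // No fragments, or fragments without files, infer no version.
        assert!(Fragment::try_infer_version(&[]).unwrap().is_none());
        assert!(Fragment::try_infer_version(&[Fragment::new(0)])
            .unwrap()
            .is_none());

        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Int32, true)]);
        let schema = Schema::try_from(&schema).unwrap();

        // Two fragments written with the same (legacy) file version agree.
        let a = Fragment::with_file_legacy(0, "a.lance", &schema, Some(1));
        let b = Fragment::with_file_legacy(1, "b.lance", &schema, Some(1));
        assert!(Fragment::try_infer_version(&[a.clone(), b])
            .unwrap()
            .is_some());

        // Mixing a legacy file with a newer (2.0-style) file is an error.
        let mut mixed = a;
        mixed
            .files
            .push(DataFile::new("c.lance", vec![0], vec![0], 2, 0, None, None));
        assert!(Fragment::try_infer_version(&[mixed]).is_err());
    }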
}