lance_table/format/fragment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use deepsize::DeepSizeOf;
5use lance_core::Error;
6use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
7use lance_file::version::LanceFileVersion;
8use object_store::path::Path;
9use serde::{Deserialize, Serialize};
10use snafu::location;
11
12use crate::format::pb;
13
14use lance_core::datatypes::Schema;
15use lance_core::error::Result;
16
/// Lance Data File
///
/// A data file is one piece of file storing data.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Relative path of the data file to dataset root.
    pub path: String,
    /// The ids of fields in this file.
    pub fields: Vec<i32>,
    /// The offsets of the fields listed in `fields`, empty in v1 files
    ///
    /// Note that -1 is a possibility and it indicates that the field has
    /// no top-level column in the file.
    // NOTE(review): `#[serde(default)]` presumably exists so serialized
    // manifests written before this field was added still deserialize —
    // confirm against manifest history.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// The major version of the file format used to write this file.
    #[serde(default)]
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    #[serde(default)]
    pub file_minor_version: u32,
}
39
40impl DataFile {
41    pub fn new(
42        path: impl Into<String>,
43        fields: Vec<i32>,
44        column_indices: Vec<i32>,
45        file_major_version: u32,
46        file_minor_version: u32,
47    ) -> Self {
48        Self {
49            path: path.into(),
50            fields,
51            column_indices,
52            file_major_version,
53            file_minor_version,
54        }
55    }
56
57    /// Create a new `DataFile` with the expectation that fields and column_indices will be set later
58    pub fn new_unstarted(
59        path: impl Into<String>,
60        file_major_version: u32,
61        file_minor_version: u32,
62    ) -> Self {
63        Self {
64            path: path.into(),
65            fields: vec![],
66            column_indices: vec![],
67            file_major_version,
68            file_minor_version,
69        }
70    }
71
72    pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
73        Self::new(
74            path,
75            fields,
76            vec![],
77            MAJOR_VERSION as u32,
78            MINOR_VERSION as u32,
79        )
80    }
81
82    pub fn new_legacy(path: impl Into<String>, schema: &Schema) -> Self {
83        let mut field_ids = schema.field_ids();
84        field_ids.sort();
85        Self::new(
86            path,
87            field_ids,
88            vec![],
89            MAJOR_VERSION as u32,
90            MINOR_VERSION as u32,
91        )
92    }
93
94    pub fn schema(&self, full_schema: &Schema) -> Schema {
95        full_schema.project_by_ids(&self.fields, false)
96    }
97
98    pub fn is_legacy_file(&self) -> bool {
99        self.file_major_version == 0 && self.file_minor_version < 3
100    }
101
102    pub fn validate(&self, base_path: &Path) -> Result<()> {
103        if self.is_legacy_file() {
104            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
105                return Err(Error::corrupt_file(
106                    base_path.child(self.path.clone()),
107                    "contained unsorted or duplicate field ids",
108                    location!(),
109                ));
110            }
111        } else if self.fields.len() != self.column_indices.len() {
112            return Err(Error::corrupt_file(
113                base_path.child(self.path.clone()),
114                "contained an unequal number of fields / column_indices",
115                location!(),
116            ));
117        }
118        Ok(())
119    }
120}
121
122impl From<&DataFile> for pb::DataFile {
123    fn from(df: &DataFile) -> Self {
124        Self {
125            path: df.path.clone(),
126            fields: df.fields.clone(),
127            column_indices: df.column_indices.clone(),
128            file_major_version: df.file_major_version,
129            file_minor_version: df.file_minor_version,
130        }
131    }
132}
133
134impl TryFrom<pb::DataFile> for DataFile {
135    type Error = Error;
136
137    fn try_from(proto: pb::DataFile) -> Result<Self> {
138        Ok(Self {
139            path: proto.path,
140            fields: proto.fields,
141            column_indices: proto.column_indices,
142            file_major_version: proto.file_major_version,
143            file_minor_version: proto.file_minor_version,
144        })
145    }
146}
147
/// Storage format of a deletion file.
///
/// Serialized in JSON as lowercase (`"array"` / `"bitmap"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Deleted rows stored as an array (written with an `arrow` suffix).
    Array,
    /// Deleted rows stored as a bitmap (written with a `bin` suffix).
    Bitmap,
}
154
155impl DeletionFileType {
156    // TODO: pub(crate)
157    pub fn suffix(&self) -> &str {
158        match self {
159            Self::Array => "arrow",
160            Self::Bitmap => "bin",
161        }
162    }
163}
164
/// A file recording the rows deleted from a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// Dataset version that was read when the deletions were made.
    pub read_version: u64,
    /// Identifier distinguishing this deletion file.
    pub id: u64,
    /// Storage format of the deletion file (array or bitmap).
    pub file_type: DeletionFileType,
    /// Number of deleted rows in this file. If None, this is unknown.
    pub num_deleted_rows: Option<usize>,
}
173
174impl TryFrom<pb::DeletionFile> for DeletionFile {
175    type Error = Error;
176
177    fn try_from(value: pb::DeletionFile) -> Result<Self> {
178        let file_type = match value.file_type {
179            0 => DeletionFileType::Array,
180            1 => DeletionFileType::Bitmap,
181            _ => {
182                return Err(Error::NotSupported {
183                    source: "Unknown deletion file type".into(),
184                    location: location!(),
185                })
186            }
187        };
188        let num_deleted_rows = if value.num_deleted_rows == 0 {
189            None
190        } else {
191            Some(value.num_deleted_rows as usize)
192        };
193        Ok(Self {
194            read_version: value.read_version,
195            id: value.id,
196            file_type,
197            num_deleted_rows,
198        })
199    }
200}
201
/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path of the referenced file.
    pub path: String,
    /// Start of the referenced region within the file.
    // NOTE(review): offset/size look like byte units — confirm with the writer side.
    pub offset: u64,
    /// Length of the referenced region.
    pub size: u64,
}
209
/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// Sequence stored inline in the manifest as raw bytes.
    Inline(Vec<u8>),
    /// Sequence stored in a region of an external file.
    External(ExternalFile),
}
216
217impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
218    type Error = Error;
219
220    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
221        match value {
222            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
223            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
224                Ok(Self::External(ExternalFile {
225                    path: file.path.clone(),
226                    offset: file.offset,
227                    size: file.size,
228                }))
229            }
230        }
231    }
232}
233
/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID
    pub id: u64,

    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted local row offsets.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Location of this fragment's row id sequence, if one exists
    /// (inline bytes or an external file region).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Original number of rows in the fragment. If this is None, then it is
    /// unknown. This is only optional for legacy reasons. All new tables should
    /// have this set.
    pub physical_rows: Option<usize>,
}
259
260impl Fragment {
261    pub fn new(id: u64) -> Self {
262        Self {
263            id,
264            files: vec![],
265            deletion_file: None,
266            row_id_meta: None,
267            physical_rows: None,
268        }
269    }
270
271    pub fn num_rows(&self) -> Option<usize> {
272        match (self.physical_rows, &self.deletion_file) {
273            // Known fragment length, no deletion file.
274            (Some(len), None) => Some(len),
275            // Known fragment length, but don't know deletion file size.
276            (
277                Some(len),
278                Some(DeletionFile {
279                    num_deleted_rows: Some(num_deleted_rows),
280                    ..
281                }),
282            ) => Some(len - num_deleted_rows),
283            _ => None,
284        }
285    }
286
287    pub fn from_json(json: &str) -> Result<Self> {
288        let fragment: Self = serde_json::from_str(json)?;
289        Ok(fragment)
290    }
291
292    /// Create a `Fragment` with one DataFile
293    pub fn with_file_legacy(
294        id: u64,
295        path: &str,
296        schema: &Schema,
297        physical_rows: Option<usize>,
298    ) -> Self {
299        Self {
300            id,
301            files: vec![DataFile::new_legacy(path, schema)],
302            deletion_file: None,
303            physical_rows,
304            row_id_meta: None,
305        }
306    }
307
308    pub fn add_file(
309        &mut self,
310        path: impl Into<String>,
311        field_ids: Vec<i32>,
312        column_indices: Vec<i32>,
313        version: &LanceFileVersion,
314    ) {
315        let (major, minor) = version.to_numbers();
316        self.files
317            .push(DataFile::new(path, field_ids, column_indices, major, minor));
318    }
319
320    /// Add a new [`DataFile`] to this fragment.
321    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
322        self.files.push(DataFile::new_legacy(path, schema));
323    }
324
325    // True if this fragment is made up of legacy v1 files, false otherwise
326    pub fn has_legacy_files(&self) -> bool {
327        // If any file in a fragment is legacy then all files in the fragment must be
328        self.files[0].is_legacy_file()
329    }
330
331    // Helper method to infer the Lance version from a set of fragments
332    //
333    // Returns None if there are no data files
334    // Returns an error if the data files have different versions
335    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
336        // Otherwise we need to check the actual file versions
337        // Determine version from first file
338        let Some(sample_file) = fragments
339            .iter()
340            .find(|f| !f.files.is_empty())
341            .map(|f| &f.files[0])
342        else {
343            return Ok(None);
344        };
345        let file_version = LanceFileVersion::try_from_major_minor(
346            sample_file.file_major_version,
347            sample_file.file_minor_version,
348        )?;
349        // Ensure all files match
350        for frag in fragments {
351            for file in &frag.files {
352                let this_file_version = LanceFileVersion::try_from_major_minor(
353                    file.file_major_version,
354                    file.file_minor_version,
355                )?;
356                if file_version != this_file_version {
357                    return Err(Error::invalid_input(
358                        format!(
359                            "All data files must have the same version.  Detected both {} and {}",
360                            file_version, this_file_version
361                        ),
362                        location!(),
363                    ));
364                }
365            }
366        }
367        Ok(Some(file_version))
368    }
369}
370
371impl TryFrom<pb::DataFragment> for Fragment {
372    type Error = Error;
373
374    fn try_from(p: pb::DataFragment) -> Result<Self> {
375        let physical_rows = if p.physical_rows > 0 {
376            Some(p.physical_rows as usize)
377        } else {
378            None
379        };
380        Ok(Self {
381            id: p.id,
382            files: p
383                .files
384                .into_iter()
385                .map(DataFile::try_from)
386                .collect::<Result<_>>()?,
387            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
388            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
389            physical_rows,
390        })
391    }
392}
393
394impl From<&Fragment> for pb::DataFragment {
395    fn from(f: &Fragment) -> Self {
396        let deletion_file = f.deletion_file.as_ref().map(|f| {
397            let file_type = match f.file_type {
398                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
399                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
400            };
401            pb::DeletionFile {
402                read_version: f.read_version,
403                id: f.id,
404                file_type: file_type.into(),
405                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
406            }
407        });
408
409        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
410            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
411            RowIdMeta::External(file) => {
412                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
413                    path: file.path.clone(),
414                    offset: file.offset,
415                    size: file.size,
416                })
417            }
418        });
419
420        Self {
421            id: f.id,
422            files: f.files.iter().map(pb::DataFile::from).collect(),
423            deletion_file,
424            row_id_sequence,
425            physical_rows: f.physical_rows.unwrap_or_default() as u64,
426        }
427    }
428}
429
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    // A legacy fragment built from a schema should contain a single data
    // file covering every assigned field id.
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            // Ids 0..=3 cover the struct, its two children, and "bool".
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }

    // Fragment -> protobuf -> Fragment must be lossless, both with and
    // without a deletion file attached.
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    // JSON serialization must match the expected layout (lowercase file_type,
    // omitted None fields) and round-trip through `Fragment::from_json`.
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [], "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION}],
                     "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                       "num_deleted_rows": 10},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
}