// lance_table/format/fragment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14
15use crate::format::pb;
16
17use lance_core::datatypes::Schema;
18use lance_core::error::Result;
19
/// Lance Data File
///
/// A data file is one piece of file storing data.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Relative path of the data file to dataset root.
    pub path: String,
    /// The ids of fields in this file.
    pub fields: Vec<i32>,
    /// The offsets of the fields listed in `fields`, empty in v1 files
    ///
    /// Note that -1 is a possibility and it indicates that the field has
    /// no top-level column in the file.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// The major version of the file format used to write this file.
    /// Defaults to 0 when absent from serialized JSON (legacy manifests).
    #[serde(default)]
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    /// Defaults to 0 when absent from serialized JSON (legacy manifests).
    #[serde(default)]
    pub file_minor_version: u32,

    /// The size of the file in bytes, if known.
    pub file_size_bytes: CachedFileSize,
}
45
46impl DataFile {
47    pub fn new(
48        path: impl Into<String>,
49        fields: Vec<i32>,
50        column_indices: Vec<i32>,
51        file_major_version: u32,
52        file_minor_version: u32,
53        file_size_bytes: Option<NonZero<u64>>,
54    ) -> Self {
55        Self {
56            path: path.into(),
57            fields,
58            column_indices,
59            file_major_version,
60            file_minor_version,
61            file_size_bytes: file_size_bytes.into(),
62        }
63    }
64
65    /// Create a new `DataFile` with the expectation that fields and column_indices will be set later
66    pub fn new_unstarted(
67        path: impl Into<String>,
68        file_major_version: u32,
69        file_minor_version: u32,
70    ) -> Self {
71        Self {
72            path: path.into(),
73            fields: vec![],
74            column_indices: vec![],
75            file_major_version,
76            file_minor_version,
77            file_size_bytes: Default::default(),
78        }
79    }
80
81    pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
82        Self::new(
83            path,
84            fields,
85            vec![],
86            MAJOR_VERSION as u32,
87            MINOR_VERSION as u32,
88            None,
89        )
90    }
91
92    pub fn new_legacy(
93        path: impl Into<String>,
94        schema: &Schema,
95        file_size_bytes: Option<NonZero<u64>>,
96    ) -> Self {
97        let mut field_ids = schema.field_ids();
98        field_ids.sort();
99        Self::new(
100            path,
101            field_ids,
102            vec![],
103            MAJOR_VERSION as u32,
104            MINOR_VERSION as u32,
105            file_size_bytes,
106        )
107    }
108
109    pub fn schema(&self, full_schema: &Schema) -> Schema {
110        full_schema.project_by_ids(&self.fields, false)
111    }
112
113    pub fn is_legacy_file(&self) -> bool {
114        self.file_major_version == 0 && self.file_minor_version < 3
115    }
116
117    pub fn validate(&self, base_path: &Path) -> Result<()> {
118        if self.is_legacy_file() {
119            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
120                return Err(Error::corrupt_file(
121                    base_path.child(self.path.clone()),
122                    "contained unsorted or duplicate field ids",
123                    location!(),
124                ));
125            }
126        } else if self.fields.len() != self.column_indices.len() {
127            return Err(Error::corrupt_file(
128                base_path.child(self.path.clone()),
129                "contained an unequal number of fields / column_indices",
130                location!(),
131            ));
132        }
133        Ok(())
134    }
135}
136
137impl From<&DataFile> for pb::DataFile {
138    fn from(df: &DataFile) -> Self {
139        Self {
140            path: df.path.clone(),
141            fields: df.fields.clone(),
142            column_indices: df.column_indices.clone(),
143            file_major_version: df.file_major_version,
144            file_minor_version: df.file_minor_version,
145            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
146        }
147    }
148}
149
150impl TryFrom<pb::DataFile> for DataFile {
151    type Error = Error;
152
153    fn try_from(proto: pb::DataFile) -> Result<Self> {
154        Ok(Self {
155            path: proto.path,
156            fields: proto.fields,
157            column_indices: proto.column_indices,
158            file_major_version: proto.file_major_version,
159            file_minor_version: proto.file_minor_version,
160            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
161        })
162    }
163}
164
/// On-disk encoding used for a fragment's deletion file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Deleted row offsets stored as an Arrow file (suffix "arrow").
    Array,
    /// Deleted row offsets stored as a serialized bitmap (suffix "bin").
    Bitmap,
}
171
172impl DeletionFileType {
173    // TODO: pub(crate)
174    pub fn suffix(&self) -> &str {
175        match self {
176            Self::Array => "arrow",
177            Self::Bitmap => "bin",
178        }
179    }
180}
181
/// A file recording which rows of a fragment have been deleted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// Dataset version associated with this deletion file
    /// (presumably the version the deletes were read against — verify with writers).
    pub read_version: u64,
    /// Identifier distinguishing deletion files that share a read version.
    pub id: u64,
    /// On-disk encoding of the deleted rows (see [`DeletionFileType`]).
    pub file_type: DeletionFileType,
    /// Number of deleted rows in this file. If None, this is unknown.
    pub num_deleted_rows: Option<usize>,
}
190
191impl TryFrom<pb::DeletionFile> for DeletionFile {
192    type Error = Error;
193
194    fn try_from(value: pb::DeletionFile) -> Result<Self> {
195        let file_type = match value.file_type {
196            0 => DeletionFileType::Array,
197            1 => DeletionFileType::Bitmap,
198            _ => {
199                return Err(Error::NotSupported {
200                    source: "Unknown deletion file type".into(),
201                    location: location!(),
202                })
203            }
204        };
205        let num_deleted_rows = if value.num_deleted_rows == 0 {
206            None
207        } else {
208            Some(value.num_deleted_rows as usize)
209        };
210        Ok(Self {
211            read_version: value.read_version,
212            id: value.id,
213            file_type,
214            num_deleted_rows,
215        })
216    }
217}
218
/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path of the referenced file.
    pub path: String,
    /// Offset at which the referenced region starts.
    pub offset: u64,
    /// Size of the referenced region.
    pub size: u64,
}
226
/// Metadata about location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// Serialized row id sequence stored inline in the manifest.
    Inline(Vec<u8>),
    /// Row id sequence stored in (a region of) an external file.
    External(ExternalFile),
}
233
234impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
235    type Error = Error;
236
237    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
238        match value {
239            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
240            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
241                Ok(Self::External(ExternalFile {
242                    path: file.path.clone(),
243                    offset: file.offset,
244                    size: file.size,
245                }))
246            }
247        }
248    }
249}
250
/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If column exists in the schema, but the related file does not exist, treat this column as `nulls`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID
    pub id: u64,

    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted local row offsets.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Where this fragment's row id sequence is stored, either inline or in
    /// an external file (see [`RowIdMeta`]). `None` when there is no sequence.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Original number of rows in the fragment. If this is None, then it is
    /// unknown. This is only optional for legacy reasons. All new tables should
    /// have this set.
    pub physical_rows: Option<usize>,
}
276
277impl Fragment {
278    pub fn new(id: u64) -> Self {
279        Self {
280            id,
281            files: vec![],
282            deletion_file: None,
283            row_id_meta: None,
284            physical_rows: None,
285        }
286    }
287
288    pub fn num_rows(&self) -> Option<usize> {
289        match (self.physical_rows, &self.deletion_file) {
290            // Known fragment length, no deletion file.
291            (Some(len), None) => Some(len),
292            // Known fragment length, but don't know deletion file size.
293            (
294                Some(len),
295                Some(DeletionFile {
296                    num_deleted_rows: Some(num_deleted_rows),
297                    ..
298                }),
299            ) => Some(len - num_deleted_rows),
300            _ => None,
301        }
302    }
303
304    pub fn from_json(json: &str) -> Result<Self> {
305        let fragment: Self = serde_json::from_str(json)?;
306        Ok(fragment)
307    }
308
309    /// Create a `Fragment` with one DataFile
310    pub fn with_file_legacy(
311        id: u64,
312        path: &str,
313        schema: &Schema,
314        physical_rows: Option<usize>,
315    ) -> Self {
316        Self {
317            id,
318            files: vec![DataFile::new_legacy(path, schema, None)],
319            deletion_file: None,
320            physical_rows,
321            row_id_meta: None,
322        }
323    }
324
325    pub fn add_file(
326        &mut self,
327        path: impl Into<String>,
328        field_ids: Vec<i32>,
329        column_indices: Vec<i32>,
330        version: &LanceFileVersion,
331        file_size_bytes: Option<NonZero<u64>>,
332    ) {
333        let (major, minor) = version.to_numbers();
334        self.files.push(DataFile::new(
335            path,
336            field_ids,
337            column_indices,
338            major,
339            minor,
340            file_size_bytes,
341        ));
342    }
343
344    /// Add a new [`DataFile`] to this fragment.
345    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
346        self.files.push(DataFile::new_legacy(path, schema, None));
347    }
348
349    // True if this fragment is made up of legacy v1 files, false otherwise
350    pub fn has_legacy_files(&self) -> bool {
351        // If any file in a fragment is legacy then all files in the fragment must be
352        self.files[0].is_legacy_file()
353    }
354
355    // Helper method to infer the Lance version from a set of fragments
356    //
357    // Returns None if there are no data files
358    // Returns an error if the data files have different versions
359    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
360        // Otherwise we need to check the actual file versions
361        // Determine version from first file
362        let Some(sample_file) = fragments
363            .iter()
364            .find(|f| !f.files.is_empty())
365            .map(|f| &f.files[0])
366        else {
367            return Ok(None);
368        };
369        let file_version = LanceFileVersion::try_from_major_minor(
370            sample_file.file_major_version,
371            sample_file.file_minor_version,
372        )?;
373        // Ensure all files match
374        for frag in fragments {
375            for file in &frag.files {
376                let this_file_version = LanceFileVersion::try_from_major_minor(
377                    file.file_major_version,
378                    file.file_minor_version,
379                )?;
380                if file_version != this_file_version {
381                    return Err(Error::invalid_input(
382                        format!(
383                            "All data files must have the same version.  Detected both {} and {}",
384                            file_version, this_file_version
385                        ),
386                        location!(),
387                    ));
388                }
389            }
390        }
391        Ok(Some(file_version))
392    }
393}
394
395impl TryFrom<pb::DataFragment> for Fragment {
396    type Error = Error;
397
398    fn try_from(p: pb::DataFragment) -> Result<Self> {
399        let physical_rows = if p.physical_rows > 0 {
400            Some(p.physical_rows as usize)
401        } else {
402            None
403        };
404        Ok(Self {
405            id: p.id,
406            files: p
407                .files
408                .into_iter()
409                .map(DataFile::try_from)
410                .collect::<Result<_>>()?,
411            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
412            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
413            physical_rows,
414        })
415    }
416}
417
418impl From<&Fragment> for pb::DataFragment {
419    fn from(f: &Fragment) -> Self {
420        let deletion_file = f.deletion_file.as_ref().map(|f| {
421            let file_type = match f.file_type {
422                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
423                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
424            };
425            pb::DeletionFile {
426                read_version: f.read_version,
427                id: f.id,
428                file_type: file_type.into(),
429                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
430            }
431        });
432
433        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
434            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
435            RowIdMeta::External(file) => {
436                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
437                    path: file.path.clone(),
438                    offset: file.offset,
439                    size: file.size,
440                })
441            }
442        });
443
444        Self {
445            id: f.id,
446            files: f.files.iter().map(pb::DataFile::from).collect(),
447            deletion_file,
448            row_id_sequence,
449            physical_rows: f.physical_rows.unwrap_or_default() as u64,
450        }
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    /// A legacy fragment built from a schema lists every field id of that
    /// schema (nested struct children included) in its single data file.
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        // Struct field with two children plus a top-level bool: four field
        // ids total, asserted as [0, 1, 2, 3] below.
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }

    /// Fragment -> protobuf -> Fragment round-trips losslessly, both with
    /// and without a deletion file attached.
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        // Repeat without the deletion file to cover the None path.
        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    /// The JSON serialization matches the expected shape (serde defaults,
    /// lowercase file_type, null for unknown sizes) and deserializes back to
    /// an equal fragment.
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let json = serde_json::to_string(&fragment).unwrap();

        // Compare as parsed values so key order doesn't matter.
        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
}