
lance_table/format/fragment.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::num::NonZero;
use std::sync::Arc;

use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
use lance_io::utils::CachedFileSize;
use object_store::path::Path;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

use crate::format::pb;

use crate::rowids::version::{
    RowDatasetVersionMeta, created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb,
};
use lance_core::datatypes::Schema;
use lance_core::error::Result;
/// Lance Data File
///
/// A data file is a single file that stores a portion of the dataset's data.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub struct DataFile {
    /// Relative path of the data file to dataset root.
    pub path: String,
    /// The ids of fields in this file.
    ///
    /// When identical across many fragments (common case), multiple `DataFile`
    /// instances share a single heap allocation via `Arc`, significantly
    /// reducing manifest memory for large tables.
    pub fields: Arc<[i32]>,
    /// The offsets of the fields listed in `fields`; empty in v1 files.
    ///
    /// Note that -1 is a possibility and it indicates that the field has
    /// no top-level column in the file.
    ///
    /// Columns that lack a field id may still exist as extra entries in
    /// `column_indices`; such columns are ignored by field-id–based projection.
    /// For example, some fields, such as blob fields, occupy multiple
    /// columns in the file but only have a single field id.
    pub column_indices: Arc<[i32]>,
    /// The major version of the file format used to write this file.
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    pub file_minor_version: u32,

    /// The size of the file in bytes, if known.
    pub file_size_bytes: CachedFileSize,

    /// Identifies the base path of the data file, used when the data file is
    /// stored outside the dataset root.
    pub base_id: Option<u32>,
}

// Custom Serialize: convert Arc<[i32]> to slice for transparent JSON output
impl Serialize for DataFile {
    fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
        use serde::ser::SerializeStruct;
        let mut s = serializer.serialize_struct("DataFile", 7)?;
        s.serialize_field("path", &self.path)?;
        s.serialize_field("fields", self.fields.as_ref())?;
        s.serialize_field("column_indices", self.column_indices.as_ref())?;
        s.serialize_field("file_major_version", &self.file_major_version)?;
        s.serialize_field("file_minor_version", &self.file_minor_version)?;
        s.serialize_field("file_size_bytes", &self.file_size_bytes)?;
        s.serialize_field("base_id", &self.base_id)?;
        s.end()
    }
}

// Custom Deserialize: read Vec<i32> and convert to Arc<[i32]>
impl<'de> Deserialize<'de> for DataFile {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
        #[derive(Deserialize)]
        struct DataFileHelper {
            path: String,
            fields: Vec<i32>,
            #[serde(default)]
            column_indices: Vec<i32>,
            #[serde(default)]
            file_major_version: u32,
            #[serde(default)]
            file_minor_version: u32,
            file_size_bytes: CachedFileSize,
            base_id: Option<u32>,
        }

        let helper = DataFileHelper::deserialize(deserializer)?;
        Ok(Self {
            path: helper.path,
            fields: Arc::from(helper.fields),
            column_indices: Arc::from(helper.column_indices),
            file_major_version: helper.file_major_version,
            file_minor_version: helper.file_minor_version,
            file_size_bytes: helper.file_size_bytes,
            base_id: helper.base_id,
        })
    }
}

impl DataFile {
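    /// Construct a `DataFile` from its parts.
    ///
    /// A minimal usage sketch; the path, ids, version numbers, and size below
    /// are illustrative values, not defaults:
    /// ```ignore
    /// let file = DataFile::new(
    ///     "data/abc.lance",
    ///     vec![0, 1, 2],      // field ids stored in this file
    ///     vec![0, 1, 2],      // column indices, parallel to `fields`
    ///     2,                  // file format major version
    ///     0,                  // file format minor version
    ///     NonZero::new(4096), // file size in bytes, if known
    ///     None,               // base_id: the file lives under the dataset root
    /// );
    /// ```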
    pub fn new(
        path: impl Into<String>,
        fields: Vec<i32>,
        column_indices: Vec<i32>,
        file_major_version: u32,
        file_minor_version: u32,
        file_size_bytes: Option<NonZero<u64>>,
        base_id: Option<u32>,
    ) -> Self {
        Self {
            path: path.into(),
            fields: Arc::from(fields),
            column_indices: Arc::from(column_indices),
            file_major_version,
            file_minor_version,
            file_size_bytes: file_size_bytes.into(),
            base_id,
        }
    }

    /// Create a new `DataFile` with the expectation that fields and column_indices will be set later
    pub fn new_unstarted(
        path: impl Into<String>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields: Arc::from([]),
            column_indices: Arc::from([]),
            file_major_version,
            file_minor_version,
            file_size_bytes: Default::default(),
            base_id: None,
        }
    }

    pub fn new_legacy_from_fields(
        path: impl Into<String>,
        fields: Vec<i32>,
        base_id: Option<u32>,
    ) -> Self {
        Self::new(
            path,
            fields,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            None,
            base_id,
        )
    }

    pub fn new_legacy(
        path: impl Into<String>,
        schema: &Schema,
        file_size_bytes: Option<NonZero<u64>>,
        base_id: Option<u32>,
    ) -> Self {
        let mut field_ids = schema.field_ids();
        field_ids.sort();
        Self::new(
            path,
            field_ids,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            file_size_bytes,
            base_id,
        )
    }

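    /// Project `full_schema` down to just the fields stored in this file.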
    pub fn schema(&self, full_schema: &Schema) -> Schema {
        full_schema.project_by_ids(&self.fields, false)
    }

    pub fn is_legacy_file(&self) -> bool {
        self.file_major_version == 0 && self.file_minor_version < 3
    }

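    /// Validate the field/column bookkeeping of this file.
    ///
    /// A sketch of the invariant enforced for non-legacy files (legacy files
    /// are instead checked for sorted, de-duplicated field ids):
    /// ```ignore
    /// // Every field id needs a column index; extra columns without a
    /// // field id are allowed, hence `>=` rather than `==`.
    /// assert!(file.column_indices.len() >= file.fields.len());
    /// ```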
    pub fn validate(&self, base_path: &Path) -> Result<()> {
        if self.is_legacy_file() {
            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
                return Err(Error::corrupt_file(
                    base_path.child(self.path.clone()),
                    "contained unsorted or duplicate field ids",
                ));
            }
        } else if self.column_indices.len() < self.fields.len() {
            // Every recorded field id must have a column index, but not every column needs
            // to be associated with a field id (extra columns are allowed).
            return Err(Error::corrupt_file(
                base_path.child(self.path.clone()),
                "contained fewer column_indices than fields",
            ));
        }
        Ok(())
    }
}

impl From<&DataFile> for pb::DataFile {
    fn from(df: &DataFile) -> Self {
        Self {
            path: df.path.clone(),
            fields: df.fields.to_vec(),
            column_indices: df.column_indices.to_vec(),
            file_major_version: df.file_major_version,
            file_minor_version: df.file_minor_version,
            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
            base_id: df.base_id,
        }
    }
}

impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    fn try_from(proto: pb::DataFile) -> Result<Self> {
        Ok(Self {
            path: proto.path,
            fields: Arc::from(proto.fields),
            column_indices: Arc::from(proto.column_indices),
            file_major_version: proto.file_major_version,
            file_minor_version: proto.file_minor_version,
            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
            base_id: proto.base_id,
        })
    }
}

/// Interns repeated data so that fragments with identical content share a
/// single heap allocation via `Arc`.
///
/// At 20M fragments the deduplication typically saves multiple GB of heap
/// because every fragment in a homogeneous table carries the same field list,
/// and post-compaction fragments share identical version metadata bytes.
///
/// Uses a `Vec`-based linear scan when the cache is small (<=16 entries)
/// and upgrades to `HashMap` for larger caches. In the common homogeneous
/// case (1-3 unique values), linear scan avoids per-fragment hashing overhead.
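///
/// A sketch of the intended deduplication (`proto_a` and `proto_b` stand in
/// for two protobuf data files carrying the same field list):
/// ```ignore
/// let mut interner = DataFileFieldInterner::default();
/// let a = interner.intern_data_file(proto_a)?; // fields [0, 1, 2]
/// let b = interner.intern_data_file(proto_b)?; // fields [0, 1, 2] again
/// // Both `DataFile`s now share one heap allocation for their field list.
/// assert!(Arc::ptr_eq(&a.fields, &b.fields));
/// ```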
#[derive(Default)]
pub struct DataFileFieldInterner {
    fields: InternCache<i32>,
    column_indices: InternCache<i32>,
    inline_bytes: InternCache<u8>,
}

/// A cache that uses linear scan for small sizes and HashMap for large.
/// The threshold is chosen so that scan + compare is cheaper than hash for
/// typical payload sizes (20-200 bytes).
enum InternCache<T: Eq + std::hash::Hash + Clone> {
    Small(Vec<Arc<[T]>>),
    Large(HashMap<Arc<[T]>, ()>),
}

const INTERN_CACHE_UPGRADE_THRESHOLD: usize = 16;

impl<T: Eq + std::hash::Hash + Clone> Default for InternCache<T> {
    fn default() -> Self {
        Self::Small(Vec::new())
    }
}

impl<T: Eq + std::hash::Hash + Clone> InternCache<T> {
    fn intern(&mut self, v: Vec<T>) -> Arc<[T]> {
        match self {
            Self::Small(entries) => {
                for existing in entries.iter() {
                    if existing.as_ref() == v.as_slice() {
                        return existing.clone();
                    }
                }
                let arc: Arc<[T]> = Arc::from(v);
                entries.push(arc.clone());
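                // Past the threshold, hashing beats a linear scan; migrate the
                // existing entries into a HashMap once.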
                if entries.len() > INTERN_CACHE_UPGRADE_THRESHOLD {
                    let mut map = HashMap::with_capacity(entries.len());
                    for e in entries.drain(..) {
                        map.insert(e, ());
                    }
                    *self = Self::Large(map);
                }
                arc
            }
            Self::Large(map) => {
                if let Some((existing, _)) = map.get_key_value(v.as_slice()) {
                    existing.clone()
                } else {
                    let arc: Arc<[T]> = Arc::from(v);
                    map.insert(arc.clone(), ());
                    arc
                }
            }
        }
    }
}

impl DataFileFieldInterner {
    /// Intern a `RowDatasetVersionMeta`, deduplicating inline byte payloads.
    /// Accepts the protobuf oneof value directly to avoid an intermediate
    /// `Arc<[u8]>` allocation that would need to be `.to_vec()`'d for the key lookup.
    fn intern_last_updated_version_meta(
        cache: &mut InternCache<u8>,
        pb: pb::data_fragment::LastUpdatedAtVersionSequence,
    ) -> Result<RowDatasetVersionMeta> {
        match pb {
            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
                Ok(RowDatasetVersionMeta::Inline(cache.intern(data)))
            }
            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
                file,
            ) => Ok(RowDatasetVersionMeta::External(ExternalFile {
                path: file.path,
                offset: file.offset,
                size: file.size,
            })),
        }
    }

    /// Intern a `RowDatasetVersionMeta`, deduplicating inline byte payloads.
    fn intern_created_version_meta(
        cache: &mut InternCache<u8>,
        pb: pb::data_fragment::CreatedAtVersionSequence,
    ) -> Result<RowDatasetVersionMeta> {
        match pb {
            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
                Ok(RowDatasetVersionMeta::Inline(cache.intern(data)))
            }
            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
                Ok(RowDatasetVersionMeta::External(ExternalFile {
                    path: file.path,
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }

    /// Convert a protobuf `DataFile`, interning `fields` and `column_indices`.
    pub fn intern_data_file(&mut self, proto: pb::DataFile) -> Result<DataFile> {
        Ok(DataFile {
            path: proto.path,
            fields: self.fields.intern(proto.fields),
            column_indices: self.column_indices.intern(proto.column_indices),
            file_major_version: proto.file_major_version,
            file_minor_version: proto.file_minor_version,
            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
            base_id: proto.base_id,
        })
    }

    /// Convert a protobuf `DataFragment`, interning fields and version metadata.
    pub fn intern_fragment(&mut self, p: pb::DataFragment) -> Result<Fragment> {
        let physical_rows = if p.physical_rows > 0 {
            Some(p.physical_rows as usize)
        } else {
            None
        };
        let last_updated_at_version_meta = p
            .last_updated_at_version_sequence
            .map(|pb| Self::intern_last_updated_version_meta(&mut self.inline_bytes, pb))
            .transpose()?;
        let created_at_version_meta = p
            .created_at_version_sequence
            .map(|pb| Self::intern_created_version_meta(&mut self.inline_bytes, pb))
            .transpose()?;
        Ok(Fragment {
            id: p.id,
            files: p
                .files
                .into_iter()
                .map(|f| self.intern_data_file(f))
                .collect::<Result<_>>()?,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
            last_updated_at_version_meta,
            created_at_version_meta,
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    Array,
    Bitmap,
}

impl DeletionFileType {
    // TODO: pub(crate)
    pub fn suffix(&self) -> &str {
        match self {
            Self::Array => "arrow",
            Self::Bitmap => "bin",
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    pub read_version: u64,
    pub id: u64,
    pub file_type: DeletionFileType,
    /// Number of deleted rows in this file. If None, this is unknown.
    pub num_deleted_rows: Option<usize>,
    pub base_id: Option<u32>,
}

impl TryFrom<pb::DeletionFile> for DeletionFile {
    type Error = Error;

    fn try_from(value: pb::DeletionFile) -> Result<Self> {
        let file_type = match value.file_type {
            0 => DeletionFileType::Array,
            1 => DeletionFileType::Bitmap,
            _ => {
                return Err(Error::not_supported_source(
                    "Unknown deletion file type".into(),
                ));
            }
        };
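        // In the protobuf encoding a count of 0 means "unknown", so it maps
        // to `None` here.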
        let num_deleted_rows = if value.num_deleted_rows == 0 {
            None
        } else {
            Some(value.num_deleted_rows as usize)
        };
        Ok(Self {
            read_version: value.read_version,
            id: value.id,
            file_type,
            num_deleted_rows,
            base_id: value.base_id,
        })
    }
}

/// A reference to a part of a file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    pub path: String,
    pub offset: u64,
    pub size: u64,
}

/// Metadata about the location of the row id sequence.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    Inline(Vec<u8>),
    External(ExternalFile),
}

impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
    type Error = Error;

    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
        match value {
            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
                Ok(Self::External(ExternalFile {
                    path: file.path,
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }
}

/// Data fragment.
///
/// A fragment is a set of files which represent the different columns of the same rows.
/// If a column exists in the schema but no file in the fragment stores it, the
/// column is read as all nulls.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID
    pub id: u64,

    /// Files within the fragment.
    pub files: Vec<DataFile>,

    /// Optional file with deleted local row offsets.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Metadata about the location of the row id sequence.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Original number of rows in the fragment. If this is None, then it is
    /// unknown. This is only optional for legacy reasons. All new tables should
    /// have this set.
    pub physical_rows: Option<usize>,

    /// Last updated at version metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,

    /// Created at version metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
}

impl Fragment {
    pub fn new(id: u64) -> Self {
        Self {
            id,
            files: vec![],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }
    }

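    /// The logical number of rows: `physical_rows` minus recorded deletions,
    /// or `None` if either quantity is unknown.
    ///
    /// A sketch of the arithmetic (counts are illustrative):
    /// ```ignore
    /// // A fragment with 100 physical rows and a deletion file recording
    /// // 10 deleted rows reports 90 logical rows:
    /// assert_eq!(frag.num_rows(), Some(90));
    /// ```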
    pub fn num_rows(&self) -> Option<usize> {
        match (self.physical_rows, &self.deletion_file) {
            // Known fragment length, no deletion file.
            (Some(len), None) => Some(len),
            // Known fragment length and known number of deleted rows.
            (
                Some(len),
                Some(DeletionFile {
                    num_deleted_rows: Some(num_deleted_rows),
                    ..
                }),
            ) => Some(len - num_deleted_rows),
            _ => None,
        }
    }

    pub fn from_json(json: &str) -> Result<Self> {
        let fragment: Self = serde_json::from_str(json)?;
        Ok(fragment)
    }

    /// Create a `Fragment` with one `DataFile`
    pub fn with_file_legacy(
        id: u64,
        path: &str,
        schema: &Schema,
        physical_rows: Option<usize>,
    ) -> Self {
        Self {
            id,
            files: vec![DataFile::new_legacy(path, schema, None, None)],
            deletion_file: None,
            physical_rows,
            row_id_meta: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }
    }

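    /// Builder-style variant of [`Fragment::add_file`].
    ///
    /// A usage sketch (the path, ids, and row count are illustrative):
    /// ```ignore
    /// let frag = Fragment::new(7)
    ///     .with_file("7.lance", vec![0, 1], vec![0, 1], &version, None)
    ///     .with_physical_rows(1000);
    /// ```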
    pub fn with_file(
        mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) -> Self {
        let (major, minor) = version.to_numbers();
        let data_file = DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
            None,
        );
        self.files.push(data_file);
        self
    }

    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
        self.physical_rows = Some(physical_rows);
        self
    }

    pub fn add_file(
        &mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) {
        let (major, minor) = version.to_numbers();
        self.files.push(DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
            None,
        ));
    }

    /// Add a new [`DataFile`] to this fragment.
    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
        self.files
            .push(DataFile::new_legacy(path, schema, None, None));
    }

    /// True if this fragment is made up of legacy v1 files, false otherwise.
    pub fn has_legacy_files(&self) -> bool {
        // If any file in a fragment is legacy then all files in the fragment must be
        self.files[0].is_legacy_file()
    }

    /// Helper method to infer the Lance version from a set of fragments.
    ///
    /// Returns `None` if there are no data files.
    /// Returns an error if the data files have different versions.
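    ///
    /// A sketch of the expected behavior (`fragments` is illustrative):
    /// ```ignore
    /// match Fragment::try_infer_version(&fragments)? {
    ///     Some(v) => println!("all data files share version {}", v),
    ///     None => println!("no data files present"),
    /// }
    /// ```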
    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
        // Determine the version from the first data file found
        let Some(sample_file) = fragments
            .iter()
            .find(|f| !f.files.is_empty())
            .map(|f| &f.files[0])
        else {
            return Ok(None);
        };
        let file_version = LanceFileVersion::try_from_major_minor(
            sample_file.file_major_version,
            sample_file.file_minor_version,
        )?;
        // Ensure all files match
        for frag in fragments {
            for file in &frag.files {
                let this_file_version = LanceFileVersion::try_from_major_minor(
                    file.file_major_version,
                    file.file_minor_version,
                )?;
                if file_version != this_file_version {
                    return Err(Error::invalid_input(format!(
                        "All data files must have the same version.  Detected both {} and {}",
                        file_version, this_file_version
                    )));
                }
            }
        }
        Ok(Some(file_version))
    }
}

impl TryFrom<pb::DataFragment> for Fragment {
    type Error = Error;

    fn try_from(p: pb::DataFragment) -> Result<Self> {
        let physical_rows = if p.physical_rows > 0 {
            Some(p.physical_rows as usize)
        } else {
            None
        };
        Ok(Self {
            id: p.id,
            files: p
                .files
                .into_iter()
                .map(DataFile::try_from)
                .collect::<Result<_>>()?,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
            last_updated_at_version_meta: p
                .last_updated_at_version_sequence
                .map(RowDatasetVersionMeta::try_from)
                .transpose()?,
            created_at_version_meta: p
                .created_at_version_sequence
                .map(RowDatasetVersionMeta::try_from)
                .transpose()?,
        })
    }
}

impl From<&Fragment> for pb::DataFragment {
    fn from(f: &Fragment) -> Self {
        let deletion_file = f.deletion_file.as_ref().map(|f| {
            let file_type = match f.file_type {
                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
            };
            pb::DeletionFile {
                read_version: f.read_version,
                id: f.id,
                file_type: file_type.into(),
                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
                base_id: f.base_id,
            }
        });

        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
            RowIdMeta::External(file) => {
                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                })
            }
        });
        let last_updated_at_version_sequence =
            last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
        let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
        Self {
            id: f.id,
            files: f.files.iter().map(pb::DataFile::from).collect(),
            deletion_file,
            row_id_sequence,
            physical_rows: f.physical_rows.unwrap_or_default() as u64,
            last_updated_at_version_sequence,
            created_at_version_sequence,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use object_store::path::Path;
    use serde_json::{Value, json};

    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
                None,
            )]
        )
    }

    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }

    #[test]
    fn data_file_validate_allows_extra_columns() {
        let data_file = DataFile {
            path: "foo.lance".to_string(),
            fields: Arc::from([1, 2]),
            // One extra column without a field id mapping
            column_indices: Arc::from([0, 1, 2]),
            file_major_version: MAJOR_VERSION as u32,
            file_minor_version: MINOR_VERSION as u32,
            file_size_bytes: Default::default(),
            base_id: None,
        };

        let base_path = Path::from("base");
        data_file
            .validate(&base_path)
            .expect("validation should allow extra columns without field ids");
    }
}
841}