1use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13
14use crate::format::pb;
15
16use crate::rowids::version::{
17 RowDatasetVersionMeta, created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb,
18};
19use lance_core::datatypes::Schema;
20use lance_core::error::Result;
21
/// Metadata describing a single data file that belongs to a fragment.
///
/// Serialized to JSON via serde and converted to/from `pb::DataFile` for the
/// protobuf manifest representation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Path of the file, joined onto a base path during validation
    /// (see [`DataFile::validate`]).
    pub path: String,
    /// Ids of the schema fields whose data is stored in this file.
    pub fields: Vec<i32>,
    /// Column index within the file for each entry of `fields`.
    /// Defaults to empty on deserialization; legacy files carry none.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// Major version of the file format this file was written with.
    #[serde(default)]
    pub file_major_version: u32,
    /// Minor version of the file format this file was written with.
    #[serde(default)]
    pub file_minor_version: u32,

    /// Cached size of the file in bytes; may be unknown (serialized as 0 /
    /// null depending on the representation).
    pub file_size_bytes: CachedFileSize,

    /// Optional base id carried through all conversions unchanged.
    /// NOTE(review): semantics of `base_id` are not visible in this file —
    /// confirm against the wider manifest documentation.
    pub base_id: Option<u32>,
}
55
56impl DataFile {
57 pub fn new(
58 path: impl Into<String>,
59 fields: Vec<i32>,
60 column_indices: Vec<i32>,
61 file_major_version: u32,
62 file_minor_version: u32,
63 file_size_bytes: Option<NonZero<u64>>,
64 base_id: Option<u32>,
65 ) -> Self {
66 Self {
67 path: path.into(),
68 fields,
69 column_indices,
70 file_major_version,
71 file_minor_version,
72 file_size_bytes: file_size_bytes.into(),
73 base_id,
74 }
75 }
76
77 pub fn new_unstarted(
79 path: impl Into<String>,
80 file_major_version: u32,
81 file_minor_version: u32,
82 ) -> Self {
83 Self {
84 path: path.into(),
85 fields: vec![],
86 column_indices: vec![],
87 file_major_version,
88 file_minor_version,
89 file_size_bytes: Default::default(),
90 base_id: None,
91 }
92 }
93
94 pub fn new_legacy_from_fields(
95 path: impl Into<String>,
96 fields: Vec<i32>,
97 base_id: Option<u32>,
98 ) -> Self {
99 Self::new(
100 path,
101 fields,
102 vec![],
103 MAJOR_VERSION as u32,
104 MINOR_VERSION as u32,
105 None,
106 base_id,
107 )
108 }
109
110 pub fn new_legacy(
111 path: impl Into<String>,
112 schema: &Schema,
113 file_size_bytes: Option<NonZero<u64>>,
114 base_id: Option<u32>,
115 ) -> Self {
116 let mut field_ids = schema.field_ids();
117 field_ids.sort();
118 Self::new(
119 path,
120 field_ids,
121 vec![],
122 MAJOR_VERSION as u32,
123 MINOR_VERSION as u32,
124 file_size_bytes,
125 base_id,
126 )
127 }
128
129 pub fn schema(&self, full_schema: &Schema) -> Schema {
130 full_schema.project_by_ids(&self.fields, false)
131 }
132
133 pub fn is_legacy_file(&self) -> bool {
134 self.file_major_version == 0 && self.file_minor_version < 3
135 }
136
137 pub fn validate(&self, base_path: &Path) -> Result<()> {
138 if self.is_legacy_file() {
139 if !self.fields.windows(2).all(|w| w[0] < w[1]) {
140 return Err(Error::corrupt_file(
141 base_path.child(self.path.clone()),
142 "contained unsorted or duplicate field ids",
143 ));
144 }
145 } else if self.column_indices.len() < self.fields.len() {
146 return Err(Error::corrupt_file(
149 base_path.child(self.path.clone()),
150 "contained fewer column_indices than fields",
151 ));
152 }
153 Ok(())
154 }
155}
156
157impl From<&DataFile> for pb::DataFile {
158 fn from(df: &DataFile) -> Self {
159 Self {
160 path: df.path.clone(),
161 fields: df.fields.clone(),
162 column_indices: df.column_indices.clone(),
163 file_major_version: df.file_major_version,
164 file_minor_version: df.file_minor_version,
165 file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
166 base_id: df.base_id,
167 }
168 }
169}
170
171impl TryFrom<pb::DataFile> for DataFile {
172 type Error = Error;
173
174 fn try_from(proto: pb::DataFile) -> Result<Self> {
175 Ok(Self {
176 path: proto.path,
177 fields: proto.fields,
178 column_indices: proto.column_indices,
179 file_major_version: proto.file_major_version,
180 file_minor_version: proto.file_minor_version,
181 file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
182 base_id: proto.base_id,
183 })
184 }
185}
186
/// Encoding used by a deletion file to record deleted row offsets.
///
/// Serialized in JSON as lowercase strings ("array" / "bitmap").
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Stored as an Arrow file (".arrow" suffix).
    Array,
    /// Stored as a binary bitmap file (".bin" suffix).
    Bitmap,
}
193
194impl DeletionFileType {
195 pub fn suffix(&self) -> &str {
197 match self {
198 Self::Array => "arrow",
199 Self::Bitmap => "bin",
200 }
201 }
202}
203
/// Metadata for a file recording which rows of a fragment were deleted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// Dataset version associated with this deletion — presumably the
    /// version that was read when the deletion was made; TODO confirm.
    pub read_version: u64,
    /// Identifier distinguishing this deletion file.
    pub id: u64,
    /// Encoding of the deleted-row set.
    pub file_type: DeletionFileType,
    /// Number of deleted rows, if known. Encoded as 0 in protobuf when
    /// unknown (see the `TryFrom<pb::DeletionFile>` impl).
    pub num_deleted_rows: Option<usize>,
    /// Optional base id; carried through conversions unchanged.
    pub base_id: Option<u32>,
}
213
214impl TryFrom<pb::DeletionFile> for DeletionFile {
215 type Error = Error;
216
217 fn try_from(value: pb::DeletionFile) -> Result<Self> {
218 let file_type = match value.file_type {
219 0 => DeletionFileType::Array,
220 1 => DeletionFileType::Bitmap,
221 _ => {
222 return Err(Error::not_supported_source(
223 "Unknown deletion file type".into(),
224 ));
225 }
226 };
227 let num_deleted_rows = if value.num_deleted_rows == 0 {
228 None
229 } else {
230 Some(value.num_deleted_rows as usize)
231 };
232 Ok(Self {
233 read_version: value.read_version,
234 id: value.id,
235 file_type,
236 num_deleted_rows,
237 base_id: value.base_id,
238 })
239 }
240}
241
/// A byte range within an external file (used for out-of-line row id data).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path of the external file.
    pub path: String,
    /// Byte offset where the data starts.
    pub offset: u64,
    /// Length of the data in bytes.
    pub size: u64,
}
249
/// Where a fragment's row id sequence is stored.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// Serialized row id sequence stored inline in the manifest.
    Inline(Vec<u8>),
    /// Row id sequence stored in a separate file at the given byte range.
    External(ExternalFile),
}
256
257impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
258 type Error = Error;
259
260 fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
261 match value {
262 pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
263 pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
264 Ok(Self::External(ExternalFile {
265 path: file.path.clone(),
266 offset: file.offset,
267 size: file.size,
268 }))
269 }
270 }
271 }
272}
273
/// Metadata about a fragment: a unit of the dataset made up of one or more
/// data files, plus optional deletion and row-id/version metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment id.
    pub id: u64,

    /// The data files that make up this fragment.
    pub files: Vec<DataFile>,

    /// Deletion file, present when rows have been deleted from the fragment.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Row id sequence metadata, when row ids are tracked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Number of rows physically stored (before deletions), if known.
    pub physical_rows: Option<usize>,

    /// Per-row "last updated at" dataset-version metadata, if tracked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,

    /// Per-row "created at" dataset-version metadata, if tracked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
}
307
308impl Fragment {
309 pub fn new(id: u64) -> Self {
310 Self {
311 id,
312 files: vec![],
313 deletion_file: None,
314 row_id_meta: None,
315 physical_rows: None,
316 last_updated_at_version_meta: None,
317 created_at_version_meta: None,
318 }
319 }
320
321 pub fn num_rows(&self) -> Option<usize> {
322 match (self.physical_rows, &self.deletion_file) {
323 (Some(len), None) => Some(len),
325 (
327 Some(len),
328 Some(DeletionFile {
329 num_deleted_rows: Some(num_deleted_rows),
330 ..
331 }),
332 ) => Some(len - num_deleted_rows),
333 _ => None,
334 }
335 }
336
337 pub fn from_json(json: &str) -> Result<Self> {
338 let fragment: Self = serde_json::from_str(json)?;
339 Ok(fragment)
340 }
341
342 pub fn with_file_legacy(
344 id: u64,
345 path: &str,
346 schema: &Schema,
347 physical_rows: Option<usize>,
348 ) -> Self {
349 Self {
350 id,
351 files: vec![DataFile::new_legacy(path, schema, None, None)],
352 deletion_file: None,
353 physical_rows,
354 row_id_meta: None,
355 last_updated_at_version_meta: None,
356 created_at_version_meta: None,
357 }
358 }
359
360 pub fn with_file(
361 mut self,
362 path: impl Into<String>,
363 field_ids: Vec<i32>,
364 column_indices: Vec<i32>,
365 version: &LanceFileVersion,
366 file_size_bytes: Option<NonZero<u64>>,
367 ) -> Self {
368 let (major, minor) = version.to_numbers();
369 let data_file = DataFile::new(
370 path,
371 field_ids,
372 column_indices,
373 major,
374 minor,
375 file_size_bytes,
376 None,
377 );
378 self.files.push(data_file);
379 self
380 }
381
382 pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
383 self.physical_rows = Some(physical_rows);
384 self
385 }
386
387 pub fn add_file(
388 &mut self,
389 path: impl Into<String>,
390 field_ids: Vec<i32>,
391 column_indices: Vec<i32>,
392 version: &LanceFileVersion,
393 file_size_bytes: Option<NonZero<u64>>,
394 ) {
395 let (major, minor) = version.to_numbers();
396 self.files.push(DataFile::new(
397 path,
398 field_ids,
399 column_indices,
400 major,
401 minor,
402 file_size_bytes,
403 None,
404 ));
405 }
406
407 pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
409 self.files
410 .push(DataFile::new_legacy(path, schema, None, None));
411 }
412
413 pub fn has_legacy_files(&self) -> bool {
415 self.files[0].is_legacy_file()
417 }
418
419 pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
424 let Some(sample_file) = fragments
427 .iter()
428 .find(|f| !f.files.is_empty())
429 .map(|f| &f.files[0])
430 else {
431 return Ok(None);
432 };
433 let file_version = LanceFileVersion::try_from_major_minor(
434 sample_file.file_major_version,
435 sample_file.file_minor_version,
436 )?;
437 for frag in fragments {
439 for file in &frag.files {
440 let this_file_version = LanceFileVersion::try_from_major_minor(
441 file.file_major_version,
442 file.file_minor_version,
443 )?;
444 if file_version != this_file_version {
445 return Err(Error::invalid_input(format!(
446 "All data files must have the same version. Detected both {} and {}",
447 file_version, this_file_version
448 )));
449 }
450 }
451 }
452 Ok(Some(file_version))
453 }
454}
455
456impl TryFrom<pb::DataFragment> for Fragment {
457 type Error = Error;
458
459 fn try_from(p: pb::DataFragment) -> Result<Self> {
460 let physical_rows = if p.physical_rows > 0 {
461 Some(p.physical_rows as usize)
462 } else {
463 None
464 };
465 Ok(Self {
466 id: p.id,
467 files: p
468 .files
469 .into_iter()
470 .map(DataFile::try_from)
471 .collect::<Result<_>>()?,
472 deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
473 row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
474 physical_rows,
475 last_updated_at_version_meta: p
476 .last_updated_at_version_sequence
477 .map(RowDatasetVersionMeta::try_from)
478 .transpose()?,
479 created_at_version_meta: p
480 .created_at_version_sequence
481 .map(RowDatasetVersionMeta::try_from)
482 .transpose()?,
483 })
484 }
485}
486
487impl From<&Fragment> for pb::DataFragment {
488 fn from(f: &Fragment) -> Self {
489 let deletion_file = f.deletion_file.as_ref().map(|f| {
490 let file_type = match f.file_type {
491 DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
492 DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
493 };
494 pb::DeletionFile {
495 read_version: f.read_version,
496 id: f.id,
497 file_type: file_type.into(),
498 num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
499 base_id: f.base_id,
500 }
501 });
502
503 let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
504 RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
505 RowIdMeta::External(file) => {
506 pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
507 path: file.path.clone(),
508 offset: file.offset,
509 size: file.size,
510 })
511 }
512 });
513 let last_updated_at_version_sequence =
514 last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
515 let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
516 Self {
517 id: f.id,
518 files: f.files.iter().map(pb::DataFile::from).collect(),
519 deletion_file,
520 row_id_sequence,
521 physical_rows: f.physical_rows.unwrap_or_default() as u64,
522 last_updated_at_version_sequence,
523 created_at_version_sequence,
524 }
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use object_store::path::Path;
    use serde_json::{Value, json};

    // A legacy fragment built from a schema should contain one data file
    // listing all field ids (struct parent and children each get an id).
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
                None,
            )]
        )
    }

    // Fragment -> pb::DataFragment -> Fragment must round-trip losslessly,
    // both with and without a deletion file attached.
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        // Same round-trip with the deletion file removed.
        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    // The JSON form must match the expected serde layout exactly (including
    // null / skipped optional fields) and parse back to an equal fragment.
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }

    // Non-legacy files may carry more column indices than field ids (extra
    // columns without field ids); validate() must accept that.
    #[test]
    fn data_file_validate_allows_extra_columns() {
        let data_file = DataFile {
            path: "foo.lance".to_string(),
            fields: vec![1, 2],
            column_indices: vec![0, 1, 2],
            file_major_version: MAJOR_VERSION as u32,
            file_minor_version: MINOR_VERSION as u32,
            file_size_bytes: Default::default(),
            base_id: None,
        };

        let base_path = Path::from("base");
        data_file
            .validate(&base_path)
            .expect("validation should allow extra columns without field ids");
    }
}