use std::num::NonZero;

use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
use lance_io::utils::CachedFileSize;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use snafu::location;

use crate::format::pb;

use crate::rowids::version::{
    created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb, RowDatasetVersionMeta,
};
use lance_core::datatypes::Schema;
use lance_core::error::Result;

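/// A data file within a fragment, storing a subset of the dataset's columns
/// for that fragment's rows.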
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
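    /// Relative path to the data file.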
    pub path: String,
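    /// The ids of the schema fields stored in this file.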
    pub fields: Vec<i32>,
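    /// The position of each field's data among the file's columns.
    ///
    /// Empty for legacy files; otherwise it must have at least as many
    /// entries as `fields` (see `validate`).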
    #[serde(default)]
    pub column_indices: Vec<i32>,
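    /// Major version of the file format used to write this file.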
    #[serde(default)]
    pub file_major_version: u32,
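    /// Minor version of the file format used to write this file.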
    #[serde(default)]
    pub file_minor_version: u32,

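    /// Cached size of the file on disk, in bytes, if known.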
    pub file_size_bytes: CachedFileSize,

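    /// Optional id of the base path that `path` is resolved against, when the
    /// file is not stored under the dataset's default base path.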
    pub base_id: Option<u32>,
}

impl DataFile {
    pub fn new(
        path: impl Into<String>,
        fields: Vec<i32>,
        column_indices: Vec<i32>,
        file_major_version: u32,
        file_minor_version: u32,
        file_size_bytes: Option<NonZero<u64>>,
        base_id: Option<u32>,
    ) -> Self {
        Self {
            path: path.into(),
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
            file_size_bytes: file_size_bytes.into(),
            base_id,
        }
    }

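    /// Create a new data file whose fields and column indices are not yet
    /// known, to be filled in once the file has been written.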
    pub fn new_unstarted(
        path: impl Into<String>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields: vec![],
            column_indices: vec![],
            file_major_version,
            file_minor_version,
            file_size_bytes: Default::default(),
            base_id: None,
        }
    }

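    /// Create a data file in the legacy format from an explicit list of
    /// field ids.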
    pub fn new_legacy_from_fields(
        path: impl Into<String>,
        fields: Vec<i32>,
        base_id: Option<u32>,
    ) -> Self {
        Self::new(
            path,
            fields,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            None,
            base_id,
        )
    }

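    /// Create a data file in the legacy format covering all fields of the
    /// given schema.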
    pub fn new_legacy(
        path: impl Into<String>,
        schema: &Schema,
        file_size_bytes: Option<NonZero<u64>>,
        base_id: Option<u32>,
    ) -> Self {
        let mut field_ids = schema.field_ids();
        field_ids.sort();
        Self::new(
            path,
            field_ids,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            file_size_bytes,
            base_id,
        )
    }

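    /// Project the full dataset schema down to the fields stored in this file.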
    pub fn schema(&self, full_schema: &Schema) -> Schema {
        full_schema.project_by_ids(&self.fields, false)
    }

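    /// True if this file was written with the legacy file format
    /// (versions below 0.3).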
    pub fn is_legacy_file(&self) -> bool {
        self.file_major_version == 0 && self.file_minor_version < 3
    }

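    /// Validate the file's metadata, returning a corrupt-file error if it is
    /// inconsistent: legacy files must list sorted, unique field ids, and
    /// newer files must have at least as many column indices as fields.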
    pub fn validate(&self, base_path: &Path) -> Result<()> {
        if self.is_legacy_file() {
            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
                return Err(Error::corrupt_file(
                    base_path.child(self.path.clone()),
                    "contained unsorted or duplicate field ids",
                    location!(),
                ));
            }
        } else if self.column_indices.len() < self.fields.len() {
            return Err(Error::corrupt_file(
                base_path.child(self.path.clone()),
                "contained fewer column_indices than fields",
                location!(),
            ));
        }
        Ok(())
    }
}

impl From<&DataFile> for pb::DataFile {
    fn from(df: &DataFile) -> Self {
        Self {
            path: df.path.clone(),
            fields: df.fields.clone(),
            column_indices: df.column_indices.clone(),
            file_major_version: df.file_major_version,
            file_minor_version: df.file_minor_version,
            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
            base_id: df.base_id,
        }
    }
}

impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    fn try_from(proto: pb::DataFile) -> Result<Self> {
        Ok(Self {
            path: proto.path,
            fields: proto.fields,
            column_indices: proto.column_indices,
            file_major_version: proto.file_major_version,
            file_minor_version: proto.file_minor_version,
            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
            base_id: proto.base_id,
        })
    }
}

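/// The format used to store a fragment's deleted rows.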
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    Array,
    Bitmap,
}

impl DeletionFileType {
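    /// The file extension used for deletion files of this type.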
    pub fn suffix(&self) -> &str {
        match self {
            Self::Array => "arrow",
            Self::Bitmap => "bin",
        }
    }
}

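/// A reference to the file recording which rows of a fragment have been
/// deleted.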
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    pub read_version: u64,
    pub id: u64,
    pub file_type: DeletionFileType,
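    /// The number of deleted rows, if known. A protobuf value of zero is
    /// decoded as unknown.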
    pub num_deleted_rows: Option<usize>,
    pub base_id: Option<u32>,
}

impl TryFrom<pb::DeletionFile> for DeletionFile {
    type Error = Error;

    fn try_from(value: pb::DeletionFile) -> Result<Self> {
        let file_type = match value.file_type {
            0 => DeletionFileType::Array,
            1 => DeletionFileType::Bitmap,
            _ => {
                return Err(Error::NotSupported {
                    source: "Unknown deletion file type".into(),
                    location: location!(),
                })
            }
        };
        let num_deleted_rows = if value.num_deleted_rows == 0 {
            None
        } else {
            Some(value.num_deleted_rows as usize)
        };
        Ok(Self {
            read_version: value.read_version,
            id: value.id,
            file_type,
            num_deleted_rows,
            base_id: value.base_id,
        })
    }
}

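/// A region of an external file, identified by path, offset, and size.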
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    pub path: String,
    pub offset: u64,
    pub size: u64,
}

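/// Where a fragment's row id sequence is stored: serialized inline in the
/// manifest, or in an external file.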
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    Inline(Vec<u8>),
    External(ExternalFile),
}

impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
    type Error = Error;

    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
        match value {
            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
                Ok(Self::External(ExternalFile {
                    path: file.path,
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }
}

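/// A fragment of a dataset: a group of data files that together hold the
/// columns for the same set of rows.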
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
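    /// The id of this fragment within the dataset.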
    pub id: u64,

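    /// The data files belonging to this fragment.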
    pub files: Vec<DataFile>,

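    /// The deletion file recording removed rows, if any rows have been
    /// deleted.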
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

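    /// Metadata for the fragment's row id sequence, if stable row ids are
    /// enabled.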
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

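    /// The physical number of rows in the fragment, ignoring deletions.
    /// `None` if unknown.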
    pub physical_rows: Option<usize>,

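    /// Per-row metadata recording the dataset version at which each row was
    /// last updated, if tracked.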
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,

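    /// Per-row metadata recording the dataset version at which each row was
    /// created, if tracked.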
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
}

impl Fragment {
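    /// Create an empty fragment with the given id.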
    pub fn new(id: u64) -> Self {
        Self {
            id,
            files: vec![],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }
    }

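    /// The number of rows in the fragment, excluding deleted rows. Returns
    /// `None` if the physical row count or the deletion count is unknown.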
    pub fn num_rows(&self) -> Option<usize> {
        match (self.physical_rows, &self.deletion_file) {
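            // Physical row count known and no deletions.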
            (Some(len), None) => Some(len),
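            // Physical row count and deletion count both known: subtract.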
            (
                Some(len),
                Some(DeletionFile {
                    num_deleted_rows: Some(num_deleted_rows),
                    ..
                }),
            ) => Some(len - num_deleted_rows),
            _ => None,
        }
    }

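    /// Parse a fragment from its JSON representation.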
    pub fn from_json(json: &str) -> Result<Self> {
        let fragment: Self = serde_json::from_str(json)?;
        Ok(fragment)
    }

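    /// Create a fragment with a single legacy-format data file covering the
    /// given schema.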
    pub fn with_file_legacy(
        id: u64,
        path: &str,
        schema: &Schema,
        physical_rows: Option<usize>,
    ) -> Self {
        Self {
            id,
            files: vec![DataFile::new_legacy(path, schema, None, None)],
            deletion_file: None,
            physical_rows,
            row_id_meta: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }
    }

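    /// Builder-style: append a data file written at the given file version.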
    pub fn with_file(
        mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) -> Self {
        let (major, minor) = version.to_numbers();
        let data_file = DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
            None,
        );
        self.files.push(data_file);
        self
    }

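    /// Builder-style: set the physical row count.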
    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
        self.physical_rows = Some(physical_rows);
        self
    }

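    /// Append a data file written at the given file version.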
    pub fn add_file(
        &mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) {
        let (major, minor) = version.to_numbers();
        self.files.push(DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
            None,
        ));
    }

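    /// Append a legacy-format data file covering the given schema.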
    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
        self.files
            .push(DataFile::new_legacy(path, schema, None, None));
    }

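    /// Whether this fragment's data files use the legacy file format.
    ///
    /// Assumes the fragment has at least one file; the files in a fragment
    /// share the same format generation, so only the first is checked.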
    pub fn has_legacy_files(&self) -> bool {
        self.files[0].is_legacy_file()
    }

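    /// Infer the Lance file version from a set of fragments.
    ///
    /// Returns `Ok(None)` if no fragment has any data files, and an error if
    /// the fragments mix more than one file version.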
    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
        let Some(sample_file) = fragments
            .iter()
            .find(|f| !f.files.is_empty())
            .map(|f| &f.files[0])
        else {
            return Ok(None);
        };
        let file_version = LanceFileVersion::try_from_major_minor(
            sample_file.file_major_version,
            sample_file.file_minor_version,
        )?;
        for frag in fragments {
            for file in &frag.files {
                let this_file_version = LanceFileVersion::try_from_major_minor(
                    file.file_major_version,
                    file.file_minor_version,
                )?;
                if file_version != this_file_version {
                    return Err(Error::invalid_input(
                        format!(
                            "All data files must have the same version. Detected both {} and {}",
                            file_version, this_file_version
                        ),
                        location!(),
                    ));
                }
            }
        }
        Ok(Some(file_version))
    }
}

impl TryFrom<pb::DataFragment> for Fragment {
    type Error = Error;

    fn try_from(p: pb::DataFragment) -> Result<Self> {
        let physical_rows = if p.physical_rows > 0 {
            Some(p.physical_rows as usize)
        } else {
            None
        };
        Ok(Self {
            id: p.id,
            files: p
                .files
                .into_iter()
                .map(DataFile::try_from)
                .collect::<Result<_>>()?,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
            last_updated_at_version_meta: p
                .last_updated_at_version_sequence
                .map(RowDatasetVersionMeta::try_from)
                .transpose()?,
            created_at_version_meta: p
                .created_at_version_sequence
                .map(RowDatasetVersionMeta::try_from)
                .transpose()?,
        })
    }
}

impl From<&Fragment> for pb::DataFragment {
    fn from(f: &Fragment) -> Self {
        let deletion_file = f.deletion_file.as_ref().map(|df| {
            let file_type = match df.file_type {
                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
            };
            pb::DeletionFile {
                read_version: df.read_version,
                id: df.id,
                file_type: file_type.into(),
                num_deleted_rows: df.num_deleted_rows.unwrap_or_default() as u64,
                base_id: df.base_id,
            }
        });

        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
            RowIdMeta::External(file) => {
                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                })
            }
        });
        let last_updated_at_version_sequence =
            last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
        let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
        Self {
            id: f.id,
            files: f.files.iter().map(pb::DataFile::from).collect(),
            deletion_file,
            row_id_sequence,
            physical_rows: f.physical_rows.unwrap_or_default() as u64,
            last_updated_at_version_sequence,
            created_at_version_sequence,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use object_store::path::Path;
    use serde_json::{json, Value};

    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
                None,
            )]
        )
    }

    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files": [
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                    "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                    "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                    "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }

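    // A small additional sketch (not part of the original tests) showing how
    // `num_rows` combines `physical_rows` with the deletion file's
    // `num_deleted_rows`; it uses only the types defined in this module.
    #[test]
    fn test_num_rows_with_deletions() {
        let mut fragment = Fragment::new(1).with_physical_rows(100);
        assert_eq!(fragment.num_rows(), Some(100));

        // Deleted rows with a known count are subtracted from the physical count.
        fragment.deletion_file = Some(DeletionFile {
            read_version: 1,
            id: 1,
            file_type: DeletionFileType::Bitmap,
            num_deleted_rows: Some(25),
            base_id: None,
        });
        assert_eq!(fragment.num_rows(), Some(75));

        // An unknown deletion count makes the row count unknown as well.
        fragment.deletion_file.as_mut().unwrap().num_deleted_rows = None;
        assert_eq!(fragment.num_rows(), None);
    }
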
    #[test]
    fn data_file_validate_allows_extra_columns() {
        let data_file = DataFile {
            path: "foo.lance".to_string(),
            fields: vec![1, 2],
            column_indices: vec![0, 1, 2],
            file_major_version: MAJOR_VERSION as u32,
            file_minor_version: MINOR_VERSION as u32,
            file_size_bytes: Default::default(),
            base_id: None,
        };

        let base_path = Path::from("base");
        data_file
            .validate(&base_path)
            .expect("validation should allow extra columns without field ids");
    }
}