1use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14
15use crate::format::pb;
16
17use crate::rowids::version::{
18 created_at_version_meta_to_pb, last_updated_at_version_meta_to_pb, RowDatasetVersionMeta,
19};
20use lance_core::datatypes::Schema;
21use lance_core::error::Result;
22
/// A reference to a single data file that stores (some of) a fragment's columns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Path of the data file. Treated as relative: it is joined under a base
    /// path when used (see `DataFile::validate`).
    pub path: String,
    /// Ids of the fields (columns) stored in this file.
    pub fields: Vec<i32>,
    /// For each entry in `fields`, the index of that column inside the file.
    /// Empty for legacy files (see `is_legacy_file`), where defaults apply.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// Major version of the file format this file was written with.
    #[serde(default)]
    pub file_major_version: u32,
    /// Minor version of the file format this file was written with.
    #[serde(default)]
    pub file_minor_version: u32,

    /// Cached size of the file in bytes; unset when unknown
    /// (encoded as 0 in the protobuf representation).
    pub file_size_bytes: CachedFileSize,

    /// Optional base-path id — presumably selects one of the dataset's
    /// registered base paths; TODO confirm against manifest handling.
    pub base_id: Option<u32>,
}
51
52impl DataFile {
53 pub fn new(
54 path: impl Into<String>,
55 fields: Vec<i32>,
56 column_indices: Vec<i32>,
57 file_major_version: u32,
58 file_minor_version: u32,
59 file_size_bytes: Option<NonZero<u64>>,
60 base_id: Option<u32>,
61 ) -> Self {
62 Self {
63 path: path.into(),
64 fields,
65 column_indices,
66 file_major_version,
67 file_minor_version,
68 file_size_bytes: file_size_bytes.into(),
69 base_id,
70 }
71 }
72
73 pub fn new_unstarted(
75 path: impl Into<String>,
76 file_major_version: u32,
77 file_minor_version: u32,
78 ) -> Self {
79 Self {
80 path: path.into(),
81 fields: vec![],
82 column_indices: vec![],
83 file_major_version,
84 file_minor_version,
85 file_size_bytes: Default::default(),
86 base_id: None,
87 }
88 }
89
90 pub fn new_legacy_from_fields(
91 path: impl Into<String>,
92 fields: Vec<i32>,
93 base_id: Option<u32>,
94 ) -> Self {
95 Self::new(
96 path,
97 fields,
98 vec![],
99 MAJOR_VERSION as u32,
100 MINOR_VERSION as u32,
101 None,
102 base_id,
103 )
104 }
105
106 pub fn new_legacy(
107 path: impl Into<String>,
108 schema: &Schema,
109 file_size_bytes: Option<NonZero<u64>>,
110 base_id: Option<u32>,
111 ) -> Self {
112 let mut field_ids = schema.field_ids();
113 field_ids.sort();
114 Self::new(
115 path,
116 field_ids,
117 vec![],
118 MAJOR_VERSION as u32,
119 MINOR_VERSION as u32,
120 file_size_bytes,
121 base_id,
122 )
123 }
124
125 pub fn schema(&self, full_schema: &Schema) -> Schema {
126 full_schema.project_by_ids(&self.fields, false)
127 }
128
129 pub fn is_legacy_file(&self) -> bool {
130 self.file_major_version == 0 && self.file_minor_version < 3
131 }
132
133 pub fn validate(&self, base_path: &Path) -> Result<()> {
134 if self.is_legacy_file() {
135 if !self.fields.windows(2).all(|w| w[0] < w[1]) {
136 return Err(Error::corrupt_file(
137 base_path.child(self.path.clone()),
138 "contained unsorted or duplicate field ids",
139 location!(),
140 ));
141 }
142 } else if self.fields.len() != self.column_indices.len() {
143 return Err(Error::corrupt_file(
144 base_path.child(self.path.clone()),
145 "contained an unequal number of fields / column_indices",
146 location!(),
147 ));
148 }
149 Ok(())
150 }
151}
152
153impl From<&DataFile> for pb::DataFile {
154 fn from(df: &DataFile) -> Self {
155 Self {
156 path: df.path.clone(),
157 fields: df.fields.clone(),
158 column_indices: df.column_indices.clone(),
159 file_major_version: df.file_major_version,
160 file_minor_version: df.file_minor_version,
161 file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
162 base_id: df.base_id,
163 }
164 }
165}
166
167impl TryFrom<pb::DataFile> for DataFile {
168 type Error = Error;
169
170 fn try_from(proto: pb::DataFile) -> Result<Self> {
171 Ok(Self {
172 path: proto.path,
173 fields: proto.fields,
174 column_indices: proto.column_indices,
175 file_major_version: proto.file_major_version,
176 file_minor_version: proto.file_minor_version,
177 file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
178 base_id: proto.base_id,
179 })
180 }
181}
182
/// Storage format of a fragment's deletion file.
///
/// Serialized in lowercase ("array" / "bitmap") in JSON.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Deleted rows stored as an Arrow file (".arrow" suffix).
    Array,
    /// Deleted rows stored as a binary bitmap file (".bin" suffix).
    Bitmap,
}
189
190impl DeletionFileType {
191 pub fn suffix(&self) -> &str {
193 match self {
194 Self::Array => "arrow",
195 Self::Bitmap => "bin",
196 }
197 }
198}
199
/// Metadata for a file that records the rows deleted from a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// Dataset version that was read when these deletions were made.
    pub read_version: u64,
    /// Identifier for this deletion file — presumably used to make its
    /// path unique; TODO confirm against the deletion-file writer.
    pub id: u64,
    /// How the deleted rows are encoded (array or bitmap).
    pub file_type: DeletionFileType,
    /// Number of rows deleted; `None` when unknown
    /// (encoded as 0 in the protobuf representation).
    pub num_deleted_rows: Option<usize>,
    /// Optional base-path id — presumably selects one of the dataset's
    /// registered base paths; TODO confirm against manifest handling.
    pub base_id: Option<u32>,
}
209
210impl TryFrom<pb::DeletionFile> for DeletionFile {
211 type Error = Error;
212
213 fn try_from(value: pb::DeletionFile) -> Result<Self> {
214 let file_type = match value.file_type {
215 0 => DeletionFileType::Array,
216 1 => DeletionFileType::Bitmap,
217 _ => {
218 return Err(Error::NotSupported {
219 source: "Unknown deletion file type".into(),
220 location: location!(),
221 })
222 }
223 };
224 let num_deleted_rows = if value.num_deleted_rows == 0 {
225 None
226 } else {
227 Some(value.num_deleted_rows as usize)
228 };
229 Ok(Self {
230 read_version: value.read_version,
231 id: value.id,
232 file_type,
233 num_deleted_rows,
234 base_id: value.base_id,
235 })
236 }
237}
238
/// A slice of an external file, used for data stored outside the manifest
/// (e.g. an external row-id sequence; see [`RowIdMeta::External`]).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path of the external file.
    pub path: String,
    /// Offset within the file where the data begins (presumably bytes).
    pub offset: u64,
    /// Size of the data (presumably bytes).
    pub size: u64,
}
246
/// Where a fragment's row-id sequence is stored.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// Sequence serialized inline in the manifest.
    Inline(Vec<u8>),
    /// Sequence stored as a slice of an external file.
    External(ExternalFile),
}
253
254impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
255 type Error = Error;
256
257 fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
258 match value {
259 pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
260 pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
261 Ok(Self::External(ExternalFile {
262 path: file.path.clone(),
263 offset: file.offset,
264 size: file.size,
265 }))
266 }
267 }
268 }
269}
270
/// Metadata for one fragment of a dataset.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Identifier of this fragment.
    pub id: u64,

    /// Data files holding the fragment's columns.
    pub files: Vec<DataFile>,

    /// File recording rows deleted from this fragment, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Row-id sequence metadata, if stable row ids are tracked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Number of rows physically written (before deletions); `None` when
    /// unknown (encoded as 0 in the protobuf representation).
    pub physical_rows: Option<usize>,

    /// Per-row "last updated at version" metadata, if tracked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_updated_at_version_meta: Option<RowDatasetVersionMeta>,

    /// Per-row "created at version" metadata, if tracked.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at_version_meta: Option<RowDatasetVersionMeta>,
}
304
305impl Fragment {
306 pub fn new(id: u64) -> Self {
307 Self {
308 id,
309 files: vec![],
310 deletion_file: None,
311 row_id_meta: None,
312 physical_rows: None,
313 last_updated_at_version_meta: None,
314 created_at_version_meta: None,
315 }
316 }
317
318 pub fn num_rows(&self) -> Option<usize> {
319 match (self.physical_rows, &self.deletion_file) {
320 (Some(len), None) => Some(len),
322 (
324 Some(len),
325 Some(DeletionFile {
326 num_deleted_rows: Some(num_deleted_rows),
327 ..
328 }),
329 ) => Some(len - num_deleted_rows),
330 _ => None,
331 }
332 }
333
334 pub fn from_json(json: &str) -> Result<Self> {
335 let fragment: Self = serde_json::from_str(json)?;
336 Ok(fragment)
337 }
338
339 pub fn with_file_legacy(
341 id: u64,
342 path: &str,
343 schema: &Schema,
344 physical_rows: Option<usize>,
345 ) -> Self {
346 Self {
347 id,
348 files: vec![DataFile::new_legacy(path, schema, None, None)],
349 deletion_file: None,
350 physical_rows,
351 row_id_meta: None,
352 last_updated_at_version_meta: None,
353 created_at_version_meta: None,
354 }
355 }
356
357 pub fn with_file(
358 mut self,
359 path: impl Into<String>,
360 field_ids: Vec<i32>,
361 column_indices: Vec<i32>,
362 version: &LanceFileVersion,
363 file_size_bytes: Option<NonZero<u64>>,
364 ) -> Self {
365 let (major, minor) = version.to_numbers();
366 let data_file = DataFile::new(
367 path,
368 field_ids,
369 column_indices,
370 major,
371 minor,
372 file_size_bytes,
373 None,
374 );
375 self.files.push(data_file);
376 self
377 }
378
379 pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
380 self.physical_rows = Some(physical_rows);
381 self
382 }
383
384 pub fn add_file(
385 &mut self,
386 path: impl Into<String>,
387 field_ids: Vec<i32>,
388 column_indices: Vec<i32>,
389 version: &LanceFileVersion,
390 file_size_bytes: Option<NonZero<u64>>,
391 ) {
392 let (major, minor) = version.to_numbers();
393 self.files.push(DataFile::new(
394 path,
395 field_ids,
396 column_indices,
397 major,
398 minor,
399 file_size_bytes,
400 None,
401 ));
402 }
403
404 pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
406 self.files
407 .push(DataFile::new_legacy(path, schema, None, None));
408 }
409
410 pub fn has_legacy_files(&self) -> bool {
412 self.files[0].is_legacy_file()
414 }
415
416 pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
421 let Some(sample_file) = fragments
424 .iter()
425 .find(|f| !f.files.is_empty())
426 .map(|f| &f.files[0])
427 else {
428 return Ok(None);
429 };
430 let file_version = LanceFileVersion::try_from_major_minor(
431 sample_file.file_major_version,
432 sample_file.file_minor_version,
433 )?;
434 for frag in fragments {
436 for file in &frag.files {
437 let this_file_version = LanceFileVersion::try_from_major_minor(
438 file.file_major_version,
439 file.file_minor_version,
440 )?;
441 if file_version != this_file_version {
442 return Err(Error::invalid_input(
443 format!(
444 "All data files must have the same version. Detected both {} and {}",
445 file_version, this_file_version
446 ),
447 location!(),
448 ));
449 }
450 }
451 }
452 Ok(Some(file_version))
453 }
454}
455
456impl TryFrom<pb::DataFragment> for Fragment {
457 type Error = Error;
458
459 fn try_from(p: pb::DataFragment) -> Result<Self> {
460 let physical_rows = if p.physical_rows > 0 {
461 Some(p.physical_rows as usize)
462 } else {
463 None
464 };
465 Ok(Self {
466 id: p.id,
467 files: p
468 .files
469 .into_iter()
470 .map(DataFile::try_from)
471 .collect::<Result<_>>()?,
472 deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
473 row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
474 physical_rows,
475 last_updated_at_version_meta: p
476 .last_updated_at_version_sequence
477 .map(RowDatasetVersionMeta::try_from)
478 .transpose()?,
479 created_at_version_meta: p
480 .created_at_version_sequence
481 .map(RowDatasetVersionMeta::try_from)
482 .transpose()?,
483 })
484 }
485}
486
487impl From<&Fragment> for pb::DataFragment {
488 fn from(f: &Fragment) -> Self {
489 let deletion_file = f.deletion_file.as_ref().map(|f| {
490 let file_type = match f.file_type {
491 DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
492 DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
493 };
494 pb::DeletionFile {
495 read_version: f.read_version,
496 id: f.id,
497 file_type: file_type.into(),
498 num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
499 base_id: f.base_id,
500 }
501 });
502
503 let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
504 RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
505 RowIdMeta::External(file) => {
506 pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
507 path: file.path.clone(),
508 offset: file.offset,
509 size: file.size,
510 })
511 }
512 });
513 let last_updated_at_version_sequence =
514 last_updated_at_version_meta_to_pb(&f.last_updated_at_version_meta);
515 let created_at_version_sequence = created_at_version_meta_to_pb(&f.created_at_version_meta);
516 Self {
517 id: f.id,
518 files: f.files.iter().map(pb::DataFile::from).collect(),
519 deletion_file,
520 row_id_sequence,
521 physical_rows: f.physical_rows.unwrap_or_default() as u64,
522 last_updated_at_version_sequence,
523 created_at_version_sequence,
524 }
525 }
526}
527
528#[cfg(test)]
529mod tests {
530 use super::*;
531 use arrow_schema::{
532 DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
533 };
534 use serde_json::{json, Value};
535
536 #[test]
537 fn test_new_fragment() {
538 let path = "foobar.lance";
539
540 let arrow_schema = ArrowSchema::new(vec![
541 ArrowField::new(
542 "s",
543 DataType::Struct(ArrowFields::from(vec![
544 ArrowField::new("si", DataType::Int32, false),
545 ArrowField::new("sb", DataType::Binary, true),
546 ])),
547 true,
548 ),
549 ArrowField::new("bool", DataType::Boolean, true),
550 ]);
551 let schema = Schema::try_from(&arrow_schema).unwrap();
552 let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));
553
554 assert_eq!(123, fragment.id);
555 assert_eq!(
556 fragment.files,
557 vec![DataFile::new_legacy_from_fields(
558 path.to_string(),
559 vec![0, 1, 2, 3],
560 None,
561 )]
562 )
563 }
564
565 #[test]
566 fn test_roundtrip_fragment() {
567 let mut fragment = Fragment::new(123);
568 let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
569 fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
570 fragment.deletion_file = Some(DeletionFile {
571 read_version: 123,
572 id: 456,
573 file_type: DeletionFileType::Array,
574 num_deleted_rows: Some(10),
575 base_id: None,
576 });
577
578 let proto = pb::DataFragment::from(&fragment);
579 let fragment2 = Fragment::try_from(proto).unwrap();
580 assert_eq!(fragment, fragment2);
581
582 fragment.deletion_file = None;
583 let proto = pb::DataFragment::from(&fragment);
584 let fragment2 = Fragment::try_from(proto).unwrap();
585 assert_eq!(fragment, fragment2);
586 }
587
588 #[test]
589 fn test_to_json() {
590 let mut fragment = Fragment::new(123);
591 let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
592 fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
593 fragment.deletion_file = Some(DeletionFile {
594 read_version: 123,
595 id: 456,
596 file_type: DeletionFileType::Array,
597 num_deleted_rows: Some(10),
598 base_id: None,
599 });
600
601 let json = serde_json::to_string(&fragment).unwrap();
602
603 let value: Value = serde_json::from_str(&json).unwrap();
604 assert_eq!(
605 value,
606 json!({
607 "id": 123,
608 "files":[
609 {"path": "foobar.lance", "fields": [0], "column_indices": [],
610 "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
611 "file_size_bytes": null, "base_id": null }
612 ],
613 "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
614 "num_deleted_rows": 10, "base_id": null},
615 "physical_rows": None::<usize>}),
616 );
617
618 let frag2 = Fragment::from_json(&json).unwrap();
619 assert_eq!(fragment, frag2);
620 }
621}