1use std::num::NonZero;
5
6use deepsize::DeepSizeOf;
7use lance_core::Error;
8use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
9use lance_file::version::LanceFileVersion;
10use lance_io::utils::CachedFileSize;
11use object_store::path::Path;
12use serde::{Deserialize, Serialize};
13use snafu::location;
14
15use crate::format::pb;
16
17use lance_core::datatypes::Schema;
18use lance_core::error::Result;
19
/// Metadata for a single data file belonging to a fragment.
///
/// A data file stores a subset of the fragment's columns; `fields` and
/// `column_indices` describe that mapping.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Path of the file, relative to the dataset's base path
    /// (see `validate`, which joins it onto a base `Path` for error messages).
    pub path: String,
    /// Ids of the schema fields whose data is stored in this file
    /// (used by `schema()` to project the full schema).
    pub fields: Vec<i32>,
    /// For non-legacy files, the column index within the file for each entry
    /// of `fields` (must be the same length — see `validate`). Legacy files
    /// leave this empty. Defaults to empty when absent from JSON.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// Major version of the file format this file was written with.
    /// Defaults to 0 when absent from JSON.
    #[serde(default)]
    pub file_major_version: u32,
    /// Minor version of the file format this file was written with.
    /// Defaults to 0 when absent from JSON.
    #[serde(default)]
    pub file_minor_version: u32,

    /// Size of the file in bytes, cached once known; 0 on the wire means
    /// "unknown" (see the protobuf conversions below).
    pub file_size_bytes: CachedFileSize,

    /// NOTE(review): semantics not visible in this file — presumably ties the
    /// file to a base location/version elsewhere in the format; confirm.
    pub base_id: Option<u32>,
}
48
49impl DataFile {
50 pub fn new(
51 path: impl Into<String>,
52 fields: Vec<i32>,
53 column_indices: Vec<i32>,
54 file_major_version: u32,
55 file_minor_version: u32,
56 file_size_bytes: Option<NonZero<u64>>,
57 ) -> Self {
58 Self {
59 path: path.into(),
60 fields,
61 column_indices,
62 file_major_version,
63 file_minor_version,
64 file_size_bytes: file_size_bytes.into(),
65 base_id: None,
66 }
67 }
68
69 pub fn new_unstarted(
71 path: impl Into<String>,
72 file_major_version: u32,
73 file_minor_version: u32,
74 ) -> Self {
75 Self {
76 path: path.into(),
77 fields: vec![],
78 column_indices: vec![],
79 file_major_version,
80 file_minor_version,
81 file_size_bytes: Default::default(),
82 base_id: None,
83 }
84 }
85
86 pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
87 Self::new(
88 path,
89 fields,
90 vec![],
91 MAJOR_VERSION as u32,
92 MINOR_VERSION as u32,
93 None,
94 )
95 }
96
97 pub fn new_legacy(
98 path: impl Into<String>,
99 schema: &Schema,
100 file_size_bytes: Option<NonZero<u64>>,
101 ) -> Self {
102 let mut field_ids = schema.field_ids();
103 field_ids.sort();
104 Self::new(
105 path,
106 field_ids,
107 vec![],
108 MAJOR_VERSION as u32,
109 MINOR_VERSION as u32,
110 file_size_bytes,
111 )
112 }
113
114 pub fn schema(&self, full_schema: &Schema) -> Schema {
115 full_schema.project_by_ids(&self.fields, false)
116 }
117
118 pub fn is_legacy_file(&self) -> bool {
119 self.file_major_version == 0 && self.file_minor_version < 3
120 }
121
122 pub fn validate(&self, base_path: &Path) -> Result<()> {
123 if self.is_legacy_file() {
124 if !self.fields.windows(2).all(|w| w[0] < w[1]) {
125 return Err(Error::corrupt_file(
126 base_path.child(self.path.clone()),
127 "contained unsorted or duplicate field ids",
128 location!(),
129 ));
130 }
131 } else if self.fields.len() != self.column_indices.len() {
132 return Err(Error::corrupt_file(
133 base_path.child(self.path.clone()),
134 "contained an unequal number of fields / column_indices",
135 location!(),
136 ));
137 }
138 Ok(())
139 }
140}
141
142impl From<&DataFile> for pb::DataFile {
143 fn from(df: &DataFile) -> Self {
144 Self {
145 path: df.path.clone(),
146 fields: df.fields.clone(),
147 column_indices: df.column_indices.clone(),
148 file_major_version: df.file_major_version,
149 file_minor_version: df.file_minor_version,
150 file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
151 base_id: df.base_id,
152 }
153 }
154}
155
156impl TryFrom<pb::DataFile> for DataFile {
157 type Error = Error;
158
159 fn try_from(proto: pb::DataFile) -> Result<Self> {
160 Ok(Self {
161 path: proto.path,
162 fields: proto.fields,
163 column_indices: proto.column_indices,
164 file_major_version: proto.file_major_version,
165 file_minor_version: proto.file_minor_version,
166 file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
167 base_id: proto.base_id,
168 })
169 }
170}
171
/// On-disk encoding of a deletion file.
///
/// Serialized in lowercase in JSON ("array" / "bitmap").
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Deleted row offsets stored as an Arrow array file (".arrow").
    Array,
    /// Deleted row offsets stored as a bitmap (".bin").
    Bitmap,
}
178
179impl DeletionFileType {
180 pub fn suffix(&self) -> &str {
182 match self {
183 Self::Array => "arrow",
184 Self::Bitmap => "bin",
185 }
186 }
187}
188
/// Metadata for a file recording which rows of a fragment were deleted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// NOTE(review): inferred from the name — the dataset version that was
    /// read when these deletions were computed; confirm against writers.
    pub read_version: u64,
    /// Identifier distinguishing this deletion file.
    pub id: u64,
    /// Encoding of the file (array or bitmap).
    pub file_type: DeletionFileType,
    /// Number of rows deleted, if known. Encoded as 0 on the wire when `None`
    /// (see the `TryFrom<pb::DeletionFile>` impl below).
    pub num_deleted_rows: Option<usize>,
    /// NOTE(review): semantics not visible in this file — presumably parallels
    /// `DataFile::base_id`; confirm.
    pub base_id: Option<u32>,
}
198
199impl TryFrom<pb::DeletionFile> for DeletionFile {
200 type Error = Error;
201
202 fn try_from(value: pb::DeletionFile) -> Result<Self> {
203 let file_type = match value.file_type {
204 0 => DeletionFileType::Array,
205 1 => DeletionFileType::Bitmap,
206 _ => {
207 return Err(Error::NotSupported {
208 source: "Unknown deletion file type".into(),
209 location: location!(),
210 })
211 }
212 };
213 let num_deleted_rows = if value.num_deleted_rows == 0 {
214 None
215 } else {
216 Some(value.num_deleted_rows as usize)
217 };
218 Ok(Self {
219 read_version: value.read_version,
220 id: value.id,
221 file_type,
222 num_deleted_rows,
223 base_id: value.base_id,
224 })
225 }
226}
227
/// Reference to a byte range within a file stored outside the manifest,
/// used for externally stored row-id sequences (see [`RowIdMeta`]).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    /// Path to the external file.
    pub path: String,
    /// Byte offset where the data begins.
    pub offset: u64,
    /// Length of the data in bytes.
    pub size: u64,
}
235
/// Location of a fragment's serialized row-id sequence: either inlined in the
/// manifest as raw bytes, or stored in an external file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// Serialized sequence stored inline.
    Inline(Vec<u8>),
    /// Serialized sequence stored in an external file at a byte range.
    External(ExternalFile),
}
242
243impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
244 type Error = Error;
245
246 fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
247 match value {
248 pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
249 pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
250 Ok(Self::External(ExternalFile {
251 path: file.path.clone(),
252 offset: file.offset,
253 size: file.size,
254 }))
255 }
256 }
257 }
258}
259
/// Metadata for one fragment of a dataset: a set of data files plus optional
/// deletion and row-id metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Unique identifier of this fragment.
    pub id: u64,

    /// Data files making up the fragment; together they hold the fragment's
    /// columns.
    pub files: Vec<DataFile>,

    /// Deletion file tracking deleted rows, if any. Omitted from JSON when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Location of the fragment's row-id sequence, if any. Omitted from JSON
    /// when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Total number of rows originally written to the fragment, including
    /// rows since deleted (`num_rows` subtracts the deletions). `None` when
    /// unknown; encoded as 0 on the wire.
    pub physical_rows: Option<usize>,
}
285
286impl Fragment {
287 pub fn new(id: u64) -> Self {
288 Self {
289 id,
290 files: vec![],
291 deletion_file: None,
292 row_id_meta: None,
293 physical_rows: None,
294 }
295 }
296
297 pub fn num_rows(&self) -> Option<usize> {
298 match (self.physical_rows, &self.deletion_file) {
299 (Some(len), None) => Some(len),
301 (
303 Some(len),
304 Some(DeletionFile {
305 num_deleted_rows: Some(num_deleted_rows),
306 ..
307 }),
308 ) => Some(len - num_deleted_rows),
309 _ => None,
310 }
311 }
312
313 pub fn from_json(json: &str) -> Result<Self> {
314 let fragment: Self = serde_json::from_str(json)?;
315 Ok(fragment)
316 }
317
318 pub fn with_file_legacy(
320 id: u64,
321 path: &str,
322 schema: &Schema,
323 physical_rows: Option<usize>,
324 ) -> Self {
325 Self {
326 id,
327 files: vec![DataFile::new_legacy(path, schema, None)],
328 deletion_file: None,
329 physical_rows,
330 row_id_meta: None,
331 }
332 }
333
334 pub fn with_file(
335 mut self,
336 path: impl Into<String>,
337 field_ids: Vec<i32>,
338 column_indices: Vec<i32>,
339 version: &LanceFileVersion,
340 file_size_bytes: Option<NonZero<u64>>,
341 ) -> Self {
342 let (major, minor) = version.to_numbers();
343 let data_file = DataFile::new(
344 path,
345 field_ids,
346 column_indices,
347 major,
348 minor,
349 file_size_bytes,
350 );
351 self.files.push(data_file);
352 self
353 }
354
355 pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
356 self.physical_rows = Some(physical_rows);
357 self
358 }
359
360 pub fn add_file(
361 &mut self,
362 path: impl Into<String>,
363 field_ids: Vec<i32>,
364 column_indices: Vec<i32>,
365 version: &LanceFileVersion,
366 file_size_bytes: Option<NonZero<u64>>,
367 ) {
368 let (major, minor) = version.to_numbers();
369 self.files.push(DataFile::new(
370 path,
371 field_ids,
372 column_indices,
373 major,
374 minor,
375 file_size_bytes,
376 ));
377 }
378
379 pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
381 self.files.push(DataFile::new_legacy(path, schema, None));
382 }
383
384 pub fn has_legacy_files(&self) -> bool {
386 self.files[0].is_legacy_file()
388 }
389
390 pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
395 let Some(sample_file) = fragments
398 .iter()
399 .find(|f| !f.files.is_empty())
400 .map(|f| &f.files[0])
401 else {
402 return Ok(None);
403 };
404 let file_version = LanceFileVersion::try_from_major_minor(
405 sample_file.file_major_version,
406 sample_file.file_minor_version,
407 )?;
408 for frag in fragments {
410 for file in &frag.files {
411 let this_file_version = LanceFileVersion::try_from_major_minor(
412 file.file_major_version,
413 file.file_minor_version,
414 )?;
415 if file_version != this_file_version {
416 return Err(Error::invalid_input(
417 format!(
418 "All data files must have the same version. Detected both {} and {}",
419 file_version, this_file_version
420 ),
421 location!(),
422 ));
423 }
424 }
425 }
426 Ok(Some(file_version))
427 }
428}
429
430impl TryFrom<pb::DataFragment> for Fragment {
431 type Error = Error;
432
433 fn try_from(p: pb::DataFragment) -> Result<Self> {
434 let physical_rows = if p.physical_rows > 0 {
435 Some(p.physical_rows as usize)
436 } else {
437 None
438 };
439 Ok(Self {
440 id: p.id,
441 files: p
442 .files
443 .into_iter()
444 .map(DataFile::try_from)
445 .collect::<Result<_>>()?,
446 deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
447 row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
448 physical_rows,
449 })
450 }
451}
452
453impl From<&Fragment> for pb::DataFragment {
454 fn from(f: &Fragment) -> Self {
455 let deletion_file = f.deletion_file.as_ref().map(|f| {
456 let file_type = match f.file_type {
457 DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
458 DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
459 };
460 pb::DeletionFile {
461 read_version: f.read_version,
462 id: f.id,
463 file_type: file_type.into(),
464 num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
465 base_id: f.base_id,
466 }
467 });
468
469 let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
470 RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
471 RowIdMeta::External(file) => {
472 pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
473 path: file.path.clone(),
474 offset: file.offset,
475 size: file.size,
476 })
477 }
478 });
479
480 Self {
481 id: f.id,
482 files: f.files.iter().map(pb::DataFile::from).collect(),
483 deletion_file,
484 row_id_sequence,
485 physical_rows: f.physical_rows.unwrap_or_default() as u64,
486 }
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    // A legacy fragment built from a schema should get sorted field ids
    // covering every field, including nested struct children.
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        // Ids 0..=3 cover: struct "s", children "si"/"sb", and "bool".
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }

    // Fragment -> pb::DataFragment -> Fragment must be lossless, both with
    // and without a deletion file.
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        // Round-trip again with the deletion file cleared.
        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

    // Pins the exact JSON shape of a fragment (field names, lowercase
    // file_type, nulls for unknown values) and that from_json inverts it.
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null, "base_id": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10, "base_id": null},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
}