use std::num::NonZero;

use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
use lance_io::utils::CachedFileSize;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use snafu::location;

use crate::format::pb;

use lance_core::datatypes::Schema;
use lance_core::error::Result;

/// A file that stores data for (a subset of the columns of) a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Path of the data file, relative to the dataset root.
    pub path: String,
    /// The ids of the fields (columns) stored in this file.
    pub fields: Vec<i32>,
    /// The column indices, within the file, of the fields in `fields`.
    ///
    /// Empty for legacy (pre-v2) files, which instead require `fields` to be sorted.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// Major version of the file format used to write this file.
    #[serde(default)]
    pub file_major_version: u32,
    /// Minor version of the file format used to write this file.
    #[serde(default)]
    pub file_minor_version: u32,

    /// Cached size of the file on disk, in bytes, if known.
    pub file_size_bytes: CachedFileSize,
}

impl DataFile {
    /// Create a new data file from its parts.
    pub fn new(
        path: impl Into<String>,
        fields: Vec<i32>,
        column_indices: Vec<i32>,
        file_major_version: u32,
        file_minor_version: u32,
        file_size_bytes: Option<NonZero<u64>>,
    ) -> Self {
        Self {
            path: path.into(),
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
            file_size_bytes: file_size_bytes.into(),
        }
    }

    /// Create a new data file that has no fields or column indices yet.
    pub fn new_unstarted(
        path: impl Into<String>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields: vec![],
            column_indices: vec![],
            file_major_version,
            file_minor_version,
            file_size_bytes: Default::default(),
        }
    }

    /// Create a new legacy data file from a list of field ids.
    pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
        Self::new(
            path,
            fields,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            None,
        )
    }

    /// Create a new legacy data file covering all fields of `schema`.
    pub fn new_legacy(
        path: impl Into<String>,
        schema: &Schema,
        file_size_bytes: Option<NonZero<u64>>,
    ) -> Self {
        let mut field_ids = schema.field_ids();
        field_ids.sort();
        Self::new(
            path,
            field_ids,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
            file_size_bytes,
        )
    }

    /// Project `full_schema` down to the fields stored in this file.
    pub fn schema(&self, full_schema: &Schema) -> Schema {
        full_schema.project_by_ids(&self.fields, false)
    }

    /// Returns true if this file was written with the legacy (pre-v2) format.
    pub fn is_legacy_file(&self) -> bool {
        self.file_major_version == 0 && self.file_minor_version < 3
    }

    /// Check the invariants of this data file, reporting `base_path` in errors.
    pub fn validate(&self, base_path: &Path) -> Result<()> {
        if self.is_legacy_file() {
            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
                return Err(Error::corrupt_file(
                    base_path.child(self.path.clone()),
                    "contained unsorted or duplicate field ids",
                    location!(),
                ));
            }
        } else if self.fields.len() != self.column_indices.len() {
            return Err(Error::corrupt_file(
                base_path.child(self.path.clone()),
                "contained an unequal number of fields / column_indices",
                location!(),
            ));
        }
        Ok(())
    }
}

impl From<&DataFile> for pb::DataFile {
    fn from(df: &DataFile) -> Self {
        Self {
            path: df.path.clone(),
            fields: df.fields.clone(),
            column_indices: df.column_indices.clone(),
            file_major_version: df.file_major_version,
            file_minor_version: df.file_minor_version,
            file_size_bytes: df.file_size_bytes.get().map_or(0, |v| v.get()),
        }
    }
}

impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    fn try_from(proto: pb::DataFile) -> Result<Self> {
        Ok(Self {
            path: proto.path,
            fields: proto.fields,
            column_indices: proto.column_indices,
            file_major_version: proto.file_major_version,
            file_minor_version: proto.file_minor_version,
            file_size_bytes: CachedFileSize::new(proto.file_size_bytes),
        })
    }
}

/// The type of a deletion file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// Deleted rows stored as an Arrow array (".arrow" suffix).
    Array,
    /// Deleted rows stored as a serialized bitmap (".bin" suffix).
    Bitmap,
}

impl DeletionFileType {
    /// The file extension used for this type of deletion file.
    pub fn suffix(&self) -> &str {
        match self {
            Self::Array => "arrow",
            Self::Bitmap => "bin",
        }
    }
}

/// A file recording the rows deleted from a single fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// The dataset version that was read when this deletion file was created.
    pub read_version: u64,
    /// An identifier used to make the deletion file path unique.
    pub id: u64,
    /// How the deleted rows are encoded.
    pub file_type: DeletionFileType,
    /// Number of deleted rows, if known.
    pub num_deleted_rows: Option<usize>,
}

impl TryFrom<pb::DeletionFile> for DeletionFile {
    type Error = Error;

    fn try_from(value: pb::DeletionFile) -> Result<Self> {
        let file_type = match value.file_type {
            0 => DeletionFileType::Array,
            1 => DeletionFileType::Bitmap,
            _ => {
                return Err(Error::NotSupported {
                    source: "Unknown deletion file type".into(),
                    location: location!(),
                })
            }
        };
        // In the protobuf representation, zero means the count is unknown.
        let num_deleted_rows = if value.num_deleted_rows == 0 {
            None
        } else {
            Some(value.num_deleted_rows as usize)
        };
        Ok(Self {
            read_version: value.read_version,
            id: value.id,
            file_type,
            num_deleted_rows,
        })
    }
}

/// A byte range within another file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    pub path: String,
    pub offset: u64,
    pub size: u64,
}

/// Row id sequence for a fragment, stored either inline or in an external file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    Inline(Vec<u8>),
    External(ExternalFile),
}

impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
    type Error = Error;

    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
        match value {
            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
                Ok(Self::External(ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }
}

/// A fragment of a dataset: one or more data files plus deletion and row id
/// metadata, all describing the same set of rows.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// Fragment ID, unique within a dataset.
    pub id: u64,

    /// Data files; each file stores a subset of the fragment's columns.
    pub files: Vec<DataFile>,

    /// Optional file recording which rows have been deleted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// Optional row id metadata, stored inline or in an external file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// Number of rows physically stored in the data files, before accounting
    /// for deletions. `None` if unknown.
    pub physical_rows: Option<usize>,
}

impl Fragment {
    pub fn new(id: u64) -> Self {
        Self {
            id,
            files: vec![],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: None,
        }
    }

    /// Number of rows in the fragment, after subtracting deleted rows.
    ///
    /// Returns `None` if the physical row count or the deletion count is unknown.
    pub fn num_rows(&self) -> Option<usize> {
        match (self.physical_rows, &self.deletion_file) {
            // No deletions: the physical row count is the answer.
            (Some(len), None) => Some(len),
            // Deletion file with a known number of deleted rows.
            (
                Some(len),
                Some(DeletionFile {
                    num_deleted_rows: Some(num_deleted_rows),
                    ..
                }),
            ) => Some(len - num_deleted_rows),
            _ => None,
        }
    }

    pub fn from_json(json: &str) -> Result<Self> {
        let fragment: Self = serde_json::from_str(json)?;
        Ok(fragment)
    }

    /// Create a fragment with a single legacy data file covering the full schema.
    pub fn with_file_legacy(
        id: u64,
        path: &str,
        schema: &Schema,
        physical_rows: Option<usize>,
    ) -> Self {
        Self {
            id,
            files: vec![DataFile::new_legacy(path, schema, None)],
            deletion_file: None,
            physical_rows,
            row_id_meta: None,
        }
    }

    /// Builder-style variant of [`Self::add_file`].
    pub fn with_file(
        mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) -> Self {
        let (major, minor) = version.to_numbers();
        let data_file = DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
        );
        self.files.push(data_file);
        self
    }

    pub fn with_physical_rows(mut self, physical_rows: usize) -> Self {
        self.physical_rows = Some(physical_rows);
        self
    }

    /// Add a new data file to this fragment.
    pub fn add_file(
        &mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
        file_size_bytes: Option<NonZero<u64>>,
    ) {
        let (major, minor) = version.to_numbers();
        self.files.push(DataFile::new(
            path,
            field_ids,
            column_indices,
            major,
            minor,
            file_size_bytes,
        ));
    }

    /// Add a new legacy data file to this fragment.
    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
        self.files.push(DataFile::new_legacy(path, schema, None));
    }

    /// Whether this fragment contains legacy (pre-v2) data files.
    pub fn has_legacy_files(&self) -> bool {
        // If any file in the fragment is legacy then all of them are.
        self.files[0].is_legacy_file()
    }

    /// Infer the file version used by a set of fragments.
    ///
    /// Returns `Ok(None)` if there are no data files, and an error if the
    /// fragments mix different file versions.
    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
        // Use the first file of the first non-empty fragment as the reference.
        let Some(sample_file) = fragments
            .iter()
            .find(|f| !f.files.is_empty())
            .map(|f| &f.files[0])
        else {
            return Ok(None);
        };
        let file_version = LanceFileVersion::try_from_major_minor(
            sample_file.file_major_version,
            sample_file.file_minor_version,
        )?;
        // Ensure every other file reports the same version.
        for frag in fragments {
            for file in &frag.files {
                let this_file_version = LanceFileVersion::try_from_major_minor(
                    file.file_major_version,
                    file.file_minor_version,
                )?;
                if file_version != this_file_version {
                    return Err(Error::invalid_input(
                        format!(
                            "All data files must have the same version. Detected both {} and {}",
                            file_version, this_file_version
                        ),
                        location!(),
                    ));
                }
            }
        }
        Ok(Some(file_version))
    }
}

impl TryFrom<pb::DataFragment> for Fragment {
    type Error = Error;

    fn try_from(p: pb::DataFragment) -> Result<Self> {
        // In the protobuf representation, zero means the row count is unknown.
        let physical_rows = if p.physical_rows > 0 {
            Some(p.physical_rows as usize)
        } else {
            None
        };
        Ok(Self {
            id: p.id,
            files: p
                .files
                .into_iter()
                .map(DataFile::try_from)
                .collect::<Result<_>>()?,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
        })
    }
}

impl From<&Fragment> for pb::DataFragment {
    fn from(f: &Fragment) -> Self {
        let deletion_file = f.deletion_file.as_ref().map(|f| {
            let file_type = match f.file_type {
                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
            };
            pb::DeletionFile {
                read_version: f.read_version,
                id: f.id,
                file_type: file_type.into(),
                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
            }
        });

        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
            RowIdMeta::External(file) => {
                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                })
            }
        });

        Self {
            id: f.id,
            files: f.files.iter().map(pb::DataFile::from).collect(),
            deletion_file,
            row_id_sequence,
            physical_rows: f.physical_rows.unwrap_or_default() as u64,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

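    // Illustrative sketch (beyond the original tests): exercises DataFile::new,
    // is_legacy_file, and validate. The (0, 3) version pair is chosen only
    // because is_legacy_file above treats it as non-legacy.
    #[test]
    fn test_data_file_validate() {
        let base = Path::from("dataset");

        // Non-legacy files must have one column index per field id.
        let good = DataFile::new("data.lance", vec![0, 1], vec![0, 1], 0, 3, None);
        assert!(!good.is_legacy_file());
        assert!(good.validate(&base).is_ok());

        let mismatched = DataFile::new("data.lance", vec![0, 1], vec![0], 0, 3, None);
        assert!(mismatched.validate(&base).is_err());

        // Legacy files instead require sorted, unique field ids.
        let unsorted = DataFile::new_legacy_from_fields("old.lance", vec![1, 0]);
        assert!(unsorted.validate(&base).is_err());
    }
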
    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }
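
    // Illustrative sketch (beyond the original tests): shows how num_rows
    // combines physical_rows with the deletion file's optional deleted-row count.
    #[test]
    fn test_num_rows_accounting() {
        let mut fragment = Fragment::new(1).with_physical_rows(100);
        assert_eq!(fragment.num_rows(), Some(100));

        // A known deletion count is subtracted from the physical row count.
        fragment.deletion_file = Some(DeletionFile {
            read_version: 1,
            id: 1,
            file_type: DeletionFileType::Bitmap,
            num_deleted_rows: Some(10),
        });
        assert_eq!(fragment.num_rows(), Some(90));

        // An unknown deletion count makes the total unknown as well.
        fragment.deletion_file.as_mut().unwrap().num_deleted_rows = None;
        assert_eq!(fragment.num_rows(), None);

        // Without a physical row count nothing can be derived.
        assert_eq!(Fragment::new(2).num_rows(), None);
    }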

    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }
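
    // Illustrative sketch (beyond the original tests) of the protobuf mapping
    // for deletion files: a num_deleted_rows of zero decodes as "unknown"
    // (None), and the numeric file_type discriminants follow the TryFrom impl above.
    #[test]
    fn test_deletion_file_from_proto() {
        let proto = pb::DeletionFile {
            read_version: 7,
            id: 3,
            file_type: 1, // bitmap
            num_deleted_rows: 0,
        };
        let deletion_file = DeletionFile::try_from(proto).unwrap();
        assert_eq!(deletion_file.file_type, DeletionFileType::Bitmap);
        assert_eq!(deletion_file.file_type.suffix(), "bin");
        assert_eq!(deletion_file.num_deleted_rows, None);
    }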

    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files":[
                    {"path": "foobar.lance", "fields": [0], "column_indices": [],
                     "file_major_version": MAJOR_VERSION, "file_minor_version": MINOR_VERSION,
                     "file_size_bytes": null }
                ],
                "deletion_file": {"read_version": 123, "id": 456, "file_type": "array",
                                  "num_deleted_rows": 10},
                "physical_rows": None::<usize>}),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
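
    // Illustrative sketch (beyond the original tests) of try_infer_version:
    // fragments without data files cannot pin down a version, a uniform set of
    // versions is accepted, and mixing the legacy version with a (0, 3) style
    // file is assumed to be rejected as two distinct versions.
    #[test]
    fn test_try_infer_version() {
        // No fragments, or fragments without files, yield no version.
        assert!(Fragment::try_infer_version(&[]).unwrap().is_none());
        assert!(Fragment::try_infer_version(&[Fragment::new(0)])
            .unwrap()
            .is_none());

        // All files agree on a version: inference succeeds.
        let mut uniform = Fragment::new(1);
        uniform
            .files
            .push(DataFile::new("a.lance", vec![0], vec![0], 0, 3, None));
        uniform
            .files
            .push(DataFile::new("b.lance", vec![1], vec![1], 0, 3, None));
        assert!(Fragment::try_infer_version(&[uniform]).unwrap().is_some());

        // Mixing a legacy data file with a newer one is an error.
        let mut mixed = Fragment::new(2);
        mixed
            .files
            .push(DataFile::new_legacy_from_fields("a.lance", vec![0]));
        mixed
            .files
            .push(DataFile::new("b.lance", vec![1], vec![1], 0, 3, None));
        assert!(Fragment::try_infer_version(&[mixed]).is_err());
    }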
}