use deepsize::DeepSizeOf;
use lance_core::Error;
use lance_file::format::{MAJOR_VERSION, MINOR_VERSION};
use lance_file::version::LanceFileVersion;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use snafu::location;

use crate::format::pb;

use lance_core::datatypes::Schema;
use lance_core::error::Result;

/// Lance data file
///
/// A data file holds the values of a set of columns for one fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DataFile {
    /// Path of the data file, relative to the dataset root.
    pub path: String,
    /// The ids of the fields stored in this file.
    pub fields: Vec<i32>,
    /// The column indices, within the file, of the fields in `fields`.
    ///
    /// Empty for legacy (v1) files, which identify columns by field id alone.
    #[serde(default)]
    pub column_indices: Vec<i32>,
    /// The major version of the file format used to write this file.
    #[serde(default)]
    pub file_major_version: u32,
    /// The minor version of the file format used to write this file.
    #[serde(default)]
    pub file_minor_version: u32,
}

impl DataFile {
    pub fn new(
        path: impl Into<String>,
        fields: Vec<i32>,
        column_indices: Vec<i32>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields,
            column_indices,
            file_major_version,
            file_minor_version,
        }
    }

    /// Create a `DataFile` whose fields and column indices have not been
    /// determined yet.
    pub fn new_unstarted(
        path: impl Into<String>,
        file_major_version: u32,
        file_minor_version: u32,
    ) -> Self {
        Self {
            path: path.into(),
            fields: vec![],
            column_indices: vec![],
            file_major_version,
            file_minor_version,
        }
    }

    pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
        Self::new(
            path,
            fields,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
        )
    }

    pub fn new_legacy(path: impl Into<String>, schema: &Schema) -> Self {
        let mut field_ids = schema.field_ids();
        field_ids.sort();
        Self::new(
            path,
            field_ids,
            vec![],
            MAJOR_VERSION as u32,
            MINOR_VERSION as u32,
        )
    }

    /// Project the full dataset schema down to just the fields stored in
    /// this file.
    pub fn schema(&self, full_schema: &Schema) -> Schema {
        full_schema.project_by_ids(&self.fields, false)
    }

    /// True if this file was written with the legacy (pre-v2) file format.
    pub fn is_legacy_file(&self) -> bool {
        self.file_major_version == 0 && self.file_minor_version < 3
    }

    pub fn validate(&self, base_path: &Path) -> Result<()> {
        if self.is_legacy_file() {
            // Legacy files identify columns by field id, so the ids must be
            // sorted and free of duplicates.
            if !self.fields.windows(2).all(|w| w[0] < w[1]) {
                return Err(Error::corrupt_file(
                    base_path.child(self.path.clone()),
                    "contained unsorted or duplicate field ids",
                    location!(),
                ));
            }
        } else if self.fields.len() != self.column_indices.len() {
            return Err(Error::corrupt_file(
                base_path.child(self.path.clone()),
                "contained an unequal number of fields / column_indices",
                location!(),
            ));
        }
        Ok(())
    }
}

impl From<&DataFile> for pb::DataFile {
    fn from(df: &DataFile) -> Self {
        Self {
            path: df.path.clone(),
            fields: df.fields.clone(),
            column_indices: df.column_indices.clone(),
            file_major_version: df.file_major_version,
            file_minor_version: df.file_minor_version,
        }
    }
}

impl TryFrom<pb::DataFile> for DataFile {
    type Error = Error;

    fn try_from(proto: pb::DataFile) -> Result<Self> {
        Ok(Self {
            path: proto.path,
            fields: proto.fields,
            column_indices: proto.column_indices,
            file_major_version: proto.file_major_version,
            file_minor_version: proto.file_minor_version,
        })
    }
}

/// How the deleted row offsets of a fragment are encoded on disk.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
#[serde(rename_all = "lowercase")]
pub enum DeletionFileType {
    /// An Arrow file containing an array of deleted row offsets.
    Array,
    /// A serialized bitmap of deleted row offsets.
    Bitmap,
}

impl DeletionFileType {
    /// The file extension used for this type of deletion file.
    pub fn suffix(&self) -> &str {
        match self {
            Self::Array => "arrow",
            Self::Bitmap => "bin",
        }
    }
}

/// A file recording the rows deleted from a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct DeletionFile {
    /// The dataset version from which the deletion was read.
    pub read_version: u64,
    /// An id that distinguishes deletion files written for the same fragment.
    pub id: u64,
    /// The on-disk encoding of the deleted row offsets.
    pub file_type: DeletionFileType,
    /// The number of deleted rows, if known.
    pub num_deleted_rows: Option<usize>,
}

impl TryFrom<pb::DeletionFile> for DeletionFile {
    type Error = Error;

    fn try_from(value: pb::DeletionFile) -> Result<Self> {
        let file_type = match value.file_type {
            0 => DeletionFileType::Array,
            1 => DeletionFileType::Bitmap,
            _ => {
                return Err(Error::NotSupported {
                    source: "Unknown deletion file type".into(),
                    location: location!(),
                })
            }
        };
        // In the protobuf, a count of zero means the number of deleted rows
        // is unknown.
        let num_deleted_rows = if value.num_deleted_rows == 0 {
            None
        } else {
            Some(value.num_deleted_rows as usize)
        };
        Ok(Self {
            read_version: value.read_version,
            id: value.id,
            file_type,
            num_deleted_rows,
        })
    }
}

/// A region of an external file, addressed by path, offset, and size.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct ExternalFile {
    pub path: String,
    pub offset: u64,
    pub size: u64,
}

/// Where a fragment's row id sequence is stored.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowIdMeta {
    /// The serialized sequence is stored inline in the manifest.
    Inline(Vec<u8>),
    /// The serialized sequence is stored in an external file.
    External(ExternalFile),
}

impl TryFrom<pb::data_fragment::RowIdSequence> for RowIdMeta {
    type Error = Error;

    fn try_from(value: pb::data_fragment::RowIdSequence) -> Result<Self> {
        match value {
            pb::data_fragment::RowIdSequence::InlineRowIds(data) => Ok(Self::Inline(data)),
            pb::data_fragment::RowIdSequence::ExternalRowIds(file) => {
                Ok(Self::External(ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                }))
            }
        }
    }
}

/// A fragment of a dataset: one horizontal slice of rows, stored as one or
/// more data files that together hold all of the fragment's columns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct Fragment {
    /// The id of the fragment.
    pub id: u64,

    /// The data files backing this fragment; each stores a subset of the
    /// columns.
    pub files: Vec<DataFile>,

    /// The file recording deleted rows, if any have been deleted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deletion_file: Option<DeletionFile>,

    /// The row id sequence for this fragment, if row id metadata is stored.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_id_meta: Option<RowIdMeta>,

    /// The number of rows physically written to the fragment, including any
    /// rows that were later deleted. `None` if unknown.
    pub physical_rows: Option<usize>,
}

impl Fragment {
    pub fn new(id: u64) -> Self {
        Self {
            id,
            files: vec![],
            deletion_file: None,
            row_id_meta: None,
            physical_rows: None,
        }
    }

    /// The number of rows in the fragment after subtracting deleted rows,
    /// or `None` if either count is unknown.
    pub fn num_rows(&self) -> Option<usize> {
        match (self.physical_rows, &self.deletion_file) {
            // Known fragment length and no deletion file.
            (Some(len), None) => Some(len),
            // Known fragment length and a deletion file with a known count.
            (
                Some(len),
                Some(DeletionFile {
                    num_deleted_rows: Some(num_deleted_rows),
                    ..
                }),
            ) => Some(len - num_deleted_rows),
            // Either the fragment length or the deletion count is unknown.
            _ => None,
        }
    }

    pub fn from_json(json: &str) -> Result<Self> {
        let fragment: Self = serde_json::from_str(json)?;
        Ok(fragment)
    }

    /// Create a fragment backed by a single legacy data file.
    pub fn with_file_legacy(
        id: u64,
        path: &str,
        schema: &Schema,
        physical_rows: Option<usize>,
    ) -> Self {
        Self {
            id,
            files: vec![DataFile::new_legacy(path, schema)],
            deletion_file: None,
            physical_rows,
            row_id_meta: None,
        }
    }

    pub fn add_file(
        &mut self,
        path: impl Into<String>,
        field_ids: Vec<i32>,
        column_indices: Vec<i32>,
        version: &LanceFileVersion,
    ) {
        let (major, minor) = version.to_numbers();
        self.files
            .push(DataFile::new(path, field_ids, column_indices, major, minor));
    }

    /// Add a new data file written with the legacy file format.
    pub fn add_file_legacy(&mut self, path: &str, schema: &Schema) {
        self.files.push(DataFile::new_legacy(path, schema));
    }

    /// True if this fragment is made up of legacy files.
    pub fn has_legacy_files(&self) -> bool {
        // If any file in a fragment is legacy then all of its files are, so
        // checking the first file is enough. An empty fragment reports no
        // legacy files instead of panicking on the missing index.
        self.files.first().is_some_and(|f| f.is_legacy_file())
    }

    /// Infer the file format version used by the given fragments' data files.
    ///
    /// Returns `Ok(None)` if there are no data files to sample, and an error
    /// if the files do not all share the same version.
    pub fn try_infer_version(fragments: &[Self]) -> Result<Option<LanceFileVersion>> {
        // Use the first data file we can find as the reference version.
        let Some(sample_file) = fragments
            .iter()
            .find(|f| !f.files.is_empty())
            .map(|f| &f.files[0])
        else {
            return Ok(None);
        };
        let file_version = LanceFileVersion::try_from_major_minor(
            sample_file.file_major_version,
            sample_file.file_minor_version,
        )?;
        // Every other data file must match the reference version.
        for frag in fragments {
            for file in &frag.files {
                let this_file_version = LanceFileVersion::try_from_major_minor(
                    file.file_major_version,
                    file.file_minor_version,
                )?;
                if file_version != this_file_version {
                    return Err(Error::invalid_input(
                        format!(
                            "All data files must have the same version. Detected both {} and {}",
                            file_version, this_file_version
                        ),
                        location!(),
                    ));
                }
            }
        }
        Ok(Some(file_version))
    }
}

impl TryFrom<pb::DataFragment> for Fragment {
    type Error = Error;

    fn try_from(p: pb::DataFragment) -> Result<Self> {
        // In the protobuf, a physical row count of zero means unknown.
        let physical_rows = if p.physical_rows > 0 {
            Some(p.physical_rows as usize)
        } else {
            None
        };
        Ok(Self {
            id: p.id,
            files: p
                .files
                .into_iter()
                .map(DataFile::try_from)
                .collect::<Result<_>>()?,
            deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?,
            row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?,
            physical_rows,
        })
    }
}

impl From<&Fragment> for pb::DataFragment {
    fn from(f: &Fragment) -> Self {
        let deletion_file = f.deletion_file.as_ref().map(|f| {
            let file_type = match f.file_type {
                DeletionFileType::Array => pb::deletion_file::DeletionFileType::ArrowArray,
                DeletionFileType::Bitmap => pb::deletion_file::DeletionFileType::Bitmap,
            };
            pb::DeletionFile {
                read_version: f.read_version,
                id: f.id,
                file_type: file_type.into(),
                num_deleted_rows: f.num_deleted_rows.unwrap_or_default() as u64,
            }
        });

        let row_id_sequence = f.row_id_meta.as_ref().map(|m| match m {
            RowIdMeta::Inline(data) => pb::data_fragment::RowIdSequence::InlineRowIds(data.clone()),
            RowIdMeta::External(file) => {
                pb::data_fragment::RowIdSequence::ExternalRowIds(pb::ExternalFile {
                    path: file.path.clone(),
                    offset: file.offset,
                    size: file.size,
                })
            }
        });

        Self {
            id: f.id,
            files: f.files.iter().map(pb::DataFile::from).collect(),
            deletion_file,
            row_id_sequence,
            physical_rows: f.physical_rows.unwrap_or_default() as u64,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{
        DataType, Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema,
    };
    use serde_json::{json, Value};

    #[test]
    fn test_new_fragment() {
        let path = "foobar.lance";

        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new(
                "s",
                DataType::Struct(ArrowFields::from(vec![
                    ArrowField::new("si", DataType::Int32, false),
                    ArrowField::new("sb", DataType::Binary, true),
                ])),
                true,
            ),
            ArrowField::new("bool", DataType::Boolean, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragment = Fragment::with_file_legacy(123, path, &schema, Some(10));

        assert_eq!(123, fragment.id);
        assert_eq!(
            fragment.files,
            vec![DataFile::new_legacy_from_fields(
                path.to_string(),
                vec![0, 1, 2, 3],
            )]
        )
    }

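    // A hedged sketch, not part of the original suite: it exercises the two
    // `DataFile::validate` rules defined above. Legacy (0.x) files must have
    // sorted, duplicate-free field ids; newer files must have one column
    // index per field. The paths used here are arbitrary.
    #[test]
    fn test_validate_data_file() {
        let base = Path::from("test");

        // Legacy file with unsorted field ids: rejected.
        let legacy = DataFile::new("a.lance", vec![1, 0], vec![], 0, 1);
        assert!(legacy.validate(&base).is_err());

        // Non-legacy file with mismatched fields / column_indices: rejected.
        let v2 = DataFile::new("b.lance", vec![0, 1], vec![0], 2, 0);
        assert!(v2.validate(&base).is_err());

        // Non-legacy file with matching lengths: accepted.
        let ok = DataFile::new("c.lance", vec![0, 1], vec![0, 1], 2, 0);
        assert!(ok.validate(&base).is_ok());
    }
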
    #[test]
    fn test_roundtrip_fragment() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);

        fragment.deletion_file = None;
        let proto = pb::DataFragment::from(&fragment);
        let fragment2 = Fragment::try_from(proto).unwrap();
        assert_eq!(fragment, fragment2);
    }

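    // A hedged sketch, not part of the original suite: it pins down the
    // `Fragment::num_rows` arithmetic, i.e. physical rows minus the deletion
    // file's recorded count, with `None` when either side is unknown.
    #[test]
    fn test_num_rows() {
        let mut fragment = Fragment::new(1);
        // Physical row count unknown: num_rows is unknown.
        assert_eq!(fragment.num_rows(), None);

        // Known physical rows and no deletion file: all rows are live.
        fragment.physical_rows = Some(100);
        assert_eq!(fragment.num_rows(), Some(100));

        // A deletion file with a known count is subtracted.
        fragment.deletion_file = Some(DeletionFile {
            read_version: 1,
            id: 2,
            file_type: DeletionFileType::Bitmap,
            num_deleted_rows: Some(25),
        });
        assert_eq!(fragment.num_rows(), Some(75));

        // A deletion file with an unknown count makes num_rows unknown.
        fragment.deletion_file.as_mut().unwrap().num_deleted_rows = None;
        assert_eq!(fragment.num_rows(), None);
    }
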
    #[test]
    fn test_to_json() {
        let mut fragment = Fragment::new(123);
        let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]);
        fragment.add_file_legacy("foobar.lance", &Schema::try_from(&schema).unwrap());
        fragment.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
        });

        let json = serde_json::to_string(&fragment).unwrap();

        let value: Value = serde_json::from_str(&json).unwrap();
        assert_eq!(
            value,
            json!({
                "id": 123,
                "files": [{
                    "path": "foobar.lance",
                    "fields": [0],
                    "column_indices": [],
                    "file_major_version": MAJOR_VERSION,
                    "file_minor_version": MINOR_VERSION,
                }],
                "deletion_file": {
                    "read_version": 123,
                    "id": 456,
                    "file_type": "array",
                    "num_deleted_rows": 10,
                },
                "physical_rows": None::<usize>,
            }),
        );

        let frag2 = Fragment::from_json(&json).unwrap();
        assert_eq!(fragment, frag2);
    }
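
    // A hedged sketch, not part of the original suite: it checks the serde
    // names produced by `rename_all = "lowercase"` on `DeletionFileType`,
    // the deletion file suffixes, and the `try_infer_version` fallbacks that
    // are fully determined by this module (no files means no version).
    #[test]
    fn test_deletion_file_type_repr() {
        assert_eq!(
            serde_json::to_string(&DeletionFileType::Array).unwrap(),
            "\"array\""
        );
        assert_eq!(
            serde_json::to_string(&DeletionFileType::Bitmap).unwrap(),
            "\"bitmap\""
        );
        assert_eq!(DeletionFileType::Array.suffix(), "arrow");
        assert_eq!(DeletionFileType::Bitmap.suffix(), "bin");
    }

    #[test]
    fn test_try_infer_version_no_files() {
        // No fragments at all: nothing to infer.
        assert!(Fragment::try_infer_version(&[]).unwrap().is_none());
        // A fragment with no data files: still nothing to infer.
        assert!(Fragment::try_infer_version(&[Fragment::new(42)])
            .unwrap()
            .is_none());
    }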
}