use async_trait::async_trait;
use chrono::prelude::*;
use deepsize::DeepSizeOf;
use lance_file::datatypes::{Fields, FieldsWithMeta, populate_schema_dictionary};
use lance_file::previous::reader::FileReader as PreviousFileReader;
use lance_file::version::{LEGACY_FORMAT_VERSION, LanceFileVersion};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;
use std::collections::{BTreeMap, HashMap};
use std::ops::Range;
use std::sync::Arc;
use super::Fragment;
use crate::feature_flags::{FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag};
use crate::format::pb;
use lance_core::cache::LanceCache;
use lance_core::datatypes::Schema;
use lance_core::{Error, Result};
use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
use lance_io::utils::read_struct;
/// A snapshot of a Lance dataset: schema, fragments, and version metadata.
///
/// Converted to/from the `pb::Manifest` protobuf message for persistence
/// (see the `TryFrom`/`From` impls below).
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
// The dataset schema (fields plus schema-level metadata).
pub schema: Schema,
// Dataset version; starts at 1 (`Manifest::new`) and increments by one in
// `new_from_previous`.
pub version: u64,
// Branch name, if this manifest belongs to a branch (None for the main line).
pub branch: Option<String>,
// Library/version that wrote this manifest, when known.
pub writer_version: Option<WriterVersion>,
// Data fragments making up this version, shared cheaply via `Arc`.
pub fragments: Arc<Vec<Fragment>>,
// Mirrors `pb::Manifest::version_aux_data`.  NOTE(review): exact semantics
// (a file position?) are not visible in this file — confirm at the writer.
pub version_aux_data: usize,
// Position of the index section, if any (not carried across versions).
pub index_section: Option<usize>,
// Creation timestamp in nanoseconds since the UNIX epoch (0 = unset).
pub timestamp_nanos: u128,
// Optional tag naming this version.
pub tag: Option<String>,
// Feature flags a reader must understand (see `crate::feature_flags`).
pub reader_feature_flags: u64,
// Feature flags a writer must understand.
pub writer_feature_flags: u64,
// Highest fragment id assigned so far, if tracked
// (kept up to date via `update_max_fragment_id`).
pub max_fragment_id: Option<u32>,
// Path of the transaction file that produced this version, if recorded.
pub transaction_file: Option<String>,
// Position of an inlined transaction section, if any.
pub transaction_section: Option<usize>,
// Cumulative starting row offset of each fragment; derived via
// `compute_fragment_offsets`, never serialized. Used by
// `fragments_by_offset_range`.
fragment_offsets: Vec<usize>,
// Next row id to assign when stable row ids are in use.
pub next_row_id: u64,
// On-disk file format and version used for data files.
pub data_storage_format: DataStorageFormat,
// Dataset configuration key/value pairs.
pub config: HashMap<String, String>,
// Arbitrary user-supplied table metadata key/value pairs.
pub table_metadata: HashMap<String, String>,
// Registered base paths (e.g. created by `shallow_clone`), keyed by id.
pub base_paths: HashMap<u32, BasePath>,
}
/// High bit of a `u64` version number, marking a "detached" manifest version.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true when `version` carries the detached-version marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) != 0
}
/// Computes the cumulative starting row offset of each fragment.
///
/// Returns one entry per fragment plus a trailing entry holding the total
/// row count, so `result[i]` is the first global row offset of fragment `i`
/// and `result[len]` is the number of rows overall. Fragments whose row
/// count is unknown contribute 0.
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    let mut offsets = Vec::with_capacity(fragments.len() + 1);
    let mut running_total = 0_usize;
    for fragment in fragments {
        offsets.push(running_total);
        running_total += fragment.num_rows().unwrap_or_default();
    }
    // Trailing sentinel: total rows across all fragments.
    offsets.push(running_total);
    offsets
}
/// Aggregate statistics over a manifest's fragments (see `Manifest::summary`).
#[derive(Default)]
pub struct ManifestSummary {
// Number of fragments in the manifest.
pub total_fragments: u64,
// Number of data files across all fragments.
pub total_data_files: u64,
// Sum of recorded data-file sizes (files without a recorded size add 0).
pub total_files_size: u64,
// Number of fragments carrying a deletion file.
pub total_deletion_files: u64,
// Physical rows: live rows plus recorded deleted rows.
pub total_data_file_rows: u64,
// Rows recorded as deleted in deletion files.
pub total_deletion_file_rows: u64,
// Live rows as reported by `Fragment::num_rows`.
pub total_rows: u64,
}
impl From<ManifestSummary> for BTreeMap<String, String> {
    /// Flattens the summary counters into an ordered string map, keyed by
    /// the counter name.
    fn from(summary: ManifestSummary) -> Self {
        let entries = [
            ("total_fragments", summary.total_fragments),
            ("total_data_files", summary.total_data_files),
            ("total_files_size", summary.total_files_size),
            ("total_deletion_files", summary.total_deletion_files),
            ("total_data_file_rows", summary.total_data_file_rows),
            ("total_deletion_file_rows", summary.total_deletion_file_rows),
            ("total_rows", summary.total_rows),
        ];
        entries
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }
}
impl Manifest {
/// Creates a brand-new manifest at version 1 with default bookkeeping state.
///
/// All flags, counters, and optional metadata start empty/zero;
/// `fragment_offsets` is derived from `fragments`.
pub fn new(
schema: Schema,
fragments: Arc<Vec<Fragment>>,
data_storage_format: DataStorageFormat,
base_paths: HashMap<u32, BasePath>,
) -> Self {
// Precompute cumulative row offsets for offset-range lookups.
let fragment_offsets = compute_fragment_offsets(&fragments);
Self {
schema,
version: 1,
branch: None,
writer_version: Some(WriterVersion::default()),
fragments,
version_aux_data: 0,
index_section: None,
timestamp_nanos: 0,
tag: None,
reader_feature_flags: 0,
writer_feature_flags: 0,
max_fragment_id: None,
transaction_file: None,
transaction_section: None,
fragment_offsets,
next_row_id: 0,
data_storage_format,
config: HashMap::new(),
table_metadata: HashMap::new(),
base_paths,
}
}
/// Creates the successor manifest of `previous` (version + 1) with a new
/// schema and fragment list.
///
/// Carries forward branch, max fragment id, next row id, storage format,
/// config, table metadata, and base paths; index section, timestamp, tag,
/// feature flags, and transaction info are reset for the new version.
pub fn new_from_previous(
previous: &Self,
schema: Schema,
fragments: Arc<Vec<Fragment>>,
) -> Self {
let fragment_offsets = compute_fragment_offsets(&fragments);
Self {
schema,
version: previous.version + 1,
branch: previous.branch.clone(),
writer_version: Some(WriterVersion::default()),
fragments,
version_aux_data: 0,
// Per-version data is not inherited; the writer sets it for this version.
index_section: None, timestamp_nanos: 0, tag: None,
reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: previous.max_fragment_id,
transaction_file: None,
transaction_section: None,
fragment_offsets,
next_row_id: previous.next_row_id,
data_storage_format: previous.data_storage_format.clone(),
config: previous.config.clone(),
table_metadata: previous.table_metadata.clone(),
base_paths: previous.base_paths.clone(),
}
}
/// Creates a shallow clone of this manifest that references the original
/// dataset's files through a newly registered base path rather than
/// copying data.
///
/// Data files and deletion files that do not already have a `base_id` are
/// assigned `ref_base_id`, and a `BasePath` for (`ref_base_id`,
/// `ref_path`, `ref_name`) is added (flagged as a dataset root). The
/// clone keeps this manifest's version but drops the index section and
/// tag and resets both feature-flag fields.
pub fn shallow_clone(
&self,
ref_name: Option<String>,
ref_path: String,
ref_base_id: u32,
branch_name: Option<String>,
transaction_file: String,
) -> Self {
let cloned_fragments = self
.fragments
.as_ref()
.iter()
.map(|fragment| {
let mut cloned_fragment = fragment.clone();
// Files still resolved relative to this dataset get re-pointed at
// the new base path; files already tied to a base keep theirs.
for file in &mut cloned_fragment.files {
if file.base_id.is_none() {
file.base_id = Some(ref_base_id);
}
}
if let Some(deletion) = &mut cloned_fragment.deletion_file
&& deletion.base_id.is_none()
{
deletion.base_id = Some(ref_base_id);
}
cloned_fragment
})
.collect::<Vec<_>>();
Self {
schema: self.schema.clone(),
version: self.version,
branch: branch_name,
writer_version: self.writer_version.clone(),
fragments: Arc::new(cloned_fragments),
version_aux_data: self.version_aux_data,
index_section: None, timestamp_nanos: self.timestamp_nanos,
tag: None,
reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: self.max_fragment_id,
transaction_file: Some(transaction_file),
transaction_section: None,
fragment_offsets: self.fragment_offsets.clone(),
next_row_id: self.next_row_id,
data_storage_format: self.data_storage_format.clone(),
config: self.config.clone(),
// Register the new base path under its id alongside the existing ones.
base_paths: {
let mut base_paths = self.base_paths.clone();
let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
base_paths.insert(ref_base_id, base_path);
base_paths
},
table_metadata: self.table_metadata.clone(),
}
}
/// Returns the creation timestamp of this manifest version as a UTC
/// datetime.
///
/// `timestamp_nanos` is split into whole seconds and the sub-second
/// nanosecond remainder. An out-of-range value falls back to the UNIX
/// epoch (`unwrap_or_default`), matching the previous behavior.
pub fn timestamp(&self) -> DateTime<Utc> {
    let nanos = self.timestamp_nanos % 1_000_000_000;
    let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
    // `DateTime::from_timestamp` already yields `DateTime<Utc>`; the old
    // round-trip through `naive_utc()`/`from_utc_datetime()` was redundant.
    DateTime::from_timestamp(seconds, nanos as u32).unwrap_or_default()
}
/// Sets the creation timestamp (nanoseconds since the UNIX epoch).
pub fn set_timestamp(&mut self, nanos: u128) {
self.timestamp_nanos = nanos;
}
/// Mutable access to the dataset configuration map.
pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
&mut self.config
}
/// Mutable access to the user table metadata map.
pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
&mut self.table_metadata
}
/// Mutable access to the schema-level metadata map.
pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
&mut self.schema.metadata
}
/// Mutable access to the metadata of the field with `field_id`, or `None`
/// when no such field exists in the schema.
pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
self.schema
.field_by_id_mut(field_id)
.map(|field| &mut field.metadata)
}
/// Inserts or overwrites the given config key/value pairs.
#[deprecated(note = "Use config_mut() for direct access to config HashMap")]
pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
self.config.extend(upsert_values);
}
/// Removes the given keys from the config map; missing keys are ignored.
#[deprecated(note = "Use config_mut() for direct access to config HashMap")]
pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
self.config
.retain(|key, _| !delete_keys.contains(&key.as_str()));
}
/// Replaces the entire schema-level metadata map.
#[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
self.schema.metadata = new_metadata;
}
/// Replaces the metadata map of the field with `field_id`.
///
/// # Errors
/// Returns `Error::invalid_input` when the field does not exist.
#[deprecated(
note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
)]
pub fn replace_field_metadata(
&mut self,
field_id: i32,
new_metadata: HashMap<String, String>,
) -> Result<()> {
if let Some(field) = self.schema.field_by_id_mut(field_id) {
field.metadata = new_metadata;
Ok(())
} else {
Err(Error::invalid_input(format!(
"Field with id {} does not exist for replace_field_metadata",
field_id
)))
}
}
/// Refreshes `max_fragment_id` from the current fragment list, keeping the
/// previously recorded maximum when it is larger.
///
/// A manifest with no fragments leaves the stored value untouched.
///
/// # Panics
/// Panics when a fragment id does not fit in `u32`.
pub fn update_max_fragment_id(&mut self) {
    // No fragments -> nothing observed; keep the stored value as-is.
    let Some(observed) = self.fragments.iter().map(|f| f.id).max() else {
        return;
    };
    let observed: u32 = observed.try_into().unwrap();
    self.max_fragment_id = Some(match self.max_fragment_id {
        Some(current) if current >= observed => current,
        _ => observed,
    });
}
/// The highest fragment id in use, preferring the explicitly tracked value
/// and falling back to scanning the current fragment list.
pub fn max_fragment_id(&self) -> Option<u64> {
    self.max_fragment_id
        .map(u64::from)
        .or_else(|| self.fragments.iter().map(|f| f.id).max())
}
/// The largest field id appearing in either the schema or any data file.
///
/// Returns -1 when neither the schema nor the files contain a field id.
pub fn max_field_id(&self) -> i32 {
    let schema_max = self.schema.max_field_id().unwrap_or(-1);
    let file_max = self
        .fragments
        .iter()
        .flat_map(|fragment| &fragment.files)
        .flat_map(|file| file.fields.iter().copied())
        .max()
        .unwrap_or(-1);
    schema_max.max(file_max)
}
/// Returns the fragments added after the `since` manifest, i.e. those
/// whose id is greater than `since`'s maximum fragment id (all fragments
/// when `since` has none).
///
/// # Errors
/// Returns `Error::invalid_input` when `since` is not an older version
/// than this manifest.
pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
    if since.version >= self.version {
        return Err(Error::invalid_input(format!(
            "fragments_since: given version {} is newer than manifest version {}",
            since.version, self.version
        )));
    }
    let cutoff = since.max_fragment_id();
    let newer = self
        .fragments
        .iter()
        .filter(|f| cutoff.map_or(true, |c| f.id > c))
        .cloned()
        .collect();
    Ok(newer)
}
/// Returns `(starting_row_offset, fragment)` pairs for the fragments that
/// overlap the half-open row-offset `range`.
pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
let start = range.start;
let end = range.end;
// Locate the fragment containing `start`. `fragment_offsets[0]` is always
// 0, so an Err(insertion_point) is >= 1 and `idx - 1` cannot underflow.
let idx = self
.fragment_offsets
.binary_search(&start)
.unwrap_or_else(|idx| idx - 1);
let mut fragments = vec![];
for i in idx..self.fragments.len() {
// Stop once a fragment begins at/after `end`, or when the candidate
// ends at/before `start` and therefore cannot overlap the range.
if self.fragment_offsets[i] >= end
|| self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
<= start
{
break;
}
fragments.push((self.fragment_offsets[i], &self.fragments[i]));
}
fragments
}
/// True when the stable-row-id reader feature flag is set.
pub fn uses_stable_row_ids(&self) -> bool {
self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
}
/// Serializes this manifest to protobuf-encoded bytes.
pub fn serialized(&self) -> Vec<u8> {
let pb_manifest: pb::Manifest = self.into();
pb_manifest.encode_to_vec()
}
/// True when the data files use the legacy (pre-v2) lance file format.
pub fn should_use_legacy_format(&self) -> bool {
self.data_storage_format.version == LEGACY_FORMAT_VERSION
}
/// Aggregates per-fragment statistics into a [`ManifestSummary`].
///
/// `total_rows` counts rows reported by `Fragment::num_rows`, while
/// `total_data_file_rows` adds back the rows recorded as deleted, giving
/// the number of physically stored rows.
pub fn summary(&self) -> ManifestSummary {
    let mut summary = ManifestSummary::default();
    for fragment in self.fragments.iter() {
        summary.total_data_files += fragment.files.len() as u64;
        if let Some(num_rows) = fragment.num_rows() {
            summary.total_rows += num_rows as u64;
        }
        // File sizes only contribute when the data file recorded one.
        for data_file in &fragment.files {
            if let Some(size_bytes) = data_file.file_size_bytes.get() {
                summary.total_files_size += size_bytes.get();
            }
        }
        if let Some(deletion_file) = &fragment.deletion_file {
            summary.total_deletion_files += 1;
            if let Some(num_deleted) = deletion_file.num_deleted_rows {
                summary.total_deletion_file_rows += num_deleted as u64;
            }
        }
    }
    summary.total_fragments = self.fragments.len() as u64;
    summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;
    summary
}
}
/// A registered base location that data/deletion files can be resolved
/// against (created e.g. by `Manifest::shallow_clone`).
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
// Identifier that files reference via their `base_id`.
pub id: u32,
// Optional human-readable name.
pub name: Option<String>,
// True when this base path is the root of a dataset.
pub is_dataset_root: bool,
// URI / path string of the base location.
pub path: String,
}
impl BasePath {
/// Constructs a `BasePath` from its components.
pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
Self {
id,
name,
is_dataset_root,
path,
}
}
/// Resolves `self.path` (treated as a URI) to an object-store `Path`
/// using the given registry.
pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
ObjectStore::extract_path_from_uri(registry, &self.path)
}
}
// Manual DeepSizeOf: sums the heap usage of the owned strings.
// NOTE(review): `path` is counted twice (`* 2`) and `size_of::<bool>()` is
// added even though `is_dataset_root` is inline (stack) data that
// `deep_size_of_children` normally excludes. Both look deliberate but are
// unusual — confirm before relying on exact byte counts.
impl DeepSizeOf for BasePath {
fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
self.name.deep_size_of_children(context)
+ self.path.deep_size_of_children(context) * 2
+ size_of::<bool>()
}
}
/// Identifies the library (and its version) that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
// Library name, e.g. "lance".
pub library: String,
// Clean `major.minor.patch` version (prerelease/build split out by
// `split_version`).
pub version: String,
// Semver prerelease component, if any (e.g. "rc.1").
pub prerelease: Option<String>,
// Semver build-metadata component, if any (e.g. "build.123").
pub build_metadata: Option<String>,
}
/// Describes the on-disk file format and version used for data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
// File format name; currently always "lance" (see LANCE_FORMAT_NAME).
pub file_format: String,
// File format version string, parseable as `LanceFileVersion`.
pub version: String,
}
// The only file format name currently produced.
const LANCE_FORMAT_NAME: &str = "lance";
impl DataStorageFormat {
    /// Creates a descriptor for the lance file format at the given file
    /// version, resolved to a concrete version string.
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: String::from(LANCE_FORMAT_NAME),
            version: version.resolve().to_string(),
        }
    }

    /// Parses the stored version string back into a [`LanceFileVersion`].
    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse()
    }
}
// Defaults to the lance format at the default `LanceFileVersion`.
impl Default for DataStorageFormat {
fn default() -> Self {
Self::new(LanceFileVersion::default())
}
}
// Field-for-field conversion from the protobuf message.
impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
fn from(pb: pb::manifest::DataStorageFormat) -> Self {
Self {
file_format: pb.file_format,
version: pb.version,
}
}
}
/// Which component of a semver version `bump_version` increments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
Major,
Minor,
Patch,
}
/// Increments the requested component of `version`, resetting the lower
/// ones (a major bump zeroes minor and patch; a minor bump zeroes patch).
fn bump_version(version: &mut semver::Version, part: VersionPart) {
    let (major, minor, patch) = match part {
        VersionPart::Major => (version.major + 1, 0, 0),
        VersionPart::Minor => (version.major, version.minor + 1, 0),
        VersionPart::Patch => (version.major, version.minor, version.patch + 1),
    };
    version.major = major;
    version.minor = minor;
    version.patch = patch;
}
impl WriterVersion {
/// Parses `full_version` as semver and splits it into a clean
/// `major.minor.patch` string plus optional prerelease and build-metadata
/// components. Returns `None` when the input is not valid semver.
fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
    let mut parsed = semver::Version::parse(full_version).ok()?;
    // Empty pre/build render as "" — map those to None.
    let non_empty = |s: String| if s.is_empty() { None } else { Some(s) };
    let prerelease = non_empty(parsed.pre.to_string());
    let build_metadata = non_empty(parsed.build.to_string());
    // Strip pre/build so the remaining string is just major.minor.patch.
    parsed.pre = semver::Prerelease::EMPTY;
    parsed.build = semver::BuildMetadata::EMPTY;
    Some((parsed.to_string(), prerelease, build_metadata))
}
/// Best-effort parse of `self.version` as `major.minor.patch[-tag]`.
///
/// Splits only on the first `-`; a version carrying `+build` metadata
/// leaves the `+…` attached to the patch component, which then fails to
/// parse, yielding `None`. Missing components default to "0".
#[deprecated(note = "Use `lance_lib_version()` instead")]
pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
(
&self.version[..dash_idx],
Some(&self.version[dash_idx + 1..]),
)
} else {
(self.version.as_str(), None)
};
let mut parts = version_part.split('.');
let major = parts.next().unwrap_or("0").parse().ok()?;
let minor = parts.next().unwrap_or("0").parse().ok()?;
let patch = parts.next().unwrap_or("0").parse().ok()?;
Some((major, minor, patch, tag))
}
/// Reconstructs the full semver version of the lance library that wrote
/// this manifest, re-attaching the stored prerelease and build-metadata
/// components.
///
/// Returns `None` when the library is not "lance" or any component fails
/// to parse.
pub fn lance_lib_version(&self) -> Option<semver::Version> {
    if self.library != "lance" {
        return None;
    }
    let mut version = semver::Version::parse(&self.version).ok()?;
    if let Some(pre) = self.prerelease.as_deref() {
        version.pre = semver::Prerelease::new(pre).ok()?;
    }
    if let Some(build) = self.build_metadata.as_deref() {
        version.build = semver::BuildMetadata::new(build).ok()?;
    }
    Some(version)
}
/// Like `semver()`, but panics instead of returning `None`.
///
/// # Panics
/// Panics when `self.version` cannot be parsed as `major.minor.patch[-tag]`.
#[deprecated(
note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
)]
#[allow(deprecated)]
pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
self.semver()
.unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
}
/// True when this writer's version is strictly older than
/// `major.minor.patch` under semver ordering (so a prerelease such as
/// `2.0.0-rc.1` is older than `2.0.0`).
///
/// # Panics
/// Panics when the library is not "lance" or the version is not valid
/// semver (see `lance_lib_version`).
#[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
let version = self
.lance_lib_version()
.expect("Not lance library or invalid version");
let other = semver::Version {
major: major.into(),
minor: minor.into(),
patch: patch.into(),
pre: semver::Prerelease::EMPTY,
build: semver::BuildMetadata::EMPTY,
};
version < other
}
/// Returns a copy of this version with `part` bumped (lower components
/// reset by `bump_version`); the prerelease tag is dropped unless
/// `keep_tag` is true.
///
/// # Panics
/// Panics when this is not a parseable lance version.
#[deprecated(note = "This is meant for testing and will be made private in future version.")]
pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
let mut version = self.lance_lib_version().expect("Should be lance version");
bump_version(&mut version, part);
if !keep_tag {
version.pre = semver::Prerelease::EMPTY;
}
// Re-split so the struct's version/prerelease/build fields stay normalized.
let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
.expect("Bumped version should be valid semver");
Self {
library: self.library.clone(),
version: clean_version,
prerelease,
build_metadata,
}
}
}
impl Default for WriterVersion {
// Production default: this crate's own version, split into a clean
// version string plus prerelease/build-metadata components.
#[cfg(not(test))]
fn default() -> Self {
let full_version = env!("CARGO_PKG_VERSION");
let (version, prerelease, build_metadata) =
Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
Self {
library: "lance".to_string(),
version,
prerelease,
build_metadata,
}
}
// Test-only default: same as above but with the patch component bumped
// (keeping the prerelease tag), so tests see a version newer than the
// released crate version (relied on by `test_writer_version`).
#[cfg(test)]
#[allow(deprecated)]
fn default() -> Self {
let full_version = env!("CARGO_PKG_VERSION");
let (version, prerelease, build_metadata) =
Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
Self {
library: "lance".to_string(),
version,
prerelease,
build_metadata,
}
.bump(VersionPart::Patch, true)
}
}
// Associates `Manifest` with its protobuf message type so generic helpers
// (e.g. `read_struct`) can decode it.
impl ProtoStruct for Manifest {
type Proto = pb::Manifest;
}
impl From<pb::BasePath> for BasePath {
    /// Decodes a protobuf base path by destructuring it field-for-field.
    fn from(p: pb::BasePath) -> Self {
        let pb::BasePath {
            id,
            path,
            name,
            is_dataset_root,
        } = p;
        Self::new(id, path, name, is_dataset_root)
    }
}
impl From<BasePath> for pb::BasePath {
fn from(p: BasePath) -> Self {
Self {
id: p.id,
name: p.name,
is_dataset_root: p.is_dataset_root,
path: p.path,
}
}
}
impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    /// Decodes a protobuf manifest into the in-memory representation.
    ///
    /// # Errors
    /// - when any fragment fails to convert
    /// - when the stable-row-id flag is set but some fragment lacks row-id
    ///   metadata
    /// - when the storage format must be inferred and inference fails
    fn try_from(p: pb::Manifest) -> Result<Self> {
        // Flatten the optional protobuf timestamp into nanoseconds since the
        // UNIX epoch. Integer literal instead of the old `1e9 as u128` float
        // cast (same value, no float round-trip).
        let timestamp_nanos = p.timestamp.map(|ts| {
            // NOTE(review): `ts.seconds` is i64; a pre-epoch (negative) value
            // would wrap in this cast, exactly as it did before.
            ts.seconds as u128 * 1_000_000_000 + ts.nanos as u128
        });
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion {
                library,
                version,
                prerelease,
                build_metadata,
            }) => Some(WriterVersion {
                library,
                version,
                prerelease,
                build_metadata,
            }),
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.schema_metadata,
        };
        // Stable row ids require every fragment to carry row-id metadata.
        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::internal("All fragments must have row ids"));
        }
        // Older manifests predate the explicit data_format field: infer the
        // version from the fragments, then fall back to the deprecated v2
        // feature flag, then to the legacy format.
        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    DataStorageFormat::new(inferred_version)
                } else if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                    DataStorageFormat::new(LanceFileVersion::Stable)
                } else {
                    DataStorageFormat::new(LanceFileVersion::Legacy)
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };
        let schema = Schema::from(fields_with_meta);
        Ok(Self {
            schema,
            version: p.version,
            branch: p.branch,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            // Protobuf encodes "absent" strings as empty.
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            transaction_section: p.transaction_section.map(|i| i as usize),
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            table_metadata: p.table_metadata,
            // Consume the vector directly instead of cloning every entry.
            base_paths: p
                .base_paths
                .into_iter()
                .map(|base_path| (base_path.id, base_path.into()))
                .collect(),
        })
    }
}
impl From<&Manifest> for pb::Manifest {
    /// Encodes the in-memory manifest into its protobuf representation.
    fn from(m: &Manifest) -> Self {
        // A zero timestamp means "unset"; otherwise split the nanoseconds
        // into the protobuf (seconds, nanos) pair. Integer literals replace
        // the old `1e9 as u128` float casts (same value, no float round-trip).
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1_000_000_000;
            let seconds = ((m.timestamp_nanos - nanos) / 1_000_000_000) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            // Schema metadata values are stored as raw bytes in the proto.
            schema_metadata: m
                .schema
                .metadata
                .iter()
                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                .collect(),
            version: m.version,
            branch: m.branch.clone(),
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                    prerelease: wv.prerelease.clone(),
                    build_metadata: wv.build_metadata.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            table_metadata: m.table_metadata.clone(),
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            // `None` tag / transaction file are encoded as empty strings.
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            base_paths: m
                .base_paths
                .values()
                .map(|base_path| pb::BasePath {
                    id: base_path.id,
                    name: base_path.name.clone(),
                    is_dataset_root: base_path.is_dataset_root,
                    path: base_path.path.clone(),
                })
                .collect(),
            transaction_section: m.transaction_section.map(|i| i as u64),
        }
    }
}
/// Reader construction for "self-describing" lance files: files that embed
/// their own manifest (and thus schema) in the footer.
#[async_trait]
pub trait SelfDescribingFileReader {
/// Opens `path` on `object_store` and builds a reader using the manifest
/// embedded in the file itself.
async fn try_new_self_described(
object_store: &ObjectStore,
path: &Path,
cache: Option<&LanceCache>,
) -> Result<Self>
where
Self: Sized,
{
let reader = object_store.open(path).await?;
Self::try_new_self_described_from_reader(reader.into(), cache).await
}
/// As above, but from an already-open `Reader`.
async fn try_new_self_described_from_reader(
reader: Arc<dyn Reader>,
cache: Option<&LanceCache>,
) -> Result<Self>
where
Self: Sized;
}
#[async_trait]
impl SelfDescribingFileReader for PreviousFileReader {
async fn try_new_self_described_from_reader(
reader: Arc<dyn Reader>,
cache: Option<&LanceCache>,
) -> Result<Self> {
let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
// A self-describing file must record where its embedded manifest lives.
let manifest_position = metadata.manifest_position.ok_or(Error::internal(format!(
"Attempt to open file at {} as self-describing but it did not contain a manifest",
reader.path(),
)))?;
let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
// Legacy-format files store dictionary values outside the manifest, so
// the schema's dictionaries must be loaded separately.
if manifest.should_use_legacy_format() {
populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
}
let schema = manifest.schema;
let max_field_id = schema.max_field_id().unwrap_or_default();
Self::try_new_from_reader(
reader.path(),
reader.clone(),
Some(metadata),
schema,
0,
0,
max_field_id,
cache,
)
.await
}
}
#[cfg(test)]
mod tests {
use crate::format::{DataFile, DeletionFile, DeletionFileType};
use std::num::NonZero;
use super::*;
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
use lance_core::datatypes::Field;
// Checks the default WriterVersion: library name, clean three-part version,
// prerelease split, and that the cfg(test) Default bumps the patch by one
// relative to the crate version.
#[test]
fn test_writer_version() {
let wv = WriterVersion::default();
assert_eq!(wv.library, "lance");
let cargo_version = env!("CARGO_PKG_VERSION");
let expected_tag = if cargo_version.contains('-') {
Some(cargo_version.split('-').nth(1).unwrap())
} else {
None
};
let version_parts: Vec<&str> = wv.version.split('.').collect();
assert_eq!(
version_parts.len(),
3,
"Version should be major.minor.patch"
);
assert!(
!wv.version.contains('-'),
"Version field should not contain prerelease"
);
assert_eq!(wv.prerelease.as_deref(), expected_tag);
assert_eq!(wv.build_metadata, None);
let version = wv.lance_lib_version().unwrap();
assert_eq!(
version.major,
env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
);
assert_eq!(
version.minor,
env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
);
// +1 because the test-only Default bumps the patch component.
assert_eq!(
version.patch,
env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
);
assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));
// bump_version must always produce a strictly newer version.
for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
let mut bumped_version = version.clone();
bump_version(&mut bumped_version, *part);
assert!(version < bumped_version);
}
}
// Exercises split_version on all prerelease/build-metadata combinations
// plus a non-semver string.
#[test]
fn test_writer_version_split() {
let (version, prerelease, build_metadata) =
WriterVersion::split_version("2.0.0-rc.1").unwrap();
assert_eq!(version, "2.0.0");
assert_eq!(prerelease, Some("rc.1".to_string()));
assert_eq!(build_metadata, None);
let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
assert_eq!(version, "2.0.0");
assert_eq!(prerelease, None);
assert_eq!(build_metadata, None);
let (version, prerelease, build_metadata) =
WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
assert_eq!(version, "2.0.0");
assert_eq!(prerelease, Some("rc.1".to_string()));
assert_eq!(build_metadata, Some("build.123".to_string()));
let (version, prerelease, build_metadata) =
WriterVersion::split_version("2.0.0+build.123").unwrap();
assert_eq!(version, "2.0.0");
assert_eq!(prerelease, None);
assert_eq!(build_metadata, Some("build.123".to_string()));
assert!(WriterVersion::split_version("not-a-version").is_none());
}
// Semver ordering: a prerelease sorts before the corresponding release.
#[test]
fn test_writer_version_comparison_with_prerelease() {
let v1 = WriterVersion {
library: "lance".to_string(),
version: "2.0.0".to_string(),
prerelease: Some("rc.1".to_string()),
build_metadata: None,
};
let v2 = WriterVersion {
library: "lance".to_string(),
version: "2.0.0".to_string(),
prerelease: None,
build_metadata: None,
};
let semver1 = v1.lance_lib_version().unwrap();
let semver2 = v2.lance_lib_version().unwrap();
assert!(semver1 < semver2);
}
// lance_lib_version re-attaches both prerelease and build metadata.
#[test]
fn test_writer_version_with_build_metadata() {
let v = WriterVersion {
library: "lance".to_string(),
version: "2.0.0".to_string(),
prerelease: Some("rc.1".to_string()),
build_metadata: Some("build.123".to_string()),
};
let semver = v.lance_lib_version().unwrap();
assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
assert_eq!(semver.major, 2);
assert_eq!(semver.minor, 0);
assert_eq!(semver.patch, 0);
assert_eq!(semver.pre.as_str(), "rc.1");
assert_eq!(semver.build.as_str(), "build.123");
}
// A non-semver version string yields None without touching the raw fields.
#[test]
fn test_writer_version_non_semver() {
let v = WriterVersion {
library: "lance".to_string(),
version: "custom-build-v1".to_string(),
prerelease: None,
build_metadata: None,
};
assert!(v.lance_lib_version().is_none());
assert_eq!(v.library, "lance");
assert_eq!(v.version, "custom-build-v1");
}
// older_than uses semver ordering, so an rc is older than its release.
#[test]
#[allow(deprecated)]
fn test_older_than_with_prerelease() {
let v_rc = WriterVersion {
library: "lance".to_string(),
version: "2.0.0".to_string(),
prerelease: Some("rc.1".to_string()),
build_metadata: None,
};
assert!(v_rc.older_than(2, 0, 0));
assert!(v_rc.older_than(2, 0, 1));
assert!(!v_rc.older_than(1, 9, 9));
let v_release = WriterVersion {
library: "lance".to_string(),
version: "2.0.0".to_string(),
prerelease: None,
build_metadata: None,
};
assert!(!v_release.older_than(2, 0, 0));
assert!(v_release.older_than(2, 0, 1));
}
// Fragments of 10/15/20 rows give offsets [0, 10, 25]; verifies overlap
// queries at the start, across a boundary, past the end, and fully out of
// range.
#[test]
fn test_fragments_by_offset_range() {
let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
"a",
arrow_schema::DataType::Int64,
false,
)]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let fragments = vec![
Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
];
let manifest = Manifest::new(
schema,
Arc::new(fragments),
DataStorageFormat::default(),
HashMap::new(),
);
let actual = manifest.fragments_by_offset_range(0..10);
assert_eq!(actual.len(), 1);
assert_eq!(actual[0].0, 0);
assert_eq!(actual[0].1.id, 0);
let actual = manifest.fragments_by_offset_range(5..15);
assert_eq!(actual.len(), 2);
assert_eq!(actual[0].0, 0);
assert_eq!(actual[0].1.id, 0);
assert_eq!(actual[1].0, 10);
assert_eq!(actual[1].1.id, 1);
let actual = manifest.fragments_by_offset_range(15..50);
assert_eq!(actual.len(), 2);
assert_eq!(actual[0].0, 10);
assert_eq!(actual[0].1.id, 1);
assert_eq!(actual[1].0, 25);
assert_eq!(actual[1].1.id, 2);
let actual = manifest.fragments_by_offset_range(45..100);
assert!(actual.is_empty());
assert!(manifest.fragments_by_offset_range(200..400).is_empty());
}
// max_field_id must be the max over both the schema's ids and the ids
// listed in data files (here a file references id 43, above the schema max).
#[test]
fn test_max_field_id() {
let mut field0 =
Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
field0.set_id(-1, &mut 0);
let mut field2 =
Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
field2.set_id(-1, &mut 2);
let schema = Schema {
fields: vec![field0, field2],
metadata: Default::default(),
};
let fragments = vec![
Fragment {
id: 0,
files: vec![DataFile::new_legacy_from_fields(
"path1",
vec![0, 1, 2],
None,
)],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
created_at_version_meta: None,
last_updated_at_version_meta: None,
},
Fragment {
id: 1,
files: vec![
DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
DataFile::new_legacy_from_fields("path3", vec![2], None),
],
deletion_file: None,
row_id_meta: None,
physical_rows: None,
created_at_version_meta: None,
last_updated_at_version_meta: None,
},
];
let manifest = Manifest::new(
schema,
Arc::new(fragments),
DataStorageFormat::default(),
HashMap::new(),
);
assert_eq!(manifest.max_field_id(), 43);
}
// Round-trips config mutation through config_mut(): extend then remove.
#[test]
fn test_config() {
let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
"a",
arrow_schema::DataType::Int64,
false,
)]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let fragments = vec![
Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
];
let mut manifest = Manifest::new(
schema,
Arc::new(fragments),
DataStorageFormat::default(),
HashMap::new(),
);
let mut config = manifest.config.clone();
config.insert("lance.test".to_string(), "value".to_string());
config.insert("other-key".to_string(), "other-value".to_string());
manifest.config_mut().extend(config.clone());
assert_eq!(manifest.config, config.clone());
config.remove("other-key");
manifest.config_mut().remove("other-key");
assert_eq!(manifest.config, config);
}
// Exercises summary() over: an empty manifest, fragments with zero rows,
// fragments with data, and a fragment with a deletion file (50 physical
// rows, 10 deleted -> 40 live rows / 50 data-file rows).
#[test]
fn test_manifest_summary() {
let arrow_schema = ArrowSchema::new(vec![
ArrowField::new("id", arrow_schema::DataType::Int64, false),
ArrowField::new("name", arrow_schema::DataType::Utf8, true),
]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let empty_manifest = Manifest::new(
schema.clone(),
Arc::new(vec![]),
DataStorageFormat::default(),
HashMap::new(),
);
let empty_summary = empty_manifest.summary();
assert_eq!(empty_summary.total_rows, 0);
assert_eq!(empty_summary.total_files_size, 0);
assert_eq!(empty_summary.total_fragments, 0);
assert_eq!(empty_summary.total_data_files, 0);
assert_eq!(empty_summary.total_deletion_file_rows, 0);
assert_eq!(empty_summary.total_data_file_rows, 0);
assert_eq!(empty_summary.total_deletion_files, 0);
let empty_fragments = vec![
Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
];
let empty_files_manifest = Manifest::new(
schema.clone(),
Arc::new(empty_fragments),
DataStorageFormat::default(),
HashMap::new(),
);
let empty_files_summary = empty_files_manifest.summary();
assert_eq!(empty_files_summary.total_rows, 0);
assert_eq!(empty_files_summary.total_files_size, 0);
assert_eq!(empty_files_summary.total_fragments, 2);
assert_eq!(empty_files_summary.total_data_files, 2);
assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
assert_eq!(empty_files_summary.total_data_file_rows, 0);
assert_eq!(empty_files_summary.total_deletion_files, 0);
let real_fragments = vec![
Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
];
let real_data_manifest = Manifest::new(
schema.clone(),
Arc::new(real_fragments),
DataStorageFormat::default(),
HashMap::new(),
);
let real_data_summary = real_data_manifest.summary();
// 100 + 250 + 75 rows; legacy files record no size.
assert_eq!(real_data_summary.total_rows, 425); assert_eq!(real_data_summary.total_files_size, 0); assert_eq!(real_data_summary.total_fragments, 3);
assert_eq!(real_data_summary.total_data_files, 3);
assert_eq!(real_data_summary.total_deletion_file_rows, 0);
assert_eq!(real_data_summary.total_data_file_rows, 425);
assert_eq!(real_data_summary.total_deletion_files, 0);
let file_version = LanceFileVersion::default();
let mut fragment_with_deletion = Fragment::new(0)
.with_file(
"data_with_deletion.lance",
vec![0, 1],
vec![0, 1],
&file_version,
NonZero::new(1000),
)
.with_physical_rows(50);
fragment_with_deletion.deletion_file = Some(DeletionFile {
read_version: 123,
id: 456,
file_type: DeletionFileType::Array,
num_deleted_rows: Some(10),
base_id: None,
});
let manifest_with_deletion = Manifest::new(
schema,
Arc::new(vec![fragment_with_deletion]),
DataStorageFormat::default(),
HashMap::new(),
);
let deletion_summary = manifest_with_deletion.summary();
// 50 physical rows minus 10 deleted = 40 live; file size recorded as 1000.
assert_eq!(deletion_summary.total_rows, 40); assert_eq!(deletion_summary.total_files_size, 1000);
assert_eq!(deletion_summary.total_fragments, 1);
assert_eq!(deletion_summary.total_data_files, 1);
assert_eq!(deletion_summary.total_deletion_file_rows, 10);
assert_eq!(deletion_summary.total_data_file_rows, 50);
assert_eq!(deletion_summary.total_deletion_files, 1);
let stats_map: BTreeMap<String, String> = deletion_summary.into();
assert_eq!(stats_map.len(), 7)
}
}