use std::collections::HashMap;
use std::sync::Arc;
use chrono::prelude::*;
use prost_types::Timestamp;
use super::Fragment;
use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::format::{pb, ProtoStruct};
use snafu::{location, Location};
/// In-memory form of the `pb::Manifest` protobuf message.
///
/// Conversions to and from the protobuf type are provided by the `From`
/// implementations in this file.
#[derive(Debug, Clone, PartialEq)]
pub struct Manifest {
    /// Schema; rebuilt from the protobuf `fields` and `metadata` when decoding.
    pub schema: Schema,
    /// Version number of this manifest. `Manifest::new` starts at 1 and
    /// `Manifest::new_from_previous` uses the previous version plus one.
    pub version: u64,
    /// Library name and version recorded for the writer of this manifest.
    /// `None` when the decoded protobuf message carries no writer version.
    pub writer_version: Option<WriterVersion>,
    /// Fragments listed by this manifest, shared behind an `Arc`.
    pub fragments: Arc<Vec<Fragment>>,
    /// Stored as a `u64` in the protobuf message.
    /// NOTE(review): presumably a file position of auxiliary version data — confirm.
    pub version_aux_data: usize,
    /// Stored as an optional `u64` in the protobuf message.
    /// NOTE(review): presumably a file position of the index metadata — confirm.
    pub index_section: Option<usize>,
    /// Timestamp in nanoseconds since the Unix epoch. Zero means "not set" and
    /// is encoded as an absent protobuf timestamp.
    pub timestamp_nanos: u128,
    /// Optional tag; `None` is encoded as an empty string in protobuf.
    pub tag: Option<String>,
    /// Reader feature flag bits, copied verbatim to and from protobuf.
    pub reader_feature_flags: u64,
    /// Writer feature flag bits, copied verbatim to and from protobuf.
    pub writer_feature_flags: u64,
    /// Largest fragment id recorded so far. Zero is treated by
    /// `Manifest::max_fragment_id` as "not recorded".
    pub max_fragment_id: u32,
    /// Optional transaction file name; `None` is encoded as an empty string in
    /// protobuf. NOTE(review): presumably a path relative to the dataset — confirm.
    pub transaction_file: Option<String>,
}
impl Manifest {
    /// Creates a version-1 manifest for `schema` and `fragments`.
    ///
    /// The writer version is `WriterVersion::default()`. The timestamp, the
    /// feature flags, `version_aux_data` and the recorded max fragment id all
    /// start at zero; no index section, tag or transaction file is set.
    pub fn new(schema: &Schema, fragments: Arc<Vec<Fragment>>) -> Self {
        let schema = schema.clone();
        let writer_version = Some(WriterVersion::default());
        Self {
            schema,
            version: 1,
            writer_version,
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: 0,
            transaction_file: None,
        }
    }

    /// Creates the manifest that follows `previous`.
    ///
    /// Only the version (`previous.version + 1`) and the recorded max fragment
    /// id are taken from `previous`. Every other field gets the same initial
    /// value that [`Manifest::new`] assigns, so the index section, timestamp,
    /// tag, feature flags and transaction file are NOT carried over.
    pub fn new_from_previous(
        previous: &Self,
        schema: &Schema,
        fragments: Arc<Vec<Fragment>>,
    ) -> Self {
        Self {
            version: previous.version + 1,
            max_fragment_id: previous.max_fragment_id,
            ..Self::new(schema, fragments)
        }
    }

    /// Returns `timestamp_nanos` as a UTC date-time.
    ///
    /// Falls back to `NaiveDateTime::MIN` when the value cannot be represented.
    pub fn timestamp(&self) -> DateTime<Utc> {
        const NANOS_PER_SECOND: u128 = 1_000_000_000;

        let whole_seconds = (self.timestamp_nanos / NANOS_PER_SECOND) as i64;
        let subsec_nanos = (self.timestamp_nanos % NANOS_PER_SECOND) as u32;
        let naive = NaiveDateTime::from_timestamp_opt(whole_seconds, subsec_nanos)
            .unwrap_or(NaiveDateTime::MIN);
        Utc.from_utc_datetime(&naive)
    }

    /// Overwrites the timestamp with `nanos` (nanoseconds since the Unix epoch).
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Raises the recorded max fragment id to the largest id found in
    /// `fragments`; the recorded value never decreases.
    ///
    /// # Panics
    ///
    /// Panics if the largest fragment id does not fit in a `u32`.
    pub fn update_max_fragment_id(&mut self) {
        let highest_in_fragments: u32 = self
            .fragments
            .iter()
            .map(|fragment| fragment.id)
            .max()
            .unwrap_or_default()
            .try_into()
            .unwrap();
        self.max_fragment_id = self.max_fragment_id.max(highest_in_fragments);
    }

    /// Returns the max fragment id.
    ///
    /// A recorded value of zero is treated as "not recorded": the largest id
    /// among `fragments` is returned instead, or `None` when there are no
    /// fragments.
    pub fn max_fragment_id(&self) -> Option<u64> {
        match self.max_fragment_id {
            0 => self.fragments.iter().map(|fragment| fragment.id).max(),
            recorded => Some(u64::from(recorded)),
        }
    }

    /// Returns the fragments of `self` whose id is greater than the max
    /// fragment id of `since`. Every fragment is returned when `since` has no
    /// max fragment id.
    ///
    /// # Errors
    ///
    /// Returns `Error::IO` when `since.version` is not strictly lower than
    /// `self.version`.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            let message = format!(
                "fragments_since: given version {} is newer than manifest version {}",
                since.version, self.version
            );
            return Err(Error::IO {
                message,
                location: location!(),
            });
        }

        let added = match since.max_fragment_id() {
            Some(threshold) => self
                .fragments
                .iter()
                .filter(|fragment| fragment.id > threshold)
                .cloned()
                .collect(),
            None => self.fragments.iter().cloned().collect(),
        };
        Ok(added)
    }
}
/// Name and version of the library that wrote a manifest.
#[derive(Debug, Clone, PartialEq)]
pub struct WriterVersion {
    /// Name of the writing library.
    pub library: String,
    /// Version string, expected to look like `major.minor.patch[.tag]`.
    pub version: String,
}

/// Selects which numeric component [`WriterVersion::bump`] increments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// The first numeric component.
    Major,
    /// The second numeric component.
    Minor,
    /// The third numeric component.
    Patch,
}

impl WriterVersion {
    /// Parses `version` as `major.minor.patch[.tag]`.
    ///
    /// Missing trailing numeric components default to 0, so `"1"` parses as
    /// `(1, 0, 0, None)`. The tag is everything after the third dot, so a tag
    /// that itself contains dots (`"1.2.3.beta.1"`) is returned whole.
    ///
    /// Returns `None` when a numeric component is not a valid `u32`.
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        // `splitn(4, ..)` keeps the remainder intact as the fourth piece; a
        // plain `split` would truncate a dotted tag to its first segment.
        let mut pieces = self.version.splitn(4, '.');
        let major = pieces.next().unwrap_or("0").parse().ok()?;
        let minor = pieces.next().unwrap_or("0").parse().ok()?;
        let patch = pieces.next().unwrap_or("0").parse().ok()?;
        let tag = pieces.next();
        Some((major, minor, patch, tag))
    }

    /// Like [`WriterVersion::semver`], but panics on an unparsable version.
    ///
    /// # Panics
    ///
    /// Panics with `Invalid writer version: <version>` when parsing fails.
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// Returns true when this version is strictly lower than
    /// `major.minor.patch`. The tag is ignored.
    ///
    /// # Panics
    ///
    /// Panics when `version` cannot be parsed.
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let (own_major, own_minor, own_patch, _) = self.semver_or_panic();
        (own_major, own_minor, own_patch) < (major, minor, patch)
    }

    /// Returns a copy with the selected component incremented by one.
    ///
    /// Lower components are left as they are (a minor bump of `0.8.1` yields
    /// `0.9.1`). The tag is kept only when `keep_tag` is true.
    ///
    /// # Panics
    ///
    /// Panics when `version` cannot be parsed.
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let (mut major, mut minor, mut patch, tag) = self.semver_or_panic();
        match part {
            VersionPart::Major => major += 1,
            VersionPart::Minor => minor += 1,
            VersionPart::Patch => patch += 1,
        }

        let kept_tag = if keep_tag { tag } else { None };
        let version = match kept_tag {
            Some(tag) => format!("{}.{}.{}.{}", major, minor, patch, tag),
            None => format!("{}.{}.{}", major, minor, patch),
        };

        Self {
            library: self.library.clone(),
            version,
        }
    }
}
impl Default for WriterVersion {
#[cfg(not(test))]
fn default() -> Self {
Self {
library: "lance".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
}
}
#[cfg(test)]
fn default() -> Self {
Self {
library: "lance".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
}
.bump(VersionPart::Patch, true)
}
}
/// Declares `pb::Manifest` as the protobuf message type associated with
/// [`Manifest`].
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
impl From<pb::Manifest> for Manifest {
    /// Decodes a protobuf manifest message.
    ///
    /// An absent timestamp becomes `timestamp_nanos == 0`; an empty `tag` or
    /// `transaction_file` string becomes `None`.
    fn from(p: pb::Manifest) -> Self {
        const NANOS_PER_SECOND: u128 = 1_000_000_000;

        // Protobuf has no optional strings here: empty means "unset".
        fn non_empty(text: String) -> Option<String> {
            Some(text).filter(|t| !t.is_empty())
        }

        let timestamp_nanos = match p.timestamp {
            Some(ts) => ts.seconds as u128 * NANOS_PER_SECOND + ts.nanos as u128,
            None => 0,
        };
        let writer_version = p
            .writer_version
            .map(|pb::manifest::WriterVersion { library, version }| WriterVersion {
                library,
                version,
            });

        Self {
            schema: Schema::from((&p.fields, p.metadata)),
            version: p.version,
            writer_version,
            fragments: Arc::new(p.fragments.iter().map(Fragment::from).collect()),
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|offset| offset as usize),
            timestamp_nanos,
            tag: non_empty(p.tag),
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            transaction_file: non_empty(p.transaction_file),
        }
    }
}
impl From<&Manifest> for pb::Manifest {
    /// Encodes a manifest as its protobuf message.
    ///
    /// A zero `timestamp_nanos` is written as an absent timestamp; an unset
    /// `tag` or `transaction_file` is written as an empty string.
    fn from(m: &Manifest) -> Self {
        const NANOS_PER_SECOND: u128 = 1_000_000_000;

        // Split the nanosecond count into whole seconds plus the sub-second
        // remainder, which is what the protobuf `Timestamp` stores.
        let timestamp = match m.timestamp_nanos {
            0 => None,
            total_nanos => Some(Timestamp {
                seconds: (total_nanos / NANOS_PER_SECOND) as i64,
                nanos: (total_nanos % NANOS_PER_SECOND) as i32,
            }),
        };
        let writer_version = m
            .writer_version
            .as_ref()
            .map(|written_by| pb::manifest::WriterVersion {
                library: written_by.library.clone(),
                version: written_by.version.clone(),
            });
        let (fields, metadata): (Vec<pb::Field>, HashMap<String, Vec<u8>>) = (&m.schema).into();

        Self {
            fields,
            version: m.version,
            writer_version,
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|offset| offset as u64),
            timestamp,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Under `cfg(test)` the default writer version is the crate version with
    /// its patch component bumped by one, and bumping any single component
    /// produces a version the default is older than.
    #[test]
    fn test_writer_version() {
        let default_version = WriterVersion::default();
        assert_eq!(default_version.library, "lance");

        let (major, minor, patch, tag) = default_version.semver().unwrap();
        let crate_major: u32 = env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap();
        let crate_minor: u32 = env!("CARGO_PKG_VERSION_MINOR").parse().unwrap();
        let crate_patch: u32 = env!("CARGO_PKG_VERSION_PATCH").parse().unwrap();
        assert_eq!(
            (major, minor, patch, tag),
            (crate_major, crate_minor, crate_patch + 1, None)
        );

        // Undoing the test-only patch bump must give back the crate version.
        let unbumped = format!("{}.{}.{}", major, minor, patch - 1);
        assert_eq!(unbumped, env!("CARGO_PKG_VERSION"));

        for &part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = default_version.bump(part, false);
            let (bumped_major, bumped_minor, bumped_patch, _) = bumped.semver_or_panic();
            assert!(default_version.older_than(bumped_major, bumped_minor, bumped_patch));
        }
    }
}