use std::collections::HashMap;
use std::sync::Arc;
use chrono::prelude::*;
use prost_types::Timestamp;
use super::Fragment;
use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::format::{pb, ProtoStruct};
use crate::utils::temporal::SystemTime;
/// In-memory form of a manifest: a schema, the fragments that make up one
/// version, and bookkeeping fields. It is converted to and from
/// `pb::Manifest` by the `From` impls in this file.
#[derive(Debug, Clone, PartialEq)]
pub struct Manifest {
/// Schema attached to this manifest.
pub schema: Schema,
/// Version number. `new` starts at 1; `new_from_previous` uses the previous
/// version plus one.
pub version: u64,
/// Fragments belonging to this version, shared behind an `Arc`.
pub fragments: Arc<Vec<Fragment>>,
/// Auxiliary version data value, stored as `u64` in the protobuf message.
/// NOTE(review): presumably a file position; confirm against the format spec.
pub version_aux_data: usize,
/// Optional index section value, stored as `u64` in the protobuf message.
/// NOTE(review): presumably the file position of index metadata; confirm.
pub index_section: Option<usize>,
/// Timestamp in nanoseconds since the UNIX epoch. A value of 0 is written to
/// the protobuf message as "no timestamp".
pub timestamp_nanos: u128,
/// Optional tag. An empty protobuf string is read back as `None`.
pub tag: Option<String>,
/// Reader feature flags, copied unchanged to and from the protobuf message.
pub reader_feature_flags: u64,
/// Writer feature flags, copied unchanged to and from the protobuf message.
pub writer_feature_flags: u64,
/// Highest fragment id recorded so far. `update_max_fragment_id` only ever
/// raises this value.
pub max_fragment_id: u32,
/// Optional transaction file. An empty protobuf string is read back as `None`.
/// NOTE(review): presumably a file name or path; confirm with the writer.
pub transaction_file: Option<String>,
}
impl Manifest {
/// Builds an initial manifest from `schema` and `fragments`.
///
/// The schema is cloned, `version` starts at 1, and every remaining field
/// is initialised to zero or `None`.
pub fn new(schema: &Schema, fragments: Arc<Vec<Fragment>>) -> Self {
    let schema = schema.clone();
    Self {
        version: 1,
        schema,
        fragments,
        // Numeric bookkeeping all starts at zero.
        max_fragment_id: 0,
        version_aux_data: 0,
        timestamp_nanos: 0,
        reader_feature_flags: 0,
        writer_feature_flags: 0,
        // Optional fields start out unset.
        index_section: None,
        tag: None,
        transaction_file: None,
    }
}
/// Builds the manifest that follows `previous`.
///
/// Only two values are carried over from `previous`: the version (plus one)
/// and `max_fragment_id`. The schema is cloned from `schema`, `fragments`
/// is taken as given, and every other field is reset to zero or `None`.
pub fn new_from_previous(
    previous: &Self,
    schema: &Schema,
    fragments: Arc<Vec<Fragment>>,
) -> Self {
    let version = previous.version + 1;
    let max_fragment_id = previous.max_fragment_id;
    Self {
        version,
        max_fragment_id,
        schema: schema.clone(),
        fragments,
        // Nothing below is inherited from `previous`.
        version_aux_data: 0,
        timestamp_nanos: 0,
        reader_feature_flags: 0,
        writer_feature_flags: 0,
        index_section: None,
        tag: None,
        transaction_file: None,
    }
}
/// Returns the stored timestamp as a UTC `DateTime`.
///
/// `timestamp_nanos` is split into whole seconds and the sub-second
/// remainder. If chrono cannot represent that pair, `NaiveDateTime::MIN`
/// is used instead.
pub fn timestamp(&self) -> DateTime<Utc> {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let whole_secs = (self.timestamp_nanos / NANOS_PER_SEC) as i64;
    let subsec_nanos = (self.timestamp_nanos % NANOS_PER_SEC) as u32;
    let naive = NaiveDateTime::from_timestamp_opt(whole_secs, subsec_nanos)
        .unwrap_or(NaiveDateTime::MIN);
    Utc.from_utc_datetime(&naive)
}
/// Stores `timestamp` as nanoseconds since `SystemTime::UNIX_EPOCH`.
///
/// When `timestamp` is `None`, `SystemTime::now()` is used.
///
/// # Panics
///
/// Panics if `duration_since(SystemTime::UNIX_EPOCH)` returns an error
/// (presumably when the time is earlier than the epoch).
pub fn set_timestamp(&mut self, timestamp: Option<SystemTime>) {
    let instant = match timestamp {
        Some(given) => given,
        None => SystemTime::now(),
    };
    let since_epoch = instant.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    self.timestamp_nanos = since_epoch.as_nanos();
}
/// Raises `max_fragment_id` to the largest id among the current fragments.
///
/// The stored value is never lowered. An empty fragment list contributes 0,
/// so it leaves the field unchanged.
///
/// # Panics
///
/// Panics if the largest fragment id does not fit in a `u32`.
pub fn update_max_fragment_id(&mut self) {
    let highest_seen: u32 = self
        .fragments
        .iter()
        .map(|fragment| fragment.id)
        .max()
        .unwrap_or_default()
        .try_into()
        .unwrap();
    self.max_fragment_id = self.max_fragment_id.max(highest_seen);
}
/// Returns the highest fragment id known to this manifest.
///
/// A non-zero stored `max_fragment_id` is returned directly. When the
/// stored value is 0, the ids of the current fragments are scanned instead,
/// which gives `None` if there are no fragments.
pub fn max_fragment_id(&self) -> Option<u64> {
    match self.max_fragment_id {
        0 => self.fragments.iter().map(|fragment| fragment.id).max(),
        recorded => Some(u64::from(recorded)),
    }
}
pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
if since.version >= self.version {
return Err(Error::IO {
message: format!(
"fragments_since: given version {} is newer than manifest version {}",
since.version, self.version
),
});
}
let start = since.max_fragment_id();
Ok(self
.fragments
.iter()
.filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
.cloned()
.collect())
}
}
// Declares `pb::Manifest` as the protobuf message type associated with
// `Manifest` through the `ProtoStruct` trait.
impl ProtoStruct for Manifest {
type Proto = pb::Manifest;
}
/// Converts a decoded protobuf manifest into the in-memory [`Manifest`].
///
/// * The protobuf timestamp (seconds + nanos) is flattened into nanoseconds
///   since the UNIX epoch. A missing timestamp becomes 0.
/// * Empty `tag` / `transaction_file` strings become `None`.
/// * All other fields are copied, widening or narrowing integer types where
///   the two representations differ.
impl From<pb::Manifest> for Manifest {
    fn from(p: pb::Manifest) -> Self {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        // `seconds` and `nanos` are signed in the protobuf message. A plain
        // `as u128` cast would sign-extend a negative value into a huge
        // number and overflow the arithmetic below, so use checked
        // conversions and treat a negative component as "no timestamp".
        let timestamp_nanos = p
            .timestamp
            .map(
                |ts| match (u128::try_from(ts.seconds), u128::try_from(ts.nanos)) {
                    (Ok(secs), Ok(nanos)) => secs * NANOS_PER_SEC + nanos,
                    _ => 0,
                },
            )
            .unwrap_or(0);

        Self {
            schema: Schema::from((&p.fields, p.metadata)),
            version: p.version,
            fragments: Arc::new(p.fragments.iter().map(Fragment::from).collect()),
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos,
            // Protobuf strings have no "absent" state; empty means unset.
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
        }
    }
}
/// Converts an in-memory [`Manifest`] into its protobuf message.
///
/// * A `timestamp_nanos` of 0 is written as "no timestamp". Any other value
///   is split into whole seconds and the sub-second nanosecond remainder.
/// * `None` for `tag` / `transaction_file` is written as an empty string.
/// * The schema is turned into protobuf fields plus a metadata map.
impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let timestamp = (m.timestamp_nanos != 0).then(|| Timestamp {
            seconds: (m.timestamp_nanos / NANOS_PER_SEC) as i64,
            nanos: (m.timestamp_nanos % NANOS_PER_SEC) as i32,
        });

        let (fields, metadata): (Vec<pb::Field>, HashMap<String, Vec<u8>>) = (&m.schema).into();
        let fragments = m.fragments.iter().map(pb::DataFragment::from).collect();
        let tag = m.tag.clone().unwrap_or_default();
        let transaction_file = m.transaction_file.clone().unwrap_or_default();

        Self {
            fields,
            metadata,
            fragments,
            timestamp,
            tag,
            transaction_file,
            version: m.version,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|section| section as u64),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
        }
    }
}