use std::collections::HashSet;
use std::io::{Cursor, Read, Write, copy};
use std::mem::size_of;
use std::time::SystemTime;
use bytes::Bytes;
use futures::AsyncRead;
use futures_util::io::AsyncReadExt;
use itertools::Itertools;
use more_asserts::debug_assert_lt;
use super::file_structs::{FileDataSequenceHeader, MDBFileInfoView};
use super::shard_file::{MDB_FILE_INFO_ENTRY_SIZE, current_timestamp};
use super::xorb_structs::{MDBXorbInfoView, XorbChunkSequenceEntry, XorbChunkSequenceHeader};
use super::{MDBShardFileFooter, MDBShardFileHeader};
use crate::MerkleHashMap;
use crate::error::{CoreError, Result};
use crate::merklehash::MerkleHash;
/// Walks the file-info section of a shard stream, handing each record to `file_callback`.
///
/// Records are read from `reader` one after another until a bookend header is
/// encountered. For every record the header is re-serialized and followed by the
/// raw entry bytes, and the resulting buffer is wrapped in an [`MDBFileInfoView`].
///
/// # Errors
///
/// Returns an error if a header cannot be deserialized or re-serialized, if the
/// stream ends before a record's entries have been fully read, if the view
/// cannot be built from the bytes, or if `file_callback` returns an error.
pub fn process_shard_file_info_section<R: Read, FileFunc>(reader: &mut R, mut file_callback: FileFunc) -> Result<()>
where
    FileFunc: FnMut(MDBFileInfoView) -> Result<()>,
{
    loop {
        let header = FileDataSequenceHeader::deserialize(reader)?;
        if header.is_bookend() {
            break;
        }

        // The payload holds `num_entries` entries, doubled when the header
        // reports verification data, plus one when it reports a metadata extension.
        let n = header.num_entries as usize;
        let mut n_entries = n;
        if header.contains_verification() {
            n_entries += n;
        }
        if header.contains_metadata_ext() {
            n_entries += 1;
        }
        let n_bytes = n_entries * MDB_FILE_INFO_ENTRY_SIZE;

        let mut file_data = Vec::with_capacity(size_of::<FileDataSequenceHeader>() + n_bytes);
        header.serialize(&mut file_data)?;

        // `copy` returns normally when the reader hits end-of-input, so the
        // number of bytes actually transferred has to be checked; otherwise a
        // truncated stream would yield a short buffer instead of an error.
        let n_copied = copy(&mut reader.take(n_bytes as u64), &mut file_data)?;
        if n_copied != n_bytes as u64 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "shard file info section ended before all entries were read",
            )
            .into());
        }

        file_callback(MDBFileInfoView::from_data_and_header(header, Bytes::from(file_data))?)?;
    }
    Ok(())
}
/// Walks the xorb-info section of a shard stream, handing each record to `xorb_callback`.
///
/// Records are read from `reader` one after another until a bookend header is
/// encountered. For every record the header is re-serialized and followed by the
/// raw chunk-entry bytes, and the resulting buffer is wrapped in an
/// [`MDBXorbInfoView`].
///
/// # Errors
///
/// Returns an error if a header cannot be deserialized or re-serialized, if the
/// stream ends before a record's chunk entries have been fully read, if the view
/// cannot be built from the bytes, or if `xorb_callback` returns an error.
pub fn process_shard_xorb_info_section<R: Read, XorbFunc>(reader: &mut R, mut xorb_callback: XorbFunc) -> Result<()>
where
    XorbFunc: FnMut(MDBXorbInfoView) -> Result<()>,
{
    loop {
        let header = XorbChunkSequenceHeader::deserialize(reader)?;
        if header.is_bookend() {
            break;
        }

        let n_bytes = (header.num_entries as usize) * size_of::<XorbChunkSequenceEntry>();

        let mut xorb_data = Vec::with_capacity(size_of::<XorbChunkSequenceHeader>() + n_bytes);
        header.serialize(&mut xorb_data)?;

        // `copy` returns normally when the reader hits end-of-input, so the
        // number of bytes actually transferred has to be checked; otherwise a
        // truncated stream would yield a short buffer instead of an error.
        let n_copied = copy(&mut reader.take(n_bytes as u64), &mut xorb_data)?;
        if n_copied != n_bytes as u64 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "shard xorb info section ended before all chunk entries were read",
            )
            .into());
        }

        xorb_callback(MDBXorbInfoView::from_data_and_header(header, Bytes::from(xorb_data))?)?;
    }
    Ok(())
}
/// Async walker over the file-info section of a shard stream.
///
/// Reads one header at a time from `reader`; a bookend header ends the walk.
/// For every other header the entry bytes that follow are read in full, the raw
/// header bytes and entry bytes are joined into one buffer, and the resulting
/// [`MDBFileInfoView`] is passed to `file_callback`.
///
/// # Errors
///
/// Fails if the stream ends early, if a header or view cannot be decoded, or if
/// `file_callback` returns an error.
pub async fn process_shard_file_info_section_async<R: AsyncRead + Unpin, FileFunc>(
    reader: &mut R,
    mut file_callback: FileFunc,
) -> Result<()>
where
    FileFunc: FnMut(MDBFileInfoView) -> Result<()>,
{
    const HEADER_LEN: usize = size_of::<FileDataSequenceHeader>();

    loop {
        let mut raw_header = [0u8; HEADER_LEN];
        reader.read_exact(&mut raw_header).await?;
        let header = FileDataSequenceHeader::deserialize(&mut Cursor::new(&raw_header[..]))?;
        if header.is_bookend() {
            return Ok(());
        }

        // Entry slots: `num_entries`, counted twice when verification data is
        // present, plus one slot for a metadata extension.
        let segment_count = header.num_entries as usize;
        let slots_per_segment = if header.contains_verification() { 2 } else { 1 };
        let extension_slots = usize::from(header.contains_metadata_ext());
        let payload_len = (segment_count * slots_per_segment + extension_slots) * MDB_FILE_INFO_ENTRY_SIZE;

        // Zero-filled buffer: raw header bytes up front, payload read straight into the tail.
        let mut file_data = vec![0u8; HEADER_LEN + payload_len];
        let (head, tail) = file_data.split_at_mut(HEADER_LEN);
        head.copy_from_slice(&raw_header);
        reader.read_exact(tail).await?;

        let view = MDBFileInfoView::from_data_and_header(header, Bytes::from(file_data))?;
        file_callback(view)?;
    }
}
/// Async walker over the xorb-info section of a shard stream.
///
/// Reads one header at a time from `reader`; a bookend header ends the walk.
/// For every other header the chunk-entry bytes that follow are read in full,
/// the raw header bytes and entry bytes are joined into one buffer, and the
/// resulting [`MDBXorbInfoView`] is passed to `xorb_callback`.
///
/// # Errors
///
/// Fails if the stream ends early, if a header or view cannot be decoded, or if
/// `xorb_callback` returns an error.
pub async fn process_shard_xorb_info_section_async<R: AsyncRead + Unpin, XorbFunc>(
    reader: &mut R,
    mut xorb_callback: XorbFunc,
) -> Result<()>
where
    XorbFunc: FnMut(MDBXorbInfoView) -> Result<()>,
{
    const HEADER_LEN: usize = size_of::<XorbChunkSequenceHeader>();

    loop {
        let mut raw_header = [0u8; HEADER_LEN];
        reader.read_exact(&mut raw_header).await?;
        let header = XorbChunkSequenceHeader::deserialize(&mut Cursor::new(&raw_header[..]))?;
        if header.is_bookend() {
            return Ok(());
        }

        let payload_len = (header.num_entries as usize) * size_of::<XorbChunkSequenceEntry>();

        // Zero-filled buffer: raw header bytes up front, payload read straight into the tail.
        let mut xorb_data = vec![0u8; HEADER_LEN + payload_len];
        let (head, tail) = xorb_data.split_at_mut(HEADER_LEN);
        head.copy_from_slice(&raw_header);
        reader.read_exact(tail).await?;

        let view = MDBXorbInfoView::from_data_and_header(header, Bytes::from(xorb_data))?;
        xorb_callback(view)?;
    }
}
/// A shard held as the file-info and xorb-info views collected while reading it.
///
/// The loaders in this file fill each list only when the corresponding
/// `include_*` flag is set, so either list may be empty.
#[derive(Clone, Debug, PartialEq)]
pub struct MDBMinimalShard {
/// One view per file-info record, in the order the records were read.
file_info_views: Vec<MDBFileInfoView>,
/// One view per xorb-info record, in the order the records were read.
xorb_info_views: Vec<MDBXorbInfoView>,
}
impl MDBMinimalShard {
/// Loads a minimal shard from a blocking reader.
///
/// The shard header is deserialized and then discarded. The file-info section
/// is always walked, but its views are retained only when `include_files` is
/// set. The xorb-info section is read only when `include_xorb` is set.
///
/// NOTE(review): unlike `from_reader_async_with_custom_callbacks`, this loader
/// does not reject shards in which only some files carry verification data;
/// confirm that difference is intended.
pub fn from_reader<R: Read>(reader: &mut R, include_files: bool, include_xorb: bool) -> Result<Self> {
    let _header = MDBShardFileHeader::deserialize(reader)?;

    let mut files = Vec::new();
    process_shard_file_info_section(reader, |view: MDBFileInfoView| {
        if include_files {
            files.push(view);
        }
        Ok(())
    })?;

    let mut xorbs = Vec::new();
    if include_xorb {
        process_shard_xorb_info_section(reader, |view: MDBXorbInfoView| {
            xorbs.push(view);
            Ok(())
        })?;
    }

    Ok(Self {
        file_info_views: files,
        xorb_info_views: xorbs,
    })
}
pub async fn from_reader_async<R: AsyncRead + Unpin>(
reader: &mut R,
include_files: bool,
include_xorb: bool,
) -> Result<Self> {
Self::from_reader_async_with_custom_callbacks(reader, include_files, include_xorb, |_| Ok(()), |_| Ok(())).await
}
/// Loads a minimal shard from an async reader, reporting each retained record to a callback.
///
/// The shard header is read and deserialized, then discarded. The file-info
/// section is always walked; when `include_files` is set each view is passed to
/// `file_callback` and then stored. When `include_xorb` is set the xorb-info
/// section is read and each view is passed to `xorb_callback` and then stored.
///
/// # Errors
///
/// Fails on read or decode errors, when a callback returns an error, or when
/// the retained files disagree on whether they contain verification data.
pub async fn from_reader_async_with_custom_callbacks<R: AsyncRead + Unpin, FileFunc, XorbFunc>(
    reader: &mut R,
    include_files: bool,
    include_xorb: bool,
    mut file_callback: FileFunc,
    mut xorb_callback: XorbFunc,
) -> Result<Self>
where
    FileFunc: FnMut(&MDBFileInfoView) -> Result<()>,
    XorbFunc: FnMut(&MDBXorbInfoView) -> Result<()>,
{
    let mut raw_header = [0u8; size_of::<MDBShardFileHeader>()];
    reader.read_exact(&mut raw_header[..]).await?;
    let _header = MDBShardFileHeader::deserialize(&mut Cursor::new(&raw_header))?;

    let mut file_info_views = Vec::<MDBFileInfoView>::new();
    process_shard_file_info_section_async(reader, |view: MDBFileInfoView| {
        if !include_files {
            return Ok(());
        }
        file_callback(&view)?;
        file_info_views.push(view);
        Ok(())
    })
    .await?;

    // Verification must be all-or-nothing across the retained files.
    let verification_is_uniform = file_info_views.is_empty()
        || file_info_views
            .iter()
            .map(|view| view.contains_verification())
            .all_equal();
    if !verification_is_uniform {
        return Err(CoreError::invalid_shard("only some files contain verification"));
    }

    let mut xorb_info_views = Vec::<MDBXorbInfoView>::new();
    if include_xorb {
        process_shard_xorb_info_section_async(reader, |view: MDBXorbInfoView| {
            xorb_callback(&view)?;
            xorb_info_views.push(view);
            Ok(())
        })
        .await?;
    }

    Ok(Self {
        file_info_views,
        xorb_info_views,
    })
}
/// Reports whether the first file-info view contains verification data.
///
/// Returns `false` when the shard holds no file-info views.
pub fn has_file_verification(&self) -> bool {
    self.file_info_views
        .first()
        .is_some_and(|view| view.contains_verification())
}
/// Number of file-info views held by this shard.
pub fn num_files(&self) -> usize {
self.file_info_views.len()
}
/// Returns the file-info view at `index`, or `None` when `index` is out of range.
pub fn file(&self, index: usize) -> Option<&MDBFileInfoView> {
self.file_info_views.get(index)
}
/// Number of xorb-info views held by this shard.
pub fn num_xorb(&self) -> usize {
self.xorb_info_views.len()
}
/// Returns the xorb-info view at `index`, or `None` when `index` is out of range.
pub fn xorb(&self, index: usize) -> Option<&MDBXorbInfoView> {
self.xorb_info_views.get(index)
}
/// Computes the byte size of the serialized shard.
///
/// The total is the shard header, every file record, the file-section bookend,
/// every xorb record, the xorb-section bookend and the footer.
///
/// Returns `0` when `with_verification` is requested but
/// [`Self::has_file_verification`] is `false`.
pub fn serialized_size(&self, with_verification: bool) -> usize {
    if with_verification && !self.has_file_verification() {
        return 0;
    }

    let file_records: usize = self
        .file_info_views
        .iter()
        .map(|fiv| fiv.byte_size(with_verification))
        .sum();
    let xorb_records: usize = self.xorb_info_views.iter().map(|civ| civ.byte_size()).sum();

    let file_bookend = size_of::<FileDataSequenceHeader>();
    let xorb_bookend = size_of::<XorbChunkSequenceHeader>();

    size_of::<MDBShardFileHeader>()
        + file_records
        + file_bookend
        + xorb_records
        + xorb_bookend
        + size_of::<MDBShardFileFooter>()
}
/// Maps each xorb hash to the chunk indices at which some file begins.
///
/// Only the first entry of every non-empty file is considered. Each index list
/// is returned sorted and without duplicates.
fn file_start_entries(&self) -> MerkleHashMap<Vec<usize>> {
    let mut starts_by_xorb = MerkleHashMap::<Vec<usize>>::new();

    for file_view in &self.file_info_views {
        if file_view.num_entries() == 0 {
            continue;
        }
        let first_segment = file_view.entry(0);
        let xorb_hash = first_segment.xorb_hash;
        let start_index = first_segment.chunk_index_start as usize;
        starts_by_xorb.entry(xorb_hash).or_default().push(start_index);
    }

    for indices in starts_by_xorb.values_mut() {
        indices.sort_unstable();
        indices.dedup();
    }

    starts_by_xorb
}
/// Writes the shard to `writer` and returns the number of bytes written.
///
/// Output order: a default shard header, the file records (only when
/// `with_file_section` is set), the file-section bookend, every xorb record
/// accepted by `xorb_filter_fn`, the xorb-section bookend, and the footer.
///
/// Chunks that sit at a file's starting position (see `file_start_entries`)
/// are written with their global-dedup flag set. All three lookup tables in the
/// footer are recorded as empty and positioned at the footer offset.
/// `shard_key_expiry` is `expiry` in seconds since the Unix epoch, or `0` when
/// `expiry` is `None`.
fn serialize_impl<W: Write>(
    &self,
    writer: &mut W,
    with_file_section: bool,
    with_verification: bool,
    expiry: Option<SystemTime>,
    xorb_filter_fn: impl Fn(&MDBXorbInfoView) -> bool,
) -> Result<usize> {
    let mut written = MDBShardFileHeader::default().serialize(writer)?;

    // Footer statistics, accumulated while the sections are emitted.
    let mut stored_bytes_on_disk = 0;
    let mut stored_bytes = 0;
    let mut materialized_bytes = 0;

    let dedup_starts = self.file_start_entries();

    // File section.
    let file_section_start = written as u64;
    if with_file_section {
        for file_info in &self.file_info_views {
            materialized_bytes += (0..file_info.num_entries())
                .map(|j| file_info.entry(j).unpacked_segment_bytes as u64)
                .sum::<u64>();
            written += file_info.serialize(writer, with_verification)?;
        }
    }
    written += FileDataSequenceHeader::bookend().serialize(writer)?;

    // Xorb section, restricted to the xorbs the filter accepts.
    let xorb_section_start = written as u64;
    for xorb_info in &self.xorb_info_views {
        if !xorb_filter_fn(xorb_info) {
            continue;
        }

        stored_bytes_on_disk += xorb_info.header().num_bytes_on_disk as u64;
        stored_bytes += xorb_info.header().num_bytes_in_xorb as u64;

        written += match dedup_starts.get(&xorb_info.xorb_hash()) {
            Some(gde_indices) => {
                // The binary search below relies on the indices being sorted.
                debug_assert!(gde_indices.is_sorted());
                xorb_info.serialize_with_chunk_rewrite(writer, |idx, chunk| {
                    if gde_indices.binary_search(&idx).is_ok() {
                        chunk.with_global_dedup_flag(true)
                    } else {
                        chunk
                    }
                })?
            },
            None => xorb_info.serialize(writer)?,
        };
    }
    written += XorbChunkSequenceHeader::bookend().serialize(writer)?;

    // Footer.
    let footer_start = written as u64;
    let shard_creation_timestamp = current_timestamp();
    let shard_key_expiry = match expiry {
        Some(t) => t.duration_since(std::time::UNIX_EPOCH).unwrap_or_default().as_secs(),
        None => 0,
    };
    let footer = MDBShardFileFooter {
        file_info_offset: file_section_start,
        xorb_info_offset: xorb_section_start,
        file_lookup_offset: footer_start,
        file_lookup_num_entry: 0,
        xorb_lookup_offset: footer_start,
        xorb_lookup_num_entry: 0,
        chunk_lookup_offset: footer_start,
        chunk_lookup_num_entry: 0,
        shard_creation_timestamp,
        shard_key_expiry,
        stored_bytes_on_disk,
        materialized_bytes,
        stored_bytes,
        footer_offset: footer_start,
        ..Default::default()
    };
    written += footer.serialize(writer)?;

    Ok(written)
}
/// Serializes only the xorbs accepted by `xorb_filter_fn`, with no file records and no expiry.
///
/// Returns the number of bytes written to `writer`.
pub fn serialize_xorb_subset_only<W: Write>(
    &self,
    writer: &mut W,
    xorb_filter_fn: impl Fn(&MDBXorbInfoView) -> bool,
) -> Result<usize> {
    // Same output as the expiry-aware variant called without an expiry.
    self.serialize_xorb_subset_with_expiry(writer, None, xorb_filter_fn)
}
/// Serializes only the xorbs accepted by `xorb_filter_fn`, with no file records.
///
/// `expiry` is forwarded to the footer's key-expiry field. Returns the number
/// of bytes written to `writer`.
pub fn serialize_xorb_subset_with_expiry<W: Write>(
    &self,
    writer: &mut W,
    expiry: Option<SystemTime>,
    xorb_filter_fn: impl Fn(&MDBXorbInfoView) -> bool,
) -> Result<usize> {
    let with_file_section = false;
    let with_verification = false;
    self.serialize_impl(writer, with_file_section, with_verification, expiry, xorb_filter_fn)
}
pub fn serialize<W: Write>(&self, writer: &mut W, with_verification: bool) -> Result<usize> {
self.serialize_impl(writer, true, with_verification, None, |_| true)
}
/// Collects the hashes of all chunks eligible for global deduplication.
///
/// A chunk qualifies when it sits at the position where some file in this shard
/// starts, or when the chunk itself reports being eligible. Every hash appears
/// once; the order of the returned vector is unspecified.
pub fn global_dedup_eligible_chunks(&self) -> Vec<MerkleHash> {
    let mut eligible = HashSet::<MerkleHash>::new();
    let file_starts = self.file_start_entries();

    for xorb_view in &self.xorb_info_views {
        let num_entries = xorb_view.num_entries();

        // Chunks at which a file begins.
        if let Some(start_indices) = file_starts.get(&xorb_view.xorb_hash()) {
            for &c_idx in start_indices {
                debug_assert_lt!(c_idx, num_entries);
                // Out-of-range indices are skipped rather than looked up.
                if c_idx >= num_entries {
                    continue;
                }
                eligible.insert(xorb_view.chunk(c_idx).chunk_hash);
            }
        }

        // Chunks that report eligibility themselves.
        eligible.extend(
            (0..num_entries)
                .map(|c_idx| xorb_view.chunk(c_idx))
                .filter(|chunk| chunk.is_global_dedup_eligible())
                .map(|chunk| chunk.chunk_hash),
        );
    }

    eligible.into_iter().collect()
}
}
#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::io::Cursor;
use std::time::{Duration, SystemTime};
use rand::rngs::SmallRng;
use rand::{RngExt, SeedableRng};
use super::super::MDBShardInfo;
use super::super::file_structs::MDBFileInfo;
use super::super::shard_file::test_routines::{
convert_to_file, gen_random_shard, gen_random_shard_with_xorb_references,
};
use super::super::shard_in_memory::MDBInMemoryShard;
use super::super::xorb_structs::MDBXorbInfo;
use super::MDBMinimalShard;
use crate::error::Result;
use crate::merklehash::MerkleHash;
/// Serializes `min_shard` with and without verification and compares the reloaded bytes with `mem_shard`.
///
/// Checks that `serialized_size` matches the number of bytes written, that
/// asking for verification on a shard whose files lack it fails, and that the
/// reloaded file and xorb records match the in-memory ones (chunks are compared
/// with the global-dedup flag cleared).
fn verify_serialization(min_shard: &MDBMinimalShard, mem_shard: &MDBInMemoryShard) -> Result<()> {
    let has_verification = min_shard.has_file_verification();

    for verification in [true, false] {
        let size = min_shard.serialized_size(has_verification && verification);
        assert_ne!(0, size);
        if !has_verification {
            assert_eq!(0, min_shard.serialized_size(true))
        }

        let mut reloaded_shard = Vec::new();
        let serialize_result = min_shard.serialize(&mut reloaded_shard, verification);

        // Verification cannot be written for files that never had it.
        let must_fail = !has_verification && verification && min_shard.num_files() > 0;
        if must_fail {
            assert!(serialize_result.is_err());
            continue;
        }
        assert!(serialize_result.is_ok());
        let serialized_len = serialize_result?;
        assert_eq!(reloaded_shard.len(), serialized_len);
        assert_eq!(size, serialized_len);

        let si = MDBShardInfo::load_from_reader(&mut Cursor::new(&reloaded_shard)).unwrap();

        let file_info: Vec<MDBFileInfo> =
            si.read_all_file_info_sections(&mut Cursor::new(&reloaded_shard)).unwrap();
        let mem_file_info: Vec<_> = mem_shard.file_content.clone().into_values().collect();
        for (i, (read, mem)) in file_info.iter().zip(mem_file_info.iter()).enumerate() {
            assert!(read.equal_accepting_no_verification(mem), "i: {i} verification = {verification}");
        }

        let xorb_info: Vec<MDBXorbInfo> = si.read_all_xorb_blocks_full(&mut Cursor::new(&reloaded_shard)).unwrap();
        let mem_xorb_info: Vec<_> = mem_shard.xorb_content.clone().into_values().collect();
        assert_eq!(xorb_info.len(), mem_xorb_info.len(), "verification = {verification}");
        for (i, reloaded_xorb) in xorb_info.iter().enumerate() {
            let expected_xorb = mem_xorb_info[i].as_ref();
            assert_eq!(reloaded_xorb.metadata, expected_xorb.metadata);
            for (reloaded_chunk, expected_chunk) in reloaded_xorb.chunks.iter().zip(expected_xorb.chunks.iter()) {
                let unflagged = reloaded_chunk.clone().with_global_dedup_flag(false);
                assert_eq!(&unflagged, expected_chunk);
            }
        }
    }
    Ok(())
}
/// Converts `mem_shard` to its file form and checks every way of loading it as a minimal shard.
///
/// Four scenarios are exercised: files and xorbs, files only, xorbs only, and
/// the async loader with custom callbacks. In each one the sync and async
/// loaders must produce equal shards, and the result must survive
/// `verify_serialization` against the matching in-memory content.
async fn verify_minimal_shard(mem_shard: &MDBInMemoryShard) -> Result<()> {
    let buffer = convert_to_file(mem_shard)?;

    // Files and xorbs: every view must agree with the info struct built from it.
    {
        let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), true, true).unwrap();
        let min_shard_async = MDBMinimalShard::from_reader_async(&mut &buffer[..], true, true).await.unwrap();
        assert_eq!(min_shard, min_shard_async);
        for i in 0..min_shard.num_files() {
            let file_view = min_shard.file(i).unwrap();
            let file_info = MDBFileInfo::from(file_view);
            assert_eq!(file_info.metadata.file_hash, file_view.file_hash());
            assert_eq!(file_info.segments.len(), file_view.num_entries());
            assert_eq!(file_info.contains_verification(), file_view.contains_verification());
            assert_eq!(file_info.contains_metadata_ext(), file_view.contains_metadata_ext());
        }
        for i in 0..min_shard.num_xorb() {
            let xorb_view = min_shard.xorb(i).unwrap();
            let xorb_info = MDBXorbInfo::from(xorb_view);
            assert_eq!(xorb_info.metadata.xorb_hash, xorb_view.xorb_hash());
            assert_eq!(xorb_info.chunks.len(), xorb_view.num_entries());
        }
        verify_serialization(&min_shard, mem_shard).unwrap();
    }

    // Files only: compare against the in-memory shard with its xorb data removed.
    {
        let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), true, false).unwrap();
        let min_shard_async = MDBMinimalShard::from_reader_async(&mut &buffer[..], true, false).await.unwrap();
        assert_eq!(min_shard, min_shard_async);
        let mut file_only_memshard = mem_shard.clone();
        file_only_memshard.xorb_content.clear();
        file_only_memshard.chunk_hash_lookup.clear();
        verify_serialization(&min_shard, &file_only_memshard).unwrap();
    }

    // Xorbs only: compare against the in-memory shard with its file data removed.
    {
        let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), false, true).unwrap();
        let min_shard_async = MDBMinimalShard::from_reader_async(&mut &buffer[..], false, true).await.unwrap();
        assert_eq!(min_shard, min_shard_async);
        let mut xorb_only_memshard = mem_shard.clone();
        xorb_only_memshard.file_content.clear();
        verify_serialization(&min_shard, &xorb_only_memshard).unwrap();
    }

    // Custom callbacks: each callback must see exactly the views that end up in the shard.
    {
        let mut file_info_views = vec![];
        let mut xorb_info_views = vec![];
        let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), true, true).unwrap();
        let min_shard_async = MDBMinimalShard::from_reader_async_with_custom_callbacks(
            &mut &buffer[..],
            true,
            true,
            |f| {
                file_info_views.push(f.clone());
                Ok(())
            },
            |c| {
                xorb_info_views.push(c.clone());
                Ok(())
            },
        )
        .await
        .unwrap();
        assert_eq!(min_shard, min_shard_async);
        assert_eq!(file_info_views, min_shard.file_info_views);
        assert_eq!(xorb_info_views, min_shard.xorb_info_views);
        // This shard was loaded with both files and xorbs, so it is checked
        // against the unmodified in-memory shard.
        verify_serialization(&min_shard, mem_shard).unwrap();
    }

    Ok(())
}
/// Runs the minimal-shard checks over randomly generated shards of increasing size.
#[tokio::test]
async fn test_shards() -> Result<()> {
    // Smallest shapes first, without verification or metadata extension.
    verify_minimal_shard(&gen_random_shard(0, &[], &[0], false, false)?).await?;
    verify_minimal_shard(&gen_random_shard(0, &[1], &[1, 1], false, false)?).await?;

    // A larger shape under every combination of the two boolean flags.
    for (flag_a, flag_b) in [(false, false), (true, false), (false, true), (true, true)] {
        let shard = gen_random_shard(0, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6], flag_a, flag_b)?;
        verify_minimal_shard(&shard).await?;
    }

    Ok(())
}
/// Checks that global-dedup eligibility survives xorb-only and filtered serialization.
///
/// The full shard's eligible chunk set is the reference. An xorb-only copy must
/// report the same set. Xorbs are then assigned to random groups, and for each
/// group threshold the filtered shard must contain exactly the selected xorbs,
/// no files, and the eligible chunks that appear in a selected xorb.
async fn verify_minimal_shard_dedup_processing(mem_shard: &MDBInMemoryShard) {
    verify_minimal_shard(mem_shard).await.unwrap();

    let buffer = convert_to_file(mem_shard).unwrap();
    let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), true, true).unwrap();
    let expected_eligible: HashSet<_> = min_shard.global_dedup_eligible_chunks().into_iter().collect();

    // Writing the shard without its file section must not change the eligible set.
    let mut xorb_only_bytes = Vec::<u8>::new();
    min_shard
        .serialize_xorb_subset_only(&mut xorb_only_bytes, |_| true)
        .unwrap();
    let xorb_only_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&xorb_only_bytes), true, true).unwrap();
    let xorb_only_eligible: HashSet<_> = xorb_only_shard.global_dedup_eligible_chunks().into_iter().collect();
    assert_eq!(expected_eligible, xorb_only_eligible);

    // Assign every xorb to a group in 0..=3 and record, per eligible chunk,
    // the groups of the xorbs that contain it.
    let mut groups_of_chunk = HashMap::<MerkleHash, Vec<usize>>::new();
    let mut group_of_xorb = HashMap::<MerkleHash, usize>::new();
    let mut rng = SmallRng::seed_from_u64(0);
    for xorb in &min_shard.xorb_info_views {
        let group = rng.random_range(0..=3);
        group_of_xorb.insert(xorb.xorb_hash(), group);
        for ci in 0..xorb.num_entries() {
            let chunk_hash = xorb.chunk(ci).chunk_hash;
            if expected_eligible.contains(&chunk_hash) {
                groups_of_chunk.entry(chunk_hash).or_default().push(group);
            }
        }
    }

    for threshold in 1..4 {
        let is_selected = |xh: MerkleHash| *group_of_xorb.get(&xh).unwrap() < threshold;

        let expected_xorbs: HashSet<MerkleHash> =
            group_of_xorb.keys().copied().filter(|&xh| is_selected(xh)).collect();
        let expected_chunks: HashSet<_> = groups_of_chunk
            .iter()
            .filter_map(|(&ch, groups)| groups.iter().any(|&grp| grp < threshold).then_some(ch))
            .collect();

        let mut subset_bytes = Vec::<u8>::new();
        min_shard
            .serialize_xorb_subset_only(&mut subset_bytes, |xorb| is_selected(xorb.xorb_hash()))
            .unwrap();
        let subset_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&subset_bytes), true, true).unwrap();

        assert_eq!(subset_shard.num_files(), 0);
        assert_eq!(subset_shard.num_xorb(), expected_xorbs.len());

        let xorbs_present: HashSet<_> = subset_shard
            .xorb_info_views
            .iter()
            .map(|xorb| xorb.xorb_hash())
            .collect();
        assert_eq!(xorbs_present, expected_xorbs);

        let subset_eligible: HashSet<_> = subset_shard.global_dedup_eligible_chunks().into_iter().collect();
        assert_eq!(expected_chunks, subset_eligible);
    }
}
/// Runs the dedup-processing checks over shards generated with xorb references.
#[tokio::test]
async fn test_shard_processing() {
    // Smallest shapes first, without verification or metadata extension.
    verify_minimal_shard_dedup_processing(
        &gen_random_shard_with_xorb_references(1, &[1], &[1], false, false).unwrap(),
    )
    .await;
    verify_minimal_shard_dedup_processing(
        &gen_random_shard_with_xorb_references(1, &[2], &[1, 1], false, false).unwrap(),
    )
    .await;

    // A larger shape under every combination of the two boolean flags.
    for (flag_a, flag_b) in [(false, false), (true, false), (false, true), (true, true)] {
        let shard =
            gen_random_shard_with_xorb_references(1, &[1, 5, 10, 8], &[4, 3, 5, 9, 4, 6], flag_a, flag_b).unwrap();
        verify_minimal_shard_dedup_processing(&shard).await;
    }
}
/// Checks that the expiry passed to `serialize_xorb_subset_with_expiry` lands in the footer.
#[test]
fn test_serialize_xorb_subset_with_expiry_footer() {
    let shard = gen_random_shard_with_xorb_references(1, &[1, 2], &[3, 2], true, true).unwrap();
    let buffer = convert_to_file(&shard).unwrap();
    let min_shard = MDBMinimalShard::from_reader(&mut Cursor::new(&buffer), true, true).unwrap();

    // Serialize all xorbs with the given expiry and load the shard info back.
    let reload_with_expiry = |expiry: Option<SystemTime>| {
        let mut serialized = Vec::new();
        min_shard
            .serialize_xorb_subset_with_expiry(&mut serialized, expiry, |_| true)
            .unwrap();
        MDBShardInfo::load_from_reader(&mut Cursor::new(&serialized)).unwrap()
    };

    // Without an expiry the footer field is zero.
    let no_expiry_info = reload_with_expiry(None);
    assert_eq!(no_expiry_info.metadata.shard_key_expiry, 0);

    // With an expiry the footer stores it as whole seconds since the epoch.
    let expiry_secs = super::current_timestamp().saturating_add(12345);
    let expiry = SystemTime::UNIX_EPOCH + Duration::from_secs(expiry_secs);
    let expiry_info = reload_with_expiry(Some(expiry));
    assert_eq!(expiry_info.metadata.shard_key_expiry, expiry_secs);
    assert!(expiry_info.metadata.shard_key_expiry > super::current_timestamp());
}
}