use super::ZSTD_SKIP_FRAME_LEN;
use crate::{
db::car::{forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER, plain::write_skip_frame_header_async},
utils::misc::env::is_env_truthy,
};
#[cfg_vis(feature = "benchmark-private", pub)]
use self::util::NonMaximalU64;
use byteorder::{LittleEndian, ReadBytesExt as _};
use cfg_vis::cfg_vis;
use cid::Cid;
use itertools::Itertools;
use positioned_io::{ReadAt, ReadBytesAtExt as _, Size};
use smallvec::{SmallVec, smallvec};
use std::{
cmp,
io::{self, Read},
iter,
num::NonZeroUsize,
pin::pin,
};
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
#[cfg(not(any(test, feature = "benchmark-private")))]
mod hash;
#[cfg(any(test, feature = "benchmark-private"))]
pub mod hash;
/// Reads a CAR-embedded hash-table index from any [`positioned_io::ReadAt`]
/// source: a [`Version`] marker, then a [`V1Header`], then the slot table.
pub struct Reader<R> {
    inner: R,
    // Byte offset where the slot table begins — recorded in `Reader::new` as
    // the cursor position directly after the version marker and header.
    table_offset: u64,
    #[cfg(feature = "benchmark-private")]
    pub header: V1Header,
    #[cfg(not(feature = "benchmark-private"))]
    header: V1Header,
}
impl<R> Reader<R>
where
    R: ReadAt,
{
    /// Reads and validates the index's version marker, then the [`V1Header`],
    /// remembering where the slot table starts.
    ///
    /// # Errors
    /// - [`io::ErrorKind::InvalidData`] if the version is not [`Version::V1`].
    /// - Any I/O error from the underlying reader.
    pub fn new(reader: R) -> io::Result<Self> {
        let mut reader = positioned_io::Cursor::new(reader);
        let Version::V1 = Version::read_from(&mut reader)? else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "unsupported embedded index version",
            ));
        };
        let header = V1Header::read_from(&mut reader)?;
        Ok(Self {
            // The slot table begins immediately after the header.
            table_offset: reader.position(),
            inner: reader.into_inner(),
            header,
        })
    }
    /// Looks up every frame offset recorded for `key`, by hashing it and
    /// delegating to [`Self::get_by_hash`].
    pub fn get(&self, key: Cid) -> io::Result<SmallVec<[u64; 1]>> {
        self.get_by_hash(hash::summary(&key))
    }
    /// Probes the table for `needle`, returning all frame offsets stored
    /// under that hash (empty if absent).
    ///
    /// Starts at the hash's ideal slot and scans forward through occupied
    /// slots, giving up after `longest_distance` probes (the maximum
    /// displacement recorded by the builder) or at the first empty slot.
    /// Equal hashes are laid out adjacently by the builder, so once a match
    /// is found, the scan collects offsets only while the hash still matches.
    #[cfg_vis(feature = "benchmark-private", pub)]
    fn get_by_hash(&self, needle: NonMaximalU64) -> io::Result<SmallVec<[u64; 1]>> {
        // A table sized for zero buckets contains nothing.
        let Some(initial_buckets) =
            NonZeroUsize::new(self.header.initial_buckets.try_into().unwrap())
        else {
            return Ok(smallvec![]); };
        let offset_in_table =
            u64::try_from(hash::ideal_slot_ix(needle, initial_buckets)).unwrap() * RawSlot::LEN;
        let mut haystack =
            positioned_io::Cursor::new_pos(&self.inner, self.table_offset + offset_in_table);
        let mut limit = self.header.longest_distance;
        while let Slot::Occupied(OccupiedSlot { hash, frame_offset }) =
            Slot::read_from(&mut haystack)?
        {
            if hash == needle {
                let mut found = smallvec![frame_offset];
                // Collect the run of adjacent slots sharing this hash.
                loop {
                    match Slot::read_from(&mut haystack)? {
                        Slot::Occupied(another) if another.hash == needle => {
                            found.push(another.frame_offset)
                        }
                        Slot::Empty | Slot::Occupied(_) => return Ok(found),
                    }
                }
            }
            if limit == 0 {
                return Ok(smallvec![]);
            }
            limit -= 1;
        }
        // Hit an empty slot before finding the hash: not present.
        Ok(smallvec![]) }
    /// Borrows the underlying reader.
    pub fn reader(&self) -> &R {
        &self.inner
    }
    /// Transforms the underlying reader while keeping the already-parsed
    /// header and table offset.
    pub fn map<T>(self, f: impl FnOnce(R) -> T) -> Reader<T> {
        Reader {
            inner: f(self.inner),
            table_offset: self.table_offset,
            header: self.header,
        }
    }
}
/// Adapter exposing the payload of a sequence of zstd skippable frames as one
/// contiguous [`ReadAt`] stream, transparently skipping the frame headers.
pub struct ZstdSkipFramesEncodedDataReader<R> {
    reader: R,
    // Byte offsets (within the raw underlying reader) of each skippable-frame
    // header, discovered once in `new` by walking the frame chain.
    skip_frame_header_offsets: Vec<u64>,
}
impl<R: ReadAt> ZstdSkipFramesEncodedDataReader<R> {
    /// Walks the chain of zstd skippable frames in `reader`, recording the
    /// byte offset of every frame header. Scanning stops at the first failed
    /// read (normally end-of-input).
    pub fn new(reader: R) -> Self {
        let mut header_offsets = Vec::new();
        let mut cursor = 0_u64;
        loop {
            // The 32-bit little-endian frame data length sits right after the
            // skippable-frame magic bytes.
            let len_pos = cursor + ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER.len() as u64;
            match reader.read_u32_at::<LittleEndian>(len_pos) {
                Ok(data_len) => {
                    header_offsets.push(cursor);
                    cursor += ZSTD_SKIP_FRAME_LEN + u64::from(data_len);
                }
                Err(_) => break,
            }
        }
        Self {
            reader,
            skip_frame_header_offsets: header_offsets,
        }
    }
    /// Borrows the wrapped reader.
    pub fn inner(&self) -> &R {
        &self.reader
    }
    /// Unwraps and returns the underlying reader.
    pub fn into_inner(self) -> R {
        self.reader
    }
}
impl<R: Size> Size for ZstdSkipFramesEncodedDataReader<R> {
    /// Reports the payload size: the raw reader size minus the bytes consumed
    /// by the skippable-frame headers.
    fn size(&self) -> io::Result<Option<u64>> {
        // Unknown raw size means unknown payload size.
        let Some(size) = self.reader.size()? else {
            return Ok(None);
        };
        let total_header_size =
            ZSTD_SKIP_FRAME_LEN * self.skip_frame_header_offsets.len() as u64;
        if size < total_header_size {
            // A raw size smaller than the headers alone indicates corruption.
            return Err(io::Error::other(format!(
                "unexpected error: size({size}) < total_header_size({total_header_size})"
            )));
        }
        Ok(Some(size - total_header_size))
    }
}
impl<R> ReadAt for ZstdSkipFramesEncodedDataReader<R>
where
    R: ReadAt,
{
    /// Reads from the logical payload at `pos`, translating it to a raw
    /// position by skipping every frame header located at or before it.
    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
        let mut adjusted_pos = pos;
        let mut next_frame_pos = None;
        // Header offsets are visited in ascending order; each header at or
        // before the (progressively adjusted) raw position shifts the read by
        // one header length. The first header past it bounds this read.
        for &p in self.skip_frame_header_offsets.iter() {
            if p <= adjusted_pos {
                adjusted_pos += ZSTD_SKIP_FRAME_LEN;
            } else {
                next_frame_pos = Some(p);
                break;
            }
        }
        if let Some(next_frame_pos) = next_frame_pos
            && let max_read_len = (next_frame_pos - adjusted_pos) as usize
            && max_read_len < buf.len()
        {
            // The read would cross the next frame header: read up to the
            // header, then recurse for the remainder of the buffer. The break
            // above guarantees `next_frame_pos > adjusted_pos`, so
            // `max_read_len >= 1` and the recursion makes progress.
            #[allow(clippy::indexing_slicing)]
            Ok(self
                .reader
                .read_at(adjusted_pos, &mut buf[..max_read_len])?
                + self.read_at(pos + max_read_len as u64, &mut buf[max_read_len..])?)
        } else {
            self.reader.read_at(adjusted_pos, buf)
        }
    }
}
/// Iterator over every [`Slot`] in an index's table, in on-disk order.
#[cfg_vis(feature = "benchmark-private", pub)]
#[allow(unused)] struct Iter<R> {
    inner: R,
    // Absolute byte offset of each slot, advancing by `Slot::LEN`.
    positions: iter::StepBy<std::ops::Range<u64>>,
}
#[allow(unused)] impl<R> Iterator for Iter<R>
where
    R: ReadAt + Size,
{
    type Item = io::Result<Slot>;
    /// Decodes the slot at the next table position, or returns `None` once
    /// the position range is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let pos = self.positions.next()?;
        let cursor = positioned_io::Cursor::new_pos(&self.inner, pos);
        Some(Slot::read_from(cursor))
    }
}
impl<R> Reader<R>
where
    R: ReadAt + Size,
{
    /// Returns an iterator over all slots, from the start of the table to the
    /// end of the underlying reader.
    #[cfg_vis(feature = "benchmark-private", pub)]
    #[allow(unused)] fn iter(&self) -> io::Result<Iter<&R>> {
        // The table runs to the end of the source, so its size must be known.
        let Some(end) = self.inner.size()? else {
            return Err(io::Error::other("couldn't get end of table size"));
        };
        let step = usize::try_from(Slot::LEN).unwrap();
        let positions = (self.table_offset..end).step_by(step);
        Ok(Iter {
            inner: &self.inner,
            positions,
        })
    }
}
/// Target fraction of occupied slots in the table; `initial_width` divides
/// the entry count by this to size the initial bucket count.
const DEFAULT_LOAD_FACTOR: f64 = 0.8;
/// Accumulates `(hash, frame offset)` pairs and lays them out into a
/// [`Writer`] via [`Builder::into_writer`].
pub struct Builder {
    // See `DEFAULT_LOAD_FACTOR`.
    load_factor: f64,
    // Each entry is `(pre_padding, slot)`; the padding (empty slots emitted
    // before this one) starts at 0 and is filled in by `into_writer`.
    slots: Vec<(usize, OccupiedSlot)>,
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
impl Builder {
    /// Creates an empty builder with [`DEFAULT_LOAD_FACTOR`].
    pub fn new() -> Self {
        Self::new_with_load_factor(DEFAULT_LOAD_FACTOR)
    }
    fn new_with_load_factor(load_factor: f64) -> Self {
        Self {
            load_factor,
            slots: vec![],
        }
    }
    /// Sorts and deduplicates the collected slots, computes each slot's
    /// pre-padding so it lands at or after its ideal bucket, and records the
    /// header statistics (`longest_distance`, `collisions`,
    /// `initial_buckets`).
    pub fn into_writer(self) -> Writer {
        let Self {
            load_factor,
            mut slots,
        } = self;
        // Sorting groups equal hashes adjacently (readers rely on this);
        // dedup removes exact (hash, offset) duplicates.
        slots.sort_unstable_by_key(|(_, it)| *it);
        slots.dedup_by_key(|(_, it)| *it);
        // No entries => an all-zero header describing an empty table.
        let Some(initial_width) = initial_width(slots.len(), load_factor) else {
            return Writer {
                version: Version::V1,
                header: V1Header {
                    longest_distance: 0,
                    collisions: 0,
                    initial_buckets: 0,
                },
                slots: vec![],
            };
        };
        // Each group of identical hashes contributes (group size - 1)
        // collisions.
        let collisions = slots
            .iter()
            .chunk_by(|(_, it)| it.hash)
            .into_iter()
            .map(|(_, group)| group.count() - 1)
            .sum::<usize>();
        let mut total_padding = 0;
        let mut longest_distance = 0;
        for (ix, (pre_padding, slot)) in slots.iter_mut().enumerate() {
            // Actual table index so far, accounting for padding inserted
            // before earlier slots.
            let ix = ix + total_padding;
            let ideal_ix = hash::ideal_slot_ix(slot.hash, initial_width);
            // Pad forward so no slot lands before its ideal bucket.
            *pre_padding = ideal_ix.saturating_sub(ix);
            let actual_ix = ix + *pre_padding;
            // Displacement past the ideal bucket; readers probe at most
            // `longest_distance` slots before giving up.
            let distance = actual_ix - ideal_ix;
            longest_distance = cmp::max(longest_distance, distance);
            total_padding += *pre_padding;
        }
        Writer {
            version: Version::V1,
            header: V1Header {
                longest_distance: longest_distance.try_into().unwrap(),
                collisions: collisions.try_into().unwrap(),
                initial_buckets: initial_width.get().try_into().unwrap(),
            },
            slots,
        }
    }
}
impl Extend<(Cid, u64)> for Builder {
    /// Hashes each CID and delegates to the `(NonMaximalU64, u64)` impl.
    fn extend<T: IntoIterator<Item = (Cid, u64)>>(&mut self, iter: T) {
        let hashed = iter
            .into_iter()
            .map(|(cid, frame_offset)| (hash::summary(&cid), frame_offset));
        self.extend(hashed)
    }
}
impl Extend<(NonMaximalU64, u64)> for Builder {
    /// Appends each pair as an occupied slot with zero pre-padding; the real
    /// padding is computed later by [`Builder::into_writer`].
    fn extend<T: IntoIterator<Item = (NonMaximalU64, u64)>>(&mut self, iter: T) {
        let new_slots = iter
            .into_iter()
            .map(|(hash, frame_offset)| (0_usize, OccupiedSlot { hash, frame_offset }));
        self.slots.extend(new_slots);
    }
}
impl FromIterator<(Cid, u64)> for Builder {
fn from_iter<T: IntoIterator<Item = (Cid, u64)>>(iter: T) -> Self {
let mut this = Self::default();
this.extend(iter);
this
}
}
impl FromIterator<(NonMaximalU64, u64)> for Builder {
fn from_iter<T: IntoIterator<Item = (NonMaximalU64, u64)>>(iter: T) -> Self {
let mut this = Self::default();
this.extend(iter);
this
}
}
/// A fully laid-out index, ready to be serialized by
/// [`Writer::write_zstd_skip_frames_into`].
pub struct Writer {
    version: Version,
    header: V1Header,
    // `(pre_padding, slot)` pairs in final on-disk order, as produced by
    // `Builder::into_writer`.
    slots: Vec<(usize, OccupiedSlot)>,
}
impl Writer {
    /// Total number of index payload bytes that will be written (excluding
    /// the zstd skippable-frame headers).
    fn written_len(&self) -> u64 {
        let Self {
            version,
            header,
            slots,
        } = self;
        written_len(version)
            + written_len(header)
            // Slot count: every slot plus its pre-padding, plus one trailing
            // empty sentinel — but at least `initial_buckets + 1`, matching
            // the padding `Self::slots` performs.
            + cmp::max(
                u64::try_from(
                    slots
                        .iter()
                        .map(|(pre, _)| *pre + 1 )
                        .sum::<usize>()
                        + 1,
                )
                .unwrap(),
                header.initial_buckets + 1,
            ) * Slot::LEN
    }
    /// Expands `(pre_padding, slot)` pairs into the flat on-disk slot
    /// sequence: padding becomes empty slots, the sequence is padded up to
    /// `min_slots`, and one final empty sentinel slot is appended.
    fn slots(
        min_slots: usize,
        slots: impl IntoIterator<Item = (usize, OccupiedSlot)>,
    ) -> impl Iterator<Item = Slot> {
        slots
            .into_iter()
            .flat_map(|(pre, occ)| {
                std::iter::repeat_n(Slot::Empty, pre).chain(iter::once(Slot::Occupied(occ)))
            })
            .pad_using(min_slots, |_ix| Slot::Empty)
            .chain(iter::once(Slot::Empty))
    }
    /// Serializes the index wrapped in zstd skippable frame(s).
    ///
    /// A single frame is used when the payload length fits the frame header's
    /// `u32` length field (and the env var checked below is not set);
    /// otherwise the payload is split into frames of at most 512 MiB of data.
    pub async fn write_zstd_skip_frames_into(self, writer: impl AsyncWrite) -> io::Result<()> {
        const CHUNK_FRAME_DATA_MAX_BYTES: usize = 512 * 1024 * 1024;
        let written_len = self.written_len();
        self.write_zstd_skip_frames_into_inner(
            writer,
            CHUNK_FRAME_DATA_MAX_BYTES,
            // `None` signals "too large for a single frame".
            u32::try_from(written_len).ok(),
        )
        .await
    }
    async fn write_zstd_skip_frames_into_inner(
        self,
        writer: impl AsyncWrite,
        skip_frame_data_max_bytes: usize,
        index_data_len: Option<u32>,
    ) -> io::Result<()> {
        let mut writer = pin!(writer);
        let Self {
            version,
            header,
            slots,
        } = self;
        let slots = Self::slots(
            header.initial_buckets.try_into().unwrap(),
            slots.iter().copied(),
        );
        if let Some(index_data_len) = index_data_len
            && !is_env_truthy("FOREST_CAR_INDEX_USE_MULTIPLE_SKIP_FRAMES")
        {
            // Single-frame path: one header for the whole index, streamed
            // without intermediate buffering.
            write_skip_frame_header_async(&mut writer, index_data_len).await?;
            version.write_to(&mut writer).await?;
            header.write_to(&mut writer).await?;
            for slot in slots {
                slot.write_to(&mut writer).await?;
            }
        } else {
            // Multi-frame path: buffer up to `skip_frame_data_max_bytes`,
            // flushing each full chunk as its own skippable frame.
            let mut buf = Vec::with_capacity(skip_frame_data_max_bytes);
            version.write_to(&mut buf).await?;
            header.write_to(&mut buf).await?;
            for slot in slots {
                slot.write_to(&mut buf).await?;
                if buf.len() >= skip_frame_data_max_bytes {
                    write_skip_frame_header_async(&mut writer, buf.len() as u32).await?;
                    writer.write_all(&buf).await?;
                    buf.clear();
                }
            }
            // Flush any remaining partial chunk.
            if !buf.is_empty() {
                write_skip_frame_header_async(&mut writer, buf.len() as u32).await?;
                writer.write_all(&buf).await?;
            }
        }
        Ok(())
    }
}
/// Sizes the table's initial bucket count: the entry count divided by the
/// load factor, but never fewer buckets than entries. `None` for zero
/// entries.
fn initial_width(slots_len: usize, load_factor: f64) -> Option<NonZeroUsize> {
    let scaled = (slots_len as f64 / load_factor) as usize;
    NonZeroUsize::new(scaled.max(slots_len))
}
/// On-disk format version. The discriminant doubles as a magic number so an
/// index can be recognized in a byte stream (see `Readable for Version`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, num_derive::FromPrimitive)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[repr(u64)]
enum Version {
    V0 = 0xdeadbeef,
    V1 = 0xdeadbeef + 1,
}
/// Fixed-size header of a V1 index, written directly after the [`Version`]
/// marker and before the slot table.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[cfg_vis(feature = "benchmark-private", pub)]
struct V1Header {
    // Longest displacement of any slot from its ideal bucket; bounds probing
    // in `Reader::get_by_hash`.
    longest_distance: u64,
    // Count of duplicate-hash entries recorded at build time; not consulted
    // by lookups in this file.
    collisions: u64,
    // Bucket count the table was sized for; drives `hash::ideal_slot_ix`.
    pub initial_buckets: u64,
}
/// A filled table slot: a key hash and the CAR frame offset it maps to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[cfg_vis(feature = "benchmark-private", pub)]
struct OccupiedSlot {
    // Non-maximal so `u64::MAX` stays free as the on-disk "empty" sentinel.
    pub hash: NonMaximalU64,
    frame_offset: u64,
}
/// A decoded table slot: either empty padding or an occupied entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[cfg_vis(feature = "benchmark-private", pub)]
enum Slot {
    Empty,
    Occupied(OccupiedSlot),
}
impl Slot {
    /// Converts to the raw on-disk representation; empty slots become the
    /// all-`u64::MAX` sentinel.
    fn into_raw(self) -> RawSlot {
        if let Slot::Occupied(OccupiedSlot { hash, frame_offset }) = self {
            RawSlot {
                hash: hash.get(),
                frame_offset,
            }
        } else {
            RawSlot::EMPTY
        }
    }
}
/// The raw on-disk form of a slot: two little-endian `u64`s. A `hash` of
/// `u64::MAX` marks the slot as empty (see [`RawSlot::EMPTY`]).
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
struct RawSlot {
    hash: u64,
    frame_offset: u64,
}
impl RawSlot {
    // On-disk sentinel for an empty slot; `NonMaximalU64` guarantees no
    // occupied slot's hash can collide with it.
    const EMPTY: Self = Self {
        hash: u64::MAX,
        frame_offset: u64::MAX,
    };
}
impl Readable for Version {
    /// Reads a little-endian `u64` and maps it onto a known discriminant.
    fn read_from(mut reader: impl Read) -> io::Result<Self>
    where
        Self: Sized,
    {
        let raw = reader.read_u64::<LittleEndian>()?;
        match num::FromPrimitive::from_u64(raw) {
            Some(version) => Ok(version),
            None => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "unknown header magic/version",
            )),
        }
    }
}
impl Writable for Version {
    /// Writes the discriminant as a little-endian `u64`.
    async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
        let discriminant = *self as u64;
        writer.write_u64_le(discriminant).await
    }
    // One `u64` on disk.
    const LEN: u64 = std::mem::size_of::<u64>() as u64;
}
impl Readable for Slot {
    /// Decodes a slot, validating that an "empty" slot is the full
    /// all-`u64::MAX` sentinel.
    fn read_from(reader: impl Read) -> io::Result<Self>
    where
        Self: Sized,
    {
        let raw @ RawSlot { hash, frame_offset } = Readable::read_from(reader)?;
        if let Some(hash) = NonMaximalU64::new(hash) {
            return Ok(Self::Occupied(OccupiedSlot { hash, frame_offset }));
        }
        // `hash == u64::MAX`: only the exact sentinel is a valid empty slot.
        if raw == RawSlot::EMPTY {
            Ok(Self::Empty)
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "empty slots must have a frame offset of u64::MAX",
            ))
        }
    }
}
impl Writable for Slot {
    /// Serializes via the raw on-disk representation.
    async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
        let raw = self.into_raw();
        raw.write_to(writer).await
    }
    const LEN: u64 = RawSlot::LEN;
}
impl Readable for RawSlot {
    /// Reads `hash` then `frame_offset`, both little-endian `u64`s.
    fn read_from(mut reader: impl Read) -> io::Result<Self>
    where
        Self: Sized,
    {
        let hash = reader.read_u64::<LittleEndian>()?;
        let frame_offset = reader.read_u64::<LittleEndian>()?;
        Ok(Self { hash, frame_offset })
    }
}
impl Writable for RawSlot {
    /// Writes `hash` then `frame_offset`, both little-endian `u64`s.
    async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
        writer.write_u64_le(self.hash).await?;
        writer.write_u64_le(self.frame_offset).await?;
        Ok(())
    }
    // Two `u64`s on disk.
    const LEN: u64 = std::mem::size_of::<u64>() as u64 * 2;
}
impl Readable for V1Header {
    /// Reads the three header fields in on-disk order, all little-endian.
    fn read_from(mut reader: impl Read) -> io::Result<Self>
    where
        Self: Sized,
    {
        let longest_distance = reader.read_u64::<LittleEndian>()?;
        let collisions = reader.read_u64::<LittleEndian>()?;
        let initial_buckets = reader.read_u64::<LittleEndian>()?;
        Ok(Self {
            longest_distance,
            collisions,
            initial_buckets,
        })
    }
}
impl Writable for V1Header {
    /// Writes the three header fields in on-disk order, all little-endian.
    async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()> {
        writer.write_u64_le(self.longest_distance).await?;
        writer.write_u64_le(self.collisions).await?;
        writer.write_u64_le(self.initial_buckets).await?;
        Ok(())
    }
    // Three `u64`s on disk.
    const LEN: u64 = std::mem::size_of::<u64>() as u64 * 3;
}
/// Synchronous decoding of an on-disk value from a byte stream.
trait Readable {
    fn read_from(reader: impl Read) -> io::Result<Self>
    where
        Self: Sized;
}
/// Asynchronous encoding of an on-disk value with a fixed serialized length.
#[auto_impl::auto_impl(&)]
trait Writable {
    async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> io::Result<()>;
    // Exact number of bytes `write_to` emits.
    const LEN: u64;
}
fn written_len<T: Writable>(_: &T) -> u64 {
T::LEN
}
mod util {
    /// A `u64` guaranteed not to be `u64::MAX`, freeing that value to serve
    /// as the on-disk "empty slot" sentinel.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
    pub struct NonMaximalU64(u64);
    impl NonMaximalU64 {
        /// Returns `None` iff `u == u64::MAX`.
        pub fn new(u: u64) -> Option<Self> {
            if u == u64::MAX { None } else { Some(Self(u)) }
        }
        /// Infallibly maps `u` into the valid range: `u - 1`, saturating at
        /// zero, which can never be `u64::MAX`.
        pub fn fit(u: u64) -> Self {
            Self(u.saturating_sub(1))
        }
        /// The underlying value.
        pub fn get(&self) -> u64 {
            self.0
        }
    }
    #[cfg(test)]
    impl quickcheck::Arbitrary for NonMaximalU64 {
        fn arbitrary(g: &mut quickcheck::Gen) -> Self {
            Self::fit(u64::arbitrary(g))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use ahash::{HashMap, HashSet};
    use cid::Cid;
    use tap::Tap as _;
    use tokio_test::block_on;
    // Round-trips `reference` through Builder -> Writer -> skip-frame bytes
    // -> Reader, keyed by CID, in both single- and multi-skip-frame modes.
    // Lookups assert `is_subset` rather than equality because distinct CIDs
    // may collide to the same hash and surface extra offsets.
    fn do_hashmap_of_cids(reference: HashMap<Cid, HashSet<u64>>) {
        for multi_index_frame in [false, true] {
            let r: ZstdSkipFramesEncodedDataReader<Vec<u8>> =
                ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| {
                    let writer = Builder::from_iter(reference.clone().into_iter().flat_map(
                        |(hash, offsets)| offsets.into_iter().map(move |offset| (hash, offset)),
                    ))
                    .into_writer();
                    block_on(async {
                        if multi_index_frame {
                            // Tiny 128-byte chunks force several frames.
                            writer
                                .write_zstd_skip_frames_into_inner(&mut *v, 128, None)
                                .await
                        } else {
                            writer.write_zstd_skip_frames_into(&mut *v).await
                        }
                    })?;
                    Ok(())
                }));
            if multi_index_frame {
                assert!(!r.skip_frame_header_offsets.is_empty());
            } else {
                assert_eq!(r.skip_frame_header_offsets.len(), 1);
            }
            let subject = Reader::new(r).unwrap();
            for (&cid, expected) in &reference {
                let actual = subject.get(cid).unwrap().into_iter().collect();
                assert!(expected.is_subset(&actual)); }
        }
    }
    // Same round-trip as `do_hashmap_of_cids`, but keyed directly by hash;
    // additionally checks that iterating the table reproduces `reference`
    // exactly (empty offset sets excluded, as they produce no slots).
    fn do_hashmap_of_hashes(reference: HashMap<NonMaximalU64, HashSet<u64>>) {
        for multi_index_frame in [false, true] {
            let r = ZstdSkipFramesEncodedDataReader::new(write_to_vec(|v| {
                let writer = Builder::from_iter(reference.clone().into_iter().flat_map(
                    |(hash, offsets)| offsets.into_iter().map(move |offset| (hash, offset)),
                ))
                .into_writer();
                block_on(async {
                    if multi_index_frame {
                        writer
                            .write_zstd_skip_frames_into_inner(&mut *v, 128, None)
                            .await
                    } else {
                        writer.write_zstd_skip_frames_into(&mut *v).await
                    }
                })?;
                Ok(())
            }));
            if multi_index_frame {
                assert!(!r.skip_frame_header_offsets.is_empty());
            } else {
                assert_eq!(r.skip_frame_header_offsets.len(), 1);
            }
            let subject = Reader::new(r).unwrap();
            for (hash, expected) in &reference {
                let actual = subject.get_by_hash(*hash).unwrap().into_iter().collect();
                assert!(expected.is_subset(&actual)) }
            // Rebuild a hash -> offsets map by scanning every occupied slot.
            let via_iter = subject
                .iter()
                .unwrap()
                .filter_map(|it| match it.unwrap() {
                    Slot::Empty => None,
                    Slot::Occupied(it) => Some(it),
                })
                .chunk_by(|it| it.hash)
                .into_iter()
                .map(|(hash, group)| (hash, HashSet::from_iter(group.map(|it| it.frame_offset))))
                .collect::<HashMap<_, _>>();
            assert_eq!(
                via_iter,
                reference
                    .clone()
                    .tap_mut(|it| it.retain(|_, v| !v.is_empty()))
            );
        }
    }
    #[quickcheck_macros::quickcheck]
    fn hashmap_of_cids(reference: HashMap<Cid, HashSet<u64>>) {
        do_hashmap_of_cids(reference)
    }
    #[quickcheck_macros::quickcheck]
    fn hashmap_of_hashes(reference: HashMap<NonMaximalU64, HashSet<u64>>) {
        do_hashmap_of_hashes(reference)
    }
    // Worst case for probing: every hash has the same ideal slot.
    #[quickcheck_macros::quickcheck]
    fn everything_maps_to_first_slot(values: Vec<HashSet<u64>>) {
        let Some(initial_width) =
            initial_width(values.iter().map(HashSet::len).sum(), DEFAULT_LOAD_FACTOR)
        else {
            return;
        };
        let reference = HashMap::from_iter(iter::zip(
            hash::from_ideal_slot_ix(0, initial_width).unique(),
            values,
        ));
        do_hashmap_of_hashes(reference)
    }
    // Heavy clustering: all hashes target the first (up to) ten slots.
    #[quickcheck_macros::quickcheck]
    fn everything_maps_to_first_10_slots(values: Vec<HashSet<u64>>) {
        let Some(initial_width) =
            initial_width(values.iter().map(HashSet::len).sum(), DEFAULT_LOAD_FACTOR)
        else {
            return;
        };
        let mut generators = Vec::from_iter(
            (0..cmp::min(initial_width.get(), 10))
                .map(|it| hash::from_ideal_slot_ix(it, initial_width).unique()),
        );
        let hashes_in_first_10 = generators.iter_mut().flatten();
        let reference = HashMap::from_iter(iter::zip(hashes_in_first_10, values));
        do_hashmap_of_hashes(reference)
    }
    #[quickcheck_macros::quickcheck]
    fn header(it: V1Header) {
        round_trip(&it);
    }
    #[quickcheck_macros::quickcheck]
    fn slot(it: Slot) {
        round_trip(&it);
    }
    #[quickcheck_macros::quickcheck]
    fn raw_slot(it: RawSlot) {
        round_trip(&it);
    }
    // Serializes `original`, checks the byte length against `written_len`,
    // then deserializes and compares for equality.
    #[track_caller]
    fn round_trip<T: PartialEq + std::fmt::Debug + Readable + Writable>(original: &T) {
        let serialized = write_to_vec(|v| block_on(original.write_to(v)));
        assert_eq!(
            serialized.len(),
            usize::try_from(written_len(original)).unwrap()
        );
        let deserialized = T::read_from(serialized.as_slice())
            .expect("couldn't deserialize T from a deserialized T");
        pretty_assertions::assert_eq!(original, &deserialized);
    }
    // Runs `f` against a fresh buffer and returns it, panicking on error.
    pub fn write_to_vec(f: impl FnOnce(&mut Vec<u8>) -> io::Result<()>) -> Vec<u8> {
        let mut v = Vec::new();
        f(&mut v).unwrap();
        v
    }
}