use bytes::Bytes;
use crate::chunk::ChunkAddress;
use super::error::Result;
use super::mode::JoinMode;
use super::tree::ChunkRange;
use crate::store::SyncChunkGet;
/// A node of the chunk tree: the chunk that roots a subtree plus the byte window it covers.
///
/// Instances are built by `frontier_seed` (for the root) and by
/// `overlapping_children` (for descendants).
pub(crate) struct SubtreeNode<M: JoinMode> {
/// Address of the chunk that roots this subtree.
pub(crate) addr: ChunkAddress,
/// Mode-specific context passed to the chunk reader together with `addr`.
pub(crate) context: M::JoinerContext,
/// Number of bytes covered by this subtree. The readers in this file treat a node
/// with `span <= BS` as a leaf.
pub(crate) span: u64,
/// Byte position at which this subtree's window starts; the root seed uses 0.
pub(crate) byte_offset: u64,
}
// Implemented by hand rather than derived: `#[derive(Clone)]` would add an
// `M: Clone` bound, while only the fields themselves need to be duplicated.
impl<M: JoinMode> Clone for SubtreeNode<M> {
    /// Returns a field-by-field copy of this node.
    fn clone(&self) -> Self {
        let Self {
            addr,
            context,
            span,
            byte_offset,
        } = self;
        Self {
            addr: *addr,
            context: context.clone(),
            span: *span,
            byte_offset: *byte_offset,
        }
    }
}
/// Lists the children of `parent` whose byte window intersects `chunk_range`.
///
/// `body` is the parent's chunk body, read as a packed array of child
/// references of `M::REF_SIZE` bytes each. `chunk_range` is expressed in
/// chunk indices and is scaled by `BS` to obtain byte positions.
///
/// Children are returned in reference order. Only references of overlapping
/// children are parsed.
///
/// # Errors
///
/// Propagates the first error reported by `M::parse_child_ref`.
#[inline]
pub(crate) fn overlapping_children<M, const BS: usize>(
    body: &[u8],
    parent: &SubtreeNode<M>,
    chunk_range: &ChunkRange,
) -> Result<Vec<SubtreeNode<M>>>
where
    M: JoinMode,
{
    let subspan = M::subspan_size::<BS>(parent.span);
    let child_count = body.len() / M::REF_SIZE;
    let window_start = chunk_range.start * BS as u64;
    let window_end = chunk_range.end * BS as u64;

    // Lazily yields (index, offset, span) for every child that touches the
    // half-open byte window [window_start, window_end).
    let in_window = (0..child_count).filter_map(|index| {
        let byte_offset = parent.byte_offset + index as u64 * subspan;
        let span = M::child_span::<BS>(parent.span, subspan, index);
        let overlaps = byte_offset < window_end && byte_offset + span > window_start;
        overlaps.then_some((index, byte_offset, span))
    });

    let mut selected = Vec::with_capacity(child_count);
    for (index, byte_offset, span) in in_window {
        let (addr, context) = M::parse_child_ref(body, index * M::REF_SIZE)?;
        selected.push(SubtreeNode {
            addr,
            context,
            span,
            byte_offset,
        });
    }
    Ok(selected)
}
/// Builds the root node of a traversal: it covers `span` bytes starting at byte offset 0.
fn frontier_seed<M>(root: &ChunkAddress, context: &M::JoinerContext, span: u64) -> SubtreeNode<M>
where
    M: JoinMode,
{
    let addr = *root;
    let context = context.clone();
    SubtreeNode {
        addr,
        context,
        span,
        byte_offset: 0,
    }
}
/// State for expanding a chunk tree, one round at a time, into a frontier of subtrees.
///
/// `frontier` holds the current subtree roots; `next` is the scratch buffer that
/// `advance` fills and then swaps with `frontier`.
struct BfsExpander<M: JoinMode, const BS: usize> {
/// Current set of subtree roots.
frontier: Vec<SubtreeNode<M>>,
/// Scratch buffer for the next frontier; swapped with `frontier` by `advance`.
next: Vec<SubtreeNode<M>>,
/// Span threshold: nodes whose span exceeds it are candidates for expansion.
/// Derived in `new` from the root span and the requested subtree count.
ideal_span: u64,
/// Chunk range that children must overlap in order to be kept.
chunk_range: ChunkRange,
}
impl<M: JoinMode, const BS: usize> BfsExpander<M, BS> {
    /// Creates an expander whose frontier contains only the root node.
    ///
    /// Returns `None` when `span <= BS`; there is nothing to expand in that
    /// case and callers deal with the root themselves.
    ///
    /// `target_subtrees` sets the expansion threshold to
    /// `span / target_subtrees`. A value of 0 is treated as 1, so the
    /// threshold equals the root span and the root is left unexpanded.
    fn new(
        root: &ChunkAddress,
        context: &M::JoinerContext,
        span: u64,
        chunk_range: &ChunkRange,
        target_subtrees: usize,
    ) -> Option<Self> {
        if span <= BS as u64 {
            return None;
        }
        // Integer division by zero panics; clamp the divisor instead.
        let divisor = target_subtrees.max(1) as u64;
        Some(Self {
            frontier: vec![frontier_seed::<M>(root, context, span)],
            next: Vec::new(),
            ideal_span: span / divisor,
            chunk_range: *chunk_range,
        })
    }

    /// Returns, in ascending order, the frontier indices of the nodes to expand next.
    ///
    /// A node qualifies when its span exceeds both `ideal_span` and `BS`.
    fn nodes_to_expand(&self) -> Vec<usize> {
        self.frontier
            .iter()
            .enumerate()
            .filter(|(_, n)| n.span > self.ideal_span && n.span > BS as u64)
            .map(|(i, _)| i)
            .collect()
    }

    /// Replaces every selected node by its children that overlap the chunk range.
    ///
    /// `expand_indices` must be ascending frontier indices (as produced by
    /// `nodes_to_expand`) and `bodies[k]` must be the chunk body of
    /// `frontier[expand_indices[k]]`. Nodes that are not selected are carried
    /// over unchanged, so the relative order of the frontier is preserved.
    ///
    /// Returns `Ok(false)` without touching the frontier when
    /// `expand_indices` is empty, `Ok(true)` after a completed round.
    ///
    /// # Errors
    ///
    /// Propagates errors from `overlapping_children`; the frontier is left
    /// as it was before the call in that case.
    ///
    /// # Panics
    ///
    /// Panics if `bodies` is shorter than `expand_indices`.
    fn advance(&mut self, expand_indices: &[usize], bodies: &[Bytes]) -> Result<bool> {
        if expand_indices.is_empty() {
            return Ok(false);
        }
        self.next.clear();
        // Cursor into `expand_indices` / `bodies`; both are walked in lock-step
        // with the frontier.
        let mut body_idx = 0;
        for (i, node) in self.frontier.iter().enumerate() {
            if body_idx < expand_indices.len() && expand_indices[body_idx] == i {
                self.next.extend(overlapping_children::<M, BS>(
                    &bodies[body_idx],
                    node,
                    &self.chunk_range,
                )?);
                body_idx += 1;
            } else {
                self.next.push(node.clone());
            }
        }
        // Swap rather than assign so the old frontier's allocation is reused
        // as the scratch buffer of the next round.
        std::mem::swap(&mut self.frontier, &mut self.next);
        Ok(true)
    }

    /// Consumes the expander and returns the current frontier.
    fn into_frontier(self) -> Vec<SubtreeNode<M>> {
        self.frontier
    }
}
/// Expands the tree rooted at `root` into a frontier of subtrees, reading chunks synchronously.
///
/// Returns an empty frontier when `chunk_range` is empty, and the root alone
/// when `BfsExpander::new` declines to expand it. Otherwise the frontier is
/// refined round by round until `BfsExpander::nodes_to_expand` selects nothing.
///
/// # Errors
///
/// Propagates errors from reading chunk bodies and from parsing child references.
pub(crate) fn expand_frontier<G, M, const BS: usize>(
    getter: &G,
    root: &ChunkAddress,
    context: &M::JoinerContext,
    span: u64,
    chunk_range: &ChunkRange,
    target_subtrees: usize,
) -> Result<Vec<SubtreeNode<M>>>
where
    G: SyncChunkGet<BS>,
    M: JoinMode,
{
    if chunk_range.is_empty() {
        return Ok(Vec::new());
    }

    let mut expander =
        match BfsExpander::<M, BS>::new(root, context, span, chunk_range, target_subtrees) {
            Some(expander) => expander,
            // Nothing to expand: the root itself is the whole frontier.
            None => return Ok(vec![frontier_seed::<M>(root, context, span)]),
        };

    loop {
        let picked = expander.nodes_to_expand();

        // Read the body of every selected node, in frontier order.
        let mut fetched = Vec::with_capacity(picked.len());
        for node in picked.iter().map(|&slot| &expander.frontier[slot]) {
            let body = super::mode::read_chunk_body::<M, G, BS>(
                getter,
                &node.addr,
                &node.context,
                node.span,
            )?;
            fetched.push(body);
        }

        let progressed = expander.advance(&picked, &fetched)?;
        if !progressed {
            return Ok(expander.into_frontier());
        }
    }
}
/// Recursively reads the subtree under `node`, appending leaf bodies to `out`.
///
/// A node with `span <= BS` is a leaf and its body is pushed as-is. Any other
/// node is descended into through the children that overlap `chunk_range`,
/// in reference order, each child being fully processed before the next.
///
/// # Errors
///
/// Propagates errors from reading chunk bodies and from parsing child
/// references. Bodies appended before the failure remain in `out`.
pub(crate) fn read_subtree_bodies<G, M, const BS: usize>(
    getter: &G,
    node: &SubtreeNode<M>,
    chunk_range: &ChunkRange,
    out: &mut Vec<Bytes>,
) -> Result<()>
where
    G: SyncChunkGet<BS>,
    M: JoinMode,
{
    let body =
        super::mode::read_chunk_body::<M, G, BS>(getter, &node.addr, &node.context, node.span)?;

    if node.span > BS as u64 {
        // Interior node: its body only lists children to descend into.
        let children = overlapping_children::<M, BS>(&body, node, chunk_range)?;
        return children
            .iter()
            .try_for_each(|child| read_subtree_bodies::<G, M, BS>(getter, child, chunk_range, out));
    }

    out.push(body);
    Ok(())
}
/// Expands the tree rooted at `root` into a frontier of subtrees, reading chunks asynchronously.
///
/// Returns an empty frontier when `chunk_range` is empty, and the root alone
/// when `BfsExpander::new` declines to expand it. In each round the bodies of
/// all selected nodes are requested through one buffered stream whose limit
/// equals the number of selected nodes.
///
/// # Errors
///
/// After all reads of a round have completed, the first failed read (in
/// frontier order) is returned. Errors from parsing child references are
/// propagated as well.
pub(crate) async fn expand_frontier_async<G, M, const BS: usize>(
    getter: &G,
    root: &ChunkAddress,
    context: &M::JoinerContext,
    span: u64,
    chunk_range: &ChunkRange,
    target_subtrees: usize,
) -> Result<Vec<SubtreeNode<M>>>
where
    G: crate::store::ChunkGet<BS>,
    M: JoinMode + Send + Sync,
{
    use futures::stream::{self, StreamExt};

    if chunk_range.is_empty() {
        return Ok(Vec::new());
    }

    let mut expander =
        match BfsExpander::<M, BS>::new(root, context, span, chunk_range, target_subtrees) {
            Some(expander) => expander,
            // Nothing to expand: the root itself is the whole frontier.
            None => return Ok(vec![frontier_seed::<M>(root, context, span)]),
        };

    let mut picked = expander.nodes_to_expand();
    while !picked.is_empty() {
        // One read future per selected node, in frontier order.
        let fetches: Vec<_> = picked
            .iter()
            .map(|&slot| {
                let node = &expander.frontier[slot];
                super::mode::read_chunk_body_async::<M, G, BS>(
                    getter,
                    &node.addr,
                    &node.context,
                    node.span,
                )
            })
            .collect();

        // `buffered` yields results in input order, so bodies line up with `picked`.
        let outcomes: Vec<_> = stream::iter(fetches)
            .buffered(picked.len())
            .collect()
            .await;
        let bodies = outcomes.into_iter().collect::<Result<Vec<Bytes>>>()?;

        if !expander.advance(&picked, &bodies)? {
            break;
        }
        picked = expander.nodes_to_expand();
    }

    Ok(expander.into_frontier())
}
/// Reads the subtree under `node` asynchronously and returns its leaf bodies.
///
/// The traversal is depth-first and uses an explicit stack instead of
/// recursion. A node with `span <= BS` is a leaf and its body is collected;
/// any other node is replaced on the stack by its children that overlap
/// `chunk_range`. Chunk bodies are awaited one at a time.
///
/// # Errors
///
/// Propagates errors from reading chunk bodies and from parsing child
/// references; bodies collected so far are discarded.
pub(crate) async fn read_subtree_bodies_async<G, M, const BS: usize>(
    getter: &G,
    node: &SubtreeNode<M>,
    chunk_range: &ChunkRange,
) -> Result<Vec<Bytes>>
where
    G: crate::store::ChunkGet<BS>,
    M: JoinMode + Send + Sync,
{
    let mut out = Vec::new();
    let mut stack = vec![node.clone()];
    while let Some(current) = stack.pop() {
        let body = super::mode::read_chunk_body_async::<M, G, BS>(
            getter,
            &current.addr,
            &current.context,
            current.span,
        )
        .await?;
        if current.span <= BS as u64 {
            out.push(body);
        } else {
            let children = overlapping_children::<M, BS>(&body, &current, chunk_range)?;
            // Push in reverse so that `pop` visits the children in reference order.
            stack.extend(children.into_iter().rev());
        }
    }
    Ok(out)
}