use std::cmp::Reverse;
use super::{BLOCK_VSIZE, package::Package};
/// Width, in slots, of the window `try_fill_with_smaller` scans when the
/// package at the current position does not fit. The window is measured from
/// that package's own slot, so at most `LOOK_AHEAD_COUNT - 1` later slots are
/// examined.
const LOOK_AHEAD_COUNT: usize = 100;
/// Splits `packages` into at most `num_blocks` blocks.
///
/// Packages are first ordered by descending `fee_rate` (stable sort, so ties
/// keep their incoming relative order). The first `num_blocks - 1` blocks are
/// produced by `fill_normal_blocks`; every package that is still unplaced
/// afterwards is appended, in sorted order, as one final block with no size
/// check applied in this function.
///
/// Returns the blocks in the order they were filled. The result is empty when
/// `num_blocks` is 0, and may hold fewer than `num_blocks` blocks when the
/// packages run out early.
pub fn partition_into_blocks(
    mut packages: Vec<Package>,
    num_blocks: usize,
) -> Vec<Vec<Package>> {
    packages.sort_by_key(|pkg| Reverse(pkg.fee_rate));

    // One "next expected chunk" counter per cluster id, all starting at 0.
    let cluster_count = packages
        .iter()
        .map(|pkg| pkg.cluster_id as usize)
        .max()
        .map_or(0, |highest| highest + 1);
    let mut cluster_next = vec![0u32; cluster_count];

    // Wrap every package in `Some` so it can be moved out of its slot in place.
    let mut slots: Vec<Option<Package>> = packages.into_iter().map(Some).collect();
    let mut blocks: Vec<Vec<Package>> = Vec::with_capacity(num_blocks);

    let resume_at = fill_normal_blocks(
        &mut slots,
        &mut blocks,
        num_blocks.saturating_sub(1),
        &mut cluster_next,
    );

    if blocks.len() >= num_blocks {
        return blocks;
    }

    // Slots emptied earlier by the look-ahead are `None` and drop out here.
    let leftovers: Vec<Package> = slots.drain(resume_at..).flatten().collect();
    if !leftovers.is_empty() {
        blocks.push(leftovers);
    }
    blocks
}
/// Greedily packs packages from `slots` into blocks until `target_blocks`
/// blocks exist or the slots are exhausted.
///
/// Walks `slots` front to back. A package is moved into the open block when it
/// fits in the space left under `BLOCK_VSIZE`, or when the open block is still
/// empty. If it does not fit, `try_fill_with_smaller` is asked to place one
/// later package instead; when that fails the open block is closed and pushed
/// onto `blocks`.
///
/// Returns the scan position: every slot before it has been emptied, while
/// slots at or after it may still hold packages.
fn fill_normal_blocks(
    slots: &mut [Option<Package>],
    blocks: &mut Vec<Vec<Package>>,
    target_blocks: usize,
    cluster_next: &mut [u32],
) -> usize {
    let mut open_block: Vec<Package> = Vec::new();
    let mut open_vsize: u64 = 0;
    let mut cursor = 0;

    while cursor < slots.len() && blocks.len() < target_blocks {
        let free_space = BLOCK_VSIZE.saturating_sub(open_vsize);

        // Slots already emptied by the look-ahead are simply stepped over.
        let fits = match &slots[cursor] {
            Some(pkg) => pkg.vsize <= free_space,
            None => {
                cursor += 1;
                continue;
            }
        };

        // An empty block accepts the package even when it is larger than the
        // free space, so an oversized package is still placed somewhere.
        if fits || open_block.is_empty() {
            take(slots, cursor, &mut open_block, &mut open_vsize, cluster_next);
            cursor += 1;
            continue;
        }

        // The cursor stays put in both outcomes: the same package is retried
        // after either a smaller one was added or a fresh block was opened.
        let topped_up = try_fill_with_smaller(
            slots,
            cursor,
            free_space,
            &mut open_block,
            &mut open_vsize,
            cluster_next,
        );
        if !topped_up {
            blocks.push(std::mem::take(&mut open_block));
            open_vsize = 0;
        }
    }

    // Flush a partially filled block left over when the slots ran out.
    if blocks.len() < target_blocks && !open_block.is_empty() {
        blocks.push(open_block);
    }
    cursor
}
/// Looks past the package at `start` for one later package that can be added
/// to `block` right now, and moves the first such package in.
///
/// Only slots in `(start + 1)..min(start + LOOK_AHEAD_COUNT, slots.len())` are
/// considered. A package qualifies when its `vsize` is at most
/// `remaining_space` and its `chunk_order` equals the next expected value
/// recorded for its cluster in `cluster_next`.
///
/// Returns `true` if a package was moved into `block`, `false` otherwise.
fn try_fill_with_smaller(
    slots: &mut [Option<Package>],
    start: usize,
    remaining_space: u64,
    block: &mut Vec<Package>,
    block_vsize: &mut u64,
    cluster_next: &mut [u32],
) -> bool {
    let window_end = (start + LOOK_AHEAD_COUNT).min(slots.len());

    let candidate = ((start + 1)..window_end).find(|&pos| match &slots[pos] {
        Some(pkg) => {
            pkg.vsize <= remaining_space
                && pkg.chunk_order == cluster_next[pkg.cluster_id as usize]
        }
        // Empty slot: its package was already placed.
        None => false,
    });

    match candidate {
        Some(pos) => {
            take(slots, pos, block, block_vsize, cluster_next);
            true
        }
        None => false,
    }
}
/// Moves the package out of `slots[idx]` into `block` and updates the
/// bookkeeping that goes with it.
///
/// Leaves `None` in the slot, sets the cluster's entry in `cluster_next` to
/// the package's `chunk_order + 1`, and adds the package's `vsize` to
/// `block_vsize`.
///
/// # Panics
///
/// Panics if `slots[idx]` is already empty. In debug builds it also panics if
/// the package is not the next expected chunk of its cluster.
fn take(
    slots: &mut [Option<Package>],
    idx: usize,
    block: &mut Vec<Package>,
    block_vsize: &mut u64,
    cluster_next: &mut [u32],
) {
    let taken = slots[idx].take().unwrap();
    let next_expected = &mut cluster_next[taken.cluster_id as usize];
    debug_assert_eq!(
        taken.chunk_order, *next_expected,
        "partitioner took a chunk out of cluster order"
    );
    *next_expected = taken.chunk_order + 1;
    *block_vsize += taken.vsize;
    block.push(taken);
}