use futures::StreamExt;
use super::{shuffler::Shuffler, Ivf};
use crate::dataset::ROW_ID;
use crate::index::vector::ivf::PQ_CODE_COLUMN;
use crate::io::object_writer::ObjectWriter;
use crate::Result;
/// Writes every IVF partition's PQ codes and row ids to `writer`, recording each
/// partition's starting offset and row count in `ivf`.
///
/// Partitions are visited in id order `0..ivf.num_partitions()`. For each one, all
/// batches yielded by `shuffler.key_iter(part_id)` are gathered. Then the
/// `PQ_CODE_COLUMN` arrays are written, followed by the `ROW_ID` arrays, both via
/// `ObjectWriter::write_plain_encoded_array`. The partition is registered with
/// `ivf.add_partition` at the writer position *before* these writes.
///
/// A partition is recorded with length 0 and nothing is written when:
/// - the shuffler returns no stream for it, or
/// - the stream yields no rows.
///
/// # Errors
/// Propagates errors from `shuffler.key_iter`, from polling the batch stream, and
/// from writing to `writer`.
///
/// # Panics
/// Panics in either of these cases:
/// - a batch is missing the `PQ_CODE_COLUMN` or `ROW_ID` column;
/// - a single partition holds more than `u32::MAX` rows. The partition length is
///   passed to `add_partition` as a `u32`, and silently truncating it would corrupt
///   the index.
pub(super) async fn write_index_partitions(
    writer: &mut ObjectWriter,
    ivf: &mut Ivf,
    shuffler: &Shuffler<'_>,
) -> Result<()> {
    for part_id in 0..ivf.num_partitions() as u32 {
        let mut pq_array = vec![];
        let mut row_id_array = vec![];

        // A missing stream means no vectors were shuffled into this partition.
        if let Some(mut stream) = shuffler.key_iter(part_id).await? {
            while let Some(batch) = stream.next().await {
                let batch = batch?;
                let pq_codes = batch
                    .column_by_name(PQ_CODE_COLUMN)
                    .expect("shuffled batch is missing the PQ code column");
                pq_array.push(pq_codes.clone());
                let row_ids = batch
                    .column_by_name(ROW_ID)
                    .expect("shuffled batch is missing the row id column");
                row_id_array.push(row_ids.clone());
            }
        }

        let total_records = row_id_array.iter().map(|a| a.len()).sum::<usize>();
        if total_records == 0 {
            // Treat an empty stream exactly like a missing one: register an empty
            // partition at the current position and write nothing.
            ivf.add_partition(writer.tell(), 0);
            continue;
        }

        // Checked conversion: an `as` cast would wrap and record a bogus length.
        let num_rows = u32::try_from(total_records).unwrap_or_else(|_| {
            panic!(
                "IVF partition {} has {} rows, which exceeds u32::MAX",
                part_id, total_records
            )
        });
        ivf.add_partition(writer.tell(), num_rows);

        let pq_refs = pq_array.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
        writer.write_plain_encoded_array(pq_refs.as_slice()).await?;
        let row_ids_refs = row_id_array.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
        writer
            .write_plain_encoded_array(row_ids_refs.as_slice())
            .await?;
    }
    Ok(())
}