use crate::api::sequential_chunk_consumer::SequentialChunkConsumer;
use crate::task::TaskStats;
use anyhow::bail;
use std::cmp::max;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::sync::{OwnedSemaphorePermit, mpsc};
use tracing::error;
/// Re-sequences chunks that arrive in arbitrary order and feeds them to `output`
/// in strictly ascending, contiguous offset order, starting at offset 0.
///
/// Each message on `chunk_rx` is `(offset, bytes, permit)`. A chunk whose offset
/// equals the next expected offset is written immediately, followed by any
/// buffered chunks that have become contiguous. A chunk further ahead is buffered
/// (together with its permit) until the gap before it is filled. A permit is
/// dropped once its chunk has been handed to `output` successfully.
/// NOTE(review): the permit presumably bounds the amount of in-flight data held by
/// the producers — confirm against the sender side.
///
/// Statistics updated on `task_stats`:
/// * `bytes_downloaded` is incremented by the length of every received chunk.
/// * `cached_bytes` is set to `furthest_offset - next_offset` each time the write
///   cursor advances.
///
/// When the channel closes with nothing left buffered, `output.finalise()` is
/// called and `Ok(())` is returned.
///
/// # Errors
///
/// Processing stops at the first failure, `output.on_failure()` is called, and the
/// error is returned. Failures are:
/// * `output.consume_bytes` returned an error (also logged via `tracing::error!`);
/// * a chunk arrived with an offset behind the write cursor;
/// * two buffered chunks shared the same offset;
/// * the write cursor moved past the furthest byte seen so far;
/// * the channel closed while chunks were still buffered (a gap was never filled).
pub async fn reorder_chunks(
    mut chunk_rx: mpsc::UnboundedReceiver<(u64, Vec<u8>, OwnedSemaphorePermit)>,
    mut output: Box<dyn SequentialChunkConsumer>,
    task_stats: Arc<TaskStats>,
) -> anyhow::Result<()> {
    // Offset of the next byte that must be written to `output`.
    let mut next_offset: u64 = 0;
    // One past the highest byte offset observed in any received chunk.
    let mut furthest_offset: u64 = 0;
    // Chunks that arrived ahead of `next_offset`, keyed by their start offset.
    let mut pending_chunks: HashMap<u64, (Vec<u8>, OwnedSemaphorePermit)> = HashMap::new();

    // The loop evaluates to the overall outcome so that success and every kind of
    // failure funnel through the single finalise / on_failure decision below.
    let outcome: anyhow::Result<()> = 'recv: loop {
        let Some((offset, chunk, permit)) = chunk_rx.recv().await else {
            // Channel closed: anything still buffered means a gap was never filled.
            break if pending_chunks.is_empty() {
                Ok(())
            } else {
                Err(anyhow::anyhow!(
                    "pending_chunks.len={} expected zero",
                    pending_chunks.len()
                ))
            };
        };

        let len = chunk.len() as u64;
        // Saturate rather than overflow on a pathological offset.
        furthest_offset = max(offset.saturating_add(len), furthest_offset);
        task_stats.bytes_downloaded.fetch_add(len, Ordering::SeqCst);

        if offset < next_offset {
            break Err(anyhow::anyhow!(
                "Received chunk with offset={offset} when we've already moved on. next_offset={next_offset}"
            ));
        }

        if offset > next_offset {
            // Ahead of the write cursor: park it (and its permit) until the gap closes.
            if pending_chunks.insert(offset, (chunk, permit)).is_some() {
                break Err(anyhow::anyhow!("received duplicate chunk at offset {offset}"));
            }
            continue;
        }

        // offset == next_offset: write it straight through.
        if let Err(e) = output.consume_bytes(chunk).await {
            error!("Failed to write to output: {e}");
            break Err(e);
        }
        drop(permit);
        next_offset += len;

        // Flush every buffered chunk that is now contiguous with the write cursor.
        while let Some((chunk, permit)) = pending_chunks.remove(&next_offset) {
            let len = chunk.len() as u64;
            if let Err(e) = output.consume_bytes(chunk).await {
                error!("Failed to write to output: {e}");
                // Leave the receive loop as well: nothing more may be written
                // once the output has failed.
                break 'recv Err(e);
            }
            drop(permit);
            next_offset += len;
        }

        match furthest_offset.checked_sub(next_offset) {
            Some(cached_bytes) => task_stats
                .cached_bytes
                .store(cached_bytes, Ordering::SeqCst),
            None => {
                break Err(anyhow::anyhow!(
                    "next_offset={next_offset} ahead of furthest_offset={furthest_offset}"
                ));
            }
        }
    };

    match outcome {
        Ok(()) => {
            output.finalise().await;
            Ok(())
        }
        Err(e) => {
            // Every failure path notifies the consumer, not only write errors.
            output.on_failure().await;
            Err(e)
        }
    }
}