use super::DataReaderTrait;
use crate::{Blob, ByteRange};
use anyhow::Result;
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
#[async_trait]
pub(crate) trait NetworkReader: DataReaderTrait {
async fn try_read_range(&self, range: &ByteRange) -> Result<Blob>;
fn max_request_bytes(&self) -> &AtomicU64;
async fn network_read_range(&self, range: &ByteRange) -> Result<Blob> {
if range.length > self.max_request_bytes().load(Ordering::Relaxed) && range.length > 1 {
log::trace!(
"proactively splitting range {range} ({} bytes) based on previous failures",
range.length
);
return self.split_and_read(range).await;
}
match self.try_read_range(range).await {
Ok(blob) => Ok(blob),
Err(e) if range.length <= 1 => Err(e),
Err(e) => {
self.max_request_bytes().fetch_min(range.length / 2, Ordering::Relaxed);
log::debug!(
"splitting failed range {range} ({} bytes) into two halves: {e}",
range.length
);
self.split_and_read(range).await
}
}
}
async fn split_and_read(&self, range: &ByteRange) -> Result<Blob> {
let mid = range.offset + range.length / 2;
let left = ByteRange::new(range.offset, mid - range.offset);
let right = ByteRange::new(mid, range.offset + range.length - mid);
let blob_left = self.read_range(&left).await?;
let blob_right = self.read_range(&right).await?;
let mut data = blob_left.into_vec();
data.extend_from_slice(blob_right.as_slice());
Ok(Blob::from(data))
}
}