use anyhow::{Result, anyhow};
use async_trait::async_trait;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWriteExt};
use crate::http::HttpReader;
use crate::payload::payload_dumper::{AsyncPayloadRead, PayloadReader, ProgressReporter};
use crate::readers::local_reader::LocalAsyncPayloadReader;
use crate::structs::PartitionUpdate;
/// Payload layout parameters needed to locate and decode a partition's data.
#[derive(Clone, Debug)]
pub struct PartitionExtractionConfig {
    /// Base added to every operation's `data_offset` to form a payload position.
    pub data_offset: u64,
    /// Block size handed through to the partition dumper.
    pub block_size: u64,
    /// Extra offset added to remote (HTTP) reads; presumably the payload's position
    /// inside the remote file — TODO confirm.
    pub payload_offset: u64,
}
/// Filesystem locations used while prefetching and extracting one partition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExtractionPaths {
    /// File that receives the downloaded slice of payload data before extraction.
    pub temp_path: PathBuf,
    /// Destination handed to the partition dumper for the extracted output.
    pub output_path: PathBuf,
}
/// Contiguous span of payload bytes that covers every data-bearing operation of a
/// partition.
#[derive(Clone, Debug)]
pub struct PartitionDataRange {
    /// Start of the span in payload coordinates (data offset already applied).
    pub min_offset: u64,
    /// Length of the span in bytes.
    pub total_bytes: u64,
}
/// Computes the smallest contiguous byte span that covers the data of every
/// operation in `partition`.
///
/// Each operation's position is made absolute by adding `data_offset` to its own
/// `data_offset`. Operations that lack either a data offset or a data length, or
/// whose length is zero, contribute nothing.
///
/// Returns `None` if no operation carries data. It also returns `None` if an
/// operation's extent does not fit in `u64`. Operation offsets are external input,
/// so they are checked rather than allowed to panic (debug) or wrap (release).
pub fn calculate_partition_range(
    partition: &PartitionUpdate,
    data_offset: u64,
) -> Option<PartitionDataRange> {
    // (lowest start, highest end) seen so far; `None` until a data-bearing op appears.
    let mut bounds: Option<(u64, u64)> = None;

    for op in &partition.operations {
        let (Some(offset), Some(length)) = (op.data_offset, op.data_length) else {
            continue;
        };
        if length == 0 {
            continue;
        }
        let start = data_offset.checked_add(offset)?;
        let end = start.checked_add(length)?;
        bounds = Some(match bounds {
            Some((lo, hi)) => (lo.min(start), hi.max(end)),
            None => (start, end),
        });
    }

    let (min_offset, max_offset) = bounds?;
    // Every recorded op has end > start, so max_offset > min_offset: no underflow.
    Some(PartitionDataRange {
        min_offset,
        total_bytes: max_offset - min_offset,
    })
}
/// Receives progress notifications while a partition's data range is downloaded.
///
/// All methods are synchronous and are invoked inline by the download loop, so
/// implementations should return quickly. The `Send + Sync` bound allows one
/// reporter to be shared across threads.
pub trait DownloadProgressReporter: Send + Sync {
    /// Called once before any bytes are fetched; `total_bytes` is the size of the
    /// range about to be downloaded.
    fn on_download_start(&self, partition_name: &str, total_bytes: u64);
    /// Called after each chunk; `downloaded` is the cumulative byte count out of
    /// `total`.
    fn on_download_progress(&self, partition_name: &str, downloaded: u64, total: u64);
    /// Called once after the whole range has been downloaded.
    fn on_download_complete(&self, partition_name: &str, total_bytes: u64);
}
/// [`DownloadProgressReporter`] that ignores every notification.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpDownloadReporter;

impl DownloadProgressReporter for NoOpDownloadReporter {
    fn on_download_start(&self, _partition_name: &str, _total_bytes: u64) {}

    fn on_download_progress(&self, _partition_name: &str, _downloaded: u64, _total: u64) {}

    fn on_download_complete(&self, _partition_name: &str, _total_bytes: u64) {}
}
async fn download_partition_data(
http_reader: &HttpReader,
range: &PartitionDataRange,
temp_path: &PathBuf,
partition_name: &str,
reporter: &dyn DownloadProgressReporter,
payload_offset: u64,
) -> Result<()> {
reporter.on_download_start(partition_name, range.total_bytes);
let mut file = File::create(temp_path).await?;
const BUFFER_SIZE: usize = 256 * 1024; let mut buffer = vec![0u8; BUFFER_SIZE];
let mut downloaded = 0u64;
let total = range.total_bytes;
let mut current_offset = range.min_offset + payload_offset;
while downloaded < total {
let remaining = total - downloaded;
let chunk_size = remaining.min(BUFFER_SIZE as u64) as usize;
http_reader
.read_at(current_offset, &mut buffer[..chunk_size])
.await?;
file.write_all(&buffer[..chunk_size]).await?;
downloaded += chunk_size as u64;
current_offset += chunk_size as u64;
reporter.on_download_progress(partition_name, downloaded, total);
}
file.flush().await?;
drop(file);
reporter.on_download_complete(partition_name, total);
Ok(())
}
/// Local payload source whose file holds only a slice of the payload.
///
/// Byte 0 of the wrapped file corresponds to offset `base_offset` in the caller's
/// coordinate space. Readers opened from this source subtract `base_offset` from
/// every requested offset.
struct OffsetTranslatingReader {
    /// Reader over the downloaded slice on local disk.
    inner: LocalAsyncPayloadReader,
    /// Caller-space offset of the first byte stored in the local file.
    base_offset: u64,
}

impl OffsetTranslatingReader {
    /// Opens the local file at `path` and records that its first byte sits at
    /// `base_offset`.
    async fn new(path: PathBuf, base_offset: u64) -> Result<Self> {
        Ok(Self {
            inner: LocalAsyncPayloadReader::new(path).await?,
            base_offset,
        })
    }
}
#[async_trait]
impl AsyncPayloadRead for OffsetTranslatingReader {
    /// Opens a reader on the local file and wraps it so that every request is
    /// rebased by `base_offset`.
    async fn open_reader(&self) -> Result<Box<dyn PayloadReader>> {
        let local = self.inner.open_reader().await?;
        let translated = OffsetTranslatingPayloadReader {
            base_offset: self.base_offset,
            inner: local,
        };
        Ok(Box::new(translated))
    }
}
/// [`PayloadReader`] adapter that shifts every requested offset down by
/// `base_offset` before delegating to `inner`.
struct OffsetTranslatingPayloadReader {
    /// Reader over a local file whose byte 0 corresponds to `base_offset`.
    inner: Box<dyn PayloadReader>,
    /// Amount subtracted from every incoming offset.
    base_offset: u64,
}

#[async_trait]
impl PayloadReader for OffsetTranslatingPayloadReader {
    /// Reads `length` bytes starting at caller-space `offset` from the inner reader.
    ///
    /// Only the lower bound is checked here. Requests that run past the end of the
    /// local data are passed through, and the inner reader's behavior applies.
    ///
    /// # Errors
    /// Fails if `offset` precedes `base_offset`, or if the inner read fails.
    async fn read_range(
        &mut self,
        offset: u64,
        length: u64,
    ) -> Result<Pin<Box<dyn AsyncRead + Send + '_>>> {
        let Some(local_offset) = offset.checked_sub(self.base_offset) else {
            return Err(anyhow!(
                "Offset {} is before base offset {}",
                offset,
                self.base_offset
            ));
        };
        self.inner.read_range(local_offset, length).await
    }
}
pub async fn prefetch_and_dump_partition<D, E>(
partition: &PartitionUpdate,
config: &PartitionExtractionConfig,
http_reader: &HttpReader,
paths: ExtractionPaths,
download_reporter: &D,
extract_reporter: &E,
) -> Result<()>
where
D: DownloadProgressReporter,
E: ProgressReporter,
{
let partition_name = &partition.partition_name;
let range = calculate_partition_range(partition, config.data_offset)
.ok_or_else(|| anyhow!("Partition {} has no data to extract", partition_name))?;
download_partition_data(
http_reader,
&range,
&paths.temp_path,
partition_name,
download_reporter,
config.payload_offset,
)
.await?;
let reader = OffsetTranslatingReader::new(paths.temp_path, range.min_offset).await?;
crate::payload::payload_dumper::dump_partition(
partition,
config.data_offset,
config.block_size,
paths.output_path,
&reader,
extract_reporter,
)
.await?;
Ok(())
}