use dragonfly_api::dfdaemon::v2::DownloadTaskRequest;
use dragonfly_client_core::{Error as ClientError, Result as ClientResult};
use dragonfly_client_metric::{
collect_prefetch_task_failure_metrics, collect_prefetch_task_started_metrics,
};
use std::path::PathBuf;
use std::time::Duration;
use tonic::Request;
use tracing::{debug, error, info, instrument, Instrument};
// Public submodules of this module. Each `pub mod` below pulls in a sibling
// source file (or directory) with the same name; their contents are not
// visible from this file.
pub mod block_list;
pub mod dfdaemon_download;
pub mod dfdaemon_upload;
pub mod health;
pub mod interceptor;
pub mod manager;
pub mod middleware;
pub mod scheduler;
/// Connect timeout: 2 seconds.
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);

/// Request timeout: 10 seconds.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// TCP keepalive duration: 1 hour (3600 seconds).
pub const TCP_KEEPALIVE: Duration = Duration::from_secs(60 * 60);

/// HTTP/2 keep-alive interval: 5 minutes (300 seconds).
pub const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// HTTP/2 keep-alive timeout: 20 seconds.
pub const HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);

/// Maximum frame size: 4 MiB (4 * 2^20 bytes).
pub const MAX_FRAME_SIZE: u32 = 4 << 20;

/// Buffer size: 512 KiB (512 * 2^10 bytes).
pub const BUFFER_SIZE: usize = 512 << 10;

/// Initial window size: 1 MiB (2^20 bytes).
pub const INITIAL_WINDOW_SIZE: u32 = 1 << 20;
/// Kicks off a prefetch download for the task described by `request`.
///
/// The request is rewritten before being sent to the dfdaemon download
/// client created from `socket_path`: `range` is cleared, `prefetch` is set
/// to `false`, `is_prefetch` is set to `true`, and the range entry is removed
/// from `request_header`. Once `download_task` has returned a response, the
/// "prefetch started" metric is recorded and a detached Tokio task is spawned
/// to drain the response stream. This function returns without waiting for
/// that task.
///
/// # Errors
///
/// * [`ClientError::InvalidParameter`] if `request.download` is `None`.
/// * Any error returned while creating the download client or by the initial
///   `download_task` call (the latter is also logged).
///
/// Errors that occur later while reading the response stream are logged and
/// counted via the "prefetch failure" metric inside the spawned task; they
/// are not reported to the caller.
#[instrument(skip_all)]
pub async fn prefetch_task(
    socket_path: PathBuf,
    request: Request<DownloadTaskRequest>,
) -> ClientResult<()> {
    // `socket_path` is owned and not used again, so move it instead of cloning.
    let dfdaemon_download_client =
        dfdaemon_download::DfdaemonDownloadClient::new_unix(socket_path).await?;

    let mut request = request.into_inner();
    let Some(download) = request.download.as_mut() else {
        error!("request download is missing");
        return Err(ClientError::InvalidParameter);
    };

    // Drop the range so the request covers the whole task.
    download.range = None;

    // Clear `prefetch` and flag the request as a prefetch instead.
    // NOTE(review): presumably this stops the prefetch download from
    // triggering yet another prefetch — confirm against the download handler.
    download.prefetch = false;
    download.is_prefetch = true;

    // NOTE(review): this removes only the exact key `RANGE.as_str()`; if
    // `request_header` can hold differently-cased names (e.g. `Range`) they
    // would survive — confirm that callers normalize header names.
    download
        .request_header
        .remove(reqwest::header::RANGE.as_str());

    // Copy out the metric labels now: `request` is consumed by `download_task`
    // and the labels must outlive it inside the spawned task.
    let task_type = download.r#type;
    let tag = download.tag.clone();
    let application = download.application.clone();
    let priority = download.priority;

    let response = dfdaemon_download_client
        .download_task(request)
        .await
        .inspect_err(|err| {
            error!("prefetch task failed: {}", err);
        })?;

    // Borrow the optional labels as `&str` (empty string when absent) rather
    // than cloning the `String`s just to read them.
    collect_prefetch_task_started_metrics(
        task_type,
        tag.as_deref().unwrap_or_default(),
        application.as_deref().unwrap_or_default(),
        priority.to_string().as_str(),
    );

    // Drain the response stream in the background, keeping the current
    // tracing span attached to the spawned future.
    tokio::spawn(
        async move {
            let mut out_stream = response.into_inner();
            loop {
                match out_stream.message().await {
                    Ok(Some(_)) => debug!("prefetch piece finished"),
                    Ok(None) => {
                        info!("prefetch task finished");
                        return;
                    }
                    Err(err) => {
                        collect_prefetch_task_failure_metrics(
                            task_type,
                            tag.as_deref().unwrap_or_default(),
                            application.as_deref().unwrap_or_default(),
                            priority.to_string().as_str(),
                        );
                        error!("prefetch piece failed: {}", err);
                        return;
                    }
                }
            }
        }
        .in_current_span(),
    );

    Ok(())
}