#![allow(dead_code)]
use crate::RegisterPlugin;
use crate::Result;
use crate::config::types::PluginConfig;
use crate::plugin::{Context, Plugin};
use async_trait::async_trait;
use serde_yaml::Value;
use std::any::Any;
use std::fs;
use std::sync::Arc;
use tracing::{debug, info, trace, warn};
#[derive(Debug, Clone)]
pub(crate) struct FileDownloadSpec {
url: String,
path: String,
}
#[derive(Debug, RegisterPlugin)]
pub struct DownloaderPlugin {
files: Vec<FileDownloadSpec>,
timeout_secs: u64,
concurrent: bool,
max_retries: usize,
retry_delay_secs: u64,
}
impl DownloaderPlugin {
pub(crate) fn new(
files: Vec<FileDownloadSpec>,
timeout_secs: u64,
concurrent: bool,
max_retries: usize,
retry_delay_secs: u64,
) -> Self {
Self {
files,
timeout_secs,
concurrent,
max_retries,
retry_delay_secs,
}
}
async fn download_files(&self) -> Result<()> {
if self.files.is_empty() {
warn!("No files configured for download");
return Ok(());
}
info!(
count = self.files.len(),
concurrent = self.concurrent,
timeout_secs = self.timeout_secs,
"Downloader: Starting file downloads"
);
let start = std::time::Instant::now();
let result = if self.concurrent {
self.download_concurrent().await
} else {
self.download_sequential().await
};
let duration = start.elapsed();
match result {
Ok(()) => {
info!(
count = self.files.len(),
duration_ms = duration.as_millis(),
"Downloader: All files downloaded successfully"
);
Ok(())
}
Err(e) => {
warn!(
count = self.files.len(),
duration_ms = duration.as_millis(),
error = %e,
"Downloader: File download failed"
);
Err(e)
}
}
}
async fn download_sequential(&self) -> Result<()> {
info!(
count = self.files.len(),
"Downloader: Starting sequential downloads"
);
for (idx, spec) in self.files.iter().enumerate() {
debug!(idx = idx, file_count = self.files.len(), url = %spec.url, "Downloader: Downloading file");
self.download_single(spec).await?;
}
info!("Downloader: Sequential downloads completed");
Ok(())
}
async fn download_concurrent(&self) -> Result<()> {
info!(
count = self.files.len(),
"Downloader: Starting concurrent downloads"
);
let timeout = self.timeout_secs;
let max_retries = self.max_retries;
let retry_delay_secs = self.retry_delay_secs;
let handles: Vec<_> = self
.files
.iter()
.enumerate()
.map(move |(idx, spec)| {
let spec_clone = spec.clone();
let timeout = timeout;
let max_retries = max_retries;
let retry_delay_secs = retry_delay_secs;
tokio::spawn(async move {
debug!(idx = idx, url = %spec_clone.url, "Downloader: Downloading file concurrently");
Self::download_single_with_retries(&spec_clone, timeout, max_retries, retry_delay_secs).await
})
})
.collect();
debug!(
task_count = handles.len(),
"Downloader: Waiting for {} concurrent download tasks",
handles.len()
);
for (idx, handle) in handles.into_iter().enumerate() {
handle.await.map_err(|e| {
warn!(idx = idx, error = %e, "Downloader: Download task panicked");
crate::Error::Config(format!("Download task {} failed: {}", idx, e))
})??;
}
info!("Downloader: Concurrent downloads completed");
Ok(())
}
async fn download_single_with_retries(
spec: &FileDownloadSpec,
timeout_secs: u64,
max_retries: usize,
retry_delay_secs: u64,
) -> Result<()> {
let mut last_err: Option<crate::Error> = None;
for attempt in 0..=max_retries {
let start = std::time::Instant::now();
let client = reqwest::Client::new();
debug!(url = %spec.url, path = %spec.path, attempt = attempt, "Downloader: Connecting to URL");
let resp_res = client
.get(&spec.url)
.timeout(std::time::Duration::from_secs(timeout_secs))
.send()
.await;
match resp_res {
Ok(resp) => {
debug!(url = %spec.url, status = %resp.status(), "Downloader: Received response");
if !resp.status().is_success() {
warn!(url = %spec.url, status = %resp.status(), "Downloader: HTTP error");
last_err = Some(crate::Error::Config(format!(
"HTTP {} for {}",
resp.status(),
spec.url
)));
} else {
let content_res = resp.text().await;
match content_res {
Ok(content) => {
trace!(url = %spec.url, content_len = content.len(), "Downloader: Response received");
if content.is_empty() {
warn!(url = %spec.url, "Downloader: Downloaded file is empty");
last_err = Some(crate::Error::Config(
"Downloaded file is empty".to_string(),
));
} else {
let temp_path = format!("{}.tmp", spec.path);
trace!(path = %spec.path, temp_path = %temp_path, "Downloader: Writing temporary file");
fs::write(&temp_path, &content).map_err(|e| {
warn!(temp_path = %temp_path, error = %e, "Downloader: Failed to write temp file");
crate::Error::Config(format!("Failed to write temp file: {}", e))
})?;
trace!(temp_path = %temp_path, final_path = %spec.path, "Downloader: Moving temporary file to final path");
fs::rename(&temp_path, &spec.path).map_err(|e| {
let _ = fs::remove_file(&temp_path);
warn!(temp_path = %temp_path, final_path = %spec.path, error = %e, "Downloader: Failed to move file to final path");
crate::Error::Config(format!("Failed to move file to {}: {}", spec.path, e))
})?;
let duration = start.elapsed();
let size_bytes = content.len();
info!(
url = %spec.url,
path = %spec.path,
size_bytes = size_bytes,
duration_ms = duration.as_millis(),
"Downloader: File downloaded successfully"
);
return Ok(());
}
}
Err(e) => {
warn!(url = %spec.url, error = %e, "Downloader: Failed to read response body");
last_err = Some(crate::Error::Config(format!(
"Failed to read response body: {}",
e
)));
}
}
}
}
Err(e) => {
warn!(url = %spec.url, error = %e, "Downloader: Failed to download file");
last_err = Some(crate::Error::Config(format!(
"Failed to download {}: {}",
spec.url, e
)));
}
}
if attempt == max_retries {
break;
}
let backoff = retry_delay_secs.saturating_mul(1_u64 << attempt);
debug!(url = %spec.url, attempt = attempt, backoff_secs = backoff, "Downloader: Retrying after backoff");
tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
}
Err(last_err
.unwrap_or_else(|| crate::Error::Config(format!("Failed to download {}", spec.url))))
}
async fn download_single(&self, spec: &FileDownloadSpec) -> Result<()> {
Self::download_single_with_retries(
spec,
self.timeout_secs,
self.max_retries,
self.retry_delay_secs,
)
.await
}
}
impl Default for DownloaderPlugin {
fn default() -> Self {
Self {
files: Vec::new(),
timeout_secs: 30,
concurrent: false,
max_retries: 3,
retry_delay_secs: 2,
}
}
}
#[async_trait]
impl Plugin for DownloaderPlugin {
fn name(&self) -> &str {
"downloader"
}
async fn execute(&self, _ctx: &mut Context) -> Result<()> {
self.download_files().await
}
fn as_any(&self) -> &dyn Any {
self
}
fn init(config: &PluginConfig) -> Result<Arc<dyn Plugin>> {
let args = config.effective_args();
let mut files = Vec::new();
if let Some(files_val) = args.get("files") {
match files_val {
Value::Sequence(seq) => {
for item in seq {
if let Value::Mapping(map) = item {
let url = map
.get(Value::String("url".to_string()))
.and_then(|v| v.as_str())
.unwrap_or("");
let path = map
.get(Value::String("path".to_string()))
.and_then(|v| v.as_str())
.unwrap_or("");
if !url.is_empty() && !path.is_empty() {
files.push(FileDownloadSpec {
url: url.to_string(),
path: path.to_string(),
});
} else {
warn!("Skipping file entry with missing url or path");
}
}
}
}
_ => {
return Err(crate::Error::Config(
"files must be a sequence of {url, path} objects".to_string(),
));
}
}
}
if files.is_empty() {
return Err(crate::Error::Config(
"Downloader plugin requires at least one file to download".to_string(),
));
}
let timeout_secs = args
.get("timeout_secs")
.and_then(|v| v.as_u64())
.unwrap_or(30);
let concurrent = args
.get("concurrent")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let max_retries = args
.get("max_retries")
.and_then(|v| v.as_u64())
.unwrap_or(3) as usize;
let retry_delay_secs = args
.get("retry_delay_secs")
.and_then(|v| v.as_u64())
.unwrap_or(2);
info!(
file_count = files.len(),
timeout_secs = timeout_secs,
concurrent = concurrent,
max_retries = max_retries,
retry_delay_secs = retry_delay_secs,
"Downloader plugin initialized"
);
Ok(Arc::new(DownloaderPlugin::new(
files,
timeout_secs,
concurrent,
max_retries,
retry_delay_secs,
)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::dns::Message;
#[test]
fn test_downloader_plugin_creation() {
let spec = FileDownloadSpec {
url: "https://example.com/test.txt".to_string(),
path: "test.txt".to_string(),
};
let plugin = DownloaderPlugin::new(vec![spec], 30, false, 3, 2);
assert_eq!(plugin.name(), "downloader");
assert_eq!(plugin.files.len(), 1);
}
#[test]
fn test_downloader_plugin_default() {
let plugin = DownloaderPlugin::default();
assert_eq!(plugin.timeout_secs, 30);
assert!(!plugin.concurrent);
assert!(plugin.files.is_empty());
assert_eq!(plugin.max_retries, 3);
assert_eq!(plugin.retry_delay_secs, 2);
}
#[tokio::test]
async fn test_downloader_plugin_execute() {
let spec = FileDownloadSpec {
url: "https://example.com/test.txt".to_string(),
path: "test.txt".to_string(),
};
let plugin = DownloaderPlugin::new(vec![spec], 30, false, 3, 2);
let mut ctx = Context::new(Message::new());
let result = plugin.execute(&mut ctx).await;
assert!(result.is_err());
}
#[test]
fn test_downloader_plugin_init() {
let mut config = PluginConfig::new("downloader".to_string());
let mut files_map = serde_yaml::Mapping::new();
files_map.insert(
Value::String("url".to_string()),
Value::String("https://example.com/test.txt".to_string()),
);
files_map.insert(
Value::String("path".to_string()),
Value::String("test.txt".to_string()),
);
let files = Value::Sequence(vec![Value::Mapping(files_map)]);
let mut args = serde_yaml::Mapping::new();
args.insert(Value::String("files".to_string()), files);
args.insert(
Value::String("timeout_secs".to_string()),
Value::Number(serde_yaml::Number::from(60u64)),
);
args.insert(Value::String("concurrent".to_string()), Value::Bool(true));
args.insert(
Value::String("max_retries".to_string()),
Value::Number(serde_yaml::Number::from(5u64)),
);
args.insert(
Value::String("retry_delay_secs".to_string()),
Value::Number(serde_yaml::Number::from(7u64)),
);
config.args = Value::Mapping(args);
let result = DownloaderPlugin::init(&config);
assert!(result.is_ok());
if let Ok(plugin) = result {
let downloader = plugin.as_any().downcast_ref::<DownloaderPlugin>();
assert!(downloader.is_some());
if let Some(downloader) = downloader {
assert_eq!(downloader.timeout_secs, 60);
assert!(downloader.concurrent);
assert_eq!(downloader.max_retries, 5);
assert_eq!(downloader.retry_delay_secs, 7);
}
}
}
#[test]
fn test_downloader_missing_url() {
let mut config = PluginConfig::new("downloader".to_string());
let mut files_map = serde_yaml::Mapping::new();
files_map.insert(
Value::String("path".to_string()),
Value::String("test.txt".to_string()),
);
let files = Value::Sequence(vec![Value::Mapping(files_map)]);
let mut args = serde_yaml::Mapping::new();
args.insert(Value::String("files".to_string()), files);
config.args = Value::Mapping(args);
let result = DownloaderPlugin::init(&config);
assert!(result.is_err());
}
}