mod background_tasks;
mod config_ops;
mod control;
pub(crate) mod direct_unpack;
mod download_task;
mod lifecycle;
mod nzb;
mod post_process;
mod queue;
mod queue_processor;
mod rss;
mod server;
mod services;
mod tasks;
mod webhooks;
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
pub(crate) mod test_helpers;
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
mod tests;
pub use webhooks::{TriggerScriptsParams, TriggerWebhooksParams};
use crate::config::Config;
use crate::db::Database;
use crate::error::{Error, Result};
use crate::parity::{CliParityHandler, NoOpParityHandler, ParityHandler};
use crate::post_processing;
use crate::speed_limiter;
use crate::types::{DownloadId, Priority};
/// Shared handles for the in-memory download queue and its concurrency
/// controls. `Clone` is cheap: every field is an `Arc`.
#[derive(Clone)]
pub(crate) struct QueueState {
    /// Pending downloads; `BinaryHeap` pops the greatest entry according
    /// to `QueuedDownload`'s `Ord` impl (priority first, then oldest
    /// `created_at` within a priority level).
    pub(crate) queue:
        std::sync::Arc<tokio::sync::Mutex<std::collections::BinaryHeap<QueuedDownload>>>,
    /// Caps simultaneous downloads; sized from
    /// `config.download.max_concurrent_downloads` in `UsenetDownloader::new`.
    pub(crate) concurrent_limit: std::sync::Arc<tokio::sync::Semaphore>,
    /// One cancellation token per in-flight download, keyed by id, so
    /// individual downloads can be cancelled.
    pub(crate) active_downloads: std::sync::Arc<
        tokio::sync::Mutex<
            std::collections::HashMap<DownloadId, tokio_util::sync::CancellationToken>,
        >,
    >,
    /// Whether new downloads may currently be accepted; initialized to
    /// `true` in `UsenetDownloader::new`.
    pub(crate) accepting_new: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
/// Configuration that can be mutated at runtime, as opposed to the
/// immutable startup `Config`. `Clone` is cheap: every field is an `Arc`.
#[derive(Clone)]
pub(crate) struct RuntimeConfig {
    /// Category name -> category settings; seeded from
    /// `config.persistence.categories` at startup.
    pub(crate) categories: std::sync::Arc<
        tokio::sync::RwLock<std::collections::HashMap<String, crate::config::CategoryConfig>>,
    >,
    /// Schedule rules; seeded from `config.persistence.schedule_rules`
    /// at startup.
    pub(crate) schedule_rules:
        std::sync::Arc<tokio::sync::RwLock<Vec<crate::config::ScheduleRule>>>,
    /// Id source for newly created schedule rules; starts at 0.
    /// NOTE(review): confirm new ids cannot collide with the ids of
    /// rules restored from persisted config.
    pub(crate) next_schedule_rule_id: std::sync::Arc<std::sync::atomic::AtomicI64>,
}
/// The components that run after a download completes: post-processing
/// and parity (verify/repair) handling. `Clone` is cheap (`Arc` fields).
#[derive(Clone)]
pub(crate) struct ProcessingPipeline {
    /// Runs the post-download steps (see `start_post_processing` usage).
    pub(crate) post_processor: std::sync::Arc<post_processing::PostProcessor>,
    /// Selected in `UsenetDownloader::new`: explicit handler from config,
    /// else a CLI par2 handler, else a no-op fallback.
    pub(crate) parity_handler: std::sync::Arc<dyn ParityHandler>,
}
/// The top-level downloader: owns the database, event channel, NNTP
/// connection pools, and the queue/config/processing state bundles.
/// `Clone` is cheap — all heavy state is behind `Arc`s — so handles can
/// be passed freely to spawned tasks.
#[derive(Clone)]
pub struct UsenetDownloader {
    /// Persistent storage for downloads and queue state.
    pub db: std::sync::Arc<Database>,
    /// Broadcast channel for `crate::types::Event`; see `subscribe` /
    /// `emit_event`.
    pub(crate) event_tx: tokio::sync::broadcast::Sender<crate::types::Event>,
    /// Immutable startup configuration (runtime-mutable parts live in
    /// `runtime_config`).
    pub(crate) config: std::sync::Arc<Config>,
    /// One pool per configured server, in config order.
    pub(crate) nntp_pools: std::sync::Arc<Vec<nntp_rs::NntpPool>>,
    /// Global download speed limiter, seeded from
    /// `config.download.speed_limit_bps`.
    pub(crate) speed_limiter: speed_limiter::SpeedLimiter,
    pub(crate) queue_state: QueueState,
    pub(crate) runtime_config: RuntimeConfig,
    pub(crate) processing: ProcessingPipeline,
}
/// A queue entry: only the data needed to order downloads in the heap.
///
/// NOTE(review): `PartialEq`/`Eq` are derived (compare all fields,
/// including `id`) while the manual `Ord` below ignores `id` — two
/// entries can compare `Ordering::Equal` without being `==`. Harmless
/// for `BinaryHeap`, but technically inconsistent with `Ord`'s contract;
/// worth confirming nothing relies on `==` matching the ordering.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct QueuedDownload {
    pub(crate) id: DownloadId,
    /// Primary sort key.
    pub(crate) priority: Priority,
    /// Tie-breaker timestamp; older entries order greater, giving FIFO
    /// within a priority level. NOTE(review): units/epoch not visible
    /// here — confirm at the producer.
    pub(crate) created_at: i64, }
impl Ord for QueuedDownload {
    /// Total order used by the queue's `BinaryHeap`: compare by
    /// `priority` first; on a tie, the entry with the *smaller*
    /// `created_at` (i.e. the older one) compares greater, so equal
    /// priorities pop in FIFO order from the max-heap.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.created_at.cmp(&self.created_at))
    }
}
impl PartialOrd for QueuedDownload {
    /// Canonical delegation to the total order defined by `Ord`
    /// (the form clippy's `non_canonical_partial_ord_impl` expects).
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl UsenetDownloader {
pub async fn new(config: Config) -> Result<Self> {
tokio::fs::create_dir_all(&config.download.download_dir)
.await
.map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to create download directory '{}': {}",
config.download.download_dir.display(),
e
),
))
})?;
tokio::fs::create_dir_all(&config.download.temp_dir)
.await
.map_err(|e| {
Error::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to create temp directory '{}': {}",
config.download.temp_dir.display(),
e
),
))
})?;
let db = Database::new(&config.persistence.database_path).await?;
db.set_clean_start().await?;
let (event_tx, _rx) = tokio::sync::broadcast::channel(1000);
let mut nntp_pools = Vec::with_capacity(config.servers.len());
for server in &config.servers {
let pool = nntp_rs::NntpPool::new(server.clone().into(), server.connections as u32)
.await
.map_err(|e| Error::Nntp(format!("Failed to create NNTP pool: {}", e)))?;
nntp_pools.push(pool);
}
let queue =
std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::BinaryHeap::new()));
let concurrent_limit = std::sync::Arc::new(tokio::sync::Semaphore::new(
config.download.max_concurrent_downloads,
));
let active_downloads =
std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()));
let speed_limiter = speed_limiter::SpeedLimiter::new(config.download.speed_limit_bps);
let config_arc = std::sync::Arc::new(config.clone());
let categories = std::sync::Arc::new(tokio::sync::RwLock::new(
config.persistence.categories.clone(),
));
let schedule_rules = std::sync::Arc::new(tokio::sync::RwLock::new(
config.persistence.schedule_rules.clone(),
));
let next_schedule_rule_id = std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0));
let parity_handler: std::sync::Arc<dyn ParityHandler> =
if let Some(ref handler) = config.tools.parity_handler {
std::sync::Arc::clone(handler)
} else if let Some(ref par2_path) = config.tools.par2_path {
std::sync::Arc::new(CliParityHandler::new(par2_path.clone()))
} else if config.tools.search_path {
CliParityHandler::from_path()
.map(|h| std::sync::Arc::new(h) as std::sync::Arc<dyn ParityHandler>)
.unwrap_or_else(|| std::sync::Arc::new(NoOpParityHandler))
} else {
std::sync::Arc::new(NoOpParityHandler)
};
let parity_caps = parity_handler.capabilities();
tracing::info!(
parity_handler = parity_handler.name(),
can_verify = parity_caps.can_verify,
can_repair = parity_caps.can_repair,
"Parity handler initialized"
);
let db_arc = std::sync::Arc::new(db);
let post_processor = std::sync::Arc::new(post_processing::PostProcessor::new(
event_tx.clone(),
config_arc.clone(),
parity_handler.clone(),
db_arc.clone(),
));
let queue_state = QueueState {
queue,
concurrent_limit,
active_downloads,
accepting_new: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true)),
};
let runtime_config = RuntimeConfig {
categories,
schedule_rules,
next_schedule_rule_id,
};
let processing = ProcessingPipeline {
post_processor,
parity_handler,
};
let downloader = Self {
db: db_arc,
event_tx,
config: config_arc,
nntp_pools: std::sync::Arc::new(nntp_pools),
speed_limiter,
queue_state,
runtime_config,
processing,
};
let needs_post_processing = downloader.restore_queue().await?;
for id in needs_post_processing {
let dl = downloader.clone();
tokio::spawn(async move {
if let Err(e) = dl.start_post_processing(id).await {
tracing::error!(download_id = id.0, error = %e, "Post-processing failed during restore");
}
});
}
Ok(downloader)
}
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<crate::types::Event> {
self.event_tx.subscribe()
}
pub fn get_config(&self) -> std::sync::Arc<Config> {
std::sync::Arc::clone(&self.config)
}
pub fn capabilities(&self) -> crate::types::Capabilities {
let parity_caps = self.processing.parity_handler.capabilities();
let handler_name = self.processing.parity_handler.name().to_string();
crate::types::Capabilities {
parity: crate::types::ParityCapabilitiesInfo {
can_verify: parity_caps.can_verify,
can_repair: parity_caps.can_repair,
handler: handler_name,
},
}
}
pub(crate) fn emit_event(&self, event: crate::types::Event) {
self.event_tx.send(event).ok();
}
pub fn spawn_api_server(self: &std::sync::Arc<Self>) -> tokio::task::JoinHandle<Result<()>> {
let downloader = self.clone();
let config = self.config.clone();
tokio::spawn(async move { crate::api::start_api_server(downloader, config).await })
}
}