use crate::config::Settings;
use crate::ipc::{Command, JobStatus, Request, Response};
use crate::observer::DaemonObserver;
use crate::{downloader, perform_parallel_download};
use anyhow::Result;
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::{Mutex, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
pub struct ActiveJobData {
pub id: usize,
pub filename: String,
pub total_bytes: AtomicU64,
pub downloaded_bytes: AtomicU64,
pub state: Mutex<String>,
pub cancel_token: Mutex<CancellationToken>,
pub url: String,
pub dir: String,
}
struct DaemonState {
jobs: HashMap<usize, Arc<ActiveJobData>>,
}
struct JobRequest {
url: String,
dir: String,
job_data: Arc<ActiveJobData>,
}
pub async fn start_daemon(port: u16, secret: Option<String>, bind_ip: String) -> Result<()> {
let listener = TcpListener::bind(format!("{}:{}", bind_ip, port)).await?;
println!("Daemon started on {}:{}", bind_ip, port);
let settings = Settings::load().unwrap_or_default();
let max_concurrent_downloads = settings.concurrent_files.unwrap_or(3);
let client = reqwest::Client::builder()
.user_agent("ParallelDownloader/0.2")
.timeout(Duration::from_secs(30))
.build()?;
let global_state = Arc::new(Mutex::new(DaemonState {
jobs: HashMap::new(),
}));
let next_id = Arc::new(AtomicUsize::new(1));
let shutdown_token = CancellationToken::new();
let shutdown_token_ref = shutdown_token.clone();
let semaphore = Arc::new(Semaphore::new(max_concurrent_downloads));
let (job_tx, mut job_rx) = mpsc::channel::<JobRequest>(100);
let client_for_scheduler = client.clone();
let shutdown_for_scheduler = shutdown_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_for_scheduler.cancelled() => {
break;
}
Some(req) = job_rx.recv() => {
let semaphore_clone = semaphore.clone();
let client_clone = client_for_scheduler.clone();
tokio::spawn(async move {
let _permit = match semaphore_clone.acquire().await {
Ok(p) => p,
Err(_) => return,
};
let job_token = req.job_data.cancel_token.lock().await;
if job_token.is_cancelled() { return; }
{
let mut state = req.job_data.state.lock().await;
*state = "Starting...".to_string();
}
if let Err(e) = perform_background_download(
req.url,
req.dir,
req.job_data.clone(),
client_clone,
).await {
let mut state = req.job_data.state.lock().await;
*state = format!("Failed: {}", e);
}
});
}
}
}
});
tokio::spawn(async move {
if let Ok(()) = tokio::signal::ctrl_c().await {
println!("\nReceived Ctrl+C. Pausing all jobs and shutting down...");
shutdown_token_ref.cancel();
}
});
loop {
tokio::select! {
_ = shutdown_token.cancelled() => {
println!("Daemon shutting down. Goodbye!");
tokio::time::sleep(Duration::from_millis(500)).await;
return Ok(());
}
accept_result = listener.accept() => {
let (mut socket, addr) = accept_result?;
let secret_check = secret.clone();
let next_id_ref = next_id.clone();
let state_ref = global_state.clone();
let shutdown_token_inner = shutdown_token.clone();
let job_tx_clone = job_tx.clone();
tokio::spawn(async move {
let mut buf = [0; 4096];
let n = match socket.read(&mut buf).await {
Ok(0) => return,
Ok(n) => n,
Err(e) => {
eprintln!("Socket read error: {}", e);
return;
}
};
let req_str = String::from_utf8_lossy(&buf[..n]);
let request: Request = match serde_json::from_str(&req_str) {
Ok(r) => r,
Err(e) => {
let _ =
send_response(&mut socket, Response::Err(format!("Invalid JSON: {}", e)))
.await;
return;
}
};
if let Some(ref server_pass) = secret_check
&& request.secret.as_ref() != Some(server_pass)
{
println!("⚠️ Unauthorized attempt from {}", addr);
let _ = send_response(
&mut socket,
Response::Err("Unauthorized: Invalid secret".into()),
)
.await;
return;
}
match request.command {
Command::Shutdown => {
let _ =
send_response(&mut socket, Response::Ok("Shutting down...".into())).await;
shutdown_token_inner.cancel();
std::process::exit(0);
}
Command::Status => {
let locked = state_ref.lock().await;
let mut list = Vec::new();
for job in locked.jobs.values() {
let current = job.downloaded_bytes.load(Ordering::Relaxed);
let total = job.total_bytes.load(Ordering::Relaxed);
let percent = if total > 0 {
(current * 100) / total
} else {
0
};
list.push(JobStatus {
id: job.id,
filename: job.filename.clone(),
progress_percent: percent,
state: job.state.lock().await.clone(),
});
}
list.sort_by_key(|j| j.id);
let _ = send_response(&mut socket, Response::StatusList(list)).await;
}
Command::Pause { id } => {
let locked = state_ref.lock().await;
if let Some(job) = locked.jobs.get(&id) {
let cancel_token_ref = job.cancel_token.lock().await;
cancel_token_ref.cancel();
let mut state_str = job.state.lock().await;
*state_str = "Pausing...".into();
let _ =
send_response(&mut socket, Response::Ok(format!("Paused job #{}", id)))
.await;
} else {
let _ =
send_response(&mut socket, Response::Err("Job ID not found".into()))
.await;
}
}
Command::Resume { id } => {
let locked = state_ref.lock().await;
if let Some(job) = locked.jobs.get(&id) {
let mut cancel_token_ref = job.cancel_token.lock().await;
if !cancel_token_ref.is_cancelled() {
let _ = send_response(
&mut socket,
Response::Err("Job is already running".into()),
)
.await;
return;
}
let new_token = shutdown_token_inner.child_token();
*cancel_token_ref = new_token;
let _ = job_tx_clone.send(JobRequest {
url: job.url.clone(),
dir: job.dir.clone(),
job_data: job.clone(),
}).await;
let _ = send_response(&mut socket, Response::Ok(format!("Resumed job #{}", id))).await;
} else {
let _ =
send_response(&mut socket, Response::Err("Job ID not found".into()))
.await;
}
}
Command::Add { url, dir } => {
let id = next_id_ref.fetch_add(1, Ordering::SeqCst);
let filename = crate::utils::get_filename_from_url(&url);
let child_cancel_token = shutdown_token_inner.child_token();
let job_data = Arc::new(ActiveJobData {
id,
filename: filename.clone(),
total_bytes: AtomicU64::new(0), downloaded_bytes: AtomicU64::new(0),
state: Mutex::new("Queued".into()),
cancel_token: Mutex::new(child_cancel_token),
url: url.clone(),
dir: dir.clone(),
});
{
let mut locked = state_ref.lock().await;
locked.jobs.insert(id, job_data.clone());
}
if let Err(e) = job_tx_clone.send(JobRequest {
url: url.clone(),
dir: dir.clone(),
job_data: job_data.clone(),
}).await {
let _ = send_response(&mut socket, Response::Err(format!("Scheduler Error: {}", e))).await;
return;
}
let _ = send_response(
&mut socket,
Response::Ok(format!("Queued job #{}: {}", id, filename)),
)
.await;
}
}
});
}
}
}
}
async fn perform_background_download(
url: String,
dir: String,
job_data: Arc<ActiveJobData>,
client: reqwest::Client,
) -> Result<()> {
let filename = &job_data.filename;
let mut output_path = std::path::PathBuf::from(&dir);
output_path.push(filename);
if dir != "." {
tokio::fs::create_dir_all(&dir).await?;
}
let output_filename = output_path.to_string_lossy().to_string();
{
let mut s = job_data.state.lock().await;
*s = "Fetching Metadata...".into();
}
let (state, _state_filename, size) =
downloader::prepare_download(&url, output_filename.clone(), 4, &client).await?;
job_data.total_bytes.store(size, Ordering::SeqCst);
let downloaded_bytes: u64 = state
.chunks
.iter()
.filter(|c| c.completed)
.map(|c| c.end - c.start + 1)
.sum();
job_data
.downloaded_bytes
.store(downloaded_bytes, Ordering::SeqCst);
{
let mut s = job_data.state.lock().await;
*s = "Downloading...".into();
}
let token_clone = {
let guard = job_data.cancel_token.lock().await;
guard.clone()
};
let _ = perform_parallel_download(
&url,
output_filename.clone(),
4,
&client,
|_, _| {
Arc::new(DaemonObserver {
job_data: job_data.clone(),
})
},
None,
token_clone.clone(),
)
.await?;
if !token_clone.is_cancelled() {
let mut s = job_data.state.lock().await;
*s = "Done".into();
job_data.downloaded_bytes.store(size, Ordering::SeqCst);
} else {
let mut s = job_data.state.lock().await;
*s = "Paused".into();
}
Ok(())
}
async fn send_response(socket: &mut tokio::net::TcpStream, resp: Response) -> Result<()> {
let json = serde_json::to_string(&resp)?;
socket.write_all(json.as_bytes()).await?;
Ok(())
}