use std::{thread, time::Duration};
use crate::{
app_state::{AppState, StateMessage},
args::Args,
};
use super::worker::download_worker;
/// Pause between successive polls of the shared state by the controller and by idle workers.
const POLL_INTERVAL: Duration = Duration::from_millis(100);

/// Delay before the logs are cleared once a run has ended without a force quit.
const LOG_CLEAR_DELAY: Duration = Duration::from_secs(2);

/// Appends `message` to the application log; a failure to do so is reported on stderr.
fn log_or_report(state: &AppState, message: impl Into<String>) {
    if let Err(e) = state.add_log(message.into()) {
        eprintln!("Error adding log: {}", e);
    }
}

/// Returns `true` when a force quit or a shutdown has been requested.
///
/// A flag that cannot be read is treated as "not set".
fn stop_requested(state: &AppState) -> bool {
    state.is_force_quit().unwrap_or(false) || state.is_shutdown().unwrap_or(false)
}

/// Returns `true` when the queue is empty and no download is registered as active.
///
/// A collection that cannot be read is treated as empty.
fn is_idle(state: &AppState) -> bool {
    state.get_queue().unwrap_or_default().is_empty()
        && state.get_active_downloads().unwrap_or_default().is_empty()
}

/// Body of a single worker thread: pops URLs off the queue and downloads them one at a time.
///
/// The loop ends when a force quit or shutdown is requested, or when nothing could be popped
/// and the state is idle. While the state is paused the worker only sleeps and re-checks.
fn run_worker(state: AppState, args: Args) {
    loop {
        if stop_requested(&state) {
            break;
        }
        if state.is_paused().unwrap_or(false) {
            thread::sleep(POLL_INTERVAL);
            continue;
        }

        let Ok(Some(url)) = state.pop_queue() else {
            // Nothing to pop (or the pop failed): wait a little, then exit only if no other
            // download is still in flight.
            thread::sleep(POLL_INTERVAL);
            if is_idle(&state) {
                break;
            }
            continue;
        };

        // A panic inside one download must not take the worker thread down with it.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            download_worker(url.clone(), state.clone(), args.clone());
        }));
        if outcome.is_err() {
            // Best-effort cleanup: there is nothing more to do if these fail as well.
            let _ = state.send(StateMessage::RemoveActiveDownload(url.clone()));
            let _ = state.log_error(
                "Worker panic",
                format!("Worker panicked while downloading {}, recovered", url),
            );
        }
    }
}

/// Spawns the worker threads and returns their join handles.
///
/// The number of workers comes from `get_concurrent()`, falling back to 1 when it cannot be
/// read. The count is clamped to at least 1: with zero workers the queue would never drain
/// and the controller would wait forever.
fn spawn_workers(state: &AppState, args: &Args) -> Vec<thread::JoinHandle<()>> {
    let worker_count = state.get_concurrent().unwrap_or(1).max(1);
    (0..worker_count)
        .map(|_| {
            let worker_state = state.clone();
            let worker_args = args.clone();
            thread::spawn(move || run_worker(worker_state, worker_args))
        })
        .collect()
}

/// Body of the controller thread: starts the workers, waits for the run to end, joins the
/// workers (unless a force quit is active) and publishes the final status.
fn run_controller(state: AppState, args: Args) {
    let mut worker_handles: Vec<thread::JoinHandle<()>> = Vec::new();
    let mut workers_created = false;

    loop {
        let force_quit = state.is_force_quit().unwrap_or(false);
        if force_quit || state.is_shutdown().unwrap_or(false) {
            if force_quit {
                log_or_report(
                    &state,
                    "Controller: Force quit detected, exiting main loop.",
                );
            }
            break;
        }
        if state.is_paused().unwrap_or(false) {
            thread::sleep(POLL_INTERVAL);
            continue;
        }

        if !workers_created && !state.get_queue().unwrap_or_default().is_empty() {
            workers_created = true;
            worker_handles = spawn_workers(&state, &args);
        }

        // Checked whether or not workers exist: if the queue was emptied before any worker
        // was spawned there is nothing left to do, and waiting would never end.
        if is_idle(&state) {
            break;
        }
        thread::sleep(POLL_INTERVAL);
    }

    if state.is_force_quit().unwrap_or(false) {
        // Dropping the handles detaches the workers instead of waiting for them.
        log_or_report(
            &state,
            "Controller: Force quit active. Not waiting for worker threads to join.",
        );
    } else {
        log_or_report(&state, "Controller: Waiting for worker threads to complete.");
        for handle in worker_handles {
            if let Err(panic) = handle.join() {
                log_or_report(
                    &state,
                    format!("Controller: Worker thread panicked: {:?}", panic),
                );
            }
        }
        log_or_report(&state, "Controller: All worker threads completed.");
    }

    // Re-read the flag: a force quit may have been requested while the workers were joined.
    let force_quit = state.is_force_quit().unwrap_or(false);
    if force_quit {
        log_or_report(&state, "Download processing forcefully stopped.");
    } else if is_idle(&state) {
        if let Err(e) = state.send(StateMessage::SetCompleted(true)) {
            eprintln!("Error setting completed: {}", e);
        }
        log_or_report(&state, "All downloads completed or queue is empty.");
    } else {
        log_or_report(&state, "Download processing stopped.");
    }

    if let Err(e) = state.send(StateMessage::SetStarted(false)) {
        eprintln!("Error setting started: {}", e);
    }

    if !force_quit {
        // Clear the logs a little later, on a detached thread, so the controller can return
        // without waiting for the delay.
        let log_state = state.clone();
        thread::spawn(move || {
            thread::sleep(LOG_CLEAR_DELAY);
            if let Err(e) = log_state.clear_logs() {
                eprintln!("Error clearing logs: {}", e);
            }
        });
    }
}

/// Processes the download queue with a pool of worker threads and blocks until the run ends.
///
/// If the queue is empty on entry, `SetCompleted(true)` is sent and the function returns
/// immediately. Otherwise the state is reset with `reset_for_new_run()` and a controller
/// thread is started and joined; the controller spawns the workers, waits for the queue and
/// the active downloads to drain (or for a shutdown / force quit), and publishes the final
/// status.
///
/// Errors are never returned: every failure, including a panic of the controller thread, is
/// reported on stderr.
pub fn process_queue(state: AppState, args: Args) {
    if state.get_queue().unwrap_or_default().is_empty() {
        if let Err(e) = state.send(StateMessage::SetCompleted(true)) {
            eprintln!("Error setting completed: {}", e);
        }
        return;
    }

    if let Err(e) = state.reset_for_new_run() {
        eprintln!("Error resetting state: {}", e);
    }

    // The controller runs on its own thread so that a panic inside it is caught by `join`
    // instead of unwinding into the caller.
    let controller = thread::spawn(move || run_controller(state, args));
    if let Err(e) = controller.join() {
        eprintln!("Controller thread panicked: {:?}", e);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app_state::AppState;
    use crate::args::Args;
    use clap::Parser;

    /// Parses a fixed command line (`-d` and `-f` pointing under `/tmp`) into `Args`.
    fn create_test_args() -> Args {
        let argv = [
            "test",
            "-d",
            "/tmp/test_downloads",
            "-f",
            "/tmp/test_archive.txt",
        ];
        Args::parse_from(argv)
    }

    /// Sleeps for `millis` milliseconds before the state is inspected.
    // NOTE(review): presumably messages passed to `send` are applied asynchronously, which is
    // why the tests wait before asserting — confirm against `AppState`.
    fn settle(millis: u64) {
        std::thread::sleep(std::time::Duration::from_millis(millis));
    }

    /// An empty queue makes `process_queue` mark the run as completed.
    #[test]
    fn test_process_queue_empty_queue_sets_completed() {
        let app_state = AppState::new();
        let cli_args = create_test_args();
        let queue_is_empty = app_state.get_queue().unwrap_or_default().is_empty();
        assert!(queue_is_empty);

        process_queue(app_state.clone(), cli_args);
        settle(100);

        let completed = app_state.is_completed().unwrap_or(false);
        assert!(completed);
    }

    /// An empty queue leaves the "started" flag unset.
    #[test]
    fn test_process_queue_empty_queue_does_not_start() {
        let app_state = AppState::new();
        let cli_args = create_test_args();

        process_queue(app_state.clone(), cli_args);
        settle(50);

        let started = app_state.is_started().unwrap_or(true);
        assert!(!started);
    }

    /// A concurrency value that was set is read back unchanged.
    #[test]
    fn test_get_concurrent_returns_expected_value() {
        let app_state = AppState::new();
        let _ = app_state.set_concurrent(8);
        let observed = app_state.get_concurrent().unwrap_or(0);
        assert_eq!(observed, 8);
    }

    /// A fresh state reports a positive concurrency value.
    #[test]
    fn test_concurrent_default_value() {
        let default_concurrency = AppState::new().get_concurrent().unwrap_or(0);
        assert!(default_concurrency > 0);
    }

    /// A fresh state has no shutdown requested.
    #[test]
    fn test_shutdown_flag_default_false() {
        let shutdown = AppState::new().is_shutdown().unwrap_or(true);
        assert!(!shutdown);
    }

    /// A fresh state has no force quit requested.
    #[test]
    fn test_force_quit_flag_default_false() {
        let force_quit = AppState::new().is_force_quit().unwrap_or(true);
        assert!(!force_quit);
    }

    /// Sending `SetShutdown(true)` makes the shutdown flag observable.
    #[test]
    fn test_shutdown_flag_can_be_set() {
        let app_state = AppState::new();
        let _ = app_state.send(StateMessage::SetShutdown(true));
        settle(50);
        let shutdown = app_state.is_shutdown().unwrap_or(false);
        assert!(shutdown);
    }

    /// Sending `SetForceQuit(true)` makes the force-quit flag observable.
    #[test]
    fn test_force_quit_flag_can_be_set() {
        let app_state = AppState::new();
        let _ = app_state.send(StateMessage::SetForceQuit(true));
        settle(50);
        let force_quit = app_state.is_force_quit().unwrap_or(false);
        assert!(force_quit);
    }

    /// Loading three links results in a queue of length three.
    #[test]
    fn test_queue_can_hold_urls() {
        let app_state = AppState::new();
        let links: Vec<String> = (1..=3)
            .map(|index| format!("https://example.com/video{}", index))
            .collect();
        let _ = app_state.send(StateMessage::LoadLinks(links));
        settle(50);

        let queued = app_state.get_queue().unwrap_or_default();
        assert_eq!(queued.len(), 3);
    }

    /// Popping from a queue holding one link yields `Ok(Some(_))`.
    #[test]
    fn test_pop_queue_returns_url() {
        let app_state = AppState::new();
        let links = vec![String::from("https://example.com/video")];
        let _ = app_state.send(StateMessage::LoadLinks(links));
        settle(50);

        let popped = app_state.pop_queue();
        assert!(matches!(popped, Ok(Some(_))));
    }

    /// Popping from an empty queue yields `Ok(None)`.
    #[test]
    fn test_pop_queue_empty_returns_none() {
        let app_state = AppState::new();
        let queue_is_empty = app_state.get_queue().unwrap_or_default().is_empty();
        assert!(queue_is_empty);

        let popped = app_state.pop_queue();
        assert!(matches!(popped, Ok(None)));
    }
}