use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
use crate::browser::tab::Tab;
use crate::{Error, Result};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloadState {
Downloading,
Finished,
}
#[derive(Debug, Clone)]
pub struct DownloadMission {
pub suggested_filename: String,
pub path: PathBuf,
pub state: DownloadState,
}
impl DownloadMission {
pub fn is_finished(&self) -> bool {
matches!(self.state, DownloadState::Finished)
}
pub fn succeeded(&self) -> bool {
self.is_finished()
}
pub async fn downloaded_bytes(&self) -> u64 {
if let Ok(m) = tokio::fs::metadata(&self.path).await {
return m.len();
}
let part = self
.path
.with_file_name(format!("{}.part", self.suggested_filename));
tokio::fs::metadata(&part).await.map(|m| m.len()).unwrap_or(0)
}
pub async fn save_as(&self, dest: impl AsRef<Path>) -> Result<PathBuf> {
let dest = dest.as_ref().to_path_buf();
if let Some(parent) = dest.parent().filter(|p| !p.as_os_str().is_empty()) {
tokio::fs::create_dir_all(parent).await?;
}
if tokio::fs::rename(&self.path, &dest).await.is_err() {
tokio::fs::copy(&self.path, &dest).await?;
let _ = tokio::fs::remove_file(&self.path).await;
}
Ok(dest)
}
}
pub(crate) struct DownloadShared {
pub active: AtomicBool,
pub baseline: Mutex<HashSet<String>>,
pub done_returned: Mutex<HashSet<String>>,
pub new_returned: Mutex<HashSet<String>>,
}
impl DownloadShared {
pub(crate) fn new() -> Self {
Self {
active: AtomicBool::new(false),
baseline: Mutex::new(HashSet::new()),
done_returned: Mutex::new(HashSet::new()),
new_returned: Mutex::new(HashSet::new()),
}
}
}
pub struct Downloads {
tab: Tab,
}
impl Downloads {
pub(crate) fn new(tab: Tab) -> Self {
Self { tab }
}
fn dir(&self) -> Option<PathBuf> {
self.tab.core.download_path.clone()
}
pub async fn start(&self) -> Result<()> {
let dir = self.dir().ok_or_else(|| {
Error::Other("downloads(): 需先用 BrowserOptions::download_path 设置下载目录".into())
})?;
let shared = &self.tab.core.downloads;
*shared.baseline.lock().await = list_dir_files(&dir).await;
shared.done_returned.lock().await.clear();
shared.new_returned.lock().await.clear();
shared.active.store(true, Ordering::SeqCst);
Ok(())
}
pub fn listening(&self) -> bool {
self.tab.core.downloads.active.load(Ordering::SeqCst)
}
async fn scan(&self) -> Vec<DownloadMission> {
let Some(dir) = self.dir() else {
return Vec::new();
};
let baseline = self.tab.core.downloads.baseline.lock().await.clone();
scan_new_files(&dir, &baseline).await
}
pub async fn wait_new(&self, timeout: Duration) -> Result<Option<DownloadMission>> {
self.ensure_active()?;
let deadline = Instant::now() + timeout;
loop {
for m in self.scan().await {
let mut seen = self.tab.core.downloads.new_returned.lock().await;
if seen.insert(m.suggested_filename.clone()) {
return Ok(Some(m));
}
}
if self.expired(deadline).await {
return Ok(None);
}
}
}
pub async fn wait_done(&self, timeout: Duration) -> Result<Option<DownloadMission>> {
self.ensure_active()?;
let deadline = Instant::now() + timeout;
loop {
for m in self.scan().await {
if m.state != DownloadState::Finished {
continue;
}
{
let seen = self.tab.core.downloads.done_returned.lock().await;
if seen.contains(&m.suggested_filename) {
continue;
}
}
if !size_stable(&m.path).await {
continue;
}
self.tab
.core
.downloads
.done_returned
.lock()
.await
.insert(m.suggested_filename.clone());
return Ok(Some(m));
}
if self.expired(deadline).await {
return Ok(None);
}
}
}
pub async fn wait_count_done(
&self,
count: usize,
timeout: Duration,
) -> Result<Vec<DownloadMission>> {
let deadline = Instant::now() + timeout;
let mut out = Vec::with_capacity(count);
while out.len() < count {
let remain = deadline.saturating_duration_since(Instant::now());
if remain.is_zero() {
break;
}
match self.wait_done(remain).await? {
Some(m) => out.push(m),
None => break,
}
}
Ok(out)
}
pub async fn missions(&self) -> Vec<DownloadMission> {
self.scan().await
}
pub async fn stop(&self) -> Result<()> {
let shared = &self.tab.core.downloads;
shared.active.store(false, Ordering::SeqCst);
shared.baseline.lock().await.clear();
shared.done_returned.lock().await.clear();
shared.new_returned.lock().await.clear();
Ok(())
}
fn ensure_active(&self) -> Result<()> {
if self.tab.core.downloads.active.load(Ordering::SeqCst) {
Ok(())
} else {
Err(Error::Other("尚未调用 downloads().start()".into()))
}
}
async fn expired(&self, deadline: Instant) -> bool {
if Instant::now() >= deadline {
return true;
}
tokio::time::sleep(Duration::from_millis(80)).await;
false
}
}
pub(crate) async fn scan_new_files(dir: &Path, baseline: &HashSet<String>) -> Vec<DownloadMission> {
let mut finals: HashSet<String> = HashSet::new();
let mut parts: HashSet<String> = HashSet::new();
if let Ok(mut rd) = tokio::fs::read_dir(dir).await {
while let Ok(Some(e)) = rd.next_entry().await {
let name = e.file_name().to_string_lossy().to_string();
match name.strip_suffix(".part") {
Some(stripped) => {
parts.insert(stripped.to_string());
}
None => {
finals.insert(name);
}
}
}
}
let mut out = Vec::new();
for p in &parts {
if baseline.contains(p) {
continue;
}
out.push(DownloadMission {
suggested_filename: p.clone(),
path: dir.join(p),
state: DownloadState::Downloading,
});
}
for f in &finals {
if baseline.contains(f) || parts.contains(f) {
continue;
}
out.push(DownloadMission {
suggested_filename: f.clone(),
path: dir.join(f),
state: DownloadState::Finished,
});
}
out
}
pub(crate) async fn list_dir_files(dir: &Path) -> HashSet<String> {
let mut set = HashSet::new();
if let Ok(mut rd) = tokio::fs::read_dir(dir).await {
while let Ok(Some(e)) = rd.next_entry().await {
set.insert(e.file_name().to_string_lossy().to_string());
}
}
set
}
pub(crate) async fn size_stable(path: &Path) -> bool {
let Ok(a) = tokio::fs::metadata(path).await.map(|m| m.len()) else {
return false;
};
tokio::time::sleep(Duration::from_millis(120)).await;
matches!(tokio::fs::metadata(path).await.map(|m| m.len()), Ok(b) if b == a)
}