use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::watch;
use crate::error::{FerriError, Result};
use crate::page::Page;
#[derive(Debug, Clone)]
pub enum DownloadStatus {
Pending,
Finished { path: PathBuf },
Failed { error: String },
}
pub type DownloadCanceler = Arc<
dyn Fn() -> std::pin::Pin<Box<dyn std::future::Future<Output = std::result::Result<(), FerriError>> + Send>>
+ Send
+ Sync,
>;
#[derive(Clone)]
pub struct Download {
pub(crate) inner: Arc<DownloadState>,
}
pub(crate) struct DownloadState {
page: std::sync::Weak<Page>,
guid: String,
url: String,
suggested_filename: std::sync::Mutex<String>,
downloads_dir: PathBuf,
local_path: std::sync::Mutex<PathBuf>,
canceler: DownloadCanceler,
state_tx: watch::Sender<DownloadStatus>,
deleted: AtomicBool,
}
impl Download {
#[must_use]
pub fn new(
page: &Arc<Page>,
guid: String,
url: String,
suggested_filename: String,
downloads_dir: PathBuf,
canceler: DownloadCanceler,
) -> Self {
let local_path = downloads_dir.join(&guid);
let (tx, _) = watch::channel(DownloadStatus::Pending);
Self {
inner: Arc::new(DownloadState {
page: Arc::downgrade(page),
guid,
url,
suggested_filename: std::sync::Mutex::new(suggested_filename),
downloads_dir,
local_path: std::sync::Mutex::new(local_path),
canceler,
state_tx: tx,
deleted: AtomicBool::new(false),
}),
}
}
#[must_use]
pub fn url(&self) -> &str {
&self.inner.url
}
#[must_use]
pub fn guid(&self) -> &str {
&self.inner.guid
}
#[must_use]
pub fn suggested_filename(&self) -> String {
self
.inner
.suggested_filename
.lock()
.map(|g| g.clone())
.unwrap_or_default()
}
#[must_use]
pub fn page(&self) -> Option<Arc<Page>> {
self.inner.page.upgrade()
}
pub fn filename_suggested(&self, suggested: String) {
if let Ok(mut g) = self.inner.suggested_filename.lock() {
*g = suggested;
}
}
pub fn report_finished(&self, final_path: Option<PathBuf>, error: Option<String>) {
if let Some(p) = final_path {
if let Ok(mut g) = self.inner.local_path.lock() {
*g = p;
}
}
let path = self
.inner
.local_path
.lock()
.map_or_else(|_| self.inner.downloads_dir.clone(), |g| g.clone());
let new_state = match error {
None => DownloadStatus::Finished { path },
Some(e) => DownloadStatus::Failed { error: e },
};
self.inner.state_tx.send_replace(new_state);
}
async fn wait_finished(&self) -> DownloadStatus {
let mut rx = self.inner.state_tx.subscribe();
loop {
{
let state = rx.borrow_and_update().clone();
if !matches!(state, DownloadStatus::Pending) {
return state;
}
}
if rx.changed().await.is_err() {
return rx.borrow().clone();
}
}
}
pub async fn path(&self) -> Result<PathBuf> {
match self.wait_finished().await {
DownloadStatus::Pending => Err(FerriError::Backend(
"download watch closed before reaching terminal state".into(),
)),
DownloadStatus::Finished { path } => Ok(path),
DownloadStatus::Failed { error } => Err(FerriError::Backend(error)),
}
}
pub async fn failure(&self) -> Option<String> {
match self.wait_finished().await {
DownloadStatus::Failed { error } => Some(error),
_ => None,
}
}
pub async fn save_as(&self, target: &Path) -> Result<()> {
let src = self.path().await?;
if let Some(parent) = target.parent() {
if !parent.as_os_str().is_empty() {
tokio::fs::create_dir_all(parent).await?;
}
}
tokio::fs::copy(&src, target).await?;
Ok(())
}
pub async fn create_read_stream(&self) -> Result<tokio::fs::File> {
let path = self.path().await?;
Ok(tokio::fs::File::open(path).await?)
}
pub async fn cancel(&self) -> Result<()> {
(self.inner.canceler)().await
}
pub async fn delete(&self) -> Result<()> {
if self.inner.deleted.swap(true, Ordering::AcqRel) {
return Ok(());
}
let _ = self.wait_finished().await;
let path = self
.inner
.local_path
.lock()
.map_or_else(|_| self.inner.downloads_dir.clone(), |g| g.clone());
match tokio::fs::remove_file(&path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(FerriError::from(e)),
}
}
}
impl std::fmt::Debug for Download {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Download")
.field("guid", &self.inner.guid)
.field("url", &self.inner.url)
.field("suggested_filename", &self.suggested_filename())
.finish()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DownloadHandlerId(pub u64);
pub type DownloadHandlerFn = Arc<dyn Fn(&Download) -> bool + Send + Sync>;
struct DownloadHandlerEntry {
id: u64,
handler: DownloadHandlerFn,
}
#[derive(Clone, Default)]
pub struct DownloadManager {
inner: Arc<DownloadManagerState>,
}
#[derive(Default)]
struct DownloadManagerState {
handlers: std::sync::Mutex<Vec<DownloadHandlerEntry>>,
next_id: AtomicU64,
by_guid: std::sync::Mutex<Vec<Download>>,
}
impl DownloadManager {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn add_handler(&self, handler: DownloadHandlerFn) -> DownloadHandlerId {
let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
if let Ok(mut handlers) = self.inner.handlers.lock() {
handlers.push(DownloadHandlerEntry { id, handler });
}
DownloadHandlerId(id)
}
pub fn remove_handler(&self, id: DownloadHandlerId) {
if let Ok(mut handlers) = self.inner.handlers.lock() {
handlers.retain(|h| h.id != id.0);
}
}
pub fn did_open(&self, download: &Download) {
if let Ok(mut by_guid) = self.inner.by_guid.lock() {
by_guid.push(download.clone());
}
self.fire_download_event(download);
}
pub fn register_pending(&self, download: &Download) {
if let Ok(mut by_guid) = self.inner.by_guid.lock() {
by_guid.push(download.clone());
}
}
pub fn fire_download_event(&self, download: &Download) {
let handlers: Vec<DownloadHandlerFn> = match self.inner.handlers.lock() {
Ok(g) => g.iter().map(|e| Arc::clone(&e.handler)).collect(),
Err(_) => Vec::new(),
};
for h in handlers {
let _ = h(download);
}
}
#[must_use]
pub fn take_for_guid(&self, guid: &str) -> Option<Download> {
let mut guard = self.inner.by_guid.lock().ok()?;
let idx = guard.iter().position(|d| d.guid() == guid)?;
Some(guard.remove(idx))
}
#[must_use]
pub fn peek_for_guid(&self, guid: &str) -> Option<Download> {
let guard = self.inner.by_guid.lock().ok()?;
guard.iter().find(|d| d.guid() == guid).cloned()
}
#[must_use]
pub fn register_emitter_bridge(&self, events: crate::events::EventEmitter) -> DownloadHandlerId {
self.add_handler(Arc::new(move |download: &Download| {
if events.has_listener("download") {
events.emit(crate::events::PageEvent::Download(download.clone()));
true
} else {
false
}
}))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn noop_canceler() -> DownloadCanceler {
Arc::new(|| Box::pin(async { Ok(()) }))
}
#[test]
fn download_manager_add_remove_roundtrip() {
let mgr = DownloadManager::new();
let fired = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let fired_c = fired.clone();
let id = mgr.add_handler(Arc::new(move |_| {
fired_c.fetch_add(1, Ordering::Relaxed);
true
}));
let page = std::sync::Weak::<Page>::new();
let (tx, _) = watch::channel(DownloadStatus::Pending);
let d = Download {
inner: Arc::new(DownloadState {
page,
guid: "abc".into(),
url: "http://x/".into(),
suggested_filename: std::sync::Mutex::new("f".into()),
downloads_dir: PathBuf::from("/tmp"),
local_path: std::sync::Mutex::new(PathBuf::from("/tmp/abc")),
canceler: noop_canceler(),
state_tx: tx,
deleted: AtomicBool::new(false),
}),
};
mgr.did_open(&d);
assert_eq!(fired.load(Ordering::Relaxed), 1);
mgr.remove_handler(id);
mgr.did_open(&d);
assert_eq!(fired.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn report_finished_resolves_path() {
let page = std::sync::Weak::<Page>::new();
let (tx, _) = watch::channel(DownloadStatus::Pending);
let d = Download {
inner: Arc::new(DownloadState {
page,
guid: "abc".into(),
url: "http://x/".into(),
suggested_filename: std::sync::Mutex::new("f".into()),
downloads_dir: PathBuf::from("/tmp"),
local_path: std::sync::Mutex::new(PathBuf::from("/tmp/abc")),
canceler: noop_canceler(),
state_tx: tx,
deleted: AtomicBool::new(false),
}),
};
let d2 = d.clone();
let task = tokio::spawn(async move { d2.path().await });
d.report_finished(Some(PathBuf::from("/tmp/final")), None);
let p = task.await.unwrap().unwrap();
assert_eq!(p, PathBuf::from("/tmp/final"));
}
#[tokio::test]
async fn report_finished_with_error_surfaces_failure() {
let page = std::sync::Weak::<Page>::new();
let (tx, _) = watch::channel(DownloadStatus::Pending);
let d = Download {
inner: Arc::new(DownloadState {
page,
guid: "abc".into(),
url: "http://x/".into(),
suggested_filename: std::sync::Mutex::new("f".into()),
downloads_dir: PathBuf::from("/tmp"),
local_path: std::sync::Mutex::new(PathBuf::from("/tmp/abc")),
canceler: noop_canceler(),
state_tx: tx,
deleted: AtomicBool::new(false),
}),
};
let d2 = d.clone();
let task = tokio::spawn(async move { d2.failure().await });
d.report_finished(None, Some("canceled".into()));
let f = task.await.unwrap();
assert_eq!(f.as_deref(), Some("canceled"));
}
}