use crate::{putio, AppData};
use actix_web::web::Data;
use anyhow::{Context, Result};
use async_recursion::async_recursion;
use file_owner::PathExt;
use futures::StreamExt;
use log::info;
use serde::{Deserialize, Serialize};
use std::{
fs::{self, metadata},
os::unix::prelude::MetadataExt,
path::Path,
};
use tokio::time::sleep;
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) enum DownloadStatus {
Downloading,
Downloaded,
Imported,
}
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct Download {
hash: String,
pub status: DownloadStatus,
file_id: u64,
targets: Option<Vec<DownloadedTarget>>,
}
pub(crate) async fn start_downloader_task(app_data: Data<AppData>) -> Result<()> {
let api_token = &app_data.api_token;
let download_dir = &app_data.download_dir;
let uid = app_data.uid;
let ten_seconds = std::time::Duration::from_secs(10);
app_data.tx.send(true).await?;
loop {
info!("Waiting to start..");
app_data.rx.recv().await?;
info!("Downloader started");
loop {
let transfers = putio::list_transfers(api_token).await?.transfers;
for transfer in &transfers {
if !transfer.userfile_exists {
continue;
}
if app_data.downloads.lock().await.contains_key(&transfer.hash) {
{
let mut m = app_data.downloads.lock().await;
let d = m.get_mut(&transfer.hash).unwrap();
if d.status == DownloadStatus::Imported && transfer.status == "SEEDING" {
continue;
} else if d.status == DownloadStatus::Imported {
info!("Transfer {} done seeding. Removing..", &transfer.name);
putio::remove_transfer(api_token, transfer.id).await?;
m.remove(&transfer.hash);
drop(m);
app_data.save().await?;
continue;
}
}
info!("Checking if '{}' has been imported already", &transfer.name);
let targets: Vec<String> = {
let m = app_data.downloads.lock().await;
let d = m.get(&transfer.hash).unwrap();
d.targets
.as_ref()
.unwrap()
.iter()
.filter(|t| t.target_type == DownloadType::File)
.map(|t| t.path_name.clone())
.collect()
};
let imported = targets
.iter()
.filter_map(|s| {
let meta = fs::metadata(s).unwrap();
let links = meta.nlink();
if links > 1 {
Some(true)
} else {
None
}
})
.any(|x| x);
if imported {
info!("'{}' has been imported. Deleting files.", &transfer.name);
let top_level_path = app_data
.downloads
.lock()
.await
.get(&transfer.hash)
.unwrap()
.targets
.as_ref()
.unwrap()
.iter()
.find(|d| d.top_level)
.unwrap()
.path_name
.clone();
match metadata(&top_level_path) {
Ok(m) if m.is_dir() => {
info!("Deleting everyting in {}", &top_level_path);
fs::remove_dir_all(top_level_path).unwrap();
}
Ok(m) if m.is_file() => {
info!("Deleting {}", &top_level_path);
fs::remove_file(&top_level_path).unwrap();
}
Ok(_) | Err(_) => {
panic!("Don't know how to handle {}", &top_level_path)
}
};
{
let mut m = app_data.downloads.lock().await;
let d = m.get_mut(&transfer.hash).unwrap();
d.status = DownloadStatus::Imported;
}
app_data.save().await?;
}
} else {
info!("Downloading transfer '{}'", transfer.hash);
let d = Download {
hash: transfer.hash.clone(),
status: DownloadStatus::Downloading,
file_id: transfer.file_id.unwrap(),
targets: None,
};
let file_id = d.file_id;
app_data
.downloads
.lock()
.await
.insert(transfer.hash.clone(), d);
let targets = download(api_token, file_id, download_dir, uid).await?;
{
let mut m = app_data.downloads.lock().await;
let d = m.get_mut(&transfer.hash).unwrap();
d.targets = Some(targets);
d.status = DownloadStatus::Downloaded;
}
app_data.save().await?;
}
}
{
let transfer_keys: Vec<String> = transfers.iter().map(|t| t.hash.clone()).collect();
let mut downloads = app_data.downloads.lock().await;
downloads.retain(|k, _| {
if !transfer_keys.contains(k) {
info!("Cleaning up {}", k);
false
} else {
true
}
});
}
if transfers.is_empty() {
info!("No more transfers, downloader stopped.");
break;
}
sleep(ten_seconds).await;
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub(crate) struct DownloadedTarget {
pub path_name: String,
pub target_type: DownloadType,
pub top_level: bool,
}
pub(crate) async fn download(
api_token: &str,
file_id: u64,
download_dir: &str,
uid: u32,
) -> Result<Vec<DownloadedTarget>> {
let listing = get_download_operations(api_token, file_id, download_dir).await?;
let mut targets = Vec::<DownloadedTarget>::new();
for (i, op) in listing.into_iter().enumerate() {
match op.download_type {
DownloadType::Directory => {
if !Path::new(&op.target).exists() {
fs::create_dir(&op.target)?;
op.target.clone().set_owner(uid)?;
}
}
DownloadType::File => {
if Path::new(&op.target).exists() {
fs::remove_file(&op.target)?;
}
fetch_url(op.url.context("No URL found")?, op.target.clone(), uid).await?
}
}
targets.push(DownloadedTarget {
path_name: op.target,
target_type: op.download_type,
top_level: i == 0, })
}
Ok(targets)
}
async fn fetch_url(url: String, target: String, uid: u32) -> Result<()> {
info!("Downloading {} started...", &target);
let tmp_path = format!("{}.downloading", &target);
let mut tmp_file = tokio::fs::File::create(&tmp_path).await?;
let mut byte_stream = reqwest::get(&url).await?.bytes_stream();
while let Some(item) = byte_stream.next().await {
tokio::io::copy(&mut item?.as_ref(), &mut tmp_file).await?;
}
tmp_path.clone().set_owner(uid)?;
fs::rename(&tmp_path, &target)?;
info!("Downloading {} finished...", &target);
Ok(())
}
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum DownloadType {
Directory,
File,
}
#[derive(Debug)]
struct DownloadOperation {
pub url: Option<String>,
pub download_type: DownloadType,
pub target: String,
}
#[async_recursion]
async fn get_download_operations(
api_token: &str,
file_id: u64,
download_dir: &str,
) -> Result<Vec<DownloadOperation>> {
let mut operations = Vec::<DownloadOperation>::new();
let response = putio::list_files(api_token, file_id).await?;
if response.parent.content_type == "application/x-directory" {
let target = Path::new(download_dir)
.join(&response.parent.name)
.to_string_lossy()
.to_string();
operations.push(DownloadOperation {
url: None,
download_type: DownloadType::Directory,
target: target.clone(),
});
let new_base = format!("{}/", &target);
for file in response.files {
operations
.append(&mut get_download_operations(api_token, file.id, new_base.as_str()).await?)
}
} else {
let url = putio::url(api_token, response.parent.id).await?;
let target = Path::new(download_dir)
.join(&response.parent.name)
.to_string_lossy()
.to_string();
operations.push(DownloadOperation {
url: Some(url),
download_type: DownloadType::File,
target,
})
}
Ok(operations)
}