use std::{
borrow::Cow,
io::{BufReader, Read, Seek},
path::{Path, PathBuf},
sync::Arc,
};
use bytesize::ByteSize;
use clap::{crate_version, Subcommand};
use crossbeam::deque::Injector;
use directories::ProjectDirs;
use fd_lock::RwLock;
use fs_err::{File, OpenOptions};
use itertools::Itertools;
use quick_xml::{de::from_reader, se::to_writer, DeError};
use reqwest::{
blocking::{ClientBuilder, Response},
header::CONTENT_DISPOSITION,
Url,
};
use rss::{Channel, Item};
use serde::{Deserialize, Serialize};
use crate::{
article::MessageId,
download::{Download, DownloadError, Server},
nzb::Nzb,
PROJECT_DIR,
};
macro_rules! regex {
($re:literal $(,)?) => {{
static RE: once_cell::sync::OnceCell<regex::Regex> = once_cell::sync::OnceCell::new();
RE.get_or_init(|| regex::Regex::new($re).unwrap())
}};
}
#[derive(Debug, Subcommand)]
pub enum QueueAction {
Add {
#[clap(value_parser)]
path: String,
},
#[clap(alias = "ls")]
List,
#[clap(alias = "rm")]
Remove {
#[clap(value_parser)]
name: String,
},
}
impl QueueAction {
pub fn execute(&self) -> Result<(), QueueError> {
Queue::lock(|queue| match self {
QueueAction::Add { path } => {
let bytes = get_bytes(&path).map_err(QueueError::GetBytesFailed)?;
if let Ok(mut nzb) = Nzb::read_from(&bytes[..]) {
if nzb.name().is_none() {
if path.starts_with("https://") || path.starts_with("http://") {
let url = Url::parse(&path)
.map_err(GetBytesFailed::InvalidUrl)
.map_err(QueueError::GetBytesFailed)?;
let response = ClientBuilder::new()
.user_agent(format!("rews {}", crate_version!()))
.build()
.and_then(|client| client.head(url).send())
.map_err(GetBytesFailed::DownloadFailed)
.map_err(QueueError::GetBytesFailed)?;
let name = get_filename(&response)
.ok_or(GetBytesFailed::FilenameMissing)
.map_err(QueueError::GetBytesFailed)?;
nzb.set_name(&name);
} else {
let name: String = PathBuf::from(&path)
.file_stem()
.and_then(|filename| filename.to_str())
.ok_or(GetBytesFailed::FilenameMissing)
.map_err(QueueError::GetBytesFailed)?
.into();
nzb.set_name(&name);
}
}
Ok(queue.add_item(nzb).unwrap())
} else if let Ok(channel) = Channel::read_from(&bytes[..]) {
queue.add_items(channel.items.into_iter())
} else {
Err(QueueError::InvalidSource)
}
}
QueueAction::List => {
queue.items.iter().for_each(|item| {
let total_bytes: ByteSize = ByteSize::b(item.total_bytes);
let remaining_bytes: ByteSize =
ByteSize::b(item.messages.iter().map(|msg| msg.size).sum());
let downloaded_bytes = ByteSize(total_bytes.0 - remaining_bytes.0);
let name = item.name();
println!("Name: {name}\nSize: {downloaded_bytes} / {total_bytes}");
println!("");
});
Ok(())
}
QueueAction::Remove { name } => Ok(queue.remove_item(name)),
})
}
}
fn get_bytes(path: &str) -> Result<Vec<u8>, GetBytesFailed> {
if path.starts_with("https://") || path.starts_with("http://") {
let url = Url::parse(&path).map_err(GetBytesFailed::InvalidUrl)?;
let response = ClientBuilder::new()
.user_agent(format!("rews {}", crate_version!()))
.build()
.and_then(|client| client.get(url).send())
.map_err(GetBytesFailed::DownloadFailed)?;
let bytes = response.bytes().map_err(GetBytesFailed::DownloadFailed)?;
Ok(bytes.to_vec())
} else {
let mut file = File::open(&path).map_err(GetBytesFailed::OpenFailed)?;
let mut buffer: Vec<u8> = vec![];
file
.read_to_end(&mut buffer)
.map_err(GetBytesFailed::OpenFailed)?;
Ok(buffer)
}
}
fn get_filename(response: &Response) -> Option<String> {
let value = response.headers().get(CONTENT_DISPOSITION)?.to_str().ok()?;
PathBuf::from(
®ex!(r#"attachment; filename[^;=\n]*=(?P<filename>(['"]).*?\2|[^;\n]*)"#)
.captures(&value)?
.name("filename")?
.as_str(),
)
.file_stem()?
.to_str()
.map(|s| s.into())
}
#[derive(Debug)]
pub enum QueueError {
DeserializationFailed(DeError),
SerializationFailed(DeError),
OpenFailed(std::io::Error),
WriteFailed(std::io::Error),
InvalidSource,
GetBytesFailed(GetBytesFailed),
}
#[derive(Debug)]
pub enum GetBytesFailed {
MissingEnclosure(Item),
InvalidUrl(url::ParseError),
DownloadFailed(reqwest::Error),
OpenFailed(std::io::Error),
FilenameMissing,
}
impl Into<QueueError> for GetBytesFailed {
fn into(self) -> QueueError {
QueueError::GetBytesFailed(self)
}
}
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename = "queue", rename_all = "kebab-case")]
pub struct Queue {
#[serde(rename = "item", default)]
pub items: Vec<QueueItem>,
}
impl Default for Queue {
fn default() -> Self {
Self {
items: Default::default(),
}
}
}
impl Queue {
pub fn lock<F, E>(f: F) -> Result<(), E>
where
F: Fn(&mut Queue) -> Result<(), E>,
E: From<QueueError>,
{
let mut lock = Queue::open(PROJECT_DIR.wait())?;
let mut file = lock.try_write().map_err(QueueError::OpenFailed)?;
let metadata = file.metadata().map_err(QueueError::OpenFailed)?;
let mut queue = if metadata.len() == 0 {
Queue::default()
} else {
from_reader(BufReader::new(&*file)).map_err(QueueError::DeserializationFailed)?
};
let result = f(&mut queue);
file.set_len(0).map_err(QueueError::WriteFailed)?;
file
.seek(std::io::SeekFrom::Start(0))
.map_err(QueueError::WriteFailed)?;
to_writer(&*file, &queue).map_err(QueueError::SerializationFailed)?;
result
}
pub fn open(project_dir: &ProjectDirs) -> Result<RwLock<File>, QueueError> {
let mut queue_path = project_dir.data_dir().to_path_buf();
queue_path.push("queue.xml");
OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&queue_path)
.map_err(|e| QueueError::OpenFailed(e))
.map(RwLock::new)
}
pub fn add_item<V>(&mut self, value: V) -> Result<(), V::Error>
where
V: TryInto<QueueItem>,
{
Ok(self.items.push(value.try_into()?))
}
pub fn add_items<I>(
&mut self,
items: I,
) -> Result<(), <<I as Iterator>::Item as TryInto<QueueItem>>::Error>
where
I: Iterator,
I::Item: TryInto<QueueItem>,
{
for item in items {
let result = item
.try_into()
.map(|queue_item| self.items.push(queue_item));
if result.is_err() {
return result;
}
}
Ok(())
}
pub fn remove_item(&mut self, name: &str) {
self.items.retain(|item| item.name() != name);
}
pub fn remove_message(&mut self, message_id: &MessageId) {
for (item_index, item) in &mut self.items.iter_mut().enumerate() {
if let Some(message_index) = item
.messages
.iter()
.position(|message| message.message_id == *message_id)
{
item.messages.swap_remove(message_index);
if item.messages.is_empty() {
self.items.remove(item_index);
}
break;
}
}
}
pub fn size(&self, message_id: &MessageId) -> u64 {
self
.items
.iter()
.find_map(|item| {
item.messages.iter().find_map(|msg| {
if msg.message_id == *message_id {
Some(msg.size)
} else {
None
}
})
})
.unwrap_or_default()
}
pub fn run<'a>(
&self,
output_dir: &'a Path,
server: &'a Server,
) -> Result<Download<'a>, DownloadError> {
let injector: Injector<(PathBuf, MessageId)> = Injector::new();
for item in &self.items {
for message in &item.messages {
injector.push((item.path.clone(), message.message_id.clone()))
}
}
let download = Download::new(output_dir, &server);
download.run(Arc::new(injector))?;
Ok(download)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct QueueItem {
pub total_bytes: u64,
#[serde(rename = "path")]
pub path: PathBuf,
#[serde(rename = "message", default)]
pub messages: Vec<QueueMessage>,
}
impl QueueItem {
pub fn remaining_bytes(&self) -> u64 {
self.messages.iter().map(|message| message.size).sum()
}
pub fn name(&self) -> Cow<str> {
self
.path
.as_path()
.file_name()
.unwrap_or_default()
.to_string_lossy()
}
}
impl TryFrom<Item> for QueueItem {
type Error = QueueError;
fn try_from(item: Item) -> Result<Self, Self::Error> {
if let Some(enclosure) = item.enclosure {
let bytes = get_bytes(&enclosure.url).map_err(QueueError::GetBytesFailed)?;
let mut nzb = Nzb::read_from(&bytes[..]).map_err(QueueError::DeserializationFailed)?;
if nzb.name().is_none() {
if let Some(title) = item.title {
nzb.set_name(&title);
}
}
Ok(nzb.into())
} else {
Err(QueueError::GetBytesFailed(
GetBytesFailed::MissingEnclosure(item),
))
}
}
}
impl From<Nzb> for QueueItem {
fn from(nzb: Nzb) -> Self {
QueueItem {
total_bytes: nzb.size(),
path: PathBuf::from(nzb.name().unwrap_or_default()),
messages: nzb
.messages()
.into_iter()
.map(|(message_id, size)| QueueMessage { message_id, size })
.collect_vec(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct QueueMessage {
pub message_id: MessageId,
pub size: u64,
}
impl Default for QueueItem {
fn default() -> Self {
Self {
total_bytes: Default::default(),
path: Default::default(),
messages: Default::default(),
}
}
}
#[cfg(test)]
mod tests {
use std::{path::PathBuf, str::FromStr};
use quick_xml::{de::from_str, se::to_string};
use crate::{
article::MessageId,
queue::{Queue, QueueItem, QueueMessage},
};
#[test]
fn serialize_queue() {
let xml =
"<queue><item total-bytes=\"100\" path=\"/tmp/xyz\"><message size=\"25\"><message-id>some-id</message-id></message><message size=\"25\"><message-id>other-id</message-id></message></item></queue>";
let queue = Queue {
items: vec![QueueItem {
total_bytes: 100,
path: PathBuf::from("/tmp/xyz"),
messages: vec![
QueueMessage {
message_id: MessageId::from_str("some-id").unwrap(),
size: 25,
},
QueueMessage {
message_id: MessageId::from_str("other-id").unwrap(),
size: 25,
},
],
}],
};
assert_eq!(to_string(&queue).unwrap(), xml);
}
#[test]
fn deserialize_queue() {
let xml = "<queue><item total-bytes=\"100\" path=\"/tmp/xyz\"><message size=\"25\"><message-id>some-id</message-id></message><message size=\"25\"><message-id>other-id</message-id></message></item></queue>";
let queue = Queue {
items: vec![QueueItem {
total_bytes: 100,
path: PathBuf::from("/tmp/xyz"),
messages: vec![
QueueMessage {
message_id: MessageId::from_str("some-id").unwrap(),
size: 25,
},
QueueMessage {
message_id: MessageId::from_str("other-id").unwrap(),
size: 25,
},
],
}],
};
assert_eq!(from_str::<Queue>(xml).unwrap(), queue);
}
}