use std::sync::{mpsc, Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;
use std::io;
use pbr::ProgressBar;
use threadpool::ThreadPool;
use regex::Regex;
use chrono::prelude::NaiveDate;
/// Identifies which piece of video metadata `Html::parse` failed to extract.
#[derive(Debug)]
enum ParseError {
/// No `<title>…</title>` element was matched in the page.
Title,
/// The `lengthSeconds` value was not matched or did not parse as a `u64`.
Duration,
/// The `author` value was not matched in the page.
Channel,
/// The `datePublished` value was not matched or was not a `%Y-%m-%d` date.
Date,
}
/// A video title as shown to the user.
///
/// Values created through `From<String>` have surrounding whitespace and the
/// trailing `" - YouTube"` page-title suffix removed.
pub struct Title {
    /// The title text.
    pub content: String,
}

impl From<String> for Title {
    /// Builds a `Title` from raw page-title text.
    ///
    /// The input is trimmed first; if the trimmed text then ends with
    /// `" - YouTube"`, that suffix is cut off once. Whatever precedes the
    /// suffix is kept verbatim (it is not trimmed a second time).
    fn from(title: String) -> Self {
        const SUFFIX: &str = " - YouTube";

        let mut content = title.trim().to_owned();
        if content.ends_with(SUFFIX) {
            // `ends_with` guarantees the cut lands on a char boundary.
            let kept = content.len() - SUFFIX.len();
            content.truncate(kept);
        }
        Title { content }
    }
}
/// A downloaded page body together with the URL it was requested from.
struct Html {
/// URL the page was requested from.
url: String,
/// Text of the response body.
content: String,
}
impl Html {
    /// Returns `true` when the page body contains the
    /// `'IS_UNAVAILABLE_PAGE': false` marker.
    fn is_available(&self) -> bool {
        let marker = "'IS_UNAVAILABLE_PAGE': false";
        self.content.contains(marker)
    }

    /// Extracts the video metadata from the page body.
    ///
    /// Fields are looked up in this order: title, duration, channel,
    /// publication date. The first one that cannot be found (or converted)
    /// aborts parsing with the matching `ParseError` variant.
    fn parse(&self) -> Result<Video, ParseError> {
        // Compiles `pattern` and returns the text of its first capture group
        // in `haystack`; `on_miss` is returned when the pattern does not match
        // or the group did not participate in the match.
        fn first_group<'a>(
            haystack: &'a str,
            pattern: &str,
            on_miss: ParseError,
        ) -> Result<&'a str, ParseError> {
            Regex::new(pattern)
                .unwrap()
                .captures(haystack)
                .and_then(|caps| caps.get(1))
                .map(|group| group.as_str())
                .ok_or(on_miss)
        }

        info!("URL: {}", self.url);
        let page = self.content.as_str();

        let raw_title = first_group(page, r"<title>(.+)</title>", ParseError::Title)?;
        let title = Title::from(raw_title.to_owned());

        // The length is stored as a quoted decimal number of seconds.
        let seconds = first_group(
            page,
            r#"lengthSeconds\\":\\"(\d+)\\""#,
            ParseError::Duration,
        )?
        .parse::<u64>()
        .map_err(|_| ParseError::Duration)?;
        let duration: Duration = Duration::from_secs(seconds);

        let channel = first_group(page, r#"author\\":\\"(.+?)\\""#, ParseError::Channel)?
            .to_owned();

        let date_text = first_group(
            page,
            r#"datePublished" content="(.+?)""#,
            ParseError::Date,
        )?;
        let published =
            NaiveDate::parse_from_str(date_text, "%Y-%m-%d").map_err(|_| ParseError::Date)?;

        Ok(Video {
            url: self.url.clone(),
            title,
            duration,
            channel,
            published,
        })
    }
}
/// Metadata extracted from a single video page.
pub struct Video {
/// URL the metadata was read from.
pub url: String,
/// Title taken from the page's `<title>` element.
pub title: Title,
/// Video length, built from the page's `lengthSeconds` value (whole seconds).
pub duration: Duration,
/// Channel name, taken from the page's `author` value.
pub channel: String,
/// Publication date, taken from the page's `datePublished` value.
pub published: NaiveDate,
}
pub fn fetch(chunks: Vec<Vec<String>>, progress_bar: Option<bool>) -> Vec<Video> {
fn fetch_one(
url: &str,
sender: &mpsc::Sender<Video>,
pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>>,
) {
let mut response = match reqwest::get(url) {
Ok(res) => res,
Err(e) => {
error!(
"URL \"{}\" couldn't be fetched because of \"{}\" reason. Skipping...",
url,
e
);
return;
}
};
if !response.status().is_success() {
error!(
"URL \"{}\" returned wrong response code \"{}\". Skipping...",
url,
response.status().as_str()
)
}
info!(
"URL \"{}\" returned ok response code \"{}\"",
url,
response.status().as_str()
);
let html = match response.text() {
Ok(content) => {
info!("Body length: {}", content.len());
Html {
url: url.to_owned(),
content,
}
}
Err(_) => {
error!("Couldn't read the server response. Skipping...");
return;
}
};
if html.is_available() {
let video = html.parse().unwrap();
let result = sender.send(video);
if result.is_err() {
error!(
"Sending back from thread has failed because \"{}\".",
result.err().unwrap()
)
}
}
if pb.is_some() {
let mut pb = pb.as_ref().unwrap().lock().unwrap();
pb.inc();
}
}
fn fetch_chunk(
chunk: Vec<String>,
pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>>,
) -> Vec<Video> {
let (sender, receiver) = mpsc::channel();
info!("--- New pool ---");
let barrier = Arc::new(Barrier::new(chunk.len()));
let pool = ThreadPool::new(chunk.len());
for url in chunk {
let barrier = barrier.clone();
let s = sender.clone();
let mut local_pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>> = None;
if pb.is_some() {
local_pb = Some(pb.as_ref().unwrap().clone())
}
pool.execute(move || {
fetch_one(&url, &s, local_pb);
barrier.wait();
})
}
barrier.wait();
thread::sleep(Duration::from_millis(500));
let mut videos = vec![];
for v in receiver.try_iter() {
videos.push(v);
}
videos
}
let mut count = 0;
for ch in chunks.iter() {
count += ch.len();
}
let mut pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>> = None;
if progress_bar.unwrap_or(false) {
pb = Some(Arc::new(Mutex::new(ProgressBar::new(count as u64))));
}
let mut videos = vec![];
for ch in chunks.iter() {
let mut local_pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>> = None;
if pb.is_some() {
local_pb = Some(pb.as_ref().unwrap().clone());
}
let mut chunk_videos = fetch_chunk(ch.to_vec(), local_pb);
videos.append(&mut chunk_videos);
}
info!("Collected videos: {}", videos.len());
videos
}