ytitler 0.0.4

Fetch YouTube video info and save the result to playlist(s) (m3u files).
// External.
use std::sync::{mpsc, Arc, Barrier, Mutex};
use std::thread;
use std::time::Duration;
use std::io;

use pbr::ProgressBar;
use threadpool::ThreadPool;
use regex::Regex;
use chrono::prelude::NaiveDate;


/// Identifies which piece of video metadata could not be extracted from a page.
///
/// Each variant names the field whose pattern was not found in the HTML or
/// whose captured text could not be converted to the target type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParseError {
    /// The `<title>` element was not found.
    Title,
    /// The video length was not found or is not a valid number of seconds.
    Duration,
    /// The channel (author) name was not found.
    Channel,
    /// The publication date was not found or is not a valid `%Y-%m-%d` date.
    Date,
}


/// A cleaned-up video title.
pub struct Title {
    /// Title text with surrounding whitespace and the " - YouTube" postfix removed.
    pub content: String,
}


impl From<String> for Title {
    /// Builds a `Title` from the raw text of a page title.
    ///
    /// Surrounding whitespace is trimmed first; if the trimmed text then ends
    /// with " - YouTube", that postfix is cut off. Nothing else is changed.
    fn from(title: String) -> Self {
        const POSTFIX: &str = " - YouTube";

        let mut content = title.trim().to_string();

        if content.ends_with(POSTFIX) {
            // `ends_with` guarantees the cut lands on a character boundary.
            let kept = content.len() - POSTFIX.len();
            content.truncate(kept);
        }

        Title { content }
    }
}


/// Represents plain HTML downloaded by the HTTP client.
struct Html {
    /// URL the page was requested from.
    url: String,
    /// Text of the response body.
    content: String,
}


impl Html {
    /// Reports whether the page describes an available (playable) video.
    ///
    /// This is a plain substring search for the
    /// `'IS_UNAVAILABLE_PAGE': false` marker in the page content.
    fn is_available(&self) -> bool {
        let marker = "'IS_UNAVAILABLE_PAGE': false";

        self.content.contains(marker)
    }

    /// Returns the text of the first capture group of `pattern` for its first
    /// match in the page content, or `None` when the pattern does not match.
    ///
    /// Panics if `pattern` is not a valid regular expression; every caller
    /// passes a literal pattern.
    fn first_capture(&self, pattern: &str) -> Option<&str> {
        Regex::new(pattern)
            .unwrap()
            .captures(self.content.as_str())
            .and_then(|found| found.get(1))
            .map(|group| group.as_str())
    }

    /// Extracts the video details from the page and assembles a `Video`.
    ///
    /// Fields are extracted in the order title, duration, channel, published
    /// date; the first one that fails decides the returned `ParseError`.
    fn parse(&self) -> Result<Video, ParseError> {
        info!("URL: {}", self.url);

        // Title: text of the `<title>` element, cleaned by `Title::from`.
        let title = self
            .first_capture(r"<title>(.+)</title>")
            .map(|raw| Title::from(raw.to_owned()))
            .ok_or(ParseError::Title)?;

        // Duration: digits following the backslash-escaped `lengthSeconds` key.
        let seconds = self
            .first_capture(r#"lengthSeconds\\":\\"(\d+)\\""#)
            .ok_or(ParseError::Duration)?
            .parse::<u64>()
            .map_err(|_| ParseError::Duration)?;
        let duration = Duration::from_secs(seconds);

        // Channel: value following the backslash-escaped `author` key.
        let channel = self
            .first_capture(r#"author\\":\\"(.+?)\\""#)
            .ok_or(ParseError::Channel)?
            .to_owned();

        // Published date: `content` attribute next to `datePublished`.
        let raw_date = self
            .first_capture(r#"datePublished" content="(.+?)""#)
            .ok_or(ParseError::Date)?;
        let published = NaiveDate::parse_from_str(raw_date, "%Y-%m-%d")
            .map_err(|_| ParseError::Date)?;

        Ok(Video {
            url: self.url.clone(),
            title,
            duration,
            channel,
            published,
        })
    }
}


/// Details of a single video as parsed from its page by `Html::parse`.
pub struct Video {
    /// URL the video page was fetched from.
    pub url: String,
    /// Cleaned video title taken from the page's `<title>` element.
    pub title: Title,
    /// Video length, built from the `lengthSeconds` value found in the page.
    pub duration: Duration,
    /// Channel name, taken from the `author` value found in the page.
    pub channel: String,
    /// Publication date, parsed from the `datePublished` value as `%Y-%m-%d`.
    pub published: NaiveDate,
}


pub fn fetch(chunks: Vec<Vec<String>>, progress_bar: Option<bool>) -> Vec<Video> {

    fn fetch_one(
        url: &str,
        sender: &mpsc::Sender<Video>,
        pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>>,
    ) {

        // Fetch URL and check the response before next step.
        let mut response = match reqwest::get(url) {
            Ok(res) => res,
            Err(e) => {
                error!(
                    "URL \"{}\" couldn't be fetched because of \"{}\" reason. Skipping...",
                    url,
                    e
                );

                return;
            }
        };

        // Log response code.
        if !response.status().is_success() {
            error!(
                "URL \"{}\" returned wrong response code \"{}\". Skipping...",
                url,
                response.status().as_str()
            )
        }

        info!(
            "URL \"{}\" returned ok response code \"{}\"",
            url,
            response.status().as_str()
        );

        // Process response.
        let html = match response.text() {
            Ok(content) => {
                info!("Body length: {}", content.len());
                Html {
                    url: url.to_owned(),
                    content,
                }
            }
            Err(_) => {
                error!("Couldn't read the server response. Skipping...");
                return;
            }
        };

        // Send video to channel.
        if html.is_available() {

            let video = html.parse().unwrap();
            let result = sender.send(video);

            if result.is_err() {
                error!(
                    "Sending back from thread has failed because \"{}\".",
                    result.err().unwrap()
                )
            }
        }

        // If progress bar was given advance it's state.
        if pb.is_some() {
            let mut pb = pb.as_ref().unwrap().lock().unwrap();
            pb.inc();
        }
    }

    fn fetch_chunk(
        chunk: Vec<String>,
        pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>>,
    ) -> Vec<Video> {

        let (sender, receiver) = mpsc::channel();

        info!("--- New pool ---");
        let barrier = Arc::new(Barrier::new(chunk.len()));
        let pool = ThreadPool::new(chunk.len());

        for url in chunk {
            let barrier = barrier.clone();
            let s = sender.clone();

            // Progress bar?
            let mut local_pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>> = None;

            if pb.is_some() {
                local_pb = Some(pb.as_ref().unwrap().clone())
            }

            // ThreadPool + request.
            pool.execute(move || {
                fetch_one(&url, &s, local_pb);
                barrier.wait();
            })
        }

        // Wait for all the threads.
        barrier.wait();

        // Wait before next run - 0.5 sec.
        thread::sleep(Duration::from_millis(500));

        // Return collected videos.
        let mut videos = vec![];

        for v in receiver.try_iter() {
            videos.push(v);
        }

        videos
    }

    // Count urls.
    let mut count = 0;

    for ch in chunks.iter() {
        count += ch.len();
    }

    // Set up progress bar.
    let mut pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>> = None;

    if progress_bar.unwrap_or(false) {
        pb = Some(Arc::new(Mutex::new(ProgressBar::new(count as u64))));
    }

    // Fetch chunks.
    let mut videos = vec![];

    for ch in chunks.iter() {

        // Progress bar?
        let mut local_pb: Option<Arc<Mutex<ProgressBar<io::Stdout>>>> = None;

        if pb.is_some() {
            local_pb = Some(pb.as_ref().unwrap().clone());
        }

        let mut chunk_videos = fetch_chunk(ch.to_vec(), local_pb);
        videos.append(&mut chunk_videos);
    }

    info!("Collected videos: {}", videos.len());

    videos
}