use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
use tf_core::{ErrorStore, Generator, Pipeline, Video};
use tf_filter::{Filter, FilterGroup};
use async_trait::async_trait;
use tf_yt::YTPipeline;
use crate::{AnySubscriptionList, AnyVideo, AnyVideoFilter};
#[derive(Clone)]
pub struct Joiner {
subscription_list: AnySubscriptionList,
filters: Arc<Mutex<FilterGroup<AnyVideoFilter>>>,
#[cfg(feature = "youtube")]
yt_pipeline: YTPipeline,
#[cfg(feature = "peertube")]
pt_pipeline: Pipeline<tf_pt::PTSubscription, tf_pt::PTVideo>,
#[cfg(feature = "lbry")]
lbry_pipeline: Pipeline<tf_lbry::LbrySubscription, tf_lbry::LbryVideo>,
#[cfg(test)]
test_pipeline: Pipeline<tf_test::TestSubscription, tf_test::TestVideo>,
}
impl Joiner {
pub fn new() -> Self {
#[cfg(feature = "youtube")]
let yt_pipeline = YTPipeline::new();
#[cfg(feature = "peertube")]
let pt_pipeline = Pipeline::new();
#[cfg(feature = "lbry")]
let lbry_pipeline = Pipeline::new();
#[cfg(test)]
let test_pipeline = Pipeline::new();
let mut subscriptions = AnySubscriptionList::default();
#[cfg(feature = "youtube")]
subscriptions.yt_subscriptions(yt_pipeline.subscription_list());
#[cfg(feature = "peertube")]
subscriptions.pt_subscriptions(pt_pipeline.subscription_list());
#[cfg(feature = "lbry")]
subscriptions.lbry_subscriptions(lbry_pipeline.subscription_list());
#[cfg(test)]
subscriptions.test_subscriptions(test_pipeline.subscription_list());
Joiner {
subscription_list: subscriptions,
#[cfg(feature = "youtube")]
yt_pipeline,
#[cfg(feature = "peertube")]
pt_pipeline,
#[cfg(feature = "lbry")]
lbry_pipeline,
#[cfg(test)]
test_pipeline,
filters: Arc::new(Mutex::new(FilterGroup::new())),
}
}
pub fn subscription_list(&self) -> AnySubscriptionList {
self.subscription_list.clone()
}
pub fn filters(&self) -> Arc<Mutex<FilterGroup<AnyVideoFilter>>> {
self.filters.clone()
}
pub fn upgrade_video(&self, video: &AnyVideo) -> AnyVideo {
match video {
#[cfg(feature = "youtube")]
AnyVideo::Youtube(v) => self.yt_pipeline.upgrade_video(&v.lock().unwrap()).into(),
#[cfg(feature = "peertube")]
AnyVideo::Peertube(v) => self.pt_pipeline.upgrade_video(&v.lock().unwrap()).into(),
#[cfg(feature = "lbry")]
AnyVideo::Lbry(v) => self.lbry_pipeline.upgrade_video(&v.lock().unwrap()).into(),
#[cfg(test)]
AnyVideo::Test(v) => self.test_pipeline.upgrade_video(&v.lock().unwrap()).into(),
}
}
}
#[async_trait]
impl Generator for Joiner {
type Item = AnyVideo;
type Iterator = std::vec::IntoIter<AnyVideo>;
async fn generate(&self, errors: &ErrorStore) -> Self::Iterator {
let mut generators: Vec<
Pin<
Box<
dyn Future<Output = Box<dyn Iterator<Item = AnyVideo> + std::marker::Send>>
+ std::marker::Send,
>,
>,
> = vec![];
#[cfg(feature = "youtube")]
generators.push(Box::pin(async move {
let iter = self.yt_pipeline.generate(errors).await;
let iter_mapped = iter.map(|v| v.into());
Box::new(iter_mapped) as Box<dyn Iterator<Item = AnyVideo> + std::marker::Send>
}));
#[cfg(feature = "peertube")]
generators.push(Box::pin(async move {
let iter = self.pt_pipeline.generate(errors).await;
let iter_mapped = iter.map(|v| v.into());
Box::new(iter_mapped) as Box<dyn Iterator<Item = AnyVideo> + std::marker::Send>
}));
#[cfg(feature = "lbry")]
generators.push(Box::pin(async move {
let iter = self.lbry_pipeline.generate(errors).await;
let iter_mapped = iter.map(|v| v.into());
Box::new(iter_mapped) as Box<dyn Iterator<Item = AnyVideo> + std::marker::Send>
}));
#[cfg(test)]
generators.push(Box::pin(async {
let iter = self.test_pipeline.generate(errors).await;
let iter_mapped = iter.map(|v| v.into());
Box::new(iter_mapped) as Box<dyn Iterator<Item = AnyVideo> + std::marker::Send>
}));
let results = futures::future::join_all(generators).await;
let mut videos: Vec<AnyVideo> = results
.into_iter()
.map(|i| i.collect())
.collect::<Vec<Vec<AnyVideo>>>()
.concat();
videos
.iter()
.map(|v| v.subscription())
.for_each(|s| self.subscription_list.update(s));
videos.retain(|v| !self.filters.lock().unwrap().matches(v));
videos.sort_by_cached_key(|v| v.uploaded());
videos.reverse();
videos.into_iter()
}
}
impl Default for Joiner {
fn default() -> Self {
Joiner::new()
}
}