cyndikator 0.2.2

A CLI RSS reader
use crate::db::Database;
use crate::fetcher::Fetcher;
use ticker::Ticker;
use tracker::{Trackee, Tracker};

use chrono::Local;
use futures::{future::join_all, select, StreamExt};
use log::{debug, error, info, trace};
use std::time::Duration;
use url::Url;

use cyndikator_dispatch::{Dispatch, Event};
use perform::Invoker;

mod perform;
mod ticker;
mod tracker;

/// Long-running worker that fetches tracked feeds and dispatches the events
/// they produce.
pub struct Daemon {
    /// Feed database: queried for the tracked feeds and their last fetch
    /// time, and handed to the action invoker while dispatching.
    db: Database,
    /// Resolves each event to the list of actions to invoke for it.
    dispatch: Dispatch,
    /// Period of the polling ticker, in seconds.
    tick: usize,
}

impl Daemon {
    pub fn new(db: Database, dispatch: Dispatch, tick: usize) -> Daemon {
        Daemon { db, dispatch, tick }
    }

    pub async fn run(mut self) -> eyre::Result<()> {
        let mut tracker = Tracker::default();
        let mut ticker = Ticker::new(Duration::from_secs(self.tick as u64));

        let mut fetches = Vec::new();
        for feed in self.db.tracking()? {
            info!("fetching {} ...", &feed.url);
            let url = match Url::parse(&feed.url) {
                Ok(url) => url,
                Err(_) => continue,
            };

            if feed.last_fetch.is_none() {
                fetches.push(self.fetch(url.clone()));
            }

            info!("tracking {}", &feed.title);

            tracker.track(Trackee {
                url,
                last: feed.last_fetch.unwrap_or_else(Local::now),
                ttl: feed.ttl.unwrap_or(60),
            })
        }

        let (events, errs) = sep(join_all(fetches).await);
        for (url, err) in errs {
            error!("error while tracking feed '{}': {}", url, err);
        }

        self.dispatch(events).await;

        loop {
            select! {
                now = ticker.next() => {
                    trace!("tick");
                    let now = if let Some(now) = now { now } else { continue };
                    debug!("checking expired {}", now);

                    let expired = tracker.expired(&now);

                    let mut fetches = Vec::new();
                    for mut trackee in expired {
                        info!("fetching {}", trackee.url);
                        fetches.push(self.fetch(trackee.url.clone()));

                        trackee.fetched(&now);
                        tracker.track(trackee);
                    }

                    let (events, errs) = sep(join_all(fetches).await);
                    for (url, err) in errs {
                        error!("{} {}", err, url);
                    }


                    self.dispatch(events).await;
                }

            }
        }
    }

    async fn fetch(&self, url: Url) -> (Url, eyre::Result<Vec<Event>>) {
        let mut fetcher = Fetcher::new(&url);
        let events = fetcher.events().await;
        (url, events)
    }

    async fn dispatch(&mut self, feeds: Vec<(Url, Vec<Event>)>) {
        for (url, events) in feeds {
            let last_fetch = self.db.last_fetch(url.as_str()).ok();

            for event in events {
                match (&last_fetch, &event.date) {
                    (Some(lf), Some(pd)) if lf >= pd => {
                        debug!(
                            "skipping {} {}",
                            &event.feed_title.as_deref().unwrap_or("''"),
                            event.title.as_deref().unwrap_or("''")
                        );

                        continue;
                    }

                    _ => (),
                };

                debug!(
                    "dispatching event {} {} {} {}",
                    event.feed_title.as_deref().unwrap_or("''"),
                    event.title.as_deref().unwrap_or("''"),
                    event.feed_url,
                    event.url.as_deref().unwrap_or("''"),
                );

                let actions = self.dispatch.dispatch(&event);

                let mut invoker = Invoker::new(&mut self.db);
                for action in &actions {
                    invoker.invoke(action, &event);
                }
            }

            if let Err(err) = self.db.mark_clean(url.as_str()) {
                error!("failed to mark {} as clean: {}", &url, err);
            }
        }
    }
}

/// Output of [`sep`]: the keyed successes first, the keyed errors second.
type ResSplit<S, T, E> = (Vec<(S, T)>, Vec<(S, E)>);

/// Splits a list of keyed results into its successes and its failures.
///
/// Each entry keeps the key it came with, and the relative order of entries
/// is preserved within both output lists.
fn sep<S, T, E>(v: Vec<(S, Result<T, E>)>) -> ResSplit<S, T, E> {
    v.into_iter().fold(
        (Vec::new(), Vec::new()),
        |(mut oks, mut errs), (key, outcome)| {
            match outcome {
                Ok(value) => oks.push((key, value)),
                Err(error) => errs.push((key, error)),
            }
            (oks, errs)
        },
    )
}