#![warn(clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::missing_panics_doc)]
#![allow(clippy::module_name_repetitions)]
pub mod args;
pub mod settings;
use crate::args::{Args, Setting};
use fetcher_config::tasks::{ParsedTask, ParsedTasks};
use fetcher_core::error::{Error, ErrorChainExt};
use color_eyre::{eyre::eyre, Report, Result};
use futures::{future::try_join_all, StreamExt};
use signal_hook::consts::TERM_SIGNALS;
use signal_hook_tokio::Signals;
use std::{
collections::HashMap,
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use tokio::{
select,
sync::watch::{self, Receiver},
task::JoinHandle,
time::sleep,
};
use tracing::Instrument;
/// Program entry point.
///
/// Logging is configured synchronously first and only then is the
/// tokio-backed [`async_main`] started.
// NOTE(review): the split between a sync `main` and `async_main` is presumably
// there so that the local time offset is queried before the runtime spawns
// any threads — confirm before merging the two.
fn main() -> Result<()> {
    set_up_logging()?;
    async_main()
}
fn set_up_logging() -> Result<()> {
use tracing_subscriber::{
filter::LevelFilter, fmt::time::OffsetTime, layer::SubscriberExt, EnvFilter, Layer,
};
let env_filter = EnvFilter::try_from_env("FETCHER_LOG")
.unwrap_or_else(|_| EnvFilter::from("fetcher=info,fetcher_core=info"));
let stdout = tracing_subscriber::fmt::layer()
.pretty()
.with_timer(OffsetTime::local_rfc_3339().expect("could not get local time offset"));
let journald = if cfg!(debug_assertions) {
None
} else {
tracing_journald::layer().ok()
};
let subscriber = tracing_subscriber::registry()
.with(journald.with_filter(LevelFilter::INFO))
.with(stdout.with_filter(env_filter));
tracing::subscriber::set_global_default(subscriber).unwrap();
color_eyre::install()?;
Ok(())
}
/// Parses the command line, initializes the global data/config paths and
/// dispatches to the requested subcommand.
///
/// The data path and the config paths come from the command line when given
/// and from the `settings` defaults otherwise.
///
/// # Panics
/// Panics if `settings::DATA_PATH` or `settings::CONF_PATHS` refuses the value
/// passed to `set` (the `Err` case is unwrapped).
#[tokio::main]
async fn async_main() -> Result<()> {
    let args: Args = argh::from_env();

    let data_path = if let Some(path) = args.data_path {
        path
    } else {
        settings::data::default_data_path()?
    };
    settings::DATA_PATH.set(data_path).unwrap();

    let config_paths = if let Some(path) = args.config_path {
        vec![path]
    } else {
        settings::config::default_cfg_dirs()?
    };
    settings::CONF_PATHS.set(config_paths).unwrap();

    match args.subcommand {
        args::TopLvlSubcommand::Run(arg) => {
            let once = arg.once;
            // An empty list of task names means "run every task".
            let run_by_name = Some(arg.tasks).filter(|names| !names.is_empty());
            run(once, run_by_name).await?;
        }
        args::TopLvlSubcommand::Save(save) => match save.setting {
            Setting::GoogleOAuth2 => settings::data::google_oauth2::prompt().await?,
            Setting::EmailPassword => settings::data::email_password::prompt()?,
            Setting::Telegram => settings::data::telegram::prompt()?,
            Setting::Twitter => settings::data::twitter::prompt()?,
        },
    }

    Ok(())
}
/// Loads the configured tasks, optionally narrows them down to the ones named
/// in `run_by_name`, and runs them until they finish or a termination signal
/// is received.
///
/// * `once` - forwarded to [`run_tasks`]; makes each task run a single
///   iteration instead of looping.
/// * `run_by_name` - when `Some`, only the listed tasks are run. Listing the
///   same task more than once is harmless.
///
/// # Errors
/// Returns an error if the tasks can't be loaded, if a requested task name
/// doesn't exist, or if any task fails.
///
/// # Panics
/// Panics if the signal handlers can't be registered or if the signal
/// forwarding task itself panicked.
async fn run(once: bool, run_by_name: Option<Vec<String>>) -> Result<()> {
    let version = if std::env!("VERGEN_GIT_BRANCH") == "main" {
        std::env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")
    } else {
        concat!(
            "v",
            std::env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT"),
            "-",
            std::env!("VERGEN_GIT_SHA_SHORT"),
            " on branch ",
            std::env!("VERGEN_GIT_BRANCH")
        )
    };
    tracing::info!("Running fetcher {}", version);

    let mut tasks = settings::config::tasks::get_all().await?;
    if tasks.is_empty() {
        tracing::info!("No enabled tasks provided");
        return Ok(());
    }
    tracing::info!(
        "Found {} enabled tasks: {:?}",
        tasks.len(),
        tasks.keys().collect::<Vec<_>>()
    );

    if let Some(run_by_name) = run_by_name {
        let mut new_tasks = HashMap::new();
        for name in run_by_name {
            match tasks.remove(&name) {
                Some(task) => {
                    new_tasks.insert(name, task);
                }
                // The name was requested more than once: its task has already
                // been moved into `new_tasks`, so there is nothing left to do.
                None if new_tasks.contains_key(&name) => {}
                None => {
                    // Tasks selected so far were removed from `tasks`, so both
                    // maps are needed to list everything that is available.
                    let available = tasks.keys().chain(new_tasks.keys()).collect::<Vec<_>>();
                    return Err(eyre!(
                        "Task {name} not found. All available tasks: {:?}",
                        available
                    ));
                }
            }
        }
        tracing::info!("Running tasks {:?}", new_tasks.keys().collect::<Vec<_>>());
        tasks = new_tasks;
    }

    let (shutdown_tx, shutdown_rx) = watch::channel(());

    let sig = Signals::new(TERM_SIGNALS).expect("Error registering signals");
    let sig_handle = sig.handle();

    // Registration order matters: the conditional shutdown goes first so that
    // the first signal only raises the flag (asking for a graceful stop) and
    // a second one terminates the process with exit status 1.
    let sig_term_now = Arc::new(AtomicBool::new(false));
    for s in TERM_SIGNALS {
        use signal_hook::flag;

        flag::register_conditional_shutdown(*s, 1, Arc::clone(&sig_term_now))
            .expect("Error registering signal handler");
        flag::register(*s, Arc::clone(&sig_term_now)).expect("Error registering signal handler");
    }

    // Forward every received signal to the tasks through the watch channel.
    let sig_task = tokio::spawn(async move {
        let mut sig = sig.fuse();
        while sig.next().await.is_some() {
            // `send` only fails when every receiver is gone, i.e. all tasks
            // have already finished. Nobody is left to notify, so stop
            // listening instead of panicking during an orderly shutdown.
            if shutdown_tx.send(()).is_err() {
                break;
            }
        }
        Ok::<(), Report>(())
    });

    run_tasks(tasks, shutdown_rx, once).await?;

    // Closing the handle ends the signal stream, which lets `sig_task` finish.
    sig_handle.close();
    sig_task
        .await
        .expect("Error shutting down of signal handler")?;
    Ok(())
}
async fn run_tasks(tasks: ParsedTasks, shutdown_rx: Receiver<()>, once: bool) -> Result<()> {
let mut running_tasks = Vec::new();
for (name, mut t) in tasks {
let name2 = name.clone();
let mut shutdown_rx = shutdown_rx.clone();
let task_handle = tokio::spawn(
async move {
tracing::trace!("Task {} contents: {:#?}", name, t);
let res = select! {
r = task_loop(&mut t, once) => r,
_ = shutdown_rx.changed() => Ok(()),
};
if !cfg!(debug_assertions) {
if let Err(err) = &res {
if let Err(e) = report_error(&name, &err.display_chain()).await {
tracing::error!("Unable to send error report to the admin: {e:?}",);
}
}
}
tracing::info!("Task {name} shut down...");
res
}
.instrument(tracing::info_span!("task", name = name2.as_str())),
);
running_tasks.push(flatten_task_result(task_handle));
}
try_join_all(running_tasks).await?;
Ok(())
}
async fn task_loop(t: &mut ParsedTask, once: bool) -> Result<(), Error> {
const TRANSFORM_ERR_MAX_COUNT: u32 = 5;
let mut transform_err_count = 0;
loop {
match fetcher_core::run_task(&mut t.inner).await {
Ok(()) => (),
Err(Error::Transform(transform_err)) => {
transform_err_count += 1;
if transform_err_count > TRANSFORM_ERR_MAX_COUNT {
return Err(Error::Transform(transform_err));
}
tracing::error!(
"Transform error ({transform_err_count} out of {TRANSFORM_ERR_MAX_COUNT}): {}",
transform_err.display_chain()
);
}
Err(e) => {
if let Some(network_err) = e.is_connection_error() {
tracing::warn!("Network error: {}", network_err.display_chain());
} else {
return Err(e);
}
}
}
if once {
break;
}
tracing::debug!("Sleeping for {time}m", time = t.refresh);
sleep(Duration::from_secs(t.refresh * 60 )).await;
}
Ok(())
}
/// Sends the error text `err` of the task `task_name` to the admin's Telegram
/// chat.
///
/// The chat id is read from the `FETCHER_TELEGRAM_ADMIN_CHAT_ID` environment
/// variable and the bot comes from the saved Telegram settings.
///
/// # Errors
/// Returns an error if the environment variable is missing or isn't a valid
/// `i64`, if no Telegram bot token is available, or if sending the message
/// fails.
async fn report_error(task_name: &str, err: &str) -> Result<()> {
    use fetcher_core::sink::{telegram::LinkLocation, Message, Telegram};

    let raw_chat_id = std::env::var("FETCHER_TELEGRAM_ADMIN_CHAT_ID")?;
    let admin_chat_id = raw_chat_id
        .parse::<i64>()
        .map_err(|e| eyre!("FETCHER_TELEGRAM_ADMIN_CHAT_ID isn't a valid chat id ({e})"))?;

    let bot = settings::data::telegram::get()?
        .ok_or_else(|| eyre!("Telegram bot token not provided"))?;

    let report = Message {
        body: Some(err.to_owned()),
        ..Default::default()
    };

    let admin_chat = Telegram::new(bot, admin_chat_id, LinkLocation::default());
    admin_chat
        .send(report, Some(task_name))
        .await
        .map_err(fetcher_core::error::Error::Sink)?;

    Ok(())
}
/// Awaits a spawned task and returns the `Result` it produced, removing the
/// outer join layer.
///
/// # Panics
/// Panics with "Thread panicked" if joining the task fails.
async fn flatten_task_result<T, E>(h: JoinHandle<Result<T, E>>) -> Result<T, E> {
    // A successful join yields the task's own `Result` unchanged; a join
    // error is unrecoverable here.
    h.await.expect("Thread panicked")
}