use sqlx::MySql;
use sqlx::Pool;
use crate::core::http_clients::EmailClient;
use crate::services::newsletter_service::SubscriberEmail;
use crate::services::newsletter_service::delete_task;
use crate::services::newsletter_service::dequeue_task;
use crate::services::newsletter_service::get_issue;
/// Result of a single attempt to execute one queued delivery task.
///
/// Produced by [`try_execute_task`] and consumed by [`worker_loop`] to decide
/// whether to poll again immediately or back off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionOutcome {
    /// A task was dequeued, processed and then removed from the queue.
    TaskCompleted,
    /// No task was available when the queue was polled.
    EmptyQueue,
}
/// Builds the worker's dependencies from `configuration` and runs the delivery loop.
///
/// Derives the MySQL settings from `configuration.mysql`, creates the connection
/// pool, constructs an [`EmailClient`] and hands both to [`worker_loop`].
///
/// # Errors
///
/// Returns an error if the MySQL pool cannot be created, or whatever error
/// [`worker_loop`] returns.
pub async fn run_worker_until_stopped(
    configuration: crate::core::config::Settings,
) -> Result<(), anyhow::Error> {
    let mysql_settings = crate::core::mysql::MysqlSettings::get(&configuration.mysql);
    // Surface pool-creation failures to the caller instead of panicking: this
    // function already returns a `Result`, so the caller can decide how to react.
    // NOTE(review): assumes `create_pool` returns a `Result` whose error is `Debug`
    // (the previous `.unwrap()` required the same bound) — confirm.
    let mysql_client = crate::core::mysql::create_pool(&mysql_settings)
        .map_err(|e| anyhow::anyhow!("failed to create MySQL connection pool: {:?}", e))?;
    let email_client = EmailClient::new();
    worker_loop(mysql_client.pool, email_client).await
}
pub async fn worker_loop(
pool: Pool<MySql>,
email_client: EmailClient,
) -> Result<(), anyhow::Error> {
loop {
match try_execute_task(&pool, &email_client).await {
Ok(ExecutionOutcome::EmptyQueue) => {
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
Err(_) => {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(ExecutionOutcome::TaskCompleted) => {}
}
}
}
/// Attempts to dequeue and process a single delivery task.
///
/// Steps:
///
/// 1. Poll the queue via `dequeue_task`; if nothing is returned, report
///    [`ExecutionOutcome::EmptyQueue`].
/// 2. Parse the stored address with `SubscriberEmail::parse`. An invalid
///    address is logged and skipped.
/// 3. For a valid address, load the issue via `get_issue` and send it. A send
///    failure is logged but does not abort the task.
/// 4. Remove the task via `delete_task` and report
///    [`ExecutionOutcome::TaskCompleted`].
///
/// # Errors
///
/// Propagates errors from `dequeue_task`, `get_issue` and `delete_task`.
/// Address-parsing and email-sending failures are only logged.
pub async fn try_execute_task(
    pool: &Pool<MySql>,
    email_client: &EmailClient,
) -> Result<ExecutionOutcome, anyhow::Error> {
    let (transaction, issue_id, email) = match dequeue_task(pool).await? {
        Some(claimed) => claimed,
        None => return Ok(ExecutionOutcome::EmptyQueue),
    };

    match SubscriberEmail::parse(email.clone()) {
        Err(parse_error) => {
            log::error!(
                "Skipping a confirmed subscriber. \
                 Their stored contact details are invalid. email={}, error_message={:?}",
                &email,
                parse_error
            );
        }
        Ok(recipient) => {
            let issue = get_issue(pool, issue_id).await?;
            let delivery = email_client
                .send_email(&recipient, &issue.title, &issue.html_content)
                .await;
            // Best effort: a failed send is recorded, and the task is still removed below.
            if let Err(send_error) = delivery {
                log::error!(
                    "给订阅者发送邮件失败, receiver={}, error_message={:?}",
                    &recipient,
                    send_error
                );
            }
        }
    }

    delete_task(transaction, issue_id, &email).await?;
    Ok(ExecutionOutcome::TaskCompleted)
}