pub mod models;
pub use models::*;
use sqlx::MySql;
use sqlx::Pool;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Transaction;
/// Owned (`'static`) transaction handle returned by `dequeue_task` and consumed by
/// `delete_task`.
// NOTE(review): despite the `Pg` prefix, this aliases a MySQL transaction.
type PgTransaction = Transaction<'static, MySql>;
use crate::core::error2::Error;
use crate::core::error2::Result;
/// Persists a new newsletter issue through the caller's open transaction.
///
/// A fresh UUIDv7 is generated for the issue and bound in its textual form;
/// `published_at` is filled in by the database via `now()`.
///
/// Returns the generated issue id.
///
/// # Errors
///
/// Propagates any error raised while executing the `INSERT`.
pub async fn insert_newsletter_issue(
    transaction: &mut Transaction<'_, MySql>,
    title: &str,
    html_content: &str,
) -> Result<uuid::Uuid> {
    const INSERT_ISSUE_SQL: &str = "INSERT INTO t_newsletter_issues (\n\
        newsletter_issue_id,\n\
        title,\n\
        html_content,\n\
        published_at\n\
        )\n\
        VALUES (?, ?, ?, now())";

    let issue_id = uuid::Uuid::now_v7();
    let insert = sqlx::query(INSERT_ISSUE_SQL)
        .bind(issue_id.to_string())
        .bind(title)
        .bind(html_content);
    insert.execute(&mut **transaction).await?;

    log::info!("newsletter_issue_id: {}", issue_id);
    Ok(issue_id)
}
/// Queues one delivery task per confirmed subscriber for the given issue.
///
/// Runs a single `INSERT ... SELECT` through the caller's transaction, copying the
/// `email` of every `subscriptions` row whose `subscribe_status` is `'confirmed'`
/// into `t_issue_delivery_queue`, paired with the issue id in textual form.
///
/// # Errors
///
/// Propagates any error raised while executing the statement.
pub async fn enqueue_delivery_tasks(
    transaction: &mut Transaction<'_, MySql>,
    newsletter_issue_id: uuid::Uuid,
) -> Result<()> {
    const ENQUEUE_SQL: &str = "INSERT INTO t_issue_delivery_queue (\n\
        newsletter_issue_id,\n\
        subscriber_email\n\
        )\n\
        SELECT ?, email\n\
        FROM subscriptions\n\
        WHERE subscribe_status = 'confirmed'";

    let issue_id_text = newsletter_issue_id.to_string();
    sqlx::query(ENQUEUE_SQL)
        .bind(issue_id_text)
        .execute(&mut **transaction)
        .await?;
    Ok(())
}
/// Inserts a new subscriber row with status `pending`.
///
/// A fresh UUIDv7 is generated and bound in its textual form as the row id, and
/// `subscribed_at` is set to the current UTC time taken on the application side.
///
/// Returns the generated subscriber id.
///
/// # Errors
///
/// Propagates any error raised while executing the `INSERT`.
pub async fn insert_subscriber(
    _transaction: &mut Transaction<'_, MySql>,
    new_subscriber: &NewSubscriber,
) -> Result<uuid::Uuid> {
    const INSERT_SUBSCRIBER_SQL: &str =
        "INSERT INTO subscriptions (id, email, name, subscribed_at, subscribe_status) \
         VALUES (?, ?, ?, ?, ?)";
    const INITIAL_STATUS: &str = "pending";

    let id = uuid::Uuid::now_v7();
    let insert = sqlx::query(INSERT_SUBSCRIBER_SQL)
        .bind(id.to_string())
        .bind(new_subscriber.email.as_ref())
        .bind(new_subscriber.name.as_ref())
        .bind(chrono::Utc::now())
        .bind(INITIAL_STATUS);
    insert.execute(&mut **_transaction).await?;

    Ok(id)
}
/// Records the subscription token associated with a subscriber.
///
/// Inserts a `(subscription_token, subscriber_id)` row into `subscription_tokens`
/// through the caller's transaction; the subscriber id is bound in textual form.
///
/// # Errors
///
/// Propagates any error raised while executing the `INSERT`.
pub async fn store_token(
    _transaction: &mut Transaction<'_, MySql>,
    subscriber_id: uuid::Uuid,
    subscription_token: &str,
) -> Result<()> {
    const STORE_TOKEN_SQL: &str =
        "INSERT INTO subscription_tokens (subscription_token, subscriber_id) \
         VALUES (?, ?)";

    let owner_id = subscriber_id.to_string();
    sqlx::query(STORE_TOKEN_SQL)
        .bind(subscription_token)
        .bind(owner_id)
        .execute(&mut **_transaction)
        .await?;
    Ok(())
}
/// Sets `subscribe_status` to `confirmed` for the subscriber with the given id.
///
/// # Errors
///
/// Logs and propagates any error raised while executing the `UPDATE`.
pub async fn confirm_subscriber(pool: &Pool<MySql>, subscriber_id: uuid::Uuid) -> Result<()> {
    sqlx::query("UPDATE subscriptions SET subscribe_status = ? WHERE id = ?")
        .bind("confirmed")
        // `insert_subscriber` writes `subscriptions.id` as the UUID's text form, so the
        // lookup must bind text too: sqlx encodes a raw `Uuid` for MySQL as 16 bytes,
        // which would never compare equal to the stored string.
        .bind(subscriber_id.to_string())
        .execute(pool)
        .await
        .map_err(|e| {
            log::error!("Failed to execute query: {:?}", e);
            e
        })?;
    Ok(())
}
pub async fn get_subscriber_id_from_token(
pool: &Pool<MySql>,
subscription_token: &str,
) -> Result<Option<uuid::Uuid>> {
let mut query_builder: QueryBuilder<MySql> =
QueryBuilder::new("SELECT subscriber_id FROM subscription_tokens WHERE 1=1");
query_builder.push(" and subscription_token = ");
query_builder.push_bind(subscription_token);
let result = query_builder
.build()
.fetch_optional(pool)
.await
.map_err(|e| {
log::error!("Failed to execute query: {:?}", e);
e
})?;
if let Some(row) = result {
let subscriber_id: String = row.get("subscriber_id");
Ok(Some(uuid::Uuid::parse_str(&subscriber_id).unwrap()))
} else {
Ok(None)
}
}
pub async fn get_confirmed_subscribers(
pool: &Pool<MySql>,
) -> Result<Vec<Result<ConfirmedSubscriber>>> {
let mut query_builder: QueryBuilder<MySql> =
QueryBuilder::new("SELECT email FROM subscriptions WHERE 1=1");
query_builder.push(" and subscribe_status = ");
query_builder.push_bind("confirmed");
let confirmed_subscribers = query_builder
.build()
.fetch_all(pool)
.await?
.into_iter()
.map(|r| {
let email: String = r.get("email");
match SubscriberEmail::parse(email) {
Ok(email) => Ok(ConfirmedSubscriber { email }),
Err(error) => Err(Error::throw("", Some(error))),
}
})
.collect();
Ok(confirmed_subscribers)
}
pub async fn dequeue_task(
pool: &Pool<MySql>,
) -> Result<Option<(PgTransaction, uuid::Uuid, String)>> {
let mut transaction = pool.begin().await?;
let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
r#"
SELECT newsletter_issue_id, subscriber_email
FROM t_issue_delivery_queue
LIMIT 1 FOR UPDATE SKIP LOCKED
"#,
);
let r = query_builder
.build()
.fetch_optional(&mut *transaction)
.await?;
if let Some(r) = r {
let newsletter_issue_id: String = r.get("newsletter_issue_id");
let subscriber_email: String = r.get("subscriber_email");
Ok(Some((
transaction,
uuid::Uuid::parse_str(&newsletter_issue_id).unwrap(),
subscriber_email,
)))
} else {
Ok(None)
}
}
/// Deletes a delivery task and commits the transaction it is passed.
///
/// Removes the `t_issue_delivery_queue` row(s) matching both the issue id and
/// the subscriber e-mail, then commits `transaction`.
///
/// # Errors
///
/// Propagates any error raised by the `DELETE` or by the commit.
pub async fn delete_task(
    mut transaction: PgTransaction,
    issue_id: uuid::Uuid,
    email: &str,
) -> Result<()> {
    let mut query_builder: QueryBuilder<MySql> =
        QueryBuilder::new(r#"DELETE FROM t_issue_delivery_queue WHERE 1=1"#);
    query_builder.push(" and newsletter_issue_id = ");
    // `enqueue_delivery_tasks` writes the issue id as text and `dequeue_task` reads it
    // back as a string, so bind text here as well: sqlx encodes a raw `Uuid` for MySQL
    // as 16 bytes, which would never match the stored value and leave the task queued.
    query_builder.push_bind(issue_id.to_string());
    query_builder.push(" and subscriber_email = ");
    query_builder.push_bind(email);
    query_builder.build().execute(&mut *transaction).await?;
    transaction.commit().await?;
    Ok(())
}
/// Fetches the title and HTML content of a newsletter issue by id.
///
/// # Errors
///
/// Returns an error when the query fails, when no issue with this id exists
/// (`sqlx::Error::RowNotFound`, instead of panicking), or when a column cannot be
/// read as a string.
pub async fn get_issue(pool: &Pool<MySql>, issue_id: uuid::Uuid) -> Result<NewsletterIssue> {
    let mut query_builder: QueryBuilder<MySql> =
        QueryBuilder::new("SELECT title, html_content FROM t_newsletter_issues WHERE 1=1");
    query_builder.push(" and newsletter_issue_id = ");
    // `insert_newsletter_issue` stores the id as text, so bind text: sqlx encodes a raw
    // `Uuid` for MySQL as 16 bytes, which would never match the stored value.
    query_builder.push_bind(issue_id.to_string());

    // `fetch_one` reports a missing row as an error rather than `None`.
    let row = query_builder.build().fetch_one(pool).await?;
    let title: String = row.try_get("title")?;
    let html_content: String = row.try_get("html_content")?;
    Ok(NewsletterIssue {
        title,
        html_content,
    })
}