zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
pub mod models;

pub use models::*;

use sqlx::MySql;
use sqlx::Pool;
use sqlx::QueryBuilder;
use sqlx::Row;

use sqlx::Transaction;

/// Owned (`'static`) transaction handle: the type `dequeue_task` hands back
/// to its caller and `delete_task` later consumes and commits.
///
/// NOTE(review): despite the `Pg` prefix, this aliases a `MySql` transaction.
type PgTransaction = Transaction<'static, MySql>;

use crate::core::error2::Error;
use crate::core::error2::Result;

/// Persists a new newsletter issue inside the caller's transaction.
///
/// A fresh UUIDv7 is generated for the issue and stored in its string form
/// together with `title` and `html_content`; `published_at` is filled in by
/// the database through `now()`.
///
/// Returns the identifier of the newly inserted issue.
///
/// # Errors
///
/// Propagates any error reported while executing the `INSERT`.
pub async fn insert_newsletter_issue(
    transaction: &mut Transaction<'_, MySql>,
    title: &str,
    html_content: &str,
) -> Result<uuid::Uuid> {
    let issue_id = uuid::Uuid::now_v7();

    // Build the statement first, then run it on the caller's transaction.
    let insert = sqlx::query(
        "INSERT INTO t_newsletter_issues (
            newsletter_issue_id,
            title,
            html_content,
            published_at
        )
        VALUES (?, ?, ?, now())",
    )
    .bind(issue_id.to_string())
    .bind(title)
    .bind(html_content);

    insert.execute(&mut **transaction).await?;

    log::info!("newsletter_issue_id: {}", issue_id);

    Ok(issue_id)
}

/// Queues one delivery task per confirmed subscriber for the given issue.
///
/// Runs a single `INSERT ... SELECT` inside the caller's transaction: every
/// row of `subscriptions` whose `subscribe_status` is `'confirmed'` yields a
/// `(newsletter_issue_id, subscriber_email)` row in `t_issue_delivery_queue`.
/// The issue id is written in its string form.
///
/// # Errors
///
/// Propagates any error reported while executing the statement.
pub async fn enqueue_delivery_tasks(
    transaction: &mut Transaction<'_, MySql>,
    newsletter_issue_id: uuid::Uuid,
) -> Result<()> {
    let issue_id = newsletter_issue_id.to_string();

    sqlx::query(
        "INSERT INTO t_issue_delivery_queue (
            newsletter_issue_id,
            subscriber_email
        )
        SELECT ?, email
        FROM subscriptions
        WHERE subscribe_status = 'confirmed'",
    )
    .bind(issue_id)
    .execute(&mut **transaction)
    .await?;

    Ok(())
}

/// Inserts a new subscriber row with status `pending`.
///
/// A fresh UUIDv7 becomes the subscriber's `id` (stored in string form) and
/// the current UTC time is recorded as `subscribed_at`. The statement runs
/// inside the caller's transaction.
///
/// Returns the generated subscriber id.
///
/// # Errors
///
/// Propagates any error reported while executing the `INSERT`.
pub async fn insert_subscriber(
    _transaction: &mut Transaction<'_, MySql>,
    new_subscriber: &NewSubscriber,
) -> Result<uuid::Uuid> {
    let id = uuid::Uuid::now_v7();

    let insert = sqlx::query(
        "INSERT INTO subscriptions (id, email, name, subscribed_at, subscribe_status) \
         VALUES (?, ?, ?, ?, ?)",
    )
    .bind(id.to_string())
    .bind(new_subscriber.email.as_ref())
    .bind(new_subscriber.name.as_ref())
    .bind(chrono::Utc::now())
    .bind("pending");

    insert.execute(&mut **_transaction).await?;

    Ok(id)
}

/// Records the confirmation token issued to a subscriber.
///
/// Inserts a `(subscription_token, subscriber_id)` row into
/// `subscription_tokens` inside the caller's transaction; the subscriber id
/// is written in its string form.
///
/// # Errors
///
/// Propagates any error reported while executing the `INSERT`.
pub async fn store_token(
    _transaction: &mut Transaction<'_, MySql>,
    subscriber_id: uuid::Uuid,
    subscription_token: &str,
) -> Result<()> {
    let owner_id = subscriber_id.to_string();

    let insert = sqlx::query(
        "INSERT INTO subscription_tokens (subscription_token, subscriber_id) \
        VALUES (?, ?)",
    )
    .bind(subscription_token)
    .bind(owner_id);

    insert.execute(&mut **_transaction).await?;

    Ok(())
}

/// Marks the subscriber identified by `subscriber_id` as `confirmed`.
///
/// The update is executed directly on `pool` (not inside a caller-supplied
/// transaction). No error is raised when no row matches the id.
///
/// # Errors
///
/// Logs and propagates any error reported while executing the `UPDATE`.
pub async fn confirm_subscriber(pool: &Pool<MySql>, subscriber_id: uuid::Uuid) -> Result<()> {
    sqlx::query("UPDATE subscriptions SET subscribe_status = ? WHERE id = ?")
        .bind("confirmed")
        // `insert_subscriber` stores `id` as the hyphenated string form, so the
        // lookup must bind the same representation. Binding the `Uuid` itself
        // sends 16 raw bytes on MySQL, which never equals the stored text.
        .bind(subscriber_id.to_string())
        .execute(pool)
        .await
        .map_err(|e| {
            log::error!("Failed to execute query: {:?}", e);
            e
        })?;

    Ok(())
}

pub async fn get_subscriber_id_from_token(
    pool: &Pool<MySql>,
    subscription_token: &str,
) -> Result<Option<uuid::Uuid>> {
    let mut query_builder: QueryBuilder<MySql> =
        QueryBuilder::new("SELECT subscriber_id FROM subscription_tokens WHERE 1=1");

    query_builder.push(" and subscription_token = ");
    query_builder.push_bind(subscription_token);

    let result = query_builder
        .build()
        .fetch_optional(pool)
        .await
        .map_err(|e| {
            log::error!("Failed to execute query: {:?}", e);
            e
        })?;

    if let Some(row) = result {
        let subscriber_id: String = row.get("subscriber_id");
        Ok(Some(uuid::Uuid::parse_str(&subscriber_id).unwrap()))
    } else {
        Ok(None)
    }
}

/// Fetches the e-mail address of every subscriber whose status is `confirmed`.
///
/// Each stored address is re-validated with `SubscriberEmail::parse`, so the
/// outer `Result` reports query failures while every element of the returned
/// vector is itself `Ok(ConfirmedSubscriber)` or the error produced for an
/// address that failed validation.
///
/// # Errors
///
/// Propagates any error reported while executing the `SELECT`.
pub async fn get_confirmed_subscribers(
    pool: &Pool<MySql>,
) -> Result<Vec<Result<ConfirmedSubscriber>>> {
    let mut sql: QueryBuilder<MySql> =
        QueryBuilder::new("SELECT email FROM subscriptions WHERE 1=1");
    sql.push(" and subscribe_status = ").push_bind("confirmed");

    let rows = sql.build().fetch_all(pool).await?;

    // One output entry per row, in the order the rows were returned.
    let mut subscribers = Vec::with_capacity(rows.len());
    for row in rows {
        let raw_email: String = row.get("email");

        let parsed = match SubscriberEmail::parse(raw_email) {
            Ok(email) => Ok(ConfirmedSubscriber { email }),
            Err(error) => Err(Error::throw("", Some(error))),
        };
        subscribers.push(parsed);
    }

    Ok(subscribers)
}

pub async fn dequeue_task(
    pool: &Pool<MySql>,
) -> Result<Option<(PgTransaction, uuid::Uuid, String)>> {
    let mut transaction = pool.begin().await?;

    let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
        r#"
        SELECT newsletter_issue_id, subscriber_email
        FROM t_issue_delivery_queue
        LIMIT 1 FOR UPDATE SKIP LOCKED
        "#,
    );

    let r = query_builder
        .build()
        .fetch_optional(&mut *transaction)
        .await?;

    if let Some(r) = r {
        let newsletter_issue_id: String = r.get("newsletter_issue_id");
        let subscriber_email: String = r.get("subscriber_email");

        Ok(Some((
            transaction,
            uuid::Uuid::parse_str(&newsletter_issue_id).unwrap(),
            subscriber_email,
        )))
    } else {
        Ok(None)
    }
}

/// Removes a delivery task from the queue and commits the transaction.
///
/// Deletes the `t_issue_delivery_queue` row identified by `issue_id` and
/// `email` using the supplied transaction, then commits it. The transaction
/// is taken by value, so it cannot be reused afterwards.
///
/// # Errors
///
/// Propagates any error from executing the `DELETE` or from the commit.
pub async fn delete_task(
    mut transaction: PgTransaction,
    issue_id: uuid::Uuid,
    email: &str,
) -> Result<()> {
    let mut query_builder: QueryBuilder<MySql> =
        QueryBuilder::new(r#"DELETE FROM t_issue_delivery_queue WHERE 1=1"#);

    // `enqueue_delivery_tasks` writes the issue id as its hyphenated string
    // form and `dequeue_task` reads it back as a string, so the filter must
    // bind the same representation. Binding the `Uuid` itself sends 16 raw
    // bytes on MySQL, which never matches and leaves the task in the queue.
    query_builder.push(" and newsletter_issue_id = ");
    query_builder.push_bind(issue_id.to_string());

    query_builder.push(" and subscriber_email = ");
    query_builder.push_bind(email);

    query_builder.build().execute(&mut *transaction).await?;

    transaction.commit().await?;

    Ok(())
}

/// Loads the title and HTML content of the newsletter issue `issue_id`.
///
/// # Errors
///
/// Propagates query failures. A missing issue is reported as an error
/// (`sqlx::Error::RowNotFound`, via `fetch_one`) rather than a panic, and so
/// is a column that cannot be read as a string.
pub async fn get_issue(pool: &Pool<MySql>, issue_id: uuid::Uuid) -> Result<NewsletterIssue> {
    let mut query_builder: QueryBuilder<MySql> =
        QueryBuilder::new("SELECT title, html_content FROM t_newsletter_issues WHERE 1=1");

    // `insert_newsletter_issue` stores the id as its hyphenated string form,
    // so the lookup must bind the same representation. Binding the `Uuid`
    // itself sends 16 raw bytes on MySQL, which never matches the stored text.
    query_builder.push(" and newsletter_issue_id = ");
    query_builder.push_bind(issue_id.to_string());

    let row = query_builder.build().fetch_one(pool).await?;

    let title: String = row.try_get("title")?;
    let html_content: String = row.try_get("html_content")?;

    Ok(NewsletterIssue {
        title,
        html_content,
    })
}