// zero4rs 2.0.0
//
// zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
// Documentation
use std::thread::sleep;
use std::time::Duration;

use sqlx::MySql;
use sqlx::Pool;
use sqlx::QueryBuilder;
use sqlx::Row;

use crate::idempotency::IdempotencyKey;

use actix_web::body::to_bytes;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;

use crate::commons::parse_json_string;

use crate::core::error2::Error;
use crate::core::error2::Result;

/// A single HTTP header captured as an owned name/value pair.
///
/// `save_response` serializes a `Vec` of these records to a JSON string before storing it in
/// the `response_headers` column, and `get_saved_response` parses that JSON back into records
/// to rebuild the response. The field names therefore define the stored JSON keys.
#[derive(Debug)]
#[derive(serde::Deserialize, serde::Serialize)]
struct HeaderPairRecord {
    /// Header name, as returned by `HeaderName::as_str`.
    name: String,
    /// Header value, as text.
    value: String,
}

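// NOTE(review): the isolation-level notes below describe Postgres semantics, while this module
// runs against `Pool<MySql>`. InnoDB's default isolation level is REPEATABLE READ, not
// READ COMMITTED -- confirm which of these statements still apply here.
//
// READ COMMITTED is the default isolation level in Postgres.
// A SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began;
// it never sees either uncommitted data or changes committed during query execution by concurrent transactions.
// In effect, a SELECT query sees a snapshot of the database as of the instant the query begins to run.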

// UPDATE, DELETE, SELECT FOR UPDATE [...] will only find target rows that were committed as of the command start time.
// However, such a target row might have already been updated (or deleted or locked) by another concurrent transaction by the time it is found.
// In this case, the would-be updater will wait for the first updating transaction to commit or roll back (if it is still in progress).

// REPEATABLE READ is designed to prevent non-repeatable reads (who would have guessed?): the same SELECT query,
// if run twice in a row within the same transaction, should return the same data.
// This has consequences for statements such as UPDATE: if they are executed within a REPEATABLE READ transaction,
// they cannot modify or lock rows changed by other transactions after the REPEATABLE READ transaction began.
pub async fn try_processing<'a>(
    pool: &'a Pool<MySql>,
    idempotency_key: &'a IdempotencyKey,
    user_id: String,
) -> Result<crate::idempotency::NextAction<'a>> {
    let transaction = pool.begin().await?;

    // sqlx::query!("SET TRANSACTION ISOLATION LEVEL repeatable read")
    //     .execute(&mut transaction)
    //     .await
    //     .map_err(|e| {
    //         log::error!("error1: error={:?}", e);
    //         ThrowError::UnexpectedError(anyhow::anyhow!(e))
    //     })?;

    // let n_inserted_rows = sqlx::query!(
    //     r#"
    //     INSERT INTO t_idempotency (user_id, idempotency_key, created_at)
    //     VALUES (?, ?, now()) ON DUPLICATE KEY UPDATE created_at = now()
    //     "#,
    //     user_id.to_owned(),
    //     idempotency_key.as_ref()
    // )
    // .execute(pool)
    // .await
    // .map_err(|e| {
    //     log::error!("error1: error={:?}", e);
    //     ThrowError::UnexpectedError(anyhow::anyhow!(e))
    // })?
    // .rows_affected();

    let n_inserted_rows = sqlx::query(
        r#"
        INSERT INTO t_idempotency (user_id, idempotency_key, created_at)
        VALUES (?, ?, now()) ON DUPLICATE KEY UPDATE created_at = now()
        "#,
    )
    .bind(user_id.to_owned())
    .bind(idempotency_key.as_ref())
    .execute(pool)
    .await
    .map_err(|e| {
        log::error!("error1: error={:?}", e);
        Error::UnexpectedError(anyhow::anyhow!(e))
    })?
    .rows_affected();

    if n_inserted_rows > 0 {
        Ok(crate::idempotency::NextAction::StartProcessing(transaction))
    } else {
        sleep(Duration::from_millis(2000));

        let saved_response = get_saved_response(pool, idempotency_key, user_id)
            .await?
            .ok_or_else(|| anyhow::anyhow!("We expected a saved response, we didn't find it"))?;
        Ok(crate::idempotency::NextAction::ReturnSavedResponse(
            saved_response,
        ))
    }
}

pub async fn save_response(
    pool: &Pool<MySql>,
    idempotency_key: &IdempotencyKey,
    user_id: String,
    http_response: HttpResponse,
) -> Result<HttpResponse> {
    let (response_head, body) = http_response.into_parts();

    // `MessageBody::Error` is not `Send` + `Sync`, therefore it doesn't play nicely with `anyhow`
    let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
    let status_code = response_head.status().as_u16() as i16;
    let _headers = {
        let mut h = Vec::with_capacity(response_head.headers().len());
        for (name, value) in response_head.headers().iter() {
            let name = name.as_str().to_owned();
            let value = value.to_str().map_err(Error::run_time)?.to_string();
            h.push(HeaderPairRecord { name, value });
        }
        h
    };

    let _headers_json_string = serde_json::to_string(&_headers)?;

    let _ = sqlx::query(
        r#"
        UPDATE t_idempotency
        SET
            response_status_code = ?,
            response_headers = ?,
            response_body = ?
        WHERE
            user_id = ? AND
            idempotency_key = ?
        "#,
    )
    .bind(status_code)
    .bind(_headers_json_string)
    .bind(body.as_ref())
    .bind(user_id)
    .bind(idempotency_key.as_ref())
    .execute(pool)
    .await?;

    // We need `.map_into_boxed_body` to go from `HttpResponse<Bytes>` to `HttpResponse<BoxBody>`
    let http_response = response_head.set_body(body).map_into_boxed_body();
    Ok(http_response)
}

pub async fn get_saved_response(
    pool: &Pool<MySql>,
    idempotency_key: &IdempotencyKey,
    user_id: String,
) -> Result<Option<HttpResponse>> {
    // let saved_response = sqlx::query!(
    //     r#"
    //     SELECT
    //         response_status_code,
    //         response_headers,
    //         response_body
    //     FROM t_idempotency
    //     WHERE
    //       user_id = ? AND
    //       idempotency_key = ?
    //     "#,
    //     user_id.to_owned(),
    //     idempotency_key.as_ref()
    // )
    // .fetch_optional(pool)
    // .await?;

    let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
        "SELECT
                response_status_code,
                response_headers,
                response_body
            FROM t_idempotency
            WHERE 1=1",
    );

    query_builder.push(" and user_id = ");
    query_builder.push_bind(user_id.to_owned());

    query_builder.push(" and idempotency_key = ");
    query_builder.push_bind(idempotency_key.as_ref());

    let saved_response = query_builder
        .build()
        .fetch_optional(pool)
        .await
        .map_err(Error::run_time)?;

    if let Some(r) = saved_response {
        let response_status_code: u16 = r.get("response_status_code");
        let response_headers: String = r.get("response_headers");
        let response_body: String = r.get("response_body");

        let status_code = StatusCode::from_u16(response_status_code).map_err(Error::run_time)?;
        let mut response = HttpResponse::build(status_code);

        let _header_string = response_headers;
        let response_headers = parse_json_string::<Vec<HeaderPairRecord>>(&_header_string)?;

        for HeaderPairRecord { name, value } in response_headers {
            response.append_header((name, value));
        }

        Ok(Some(response.body(response_body)))
    } else {
        Ok(None)
    }
}