use std::thread::sleep;
use std::time::Duration;
use sqlx::MySql;
use sqlx::Pool;
use sqlx::QueryBuilder;
use sqlx::Row;
use crate::idempotency::IdempotencyKey;
use actix_web::body::to_bytes;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use crate::commons::parse_json_string;
use crate::core::error2::Error;
use crate::core::error2::Result;
/// One HTTP header, stored as a plain name/value pair.
///
/// A list of these records is serialized to a JSON string when a response is
/// saved and parsed back from that string when the response is rebuilt.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct HeaderPairRecord {
    /// Header name as text.
    name: String,
    /// Header value as text.
    value: String,
}
pub async fn try_processing<'a>(
pool: &'a Pool<MySql>,
idempotency_key: &'a IdempotencyKey,
user_id: String,
) -> Result<crate::idempotency::NextAction<'a>> {
let transaction = pool.begin().await?;
let n_inserted_rows = sqlx::query(
r#"
INSERT INTO t_idempotency (user_id, idempotency_key, created_at)
VALUES (?, ?, now()) ON DUPLICATE KEY UPDATE created_at = now()
"#,
)
.bind(user_id.to_owned())
.bind(idempotency_key.as_ref())
.execute(pool)
.await
.map_err(|e| {
log::error!("error1: error={:?}", e);
Error::UnexpectedError(anyhow::anyhow!(e))
})?
.rows_affected();
if n_inserted_rows > 0 {
Ok(crate::idempotency::NextAction::StartProcessing(transaction))
} else {
sleep(Duration::from_millis(2000));
let saved_response = get_saved_response(pool, idempotency_key, user_id)
.await?
.ok_or_else(|| anyhow::anyhow!("We expected a saved response, we didn't find it"))?;
Ok(crate::idempotency::NextAction::ReturnSavedResponse(
saved_response,
))
}
}
pub async fn save_response(
pool: &Pool<MySql>,
idempotency_key: &IdempotencyKey,
user_id: String,
http_response: HttpResponse,
) -> Result<HttpResponse> {
let (response_head, body) = http_response.into_parts();
let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
let status_code = response_head.status().as_u16() as i16;
let _headers = {
let mut h = Vec::with_capacity(response_head.headers().len());
for (name, value) in response_head.headers().iter() {
let name = name.as_str().to_owned();
let value = value.to_str().map_err(Error::run_time)?.to_string();
h.push(HeaderPairRecord { name, value });
}
h
};
let _headers_json_string = serde_json::to_string(&_headers)?;
let _ = sqlx::query(
r#"
UPDATE t_idempotency
SET
response_status_code = ?,
response_headers = ?,
response_body = ?
WHERE
user_id = ? AND
idempotency_key = ?
"#,
)
.bind(status_code)
.bind(_headers_json_string)
.bind(body.as_ref())
.bind(user_id)
.bind(idempotency_key.as_ref())
.execute(pool)
.await?;
let http_response = response_head.set_body(body).map_into_boxed_body();
Ok(http_response)
}
pub async fn get_saved_response(
pool: &Pool<MySql>,
idempotency_key: &IdempotencyKey,
user_id: String,
) -> Result<Option<HttpResponse>> {
let mut query_builder: QueryBuilder<MySql> = QueryBuilder::new(
"SELECT
response_status_code,
response_headers,
response_body
FROM t_idempotency
WHERE 1=1",
);
query_builder.push(" and user_id = ");
query_builder.push_bind(user_id.to_owned());
query_builder.push(" and idempotency_key = ");
query_builder.push_bind(idempotency_key.as_ref());
let saved_response = query_builder
.build()
.fetch_optional(pool)
.await
.map_err(Error::run_time)?;
if let Some(r) = saved_response {
let response_status_code: u16 = r.get("response_status_code");
let response_headers: String = r.get("response_headers");
let response_body: String = r.get("response_body");
let status_code = StatusCode::from_u16(response_status_code).map_err(Error::run_time)?;
let mut response = HttpResponse::build(status_code);
let _header_string = response_headers;
let response_headers = parse_json_string::<Vec<HeaderPairRecord>>(&_header_string)?;
for HeaderPairRecord { name, value } in response_headers {
response.append_header((name, value));
}
Ok(Some(response.body(response_body)))
} else {
Ok(None)
}
}