use async_trait::async_trait;
use rust_decimal::Decimal;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::types::Json;
use sqlx::{Postgres, Row as _, Transaction};
use std::mem;
use crate::checkout::{Checkout, CheckoutStore};
use crate::context::{ClientContext, Context};
use crate::customer::Address;
use crate::db::DBClient;
use crate::error::{new_server_error, Error};
use crate::inventory::InventoryController;
use crate::invoice::InvoiceCalculator;
use crate::invoice::{Invoice, Tax};
use crate::item::Item;
use crate::money::Currency;
use crate::order::{Order, OrderEvents, OrderStore};
use crate::payment::{
CancellationResult, Payment, PaymentData, PaymentProcessor, PaymentStore, ProcessingResult,
};
use crate::price::ItemPrice;
use crate::price::PriceCalculator;
use crate::product::Product;
use crate::product::ProductResolver;
use crate::shipping::ShippingCalculator;
use crate::shipping::ShippingQuote;
use crate::transaction::TransactionController;
pub struct MasterContext<'a, C: ClientContext> {
client_ctx: C,
db_client: &'a DBClient,
tx: Option<Transaction<'a, Postgres>>,
}
impl<'a, C: ClientContext> MasterContext<'a, C> {
pub fn new(client_ctx: C, db_client: &'a DBClient) -> Self {
Self {
client_ctx,
db_client,
tx: None,
}
}
}
impl<'a, C: ClientContext + Send> Context for MasterContext<'a, C> {}
impl<'a, C: ClientContext + Send> ClientContext for MasterContext<'a, C> {}
#[async_trait]
impl<'a, C: ClientContext + Send> TransactionController for MasterContext<'a, C> {
async fn start_transaction(&mut self) -> Result<(), Error> {
Ok(self.tx = Some(
self.db_client
.new_tx()
.await
.map_err(|_| new_server_error("DB_ERROR: failed to start database transaction"))?,
))
}
async fn commit_transaction(&mut self) -> Result<(), Error> {
let mut tx: Option<Transaction<Postgres>> = None;
mem::swap(&mut self.tx, &mut tx);
tx.unwrap()
.commit()
.await
.map_err(|_| new_server_error("DB_ERROR: failed to commit transaction"))?;
Ok(())
}
async fn abort_transaction(&mut self) -> Result<(), Error> {
let mut tx: Option<Transaction<Postgres>> = None;
mem::swap(&mut self.tx, &mut tx);
tx.unwrap()
.rollback()
.await
.map_err(|_| new_server_error("DB_ERROR: failed to rollback transaction"))?;
Ok(())
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> InventoryController for MasterContext<'a, C> {}
#[async_trait]
impl<'a, C: ClientContext + Send> CheckoutStore for MasterContext<'a, C> {
async fn create_checkout<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
co: &Checkout<P>,
) -> Result<(), Error> {
let tx = self.tx.as_mut().unwrap();
sqlx::query("INSERT INTO checkouts (id, data) VALUES ($1, $2)")
.bind(&co.id)
.bind(Json(co))
.execute(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error creating checkout"))?;
Ok(())
}
async fn update_checkout<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
co: &Checkout<P>,
) -> Result<(), Error> {
let tx = self.tx.as_mut().unwrap();
sqlx::query("UPDATE checkouts SET data = $2 WHERE id = $1")
.bind(&co.id)
.bind(Json(co))
.execute(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error updating checkout"))?;
Ok(())
}
async fn get_checkout<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<Checkout<P>>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM checkouts WHERE id = $1")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error selecting checkout"))?;
if record.is_none() {
return Ok(None);
}
let co: Json<Checkout<P>> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode checkout from db"))?;
Ok(Some(co.0))
}
async fn get_checkout_for_update<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<Checkout<P>>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM checkouts WHERE id = $1 FOR UPDATE")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|err| {
println!("{}", err);
new_server_error("DB_ERROR: error selecting checkout for update")
})?;
if record.is_none() {
return Ok(None);
}
let co: Json<Checkout<P>> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode checkout from db"))?;
Ok(Some(co.0))
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> OrderStore for MasterContext<'a, C> {
async fn create_order<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
od: &Order<P>,
) -> Result<(), Error> {
let tx = self.tx.as_mut().unwrap();
sqlx::query("INSERT INTO orders (id, data) VALUES ($1, $2)")
.bind(&od.id)
.bind(Json(od))
.execute(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error creating order"))?;
Ok(())
}
async fn update_order<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
od: &Order<P>,
) -> Result<(), Error> {
let tx = self.tx.as_mut().unwrap();
sqlx::query("UPDATE orders SET data = $2 WHERE id = $1")
.bind(&od.id)
.bind(Json(od))
.execute(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error updating order"))?;
Ok(())
}
async fn get_order<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<Order<P>>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM orders WHERE id = $1")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error selecting order"))?;
if record.is_none() {
return Ok(None);
}
let od: Json<Order<P>> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode order from db"))?;
Ok(Some(od.0))
}
async fn get_order_for_update<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<Order<P>>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM orders WHERE id = $1 FOR UPDATE")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|err| {
println!("{}", err);
new_server_error("DB_ERROR: error selecting order for update")
})?;
if record.is_none() {
return Ok(None);
}
let od: Json<Order<P>> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode order from db"))?;
Ok(Some(od.0))
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> PaymentStore for MasterContext<'a, C> {
async fn create_payment(
&mut self,
payment: &Payment<<Self as PaymentProcessor>::Data>,
) -> Result<(), Error> {
let tx = self.tx.as_mut().unwrap();
sqlx::query("INSERT INTO payments (id, correlation_id, data) VALUES ($1, $2, $3)")
.bind(&payment.id)
.bind(&payment.correlation_id)
.bind(Json(payment))
.execute(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error creating payment"))?;
Ok(())
}
async fn update_payment(
&mut self,
payment: &Payment<<Self as PaymentProcessor>::Data>,
) -> Result<(), Error> {
let tx = self.tx.as_mut().unwrap();
sqlx::query("UPDATE payments SET correlation_id = $2, data = $3 WHERE id = $1")
.bind(&payment.id)
.bind(&payment.correlation_id)
.bind(Json(payment))
.execute(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error updating payment"))?;
Ok(())
}
async fn get_payment<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<P>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM payments WHERE id = $1")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error selecting payment"))?;
if record.is_none() {
return Ok(None);
}
let co: Json<P> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode payment from db"))?;
Ok(Some(co.0))
}
async fn get_payment_for_update<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<P>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM payments WHERE id = $1 FOR UPDATE")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|_| new_server_error("DB_ERROR: error selecting payment for update"))?;
if record.is_none() {
return Ok(None);
}
let co: Json<P> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode payment from db"))?;
Ok(Some(co.0))
}
async fn get_payment_for_update_by_correlation_id<
P: Sync + Send + Serialize + DeserializeOwned,
>(
&mut self,
id: &str,
) -> Result<Option<P>, Error> {
let tx = self.tx.as_mut().unwrap();
let record = sqlx::query("SELECT data FROM payments WHERE correlation_id = $1 FOR UPDATE")
.bind(id)
.fetch_optional(tx)
.await
.map_err(|_| {
new_server_error("DB_ERROR: error selecting payment for update by correlation id")
})?;
if record.is_none() {
return Ok(None);
}
let co: Json<P> = record
.unwrap()
.try_get("data")
.map_err(|_| new_server_error("DB_ERROR: couldn't decode payment from db"))?;
Ok(Some(co.0))
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> PaymentProcessor for MasterContext<'a, C> {
type Data = <C as PaymentProcessor>::Data;
type InitArgs = <C as PaymentProcessor>::InitArgs;
type ProcessArgs = <C as PaymentProcessor>::ProcessArgs;
async fn initiate_payment(
&mut self,
payment: &Payment<Self::Data>,
args: &Self::InitArgs,
) -> Result<PaymentData<Self::Data>, Error> {
self.client_ctx.initiate_payment(payment, args).await
}
async fn process_payment(
&mut self,
payment: &Payment<Self::Data>,
args: &Self::ProcessArgs,
) -> Result<ProcessingResult<Self::Data>, Error> {
self.client_ctx.process_payment(payment, args).await
}
async fn cancel_payment(
&mut self,
payment: &Payment<Self::Data>,
) -> Result<CancellationResult, Error> {
self.client_ctx.cancel_payment(payment).await
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> OrderEvents for MasterContext<'a, C> {
async fn on_payment_in_progress(
&mut self,
order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
self.client_ctx.on_payment_in_progress(order).await
}
async fn on_pending_confirmation(
&mut self,
order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
self.client_ctx.on_pending_confirmation(order).await
}
async fn on_confirmed(
&mut self,
order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
self.client_ctx.on_confirmed(order).await
}
async fn on_fulfilled(
&mut self,
order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
self.client_ctx.on_fulfilled(order).await
}
async fn on_cancelled(
&mut self,
order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
self.client_ctx.on_cancelled(order).await
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> PriceCalculator for MasterContext<'a, C> {
async fn calculate_item_prices(
&mut self,
currency: &Currency,
promo_codes: &Vec<String>,
items: &Vec<Item>,
) -> Result<Vec<ItemPrice>, Error> {
self.client_ctx
.calculate_item_prices(currency, promo_codes, items)
.await
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> ShippingCalculator for MasterContext<'a, C> {
async fn get_shipping_quotes(
&mut self,
currency: &Currency,
promo_codes: &Vec<String>,
items: &Vec<Item>,
address: &Address,
) -> Result<Vec<ShippingQuote>, Error> {
self.client_ctx
.get_shipping_quotes(currency, promo_codes, items, address)
.await
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> ProductResolver for MasterContext<'a, C> {
async fn resolve_product(&mut self, currency: &Currency, sku: &str) -> Result<Product, Error> {
self.client_ctx.resolve_product(currency, sku).await
}
}
#[async_trait]
impl<'a, C: ClientContext + Send> InvoiceCalculator for MasterContext<'a, C> {
async fn line_item_taxes<P: Sync + Send>(
&mut self,
co: &Checkout<P>,
) -> Result<Vec<Tax>, Error> {
self.client_ctx.line_item_taxes(co).await
}
async fn shipping_taxes<P: Sync + Send>(
&mut self,
co: &Checkout<P>,
) -> Result<Vec<Tax>, Error> {
self.client_ctx.shipping_taxes(co).await
}
async fn initial_charge_ratio<P: Sync + Send>(
&mut self,
co: &Checkout<P>,
) -> Result<Decimal, Error> {
self.client_ctx.initial_charge_ratio(co).await
}
async fn generate_invoice<P: Sync + Send>(
&mut self,
co: &Checkout<P>,
) -> Result<Invoice, Error> {
self.client_ctx.generate_invoice(co).await
}
}