use async_trait::async_trait;
use chrono::{DateTime, Utc};
use cloud_pubsub;
use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json;
use uuid::Uuid;
use crate::{
checkout::Checkout,
context::Context,
customer::{Address, Contact},
error::{new_application_error, new_invalid_state_error, Error},
internal_context::InternalContext,
inventory::StockUnit,
invoice::Invoice,
item::Item,
payment::{handlers as payment_handlers, Initiator},
payment::{Payment, PaymentProcessor},
shipping::Fulfillment,
};
pub mod handlers;
// An order created from a checkout, together with its lifecycle state.
//
// `P` is the type stored in `payment`; `reconcile` in this module works with
// `Order<Payment<..>>`.
//
// NOTE(review): plain `//` comments are used instead of `///` on purpose. This
// type derives `JsonSchema`, and schemars copies doc comments into the
// generated schema, so doc comments here would change that output.
#[derive(Serialize, Deserialize, JsonSchema)]
pub struct Order<P> {
// Identifier of the order; the constructors in this file use a freshly
// generated v4 UUID rendered as a string.
pub id: String,
// Current lifecycle state.
pub state: OrderState,
// `false` from creation or a state transition until `reconcile` has run the
// event hook for the current state. State transitions are refused while it is
// `false`.
pub state_reconciled: bool,
// The next five fields are cloned from the checkout when the order is created.
pub contact: Contact,
pub shipping_address: Address,
pub items: Vec<Item>,
pub fulfillment: Fulfillment,
pub invoice: Invoice,
// Id of the payment associated with this order, if any.
pub payment_id: Option<String>,
// Payment record itself; filled in by `populate_associations` and reset to
// `None` by `purge_associations`. The constructors leave it as `None`.
pub payment: Option<P>,
}
// Lifecycle state of an `Order`.
//
// Serialized as an internally tagged value: the variant name goes in a `name`
// field, written in snake_case.
//
// Transitions implemented by the `ensure_*_state` helpers in this file:
//   PaymentInProgress   -> PendingConfirmation
//   PendingConfirmation -> Confirmed
//   Confirmed           -> Fulfilled
//   PaymentInProgress | PendingConfirmation | Confirmed -> Cancelled
//
// `reserved_since` is supplied by the caller of
// `Order::new_from_checkout_missing_payment`.
// NOTE(review): presumably the time the items were reserved — confirm with the caller.
//
// Plain `//` comments are used because the type derives `JsonSchema` and doc
// comments would be copied into the generated schema.
#[derive(Serialize, Deserialize, JsonSchema)]
#[serde(tag = "name", rename_all = "snake_case")]
pub enum OrderState {
PaymentInProgress { reserved_since: DateTime<Utc> }, PendingConfirmation, Confirmed, Fulfilled, Cancelled, }
// Payload published to the order events topic to say which order changed.
//
// The constructors in this file publish `CheckoutId` (the id of the checkout
// the order was created from); state transitions publish `OrderId`.
//
// Plain `//` comments are used because the type derives `JsonSchema` and doc
// comments would be copied into the generated schema.
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub enum LookupKey {
// Id of the checkout an order was created from.
CheckoutId(String),
// Id of the order itself.
OrderId(String),
}
impl cloud_pubsub::FromPubSubMessage for LookupKey {
fn from(message: cloud_pubsub::EncodedMessage) -> Result<Self, cloud_pubsub::error::Error> {
match message.decode() {
Ok(bytes) => {
let val: LookupKey = serde_json::from_slice(&bytes).unwrap();
Ok(val)
}
Err(e) => Err(cloud_pubsub::error::Error::from(e)),
}
}
}
/// Persistence operations for [`Order`] values.
///
/// Every method is generic over the payment type `P` carried by the order.
#[async_trait]
pub trait OrderStore {
/// Persists a newly created order.
async fn create_order<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
order: &Order<P>,
) -> Result<(), Error>;
/// Persists the current contents of an existing order.
async fn update_order<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
order: &Order<P>,
) -> Result<(), Error>;
/// Loads the order with the given id with the intent of modifying it.
///
/// NOTE(review): presumably this locks the record until the surrounding
/// transaction ends and returns `None` for an unknown id — confirm against
/// the implementations.
async fn get_order_for_update<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<Order<P>>, Error>;
/// Loads the order with the given id.
///
/// NOTE(review): presumably returns `None` for an unknown id — confirm
/// against the implementations.
async fn get_order<P: Sync + Send + Serialize + DeserializeOwned>(
&mut self,
id: &str,
) -> Result<Option<Order<P>>, Error>;
}
/// Optional hooks, one per [`OrderState`] variant.
///
/// Every hook has a default implementation that does nothing and returns
/// `Ok(())`, so implementors override only the ones they need. `reconcile` in
/// this module calls the method matching the order's current state on its
/// context and propagates any error the hook returns.
#[async_trait]
pub trait OrderEvents: PaymentProcessor {
/// Hook for an order in the `PaymentInProgress` state. Default: no-op.
async fn on_payment_in_progress(
&mut self,
_order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
Ok(())
}
/// Hook for an order in the `PendingConfirmation` state. Default: no-op.
async fn on_pending_confirmation(
&mut self,
_order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
Ok(())
}
/// Hook for an order in the `Confirmed` state. Default: no-op.
async fn on_confirmed(
&mut self,
_order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
Ok(())
}
/// Hook for an order in the `Fulfilled` state. Default: no-op.
async fn on_fulfilled(
&mut self,
_order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
Ok(())
}
/// Hook for an order in the `Cancelled` state. Default: no-op.
async fn on_cancelled(
&mut self,
_order: &Order<Payment<<Self as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
Ok(())
}
}
impl<P: Sync + Send + Serialize + DeserializeOwned> Order<P> {
pub async fn new_from_checkout<C: Context + Send>(
internal_ctx: &InternalContext,
ctx: &mut C,
checkout: &Checkout<P>,
) -> Result<Self, Error> {
let order = Self {
id: Uuid::new_v4().to_string(),
state_reconciled: false,
state: OrderState::PendingConfirmation,
contact: checkout.contact.as_ref().unwrap().clone(),
shipping_address: checkout.shipping_address.as_ref().unwrap().clone(),
items: checkout.items.clone(),
fulfillment: checkout.fulfillment.as_ref().unwrap().clone(),
invoice: checkout.invoice.as_ref().unwrap().clone(),
payment_id: checkout.payment_id.clone(),
payment: None,
};
ctx.create_order(&order).await?;
publish_state_change_event(internal_ctx, LookupKey::CheckoutId(checkout.id.clone()))
.await?;
Ok(order)
}
pub async fn new_from_checkout_missing_payment<C: Context + Send>(
internal_ctx: &InternalContext,
ctx: &mut C,
checkout: &Checkout<P>,
reserved_since: DateTime<Utc>,
payment_args: <C as PaymentProcessor>::InitArgs,
) -> Result<Self, Error> {
let mut order = Order {
id: Uuid::new_v4().to_string(),
state_reconciled: false,
state: OrderState::PaymentInProgress { reserved_since },
contact: checkout.contact.as_ref().unwrap().clone(),
shipping_address: checkout.shipping_address.as_ref().unwrap().clone(),
items: checkout.items.clone(),
fulfillment: checkout.fulfillment.as_ref().unwrap().clone(),
invoice: checkout.invoice.as_ref().unwrap().clone(),
payment_id: None,
payment: None,
};
ctx.create_order(&order).await?;
let charge_amount = order.invoice.initial_charge_amount.clone();
let payment = payment_handlers::create_handler(
ctx,
Initiator::<P>::Order(&mut order),
charge_amount,
payment_args,
)
.await?;
order.payment_id = Some(payment.id.clone());
ctx.update_order(&order).await?;
publish_state_change_event(internal_ctx, LookupKey::CheckoutId(checkout.id.clone()))
.await?;
Ok(order)
}
}
/// State-machine guards and association helpers.
impl<P: Sync + Send + Serialize + DeserializeOwned> Order<P> {
    /// Succeeds only when the current state has already been reconciled;
    /// otherwise returns the `ORDER_STATE_RECONCILING` application error.
    async fn assert_reconciled(&self) -> Result<(), Error> {
        if self.state_reconciled {
            return Ok(());
        }
        Err(new_application_error(
            "ORDER_STATE_RECONCILING",
            "the order state is reconciling. please try again in a few moments",
        ))
    }

    /// Moves the order into `to`, marks it as not reconciled and publishes a
    /// state change event keyed by the order id.
    ///
    /// Refused (nothing is modified) while the previous state is still
    /// unreconciled. The change is made in memory only; persisting it is left
    /// to the caller.
    async fn transition_state(
        &mut self,
        internal_ctx: &InternalContext,
        to: OrderState,
    ) -> Result<(), Error> {
        self.assert_reconciled().await?;
        self.state_reconciled = false;
        self.state = to;
        let key = LookupKey::OrderId(self.id.clone());
        publish_state_change_event(internal_ctx, key).await
    }

    /// No-op when already `PendingConfirmation`; transitions from
    /// `PaymentInProgress`; any other state is an invalid-state error.
    async fn ensure_pending_confirmation_state(
        &mut self,
        internal_ctx: &InternalContext,
    ) -> Result<(), Error> {
        match &self.state {
            OrderState::PendingConfirmation => return Ok(()),
            OrderState::PaymentInProgress { .. } => {}
            _ => {
                return Err(new_invalid_state_error(
                    "should be in the PaymentInProgress state",
                ))
            }
        }
        self.transition_state(internal_ctx, OrderState::PendingConfirmation)
            .await
    }

    /// No-op when already `Confirmed`; transitions from `PendingConfirmation`;
    /// any other state is an invalid-state error.
    async fn ensure_confirmed_state(
        &mut self,
        internal_ctx: &InternalContext,
    ) -> Result<(), Error> {
        match &self.state {
            OrderState::Confirmed => return Ok(()),
            OrderState::PendingConfirmation => {}
            _ => {
                return Err(new_invalid_state_error(
                    "should be in the PendingConfirmation state",
                ))
            }
        }
        self.transition_state(internal_ctx, OrderState::Confirmed)
            .await
    }

    /// No-op when already `Fulfilled`; transitions from `Confirmed`; any other
    /// state is an invalid-state error.
    async fn ensure_fulfilled_state(
        &mut self,
        internal_ctx: &InternalContext,
    ) -> Result<(), Error> {
        match &self.state {
            OrderState::Fulfilled => return Ok(()),
            OrderState::Confirmed => {}
            _ => return Err(new_invalid_state_error("should be in the Confirmed state")),
        }
        self.transition_state(internal_ctx, OrderState::Fulfilled)
            .await
    }

    /// No-op when already `Cancelled`; transitions from `PaymentInProgress`,
    /// `PendingConfirmation` or `Confirmed`; any other state is an
    /// invalid-state error.
    async fn ensure_cancelled_state(
        &mut self,
        internal_ctx: &InternalContext,
    ) -> Result<(), Error> {
        match &self.state {
            OrderState::Cancelled => return Ok(()),
            OrderState::PaymentInProgress { .. }
            | OrderState::PendingConfirmation
            | OrderState::Confirmed => {}
            _ => {
                return Err(new_invalid_state_error(
                    "should be in the PaymentInProgress, PendingConfirmation or Confirmed states",
                ))
            }
        }
        self.transition_state(internal_ctx, OrderState::Cancelled)
            .await
    }

    /// Loads the payment referenced by `payment_id` into `payment`.
    ///
    /// Does nothing when the order has no payment id.
    pub async fn populate_associations<C: Context + Send>(
        &mut self,
        ctx: &mut C,
    ) -> Result<(), Error> {
        if let Some(payment_id) = &self.payment_id {
            let loaded = ctx.get_payment(payment_id).await?;
            self.payment = loaded;
        }
        Ok(())
    }

    /// Drops the loaded payment record, leaving only `payment_id`.
    fn purge_associations(&mut self) {
        self.payment = None;
    }
}
/// Public actions that advance an order through its lifecycle and persist the
/// result.
impl<P: Sync + Send + Serialize + DeserializeOwned> Order<P> {
    /// Ensures the order is in the `PendingConfirmation` state, then stores it.
    pub async fn prompt_confirmation<C: Context + Send>(
        &mut self,
        internal_ctx: &InternalContext,
        ctx: &mut C,
    ) -> Result<(), Error> {
        self.ensure_pending_confirmation_state(internal_ctx).await?;
        ctx.update_order(self).await?;
        Ok(())
    }

    /// Ensures the order is in the `Confirmed` state, then stores it.
    pub async fn confirm<C: Context + Send>(
        &mut self,
        internal_ctx: &InternalContext,
        ctx: &mut C,
    ) -> Result<(), Error> {
        self.ensure_confirmed_state(internal_ctx).await?;
        ctx.update_order(self).await?;
        Ok(())
    }

    /// Ensures the order is in the `Fulfilled` state, frees its items, removes
    /// the corresponding stock units, then stores the order.
    pub async fn complete<C: Context + Send>(
        &mut self,
        internal_ctx: &InternalContext,
        ctx: &mut C,
    ) -> Result<(), Error> {
        self.ensure_fulfilled_state(internal_ctx).await?;
        ctx.free_items(&self.items).await?;

        // One stock unit per ordered item.
        let units: Vec<StockUnit> = self.items.iter().map(Into::into).collect();
        ctx.remove_stock(&units).await?;

        ctx.update_order(self).await?;
        Ok(())
    }

    /// Ensures the order is in the `Cancelled` state, frees its items, then
    /// stores the order.
    pub async fn cancel<C: Context + Send>(
        &mut self,
        internal_ctx: &InternalContext,
        ctx: &mut C,
    ) -> Result<(), Error> {
        self.ensure_cancelled_state(internal_ctx).await?;
        ctx.free_items(&self.items).await?;
        ctx.update_order(self).await?;
        Ok(())
    }
}
/// Runs the event hook for the order's current state and marks the state as
/// reconciled.
///
/// Returns immediately when the order is already reconciled. Otherwise the
/// payment association is loaded, the hook matching the current state is called
/// on `ctx`, the association is dropped again, `state_reconciled` is set and
/// the order is stored. Any failure aborts the sequence and is returned; the
/// order is then neither marked as reconciled nor stored.
async fn reconcile<C: Context + Send>(
    ctx: &mut C,
    order: &mut Order<Payment<<C as PaymentProcessor>::Data>>,
) -> Result<(), Error> {
    if order.state_reconciled {
        return Ok(());
    }

    order.populate_associations(ctx).await?;

    let hook_outcome = match order.state {
        OrderState::PaymentInProgress { .. } => ctx.on_payment_in_progress(order).await,
        OrderState::PendingConfirmation => ctx.on_pending_confirmation(order).await,
        OrderState::Confirmed => ctx.on_confirmed(order).await,
        OrderState::Fulfilled => ctx.on_fulfilled(order).await,
        OrderState::Cancelled => ctx.on_cancelled(order).await,
    };
    hook_outcome?;

    order.purge_associations();
    order.state_reconciled = true;
    ctx.update_order(order).await
}
/// Publishes `lookup_key` on the order events topic held by `internal_ctx`.
///
/// # Errors
///
/// Any publish failure is reported as the `UPSTREAM_ERROR` application error;
/// the underlying cause is discarded.
async fn publish_state_change_event(
    internal_ctx: &InternalContext,
    lookup_key: LookupKey,
) -> Result<(), Error> {
    let topic = internal_ctx.order_events_topic.clone();
    match topic.publish(lookup_key).await {
        Ok(_) => Ok(()),
        Err(_) => Err(new_application_error(
            "UPSTREAM_ERROR",
            "couldn't send message to order events topic",
        )),
    }
}