use super::{IdempotencyKey, LiveOrderWriter};
use crate::internal::domain::{BrokerOrderId, ErrorCode, GatewayError, ValidatedOrderGroup};
use async_trait::async_trait;
use tracing::warn;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GroupSubmitReceipt {
pub broker_order_ids: Vec<BrokerOrderId>,
}
#[async_trait]
pub trait PaperOrderGroupWriter: Send + Sync {
async fn submit_paper_group(
&self,
group: &ValidatedOrderGroup,
idempotency_key: &IdempotencyKey,
) -> Result<GroupSubmitReceipt, GatewayError>;
}
#[async_trait]
pub trait LiveOrderGroupWriter: Send + Sync {
async fn submit_live_group(
&self,
group: &ValidatedOrderGroup,
idempotency_key: &IdempotencyKey,
) -> Result<GroupSubmitReceipt, GatewayError>;
}
#[derive(Clone, Debug, Default)]
pub struct LocalCandidatePaperGroupWriter;
#[async_trait]
impl PaperOrderGroupWriter for LocalCandidatePaperGroupWriter {
async fn submit_paper_group(
&self,
_group: &ValidatedOrderGroup,
_idempotency_key: &IdempotencyKey,
) -> Result<GroupSubmitReceipt, GatewayError> {
Ok(GroupSubmitReceipt {
broker_order_ids: vec![
BrokerOrderId::from_static("paper-bracket-parent-local"),
BrokerOrderId::from_static("paper-bracket-take-profit-local"),
BrokerOrderId::from_static("paper-bracket-stop-loss-local"),
],
})
}
}
#[derive(Clone, Debug, Default)]
pub struct LocalCandidateLiveGroupWriter;
#[async_trait]
impl LiveOrderGroupWriter for LocalCandidateLiveGroupWriter {
async fn submit_live_group(
&self,
_group: &ValidatedOrderGroup,
idempotency_key: &IdempotencyKey,
) -> Result<GroupSubmitReceipt, GatewayError> {
Ok(GroupSubmitReceipt {
broker_order_ids: vec![
BrokerOrderId::new(format!("live-bracket-{}-parent", idempotency_key.as_str()))
.ok_or_else(local_candidate_error)?,
BrokerOrderId::new(format!(
"live-bracket-{}-take-profit",
idempotency_key.as_str()
))
.ok_or_else(local_candidate_error)?,
BrokerOrderId::new(format!(
"live-bracket-{}-stop-loss",
idempotency_key.as_str()
))
.ok_or_else(local_candidate_error)?,
],
})
}
}
pub struct SequentialLiveOrderGroupWriter<'a> {
writer: &'a dyn LiveOrderWriter,
}
impl<'a> SequentialLiveOrderGroupWriter<'a> {
#[must_use]
pub const fn new(writer: &'a dyn LiveOrderWriter) -> Self {
Self { writer }
}
}
#[async_trait]
impl LiveOrderGroupWriter for SequentialLiveOrderGroupWriter<'_> {
async fn submit_live_group(
&self,
group: &ValidatedOrderGroup,
idempotency_key: &IdempotencyKey,
) -> Result<GroupSubmitReceipt, GatewayError> {
let parent_key = leg_key(idempotency_key, "parent")?;
let take_profit_key = leg_key(idempotency_key, "take-profit")?;
let stop_loss_key = leg_key(idempotency_key, "stop-loss")?;
let parent = self.writer.submit_live(&group.parent, &parent_key).await?;
let take_profit = match self
.writer
.submit_live(&group.take_profit, &take_profit_key)
.await
{
Ok(receipt) => receipt,
Err(error) => {
return Err(orphaned_group_error(
error,
std::slice::from_ref(&parent.broker_order_id),
));
}
};
let stop_loss = match self
.writer
.submit_live(&group.stop_loss, &stop_loss_key)
.await
{
Ok(receipt) => receipt,
Err(error) => {
return Err(orphaned_group_error(
error,
&[
parent.broker_order_id.clone(),
take_profit.broker_order_id.clone(),
],
));
}
};
Ok(GroupSubmitReceipt {
broker_order_ids: vec![
parent.broker_order_id,
take_profit.broker_order_id,
stop_loss.broker_order_id,
],
})
}
}
fn leg_key(idempotency_key: &IdempotencyKey, suffix: &str) -> Result<IdempotencyKey, GatewayError> {
IdempotencyKey::new(format!("{}-{suffix}", idempotency_key.as_str()))
}
fn orphaned_group_error(error: GatewayError, submitted_ids: &[BrokerOrderId]) -> GatewayError {
let submitted_ids = submitted_ids
.iter()
.map(BrokerOrderId::as_str)
.collect::<Vec<_>>()
.join(",");
warn!(
target: "orders",
broker_order_ids = %submitted_ids,
error_code = ?error.code,
"sequential live bracket writer failed after submitting prior legs"
);
GatewayError::new(
error.code,
format!(
"Sequential live bracket submit failed after submitting broker order ids: {submitted_ids}"
),
error.retryable,
Some(format!(
"Inspect and clean up broker order ids before retrying: {submitted_ids}"
)),
)
}
fn local_candidate_error() -> GatewayError {
GatewayError::new(
ErrorCode::OrderValidationFailed,
"Local candidate group writer could not synthesize broker ids",
false,
Some("Use a non-empty idempotency key".to_string()),
)
}