Skip to main content

bat_markets/
entry.rs

1use tokio::sync::broadcast;
2
3use bat_markets_core::{
4    CommandAck, CommandLaneEvent, CommandLifecycleEvent, CommandReceipt, CommandStatus,
5    CommandTransport, ErrorKind, ReconcileOutcome, ReconcileReport, Result,
6};
7
/// Low-latency command handle with lifecycle tracking over the shared command bus.
///
/// A handle pairs the acknowledgement captured when a command was sent with a
/// broadcast receiver of [`CommandLaneEvent`]s, so callers can read the initial
/// receipt immediately and then await follow-up events that match the command.
pub struct PendingCommandHandle {
    /// Acknowledgement captured at construction; exposed through `ack()` and
    /// used as the reference when matching incoming receipts and lifecycle events.
    ack: CommandAck,
    /// Command-lane event stream polled by `receipt()` and `next_lifecycle()`.
    receiver: broadcast::Receiver<CommandLaneEvent>,
    /// `true` until the first `receipt()` call has handed out `ack.receipt`.
    initial_receipt_pending: bool,
}
14
15impl PendingCommandHandle {
16    pub(crate) fn from_ack(
17        ack: CommandAck,
18        receiver: broadcast::Receiver<CommandLaneEvent>,
19    ) -> Self {
20        Self {
21            ack,
22            receiver,
23            initial_receipt_pending: true,
24        }
25    }
26
27    pub(crate) fn from_receipt(
28        receipt: CommandReceipt,
29        transport: CommandTransport,
30        receiver: broadcast::Receiver<CommandLaneEvent>,
31    ) -> Self {
32        Self::from_ack(
33            CommandAck {
34                receipt,
35                transport,
36                acknowledged_at: timestamp_now_ms(),
37            },
38            receiver,
39        )
40    }
41
42    /// Return the immediate acknowledgement captured when the command was sent.
43    #[must_use]
44    pub const fn ack(&self) -> &CommandAck {
45        &self.ack
46    }
47
48    /// Return the initial receipt, then subsequent matching receipts from the command bus.
49    ///
50    /// The first call is always the receipt embedded in [`Self::ack`]. Later
51    /// calls wait for matching command-lane receipt events.
52    pub async fn receipt(&mut self) -> Result<CommandReceipt> {
53        if self.initial_receipt_pending {
54            self.initial_receipt_pending = false;
55            return Ok(self.ack.receipt.clone());
56        }
57
58        loop {
59            let event = self.receiver_mut().recv().await.map_err(|error| {
60                bat_markets_core::MarketError::new(
61                    ErrorKind::TransportError,
62                    format!("command receipt receive failed: {error}"),
63                )
64            })?;
65
66            let CommandLaneEvent::Receipt(receipt) = event else {
67                continue;
68            };
69            if matches_receipt(&self.ack.receipt, &receipt) {
70                return Ok(receipt);
71            }
72        }
73    }
74
75    /// Wait for the next lifecycle event matching this command.
76    ///
77    /// This observes acknowledgement, recovery scheduling, recovery completion,
78    /// and follow-up receipt events emitted by the command lane.
79    pub async fn next_lifecycle(&mut self) -> Result<CommandLifecycleEvent> {
80        loop {
81            let event = self.receiver_mut().recv().await.map_err(|error| {
82                bat_markets_core::MarketError::new(
83                    ErrorKind::TransportError,
84                    format!("command lifecycle receive failed: {error}"),
85                )
86            })?;
87
88            let CommandLaneEvent::Lifecycle(lifecycle) = event else {
89                continue;
90            };
91            if matches_lifecycle(&self.ack, &lifecycle) {
92                return Ok(lifecycle);
93            }
94        }
95    }
96
97    /// Return a receipt that is resolved as far as local evidence allows.
98    ///
99    /// Non-uncertain receipts are returned immediately. `UnknownExecution`
100    /// receipts wait for a matching recovery completion and then map the
101    /// reconcile report into the best known final status.
102    pub async fn resolved(&mut self) -> Result<CommandReceipt> {
103        if self.ack.receipt.status != CommandStatus::UnknownExecution {
104            return Ok(self.ack.receipt.clone());
105        }
106
107        loop {
108            let lifecycle = self.next_lifecycle().await?;
109            if let CommandLifecycleEvent::RecoveryCompleted { report, .. } = lifecycle {
110                return Ok(receipt_after_recovery(&self.ack.receipt, &report));
111            }
112        }
113    }
114
115    fn receiver_mut(&mut self) -> &mut broadcast::Receiver<CommandLaneEvent> {
116        &mut self.receiver
117    }
118}
119
120fn matches_lifecycle(ack: &CommandAck, lifecycle: &CommandLifecycleEvent) -> bool {
121    match lifecycle {
122        CommandLifecycleEvent::Ack(other)
123        | CommandLifecycleEvent::RecoveryScheduled(other)
124        | CommandLifecycleEvent::RecoveryCompleted { ack: other, .. } => ack == other,
125        CommandLifecycleEvent::Receipt(other) => matches_receipt(&ack.receipt, other),
126    }
127}
128
129fn matches_receipt(left: &CommandReceipt, right: &CommandReceipt) -> bool {
130    if left.operation != right.operation {
131        return false;
132    }
133
134    if let (Some(left_id), Some(right_id)) = (&left.order_id, &right.order_id) {
135        return left_id == right_id;
136    }
137    if let (Some(left_id), Some(right_id)) = (&left.client_order_id, &right.client_order_id) {
138        return left_id == right_id;
139    }
140    if let (Some(left_id), Some(right_id)) = (&left.request_id, &right.request_id) {
141        return left_id == right_id;
142    }
143
144    false
145}
146
147fn receipt_after_recovery(receipt: &CommandReceipt, report: &ReconcileReport) -> CommandReceipt {
148    let mut resolved = receipt.clone();
149    match report.outcome {
150        ReconcileOutcome::Synchronized => {
151            resolved.status = CommandStatus::Accepted;
152            resolved.retriable = false;
153            resolved.message = Some(
154                report
155                    .note
156                    .clone()
157                    .unwrap_or_else(|| "command outcome resolved by reconcile".into()),
158            );
159        }
160        ReconcileOutcome::StillUncertain | ReconcileOutcome::Diverged => {
161            resolved.status = CommandStatus::UnknownExecution;
162            resolved.retriable = true;
163            resolved.message =
164                Some(report.note.clone().unwrap_or_else(|| {
165                    "command outcome remains unresolved after reconcile".into()
166                }));
167        }
168    }
169    resolved
170}
171
172fn timestamp_now_ms() -> bat_markets_core::TimestampMs {
173    let millis = std::time::SystemTime::now()
174        .duration_since(std::time::UNIX_EPOCH)
175        .map(|duration| duration.as_millis())
176        .unwrap_or_default()
177        .min(i64::MAX as u128) as i64;
178    bat_markets_core::TimestampMs::new(millis)
179}
180
#[cfg(test)]
mod tests {
    use bat_markets_core::{
        CommandAck, CommandLifecycleEvent, CommandOperation, CommandReceipt, CommandStatus,
        CommandTransport, InstrumentId, Product, ReconcileOutcome, ReconcileReport,
        ReconcileTrigger, TimestampMs, Venue,
    };

    use super::{matches_lifecycle, matches_receipt, receipt_after_recovery};

    /// A synchronized reconcile turns an unknown receipt into an accepted one.
    #[test]
    fn recovery_synchronized_returns_accepted_non_retriable_receipt() {
        let outcome_report = report(
            ReconcileOutcome::Synchronized,
            "recent history resolved command",
        );

        let resolved = receipt_after_recovery(&unknown_receipt(), &outcome_report);

        assert_eq!(resolved.status, CommandStatus::Accepted);
        assert!(!resolved.retriable);
        assert_eq!(
            resolved.message.as_deref(),
            Some("recent history resolved command")
        );
    }

    /// A still-uncertain reconcile leaves the receipt unknown and retriable.
    #[test]
    fn recovery_still_uncertain_keeps_receipt_explicitly_unknown() {
        let outcome_report = report(
            ReconcileOutcome::StillUncertain,
            "1 pending command outcomes still unresolved",
        );

        let resolved = receipt_after_recovery(&unknown_receipt(), &outcome_report);

        assert_eq!(resolved.status, CommandStatus::UnknownExecution);
        assert!(resolved.retriable);
        assert_eq!(
            resolved.message.as_deref(),
            Some("1 pending command outcomes still unresolved")
        );
    }

    /// Sharing only an instrument is not enough to identify a command.
    #[test]
    fn receipts_without_explicit_identity_do_not_match_by_instrument_only() {
        let first = receipt_for_instrument(TimestampMs::new(1));
        let second = receipt_for_instrument(TimestampMs::new(2));

        assert!(!matches_receipt(&first, &second));
    }

    /// A lifecycle event carrying a different ack must not match.
    #[test]
    fn lifecycle_matching_requires_the_same_ack() {
        let ours = ack_for_instrument(TimestampMs::new(1));
        let completed_for_other = CommandLifecycleEvent::RecoveryCompleted {
            ack: ack_for_instrument(TimestampMs::new(2)),
            report: report(ReconcileOutcome::Synchronized, "resolved"),
        };

        assert!(!matches_lifecycle(&ours, &completed_for_other));
    }

    /// Unknown receipt tagged with a fixed instrument and a timestamp-derived message.
    fn receipt_for_instrument(acknowledged_at: TimestampMs) -> CommandReceipt {
        CommandReceipt {
            instrument_id: Some(InstrumentId::from("BTC/USDT:USDT")),
            message: Some(format!("ack at {}", acknowledged_at.value()).into()),
            ..unknown_receipt()
        }
    }

    /// WebSocket acknowledgement wrapping `receipt_for_instrument`.
    fn ack_for_instrument(acknowledged_at: TimestampMs) -> CommandAck {
        let receipt = receipt_for_instrument(acknowledged_at);
        CommandAck {
            receipt,
            transport: CommandTransport::WebSocket,
            acknowledged_at,
        }
    }

    /// Baseline create-order receipt with unknown execution and no identifiers.
    fn unknown_receipt() -> CommandReceipt {
        CommandReceipt {
            operation: CommandOperation::CreateOrder,
            status: CommandStatus::UnknownExecution,
            venue: Venue::Binance,
            product: Product::LinearUsdt,
            instrument_id: None,
            order_id: None,
            client_order_id: None,
            request_id: None,
            message: Some("command outcome requires reconcile".into()),
            native_code: None,
            retriable: true,
        }
    }

    /// Reconcile report triggered by an unknown execution, with the given outcome and note.
    fn report(outcome: ReconcileOutcome, note: &str) -> ReconcileReport {
        let note = Some(note.into());
        ReconcileReport {
            trigger: ReconcileTrigger::UnknownExecution,
            outcome,
            repaired_at: TimestampMs::new(1),
            note,
        }
    }
}
284}