1use tokio::sync::broadcast;
2
3use bat_markets_core::{
4 CommandAck, CommandLaneEvent, CommandLifecycleEvent, CommandReceipt, CommandStatus,
5 CommandTransport, ErrorKind, ReconcileOutcome, ReconcileReport, Result,
6};
7
8pub struct PendingCommandHandle {
10 ack: CommandAck,
11 receiver: broadcast::Receiver<CommandLaneEvent>,
12 initial_receipt_pending: bool,
13}
14
15impl PendingCommandHandle {
16 pub(crate) fn from_ack(
17 ack: CommandAck,
18 receiver: broadcast::Receiver<CommandLaneEvent>,
19 ) -> Self {
20 Self {
21 ack,
22 receiver,
23 initial_receipt_pending: true,
24 }
25 }
26
27 pub(crate) fn from_receipt(
28 receipt: CommandReceipt,
29 transport: CommandTransport,
30 receiver: broadcast::Receiver<CommandLaneEvent>,
31 ) -> Self {
32 Self::from_ack(
33 CommandAck {
34 receipt,
35 transport,
36 acknowledged_at: timestamp_now_ms(),
37 },
38 receiver,
39 )
40 }
41
42 #[must_use]
44 pub const fn ack(&self) -> &CommandAck {
45 &self.ack
46 }
47
48 pub async fn receipt(&mut self) -> Result<CommandReceipt> {
53 if self.initial_receipt_pending {
54 self.initial_receipt_pending = false;
55 return Ok(self.ack.receipt.clone());
56 }
57
58 loop {
59 let event = self.receiver_mut().recv().await.map_err(|error| {
60 bat_markets_core::MarketError::new(
61 ErrorKind::TransportError,
62 format!("command receipt receive failed: {error}"),
63 )
64 })?;
65
66 let CommandLaneEvent::Receipt(receipt) = event else {
67 continue;
68 };
69 if matches_receipt(&self.ack.receipt, &receipt) {
70 return Ok(receipt);
71 }
72 }
73 }
74
75 pub async fn next_lifecycle(&mut self) -> Result<CommandLifecycleEvent> {
80 loop {
81 let event = self.receiver_mut().recv().await.map_err(|error| {
82 bat_markets_core::MarketError::new(
83 ErrorKind::TransportError,
84 format!("command lifecycle receive failed: {error}"),
85 )
86 })?;
87
88 let CommandLaneEvent::Lifecycle(lifecycle) = event else {
89 continue;
90 };
91 if matches_lifecycle(&self.ack, &lifecycle) {
92 return Ok(lifecycle);
93 }
94 }
95 }
96
97 pub async fn resolved(&mut self) -> Result<CommandReceipt> {
103 if self.ack.receipt.status != CommandStatus::UnknownExecution {
104 return Ok(self.ack.receipt.clone());
105 }
106
107 loop {
108 let lifecycle = self.next_lifecycle().await?;
109 if let CommandLifecycleEvent::RecoveryCompleted { report, .. } = lifecycle {
110 return Ok(receipt_after_recovery(&self.ack.receipt, &report));
111 }
112 }
113 }
114
115 fn receiver_mut(&mut self) -> &mut broadcast::Receiver<CommandLaneEvent> {
116 &mut self.receiver
117 }
118}
119
120fn matches_lifecycle(ack: &CommandAck, lifecycle: &CommandLifecycleEvent) -> bool {
121 match lifecycle {
122 CommandLifecycleEvent::Ack(other)
123 | CommandLifecycleEvent::RecoveryScheduled(other)
124 | CommandLifecycleEvent::RecoveryCompleted { ack: other, .. } => ack == other,
125 CommandLifecycleEvent::Receipt(other) => matches_receipt(&ack.receipt, other),
126 }
127}
128
129fn matches_receipt(left: &CommandReceipt, right: &CommandReceipt) -> bool {
130 if left.operation != right.operation {
131 return false;
132 }
133
134 if let (Some(left_id), Some(right_id)) = (&left.order_id, &right.order_id) {
135 return left_id == right_id;
136 }
137 if let (Some(left_id), Some(right_id)) = (&left.client_order_id, &right.client_order_id) {
138 return left_id == right_id;
139 }
140 if let (Some(left_id), Some(right_id)) = (&left.request_id, &right.request_id) {
141 return left_id == right_id;
142 }
143
144 false
145}
146
147fn receipt_after_recovery(receipt: &CommandReceipt, report: &ReconcileReport) -> CommandReceipt {
148 let mut resolved = receipt.clone();
149 match report.outcome {
150 ReconcileOutcome::Synchronized => {
151 resolved.status = CommandStatus::Accepted;
152 resolved.retriable = false;
153 resolved.message = Some(
154 report
155 .note
156 .clone()
157 .unwrap_or_else(|| "command outcome resolved by reconcile".into()),
158 );
159 }
160 ReconcileOutcome::StillUncertain | ReconcileOutcome::Diverged => {
161 resolved.status = CommandStatus::UnknownExecution;
162 resolved.retriable = true;
163 resolved.message =
164 Some(report.note.clone().unwrap_or_else(|| {
165 "command outcome remains unresolved after reconcile".into()
166 }));
167 }
168 }
169 resolved
170}
171
172fn timestamp_now_ms() -> bat_markets_core::TimestampMs {
173 let millis = std::time::SystemTime::now()
174 .duration_since(std::time::UNIX_EPOCH)
175 .map(|duration| duration.as_millis())
176 .unwrap_or_default()
177 .min(i64::MAX as u128) as i64;
178 bat_markets_core::TimestampMs::new(millis)
179}
180
181#[cfg(test)]
182mod tests {
183 use bat_markets_core::{
184 CommandAck, CommandLifecycleEvent, CommandOperation, CommandReceipt, CommandStatus,
185 CommandTransport, InstrumentId, Product, ReconcileOutcome, ReconcileReport,
186 ReconcileTrigger, TimestampMs, Venue,
187 };
188
189 #[test]
190 fn recovery_synchronized_returns_accepted_non_retriable_receipt() {
191 let receipt = unknown_receipt();
192 let report = report(
193 ReconcileOutcome::Synchronized,
194 "recent history resolved command",
195 );
196
197 let resolved = super::receipt_after_recovery(&receipt, &report);
198
199 assert_eq!(resolved.status, CommandStatus::Accepted);
200 assert!(!resolved.retriable);
201 assert_eq!(
202 resolved.message.as_deref(),
203 Some("recent history resolved command")
204 );
205 }
206
207 #[test]
208 fn recovery_still_uncertain_keeps_receipt_explicitly_unknown() {
209 let receipt = unknown_receipt();
210 let report = report(
211 ReconcileOutcome::StillUncertain,
212 "1 pending command outcomes still unresolved",
213 );
214
215 let resolved = super::receipt_after_recovery(&receipt, &report);
216
217 assert_eq!(resolved.status, CommandStatus::UnknownExecution);
218 assert!(resolved.retriable);
219 assert_eq!(
220 resolved.message.as_deref(),
221 Some("1 pending command outcomes still unresolved")
222 );
223 }
224
225 #[test]
226 fn receipts_without_explicit_identity_do_not_match_by_instrument_only() {
227 let left = receipt_for_instrument(TimestampMs::new(1));
228 let right = receipt_for_instrument(TimestampMs::new(2));
229
230 assert!(!super::matches_receipt(&left, &right));
231 }
232
233 #[test]
234 fn lifecycle_matching_requires_the_same_ack() {
235 let left = ack_for_instrument(TimestampMs::new(1));
236 let right = ack_for_instrument(TimestampMs::new(2));
237 let lifecycle = CommandLifecycleEvent::RecoveryCompleted {
238 ack: right,
239 report: report(ReconcileOutcome::Synchronized, "resolved"),
240 };
241
242 assert!(!super::matches_lifecycle(&left, &lifecycle));
243 }
244
245 fn receipt_for_instrument(acknowledged_at: TimestampMs) -> CommandReceipt {
246 let mut receipt = unknown_receipt();
247 receipt.instrument_id = Some(InstrumentId::from("BTC/USDT:USDT"));
248 receipt.message = Some(format!("ack at {}", acknowledged_at.value()).into());
249 receipt
250 }
251
252 fn ack_for_instrument(acknowledged_at: TimestampMs) -> CommandAck {
253 CommandAck {
254 receipt: receipt_for_instrument(acknowledged_at),
255 transport: CommandTransport::WebSocket,
256 acknowledged_at,
257 }
258 }
259
260 fn unknown_receipt() -> CommandReceipt {
261 CommandReceipt {
262 operation: CommandOperation::CreateOrder,
263 status: CommandStatus::UnknownExecution,
264 venue: Venue::Binance,
265 product: Product::LinearUsdt,
266 instrument_id: None,
267 order_id: None,
268 client_order_id: None,
269 request_id: None,
270 message: Some("command outcome requires reconcile".into()),
271 native_code: None,
272 retriable: true,
273 }
274 }
275
276 fn report(outcome: ReconcileOutcome, note: &str) -> ReconcileReport {
277 ReconcileReport {
278 trigger: ReconcileTrigger::UnknownExecution,
279 outcome,
280 repaired_at: TimestampMs::new(1),
281 note: Some(note.into()),
282 }
283 }
284}