1use std::collections::{BTreeMap, VecDeque};
2
3use crate::account::{AccountSummary, Balance};
4use crate::command::{CommandReceipt, CommandStatus};
5use crate::config::StatePolicy;
6use crate::error::{ErrorKind, MarketError, Result};
7use crate::execution::{DivergenceEvent, PrivateLaneEvent, PublicLaneEvent};
8use crate::health::HealthReport;
9use crate::ids::{AssetCode, InstrumentId, OrderId};
10use crate::instrument::InstrumentSpec;
11use crate::market::{
12 BookTop, FastBookTop, FastKline, FastLiquidation, FastMarkPrice, FastTicker, FastTrade,
13 FundingRate, Kline, Liquidation, MarkPrice, OpenInterest, Ticker, TradeTick,
14};
15use crate::position::Position;
16use crate::reconcile::{AccountSnapshot, PrivateSnapshot, ReconcileOutcome, ReconcileReport};
17use crate::trade::{Execution, Order};
18use crate::types::{OrderStatus, Product, Venue};
19
/// State held for a single venue/product pair.
///
/// Groups the known instrument specs, the most recently stored public market
/// data per instrument, the private account data (balances, positions, orders,
/// executions) and the health report that the `apply_*` / `mark_*` methods
/// keep up to date.
#[derive(Clone, Debug)]
pub struct EngineState {
    /// Venue given at construction; also used in the unknown-instrument error text.
    venue: Venue,
    /// Product given at construction; also used in the unknown-instrument error text.
    product: Product,
    /// Capacity limits for the trade, liquidation and execution buffers.
    state_policy: StatePolicy,
    /// Instrument specs keyed by instrument id.
    instruments: BTreeMap<InstrumentId, InstrumentSpec>,
    /// Last stored ticker per instrument.
    tickers: BTreeMap<InstrumentId, Ticker>,
    /// Recent trades per instrument, oldest first, trimmed to `recent_trade_capacity`.
    recent_trades: BTreeMap<InstrumentId, VecDeque<TradeTick>>,
    /// Last stored book top per instrument.
    book_tops: BTreeMap<InstrumentId, BookTop>,
    /// Last stored kline per instrument.
    klines: BTreeMap<InstrumentId, Kline>,
    /// Last stored mark price per instrument.
    mark_prices: BTreeMap<InstrumentId, MarkPrice>,
    /// Last stored funding rate per instrument.
    funding_rates: BTreeMap<InstrumentId, FundingRate>,
    /// Last stored open interest per instrument.
    open_interest: BTreeMap<InstrumentId, OpenInterest>,
    /// Liquidations per instrument, oldest first, also trimmed to `recent_trade_capacity`.
    liquidations: BTreeMap<InstrumentId, VecDeque<Liquidation>>,
    /// Balances keyed by asset code.
    balances: BTreeMap<AssetCode, Balance>,
    /// Summary taken from the last account snapshot, if that snapshot carried one.
    account_summary: Option<AccountSummary>,
    /// Positions keyed by instrument id.
    positions: BTreeMap<InstrumentId, Position>,
    /// Orders keyed by order id (open and non-open alike).
    orders: BTreeMap<OrderId, Order>,
    /// Executions, oldest first, trimmed to `execution_capacity`.
    executions: VecDeque<Execution>,
    /// Health report updated by stream events, snapshots and reconcile reports.
    health: HealthReport,
}
42
43impl EngineState {
44 #[must_use]
45 pub fn new(
46 venue: Venue,
47 product: Product,
48 state_policy: StatePolicy,
49 instruments: impl IntoIterator<Item = InstrumentSpec>,
50 ) -> Self {
51 let instruments = instruments
52 .into_iter()
53 .map(|spec| (spec.instrument_id.clone(), spec))
54 .collect();
55
56 Self {
57 venue,
58 product,
59 state_policy,
60 instruments,
61 tickers: BTreeMap::new(),
62 recent_trades: BTreeMap::new(),
63 book_tops: BTreeMap::new(),
64 klines: BTreeMap::new(),
65 mark_prices: BTreeMap::new(),
66 funding_rates: BTreeMap::new(),
67 open_interest: BTreeMap::new(),
68 liquidations: BTreeMap::new(),
69 balances: BTreeMap::new(),
70 account_summary: None,
71 positions: BTreeMap::new(),
72 orders: BTreeMap::new(),
73 executions: VecDeque::new(),
74 health: HealthReport::default(),
75 }
76 }
77
78 pub fn apply_public_event(&mut self, event: PublicLaneEvent) -> Result<()> {
79 match event {
80 PublicLaneEvent::Ticker(ticker) => self.apply_fast_ticker(ticker)?,
81 PublicLaneEvent::Trade(trade) => self.apply_fast_trade(trade)?,
82 PublicLaneEvent::BookTop(book_top) => self.apply_fast_book_top(book_top)?,
83 PublicLaneEvent::OrderBookDelta(_) => {}
84 PublicLaneEvent::Kline(kline) => self.apply_fast_kline(kline)?,
85 PublicLaneEvent::MarkPrice(mark_price) => self.apply_fast_mark_price(mark_price)?,
86 PublicLaneEvent::FundingRate(funding_rate) => {
87 self.health.observe_public_message(funding_rate.event_time);
88 self.funding_rates
89 .insert(funding_rate.instrument_id.clone(), funding_rate);
90 }
91 PublicLaneEvent::OpenInterest(open_interest) => {
92 self.health.observe_public_message(open_interest.event_time);
93 self.open_interest
94 .insert(open_interest.instrument_id.clone(), open_interest);
95 }
96 PublicLaneEvent::Liquidation(liquidation) => {
97 self.apply_fast_liquidation(liquidation)?;
98 }
99 PublicLaneEvent::Divergence(divergence) => match divergence {
100 DivergenceEvent::ReconcileRequired | DivergenceEvent::SequenceGap { .. } => {
101 self.health.mark_public_gap();
102 }
103 DivergenceEvent::StateDivergence => self.health.mark_state_divergence(),
104 },
105 }
106
107 Ok(())
108 }
109
110 pub fn apply_private_event(&mut self, event: PrivateLaneEvent) {
111 match event {
112 PrivateLaneEvent::Balance(balance) => {
113 self.health.observe_private_message(balance.updated_at);
114 self.balances.insert(balance.asset.clone(), balance);
115 }
116 PrivateLaneEvent::Position(position) => {
117 self.health.observe_private_message(position.updated_at);
118 self.positions
119 .insert(position.instrument_id.clone(), position);
120 }
121 PrivateLaneEvent::Order(order) => {
122 self.health.observe_private_message(order.updated_at);
123 self.orders.insert(order.order_id.clone(), order);
124 }
125 PrivateLaneEvent::Execution(execution) => {
126 self.health.observe_private_message(execution.executed_at);
127 if self
128 .executions
129 .iter()
130 .any(|existing| existing.execution_id == execution.execution_id)
131 {
132 return;
133 }
134 self.executions.push_back(execution);
135 while self.executions.len() > self.state_policy.execution_capacity {
136 let _ = self.executions.pop_front();
137 }
138 }
139 PrivateLaneEvent::Divergence(divergence) => match divergence {
140 DivergenceEvent::ReconcileRequired => self.health.mark_reconcile_required(),
141 DivergenceEvent::SequenceGap { .. } => self.health.mark_private_gap(),
142 DivergenceEvent::StateDivergence => {
143 self.health.mark_state_divergence();
144 }
145 },
146 }
147 }
148
149 pub fn apply_command_receipt(&mut self, receipt: &CommandReceipt) {
150 match receipt.status {
151 CommandStatus::Accepted => {}
152 CommandStatus::Rejected => {}
153 CommandStatus::UnknownExecution => self.health.mark_command_uncertain(),
154 }
155 }
156
157 pub fn replace_instruments(&mut self, instruments: impl IntoIterator<Item = InstrumentSpec>) {
158 self.instruments = instruments
159 .into_iter()
160 .map(|spec| (spec.instrument_id.clone(), spec))
161 .collect();
162 self.prune_orphan_market_state();
163 }
164
165 pub fn replace_account_snapshot(&mut self, snapshot: AccountSnapshot) {
166 self.balances = snapshot
167 .balances
168 .into_iter()
169 .map(|balance| (balance.asset.clone(), balance))
170 .collect();
171 self.account_summary = snapshot.summary;
172 let updated_at = self
173 .account_summary
174 .as_ref()
175 .map(|summary| summary.updated_at)
176 .or_else(|| {
177 self.balances
178 .values()
179 .map(|balance| balance.updated_at)
180 .max()
181 });
182 if let Some(updated_at) = updated_at {
183 self.health.observe_private_message(updated_at);
184 }
185 }
186
187 pub fn replace_positions(&mut self, positions: Vec<Position>) {
188 let updated_at = positions.iter().map(|position| position.updated_at).max();
189 self.positions = positions
190 .into_iter()
191 .map(|position| (position.instrument_id.clone(), position))
192 .collect();
193 if let Some(updated_at) = updated_at {
194 self.health.observe_private_message(updated_at);
195 }
196 }
197
198 pub fn replace_open_orders(&mut self, open_orders: Vec<Order>) {
199 let updated_at = open_orders.iter().map(|order| order.updated_at).max();
200 self.orders.retain(|_, order| {
201 !matches!(
202 order.status,
203 OrderStatus::New | OrderStatus::PartiallyFilled
204 )
205 });
206 for order in open_orders {
207 self.orders.insert(order.order_id.clone(), order);
208 }
209 if let Some(updated_at) = updated_at {
210 self.health.observe_private_message(updated_at);
211 }
212 }
213
214 pub fn merge_order_history(&mut self, orders: Vec<Order>) {
215 let updated_at = orders.iter().map(|order| order.updated_at).max();
216 for order in orders {
217 self.orders.insert(order.order_id.clone(), order);
218 }
219 if let Some(updated_at) = updated_at {
220 self.health.observe_private_message(updated_at);
221 }
222 }
223
224 pub fn merge_executions(&mut self, executions: Vec<Execution>) {
225 let updated_at = executions
226 .iter()
227 .map(|execution| execution.executed_at)
228 .max();
229 for execution in executions {
230 if self
231 .executions
232 .iter()
233 .any(|existing| existing.execution_id == execution.execution_id)
234 {
235 continue;
236 }
237 self.executions.push_back(execution);
238 while self.executions.len() > self.state_policy.execution_capacity {
239 let _ = self.executions.pop_front();
240 }
241 }
242 if let Some(updated_at) = updated_at {
243 self.health.observe_private_message(updated_at);
244 }
245 }
246
247 pub fn apply_private_snapshot(&mut self, snapshot: PrivateSnapshot) {
248 if let Some(account) = snapshot.account {
249 self.replace_account_snapshot(account);
250 }
251 self.replace_positions(snapshot.positions);
252 self.replace_open_orders(snapshot.open_orders);
253 }
254
255 pub fn apply_reconcile_report(&mut self, report: &ReconcileReport) {
256 match report.outcome {
257 ReconcileOutcome::Synchronized => self.health.mark_reconcile_complete(),
258 ReconcileOutcome::StillUncertain => self.health.mark_reconcile_required(),
259 ReconcileOutcome::Diverged => self.health.mark_state_divergence(),
260 }
261 }
262
263 pub fn mark_rest_success(&mut self, clock_skew_ms: Option<i64>) {
264 self.health.observe_rest_success(clock_skew_ms);
265 }
266
267 pub fn mark_public_disconnect(&mut self) {
268 self.health.mark_public_disconnect();
269 }
270
271 pub fn mark_private_disconnect(&mut self) {
272 self.health.mark_private_disconnect();
273 }
274
275 pub fn mark_reconnect(&mut self) {
276 self.health.mark_reconnect();
277 }
278
279 pub fn mark_snapshot_age(&mut self, age_ms: u64, stale_after_ms: u64) {
280 self.health.mark_snapshot_age(age_ms, stale_after_ms);
281 }
282
283 #[must_use]
284 pub fn ticker(&self, instrument_id: &InstrumentId) -> Option<&Ticker> {
285 self.tickers.get(instrument_id)
286 }
287
288 #[must_use]
289 pub fn recent_trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
290 self.recent_trades
291 .get(instrument_id)
292 .map(|trades| trades.iter().cloned().collect())
293 }
294
295 #[must_use]
296 pub fn book_top(&self, instrument_id: &InstrumentId) -> Option<&BookTop> {
297 self.book_tops.get(instrument_id)
298 }
299
300 #[must_use]
301 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRate> {
302 self.funding_rates.get(instrument_id)
303 }
304
305 #[must_use]
306 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPrice> {
307 self.mark_prices.get(instrument_id)
308 }
309
310 #[must_use]
311 pub fn open_interest(&self, instrument_id: &InstrumentId) -> Option<&OpenInterest> {
312 self.open_interest.get(instrument_id)
313 }
314
315 #[must_use]
316 pub fn liquidations(&self, instrument_id: &InstrumentId) -> Option<Vec<Liquidation>> {
317 self.liquidations
318 .get(instrument_id)
319 .map(|events| events.iter().cloned().collect())
320 }
321
322 #[must_use]
323 pub fn balances(&self) -> Vec<Balance> {
324 self.balances.values().cloned().collect()
325 }
326
327 #[must_use]
328 pub fn account_summary(&self) -> Option<AccountSummary> {
329 if let Some(summary) = &self.account_summary {
330 return Some(summary.clone());
331 }
332
333 let mut balances = self.balances.values();
334 let first = balances.next()?;
335 let mut wallet = first.wallet_balance;
336 let mut available = first.available_balance;
337
338 for balance in balances {
339 wallet = wallet
340 .value()
341 .checked_add(balance.wallet_balance.value())
342 .map(Into::into)?;
343 available = available
344 .value()
345 .checked_add(balance.available_balance.value())
346 .map(Into::into)?;
347 }
348
349 Some(AccountSummary {
350 total_wallet_balance: wallet,
351 total_available_balance: available,
352 total_unrealized_pnl: crate::numeric::Amount::new(0.into()),
353 updated_at: first.updated_at,
354 })
355 }
356
357 #[must_use]
358 pub fn positions(&self) -> Vec<Position> {
359 self.positions.values().cloned().collect()
360 }
361
362 #[must_use]
363 pub fn orders(&self) -> Vec<Order> {
364 self.orders.values().cloned().collect()
365 }
366
367 #[must_use]
368 pub fn open_orders(&self) -> Vec<Order> {
369 self.orders
370 .values()
371 .filter(|order| {
372 matches!(
373 order.status,
374 OrderStatus::New | OrderStatus::PartiallyFilled
375 )
376 })
377 .cloned()
378 .collect()
379 }
380
381 #[must_use]
382 pub fn executions(&self) -> Vec<Execution> {
383 self.executions.iter().cloned().collect()
384 }
385
386 #[must_use]
387 pub fn latest_order_update_at(
388 &self,
389 instrument_id: &InstrumentId,
390 ) -> Option<crate::primitives::TimestampMs> {
391 self.orders
392 .values()
393 .filter(|order| &order.instrument_id == instrument_id)
394 .map(|order| order.updated_at)
395 .max()
396 }
397
398 #[must_use]
399 pub fn latest_execution_at(
400 &self,
401 instrument_id: &InstrumentId,
402 ) -> Option<crate::primitives::TimestampMs> {
403 self.executions
404 .iter()
405 .filter(|execution| &execution.instrument_id == instrument_id)
406 .map(|execution| execution.executed_at)
407 .max()
408 }
409
410 #[must_use]
411 pub const fn health(&self) -> &HealthReport {
412 &self.health
413 }
414
415 #[must_use]
416 pub fn instrument_specs(&self) -> Vec<InstrumentSpec> {
417 self.instruments.values().cloned().collect()
418 }
419
420 #[must_use]
421 pub const fn venue(&self) -> Venue {
422 self.venue
423 }
424
425 #[must_use]
426 pub const fn product(&self) -> Product {
427 self.product
428 }
429
430 fn apply_fast_ticker(&mut self, ticker: FastTicker) -> Result<()> {
431 let unified = {
432 let spec = self.spec(&ticker.instrument_id)?;
433 ticker.to_unified(spec)
434 };
435 self.health.observe_public_message(ticker.event_time);
436 self.tickers.insert(ticker.instrument_id.clone(), unified);
437 Ok(())
438 }
439
440 fn apply_fast_trade(&mut self, trade: FastTrade) -> Result<()> {
441 let unified = {
442 let spec = self.spec(&trade.instrument_id)?;
443 trade.to_unified(spec)
444 };
445 self.health.observe_public_message(trade.event_time);
446 let entry = self
447 .recent_trades
448 .entry(trade.instrument_id.clone())
449 .or_default();
450 if entry
451 .back()
452 .is_some_and(|existing| existing.trade_id == unified.trade_id)
453 {
454 return Ok(());
455 }
456 entry.push_back(unified);
457 while entry.len() > self.state_policy.recent_trade_capacity {
458 let _ = entry.pop_front();
459 }
460 Ok(())
461 }
462
463 fn apply_fast_book_top(&mut self, book_top: FastBookTop) -> Result<()> {
464 let unified = {
465 let spec = self.spec(&book_top.instrument_id)?;
466 book_top.to_unified(spec)
467 };
468 self.health.observe_public_message(book_top.event_time);
469 self.book_tops
470 .insert(book_top.instrument_id.clone(), unified);
471 Ok(())
472 }
473
474 fn apply_fast_kline(&mut self, kline: FastKline) -> Result<()> {
475 let unified = {
476 let spec = self.spec(&kline.instrument_id)?;
477 kline.to_unified(spec)
478 };
479 self.health.observe_public_message(kline.close_time);
480 self.klines.insert(kline.instrument_id.clone(), unified);
481 Ok(())
482 }
483
484 fn apply_fast_mark_price(&mut self, mark_price: FastMarkPrice) -> Result<()> {
485 let unified = {
486 let spec = self.spec(&mark_price.instrument_id)?;
487 mark_price.to_unified(spec)
488 };
489 self.health.observe_public_message(mark_price.event_time);
490 self.mark_prices
491 .insert(mark_price.instrument_id.clone(), unified);
492 Ok(())
493 }
494
495 fn apply_fast_liquidation(&mut self, liquidation: FastLiquidation) -> Result<()> {
496 let unified = {
497 let spec = self.spec(&liquidation.instrument_id)?;
498 liquidation.to_unified(spec)
499 };
500 self.health.observe_public_message(liquidation.event_time);
501 let entry = self
502 .liquidations
503 .entry(liquidation.instrument_id.clone())
504 .or_default();
505 entry.push_back(unified);
506 while entry.len() > self.state_policy.recent_trade_capacity {
507 let _ = entry.pop_front();
508 }
509 Ok(())
510 }
511
512 fn spec(&self, instrument_id: &InstrumentId) -> Result<&InstrumentSpec> {
513 self.instruments.get(instrument_id).ok_or_else(|| {
514 MarketError::new(
515 ErrorKind::ConfigError,
516 format!(
517 "unknown instrument {instrument_id} for {} {}",
518 self.venue, self.product
519 ),
520 )
521 })
522 }
523
524 fn prune_orphan_market_state(&mut self) {
525 let retain_instruments =
526 |instrument_id: &InstrumentId, instruments: &BTreeMap<InstrumentId, InstrumentSpec>| {
527 instruments.contains_key(instrument_id)
528 };
529
530 self.tickers
531 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
532 self.recent_trades
533 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
534 self.book_tops
535 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
536 self.klines
537 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
538 self.mark_prices
539 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
540 self.funding_rates
541 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
542 self.open_interest
543 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
544 self.liquidations
545 .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
546 }
547}
548
#[cfg(test)]
mod tests {
    use rust_decimal::Decimal;

    use crate::ids::{AssetCode, ClientOrderId, InstrumentId};
    use crate::instrument::{InstrumentSpec, InstrumentStatus, InstrumentSupport};
    use crate::numeric::{Notional, Price, Quantity};
    use crate::primitives::TimestampMs;
    use crate::trade::Order;
    use crate::types::{MarketType, OrderStatus, OrderType, Product, Side, Venue};
    use crate::{
        config::StatePolicy,
        execution::{DivergenceEvent, PrivateLaneEvent},
        health::{DegradedReason, HealthStatus},
        reconcile::{ReconcileOutcome, ReconcileReport, ReconcileTrigger},
    };

    use super::EngineState;

    /// Spec of the Binance BTC/USDT linear perpetual registered by every test.
    fn btc_usdt_spec(instrument_id: InstrumentId) -> InstrumentSpec {
        InstrumentSpec {
            venue: Venue::Binance,
            product: Product::LinearUsdt,
            market_type: MarketType::LinearPerpetual,
            instrument_id,
            canonical_symbol: "BTC/USDT:USDT".into(),
            native_symbol: "BTCUSDT".into(),
            base: AssetCode::from("BTC"),
            quote: AssetCode::from("USDT"),
            settle: AssetCode::from("USDT"),
            contract_size: Quantity::new(Decimal::ONE),
            tick_size: Price::new(Decimal::new(1, 2)),
            step_size: Quantity::new(Decimal::new(1, 3)),
            min_qty: Quantity::new(Decimal::new(1, 3)),
            min_notional: Notional::new(Decimal::new(5, 0)),
            price_scale: 2,
            qty_scale: 3,
            quote_scale: 2,
            max_leverage: None,
            support: InstrumentSupport {
                public_streams: true,
                private_trading: true,
                leverage_set: true,
                margin_mode_set: true,
                funding_rate: true,
                open_interest: true,
            },
            status: InstrumentStatus::Active,
        }
    }

    /// Fresh state with both buffer capacities set to 16 and one instrument.
    fn new_state(instrument_id: &InstrumentId) -> EngineState {
        EngineState::new(
            Venue::Binance,
            Product::LinearUsdt,
            StatePolicy {
                recent_trade_capacity: 16,
                execution_capacity: 16,
            },
            [btc_usdt_spec(instrument_id.clone())],
        )
    }

    #[test]
    fn open_orders_excludes_terminal_statuses() {
        let instrument_id = InstrumentId::from("BTC/USDT:USDT");
        let mut state = new_state(&instrument_id);

        let limit_price = || Price::new(Decimal::new(70_000, 0));
        let one_lot = || Quantity::new(Decimal::new(1, 3));

        let open_order = Order {
            order_id: "open-order".into(),
            client_order_id: Some(ClientOrderId::from("open-client")),
            instrument_id: instrument_id.clone(),
            side: Side::Buy,
            order_type: OrderType::Limit,
            time_in_force: None,
            status: OrderStatus::New,
            price: Some(limit_price()),
            quantity: one_lot(),
            filled_quantity: Quantity::new(Decimal::ZERO),
            average_fill_price: None,
            reduce_only: false,
            post_only: false,
            created_at: TimestampMs::new(1),
            updated_at: TimestampMs::new(1),
            venue_status: None,
        };
        let filled_order = Order {
            order_id: "done-order".into(),
            client_order_id: Some(ClientOrderId::from("done-client")),
            instrument_id,
            side: Side::Buy,
            order_type: OrderType::Limit,
            time_in_force: None,
            status: OrderStatus::Filled,
            price: Some(limit_price()),
            quantity: one_lot(),
            filled_quantity: one_lot(),
            average_fill_price: Some(limit_price()),
            reduce_only: false,
            post_only: false,
            created_at: TimestampMs::new(1),
            updated_at: TimestampMs::new(2),
            venue_status: None,
        };

        state.apply_private_event(PrivateLaneEvent::Order(open_order));
        state.apply_private_event(PrivateLaneEvent::Order(filled_order));

        assert_eq!(state.orders().len(), 2);
        assert_eq!(state.open_orders().len(), 1);
    }

    #[test]
    fn reconcile_report_clears_state_divergence_after_gap() {
        let instrument_id = InstrumentId::from("BTC/USDT:USDT");
        let mut state = new_state(&instrument_id);

        // A private sequence gap must demand a reconcile.
        let gap = DivergenceEvent::SequenceGap { at: None };
        state.apply_private_event(PrivateLaneEvent::Divergence(gap));
        assert_eq!(state.health().status, HealthStatus::ReconcileRequired);
        assert_eq!(
            state.health().degraded_reason,
            Some(DegradedReason::PrivateStreamGap)
        );

        // A synchronized reconcile report then clears the divergence flag.
        let report = ReconcileReport {
            trigger: ReconcileTrigger::SequenceGap,
            outcome: ReconcileOutcome::Synchronized,
            repaired_at: TimestampMs::new(5),
            note: Some("unit reconcile".into()),
        };
        state.apply_reconcile_report(&report);
        assert_eq!(state.health().status, HealthStatus::Disconnected);
        assert_eq!(
            state.health().degraded_reason,
            Some(DegradedReason::Disconnected)
        );
        assert!(!state.health().state_divergence);
    }
}