Skip to main content

bat_markets_core/
state.rs

1use std::collections::{BTreeMap, VecDeque};
2
3use crate::account::{AccountSummary, Balance};
4use crate::command::{CommandReceipt, CommandStatus};
5use crate::config::StatePolicy;
6use crate::error::{ErrorKind, MarketError, Result};
7use crate::execution::{DivergenceEvent, PrivateLaneEvent, PublicLaneEvent};
8use crate::health::HealthReport;
9use crate::ids::{AssetCode, InstrumentId, OrderId};
10use crate::instrument::InstrumentSpec;
11use crate::market::{
12    BookTop, FastBookTop, FastKline, FastLiquidation, FastMarkPrice, FastTicker, FastTrade,
13    FundingRate, Kline, Liquidation, MarkPrice, OpenInterest, Ticker, TradeTick,
14};
15use crate::position::Position;
16use crate::reconcile::{AccountSnapshot, PrivateSnapshot, ReconcileOutcome, ReconcileReport};
17use crate::trade::{Execution, Order};
18use crate::types::{OrderStatus, Product, Venue};
19
20/// In-memory engine state used by the facade.
21#[derive(Clone, Debug)]
22pub struct EngineState {
23    venue: Venue,
24    product: Product,
25    state_policy: StatePolicy,
26    instruments: BTreeMap<InstrumentId, InstrumentSpec>,
27    tickers: BTreeMap<InstrumentId, Ticker>,
28    recent_trades: BTreeMap<InstrumentId, VecDeque<TradeTick>>,
29    book_tops: BTreeMap<InstrumentId, BookTop>,
30    klines: BTreeMap<InstrumentId, Kline>,
31    mark_prices: BTreeMap<InstrumentId, MarkPrice>,
32    funding_rates: BTreeMap<InstrumentId, FundingRate>,
33    open_interest: BTreeMap<InstrumentId, OpenInterest>,
34    liquidations: BTreeMap<InstrumentId, VecDeque<Liquidation>>,
35    balances: BTreeMap<AssetCode, Balance>,
36    account_summary: Option<AccountSummary>,
37    positions: BTreeMap<InstrumentId, Position>,
38    orders: BTreeMap<OrderId, Order>,
39    executions: VecDeque<Execution>,
40    health: HealthReport,
41}
42
43impl EngineState {
44    #[must_use]
45    pub fn new(
46        venue: Venue,
47        product: Product,
48        state_policy: StatePolicy,
49        instruments: impl IntoIterator<Item = InstrumentSpec>,
50    ) -> Self {
51        let instruments = instruments
52            .into_iter()
53            .map(|spec| (spec.instrument_id.clone(), spec))
54            .collect();
55
56        Self {
57            venue,
58            product,
59            state_policy,
60            instruments,
61            tickers: BTreeMap::new(),
62            recent_trades: BTreeMap::new(),
63            book_tops: BTreeMap::new(),
64            klines: BTreeMap::new(),
65            mark_prices: BTreeMap::new(),
66            funding_rates: BTreeMap::new(),
67            open_interest: BTreeMap::new(),
68            liquidations: BTreeMap::new(),
69            balances: BTreeMap::new(),
70            account_summary: None,
71            positions: BTreeMap::new(),
72            orders: BTreeMap::new(),
73            executions: VecDeque::new(),
74            health: HealthReport::default(),
75        }
76    }
77
78    pub fn apply_public_event(&mut self, event: PublicLaneEvent) -> Result<()> {
79        match event {
80            PublicLaneEvent::Ticker(ticker) => self.apply_fast_ticker(ticker)?,
81            PublicLaneEvent::Trade(trade) => self.apply_fast_trade(trade)?,
82            PublicLaneEvent::BookTop(book_top) => self.apply_fast_book_top(book_top)?,
83            PublicLaneEvent::OrderBookDelta(_) => {}
84            PublicLaneEvent::Kline(kline) => self.apply_fast_kline(kline)?,
85            PublicLaneEvent::MarkPrice(mark_price) => self.apply_fast_mark_price(mark_price)?,
86            PublicLaneEvent::FundingRate(funding_rate) => {
87                self.health.observe_public_message(funding_rate.event_time);
88                self.funding_rates
89                    .insert(funding_rate.instrument_id.clone(), funding_rate);
90            }
91            PublicLaneEvent::OpenInterest(open_interest) => {
92                self.health.observe_public_message(open_interest.event_time);
93                self.open_interest
94                    .insert(open_interest.instrument_id.clone(), open_interest);
95            }
96            PublicLaneEvent::Liquidation(liquidation) => {
97                self.apply_fast_liquidation(liquidation)?;
98            }
99            PublicLaneEvent::Divergence(divergence) => match divergence {
100                DivergenceEvent::ReconcileRequired | DivergenceEvent::SequenceGap { .. } => {
101                    self.health.mark_public_gap();
102                }
103                DivergenceEvent::StateDivergence => self.health.mark_state_divergence(),
104            },
105        }
106
107        Ok(())
108    }
109
110    pub fn apply_private_event(&mut self, event: PrivateLaneEvent) {
111        match event {
112            PrivateLaneEvent::Balance(balance) => {
113                self.health.observe_private_message(balance.updated_at);
114                self.balances.insert(balance.asset.clone(), balance);
115            }
116            PrivateLaneEvent::Position(position) => {
117                self.health.observe_private_message(position.updated_at);
118                self.positions
119                    .insert(position.instrument_id.clone(), position);
120            }
121            PrivateLaneEvent::Order(order) => {
122                self.health.observe_private_message(order.updated_at);
123                self.orders.insert(order.order_id.clone(), order);
124            }
125            PrivateLaneEvent::Execution(execution) => {
126                self.health.observe_private_message(execution.executed_at);
127                if self
128                    .executions
129                    .iter()
130                    .any(|existing| existing.execution_id == execution.execution_id)
131                {
132                    return;
133                }
134                self.executions.push_back(execution);
135                while self.executions.len() > self.state_policy.execution_capacity {
136                    let _ = self.executions.pop_front();
137                }
138            }
139            PrivateLaneEvent::Divergence(divergence) => match divergence {
140                DivergenceEvent::ReconcileRequired => self.health.mark_reconcile_required(),
141                DivergenceEvent::SequenceGap { .. } => self.health.mark_private_gap(),
142                DivergenceEvent::StateDivergence => {
143                    self.health.mark_state_divergence();
144                }
145            },
146        }
147    }
148
149    pub fn apply_command_receipt(&mut self, receipt: &CommandReceipt) {
150        match receipt.status {
151            CommandStatus::Accepted => {}
152            CommandStatus::Rejected => {}
153            CommandStatus::UnknownExecution => self.health.mark_command_uncertain(),
154        }
155    }
156
157    pub fn replace_instruments(&mut self, instruments: impl IntoIterator<Item = InstrumentSpec>) {
158        self.instruments = instruments
159            .into_iter()
160            .map(|spec| (spec.instrument_id.clone(), spec))
161            .collect();
162        self.prune_orphan_market_state();
163    }
164
165    pub fn replace_account_snapshot(&mut self, snapshot: AccountSnapshot) {
166        self.balances = snapshot
167            .balances
168            .into_iter()
169            .map(|balance| (balance.asset.clone(), balance))
170            .collect();
171        self.account_summary = snapshot.summary;
172        let updated_at = self
173            .account_summary
174            .as_ref()
175            .map(|summary| summary.updated_at)
176            .or_else(|| {
177                self.balances
178                    .values()
179                    .map(|balance| balance.updated_at)
180                    .max()
181            });
182        if let Some(updated_at) = updated_at {
183            self.health.observe_private_message(updated_at);
184        }
185    }
186
187    pub fn replace_positions(&mut self, positions: Vec<Position>) {
188        let updated_at = positions.iter().map(|position| position.updated_at).max();
189        self.positions = positions
190            .into_iter()
191            .map(|position| (position.instrument_id.clone(), position))
192            .collect();
193        if let Some(updated_at) = updated_at {
194            self.health.observe_private_message(updated_at);
195        }
196    }
197
198    pub fn replace_open_orders(&mut self, open_orders: Vec<Order>) {
199        let updated_at = open_orders.iter().map(|order| order.updated_at).max();
200        self.orders.retain(|_, order| {
201            !matches!(
202                order.status,
203                OrderStatus::New | OrderStatus::PartiallyFilled
204            )
205        });
206        for order in open_orders {
207            self.orders.insert(order.order_id.clone(), order);
208        }
209        if let Some(updated_at) = updated_at {
210            self.health.observe_private_message(updated_at);
211        }
212    }
213
214    pub fn merge_order_history(&mut self, orders: Vec<Order>) {
215        let updated_at = orders.iter().map(|order| order.updated_at).max();
216        for order in orders {
217            self.orders.insert(order.order_id.clone(), order);
218        }
219        if let Some(updated_at) = updated_at {
220            self.health.observe_private_message(updated_at);
221        }
222    }
223
224    pub fn merge_executions(&mut self, executions: Vec<Execution>) {
225        let updated_at = executions
226            .iter()
227            .map(|execution| execution.executed_at)
228            .max();
229        for execution in executions {
230            if self
231                .executions
232                .iter()
233                .any(|existing| existing.execution_id == execution.execution_id)
234            {
235                continue;
236            }
237            self.executions.push_back(execution);
238            while self.executions.len() > self.state_policy.execution_capacity {
239                let _ = self.executions.pop_front();
240            }
241        }
242        if let Some(updated_at) = updated_at {
243            self.health.observe_private_message(updated_at);
244        }
245    }
246
247    pub fn apply_private_snapshot(&mut self, snapshot: PrivateSnapshot) {
248        if let Some(account) = snapshot.account {
249            self.replace_account_snapshot(account);
250        }
251        self.replace_positions(snapshot.positions);
252        self.replace_open_orders(snapshot.open_orders);
253    }
254
255    pub fn apply_reconcile_report(&mut self, report: &ReconcileReport) {
256        match report.outcome {
257            ReconcileOutcome::Synchronized => self.health.mark_reconcile_complete(),
258            ReconcileOutcome::StillUncertain => self.health.mark_reconcile_required(),
259            ReconcileOutcome::Diverged => self.health.mark_state_divergence(),
260        }
261    }
262
263    pub fn mark_rest_success(&mut self, clock_skew_ms: Option<i64>) {
264        self.health.observe_rest_success(clock_skew_ms);
265    }
266
267    pub fn mark_public_disconnect(&mut self) {
268        self.health.mark_public_disconnect();
269    }
270
271    pub fn mark_private_disconnect(&mut self) {
272        self.health.mark_private_disconnect();
273    }
274
275    pub fn mark_reconnect(&mut self) {
276        self.health.mark_reconnect();
277    }
278
279    pub fn mark_snapshot_age(&mut self, age_ms: u64, stale_after_ms: u64) {
280        self.health.mark_snapshot_age(age_ms, stale_after_ms);
281    }
282
283    #[must_use]
284    pub fn ticker(&self, instrument_id: &InstrumentId) -> Option<&Ticker> {
285        self.tickers.get(instrument_id)
286    }
287
288    #[must_use]
289    pub fn recent_trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
290        self.recent_trades
291            .get(instrument_id)
292            .map(|trades| trades.iter().cloned().collect())
293    }
294
295    #[must_use]
296    pub fn book_top(&self, instrument_id: &InstrumentId) -> Option<&BookTop> {
297        self.book_tops.get(instrument_id)
298    }
299
300    #[must_use]
301    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRate> {
302        self.funding_rates.get(instrument_id)
303    }
304
305    #[must_use]
306    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPrice> {
307        self.mark_prices.get(instrument_id)
308    }
309
310    #[must_use]
311    pub fn open_interest(&self, instrument_id: &InstrumentId) -> Option<&OpenInterest> {
312        self.open_interest.get(instrument_id)
313    }
314
315    #[must_use]
316    pub fn liquidations(&self, instrument_id: &InstrumentId) -> Option<Vec<Liquidation>> {
317        self.liquidations
318            .get(instrument_id)
319            .map(|events| events.iter().cloned().collect())
320    }
321
322    #[must_use]
323    pub fn balances(&self) -> Vec<Balance> {
324        self.balances.values().cloned().collect()
325    }
326
327    #[must_use]
328    pub fn account_summary(&self) -> Option<AccountSummary> {
329        if let Some(summary) = &self.account_summary {
330            return Some(summary.clone());
331        }
332
333        let mut balances = self.balances.values();
334        let first = balances.next()?;
335        let mut wallet = first.wallet_balance;
336        let mut available = first.available_balance;
337
338        for balance in balances {
339            wallet = wallet
340                .value()
341                .checked_add(balance.wallet_balance.value())
342                .map(Into::into)?;
343            available = available
344                .value()
345                .checked_add(balance.available_balance.value())
346                .map(Into::into)?;
347        }
348
349        Some(AccountSummary {
350            total_wallet_balance: wallet,
351            total_available_balance: available,
352            total_unrealized_pnl: crate::numeric::Amount::new(0.into()),
353            updated_at: first.updated_at,
354        })
355    }
356
357    #[must_use]
358    pub fn positions(&self) -> Vec<Position> {
359        self.positions.values().cloned().collect()
360    }
361
362    #[must_use]
363    pub fn orders(&self) -> Vec<Order> {
364        self.orders.values().cloned().collect()
365    }
366
367    #[must_use]
368    pub fn open_orders(&self) -> Vec<Order> {
369        self.orders
370            .values()
371            .filter(|order| {
372                matches!(
373                    order.status,
374                    OrderStatus::New | OrderStatus::PartiallyFilled
375                )
376            })
377            .cloned()
378            .collect()
379    }
380
381    #[must_use]
382    pub fn executions(&self) -> Vec<Execution> {
383        self.executions.iter().cloned().collect()
384    }
385
386    #[must_use]
387    pub fn latest_order_update_at(
388        &self,
389        instrument_id: &InstrumentId,
390    ) -> Option<crate::primitives::TimestampMs> {
391        self.orders
392            .values()
393            .filter(|order| &order.instrument_id == instrument_id)
394            .map(|order| order.updated_at)
395            .max()
396    }
397
398    #[must_use]
399    pub fn latest_execution_at(
400        &self,
401        instrument_id: &InstrumentId,
402    ) -> Option<crate::primitives::TimestampMs> {
403        self.executions
404            .iter()
405            .filter(|execution| &execution.instrument_id == instrument_id)
406            .map(|execution| execution.executed_at)
407            .max()
408    }
409
410    #[must_use]
411    pub const fn health(&self) -> &HealthReport {
412        &self.health
413    }
414
415    #[must_use]
416    pub fn instrument_specs(&self) -> Vec<InstrumentSpec> {
417        self.instruments.values().cloned().collect()
418    }
419
420    #[must_use]
421    pub const fn venue(&self) -> Venue {
422        self.venue
423    }
424
425    #[must_use]
426    pub const fn product(&self) -> Product {
427        self.product
428    }
429
430    fn apply_fast_ticker(&mut self, ticker: FastTicker) -> Result<()> {
431        let unified = {
432            let spec = self.spec(&ticker.instrument_id)?;
433            ticker.to_unified(spec)
434        };
435        self.health.observe_public_message(ticker.event_time);
436        self.tickers.insert(ticker.instrument_id.clone(), unified);
437        Ok(())
438    }
439
440    fn apply_fast_trade(&mut self, trade: FastTrade) -> Result<()> {
441        let unified = {
442            let spec = self.spec(&trade.instrument_id)?;
443            trade.to_unified(spec)
444        };
445        self.health.observe_public_message(trade.event_time);
446        let entry = self
447            .recent_trades
448            .entry(trade.instrument_id.clone())
449            .or_default();
450        if entry
451            .back()
452            .is_some_and(|existing| existing.trade_id == unified.trade_id)
453        {
454            return Ok(());
455        }
456        entry.push_back(unified);
457        while entry.len() > self.state_policy.recent_trade_capacity {
458            let _ = entry.pop_front();
459        }
460        Ok(())
461    }
462
463    fn apply_fast_book_top(&mut self, book_top: FastBookTop) -> Result<()> {
464        let unified = {
465            let spec = self.spec(&book_top.instrument_id)?;
466            book_top.to_unified(spec)
467        };
468        self.health.observe_public_message(book_top.event_time);
469        self.book_tops
470            .insert(book_top.instrument_id.clone(), unified);
471        Ok(())
472    }
473
474    fn apply_fast_kline(&mut self, kline: FastKline) -> Result<()> {
475        let unified = {
476            let spec = self.spec(&kline.instrument_id)?;
477            kline.to_unified(spec)
478        };
479        self.health.observe_public_message(kline.close_time);
480        self.klines.insert(kline.instrument_id.clone(), unified);
481        Ok(())
482    }
483
484    fn apply_fast_mark_price(&mut self, mark_price: FastMarkPrice) -> Result<()> {
485        let unified = {
486            let spec = self.spec(&mark_price.instrument_id)?;
487            mark_price.to_unified(spec)
488        };
489        self.health.observe_public_message(mark_price.event_time);
490        self.mark_prices
491            .insert(mark_price.instrument_id.clone(), unified);
492        Ok(())
493    }
494
495    fn apply_fast_liquidation(&mut self, liquidation: FastLiquidation) -> Result<()> {
496        let unified = {
497            let spec = self.spec(&liquidation.instrument_id)?;
498            liquidation.to_unified(spec)
499        };
500        self.health.observe_public_message(liquidation.event_time);
501        let entry = self
502            .liquidations
503            .entry(liquidation.instrument_id.clone())
504            .or_default();
505        entry.push_back(unified);
506        while entry.len() > self.state_policy.recent_trade_capacity {
507            let _ = entry.pop_front();
508        }
509        Ok(())
510    }
511
512    fn spec(&self, instrument_id: &InstrumentId) -> Result<&InstrumentSpec> {
513        self.instruments.get(instrument_id).ok_or_else(|| {
514            MarketError::new(
515                ErrorKind::ConfigError,
516                format!(
517                    "unknown instrument {instrument_id} for {} {}",
518                    self.venue, self.product
519                ),
520            )
521        })
522    }
523
524    fn prune_orphan_market_state(&mut self) {
525        let retain_instruments =
526            |instrument_id: &InstrumentId, instruments: &BTreeMap<InstrumentId, InstrumentSpec>| {
527                instruments.contains_key(instrument_id)
528            };
529
530        self.tickers
531            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
532        self.recent_trades
533            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
534        self.book_tops
535            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
536        self.klines
537            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
538        self.mark_prices
539            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
540        self.funding_rates
541            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
542        self.open_interest
543            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
544        self.liquidations
545            .retain(|instrument_id, _| retain_instruments(instrument_id, &self.instruments));
546    }
547}
548
549#[cfg(test)]
550mod tests {
551    use rust_decimal::Decimal;
552
553    use crate::ids::{AssetCode, ClientOrderId, InstrumentId};
554    use crate::instrument::{InstrumentSpec, InstrumentStatus, InstrumentSupport};
555    use crate::numeric::{Notional, Price, Quantity};
556    use crate::primitives::TimestampMs;
557    use crate::trade::Order;
558    use crate::types::{MarketType, OrderStatus, OrderType, Product, Side, Venue};
559    use crate::{
560        execution::DivergenceEvent,
561        health::{DegradedReason, HealthStatus},
562        reconcile::{ReconcileOutcome, ReconcileReport, ReconcileTrigger},
563    };
564
565    use super::EngineState;
566
567    #[test]
568    fn open_orders_excludes_terminal_statuses() {
569        let instrument_id = InstrumentId::from("BTC/USDT:USDT");
570        let mut state = EngineState::new(
571            Venue::Binance,
572            Product::LinearUsdt,
573            crate::config::StatePolicy {
574                recent_trade_capacity: 16,
575                execution_capacity: 16,
576            },
577            [InstrumentSpec {
578                venue: Venue::Binance,
579                product: Product::LinearUsdt,
580                market_type: MarketType::LinearPerpetual,
581                instrument_id: instrument_id.clone(),
582                canonical_symbol: "BTC/USDT:USDT".into(),
583                native_symbol: "BTCUSDT".into(),
584                base: AssetCode::from("BTC"),
585                quote: AssetCode::from("USDT"),
586                settle: AssetCode::from("USDT"),
587                contract_size: Quantity::new(Decimal::ONE),
588                tick_size: Price::new(Decimal::new(1, 2)),
589                step_size: Quantity::new(Decimal::new(1, 3)),
590                min_qty: Quantity::new(Decimal::new(1, 3)),
591                min_notional: Notional::new(Decimal::new(5, 0)),
592                price_scale: 2,
593                qty_scale: 3,
594                quote_scale: 2,
595                max_leverage: None,
596                support: InstrumentSupport {
597                    public_streams: true,
598                    private_trading: true,
599                    leverage_set: true,
600                    margin_mode_set: true,
601                    funding_rate: true,
602                    open_interest: true,
603                },
604                status: InstrumentStatus::Active,
605            }],
606        );
607
608        state.apply_private_event(crate::execution::PrivateLaneEvent::Order(Order {
609            order_id: "open-order".into(),
610            client_order_id: Some(ClientOrderId::from("open-client")),
611            instrument_id: instrument_id.clone(),
612            side: Side::Buy,
613            order_type: OrderType::Limit,
614            time_in_force: None,
615            status: OrderStatus::New,
616            price: Some(Price::new(Decimal::new(70_000, 0))),
617            quantity: Quantity::new(Decimal::new(1, 3)),
618            filled_quantity: Quantity::new(Decimal::ZERO),
619            average_fill_price: None,
620            reduce_only: false,
621            post_only: false,
622            created_at: TimestampMs::new(1),
623            updated_at: TimestampMs::new(1),
624            venue_status: None,
625        }));
626        state.apply_private_event(crate::execution::PrivateLaneEvent::Order(Order {
627            order_id: "done-order".into(),
628            client_order_id: Some(ClientOrderId::from("done-client")),
629            instrument_id,
630            side: Side::Buy,
631            order_type: OrderType::Limit,
632            time_in_force: None,
633            status: OrderStatus::Filled,
634            price: Some(Price::new(Decimal::new(70_000, 0))),
635            quantity: Quantity::new(Decimal::new(1, 3)),
636            filled_quantity: Quantity::new(Decimal::new(1, 3)),
637            average_fill_price: Some(Price::new(Decimal::new(70_000, 0))),
638            reduce_only: false,
639            post_only: false,
640            created_at: TimestampMs::new(1),
641            updated_at: TimestampMs::new(2),
642            venue_status: None,
643        }));
644
645        assert_eq!(state.orders().len(), 2);
646        assert_eq!(state.open_orders().len(), 1);
647    }
648
649    #[test]
650    fn reconcile_report_clears_state_divergence_after_gap() {
651        let instrument_id = InstrumentId::from("BTC/USDT:USDT");
652        let mut state = EngineState::new(
653            Venue::Binance,
654            Product::LinearUsdt,
655            crate::config::StatePolicy {
656                recent_trade_capacity: 16,
657                execution_capacity: 16,
658            },
659            [InstrumentSpec {
660                venue: Venue::Binance,
661                product: Product::LinearUsdt,
662                market_type: MarketType::LinearPerpetual,
663                instrument_id: instrument_id.clone(),
664                canonical_symbol: "BTC/USDT:USDT".into(),
665                native_symbol: "BTCUSDT".into(),
666                base: AssetCode::from("BTC"),
667                quote: AssetCode::from("USDT"),
668                settle: AssetCode::from("USDT"),
669                contract_size: Quantity::new(Decimal::ONE),
670                tick_size: Price::new(Decimal::new(1, 2)),
671                step_size: Quantity::new(Decimal::new(1, 3)),
672                min_qty: Quantity::new(Decimal::new(1, 3)),
673                min_notional: Notional::new(Decimal::new(5, 0)),
674                price_scale: 2,
675                qty_scale: 3,
676                quote_scale: 2,
677                max_leverage: None,
678                support: InstrumentSupport {
679                    public_streams: true,
680                    private_trading: true,
681                    leverage_set: true,
682                    margin_mode_set: true,
683                    funding_rate: true,
684                    open_interest: true,
685                },
686                status: InstrumentStatus::Active,
687            }],
688        );
689
690        state.apply_private_event(crate::execution::PrivateLaneEvent::Divergence(
691            DivergenceEvent::SequenceGap { at: None },
692        ));
693        assert_eq!(state.health().status, HealthStatus::ReconcileRequired);
694        assert_eq!(
695            state.health().degraded_reason,
696            Some(DegradedReason::PrivateStreamGap)
697        );
698
699        state.apply_reconcile_report(&ReconcileReport {
700            trigger: ReconcileTrigger::SequenceGap,
701            outcome: ReconcileOutcome::Synchronized,
702            repaired_at: TimestampMs::new(5),
703            note: Some("unit reconcile".into()),
704        });
705        assert_eq!(state.health().status, HealthStatus::Disconnected);
706        assert_eq!(
707            state.health().degraded_reason,
708            Some(DegradedReason::Disconnected)
709        );
710        assert!(!state.health().state_divergence);
711    }
712}