pricelevel 0.8.1

A high-performance, lock-free price level implementation for limit order books in Rust. This library provides the building blocks for creating efficient trading systems with support for multiple order types and concurrent access patterns.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
//! Core price level implementation

use crate::UuidGenerator;
use crate::errors::PriceLevelError;
use crate::execution::{MatchResult, TakerKind, Trade};
use crate::orders::{Id, OrderType, OrderUpdate, TimeInForce};
use crate::price_level::order_queue::{FrontAction, FrontOutcome, OrderQueue};
use crate::price_level::{PriceLevelSnapshot, PriceLevelSnapshotPackage, PriceLevelStatistics};
use crate::utils::{Price, Quantity, TimestampMs};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::str::FromStr;

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// A lock-free implementation of a price level in a limit order book
///
/// A level groups the orders resting at one price. The orders themselves live
/// in `orders`; the three atomic counters next to it are running aggregates
/// kept up to date by the mutating methods (for example, `add_order` pushes to
/// the queue and then bumps all three). The accessor methods read the counters
/// with `Ordering::Relaxed`, so under concurrent mutation they are advisory
/// values rather than a synchronized view of the queue.
#[derive(Debug)]
pub struct PriceLevel {
    /// The price of this level. Set at construction and only ever read
    /// afterwards by the code in this file.
    price: u128,

    /// Total visible quantity at this price level (running aggregate,
    /// advisory under concurrency).
    visible_quantity: AtomicU64,

    /// Total hidden quantity at this price level (running aggregate,
    /// advisory under concurrency).
    hidden_quantity: AtomicU64,

    /// Number of orders at this price level (running aggregate, advisory
    /// under concurrency).
    order_count: AtomicUsize,

    /// Queue of orders at this price level; the source of truth the counters
    /// above summarize.
    orders: OrderQueue,

    /// Statistics for this price level. Held behind an `Arc` so `stats()` can
    /// hand out shared handles without copying the statistics.
    stats: Arc<PriceLevelStatistics>,
}

impl PriceLevel {
    /// Rebuilds a price level from a previously captured snapshot.
    ///
    /// The snapshot's aggregates are recomputed first, then the price, the
    /// visible / hidden totals and the order count are read from it, its
    /// persisted statistics are cloned, and finally its orders are moved into
    /// a new queue. Because the statistics are cloned from the snapshot rather
    /// than created fresh, a restored level resumes with its recorded history
    /// (orders added / removed / executed, quantity / value executed,
    /// waiting-time aggregates, and execution / arrival timestamps) instead of
    /// a zeroed set.
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::InvalidOperation`] if recomputing the snapshot
    /// aggregates overflows `u64`.
    pub fn from_snapshot(mut snapshot: PriceLevelSnapshot) -> Result<Self, PriceLevelError> {
        snapshot.refresh_aggregates()?;

        // Everything that merely borrows the snapshot has to be read before
        // `into_orders` consumes it below.
        let price = snapshot.price().as_u128();
        let visible = snapshot.visible_quantity().as_u64();
        let hidden = snapshot.hidden_quantity().as_u64();
        let resting = snapshot.orders().len();
        let restored_stats = (*snapshot.statistics()).clone();

        // Last use of the snapshot: its orders become the level's queue.
        let orders = OrderQueue::from(snapshot.into_orders());

        Ok(Self {
            price,
            visible_quantity: AtomicU64::new(visible),
            hidden_quantity: AtomicU64::new(hidden),
            order_count: AtomicUsize::new(resting),
            orders,
            stats: Arc::new(restored_stats),
        })
    }

    /// Rebuilds a price level from a checksum-protected snapshot package.
    ///
    /// The package is first unwrapped into its snapshot (which is where the
    /// package-level validation happens), and the snapshot is then handed to
    /// [`Self::from_snapshot`].
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::ChecksumMismatch`] if the package's embedded
    /// SHA-256 checksum does not match its payload (tampered or corrupted
    /// snapshot), [`PriceLevelError::SerializationError`] if re-encoding the
    /// payload to recompute that checksum fails,
    /// [`PriceLevelError::InvalidOperation`] if the package carries an
    /// unsupported snapshot format version, and propagates any
    /// [`PriceLevelError`] from rebuilding the level out of the validated
    /// snapshot.
    pub fn from_snapshot_package(
        package: PriceLevelSnapshotPackage,
    ) -> Result<Self, PriceLevelError> {
        Self::from_snapshot(package.into_snapshot()?)
    }

    /// Rebuilds a price level from the JSON form of a snapshot package.
    ///
    /// `data` is decoded into a [`PriceLevelSnapshotPackage`], which is then
    /// passed to [`Self::from_snapshot_package`].
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::DeserializationError`] if `data` is not a
    /// valid snapshot-package JSON document, [`PriceLevelError::ChecksumMismatch`]
    /// if the decoded package's SHA-256 checksum does not match its payload,
    /// [`PriceLevelError::SerializationError`] if re-encoding the payload to
    /// recompute that checksum fails, and [`PriceLevelError::InvalidOperation`]
    /// on an unsupported snapshot format version.
    pub fn from_snapshot_json(data: &str) -> Result<Self, PriceLevelError> {
        Self::from_snapshot_package(PriceLevelSnapshotPackage::from_json(data)?)
    }
}

impl PriceLevel {
    /// Creates a price level at `price` with nothing resting on it.
    ///
    /// All three aggregate counters start at zero, the order queue is freshly
    /// constructed, and the level gets its own new statistics object.
    #[must_use]
    pub fn new(price: u128) -> Self {
        let orders = OrderQueue::new();
        let stats = Arc::new(PriceLevelStatistics::new());

        Self {
            price,
            visible_quantity: AtomicU64::new(0),
            hidden_quantity: AtomicU64::new(0),
            order_count: AtomicUsize::new(0),
            orders,
            stats,
        }
    }

    /// Get the price of this level
    #[must_use]
    pub fn price(&self) -> u128 {
        self.price
    }

    /// Returns the visible quantity resting at this level, in quantity units.
    ///
    /// The value is an **advisory, eventually-consistent** reading of a single
    /// atomic counter. While `add_order` / `match_order` / `update_order` run
    /// concurrently, the counter can momentarily run ahead of or behind the
    /// queue: it may not yet include an order that is already queued, or may
    /// still include one that was just removed. No cross-method invariant
    /// fixes the relative order of "update the counter" and "mutate the
    /// queue" — different paths do it differently (iceberg replenishment in
    /// `match_order`, for instance, adjusts the counters before pushing the
    /// refreshed tranche). Treat a single read as approximate; when the
    /// counters and the order list must agree with each other, take a
    /// [`Self::snapshot`] and read from that instead.
    #[must_use]
    pub fn visible_quantity(&self) -> u64 {
        // `Relaxed` on purpose (issue #68): the counter is advisory and no
        // happens-before relationship is carried by it. Ordering between
        // producers and consumers comes from the lock-free `SkipMap` /
        // `DashMap` inside `OrderQueue`, and `snapshot()` provides the
        // mutually-consistent view. Since nothing is published or synchronized
        // through this load, `Acquire` would gain nothing.
        let counter = &self.visible_quantity;
        counter.load(Ordering::Relaxed)
    }

    /// Returns the hidden quantity resting at this level, in quantity units.
    ///
    /// Like [`Self::visible_quantity`], this is an advisory,
    /// eventually-consistent counter read under concurrent mutation; use
    /// [`Self::snapshot`] when a consistent view is required.
    #[must_use]
    pub fn hidden_quantity(&self) -> u64 {
        // `Relaxed`: the counter is advisory and carries no happens-before;
        // the full rationale is on `visible_quantity`.
        let counter = &self.hidden_quantity;
        counter.load(Ordering::Relaxed)
    }

    /// Returns visible plus hidden quantity at this level, in quantity units.
    ///
    /// The two addends come from two independent atomic counters that are read
    /// one after the other (visible first, then hidden), so under concurrent
    /// mutation the sum is advisory / eventually-consistent — see
    /// [`Self::visible_quantity`]. Use [`Self::snapshot`] for a consistent
    /// view.
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::InvalidOperation`] if `visible + hidden`
    /// overflows `u64`.
    pub fn total_quantity(&self) -> Result<u64, PriceLevelError> {
        let visible = self.visible_quantity();
        let hidden = self.hidden_quantity();

        // Checked addition: an overflow is reported, never wrapped.
        match visible.checked_add(hidden) {
            Some(total) => Ok(total),
            None => Err(PriceLevelError::InvalidOperation {
                message: "price level total quantity overflow".to_string(),
            }),
        }
    }

    /// Returns how many orders are counted at this level.
    ///
    /// Like [`Self::visible_quantity`], this is an advisory,
    /// eventually-consistent counter read under concurrent mutation; use
    /// [`Self::snapshot`] when a consistent view is required.
    #[must_use]
    pub fn order_count(&self) -> usize {
        // `Relaxed`: the counter is advisory and carries no happens-before;
        // the full rationale is on `visible_quantity`.
        let counter = &self.order_count;
        counter.load(Ordering::Relaxed)
    }

    /// Returns a shared handle to this level's statistics.
    ///
    /// Only the `Arc` is cloned, so the caller and the level keep pointing at
    /// the same statistics object.
    #[must_use]
    pub fn stats(&self) -> Arc<PriceLevelStatistics> {
        Arc::clone(&self.stats)
    }

    /// Rests `order` on this price level and returns the shared handle under
    /// which it was queued.
    ///
    /// The order is wrapped in an `Arc`, pushed onto the level's queue, and
    /// only then are the visible-quantity, hidden-quantity and order-count
    /// counters increased and the "order added" statistic recorded.
    pub fn add_order(&self, order: OrderType<()>) -> Arc<OrderType<()>> {
        // Capture the two quantities while the order is still owned by value.
        let (visible_delta, hidden_delta) = (
            order.visible_quantity().as_u64(),
            order.hidden_quantity().as_u64(),
        );

        // Queue first, count second. On this path a concurrent reader therefore
        // tends to see the order resting before it is counted, not the other
        // way round. That is a local choice for this method only — other paths
        // (e.g. iceberg replenishment in `match_order`) adjust the counters
        // before pushing. The bare counters stay advisory under concurrency;
        // `snapshot()` is the mutually-consistent view (see
        // `visible_quantity`).
        let resting = Arc::new(order);
        self.orders.push(Arc::clone(&resting));

        // All three updates are `Relaxed` (issue #68): the counters are
        // advisory and are not a synchronization channel. The happens-before a
        // concurrent reader relies on is carried by the queue publication
        // above (the `push` into `OrderQueue`'s `SkipMap` / `DashMap`), so a
        // release here would publish nothing useful. The read-modify-writes
        // only have to be atomic, not ordered.
        self.visible_quantity
            .fetch_add(visible_delta, Ordering::Relaxed);
        self.hidden_quantity
            .fetch_add(hidden_delta, Ordering::Relaxed);
        self.order_count.fetch_add(1, Ordering::Relaxed);

        // Book the arrival in the level statistics.
        self.stats.record_order_added();

        resting
    }

    /// Returns a non-allocating iterator over the orders currently in this
    /// level's queue.
    ///
    /// No stable iteration order is promised. Call [`Self::snapshot_orders`]
    /// instead when a deterministic ordering is needed.
    pub fn iter_orders(&self) -> impl Iterator<Item = Arc<OrderType<()>>> + '_ {
        let queue = &self.orders;
        queue.iter_orders()
    }

    /// Collects the resting orders into a `Vec` with a deterministic,
    /// timestamp-sorted order.
    ///
    /// The result is an owned vector produced by the queue's `snapshot_vec`.
    #[must_use]
    pub fn snapshot_orders(&self) -> Vec<Arc<OrderType<()>>> {
        let queue = &self.orders;
        queue.snapshot_vec()
    }

    /// Collects the resting orders in the exact order [`Self::match_order`]
    /// consumes them: ascending **insertion sequence**, oldest order first.
    ///
    /// This is the view to use when the sweep has to be predicted — for
    /// example a self-trade-prevention pre-scan that walks the orders in
    /// consumption order to work out how much a taker may safely fill. The
    /// other two views differ from it:
    /// - [`Self::snapshot_orders`] is sorted by `(timestamp, sequence)`; that
    ///   matches the sweep order *only* while timestamps are monotonic with
    ///   insertion, which client-supplied or modify-restamped timestamps
    ///   break; and
    /// - [`Self::iter_orders`] makes no promise about order at all.
    ///
    /// As with `snapshot_orders`, the result is a point-in-time view: a
    /// concurrent mutation after the call can change the queue.
    #[must_use]
    pub fn snapshot_by_insertion_seq(&self) -> Vec<Arc<OrderType<()>>> {
        let queue = &self.orders;
        queue.snapshot_by_seq()
    }

    /// Reports whether at least one resting order has matchable depth, i.e.
    /// whether a positive taker would cross at this level.
    ///
    /// The post-only pre-check uses this because it only has to know whether
    /// *any* liquidity would be taken, not how much. The scan stops at the
    /// first matchable order, which makes it cheaper than
    /// `matchable_quantity`.
    ///
    /// What counts as matchable is decided by [`OrderType::is_matchable`] —
    /// the single source of truth shared with the fill-or-kill dry run — so
    /// the post-only verdict and the fill-or-kill prediction cannot disagree
    /// about the same level. In particular, a zero-visible iceberg (or
    /// auto-replenishing reserve) that is backed by hidden quantity counts as
    /// matchable depth, because the sweep will draw that hidden quantity into
    /// visible and fill it.
    fn has_matchable_depth(&self) -> bool {
        for resting in self.iter_orders() {
            if resting.is_matchable() {
                // First hit settles the question; no need to look further.
                return true;
            }
        }
        false
    }

    /// Dry-runs the sweep to find out how much of `incoming_quantity` this
    /// level could fill for a taker, in quantity units, **without mutating the
    /// queue**.
    ///
    /// The method copies the resting orders into a working deque and replays
    /// [`OrderType::match_against`] over that copy, mirroring what the real
    /// sweep does with each result: a replenished iceberg / reserve tranche is
    /// re-queued at the tail, a pure partial fill keeps the front position,
    /// and a maker that hands nothing back is dropped. The returned total is
    /// what fill-or-kill (all-or-nothing) compares against the taker size.
    ///
    /// The working copy comes from [`Self::snapshot_orders`], which is ordered
    /// by `(timestamp, sequence)`; the accompanying in-code note explains why
    /// the total does not depend on that visit order.
    ///
    /// Returns `0` for a zero-sized taker. The method allocates the working
    /// copy and is meant for the cold fill-or-kill path, not the hot `Gtc`
    /// sweep.
    fn matchable_quantity(&self, incoming_quantity: u64) -> u64 {
        if incoming_quantity == 0 {
            return 0;
        }

        // Working copy of the resting orders. `snapshot_orders()` sorts by
        // `(timestamp, sequence)` while the real sweep pops by pure insertion
        // sequence. The two agree whenever timestamps are monotonic with
        // insertion (the normal case), and when they do not, only the *visit
        // order* can differ, never the fillable *total*: each maker —
        // including a fully-drained replenishing iceberg / auto-reserve —
        // contributes the same amount whenever it is visited. The sum returned
        // here is therefore what the sweep would consume, which is all that
        // fill-or-kill needs.
        let mut queue: std::collections::VecDeque<Arc<OrderType<()>>> =
            std::collections::VecDeque::from(self.snapshot_orders());
        let mut left = incoming_quantity;
        let mut total: u64 = 0;

        while left > 0 {
            let maker = match queue.pop_front() {
                Some(front) => front,
                None => break,
            };
            let (taken, residual, drawn_from_hidden, left_after) = maker.match_against(left);

            // No-progress guard, same shape as the one in the real sweep (see
            // `match_order`). A front maker that consumes nothing, draws no
            // hidden quantity, leaves the taker's remainder unchanged and
            // still hands itself back must not go back to the front, or the
            // loop would spin on it forever. Having already been popped, it is
            // simply not re-queued: it adds nothing to the total and the
            // makers behind it still get visited. Mirroring the real sweep
            // here is what keeps this prediction in step with what
            // `match_order` consumes.
            let stalled = taken == 0
                && drawn_from_hidden == 0
                && left_after == left
                && residual.is_some();
            if stalled {
                continue;
            }

            // `taken <= left <= incoming_quantity`, so the sum cannot overflow
            // `u64`; it is checked regardless, per the no-saturate / no-wrap
            // rule.
            let Some(sum) = total.checked_add(taken) else {
                break;
            };
            total = sum;
            left = left_after;

            match residual {
                // A replenished tranche loses time priority and goes to the
                // back of the queue, exactly as the real sweep re-queues it.
                Some(refreshed) if drawn_from_hidden > 0 => {
                    queue.push_back(Arc::new(refreshed));
                }
                // A pure partial fill keeps its front position; the taker is
                // exhausted at this point (`left == 0`), so the loop condition
                // ends the walk on the next check.
                Some(partial) => queue.push_front(Arc::new(partial)),
                // Fully consumed maker: nothing to put back.
                None => {}
            }
        }

        total
    }

    /// Matches an incoming taker order against existing orders at this price level.
    ///
    /// The sweep consumes resting makers in strict price-time (FIFO) order until
    /// the taker is filled or the matchable depth is exhausted. Trades are
    /// generated for each successful match, fully-consumed makers are removed,
    /// and the visible / hidden quantity counters and statistics are updated in
    /// lockstep with each execution.
    ///
    /// # Taker time-in-force / kind semantics
    ///
    /// Unlike earlier versions, this method **honors the taker's**
    /// [`TimeInForce`] and [`TakerKind`]. Let `available` be the quantity this
    /// level can actually fill for the taker (see `matchable_quantity`),
    /// capped at `incoming_quantity`:
    ///
    /// - [`TakerKind::PostOnly`]: must never take liquidity. If `available > 0`
    ///   the match is **rejected** — zero trades, the full `incoming_quantity`
    ///   reported as remaining, and the resting queue left untouched
    ///   ([`MatchResult::was_rejected`]).
    /// - [`TimeInForce::Fok`]: all-or-nothing. If `available < incoming_quantity`
    ///   the taker is **killed** — zero trades, full remaining, queue untouched
    ///   ([`MatchResult::was_killed`]). Otherwise it fills completely.
    /// - [`TimeInForce::Ioc`]: fills `available` and discards the remainder.
    ///   The taker is never enqueued here (this layer never rests a taker), so
    ///   the remainder is simply reported and dropped by the caller.
    /// - [`TimeInForce::Gtc`] / [`TimeInForce::Gtd`] / [`TimeInForce::Day`]:
    ///   fills `available`; the remainder is reported in
    ///   [`MatchResult::remaining_quantity`] for the order book to rest.
    /// - [`TakerKind::MarketToLimit`]: fills `available`; the remainder is
    ///   reported for the order book to convert into a resting limit. At this
    ///   single-level layer it fills like a standard taker.
    ///
    /// A post-only rejection and a fill-or-kill kill both leave zero trades and
    /// the full remainder; use [`MatchResult::outcome`] /
    /// [`MatchResult::was_rejected`] / [`MatchResult::was_killed`] to tell them
    /// apart from "the level had no liquidity".
    ///
    /// Time-in-force EXPIRY of resting **makers** is still NOT enforced here: a
    /// resting maker's `Gtd` / `Day` expiry is not consulted, so an expired
    /// maker still matches. Evicting or skipping expired makers is the
    /// caller's / order book's responsibility, keeping the match path a pure,
    /// deterministic sweep over the resting queue.
    ///
    /// # Arguments
    ///
    /// * `incoming_quantity`: The quantity of the incoming taker order to match.
    /// * `taker_order_id`: The ID of the incoming order (the "taker" order).
    /// * `taker_tif`: The taker's [`TimeInForce`], which governs how an unfilled
    ///   remainder is treated (kill / discard / rest).
    /// * `taker_kind`: The taker's [`TakerKind`] (standard / post-only /
    ///   market-to-limit).
    /// * `timestamp`: The taker timestamp (milliseconds since epoch) stamped
    ///   onto every emitted [`Trade`] and used as the execution time for
    ///   statistics. It is threaded in from the caller so the match path never
    ///   reads the wall clock — guaranteeing a deterministic, replayable trade
    ///   stream for a fixed input.
    /// * `trade_id_generator`: An atomic counter used to generate unique trade IDs.
    ///
    /// [`Trade`]: crate::execution::Trade
    /// [`TimeInForce`]: crate::orders::TimeInForce
    /// [`TakerKind`]: crate::execution::TakerKind
    /// [`MatchResult::was_rejected`]: crate::execution::MatchResult::was_rejected
    /// [`MatchResult::was_killed`]: crate::execution::MatchResult::was_killed
    /// [`MatchResult::outcome`]: crate::execution::MatchResult::outcome
    /// [`MatchResult::remaining_quantity`]: crate::execution::MatchResult::remaining_quantity
    ///
    /// # Returns
    ///
    /// A `MatchResult` carrying the generated trades, the remaining unmatched
    /// quantity, the completion flag, the fully-filled maker IDs, and the
    /// terminal [`MatchOutcome`](crate::execution::MatchOutcome).
    ///
    /// # Concurrency
    ///
    /// Resting orders are consumed in strict price-time (FIFO) order, and a
    /// partially-filled maker keeps its position at the front of the queue.
    ///
    /// **A concurrent `cancel` of the order currently being matched is safe and
    /// linearizable** (issue #81). The sweep keeps each maker resident in the
    /// queue and applies its decision (full consume / partial fill in place /
    /// replenish) while the maker's per-entry lock is held — the same lock a
    /// `cancel`'s removal takes (the internal `OrderQueue::match_front` step).
    /// The two therefore serialize: a cancel either fully wins (removes the
    /// maker and decrements the counters before the match observes it) or fully
    /// loses (the match commits first and the cancel then removes the residual
    /// it left). A cancel is never silently lost, and the counters never
    /// double-count. This closes the prior "lost cancel" window where a cancel
    /// landing between the matcher's pop and its reinsert would no-op while the
    /// matcher re-rested the residual.
    ///
    /// This method still assumes a **single logical matcher per level at a
    /// time**: two concurrent `match_order` calls on the *same* level are NOT
    /// made safe here and must be serialized by the caller (an order book
    /// typically matches a level from a single thread). The post-only /
    /// fill-or-kill pre-checks read the queue before the sweep; under that
    /// single-matcher assumption no concurrent `match_order` can change the
    /// matchable depth between the pre-check and the sweep. Concurrent
    /// `add_order` from other threads is likewise safe.
    pub fn match_order(
        &self,
        incoming_quantity: u64,
        taker_order_id: Id,
        taker_tif: TimeInForce,
        taker_kind: TakerKind,
        timestamp: TimestampMs,
        trade_id_generator: &UuidGenerator,
    ) -> MatchResult {
        // -------- Taker TIF / kind pre-checks (before any queue mutation) --------
        //
        // PostOnly: must never take liquidity. If any matchable depth exists for
        // a positive taker, reject without touching the queue. A zero-quantity
        // taker has nothing to cross, so it is not rejected (it falls through to
        // the vacuous-complete sweep below).
        if taker_kind.is_post_only() && incoming_quantity > 0 && self.has_matchable_depth() {
            tracing::debug!(
                taker_order_id = %taker_order_id,
                incoming_quantity,
                price = self.price,
                "post-only taker rejected: would take liquidity"
            );
            let mut result = MatchResult::new(taker_order_id, Quantity::new(incoming_quantity));
            result.mark_rejected(incoming_quantity);
            return result;
        }

        // Fill-or-kill: all-or-nothing. If the level cannot fill the taker in
        // full, kill it without touching the queue.
        if matches!(taker_tif, TimeInForce::Fok) && incoming_quantity > 0 {
            let available = self.matchable_quantity(incoming_quantity);
            if available < incoming_quantity {
                tracing::debug!(
                    taker_order_id = %taker_order_id,
                    incoming_quantity,
                    available,
                    price = self.price,
                    "fill-or-kill taker killed: insufficient depth"
                );
                let mut result = MatchResult::new(taker_order_id, Quantity::new(incoming_quantity));
                result.mark_killed(incoming_quantity);
                return result;
            }
        }

        // A single sweep emits at most one trade and at most one filled-order
        // id per resting order, so the live order count is an upper bound for
        // both vectors. Pre-size them to cut per-fill reallocations on the hot
        // path; the count is advisory (`Relaxed`) so the `Vec` still grows if a
        // concurrent `add_order` lands mid-sweep — capacity is a hint, not a
        // bound.
        let capacity = self.order_count();
        let mut result =
            MatchResult::with_capacity(taker_order_id, Quantity::new(incoming_quantity), capacity);
        let mut remaining = incoming_quantity;

        // No-progress safety guard. A maker that yields no progress
        // (`consumed == 0`, re-queued unchanged, `remaining` not decreased)
        // must not be re-selected this sweep, or the loop would spin forever on
        // the same front order. Because such a dead order sits at the FRONT
        // (FIFO), simply breaking would starve any matchable makers behind it.
        // [`OrderQueue::match_front`] leaves a `SetAside` maker untouched in the
        // queue and parks its insertion sequence in `set_aside` so the sweep
        // advances to the maker behind it without re-selecting it. The maker is
        // never modified, never traded against, and its counters are never
        // touched, so the queue and the atomic counters stay exactly as if it
        // had been skipped — keeping counter <-> queue consistency intact and
        // preserving its price-time position for the next sweep.
        //
        // `match_against`'s own progress fix means this guard should never fire
        // for the iceberg/reserve states it now handles; it is defense-in-depth
        // against any future zero-progress shape (e.g. a degenerate residual
        // from `with_reduced_quantity(0)`).
        let mut set_aside: std::collections::HashSet<u64> = std::collections::HashSet::new();

        // Per-step bookkeeping carried out of the locked decision closure. The
        // trade / stats / counter work is done AFTER the closure returns so it
        // is not performed while the per-entry lock is held; correctness vs a
        // concurrent cancel rides on the `FrontAction` the queue committed under
        // the lock (see `OrderQueue::match_front`), not on when these counters
        // move (they are advisory — issue #68).
        struct StepData {
            consumed: u64,
            hidden_reduced: u64,
            fully_consumed: bool,
            maker_id: Id,
            maker_side: crate::orders::Side,
            maker_price: u128,
            maker_timestamp: u64,
            /// Hidden quantity stranded by a full consume with no replenishment
            /// (drained reserve / leftover iceberg hidden), to subtract from the
            /// hidden counter.
            hidden_stranded: u64,
            /// The taker's remaining quantity after this maker is matched.
            new_remaining: u64,
        }

        // Either the maker progressed (carrying `StepData`) or it was parked by
        // the no-progress guard (`SetAside`). The parked variant threads the
        // maker's id and insertion seq OUT of the locked decision closure so the
        // no-progress `warn!` can name the parked maker without logging inside
        // the per-entry lock.
        enum StepResult {
            Progressed(StepData),
            SetAside { maker_id: Id, seq: u64 },
        }

        while remaining > 0 {
            let outcome = self.orders.match_front(&mut set_aside, |seq, order_arc| {
                let (consumed, updated_order, hidden_reduced, new_remaining) =
                    order_arc.match_against(remaining);

                // Detect a non-progressing maker: nothing consumed, no hidden
                // drawn, the taker's remaining unchanged, and the maker handed
                // back to us to re-queue. Park it and advance. Thread the maker
                // id + seq out so the caller can name it in the no-progress
                // `warn!` without logging under the per-entry lock.
                if consumed == 0
                    && hidden_reduced == 0
                    && new_remaining == remaining
                    && updated_order.is_some()
                {
                    return (
                        FrontAction::SetAside,
                        StepResult::SetAside {
                            maker_id: order_arc.id(),
                            seq,
                        },
                    );
                }

                let maker_id = order_arc.id();
                let maker_side = order_arc.side();
                let maker_price = order_arc.price().as_u128();
                let maker_timestamp = order_arc.timestamp().as_u64();

                // Hidden stranded by a full consume that does not replenish:
                // an iceberg / reserve whose visible was fully taken but whose
                // hidden is dropped (non-auto reserve, or a leftover the
                // `match_against` chose not to refresh). Identical condition to
                // the pre-#81 sweep's full-consume cleanup branch.
                let hidden_stranded = if updated_order.is_none() && hidden_reduced == 0 {
                    match order_arc {
                        OrderType::IcebergOrder {
                            hidden_quantity, ..
                        }
                        | OrderType::ReserveOrder {
                            hidden_quantity, ..
                        } if hidden_quantity.as_u64() > 0 => hidden_quantity.as_u64(),
                        _ => 0,
                    }
                } else {
                    0
                };

                let data = StepData {
                    consumed,
                    hidden_reduced,
                    fully_consumed: updated_order.is_none(),
                    maker_id,
                    maker_side,
                    maker_price,
                    maker_timestamp,
                    hidden_stranded,
                    new_remaining,
                };

                let action = match updated_order {
                    None => FrontAction::Remove,
                    Some(updated) => {
                        if hidden_reduced > 0 {
                            // Replenishment: refreshed tranche loses priority.
                            FrontAction::ReplaceAtTail(Arc::new(updated))
                        } else {
                            // Pure partial fill: keep priority in place.
                            FrontAction::KeepInPlace(Arc::new(updated))
                        }
                    }
                };

                (action, StepResult::Progressed(data))
            });

            match outcome {
                FrontOutcome::Empty => break,
                FrontOutcome::Matched { result: step } => {
                    let data = match step {
                        StepResult::SetAside { maker_id, seq } => {
                            // Parked by the queue; advance to the maker behind it.
                            // The id + seq were threaded out of the locked
                            // decision closure so we can name the parked maker
                            // here, outside the per-entry lock.
                            tracing::warn!(
                                price = self.price,
                                remaining,
                                order_id = %maker_id,
                                seq,
                                "match sweep: front maker made no progress; set aside to avoid re-pop"
                            );
                            continue;
                        }
                        StepResult::Progressed(data) => data,
                    };
                    let new_remaining = data.new_remaining;

                    if data.consumed > 0 {
                        // Update visible quantity counter. `Relaxed`: advisory
                        // counter (issue #68); the queue mutation committed
                        // inside `match_front` carries the real happens-before,
                        // not this RMW. The delta is keyed off the committed
                        // action so it never double-counts with a concurrent
                        // cancel (which decrements only the residual it removes).
                        self.visible_quantity
                            .fetch_sub(data.consumed, Ordering::Relaxed);

                        let trade_id = Id::from_uuid(trade_id_generator.next());

                        // A resting maker must never be the taker matching
                        // against it. Debug-only invariant of the caller.
                        debug_assert!(data.maker_id != taker_order_id, "self-fill: maker == taker");

                        let trade = Trade::with_timestamp(
                            trade_id,
                            taker_order_id,
                            data.maker_id,
                            Price::new(self.price),
                            Quantity::new(data.consumed),
                            data.maker_side.opposite(),
                            timestamp,
                        );

                        if result.add_trade(trade).is_err() {
                            remaining = new_remaining;
                            break;
                        }

                        if data.fully_consumed {
                            result.add_filled_order_id(data.maker_id);
                        }

                        let _ = self.stats.record_execution(
                            data.consumed,
                            data.maker_price,
                            data.maker_timestamp,
                            timestamp.as_u64(),
                        );
                    }

                    remaining = new_remaining;

                    if data.fully_consumed {
                        // Maker fully consumed and removed inside `match_front`.
                        self.order_count.fetch_sub(1, Ordering::Relaxed);
                        if data.hidden_stranded > 0 {
                            self.hidden_quantity
                                .fetch_sub(data.hidden_stranded, Ordering::Relaxed);
                        }
                    } else if data.hidden_reduced > 0 {
                        // Replenishment: a fresh tranche moved from hidden into
                        // visible. The maker stayed resident (re-sequenced in
                        // place by `match_front`), so only the counters move.
                        self.hidden_quantity
                            .fetch_sub(data.hidden_reduced, Ordering::Relaxed);
                        self.visible_quantity
                            .fetch_add(data.hidden_reduced, Ordering::Relaxed);
                    }
                    // Pure partial fill (KeepInPlace, hidden_reduced == 0):
                    // visible already decremented by `consumed` above; the maker
                    // stays resident with its residual. Nothing else to do.

                    if remaining == 0 {
                        break;
                    }
                }
            }
        }

        result.finalize(Quantity::new(remaining));

        result
    }

    /// Capture the current state of this price level as a [`PriceLevelSnapshot`].
    ///
    /// The orders are materialized exactly once and every aggregate (order
    /// count, visible total, hidden total) is folded from that single vector.
    /// The aggregates therefore always agree with the snapshot's own `orders`,
    /// even while other threads mutate the level; loading the live atomic
    /// counters separately would allow them to advance between the counter load
    /// and the order materialization (a torn read).
    #[must_use]
    pub fn snapshot(&self) -> PriceLevelSnapshot {
        // Single materialization: the one source of truth for everything below.
        let orders = self.snapshot_orders();

        let mut visible_total: u64 = 0;
        let mut hidden_total: u64 = 0;

        for resting in &orders {
            // Checked addition (the crate never saturates or wraps). This
            // method is infallible, so an overflow needs a value rather than an
            // error. The level keeps the same running totals in `u64` atomics,
            // so a sum that overflows here is not a state the level could have
            // held; on that unreachable branch the live atomic counter is used
            // as a best-effort fallback.
            let visible_part = resting.visible_quantity().as_u64();
            if let Some(sum) = visible_total.checked_add(visible_part) {
                visible_total = sum;
            } else {
                debug_assert!(false, "snapshot visible quantity overflow is unreachable");
                visible_total = self.visible_quantity();
            }

            let hidden_part = resting.hidden_quantity().as_u64();
            if let Some(sum) = hidden_total.checked_add(hidden_part) {
                hidden_total = sum;
            } else {
                debug_assert!(false, "snapshot hidden quantity overflow is unreachable");
                hidden_total = self.hidden_quantity();
            }
        }

        let order_count = orders.len();

        // The statistics are cloned into the snapshot so a round-trip keeps the
        // recorded execution history. They are independent counters read on a
        // best-effort basis and are not part of the queue / counter
        // consistency invariant.
        PriceLevelSnapshot::from_raw_parts_with_stats(
            Price::new(self.price),
            Quantity::new(visible_total),
            Quantity::new(hidden_total),
            order_count,
            orders,
            (*self.stats).clone(),
        )
    }

    /// Build a checksum-protected snapshot package from the current state of
    /// this price level.
    ///
    /// The level is first captured with [`PriceLevel::snapshot`]; the resulting
    /// snapshot is then wrapped by [`PriceLevelSnapshotPackage::new`].
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::InvalidOperation`] if computing the snapshot's
    /// aggregate quantities overflows while building the package's checksummed
    /// payload, or [`PriceLevelError::SerializationError`] if encoding the
    /// snapshot payload to compute its SHA-256 checksum fails.
    pub fn snapshot_package(&self) -> Result<PriceLevelSnapshotPackage, PriceLevelError> {
        let snapshot = self.snapshot();
        PriceLevelSnapshotPackage::new(snapshot)
    }

    /// Render the current state of this price level as a JSON string that
    /// carries the snapshot together with its checksum metadata.
    ///
    /// This is [`PriceLevel::snapshot_package`] followed by the package's
    /// `to_json` encoding.
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::InvalidOperation`] if building the snapshot
    /// package overflows an aggregate quantity, or
    /// [`PriceLevelError::SerializationError`] if the package cannot be encoded
    /// to JSON.
    pub fn snapshot_to_json(&self) -> Result<String, PriceLevelError> {
        let package = self.snapshot_package()?;
        package.to_json()
    }
}

impl PriceLevel {
    /// Apply an update to an existing order at this price level.
    ///
    /// # Quantity-update priority policy
    ///
    /// For [`OrderUpdate::UpdateQuantity`] (and the same-price branch of
    /// [`OrderUpdate::UpdatePriceAndQuantity`]) this method follows the
    /// conventional exchange price-time-priority rules:
    ///
    /// - **Decrease or unchanged total quantity** keeps the maker's queue
    ///   position. The stored order is updated *in place* at its existing
    ///   insertion sequence, so it is consumed at the same point in FIFO order
    ///   as before. Reducing size never forfeits time priority.
    /// - **Increase in total quantity** demotes the order to the *back* of the
    ///   queue (it is assigned a fresh insertion sequence). Sizing an order up
    ///   loses time priority, matching standard exchange behaviour.
    ///
    /// Total quantity is `visible + hidden`; the branch is chosen by comparing
    /// the order's total before and after the update. Order types that cannot
    /// be resized fall back to an unchanged order and therefore take the
    /// in-place (position-preserving) branch.
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::InvalidOperation`] if an
    /// [`OrderUpdate::UpdatePrice`] / [`OrderUpdate::Replace`] would not move
    /// the order to a different price level, or if computing an order's total
    /// quantity overflows `u64`.
    #[must_use = "the updated order (or None when the order is absent) must be handled"]
    pub fn update_order(
        &self,
        update: OrderUpdate,
    ) -> Result<Option<Arc<OrderType<()>>>, PriceLevelError> {
        match update {
            OrderUpdate::UpdatePrice {
                order_id,
                new_price,
            } => {
                // If price changes, this order needs to be moved to a different price level
                // So we remove it from this level and return it for re-insertion elsewhere
                if new_price != Price::new(self.price) {
                    let order = self.orders.remove(order_id);

                    if let Some(ref order_arc) = order {
                        // Update atomic counters from the order actually removed
                        // from the queue above. `Relaxed` on all three: advisory
                        // counters (issue #68); the `OrderQueue::remove` carries
                        // the happens-before, not these counters.
                        let visible_qty = order_arc.visible_quantity().as_u64();
                        let hidden_qty = order_arc.hidden_quantity().as_u64();

                        self.visible_quantity
                            .fetch_sub(visible_qty, Ordering::Relaxed);
                        self.hidden_quantity
                            .fetch_sub(hidden_qty, Ordering::Relaxed);
                        self.order_count.fetch_sub(1, Ordering::Relaxed);

                        // Update statistics
                        self.stats.record_order_removed();
                    }

                    Ok(order)
                } else {
                    // If price is the same, this is a no-op at the price level
                    // (Should be handled at the order book level)
                    Err(PriceLevelError::InvalidOperation {
                        message: "Cannot update price to the same value".to_string(),
                    })
                }
            }

            OrderUpdate::UpdateQuantity {
                order_id,
                new_quantity,
            } => {
                // Read the current order to build the resized order and pick the
                // priority policy. The counter deltas below are taken from the
                // order actually removed/replaced under the queue's per-entry
                // lock — not from this pre-read — so a concurrent update cannot
                // drift `visible_quantity` / `hidden_quantity` from the queue.
                let Some(order) = self.orders.find(order_id) else {
                    return Ok(None); // Order not found
                };

                let prev_total = order
                    .visible_quantity()
                    .as_u64()
                    .checked_add(order.hidden_quantity().as_u64())
                    .ok_or_else(|| PriceLevelError::InvalidOperation {
                        message: "order total quantity overflow".to_string(),
                    })?;

                // Build the updated order. `with_reduced_quantity` sets the
                // (visible/main) quantity to exactly `new_quantity` for order
                // types that support resizing; types it cannot resize fall back
                // to an unchanged clone, so `new_total` equals the old total for
                // them and they take the position-preserving in-place branch.
                let new_order = order.with_reduced_quantity(new_quantity.as_u64());
                let new_visible = new_order.visible_quantity().as_u64();
                let new_hidden = new_order.hidden_quantity().as_u64();
                let new_total = new_visible.checked_add(new_hidden).ok_or_else(|| {
                    PriceLevelError::InvalidOperation {
                        message: "order total quantity overflow".to_string(),
                    }
                })?;

                let new_order_arc = Arc::new(new_order);

                // Perform the queue mutation and capture the order it actually
                // removed/replaced, so the counter deltas reflect the real
                // transition rather than the possibly-stale pre-read above.
                let old = if new_total > prev_total {
                    // Quantity INCREASE: demote to the back of the queue (mint a
                    // new sequence), losing time priority. Retains the
                    // remove+push shape; its concurrency window is the broader
                    // #81 work and is intentionally not addressed here.
                    let Some(removed) = self.orders.remove(order_id) else {
                        return Ok(None); // Removed by another thread.
                    };
                    self.orders.push(new_order_arc.clone());
                    removed
                } else {
                    // Quantity DECREASE or unchanged total: keep the maker's
                    // queue position by swapping the stored order in place at its
                    // existing insertion sequence, under the DashMap per-entry
                    // lock.
                    let Some(replaced) =
                        self.orders.update_in_place(order_id, new_order_arc.clone())
                    else {
                        return Ok(None); // Removed by another thread.
                    };
                    replaced
                };

                // Apply the counter deltas from the actual replaced order. A
                // single component (visible or hidden) can move in EITHER
                // direction even when the total shrinks or is unchanged, because
                // quantity can shift between the visible and hidden portions, so
                // handle both signs.
                let old_visible = old.visible_quantity().as_u64();
                let old_hidden = old.hidden_quantity().as_u64();
                // `Relaxed` on both branches: advisory counters (issue #68); the
                // queue mutation above (`update_in_place` / `remove` + `push`)
                // carries the happens-before, not these counter RMWs.
                let apply = |counter: &std::sync::atomic::AtomicU64, old: u64, new: u64| {
                    if new >= old {
                        counter.fetch_add(new - old, Ordering::Relaxed);
                    } else {
                        counter.fetch_sub(old - new, Ordering::Relaxed);
                    }
                };
                apply(&self.visible_quantity, old_visible, new_visible);
                apply(&self.hidden_quantity, old_hidden, new_hidden);

                Ok(Some(new_order_arc))
            }

            OrderUpdate::UpdatePriceAndQuantity {
                order_id,
                new_price,
                new_quantity,
            } => {
                // If price changes, remove the order and let the order book handle re-insertion
                if new_price != Price::new(self.price) {
                    let order = self.orders.remove(order_id);

                    if let Some(ref order_arc) = order {
                        // Update atomic counters from the order actually removed
                        // from the queue above. `Relaxed` on all three: advisory
                        // counters (issue #68); the `OrderQueue::remove` carries
                        // the happens-before, not these counters.
                        let visible_qty = order_arc.visible_quantity().as_u64();
                        let hidden_qty = order_arc.hidden_quantity().as_u64();

                        self.visible_quantity
                            .fetch_sub(visible_qty, Ordering::Relaxed);
                        self.hidden_quantity
                            .fetch_sub(hidden_qty, Ordering::Relaxed);
                        self.order_count.fetch_sub(1, Ordering::Relaxed);

                        // Update statistics
                        self.stats.record_order_removed();
                    }
                    Ok(order)
                } else {
                    // If price is the same, just update the quantity (reuse logic)
                    self.update_order(OrderUpdate::UpdateQuantity {
                        order_id,
                        new_quantity,
                    })
                }
            }

            OrderUpdate::Cancel { order_id } => {
                // Remove the order
                let order = self.orders.remove(order_id);

                if let Some(ref order_arc) = order {
                    // Update atomic counters from the order actually removed from
                    // the queue above. `Relaxed` on all three: advisory counters
                    // (issue #68); the `OrderQueue::remove` carries the
                    // happens-before, not these counters.
                    let visible_qty = order_arc.visible_quantity().as_u64();
                    let hidden_qty = order_arc.hidden_quantity().as_u64();

                    self.visible_quantity
                        .fetch_sub(visible_qty, Ordering::Relaxed);
                    self.hidden_quantity
                        .fetch_sub(hidden_qty, Ordering::Relaxed);
                    self.order_count.fetch_sub(1, Ordering::Relaxed);

                    // Update statistics
                    self.stats.record_order_removed();
                }

                Ok(order)
            }

            OrderUpdate::Replace {
                order_id,
                price,
                quantity,
                side: _,
            } => {
                // For replacement, check if the price is changing
                if price != Price::new(self.price) {
                    // If price is different, remove the order and let order book handle re-insertion
                    let order = self.orders.remove(order_id);

                    if let Some(ref order_arc) = order {
                        // Update atomic counters from the order actually removed
                        // from the queue above. `Relaxed` on all three: advisory
                        // counters (issue #68); the `OrderQueue::remove` carries
                        // the happens-before, not these counters.
                        let visible_qty = order_arc.visible_quantity().as_u64();
                        let hidden_qty = order_arc.hidden_quantity().as_u64();

                        self.visible_quantity
                            .fetch_sub(visible_qty, Ordering::Relaxed);
                        self.hidden_quantity
                            .fetch_sub(hidden_qty, Ordering::Relaxed);
                        self.order_count.fetch_sub(1, Ordering::Relaxed);

                        // Update statistics
                        self.stats.record_order_removed();
                    }

                    Ok(order)
                } else {
                    // If price is the same, just update the quantity
                    self.update_order(OrderUpdate::UpdateQuantity {
                        order_id,
                        new_quantity: quantity,
                    })
                }
            }
        }
    }
}

/// Plain-data, serde-friendly mirror of a [`PriceLevel`].
///
/// `PriceLevel` serializes through this struct and deserializes back from it.
/// `deny_unknown_fields` makes deserialization reject input that carries any
/// field not listed here.
///
/// When a level is rebuilt from this struct via `TryFrom<PriceLevelData>`, only
/// `price` and `orders` are consumed; the three aggregate fields are not read
/// by that conversion.
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PriceLevelData {
    /// The price shared by every order at this level, as a raw `u128`.
    pub price: u128,
    /// Total visible quantity at this price level at the time of conversion.
    pub visible_quantity: u64,
    /// Total hidden quantity at this price level at the time of conversion.
    pub hidden_quantity: u64,
    /// Number of orders at this price level at the time of conversion.
    pub order_count: usize,
    /// The orders at this price level, by value.
    pub orders: Vec<OrderType<()>>,
}

impl From<&PriceLevel> for PriceLevelData {
    /// Copy a live price level into its plain-data form.
    ///
    /// The price and the three aggregates are read through the level's
    /// accessors, then the orders are copied out of the level's order iterator.
    /// These are separate reads, so under concurrent mutation the aggregates
    /// may not match the copied orders exactly.
    fn from(price_level: &PriceLevel) -> Self {
        let price = price_level.price();
        let visible_quantity = price_level.visible_quantity();
        let hidden_quantity = price_level.hidden_quantity();
        let order_count = price_level.order_count();

        // Each item is a shared handle; dereferencing copies the order out.
        let orders = price_level
            .iter_orders()
            .map(|shared_order| *shared_order)
            .collect();

        Self {
            price,
            visible_quantity,
            hidden_quantity,
            order_count,
            orders,
        }
    }
}

impl From<&PriceLevelSnapshot> for PriceLevel {
    /// Rebuild a live price level from a snapshot.
    ///
    /// The snapshot is cloned and its aggregates are refreshed before use (the
    /// result of the refresh is deliberately ignored, since this conversion is
    /// infallible). The counters are seeded from the refreshed snapshot, the
    /// persisted statistics are carried over rather than reset, and the queue
    /// is built from the snapshot's orders.
    fn from(value: &PriceLevelSnapshot) -> Self {
        let mut restored = value.clone();
        let _ = restored.refresh_aggregates();

        // Everything that only borrows the snapshot is read first, because
        // `into_orders` below consumes it.
        let restored_count = restored.orders().len();
        let restored_visible = restored.visible_quantity().as_u64();
        let restored_hidden = restored.hidden_quantity().as_u64();
        let restored_price = restored.price().as_u128();
        // Preserve the persisted statistics instead of resetting them.
        let restored_stats = (*restored.statistics()).clone();
        let restored_queue = OrderQueue::from(restored.into_orders());

        Self {
            price: restored_price,
            visible_quantity: AtomicU64::new(restored_visible),
            hidden_quantity: AtomicU64::new(restored_hidden),
            order_count: AtomicUsize::new(restored_count),
            orders: restored_queue,
            stats: Arc::new(restored_stats),
        }
    }
}

impl TryFrom<PriceLevelData> for PriceLevel {
    type Error = PriceLevelError;

    fn try_from(data: PriceLevelData) -> Result<Self, Self::Error> {
        let price_level = PriceLevel::new(data.price);

        // Add orders to the price level
        for order in data.orders {
            price_level.add_order(order);
        }

        Ok(price_level)
    }
}

// Implement custom serialization for the atomic types
impl Serialize for PriceLevel {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Convert to a serializable representation
        let data: PriceLevelData = self.into();
        data.serialize(serializer)
    }
}

impl FromStr for PriceLevel {
    type Err = PriceLevelError;

    /// Parse a price level from its textual form.
    ///
    /// The expected shape is
    /// `PriceLevel:key=value;key=value;…;orders=[order,order,…]`.
    /// Only two keys are used: `price` (required, a `u128`) and `orders`
    /// (optional). Any other `key=value` pair is accepted and ignored, and
    /// segments without an `=` are skipped. Each order is parsed with
    /// `OrderType::from_str` and inserted through `add_order`, in the order
    /// written.
    ///
    /// The `orders=[…]` list ends at the `]` that *matches* its opening
    /// bracket, so a nested `[...]` inside an order does not cut the list
    /// short. Inside the list, commas split orders only at nesting depth zero
    /// with respect to `(`/`)` and `[`/`]`.
    ///
    /// # Errors
    ///
    /// Returns [`PriceLevelError::ParseError`] when the `PriceLevel:` prefix is
    /// missing, when the orders bracket is never closed, when `price` is
    /// missing or not a valid `u128`, or when any order fails to parse.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        use std::borrow::Cow;

        const PREFIX: &str = "PriceLevel:";
        const ORDERS_OPEN: &str = "orders=[";

        let content = s
            .strip_prefix(PREFIX)
            .ok_or_else(|| PriceLevelError::ParseError {
                message: "Invalid format: missing 'PriceLevel:' prefix".to_string(),
            })?;

        let mut parts = std::collections::HashMap::new();

        // Lift the orders list out first: orders contain `;` and `=`
        // themselves, so they must not go through the key/value split below.
        let remaining_content: Cow<'_, str> = if let Some(orders_start) = content.find(ORDERS_OPEN)
        {
            let list_start = orders_start + ORDERS_OPEN.len();

            // Walk to the bracket that closes the list, counting nested
            // square brackets so an inner `]` is not mistaken for the end.
            let mut depth = 1_usize;
            let mut list_end = None;
            for (offset, c) in content[list_start..].char_indices() {
                match c {
                    '[' => depth += 1,
                    ']' => {
                        depth -= 1;
                        if depth == 0 {
                            list_end = Some(list_start + offset);
                            break;
                        }
                    }
                    _ => {}
                }
            }
            let list_end = list_end.ok_or_else(|| PriceLevelError::ParseError {
                message: "Invalid format: unclosed orders bracket".to_string(),
            })?;

            parts.insert("orders", &content[list_start..list_end]);

            // Whatever surrounds the list is ordinary `key=value` content.
            // `]` is one byte, so `list_end + 1` is a valid boundary.
            Cow::Owned([&content[..orders_start], &content[list_end + 1..]].concat())
        } else {
            Cow::Borrowed(content)
        };

        for part in remaining_content.split(';').filter(|s| !s.is_empty()) {
            let mut iter = part.splitn(2, '=');
            if let (Some(key), Some(value)) = (iter.next(), iter.next()) {
                parts.insert(key, value);
            }
        }

        let price = parts
            .get("price")
            .and_then(|v| v.parse::<u128>().ok())
            .ok_or_else(|| PriceLevelError::ParseError {
                message: "Missing or invalid price".to_string(),
            })?;

        let price_level = PriceLevel::new(price);

        if let Some(orders_part) = parts.get("orders").copied().filter(|o| !o.is_empty()) {
            // Parse one order and insert it into the level being built.
            let add_parsed = |order_str: &str| -> Result<(), PriceLevelError> {
                let order = OrderType::<()>::from_str(order_str).map_err(|e| {
                    PriceLevelError::ParseError {
                        message: format!("Order parse error: {e}"),
                    }
                })?;
                price_level.add_order(order);
                Ok(())
            };

            let mut bracket_level = 0;
            let mut last_split = 0;

            for (i, c) in orders_part.char_indices() {
                match c {
                    '(' | '[' => bracket_level += 1,
                    ')' | ']' => bracket_level -= 1,
                    ',' if bracket_level == 0 => {
                        add_parsed(&orders_part[last_split..i])?;
                        last_split = i + 1;
                    }
                    _ => {}
                }
            }

            // The final order has no trailing comma; an empty tail (for
            // example after a trailing comma) is skipped.
            let tail = &orders_part[last_split..];
            if !tail.is_empty() {
                add_parsed(tail)?;
            }
        }

        Ok(price_level)
    }
}

impl<'de> Deserialize<'de> for PriceLevel {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Deserialize into the data representation
        let data = PriceLevelData::deserialize(deserializer)?;

        // Convert to PriceLevel
        PriceLevel::try_from(data).map_err(serde::de::Error::custom)
    }
}

impl PartialEq for PriceLevel {
    /// Two levels are equal when they sit at the same price; orders, counters
    /// and statistics are not compared.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.price, other.price);
        lhs == rhs
    }
}

// Equality compares only the integer `price`, so it is a full equivalence
// relation and the `Eq` marker is sound.
impl Eq for PriceLevel {}

impl PartialOrd for PriceLevel {
    /// Always `Some`: the ordering is total and is delegated to [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for PriceLevel {
    /// Levels are ordered by price alone, ascending.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.price, &other.price)
    }
}

impl Display for PriceLevel {
    /// Write the level as
    /// `PriceLevel:price=…;visible_quantity=…;hidden_quantity=…;order_count=…;orders=[…]`,
    /// with the orders comma-separated inside the brackets.
    ///
    /// The header values come from the level's accessors and the orders from a
    /// separate `snapshot_orders` call.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "PriceLevel:price={};visible_quantity={};hidden_quantity={};order_count={};orders=[",
            self.price(),
            self.visible_quantity(),
            self.hidden_quantity(),
            self.order_count()
        )?;

        for (position, order) in self.snapshot_orders().into_iter().enumerate() {
            // A separator goes before every order except the first.
            if position > 0 {
                f.write_str(",")?;
            }
            write!(f, "{order}")?;
        }

        f.write_str("]")
    }
}