pricelevel/price_level/
order_queue.rs1use crate::errors::PriceLevelError;
2use crate::orders::{OrderId, OrderType};
3use crossbeam::queue::SegQueue;
4use dashmap::DashMap;
5use serde::de::{SeqAccess, Visitor};
6use serde::ser::SerializeSeq;
7use serde::{Deserialize, Deserializer, Serialize, Serializer};
8use std::fmt;
9use std::fmt::Display;
10use std::marker::PhantomData;
11use std::str::FromStr;
12use std::sync::Arc;
13
14#[derive(Debug)]
16pub struct OrderQueue {
17 orders: DashMap<OrderId, Arc<OrderType<()>>>,
19 order_ids: SegQueue<OrderId>,
21}
22
23impl OrderQueue {
24 pub fn new() -> Self {
26 Self {
27 orders: DashMap::new(),
28 order_ids: SegQueue::new(),
29 }
30 }
31
32 pub fn push(&self, order: Arc<OrderType<()>>) {
34 let order_id = order.id();
35 self.orders.insert(order_id, order);
36 self.order_ids.push(order_id);
37 }
38
39 pub fn pop(&self) -> Option<Arc<OrderType<()>>> {
41 loop {
42 if let Some(order_id) = self.order_ids.pop() {
43 if let Some((_, order)) = self.orders.remove(&order_id) {
46 return Some(order);
47 }
48 } else {
49 return None; }
51 }
52 }
53
54 pub fn find(&self, order_id: OrderId) -> Option<Arc<OrderType<()>>> {
56 self.orders.get(&order_id).map(|o| o.value().clone())
57 }
58
59 pub fn remove(&self, order_id: OrderId) -> Option<Arc<OrderType<()>>> {
62 self.orders.remove(&order_id).map(|(_, order)| order)
63 }
64
65 pub fn to_vec(&self) -> Vec<Arc<OrderType<()>>> {
67 let mut orders: Vec<Arc<OrderType<()>>> =
68 self.orders.iter().map(|o| o.value().clone()).collect();
69 orders.sort_by_key(|o| o.timestamp());
70 orders
71 }
72
73 #[allow(dead_code)]
89 pub fn from_vec(orders: Vec<Arc<OrderType<()>>>) -> Self {
90 let queue = OrderQueue::new();
91 for order in orders {
92 queue.push(order);
93 }
94 queue
95 }
96
97 #[allow(dead_code)]
99 pub fn is_empty(&self) -> bool {
100 self.orders.is_empty()
101 }
102
103 pub fn len(&self) -> usize {
110 self.orders.len()
111 }
112}
113
114impl Default for OrderQueue {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119impl Serialize for OrderQueue {
121 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
122 where
123 S: Serializer,
124 {
125 let mut seq = serializer.serialize_seq(Some(self.len()))?;
126 for order_entry in self.orders.iter() {
127 seq.serialize_element(order_entry.value().as_ref())?;
128 }
129 seq.end()
130 }
131}
132
133impl FromStr for OrderQueue {
134 type Err = PriceLevelError;
135 fn from_str(s: &str) -> Result<Self, Self::Err> {
136 if !s.starts_with("OrderQueue:orders=[") || !s.ends_with(']') {
137 return Err(PriceLevelError::ParseError {
138 message: "Invalid format".to_string(),
139 });
140 }
141
142 let content = &s["OrderQueue:orders=[".len()..s.len() - 1];
143 let queue = OrderQueue::new();
144
145 if !content.is_empty() {
146 for order_str in content.split(',') {
147 let order =
148 OrderType::from_str(order_str).map_err(|e| PriceLevelError::ParseError {
149 message: format!("Order parse error: {e}"),
150 })?;
151 queue.push(Arc::new(order));
152 }
153 }
154
155 Ok(queue)
156 }
157}
158
159impl Display for OrderQueue {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 let orders_str: Vec<String> = self.to_vec().iter().map(|o| o.to_string()).collect();
162 write!(f, "OrderQueue:orders=[{}]", orders_str.join(","))
163 }
164}
165
166impl From<Vec<Arc<OrderType<()>>>> for OrderQueue {
167 fn from(orders: Vec<Arc<OrderType<()>>>) -> Self {
168 let queue = OrderQueue::new();
169 for order in orders {
170 queue.push(order);
171 }
172 queue
173 }
174}
175
176struct OrderQueueVisitor {
178 marker: PhantomData<fn() -> OrderQueue>,
179}
180
181impl OrderQueueVisitor {
182 fn new() -> Self {
183 OrderQueueVisitor {
184 marker: PhantomData,
185 }
186 }
187}
188
189impl<'de> Visitor<'de> for OrderQueueVisitor {
190 type Value = OrderQueue;
191
192 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
193 formatter.write_str("a sequence of orders")
194 }
195
196 fn visit_seq<V>(self, mut seq: V) -> Result<OrderQueue, V::Error>
197 where
198 V: SeqAccess<'de>,
199 {
200 let queue = OrderQueue::new();
201
202 while let Some(order) = seq.next_element::<OrderType<()>>()? {
204 queue.push(Arc::new(order));
205 }
206
207 Ok(queue)
208 }
209}
210
211impl<'de> Deserialize<'de> for OrderQueue {
213 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
214 where
215 D: Deserializer<'de>,
216 {
217 deserializer.deserialize_seq(OrderQueueVisitor::new())
219
220 }
228}