pricelevel/price_level/
order_queue.rs

1use crate::errors::PriceLevelError;
2use crate::orders::{OrderId, OrderType};
3use crossbeam::queue::SegQueue;
4use dashmap::DashMap;
5use serde::de::{SeqAccess, Visitor};
6use serde::ser::SerializeSeq;
7use serde::{Deserialize, Deserializer, Serialize, Serializer};
8use std::fmt;
9use std::fmt::Display;
10use std::marker::PhantomData;
11use std::str::FromStr;
12use std::sync::Arc;
13
14/// A thread-safe queue of orders with specialized operations
15#[derive(Debug)]
16pub struct OrderQueue {
17    /// A map of order IDs to orders for quick lookups
18    orders: DashMap<OrderId, Arc<OrderType<()>>>,
19    /// A queue of order IDs to maintain FIFO order
20    order_ids: SegQueue<OrderId>,
21}
22
23impl OrderQueue {
24    /// Create a new empty order queue
25    pub fn new() -> Self {
26        Self {
27            orders: DashMap::new(),
28            order_ids: SegQueue::new(),
29        }
30    }
31
32    /// Add an order to the queue
33    pub fn push(&self, order: Arc<OrderType<()>>) {
34        let order_id = order.id();
35        self.orders.insert(order_id, order);
36        self.order_ids.push(order_id);
37    }
38
39    /// Attempt to pop an order from the queue
40    pub fn pop(&self) -> Option<Arc<OrderType<()>>> {
41        loop {
42            if let Some(order_id) = self.order_ids.pop() {
43                // If the order was removed, pop will return None, but the ID was in the queue.
44                // In this case, we loop and try to get the next one.
45                if let Some((_, order)) = self.orders.remove(&order_id) {
46                    return Some(order);
47                }
48            } else {
49                return None; // Queue is empty
50            }
51        }
52    }
53
54    /// Search for an order with the given ID. O(1) operation.
55    pub fn find(&self, order_id: OrderId) -> Option<Arc<OrderType<()>>> {
56        self.orders.get(&order_id).map(|o| o.value().clone())
57    }
58
59    /// Remove an order with the given ID
60    /// Returns the removed order if found. O(1) for the map, but the ID remains in the queue.
61    pub fn remove(&self, order_id: OrderId) -> Option<Arc<OrderType<()>>> {
62        self.orders.remove(&order_id).map(|(_, order)| order)
63    }
64
65    /// Convert the queue to a vector (for snapshots)
66    pub fn to_vec(&self) -> Vec<Arc<OrderType<()>>> {
67        let mut orders: Vec<Arc<OrderType<()>>> =
68            self.orders.iter().map(|o| o.value().clone()).collect();
69        orders.sort_by_key(|o| o.timestamp());
70        orders
71    }
72
73    /// Creates a new `OrderQueue` instance and populates it with orders from the provided vector.
74    ///
75    /// This function takes ownership of a vector of order references (wrapped in `Arc`) and constructs
76    /// a new `OrderQueue` by iteratively pushing each order into the queue. The resulting queue
77    /// maintains the insertion order of the original vector.
78    ///
79    /// # Parameters
80    ///
81    /// * `orders` - A vector of atomic reference counted (`Arc`) order instances representing
82    ///   the orders to be added to the new queue.
83    ///
84    /// # Returns
85    ///
86    /// A new `OrderQueue` instance containing all the orders from the input vector.
87    ///
88    #[allow(dead_code)]
89    pub fn from_vec(orders: Vec<Arc<OrderType<()>>>) -> Self {
90        let queue = OrderQueue::new();
91        for order in orders {
92            queue.push(order);
93        }
94        queue
95    }
96
97    /// Check if the queue is empty
98    #[allow(dead_code)]
99    pub fn is_empty(&self) -> bool {
100        self.orders.is_empty()
101    }
102
103    /// Returns the number of orders currently in the queue.
104    ///
105    /// # Returns
106    ///
107    /// * `usize` - The total count of orders in the queue.
108    ///
109    pub fn len(&self) -> usize {
110        self.orders.len()
111    }
112}
113
114impl Default for OrderQueue {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119// Implement serialization for OrderQueue
120impl Serialize for OrderQueue {
121    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
122    where
123        S: Serializer,
124    {
125        let mut seq = serializer.serialize_seq(Some(self.len()))?;
126        for order_entry in self.orders.iter() {
127            seq.serialize_element(order_entry.value().as_ref())?;
128        }
129        seq.end()
130    }
131}
132
133impl FromStr for OrderQueue {
134    type Err = PriceLevelError;
135    fn from_str(s: &str) -> Result<Self, Self::Err> {
136        if !s.starts_with("OrderQueue:orders=[") || !s.ends_with(']') {
137            return Err(PriceLevelError::ParseError {
138                message: "Invalid format".to_string(),
139            });
140        }
141
142        let content = &s["OrderQueue:orders=[".len()..s.len() - 1];
143        let queue = OrderQueue::new();
144
145        if !content.is_empty() {
146            for order_str in content.split(',') {
147                let order =
148                    OrderType::from_str(order_str).map_err(|e| PriceLevelError::ParseError {
149                        message: format!("Order parse error: {e}"),
150                    })?;
151                queue.push(Arc::new(order));
152            }
153        }
154
155        Ok(queue)
156    }
157}
158
159impl Display for OrderQueue {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        let orders_str: Vec<String> = self.to_vec().iter().map(|o| o.to_string()).collect();
162        write!(f, "OrderQueue:orders=[{}]", orders_str.join(","))
163    }
164}
165
166impl From<Vec<Arc<OrderType<()>>>> for OrderQueue {
167    fn from(orders: Vec<Arc<OrderType<()>>>) -> Self {
168        let queue = OrderQueue::new();
169        for order in orders {
170            queue.push(order);
171        }
172        queue
173    }
174}
175
176// Custom visitor for deserializing OrderQueue
177struct OrderQueueVisitor {
178    marker: PhantomData<fn() -> OrderQueue>,
179}
180
181impl OrderQueueVisitor {
182    fn new() -> Self {
183        OrderQueueVisitor {
184            marker: PhantomData,
185        }
186    }
187}
188
189impl<'de> Visitor<'de> for OrderQueueVisitor {
190    type Value = OrderQueue;
191
192    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
193        formatter.write_str("a sequence of orders")
194    }
195
196    fn visit_seq<V>(self, mut seq: V) -> Result<OrderQueue, V::Error>
197    where
198        V: SeqAccess<'de>,
199    {
200        let queue = OrderQueue::new();
201
202        // Deserialize each order and add it to the queue
203        while let Some(order) = seq.next_element::<OrderType<()>>()? {
204            queue.push(Arc::new(order));
205        }
206
207        Ok(queue)
208    }
209}
210
211// Implement deserialization for OrderQueue
212impl<'de> Deserialize<'de> for OrderQueue {
213    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
214    where
215        D: Deserializer<'de>,
216    {
217        // Deserialize as a sequence of orders
218        deserializer.deserialize_seq(OrderQueueVisitor::new())
219
220        // Alternative approach: Deserialize as OrderQueueData first, then convert
221        // let data = OrderQueueData::deserialize(deserializer)?;
222        // let queue = OrderQueue::new();
223        // for order in data.orders {
224        //     queue.push(Arc::new(order));
225        // }
226        // Ok(queue)
227    }
228}