use crate::errors::PriceLevelError;
use crate::orders::{Id, OrderType};
use crossbeam_skiplist::SkipMap;
use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use serde::de::{SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashSet;
use std::fmt;
use std::fmt::Display;
use std::marker::PhantomData;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Queue of orders that pairs an id-keyed map with a sequence-keyed index.
///
/// Each stored order carries the sequence number it was filed under, and
/// `index` maps that sequence number back to the order id. This lets the queue
/// be walked from the lowest sequence number upwards while still supporting
/// direct lookup, update and removal by id.
#[derive(Debug)]
pub struct OrderQueue {
/// Orders keyed by id; each value is `(sequence number, order)`.
orders: DashMap<Id, (u64, Arc<OrderType<()>>)>,
/// Sequence number -> order id; the front (smallest key) is popped/matched first.
index: SkipMap<u64, Id>,
/// Counter that hands out the next sequence number (starts at 0 in `new`).
next_seq: AtomicU64,
}
/// Instruction returned by the `decide` callback of `OrderQueue::match_front`,
/// telling the queue what to do with the order it just inspected.
#[derive(Debug)]
pub(crate) enum FrontAction {
/// Delete the order from the map and its entry from the index.
Remove,
/// Keep the same sequence number but store the given order in its place.
KeepInPlace(Arc<OrderType<()>>),
/// Store the given order under a freshly allocated sequence number,
/// dropping the old index entry.
ReplaceAtTail(Arc<OrderType<()>>),
/// Leave the order untouched and record its sequence number in the
/// caller's `set_aside` set so later calls skip it.
SetAside,
}
/// Result of one `OrderQueue::match_front` call.
#[derive(Debug)]
pub(crate) enum FrontOutcome<R> {
/// An order was found and `decide` ran; `result` is the value it returned
/// alongside its `FrontAction`.
Matched { result: R },
/// No index entry outside the `set_aside` set was available.
Empty,
}
impl OrderQueue {
/// Creates an empty queue whose sequence counter starts at zero.
#[must_use]
pub fn new() -> Self {
    let orders = DashMap::new();
    let index = SkipMap::new();
    let next_seq = AtomicU64::new(0);
    Self {
        orders,
        index,
        next_seq,
    }
}
/// Appends `order` to the queue under a freshly allocated sequence number.
///
/// If an order with the same id is already queued, it is replaced and moves
/// to the new sequence position. The index entry of the replaced order is
/// dropped so the id is never reachable through two sequence numbers at
/// once. Otherwise the stale entry would make sequence-ordered snapshots list
/// the order twice and let it be popped at its old position.
pub fn push(&self, order: Arc<OrderType<()>>) {
    // `fetch_add` is atomic at any ordering, so every caller still receives a
    // distinct sequence number even with `Relaxed`.
    let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
    let order_id = order.id();
    // `insert` hands back the previous value for this id, if any; its
    // sequence number is exactly the index key that has just become stale.
    if let Some((stale_seq, _replaced)) = self.orders.insert(order_id, (seq, order)) {
        self.index.remove(&stale_seq);
    }
    self.index.insert(seq, order_id);
}
/// Removes the front order and returns it together with the sequence number
/// stored alongside it in the order map.
///
/// Index entries whose id no longer resolves to a stored order are consumed
/// and skipped. Returns `None` once the index has been drained without
/// finding a stored order.
#[must_use]
pub(crate) fn pop_entry(&self) -> Option<(u64, Arc<OrderType<()>>)> {
    while let Some(front) = self.index.pop_front() {
        // The index entry is already gone at this point; only a hit in the
        // order map counts as a successful pop.
        if let Some((_, stored)) = self.orders.remove(front.value()) {
            return Some(stored);
        }
    }
    None
}
/// Removes and returns the front order, or `None` when nothing can be popped.
///
/// Same as `pop_entry`, with the sequence number discarded.
#[must_use]
pub fn pop(&self) -> Option<Arc<OrderType<()>>> {
    let (_, order) = self.pop_entry()?;
    Some(order)
}
/// Runs `decide` on the first indexed order whose sequence number is not in
/// `set_aside`, then applies the [`FrontAction`] it returns.
///
/// Index entries whose order id is no longer present in the order map are
/// removed from the index and the search starts over.
///
/// # Parameters
/// - `set_aside`: sequence numbers to skip; a `FrontAction::SetAside`
///   decision adds the inspected order's sequence number to it.
/// - `decide`: called at most once with the index sequence number and a
///   reference to the stored order; returns the action to apply plus an
///   arbitrary result value.
///
/// # Returns
/// `FrontOutcome::Matched { result }` carrying the value produced by
/// `decide`, or `FrontOutcome::Empty` when the index holds no entry outside
/// `set_aside`.
pub(crate) fn match_front<F, R>(
&self,
set_aside: &mut HashSet<u64>,
decide: F,
) -> FrontOutcome<R>
where
F: FnOnce(u64, &OrderType<()>) -> (FrontAction, R),
{
loop {
// Pick the first index entry that has not been set aside, copying out the
// key and id so the iterator is not held while the map is touched.
let Some((seq, order_id)) = self
.index
.iter()
.find(|e| !set_aside.contains(e.key()))
.map(|e| (*e.key(), *e.value()))
else {
return FrontOutcome::Empty;
};
match self.orders.entry(order_id) {
Entry::Vacant(_) => {
// Stale index entry: the order is gone from the map, so drop the
// index key and look for the next candidate.
self.index.remove(&seq);
continue;
}
Entry::Occupied(mut occupied) => {
// `decide` is `FnOnce` and is consumed here; the only path that loops
// again (`Vacant`) never reaches this call.
// NOTE(review): `decide` runs while the map entry for this id is still
// held — presumably it must not call back into this queue; confirm.
let (action, result) = decide(seq, occupied.get().1.as_ref());
match &action {
FrontAction::Remove => {
let _ = occupied.remove();
self.index.remove(&seq);
}
FrontAction::KeepInPlace(residual) => {
// Same sequence number, new order payload.
let slot = occupied.get_mut();
slot.1 = residual.clone();
}
FrontAction::ReplaceAtTail(refreshed) => {
// Re-file the order under a new sequence number: update the map slot
// first, then swap the index key.
let new_seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
{
let slot = occupied.get_mut();
slot.0 = new_seq;
slot.1 = refreshed.clone();
}
self.index.remove(&seq);
self.index.insert(new_seq, order_id);
}
FrontAction::SetAside => {
// Queue contents are unchanged; only the caller's skip set grows.
set_aside.insert(seq);
}
}
return FrontOutcome::Matched { result };
}
}
}
}
/// Test-only helper: stores `order` under the caller-chosen sequence number
/// `seq` instead of allocating a new one.
#[cfg(test)]
pub(crate) fn reinsert(&self, seq: u64, order: Arc<OrderType<()>>) {
    let id = order.id();
    let stored = (seq, order);
    self.orders.insert(id, stored);
    self.index.insert(seq, id);
}
/// Looks up an order by id and returns a shared handle to it, or `None` if
/// the id is not in the queue.
#[must_use]
#[inline]
pub fn find(&self, order_id: Id) -> Option<Arc<OrderType<()>>> {
    let entry = self.orders.get(&order_id)?;
    let (_, order) = entry.value();
    Some(Arc::clone(order))
}
/// Replaces the order stored under `order_id` with `new_order`, keeping its
/// sequence number, and returns the order that was stored before.
///
/// Returns `None` (and stores nothing) when `order_id` is not in the queue.
/// Debug builds assert that `new_order.id()` equals `order_id`.
#[must_use]
pub(crate) fn update_in_place(
    &self,
    order_id: Id,
    new_order: Arc<OrderType<()>>,
) -> Option<Arc<OrderType<()>>> {
    debug_assert_eq!(
        new_order.id(),
        order_id,
        "update_in_place: new_order id must match the key it is stored under"
    );
    self.orders
        .get_mut(&order_id)
        // Only the order half of the `(seq, order)` pair is swapped.
        .map(|mut guard| std::mem::replace(&mut guard.value_mut().1, new_order))
}
/// Removes the order with the given id, along with its index entry, and
/// returns it. Returns `None` when the id is not in the queue.
#[must_use]
pub fn remove(&self, order_id: Id) -> Option<Arc<OrderType<()>>> {
    self.orders.remove(&order_id).map(|(_, (seq, order))| {
        // Drop the index key that pointed at the order just removed.
        self.index.remove(&seq);
        order
    })
}
/// Iterates over shared handles to every stored order.
///
/// The iteration walks the id-keyed order map directly, not the sequence
/// index, so it follows that map's own iteration order.
pub fn iter_orders(&self) -> impl Iterator<Item = Arc<OrderType<()>>> + '_ {
    self.orders.iter().map(|entry| {
        let (_, order) = entry.value();
        Arc::clone(order)
    })
}
/// Returns every stored order sorted by timestamp, with ties broken by
/// sequence number.
#[must_use]
pub fn snapshot_vec(&self) -> Vec<Arc<OrderType<()>>> {
    let mut keyed: Vec<(u64, Arc<OrderType<()>>)> = self
        .orders
        .iter()
        .map(|entry| {
            let (seq, order) = entry.value();
            (*seq, Arc::clone(order))
        })
        .collect();
    // Stable sort on the `(timestamp, seq)` pair.
    keyed.sort_by(|a, b| (a.1.timestamp(), a.0).cmp(&(b.1.timestamp(), b.0)));
    keyed.into_iter().map(|(_, order)| order).collect()
}
/// Returns the stored orders as a vector; delegates to `snapshot_vec`, so the
/// result is ordered by timestamp and then by sequence number.
#[must_use]
pub fn to_vec(&self) -> Vec<Arc<OrderType<()>>> {
self.snapshot_vec()
}
/// Returns the stored orders in index (sequence-number) order.
///
/// Index entries whose id is no longer present in the order map are skipped.
#[must_use]
pub(crate) fn snapshot_by_seq(&self) -> Vec<Arc<OrderType<()>>> {
    let mut ordered = Vec::new();
    for slot in self.index.iter() {
        if let Some(stored) = self.orders.get(slot.value()) {
            ordered.push(Arc::clone(&stored.value().1));
        }
    }
    ordered
}
#[allow(dead_code)]
#[must_use]
pub fn from_vec(orders: Vec<Arc<OrderType<()>>>) -> Self {
let queue = OrderQueue::new();
for order in orders {
queue.push(order);
}
queue
}
/// Returns `true` when the order map holds no orders.
///
/// The check looks at the order map only, not at the sequence index.
#[allow(dead_code)]
#[must_use]
#[inline]
pub fn is_empty(&self) -> bool {
self.orders.is_empty()
}
/// Returns the number of orders in the order map.
///
/// The count comes from the order map only, not from the sequence index.
#[must_use]
#[inline]
pub fn len(&self) -> usize {
self.orders.len()
}
}
/// The default queue is the empty queue produced by `OrderQueue::new`.
impl Default for OrderQueue {
fn default() -> Self {
Self::new()
}
}
/// Serializes the queue as a plain sequence of orders in sequence-number
/// order (the order produced by `snapshot_by_seq`).
impl Serialize for OrderQueue {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Snapshot first so the announced length matches what is emitted.
        let snapshot = self.snapshot_by_seq();
        let mut state = serializer.serialize_seq(Some(snapshot.len()))?;
        for order in &snapshot {
            state.serialize_element(&**order)?;
        }
        state.end()
    }
}
/// Parses the textual form `OrderQueue:orders=[<order>,<order>,...]`.
///
/// # Errors
/// Returns `PriceLevelError::ParseError` when the wrapper prefix/suffix is
/// missing or when any comma-separated element fails to parse as an order.
impl FromStr for OrderQueue {
    type Err = PriceLevelError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Peel off the fixed wrapper; anything that lacks it is rejected.
        let Some(content) = s
            .strip_prefix("OrderQueue:orders=[")
            .and_then(|rest| rest.strip_suffix(']'))
        else {
            return Err(PriceLevelError::ParseError {
                message: "Invalid format".to_string(),
            });
        };

        let queue = OrderQueue::new();
        // An empty bracket pair is a valid, empty queue.
        if content.is_empty() {
            return Ok(queue);
        }
        for order_str in content.split(',') {
            let parsed =
                OrderType::from_str(order_str).map_err(|e| PriceLevelError::ParseError {
                    message: format!("Order parse error: {e}"),
                })?;
            queue.push(Arc::new(parsed));
        }
        Ok(queue)
    }
}
/// Formats the queue as `OrderQueue:orders=[<order>,<order>,...]`, listing
/// orders in the order produced by `snapshot_vec`.
impl Display for OrderQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("OrderQueue:orders=[")?;
        for (position, order) in self.snapshot_vec().into_iter().enumerate() {
            // A comma goes between elements only, never before the first.
            if position > 0 {
                f.write_str(",")?;
            }
            write!(f, "{order}")?;
        }
        f.write_str("]")
    }
}
/// Builds a queue from a vector of orders, pushing them in vector order.
impl From<Vec<Arc<OrderType<()>>>> for OrderQueue {
    fn from(orders: Vec<Arc<OrderType<()>>>) -> Self {
        // `from_vec` performs the same push-in-order construction.
        OrderQueue::from_vec(orders)
    }
}
/// serde visitor that builds an `OrderQueue` from a sequence of orders.
struct OrderQueueVisitor {
/// Zero-sized marker naming the produced type; the visitor holds no data.
marker: PhantomData<fn() -> OrderQueue>,
}
impl OrderQueueVisitor {
fn new() -> Self {
OrderQueueVisitor {
marker: PhantomData,
}
}
}
impl<'de> Visitor<'de> for OrderQueueVisitor {
    type Value = OrderQueue;

    /// Describes the expected input for serde error messages.
    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        write!(formatter, "a sequence of orders")
    }

    /// Reads orders one by one and pushes each onto a fresh queue, so the
    /// input order becomes the queue's sequence order.
    fn visit_seq<V>(self, mut seq: V) -> Result<OrderQueue, V::Error>
    where
        V: SeqAccess<'de>,
    {
        let queue = OrderQueue::new();
        loop {
            match seq.next_element::<OrderType<()>>()? {
                Some(order) => queue.push(Arc::new(order)),
                None => return Ok(queue),
            }
        }
    }
}
/// Deserializes an `OrderQueue` from a sequence of orders by driving
/// `OrderQueueVisitor`, which pushes the elements in the order they appear.
impl<'de> Deserialize<'de> for OrderQueue {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
deserializer.deserialize_seq(OrderQueueVisitor::new())
}
}