use std::cmp::{Eq, Ord, Ordering, PartialOrd};
use std::collections::BinaryHeap;
use std::sync::Mutex;
use std::time::Duration;
use chrono::{NaiveDateTime, Utc};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use lazy_static::lazy_static;
use prometheus::register_int_counter;
use prometheus::{self, IntCounter};
use tokio::sync::broadcast;
use tracing::error;
lazy_static! {
pub static ref TIME_BOOST_ROUNDS_TOTAL: IntCounter = register_int_counter!(
"timeboost_rounds_total",
"Number of time boost rounds elapsed"
)
.unwrap();
}
pub const DEFAULT_MAX_BOOST_FACTOR: u64 = 500;
pub const DEFAULT_INPUT_FEED_BUFFER_CAP: usize = 1000;
pub struct TimeBoostService {
g_factor: u64,
tx_sender: Sender<BoostableTx>,
txs_recv: Receiver<BoostableTx>,
tx_heap: Mutex<BinaryHeap<BoostableTx>>,
output_feed: broadcast::Sender<BoostableTx>,
}
impl TimeBoostService {
pub fn new(output_feed: broadcast::Sender<BoostableTx>) -> Self {
let (tx_sender, txs_recv) = bounded(DEFAULT_INPUT_FEED_BUFFER_CAP);
TimeBoostService {
g_factor: DEFAULT_MAX_BOOST_FACTOR,
tx_sender,
txs_recv,
tx_heap: Mutex::new(BinaryHeap::new()),
output_feed,
}
}
#[allow(dead_code)]
fn input_feed_buffer_capacity(mut self, buffer_size: usize) -> Self {
let (tx_sender, txs_recv) = bounded(buffer_size);
self.tx_sender = tx_sender;
self.txs_recv = txs_recv;
self
}
#[allow(dead_code)]
fn g_factor(mut self, g_factor: u64) -> Self {
self.g_factor = g_factor;
self
}
pub fn sender(&self) -> Sender<BoostableTx> {
self.tx_sender.clone()
}
pub fn run(&mut self) {
'next: loop {
select! {
recv(self.txs_recv) -> tx => {
let mut heap = self.tx_heap.lock().unwrap();
match tx {
Ok(tx) => heap.push(tx),
Err(e) => error!("TimeBoostService got receive error from tx input channel: {}", e),
}
},
default(Duration::from_millis(self.g_factor)) => {
let mut heap = self.tx_heap.lock().unwrap();
while let Some(tx) = heap.pop() {
let timestamp = Utc::now().naive_utc();
let output_tx = BoostableTx {
id: tx.id,
bid: tx.bid,
timestamp,
};
if let Err(e) = self.output_feed.send(output_tx) {
error!(
"TimeBoostService got send error when broadcasting tx into output sequence: {}",
e,
);
}
}
TIME_BOOST_ROUNDS_TOTAL.inc();
continue 'next;
}
}
}
}
}
#[derive(Debug, Clone, Eq)]
pub struct BoostableTx {
pub id: u64,
pub bid: u64,
pub timestamp: NaiveDateTime,
}
impl BoostableTx {
pub fn new(id: u64, bid: u64, timestamp_millis: u64) -> Self {
Self {
id,
bid,
timestamp: NaiveDateTime::from_timestamp_millis(timestamp_millis as i64).unwrap(),
}
}
}
impl PartialEq for BoostableTx {
fn eq(&self, other: &Self) -> bool {
self.id == other.id && self.bid == other.bid && self.timestamp == other.timestamp
}
}
impl PartialOrd for BoostableTx {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.bid.cmp(&other.bid) {
Ordering::Equal => {
match self.timestamp.partial_cmp(&other.timestamp) {
Some(Ordering::Less) => Some(Ordering::Greater),
Some(Ordering::Equal) => Some(Ordering::Equal),
Some(Ordering::Greater) => Some(Ordering::Less),
_ => unreachable!(),
}
}
Ordering::Greater => Some(Ordering::Greater),
Ordering::Less => Some(Ordering::Less),
}
}
}
impl Ord for BoostableTx {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap()
}
fn max(self, other: Self) -> Self {
if self > other {
self
} else {
other
}
}
fn min(self, other: Self) -> Self {
if self < other {
self
} else {
other
}
}
fn clamp(self, min: Self, max: Self) -> Self {
if self < min {
min
} else if self > max {
max
} else {
self
}
}
}
#[cfg(test)]
mod tests {
use super::*;
macro_rules! bid {
($id:expr, $bid:expr, $millis:expr) => {
BoostableTx::new($id, $bid, $millis)
};
}
#[tokio::test]
async fn normalization_no_bid_no_boost() {
let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
let mut service = TimeBoostService::new(tx_feed);
let sender = service.sender();
std::thread::spawn(move || service.run());
let original_txs = vec![
bid!(
0,
0,
1
),
bid!(1, 0, 2),
bid!(2, 0, 3),
bid!(3, 0, 4),
];
for tx in original_txs.iter() {
sender.send(tx.clone()).unwrap();
}
let mut txs = vec![];
for _ in 0..4 {
let tx = timeboost_output_feed.recv().await.unwrap();
txs.push(tx);
}
assert_eq!(txs.len(), 4);
let want = original_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
assert_eq!(want, got);
}
#[tokio::test]
async fn tx_arrived_until_next_boost_round_with_bid_no_advantage() {
let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
let mut service = TimeBoostService::new(tx_feed);
let sender = service.sender();
std::thread::spawn(move || service.run());
let mut original_txs = vec![
bid!(
0,
0,
1
),
bid!(1, 0, 2),
bid!(2, 0, 3),
bid!(3, 0, 4),
];
for tx in original_txs.iter() {
sender.send(tx.clone()).unwrap();
}
let late_tx = bid!(4, 100 , 4 + DEFAULT_MAX_BOOST_FACTOR);
original_txs.push(late_tx.clone());
tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;
sender.send(late_tx).unwrap();
let mut txs = vec![];
for _ in 0..5 {
let tx = timeboost_output_feed.recv().await.unwrap();
txs.push(tx);
}
assert_eq!(txs.len(), 5);
let want = original_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
assert_eq!(want, got);
}
#[tokio::test]
async fn three_boost_rounds() {
let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
let mut service = TimeBoostService::new(tx_feed);
let sender = service.sender();
std::thread::spawn(move || service.run());
let round1_txs = vec![
bid!(
0,
0,
1
),
bid!(1, 50, 2),
];
let round2_txs = vec![
bid!(
2,
0,
3
),
bid!(3, 100, 4),
];
let round3_txs = vec![
bid!(
4,
0,
5
),
bid!(5, 200, 6),
];
for tx in round1_txs.iter() {
sender.send(tx.clone()).unwrap();
}
tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;
for tx in round2_txs.iter() {
sender.send(tx.clone()).unwrap();
}
tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;
for tx in round3_txs.iter() {
sender.send(tx.clone()).unwrap();
}
let mut txs = vec![];
for _ in 0..6 {
let tx = timeboost_output_feed.recv().await.unwrap();
txs.push(tx);
}
dbg!(&txs);
assert_eq!(txs.len(), 6);
let want = vec![1, 0, 3, 2, 5, 4];
let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
assert_eq!(want, got);
}
#[tokio::test]
async fn all_equal_bids_tiebreak_by_arrival_timestamp() {
let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
let mut service = TimeBoostService::new(tx_feed);
let sender = service.sender();
std::thread::spawn(move || service.run());
let original_txs = vec![
bid!(
0,
100,
0
),
bid!(1, 100, 3),
bid!(2, 100, 2), bid!(3, 100, 1),
bid!(4, 100, 6), bid!(5, 100, 5),
bid!(6, 100, 4), ];
for tx in original_txs.iter() {
sender.send(tx.clone()).unwrap();
}
let mut txs = vec![];
for _ in 0..7 {
let tx = timeboost_output_feed.recv().await.unwrap();
txs.push(tx);
}
assert_eq!(txs.len(), 7);
let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
let want: Vec<u64> = vec![0, 3, 2, 1, 6, 5, 4];
assert_eq!(want, got);
}
#[tokio::test]
async fn some_equal_bids_tiebreak_by_timestamp() {
let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
let mut service = TimeBoostService::new(tx_feed);
let sender = service.sender();
std::thread::spawn(move || service.run());
let original_txs = vec![
bid!(
0,
1,
0
),
bid!(1, 2, 1),
bid!(2, 3, 2), bid!(3, 3, 3),
bid!(4, 5, 4), bid!(5, 5, 5),
bid!(6, 7, 6), ];
for tx in original_txs.iter() {
sender.send(tx.clone()).unwrap();
}
let mut txs = vec![];
for _ in 0..7 {
let tx = timeboost_output_feed.recv().await.unwrap();
txs.push(tx);
}
assert_eq!(txs.len(), 7);
let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
let want: Vec<u64> = vec![6, 4, 5, 2, 3, 1, 0];
assert_eq!(want, got);
}
#[tokio::test]
async fn timeboost_same_interval_sort_by_bid() {
let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
let mut service = TimeBoostService::new(tx_feed);
let sender = service.sender();
std::thread::spawn(move || service.run());
let mut original_txs = vec![
bid!(
0,
1,
0
),
bid!(1, 2, 1),
bid!(2, 3, 2),
bid!(3, 4, 3),
bid!(4, 5, 4),
bid!(5, 6, 5),
bid!(6, 7, 6),
];
for tx in original_txs.iter() {
sender.send(tx.clone()).unwrap();
}
let mut txs = vec![];
for _ in 0..7 {
let tx = timeboost_output_feed.recv().await.unwrap();
txs.push(tx);
}
assert_eq!(txs.len(), 7);
original_txs.reverse();
let want = original_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
assert_eq!(want, got);
}
}