timeboost_rs/
lib.rs

1//! timeboost-rs implements the time boost protocol for ordering blockchain transactions as specified in the paper
2//! titled "Buying Time: Latency Racing vs. Bidding for Transaction Ordering" published at https://arxiv.org/pdf/2306.02179.pdf.
3//! Instead of individually boosting transactions, this implementation of the protocol
4//! operates in fixed rounds of length G milliseconds, where G is the parameter defined in the paper.
5use std::cmp::{Eq, Ord, Ordering, PartialOrd};
6use std::collections::BinaryHeap;
7use std::sync::Mutex;
8use std::time::Duration;
9
10use chrono::{NaiveDateTime, Utc};
11use crossbeam_channel::{bounded, select, Receiver, Sender};
12use lazy_static::lazy_static;
13use prometheus::register_int_counter;
14use prometheus::{self, IntCounter};
15use tokio::sync::broadcast;
16use tracing::error;
17
18lazy_static! {
19    /// The number of elapsed time boost rounds, which occur in intervals of G milliseconds,
20    /// exposed as prometheus int counter.
21    pub static ref TIME_BOOST_ROUNDS_TOTAL: IntCounter = register_int_counter!(
22        "timeboost_rounds_total",
23        "Number of time boost rounds elapsed"
24    )
25    .unwrap();
26}
27
28/// A default max boost factor, set to 500ms after empirical evaluations for Ethereum Layer 2s from
29/// the time boost paper. Can be adjusted using the `g_factor` method when building
30/// a TimeBoostService struct.
31pub const DEFAULT_MAX_BOOST_FACTOR: u64 = 500;
32
33/// The default capacity for the transaction input channel used by [`TimeBoostService`] to receive txs
34/// from outside sources. Can be adjusted using the `input_feed_buffer_capacity` method when building
35/// a TimeBoostService struct.
36pub const DEFAULT_INPUT_FEED_BUFFER_CAP: usize = 1000;
37
38/// The TimeBoostService struct is a long-running service that will receive transactions from an input channel,
39/// push them to a priority queue where they are sorted by max bid, and then releases them at
40/// discrete time intervals defined by a parameter G (in milliseconds).
41///
42/// At the end of each round of "G" milliseconds, the service will release all the transactions
43/// that were in the priority queue and start the next round. The timestamps of transactions in the output
44/// feed are the timestamp at the time of release from the priority queue.
45///
46/// We recommend running the TimeBoostService in a dedicated thread, and handles can be acquired from it to send
47/// transactions for it to enqueue and process. Here's a setup example:
48///
49/// ```
50/// use timeboost_rs::{TimeBoostService, BoostableTx};
51/// use tokio::sync::broadcast;
52///
53/// #[tokio::main]
54/// async fn main() {
55///     let (tx_output_feed, mut rx) = broadcast::channel(100);
56///     let mut service = TimeBoostService::new(tx_output_feed);
57///
58///     // Obtain a channel handle to send txs to the TimeBoostService.
59///     let sender = service.sender();
60///
61///     // Spawn a dedicated thread for the time boost service.
62///     std::thread::spawn(move || service.run());
63///
64///     let mut txs = vec![
65///         BoostableTx::new(0 /* id */, 1 /* bid */, 100 /* unix timestamp millis */),
66///         BoostableTx::new(1 /* id */, 100 /* bid */, 101 /* unix timestamp millis */),
67///     ];
68///
69///     for tx in txs.iter() {
70///         sender.send(tx.clone()).unwrap();
71///     }
72///
73///     let mut got_txs = vec![];
74///     for _ in 0..2 {
75///         let tx = rx.recv().await.unwrap();
76///         got_txs.push(tx);
77///     }
78///
79///     // Assert we received 2 txs from the output feed.
80///     assert_eq!(txs.len(), 2);
81///
82///     // Assert the output is the same as the reversed input, as
83///     // the highest bid txs will be released first.
84///     txs.reverse();
85///     let want = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
86///     let got = got_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
87///     assert_eq!(want, got);
88/// }
89/// ```
90pub struct TimeBoostService {
91    g_factor: u64,
92    tx_sender: Sender<BoostableTx>,
93    txs_recv: Receiver<BoostableTx>,
94    tx_heap: Mutex<BinaryHeap<BoostableTx>>,
95    output_feed: broadcast::Sender<BoostableTx>,
96}
97
98impl TimeBoostService {
99    /// Takes in an output feed for broadcasting txs released by the TimeBoostService.
100    pub fn new(output_feed: broadcast::Sender<BoostableTx>) -> Self {
101        let (tx_sender, txs_recv) = bounded(DEFAULT_INPUT_FEED_BUFFER_CAP);
102        TimeBoostService {
103            g_factor: DEFAULT_MAX_BOOST_FACTOR,
104            tx_sender,
105            txs_recv,
106            tx_heap: Mutex::new(BinaryHeap::new()),
107            output_feed,
108        }
109    }
110    /// Customize the buffer capacity of the input channel for the time boost service to receive transactions.
111    /// [`TimeBoostService`] is listening for newly received txs in a select statement via this feed.
112    /// Adjust this parameter to an estimated max throughput of txs that is satisfactory every G milliseconds.
113    /// It is set to a default of DEFAULT_INPUT_FEED_BUFFER_CAP if not set.
114    #[allow(dead_code)]
115    fn input_feed_buffer_capacity(mut self, buffer_size: usize) -> Self {
116        let (tx_sender, txs_recv) = bounded(buffer_size);
117        self.tx_sender = tx_sender;
118        self.txs_recv = txs_recv;
119        self
120    }
121    /// Customize the max boost factor, known as G in the time boost specification paper. It is set to a default of
122    /// DEFAULT_MAX_BOOST_FACTOR milliseconds if not set.
123    #[allow(dead_code)]
124    fn g_factor(mut self, g_factor: u64) -> Self {
125        self.g_factor = g_factor;
126        self
127    }
128    // Entities wishing to send boostable txs to the timeboost service can acquire
129    // a handle to the sender channel via this method.
130    pub fn sender(&self) -> Sender<BoostableTx> {
131        self.tx_sender.clone()
132    }
133    /// Runs the loop of the timeboost service, which will collect received txs from an input
134    /// channel into a priority queue that sorts them by max bid. At intervals of G milliseconds, the service will
135    /// release all the txs in the priority queue into a broadcast channel.
136    pub fn run(&mut self) {
137        'next: loop {
138            select! {
139                // Transactions received from an input channel are pushed into
140                // a priority queue by max bid where ties are broken by timestamp.
141                recv(self.txs_recv) -> tx => {
142                    let mut heap = self.tx_heap.lock().unwrap();
143                    match tx {
144                        Ok(tx) => heap.push(tx),
145                        Err(e) => error!("TimeBoostService got receive error from tx input channel: {}", e),
146                    }
147                },
148                default(Duration::from_millis(self.g_factor)) => {
149                    // We release all the txs in the priority queue into the output sequence
150                    // until the queue is empty and then we can restart the timer once again.
151                    let mut heap = self.tx_heap.lock().unwrap();
152                    while let Some(tx) = heap.pop() {
153                        let timestamp = Utc::now().naive_utc();
154                        let output_tx = BoostableTx {
155                            id: tx.id,
156                            bid: tx.bid,
157                            // The output sequence must have monotonically increasing timestamps,
158                            // so if we had a reordering by bid, we preserve this property by outputting
159                            // a transaction with a new timestamp representing the time it is emitted
160                            // into the output feed.
161                            timestamp,
162                        };
163                        if let Err(e) = self.output_feed.send(output_tx) {
164                            error!(
165                                "TimeBoostService got send error when broadcasting tx into output sequence: {}",
166                                e,
167                            );
168                        }
169                    }
170                    TIME_BOOST_ROUNDS_TOTAL.inc();
171                    continue 'next;
172                }
173            }
174        }
175    }
176}
177
178/// A BoostableTx represents three important values: a unique id, a bid, and a timestamp.
179/// Bid and timestamp values are used when performing the time boost protocol by the [`TimeBoostService`]
180/// at intervals of G milliseconds.
181#[derive(Debug, Clone, Eq)]
182pub struct BoostableTx {
183    pub id: u64,
184    pub bid: u64,
185    pub timestamp: NaiveDateTime,
186}
187
188impl BoostableTx {
189    pub fn new(id: u64, bid: u64, timestamp_millis: u64) -> Self {
190        Self {
191            id,
192            bid,
193            // TODO: Better handling of fallible conversion.
194            timestamp: NaiveDateTime::from_timestamp_millis(timestamp_millis as i64).unwrap(),
195        }
196    }
197}
198
199/// We consider a boostable tx equal if all its fields are equal.
200impl PartialEq for BoostableTx {
201    fn eq(&self, other: &Self) -> bool {
202        self.id == other.id && self.bid == other.bid && self.timestamp == other.timestamp
203    }
204}
205
206/// BoostableTx are comparable by bid and ties are broken by timestamp.
207impl PartialOrd for BoostableTx {
208    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
209        match self.bid.cmp(&other.bid) {
210            Ordering::Equal => {
211                // A tx is better if its timestamp is earlier than another tx.
212                match self.timestamp.partial_cmp(&other.timestamp) {
213                    Some(Ordering::Less) => Some(Ordering::Greater),
214                    Some(Ordering::Equal) => Some(Ordering::Equal),
215                    Some(Ordering::Greater) => Some(Ordering::Less),
216                    _ => unreachable!(),
217                }
218            }
219            Ordering::Greater => Some(Ordering::Greater),
220            Ordering::Less => Some(Ordering::Less),
221        }
222    }
223}
224
225impl Ord for BoostableTx {
226    fn cmp(&self, other: &Self) -> Ordering {
227        self.partial_cmp(other).unwrap()
228    }
229
230    fn max(self, other: Self) -> Self {
231        if self > other {
232            self
233        } else {
234            other
235        }
236    }
237    fn min(self, other: Self) -> Self {
238        if self < other {
239            self
240        } else {
241            other
242        }
243    }
244    fn clamp(self, min: Self, max: Self) -> Self {
245        if self < min {
246            min
247        } else if self > max {
248            max
249        } else {
250            self
251        }
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258
259    macro_rules! bid {
260        ($id:expr, $bid:expr, $millis:expr) => {
261            BoostableTx::new($id, $bid, $millis)
262        };
263    }
264
265    #[tokio::test]
266    async fn normalization_no_bid_no_boost() {
267        let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
268        let mut service = TimeBoostService::new(tx_feed);
269
270        // Obtain a channel handle to send txs to the TimeBoostService.
271        let sender = service.sender();
272
273        // Spawn a dedicated thread for the time boost service.
274        std::thread::spawn(move || service.run());
275
276        // Prepare a list of txs with 0 bid and monotonically increasing timestamp.
277        let original_txs = vec![
278            bid!(
279                0, /* ID */
280                0, /* bid */
281                1  /* unix timestamp millis */
282            ),
283            bid!(1, 0, 2),
284            bid!(2, 0, 3),
285            bid!(3, 0, 4),
286        ];
287        for tx in original_txs.iter() {
288            sender.send(tx.clone()).unwrap();
289        }
290
291        let mut txs = vec![];
292        for _ in 0..4 {
293            let tx = timeboost_output_feed.recv().await.unwrap();
294            txs.push(tx);
295        }
296
297        // Assert we received 4 txs from the output feed.
298        assert_eq!(txs.len(), 4);
299
300        // Assert the output is the same as the input input, as transactions had no bids present
301        // to create any reordering in the output sequence.
302        let want = original_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
303        let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
304        assert_eq!(want, got);
305    }
306
307    #[tokio::test]
308    async fn tx_arrived_until_next_boost_round_with_bid_no_advantage() {
309        let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
310        let mut service = TimeBoostService::new(tx_feed);
311
312        // Obtain a channel handle to send txs to the TimeBoostService.
313        let sender = service.sender();
314
315        // Spawn a dedicated thread for the time boost service.
316        std::thread::spawn(move || service.run());
317
318        // Prepare a list of txs with 0 bid and monotonically increasing timestamp.
319        let mut original_txs = vec![
320            bid!(
321                0, /* ID */
322                0, /* bid */
323                1  /* unix timestamp millis */
324            ),
325            bid!(1, 0, 2),
326            bid!(2, 0, 3),
327            bid!(3, 0, 4),
328        ];
329        for tx in original_txs.iter() {
330            sender.send(tx.clone()).unwrap();
331        }
332
333        let late_tx = bid!(4, 100 /* large bid */, 4 + DEFAULT_MAX_BOOST_FACTOR);
334        original_txs.push(late_tx.clone());
335
336        // Wait a boost round and then send the tx.
337        tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;
338
339        sender.send(late_tx).unwrap();
340
341        let mut txs = vec![];
342        for _ in 0..5 {
343            let tx = timeboost_output_feed.recv().await.unwrap();
344            txs.push(tx);
345        }
346
347        // Assert we received 5 txs from the output feed.
348        assert_eq!(txs.len(), 5);
349
350        // Assert the output is the same as the input input, as the late tx cannot gain an advantage
351        // even with a high bid because it did not arrive until the second boost round.
352        // to create any reordering in the output sequence.
353        let want = original_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
354        let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
355        assert_eq!(want, got);
356    }
357
358    #[tokio::test]
359    async fn three_boost_rounds() {
360        let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
361        let mut service = TimeBoostService::new(tx_feed);
362
363        // Obtain a channel handle to send txs to the TimeBoostService.
364        let sender = service.sender();
365
366        // Spawn a dedicated thread for the time boost service.
367        std::thread::spawn(move || service.run());
368
369        // Prepare two txs for each round of time boost, with one having a larger bid.
370        // We want to check that they get sorted within their respective rounds by bid, but no
371        // tx can make it over into previous rounds due to their bid.
372        let round1_txs = vec![
373            bid!(
374                0, /* ID */
375                0, /* bid */
376                1  /* unix timestamp millis */
377            ),
378            bid!(1, 50, 2),
379        ];
380        let round2_txs = vec![
381            bid!(
382                2, /* ID */
383                0, /* bid */
384                3  /* unix timestamp millis */
385            ),
386            bid!(3, 100, 4),
387        ];
388        let round3_txs = vec![
389            bid!(
390                4, /* ID */
391                0, /* bid */
392                5  /* unix timestamp millis */
393            ),
394            bid!(5, 200, 6),
395        ];
396        for tx in round1_txs.iter() {
397            sender.send(tx.clone()).unwrap();
398        }
399        // Wait > boost round and then send the next round of txs.
400        tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;
401
402        for tx in round2_txs.iter() {
403            sender.send(tx.clone()).unwrap();
404        }
405        // Wait > boost round and then send the tx.
406        tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;
407
408        for tx in round3_txs.iter() {
409            sender.send(tx.clone()).unwrap();
410        }
411
412        let mut txs = vec![];
413        for _ in 0..6 {
414            let tx = timeboost_output_feed.recv().await.unwrap();
415            txs.push(tx);
416        }
417        dbg!(&txs);
418
419        // Assert we received 6 txs from the output feed.
420        assert_eq!(txs.len(), 6);
421
422        // Assert the output is the same as the input input, as the late tx cannot gain an advantage
423        // even with a high bid because it did not arrive until the second boost round.
424        // to create any reordering in the output sequence.
425        let want = vec![1, 0, 3, 2, 5, 4];
426        let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
427        assert_eq!(want, got);
428    }
429
430    #[tokio::test]
431    async fn all_equal_bids_tiebreak_by_arrival_timestamp() {
432        let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
433        let mut service = TimeBoostService::new(tx_feed);
434
435        // Obtain a channel handle to send txs to the TimeBoostService.
436        let sender = service.sender();
437
438        // Spawn a dedicated thread for the time boost service.
439        std::thread::spawn(move || service.run());
440
441        // Prepare a list of time boostable txs with all bids equal.
442        let original_txs = vec![
443            bid!(
444                0,   /* ID */
445                100, /* bid */
446                0    /* unix timestamp millis */
447            ),
448            bid!(1, 100, 3),
449            bid!(2, 100, 2), // The two below have the same bid, and we expect tiebreaks by timestamp if this is the case.
450            bid!(3, 100, 1),
451            bid!(4, 100, 6), // The two below have the same bid.
452            bid!(5, 100, 5),
453            bid!(6, 100, 4), // Highest bid, will come first in the output.
454        ];
455        for tx in original_txs.iter() {
456            sender.send(tx.clone()).unwrap();
457        }
458
459        let mut txs = vec![];
460        for _ in 0..7 {
461            let tx = timeboost_output_feed.recv().await.unwrap();
462            txs.push(tx);
463        }
464
465        // Assert we received 7 txs from the output feed.
466        assert_eq!(txs.len(), 7);
467
468        // Expect txs to be sorted by arrival timestamp as all bids were equal.
469        let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
470        let want: Vec<u64> = vec![0, 3, 2, 1, 6, 5, 4];
471        assert_eq!(want, got);
472    }
473
474    #[tokio::test]
475    async fn some_equal_bids_tiebreak_by_timestamp() {
476        let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
477        let mut service = TimeBoostService::new(tx_feed);
478
479        // Obtain a channel handle to send txs to the TimeBoostService.
480        let sender = service.sender();
481
482        // Spawn a dedicated thread for the time boost service.
483        std::thread::spawn(move || service.run());
484
485        // Prepare a list of time boostable txs with some bids equal.
486        let original_txs = vec![
487            bid!(
488                0, /* ID */
489                1, /* bid */
490                0  /* unix timestamp millis */
491            ),
492            bid!(1, 2, 1),
493            bid!(2, 3, 2), // The two below have the same bid, and we expect tiebreaks by timestamp if this is the case.
494            bid!(3, 3, 3),
495            bid!(4, 5, 4), // The two below have the same bid.
496            bid!(5, 5, 5),
497            bid!(6, 7, 6), // Highest bid, will come first in the output.
498        ];
499        for tx in original_txs.iter() {
500            sender.send(tx.clone()).unwrap();
501        }
502
503        let mut txs = vec![];
504        for _ in 0..7 {
505            let tx = timeboost_output_feed.recv().await.unwrap();
506            txs.push(tx);
507        }
508
509        // Assert we received 7 txs from the output feed.
510        assert_eq!(txs.len(), 7);
511
512        // Assert the output is the same as the reversed input, as
513        // the highest bid txs will be released first.
514        let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
515        let want: Vec<u64> = vec![6, 4, 5, 2, 3, 1, 0];
516        assert_eq!(want, got);
517    }
518
519    #[tokio::test]
520    async fn timeboost_same_interval_sort_by_bid() {
521        let (tx_feed, mut timeboost_output_feed) = broadcast::channel(10);
522        let mut service = TimeBoostService::new(tx_feed);
523
524        // Obtain a channel handle to send txs to the TimeBoostService.
525        let sender = service.sender();
526
527        // Spawn a dedicated thread for the time boost service.
528        std::thread::spawn(move || service.run());
529
530        // Prepare a list of time boostable txs with bids and timestamps
531        let mut original_txs = vec![
532            bid!(
533                0, /* ID */
534                1, /* bid */
535                0  /* unix timestamp millis */
536            ),
537            bid!(1, 2, 1),
538            bid!(2, 3, 2),
539            bid!(3, 4, 3),
540            bid!(4, 5, 4),
541            bid!(5, 6, 5),
542            bid!(6, 7, 6),
543        ];
544        for tx in original_txs.iter() {
545            sender.send(tx.clone()).unwrap();
546        }
547
548        let mut txs = vec![];
549        for _ in 0..7 {
550            let tx = timeboost_output_feed.recv().await.unwrap();
551            txs.push(tx);
552        }
553
554        // Assert we received 7 txs from the output feed.
555        assert_eq!(txs.len(), 7);
556
557        // Assert the output is the same as the reversed input, as
558        // the highest bid txs will be released first.
559        original_txs.reverse();
560        let want = original_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
561        let got = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
562        assert_eq!(want, got);
563    }
564}