timeboost_rs/lib.rs
1//! timeboost-rs implements the time boost protocol for ordering blockchain transactions as specified in the paper
2//! titled "Buying Time: Latency Racing vs. Bidding for Transaction Ordering" published at https://arxiv.org/pdf/2306.02179.pdf.
3//! Instead of individually boosting transactions, this implementation of the protocol
4//! operates in fixed rounds of length G milliseconds, where G is the parameter defined in the paper.
5use std::cmp::{Eq, Ord, Ordering, PartialOrd};
6use std::collections::BinaryHeap;
7use std::sync::Mutex;
8use std::time::Duration;
9
10use chrono::{NaiveDateTime, Utc};
11use crossbeam_channel::{bounded, select, Receiver, Sender};
12use lazy_static::lazy_static;
13use prometheus::register_int_counter;
14use prometheus::{self, IntCounter};
15use tokio::sync::broadcast;
16use tracing::error;
17
18lazy_static! {
19 /// The number of elapsed time boost rounds, which occur in intervals of G milliseconds,
20 /// exposed as prometheus int counter.
21 pub static ref TIME_BOOST_ROUNDS_TOTAL: IntCounter = register_int_counter!(
22 "timeboost_rounds_total",
23 "Number of time boost rounds elapsed"
24 )
25 .unwrap();
26}
27
/// The default max boost factor (the parameter G of the time boost paper), in milliseconds.
///
/// Set to 500ms after empirical evaluations for Ethereum Layer 2s from the time boost paper.
/// [`TimeBoostService::new`] uses this value as the round length; it can be adjusted using the
/// `g_factor` method when building a TimeBoostService struct.
pub const DEFAULT_MAX_BOOST_FACTOR: u64 = 500;
32
/// The default capacity (in number of txs) of the bounded input channel used by
/// [`TimeBoostService`] to receive txs from outside sources.
///
/// [`TimeBoostService::new`] creates its input channel with this capacity; it can be adjusted
/// using the `input_feed_buffer_capacity` method when building a TimeBoostService struct.
pub const DEFAULT_INPUT_FEED_BUFFER_CAP: usize = 1000;
37
/// The TimeBoostService struct is a long-running service that receives transactions from an input
/// channel, pushes them to a priority queue where they are sorted by max bid, and then releases
/// them at discrete time intervals defined by a parameter G (in milliseconds).
///
/// At the end of each round of "G" milliseconds, the service releases all the transactions
/// that were in the priority queue and starts the next round. The timestamps of transactions in
/// the output feed are the timestamp at the time of release from the priority queue.
///
/// We recommend running the TimeBoostService in a dedicated thread; handles can be acquired from
/// it to send transactions for it to enqueue and process. Here's a setup example:
///
/// ```
/// use timeboost_rs::{TimeBoostService, BoostableTx};
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx_output_feed, mut rx) = broadcast::channel(100);
///     let mut service = TimeBoostService::new(tx_output_feed);
///
///     // Obtain a channel handle to send txs to the TimeBoostService.
///     let sender = service.sender();
///
///     // Spawn a dedicated thread for the time boost service.
///     std::thread::spawn(move || service.run());
///
///     let mut txs = vec![
///         BoostableTx::new(0 /* id */, 1 /* bid */, 100 /* unix timestamp millis */),
///         BoostableTx::new(1 /* id */, 100 /* bid */, 101 /* unix timestamp millis */),
///     ];
///
///     for tx in txs.iter() {
///         sender.send(tx.clone()).unwrap();
///     }
///
///     let mut got_txs = vec![];
///     for _ in 0..2 {
///         let tx = rx.recv().await.unwrap();
///         got_txs.push(tx);
///     }
///
///     // Assert we received 2 txs from the output feed.
///     assert_eq!(got_txs.len(), 2);
///
///     // Assert the output is the same as the reversed input, as
///     // the highest bid txs will be released first.
///     txs.reverse();
///     let want = txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
///     let got = got_txs.into_iter().map(|tx| tx.id).collect::<Vec<_>>();
///     assert_eq!(want, got);
/// }
/// ```
pub struct TimeBoostService {
    /// Length of a boost round (G) in milliseconds.
    g_factor: u64,
    /// Sending half of the input channel; clones are handed out by `sender()`.
    tx_sender: Sender<BoostableTx>,
    /// Receiving half of the input channel, polled by `run()`.
    txs_recv: Receiver<BoostableTx>,
    /// Max-heap of the txs received during the current round, ordered by `BoostableTx`'s `Ord`.
    tx_heap: Mutex<BinaryHeap<BoostableTx>>,
    /// Broadcast channel on which released txs are published.
    output_feed: broadcast::Sender<BoostableTx>,
}
97
98impl TimeBoostService {
99 /// Takes in an output feed for broadcasting txs released by the TimeBoostService.
100 pub fn new(output_feed: broadcast::Sender<BoostableTx>) -> Self {
101 let (tx_sender, txs_recv) = bounded(DEFAULT_INPUT_FEED_BUFFER_CAP);
102 TimeBoostService {
103 g_factor: DEFAULT_MAX_BOOST_FACTOR,
104 tx_sender,
105 txs_recv,
106 tx_heap: Mutex::new(BinaryHeap::new()),
107 output_feed,
108 }
109 }
110 /// Customize the buffer capacity of the input channel for the time boost service to receive transactions.
111 /// [`TimeBoostService`] is listening for newly received txs in a select statement via this feed.
112 /// Adjust this parameter to an estimated max throughput of txs that is satisfactory every G milliseconds.
113 /// It is set to a default of DEFAULT_INPUT_FEED_BUFFER_CAP if not set.
114 #[allow(dead_code)]
115 fn input_feed_buffer_capacity(mut self, buffer_size: usize) -> Self {
116 let (tx_sender, txs_recv) = bounded(buffer_size);
117 self.tx_sender = tx_sender;
118 self.txs_recv = txs_recv;
119 self
120 }
121 /// Customize the max boost factor, known as G in the time boost specification paper. It is set to a default of
122 /// DEFAULT_MAX_BOOST_FACTOR milliseconds if not set.
123 #[allow(dead_code)]
124 fn g_factor(mut self, g_factor: u64) -> Self {
125 self.g_factor = g_factor;
126 self
127 }
128 // Entities wishing to send boostable txs to the timeboost service can acquire
129 // a handle to the sender channel via this method.
130 pub fn sender(&self) -> Sender<BoostableTx> {
131 self.tx_sender.clone()
132 }
133 /// Runs the loop of the timeboost service, which will collect received txs from an input
134 /// channel into a priority queue that sorts them by max bid. At intervals of G milliseconds, the service will
135 /// release all the txs in the priority queue into a broadcast channel.
136 pub fn run(&mut self) {
137 'next: loop {
138 select! {
139 // Transactions received from an input channel are pushed into
140 // a priority queue by max bid where ties are broken by timestamp.
141 recv(self.txs_recv) -> tx => {
142 let mut heap = self.tx_heap.lock().unwrap();
143 match tx {
144 Ok(tx) => heap.push(tx),
145 Err(e) => error!("TimeBoostService got receive error from tx input channel: {}", e),
146 }
147 },
148 default(Duration::from_millis(self.g_factor)) => {
149 // We release all the txs in the priority queue into the output sequence
150 // until the queue is empty and then we can restart the timer once again.
151 let mut heap = self.tx_heap.lock().unwrap();
152 while let Some(tx) = heap.pop() {
153 let timestamp = Utc::now().naive_utc();
154 let output_tx = BoostableTx {
155 id: tx.id,
156 bid: tx.bid,
157 // The output sequence must have monotonically increasing timestamps,
158 // so if we had a reordering by bid, we preserve this property by outputting
159 // a transaction with a new timestamp representing the time it is emitted
160 // into the output feed.
161 timestamp,
162 };
163 if let Err(e) = self.output_feed.send(output_tx) {
164 error!(
165 "TimeBoostService got send error when broadcasting tx into output sequence: {}",
166 e,
167 );
168 }
169 }
170 TIME_BOOST_ROUNDS_TOTAL.inc();
171 continue 'next;
172 }
173 }
174 }
175 }
176}
177
/// A BoostableTx represents three important values: a unique id, a bid, and a timestamp.
/// Bid and timestamp values are used when performing the time boost protocol by the
/// [`TimeBoostService`] at intervals of G milliseconds.
#[derive(Debug, Clone, Eq)]
pub struct BoostableTx {
    /// Unique identifier of the transaction.
    pub id: u64,
    /// Bid attached to the transaction; within a round, higher bids are released first.
    pub bid: u64,
    /// Timestamp of the transaction. Among equal bids the earlier timestamp is released first.
    /// Txs emitted by [`TimeBoostService`] carry the time of release here instead.
    pub timestamp: NaiveDateTime,
}
187
188impl BoostableTx {
189 pub fn new(id: u64, bid: u64, timestamp_millis: u64) -> Self {
190 Self {
191 id,
192 bid,
193 // TODO: Better handling of fallible conversion.
194 timestamp: NaiveDateTime::from_timestamp_millis(timestamp_millis as i64).unwrap(),
195 }
196 }
197}
198
199/// We consider a boostable tx equal if all its fields are equal.
200impl PartialEq for BoostableTx {
201 fn eq(&self, other: &Self) -> bool {
202 self.id == other.id && self.bid == other.bid && self.timestamp == other.timestamp
203 }
204}
205
206/// BoostableTx are comparable by bid and ties are broken by timestamp.
207impl PartialOrd for BoostableTx {
208 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
209 match self.bid.cmp(&other.bid) {
210 Ordering::Equal => {
211 // A tx is better if its timestamp is earlier than another tx.
212 match self.timestamp.partial_cmp(&other.timestamp) {
213 Some(Ordering::Less) => Some(Ordering::Greater),
214 Some(Ordering::Equal) => Some(Ordering::Equal),
215 Some(Ordering::Greater) => Some(Ordering::Less),
216 _ => unreachable!(),
217 }
218 }
219 Ordering::Greater => Some(Ordering::Greater),
220 Ordering::Less => Some(Ordering::Less),
221 }
222 }
223}
224
225impl Ord for BoostableTx {
226 fn cmp(&self, other: &Self) -> Ordering {
227 self.partial_cmp(other).unwrap()
228 }
229
230 fn max(self, other: Self) -> Self {
231 if self > other {
232 self
233 } else {
234 other
235 }
236 }
237 fn min(self, other: Self) -> Self {
238 if self < other {
239 self
240 } else {
241 other
242 }
243 }
244 fn clamp(self, min: Self, max: Self) -> Self {
245 if self < min {
246 min
247 } else if self > max {
248 max
249 } else {
250 self
251 }
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for `BoostableTx::new(id, bid, unix timestamp millis)`.
    macro_rules! bid {
        ($id:expr, $bid:expr, $millis:expr) => {
            BoostableTx::new($id, $bid, $millis)
        };
    }

    /// Starts a `TimeBoostService` with default settings on a dedicated thread.
    ///
    /// Returns the handle used to send txs to the service and the receiver of its output feed.
    /// The receiver is created before the service starts, so no released tx can be missed.
    fn spawn_service() -> (Sender<BoostableTx>, broadcast::Receiver<BoostableTx>) {
        let (tx_feed, timeboost_output_feed) = broadcast::channel(10);
        let mut service = TimeBoostService::new(tx_feed);

        // Obtain a channel handle to send txs to the TimeBoostService.
        let sender = service.sender();

        // Spawn a dedicated thread for the time boost service.
        std::thread::spawn(move || service.run());

        (sender, timeboost_output_feed)
    }

    /// Sends every tx of `txs` to the service, in slice order.
    fn send_all(sender: &Sender<BoostableTx>, txs: &[BoostableTx]) {
        for tx in txs {
            sender.send(tx.clone()).unwrap();
        }
    }

    /// Waits for `count` txs on the output feed and returns their ids in release order.
    async fn recv_ids(
        output_feed: &mut broadcast::Receiver<BoostableTx>,
        count: usize,
    ) -> Vec<u64> {
        let mut ids = Vec::with_capacity(count);
        for _ in 0..count {
            ids.push(output_feed.recv().await.unwrap().id);
        }
        ids
    }

    /// Ids of `txs`, in slice order.
    fn ids_of(txs: &[BoostableTx]) -> Vec<u64> {
        txs.iter().map(|tx| tx.id).collect()
    }

    #[tokio::test]
    async fn normalization_no_bid_no_boost() {
        let (sender, mut timeboost_output_feed) = spawn_service();

        // Prepare a list of txs with 0 bid and monotonically increasing timestamp.
        let original_txs = vec![
            bid!(
                0, /* ID */
                0, /* bid */
                1  /* unix timestamp millis */
            ),
            bid!(1, 0, 2),
            bid!(2, 0, 3),
            bid!(3, 0, 4),
        ];
        send_all(&sender, &original_txs);

        let got = recv_ids(&mut timeboost_output_feed, 4).await;

        // Assert the output is the same as the input, as transactions had no bids present
        // to create any reordering in the output sequence.
        let want = ids_of(&original_txs);
        assert_eq!(want, got);
    }

    #[tokio::test]
    async fn tx_arrived_until_next_boost_round_with_bid_no_advantage() {
        let (sender, mut timeboost_output_feed) = spawn_service();

        // Prepare a list of txs with 0 bid and monotonically increasing timestamp.
        let mut original_txs = vec![
            bid!(
                0, /* ID */
                0, /* bid */
                1  /* unix timestamp millis */
            ),
            bid!(1, 0, 2),
            bid!(2, 0, 3),
            bid!(3, 0, 4),
        ];
        send_all(&sender, &original_txs);

        let late_tx = bid!(4, 100 /* large bid */, 4 + DEFAULT_MAX_BOOST_FACTOR);
        original_txs.push(late_tx.clone());

        // Wait a boost round and then send the tx.
        tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;

        sender.send(late_tx).unwrap();

        let got = recv_ids(&mut timeboost_output_feed, 5).await;

        // Assert the output is the same as the input, as the late tx cannot gain an advantage
        // even with a high bid because it did not arrive until the second boost round.
        let want = ids_of(&original_txs);
        assert_eq!(want, got);
    }

    #[tokio::test]
    async fn three_boost_rounds() {
        let (sender, mut timeboost_output_feed) = spawn_service();

        // Prepare two txs for each round of time boost, with one having a larger bid.
        // We want to check that they get sorted within their respective rounds by bid, but no
        // tx can make it over into previous rounds due to their bid.
        let round1_txs = vec![
            bid!(
                0, /* ID */
                0, /* bid */
                1  /* unix timestamp millis */
            ),
            bid!(1, 50, 2),
        ];
        let round2_txs = vec![
            bid!(
                2, /* ID */
                0, /* bid */
                3  /* unix timestamp millis */
            ),
            bid!(3, 100, 4),
        ];
        let round3_txs = vec![
            bid!(
                4, /* ID */
                0, /* bid */
                5  /* unix timestamp millis */
            ),
            bid!(5, 200, 6),
        ];

        send_all(&sender, &round1_txs);
        // Wait > boost round and then send the next round of txs.
        tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;

        send_all(&sender, &round2_txs);
        // Wait > boost round and then send the last round of txs.
        tokio::time::sleep(Duration::from_millis(DEFAULT_MAX_BOOST_FACTOR + 100)).await;

        send_all(&sender, &round3_txs);

        let got = recv_ids(&mut timeboost_output_feed, 6).await;

        // Within each round the higher bid is released first, but rounds stay in order:
        // a high bid never moves a tx ahead of txs released in an earlier round.
        let want: Vec<u64> = vec![1, 0, 3, 2, 5, 4];
        assert_eq!(want, got);
    }

    #[tokio::test]
    async fn all_equal_bids_tiebreak_by_arrival_timestamp() {
        let (sender, mut timeboost_output_feed) = spawn_service();

        // Prepare a list of time boostable txs with all bids equal and shuffled timestamps.
        let original_txs = vec![
            bid!(
                0,   /* ID */
                100, /* bid */
                0    /* unix timestamp millis */
            ),
            bid!(1, 100, 3),
            bid!(2, 100, 2),
            bid!(3, 100, 1),
            bid!(4, 100, 6),
            bid!(5, 100, 5),
            bid!(6, 100, 4),
        ];
        send_all(&sender, &original_txs);

        let got = recv_ids(&mut timeboost_output_feed, 7).await;

        // Expect txs to be sorted by arrival timestamp as all bids were equal.
        let want: Vec<u64> = vec![0, 3, 2, 1, 6, 5, 4];
        assert_eq!(want, got);
    }

    #[tokio::test]
    async fn some_equal_bids_tiebreak_by_timestamp() {
        let (sender, mut timeboost_output_feed) = spawn_service();

        // Prepare a list of time boostable txs with some bids equal.
        let original_txs = vec![
            bid!(
                0, /* ID */
                1, /* bid */
                0  /* unix timestamp millis */
            ),
            bid!(1, 2, 1),
            bid!(2, 3, 2), // Same bid as the next tx: we expect a tiebreak by timestamp.
            bid!(3, 3, 3),
            bid!(4, 5, 4), // Same bid as the next tx: we expect a tiebreak by timestamp.
            bid!(5, 5, 5),
            bid!(6, 7, 6), // Highest bid, will come first in the output.
        ];
        send_all(&sender, &original_txs);

        let got = recv_ids(&mut timeboost_output_feed, 7).await;

        // Highest bids are released first; within a pair of equal bids
        // the earlier timestamp is released first.
        let want: Vec<u64> = vec![6, 4, 5, 2, 3, 1, 0];
        assert_eq!(want, got);
    }

    #[tokio::test]
    async fn timeboost_same_interval_sort_by_bid() {
        let (sender, mut timeboost_output_feed) = spawn_service();

        // Prepare a list of time boostable txs with strictly increasing bids and timestamps.
        let mut original_txs = vec![
            bid!(
                0, /* ID */
                1, /* bid */
                0  /* unix timestamp millis */
            ),
            bid!(1, 2, 1),
            bid!(2, 3, 2),
            bid!(3, 4, 3),
            bid!(4, 5, 4),
            bid!(5, 6, 5),
            bid!(6, 7, 6),
        ];
        send_all(&sender, &original_txs);

        let got = recv_ids(&mut timeboost_output_feed, 7).await;

        // Assert the output is the same as the reversed input, as
        // the highest bid txs will be released first.
        original_txs.reverse();
        let want = ids_of(&original_txs);
        assert_eq!(want, got);
    }
}