use std::env;
use std::io::{self, Write};
use std::str::FromStr;
use std::time::Instant;
use broccoli_queue::queue::{BroccoliQueue, ConsumeOptions};
use criterion::{criterion_group, criterion_main, Criterion};
use serde::{Deserialize, Serialize};
use surrealdb::engine::any::connect;
use surrealdb::{engine::any::Any, RecordId};
use surrealdb::{Response, Surreal};
use tokio::runtime::Runtime;
use tokio_util::sync::CancellationToken;
/// Payload that the benchmarks publish to, and consume from, the queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BenchmarkMessage {
    /// Record id of the message; the benchmarks build it as `<queue_name>:<index>`.
    id: RecordId,
    /// Free-form text payload.
    data: String,
    /// Unix timestamp (seconds) taken when the message was generated.
    timestamp: i64,
}
/// Content of a queue record written by the raw SurrealDB benchmark.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BenchmarkMessageEntry {
    /// Record id of the payload this entry refers to.
    message_id: RecordId,
    /// Priority stored alongside the entry.
    priority: i64,
}
/// Content of an index record written by the raw SurrealDB benchmark.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BenchmarkMessageIndex {
    /// Record id of the queue record the indexed message currently lives in.
    queue_id: RecordId,
}
/// Returns everything after the first `=` in `s` (the value of a `key=value` pair).
///
/// # Panics
///
/// Panics if `s` contains no `=`.
fn read_param(s: &str) -> &str {
    let (_key, value) = s.split_once('=').unwrap();
    value
}
/// Opens a SurrealDB connection for the benchmarks.
///
/// * `None` connects to an in-memory instance (`mem://`) and selects namespace
///   and database `app`.
/// * `Some(url)` expects `<endpoint>?<user>&<password>&<namespace>&<database>`,
///   each parameter written as `key=value`. Parameters are read **by position**,
///   not by key name. The function signs in as root with the first two and
///   selects the namespace/database named by the last two.
///
/// # Panics
///
/// Panics if the URL has no `?`, has fewer than four `&`-separated parameters,
/// a parameter has no `=`, or any database call fails.
async fn setup_surrealdb(url: Option<String>) -> Surreal<Any> {
    let Some(remote) = url else {
        // No URL given: fall back to the in-memory engine.
        let db = connect("mem://".to_string()).await.unwrap();
        db.use_ns("app").await.unwrap();
        db.use_db("app").await.unwrap();
        return db;
    };

    // Split into the endpoint and the query part (the latter keeps its leading `?`).
    let query_start = remote.find("?").unwrap();
    let (endpoint, query) = remote.split_at(query_start);

    let params: Vec<&str> = query.split("&").collect();
    let username = read_param(params[0]);
    let password = read_param(params[1]);
    let namespace = read_param(params[2]);
    let database = read_param(params[3]);

    let db = connect(endpoint).await.unwrap();
    let credentials = surrealdb::opt::auth::Root { username, password };
    db.signin(credentials).await.unwrap();
    db.use_ns(namespace).use_db(database).await.unwrap();
    db
}
/// Builds a [`BroccoliQueue`] for `url` with a pool of 10 connections.
///
/// # Panics
///
/// Panics if the queue cannot be built.
async fn setup_broccoli(url: String) -> BroccoliQueue {
    let builder = BroccoliQueue::builder(url).pool_connections(10);
    let queue = builder.build().await;
    queue.unwrap()
}
/// Returns the sum of all message ids produced for `message_count` messages.
///
/// Test messages are numbered `0..message_count`, so the sum of their ids is
/// `n * (n - 1) / 2`. Consumers subtract each consumed id from this value and
/// treat reaching zero as "every message has been seen".
///
/// The previous formula (`n * (n + 1) / 2 - 2`) only matched that sum for
/// `n == 2` and underflowed for `n == 1`, so the countdown never hit zero for
/// most batch sizes.
fn target_counter(message_count: usize) -> usize {
    // `n * (n - 1)` is always even, so the integer division is exact.
    // `saturating_sub` keeps `message_count == 0` from underflowing.
    message_count * message_count.saturating_sub(1) / 2
}
/// Creates `n` benchmark messages with ids `<queue_name>:0` .. `<queue_name>:<n-1>`.
///
/// Each message carries the text `test data <i>`; the final one additionally
/// gets the suffix ` [last]`. The timestamp is the current UTC Unix time.
///
/// # Panics
///
/// Panics if `<queue_name>:<i>` cannot be parsed as a record id.
async fn generate_test_messages(queue_name: &str, n: usize) -> Vec<BenchmarkMessage> {
    let mut batch = Vec::with_capacity(n);
    for i in 0..n {
        let suffix = if i + 1 == n { " [last]" } else { "" };
        let id = RecordId::from_str(&format!("{queue_name}:{i}")).unwrap();
        batch.push(BenchmarkMessage {
            id,
            data: format!("test data {}{}", i, suffix),
            timestamp: time::OffsetDateTime::now_utc().unix_timestamp(),
        });
    }
    batch
}
/// Consumes and acknowledges up to `message_count` messages from `queue_name`.
///
/// A running counter starts at `target_counter(message_count)` and is reduced
/// by the numeric id of every consumed message; the loop stops early as soon
/// as the counter reaches zero. The consumed payloads are returned in the
/// order they were received.
///
/// # Panics
///
/// Panics if consuming or acknowledging fails, or if a message id key is not
/// a plain unsigned integer.
async fn consume_loop(
    queue: &BroccoliQueue,
    queue_name: &str,
    message_count: usize,
) -> Vec<BenchmarkMessage> {
    let mut received: Vec<BenchmarkMessage> = Vec::with_capacity(message_count);
    let mut remaining = target_counter(message_count);

    for _ in 0..message_count {
        let delivery = queue
            .consume::<BenchmarkMessage>(queue_name, None)
            .await
            .unwrap();
        received.push(delivery.payload.clone());

        // The record key is the message index chosen when the batch was generated.
        let key = delivery.payload.id.key().to_string();
        let index: usize = key.parse().expect("message id was not parseable");
        remaining -= index;

        io::stderr().flush().unwrap();
        queue.acknowledge(queue_name, delivery).await.unwrap();

        if remaining == 0 {
            break;
        }
    }

    received
}
/// Formats anything convertible into a `time::OffsetDateTime` as an RFC 3339 string.
///
/// # Errors
///
/// Returns the `time` formatting error if the value cannot be rendered.
fn to_rfc3339<T>(dt: T) -> Result<std::string::String, time::error::Format>
where
    T: Into<time::OffsetDateTime>,
{
    use time::format_description::well_known::Rfc3339;

    let moment: time::OffsetDateTime = dt.into();
    moment.format(&Rfc3339)
}
/// Measures a hand-written publish/consume cycle issued directly against SurrealDB.
///
/// Phase 1 writes, for every generated message, a payload record, a queue
/// record and an index record. Phase 2 runs a transaction `message_count`
/// times that is meant to move one entry from the queue table to the
/// processing table and return its payload, then deletes the processing,
/// payload and index records for that message.
///
/// Returns `(throughput, avg_latency)`: `message_count / elapsed_seconds` and
/// `elapsed_seconds / message_count`, where the elapsed time covers both phases.
///
/// # Panics
///
/// Panics on any database error, if a record id cannot be parsed, or if one of
/// the records expected in phase 2 is missing.
async fn benchmark_raw_surrealdb_throughput(db: &Surreal<Any>, message_count: usize) -> (f64, f64) {
let queue_name = "bench_raw_surrealdb";
// Auxiliary table names are derived from the queue name.
let index_table = format!("{queue_name}___index");
let queue_table = format!("{queue_name}___queue");
let processing_table = format!("{queue_name}___processing");
let messages = generate_test_messages(queue_name, message_count).await;
// Timer for the whole benchmark (publish + consume).
let now = Instant::now();
// Phase 1: publish every message by hand.
for msg in messages {
// Payload record, stored under the message's own record id.
let _: Option<BenchmarkMessage> = db.create(&msg.id).content(msg.clone()).await.unwrap();
let id = msg.id.key().clone();
// NOTE: this `now` is an RFC 3339 string and shadows the timer only inside this loop body.
let now = to_rfc3339(time::OffsetDateTime::now_utc()).unwrap();
// Queue record id is the array `[priority, datetime, message key]`; priority is fixed at 5.
let queue_record_str = format!("{queue_table}:[5,<datetime>'{now}',{id}]");
let queue_record_id = RecordId::from_str(&queue_record_str).unwrap();
let message_record_id: RecordId = msg.id.clone();
let _: Option<BenchmarkMessageEntry> = db
.create(queue_record_id.clone())
.content(BenchmarkMessageEntry {
message_id: message_record_id, priority: 5,
})
.await
.unwrap();
// Index record: keyed by the message key, pointing at the queue record.
let _: Option<BenchmarkMessageIndex> = db
.create((index_table.clone(), id))
.content(BenchmarkMessageIndex {
queue_id: queue_record_id, })
.await
.unwrap();
}
// Phase 2: consume one message per iteration.
for _ in 0..message_count {
// NOTE(review): the query text refers to `$acc.t_` and `$e.id[2]`, but only
// `$queue_table` and `$processing_table` are bound below — confirm the
// query behaves as intended.
let q = "
BEGIN TRANSACTION;
{
LET $m = {
FOR $p IN 1..=5 {
LET $output = SELECT * FROM ONLY type::thing($queue_table,type::range([[$p,None],[$p,time::now()]])) LIMIT 1;
IF $output {
RETURN $output;
};
};
};
IF !$m {
RETURN NONE // no data available
};
-- upserting will be more robust and not freeze the queue if there is a duplicate
UPSERT type::table($processing_table) CONTENT {
// loses the uuid, see https://github.com/surrealdb/surrealdb/issues/6104
//id: type::thing($acc.t_, $e.id[2]), // id[2] is the uuid
// we forcefully add it
id: type::record($acc.t_+':u\\''+<string>$e.id[2]+'\\''), // id[2] is the uuid
message_id: $m.message_id,
priority: $m.priority
};
-- remove from queue and return payload
-- remember we don't delete from index, instead acknowledge/reject/cancel will do it
LET $deleted = DELETE $m.id RETURN BEFORE;
IF !$deleted {
THROW 'Transaction failed as $m.id already deleted (CONCURRENT_READ)';
};
LET $payload = SELECT * FROM $m.message_id;
$payload
};
COMMIT TRANSACTION;
";
let queued: Response = db
.query(q)
.bind(("queue_table", queue_table.clone()))
.bind(("processing_table", processing_table.clone()))
.await
.unwrap();
// Read the result of the last statement in the response as the payload.
let queued: Option<BenchmarkMessage> = match queued.check() {
Ok(mut queued) => queued.take(queued.num_statements() - 1).unwrap(),
Err(e) => panic!("Could not do the raw transaction {e:?}"),
};
// No payload came back: skip the cleanup. The iteration still counts
// toward `message_count` in the figures computed at the end.
let queued = match queued {
Some(queued) => queued,
None => continue,
};
// Derive a bare key from the payload id: take the text before the first
// comma, drop any `]` and trim surrounding whitespace.
let message_id = queued.id.key().to_string();
let message_id: Vec<&str> = message_id.split(",").collect();
let message_id = message_id[0];
let message_id = message_id.replace("]", "");
let message_id = message_id.trim();
// Remove the processing record; the `unwrap` panics if it did not exist.
let message_record_id =
RecordId::from_str(&format!("{processing_table}:{message_id}")).unwrap();
let processing: Option<BenchmarkMessageEntry> = db.delete(message_record_id).await.unwrap();
processing.unwrap();
// Remove the payload record.
let payload_record_id = RecordId::from_str(&format!("{queue_name}:{message_id}")).unwrap();
let removed: Option<BenchmarkMessage> = db.delete(payload_record_id).await.unwrap();
removed.unwrap();
// Remove the index record.
let index_record_id =
RecordId::from_str(&format!("{}:{}", index_table.clone(), message_id)).unwrap();
let removed: Option<BenchmarkMessageIndex> = db.delete(index_record_id).await.unwrap();
removed.unwrap();
}
// Derive throughput (messages per second) and average latency (seconds per message).
let total_time = now.elapsed().as_secs_f64();
let throughput = message_count as f64 / total_time;
let avg_latency = total_time / message_count as f64;
(throughput, avg_latency)
}
/// Times a batch publish followed by a full consume/acknowledge loop.
///
/// The clock starts before `publish_batch` and stops once `consume_loop`
/// returns, so both directions are included in the measurement.
///
/// Returns `(throughput, avg_latency)`: `message_count / elapsed_seconds` and
/// `elapsed_seconds / message_count`.
///
/// # Panics
///
/// Panics if publishing fails, or if the number of published or consumed
/// messages differs from `message_count`.
async fn benchmark_broccoli_batch_publish_consume_throughput(
    queue: &BroccoliQueue,
    message_count: usize,
) -> (f64, f64) {
    let queue_name = "bench_broccoli";
    let messages = generate_test_messages(queue_name, message_count).await;

    let now = Instant::now();
    let published = queue
        .publish_batch(queue_name, None, messages, None)
        .await
        .expect("Could not publish");
    let n = published.len();
    if n != message_count {
        // Report as actual/expected (the two values used to be swapped).
        panic!("Only published {n}/{message_count} messages");
    }

    // Zero-length sleep between publishing and consuming.
    tokio::time::sleep(tokio::time::Duration::ZERO).await;

    let consumed = consume_loop(queue, queue_name, message_count).await;
    let n = consumed.len();
    if n != message_count {
        // Report as actual/expected (the two values used to be swapped).
        panic!("Only consumed {n}/{message_count} messages");
    }

    let total_time = now.elapsed().as_secs_f64();
    let throughput = message_count as f64 / total_time;
    let avg_latency = total_time / message_count as f64;
    (throughput, avg_latency)
}
/// Times only the consume/acknowledge loop for a pre-published batch.
///
/// Messages are published to the `bench_broccoli` queue before the clock
/// starts; the measurement covers `consume_loop` alone.
///
/// Returns `(throughput, avg_latency)`: `message_count / elapsed_seconds` and
/// `elapsed_seconds / message_count`.
///
/// # Panics
///
/// Panics if publishing fails or if the number of consumed messages differs
/// from `message_count`.
async fn benchmark_broccoli_consume_loop_throughput(
    queue: &BroccoliQueue,
    message_count: usize,
) -> (f64, f64) {
    let queue_name = "bench_broccoli";
    let messages = generate_test_messages(queue_name, message_count).await;
    queue
        .publish_batch(queue_name, None, messages, None)
        .await
        .expect("Could not publish");

    // Zero-length sleep between publishing and consuming.
    tokio::time::sleep(tokio::time::Duration::ZERO).await;

    let now = Instant::now();
    let consumed = consume_loop(queue, queue_name, message_count).await;
    let n = consumed.len();
    if n != message_count {
        // Report as actual/expected (the two values used to be swapped).
        panic!("Only consumed {n}/{message_count} messages");
    }

    let total_time = now.elapsed().as_secs_f64();
    let throughput = message_count as f64 / total_time;
    let avg_latency = total_time / message_count as f64;
    (throughput, avg_latency)
}
/// Message handler used by the handler benchmark.
///
/// Subtracts the message's numeric id from the shared `handler_counter`.
/// When the counter reaches zero, it cancels the token currently stored in
/// `handler_mutex`.
///
/// # Panics
///
/// Panics if the message id key is not a plain unsigned integer, or if the
/// subtraction would underflow (reported as "repeated message").
async fn process_job(m: BenchmarkMessage) -> Result<(), broccoli_queue::error::BroccoliError> {
    let i =
        m.id.key()
            .to_string()
            .parse::<usize>()
            .expect("message id was not parseable");

    // Keep the counter lock inside its own scope so the guard is really
    // released before the token lock is taken. The earlier `let _mutex = 0;`
    // only shadowed the guard and kept the lock held until the function returned.
    let finished = {
        let mut remaining = handler_counter.lock().await;
        *remaining = (*remaining).checked_sub(i).expect("repeated message");
        *remaining == 0
    };

    if finished {
        // Clone the token out of the mutex; the temporary guard is dropped at
        // the end of this statement, so `cancel` runs without holding a lock.
        let handler_token = handler_mutex.lock().await.clone();
        handler_token.cancel();
    }
    Ok(())
}
// Process-wide state shared between `benchmark_broccoli_batch_handler_throughput`
// (which initialises both values before each run) and `process_job` (which
// reads and updates them for every handled message).
lazy_static::lazy_static! {
// Cancellation token of the current handler benchmark run; `process_job`
// cancels it once `handler_counter` reaches zero.
static ref handler_mutex: std::sync::Arc<tokio::sync::Mutex<CancellationToken>> =
std::sync::Arc::new(tokio::sync::Mutex::new(CancellationToken::new()));
// Countdown that `process_job` reduces by each handled message's numeric id.
static ref handler_counter: std::sync::Arc<tokio::sync::Mutex<usize>> =
std::sync::Arc::new(tokio::sync::Mutex::new(0));
}
/// Times a batch publish that is drained by a `process_messages` handler.
///
/// A fresh cancellation token and the expected id sum are stored in the
/// shared globals, then a consumer task is spawned that runs
/// `process_messages` with `process_job` until the token is cancelled. The
/// clock starts right before publishing and stops when the consumer task ends.
///
/// Returns `(throughput, avg_latency)`: `message_count / elapsed_seconds` and
/// `elapsed_seconds / message_count`.
///
/// # Panics
///
/// Panics if publishing fails or if the number of published messages differs
/// from `message_count`.
async fn benchmark_broccoli_batch_handler_throughput(
    queue: &BroccoliQueue,
    options: Option<ConsumeOptions>,
    message_count: usize,
) -> (f64, f64) {
    let queue_name = "bench_handler_broccoli";
    let shared_token = CancellationToken::new();
    let expected_id_sum = target_counter(message_count);

    // Each guard is a temporary that is dropped at the end of its statement,
    // so the two locks are never held together. The earlier `let _mutex = 0;`
    // lines only shadowed the guards and kept both locks until the block
    // ended, in the opposite order to `process_job`.
    *handler_mutex.lock().await = shared_token.clone();
    *handler_counter.lock().await = expected_id_sum;

    let queue_clone = queue.clone();
    let consumer = tokio::spawn(async move {
        // Run the handler loop until `process_job` cancels the token.
        tokio::select! {
            _ = shared_token.cancelled() => {
            }
            _ = queue_clone
                .process_messages(queue_name, Some(5), options, |msg| async {
                    process_job(msg.payload).await
                })
            => {
            }
        };
    });

    // Zero-length sleep after spawning the consumer and before publishing.
    tokio::time::sleep(tokio::time::Duration::ZERO).await;

    let messages = generate_test_messages(queue_name, message_count).await;
    let now = Instant::now();
    let published = queue
        .publish_batch(queue_name, None, messages, None)
        .await
        .expect("Could not publish");
    let n = published.len();
    if n != message_count {
        // Report as actual/expected (the two values used to be swapped).
        panic!("Only published {n}/{message_count} messages");
    }

    // The join result (including a possible task panic) is deliberately ignored.
    let _ = consumer.await;

    let total_time = now.elapsed().as_secs_f64();
    let throughput = message_count as f64 / total_time;
    let avg_latency = total_time / message_count as f64;
    (throughput, avg_latency)
}
/// Registers the Broccoli queue benchmarks with Criterion.
///
/// Requires the `SURREALDB_URL` environment variable. Every benchmark is
/// registered for an in-memory instance (`mem`) and for the instance behind
/// that URL (`disk`), for batch sizes 1, 10 and 100.
///
/// # Panics
///
/// Panics if `SURREALDB_URL` is not set or if any connection cannot be set up.
fn criterion_benchmark(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("Queue Performance");

    let remote_url = match env::var("SURREALDB_URL") {
        Ok(value) => value,
        Err(_) => panic!("Missing SURREALDB_URL env var (in order: ws://localhost:8001?username=<USERNAME>&password=<PASSWD>&ns=test&database=broker)"),
    };

    let db_mem = rt.block_on(setup_surrealdb(None));
    let db = rt.block_on(setup_surrealdb(Some(remote_url.clone())));
    let broccoli_mem_queue = rt.block_on(setup_broccoli("mem://".to_string()));
    let broccoli_queue = rt.block_on(setup_broccoli(remote_url));

    let instances = vec![
        (db_mem, broccoli_mem_queue, "mem"),
        (db, broccoli_queue, "disk"),
    ];
    let consume_options = [None, Some(ConsumeOptions::builder().auto_ack(true).build())];
    let message_counts = [1, 10, 100];

    // NOTE(review): the raw database handle is not used by any benchmark
    // registered here, and `options` only changes the benchmark label — it is
    // not passed on to the benchmarked functions.
    for (_db, broccoli_queue, instance) in instances {
        for &count in &message_counts {
            for options in &consume_options {
                let opt_str = match options {
                    Some(_) => " [auto-ack] ",
                    None => "",
                };

                let publish_consume_id = format!(
                    "Broccoli surrealdb batch_publish + consume loop {instance}{opt_str} - {count}"
                );
                group.bench_function(publish_consume_id, |b| {
                    b.iter(|| {
                        rt.block_on(benchmark_broccoli_batch_publish_consume_throughput(
                            &broccoli_queue,
                            count,
                        ))
                    })
                });

                let consume_only_id =
                    format!("Broccoli surrealdb consume loop {instance}{opt_str} - {count}");
                group.bench_function(consume_only_id, |b| {
                    b.iter(|| {
                        rt.block_on(benchmark_broccoli_consume_loop_throughput(
                            &broccoli_queue,
                            count,
                        ))
                    })
                });
            }
        }
    }

    group.finish();
}
// Register `criterion_benchmark` as the `benches` group and generate the
// benchmark binary's `main` function.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);