use crate::error::{Result, ShoveError};
use crate::topology::QueueTopology;
use super::client::{RedisClient, RedisConnection};
/// Declares the Redis streams and consumer groups required by a [`QueueTopology`].
///
/// All commands are issued through the wrapped [`RedisClient`], which also supplies the
/// consumer group name used for every stream.
pub struct RedisTopologyDeclarer {
// Source of connections (`multiplexed_conn`) and of the group name (`group`) used by `declare`.
client: RedisClient,
}
impl RedisTopologyDeclarer {
pub fn new(client: RedisClient) -> Self {
Self { client }
}
/// Returns the stream key for one shard of a sequenced queue.
///
/// The key has the form `<main_queue>-seq-<shard>`, for example `ledger-seq-3`.
pub fn shard_stream_name(main_queue: &str, shard: u16) -> String {
    let shard_suffix = shard.to_string();
    [main_queue, "-seq-", shard_suffix.as_str()].concat()
}
/// Returns the key derived from a hold queue name by appending `:pending`.
///
/// For example `orders-hold-5s` becomes `orders-hold-5s:pending`.
pub fn hold_set_name(hold_queue_name: &str) -> String {
    const PENDING_SUFFIX: &str = ":pending";
    let mut key = String::with_capacity(hold_queue_name.len() + PENDING_SUFFIX.len());
    key.push_str(hold_queue_name);
    key.push_str(PENDING_SUFFIX);
    key
}
/// Ensures every stream described by `topology` exists with the client's consumer group.
///
/// When `topology.sequencing()` is present, one stream per routing shard is declared, each
/// named by [`Self::shard_stream_name`]; otherwise the single stream `topology.queue()` is
/// declared. The stream returned by `topology.dlq()`, if any, is declared afterwards.
///
/// # Errors
///
/// Returns an error if a connection cannot be obtained or if any stream/group creation
/// fails; declaration stops at the first failure.
pub async fn declare(&self, topology: &QueueTopology) -> Result<()> {
    let mut conn = self.client.multiplexed_conn().await?;
    // Every stream in the topology shares the same group name.
    let group = self.client.group();

    match topology.sequencing() {
        Some(seq) => {
            for shard in 0..seq.routing_shards() {
                let shard_stream = Self::shard_stream_name(topology.queue(), shard);
                Self::ensure_stream_and_group(&mut conn, &shard_stream, group).await?;
            }
        }
        None => {
            Self::ensure_stream_and_group(&mut conn, topology.queue(), group).await?;
        }
    }

    match topology.dlq() {
        Some(dlq) => Self::ensure_stream_and_group(&mut conn, dlq, group).await,
        None => Ok(()),
    }
}
/// Creates consumer group `group` on `stream`, creating the stream itself if it is missing.
///
/// Sends `XGROUP CREATE <stream> <group> $ MKSTREAM`. A connection error whose text contains
/// `BUSYGROUP` (Redis' reply when the group already exists) is treated as success, so calling
/// this again for an existing group returns `Ok(())`.
///
/// # Errors
///
/// Any other `ShoveError::Connection` is converted into `ShoveError::Topology` carrying the
/// stream, group and original message; every other error variant is returned unchanged.
async fn ensure_stream_and_group(
    conn: &mut RedisConnection,
    stream: &str,
    group: &str,
) -> Result<()> {
    let mut create_group = redis::cmd("XGROUP");
    // NOTE(review): the `$` start ID means the group only sees entries added after it is
    // created; confirm that skipping pre-existing entries is intended.
    for token in ["CREATE", stream, group, "$", "MKSTREAM"] {
        create_group.arg(token);
    }

    let reason = match conn.query::<()>(&mut create_group).await {
        Ok(_) => return Ok(()),
        Err(ShoveError::Connection(reason)) => reason,
        Err(other) => return Err(other),
    };

    // The group already exists, which is exactly the state we want.
    if reason.contains("BUSYGROUP") {
        return Ok(());
    }

    Err(ShoveError::Topology(format!(
        "failed to create stream/group {}/{}: {}",
        stream, group, reason
    )))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain hold queue name gains the `:pending` suffix.
    #[test]
    fn hold_set_name_format() {
        let key = RedisTopologyDeclarer::hold_set_name("orders-hold-5s");
        assert_eq!(key, "orders-hold-5s:pending");
    }

    /// Shard keys follow the `<queue>-seq-<shard>` layout.
    #[test]
    fn shard_stream_name_format() {
        let key = RedisTopologyDeclarer::shard_stream_name("ledger", 3);
        assert_eq!(key, "ledger-seq-3");
    }

    /// Shard index zero is rendered as a literal `0`.
    #[test]
    fn shard_stream_name_zero() {
        let key = RedisTopologyDeclarer::shard_stream_name("ledger", 0);
        assert_eq!(key, "ledger-seq-0");
    }

    /// Multi-digit shard indices, up to `u16::MAX`, are rendered in full.
    #[test]
    fn shard_stream_name_large_index() {
        let cases = [
            ("payments", u16::MAX, format!("payments-seq-{}", u16::MAX)),
            ("orders", 1000, String::from("orders-seq-1000")),
        ];
        for (queue, shard, expected) in cases {
            assert_eq!(
                RedisTopologyDeclarer::shard_stream_name(queue, shard),
                expected
            );
        }
    }

    /// Names containing `:` and the empty name are passed through untouched.
    #[test]
    fn hold_set_name_with_special_chars() {
        let cases = [("my:queue-hold", "my:queue-hold:pending"), ("", ":pending")];
        for (input, expected) in cases {
            assert_eq!(RedisTopologyDeclarer::hold_set_name(input), expected);
        }
    }
}