/// Handle to a single Redis set stored under one key.
///
/// The handle is `Clone`; every operation works on its own clone of the
/// underlying connection manager, so methods only need `&self`.
#[derive(Clone)]
pub struct RedisSet {
/// Managed Redis connection; cloned for each operation.
manager: redis::aio::ConnectionManager,
/// Redis key under which the set is stored.
key: String,
/// Expiry, in seconds, applied to `key` with `EXPIRE` whenever items are added.
ttl_seconds: usize,
}
impl RedisSet {
    /// Connects to the Redis server at `url` and wraps the set stored under `key`.
    ///
    /// Shorthand for [`RedisSet::new_with_name`] without a connection name.
    ///
    /// # Errors
    /// Fails under the same conditions as [`RedisSet::new_with_name`].
    pub async fn new(url: &str, key: &str, ttl_seconds: u64) -> anyhow::Result<Self> {
        Self::new_with_name(url, key, ttl_seconds, None).await
    }

    /// Connects to the Redis server at `url` and wraps the set stored under `key`.
    ///
    /// * `ttl_seconds` - expiry applied to `key` every time items are added.
    /// * `connection_name` - when `Some`, sent to the server via `CLIENT SETNAME`
    ///   right after connecting.
    ///
    /// # Errors
    /// Returns an error if the client cannot be opened from `url`, the managed
    /// connection cannot be established, `CLIENT SETNAME` fails, or
    /// `ttl_seconds` does not fit in `usize` on the current target.
    pub async fn new_with_name(
        url: &str,
        key: &str,
        ttl_seconds: u64,
        connection_name: Option<&str>,
    ) -> anyhow::Result<Self> {
        let client = redis::Client::open(url)?;
        Self::from_client(client, key, ttl_seconds, connection_name).await
    }

    /// Resolves the master called `master_name` through the given sentinels and
    /// wraps the set stored under `key` on that master.
    ///
    /// `ttl_seconds` and `connection_name` behave as in
    /// [`RedisSet::new_with_name`].
    ///
    /// NOTE(review): the master is looked up once, here. Whether the connection
    /// manager follows a later failover is not visible in this file - confirm.
    ///
    /// # Errors
    /// Returns an error if the sentinel set cannot be built, the master lookup
    /// fails, or any step listed for [`RedisSet::new_with_name`] fails.
    pub async fn new_sentinel(
        sentinel_urls: &[&str],
        master_name: &str,
        key: &str,
        ttl_seconds: u64,
        connection_name: Option<&str>,
    ) -> anyhow::Result<Self> {
        use redis::sentinel::Sentinel;

        let mut sentinel = Sentinel::build(sentinel_urls.to_vec())?;
        let client = sentinel.async_master_for(master_name, None).await?;
        Self::from_client(client, key, ttl_seconds, connection_name).await
    }

    /// Shared constructor tail: validates the TTL, opens the managed connection
    /// and optionally names it.
    async fn from_client(
        client: redis::Client,
        key: &str,
        ttl_seconds: u64,
        connection_name: Option<&str>,
    ) -> anyhow::Result<Self> {
        // A plain `as usize` cast would silently truncate on targets where
        // `usize` is narrower than 64 bits; reject such values instead.
        let ttl_seconds = usize::try_from(ttl_seconds).map_err(|_| {
            anyhow::anyhow!("ttl_seconds {ttl_seconds} does not fit in usize on this target")
        })?;

        let mut manager = client.get_connection_manager().await?;
        if let Some(name) = connection_name {
            // NOTE(review): the name is set once on the connection obtained
            // here; if the manager reconnects later it is presumably not
            // re-applied - confirm against the redis crate version in use.
            redis::cmd("CLIENT")
                .arg("SETNAME")
                .arg(name)
                .query_async::<()>(&mut manager)
                .await?;
        }

        Ok(Self {
            manager,
            key: key.to_owned(),
            ttl_seconds,
        })
    }

    /// Adds `items` to the set and refreshes the key's expiry.
    ///
    /// Does nothing (and sends nothing) when `items` is empty. Otherwise a
    /// single variadic `SADD` followed by `EXPIRE` is sent in one pipeline.
    ///
    /// NOTE(review): per the Redis documentation a TTL of 0 makes `EXPIRE`
    /// delete the key immediately - confirm callers never pass
    /// `ttl_seconds == 0`.
    ///
    /// # Errors
    /// Returns an error if the pipeline fails.
    pub async fn add_items(&self, items: &[String]) -> anyhow::Result<()> {
        if items.is_empty() {
            return Ok(());
        }

        let mut conn = self.manager.clone();
        redis::pipe()
            .cmd("SADD")
            .arg(&self.key)
            // A slice argument expands to one Redis argument per element.
            .arg(items)
            .cmd("EXPIRE")
            .arg(&self.key)
            .arg(self.ttl_seconds)
            .query_async::<()>(&mut conn)
            .await?;
        Ok(())
    }

    /// Removes `items` from the set with a single variadic `SREM`.
    ///
    /// Does nothing (and sends nothing) when `items` is empty. The key's expiry
    /// is not touched.
    ///
    /// # Errors
    /// Returns an error if the command fails.
    pub async fn remove_items(&self, items: &[String]) -> anyhow::Result<()> {
        if items.is_empty() {
            return Ok(());
        }

        let mut conn = self.manager.clone();
        redis::cmd("SREM")
            .arg(&self.key)
            .arg(items)
            .query_async::<()>(&mut conn)
            .await?;
        Ok(())
    }

    /// Adds a single item; see [`RedisSet::add_items`].
    pub async fn add_item(&self, item: &str) -> anyhow::Result<()> {
        self.add_items(&[item.to_owned()]).await
    }

    /// Removes a single item; see [`RedisSet::remove_items`].
    pub async fn remove_item(&self, item: &str) -> anyhow::Result<()> {
        self.remove_items(&[item.to_owned()]).await
    }

    /// Returns every member of the set, as reported by `SMEMBERS`.
    ///
    /// # Errors
    /// Returns an error if the command fails or the reply cannot be converted
    /// to `Vec<String>`.
    pub async fn load_items(&self) -> anyhow::Result<Vec<String>> {
        let mut conn = self.manager.clone();
        let members = redis::cmd("SMEMBERS")
            .arg(&self.key)
            .query_async::<Vec<String>>(&mut conn)
            .await?;
        Ok(members)
    }

    /// Shrinks the set to at most `max_entries` members.
    ///
    /// `max_entries == 0` is a no-op, i.e. it does not empty the set. The
    /// members to drop are chosen by `SPOP`, not by the caller.
    ///
    /// The size check (`SCARD`) and the removal (`SPOP`) are two separate
    /// commands, so concurrent writers can change the set in between.
    ///
    /// # Errors
    /// Returns an error if either command fails.
    pub async fn trim_to(&self, max_entries: usize) -> anyhow::Result<()> {
        if max_entries == 0 {
            return Ok(());
        }

        let mut conn = self.manager.clone();
        let count: usize = redis::cmd("SCARD")
            .arg(&self.key)
            .query_async(&mut conn)
            .await?;

        if count > max_entries {
            let excess = count - max_entries;
            // The popped members are intentionally discarded.
            redis::cmd("SPOP")
                .arg(&self.key)
                .arg(excess)
                .query_async::<()>(&mut conn)
                .await?;
        }
        Ok(())
    }
}