use redis::{AsyncCommands, Client};
use crate::{
error::KumoError,
request::{CrawlRequest, FrontierRequest, StoredFrontierRequest},
};
use super::Frontier;
/// Crawl frontier whose state lives in Redis.
///
/// Pending requests are kept as JSON strings in the Redis list named by
/// `queue_key` (appended with `RPUSH`, taken with `LPOP`), and request dedup
/// keys are recorded in the Redis set named by `seen_key`.
pub struct RedisFrontier {
/// Client from which a multiplexed connection is opened for each operation.
client: Client,
/// Name of the Redis list holding serialized pending requests.
queue_key: String,
/// Name of the Redis set holding dedup keys of requests already accepted.
seen_key: String,
}
impl RedisFrontier {
    /// Creates a frontier backed by the Redis server at `url`.
    ///
    /// `queue_key` names the list of pending requests and `seen_key` names the
    /// set of dedup keys. Before returning, a connection is opened and a `PING`
    /// is issued so that an unreachable server is reported here rather than on
    /// first use.
    ///
    /// # Errors
    ///
    /// Returns a `KumoError` built by `KumoError::store` if the client cannot
    /// be created from `url`, the connection cannot be obtained, or the `PING`
    /// fails.
    pub async fn new(
        url: &str,
        queue_key: impl Into<String>,
        seen_key: impl Into<String>,
    ) -> Result<Self, KumoError> {
        let client = match Client::open(url) {
            Ok(opened) => opened,
            Err(e) => return Err(KumoError::store("redis connect", e)),
        };

        let mut probe = match client.get_multiplexed_async_connection().await {
            Ok(connection) => connection,
            Err(e) => return Err(KumoError::store("redis get connection", e)),
        };

        // Only the success of the round trip matters; the reply text is dropped.
        let reply: Result<String, _> = redis::cmd("PING").query_async(&mut probe).await;
        reply.map_err(|e| KumoError::store("redis ping", e))?;

        let frontier = Self {
            client,
            queue_key: queue_key.into(),
            seen_key: seen_key.into(),
        };
        Ok(frontier)
    }

    /// Deletes both the queue list and the seen set in a single pipeline.
    ///
    /// # Errors
    ///
    /// Returns a `KumoError` if a connection cannot be obtained or the
    /// pipeline fails.
    pub async fn clear(&self) -> Result<(), KumoError> {
        let mut conn = self.conn().await?;

        let mut pipeline = redis::pipe();
        pipeline.del(&self.queue_key).del(&self.seen_key);

        let outcome: Result<(), _> = pipeline.query_async(&mut conn).await;
        outcome.map_err(|e| KumoError::store("redis clear", e))
    }

    /// Requests a multiplexed async connection from the stored client,
    /// mapping any failure to a `KumoError`.
    async fn conn(&self) -> Result<redis::aio::MultiplexedConnection, KumoError> {
        match self.client.get_multiplexed_async_connection().await {
            Ok(connection) => Ok(connection),
            Err(e) => Err(KumoError::store("redis connection", e)),
        }
    }
}
#[async_trait::async_trait]
impl Frontier for RedisFrontier {
    /// Enqueues a GET request for `url` at `depth`.
    ///
    /// Delegates to [`push_request`](Self::push_request) and returns its result.
    async fn push(&self, url: String, depth: usize) -> bool {
        self.push_request(CrawlRequest::get(url), depth).await
    }

    /// Enqueues a GET request for `url` with the given `depth` and
    /// `retry_count` without consulting the seen set.
    async fn push_force(&self, url: String, depth: usize, retry_count: u32) {
        self.push_request_force(FrontierRequest::new(
            CrawlRequest::get(url),
            depth,
            retry_count,
        ))
        .await;
    }

    /// Pops the next queued request and returns its URL, depth and retry
    /// count, or `None` when nothing could be popped.
    async fn pop(&self) -> Option<(String, usize, u32)> {
        self.pop_request().await.map(|queued| {
            (
                queued.request().url().to_string(),
                queued.depth(),
                queued.retry_count(),
            )
        })
    }

    /// Enqueues `request` at `depth` with a retry count of zero.
    ///
    /// Unless the request has `dont_filter` enabled, its dedup key is first
    /// added to the seen set and the request is rejected when the key was
    /// already present.
    ///
    /// Returns `true` only when the entry was actually appended to the queue.
    /// Returns `false` for duplicates and for any serialization, connection or
    /// Redis command failure.
    async fn push_request(&self, request: CrawlRequest, depth: usize) -> bool {
        let dont_filter = request.dont_filter_enabled();
        let queued = FrontierRequest::new(request, depth, 0);

        // Serialize before touching Redis so that a serialization failure
        // cannot leave the dedup key recorded for a request that was never
        // queued.
        let Ok(entry) = serde_json::to_string(&StoredFrontierRequest::from(&queued)) else {
            return false;
        };
        let Ok(mut conn) = self.conn().await else {
            return false;
        };

        if !dont_filter {
            // A failed SADD is treated like "already seen": nothing is queued.
            let added: i64 = conn
                .sadd(&self.seen_key, queued.request().dedup_key())
                .await
                .unwrap_or(0);
            if added == 0 {
                return false;
            }
        }

        let pushed = {
            let outcome: Result<(), _> = conn.rpush(&self.queue_key, entry).await;
            outcome.is_ok()
        };
        if pushed {
            return true;
        }

        // The entry never reached the queue. Undo the seen-set insertion
        // (best effort) so the request can be pushed again later instead of
        // being rejected as a duplicate forever.
        if !dont_filter {
            let _rollback: Result<(), _> = conn
                .srem(&self.seen_key, queued.request().dedup_key())
                .await;
        }
        false
    }

    /// Appends `queued` to the queue without consulting the seen set.
    ///
    /// This is best effort: the signature has no way to report failure, so
    /// connection, serialization and `RPUSH` errors are dropped.
    async fn push_request_force(&self, queued: FrontierRequest) {
        let Ok(mut conn) = self.conn().await else {
            return;
        };
        let Ok(entry) = serde_json::to_string(&StoredFrontierRequest::from(&queued)) else {
            return;
        };
        let _outcome: Result<(), _> = conn.rpush(&self.queue_key, entry).await;
    }

    /// Pops the next decodable request from the head of the queue.
    ///
    /// Entries that cannot be decoded into a `FrontierRequest` are discarded
    /// and the next entry is tried, so `None` is not returned for a corrupt
    /// entry while valid ones are still queued. Returns `None` when the queue
    /// is empty or when a connection or `LPOP` error occurs.
    async fn pop_request(&self) -> Option<FrontierRequest> {
        let mut conn = self.conn().await.ok()?;
        loop {
            let popped: Option<String> = conn.lpop(&self.queue_key, None).await.ok()?;
            // `None` here means the list is empty.
            let raw = popped?;

            let decoded = serde_json::from_str::<StoredFrontierRequest>(&raw)
                .ok()
                .and_then(|stored| FrontierRequest::try_from(stored).ok());
            if let Some(queued) = decoded {
                return Some(queued);
            }
            // Undecodable entry: it is already removed from the list, so move
            // on to the next one. Each iteration removes one entry, so the
            // loop terminates.
        }
    }

    /// Returns the length of the queue list, or `0` when the connection or
    /// the `LLEN` command fails.
    async fn len(&self) -> usize {
        let Ok(mut conn) = self.conn().await else {
            return 0;
        };
        conn.llen(&self.queue_key).await.unwrap_or(0)
    }
}