use axum::{
Json, Router,
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
};
use futures_util::StreamExt;
use rustis::{
client::Client,
commands::{ListCommands, PubSubCommands},
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::net::TcpListener;
const POLL_TIMEOUT: Duration = Duration::from_secs(10);
pub struct RedisClients {
pub regular: Client,
pub sub: Client,
}
#[tokio::main]
async fn main() {
let redis_uri = "redis://127.0.0.1:6379";
let redis_clients = Arc::new(RedisClients {
regular: Client::connect(redis_uri).await.unwrap(),
sub: Client::connect(redis_uri).await.unwrap(),
});
let app = Router::new()
.route("/:key", get(poll_messages).post(publish))
.with_state(redis_clients);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
println!("listening on {addr}");
let listener = TcpListener::bind(&addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn poll_messages(
State(redis): State<Arc<RedisClients>>,
Path(channel): Path<String>,
) -> Result<Json<Vec<String>>, ServiceError> {
let messages = get_messages_from_queue(&redis.regular, &channel).await?;
if POLL_TIMEOUT.is_zero() || !messages.is_empty() {
return Ok(Json(messages));
}
let mut sub_stream = redis.sub.subscribe(&channel).await?;
let msg = tokio::time::timeout(POLL_TIMEOUT, sub_stream.next()).await;
let messages: Vec<String> = match msg {
Ok(Some(Ok(_msg))) => get_messages_from_queue(&redis.regular, &channel).await?,
Ok(Some(Err(e))) => {
return Err(ServiceError::new(
StatusCode::INTERNAL_SERVER_ERROR,
format!("error received from PubSubStream: {e}"),
));
}
Ok(None) => Vec::new(),
Err(_e) => Vec::new(),
};
Ok(Json(messages))
}
async fn get_messages_from_queue(
redis: &Client,
channel: &str,
) -> Result<Vec<String>, ServiceError> {
Ok(redis.lpop(channel, i32::MAX as usize).await?)
}
async fn publish(
State(redis): State<Arc<RedisClients>>,
Path(channel): Path<String>,
message: String,
) -> Result<(), ServiceError> {
if message.is_empty() {
return Err(ServiceError::new(
StatusCode::BAD_REQUEST,
"Message not provided",
));
};
redis.regular.lpush(&channel, &message).await?;
redis.regular.publish(&channel, "new").await?;
Ok(())
}
struct ServiceError(StatusCode, String);
impl ServiceError {
fn new(status_code: StatusCode, description: impl ToString) -> Self {
Self(status_code, description.to_string())
}
}
impl IntoResponse for ServiceError {
fn into_response(self) -> Response {
(self.0, self.1).into_response()
}
}
impl From<rustis::Error> for ServiceError {
fn from(e: rustis::Error) -> Self {
eprintln!("rustis error: {e}");
ServiceError::new(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error")
}
}