use crate::wire::{dispatch_emit, dispatch_send, WireError, WireKind, WireRequest, WireResponse};
use crate::{MicroserviceHandler, Transport, TransportError};
use async_trait::async_trait;
use futures_util::StreamExt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Clone, Debug)]
pub struct RedisTransportOptions {
pub url: String,
pub prefix: Option<String>,
pub request_timeout: std::time::Duration,
}
impl RedisTransportOptions {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
prefix: None,
request_timeout: std::time::Duration::from_secs(5),
}
}
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = Some(prefix.into());
self
}
fn channel(&self, pattern: &str) -> String {
match self.prefix.as_deref() {
None => pattern.to_string(),
Some(p) => {
let p = p.trim_end_matches('.');
if p.is_empty() {
pattern.to_string()
} else {
format!("{p}.{pattern}")
}
}
}
}
fn wildcard(&self) -> String {
match self.prefix.as_deref() {
None => "*".to_string(),
Some(p) => {
let p = p.trim_end_matches('.');
if p.is_empty() {
"*".to_string()
} else {
format!("{p}.*")
}
}
}
}
}
#[derive(Clone)]
pub struct RedisTransport {
options: RedisTransportOptions,
client: redis::Client,
seq: Arc<AtomicU64>,
}
impl RedisTransport {
pub fn new(options: RedisTransportOptions) -> Self {
let client = redis::Client::open(options.url.clone())
.unwrap_or_else(|e| panic!("redis client open failed: {e}"));
Self {
options,
client,
seq: Arc::new(AtomicU64::new(1)),
}
}
fn next_id(&self) -> String {
self.seq.fetch_add(1, Ordering::Relaxed).to_string()
}
}
#[async_trait]
impl Transport for RedisTransport {
async fn send_json(
&self,
pattern: &str,
payload: serde_json::Value,
) -> Result<serde_json::Value, TransportError> {
let id = self.next_id();
let reply = format!("__nestrs.reply.{id}");
let channel = self.options.channel(pattern);
let mut pubsub = self
.client
.get_async_pubsub()
.await
.map_err(|e| TransportError::new(format!("redis pubsub failed: {e}")))?;
pubsub
.subscribe(&reply)
.await
.map_err(|e| TransportError::new(format!("redis subscribe failed: {e}")))?;
let wire = WireRequest {
kind: WireKind::Send,
pattern: pattern.to_string(),
payload,
reply: Some(reply.clone()),
correlation_id: None,
};
let text = serde_json::to_string(&wire)
.map_err(|e| TransportError::new(format!("serialize request failed: {e}")))?;
let mut pub_conn = self
.client
.get_multiplexed_async_connection()
.await
.map_err(|e| TransportError::new(format!("redis connect failed: {e}")))?;
redis::cmd("PUBLISH")
.arg(&channel)
.arg(text)
.query_async::<i64>(&mut pub_conn)
.await
.map_err(|e| TransportError::new(format!("redis publish failed: {e}")))?;
let mut stream = pubsub.on_message();
let msg = tokio::time::timeout(self.options.request_timeout, stream.next())
.await
.map_err(|_| TransportError::new("redis request timed out"))?
.ok_or_else(|| TransportError::new("redis request timed out"))?;
let payload: String = msg
.get_payload()
.map_err(|e| TransportError::new(format!("redis reply payload decode failed: {e}")))?;
let wire: WireResponse = serde_json::from_str(&payload)
.map_err(|e| TransportError::new(format!("deserialize response failed: {e}")))?;
if wire.ok {
Ok(wire.payload.unwrap_or(serde_json::Value::Null))
} else {
let err = wire.error.unwrap_or(WireError {
message: "microservice error".to_string(),
details: None,
});
let mut out = TransportError::new(err.message);
if let Some(details) = err.details {
out = out.with_details(details);
}
Err(out)
}
}
async fn emit_json(
&self,
pattern: &str,
payload: serde_json::Value,
) -> Result<(), TransportError> {
let channel = self.options.channel(pattern);
let wire = WireRequest {
kind: WireKind::Emit,
pattern: pattern.to_string(),
payload,
reply: None,
correlation_id: None,
};
let text = serde_json::to_string(&wire)
.map_err(|e| TransportError::new(format!("serialize event failed: {e}")))?;
let mut conn = self
.client
.get_multiplexed_async_connection()
.await
.map_err(|e| TransportError::new(format!("redis connect failed: {e}")))?;
redis::cmd("PUBLISH")
.arg(&channel)
.arg(text)
.query_async::<i64>(&mut conn)
.await
.map_err(|e| TransportError::new(format!("redis publish failed: {e}")))?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct RedisMicroserviceOptions {
pub url: String,
pub prefix: Option<String>,
}
impl RedisMicroserviceOptions {
pub fn new(url: impl Into<String>) -> Self {
Self {
url: url.into(),
prefix: None,
}
}
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = Some(prefix.into());
self
}
}
pub struct RedisMicroserviceServer {
options: RedisTransportOptions,
client: redis::Client,
handlers: Vec<Arc<dyn MicroserviceHandler>>,
}
impl RedisMicroserviceServer {
pub fn new(
options: RedisMicroserviceOptions,
handlers: Vec<Arc<dyn MicroserviceHandler>>,
) -> Self {
let options = RedisTransportOptions {
url: options.url,
prefix: options.prefix,
request_timeout: std::time::Duration::from_secs(5),
};
let client = redis::Client::open(options.url.clone())
.unwrap_or_else(|e| panic!("redis client open failed: {e}"));
Self {
options,
client,
handlers,
}
}
pub async fn listen(self) -> Result<(), TransportError> {
self.listen_with_shutdown(std::future::pending::<()>())
.await
}
pub async fn listen_with_shutdown<F>(self, shutdown: F) -> Result<(), TransportError>
where
F: std::future::Future<Output = ()> + Send + 'static,
{
let mut pubsub = self
.client
.get_async_pubsub()
.await
.map_err(|e| TransportError::new(format!("redis pubsub failed: {e}")))?;
pubsub
.psubscribe(self.options.wildcard())
.await
.map_err(|e| TransportError::new(format!("redis psubscribe failed: {e}")))?;
let handlers = Arc::new(self.handlers);
let mut stream = pubsub.on_message();
tokio::pin!(shutdown);
loop {
tokio::select! {
_ = &mut shutdown => break,
maybe = stream.next() => {
let Some(msg) = maybe else { break; };
let payload: String = match msg.get_payload() {
Ok(v) => v,
Err(_) => continue,
};
let req: WireRequest = match serde_json::from_str(&payload) {
Ok(v) => v,
Err(_) => continue,
};
match req.kind {
WireKind::Send => {
let Some(reply) = req.reply else { continue; };
let handlers = handlers.clone();
let client = self.client.clone();
tokio::spawn(async move {
let res = dispatch_send(&handlers, &req.pattern, req.payload.clone()).await;
let wire = match res {
Ok(v) => WireResponse { ok: true, payload: Some(v), error: None },
Err(e) => WireResponse { ok: false, payload: None, error: Some(WireError { message: e.message, details: e.details }) },
};
if let Ok(text) = serde_json::to_string(&wire) {
if let Ok(mut conn) = client.get_multiplexed_async_connection().await
{
let _ = redis::cmd("PUBLISH")
.arg(&reply)
.arg(text)
.query_async::<i64>(&mut conn)
.await;
}
}
});
}
WireKind::Emit => {
let handlers = handlers.clone();
tokio::spawn(async move {
dispatch_emit(&handlers, &req.pattern, req.payload.clone()).await;
});
}
}
}
}
}
Ok(())
}
}
#[async_trait]
impl crate::MicroserviceServer for RedisMicroserviceServer {
async fn listen_with_shutdown(
self: Box<Self>,
shutdown: crate::ShutdownFuture,
) -> Result<(), TransportError> {
(*self).listen_with_shutdown(shutdown).await
}
}