use crate::functions::{CachedScript, RsmqFunctions};
use crate::r#trait::RsmqConnectionSync;
use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
use crate::{RsmqError, RsmqResult};
use core::convert::TryFrom;
use core::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;
#[derive(Clone)]
struct RedisConnection(redis::aio::MultiplexedConnection);
impl std::fmt::Debug for RedisConnection {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MultiplexedRedisAsyncConnnection")
}
}
#[derive(Debug, Clone)]
pub struct RsmqSync {
connection: RedisConnection,
functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
runner: Arc<Runtime>,
scripts: CachedScript,
}
impl RsmqSync {
pub async fn new(options: RsmqOptions) -> RsmqResult<RsmqSync> {
let runner = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| RsmqError::TokioStart(e.into()))?;
let mut redis_info = redis::RedisConnectionInfo::default()
.set_db(options.db.into())
.set_protocol(options.protocol);
if let Some(username) = options.username {
redis_info = redis_info.set_username(username);
}
if let Some(password) = options.password {
redis_info = redis_info.set_password(password);
}
let conn_info = format!("redis://{}:{}", options.host, options.port)
.parse::<redis::ConnectionInfo>()?
.set_redis_settings(redis_info);
let client = redis::Client::open(conn_info)?;
let functions = RsmqFunctions {
ns: options.ns,
realtime: options.realtime,
conn: PhantomData,
};
let (connection, scripts) = runner.block_on(async {
let mut conn = client.get_multiplexed_async_connection().await?;
let scripts = functions.load_scripts(&mut conn).await?;
Result::<_, RsmqError>::Ok((conn, scripts))
})?;
Ok(RsmqSync {
connection: RedisConnection(connection),
functions,
runner: Arc::new(runner),
scripts,
})
}
}
impl RsmqConnectionSync for RsmqSync {
fn change_message_visibility(
&mut self,
qname: &str,
message_id: &str,
hidden: Duration,
) -> RsmqResult<()> {
self.runner.block_on(async {
self.functions
.change_message_visibility(
&mut self.connection.0,
qname,
message_id,
hidden,
&self.scripts,
)
.await
})
}
fn create_queue(
&mut self,
qname: &str,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<()> {
self.runner.block_on(async {
self.functions
.create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
.await
})
}
fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
self.runner.block_on(async {
self.functions
.delete_message(&mut self.connection.0, qname, id)
.await
})
}
fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
self.runner.block_on(async {
self.functions
.delete_queue(&mut self.connection.0, qname)
.await
})
}
fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
self.runner.block_on(async {
self.functions
.get_queue_attributes(&mut self.connection.0, qname, &self.scripts)
.await
})
}
fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
self.runner
.block_on(async { self.functions.list_queues(&mut self.connection.0).await })
}
fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.runner.block_on(async {
self.functions
.pop_message::<E>(&mut self.connection.0, qname, &self.scripts)
.await
})
}
fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
&mut self,
qname: &str,
hidden: Option<Duration>,
) -> RsmqResult<Option<RsmqMessage<E>>> {
self.runner.block_on(async {
self.functions
.receive_message::<E>(&mut self.connection.0, qname, hidden, &self.scripts)
.await
})
}
fn send_message<E: Into<RedisBytes> + Send>(
&mut self,
qname: &str,
message: E,
delay: Option<Duration>,
) -> RsmqResult<String> {
self.runner.block_on(async {
self.functions
.send_message(&mut self.connection.0, qname, message, delay)
.await
})
}
fn set_queue_attributes(
&mut self,
qname: &str,
hidden: Option<Duration>,
delay: Option<Duration>,
maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
self.runner.block_on(async {
self.functions
.set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize, &self.scripts)
.await
})
}
}