rsmq_async 16.0.0

Async RSMQ port to rust. RSMQ is a simple redis queue system that works in any redis v2.4+. It contains the same methods as the original one in https://github.com/smrchy/rsmq
Documentation
use crate::functions::{CachedScript, RsmqFunctions};
use crate::r#trait::RsmqConnectionSync;
use crate::types::{RedisBytes, RsmqMessage, RsmqOptions, RsmqQueueAttributes};
use crate::{RsmqError, RsmqResult};
use core::convert::TryFrom;
use core::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;

#[derive(Clone)]
struct RedisConnection(redis::aio::MultiplexedConnection);

impl std::fmt::Debug for RedisConnection {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MultiplexedRedisAsyncConnnection")
    }
}

#[derive(Debug, Clone)]
pub struct RsmqSync {
    connection: RedisConnection,
    functions: RsmqFunctions<redis::aio::MultiplexedConnection>,
    runner: Arc<Runtime>,
    scripts: CachedScript,
}

impl RsmqSync {
    /// Creates a new RSMQ instance, including its connection
    pub async fn new(options: RsmqOptions) -> RsmqResult<RsmqSync> {
        let runner = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|e| RsmqError::TokioStart(e.into()))?;

        let mut redis_info = redis::RedisConnectionInfo::default()
            .set_db(options.db.into())
            .set_protocol(options.protocol);
        if let Some(username) = options.username {
            redis_info = redis_info.set_username(username);
        }
        if let Some(password) = options.password {
            redis_info = redis_info.set_password(password);
        }
        let conn_info = format!("redis://{}:{}", options.host, options.port)
            .parse::<redis::ConnectionInfo>()?
            .set_redis_settings(redis_info);

        let client = redis::Client::open(conn_info)?;

        let functions = RsmqFunctions {
            ns: options.ns,
            realtime: options.realtime,
            conn: PhantomData,
        };

        let (connection, scripts) = runner.block_on(async {
            let mut conn = client.get_multiplexed_async_connection().await?;
            let scripts = functions.load_scripts(&mut conn).await?;
            Result::<_, RsmqError>::Ok((conn, scripts))
        })?;

        Ok(RsmqSync {
            connection: RedisConnection(connection),
            functions,
            runner: Arc::new(runner),
            scripts,
        })
    }
}

impl RsmqConnectionSync for RsmqSync {
    fn change_message_visibility(
        &mut self,
        qname: &str,
        message_id: &str,
        hidden: Duration,
    ) -> RsmqResult<()> {
        self.runner.block_on(async {
            self.functions
                .change_message_visibility(
                    &mut self.connection.0,
                    qname,
                    message_id,
                    hidden,
                    &self.scripts,
                )
                .await
        })
    }

    fn create_queue(
        &mut self,
        qname: &str,
        hidden: Option<Duration>,
        delay: Option<Duration>,
        maxsize: Option<i64>,
    ) -> RsmqResult<()> {
        self.runner.block_on(async {
            self.functions
                .create_queue(&mut self.connection.0, qname, hidden, delay, maxsize)
                .await
        })
    }

    fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
        self.runner.block_on(async {
            self.functions
                .delete_message(&mut self.connection.0, qname, id)
                .await
        })
    }
    fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
        self.runner.block_on(async {
            self.functions
                .delete_queue(&mut self.connection.0, qname)
                .await
        })
    }
    fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
        self.runner.block_on(async {
            self.functions
                .get_queue_attributes(&mut self.connection.0, qname, &self.scripts)
                .await
        })
    }

    fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
        self.runner
            .block_on(async { self.functions.list_queues(&mut self.connection.0).await })
    }

    fn pop_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
        &mut self,
        qname: &str,
    ) -> RsmqResult<Option<RsmqMessage<E>>> {
        self.runner.block_on(async {
            self.functions
                .pop_message::<E>(&mut self.connection.0, qname, &self.scripts)
                .await
        })
    }

    fn receive_message<E: TryFrom<RedisBytes, Error = Vec<u8>>>(
        &mut self,
        qname: &str,
        hidden: Option<Duration>,
    ) -> RsmqResult<Option<RsmqMessage<E>>> {
        self.runner.block_on(async {
            self.functions
                .receive_message::<E>(&mut self.connection.0, qname, hidden, &self.scripts)
                .await
        })
    }

    fn send_message<E: Into<RedisBytes> + Send>(
        &mut self,
        qname: &str,
        message: E,
        delay: Option<Duration>,
    ) -> RsmqResult<String> {
        self.runner.block_on(async {
            self.functions
                .send_message(&mut self.connection.0, qname, message, delay)
                .await
        })
    }

    fn set_queue_attributes(
        &mut self,
        qname: &str,
        hidden: Option<Duration>,
        delay: Option<Duration>,
        maxsize: Option<i64>,
    ) -> RsmqResult<RsmqQueueAttributes> {
        self.runner.block_on(async {
            self.functions
                .set_queue_attributes(&mut self.connection.0, qname, hidden, delay, maxsize, &self.scripts)
                .await
        })
    }
}