general-mq 0.4.3

General purposed interfaces for message queues.
Documentation
use std::{env, error::Error as StdError, sync::Arc, time::Duration};

use async_trait::async_trait;

use general_mq::{
    AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions, MqttConnection,
    MqttConnectionOptions, MqttQueue, MqttQueueOptions,
    connection::{EventHandler as ConnHandler, GmqConnection, Status as ConnStatus},
    queue::{
        EventHandler as QueueHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
    },
};

struct TestConnHandler;

struct TestQueueHandler {
    pub name: String,
}

const TEST_BROADCAST: bool = true;
const TEST_RELIABLE: bool = true;

#[async_trait]
impl ConnHandler for TestConnHandler {
    async fn on_error(
        &self,
        handler_id: String,
        _conn: Arc<dyn GmqConnection>,
        err: Box<dyn StdError + Send + Sync>,
    ) {
        println!("handler_id: {}, ev: {}", handler_id.as_str(), err);
    }

    async fn on_status(
        &self,
        handler_id: String,
        _conn: Arc<dyn GmqConnection>,
        status: ConnStatus,
    ) {
        let status = match status {
            ConnStatus::Closing => "status: closing",
            ConnStatus::Closed => "status: closed",
            ConnStatus::Connecting => "status: connecting",
            ConnStatus::Connected => "status: connected",
            ConnStatus::Disconnected => "status: disconnected",
        };
        println!("handler_id: {}, status: {}", handler_id.as_str(), status);
    }
}

#[async_trait]
impl QueueHandler for TestQueueHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        println!(
            "name: {}, queue: {}, error: {}",
            self.name.as_str(),
            queue.name(),
            err
        );
    }

    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: QueueStatus) {
        let status = match status {
            QueueStatus::Closing => "status: closing",
            QueueStatus::Closed => "status: closed",
            QueueStatus::Connecting => "status: connecting",
            QueueStatus::Connected => "status: connected",
            QueueStatus::Disconnected => "status: disconnected",
        };
        println!(
            "name: {}, queue: {}, status: {}",
            self.name.as_str(),
            queue.name(),
            status
        );
    }
}

#[async_trait]
impl MessageHandler for TestQueueHandler {
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        match String::from_utf8(msg.payload().to_vec()) {
            Err(e) => {
                println!(
                    "name {} received bin {:?} with parse error: {}",
                    self.name.as_str(),
                    msg.payload(),
                    e
                );
                match msg.ack().await {
                    Err(e) => println!(
                        "name {} ack {:?} error: {}",
                        self.name.as_str(),
                        msg.payload(),
                        e
                    ),
                    Ok(()) => {
                        println!("name {} ack {:?} ok", self.name.as_str(), msg.payload())
                    }
                }
            }
            Ok(payload) => {
                println!("name {} received {}", self.name.as_str(), payload.as_str());
                match msg.ack().await {
                    Err(e) => println!(
                        "name {} ack {} error: {}",
                        self.name.as_str(),
                        payload.as_str(),
                        e
                    ),
                    Ok(()) => println!("name {} ack {} ok", self.name.as_str(), payload.as_str()),
                }
            }
        };
    }
}

#[tokio::main]
async fn main() {
    let run_mqtt = env::var("RUN_MQTT").is_ok();
    if run_mqtt {
        println!("Run MQTT");
        test_mqtt().await;
    } else {
        println!("Run AMQP");
        test_amqp().await;
    }
}

async fn test_amqp() {
    let opts = AmqpConnectionOptions::default();
    let mut conn = match AmqpConnection::new(opts) {
        Err(e) => {
            println!("new AmqpConnection error: {}", e);
            return;
        }
        Ok(conn) => conn,
    };
    conn.add_handler(Arc::new(TestConnHandler {}));
    conn.add_handler(Arc::new(TestConnHandler {}));

    let opts = AmqpQueueOptions {
        name: "test".to_string(),
        is_recv: false,
        reliable: TEST_RELIABLE,
        broadcast: TEST_BROADCAST,
        reconnect_millis: 1000,
        prefetch: 10,
        ..Default::default()
    };
    let mut send_queue = match AmqpQueue::new(opts, &conn) {
        Err(e) => {
            println!("new AmqpQueue error: {}", e);
            return;
        }
        Ok(queue) => queue,
    };
    send_queue.set_handler(Arc::new(TestQueueHandler {
        name: "send".to_string(),
    }));
    if let Err(e) = send_queue.connect() {
        println!("connect send queue error: {}", e);
        return;
    }

    let opts = AmqpQueueOptions {
        name: "test".to_string(),
        is_recv: true,
        reliable: TEST_RELIABLE,
        broadcast: TEST_BROADCAST,
        reconnect_millis: 1000,
        prefetch: 10,
        ..Default::default()
    };
    let mut recv_queue1 = match AmqpQueue::new(opts.clone(), &conn) {
        Err(e) => {
            println!("new AmqpQueue error: {}", e);
            return;
        }
        Ok(queue) => queue,
    };
    let handler = Arc::new(TestQueueHandler {
        name: "recv1".to_string(),
    });
    recv_queue1.set_handler(handler.clone());
    recv_queue1.set_msg_handler(handler);
    if let Err(e) = recv_queue1.connect() {
        println!("connect recv1 queue error: {}", e);
        return;
    }
    let mut recv_queue2 = match AmqpQueue::new(opts, &conn) {
        Err(e) => {
            println!("new AmqpQueue error: {}", e);
            return;
        }
        Ok(queue) => queue,
    };
    let handler = Arc::new(TestQueueHandler {
        name: "recv2".to_string(),
    });
    recv_queue2.set_handler(handler.clone());
    recv_queue2.set_msg_handler(handler);
    if let Err(e) = recv_queue2.connect() {
        println!("connect recv2 queue error: {}", e);
        return;
    }

    loop {
        if let Err(e) = conn.connect() {
            println!("connect error: {}", e);
            return;
        }
        let mut count = 10;
        while count > 0 {
            tokio::time::sleep(Duration::from_secs(2)).await;
            let str = format!("count {}", count);
            match send_queue.send_msg(str.as_bytes().to_vec()).await {
                Err(e) => println!("send {} error: {}", str, e),
                Ok(()) => println!("send {} ok", str),
            }
            count = count - 1;
        }
        if let Err(e) = conn.close().await {
            println!("close error: {}", e);
            return;
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

async fn test_mqtt() {
    let opts = MqttConnectionOptions::default();
    let mut conn = match MqttConnection::new(opts) {
        Err(e) => {
            println!("new MqttConnection error: {}", e);
            return;
        }
        Ok(conn) => conn,
    };
    conn.add_handler(Arc::new(TestConnHandler {}));
    conn.add_handler(Arc::new(TestConnHandler {}));
    let opts = MqttConnectionOptions::default();
    let mut conn2 = match MqttConnection::new(opts) {
        Err(e) => {
            println!("new MqttConnection error: {}", e);
            return;
        }
        Ok(conn) => conn,
    };
    conn2.add_handler(Arc::new(TestConnHandler {}));

    let opts = MqttQueueOptions {
        name: "test".to_string(),
        is_recv: false,
        reliable: TEST_RELIABLE,
        broadcast: TEST_BROADCAST,
        reconnect_millis: 1000,
        shared_prefix: Some("$share/general-mq/".to_string()),
        ..Default::default()
    };
    let mut send_queue = match MqttQueue::new(opts, &conn) {
        Err(e) => {
            println!("new MqttQueue error: {}", e);
            return;
        }
        Ok(queue) => queue,
    };
    send_queue.set_handler(Arc::new(TestQueueHandler {
        name: "send".to_string(),
    }));
    if let Err(e) = send_queue.connect() {
        println!("connect send queue error: {}", e);
        return;
    }

    let opts = MqttQueueOptions {
        name: "test".to_string(),
        is_recv: true,
        reliable: TEST_RELIABLE,
        broadcast: TEST_BROADCAST,
        reconnect_millis: 1000,
        shared_prefix: Some("$share/general-mq/".to_string()),
        ..Default::default()
    };
    let mut recv_queue1 = match MqttQueue::new(opts.clone(), &conn) {
        Err(e) => {
            println!("new MqttQueue error: {}", e);
            return;
        }
        Ok(queue) => queue,
    };
    let handler = Arc::new(TestQueueHandler {
        name: "recv1".to_string(),
    });
    recv_queue1.set_handler(handler.clone());
    recv_queue1.set_msg_handler(handler);
    if let Err(e) = recv_queue1.connect() {
        println!("connect recv1 queue error: {}", e);
        return;
    }
    let mut recv_queue2 = match MqttQueue::new(opts, &conn2) {
        Err(e) => {
            println!("new MqttQueue error: {}", e);
            return;
        }
        Ok(queue) => queue,
    };
    let handler = Arc::new(TestQueueHandler {
        name: "recv2".to_string(),
    });
    recv_queue2.set_handler(handler.clone());
    recv_queue2.set_msg_handler(handler);
    if let Err(e) = recv_queue2.connect() {
        println!("connect recv2 queue error: {}", e);
        return;
    }

    loop {
        if let Err(e) = conn.connect() {
            println!("connect error: {}", e);
            return;
        }
        if let Err(e) = conn2.connect() {
            println!("connect 2 error: {}", e);
            return;
        }
        let mut count = 10;
        while count > 0 {
            tokio::time::sleep(Duration::from_secs(2)).await;
            let str = format!("count {}", count);
            match send_queue.send_msg(str.as_bytes().to_vec()).await {
                Err(e) => println!("send {} error: {}", str, e),
                Ok(()) => println!("send {} ok", str),
            }
            count = count - 1;
        }
        if let Err(e) = conn.close().await {
            println!("close error: {}", e);
            return;
        }
        if let Err(e) = conn2.close().await {
            println!("close 2 error: {}", e);
            return;
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}