pipeamqp 0.1.3

A pipebase plugin using lapin
Documentation
use super::connection::ConnectionConfig;
use async_trait::async_trait;
use futures_util::stream::StreamExt;
use lapin::{
    options::{BasicAckOptions, BasicConsumeOptions},
    types::FieldTable,
    Channel, Connection, ConnectionProperties,
};
use pipebase::{
    common::{ConfigInto, FromConfig, FromPath},
    listen::Listen,
};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
use tokio_amqp::LapinTokioExt;

#[derive(Deserialize)]
pub struct AmqpConsumerConfig {
    connection: ConnectionConfig,
    queue: String,
    consumer_tag: String,
}

impl FromPath for AmqpConsumerConfig {}

impl ConfigInto<AmqpConsumer> for AmqpConsumerConfig {}

pub struct AmqpConsumer {
    channel: Channel,
    queue: String,
    consumer_tag: String,
    tx: Option<Sender<Vec<u8>>>,
}

#[async_trait]
impl FromConfig<AmqpConsumerConfig> for AmqpConsumer {
    async fn from_config(config: AmqpConsumerConfig) -> anyhow::Result<Self> {
        let uri = config.connection.uri;
        let queue = config.queue;
        let consumer_tag = config.consumer_tag;
        let connection =
            Connection::connect(&uri, ConnectionProperties::default().with_tokio()).await?;
        let channel = connection.create_channel().await?;
        Ok(AmqpConsumer {
            channel,
            queue,
            consumer_tag,
            tx: None,
        })
    }
}

#[async_trait]
impl Listen<Vec<u8>, AmqpConsumerConfig> for AmqpConsumer {
    async fn run(&mut self) -> anyhow::Result<()> {
        let mut consumer = self
            .channel
            .basic_consume(
                &self.queue,
                &self.consumer_tag,
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;
        let tx = self.tx.as_ref().expect("sender not set for AmqpConsumer");
        while let Some(item) = consumer.next().await {
            let (_, delivery) = item?;
            delivery
                .ack(BasicAckOptions::default())
                .await
                .expect("failed to ack");
            tx.send(delivery.data).await?;
        }
        Ok(())
    }

    fn set_sender(&mut self, sender: Sender<Vec<u8>>) {
        self.tx = Some(sender)
    }
}