stream-consumer-task 0.1.0

Start asynchronous task in background to consume stream
Documentation
use std::time::Duration;

use lapin::Connection;
use stream_consumer_task::StreamConsumerTask;
use tokio::{
    select,
    signal::unix::{signal, SignalKind},
    spawn,
    task::JoinSet,
    time::sleep,
};

// main

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let queue_name = "test";
    let conn = Connection::connect("amqp://127.0.0.1:5672/%2f", Default::default()).await?;
    let chan_csm = conn.create_channel().await?;
    chan_csm
        .queue_declare(queue_name, Default::default(), Default::default())
        .await?;
    let csm = chan_csm
        .basic_consume(queue_name, "", Default::default(), Default::default())
        .await?;
    let csm = StreamConsumerTask::start(csm, |delivery, mut stop_rx| async move {
        match delivery {
            Ok(delivery) => {
                let res: anyhow::Result<()> = async {
                    let payload = String::from_utf8(delivery.data.clone())?;
                    select! {
                        _ = sleep(Duration::from_secs(5)) => {
                            println!("{payload} - processed");
                            delivery.ack(Default::default()).await?;
                        }
                        _ = stop_rx.changed() => {
                            println!("{payload} - stopped");
                        }
                    }
                    Ok(())
                }
                .await;
                if let Err(err) = res {
                    eprintln!("failed to handle delivery: {err}");
                }
            }
            Err(err) => {
                eprintln!("failed to handle delivery: {err}");
            }
        }
    });
    let chan_prd = conn.create_channel().await?;
    let mut idx = 0;
    let _task = spawn(async move {
        loop {
            let payload = format!("msg_{idx:0>3}");
            let res = chan_prd
                .basic_publish(
                    "",
                    queue_name,
                    Default::default(),
                    payload.as_bytes(),
                    Default::default(),
                )
                .await;
            match res {
                Ok(_) => println!("{payload} - sent"),
                Err(err) => eprintln!("failed to send payload: {err}"),
            }
            sleep(Duration::from_secs(1)).await;
            idx += 1;
        }
    });
    let mut sig_int = signal(SignalKind::interrupt())?;
    let mut sig_term = signal(SignalKind::terminate())?;
    let mut sigs = JoinSet::new();
    sigs.spawn(async move {
        sig_int.recv().await;
    });
    sigs.spawn(async move {
        sig_term.recv().await;
    });
    sigs.join_next().await;
    csm.stop().await;
    Ok(())
}