use std::time::Duration;
use lapin::Connection;
use stream_consumer_task::StreamConsumerTask;
use tokio::{
select,
signal::unix::{signal, SignalKind},
spawn,
task::JoinSet,
time::sleep,
};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let queue_name = "test";
let conn = Connection::connect("amqp://127.0.0.1:5672/%2f", Default::default()).await?;
let chan_csm = conn.create_channel().await?;
chan_csm
.queue_declare(queue_name, Default::default(), Default::default())
.await?;
let csm = chan_csm
.basic_consume(queue_name, "", Default::default(), Default::default())
.await?;
let csm = StreamConsumerTask::start(csm, |delivery, mut stop_rx| async move {
match delivery {
Ok(delivery) => {
let res: anyhow::Result<()> = async {
let payload = String::from_utf8(delivery.data.clone())?;
select! {
_ = sleep(Duration::from_secs(5)) => {
println!("{payload} - processed");
delivery.ack(Default::default()).await?;
}
_ = stop_rx.changed() => {
println!("{payload} - stopped");
}
}
Ok(())
}
.await;
if let Err(err) = res {
eprintln!("failed to handle delivery: {err}");
}
}
Err(err) => {
eprintln!("failed to handle delivery: {err}");
}
}
});
let chan_prd = conn.create_channel().await?;
let mut idx = 0;
let _task = spawn(async move {
loop {
let payload = format!("msg_{idx:0>3}");
let res = chan_prd
.basic_publish(
"",
queue_name,
Default::default(),
payload.as_bytes(),
Default::default(),
)
.await;
match res {
Ok(_) => println!("{payload} - sent"),
Err(err) => eprintln!("failed to send payload: {err}"),
}
sleep(Duration::from_secs(1)).await;
idx += 1;
}
});
let mut sig_int = signal(SignalKind::interrupt())?;
let mut sig_term = signal(SignalKind::terminate())?;
let mut sigs = JoinSet::new();
sigs.spawn(async move {
sig_int.recv().await;
});
sigs.spawn(async move {
sig_term.recv().await;
});
sigs.join_next().await;
csm.stop().await;
Ok(())
}