use futures_util::StreamExt;
use pdd::{
pmc::{PmcClient, PmcConsumers},
Config,
};
use tracing::{error, Level};
use tracing_subscriber::FmtSubscriber;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::TRACE)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
loop {
let client = PmcClient::new(Config::from_env()?);
let mut s = client.connect().await?;
msg_handler(&mut s).await;
error!("连接断开,5秒后重新连接");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
async fn msg_handler(s: &mut PmcConsumers) {
while let Some(msg) = s.next().await {
if msg.is_err() {
return;
}
let msg = msg.unwrap();
println!("{:?}", msg);
s.ack(&msg).await;
}
}