use kinbox::config::{CommonConfig, ReadConfig, WriteConfig};
use kinbox::reader::{ReadState, Reader, RetryState};
use rdkafka::Message;
use rdkafka::message::ToBytes;
/// Program entry point: drives [`run`] to completion on the Tokio runtime and
/// prints whatever it returns using `Debug` formatting.
#[tokio::main]
async fn main() {
    let outcome = run().await;
    println!("{:?}", outcome);
}
/// Builds a `Reader` and then polls its `read` and `retry` paths forever.
///
/// Both the read and the write configuration point at the same broker
/// address. On every loop iteration `reader.read(..)` and `reader.retry(..)`
/// are raced with `tokio::select!`; whichever completes first has its result
/// printed, and the loop starts over.
///
/// The handlers treat a message whose key equals `"name1"` as a success. Any
/// other key (or no key) is reported as `ReadState::Fail` on the read path and
/// `RetryState::DeadLetter` on the retry path.
///
/// # Errors
///
/// Returns the error from `Reader::new` if the reader cannot be constructed.
/// The loop itself has no exit, so `Ok(())` is never produced.
async fn run() -> Result<(), kinbox::KafkaError> {
    // Single source of truth for the broker list used by both configs.
    const BROKERS: [&str; 1] = ["192.168.102.8:9092"];

    let mut consumer_cfg = ReadConfig::new();
    consumer_cfg.set_brokers(&BROKERS);

    let mut producer_cfg = WriteConfig::new();
    producer_cfg.set_brokers(&BROKERS);

    // NOTE(review): "default.rpc-test" is presumably the topic name and
    // `Some(2)` a retry/concurrency setting — confirm against `Reader::new`.
    let reader = Reader::new(&consumer_cfg, &producer_cfg, "default.rpc-test", Some(2))?;

    loop {
        tokio::select! {
            read_outcome = reader.read(|message| async move {
                println!("read msg: {:?}", message);
                match message.key() {
                    Some(key) if key == "name1".to_bytes() => ReadState::Success,
                    _ => ReadState::Fail,
                }
            }) => {
                println!("read result: {:?}", read_outcome);
            },
            retry_outcome = reader.retry(|message| async move {
                println!("retry msg: {:?}", message);
                match message.key() {
                    Some(key) if key == "name1".to_bytes() => RetryState::Success,
                    _ => RetryState::DeadLetter,
                }
            }) => {
                println!("retry result: {:?}", retry_outcome);
            },
        }
    }
}