#[tokio::main]
async fn main() {
let name = "test-shared";
let build = kmailbox::Builder::new(&["192.168.102.8:9092"]);
let mb = build.shared_mailbox(name, 1);
if let Err(e) = mb {
println!("shared_mailbox: {e:?}");
return;
}
let mb = mb.unwrap();
println!("create: {:?}", mb.create().await);
let writer = build.writer();
if let Err(e) = writer {
println!("writer: {e:?}");
return;
}
let writer = writer.unwrap();
for i in 0..10 {
println!(
"write:{:?}",
writer
.write(
name,
kmailbox::SendMail::new()
.key(Some(&format!("key{}", i)))
.payload(Some(&format!("hello{}", i)))
.add_header("/path", Some(""))
)
.await
);
}
let mut handles = vec![];
for i in 0..2 {
let reader = build.shared_reader(name).unwrap();
handles.push(tokio::spawn(async move {
loop {
tokio::select! {
rs = reader.read(async move |mail| {
println!("{} read mail: {:?}", i, mail);
(kmailbox::ReadState::Fail, mail)
}) => {
println!("read: {rs:?}");
},
rs = reader.retry(async move |mail| {
println!("{} retry mail: {:?}", i, mail);
if mail.equal_key("key1") {
(kmailbox::RetryState::Discard, mail)
} else {
(kmailbox::RetryState::DeadLetter, mail)
}
}) => {
println!("retry: {rs:?}");
},
_ = tokio::time::sleep(tokio::time::Duration::from_secs(15)) => {
break
}
}
}
}));
}
for h in handles {
let _ = h.await;
}
println!("delete: {:?}", mb.delete().await);
}