#[tokio::main]
async fn main() {
let name = "test-exclusive";
let build = kmailbox::Builder::new(&["192.168.102.8:9092"]);
let mb = build.exclusive_mailbox(name, 2, 1);
if let Err(e) = mb {
println!("exclusive_mailbox: {e:?}");
return;
}
let mb = mb.unwrap();
println!("create: {:?}", mb.create().await);
let writer = build.writer();
if let Err(e) = writer {
println!("writer: {e:?}");
return;
}
let writer = writer.unwrap();
for i in 0..10 {
println!(
"write:{:?}",
writer
.write(
name,
kmailbox::SendMail::new()
.key(&format!("key{}", i))
.payload(&format!("hello{}", i))
.add_header("/path", Some(""))
)
.await
);
}
let reader1 = build.exclusive_reader(name, 0).unwrap();
let reader2 = build.exclusive_reader(name, 1).unwrap();
let mut handles = vec![];
handles.push(tokio::spawn(async move {
loop {
tokio::select! {
rs = reader1.read(async move |mail| {
println!("read1 mail: {:?}", mail);
(kmailbox::ReadState::Fail, mail)
}) => {
println!("read1: {rs:?}");
},
rs = reader1.retry(async move |mail| {
println!("read1 retry mail: {:?}", mail);
(kmailbox::RetryState::DeadLetter, mail)
}) => {
println!("retry1: {rs:?}");
},
_ = tokio::time::sleep(tokio::time::Duration::from_secs(15)) => {
break
}
}
}
}));
handles.push(tokio::spawn(async move {
loop {
tokio::select! {
rs = reader2.read(async move |mail| {
println!("read2 mail: {:?}", mail);
(kmailbox::ReadState::Fail, mail)
}) => {
println!("read2: {rs:?}");
},
rs = reader2.retry(async move |mail| {
println!("read2 retry mail: {:?}", mail);
(kmailbox::RetryState::DeadLetter, mail)
}) => {
println!("retry2: {rs:?}");
},
_ = tokio::time::sleep(tokio::time::Duration::from_secs(15)) => {
break
}
}
}
}));
for h in handles {
let _ = h.await;
}
println!("delete: {:?}", mb.delete().await);
}