kmailbox 0.1.6

A simple Kafka mailbox
Documentation
/// End-to-end demo of an exclusive mailbox.
///
/// Steps, in order:
/// 1. Build an exclusive mailbox named `test-exclusive` and create it.
/// 2. Write ten mails (`key0..key9` / `hello0..hello9`), all pinned to
///    partition `0`.
/// 3. Spawn two reader tasks. Each one races `read` against `retry`: the read
///    handler always answers `ReadState::Fail`, the retry handler always
///    answers `RetryState::DeadLetter`.
/// 4. Wait for both tasks, then delete the mailbox.
///
/// Every failure after the mailbox has been created still falls through to
/// the final `delete`, so an aborted run does not leave the mailbox behind.
///
/// NOTE(review): the meaning of the `2, 1` arguments to `exclusive_mailbox`
/// and of the second argument to `exclusive_reader` is not visible here —
/// presumably partition count / replication factor and a partition index;
/// confirm against the crate documentation.
#[tokio::main]
async fn main() {
    let name = "test-exclusive";

    let build = kmailbox::Builder::new(&["192.168.102.8:9092"]);
    let mb = match build.exclusive_mailbox(name, 2, 1) {
        Ok(mb) => mb,
        Err(e) => {
            // Nothing has been created yet, so there is nothing to clean up.
            println!("exclusive_mailbox: {e:?}");
            return;
        }
    };

    // The result is only reported; the demo continues even if creation fails.
    println!("create: {:?}", mb.create().await);

    // Everything that can fail after `create` lives in this labeled block so
    // that error paths `break` out of it and still reach the `delete` below.
    'demo: {
        let writer = match build.writer() {
            Ok(writer) => writer,
            Err(e) => {
                println!("writer: {e:?}");
                break 'demo;
            }
        };

        for i in 0..10 {
            // Bind the formatted strings first so they outlive the mail that
            // borrows them.
            let key = format!("key{i}");
            let payload = format!("hello{i}");
            println!(
                "write:{:?}",
                writer
                    .write(
                        name,
                        kmailbox::SendMail::new()
                            .key(Some(&key))
                            .payload(Some(&payload))
                            .add_header("/path", Some(""))
                            .partition(Some(0))
                    )
                    .await
            );
        }

        let reader1 = match build.exclusive_reader(name, 0) {
            Ok(reader) => reader,
            Err(e) => {
                println!("exclusive_reader: {e:?}");
                break 'demo;
            }
        };
        let reader2 = match build.exclusive_reader(name, 1) {
            Ok(reader) => reader,
            Err(e) => {
                println!("exclusive_reader: {e:?}");
                break 'demo;
            }
        };

        let mut handles = Vec::with_capacity(2);

        handles.push(tokio::spawn(async move {
            loop {
                // All three futures are rebuilt on every iteration, so the
                // sleep acts as an idle timeout: the task stops once 100 s
                // pass without a `read` or `retry` completing.
                tokio::select! {
                    rs = reader1.read(async move |mail| {
                        println!("read1 mail: {:?}", mail);
                        (kmailbox::ReadState::Fail, mail)
                    }) => {
                        println!("read1: {rs:?}");
                    },

                    rs = reader1.retry(async move |mail| {
                        println!("read1 retry mail: {:?}", mail);
                        (kmailbox::RetryState::DeadLetter, mail)
                    }) => {
                        println!("retry1: {rs:?}");
                    },

                    _ = tokio::time::sleep(tokio::time::Duration::from_secs(100)) => {
                        break
                    }
                }
            }
        }));

        handles.push(tokio::spawn(async move {
            loop {
                // Same idle-timeout structure as the first reader task.
                tokio::select! {
                    rs = reader2.read(async move |mail| {
                        println!("read2 mail: {:?}", mail);
                        (kmailbox::ReadState::Fail, mail)
                    }) => {
                        println!("read2: {rs:?}");
                    },

                    rs = reader2.retry(async move |mail| {
                        println!("read2 retry mail: {:?}", mail);
                        (kmailbox::RetryState::DeadLetter, mail)
                    }) => {
                        println!("retry2: {rs:?}");
                    },

                    _ = tokio::time::sleep(tokio::time::Duration::from_secs(100)) => {
                        break
                    }
                }
            }
        }));

        for h in handles {
            // A join error means the task panicked or was cancelled; report
            // it instead of silently discarding it.
            if let Err(e) = h.await {
                println!("reader task: {e:?}");
            }
        }
    }

    println!("delete: {:?}", mb.delete().await);
}