kmailbox 0.1.6

A simple kafka mailbox
Documentation
#[tokio::main]
async fn main() {
    let name = "test-shared";

    let build = kmailbox::Builder::new(&["192.168.102.8:9092"]);
    let mb = build.shared_mailbox(name, 1);
    if let Err(e) = mb {
        println!("shared_mailbox: {e:?}");
        return;
    }

    let mb = mb.unwrap();
    println!("create: {:?}", mb.create().await);

    let writer = build.writer();
    if let Err(e) = writer {
        println!("writer: {e:?}");
        return;
    }

    let writer = writer.unwrap();
    for i in 0..10 {
        println!(
            "write:{:?}",
            writer
                .write(
                    name,
                    kmailbox::SendMail::new()
                        .key(Some(&format!("key{}", i)))
                        .payload(Some(&format!("hello{}", i)))
                        .add_header("/path", Some(""))
                )
                .await
        );
    }

    let mut handles = vec![];
    for i in 0..2 {
        let reader = build.shared_reader(name).unwrap();
        handles.push(tokio::spawn(async move {
            loop {
                tokio::select! {
                    rs = reader.read(async move |mail| {
                        println!("{} read mail: {:?}", i, mail);
                        (kmailbox::ReadState::Fail, mail)
                    }) => {
                        println!("read: {rs:?}");
                    },

                    rs = reader.retry(async move |mail| {
                        println!("{} retry mail: {:?}", i, mail);
                        if mail.equal_key("key1") {
                            (kmailbox::RetryState::Discard, mail)
                        } else {
                            (kmailbox::RetryState::DeadLetter, mail)
                        }
                    }) => {
                        println!("retry: {rs:?}");
                    },

                    _ = tokio::time::sleep(tokio::time::Duration::from_secs(15)) => {
                        break
                    }
                }
            }
        }));
    }

    for h in handles {
        let _ = h.await;
    }

    println!("delete: {:?}", mb.delete().await);
}