riskless 0.7.1

A pure Rust implementation of Diskless Topics
Documentation
use std::{sync::Arc, time::Duration};

use riskless::{
    batch_coordinator::simple::SimpleBatchCoordinator,
    consume, flush,
    messages::{ConsumeRequest, ProduceRequest, ProduceRequestCollection},
};
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let object_store =
        Arc::new(object_store::local::LocalFileSystem::new_with_prefix("data").expect(""));
    let batch_coordinator = Arc::new(SimpleBatchCoordinator::new("index".to_string()));

    let col = Arc::new(RwLock::new(ProduceRequestCollection::new()));

    let col_produce = col.clone();

    let handle_one = tokio::spawn(async move {
        let col_lock = col_produce.read().await;

        col_lock
            .collect(ProduceRequest {
                request_id: 1,
                topic: "example-topic".to_string(),
                partition: Vec::from(&1_u8.to_be_bytes()),
                data: "hello".as_bytes().to_vec(),
            })
            .expect("");
    });

    let col_flush = col.clone();
    let flush_object_store_ref = object_store.clone();
    let flush_batch_coord_ref = batch_coordinator.clone();

    let handle_two = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(20)).await;

        let mut col_lock = col_flush.write().await;

        let new_ref = col_lock.take();

        drop(col_lock);

        let produce_response = flush(new_ref, flush_object_store_ref, flush_batch_coord_ref)
            .await
            .expect("");

        assert_eq!(produce_response.len(), 1);
    });

    let _ = tokio::join!(handle_one, handle_two);

    let consume_response = consume(
        ConsumeRequest {
            topic: "example-topic".to_string(),
            partition: Vec::from(&1_u8.to_be_bytes()),
            offset: 0,
            max_partition_fetch_bytes: 0,
        },
        object_store,
        batch_coordinator,
    )
    .await;

    assert!(consume_response.is_ok());

    let mut resp = consume_response.expect("");
    let batch = resp.recv().await;

    println!("Batch: {:#?}", batch);
}