Skip to main content

simple/
simple.rs

1use std::sync::Arc;
2
3use riskless::{
4    batch_coordinator::simple::SimpleBatchCoordinator,
5    consume, flush,
6    messages::{ConsumeRequest, ProduceRequest, ProduceRequestCollection},
7};
8
9#[tokio::main]
10async fn main() {
11    let object_store =
12        Arc::new(object_store::local::LocalFileSystem::new_with_prefix("data").expect(""));
13    let batch_coordinator = Arc::new(SimpleBatchCoordinator::new("index".to_string()));
14
15    let col = ProduceRequestCollection::new();
16
17    col.collect(ProduceRequest {
18        request_id: 1,
19        topic: "example-topic".to_string(),
20        partition: Vec::from(&1_u8.to_be_bytes()),
21        data: "hello".as_bytes().to_vec(),
22    })
23    .expect("");
24
25    let produce_response = flush(col, object_store.clone(), batch_coordinator.clone())
26        .await
27        .expect("");
28
29    assert_eq!(produce_response.len(), 1);
30
31    let consume_response = consume(
32        ConsumeRequest {
33            topic: "example-topic".to_string(),
34            partition: Vec::from(&1_u8.to_be_bytes()),
35            offset: 0,
36            max_partition_fetch_bytes: 0,
37        },
38        object_store,
39        batch_coordinator,
40    )
41    .await;
42
43    let mut resp = consume_response.expect("");
44    let batch = resp.recv().await;
45
46    println!("Batch: {:#?}", batch);
47}