1use std::sync::Arc;
2
3use riskless::{
4 batch_coordinator::simple::SimpleBatchCoordinator,
5 consume, flush,
6 messages::{ConsumeRequest, ProduceRequest, ProduceRequestCollection},
7};
8
9#[tokio::main]
10async fn main() {
11 let object_store =
12 Arc::new(object_store::local::LocalFileSystem::new_with_prefix("data").expect(""));
13 let batch_coordinator = Arc::new(SimpleBatchCoordinator::new("index".to_string()));
14
15 let col = ProduceRequestCollection::new();
16
17 col.collect(ProduceRequest {
18 request_id: 1,
19 topic: "example-topic".to_string(),
20 partition: Vec::from(&1_u8.to_be_bytes()),
21 data: "hello".as_bytes().to_vec(),
22 })
23 .expect("");
24
25 let produce_response = flush(col, object_store.clone(), batch_coordinator.clone())
26 .await
27 .expect("");
28
29 assert_eq!(produce_response.len(), 1);
30
31 let consume_response = consume(
32 ConsumeRequest {
33 topic: "example-topic".to_string(),
34 partition: Vec::from(&1_u8.to_be_bytes()),
35 offset: 0,
36 max_partition_fetch_bytes: 0,
37 },
38 object_store,
39 batch_coordinator,
40 )
41 .await;
42
43 let mut resp = consume_response.expect("");
44 let batch = resp.recv().await;
45
46 println!("Batch: {:#?}", batch);
47}