Crate riskless

Riskless

An implementation of KIP-1150 (Diskless Topics) as a reusable library for building distributed logs on object storage.

Example usage:


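// Note: the `.await` calls below must run inside an async context.

// Object store rooted at the local "data" directory, plus a simple batch coordinator.
let object_store = Arc::new(object_store::local::LocalFileSystem::new_with_prefix("data").unwrap());
let batch_coordinator = Arc::new(SimpleBatchCoordinator::new("index".to_string()));

// Collect produce requests before flushing them.
let collection = ProduceRequestCollection::new();

collection.collect(
    ProduceRequest {
        request_id: 1,
        topic: "example-topic".to_string(),
        partition: 1,
        data: "hello".as_bytes().to_vec(),
    },
)
.unwrap();

// Flush the collected requests to the object store and batch coordinator.
let produce_response = flush(collection, object_store.clone(), batch_coordinator.clone())
    .await
    .unwrap();

// One produce request was collected, so one response is expected.
assert_eq!(produce_response.len(), 1);

// Read back from the same topic and partition, starting at offset 0.
let consume_response = consume(
    ConsumeRequest {
        topic: "example-topic".to_string(),
        partition: 1,
        offset: 0,
        max_partition_fetch_bytes: 0,
    },
    object_store,
    batch_coordinator,
)
.await;

// Receive a batch from the consume response.
let mut resp = consume_response.unwrap();
let batch = resp.recv().await;

println!("Batch: {:#?}", batch);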

Re-exports

pub use object_store;

Modules

batch_coordinator
This entire interface is generated directly from the underlying KIP-1164 interface found here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=350783984#KIP1164:TopicBasedBatchCoordinator-BatchCoordinatorpluggableinterface
error
Errors.
messages
General data structures for communicating data.

Functions

consume
Handles a consume request by retrieving messages from object storage.
delete_record
Delete a specific record.
flush
Flush the ProduceRequestCollection to the ObjectStore/BatchCoordinator.
scan_and_permanently_delete_records
As records become “soft” deleted over time, the underlying storage mechanism may end up with clusters that no longer hold references to any live records.
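
The relationship between soft deletion and the later scan can be illustrated with a small in-memory model. This is only a sketch using the standard library: `ToyStore`, `put`, `soft_delete`, and `scan_and_purge` are hypothetical names invented for illustration and are not part of the riskless API, and the real functions work against an object store and a batch coordinator rather than in-memory maps.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory stand-in for object storage plus an index.
/// None of these names come from the riskless API.
#[derive(Default)]
struct ToyStore {
    /// Object key -> ids of the records stored in that object.
    objects: HashMap<String, Vec<u64>>,
    /// Ids of records that have been soft deleted.
    soft_deleted: HashSet<u64>,
}

impl ToyStore {
    /// Store a group of records together under one object key.
    fn put(&mut self, key: &str, record_ids: &[u64]) {
        self.objects.insert(key.to_string(), record_ids.to_vec());
    }

    /// Soft delete: mark the record as dead and leave the stored object alone.
    fn soft_delete(&mut self, record_id: u64) {
        self.soft_deleted.insert(record_id);
    }

    /// Scan every object and permanently remove those with no live records.
    /// Returns the removed keys in sorted order.
    fn scan_and_purge(&mut self) -> Vec<String> {
        let mut dead: Vec<String> = self
            .objects
            .iter()
            .filter(|(_, ids)| ids.iter().all(|id| self.soft_deleted.contains(id)))
            .map(|(key, _)| key.clone())
            .collect();
        dead.sort();
        for key in &dead {
            if let Some(ids) = self.objects.remove(key) {
                // Markers for purged records are no longer needed.
                for id in ids {
                    self.soft_deleted.remove(&id);
                }
            }
        }
        dead
    }
}

fn main() {
    let mut store = ToyStore::default();
    store.put("object-a", &[1, 2]);
    store.put("object-b", &[3]);

    store.soft_delete(1);
    store.soft_delete(3);

    // object-a still holds live record 2; object-b holds only dead records.
    let purged = store.scan_and_purge();
    println!("purged: {:?}", purged); // prints: purged: ["object-b"]
}
```

In this model a soft delete is cheap because it never touches stored data, and space is reclaimed only once every record in an object has been marked dead.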