Crate crabka_remote_storage

KIP-405 tiered-storage SPI and reference implementations for Crabka.

This crate is the foundation layer for Crabka’s tiered storage: it defines the two plugin SPIs and the data model exchanged across them, and ships the two reference implementations that the rest of the tiered-storage stack is built and tested against. It mirrors the shapes of Apache Kafka’s storage-api module (org.apache.kafka.server.log.remote.storage).

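What this crate provides

The two SPIs are the RemoteStorageManager trait, for the remote object store that holds offloaded segment data, and the RemoteLogMetadataManager trait, for the store that holds metadata about remote log segments. The data model exchanged across them includes TopicIdPartition, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemotePartitionDeleteMetadata, LogSegmentData, and CustomMetadata. The implementations are LocalTieredStorage (local filesystem), S3RemoteStorage (S3-compatible object stores), and InmemoryRemoteLogMetadataManager (in-memory segment metadata).

Boundary with the broker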

This crate is the SPI and reference-implementation layer. Broker-specific behavior, such as scheduling segment copies, serving Fetch requests from remote storage, applying local-versus-remote retention policy, and parsing topic configs, lives in the broker.

The SPIs are intentionally synchronous. They mirror Kafka's blocking RemoteStorageManager and RemoteLogMetadataManager interfaces, which the broker drives from a thread pool by wrapping each call in spawn_blocking. Keeping the SPIs synchronous keeps this crate independent of any async runtime.
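Because every SPI call may block, an async caller has to move it off its executor threads. The sketch below shows the same idea using only the standard library: a hypothetical synchronous `BlockingFetch` trait (not this crate's real SPI) is called on a dedicated worker thread, and the result comes back over a channel. The broker uses spawn_blocking for this role; the thread-and-channel version here is an assumed stand-in that needs no runtime.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a synchronous SPI: every call may block on I/O.
trait BlockingFetch: Send + Sync + 'static {
    fn fetch(&self, key: &str) -> Result<Vec<u8>, String>;
}

/// Hypothetical toy implementation that "fetches" by echoing the key's bytes.
struct EchoStore;

impl BlockingFetch for EchoStore {
    fn fetch(&self, key: &str) -> Result<Vec<u8>, String> {
        if key.is_empty() {
            return Err("empty key".to_string());
        }
        Ok(key.as_bytes().to_vec())
    }
}

/// Hypothetical helper: runs the blocking call on its own thread and returns
/// a receiver, so the caller is free until it chooses to wait for the result.
fn fetch_off_thread(
    store: Arc<dyn BlockingFetch>,
    key: String,
) -> mpsc::Receiver<Result<Vec<u8>, String>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the caller stopped waiting, so ignore it.
        let _ = tx.send(store.fetch(&key));
    });
    rx
}

fn main() {
    let store: Arc<dyn BlockingFetch> = Arc::new(EchoStore);
    let pending = fetch_off_thread(Arc::clone(&store), "orders-0".to_string());
    // ... the caller could do other work here ...
    let result = pending.recv().expect("worker thread dropped the sender");
    assert_eq!(result, Ok(b"orders-0".to_vec()));
}
```

The trait itself stays free of any scheduling decision; only the caller chooses where the blocking work runs.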

Filesystem-backed remote tier

```rust
use bytes::Bytes;
use crabka_remote_storage::{
    IndexType, LocalTieredStorage, LogSegmentData, RemoteLogSegmentId,
    RemoteLogSegmentMetadata, RemoteLogSegmentState, RemoteStorageManager, TopicIdPartition,
};
use std::collections::BTreeMap;
use std::path::PathBuf;
use uuid::Uuid;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Offloaded segments are stored under this root directory.
    let storage = LocalTieredStorage::new(PathBuf::from("/var/lib/crabka-remote"));
    let topic_partition = TopicIdPartition::new(Uuid::new_v4(), "orders", 0);
    // Each remote segment gets a fresh random UUID within its partition.
    let segment_id = RemoteLogSegmentId::new(topic_partition, Uuid::new_v4());
    // Leader epoch 0 starts at offset 0.
    let mut leader_epochs = BTreeMap::new();
    leader_epochs.insert(0, 0);
    // Argument meanings assume the order of Kafka's RemoteLogSegmentMetadata constructor.
    let metadata = RemoteLogSegmentMetadata::new(
        segment_id,
        0,                 // start offset
        999,               // end offset
        1_713_000_000_000, // max timestamp (ms)
        1,                 // broker id
        1_713_000_000_000, // event timestamp (ms)
        1_048_576,         // segment size in bytes
        RemoteLogSegmentState::CopySegmentStarted,
        leader_epochs,
    )?;

    // The broker fills these paths from a closed local log segment.
    let segment = LogSegmentData {
        log_segment: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.log"),
        offset_index: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.index"),
        time_index: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.timeindex"),
        transaction_index: None,
        producer_snapshot_index: None,
        leader_epoch_index: Bytes::new(),
    };
    let _custom_metadata = storage.copy_log_segment_data(&metadata, &segment)?;
    let _offset_index = storage.fetch_index(&metadata, IndexType::Offset)?;
    Ok(())
}
```
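The example hard-codes the files of the segment whose base offset is 0. Kafka names a segment's files after its base offset, zero-padded to 20 digits, which is why all three paths share the same prefix. A broker-side helper could derive them as below; `SegmentPaths` and `segment_paths` are hypothetical names for illustration, not part of this crate.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical holder for the on-disk files of one local segment.
#[derive(Debug, PartialEq)]
struct SegmentPaths {
    log_segment: PathBuf,
    offset_index: PathBuf,
    time_index: PathBuf,
}

/// Hypothetical helper: builds the paths for the segment starting at
/// `base_offset`, using Kafka's 20-digit, zero-padded file-name prefix.
fn segment_paths(partition_dir: &Path, base_offset: u64) -> SegmentPaths {
    let prefix = format!("{:020}", base_offset);
    SegmentPaths {
        log_segment: partition_dir.join(format!("{prefix}.log")),
        offset_index: partition_dir.join(format!("{prefix}.index")),
        time_index: partition_dir.join(format!("{prefix}.timeindex")),
    }
}

fn main() {
    let dir = Path::new("/var/lib/crabka/orders-0");
    let paths = segment_paths(dir, 0);
    assert_eq!(
        paths.log_segment,
        PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.log")
    );
    // The segment after offsets 0..=999 starts at offset 1000.
    let next = segment_paths(dir, 1000);
    assert_eq!(
        next.time_index,
        PathBuf::from("/var/lib/crabka/orders-0/00000000000000001000.timeindex")
    );
}
```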

Re-exports

pub use dump::PartitionDump;
pub use dump::RlmmCacheDump;

Modules

dump
RlmmCacheDump — a flat, owned snapshot of an InmemoryRemoteLogMetadataManager’s cache, used by the topic-backed manager’s on-disk snapshot. Unlike the live mutation path, importing a dump bypasses lifecycle-transition validation: the dumped states are already the product of valid transitions, so re-applying them through add/update would wrongly reject terminal states.
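Why a dump import must bypass validation is easiest to see with a toy cache. The sketch below uses hypothetical types, not the crate's RemoteLogMetadataCache API, and assumes, as in Kafka, that the live add path only accepts a segment in its initial CopySegmentStarted state. A dumped segment that already reached a later state can then only be restored through an import that inserts states verbatim.

```rust
use std::collections::HashMap;

/// Illustrative local copy of the segment lifecycle states.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    CopySegmentStarted,
    CopySegmentFinished,
    DeleteSegmentStarted,
    DeleteSegmentFinished,
}

/// Hypothetical per-partition cache keyed by a segment id string.
#[derive(Default)]
struct ToyCache {
    segments: HashMap<String, State>,
}

impl ToyCache {
    /// Live path: a new segment must enter in the initial state
    /// (an assumed rule, mirroring Kafka's metadata manager).
    fn add(&mut self, id: &str, state: State) -> Result<(), String> {
        if state != State::CopySegmentStarted {
            return Err(format!("{id}: cannot add a segment in state {state:?}"));
        }
        self.segments.insert(id.to_string(), state);
        Ok(())
    }

    /// Dump import: states already passed validation when first applied,
    /// so they are inserted as-is.
    fn import(&mut self, dump: &[(&str, State)]) {
        for (id, state) in dump {
            self.segments.insert(id.to_string(), *state);
        }
    }
}

fn main() {
    let dump = [
        ("seg-a", State::CopySegmentFinished),
        ("seg-b", State::DeleteSegmentStarted),
        ("seg-c", State::DeleteSegmentFinished),
    ];

    // Replaying the dump through the live path rejects every segment.
    let mut replayed = ToyCache::default();
    for &(id, state) in &dump {
        assert!(replayed.add(id, state).is_err());
    }
    assert!(replayed.segments.is_empty());

    // Importing restores them exactly as dumped, terminal state included.
    let mut restored = ToyCache::default();
    restored.import(&dump);
    assert_eq!(restored.segments["seg-c"], State::DeleteSegmentFinished);
}
```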

Structs

CustomMetadata
Opaque bytes a RemoteStorageManager may return from copy_log_segment_data and have echoed back on every later call for that segment (e.g. an object-store key or version id).
InmemoryRemoteLogMetadataManager
In-memory RemoteLogMetadataManager: one RemoteLogMetadataCache per partition behind a single mutex. Not durable — state is lost on restart — but enforces the full lifecycle state machine, so it is a faithful stand-in for the topic-backed production manager in tests and single-process setups.
LocalTieredStorage
A RemoteStorageManager that keeps offloaded segments on a local filesystem under root.
LogSegmentData
The local files (and in-memory leader-epoch bytes) that make up one log segment to be copied to the remote tier.
RemoteLogSegmentId
Globally-unique identifier for one remote log segment: the owning partition plus a random per-segment UUID.
RemoteLogSegmentMetadata
Metadata describing one segment stored (or being stored) in the remote tier.
RemoteLogSegmentMetadataUpdate
An update to an existing RemoteLogSegmentMetadata’s lifecycle state.
RemotePartitionDeleteMetadata
Metadata describing the deletion lifecycle of a partition’s remote data.
S3Config
Connection / bucket parameters for S3RemoteStorage::from_s3_config.
S3RemoteStorage
A RemoteStorageManager backed by any S3-compatible object store.
TopicIdPartition
A partition addressed by its stable topic UUID (plus the topic name for diagnostics).
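The InmemoryRemoteLogMetadataManager entry describes its layout as one cache per partition behind a single mutex. A minimal std-only sketch of that shape follows; `ToyRlmm`, `PartitionKey`, and the list of end offsets standing in for a per-partition cache are hypothetical simplifications, not the crate's types. One lock for all partitions keeps the code simple at the cost of serializing every caller, which seems a reasonable trade-off for the test and single-process uses the entry names.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical partition key: topic UUID (as a string) plus partition number.
type PartitionKey = (String, i32);

/// Hypothetical stand-in for a per-partition cache: the end offsets of the
/// segments registered so far.
#[derive(Default)]
struct ToyPartitionCache {
    segment_end_offsets: Vec<i64>,
}

/// Hypothetical manager: one cache per partition, all behind one mutex.
#[derive(Default)]
struct ToyRlmm {
    caches: Mutex<HashMap<PartitionKey, ToyPartitionCache>>,
}

impl ToyRlmm {
    fn add_segment(&self, partition: PartitionKey, end_offset: i64) {
        let mut caches = self.caches.lock().expect("mutex poisoned");
        caches
            .entry(partition)
            .or_default()
            .segment_end_offsets
            .push(end_offset);
    }

    /// Highest end offset stored remotely for a partition, if any.
    fn highest_offset(&self, partition: &PartitionKey) -> Option<i64> {
        let caches = self.caches.lock().expect("mutex poisoned");
        caches
            .get(partition)
            .and_then(|c| c.segment_end_offsets.iter().copied().max())
    }
}

fn main() {
    let rlmm = ToyRlmm::default();
    let orders0: PartitionKey = ("topic-uuid".to_string(), 0);
    assert_eq!(rlmm.highest_offset(&orders0), None);
    rlmm.add_segment(orders0.clone(), 999);
    rlmm.add_segment(orders0.clone(), 1999);
    assert_eq!(rlmm.highest_offset(&orders0), Some(1999));
}
```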

Enums

IndexType
The kinds of index a segment carries alongside its .log data.
RemoteLogSegmentState
Lifecycle state of a remote log segment.
RemotePartitionDeleteState
Lifecycle state of a remote partition deletion.
RemoteStorageError
Errors raised by RemoteStorageManager and RemoteLogMetadataManager implementations.
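RemoteLogSegmentState is a lifecycle state machine. The sketch below encodes the forward transitions of Kafka's segment lifecycle (copy started, copy finished, delete started, delete finished) in a hypothetical `can_transition` function over a local copy of the enum. It is an illustration under that assumption; Crabka's exact rules, for example whether re-applying the current state is accepted, may differ.

```rust
/// Local copy of the four segment states, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SegmentState {
    CopySegmentStarted,
    CopySegmentFinished,
    DeleteSegmentStarted,
    DeleteSegmentFinished,
}

use SegmentState::*;

/// Hypothetical check modeled on Kafka's segment lifecycle: an unfinished
/// copy can still be deleted, and a finished delete is terminal.
fn can_transition(from: SegmentState, to: SegmentState) -> bool {
    matches!(
        (from, to),
        (CopySegmentStarted, CopySegmentFinished)
            | (CopySegmentStarted, DeleteSegmentStarted)
            | (CopySegmentFinished, DeleteSegmentStarted)
            | (DeleteSegmentStarted, DeleteSegmentFinished)
    )
}

fn main() {
    // Happy path: copy, then later delete.
    assert!(can_transition(CopySegmentStarted, CopySegmentFinished));
    assert!(can_transition(CopySegmentFinished, DeleteSegmentStarted));
    assert!(can_transition(DeleteSegmentStarted, DeleteSegmentFinished));
    // An abandoned copy is cleaned up without ever finishing.
    assert!(can_transition(CopySegmentStarted, DeleteSegmentStarted));
    // No way back, and nothing leaves the terminal state.
    assert!(!can_transition(CopySegmentFinished, CopySegmentStarted));
    assert!(!can_transition(DeleteSegmentFinished, CopySegmentStarted));
}
```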

Constants

DEFAULT_MULTIPART_CHUNK_SIZE
Default per-part size for multipart uploads: 16 MiB. AWS requires every part except the last to be at least 5 MiB and caps an upload at 10,000 parts, so 16 MiB parts cover objects up to 10,000 × 16 MiB = 156.25 GiB before hitting the part-count limit, far beyond any realistic Kafka segment.
DEFAULT_MULTIPART_THRESHOLD
Default threshold above which S3RemoteStorage::put_path switches from a single PUT to a streaming multipart upload: 100 MiB. AWS caps single-PUT objects at 5 GiB, and the threshold sits well below that limit. Segments up to 100 MiB go up in a single request; anything larger, including a full segment at Kafka's default segment.bytes of 1 GiB or one produced under an operator-raised segment.bytes, is streamed in parts and so can never exceed the single-PUT cap.
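The two constants combine into a simple upload plan, recomputed in the sketch below. `UploadPlan` and `plan_upload` are hypothetical helpers, not S3RemoteStorage's API; the constant values are copied from the descriptions above, and the part count is assumed to be ceil(size / chunk) with a smaller final part.

```rust
/// Values copied from the constant descriptions above.
const DEFAULT_MULTIPART_THRESHOLD: u64 = 100 * 1024 * 1024; // 100 MiB
const DEFAULT_MULTIPART_CHUNK_SIZE: u64 = 16 * 1024 * 1024; // 16 MiB
/// S3 limit on the number of parts in one multipart upload.
const MAX_PARTS: u64 = 10_000;

/// Hypothetical description of how an object would be uploaded.
#[derive(Debug, PartialEq)]
enum UploadPlan {
    SinglePut,
    Multipart { parts: u64 },
}

/// Hypothetical planner: single PUT up to the threshold, otherwise
/// ceil(size / chunk) parts; None if the part-count limit would be exceeded.
fn plan_upload(size: u64, threshold: u64, chunk: u64) -> Option<UploadPlan> {
    if size <= threshold {
        return Some(UploadPlan::SinglePut);
    }
    let parts = (size + chunk - 1) / chunk; // ceiling division
    if parts > MAX_PARTS {
        None
    } else {
        Some(UploadPlan::Multipart { parts })
    }
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    const GIB: u64 = 1024 * MIB;
    let (t, c) = (DEFAULT_MULTIPART_THRESHOLD, DEFAULT_MULTIPART_CHUNK_SIZE);

    // The threshold stays far below S3's 5 GiB single-PUT cap.
    assert!(t < 5 * GIB);
    // A 64 MiB segment goes up in one request.
    assert_eq!(plan_upload(64 * MIB, t, c), Some(UploadPlan::SinglePut));
    // Kafka's default 1 GiB segment becomes 64 parts of 16 MiB.
    assert_eq!(plan_upload(GIB, t, c), Some(UploadPlan::Multipart { parts: 64 }));
    // Largest object reachable: 10,000 x 16 MiB = 160,000 MiB = 156.25 GiB.
    let max = MAX_PARTS * c;
    assert_eq!(max, 160_000 * MIB);
    assert_eq!(plan_upload(max, t, c), Some(UploadPlan::Multipart { parts: MAX_PARTS }));
    assert_eq!(plan_upload(max + 1, t, c), None);
}
```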

Traits

RemoteLogMetadataManager
SPI for the store that holds metadata about remote log segments.
RemoteStorageManager
SPI for the remote object store that holds offloaded segment data.
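To make the RemoteStorageManager contract concrete, here is a deliberately reduced, std-only analogue: a hypothetical `ToySegmentStore` trait whose `copy` may return opaque custom metadata (as the CustomMetadata entry describes, e.g. an object key) that the caller passes back to later `fetch` and `delete` calls. The trait, its method names, and the in-memory implementation are illustrative assumptions and do not reproduce the real trait's signatures.

```rust
use std::collections::HashMap;

/// Hypothetical opaque bytes returned by `copy` and echoed back later.
#[derive(Clone, Debug, PartialEq)]
struct ToyCustomMetadata(Vec<u8>);

/// Hypothetical, heavily reduced analogue of a synchronous storage SPI.
trait ToySegmentStore {
    fn copy(&mut self, segment_id: &str, data: &[u8]) -> Result<Option<ToyCustomMetadata>, String>;
    fn fetch(&self, segment_id: &str, meta: Option<&ToyCustomMetadata>) -> Result<Vec<u8>, String>;
    fn delete(&mut self, segment_id: &str, meta: Option<&ToyCustomMetadata>) -> Result<(), String>;
}

/// Hypothetical in-memory store that records its chosen object key in the
/// custom metadata instead of recomputing it on every call.
#[derive(Default)]
struct InMemoryStore {
    objects: HashMap<String, Vec<u8>>,
}

impl InMemoryStore {
    fn key_from(meta: Option<&ToyCustomMetadata>) -> Result<String, String> {
        let meta = meta.ok_or("missing custom metadata")?;
        String::from_utf8(meta.0.clone()).map_err(|e| e.to_string())
    }
}

impl ToySegmentStore for InMemoryStore {
    fn copy(&mut self, segment_id: &str, data: &[u8]) -> Result<Option<ToyCustomMetadata>, String> {
        let key = format!("segments/{segment_id}");
        self.objects.insert(key.clone(), data.to_vec());
        Ok(Some(ToyCustomMetadata(key.into_bytes())))
    }

    fn fetch(&self, _segment_id: &str, meta: Option<&ToyCustomMetadata>) -> Result<Vec<u8>, String> {
        let key = Self::key_from(meta)?;
        self.objects.get(&key).cloned().ok_or_else(|| format!("no object at {key}"))
    }

    fn delete(&mut self, _segment_id: &str, meta: Option<&ToyCustomMetadata>) -> Result<(), String> {
        let key = Self::key_from(meta)?;
        self.objects.remove(&key);
        Ok(())
    }
}

fn main() {
    let mut store = InMemoryStore::default();
    let meta = store.copy("orders-0-seg-1", b"record batch bytes").unwrap();
    let bytes = store.fetch("orders-0-seg-1", meta.as_ref()).unwrap();
    assert_eq!(bytes, b"record batch bytes".to_vec());
    store.delete("orders-0-seg-1", meta.as_ref()).unwrap();
    assert!(store.fetch("orders-0-seg-1", meta.as_ref()).is_err());
}
```

Echoing the metadata back lets a store like an object-store backend avoid a lookup to rediscover where it put a segment.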