KIP-405 tiered-storage SPI and reference implementations for Crabka.
This crate is the foundation layer for Crabka’s tiered storage: it
defines the two plugin SPIs and the data model exchanged across them,
and ships the two reference implementations that the rest of the
tiered-storage stack is built and tested against. It mirrors the shapes
of Apache Kafka’s storage-api module
(org.apache.kafka.server.log.remote.storage).
§What this crate provides
- RemoteStorageManager: copy / fetch / delete of segment data and indexes to and from the remote tier.
- RemoteLogMetadataManager: persistence and querying of remote-segment metadata, with a strict lifecycle state machine.
- The data model: TopicIdPartition, RemoteLogSegmentId, RemoteLogSegmentMetadata / RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, LogSegmentData, IndexType, CustomMetadata, and the partition-delete lifecycle (RemotePartitionDeleteMetadata / RemotePartitionDeleteState).
- LocalTieredStorage: a filesystem-backed RemoteStorageManager.
- InmemoryRemoteLogMetadataManager: a process-memory RemoteLogMetadataManager.
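The segment lifecycle that a RemoteLogMetadataManager enforces can be pictured as a transition check. This is a minimal sketch, assuming the forward transitions of Kafka's RemoteLogSegmentState (copy started, copy finished, delete started, delete finished, with an abandoned copy allowed to go straight to deletion). SegmentState and is_valid_transition are hypothetical names, not this crate's API, and the sketch does not model whether a repeated update to the same state is accepted.

```rust
/// Hypothetical local copy of the segment lifecycle states; variant names
/// follow Kafka's RemoteLogSegmentState.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SegmentState {
    CopySegmentStarted,
    CopySegmentFinished,
    DeleteSegmentStarted,
    DeleteSegmentFinished,
}

/// Returns true when `to` is a legal next state. `from == None` means the
/// segment is not yet known, so only the initial copy state is accepted.
fn is_valid_transition(from: Option<SegmentState>, to: SegmentState) -> bool {
    use SegmentState::*;
    matches!(
        (from, to),
        (None, CopySegmentStarted)
            | (Some(CopySegmentStarted), CopySegmentFinished)
            // An abandoned copy may be cleaned up before it finishes.
            | (Some(CopySegmentStarted), DeleteSegmentStarted)
            | (Some(CopySegmentFinished), DeleteSegmentStarted)
            | (Some(DeleteSegmentStarted), DeleteSegmentFinished)
    )
}

fn main() {
    use SegmentState::*;
    assert!(is_valid_transition(None, CopySegmentStarted));
    assert!(is_valid_transition(Some(CopySegmentStarted), CopySegmentFinished));
    // Backward moves and moves out of the terminal state are rejected.
    assert!(!is_valid_transition(Some(CopySegmentFinished), CopySegmentStarted));
    assert!(!is_valid_transition(Some(DeleteSegmentFinished), DeleteSegmentStarted));
    println!("lifecycle checks passed");
}
```

DeleteSegmentFinished has no outgoing edge, which is what makes it terminal.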
§Boundary with the broker
This crate is the SPI and reference-implementation layer. Broker-specific
behavior, such as scheduling segment copies, serving Fetch requests from
the remote tier, enforcing local-vs-remote retention, and parsing topic
configs, lives in the broker.
The SPIs are intentionally synchronous — they mirror Kafka’s
blocking RemoteStorageManager / RemoteLogMetadataManager, which the
broker drives from a thread pool (the broker wraps calls in
spawn_blocking). Keeping them sync keeps this crate free of the async
runtime.
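To illustrate how a synchronous SPI can be driven from worker threads without an async runtime, here is a dependency-free sketch. It substitutes std::thread::spawn for tokio's spawn_blocking, and BlockingStore, EchoStore, and fetch_all are hypothetical stand-ins rather than this crate's types. The point is only that a Send + Sync trait object behind an Arc can be called from any thread.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical, cut-down stand-in for a blocking SPI such as
/// RemoteStorageManager: every method is an ordinary synchronous call.
trait BlockingStore: Send + Sync {
    fn fetch(&self, key: &str) -> Result<Vec<u8>, String>;
}

/// Toy implementation that "fetches" by returning the key's own bytes.
struct EchoStore;

impl BlockingStore for EchoStore {
    fn fetch(&self, key: &str) -> Result<Vec<u8>, String> {
        Ok(key.as_bytes().to_vec())
    }
}

/// Runs each blocking fetch on its own OS thread and collects the results
/// in input order. In an async broker, tokio's spawn_blocking would take
/// the place of thread::spawn so executor threads never block on I/O.
fn fetch_all(store: Arc<dyn BlockingStore>, keys: &[&str]) -> Vec<Result<Vec<u8>, String>> {
    let handles: Vec<_> = keys
        .iter()
        .map(|key| {
            let store = Arc::clone(&store);
            let key = key.to_string();
            thread::spawn(move || store.fetch(&key))
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let store: Arc<dyn BlockingStore> = Arc::new(EchoStore);
    let results = fetch_all(store, &["orders-0", "orders-1"]);
    assert_eq!(results[1], Ok(b"orders-1".to_vec()));
}
```

Keeping the trait synchronous leaves the choice of executor entirely to the caller.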
§Filesystem-backed remote tier
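```rust
use bytes::Bytes;
use crabka_remote_storage::{
    IndexType, LocalTieredStorage, LogSegmentData, RemoteLogSegmentId,
    RemoteLogSegmentMetadata, RemoteLogSegmentState, RemoteStorageManager, TopicIdPartition,
};
use std::collections::BTreeMap;
use std::path::PathBuf;
use uuid::Uuid;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Remote-tier root directory on the local filesystem.
    let storage = LocalTieredStorage::new(PathBuf::from("/var/lib/crabka-remote"));

    // Partition 0 of topic "orders", addressed by a stable topic UUID.
    let topic_partition = TopicIdPartition::new(Uuid::new_v4(), "orders", 0);
    // Each remote segment gets its own random UUID.
    let segment_id = RemoteLogSegmentId::new(topic_partition, Uuid::new_v4());

    // Leader epoch -> first offset of that epoch within the segment.
    let mut leader_epochs = BTreeMap::new();
    leader_epochs.insert(0, 0);

    // Arguments, assuming Kafka's constructor order: segment id, start offset,
    // end offset, max timestamp (ms), broker id, event timestamp (ms),
    // segment size in bytes, lifecycle state, leader-epoch map.
    let metadata = RemoteLogSegmentMetadata::new(
        segment_id,
        0,
        999,
        1_713_000_000_000,
        1,
        1_713_000_000_000,
        1_048_576,
        RemoteLogSegmentState::CopySegmentStarted,
        leader_epochs,
    )?;

    // The broker fills these paths from a closed local log segment.
    let segment = LogSegmentData {
        log_segment: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.log"),
        offset_index: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.index"),
        time_index: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.timeindex"),
        transaction_index: None,
        producer_snapshot_index: None,
        leader_epoch_index: Bytes::new(),
    };

    // Copy the segment and its indexes; keep any returned custom metadata so
    // it can be echoed back on later calls for this segment.
    let _custom_metadata = storage.copy_log_segment_data(&metadata, &segment)?;
    // Read the offset index back from the remote tier.
    let _offset_index = storage.fetch_index(&metadata, IndexType::Offset)?;
    Ok(())
}
```
Re-exports§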
pub use dump::PartitionDump;
pub use dump::RlmmCacheDump;
Modules§
- dump - RlmmCacheDump: a flat, owned snapshot of an InmemoryRemoteLogMetadataManager's cache, used by the topic-backed manager's on-disk snapshot. Unlike the live mutation path, importing a dump bypasses lifecycle-transition validation: the dumped states are already the product of valid transitions, so re-applying them through add/update would wrongly reject terminal states.
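A minimal sketch of why the dump path skips validation, assuming Kafka's rule that a new segment must be added in the copy-started state: replaying a terminal state through add is rejected, while a direct import restores it. Cache, State, add, update, and import here are hypothetical simplifications, not the RlmmCacheDump API.

```rust
use std::collections::HashMap;

/// Hypothetical lifecycle states (names follow Kafka's RemoteLogSegmentState).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    CopySegmentStarted,
    CopySegmentFinished,
    DeleteSegmentStarted,
    DeleteSegmentFinished,
}

/// Hypothetical per-partition cache keyed by a numeric segment id.
#[derive(Default)]
struct Cache {
    segments: HashMap<u64, State>,
}

impl Cache {
    /// Live path: a new segment must enter in CopySegmentStarted.
    fn add(&mut self, id: u64, state: State) -> Result<(), String> {
        if state != State::CopySegmentStarted {
            return Err(format!("segment {id}: cannot add in state {state:?}"));
        }
        self.segments.insert(id, state);
        Ok(())
    }

    /// Live path: only forward lifecycle transitions are accepted.
    fn update(&mut self, id: u64, to: State) -> Result<(), String> {
        use State::*;
        let from = *self
            .segments
            .get(&id)
            .ok_or_else(|| format!("unknown segment {id}"))?;
        let valid = matches!(
            (from, to),
            (CopySegmentStarted, CopySegmentFinished)
                | (CopySegmentStarted, DeleteSegmentStarted)
                | (CopySegmentFinished, DeleteSegmentStarted)
                | (DeleteSegmentStarted, DeleteSegmentFinished)
        );
        if !valid {
            return Err(format!("segment {id}: {from:?} -> {to:?} is not allowed"));
        }
        self.segments.insert(id, to);
        Ok(())
    }

    /// Dump path: the states were validated when first recorded, so they are
    /// restored verbatim without re-running the transition checks.
    fn import(&mut self, dump: &[(u64, State)]) {
        self.segments.extend(dump.iter().copied());
    }
}

fn main() {
    let dumped: [(u64, State); 1] = [(7, State::DeleteSegmentFinished)];
    // Replaying a terminal state through the validated path is rejected...
    let mut replayed = Cache::default();
    assert!(replayed.add(7, State::DeleteSegmentFinished).is_err());
    // ...while importing the dump restores it as-is.
    let mut restored = Cache::default();
    restored.import(&dumped);
    assert_eq!(restored.segments.get(&7), Some(&State::DeleteSegmentFinished));
}
```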
Structs§
- CustomMetadata - Opaque bytes a RemoteStorageManager may return from copy_log_segment_data and have echoed back on every later call for that segment (e.g. an object-store key or version id).
- InmemoryRemoteLogMetadataManager - In-memory RemoteLogMetadataManager: one RemoteLogMetadataCache per partition behind a single mutex. Not durable (state is lost on restart), but it enforces the full lifecycle state machine, so it is a faithful stand-in for the topic-backed production manager in tests and single-process setups.
- LocalTieredStorage - A RemoteStorageManager that keeps offloaded segments on a local filesystem under root.
- LogSegmentData - The local files (and in-memory leader-epoch bytes) that make up one log segment to be copied to the remote tier.
- RemoteLogSegmentId - Globally unique identifier for one remote log segment: the owning partition plus a random per-segment UUID.
- RemoteLogSegmentMetadata - Metadata describing one segment stored (or being stored) in the remote tier.
- RemoteLogSegmentMetadataUpdate - An update to an existing RemoteLogSegmentMetadata's lifecycle state.
- RemotePartitionDeleteMetadata - Metadata describing the deletion lifecycle of a partition's remote data.
- S3Config - Connection / bucket parameters for S3RemoteStorage::from_s3_config.
- S3RemoteStorage - A RemoteStorageManager backed by any S3-compatible object store.
- TopicIdPartition - A partition addressed by its stable topic UUID (plus the topic name for diagnostics).
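The single-mutex layout described for InmemoryRemoteLogMetadataManager can be sketched with std types. InMemoryMetadata, PartitionCache, the (topic UUID, partition) key, and highest_offset are hypothetical; this sketch tracks only each segment's end offset rather than full segment metadata.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical partition key: (topic UUID as u128, partition number).
type PartitionKey = (u128, i32);

/// Hypothetical per-partition cache: segment UUID (as u128) -> end offset.
#[derive(Default)]
struct PartitionCache {
    end_offsets: HashMap<u128, u64>,
}

/// One cache per partition, all behind a single mutex.
#[derive(Default)]
struct InMemoryMetadata {
    partitions: Mutex<HashMap<PartitionKey, PartitionCache>>,
}

impl InMemoryMetadata {
    /// Records a segment, creating the partition's cache on first use.
    fn record(&self, partition: PartitionKey, segment: u128, end_offset: u64) {
        let mut partitions = self.partitions.lock().expect("mutex poisoned");
        partitions
            .entry(partition)
            .or_default()
            .end_offsets
            .insert(segment, end_offset);
    }

    /// Highest end offset recorded for a partition, if any segment exists.
    fn highest_offset(&self, partition: PartitionKey) -> Option<u64> {
        let partitions = self.partitions.lock().expect("mutex poisoned");
        let cache = partitions.get(&partition)?;
        cache.end_offsets.values().copied().max()
    }
}

fn main() {
    let metadata = InMemoryMetadata::default();
    metadata.record((1, 0), 10, 999);
    metadata.record((1, 0), 11, 1_999);
    assert_eq!(metadata.highest_offset((1, 0)), Some(1_999));
    assert_eq!(metadata.highest_offset((1, 1)), None);
}
```

One lock over every partition is simple to reason about; the cost is that operations on unrelated partitions serialize, which is usually acceptable for tests and single-process setups.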
Enums§
- IndexType - The kinds of index a segment carries alongside its .log data.
- RemoteLogSegmentState - Lifecycle state of a remote log segment.
- RemotePartitionDeleteState - Lifecycle state of a remote partition deletion.
- RemoteStorageError - Errors raised by RemoteStorageManager and RemoteLogMetadataManager implementations.
Constants§
- DEFAULT_MULTIPART_CHUNK_SIZE - Default per-part size for multipart uploads: 16 MiB. AWS requires every part except the last to be at least 5 MiB and caps an upload at 10,000 parts, so 16 MiB parts cover objects up to 160,000 MiB (about 156 GiB) before hitting the part-count limit, far beyond any realistic Kafka segment.
- DEFAULT_MULTIPART_THRESHOLD - Default threshold above which S3RemoteStorage::put_path switches from a single PUT to a streaming multipart upload: 100 MiB. AWS's hard cap on a single-PUT object is 5 GiB. Defaulting well below that cap keeps objects up to 100 MiB on the single-PUT path while ensuring that full-size segments (Kafka's default segment.bytes is 1 GiB), and larger ones when an operator raises segment.bytes, are streamed in parts and never silently exceed the cap.
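The arithmetic behind the two multipart constants can be checked with a small sketch. It redefines the 16 MiB chunk size, the 100 MiB threshold, and the 10,000-part AWS limit locally; multipart_parts is a hypothetical helper, and treating a size exactly equal to the threshold as a single PUT is an assumption based on the wording "above which".

```rust
const MIB: u64 = 1024 * 1024;
// Local copies of the values quoted for the two constants.
const CHUNK_SIZE: u64 = 16 * MIB;
const THRESHOLD: u64 = 100 * MIB;
// AWS limit on the number of parts in one multipart upload.
const MAX_PARTS: u64 = 10_000;

/// Hypothetical helper: the number of parts an object of `size` bytes is
/// split into, or None when it stays on the single-PUT path.
fn multipart_parts(size: u64) -> Option<u64> {
    if size <= THRESHOLD {
        None
    } else {
        // Ceiling division: the last part may be shorter than CHUNK_SIZE.
        Some((size + CHUNK_SIZE - 1) / CHUNK_SIZE)
    }
}

fn main() {
    // A default 1 GiB segment is split into 64 parts of 16 MiB.
    assert_eq!(multipart_parts(1024 * MIB), Some(64));
    // Objects at or below 100 MiB use a single PUT.
    assert_eq!(multipart_parts(100 * MIB), None);
    // Largest object that fits in 10,000 parts: 160,000 MiB (about 156 GiB).
    assert_eq!(CHUNK_SIZE * MAX_PARTS / MIB, 160_000);
    assert_eq!(multipart_parts(CHUNK_SIZE * MAX_PARTS), Some(MAX_PARTS));
}
```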
Traits§
- RemoteLogMetadataManager - SPI for the store that holds metadata about remote log segments.
- RemoteStorageManager - SPI for the remote object store that holds offloaded segment data.
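The CustomMetadata echo contract can be sketched with a cut-down, hypothetical SPI. SegmentStore, KeyedStore, and Opaque are illustrative names, not this crate's RemoteStorageManager or CustomMetadata signatures: copy returns an opaque object key, and fetch resolves the object through the echoed bytes instead of recomputing the key.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for CustomMetadata: bytes only the plugin interprets.
#[derive(Clone, Debug, PartialEq)]
struct Opaque(Vec<u8>);

/// Hypothetical cut-down SPI: `copy` may return opaque metadata, which the
/// caller stores and hands back on every later call for that segment.
trait SegmentStore {
    fn copy(&self, segment: &str, data: &[u8]) -> Result<Option<Opaque>, String>;
    fn fetch(&self, segment: &str, meta: Option<&Opaque>) -> Result<Vec<u8>, String>;
}

/// Object-store-like toy that writes each segment under a generated key and
/// returns that key (think object key or version id) as the metadata.
#[derive(Default)]
struct KeyedStore {
    objects: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}

impl SegmentStore for KeyedStore {
    fn copy(&self, segment: &str, data: &[u8]) -> Result<Option<Opaque>, String> {
        let key = format!("segments/{segment}/v1").into_bytes();
        let mut objects = self.objects.lock().map_err(|e| e.to_string())?;
        objects.insert(key.clone(), data.to_vec());
        Ok(Some(Opaque(key)))
    }

    fn fetch(&self, _segment: &str, meta: Option<&Opaque>) -> Result<Vec<u8>, String> {
        // The object is located through the echoed metadata, not by
        // recomputing the key from the segment name.
        let key = meta.ok_or("missing custom metadata")?;
        let objects = self.objects.lock().map_err(|e| e.to_string())?;
        objects
            .get(&key.0)
            .cloned()
            .ok_or_else(|| "object not found".to_string())
    }
}

fn main() -> Result<(), String> {
    let store = KeyedStore::default();
    let meta = store.copy("orders-0-00000", b"record batches")?;
    // The caller persists `meta` alongside the segment and echoes it back.
    assert_eq!(store.fetch("orders-0-00000", meta.as_ref())?, b"record batches".to_vec());
    assert!(store.fetch("orders-0-00000", None).is_err());
    Ok(())
}
```

Because the key travels with the segment's metadata, a plugin in this style could change its naming or versioning scheme without the caller needing to know how objects are named.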