1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
//! KIP-405 tiered-storage SPI and reference implementations for Crabka.
//!
//! This crate is the foundation layer for Crabka's tiered storage: it
//! defines the two plugin SPIs and the data model exchanged across them,
//! and ships the two reference implementations that the rest of the
//! tiered-storage stack is built and tested against. Its traits and types
//! mirror those of Apache Kafka's `storage-api` module
//! (`org.apache.kafka.server.log.remote.storage`).
//!
//! ## What this crate provides
//!
//! - [`RemoteStorageManager`] — copying, fetching, and deleting segment data
//!   and indexes to and from the remote tier.
//! - [`RemoteLogMetadataManager`] — persistence and querying of
//!   remote-segment metadata, with a strict lifecycle state machine.
//! - The data model: [`TopicIdPartition`], [`RemoteLogSegmentId`],
//!   [`RemoteLogSegmentMetadata`] / [`RemoteLogSegmentMetadataUpdate`],
//!   [`RemoteLogSegmentState`], [`LogSegmentData`], [`IndexType`],
//!   [`CustomMetadata`], and the partition-delete lifecycle
//!   ([`RemotePartitionDeleteMetadata`] / [`RemotePartitionDeleteState`]).
//! - [`LocalTieredStorage`] — a filesystem-backed [`RemoteStorageManager`].
//! - [`InmemoryRemoteLogMetadataManager`] — an in-memory, process-local
//!   [`RemoteLogMetadataManager`].
//!
//! ## Boundary with the broker
//!
//! This crate is the SPI and reference-implementation layer only.
//! Broker-specific behavior — segment-copy scheduling, serving `Fetch`
//! requests from the remote tier, local-vs-remote retention policy, and
//! topic-config parsing — lives in the broker, not here.
//!
//! The SPIs are intentionally **synchronous**: they mirror Kafka's blocking
//! `RemoteStorageManager` / `RemoteLogMetadataManager` interfaces. Crabka's
//! broker drives them from a thread pool by wrapping calls in
//! `spawn_blocking`, so keeping them synchronous keeps this crate free of
//! any async-runtime dependency.
//!
//! ## Example: filesystem-backed remote tier
//!
//! ```no_run
//! use bytes::Bytes;
//! use crabka_remote_storage::{
//! IndexType, LocalTieredStorage, LogSegmentData, RemoteLogSegmentId,
//! RemoteLogSegmentMetadata, RemoteLogSegmentState, RemoteStorageManager, TopicIdPartition,
//! };
//! use std::collections::BTreeMap;
//! use std::path::PathBuf;
//! use uuid::Uuid;
//!
//! # fn run() -> Result<(), Box<dyn std::error::Error>> {
//! let storage = LocalTieredStorage::new(PathBuf::from("/var/lib/crabka-remote"));
//! let topic_partition = TopicIdPartition::new(Uuid::new_v4(), "orders", 0);
//! let segment_id = RemoteLogSegmentId::new(topic_partition, Uuid::new_v4());
//! let mut leader_epochs = BTreeMap::new();
//! leader_epochs.insert(0, 0);
//! let metadata = RemoteLogSegmentMetadata::new(
//! segment_id,
//! 0,
//! 999,
//! 1_713_000_000_000,
//! 1,
//! 1_713_000_000_000,
//! 1_048_576,
//! RemoteLogSegmentState::CopySegmentStarted,
//! leader_epochs,
//! )?;
//!
//! // The broker fills these paths from a closed local log segment.
//! let segment = LogSegmentData {
//! log_segment: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.log"),
//! offset_index: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.index"),
//! time_index: PathBuf::from("/var/lib/crabka/orders-0/00000000000000000000.timeindex"),
//! transaction_index: None,
//! producer_snapshot_index: None,
//! leader_epoch_index: Bytes::new(),
//! };
//! let _custom_metadata = storage.copy_log_segment_data(&metadata, &segment)?;
//! let bytes = storage.fetch_index(&metadata, IndexType::Offset)?;
//! # let _ = bytes;
//! # Ok(())
//! # }
//! ```
pub use ;
pub use RemoteStorageError;
pub use InmemoryRemoteLogMetadataManager;
pub use LocalTieredStorage;
pub use ;
pub use RemoteLogMetadataManager;
pub use ;
pub use ;