Skip to main content

crabka_remote_storage/
metadata.rs

1//! The data model exchanged across the two tiered-storage SPIs.
2//!
3//! Shapes mirror Apache Kafka's `storage-api`
4//! (`org.apache.kafka.server.log.remote.storage`): [`TopicIdPartition`],
5//! [`RemoteLogSegmentId`], [`RemoteLogSegmentMetadata`] +
6//! [`RemoteLogSegmentMetadataUpdate`], the [`RemoteLogSegmentState`]
7//! lifecycle, and the partition-delete lifecycle
8//! ([`RemotePartitionDeleteMetadata`] / [`RemotePartitionDeleteState`]).
9
10use std::collections::BTreeMap;
11use std::hash::{Hash, Hasher};
12
13use uuid::Uuid;
14
15use crate::error::RemoteStorageError;
16
17/// A partition addressed by its stable topic UUID (plus the topic name for
18/// diagnostics).
19///
20/// Equality and hashing are by `topic_id` + `partition` only — the topic
21/// name is informational and a topic's id is its identity, matching
22/// Kafka's `TopicIdPartition`.
23#[derive(Debug, Clone)]
24pub struct TopicIdPartition {
25    /// Stable topic UUID, as assigned at topic creation.
26    pub topic_id: Uuid,
27    /// Topic name (informational; not part of identity).
28    pub topic: String,
29    /// Partition index.
30    pub partition: i32,
31}
32
33impl TopicIdPartition {
34    /// Construct a [`TopicIdPartition`].
35    #[must_use]
36    pub fn new(topic_id: Uuid, topic: impl Into<String>, partition: i32) -> Self {
37        Self {
38            topic_id,
39            topic: topic.into(),
40            partition,
41        }
42    }
43}
44
45impl PartialEq for TopicIdPartition {
46    fn eq(&self, other: &Self) -> bool {
47        self.topic_id == other.topic_id && self.partition == other.partition
48    }
49}
50
51impl Eq for TopicIdPartition {}
52
53impl Hash for TopicIdPartition {
54    fn hash<H: Hasher>(&self, state: &mut H) {
55        self.topic_id.hash(state);
56        self.partition.hash(state);
57    }
58}
59
60/// Globally-unique identifier for one remote log segment: the owning
61/// partition plus a random per-segment UUID.
62#[derive(Debug, Clone, PartialEq, Eq, Hash)]
63pub struct RemoteLogSegmentId {
64    /// The partition this segment belongs to.
65    pub topic_id_partition: TopicIdPartition,
66    /// Random per-segment UUID.
67    pub id: Uuid,
68}
69
70impl RemoteLogSegmentId {
71    /// Construct a [`RemoteLogSegmentId`] from an explicit UUID.
72    #[must_use]
73    pub fn new(topic_id_partition: TopicIdPartition, id: Uuid) -> Self {
74        Self {
75            topic_id_partition,
76            id,
77        }
78    }
79}
80
/// Where a remote log segment currently sits in its copy/delete lifecycle.
///
/// The permitted advances (checked by
/// [`RemoteLogSegmentState::is_valid_transition`]) are:
///
/// ```text
/// CopySegmentStarted ──► CopySegmentFinished ──► DeleteSegmentStarted ──► DeleteSegmentFinished
///         └───────────────────────────────────►┘
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RemoteLogSegmentState {
    /// Copying to the remote tier has started and is not yet complete.
    /// Every segment begins here.
    CopySegmentStarted,
    /// Copying completed: the segment is durable in the remote tier and can
    /// be read.
    CopySegmentFinished,
    /// Removal from the remote tier has started.
    DeleteSegmentStarted,
    /// Removal completed: nothing of the segment remains in the remote tier.
    DeleteSegmentFinished,
}

impl RemoteLogSegmentState {
    /// Reports whether a segment that is in state `self` is allowed to move
    /// to `target`.
    ///
    /// Staying in the same state is *not* a valid transition; callers treat
    /// that case as a duplicate / no-op rather than an advance.
    #[must_use]
    pub fn is_valid_transition(self, target: Self) -> bool {
        // One arm per source state, so a newly added variant fails to compile
        // here until its outgoing edges are decided.
        match self {
            // A copy may complete, or be abandoned straight into deletion.
            Self::CopySegmentStarted => matches!(
                target,
                Self::CopySegmentFinished | Self::DeleteSegmentStarted
            ),
            Self::CopySegmentFinished => target == Self::DeleteSegmentStarted,
            Self::DeleteSegmentStarted => target == Self::DeleteSegmentFinished,
            // Terminal state: nothing follows.
            Self::DeleteSegmentFinished => false,
        }
    }
}
123
/// Uninterpreted bytes that a
/// [`RemoteStorageManager`](crate::RemoteStorageManager) may hand back from
/// `copy_log_segment_data`, and which are then passed back to it on every
/// subsequent call concerning the same segment — for instance an
/// object-store key or a version id.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct CustomMetadata(pub Vec<u8>);
129
130/// Metadata describing one segment stored (or being stored) in the remote
131/// tier.
132#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct RemoteLogSegmentMetadata {
134    remote_log_segment_id: RemoteLogSegmentId,
135    start_offset: i64,
136    end_offset: i64,
137    max_timestamp_ms: i64,
138    broker_id: i32,
139    event_timestamp_ms: i64,
140    segment_size_in_bytes: i32,
141    custom_metadata: Option<CustomMetadata>,
142    state: RemoteLogSegmentState,
143    segment_leader_epochs: BTreeMap<i32, i64>,
144    /// KIP-405 `txnIndexEmpty`: `true` when the segment carries no transaction
145    /// index. Serialized as tagged field (tag 0) in the JVM record format.
146    /// Defaults to `false`.
147    txn_index_empty: bool,
148}
149
150impl RemoteLogSegmentMetadata {
151    /// Construct a [`RemoteLogSegmentMetadata`].
152    ///
153    /// # Errors
154    ///
155    /// Returns [`RemoteStorageError::InvalidArgument`] when
156    /// `segment_leader_epochs` is empty, `end_offset < start_offset`, or
157    /// `segment_size_in_bytes < 0`.
158    #[allow(clippy::too_many_arguments)]
159    pub fn new(
160        remote_log_segment_id: RemoteLogSegmentId,
161        start_offset: i64,
162        end_offset: i64,
163        max_timestamp_ms: i64,
164        broker_id: i32,
165        event_timestamp_ms: i64,
166        segment_size_in_bytes: i32,
167        state: RemoteLogSegmentState,
168        segment_leader_epochs: BTreeMap<i32, i64>,
169    ) -> Result<Self, RemoteStorageError> {
170        if segment_leader_epochs.is_empty() {
171            return Err(RemoteStorageError::InvalidArgument(
172                "segment_leader_epochs must not be empty".into(),
173            ));
174        }
175        if end_offset < start_offset {
176            return Err(RemoteStorageError::InvalidArgument(format!(
177                "end_offset ({end_offset}) < start_offset ({start_offset})"
178            )));
179        }
180        if segment_size_in_bytes < 0 {
181            return Err(RemoteStorageError::InvalidArgument(format!(
182                "segment_size_in_bytes ({segment_size_in_bytes}) must be >= 0"
183            )));
184        }
185        Ok(Self {
186            remote_log_segment_id,
187            start_offset,
188            end_offset,
189            max_timestamp_ms,
190            broker_id,
191            event_timestamp_ms,
192            segment_size_in_bytes,
193            custom_metadata: None,
194            state,
195            segment_leader_epochs,
196            txn_index_empty: false,
197        })
198    }
199
200    /// Apply a [`RemoteLogSegmentMetadataUpdate`], returning the updated
201    /// copy. The update advances `state`, refreshes `event_timestamp_ms`
202    /// and `broker_id`, and replaces `custom_metadata` when the update
203    /// carries `Some`.
204    ///
205    /// # Errors
206    ///
207    /// Returns [`RemoteStorageError::InvalidArgument`] if the update's
208    /// segment id does not match, or
209    /// [`RemoteStorageError::InvalidSegmentTransition`] if the state change
210    /// is not permitted from the current state.
211    pub fn with_update(
212        &self,
213        update: &RemoteLogSegmentMetadataUpdate,
214    ) -> Result<Self, RemoteStorageError> {
215        if update.remote_log_segment_id != self.remote_log_segment_id {
216            return Err(RemoteStorageError::InvalidArgument(
217                "update segment id does not match metadata segment id".into(),
218            ));
219        }
220        if !self.state.is_valid_transition(update.state) {
221            return Err(RemoteStorageError::InvalidSegmentTransition {
222                id: self.remote_log_segment_id.clone(),
223                from: self.state,
224                to: update.state,
225            });
226        }
227        let mut next = self.clone();
228        next.state = update.state;
229        next.event_timestamp_ms = update.event_timestamp_ms;
230        next.broker_id = update.broker_id;
231        if update.custom_metadata.is_some() {
232            next.custom_metadata.clone_from(&update.custom_metadata);
233        }
234        Ok(next)
235    }
236
237    /// The segment's unique id.
238    #[must_use]
239    pub fn remote_log_segment_id(&self) -> &RemoteLogSegmentId {
240        &self.remote_log_segment_id
241    }
242
243    /// First offset (inclusive) covered by this segment.
244    #[must_use]
245    pub fn start_offset(&self) -> i64 {
246        self.start_offset
247    }
248
249    /// Last offset (inclusive) covered by this segment.
250    #[must_use]
251    pub fn end_offset(&self) -> i64 {
252        self.end_offset
253    }
254
255    /// Highest record timestamp in this segment.
256    #[must_use]
257    pub fn max_timestamp_ms(&self) -> i64 {
258        self.max_timestamp_ms
259    }
260
261    /// Id of the broker that produced this metadata.
262    #[must_use]
263    pub fn broker_id(&self) -> i32 {
264        self.broker_id
265    }
266
267    /// Wall-clock time the latest event for this segment was created.
268    #[must_use]
269    pub fn event_timestamp_ms(&self) -> i64 {
270        self.event_timestamp_ms
271    }
272
273    /// Size of the `.log` data in bytes.
274    #[must_use]
275    pub fn segment_size_in_bytes(&self) -> i32 {
276        self.segment_size_in_bytes
277    }
278
279    /// Opaque metadata the [`RemoteStorageManager`](crate::RemoteStorageManager)
280    /// attached at copy time, if any.
281    #[must_use]
282    pub fn custom_metadata(&self) -> Option<&CustomMetadata> {
283        self.custom_metadata.as_ref()
284    }
285
286    /// Current lifecycle state.
287    #[must_use]
288    pub fn state(&self) -> RemoteLogSegmentState {
289        self.state
290    }
291
292    /// Map of leader epoch → first offset that epoch contributed to this
293    /// segment.
294    #[must_use]
295    pub fn segment_leader_epochs(&self) -> &BTreeMap<i32, i64> {
296        &self.segment_leader_epochs
297    }
298
299    /// Attach custom metadata (builder-style; used by RSM copy paths that
300    /// produce a key before recording `CopySegmentFinished`).
301    #[must_use]
302    pub fn with_custom_metadata(mut self, custom: CustomMetadata) -> Self {
303        self.custom_metadata = Some(custom);
304        self
305    }
306
307    /// `true` if the segment has no transaction index (KIP-405 `txnIndexEmpty`).
308    /// Defaults to `false`. Serialized as the JVM record's tagged field (tag 0).
309    #[must_use]
310    pub fn txn_index_empty(&self) -> bool {
311        self.txn_index_empty
312    }
313
314    /// Builder-style setter for [`Self::txn_index_empty`].
315    #[must_use]
316    pub fn with_txn_index_empty(mut self, empty: bool) -> Self {
317        self.txn_index_empty = empty;
318        self
319    }
320}
321
322/// An update to an existing [`RemoteLogSegmentMetadata`]'s lifecycle state.
323#[derive(Debug, Clone, PartialEq, Eq)]
324pub struct RemoteLogSegmentMetadataUpdate {
325    /// The segment being updated.
326    pub remote_log_segment_id: RemoteLogSegmentId,
327    /// Wall-clock time of this update.
328    pub event_timestamp_ms: i64,
329    /// New custom metadata, when the update introduces or changes it.
330    pub custom_metadata: Option<CustomMetadata>,
331    /// The new lifecycle state.
332    pub state: RemoteLogSegmentState,
333    /// Broker that produced this update.
334    pub broker_id: i32,
335}
336
/// Progress of deleting a whole *partition's* data from the remote tier.
///
/// ```text
/// DeletePartitionMarked ──► DeletePartitionStarted ──► DeletePartitionFinished
/// ```
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum RemotePartitionDeleteState {
    /// All remote segments of the partition have been marked for deletion.
    DeletePartitionMarked,
    /// Removal of the partition's remote segments is under way.
    DeletePartitionStarted,
    /// Every remote segment of the partition has been removed.
    DeletePartitionFinished,
}

impl RemotePartitionDeleteState {
    /// Reports whether a partition whose current state is `from` may move to
    /// `target`. Pass `None` as `from` for a partition that has never been
    /// marked.
    #[must_use]
    pub fn is_valid_transition(from: Option<Self>, target: Self) -> bool {
        // The lifecycle is a straight line, so each target has exactly one
        // state (or "no state yet") it can be reached from.
        let required_predecessor = match target {
            Self::DeletePartitionMarked => None,
            Self::DeletePartitionStarted => Some(Self::DeletePartitionMarked),
            Self::DeletePartitionFinished => Some(Self::DeletePartitionStarted),
        };
        from == required_predecessor
    }
}
369
370/// Metadata describing the deletion lifecycle of a partition's remote data.
371#[derive(Debug, Clone, PartialEq, Eq)]
372pub struct RemotePartitionDeleteMetadata {
373    /// The partition being deleted from the remote tier.
374    pub topic_id_partition: TopicIdPartition,
375    /// Current deletion state.
376    pub state: RemotePartitionDeleteState,
377    /// Wall-clock time of this event.
378    pub event_timestamp_ms: i64,
379    /// Broker that produced this metadata.
380    pub broker_id: i32,
381}
382
#[cfg(test)]
mod tests {
    //! Unit tests for the metadata model: identity semantics, constructor
    //! validation, update application, and both lifecycle state machines.

    use super::*;
    use assert2::assert;
    use std::collections::HashSet;

    /// Partition shared by the fixtures below.
    fn tp() -> TopicIdPartition {
        TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
    }

    /// Segment id shared by the fixtures below.
    fn seg_id() -> RemoteLogSegmentId {
        RemoteLogSegmentId::new(tp(), Uuid::from_u128(99))
    }

    /// Minimal non-empty leader-epoch map accepted by the constructor.
    fn epochs() -> BTreeMap<i32, i64> {
        BTreeMap::from([(0, 0)])
    }

    /// Valid `CopySegmentStarted` metadata for [`seg_id`], offsets `0..=10`.
    fn started() -> RemoteLogSegmentMetadata {
        RemoteLogSegmentMetadata::new(
            seg_id(),
            0,
            10,
            123,
            1,
            456,
            1024,
            RemoteLogSegmentState::CopySegmentStarted,
            epochs(),
        )
        .expect("fixture arguments satisfy every constructor rule")
    }

    /// Update for `id` moving to `state`, from broker 2 at time 789, carrying
    /// `custom_metadata`.
    fn update(
        id: RemoteLogSegmentId,
        state: RemoteLogSegmentState,
        custom_metadata: Option<CustomMetadata>,
    ) -> RemoteLogSegmentMetadataUpdate {
        RemoteLogSegmentMetadataUpdate {
            remote_log_segment_id: id,
            event_timestamp_ms: 789,
            custom_metadata,
            state,
            broker_id: 2,
        }
    }

    #[test]
    fn topic_id_partition_identity_ignores_name() {
        let a = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 3);
        let b = TopicIdPartition::new(Uuid::from_u128(7), "renamed", 3);
        assert!(a == b);
        let set: HashSet<_> = [a, b].into_iter().collect();
        assert!(set.len() == 1, "same id+partition must collapse in a set");
    }

    #[test]
    fn topic_id_partition_distinct_partitions_differ() {
        let a = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 0);
        let b = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 1);
        assert!(a != b);
    }

    #[test]
    fn segment_state_valid_transitions() {
        use RemoteLogSegmentState::{
            CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
        };
        assert!(CopySegmentStarted.is_valid_transition(CopySegmentFinished));
        assert!(CopySegmentStarted.is_valid_transition(DeleteSegmentStarted));
        assert!(CopySegmentFinished.is_valid_transition(DeleteSegmentStarted));
        assert!(DeleteSegmentStarted.is_valid_transition(DeleteSegmentFinished));
    }

    #[test]
    fn segment_state_invalid_transitions() {
        use RemoteLogSegmentState::{
            CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
        };
        // No backward / skipping / same-state transitions.
        assert!(!CopySegmentStarted.is_valid_transition(CopySegmentStarted));
        assert!(!CopySegmentStarted.is_valid_transition(DeleteSegmentFinished));
        assert!(!CopySegmentFinished.is_valid_transition(CopySegmentStarted));
        assert!(!CopySegmentFinished.is_valid_transition(CopySegmentFinished));
        assert!(!DeleteSegmentStarted.is_valid_transition(CopySegmentFinished));
        assert!(!DeleteSegmentFinished.is_valid_transition(DeleteSegmentStarted));
    }

    #[test]
    fn metadata_rejects_empty_leader_epochs() {
        let err = RemoteLogSegmentMetadata::new(
            seg_id(),
            0,
            10,
            123,
            1,
            456,
            1024,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::new(),
        )
        .unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn metadata_rejects_end_before_start() {
        let err = RemoteLogSegmentMetadata::new(
            seg_id(),
            10,
            5,
            123,
            1,
            456,
            1024,
            RemoteLogSegmentState::CopySegmentStarted,
            epochs(),
        )
        .unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn metadata_rejects_negative_segment_size() {
        // Third documented rejection rule of `new`: a negative size.
        let err = RemoteLogSegmentMetadata::new(
            seg_id(),
            0,
            10,
            123,
            1,
            456,
            -1,
            RemoteLogSegmentState::CopySegmentStarted,
            epochs(),
        )
        .unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn metadata_accepts_single_offset_and_zero_size() {
        // Boundaries of the validation rules: `end == start` and size `0`
        // are both on the accepted side.
        let md = RemoteLogSegmentMetadata::new(
            seg_id(),
            5,
            5,
            123,
            1,
            456,
            0,
            RemoteLogSegmentState::CopySegmentStarted,
            epochs(),
        )
        .unwrap();
        assert!(md.start_offset() == 5);
        assert!(md.end_offset() == 5);
        assert!(md.segment_size_in_bytes() == 0);
        assert!(md.custom_metadata().is_none());
    }

    #[test]
    fn with_update_advances_state_and_fields() {
        let finished = started()
            .with_update(&update(
                seg_id(),
                RemoteLogSegmentState::CopySegmentFinished,
                Some(CustomMetadata(vec![1, 2, 3])),
            ))
            .unwrap();
        assert!(finished.state() == RemoteLogSegmentState::CopySegmentFinished);
        assert!(finished.event_timestamp_ms() == 789);
        assert!(finished.broker_id() == 2);
        assert!(finished.custom_metadata() == Some(&CustomMetadata(vec![1, 2, 3])));
        // Untouched fields survive.
        let expected_id = seg_id();
        let expected_epochs = epochs();
        assert!(finished.remote_log_segment_id() == &expected_id);
        assert!(finished.start_offset() == 0);
        assert!(finished.end_offset() == 10);
        assert!(finished.max_timestamp_ms() == 123);
        assert!(finished.segment_size_in_bytes() == 1024);
        assert!(finished.segment_leader_epochs() == &expected_epochs);
        assert!(!finished.txn_index_empty());
    }

    #[test]
    fn with_update_keeps_custom_metadata_when_update_omits_it() {
        let with_custom = started().with_custom_metadata(CustomMetadata(vec![9]));
        let finished = with_custom
            .with_update(&update(
                seg_id(),
                RemoteLogSegmentState::CopySegmentFinished,
                None,
            ))
            .unwrap();
        assert!(finished.custom_metadata() == Some(&CustomMetadata(vec![9])));
    }

    #[test]
    fn with_update_rejects_invalid_transition() {
        let err = started()
            .with_update(&update(
                seg_id(),
                RemoteLogSegmentState::DeleteSegmentFinished,
                None,
            ))
            .unwrap_err();
        assert!(matches!(
            err,
            RemoteStorageError::InvalidSegmentTransition { .. }
        ));
    }

    #[test]
    fn with_update_rejects_mismatched_id() {
        let other = RemoteLogSegmentId::new(tp(), Uuid::from_u128(1234));
        let err = started()
            .with_update(&update(
                other,
                RemoteLogSegmentState::CopySegmentFinished,
                None,
            ))
            .unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn txn_index_empty_defaults_false_and_is_settable() {
        let md = RemoteLogSegmentMetadata::new(
            RemoteLogSegmentId::new(
                TopicIdPartition::new(Uuid::from_u128(1), "t", 0),
                Uuid::from_u128(2),
            ),
            0,
            9,
            9,
            1,
            100,
            1024,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::from([(0, 0)]),
        )
        .unwrap();
        assert!(!md.txn_index_empty());
        let md = md.with_txn_index_empty(true);
        assert!(md.txn_index_empty());
    }

    #[test]
    fn partition_delete_transitions() {
        use RemotePartitionDeleteState::{
            DeletePartitionFinished, DeletePartitionMarked, DeletePartitionStarted,
        };
        assert!(RemotePartitionDeleteState::is_valid_transition(
            None,
            DeletePartitionMarked
        ));
        assert!(RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionMarked),
            DeletePartitionStarted
        ));
        assert!(RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionStarted),
            DeletePartitionFinished
        ));
        // Invalid: skipping, restarting, or marking twice.
        assert!(!RemotePartitionDeleteState::is_valid_transition(
            None,
            DeletePartitionStarted
        ));
        assert!(!RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionMarked),
            DeletePartitionMarked
        ));
        assert!(!RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionFinished),
            DeletePartitionStarted
        ));
    }
}