1use std::collections::BTreeMap;
11use std::hash::{Hash, Hasher};
12
13use uuid::Uuid;
14
15use crate::error::RemoteStorageError;
16
17#[derive(Debug, Clone)]
24pub struct TopicIdPartition {
25 pub topic_id: Uuid,
27 pub topic: String,
29 pub partition: i32,
31}
32
33impl TopicIdPartition {
34 #[must_use]
36 pub fn new(topic_id: Uuid, topic: impl Into<String>, partition: i32) -> Self {
37 Self {
38 topic_id,
39 topic: topic.into(),
40 partition,
41 }
42 }
43}
44
45impl PartialEq for TopicIdPartition {
46 fn eq(&self, other: &Self) -> bool {
47 self.topic_id == other.topic_id && self.partition == other.partition
48 }
49}
50
51impl Eq for TopicIdPartition {}
52
53impl Hash for TopicIdPartition {
54 fn hash<H: Hasher>(&self, state: &mut H) {
55 self.topic_id.hash(state);
56 self.partition.hash(state);
57 }
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Hash)]
63pub struct RemoteLogSegmentId {
64 pub topic_id_partition: TopicIdPartition,
66 pub id: Uuid,
68}
69
70impl RemoteLogSegmentId {
71 #[must_use]
73 pub fn new(topic_id_partition: TopicIdPartition, id: Uuid) -> Self {
74 Self {
75 topic_id_partition,
76 id,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
90pub enum RemoteLogSegmentState {
91 CopySegmentStarted,
94 CopySegmentFinished,
97 DeleteSegmentStarted,
99 DeleteSegmentFinished,
101}
102
103impl RemoteLogSegmentState {
104 #[must_use]
109 pub fn is_valid_transition(self, target: Self) -> bool {
110 use RemoteLogSegmentState::{
111 CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
112 };
113 matches!(
114 (self, target),
115 (
116 CopySegmentStarted,
117 CopySegmentFinished | DeleteSegmentStarted
118 ) | (CopySegmentFinished, DeleteSegmentStarted)
119 | (DeleteSegmentStarted, DeleteSegmentFinished)
120 )
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Hash)]
128pub struct CustomMetadata(pub Vec<u8>);
129
130#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct RemoteLogSegmentMetadata {
134 remote_log_segment_id: RemoteLogSegmentId,
135 start_offset: i64,
136 end_offset: i64,
137 max_timestamp_ms: i64,
138 broker_id: i32,
139 event_timestamp_ms: i64,
140 segment_size_in_bytes: i32,
141 custom_metadata: Option<CustomMetadata>,
142 state: RemoteLogSegmentState,
143 segment_leader_epochs: BTreeMap<i32, i64>,
144 txn_index_empty: bool,
148}
149
150impl RemoteLogSegmentMetadata {
151 #[allow(clippy::too_many_arguments)]
159 pub fn new(
160 remote_log_segment_id: RemoteLogSegmentId,
161 start_offset: i64,
162 end_offset: i64,
163 max_timestamp_ms: i64,
164 broker_id: i32,
165 event_timestamp_ms: i64,
166 segment_size_in_bytes: i32,
167 state: RemoteLogSegmentState,
168 segment_leader_epochs: BTreeMap<i32, i64>,
169 ) -> Result<Self, RemoteStorageError> {
170 if segment_leader_epochs.is_empty() {
171 return Err(RemoteStorageError::InvalidArgument(
172 "segment_leader_epochs must not be empty".into(),
173 ));
174 }
175 if end_offset < start_offset {
176 return Err(RemoteStorageError::InvalidArgument(format!(
177 "end_offset ({end_offset}) < start_offset ({start_offset})"
178 )));
179 }
180 if segment_size_in_bytes < 0 {
181 return Err(RemoteStorageError::InvalidArgument(format!(
182 "segment_size_in_bytes ({segment_size_in_bytes}) must be >= 0"
183 )));
184 }
185 Ok(Self {
186 remote_log_segment_id,
187 start_offset,
188 end_offset,
189 max_timestamp_ms,
190 broker_id,
191 event_timestamp_ms,
192 segment_size_in_bytes,
193 custom_metadata: None,
194 state,
195 segment_leader_epochs,
196 txn_index_empty: false,
197 })
198 }
199
200 pub fn with_update(
212 &self,
213 update: &RemoteLogSegmentMetadataUpdate,
214 ) -> Result<Self, RemoteStorageError> {
215 if update.remote_log_segment_id != self.remote_log_segment_id {
216 return Err(RemoteStorageError::InvalidArgument(
217 "update segment id does not match metadata segment id".into(),
218 ));
219 }
220 if !self.state.is_valid_transition(update.state) {
221 return Err(RemoteStorageError::InvalidSegmentTransition {
222 id: self.remote_log_segment_id.clone(),
223 from: self.state,
224 to: update.state,
225 });
226 }
227 let mut next = self.clone();
228 next.state = update.state;
229 next.event_timestamp_ms = update.event_timestamp_ms;
230 next.broker_id = update.broker_id;
231 if update.custom_metadata.is_some() {
232 next.custom_metadata.clone_from(&update.custom_metadata);
233 }
234 Ok(next)
235 }
236
237 #[must_use]
239 pub fn remote_log_segment_id(&self) -> &RemoteLogSegmentId {
240 &self.remote_log_segment_id
241 }
242
243 #[must_use]
245 pub fn start_offset(&self) -> i64 {
246 self.start_offset
247 }
248
249 #[must_use]
251 pub fn end_offset(&self) -> i64 {
252 self.end_offset
253 }
254
255 #[must_use]
257 pub fn max_timestamp_ms(&self) -> i64 {
258 self.max_timestamp_ms
259 }
260
261 #[must_use]
263 pub fn broker_id(&self) -> i32 {
264 self.broker_id
265 }
266
267 #[must_use]
269 pub fn event_timestamp_ms(&self) -> i64 {
270 self.event_timestamp_ms
271 }
272
273 #[must_use]
275 pub fn segment_size_in_bytes(&self) -> i32 {
276 self.segment_size_in_bytes
277 }
278
279 #[must_use]
282 pub fn custom_metadata(&self) -> Option<&CustomMetadata> {
283 self.custom_metadata.as_ref()
284 }
285
286 #[must_use]
288 pub fn state(&self) -> RemoteLogSegmentState {
289 self.state
290 }
291
292 #[must_use]
295 pub fn segment_leader_epochs(&self) -> &BTreeMap<i32, i64> {
296 &self.segment_leader_epochs
297 }
298
299 #[must_use]
302 pub fn with_custom_metadata(mut self, custom: CustomMetadata) -> Self {
303 self.custom_metadata = Some(custom);
304 self
305 }
306
307 #[must_use]
310 pub fn txn_index_empty(&self) -> bool {
311 self.txn_index_empty
312 }
313
314 #[must_use]
316 pub fn with_txn_index_empty(mut self, empty: bool) -> Self {
317 self.txn_index_empty = empty;
318 self
319 }
320}
321
322#[derive(Debug, Clone, PartialEq, Eq)]
324pub struct RemoteLogSegmentMetadataUpdate {
325 pub remote_log_segment_id: RemoteLogSegmentId,
327 pub event_timestamp_ms: i64,
329 pub custom_metadata: Option<CustomMetadata>,
331 pub state: RemoteLogSegmentState,
333 pub broker_id: i32,
335}
336
337#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
343pub enum RemotePartitionDeleteState {
344 DeletePartitionMarked,
347 DeletePartitionStarted,
349 DeletePartitionFinished,
351}
352
353impl RemotePartitionDeleteState {
354 #[must_use]
357 pub fn is_valid_transition(from: Option<Self>, target: Self) -> bool {
358 use RemotePartitionDeleteState::{
359 DeletePartitionFinished, DeletePartitionMarked, DeletePartitionStarted,
360 };
361 matches!(
362 (from, target),
363 (None, DeletePartitionMarked)
364 | (Some(DeletePartitionMarked), DeletePartitionStarted)
365 | (Some(DeletePartitionStarted), DeletePartitionFinished)
366 )
367 }
368}
369
370#[derive(Debug, Clone, PartialEq, Eq)]
372pub struct RemotePartitionDeleteMetadata {
373 pub topic_id_partition: TopicIdPartition,
375 pub state: RemotePartitionDeleteState,
377 pub event_timestamp_ms: i64,
379 pub broker_id: i32,
381}
382
383#[cfg(test)]
384mod tests {
385 use super::*;
386 use assert2::assert;
387 use std::collections::HashSet;
388
389 fn tp() -> TopicIdPartition {
390 TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
391 }
392
393 fn seg_id() -> RemoteLogSegmentId {
394 RemoteLogSegmentId::new(tp(), Uuid::from_u128(99))
395 }
396
397 fn epochs() -> BTreeMap<i32, i64> {
398 BTreeMap::from([(0, 0)])
399 }
400
401 #[test]
402 fn topic_id_partition_identity_ignores_name() {
403 let a = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 3);
404 let b = TopicIdPartition::new(Uuid::from_u128(7), "renamed", 3);
405 assert!(a == b);
406 let set: HashSet<_> = [a, b].into_iter().collect();
407 assert!(set.len() == 1, "same id+partition must collapse in a set");
408 }
409
410 #[test]
411 fn topic_id_partition_distinct_partitions_differ() {
412 let a = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 0);
413 let b = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 1);
414 assert!(a != b);
415 }
416
417 #[test]
418 fn segment_state_valid_transitions() {
419 use RemoteLogSegmentState::{
420 CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
421 };
422 assert!(CopySegmentStarted.is_valid_transition(CopySegmentFinished));
423 assert!(CopySegmentStarted.is_valid_transition(DeleteSegmentStarted));
424 assert!(CopySegmentFinished.is_valid_transition(DeleteSegmentStarted));
425 assert!(DeleteSegmentStarted.is_valid_transition(DeleteSegmentFinished));
426 }
427
428 #[test]
429 fn segment_state_invalid_transitions() {
430 use RemoteLogSegmentState::{
431 CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
432 };
433 assert!(!CopySegmentStarted.is_valid_transition(CopySegmentStarted));
435 assert!(!CopySegmentStarted.is_valid_transition(DeleteSegmentFinished));
436 assert!(!CopySegmentFinished.is_valid_transition(CopySegmentStarted));
437 assert!(!CopySegmentFinished.is_valid_transition(CopySegmentFinished));
438 assert!(!DeleteSegmentStarted.is_valid_transition(CopySegmentFinished));
439 assert!(!DeleteSegmentFinished.is_valid_transition(DeleteSegmentStarted));
440 }
441
442 #[test]
443 fn metadata_rejects_empty_leader_epochs() {
444 let err = RemoteLogSegmentMetadata::new(
445 seg_id(),
446 0,
447 10,
448 123,
449 1,
450 456,
451 1024,
452 RemoteLogSegmentState::CopySegmentStarted,
453 BTreeMap::new(),
454 )
455 .unwrap_err();
456 assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
457 }
458
459 #[test]
460 fn metadata_rejects_end_before_start() {
461 let err = RemoteLogSegmentMetadata::new(
462 seg_id(),
463 10,
464 5,
465 123,
466 1,
467 456,
468 1024,
469 RemoteLogSegmentState::CopySegmentStarted,
470 epochs(),
471 )
472 .unwrap_err();
473 assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
474 }
475
476 #[test]
477 fn with_update_advances_state_and_fields() {
478 let started = RemoteLogSegmentMetadata::new(
479 seg_id(),
480 0,
481 10,
482 123,
483 1,
484 456,
485 1024,
486 RemoteLogSegmentState::CopySegmentStarted,
487 epochs(),
488 )
489 .unwrap();
490 let update = RemoteLogSegmentMetadataUpdate {
491 remote_log_segment_id: seg_id(),
492 event_timestamp_ms: 789,
493 custom_metadata: Some(CustomMetadata(vec![1, 2, 3])),
494 state: RemoteLogSegmentState::CopySegmentFinished,
495 broker_id: 2,
496 };
497 let finished = started.with_update(&update).unwrap();
498 assert!(finished.state() == RemoteLogSegmentState::CopySegmentFinished);
499 assert!(finished.event_timestamp_ms() == 789);
500 assert!(finished.broker_id() == 2);
501 assert!(finished.custom_metadata() == Some(&CustomMetadata(vec![1, 2, 3])));
502 assert!(finished.start_offset() == 0);
504 assert!(finished.end_offset() == 10);
505 }
506
507 #[test]
508 fn with_update_keeps_custom_metadata_when_update_omits_it() {
509 let started = RemoteLogSegmentMetadata::new(
510 seg_id(),
511 0,
512 10,
513 123,
514 1,
515 456,
516 1024,
517 RemoteLogSegmentState::CopySegmentStarted,
518 epochs(),
519 )
520 .unwrap()
521 .with_custom_metadata(CustomMetadata(vec![9]));
522 let update = RemoteLogSegmentMetadataUpdate {
523 remote_log_segment_id: seg_id(),
524 event_timestamp_ms: 789,
525 custom_metadata: None,
526 state: RemoteLogSegmentState::CopySegmentFinished,
527 broker_id: 2,
528 };
529 let finished = started.with_update(&update).unwrap();
530 assert!(finished.custom_metadata() == Some(&CustomMetadata(vec![9])));
531 }
532
533 #[test]
534 fn with_update_rejects_invalid_transition() {
535 let started = RemoteLogSegmentMetadata::new(
536 seg_id(),
537 0,
538 10,
539 123,
540 1,
541 456,
542 1024,
543 RemoteLogSegmentState::CopySegmentStarted,
544 epochs(),
545 )
546 .unwrap();
547 let update = RemoteLogSegmentMetadataUpdate {
548 remote_log_segment_id: seg_id(),
549 event_timestamp_ms: 789,
550 custom_metadata: None,
551 state: RemoteLogSegmentState::DeleteSegmentFinished,
552 broker_id: 2,
553 };
554 let err = started.with_update(&update).unwrap_err();
555 assert!(matches!(
556 err,
557 RemoteStorageError::InvalidSegmentTransition { .. }
558 ));
559 }
560
561 #[test]
562 fn with_update_rejects_mismatched_id() {
563 let started = RemoteLogSegmentMetadata::new(
564 seg_id(),
565 0,
566 10,
567 123,
568 1,
569 456,
570 1024,
571 RemoteLogSegmentState::CopySegmentStarted,
572 epochs(),
573 )
574 .unwrap();
575 let other = RemoteLogSegmentId::new(tp(), Uuid::from_u128(1234));
576 let update = RemoteLogSegmentMetadataUpdate {
577 remote_log_segment_id: other,
578 event_timestamp_ms: 789,
579 custom_metadata: None,
580 state: RemoteLogSegmentState::CopySegmentFinished,
581 broker_id: 2,
582 };
583 let err = started.with_update(&update).unwrap_err();
584 assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
585 }
586
587 #[test]
588 fn txn_index_empty_defaults_false_and_is_settable() {
589 let md = RemoteLogSegmentMetadata::new(
590 RemoteLogSegmentId::new(
591 TopicIdPartition::new(Uuid::from_u128(1), "t", 0),
592 Uuid::from_u128(2),
593 ),
594 0,
595 9,
596 9,
597 1,
598 100,
599 1024,
600 RemoteLogSegmentState::CopySegmentStarted,
601 BTreeMap::from([(0, 0)]),
602 )
603 .unwrap();
604 assert!(!md.txn_index_empty());
605 let md = md.with_txn_index_empty(true);
606 assert!(md.txn_index_empty());
607 }
608
609 #[test]
610 fn partition_delete_transitions() {
611 use RemotePartitionDeleteState::{
612 DeletePartitionFinished, DeletePartitionMarked, DeletePartitionStarted,
613 };
614 assert!(RemotePartitionDeleteState::is_valid_transition(
615 None,
616 DeletePartitionMarked
617 ));
618 assert!(RemotePartitionDeleteState::is_valid_transition(
619 Some(DeletePartitionMarked),
620 DeletePartitionStarted
621 ));
622 assert!(RemotePartitionDeleteState::is_valid_transition(
623 Some(DeletePartitionStarted),
624 DeletePartitionFinished
625 ));
626 assert!(!RemotePartitionDeleteState::is_valid_transition(
628 None,
629 DeletePartitionStarted
630 ));
631 assert!(!RemotePartitionDeleteState::is_valid_transition(
632 Some(DeletePartitionMarked),
633 DeletePartitionMarked
634 ));
635 assert!(!RemotePartitionDeleteState::is_valid_transition(
636 Some(DeletePartitionFinished),
637 DeletePartitionStarted
638 ));
639 }
640}