use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use uuid::Uuid;
use crate::error::RemoteStorageError;
/// A topic partition addressed by topic id and partition number.
///
/// The `topic` name travels with the value but is not part of its identity:
/// `PartialEq` and `Hash` look only at `topic_id` and `partition`, so two values
/// that differ only in `topic` compare equal and hash identically.
#[derive(Debug, Clone)]
pub struct TopicIdPartition {
    /// Topic id; part of the identity.
    pub topic_id: Uuid,
    /// Topic name; ignored by `PartialEq` and `Hash`.
    pub topic: String,
    /// Partition number; part of the identity.
    pub partition: i32,
}

impl TopicIdPartition {
    /// Builds a `TopicIdPartition`, converting `topic` into an owned `String`.
    #[must_use]
    pub fn new(topic_id: Uuid, topic: impl Into<String>, partition: i32) -> Self {
        let topic = topic.into();
        Self {
            topic_id,
            topic,
            partition,
        }
    }

    /// The fields that make up this value's identity, in comparison/hash order.
    fn identity_key(&self) -> (&Uuid, i32) {
        (&self.topic_id, self.partition)
    }
}

impl PartialEq for TopicIdPartition {
    fn eq(&self, other: &Self) -> bool {
        self.identity_key() == other.identity_key()
    }
}

impl Eq for TopicIdPartition {}

impl Hash for TopicIdPartition {
    // Hashing the tuple feeds `topic_id` then `partition` into `state`, which
    // keeps `Hash` consistent with `PartialEq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.identity_key().hash(state);
    }
}
/// Identifies one remote log segment: the partition it belongs to plus a segment id.
///
/// `PartialEq`/`Hash` are derived, so the partition part is compared with
/// [`TopicIdPartition`]'s id-and-partition identity (the topic name is ignored).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RemoteLogSegmentId {
    /// Partition that owns the segment.
    pub topic_id_partition: TopicIdPartition,
    /// Id of this particular segment.
    pub id: Uuid,
}

impl RemoteLogSegmentId {
    /// Pairs a partition with a segment id.
    #[must_use]
    pub fn new(topic_id_partition: TopicIdPartition, id: Uuid) -> Self {
        Self {
            id,
            topic_id_partition,
        }
    }
}
/// Lifecycle state of a remote log segment.
///
/// Allowed moves (see [`RemoteLogSegmentState::is_valid_transition`]):
/// `CopySegmentStarted -> CopySegmentFinished | DeleteSegmentStarted`,
/// `CopySegmentFinished -> DeleteSegmentStarted`,
/// `DeleteSegmentStarted -> DeleteSegmentFinished`. Staying in the same state is
/// not a valid transition.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteLogSegmentState {
    /// No transition leads back into this state.
    CopySegmentStarted,
    /// Reachable only from `CopySegmentStarted`.
    CopySegmentFinished,
    /// Reachable from either copy state.
    DeleteSegmentStarted,
    /// Terminal: no transition leaves this state.
    DeleteSegmentFinished,
}

impl RemoteLogSegmentState {
    /// Returns `true` if a segment in state `self` may move to `target`.
    #[must_use]
    pub fn is_valid_transition(self, target: Self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::CopySegmentStarted => {
                matches!(target, Self::CopySegmentFinished | Self::DeleteSegmentStarted)
            }
            Self::CopySegmentFinished => target == Self::DeleteSegmentStarted,
            Self::DeleteSegmentStarted => target == Self::DeleteSegmentFinished,
            Self::DeleteSegmentFinished => false,
        }
    }
}
/// Arbitrary bytes attached to a segment's metadata.
///
/// This module only stores and compares the bytes; it never interprets them.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct CustomMetadata(pub Vec<u8>);
/// Metadata for one log segment stored (or being stored) in remote storage.
///
/// Values are effectively immutable: state changes produce a new value via
/// [`RemoteLogSegmentMetadata::with_update`], and the optional fields are set with
/// the consuming `with_*` builders.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteLogSegmentMetadata {
    /// Identity of the segment.
    remote_log_segment_id: RemoteLogSegmentId,
    /// First offset of the segment; `new` guarantees it is `<= end_offset`.
    start_offset: i64,
    /// Last offset of the segment; `new` guarantees it is `>= start_offset`.
    end_offset: i64,
    /// Max timestamp of the segment (not validated here).
    max_timestamp_ms: i64,
    /// Broker id; overwritten by every successful `with_update`.
    broker_id: i32,
    /// Event timestamp; overwritten by every successful `with_update`.
    event_timestamp_ms: i64,
    /// Segment size; `new` guarantees it is non-negative.
    segment_size_in_bytes: i32,
    /// Optional caller-supplied bytes; `None` until set.
    custom_metadata: Option<CustomMetadata>,
    /// Current lifecycle state.
    state: RemoteLogSegmentState,
    /// Never empty (enforced by `new`). Presumably leader epoch -> start offset
    /// of that epoch within the segment — TODO confirm with producers of this map.
    segment_leader_epochs: BTreeMap<i32, i64>,
    /// Whether the segment's transaction index is empty; `false` unless set.
    txn_index_empty: bool,
}

impl RemoteLogSegmentMetadata {
    /// Creates segment metadata after validating its basic invariants.
    ///
    /// `custom_metadata` starts as `None` and `txn_index_empty` as `false`; use
    /// [`Self::with_custom_metadata`] and [`Self::with_txn_index_empty`] to set them.
    ///
    /// # Errors
    ///
    /// Returns [`RemoteStorageError::InvalidArgument`] for the first failing check,
    /// in this order: `segment_leader_epochs` is empty, `end_offset < start_offset`,
    /// or `segment_size_in_bytes` is negative.
    // Every parameter is a required field of the struct; a builder would not
    // remove any of them, so the long argument list is accepted here.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        remote_log_segment_id: RemoteLogSegmentId,
        start_offset: i64,
        end_offset: i64,
        max_timestamp_ms: i64,
        broker_id: i32,
        event_timestamp_ms: i64,
        segment_size_in_bytes: i32,
        state: RemoteLogSegmentState,
        segment_leader_epochs: BTreeMap<i32, i64>,
    ) -> Result<Self, RemoteStorageError> {
        // Only the first violated invariant is reported, checked in a fixed order.
        let violation = if segment_leader_epochs.is_empty() {
            Some(String::from("segment_leader_epochs must not be empty"))
        } else if end_offset < start_offset {
            Some(format!(
                "end_offset ({end_offset}) < start_offset ({start_offset})"
            ))
        } else if segment_size_in_bytes < 0 {
            Some(format!(
                "segment_size_in_bytes ({segment_size_in_bytes}) must be >= 0"
            ))
        } else {
            None
        };
        if let Some(reason) = violation {
            return Err(RemoteStorageError::InvalidArgument(reason));
        }

        Ok(Self {
            remote_log_segment_id,
            start_offset,
            end_offset,
            max_timestamp_ms,
            broker_id,
            event_timestamp_ms,
            segment_size_in_bytes,
            state,
            segment_leader_epochs,
            custom_metadata: None,
            txn_index_empty: false,
        })
    }

    /// Returns a copy of this metadata with `update` applied; `self` is untouched.
    ///
    /// The new state, event timestamp and broker id always come from `update`.
    /// Custom metadata is replaced only when `update` carries some; otherwise the
    /// existing value is kept. Offsets, size, epochs and the txn-index flag are
    /// carried over unchanged.
    ///
    /// # Errors
    ///
    /// - [`RemoteStorageError::InvalidArgument`] if `update` targets a different
    ///   segment id (checked first).
    /// - [`RemoteStorageError::InvalidSegmentTransition`] if the current state may
    ///   not move to `update.state` per
    ///   [`RemoteLogSegmentState::is_valid_transition`].
    pub fn with_update(
        &self,
        update: &RemoteLogSegmentMetadataUpdate,
    ) -> Result<Self, RemoteStorageError> {
        if self.remote_log_segment_id != update.remote_log_segment_id {
            return Err(RemoteStorageError::InvalidArgument(String::from(
                "update segment id does not match metadata segment id",
            )));
        }

        let (from, to) = (self.state, update.state);
        if !from.is_valid_transition(to) {
            return Err(RemoteStorageError::InvalidSegmentTransition {
                id: self.remote_log_segment_id.clone(),
                from,
                to,
            });
        }

        Ok(Self {
            state: to,
            event_timestamp_ms: update.event_timestamp_ms,
            broker_id: update.broker_id,
            // Prefer the update's value; fall back to what we already had.
            custom_metadata: update
                .custom_metadata
                .as_ref()
                .or(self.custom_metadata.as_ref())
                .cloned(),
            ..self.clone()
        })
    }

    /// Identity of this segment.
    #[must_use]
    pub fn remote_log_segment_id(&self) -> &RemoteLogSegmentId {
        &self.remote_log_segment_id
    }

    /// First offset of the segment.
    #[must_use]
    pub fn start_offset(&self) -> i64 {
        self.start_offset
    }

    /// Last offset of the segment.
    #[must_use]
    pub fn end_offset(&self) -> i64 {
        self.end_offset
    }

    /// Max timestamp recorded for the segment.
    #[must_use]
    pub fn max_timestamp_ms(&self) -> i64 {
        self.max_timestamp_ms
    }

    /// Broker id from construction or from the latest applied update.
    #[must_use]
    pub fn broker_id(&self) -> i32 {
        self.broker_id
    }

    /// Event timestamp from construction or from the latest applied update.
    #[must_use]
    pub fn event_timestamp_ms(&self) -> i64 {
        self.event_timestamp_ms
    }

    /// Segment size in bytes (never negative).
    #[must_use]
    pub fn segment_size_in_bytes(&self) -> i32 {
        self.segment_size_in_bytes
    }

    /// Caller-supplied custom metadata, if any has been set.
    #[must_use]
    pub fn custom_metadata(&self) -> Option<&CustomMetadata> {
        self.custom_metadata.as_ref()
    }

    /// Current lifecycle state.
    #[must_use]
    pub fn state(&self) -> RemoteLogSegmentState {
        self.state
    }

    /// The (non-empty) leader-epoch map supplied at construction.
    #[must_use]
    pub fn segment_leader_epochs(&self) -> &BTreeMap<i32, i64> {
        &self.segment_leader_epochs
    }

    /// Returns `self` with its custom metadata set to `custom`.
    #[must_use]
    pub fn with_custom_metadata(self, custom: CustomMetadata) -> Self {
        Self {
            custom_metadata: Some(custom),
            ..self
        }
    }

    /// Whether the segment's transaction index is empty (`false` unless set).
    #[must_use]
    pub fn txn_index_empty(&self) -> bool {
        self.txn_index_empty
    }

    /// Returns `self` with the txn-index-empty flag set to `empty`.
    #[must_use]
    pub fn with_txn_index_empty(self, empty: bool) -> Self {
        Self {
            txn_index_empty: empty,
            ..self
        }
    }
}
/// A change to apply to existing segment metadata via
/// [`RemoteLogSegmentMetadata::with_update`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RemoteLogSegmentMetadataUpdate {
    /// Must equal the target metadata's segment id, or the update is rejected.
    pub remote_log_segment_id: RemoteLogSegmentId,
    /// Copied onto the updated metadata.
    pub event_timestamp_ms: i64,
    /// `Some` replaces the existing custom metadata; `None` leaves it as is.
    pub custom_metadata: Option<CustomMetadata>,
    /// Target state; must be a valid transition from the current state.
    pub state: RemoteLogSegmentState,
    /// Copied onto the updated metadata.
    pub broker_id: i32,
}
/// Lifecycle state of deleting a whole partition from remote storage.
///
/// The only valid path is `None -> DeletePartitionMarked -> DeletePartitionStarted
/// -> DeletePartitionFinished`; staying in the same state is not valid.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemotePartitionDeleteState {
    /// The only valid first state (target of a transition from `None`).
    DeletePartitionMarked,
    /// Reachable only from `DeletePartitionMarked`.
    DeletePartitionStarted,
    /// Terminal: no transition leaves this state.
    DeletePartitionFinished,
}

impl RemotePartitionDeleteState {
    /// Returns `true` if a partition currently in `from` (`None` = no delete
    /// state recorded yet) may move to `target`.
    #[must_use]
    pub fn is_valid_transition(from: Option<Self>, target: Self) -> bool {
        // Each state has at most one legal successor; compute it and compare.
        let expected_next = match from {
            None => Some(Self::DeletePartitionMarked),
            Some(Self::DeletePartitionMarked) => Some(Self::DeletePartitionStarted),
            Some(Self::DeletePartitionStarted) => Some(Self::DeletePartitionFinished),
            Some(Self::DeletePartitionFinished) => None,
        };
        expected_next == Some(target)
    }
}
/// Delete-lifecycle record for an entire partition in remote storage.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RemotePartitionDeleteMetadata {
    /// Partition the record refers to.
    pub topic_id_partition: TopicIdPartition,
    /// Delete state carried by this record.
    pub state: RemotePartitionDeleteState,
    /// Event timestamp (presumably epoch milliseconds — confirm with producers).
    pub event_timestamp_ms: i64,
    /// Broker id associated with the event.
    pub broker_id: i32,
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::collections::HashSet;

    fn tp() -> TopicIdPartition {
        TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
    }

    fn seg_id() -> RemoteLogSegmentId {
        RemoteLogSegmentId::new(tp(), Uuid::from_u128(99))
    }

    fn epochs() -> BTreeMap<i32, i64> {
        BTreeMap::from([(0, 0)])
    }

    /// Calls `RemoteLogSegmentMetadata::new` for `seg_id()` with fixed
    /// timestamps/broker, varying only the fields the validation tests target.
    fn build(
        start_offset: i64,
        end_offset: i64,
        segment_size_in_bytes: i32,
        segment_leader_epochs: BTreeMap<i32, i64>,
    ) -> Result<RemoteLogSegmentMetadata, RemoteStorageError> {
        RemoteLogSegmentMetadata::new(
            seg_id(),
            start_offset,
            end_offset,
            123,
            1,
            456,
            segment_size_in_bytes,
            RemoteLogSegmentState::CopySegmentStarted,
            segment_leader_epochs,
        )
    }

    /// A valid segment in `CopySegmentStarted` spanning offsets 0..=10.
    fn started() -> RemoteLogSegmentMetadata {
        build(0, 10, 1024, epochs()).unwrap()
    }

    /// An update for `seg_id()` moving to `state`.
    fn update(
        state: RemoteLogSegmentState,
        custom_metadata: Option<CustomMetadata>,
    ) -> RemoteLogSegmentMetadataUpdate {
        RemoteLogSegmentMetadataUpdate {
            remote_log_segment_id: seg_id(),
            event_timestamp_ms: 789,
            custom_metadata,
            state,
            broker_id: 2,
        }
    }

    #[test]
    fn topic_id_partition_identity_ignores_name() {
        let a = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 3);
        let b = TopicIdPartition::new(Uuid::from_u128(7), "renamed", 3);
        assert!(a == b);
        let set: HashSet<_> = [a, b].into_iter().collect();
        assert!(set.len() == 1, "same id+partition must collapse in a set");
    }

    #[test]
    fn topic_id_partition_distinct_partitions_differ() {
        let a = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 0);
        let b = TopicIdPartition::new(Uuid::from_u128(7), "alpha", 1);
        assert!(a != b);
    }

    #[test]
    fn segment_state_valid_transitions() {
        use RemoteLogSegmentState::{
            CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
        };
        assert!(CopySegmentStarted.is_valid_transition(CopySegmentFinished));
        assert!(CopySegmentStarted.is_valid_transition(DeleteSegmentStarted));
        assert!(CopySegmentFinished.is_valid_transition(DeleteSegmentStarted));
        assert!(DeleteSegmentStarted.is_valid_transition(DeleteSegmentFinished));
    }

    #[test]
    fn segment_state_invalid_transitions() {
        use RemoteLogSegmentState::{
            CopySegmentFinished, CopySegmentStarted, DeleteSegmentFinished, DeleteSegmentStarted,
        };
        assert!(!CopySegmentStarted.is_valid_transition(CopySegmentStarted));
        assert!(!CopySegmentStarted.is_valid_transition(DeleteSegmentFinished));
        assert!(!CopySegmentFinished.is_valid_transition(CopySegmentStarted));
        assert!(!CopySegmentFinished.is_valid_transition(CopySegmentFinished));
        assert!(!DeleteSegmentStarted.is_valid_transition(CopySegmentFinished));
        assert!(!DeleteSegmentStarted.is_valid_transition(DeleteSegmentStarted));
        assert!(!DeleteSegmentFinished.is_valid_transition(DeleteSegmentStarted));
        assert!(!DeleteSegmentFinished.is_valid_transition(DeleteSegmentFinished));
    }

    #[test]
    fn metadata_rejects_empty_leader_epochs() {
        let err = build(0, 10, 1024, BTreeMap::new()).unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn metadata_rejects_end_before_start() {
        let err = build(10, 5, 1024, epochs()).unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn metadata_rejects_negative_segment_size() {
        let err = build(0, 10, -1, epochs()).unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn metadata_accepts_boundary_values() {
        // start == end and a zero-byte segment are both on the valid side of the checks.
        let md = build(5, 5, 0, epochs()).unwrap();
        assert!(md.start_offset() == 5);
        assert!(md.end_offset() == 5);
        assert!(md.segment_size_in_bytes() == 0);
    }

    #[test]
    fn with_update_advances_state_and_fields() {
        let finished = started()
            .with_update(&update(
                RemoteLogSegmentState::CopySegmentFinished,
                Some(CustomMetadata(vec![1, 2, 3])),
            ))
            .unwrap();
        assert!(finished.state() == RemoteLogSegmentState::CopySegmentFinished);
        assert!(finished.event_timestamp_ms() == 789);
        assert!(finished.broker_id() == 2);
        assert!(finished.custom_metadata() == Some(&CustomMetadata(vec![1, 2, 3])));
        assert!(finished.start_offset() == 0);
        assert!(finished.end_offset() == 10);
    }

    #[test]
    fn with_update_keeps_custom_metadata_when_update_omits_it() {
        let with_custom = started().with_custom_metadata(CustomMetadata(vec![9]));
        let finished = with_custom
            .with_update(&update(RemoteLogSegmentState::CopySegmentFinished, None))
            .unwrap();
        assert!(finished.custom_metadata() == Some(&CustomMetadata(vec![9])));
    }

    #[test]
    fn with_update_walks_full_lifecycle() {
        use RemoteLogSegmentState::{
            CopySegmentFinished, DeleteSegmentFinished, DeleteSegmentStarted,
        };
        let mut md = started();
        for next in [CopySegmentFinished, DeleteSegmentStarted, DeleteSegmentFinished] {
            md = md.with_update(&update(next, None)).unwrap();
            assert!(md.state() == next);
        }
        // Immutable fields survive every update.
        assert!(md.segment_leader_epochs() == &epochs());
        assert!(md.segment_size_in_bytes() == 1024);
    }

    #[test]
    fn with_update_rejects_invalid_transition() {
        let err = started()
            .with_update(&update(RemoteLogSegmentState::DeleteSegmentFinished, None))
            .unwrap_err();
        assert!(matches!(
            err,
            RemoteStorageError::InvalidSegmentTransition {
                from: RemoteLogSegmentState::CopySegmentStarted,
                to: RemoteLogSegmentState::DeleteSegmentFinished,
                ..
            }
        ));
    }

    #[test]
    fn with_update_rejects_mismatched_id() {
        let mut upd = update(RemoteLogSegmentState::CopySegmentFinished, None);
        upd.remote_log_segment_id = RemoteLogSegmentId::new(tp(), Uuid::from_u128(1234));
        let err = started().with_update(&upd).unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn with_update_checks_id_before_transition() {
        // Both the id and the transition are wrong; the id error must win.
        let mut upd = update(RemoteLogSegmentState::DeleteSegmentFinished, None);
        upd.remote_log_segment_id = RemoteLogSegmentId::new(tp(), Uuid::from_u128(1234));
        let err = started().with_update(&upd).unwrap_err();
        assert!(matches!(err, RemoteStorageError::InvalidArgument(_)));
    }

    #[test]
    fn with_update_matches_id_regardless_of_topic_name() {
        let mut upd = update(RemoteLogSegmentState::CopySegmentFinished, None);
        upd.remote_log_segment_id = RemoteLogSegmentId::new(
            TopicIdPartition::new(Uuid::from_u128(1), "renamed", 0),
            Uuid::from_u128(99),
        );
        assert!(started().with_update(&upd).is_ok());
    }

    #[test]
    fn txn_index_empty_defaults_false_and_is_settable() {
        let md = started();
        assert!(!md.txn_index_empty());
        let md = md.with_txn_index_empty(true);
        assert!(md.txn_index_empty());
    }

    #[test]
    fn partition_delete_transitions() {
        use RemotePartitionDeleteState::{
            DeletePartitionFinished, DeletePartitionMarked, DeletePartitionStarted,
        };
        assert!(RemotePartitionDeleteState::is_valid_transition(
            None,
            DeletePartitionMarked
        ));
        assert!(RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionMarked),
            DeletePartitionStarted
        ));
        assert!(RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionStarted),
            DeletePartitionFinished
        ));
        assert!(!RemotePartitionDeleteState::is_valid_transition(
            None,
            DeletePartitionStarted
        ));
        assert!(!RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionMarked),
            DeletePartitionMarked
        ));
        assert!(!RemotePartitionDeleteState::is_valid_transition(
            Some(DeletePartitionFinished),
            DeletePartitionStarted
        ));
    }
}