use std::collections::{BTreeMap, HashMap};
use uuid::Uuid;
use crate::error::RemoteStorageError;
use crate::metadata::{
RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState,
RemotePartitionDeleteState,
};
/// In-memory cache of remote log segment metadata plus a partition delete state.
///
/// Segments are tracked from the moment they are added in `CopySegmentStarted`
/// state until an update moves them to `DeleteSegmentFinished`. Only segments
/// that reached `CopySegmentFinished` are placed in the epoch/offset index that
/// serves offset lookups.
#[derive(Debug, Default)]
pub(crate) struct RemoteLogMetadataCache {
/// Every tracked segment, keyed by the UUID of its segment id.
id_to_metadata: HashMap<Uuid, RemoteLogSegmentMetadata>,
/// Lookup index: leader epoch -> (start offset -> segment UUID).
/// Entries are inserted when a segment becomes `CopySegmentFinished` and
/// removed again when its deletion starts or finishes.
epoch_to_offset_to_id: HashMap<i32, BTreeMap<i64, Uuid>>,
/// Delete state recorded through `seed` / `set_delete_state`; `None` until set.
delete_state: Option<RemotePartitionDeleteState>,
}
impl RemoteLogMetadataCache {
/// Starts tracking a new segment.
///
/// The segment is stored but not indexed for offset lookups; indexing happens
/// when a later `update` moves it to `CopySegmentFinished`.
///
/// # Errors
///
/// Returns `RemoteStorageError::InvalidAdd` when the segment is not in
/// `CopySegmentStarted` state, or when a segment with the same UUID is
/// already tracked. The state check runs first.
pub(crate) fn add(
    &mut self,
    metadata: RemoteLogSegmentMetadata,
) -> Result<(), RemoteStorageError> {
    let state = metadata.state();
    let id = metadata.remote_log_segment_id().clone();

    if state != RemoteLogSegmentState::CopySegmentStarted {
        let reason = format!(
            "starting state must be CopySegmentStarted, got {:?}",
            state
        );
        return Err(RemoteStorageError::InvalidAdd { id, reason });
    }

    // A single map probe both detects duplicates and reserves the slot.
    match self.id_to_metadata.entry(id.id) {
        std::collections::hash_map::Entry::Occupied(_) => Err(RemoteStorageError::InvalidAdd {
            id,
            reason: "segment id already exists".into(),
        }),
        std::collections::hash_map::Entry::Vacant(slot) => {
            slot.insert(metadata);
            Ok(())
        }
    }
}
/// Applies a state transition to an already tracked segment.
///
/// Depending on the state produced by `with_update`:
/// * `CopySegmentFinished` — the segment is added to the epoch/offset index.
/// * `DeleteSegmentStarted` — the segment is removed from the index but kept
///   in the cache.
/// * `DeleteSegmentFinished` — the segment is removed from the index and
///   dropped from the cache entirely.
/// * `CopySegmentStarted` — only the stored metadata is replaced.
///
/// # Errors
///
/// Returns `RemoteStorageError::SegmentNotFound` when the segment is not
/// tracked, and propagates any error reported by `with_update`.
pub(crate) fn update(
    &mut self,
    update: &RemoteLogSegmentMetadataUpdate,
) -> Result<(), RemoteStorageError> {
    let segment_id = &update.remote_log_segment_id;
    let key = segment_id.id;

    let updated = match self.id_to_metadata.get(&key) {
        Some(current) => current.with_update(update)?,
        // The id is cloned only on the failure path that needs to own it.
        None => return Err(RemoteStorageError::SegmentNotFound(segment_id.clone())),
    };

    match updated.state() {
        RemoteLogSegmentState::CopySegmentStarted => {}
        RemoteLogSegmentState::CopySegmentFinished => self.index_epochs(&updated),
        RemoteLogSegmentState::DeleteSegmentStarted => self.deindex_epochs(&updated),
        RemoteLogSegmentState::DeleteSegmentFinished => {
            self.deindex_epochs(&updated);
            self.id_to_metadata.remove(&key);
            // Nothing is stored back for a fully deleted segment.
            return Ok(());
        }
    }

    self.id_to_metadata.insert(key, updated);
    Ok(())
}
/// Registers `metadata` in the lookup index under every leader epoch it lists,
/// keyed by that epoch's start offset. An existing entry with the same epoch
/// and start offset is overwritten.
fn index_epochs(&mut self, metadata: &RemoteLogSegmentMetadata) {
    let segment_uuid = metadata.remote_log_segment_id().id;
    for (epoch, start_offset) in metadata.segment_leader_epochs() {
        let offsets = self.epoch_to_offset_to_id.entry(*epoch).or_default();
        offsets.insert(*start_offset, segment_uuid);
    }
}
/// Removes `metadata` from the lookup index.
///
/// For each leader epoch the segment lists, the entry at that epoch's start
/// offset is removed only if it still points at this segment. Per-epoch maps
/// that end up empty are dropped from the index.
fn deindex_epochs(&mut self, metadata: &RemoteLogSegmentMetadata) {
    let segment_uuid = metadata.remote_log_segment_id().id;
    for (epoch, start_offset) in metadata.segment_leader_epochs() {
        let emptied = match self.epoch_to_offset_to_id.get_mut(epoch) {
            None => continue,
            Some(offsets) => {
                // Leave the slot alone when another segment has taken it over.
                if offsets.get(start_offset) == Some(&segment_uuid) {
                    offsets.remove(start_offset);
                }
                offsets.is_empty()
            }
        };
        if emptied {
            self.epoch_to_offset_to_id.remove(epoch);
        }
    }
}
/// Returns a copy of the readable segment containing `offset` under
/// `leader_epoch`, or `None` when there is no such segment.
///
/// The candidate is the indexed segment whose start offset for `leader_epoch`
/// is the greatest one `<= offset`. It is returned only when
/// * it is in `CopySegmentFinished` state, and
/// * `offset` lies within the part of the segment that belongs to
///   `leader_epoch`: up to the start offset of the next higher epoch recorded
///   in the segment minus one, or up to the segment end offset when
///   `leader_epoch` is the last epoch in the segment.
///
/// Bounding by the epoch's own range (rather than the whole segment) prevents
/// a multi-epoch segment from being returned for an offset that was actually
/// written under a later epoch.
pub(crate) fn segment_for(
    &self,
    leader_epoch: i32,
    offset: i64,
) -> Option<RemoteLogSegmentMetadata> {
    let offsets = self.epoch_to_offset_to_id.get(&leader_epoch)?;
    // Floor lookup: the last indexed start offset that does not exceed `offset`.
    let (_start, id) = offsets.range(..=offset).next_back()?;
    let md = self.id_to_metadata.get(id)?;
    if md.state() != RemoteLogSegmentState::CopySegmentFinished {
        return None;
    }

    // Locate the closest epoch above `leader_epoch` inside this segment; its
    // start offset marks where `leader_epoch` stops.
    let mut next_epoch: Option<(i32, i64)> = None;
    for (&epoch, &start) in md.segment_leader_epochs() {
        if epoch > leader_epoch && next_epoch.map_or(true, |(seen, _)| epoch < seen) {
            next_epoch = Some((epoch, start));
        }
    }
    let epoch_end = match next_epoch {
        // Never extend past the segment end, even for inconsistent metadata.
        Some((_, next_start)) => next_start.saturating_sub(1).min(md.end_offset()),
        None => md.end_offset(),
    };

    if offset <= epoch_end {
        Some(md.clone())
    } else {
        None
    }
}
pub(crate) fn highest_offset_for_epoch(&self, leader_epoch: i32) -> Option<i64> {
let map = self.epoch_to_offset_to_id.get(&leader_epoch)?;
map.values()
.filter_map(|id| self.id_to_metadata.get(id))
.map(RemoteLogSegmentMetadata::end_offset)
.max()
}
/// Returns copies of all tracked segments, in any state, ordered by
/// `sort_by_start_offset`.
pub(crate) fn list(&self) -> Vec<RemoteLogSegmentMetadata> {
    let mut segments: Vec<RemoteLogSegmentMetadata> =
        Vec::with_capacity(self.id_to_metadata.len());
    segments.extend(self.id_to_metadata.values().cloned());
    sort_by_start_offset(&mut segments);
    segments
}
/// Returns copies of all tracked segments whose leader-epoch map contains
/// `leader_epoch`, ordered by `sort_by_start_offset`.
///
/// The scan covers every tracked segment regardless of state, not only the
/// ones present in the lookup index.
pub(crate) fn list_by_epoch(&self, leader_epoch: i32) -> Vec<RemoteLogSegmentMetadata> {
    let mut matching: Vec<RemoteLogSegmentMetadata> = Vec::new();
    for md in self.id_to_metadata.values() {
        if md.segment_leader_epochs().contains_key(&leader_epoch) {
            matching.push(md.clone());
        }
    }
    sort_by_start_offset(&mut matching);
    matching
}
/// Returns copies of all tracked segments in the map's iteration order
/// (unsorted).
pub(crate) fn dump_segments(&self) -> Vec<RemoteLogSegmentMetadata> {
    let mut snapshot: Vec<RemoteLogSegmentMetadata> =
        Vec::with_capacity(self.id_to_metadata.len());
    for md in self.id_to_metadata.values() {
        snapshot.push(md.clone());
    }
    snapshot
}
/// Loads previously dumped segments and a delete state into this cache.
///
/// Segments in `DeleteSegmentFinished` state are skipped. Segments in
/// `CopySegmentFinished` state are stored and added to the lookup index; all
/// other segments are stored without being indexed. Existing contents are not
/// cleared first. `delete_state` replaces the current delete state, including
/// when it is `None`.
pub(crate) fn seed(
    &mut self,
    segments: Vec<RemoteLogSegmentMetadata>,
    delete_state: Option<RemotePartitionDeleteState>,
) {
    for md in segments {
        match md.state() {
            // Fully deleted segments are neither indexed nor stored.
            RemoteLogSegmentState::DeleteSegmentFinished => continue,
            RemoteLogSegmentState::CopySegmentFinished => self.index_epochs(&md),
            RemoteLogSegmentState::CopySegmentStarted
            | RemoteLogSegmentState::DeleteSegmentStarted => {}
        }
        let key = md.remote_log_segment_id().id;
        self.id_to_metadata.insert(key, md);
    }
    self.delete_state = delete_state;
}
pub(crate) fn delete_state(&self) -> Option<RemotePartitionDeleteState> {
self.delete_state
}
/// Records `state` as the delete state, overwriting any previous value.
pub(crate) fn set_delete_state(&mut self, state: RemotePartitionDeleteState) {
    let _previous = self.delete_state.replace(state);
}
}
/// Sorts `segments` in place by ascending start offset; segments with equal
/// start offsets are ordered by the UUID of their segment id.
fn sort_by_start_offset(segments: &mut [RemoteLogSegmentMetadata]) {
    // Tuple keys compare lexicographically: start offset first, then UUID.
    segments.sort_by_key(|md| (md.start_offset(), md.remote_log_segment_id().id));
}
// Unit tests for `RemoteLogMetadataCache`: visibility of segments across state
// transitions, offset/epoch lookups, error paths, dump/seed round trip and
// listing order.
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::{CustomMetadata, RemoteLogSegmentId, TopicIdPartition};
use assert2::assert;
// Fixed topic partition shared by every fixture in this module.
fn tp() -> TopicIdPartition {
TopicIdPartition::new(Uuid::from_u128(1), "t", 0)
}
// Builds a segment in `CopySegmentStarted` state with the given UUID value,
// (epoch, start offset) pairs and start/end offsets. The remaining numeric
// arguments are fixed fixture values; their meaning is defined by
// `RemoteLogSegmentMetadata::new` (not visible here).
fn seg(id: u128, epochs: &[(i32, i64)], start: i64, end: i64) -> RemoteLogSegmentMetadata {
RemoteLogSegmentMetadata::new(
RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
start,
end,
end,
1,
100,
1024,
RemoteLogSegmentState::CopySegmentStarted,
epochs.iter().copied().collect(),
)
.unwrap()
}
// Update that moves segment `id` to `CopySegmentFinished` and attaches custom metadata.
fn finish(id: u128) -> RemoteLogSegmentMetadataUpdate {
RemoteLogSegmentMetadataUpdate {
remote_log_segment_id: RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
event_timestamp_ms: 200,
custom_metadata: Some(CustomMetadata(vec![1])),
state: RemoteLogSegmentState::CopySegmentFinished,
broker_id: 1,
}
}
// Update that moves segment `id` to an arbitrary `state` without custom metadata.
fn transition(id: u128, state: RemoteLogSegmentState) -> RemoteLogSegmentMetadataUpdate {
RemoteLogSegmentMetadataUpdate {
remote_log_segment_id: RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
event_timestamp_ms: 300,
custom_metadata: None,
state,
broker_id: 1,
}
}
// A segment cannot be looked up until its copy has finished.
#[test]
fn started_segment_is_invisible_until_finished() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
assert!(c.segment_for(0, 50).is_none(), "started not yet readable");
c.update(&finish(10)).unwrap();
let got = c.segment_for(0, 50).expect("finished is readable");
assert!(got.remote_log_segment_id().id == Uuid::from_u128(10));
}
// Two adjacent segments in one epoch: boundaries resolve to the right segment,
// and an offset past the last end offset resolves to nothing.
#[test]
fn offset_lookup_across_segments_one_epoch() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
c.add(seg(11, &[(0, 100)], 100, 199)).unwrap();
c.update(&finish(10)).unwrap();
c.update(&finish(11)).unwrap();
assert!(c.segment_for(0, 0).unwrap().remote_log_segment_id().id == Uuid::from_u128(10));
assert!(c.segment_for(0, 99).unwrap().remote_log_segment_id().id == Uuid::from_u128(10));
assert!(c.segment_for(0, 100).unwrap().remote_log_segment_id().id == Uuid::from_u128(11));
assert!(c.segment_for(0, 199).unwrap().remote_log_segment_id().id == Uuid::from_u128(11));
assert!(c.segment_for(0, 200).is_none(), "past the end");
}
// Lookups are scoped to the requested leader epoch: segment 10 spans epochs
// 0 and 1, segment 11 only epoch 1.
#[test]
fn offset_lookup_respects_epoch() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0), (1, 50)], 0, 99)).unwrap();
c.update(&finish(10)).unwrap();
c.add(seg(11, &[(1, 100)], 100, 199)).unwrap();
c.update(&finish(11)).unwrap();
assert!(c.segment_for(0, 10).unwrap().remote_log_segment_id().id == Uuid::from_u128(10));
assert!(
c.segment_for(0, 150).is_none(),
"epoch 0 has no segment at 150"
);
assert!(c.segment_for(1, 60).unwrap().remote_log_segment_id().id == Uuid::from_u128(10));
assert!(c.segment_for(1, 150).unwrap().remote_log_segment_id().id == Uuid::from_u128(11));
}
// The highest offset of an epoch is the largest end offset among its indexed
// segments; an unknown epoch yields `None`.
#[test]
fn highest_offset_for_epoch_is_max_end() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
c.add(seg(11, &[(0, 100)], 100, 199)).unwrap();
c.update(&finish(10)).unwrap();
c.update(&finish(11)).unwrap();
assert!(c.highest_offset_for_epoch(0) == Some(199));
assert!(c.highest_offset_for_epoch(7) == None);
}
// `DeleteSegmentStarted` hides the segment from lookups but keeps it listed;
// `DeleteSegmentFinished` removes it from the cache.
#[test]
fn delete_started_hides_segment_delete_finished_drops_it() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
c.update(&finish(10)).unwrap();
assert!(c.segment_for(0, 50).is_some());
c.update(&transition(10, RemoteLogSegmentState::DeleteSegmentStarted))
.unwrap();
assert!(c.segment_for(0, 50).is_none(), "delete-started hides it");
assert!(
c.list().len() == 1,
"still tracked while delete in progress"
);
c.update(&transition(
10,
RemoteLogSegmentState::DeleteSegmentFinished,
))
.unwrap();
assert!(c.list().is_empty(), "delete-finished drops it entirely");
}
// Updating a segment that was never added reports `SegmentNotFound`.
#[test]
fn update_unknown_segment_errors() {
let mut c = RemoteLogMetadataCache::default();
let err = c.update(&finish(404)).unwrap_err();
assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
}
// Adding a segment that is not in `CopySegmentStarted` state is rejected.
#[test]
fn add_with_wrong_state_errors() {
let mut c = RemoteLogMetadataCache::default();
let mut s = seg(10, &[(0, 0)], 0, 99);
s = s
.with_update(&finish(10))
.expect("force to finished for the test");
let err = c.add(s).unwrap_err();
assert!(matches!(err, RemoteStorageError::InvalidAdd { .. }));
}
// Adding the same segment id twice is rejected.
#[test]
fn duplicate_add_errors() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
let err = c.add(seg(10, &[(0, 0)], 0, 99)).unwrap_err();
assert!(matches!(err, RemoteStorageError::InvalidAdd { .. }));
}
// Round trip: a fresh cache seeded from `dump_segments` + `delete_state`
// indexes only the finished segment, still tracks the delete-started one,
// and carries over the delete state.
#[test]
fn dump_then_seed_rebuilds_epoch_index() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
c.add(seg(11, &[(0, 100)], 100, 199)).unwrap();
c.update(&finish(10)).unwrap();
c.update(&finish(11)).unwrap();
c.update(&transition(11, RemoteLogSegmentState::DeleteSegmentStarted))
.unwrap();
c.set_delete_state(RemotePartitionDeleteState::DeletePartitionMarked);
let segments = c.dump_segments();
let delete_state = c.delete_state();
let mut seeded = RemoteLogMetadataCache::default();
seeded.seed(segments, delete_state);
assert!(
seeded
.segment_for(0, 50)
.unwrap()
.remote_log_segment_id()
.id
== Uuid::from_u128(10)
);
assert!(seeded.segment_for(0, 150).is_none());
assert!(seeded.list().len() == 2);
assert!(seeded.delete_state() == Some(RemotePartitionDeleteState::DeletePartitionMarked));
}
// `list` returns segments by ascending start offset regardless of insertion order.
#[test]
fn list_is_ordered_by_start_offset() {
let mut c = RemoteLogMetadataCache::default();
c.add(seg(11, &[(0, 100)], 100, 199)).unwrap();
c.add(seg(10, &[(0, 0)], 0, 99)).unwrap();
let listed = c.list();
assert!(listed[0].start_offset() == 0);
assert!(listed[1].start_offset() == 100);
}
}