use std::sync::{Arc, RwLock};
use crabka_remote_storage::{
RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteLogSegmentMetadataUpdate,
RemotePartitionDeleteMetadata, RemoteStorageError, TopicIdPartition,
};
/// A [`RemoteLogMetadataManager`] handle whose backing implementation can be
/// replaced at runtime.
///
/// The active delegate is stored as an `Arc` behind an `RwLock`. The lock only
/// guards the pointer itself: readers clone the `Arc` out and release the lock
/// before using it, so a delegate obtained before a [`swap`](Self::swap) stays
/// alive for as long as that clone is held.
pub struct SwappableRlmm {
    /// Delegate that currently receives forwarded calls.
    inner: RwLock<Arc<dyn RemoteLogMetadataManager>>,
}

impl SwappableRlmm {
    /// Creates a wrapper that forwards to `initial` until [`swap`](Self::swap)
    /// installs a different delegate.
    #[must_use]
    pub fn new(initial: Arc<dyn RemoteLogMetadataManager>) -> Self {
        Self {
            inner: RwLock::new(initial),
        }
    }

    /// Installs `new` as the delegate returned by all later lookups.
    ///
    /// The write lock is held only for the pointer exchange. The previous
    /// delegate is released after the lock has been dropped, so a slow
    /// destructor does not stall concurrent readers and a panicking destructor
    /// cannot poison the lock.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    pub fn swap(&self, new: Arc<dyn RemoteLogMetadataManager>) {
        let previous = {
            let mut slot = self.inner.write().expect("SwappableRlmm write lock poisoned");
            std::mem::replace(&mut *slot, new)
        };
        // Dropped here, outside the critical section: if this was the last
        // reference, the old manager's destructor runs without the lock held.
        drop(previous);
    }

    /// Returns a new reference to the delegate that is installed right now.
    ///
    /// The read lock is released before this function returns; only the
    /// reference count of the delegate is bumped.
    ///
    /// # Panics
    ///
    /// Panics if the lock has been poisoned.
    fn current(&self) -> Arc<dyn RemoteLogMetadataManager> {
        let guard = self.inner.read().expect("SwappableRlmm read lock poisoned");
        Arc::clone(&guard)
    }
}
/// Forwards every [`RemoteLogMetadataManager`] operation to the delegate that
/// is installed at the moment the call is made.
///
/// Each method looks the delegate up once via `current()` and passes its
/// arguments and result through unchanged.
impl RemoteLogMetadataManager for SwappableRlmm {
    /// Forwards `add_remote_log_segment_metadata` to the active delegate.
    fn add_remote_log_segment_metadata(
        &self,
        metadata: RemoteLogSegmentMetadata,
    ) -> Result<(), RemoteStorageError> {
        let delegate = self.current();
        delegate.add_remote_log_segment_metadata(metadata)
    }

    /// Forwards `update_remote_log_segment_metadata` to the active delegate.
    fn update_remote_log_segment_metadata(
        &self,
        update: RemoteLogSegmentMetadataUpdate,
    ) -> Result<(), RemoteStorageError> {
        let delegate = self.current();
        delegate.update_remote_log_segment_metadata(update)
    }

    /// Forwards `remote_log_segment_metadata` to the active delegate.
    fn remote_log_segment_metadata(
        &self,
        topic_id_partition: &TopicIdPartition,
        leader_epoch: i32,
        offset: i64,
    ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
        let delegate = self.current();
        delegate.remote_log_segment_metadata(topic_id_partition, leader_epoch, offset)
    }

    /// Forwards `highest_offset_for_epoch` to the active delegate.
    fn highest_offset_for_epoch(
        &self,
        topic_id_partition: &TopicIdPartition,
        leader_epoch: i32,
    ) -> Result<Option<i64>, RemoteStorageError> {
        let delegate = self.current();
        delegate.highest_offset_for_epoch(topic_id_partition, leader_epoch)
    }

    /// Forwards `list_remote_log_segments` to the active delegate.
    fn list_remote_log_segments(
        &self,
        topic_id_partition: &TopicIdPartition,
    ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
        let delegate = self.current();
        delegate.list_remote_log_segments(topic_id_partition)
    }

    /// Forwards `list_remote_log_segments_by_epoch` to the active delegate.
    fn list_remote_log_segments_by_epoch(
        &self,
        topic_id_partition: &TopicIdPartition,
        leader_epoch: i32,
    ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
        let delegate = self.current();
        delegate.list_remote_log_segments_by_epoch(topic_id_partition, leader_epoch)
    }

    /// Forwards `put_remote_partition_delete_metadata` to the active delegate.
    fn put_remote_partition_delete_metadata(
        &self,
        metadata: RemotePartitionDeleteMetadata,
    ) -> Result<(), RemoteStorageError> {
        let delegate = self.current();
        delegate.put_remote_partition_delete_metadata(metadata)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use assert2::assert;
    use crabka_remote_storage::{
        InmemoryRemoteLogMetadataManager, RemoteLogSegmentId, RemoteLogSegmentState,
    };
    use std::collections::BTreeMap;
    use uuid::Uuid;

    /// The single topic partition every test in this module operates on.
    fn tp() -> TopicIdPartition {
        let topic_id = Uuid::from_u128(1);
        TopicIdPartition::new(topic_id, "orders", 0)
    }

    /// Builds segment metadata in the `CopySegmentStarted` state for `tp()`,
    /// identified by `id` and constructed from `start` and `end`; the
    /// remaining constructor arguments are fixed literals.
    fn started(id: u128, start: i64, end: i64) -> RemoteLogSegmentMetadata {
        let segment_id = RemoteLogSegmentId::new(tp(), Uuid::from_u128(id));
        RemoteLogSegmentMetadata::new(
            segment_id,
            start,
            end,
            end + 1,
            1,
            100,
            2048,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::from([(0, start)]),
        )
        .unwrap()
    }

    /// Number of segments `rlmm` lists for `tp()`.
    fn segment_count(rlmm: &dyn RemoteLogMetadataManager) -> usize {
        rlmm.list_remote_log_segments(&tp()).unwrap().len()
    }

    #[test]
    fn delegates_to_initial_then_to_swapped() {
        let first: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let swap = SwappableRlmm::new(Arc::clone(&first));

        // Before the swap, writes through the wrapper land in `first`.
        swap.add_remote_log_segment_metadata(started(10, 0, 99))
            .unwrap();
        assert!(segment_count(&*first) == 1);

        let second: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        swap.swap(Arc::clone(&second));

        // After the swap, the wrapper reads from the still-empty `second`
        // while `first` keeps what was written earlier.
        assert!(segment_count(&swap) == 0);
        assert!(segment_count(&*first) == 1);

        // New writes reach `second` only.
        swap.add_remote_log_segment_metadata(started(11, 100, 199))
            .unwrap();
        assert!(segment_count(&*second) == 1);
        assert!(segment_count(&*first) == 1);
    }
}