use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use uuid::Uuid;
use crabka_log::{LogConfig, SegmentExport};
use crabka_metadata::NodeId;
use crabka_remote_storage::{
LogSegmentData, RemoteLogMetadataManager, RemoteLogSegmentId, RemoteLogSegmentMetadata,
RemoteLogSegmentMetadataUpdate, RemoteLogSegmentState, RemotePartitionDeleteMetadata,
RemotePartitionDeleteState, RemoteStorageManager, TopicIdPartition,
};
use crate::partition::Partition;
use crate::partition_registry::PartitionRegistry;
/// Tunables for the background remote-log-manager task.
#[derive(Debug, Clone)]
pub(crate) struct RemoteLogManagerConfig {
    /// Period of the ticker that triggers each tiering pass.
    pub interval: Duration,
}

impl Default for RemoteLogManagerConfig {
    /// A tiering pass every thirty seconds.
    fn default() -> Self {
        let interval = Duration::from_millis(30_000);
        Self { interval }
    }
}
#[allow(clippy::too_many_arguments)] pub(crate) async fn run(
partitions: Arc<PartitionRegistry>,
controller: Arc<dyn crate::metadata_source::MetadataSource>,
rsm: Arc<dyn RemoteStorageManager>,
rlmm: Arc<dyn RemoteLogMetadataManager>,
node_id: NodeId,
broker_id: i32,
cfg: RemoteLogManagerConfig,
shutdown: CancellationToken,
) {
let mut ticker = tokio::time::interval(cfg.interval);
loop {
tokio::select! {
_ = ticker.tick() => {}
() = shutdown.cancelled() => {
debug!("remote-log-manager task shutting down");
return;
}
}
tick_all(&partitions, &*controller, &rsm, &rlmm, node_id, broker_id).await;
}
}
async fn tick_all(
partitions: &PartitionRegistry,
controller: &dyn crate::metadata_source::MetadataSource,
rsm: &Arc<dyn RemoteStorageManager>,
rlmm: &Arc<dyn RemoteLogMetadataManager>,
node_id: NodeId,
broker_id: i32,
) {
let snapshot: Vec<Arc<Partition>> = partitions.arcs();
let image = controller.current_image();
for partition in snapshot {
if partition.current_leader.load(Ordering::Relaxed) != node_id {
continue;
}
let (log_config, exports) = {
let log = partition.log.lock().expect("log mutex poisoned");
let cfg = log.config_snapshot();
if !cfg.remote_storage_enable {
continue;
}
(cfg, log.tierable_segments())
};
if exports.is_empty() {
continue;
}
let Some(topic_id) = image.topic(&partition.topic).map(|t| t.topic_id) else {
continue;
};
let leader_epoch = partition.current_leader_epoch.load(Ordering::Acquire);
let tp = TopicIdPartition::new(topic_id, partition.topic.clone(), partition.partition_id);
copy_eligible(&tp, broker_id, leader_epoch, exports.clone(), rsm, rlmm).await;
local_retention_pass(&tp, &partition, &exports, &log_config, rlmm, now_ms()).await;
remote_retention_pass(&tp, broker_id, &log_config, rsm, rlmm, now_ms()).await;
}
}
/// Uploads every exported segment whose base offset is not yet listed in the
/// RLMM, in any state, processing `exports` in order.
///
/// Returns the number of segments successfully copied. Returns `0` without
/// copying anything if the RLMM listing fails; that failure is logged.
pub(crate) async fn copy_eligible(
    tp: &TopicIdPartition,
    broker_id: i32,
    leader_epoch: i32,
    exports: Vec<SegmentExport>,
    rsm: &Arc<dyn RemoteStorageManager>,
    rlmm: &Arc<dyn RemoteLogMetadataManager>,
) -> usize {
    let listed = match rlmm.list_remote_log_segments(tp) {
        Ok(list) => list,
        Err(e) => {
            warn!(topic = %tp.topic, partition = tp.partition, error = %e,
                "remote-log-manager: failed to list remote segments");
            return 0;
        }
    };
    let already_tiered: HashSet<i64> = listed
        .iter()
        .map(RemoteLogSegmentMetadata::start_offset)
        .collect();

    let mut copied = 0;
    let pending = exports
        .into_iter()
        .filter(|ex| !already_tiered.contains(&ex.base_offset));
    for ex in pending {
        if copy_one(tp, broker_id, leader_epoch, &ex, rsm, rlmm).await {
            copied += 1;
        }
    }
    copied
}
pub(crate) fn local_retention_target(
exports: &[SegmentExport],
finished_bases: &HashSet<i64>,
effective_local_ms: Option<i64>,
effective_local_bytes: Option<u64>,
now_ms: i64,
) -> Option<i64> {
let sealed_total: u64 = exports.iter().map(|e| e.size_bytes).sum();
let mut deletable_size_remaining = match effective_local_bytes {
Some(budget) if sealed_total > budget => sealed_total - budget,
_ => 0,
};
let mut delete_through_last: Option<i64> = None;
for ex in exports {
if !finished_bases.contains(&ex.base_offset) {
break;
}
let by_time = matches!(
effective_local_ms,
Some(retention) if now_ms.saturating_sub(ex.max_timestamp) > retention
);
let by_size = deletable_size_remaining > 0;
if !(by_time || by_size) {
break;
}
delete_through_last = Some(ex.last_offset);
if by_size {
deletable_size_remaining = deletable_size_remaining.saturating_sub(ex.size_bytes);
}
}
delete_through_last.map(|last| last + 1)
}
#[allow(clippy::unused_async)]
pub(crate) async fn local_retention_pass(
tp: &TopicIdPartition,
partition: &Partition,
exports: &[SegmentExport],
log_config: &LogConfig,
rlmm: &Arc<dyn RemoteLogMetadataManager>,
now_ms: i64,
) -> usize {
let effective_local_ms = log_config
.local_retention_ms
.or(log_config.retention_ms)
.map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
let effective_local_bytes = log_config
.local_retention_bytes
.or(log_config.retention_bytes);
let finished_bases: HashSet<i64> = match rlmm.list_remote_log_segments(tp) {
Ok(list) => list
.iter()
.filter(|md| md.state() == RemoteLogSegmentState::CopySegmentFinished)
.map(RemoteLogSegmentMetadata::start_offset)
.collect(),
Err(e) => {
warn!(topic = %tp.topic, partition = tp.partition, error = %e,
"remote-log-manager: failed to list remote segments for local retention");
return 0;
}
};
let Some(target) = local_retention_target(
exports,
&finished_bases,
effective_local_ms,
effective_local_bytes,
now_ms,
) else {
return 0;
};
let result = {
let mut log = partition.log.lock().expect("log mutex poisoned");
log.delete_local_segments_through(target)
};
match result {
Ok(n) => {
if n > 0 {
debug!(topic = %tp.topic, partition = tp.partition, target, removed = n,
"remote-log-manager: deleted local segments past local-retention floor");
}
n
}
Err(e) => {
warn!(topic = %tp.topic, partition = tp.partition, target, error = %e,
"remote-log-manager: failed to delete local segments");
0
}
}
}
pub(crate) fn remote_retention_eviction_set(
finished: &[RemoteLogSegmentMetadata],
retention_ms: Option<i64>,
retention_bytes: Option<u64>,
now_ms: i64,
) -> Vec<RemoteLogSegmentMetadata> {
let total: u64 = finished
.iter()
.map(|m| u64::try_from(m.segment_size_in_bytes().max(0)).unwrap_or(0))
.sum();
let mut size_to_reclaim = match retention_bytes {
Some(budget) if total > budget => total - budget,
_ => 0,
};
let mut out = Vec::new();
for md in finished {
let by_time = matches!(
retention_ms,
Some(window) if now_ms.saturating_sub(md.max_timestamp_ms()) > window
);
let by_size = size_to_reclaim > 0;
if !(by_time || by_size) {
break;
}
let bytes = u64::try_from(md.segment_size_in_bytes().max(0)).unwrap_or(0);
if by_size {
size_to_reclaim = size_to_reclaim.saturating_sub(bytes);
}
out.push(md.clone());
}
out
}
/// Applies `retention_ms`/`retention_bytes` to the partition's `CopySegmentFinished`
/// remote segments, deleting the oldest (by start offset) first.
///
/// This is a no-op when neither limit is set. Deletion stops at the first
/// segment whose delete fails. Returns the number of segments deleted; returns
/// `0` if the RLMM listing fails, and that failure is logged.
pub(crate) async fn remote_retention_pass(
    tp: &TopicIdPartition,
    broker_id: i32,
    log_config: &LogConfig,
    rsm: &Arc<dyn RemoteStorageManager>,
    rlmm: &Arc<dyn RemoteLogMetadataManager>,
    now_ms: i64,
) -> usize {
    let retention_bytes = log_config.retention_bytes;
    let retention_ms = log_config
        .retention_ms
        .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
    if matches!((retention_ms, retention_bytes), (None, None)) {
        return 0;
    }

    let listed = match rlmm.list_remote_log_segments(tp) {
        Ok(list) => list,
        Err(e) => {
            warn!(topic = %tp.topic, partition = tp.partition, error = %e,
                "remote-log-manager: failed to list remote segments for retention");
            return 0;
        }
    };
    let mut finished: Vec<RemoteLogSegmentMetadata> = listed
        .into_iter()
        .filter(|md| md.state() == RemoteLogSegmentState::CopySegmentFinished)
        .collect();
    finished.sort_by_key(RemoteLogSegmentMetadata::start_offset);

    let mut deleted = 0;
    for md in remote_retention_eviction_set(&finished, retention_ms, retention_bytes, now_ms) {
        if !delete_one_segment(tp, broker_id, &md, rsm, rlmm).await {
            break;
        }
        deleted += 1;
    }
    deleted
}
/// Removes every remote segment of a deleted partition and records the
/// partition-level lifecycle in the RLMM:
/// `DeletePartitionMarked` → `DeletePartitionStarted` → per-segment deletes →
/// `DeletePartitionFinished`.
///
/// Failure handling is best-effort and never panics; every failure is logged
/// via `warn!`:
/// * Failing to record `Marked`/`Started`, or to list the segments, aborts the
///   cascade.
/// * Segments already in `DeleteSegmentFinished` are skipped.
/// * If any per-segment delete fails, `DeletePartitionFinished` is *not*
///   recorded. The RLMM therefore keeps showing the delete as in progress,
///   instead of claiming the remote data is gone while segments remain.
pub(crate) async fn cascade_remote_partition_delete(
    tp: TopicIdPartition,
    broker_id: i32,
    rsm: Arc<dyn RemoteStorageManager>,
    rlmm: Arc<dyn RemoteLogMetadataManager>,
) {
    if let Err(e) = put_partition_state(
        &rlmm,
        &tp,
        RemotePartitionDeleteState::DeletePartitionMarked,
        broker_id,
    )
    .await
    {
        warn!(topic = %tp.topic, partition = tp.partition, error = %e,
            "remote-log-manager: failed to mark partition deleted");
        return;
    }
    if let Err(e) = put_partition_state(
        &rlmm,
        &tp,
        RemotePartitionDeleteState::DeletePartitionStarted,
        broker_id,
    )
    .await
    {
        warn!(topic = %tp.topic, partition = tp.partition, error = %e,
            "remote-log-manager: failed to start partition delete");
        return;
    }
    let segments = match rlmm.list_remote_log_segments(&tp) {
        Ok(list) => list,
        Err(e) => {
            warn!(topic = %tp.topic, partition = tp.partition, error = %e,
                "remote-log-manager: failed to list segments for partition delete");
            return;
        }
    };

    let mut failed: usize = 0;
    for md in segments {
        if md.state() == RemoteLogSegmentState::DeleteSegmentFinished {
            continue;
        }
        // `delete_one_segment` logs the specific reason for each failure.
        if !delete_one_segment(&tp, broker_id, &md, &rsm, &rlmm).await {
            failed += 1;
        }
    }
    if failed > 0 {
        warn!(topic = %tp.topic, partition = tp.partition, failed,
            "remote-log-manager: segment deletes failed; leaving partition delete unfinished");
        return;
    }

    if let Err(e) = put_partition_state(
        &rlmm,
        &tp,
        RemotePartitionDeleteState::DeletePartitionFinished,
        broker_id,
    )
    .await
    {
        warn!(topic = %tp.topic, partition = tp.partition, error = %e,
            "remote-log-manager: failed to finish partition delete");
    }
}
/// Runs an RLMM mutation on tokio's blocking pool so it never stalls an async
/// worker thread.
///
/// A `JoinError` from the blocking task is converted into
/// `RemoteStorageError::Backend`.
async fn rlmm_mutate<F>(
    rlmm: &Arc<dyn RemoteLogMetadataManager>,
    op: F,
) -> Result<(), crabka_remote_storage::RemoteStorageError>
where
    F: FnOnce(
            &dyn RemoteLogMetadataManager,
        ) -> Result<(), crabka_remote_storage::RemoteStorageError>
        + Send
        + 'static,
{
    let owned = Arc::clone(rlmm);
    tokio::task::spawn_blocking(move || op(&*owned))
        .await
        .unwrap_or_else(|e| {
            Err(crabka_remote_storage::RemoteStorageError::Backend(format!(
                "RLMM mutation task panicked: {e}"
            )))
        })
}
/// Records a partition-level delete `state` for `tp` in the RLMM.
///
/// The event is timestamped with the current wall-clock time and attributed to
/// `broker_id`.
async fn put_partition_state(
    rlmm: &Arc<dyn RemoteLogMetadataManager>,
    tp: &TopicIdPartition,
    state: RemotePartitionDeleteState,
    broker_id: i32,
) -> Result<(), crabka_remote_storage::RemoteStorageError> {
    let event = RemotePartitionDeleteMetadata {
        event_timestamp_ms: now_ms(),
        topic_id_partition: tp.clone(),
        state,
        broker_id,
    };
    rlmm_mutate(rlmm, move |manager| {
        manager.put_remote_partition_delete_metadata(event)
    })
    .await
}
/// Deletes one remote segment while recording its RLMM lifecycle:
/// `DeleteSegmentStarted` → RSM delete → `DeleteSegmentFinished`.
///
/// Handling by the segment's current state:
/// * `CopySegmentStarted` (e.g. a copy that never completed) or
///   `CopySegmentFinished`: gets an explicit `DeleteSegmentStarted` first, so the
///   segment never jumps straight from a copy state to `DeleteSegmentFinished`.
/// * `DeleteSegmentStarted` (an earlier attempt stopped after that marker):
///   resumes at the RSM delete.
///
/// Returns `true` only when the RSM delete and the `DeleteSegmentFinished`
/// record both succeed. Every failure is logged and returns `false`.
async fn delete_one_segment(
    tp: &TopicIdPartition,
    broker_id: i32,
    md: &RemoteLogSegmentMetadata,
    rsm: &Arc<dyn RemoteStorageManager>,
    rlmm: &Arc<dyn RemoteLogMetadataManager>,
) -> bool {
    let id = md.remote_log_segment_id().clone();
    let needs_start_marker = matches!(
        md.state(),
        RemoteLogSegmentState::CopySegmentStarted | RemoteLogSegmentState::CopySegmentFinished
    );
    if needs_start_marker {
        let upd = RemoteLogSegmentMetadataUpdate {
            remote_log_segment_id: id.clone(),
            event_timestamp_ms: now_ms(),
            custom_metadata: None,
            state: RemoteLogSegmentState::DeleteSegmentStarted,
            broker_id,
        };
        if let Err(e) = rlmm_mutate(rlmm, move |m| m.update_remote_log_segment_metadata(upd)).await
        {
            warn!(topic = %tp.topic, partition = tp.partition, base = md.start_offset(),
                error = %e,
                "remote-log-manager: failed to record DeleteSegmentStarted");
            return false;
        }
    }

    let rsm_del = rsm.clone();
    let md_del = md.clone();
    let delete_result =
        tokio::task::spawn_blocking(move || rsm_del.delete_log_segment_data(&md_del)).await;
    match delete_result {
        Ok(Ok(())) => {}
        Ok(Err(e)) => {
            warn!(topic = %tp.topic, partition = tp.partition, base = md.start_offset(),
                error = %e, "remote-log-manager: RSM delete failed");
            return false;
        }
        Err(e) => {
            warn!(topic = %tp.topic, partition = tp.partition, base = md.start_offset(),
                error = %e, "remote-log-manager: RSM delete task panicked");
            return false;
        }
    }

    let upd = RemoteLogSegmentMetadataUpdate {
        remote_log_segment_id: id,
        event_timestamp_ms: now_ms(),
        custom_metadata: None,
        state: RemoteLogSegmentState::DeleteSegmentFinished,
        broker_id,
    };
    if let Err(e) = rlmm_mutate(rlmm, move |m| m.update_remote_log_segment_metadata(upd)).await {
        warn!(topic = %tp.topic, partition = tp.partition, base = md.start_offset(),
            error = %e, "remote-log-manager: failed to record DeleteSegmentFinished");
        return false;
    }
    debug!(topic = %tp.topic, partition = tp.partition, base = md.start_offset(),
        "remote-log-manager: deleted remote segment");
    true
}
/// Copies one sealed local segment to remote storage and records its RLMM
/// lifecycle: `CopySegmentStarted` → RSM copy → `CopySegmentFinished`.
///
/// * Whatever custom metadata the RSM returns from the copy is stored with the
///   `CopySegmentFinished` update.
/// * If the copy fails, or `CopySegmentFinished` cannot be recorded, the segment
///   is rolled back (remote data deleted, metadata moved to
///   `DeleteSegmentFinished`). This keeps a `CopySegmentStarted` entry from
///   lingering and blocking later copy attempts for the same base offset.
///
/// Returns `true` only when the segment ends up `CopySegmentFinished`.
async fn copy_one(
    tp: &TopicIdPartition,
    broker_id: i32,
    leader_epoch: i32,
    ex: &SegmentExport,
    rsm: &Arc<dyn RemoteStorageManager>,
    rlmm: &Arc<dyn RemoteLogMetadataManager>,
) -> bool {
    let id = RemoteLogSegmentId::new(tp.clone(), Uuid::new_v4());
    let epochs: BTreeMap<i32, i64> = if ex.leader_epochs.is_empty() {
        // No epoch history exported: attribute the whole segment to the current
        // leader epoch (clamped to >= 0), starting at its base offset.
        BTreeMap::from([(leader_epoch.max(0), ex.base_offset)])
    } else {
        ex.leader_epochs.iter().copied().collect()
    };
    // The remote metadata stores the size as i32; saturate oversized segments.
    let size = i32::try_from(ex.size_bytes).unwrap_or(i32::MAX);
    let metadata = match RemoteLogSegmentMetadata::new(
        id.clone(),
        ex.base_offset,
        ex.last_offset,
        ex.max_timestamp,
        broker_id,
        now_ms(),
        size,
        RemoteLogSegmentState::CopySegmentStarted,
        epochs.clone(),
    ) {
        Ok(m) => m,
        Err(e) => {
            warn!(topic = %tp.topic, partition = tp.partition, base = ex.base_offset,
                error = %e, "remote-log-manager: skipping segment with invalid metadata");
            return false;
        }
    };
    let metadata = if ex.transaction_index_path.is_none() {
        metadata.with_txn_index_empty(true)
    } else {
        metadata
    };

    let md_started = metadata.clone();
    if let Err(e) = rlmm_mutate(rlmm, move |m| m.add_remote_log_segment_metadata(md_started)).await
    {
        warn!(topic = %tp.topic, partition = tp.partition, base = ex.base_offset,
            error = %e, "remote-log-manager: failed to record CopySegmentStarted");
        return false;
    }

    let data = LogSegmentData {
        log_segment: ex.log_path.clone(),
        offset_index: ex.offset_index_path.clone(),
        time_index: ex.time_index_path.clone(),
        transaction_index: ex.transaction_index_path.clone(),
        producer_snapshot_index: None,
        leader_epoch_index: leader_epoch_index_bytes(&epochs),
    };
    let rsm_copy = rsm.clone();
    let md_copy = metadata.clone();
    let copy_result =
        tokio::task::spawn_blocking(move || rsm_copy.copy_log_segment_data(&md_copy, &data)).await;
    let custom_metadata = match copy_result {
        Ok(Ok(custom)) => custom,
        Ok(Err(e)) => {
            warn!(topic = %tp.topic, partition = tp.partition, base = ex.base_offset,
                error = %e, "remote-log-manager: segment copy failed");
            rollback(&metadata, broker_id, rsm, rlmm).await;
            return false;
        }
        Err(e) => {
            warn!(topic = %tp.topic, partition = tp.partition, base = ex.base_offset,
                error = %e, "remote-log-manager: segment copy task panicked");
            rollback(&metadata, broker_id, rsm, rlmm).await;
            return false;
        }
    };

    let upd = RemoteLogSegmentMetadataUpdate {
        remote_log_segment_id: id,
        event_timestamp_ms: now_ms(),
        custom_metadata,
        state: RemoteLogSegmentState::CopySegmentFinished,
        broker_id,
    };
    if let Err(e) = rlmm_mutate(rlmm, move |m| m.update_remote_log_segment_metadata(upd)).await {
        warn!(topic = %tp.topic, partition = tp.partition, base = ex.base_offset,
            error = %e, "remote-log-manager: failed to record CopySegmentFinished");
        rollback(&metadata, broker_id, rsm, rlmm).await;
        return false;
    }
    debug!(topic = %tp.topic, partition = tp.partition, base = ex.base_offset,
        end = ex.last_offset, "remote-log-manager: copied segment to remote tier");
    true
}
async fn rollback(
metadata: &RemoteLogSegmentMetadata,
broker_id: i32,
rsm: &Arc<dyn RemoteStorageManager>,
rlmm: &Arc<dyn RemoteLogMetadataManager>,
) {
let id = metadata.remote_log_segment_id().clone();
let rsm_del = rsm.clone();
let md_del = metadata.clone();
let _ = tokio::task::spawn_blocking(move || rsm_del.delete_log_segment_data(&md_del)).await;
for state in [
RemoteLogSegmentState::DeleteSegmentStarted,
RemoteLogSegmentState::DeleteSegmentFinished,
] {
let upd = RemoteLogSegmentMetadataUpdate {
remote_log_segment_id: id.clone(),
event_timestamp_ms: now_ms(),
custom_metadata: None,
state,
broker_id,
};
let _ = rlmm_mutate(rlmm, move |m| m.update_remote_log_segment_metadata(upd)).await;
}
}
/// Serializes the epoch → start-offset map as a plain-text checkpoint.
///
/// Layout: a version line `0`, then the entry count, then one
/// `<epoch> <start_offset>` line per entry in ascending epoch order. Every line
/// is newline-terminated. This presumably mirrors Kafka's leader-epoch
/// checkpoint file format.
fn leader_epoch_index_bytes(epochs: &BTreeMap<i32, i64>) -> Bytes {
    let header = format!("0\n{}\n", epochs.len());
    let entries: String = epochs
        .iter()
        .map(|(epoch, start)| format!("{epoch} {start}\n"))
        .collect();
    Bytes::from(header + &entries)
}
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Never panics: a clock set before 1970 yields `0`, and a value that does not
/// fit in `i64` saturates to `i64::MAX`.
fn now_ms() -> i64 {
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis().try_into().unwrap_or(i64::MAX)
}
#[cfg(test)]
mod tests {
    // Tests for the remote-log-manager.
    // - Pure helpers (`local_retention_target`, `remote_retention_eviction_set`)
    //   are exercised with synthetic inputs.
    // - The async passes run against `LocalTieredStorage` rooted in a temp dir,
    //   together with `InmemoryRemoteLogMetadataManager`.
    use super::*;
    use assert2::assert;
    use crabka_log::{Log, LogConfig};
    use crabka_protocol::records::{Record, RecordBatch};
    use crabka_remote_storage::{
        CustomMetadata, IndexType, InmemoryRemoteLogMetadataManager, LocalTieredStorage,
        RemoteStorageError,
    };

    // RSM stub: every copy fails and every fetch reports the segment as missing,
    // while delete succeeds so the rollback path can run to completion.
    struct AlwaysFailRsm;

    impl RemoteStorageManager for AlwaysFailRsm {
        fn copy_log_segment_data(
            &self,
            _metadata: &RemoteLogSegmentMetadata,
            _data: &LogSegmentData,
        ) -> Result<Option<CustomMetadata>, RemoteStorageError> {
            Err(RemoteStorageError::InvalidArgument("boom".into()))
        }

        fn fetch_log_segment(
            &self,
            metadata: &RemoteLogSegmentMetadata,
            _start: u32,
            _end: Option<u32>,
        ) -> Result<Vec<u8>, RemoteStorageError> {
            Err(RemoteStorageError::SegmentNotFound(
                metadata.remote_log_segment_id().clone(),
            ))
        }

        fn fetch_index(
            &self,
            metadata: &RemoteLogSegmentMetadata,
            _index_type: IndexType,
        ) -> Result<Vec<u8>, RemoteStorageError> {
            Err(RemoteStorageError::SegmentNotFound(
                metadata.remote_log_segment_id().clone(),
            ))
        }

        fn delete_log_segment_data(
            &self,
            _metadata: &RemoteLogSegmentMetadata,
        ) -> Result<(), RemoteStorageError> {
            Ok(())
        }
    }

    // Fixed topic-id partition ("orders", partition 0) shared by every test.
    fn tp() -> TopicIdPartition {
        TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
    }

    // A batch of `n` records, each keyed `k{i}` with a 64-byte value.
    fn batch(n: i32) -> RecordBatch {
        let mut b = RecordBatch {
            last_offset_delta: n - 1,
            ..RecordBatch::default()
        };
        for i in 0..n {
            b.records.push(Record {
                offset_delta: i,
                key: Some(Bytes::from(format!("k{i}"))),
                value: Some(Bytes::from(vec![b'x'; 64])),
                ..Default::default()
            });
        }
        b
    }

    // Opens a log with 256-byte segments and appends 12 two-record batches.
    // Intended to roll the log into several sealed segments; callers assert
    // on the exported segment count.
    fn rolled_log(dir: &std::path::Path) -> Log {
        let mut log = Log::open(
            dir,
            LogConfig {
                segment_bytes: 256,
                ..LogConfig::default()
            },
        )
        .unwrap();
        for _ in 0..12 {
            let mut b = batch(2);
            log.append(&mut b).unwrap();
        }
        log
    }

    // Happy path: every sealed segment is uploaded, ends CopySegmentFinished, and
    // its data, offset index and leader-epoch index are fetchable.
    #[tokio::test]
    async fn copies_all_sealed_segments_and_records_finished() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let log = rolled_log(log_dir.path());
        let exports = log.tierable_segments();
        assert!(exports.len() >= 2, "test needs multiple sealed segments");
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let copied = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(copied == exports.len());
        let listed = rlmm.list_remote_log_segments(&tp()).unwrap();
        assert!(listed.len() == exports.len());
        for md in &listed {
            assert!(md.state() == RemoteLogSegmentState::CopySegmentFinished);
            assert!(!rsm.fetch_log_segment(md, 0, None).unwrap().is_empty());
            assert!(!rsm.fetch_index(md, IndexType::Offset).unwrap().is_empty());
            assert!(
                !rsm.fetch_index(md, IndexType::LeaderEpoch)
                    .unwrap()
                    .is_empty()
            );
        }
    }

    // A second copy pass over the same exports copies nothing new.
    #[tokio::test]
    async fn re_running_is_idempotent() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let log = rolled_log(log_dir.path());
        let exports = log.tierable_segments();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let first = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(first == exports.len());
        let second = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(second == 0);
        assert!(rlmm.list_remote_log_segments(&tp()).unwrap().len() == exports.len());
    }

    #[tokio::test]
    async fn empty_exports_copies_nothing() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let copied = copy_eligible(&tp(), 1, 0, Vec::new(), &rsm, &rlmm).await;
        assert!(copied == 0);
        assert!(rlmm.list_remote_log_segments(&tp()).unwrap().is_empty());
    }

    // Failed copies are rolled back; the in-memory RLMM is expected to list nothing
    // afterwards.
    #[tokio::test]
    async fn copy_failure_rolls_back_and_leaves_no_metadata() {
        let log_dir = tempfile::tempdir().unwrap();
        let log = rolled_log(log_dir.path());
        let exports = log.tierable_segments();
        assert!(!exports.is_empty());
        let rsm: Arc<dyn RemoteStorageManager> = Arc::new(AlwaysFailRsm);
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let copied = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(copied == 0, "every copy failed");
        assert!(
            rlmm.list_remote_log_segments(&tp()).unwrap().is_empty(),
            "failed copies must not leave dangling metadata"
        );
    }

    // An export without epoch history is attributed to the caller's leader epoch
    // (3), starting at the segment's base offset (0).
    #[tokio::test]
    async fn fallback_leader_epoch_when_export_has_none() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let src = tempfile::tempdir().unwrap();
        // Writes `bytes` to `name` under `src` and returns the path.
        let write = |name: &str, bytes: &[u8]| {
            let p = src.path().join(name);
            std::fs::write(&p, bytes).unwrap();
            p
        };
        let export = SegmentExport {
            base_offset: 0,
            last_offset: 9,
            max_timestamp: 42,
            size_bytes: 10,
            log_path: write("00.log", b"0123456789"),
            offset_index_path: write("00.index", b"i"),
            time_index_path: write("00.timeindex", b"t"),
            transaction_index_path: None,
            leader_epochs: Vec::new(),
        };
        let copied = copy_eligible(&tp(), 7, 3, vec![export], &rsm, &rlmm).await;
        assert!(copied == 1);
        let md = &rlmm.list_remote_log_segments(&tp()).unwrap()[0];
        assert!(md.segment_leader_epochs().get(&3) == Some(&0));
    }

    // Synthetic export with empty file paths, for the pure retention helpers.
    fn synth_export(base: i64, last: i64, max_ts: i64, size: u64) -> SegmentExport {
        SegmentExport {
            base_offset: base,
            last_offset: last,
            max_timestamp: max_ts,
            size_bytes: size,
            log_path: std::path::PathBuf::new(),
            offset_index_path: std::path::PathBuf::new(),
            time_index_path: std::path::PathBuf::new(),
            transaction_index_path: None,
            leader_epochs: Vec::new(),
        }
    }

    #[test]
    fn local_retention_target_returns_none_when_no_finished_segments() {
        let exports = vec![synth_export(0, 9, 100, 64), synth_export(10, 19, 200, 64)];
        let finished: HashSet<i64> = HashSet::new();
        assert!(local_retention_target(&exports, &finished, Some(1), None, 10_000) == None);
    }

    #[test]
    fn local_retention_target_time_based_eviction() {
        let exports = vec![
            synth_export(0, 9, 100, 64),
            synth_export(10, 19, 200, 64),
            synth_export(20, 29, 5_000, 64),
        ];
        let finished: HashSet<i64> = [0, 10, 20].into_iter().collect();
        let target = local_retention_target(&exports, &finished, Some(500), None, 1_000);
        assert!(target == Some(20));
    }

    // Size eviction continues until the excess is consumed, so it may overshoot
    // the budget by part of a segment (budget 150 of 300 => two segments deleted).
    #[test]
    fn local_retention_target_size_based_eviction() {
        let exports = vec![
            synth_export(0, 9, 100, 100),
            synth_export(10, 19, 200, 100),
            synth_export(20, 29, 300, 100),
        ];
        let finished: HashSet<i64> = [0, 10, 20].into_iter().collect();
        let target = local_retention_target(&exports, &finished, None, Some(150), 1_000);
        assert!(target == Some(20));
        let target = local_retention_target(&exports, &finished, None, Some(50), 1_000);
        assert!(target == Some(30));
        let target = local_retention_target(&exports, &finished, None, Some(10_000), 1_000);
        assert!(target == None);
    }

    #[test]
    fn local_retention_target_skips_unfinished_segments_and_stops() {
        let exports = vec![
            synth_export(0, 9, 100, 64),
            synth_export(10, 19, 200, 64),
            synth_export(20, 29, 300, 64),
        ];
        let finished: HashSet<i64> = [0, 20].into_iter().collect();
        let target = local_retention_target(&exports, &finished, Some(1), None, 10_000);
        assert!(
            target == Some(10),
            "only seg0 deletable; walk stops at seg1"
        );
    }

    #[test]
    fn local_retention_target_uses_already_resolved_effective_ms() {
        let exports = vec![synth_export(0, 9, 100, 64), synth_export(10, 19, 200, 64)];
        let finished: HashSet<i64> = [0, 10].into_iter().collect();
        let target = local_retention_target(&exports, &finished, Some(250), None, 1_000);
        assert!(target == Some(20));
    }

    // Test-only synchronous driver mirroring `local_retention_pass`'s limit
    // resolution, but taking `finished_bases` directly instead of querying an RLMM.
    fn local_retention_drive(
        log: &mut Log,
        finished_bases: &HashSet<i64>,
        log_config: &LogConfig,
        now_ms: i64,
    ) -> usize {
        let effective_local_ms = log_config
            .local_retention_ms
            .or(log_config.retention_ms)
            .map(|d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
        let effective_local_bytes = log_config
            .local_retention_bytes
            .or(log_config.retention_bytes);
        let exports = log.tierable_segments();
        let Some(target) = local_retention_target(
            &exports,
            finished_bases,
            effective_local_ms,
            effective_local_bytes,
            now_ms,
        ) else {
            return 0;
        };
        log.delete_local_segments_through(target).unwrap()
    }

    // End-to-end: copy everything, then local retention removes every sealed
    // segment and advances the local log start offset; a rerun removes nothing.
    #[tokio::test]
    async fn local_retention_drive_deletes_copied_segments() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let mut log = Log::open(
            log_dir.path(),
            LogConfig {
                segment_bytes: 256,
                remote_storage_enable: true,
                local_retention_ms: Some(Duration::from_millis(1)),
                ..LogConfig::default()
            },
        )
        .unwrap();
        for _ in 0..12 {
            let mut b = batch(2);
            log.append(&mut b).unwrap();
        }
        let exports = log.tierable_segments();
        assert!(exports.len() >= 2, "test needs multiple sealed segments");
        let log_config = log.config_snapshot();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let copied = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(copied == exports.len());
        let finished_bases: HashSet<i64> = rlmm
            .list_remote_log_segments(&tp())
            .unwrap()
            .iter()
            .filter(|md| md.state() == RemoteLogSegmentState::CopySegmentFinished)
            .map(RemoteLogSegmentMetadata::start_offset)
            .collect();
        assert!(finished_bases.len() == exports.len());
        // Far enough ahead that every segment exceeds the 1 ms local retention.
        let future = now_ms() + 1_000_000;
        let removed = local_retention_drive(&mut log, &finished_bases, &log_config, future);
        assert!(removed == exports.len());
        let last = exports.last().unwrap().last_offset;
        assert!(log.local_log_start_offset() == last + 1);
        for ex in &exports {
            assert!(
                !ex.log_path.exists(),
                "sealed segment {:?} should be deleted",
                ex.log_path
            );
        }
        let removed_again = local_retention_drive(&mut log, &finished_bases, &log_config, future);
        assert!(removed_again == 0);
    }

    // Builds remote metadata already advanced to CopySegmentFinished (via a
    // CopySegmentStarted -> CopySegmentFinished update) for eviction-set tests.
    fn synth_remote_md(
        id: u128,
        start: i64,
        end: i64,
        max_ts: i64,
        size: i32,
    ) -> RemoteLogSegmentMetadata {
        RemoteLogSegmentMetadata::new(
            RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
            start,
            end,
            max_ts,
            1,
            max_ts,
            size,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::from([(0, start)]),
        )
        .unwrap()
        .with_update(&RemoteLogSegmentMetadataUpdate {
            remote_log_segment_id: RemoteLogSegmentId::new(tp(), Uuid::from_u128(id)),
            event_timestamp_ms: max_ts,
            custom_metadata: None,
            state: RemoteLogSegmentState::CopySegmentFinished,
            broker_id: 1,
        })
        .unwrap()
    }

    #[test]
    fn remote_retention_eviction_set_returns_empty_when_no_segments() {
        let out = remote_retention_eviction_set(&[], Some(1), Some(1), 10_000);
        assert!(out.is_empty());
    }

    #[test]
    fn remote_retention_eviction_set_time_based_picks_oldest_until_first_in_window() {
        let segs = vec![
            synth_remote_md(10, 0, 9, 100, 100),
            synth_remote_md(11, 10, 19, 200, 100),
            synth_remote_md(12, 20, 29, 9_500, 100),
        ];
        let out = remote_retention_eviction_set(&segs, Some(500), None, 10_000);
        assert!(out.len() == 2);
        assert!(out[0].start_offset() == 0);
        assert!(out[1].start_offset() == 10);
    }

    #[test]
    fn remote_retention_eviction_set_size_based_evicts_oldest_first() {
        let segs = vec![
            synth_remote_md(10, 0, 9, 100, 100),
            synth_remote_md(11, 10, 19, 200, 100),
            synth_remote_md(12, 20, 29, 300, 100),
        ];
        let out = remote_retention_eviction_set(&segs, None, Some(150), 1_000);
        assert!(out.len() == 2);
        let out = remote_retention_eviction_set(&segs, None, Some(50), 1_000);
        assert!(out.len() == 3);
        let out = remote_retention_eviction_set(&segs, None, Some(10_000), 1_000);
        assert!(out.is_empty());
    }

    #[test]
    fn remote_retention_eviction_set_time_and_size_take_union_of_either() {
        let segs = vec![
            synth_remote_md(10, 0, 9, 100, 100),
            synth_remote_md(11, 10, 19, 200, 100),
            synth_remote_md(12, 20, 29, 5_000, 100),
        ];
        let out = remote_retention_eviction_set(&segs, Some(500), Some(10_000), 1_000);
        assert!(out.len() == 2);
    }

    #[test]
    fn remote_retention_eviction_set_none_settings_disable_axis() {
        let segs = vec![synth_remote_md(10, 0, 9, 100, 100)];
        assert!(remote_retention_eviction_set(&segs, None, None, 10_000).is_empty());
    }

    // The walk is prefix-only: an old segment after a recent one is not evicted.
    #[test]
    fn remote_retention_eviction_set_walk_stops_at_first_non_deletable() {
        let segs = vec![
            synth_remote_md(10, 0, 9, 100, 100),
            synth_remote_md(11, 10, 19, 9_500, 100),
            synth_remote_md(12, 20, 29, 200, 100),
        ];
        let out = remote_retention_eviction_set(&segs, Some(500), None, 10_000);
        assert!(out.len() == 1);
        assert!(out[0].start_offset() == 0);
    }

    #[tokio::test]
    async fn remote_retention_pass_evicts_old_segments_through_lifecycle() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let log = rolled_log(log_dir.path());
        let exports = log.tierable_segments();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let copied = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(copied == exports.len());
        let pre = rlmm.list_remote_log_segments(&tp()).unwrap();
        assert!(!pre.is_empty());
        let cfg = LogConfig {
            retention_ms: Some(Duration::from_millis(1)),
            ..LogConfig::default()
        };
        let deleted =
            remote_retention_pass(&tp(), 1, &cfg, &rsm, &rlmm, now_ms() + 1_000_000).await;
        assert!(deleted == exports.len());
        let post = rlmm.list_remote_log_segments(&tp()).unwrap();
        assert!(
            post.is_empty(),
            "every segment should be gone, got {} left",
            post.len()
        );
        for md in &pre {
            assert!(rsm.fetch_log_segment(md, 0, None).is_err());
        }
    }

    #[tokio::test]
    async fn remote_retention_pass_noop_when_nothing_qualifies() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let log = rolled_log(log_dir.path());
        let exports = log.tierable_segments();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        // One year of time retention and no size limit.
        let cfg = LogConfig {
            retention_ms: Some(Duration::from_hours(8760)),
            retention_bytes: None,
            ..LogConfig::default()
        };
        let deleted = remote_retention_pass(&tp(), 1, &cfg, &rsm, &rlmm, 1).await;
        assert!(deleted == 0);
        assert!(rlmm.list_remote_log_segments(&tp()).unwrap().len() == exports.len());
    }

    #[tokio::test]
    async fn remote_retention_pass_no_settings_no_op() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let cfg = LogConfig {
            retention_ms: None,
            retention_bytes: None,
            ..LogConfig::default()
        };
        let deleted = remote_retention_pass(&tp(), 1, &cfg, &rsm, &rlmm, now_ms()).await;
        assert!(deleted == 0);
    }

    #[tokio::test]
    async fn cascade_remote_partition_delete_drops_every_segment() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let log = rolled_log(log_dir.path());
        let exports = log.tierable_segments();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let copied = copy_eligible(&tp(), 1, 0, exports.clone(), &rsm, &rlmm).await;
        assert!(copied == exports.len());
        cascade_remote_partition_delete(tp(), 1, rsm.clone(), rlmm.clone()).await;
        assert!(rlmm.list_remote_log_segments(&tp()).unwrap().is_empty());
        // Assumes LocalTieredStorage lays files out as <root>/<topic_id>/<partition>.
        let part_dir = remote_dir.path().join(tp().topic_id.to_string()).join("0");
        if part_dir.exists() {
            let entries: Vec<_> = std::fs::read_dir(&part_dir).unwrap().collect();
            assert!(entries.is_empty(), "stray remote files: {entries:?}");
        }
    }

    #[tokio::test]
    async fn cascade_remote_partition_delete_is_noop_on_empty_partition() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        cascade_remote_partition_delete(tp(), 1, rsm, rlmm.clone()).await;
        assert!(rlmm.list_remote_log_segments(&tp()).unwrap().is_empty());
    }
}