use std::sync::Arc;
use crabka_protocol::records::RecordBatch;
use crabka_remote_storage::{
IndexType, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteLogSegmentState,
RemoteStorageError, RemoteStorageManager, TopicIdPartition,
};
use tracing::warn;
use zerocopy::byteorder::{I64, U32};
use zerocopy::{BigEndian, FromBytes, Immutable, KnownLayout, Unaligned};
/// One entry of a segment's offset index: a segment-relative offset and the byte position
/// in the segment's log data that it maps to.
///
/// Both fields are big-endian on disk. `FromBytes` + `Unaligned` + `Immutable` let a raw
/// index buffer be reinterpreted in place as `&[OffsetIndexEntry]` (see `parse_offset_index`).
#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub(crate) struct OffsetIndexEntry {
    /// Offset relative to the owning segment's start offset.
    relative_offset: U32<BigEndian>,
    /// Byte position within the segment's log data (used as a fetch start position).
    position: U32<BigEndian>,
}
// Compile-time guard: the in-memory layout must match the 8-byte on-disk entry.
const _: () = assert!(std::mem::size_of::<OffsetIndexEntry>() == 8);
/// One entry of a segment's time index: a timestamp paired with a segment-relative offset.
///
/// Both fields are big-endian on disk. The derives allow a raw index buffer to be viewed in
/// place as `&[TimeIndexEntry]` (see `parse_time_index`).
#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub(crate) struct TimeIndexEntry {
    /// Timestamp recorded for this entry (compared against target timestamps in ms).
    timestamp: I64<BigEndian>,
    /// Offset relative to the owning segment's start offset.
    relative_offset: U32<BigEndian>,
}
// Compile-time guard: the in-memory layout must match the 12-byte on-disk entry.
const _: () = assert!(std::mem::size_of::<TimeIndexEntry>() == 12);
/// One raw entry of a segment's transaction index, describing an aborted transaction.
///
/// All fields are big-endian on disk; the derives let a raw buffer be viewed in place as
/// `&[AbortedTxnIndexEntry]` (see `parse_txn_index`).
#[derive(Debug, Clone, Copy, FromBytes, KnownLayout, Immutable, Unaligned)]
#[repr(C)]
pub(crate) struct AbortedTxnIndexEntry {
    /// First offset (absolute) written by the aborted transaction.
    start_offset: I64<BigEndian>,
    /// Last offset (absolute, inclusive) covered by the aborted transaction.
    last_offset: I64<BigEndian>,
    /// Producer that ran the aborted transaction.
    producer_id: I64<BigEndian>,
}
// Compile-time guard: the in-memory layout must match the 24-byte on-disk entry.
const _: () = assert!(std::mem::size_of::<AbortedTxnIndexEntry>() == 24);
/// Host-endian copy of an [`AbortedTxnIndexEntry`], as returned by
/// `RemoteReader::aborted_transactions`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct AbortedTxnEntry {
    /// First offset of the aborted transaction.
    pub(crate) start_offset: i64,
    /// Last offset (inclusive) of the aborted transaction.
    pub(crate) last_offset: i64,
    /// Producer that ran the aborted transaction.
    pub(crate) producer_id: i64,
}
/// Read path over the remote (tiered) copy of a partition's log.
///
/// Segment metadata is resolved through the metadata manager; segment bytes and index files
/// are fetched through the storage manager.
pub(crate) struct RemoteReader {
    /// Storage backend that serves log segment bytes and index files.
    pub(crate) rsm: Arc<dyn RemoteStorageManager>,
    /// Metadata manager that maps (partition, epoch, offset) to remote segments.
    pub(crate) rlmm: Arc<dyn RemoteLogMetadataManager>,
}
impl RemoteReader {
    /// Creates a reader that resolves segments through `rlmm` and reads their bytes through
    /// `rsm`.
    pub(crate) fn new(
        rsm: Arc<dyn RemoteStorageManager>,
        rlmm: Arc<dyn RemoteLogMetadataManager>,
    ) -> Self {
        Self { rsm, rlmm }
    }

    /// Resolves the remote segment that serves `offset`, returning it only once its copy has
    /// finished.
    ///
    /// The metadata manager's `(leader_epoch, offset)` lookup is tried first. Only when that
    /// yields no segment at all does this fall back to scanning every listed segment. Among
    /// the `CopySegmentFinished` segments whose `[start_offset, end_offset]` range covers
    /// `offset`, one that records `leader_epoch` is preferred, otherwise any covering segment
    /// is used. Ties go to the highest start offset. The fallback keeps reads working when
    /// the caller's epoch differs from the epoch the segment was tiered under.
    ///
    /// Shared by [`Self::fetch_batch`] and [`Self::aborted_transactions`] so that record data
    /// and its aborted-transaction metadata always come from the same segment.
    ///
    /// # Errors
    ///
    /// Propagates metadata-manager errors (e.g. `NotReady`).
    fn resolve_finished_segment(
        &self,
        tp: &TopicIdPartition,
        leader_epoch: i32,
        offset: i64,
    ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
        let metadata = match self.rlmm.remote_log_segment_metadata(tp, leader_epoch, offset)? {
            Some(m) => m,
            None => {
                let covering: Vec<RemoteLogSegmentMetadata> = self
                    .rlmm
                    .list_remote_log_segments(tp)?
                    .into_iter()
                    .filter(|m| {
                        m.state() == RemoteLogSegmentState::CopySegmentFinished
                            && m.start_offset() <= offset
                            && offset <= m.end_offset()
                    })
                    .collect();
                let chosen = covering
                    .iter()
                    .filter(|m| m.segment_leader_epochs().contains_key(&leader_epoch))
                    .max_by_key(|m| m.start_offset())
                    .or_else(|| covering.iter().max_by_key(|m| m.start_offset()))
                    .cloned();
                let Some(m) = chosen else {
                    return Ok(None);
                };
                m
            }
        };
        // A direct hit may still be mid-copy; such segments must stay invisible to readers.
        Ok((metadata.state() == RemoteLogSegmentState::CopySegmentFinished).then_some(metadata))
    }

    /// Fetches the first record batch whose last offset is `>= offset` from the remote
    /// segment covering `offset`.
    ///
    /// The segment's offset index gives a floor byte position for `offset`. Reading starts
    /// there and is initially capped at `max_bytes` (`0` means no cap). If that window holds
    /// no complete batch reaching `offset`, the window is doubled and re-read until a batch
    /// is found or the end of the segment has been read. This covers a target batch larger
    /// than `max_bytes`, or one starting beyond the window, so it is still returned rather
    /// than being reported as missing.
    ///
    /// Returns `Ok(None)` when no finished segment covers `offset` or the segment holds no
    /// batch reaching `offset`.
    ///
    /// # Errors
    ///
    /// Metadata-manager errors, storage errors, and corrupt offset-index bytes.
    pub(crate) async fn fetch_batch(
        &self,
        tp: &TopicIdPartition,
        leader_epoch: i32,
        offset: i64,
        max_bytes: usize,
    ) -> Result<Option<RecordBatch>, RemoteStorageError> {
        let Some(metadata) = self.resolve_finished_segment(tp, leader_epoch, offset)? else {
            return Ok(None);
        };
        let index_bytes = self
            .fetch_index_blocking(metadata.clone(), IndexType::Offset)
            .await?;
        let entries = parse_offset_index(&index_bytes)?;
        // Index entries hold u32 relative offsets, so a target beyond u32 range lies past
        // every entry: saturate high so the floor lookup lands on the last entry instead of
        // restarting from the segment's first byte.
        let target_rel = u32::try_from(offset.saturating_sub(metadata.start_offset()).max(0))
            .unwrap_or(u32::MAX);
        let start_position = position_for_relative_offset(entries, target_rel);
        let segment_size =
            u32::try_from(metadata.segment_size_in_bytes().max(0)).unwrap_or(u32::MAX);

        let mut window = max_bytes;
        loop {
            let end_position = end_position_for(start_position, segment_size, window);
            let data = self
                .fetch_log_blocking(metadata.clone(), start_position, end_position)
                .await?;
            if let Some(batch) = first_batch_at_or_after(&data, offset) {
                return Ok(Some(batch));
            }
            // `None` means this read already ran to the end of the segment: nothing left.
            if end_position.is_none() {
                return Ok(None);
            }
            // The window was too small to reach (or fully hold) the target batch. Widening
            // geometrically bounds the total re-read cost to ~2x the bytes finally needed,
            // and terminates because the window eventually covers the whole segment.
            window = window.saturating_mul(2);
        }
    }

    /// Returns the aborted transactions overlapping `[from_offset, to_offset]` that are
    /// recorded in the transaction index of the segment covering `from_offset`.
    ///
    /// The segment is resolved exactly as in [`Self::fetch_batch`]. A missing segment, an
    /// unfinished copy, or a segment stored without a transaction index yields an empty list.
    ///
    /// NOTE(review): only the single segment covering `from_offset` is consulted; aborted
    /// transactions recorded solely in later segments up to `to_offset` are not returned.
    ///
    /// # Errors
    ///
    /// Metadata-manager errors, storage errors other than `SegmentNotFound`, and corrupt
    /// transaction-index bytes.
    pub(crate) async fn aborted_transactions(
        &self,
        tp: &TopicIdPartition,
        leader_epoch: i32,
        from_offset: i64,
        to_offset: i64,
    ) -> Result<Vec<AbortedTxnEntry>, RemoteStorageError> {
        let Some(metadata) = self.resolve_finished_segment(tp, leader_epoch, from_offset)? else {
            return Ok(Vec::new());
        };
        let index_bytes = match self
            .fetch_index_blocking(metadata, IndexType::Transaction)
            .await
        {
            Ok(bytes) => bytes,
            // A segment tiered without a transaction index has no aborted transactions.
            Err(RemoteStorageError::SegmentNotFound(_)) => return Ok(Vec::new()),
            Err(e) => return Err(e),
        };
        let entries = parse_txn_index(&index_bytes)?;
        Ok(entries
            .iter()
            .filter(|e| txn_overlaps(e, from_offset, to_offset))
            .map(|e| AbortedTxnEntry {
                start_offset: e.start_offset.get(),
                last_offset: e.last_offset.get(),
                producer_id: e.producer_id.get(),
            })
            .collect())
    }

    /// Returns the lowest start offset among finished remote segments of `tp`, or `None`
    /// when there are none.
    ///
    /// # Errors
    ///
    /// Propagates metadata-manager list errors (e.g. `NotReady`).
    pub(crate) fn earliest_offset(
        &self,
        tp: &TopicIdPartition,
    ) -> Result<Option<i64>, RemoteStorageError> {
        let listed = self.rlmm.list_remote_log_segments(tp)?;
        Ok(listed
            .into_iter()
            .filter(|md| md.state() == RemoteLogSegmentState::CopySegmentFinished)
            .map(|md| md.start_offset())
            .min())
    }

    /// Finds a remote offset for `target_timestamp`.
    ///
    /// Picks the finished segment with the lowest start offset whose max timestamp is
    /// `>= target_timestamp`. Within it, returns `start_offset` plus the relative offset of
    /// the first time-index entry with timestamp `>= target_timestamp`. If no entry
    /// qualifies, it returns the segment's start offset. Returns `None` when no segment
    /// reaches the target.
    ///
    /// NOTE(review): if the time index is sparse (as the offset index is), the first record
    /// with timestamp `>= target` may precede the matched entry's offset, so this can
    /// overshoot; an exact answer would scan batches from the preceding entry's position.
    ///
    /// # Errors
    ///
    /// Metadata-manager errors, storage errors, and corrupt time-index bytes.
    pub(crate) async fn offset_for_timestamp(
        &self,
        tp: &TopicIdPartition,
        target_timestamp: i64,
    ) -> Result<Option<i64>, RemoteStorageError> {
        let mut listed = self.rlmm.list_remote_log_segments(tp)?;
        listed.retain(|md| md.state() == RemoteLogSegmentState::CopySegmentFinished);
        listed.sort_by_key(RemoteLogSegmentMetadata::start_offset);
        let Some(metadata) = listed
            .into_iter()
            .find(|md| md.max_timestamp_ms() >= target_timestamp)
        else {
            return Ok(None);
        };
        let index_bytes = self
            .fetch_index_blocking(metadata.clone(), IndexType::Timestamp)
            .await?;
        let entries = parse_time_index(&index_bytes)?;
        let Some(rel) = relative_offset_for_timestamp(entries, target_timestamp) else {
            return Ok(Some(metadata.start_offset()));
        };
        Ok(Some(metadata.start_offset() + i64::from(rel)))
    }

    /// Runs `RemoteStorageManager::fetch_index` on tokio's blocking pool so the storage
    /// call does not occupy an async worker thread.
    ///
    /// A failed join of the blocking task is logged and mapped to an `Io` error.
    async fn fetch_index_blocking(
        &self,
        metadata: RemoteLogSegmentMetadata,
        kind: IndexType,
    ) -> Result<Vec<u8>, RemoteStorageError> {
        let rsm = self.rsm.clone();
        match tokio::task::spawn_blocking(move || rsm.fetch_index(&metadata, kind)).await {
            Ok(res) => res,
            Err(e) => {
                warn!(error = %e, "remote-reader: fetch_index task panicked");
                Err(RemoteStorageError::Io(std::io::Error::other(
                    "fetch_index task panicked",
                )))
            }
        }
    }

    /// Runs `RemoteStorageManager::fetch_log_segment` for bytes `[start_position,
    /// end_position]` (`None` = to the end of the segment) on tokio's blocking pool.
    ///
    /// A failed join of the blocking task is logged and mapped to an `Io` error.
    async fn fetch_log_blocking(
        &self,
        metadata: RemoteLogSegmentMetadata,
        start_position: u32,
        end_position: Option<u32>,
    ) -> Result<Vec<u8>, RemoteStorageError> {
        let rsm = self.rsm.clone();
        match tokio::task::spawn_blocking(move || {
            rsm.fetch_log_segment(&metadata, start_position, end_position)
        })
        .await
        {
            Ok(res) => res,
            Err(e) => {
                warn!(error = %e, "remote-reader: fetch_log_segment task panicked");
                Err(RemoteStorageError::Io(std::io::Error::other(
                    "fetch_log_segment task panicked",
                )))
            }
        }
    }
}
/// Computes the inclusive end byte position for a segment read that starts at
/// `start_position` and may consume at most `max_bytes` bytes.
///
/// Returns `None` ("read to the end of the segment") when `max_bytes` is `0` or when the
/// capped window would reach or pass `segment_size`; otherwise the last byte index inside
/// the window.
pub(crate) fn end_position_for(
    start_position: u32,
    segment_size: u32,
    max_bytes: usize,
) -> Option<u32> {
    // A zero budget means "uncapped".
    if max_bytes == 0 {
        return None;
    }
    let budget = u32::try_from(max_bytes).unwrap_or(u32::MAX);
    let stop_before = start_position.saturating_add(budget);
    (stop_before < segment_size).then_some(stop_before.saturating_sub(1))
}
/// Builds the `InvalidData` I/O error reported when a remote index of type `kind` cannot be
/// reinterpreted as index entries.
fn corrupt_index(kind: &str) -> RemoteStorageError {
    let message = format!("corrupt remote {kind} index bytes");
    let io_error = std::io::Error::new(std::io::ErrorKind::InvalidData, message);
    RemoteStorageError::Io(io_error)
}
/// Views raw offset-index bytes as a slice of 8-byte [`OffsetIndexEntry`] values without
/// copying.
///
/// A trailing partial entry (fewer than 8 leftover bytes) is ignored.
///
/// # Errors
///
/// Returns a "corrupt remote offset index" I/O error if the bytes cannot be reinterpreted.
pub(crate) fn parse_offset_index(bytes: &[u8]) -> Result<&[OffsetIndexEntry], RemoteStorageError> {
    const ENTRY_LEN: usize = 8;
    let whole = bytes.len() - bytes.len() % ENTRY_LEN;
    let (usable, _partial_tail) = bytes.split_at(whole);
    <[OffsetIndexEntry]>::ref_from_bytes(usable).map_err(|_| corrupt_index("offset"))
}
/// Returns the byte position of the last index entry whose relative offset is
/// `<= target_rel` (a floor lookup), or `0` when every entry is above the target or the index
/// is empty.
///
/// Relies on `entries` being sorted by relative offset (binary search).
#[must_use]
pub(crate) fn position_for_relative_offset(entries: &[OffsetIndexEntry], target_rel: u32) -> u32 {
    let slot = match entries.binary_search_by_key(&target_rel, |e| e.relative_offset.get()) {
        Ok(exact) => Some(exact),
        // Insertion point `i` means entry `i - 1` is the floor, if it exists.
        Err(insert_at) => insert_at.checked_sub(1),
    };
    slot.map_or(0, |i| entries[i].position.get())
}
/// Views raw time-index bytes as a slice of 12-byte [`TimeIndexEntry`] values without
/// copying.
///
/// A trailing partial entry (fewer than 12 leftover bytes) is ignored.
///
/// # Errors
///
/// Returns a "corrupt remote time index" I/O error if the bytes cannot be reinterpreted.
pub(crate) fn parse_time_index(bytes: &[u8]) -> Result<&[TimeIndexEntry], RemoteStorageError> {
    const ENTRY_LEN: usize = 12;
    let whole = bytes.len() - bytes.len() % ENTRY_LEN;
    let (usable, _partial_tail) = bytes.split_at(whole);
    <[TimeIndexEntry]>::ref_from_bytes(usable).map_err(|_| corrupt_index("time"))
}
/// Views raw transaction-index bytes as a slice of 24-byte [`AbortedTxnIndexEntry`] values
/// without copying.
///
/// A trailing partial entry (fewer than 24 leftover bytes) is ignored.
///
/// # Errors
///
/// Returns a "corrupt remote transaction index" I/O error if the bytes cannot be
/// reinterpreted.
pub(crate) fn parse_txn_index(bytes: &[u8]) -> Result<&[AbortedTxnIndexEntry], RemoteStorageError> {
    const ENTRY_LEN: usize = 24;
    let whole = bytes.len() - bytes.len() % ENTRY_LEN;
    let (usable, _partial_tail) = bytes.split_at(whole);
    <[AbortedTxnIndexEntry]>::ref_from_bytes(usable).map_err(|_| corrupt_index("transaction"))
}
/// Reports whether the aborted transaction `[start_offset, last_offset]` intersects the
/// inclusive range `[from_offset, to_offset]`; touching endpoints count as overlap.
#[must_use]
pub(crate) fn txn_overlaps(entry: &AbortedTxnIndexEntry, from_offset: i64, to_offset: i64) -> bool {
    let (first, last) = (entry.start_offset.get(), entry.last_offset.get());
    first <= to_offset && from_offset <= last
}
/// Returns the relative offset of the first time-index entry (in slice order) whose
/// timestamp is `>= target_ts`, or `None` if no entry reaches the target.
#[must_use]
pub(crate) fn relative_offset_for_timestamp(
    entries: &[TimeIndexEntry],
    target_ts: i64,
) -> Option<u32> {
    entries
        .iter()
        .find_map(|entry| (entry.timestamp.get() >= target_ts).then_some(entry.relative_offset.get()))
}
fn first_batch_at_or_after(data: &[u8], floor: i64) -> Option<RecordBatch> {
let mut cur: &[u8] = data;
while !cur.is_empty() {
let Ok(batch) = RecordBatch::decode(&mut cur) else {
break;
};
let last_offset = batch.base_offset + i64::from(batch.last_offset_delta);
if last_offset >= floor {
return Some(batch);
}
}
None
}
#[cfg(test)]
mod tests {
    // Unit tests for the pure index/batch helpers, plus integration tests that tier a real
    // local `Log` into `LocalTieredStorage` and read it back through `RemoteReader`.
    use super::*;
    use assert2::assert;
    use crabka_log::{Log, LogConfig};
    use crabka_protocol::records::Record;
    use crabka_remote_storage::{
        InmemoryRemoteLogMetadataManager, LocalTieredStorage, RemoteLogMetadataManager,
        RemoteStorageManager,
    };
    use std::collections::BTreeMap;
    use std::fmt::Write as _;
    use uuid::Uuid;

    #[test]
    fn parse_offset_index_round_trips_known_entries() {
        let mut buf = Vec::new();
        for (rel, pos) in [(0_u32, 0_u32), (10, 256), (20, 512)] {
            buf.extend_from_slice(&rel.to_be_bytes());
            buf.extend_from_slice(&pos.to_be_bytes());
        }
        let entries = parse_offset_index(&buf).expect("valid offset index");
        let decoded: Vec<(u32, u32)> = entries
            .iter()
            .map(|e| (e.relative_offset.get(), e.position.get()))
            .collect();
        assert!(decoded == vec![(0, 0), (10, 256), (20, 512)]);
    }

    /// Builds in-memory offset-index entries from `(relative_offset, position)` pairs.
    fn offset_entries(pairs: &[(u32, u32)]) -> Vec<OffsetIndexEntry> {
        pairs
            .iter()
            .map(|&(rel, pos)| OffsetIndexEntry {
                relative_offset: U32::new(rel),
                position: U32::new(pos),
            })
            .collect()
    }

    /// Builds in-memory time-index entries from `(timestamp, relative_offset)` pairs.
    fn time_entries(pairs: &[(i64, u32)]) -> Vec<TimeIndexEntry> {
        pairs
            .iter()
            .map(|&(ts, rel)| TimeIndexEntry {
                timestamp: I64::new(ts),
                relative_offset: U32::new(rel),
            })
            .collect()
    }

    #[test]
    fn position_for_relative_offset_returns_floor() {
        let entries = offset_entries(&[(0, 0), (10, 256), (20, 512), (30, 1024)]);
        assert!(position_for_relative_offset(&entries, 10) == 256, "exact");
        assert!(position_for_relative_offset(&entries, 15) == 256, "between");
        assert!(
            position_for_relative_offset(&entries, 0) == 0,
            "first entry exact"
        );
        assert!(
            position_for_relative_offset(&entries, 100) == 1024,
            "after last"
        );
        assert!(position_for_relative_offset(&[], 50) == 0, "empty");
    }

    #[test]
    fn position_for_relative_offset_below_first() {
        let entries = offset_entries(&[(5, 100), (10, 200)]);
        assert!(position_for_relative_offset(&entries, 3) == 0);
    }

    #[test]
    fn parse_time_index_round_trips_known_entries() {
        let mut buf = Vec::new();
        for (ts, rel) in [(1_000_i64, 0_u32), (2_000, 10), (3_000, 20)] {
            buf.extend_from_slice(&ts.to_be_bytes());
            buf.extend_from_slice(&rel.to_be_bytes());
        }
        let entries = parse_time_index(&buf).expect("valid time index");
        let decoded: Vec<(i64, u32)> = entries
            .iter()
            .map(|e| (e.timestamp.get(), e.relative_offset.get()))
            .collect();
        assert!(decoded == vec![(1_000, 0), (2_000, 10), (3_000, 20)]);
    }

    #[test]
    fn relative_offset_for_timestamp_returns_first_ge() {
        let entries = time_entries(&[(1_000, 0), (2_000, 10), (3_000, 20)]);
        assert!(
            relative_offset_for_timestamp(&entries, 1_000) == Some(0),
            "exact match"
        );
        assert!(
            relative_offset_for_timestamp(&entries, 1_500) == Some(10),
            "between → next"
        );
        assert!(
            relative_offset_for_timestamp(&entries, 4_000) == None,
            "after last"
        );
        assert!(relative_offset_for_timestamp(&[], 1_000) == None, "empty");
    }

    #[test]
    fn end_position_for_caps_with_max_bytes() {
        assert!(end_position_for(0, 1024, 256) == Some(255));
        assert!(end_position_for(512, 1024, 999_999) == None);
        assert!(end_position_for(0, 1024, 0) == None);
        assert!(end_position_for(u32::MAX, 1024, 100) == None);
    }

    #[test]
    fn first_batch_at_or_after_decodes_and_skips() {
        use bytes::{Bytes, BytesMut};
        use crabka_protocol::records::Record;
        let mut a = RecordBatch {
            base_offset: 0,
            last_offset_delta: 9,
            ..RecordBatch::default()
        };
        for i in 0..10 {
            a.records.push(Record {
                offset_delta: i,
                value: Some(Bytes::from(vec![b'a'; 4])),
                ..Default::default()
            });
        }
        let mut b = RecordBatch {
            base_offset: 10,
            last_offset_delta: 9,
            ..RecordBatch::default()
        };
        for i in 0..10 {
            b.records.push(Record {
                offset_delta: i,
                value: Some(Bytes::from(vec![b'b'; 4])),
                ..Default::default()
            });
        }
        let mut buf = BytesMut::new();
        a.encode(&mut buf).unwrap();
        b.encode(&mut buf).unwrap();
        let bytes = buf.freeze();
        let got = first_batch_at_or_after(&bytes, 10).expect("found batch");
        assert!(got.base_offset == 10);
        let got = first_batch_at_or_after(&bytes, 0).expect("found batch");
        assert!(got.base_offset == 0);
        assert!(first_batch_at_or_after(&bytes, 1_000).is_none());
        assert!(first_batch_at_or_after(&[], 0).is_none());
    }

    #[test]
    fn parse_txn_index_round_trips_known_entries() {
        let mut buf = Vec::new();
        for (start, last, pid) in [(0_i64, 4_i64, 1000_i64), (10, 14, 2000)] {
            buf.extend_from_slice(&start.to_be_bytes());
            buf.extend_from_slice(&last.to_be_bytes());
            buf.extend_from_slice(&pid.to_be_bytes());
        }
        let entries = parse_txn_index(&buf).expect("valid txn index");
        assert!(entries.len() == 2);
        assert!(entries[0].start_offset.get() == 0);
        assert!(entries[0].last_offset.get() == 4);
        assert!(entries[0].producer_id.get() == 1000);
        assert!(entries[1].start_offset.get() == 10);
        assert!(entries[1].last_offset.get() == 14);
        assert!(entries[1].producer_id.get() == 2000);
    }

    #[test]
    fn parse_txn_index_truncates_trailing_partial_bytes() {
        let mut buf = Vec::new();
        for v in [0_i64, 4, 1000] {
            buf.extend_from_slice(&v.to_be_bytes());
        }
        buf.extend_from_slice(&[0xAA; 5]);
        let entries = parse_txn_index(&buf).expect("valid txn index");
        assert!(entries.len() == 1, "partial trailing entry ignored");
        assert!(entries[0].producer_id.get() == 1000);
    }

    #[test]
    fn parse_txn_index_empty_is_empty() {
        assert!(parse_txn_index(&[]).expect("empty is valid").is_empty());
    }

    #[test]
    fn txn_overlaps_boundaries() {
        let e = AbortedTxnIndexEntry {
            start_offset: I64::new(10),
            last_offset: I64::new(14),
            producer_id: I64::new(1),
        };
        assert!(!txn_overlaps(&e, 0, 9), "range ends just before entry");
        assert!(txn_overlaps(&e, 0, 10), "range ends on entry start");
        assert!(txn_overlaps(&e, 11, 13), "range inside entry");
        assert!(txn_overlaps(&e, 14, 100), "range starts on entry last");
        assert!(!txn_overlaps(&e, 15, 100), "range starts just after entry");
        assert!(txn_overlaps(&e, 0, 100), "range covers entry");
    }

    /// The single topic-partition used by the integration tests.
    fn tp() -> TopicIdPartition {
        TopicIdPartition::new(Uuid::from_u128(1), "orders", 0)
    }

    /// Builds a batch of `n` keyed records, each with a `value_size`-byte value.
    fn batch_of(n: i32, value_size: usize) -> crabka_protocol::records::RecordBatch {
        use bytes::Bytes;
        let mut b = crabka_protocol::records::RecordBatch {
            last_offset_delta: n - 1,
            ..crabka_protocol::records::RecordBatch::default()
        };
        for i in 0..n {
            b.records.push(Record {
                offset_delta: i,
                key: Some(Bytes::from(format!("k{i}"))),
                value: Some(Bytes::from(vec![b'x'; value_size])),
                ..Default::default()
            });
        }
        b
    }

    /// Opens a log in `log_dir` with tiny (256-byte) segments and appends 12 two-record
    /// batches, asserting that at least two sealed segments are tierable afterwards.
    fn open_filled_log(log_dir: &std::path::Path) -> Log {
        let mut log = Log::open(
            log_dir,
            LogConfig {
                segment_bytes: 256,
                ..LogConfig::default()
            },
        )
        .unwrap();
        for _ in 0..12 {
            let mut b = batch_of(2, 64);
            log.append(&mut b).unwrap();
        }
        let exports = log.tierable_segments();
        assert!(exports.len() >= 2, "test needs multiple sealed segments");
        log
    }

    /// Copies every currently tierable segment of `log` into a fresh `LocalTieredStorage`
    /// rooted at `remote_dir`, registering each in an in-memory RLMM and moving it from
    /// `CopySegmentStarted` to `CopySegmentFinished`, then returns a reader over both.
    fn tier_all_segments(log: &Log, remote_dir: &std::path::Path) -> RemoteReader {
        let exports = log.tierable_segments();
        let rsm: Arc<dyn RemoteStorageManager> = Arc::new(LocalTieredStorage::new(remote_dir));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        for ex in &exports {
            let id = crabka_remote_storage::RemoteLogSegmentId::new(tp(), Uuid::new_v4());
            // Segments appended without an explicit epoch are attributed to epoch 0.
            let epochs: BTreeMap<i32, i64> = if ex.leader_epochs.is_empty() {
                BTreeMap::from([(0, ex.base_offset)])
            } else {
                ex.leader_epochs.iter().copied().collect()
            };
            let md = RemoteLogSegmentMetadata::new(
                id.clone(),
                ex.base_offset,
                ex.last_offset,
                ex.max_timestamp,
                1,
                ex.max_timestamp,
                i32::try_from(ex.size_bytes).unwrap_or(i32::MAX),
                RemoteLogSegmentState::CopySegmentStarted,
                epochs.clone(),
            )
            .unwrap();
            rlmm.add_remote_log_segment_metadata(md.clone()).unwrap();
            // Leader-epoch checkpoint text: version line, entry count, then "epoch start".
            let mut s = String::from("0\n");
            let _ = writeln!(s, "{}", epochs.len());
            for (e, st) in &epochs {
                let _ = writeln!(s, "{e} {st}");
            }
            let data = crabka_remote_storage::LogSegmentData {
                log_segment: ex.log_path.clone(),
                offset_index: ex.offset_index_path.clone(),
                time_index: ex.time_index_path.clone(),
                transaction_index: ex.transaction_index_path.clone(),
                producer_snapshot_index: None,
                leader_epoch_index: bytes::Bytes::from(s.into_bytes()),
            };
            rsm.copy_log_segment_data(&md, &data).unwrap();
            rlmm.update_remote_log_segment_metadata(
                crabka_remote_storage::RemoteLogSegmentMetadataUpdate {
                    remote_log_segment_id: id,
                    event_timestamp_ms: ex.max_timestamp,
                    custom_metadata: None,
                    state: RemoteLogSegmentState::CopySegmentFinished,
                    broker_id: 1,
                },
            )
            .unwrap();
        }
        RemoteReader::new(rsm, rlmm)
    }

    /// Fills a local log and tiers all of its sealed segments.
    fn populated_reader(
        log_dir: &std::path::Path,
        remote_dir: &std::path::Path,
    ) -> (RemoteReader, Log) {
        let log = open_filled_log(log_dir);
        let reader = tier_all_segments(&log, remote_dir);
        (reader, log)
    }

    /// Like [`populated_reader`], but first writes a one-entry `.txnindex` next to the
    /// first segment recording an abort spanning that whole segment.
    ///
    /// Returns the reader, the log, and the abort as `(start, last, producer_id)`.
    fn populated_reader_with_abort(
        log_dir: &std::path::Path,
        remote_dir: &std::path::Path,
    ) -> (RemoteReader, Log, (i64, i64, i64)) {
        let log = open_filled_log(log_dir);
        let exports = log.tierable_segments();
        let first = &exports[0];
        let abort = (first.base_offset, first.last_offset, 7777_i64);
        let mut txn_bytes = Vec::new();
        txn_bytes.extend_from_slice(&abort.0.to_be_bytes());
        txn_bytes.extend_from_slice(&abort.1.to_be_bytes());
        txn_bytes.extend_from_slice(&abort.2.to_be_bytes());
        let txn_path = first.log_path.with_extension("txnindex");
        std::fs::write(&txn_path, &txn_bytes).unwrap();
        let refreshed = log.tierable_segments();
        assert!(
            refreshed[0].transaction_index_path.is_some(),
            "first segment must now carry a .txnindex"
        );
        let reader = tier_all_segments(&log, remote_dir);
        (reader, log, abort)
    }

    #[tokio::test]
    async fn aborted_transactions_returns_copied_abort() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, _log, abort) = populated_reader_with_abort(log_dir.path(), remote_dir.path());
        let (start, last, pid) = abort;
        let got = reader
            .aborted_transactions(&tp(), 0, start, last)
            .await
            .expect("ok");
        assert!(got.len() == 1, "the copied abort is returned");
        assert!(got[0].start_offset == start);
        assert!(got[0].last_offset == last);
        assert!(got[0].producer_id == pid);
    }

    #[tokio::test]
    async fn aborted_transactions_empty_when_segment_has_no_txnindex() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, log) = populated_reader(log_dir.path(), remote_dir.path());
        let exports = log.tierable_segments();
        let seg = &exports[0];
        let got = reader
            .aborted_transactions(&tp(), 0, seg.base_offset, seg.last_offset)
            .await
            .expect("ok");
        assert!(
            got.is_empty(),
            "segment with no .txnindex yields an empty list, not an error"
        );
    }

    #[tokio::test]
    async fn fetch_batch_finds_segment_and_returns_first_batch() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, log) = populated_reader(log_dir.path(), remote_dir.path());
        let exports = log.tierable_segments();
        let target_offset = exports[1].base_offset;
        let got = reader
            .fetch_batch(&tp(), 0, target_offset, 4096)
            .await
            .expect("ok")
            .expect("found a batch");
        let last = got.base_offset + i64::from(got.last_offset_delta);
        assert!(
            got.base_offset <= target_offset && last >= target_offset,
            "batch [{},{}] doesn't cover target {target_offset}",
            got.base_offset,
            last
        );
    }

    #[tokio::test]
    async fn fetch_batch_returns_none_when_segment_not_in_rlmm() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let reader = RemoteReader::new(rsm, rlmm);
        let got = reader.fetch_batch(&tp(), 0, 0, 4096).await.unwrap();
        assert!(got.is_none());
    }

    #[tokio::test]
    async fn aborted_transactions_empty_when_no_segment() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let reader = RemoteReader::new(rsm, rlmm);
        let got = reader
            .aborted_transactions(&tp(), 0, 0, 100)
            .await
            .expect("ok");
        assert!(got.is_empty());
    }

    #[tokio::test]
    async fn fetch_batch_returns_none_for_in_progress_segment() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let id = crabka_remote_storage::RemoteLogSegmentId::new(tp(), Uuid::new_v4());
        let md = RemoteLogSegmentMetadata::new(
            id,
            0,
            99,
            100,
            1,
            100,
            1024,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::from([(0_i32, 0_i64)]),
        )
        .unwrap();
        rlmm.add_remote_log_segment_metadata(md).unwrap();
        let reader = RemoteReader::new(rsm, rlmm);
        let got = reader.fetch_batch(&tp(), 0, 50, 4096).await.unwrap();
        assert!(
            got.is_none(),
            "started (not finished) segment must be invisible"
        );
    }

    #[tokio::test]
    async fn earliest_offset_returns_lowest_finished_start() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, log) = populated_reader(log_dir.path(), remote_dir.path());
        let exports = log.tierable_segments();
        let expected = exports.iter().map(|e| e.base_offset).min().unwrap();
        let got = reader.earliest_offset(&tp()).unwrap();
        assert!(got == Some(expected));
    }

    #[tokio::test]
    async fn earliest_offset_returns_none_when_no_finished_segments() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> =
            Arc::new(InmemoryRemoteLogMetadataManager::new());
        let reader = RemoteReader::new(rsm, rlmm);
        assert!(reader.earliest_offset(&tp()).unwrap() == None);
    }

    #[tokio::test]
    async fn offset_for_timestamp_locates_remote_segment() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, log) = populated_reader(log_dir.path(), remote_dir.path());
        let exports = log.tierable_segments();
        let target_ts = 0_i64;
        let got = reader
            .offset_for_timestamp(&tp(), target_ts)
            .await
            .unwrap()
            .expect("first segment matches ts=0");
        let expected = exports.iter().map(|e| e.base_offset).min().unwrap();
        assert!(got == expected);
    }

    #[tokio::test]
    async fn offset_for_timestamp_returns_none_when_past_last() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, _log) = populated_reader(log_dir.path(), remote_dir.path());
        let got = reader.offset_for_timestamp(&tp(), 1).await.unwrap();
        assert!(got == None);
    }

    /// RLMM stub whose lookup and list paths always report `NotReady { partition: 3 }`.
    struct NotReadyRlmm;

    impl RemoteLogMetadataManager for NotReadyRlmm {
        fn add_remote_log_segment_metadata(
            &self,
            _m: RemoteLogSegmentMetadata,
        ) -> Result<(), RemoteStorageError> {
            Ok(())
        }
        fn update_remote_log_segment_metadata(
            &self,
            _u: crabka_remote_storage::RemoteLogSegmentMetadataUpdate,
        ) -> Result<(), RemoteStorageError> {
            Ok(())
        }
        fn remote_log_segment_metadata(
            &self,
            _tp: &TopicIdPartition,
            _epoch: i32,
            _offset: i64,
        ) -> Result<Option<RemoteLogSegmentMetadata>, RemoteStorageError> {
            Err(RemoteStorageError::NotReady { partition: 3 })
        }
        fn highest_offset_for_epoch(
            &self,
            _tp: &TopicIdPartition,
            _epoch: i32,
        ) -> Result<Option<i64>, RemoteStorageError> {
            Ok(None)
        }
        fn list_remote_log_segments(
            &self,
            _tp: &TopicIdPartition,
        ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
            Err(RemoteStorageError::NotReady { partition: 3 })
        }
        fn list_remote_log_segments_by_epoch(
            &self,
            _tp: &TopicIdPartition,
            _epoch: i32,
        ) -> Result<Vec<RemoteLogSegmentMetadata>, RemoteStorageError> {
            Ok(Vec::new())
        }
        fn put_remote_partition_delete_metadata(
            &self,
            _m: crabka_remote_storage::RemotePartitionDeleteMetadata,
        ) -> Result<(), RemoteStorageError> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn fetch_batch_propagates_not_ready() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> = Arc::new(NotReadyRlmm);
        let reader = RemoteReader::new(rsm, rlmm);
        let err = reader.fetch_batch(&tp(), 0, 0, 4096).await.unwrap_err();
        assert!(matches!(err, RemoteStorageError::NotReady { partition: 3 }));
    }

    #[tokio::test]
    async fn earliest_offset_propagates_not_ready() {
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> = Arc::new(NotReadyRlmm);
        let reader = RemoteReader::new(rsm, rlmm);
        let err = reader.earliest_offset(&tp()).unwrap_err();
        assert!(matches!(err, RemoteStorageError::NotReady { .. }));
    }

    /// Assigns metadata partition `mp` to `m` and polls (up to 2s) until the list path for
    /// `tp` stops returning `NotReady`.
    async fn assign_and_wait_ready(
        m: &Arc<crabka_remote_storage_topic::TopicBasedRemoteLogMetadataManager>,
        mp: i32,
        tp: &TopicIdPartition,
    ) {
        m.reconcile_assignment(&[mp]).await;
        let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
        loop {
            match m.list_remote_log_segments(tp) {
                Ok(_) => return,
                Err(RemoteStorageError::NotReady { .. }) => {
                    assert!(
                        std::time::Instant::now() < deadline,
                        "list path never became ready"
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(5)).await;
                }
                Err(e) => panic!("unexpected list error: {e:?}"),
            }
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn list_path_observes_not_ready_and_unassigned_from_real_manager() {
        use crabka_remote_storage_topic::{
            InProcessMetadataEventLog, MetadataEventLog, TopicBasedRemoteLogMetadataManager,
            metadata_partition_for,
        };
        let topic_id = Uuid::from_u128(0xABCD);
        let owned = TopicIdPartition::new(topic_id, "orders", 0);
        let not_owned = TopicIdPartition::new(topic_id, "orders", 1);
        let n = 16;
        let mp_owned = metadata_partition_for(&owned, n);
        let mp_other = metadata_partition_for(&not_owned, n);
        assert!(mp_owned != mp_other, "test needs distinct metadata buckets");
        let log: Arc<dyn MetadataEventLog> = InProcessMetadataEventLog::new(n);
        let writer_snap_dir = tempfile::tempdir().unwrap();
        let mgr_snap_dir = tempfile::tempdir().unwrap();
        // Phase 1: a separate writer publishes one finished segment for `owned`.
        {
            let writer = TopicBasedRemoteLogMetadataManager::start(
                log.clone(),
                tokio::runtime::Handle::current(),
                writer_snap_dir.path().to_path_buf(),
                std::time::Duration::from_hours(1),
            )
            .await
            .unwrap();
            writer
                .reconcile_assignment(&(0..n).collect::<Vec<_>>())
                .await;
            let id = crabka_remote_storage::RemoteLogSegmentId::new(owned.clone(), Uuid::new_v4());
            let md = RemoteLogSegmentMetadata::new(
                id.clone(),
                0,
                99,
                100,
                1,
                100,
                2048,
                RemoteLogSegmentState::CopySegmentStarted,
                BTreeMap::from([(0, 0)]),
            )
            .unwrap();
            let w2 = writer.clone();
            let md2 = md.clone();
            tokio::task::spawn_blocking(move || {
                w2.add_remote_log_segment_metadata(md2).unwrap();
            })
            .await
            .unwrap();
            let w2 = writer.clone();
            tokio::task::spawn_blocking(move || {
                w2.update_remote_log_segment_metadata(
                    crabka_remote_storage::RemoteLogSegmentMetadataUpdate {
                        remote_log_segment_id: id,
                        event_timestamp_ms: 100,
                        custom_metadata: None,
                        state: RemoteLogSegmentState::CopySegmentFinished,
                        broker_id: 1,
                    },
                )
                .unwrap();
            })
            .await
            .unwrap();
            writer.shutdown();
        }
        // Phase 2: a fresh manager observes the list path as assignment changes.
        let m = TopicBasedRemoteLogMetadataManager::start(
            log.clone(),
            tokio::runtime::Handle::current(),
            mgr_snap_dir.path().to_path_buf(),
            std::time::Duration::from_hours(1),
        )
        .await
        .unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let rsm: Arc<dyn RemoteStorageManager> =
            Arc::new(LocalTieredStorage::new(remote_dir.path()));
        let rlmm: Arc<dyn RemoteLogMetadataManager> = m.clone();
        let reader = RemoteReader::new(rsm, rlmm);
        assert!(
            reader.earliest_offset(&not_owned).unwrap() == None,
            "unassigned partition is an empty list-path result, not NotReady"
        );
        assign_and_wait_ready(&m, mp_owned, &owned).await;
        assert!(
            reader.earliest_offset(&owned).unwrap() == Some(0),
            "owned + caught up → real earliest from the remote tier"
        );
        m.reconcile_assignment(&[]).await;
        assert!(
            reader.earliest_offset(&owned).unwrap() == None,
            "removed partition's list path returns empty, not stale segments"
        );
        m.shutdown();
    }

    #[tokio::test]
    async fn fallback_resolves_segment_across_leader_epoch_change() {
        let log_dir = tempfile::tempdir().unwrap();
        let remote_dir = tempfile::tempdir().unwrap();
        let (reader, log) = populated_reader(log_dir.path(), remote_dir.path());
        let exports = log.tierable_segments();
        let target_offset = exports[0].base_offset;
        let got = reader
            .fetch_batch(&tp(), 1, target_offset, 4096)
            .await
            .expect("ok")
            .expect("defensive fallback must resolve the segment despite epoch mismatch");
        let last = got.base_offset + i64::from(got.last_offset_delta);
        assert!(
            got.base_offset <= target_offset && last >= target_offset,
            "batch [{},{}] doesn't cover target {target_offset}",
            got.base_offset,
            last,
        );
    }
}