use std::fs;
use std::io::{self, Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use crate::error::RemoteStorageError;
use crate::metadata::{CustomMetadata, RemoteLogSegmentMetadata};
use crate::storage_manager::{IndexType, LogSegmentData, RemoteStorageManager};
/// A [`RemoteStorageManager`] backed by a directory on the local filesystem.
///
/// Every remote segment is stored as a set of plain files somewhere below `root`.
#[derive(Clone, Debug)]
pub struct LocalTieredStorage {
    /// Base directory under which all partition and segment directories are created.
    root: PathBuf,
}
impl LocalTieredStorage {
    /// Creates a storage backend rooted at `root`.
    ///
    /// Only the path is stored; no filesystem access happens here.
    #[must_use]
    pub fn new(root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        Self { root }
    }

    /// Returns the directory holding every file of one segment:
    /// `<root>/<topic_id>_<partition>/<segment id>`.
    fn segment_dir(&self, metadata: &RemoteLogSegmentMetadata) -> PathBuf {
        let segment_id = metadata.remote_log_segment_id();
        let partition = &segment_id.topic_id_partition;
        let partition_dir = format!("{}_{}", partition.topic_id, partition.partition);
        let mut dir = self.root.join(partition_dir);
        dir.push(segment_id.id.to_string());
        dir
    }

    /// Returns the path of the copied log segment file inside `dir`.
    fn log_path(dir: &Path) -> PathBuf {
        let mut path = dir.to_path_buf();
        path.push("log");
        path
    }

    /// Returns the path of the file storing `index_type` inside `dir`.
    ///
    /// Each index type maps to one fixed file name.
    fn index_path(dir: &Path, index_type: IndexType) -> PathBuf {
        dir.join(match index_type {
            IndexType::Offset => "offset_index",
            IndexType::Timestamp => "time_index",
            IndexType::ProducerSnapshot => "producer_snapshot",
            IndexType::LeaderEpoch => "leader_epoch",
            IndexType::Transaction => "txn_index",
        })
    }
}
impl RemoteStorageManager for LocalTieredStorage {
fn copy_log_segment_data(
&self,
metadata: &RemoteLogSegmentMetadata,
data: &LogSegmentData,
) -> Result<Option<CustomMetadata>, RemoteStorageError> {
let dir = self.segment_dir(metadata);
fs::create_dir_all(&dir)?;
fs::copy(&data.log_segment, Self::log_path(&dir))?;
fs::copy(
&data.offset_index,
Self::index_path(&dir, IndexType::Offset),
)?;
fs::copy(
&data.time_index,
Self::index_path(&dir, IndexType::Timestamp),
)?;
if let Some(snapshot) = &data.producer_snapshot_index {
fs::copy(
snapshot,
Self::index_path(&dir, IndexType::ProducerSnapshot),
)?;
}
fs::write(
Self::index_path(&dir, IndexType::LeaderEpoch),
&data.leader_epoch_index,
)?;
if let Some(txn) = &data.transaction_index {
fs::copy(txn, Self::index_path(&dir, IndexType::Transaction))?;
}
Ok(None)
}
fn fetch_log_segment(
&self,
metadata: &RemoteLogSegmentMetadata,
start_position: u32,
end_position: Option<u32>,
) -> Result<Vec<u8>, RemoteStorageError> {
let dir = self.segment_dir(metadata);
let path = Self::log_path(&dir);
if !path.exists() {
return Err(RemoteStorageError::SegmentNotFound(
metadata.remote_log_segment_id().clone(),
));
}
let bytes = fs::read(&path)?;
let len = bytes.len();
let start = usize::try_from(start_position).expect("u32 fits usize");
if start > len {
return Err(RemoteStorageError::InvalidArgument(format!(
"start_position {start} exceeds segment length {len}"
)));
}
let end_exclusive = match end_position {
Some(end) => {
let end = usize::try_from(end).expect("u32 fits usize");
if end < start {
return Err(RemoteStorageError::InvalidArgument(format!(
"end_position {end} < start_position {start}"
)));
}
end.saturating_add(1).min(len)
}
None => len,
};
Ok(bytes[start..end_exclusive].to_vec())
}
fn fetch_index(
&self,
metadata: &RemoteLogSegmentMetadata,
index_type: IndexType,
) -> Result<Vec<u8>, RemoteStorageError> {
let dir = self.segment_dir(metadata);
let path = Self::index_path(&dir, index_type);
if !path.exists() {
return Err(RemoteStorageError::SegmentNotFound(
metadata.remote_log_segment_id().clone(),
));
}
Ok(fs::read(&path)?)
}
fn delete_log_segment_data(
&self,
metadata: &RemoteLogSegmentMetadata,
) -> Result<(), RemoteStorageError> {
let dir = self.segment_dir(metadata);
match fs::remove_dir_all(&dir) {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(RemoteStorageError::Io(e)),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::collections::BTreeMap;
    use std::io::Write;
    use bytes::Bytes;
    use uuid::Uuid;
    use crate::metadata::{RemoteLogSegmentId, RemoteLogSegmentState, TopicIdPartition};

    /// Builds metadata for segment `id` of partition 0 of a fixed test topic.
    fn metadata(id: u128) -> RemoteLogSegmentMetadata {
        RemoteLogSegmentMetadata::new(
            RemoteLogSegmentId::new(
                TopicIdPartition::new(Uuid::from_u128(1), "orders", 0),
                Uuid::from_u128(id),
            ),
            0,
            99,
            123,
            1,
            456,
            8,
            RemoteLogSegmentState::CopySegmentStarted,
            BTreeMap::from([(0, 0)]),
        )
        .unwrap()
    }

    /// Writes `contents` to `dir/name` and returns the file's path.
    fn write_file(dir: &Path, name: &str, contents: &[u8]) -> PathBuf {
        let p = dir.join(name);
        let mut f = fs::File::create(&p).unwrap();
        f.write_all(contents).unwrap();
        p
    }

    /// Creates source files for one segment in `src`.
    ///
    /// The transaction index is included only if `with_txn` is true.
    fn sample_data(src: &Path, with_txn: bool) -> LogSegmentData {
        LogSegmentData {
            log_segment: write_file(src, "00.log", b"0123456789"),
            offset_index: write_file(src, "00.index", b"OFFSET-IDX"),
            time_index: write_file(src, "00.timeindex", b"TIME-IDX"),
            transaction_index: with_txn.then(|| write_file(src, "00.txnindex", b"TXN-IDX")),
            producer_snapshot_index: Some(write_file(src, "00.snapshot", b"SNAP")),
            leader_epoch_index: Bytes::from_static(b"EPOCH-BYTES"),
        }
    }

    #[test]
    fn copy_then_fetch_full_segment() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(10);
        assert!(
            rsm.copy_log_segment_data(&md, &sample_data(src.path(), true))
                .unwrap()
                .is_none()
        );
        let full = rsm.fetch_log_segment(&md, 0, None).unwrap();
        assert!(full == b"0123456789");
    }

    #[test]
    fn fetch_partial_byte_ranges() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(10);
        rsm.copy_log_segment_data(&md, &sample_data(src.path(), false))
            .unwrap();
        assert!(rsm.fetch_log_segment(&md, 2, Some(5)).unwrap() == b"2345");
        assert!(rsm.fetch_log_segment(&md, 7, None).unwrap() == b"789");
        assert!(rsm.fetch_log_segment(&md, 8, Some(99)).unwrap() == b"89");
        assert!(rsm.fetch_log_segment(&md, 10, None).unwrap() == b"");
        // Inclusive end: a range with start == end returns exactly one byte.
        assert!(rsm.fetch_log_segment(&md, 3, Some(3)).unwrap() == b"3");
        // Start at the very end with an explicit end is empty, not an error.
        assert!(rsm.fetch_log_segment(&md, 10, Some(20)).unwrap() == b"");
    }

    #[test]
    fn fetch_rejects_invalid_positions() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(10);
        rsm.copy_log_segment_data(&md, &sample_data(src.path(), false))
            .unwrap();
        assert!(matches!(
            rsm.fetch_log_segment(&md, 11, None).unwrap_err(),
            RemoteStorageError::InvalidArgument(_)
        ));
        assert!(matches!(
            rsm.fetch_log_segment(&md, 5, Some(4)).unwrap_err(),
            RemoteStorageError::InvalidArgument(_)
        ));
    }

    #[test]
    fn fetch_each_index_type() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(10);
        rsm.copy_log_segment_data(&md, &sample_data(src.path(), true))
            .unwrap();
        assert!(rsm.fetch_index(&md, IndexType::Offset).unwrap() == b"OFFSET-IDX");
        assert!(rsm.fetch_index(&md, IndexType::Timestamp).unwrap() == b"TIME-IDX");
        assert!(rsm.fetch_index(&md, IndexType::ProducerSnapshot).unwrap() == b"SNAP");
        assert!(rsm.fetch_index(&md, IndexType::LeaderEpoch).unwrap() == b"EPOCH-BYTES");
        assert!(rsm.fetch_index(&md, IndexType::Transaction).unwrap() == b"TXN-IDX");
    }

    #[test]
    fn missing_optional_txn_index_is_not_found() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(10);
        rsm.copy_log_segment_data(&md, &sample_data(src.path(), false))
            .unwrap();
        let err = rsm.fetch_index(&md, IndexType::Transaction).unwrap_err();
        assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
    }

    #[test]
    fn fetch_before_copy_is_not_found() {
        let remote = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(404);
        let err = rsm.fetch_log_segment(&md, 0, None).unwrap_err();
        assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
    }

    #[test]
    fn fetch_index_before_copy_is_not_found() {
        let remote = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(404);
        let err = rsm.fetch_index(&md, IndexType::Offset).unwrap_err();
        assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
    }

    #[test]
    fn delete_is_idempotent_and_removes_data() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let md = metadata(10);
        rsm.copy_log_segment_data(&md, &sample_data(src.path(), true))
            .unwrap();
        rsm.delete_log_segment_data(&md).unwrap();
        rsm.delete_log_segment_data(&md).unwrap();
        assert!(matches!(
            rsm.fetch_log_segment(&md, 0, None).unwrap_err(),
            RemoteStorageError::SegmentNotFound(_)
        ));
    }

    #[test]
    fn delete_of_never_copied_segment_is_ok() {
        let remote = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        rsm.delete_log_segment_data(&metadata(404)).unwrap();
    }

    #[test]
    fn segments_are_isolated_by_id() {
        let remote = tempfile::tempdir().unwrap();
        let src = tempfile::tempdir().unwrap();
        let rsm = LocalTieredStorage::new(remote.path());
        let a = metadata(10);
        let b = metadata(11);
        rsm.copy_log_segment_data(&a, &sample_data(src.path(), false))
            .unwrap();
        rsm.copy_log_segment_data(&b, &sample_data(src.path(), false))
            .unwrap();
        rsm.delete_log_segment_data(&a).unwrap();
        assert!(rsm.fetch_log_segment(&b, 0, None).unwrap() == b"0123456789");
    }
}