use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use bytes::Bytes;
use object_store::path::Path as ObjectPath;
use object_store::{GetOptions, GetRange, ObjectStore, ObjectStoreExt, PutPayload, WriteMultipart};
use crate::error::RemoteStorageError;
use crate::metadata::{CustomMetadata, RemoteLogSegmentMetadata};
use crate::storage_manager::{IndexType, LogSegmentData, RemoteStorageManager};
/// Default multipart threshold in bytes (100 MiB): `put_path` uploads files
/// shorter than the configured threshold with a single `put` and switches to a
/// multipart upload otherwise.
pub const DEFAULT_MULTIPART_THRESHOLD: u64 = 100 * 1024 * 1024;
/// Default multipart chunk size in bytes (16 MiB): the read-buffer size and the
/// chunk size handed to `WriteMultipart` by `put_path_multipart`.
pub const DEFAULT_MULTIPART_CHUNK_SIZE: usize = 16 * 1024 * 1024;
/// Remote storage backend that keeps log segments and their indexes as objects
/// in an [`ObjectStore`].
pub struct S3RemoteStorage {
/// Object store handle through which every upload, fetch and delete is issued.
store: Arc<dyn ObjectStore>,
/// Optional key prefix; when set it is prepended, followed by `/`, to every key.
prefix: Option<String>,
/// Files shorter than this many bytes are uploaded with one `put`; all others
/// go through the multipart path.
multipart_threshold: u64,
/// Size in bytes of the read buffer and of the multipart writer's chunks.
multipart_chunk_size: usize,
}
impl std::fmt::Debug for S3RemoteStorage {
    /// Renders the storage as a non-exhaustive struct showing only `prefix`.
    ///
    /// The remaining fields are elided through `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("S3RemoteStorage");
        out.field("prefix", &self.prefix);
        out.finish_non_exhaustive()
    }
}
/// Settings consumed by `S3RemoteStorage::from_s3_config` to build an S3-backed
/// store.
#[derive(Clone)]
pub struct S3Config {
/// Bucket name passed to the S3 builder.
pub bucket: String,
/// Optional key prefix copied verbatim into the resulting storage.
pub prefix: Option<String>,
/// Region passed to the S3 builder.
pub region: String,
/// Optional endpoint override; applied to the builder only when set.
pub endpoint: Option<String>,
/// Static access key id; applied only together with `secret_access_key`.
pub access_key_id: Option<String>,
/// Static secret access key; applied only together with `access_key_id`.
pub secret_access_key: Option<String>,
/// Forwarded to the builder's `with_allow_http`.
pub allow_http: bool,
/// File size in bytes below which uploads use a single `put`.
pub multipart_threshold: u64,
/// Chunk size in bytes used for multipart uploads.
pub multipart_chunk_size: usize,
}
impl std::fmt::Debug for S3Config {
    /// Formats every field of the config, masking the two credential fields.
    ///
    /// A credential that is set prints as `Some("***")`; an unset one prints as
    /// `None`, so the real key material never reaches the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const MASK: &str = "***";
        let access_key_id = self.access_key_id.as_deref().map(|_| MASK);
        let secret_access_key = self.secret_access_key.as_deref().map(|_| MASK);

        let mut out = f.debug_struct("S3Config");
        out.field("bucket", &self.bucket);
        out.field("prefix", &self.prefix);
        out.field("region", &self.region);
        out.field("endpoint", &self.endpoint);
        out.field("access_key_id", &access_key_id);
        out.field("secret_access_key", &secret_access_key);
        out.field("allow_http", &self.allow_http);
        out.field("multipart_threshold", &self.multipart_threshold);
        out.field("multipart_chunk_size", &self.multipart_chunk_size);
        out.finish()
    }
}
impl Default for S3Config {
    /// Builds a config with empty bucket and region, no prefix, endpoint or
    /// credentials, HTTP disallowed, and the default multipart tuning constants.
    fn default() -> Self {
        Self {
            // Multipart tuning falls back to the module-level defaults.
            multipart_threshold: DEFAULT_MULTIPART_THRESHOLD,
            multipart_chunk_size: DEFAULT_MULTIPART_CHUNK_SIZE,
            allow_http: false,
            bucket: String::default(),
            region: String::default(),
            prefix: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
        }
    }
}
impl S3RemoteStorage {
/// Wraps an existing object store, using the default multipart threshold and
/// chunk size.
///
/// `prefix`, when present, is prepended to every object key.
#[must_use]
pub fn with_store(store: Arc<dyn ObjectStore>, prefix: Option<String>) -> Self {
    let multipart_threshold = DEFAULT_MULTIPART_THRESHOLD;
    let multipart_chunk_size = DEFAULT_MULTIPART_CHUNK_SIZE;
    Self {
        store,
        prefix,
        multipart_threshold,
        multipart_chunk_size,
    }
}
#[must_use]
pub fn with_multipart_tuning(mut self, threshold: u64, chunk_size: usize) -> Self {
self.multipart_threshold = threshold;
self.multipart_chunk_size = chunk_size;
self
}
/// Builds a storage backed by Amazon S3 (or an S3-compatible endpoint) from
/// `cfg`.
///
/// Bucket, region and `allow_http` are always applied; the endpoint only when
/// set. Static credentials are applied only when both `access_key_id` and
/// `secret_access_key` are present; when neither is present the builder is left
/// without static credentials.
///
/// # Errors
///
/// Returns `RemoteStorageError::InvalidArgument` when exactly one of the two
/// credential fields is set (a half-configured pair would otherwise be dropped
/// silently), or when the underlying S3 builder rejects the configuration.
pub fn from_s3_config(cfg: &S3Config) -> Result<Self, RemoteStorageError> {
    let mut builder = object_store::aws::AmazonS3Builder::new()
        .with_bucket_name(&cfg.bucket)
        .with_region(&cfg.region)
        .with_allow_http(cfg.allow_http);
    if let Some(endpoint) = &cfg.endpoint {
        builder = builder.with_endpoint(endpoint);
    }
    match (&cfg.access_key_id, &cfg.secret_access_key) {
        (Some(key_id), Some(secret)) => {
            builder = builder
                .with_access_key_id(key_id)
                .with_secret_access_key(secret);
        }
        (None, None) => {}
        // Never echo the credential values themselves in the error text.
        (Some(_), None) => {
            return Err(RemoteStorageError::InvalidArgument(
                "S3 config: access_key_id is set but secret_access_key is missing".into(),
            ));
        }
        (None, Some(_)) => {
            return Err(RemoteStorageError::InvalidArgument(
                "S3 config: secret_access_key is set but access_key_id is missing".into(),
            ));
        }
    }
    let store = builder
        .build()
        .map_err(|e| RemoteStorageError::InvalidArgument(format!("S3 builder: {e}")))?;
    Ok(Self {
        store: Arc::new(store),
        prefix: cfg.prefix.clone(),
        multipart_threshold: cfg.multipart_threshold,
        multipart_chunk_size: cfg.multipart_chunk_size,
    })
}
/// Builds the object key for one artifact of a segment.
///
/// The key has the shape `[<prefix>/]<topic_id>_<partition>/<segment_id>/<suffix>`,
/// where the prefix part is present only when `self.prefix` is set.
fn segment_key(&self, metadata: &RemoteLogSegmentMetadata, suffix: &str) -> ObjectPath {
    let segment = metadata.remote_log_segment_id();
    let partition = &segment.topic_id_partition;
    let relative = format!(
        "{}_{}/{}/{}",
        partition.topic_id, partition.partition, segment.id, suffix
    );
    let full = match &self.prefix {
        Some(prefix) => format!("{prefix}/{relative}"),
        None => relative,
    };
    ObjectPath::from(full)
}
/// Returns the object key under which the segment's log data is stored.
fn log_key(&self, metadata: &RemoteLogSegmentMetadata) -> ObjectPath {
    const LOG_SUFFIX: &str = "log";
    self.segment_key(metadata, LOG_SUFFIX)
}
/// Returns the object key under which the given index of the segment is stored.
fn index_key(&self, metadata: &RemoteLogSegmentMetadata, index_type: IndexType) -> ObjectPath {
    let suffix = index_filename(index_type);
    self.segment_key(metadata, suffix)
}
/// Drives `fut` to completion from synchronous code on the ambient Tokio
/// runtime and converts any object-store error into a `RemoteStorageError`.
///
/// # Errors
///
/// Returns `RemoteStorageError::Backend` when no Tokio runtime is active on the
/// calling thread, or the mapped error when the future itself fails.
///
/// NOTE(review): Tokio documents that `block_in_place` panics when invoked from
/// inside a `current_thread` runtime; callers are expected to be on a
/// multi-threaded runtime or a `spawn_blocking` thread — confirm at call sites.
fn block<T, F>(fut: F) -> Result<T, RemoteStorageError>
where
    F: std::future::Future<Output = Result<T, object_store::Error>>,
{
    let Ok(runtime) = tokio::runtime::Handle::try_current() else {
        return Err(RemoteStorageError::Backend(
            "S3RemoteStorage requires an active Tokio runtime; call from spawn_blocking".into(),
        ));
    };
    tokio::task::block_in_place(|| runtime.block_on(fut)).map_err(map_object_store_error)
}
/// Uploads the file at `path` to `key`.
///
/// Files shorter than `multipart_threshold` bytes are read fully into memory
/// and sent with a single `put`; anything else is delegated to
/// `put_path_multipart`.
///
/// # Errors
///
/// Propagates filesystem errors from reading the file and backend errors from
/// the upload.
fn put_path(&self, key: &ObjectPath, path: &Path) -> Result<(), RemoteStorageError> {
    let size = std::fs::metadata(path)?.len();
    if size >= self.multipart_threshold {
        return self.put_path_multipart(key, path);
    }
    let contents = std::fs::read(path)?;
    Self::block(self.store.put(key, PutPayload::from(contents))).map(|_| ())
}
fn put_path_multipart(&self, key: &ObjectPath, path: &Path) -> Result<(), RemoteStorageError> {
let file = std::fs::File::open(path)?;
let store = self.store.clone();
let key = key.clone();
let chunk_size = self.multipart_chunk_size;
Self::block(async move {
let upload = store.put_multipart(&key).await?;
let mut writer = WriteMultipart::new_with_chunk_size(upload, chunk_size);
let mut file = file;
let mut buf = vec![0u8; chunk_size];
loop {
let n =
Read::read(&mut file, &mut buf).map_err(|e| object_store::Error::Generic {
store: "S3RemoteStorage",
source: Box::new(e),
})?;
if n == 0 {
break;
}
writer.write(&buf[..n]);
}
writer.finish().await.map(|_| ())
})
}
/// Uploads an in-memory buffer to `key` with a single `put`.
///
/// # Errors
///
/// Returns the mapped backend error when the upload fails.
fn put_bytes(&self, key: &ObjectPath, bytes: Bytes) -> Result<(), RemoteStorageError> {
    let payload = PutPayload::from_bytes(bytes);
    Self::block(self.store.put(key, payload)).map(|_| ())
}
}
/// Maps an index type to the final path component used for its object key.
fn index_filename(index_type: IndexType) -> &'static str {
    match index_type {
        IndexType::Transaction => "txn_index",
        IndexType::LeaderEpoch => "leader_epoch",
        IndexType::ProducerSnapshot => "producer_snapshot",
        IndexType::Timestamp => "time_index",
        IndexType::Offset => "offset_index",
    }
}
fn map_object_store_error(e: object_store::Error) -> RemoteStorageError {
match e {
object_store::Error::NotFound { .. } => {
RemoteStorageError::Backend(format!("not found: {e}"))
}
other => RemoteStorageError::Backend(other.to_string()),
}
}
impl RemoteStorageManager for S3RemoteStorage {
/// Uploads the log segment and all of its indexes to the object store.
///
/// Objects are written in this order: log, offset index, time index, producer
/// snapshot (if present), leader-epoch bytes, transaction index (if present).
/// The first failure aborts the sequence; objects uploaded before it are left
/// in place.
///
/// Always returns `Ok(None)` on success — no custom metadata is produced.
fn copy_log_segment_data(
    &self,
    metadata: &RemoteLogSegmentMetadata,
    data: &LogSegmentData,
) -> Result<Option<CustomMetadata>, RemoteStorageError> {
    let log_key = self.log_key(metadata);
    self.put_path(&log_key, &data.log_segment)?;

    let offset_key = self.index_key(metadata, IndexType::Offset);
    self.put_path(&offset_key, &data.offset_index)?;

    let time_key = self.index_key(metadata, IndexType::Timestamp);
    self.put_path(&time_key, &data.time_index)?;

    if let Some(snapshot) = data.producer_snapshot_index.as_ref() {
        let snapshot_key = self.index_key(metadata, IndexType::ProducerSnapshot);
        self.put_path(&snapshot_key, snapshot)?;
    }

    // The leader-epoch index arrives as in-memory bytes rather than a file.
    let epoch_key = self.index_key(metadata, IndexType::LeaderEpoch);
    self.put_bytes(&epoch_key, data.leader_epoch_index.clone())?;

    if let Some(txn) = data.transaction_index.as_ref() {
        let txn_key = self.index_key(metadata, IndexType::Transaction);
        self.put_path(&txn_key, txn)?;
    }

    Ok(None)
}
/// Fetches a byte range of the segment's log object.
///
/// With `end_position` set, bytes `start_position..=end_position` are requested
/// (the end is inclusive); without it, everything from `start_position` on.
///
/// # Errors
///
/// * `InvalidArgument` when `end_position` is smaller than `start_position`.
/// * `SegmentNotFound` when the backend reports the object as missing.
/// * Any other backend error unchanged.
fn fetch_log_segment(
    &self,
    metadata: &RemoteLogSegmentMetadata,
    start_position: u32,
    end_position: Option<u32>,
) -> Result<Vec<u8>, RemoteStorageError> {
    let key = self.log_key(metadata);
    let first = u64::from(start_position);
    let range = match end_position {
        Some(end) if end < start_position => {
            return Err(RemoteStorageError::InvalidArgument(format!(
                "end_position {end} < start_position {start_position}"
            )));
        }
        // GetRange::Bounded is half-open, so extend the inclusive end by one.
        Some(end) => GetRange::Bounded(first..u64::from(end).saturating_add(1)),
        None => GetRange::Offset(first),
    };
    let opts = GetOptions {
        range: Some(range),
        ..Default::default()
    };
    let response = match Self::block(self.store.get_opts(&key, opts)) {
        Ok(response) => response,
        Err(err) => {
            return Err(match err {
                RemoteStorageError::Backend(ref msg) if msg.starts_with("not found:") => {
                    RemoteStorageError::SegmentNotFound(metadata.remote_log_segment_id().clone())
                }
                other => other,
            });
        }
    };
    Self::block(response.bytes()).map(|body| body.to_vec())
}
/// Fetches the full contents of one index object of the segment.
///
/// # Errors
///
/// Returns `SegmentNotFound` when the backend reports the object as missing
/// (this includes optional indexes that were never uploaded), and any other
/// backend error unchanged.
fn fetch_index(
    &self,
    metadata: &RemoteLogSegmentMetadata,
    index_type: IndexType,
) -> Result<Vec<u8>, RemoteStorageError> {
    let key = self.index_key(metadata, index_type);
    let response = match Self::block(self.store.get(&key)) {
        Ok(response) => response,
        Err(err) => {
            return Err(match err {
                RemoteStorageError::Backend(ref msg) if msg.starts_with("not found:") => {
                    RemoteStorageError::SegmentNotFound(metadata.remote_log_segment_id().clone())
                }
                other => other,
            });
        }
    };
    Self::block(response.bytes()).map(|body| body.to_vec())
}
/// Deletes the log object and every index object of the segment.
///
/// Objects the backend reports as missing are skipped, so repeating the call is
/// harmless. The first other error stops the loop and is returned; keys after
/// it are not attempted.
fn delete_log_segment_data(
    &self,
    metadata: &RemoteLogSegmentMetadata,
) -> Result<(), RemoteStorageError> {
    let keys = [
        self.log_key(metadata),
        self.index_key(metadata, IndexType::Offset),
        self.index_key(metadata, IndexType::Timestamp),
        self.index_key(metadata, IndexType::ProducerSnapshot),
        self.index_key(metadata, IndexType::LeaderEpoch),
        self.index_key(metadata, IndexType::Transaction),
    ];
    for key in &keys {
        if let Err(err) = Self::block(self.store.delete(key)) {
            let already_gone = matches!(
                &err,
                RemoteStorageError::Backend(msg) if msg.starts_with("not found:")
            );
            if !already_gone {
                return Err(err);
            }
        }
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use std::collections::BTreeMap;
use std::io::Write;
use std::path::PathBuf;
use object_store::memory::InMemory;
use tempfile::TempDir;
use uuid::Uuid;
use crate::metadata::{
RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState, TopicIdPartition,
};
/// Creates a storage over a fresh in-memory object store with the given prefix.
fn rsm(prefix: Option<&str>) -> S3RemoteStorage {
    let backend = Arc::new(InMemory::new());
    let owned_prefix = prefix.map(String::from);
    S3RemoteStorage::with_store(backend, owned_prefix)
}
/// The `Debug` output must mask both credentials while still showing the
/// non-secret fields.
#[test]
fn s3_config_debug_redacts_credentials() {
    let cfg = S3Config {
        bucket: "logs".to_string(),
        region: "us-east-1".to_string(),
        access_key_id: Some("AKIAEXAMPLEKEYID".to_string()),
        secret_access_key: Some("super-secret-key-value".to_string()),
        ..Default::default()
    };
    let rendered = format!("{cfg:?}");
    for secret in ["super-secret-key-value", "AKIAEXAMPLEKEYID"] {
        assert!(!rendered.contains(secret));
    }
    for visible in ["***", "logs", "us-east-1"] {
        assert!(rendered.contains(visible));
    }
}
/// Builds segment metadata for partition 0 of topic `orders`, with the segment
/// UUID derived from `id` so different ids yield distinct object keys.
fn sample_metadata(id: u128) -> RemoteLogSegmentMetadata {
    let partition = TopicIdPartition::new(Uuid::from_u128(1), "orders", 0);
    let segment_id = RemoteLogSegmentId::new(partition, Uuid::from_u128(id));
    let epochs = BTreeMap::from([(0, 0)]);
    RemoteLogSegmentMetadata::new(
        segment_id,
        0,
        99,
        123,
        1,
        456,
        8,
        RemoteLogSegmentState::CopySegmentStarted,
        epochs,
    )
    .unwrap()
}
/// Creates (or truncates) `dir/name`, writes `contents` to it and returns the
/// resulting path.
fn write_file(dir: &std::path::Path, name: &str, contents: &[u8]) -> PathBuf {
    let target = dir.join(name);
    let mut handle = std::fs::File::create(&target).unwrap();
    handle.write_all(contents).unwrap();
    target
}
/// Writes small fixture files into `src` and returns segment data pointing at
/// them; the transaction index is included only when `with_txn` is true.
fn sample_data(src: &std::path::Path, with_txn: bool) -> LogSegmentData {
    let log_segment = write_file(src, "00.log", b"0123456789");
    let offset_index = write_file(src, "00.index", b"OFFSET-IDX");
    let time_index = write_file(src, "00.timeindex", b"TIME-IDX");
    let transaction_index = if with_txn {
        Some(write_file(src, "00.txnindex", b"TXN-IDX"))
    } else {
        None
    };
    let producer_snapshot_index = Some(write_file(src, "00.snapshot", b"SNAP"));
    LogSegmentData {
        log_segment,
        offset_index,
        time_index,
        transaction_index,
        producer_snapshot_index,
        leader_epoch_index: Bytes::from_static(b"EPOCH-BYTES"),
    }
}
/// A copied segment can be fetched back in full.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn copy_then_fetch_full_segment() {
    let storage = rsm(None);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(10);
    let job = tokio::task::spawn_blocking(move || {
        let data = sample_data(scratch.path(), true);
        storage.copy_log_segment_data(&segment, &data).unwrap();
        let fetched = storage.fetch_log_segment(&segment, 0, None).unwrap();
        assert!(fetched == b"0123456789");
    });
    job.await.unwrap();
}
/// Bounded ranges are inclusive of the end position; open ranges run to the end
/// of the object.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fetch_partial_byte_ranges() {
    let storage = rsm(None);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(10);
    let job = tokio::task::spawn_blocking(move || {
        let data = sample_data(scratch.path(), false);
        storage.copy_log_segment_data(&segment, &data).unwrap();

        let bounded = storage.fetch_log_segment(&segment, 2, Some(5)).unwrap();
        assert!(bounded == b"2345");

        let open_ended = storage.fetch_log_segment(&segment, 7, None).unwrap();
        assert!(open_ended == b"789");
    });
    job.await.unwrap();
}
/// Every index type uploaded by a copy can be fetched back with its contents.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fetch_each_index_type() {
    let storage = rsm(None);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(11);
    let job = tokio::task::spawn_blocking(move || {
        let data = sample_data(scratch.path(), true);
        storage.copy_log_segment_data(&segment, &data).unwrap();

        let expectations = [
            (IndexType::Offset, b"OFFSET-IDX".as_slice()),
            (IndexType::Timestamp, b"TIME-IDX".as_slice()),
            (IndexType::ProducerSnapshot, b"SNAP".as_slice()),
            (IndexType::LeaderEpoch, b"EPOCH-BYTES".as_slice()),
            (IndexType::Transaction, b"TXN-IDX".as_slice()),
        ];
        for (kind, want) in expectations {
            let got = storage.fetch_index(&segment, kind).unwrap();
            assert!(got == want);
        }
    });
    job.await.unwrap();
}
/// Fetching a segment that was never copied reports `SegmentNotFound`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fetch_before_copy_is_not_found() {
    let storage = rsm(None);
    let segment = sample_metadata(404);
    let job = tokio::task::spawn_blocking(move || storage.fetch_log_segment(&segment, 0, None));
    let outcome = job.await.unwrap();
    let err = outcome.unwrap_err();
    assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
}
/// When the segment was copied without a transaction index, fetching that index
/// reports `SegmentNotFound`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn missing_optional_txn_index_is_not_found() {
    let storage = rsm(None);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(12);
    let job = tokio::task::spawn_blocking(move || {
        let data = sample_data(scratch.path(), false);
        storage.copy_log_segment_data(&segment, &data).unwrap();
        let err = storage
            .fetch_index(&segment, IndexType::Transaction)
            .unwrap_err();
        assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
    });
    job.await.unwrap();
}
/// Deleting twice succeeds both times, and the segment is gone afterwards.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn delete_is_idempotent() {
    let storage = rsm(None);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(13);
    let job = tokio::task::spawn_blocking(move || {
        let data = sample_data(scratch.path(), true);
        storage.copy_log_segment_data(&segment, &data).unwrap();

        // The second delete hits only missing objects and must still succeed.
        for _ in 0..2 {
            storage.delete_log_segment_data(&segment).unwrap();
        }

        let err = storage.fetch_log_segment(&segment, 0, None).unwrap_err();
        assert!(matches!(err, RemoteStorageError::SegmentNotFound(_)));
    });
    job.await.unwrap();
}
/// Deleting one segment leaves another segment of the same partition intact.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn segments_are_isolated_by_id() {
    let storage = rsm(None);
    let scratch = TempDir::new().unwrap();
    let first = sample_metadata(20);
    let second = sample_metadata(21);
    let job = tokio::task::spawn_blocking(move || {
        let first_data = sample_data(scratch.path(), false);
        storage.copy_log_segment_data(&first, &first_data).unwrap();
        let second_data = sample_data(scratch.path(), false);
        storage.copy_log_segment_data(&second, &second_data).unwrap();

        storage.delete_log_segment_data(&first).unwrap();

        let survivor = storage.fetch_log_segment(&second, 0, None).unwrap();
        assert!(survivor == b"0123456789");
    });
    job.await.unwrap();
}
/// Two storages configured with different prefixes must map the same segment to
/// different object keys, each rooted under its own prefix.
///
/// Key construction is purely synchronous, so no Tokio runtime is needed here.
#[test]
fn prefix_isolates_clusters() {
    let md = sample_metadata(30);
    let key_a = rsm(Some("cluster-a")).log_key(&md);
    let key_b = rsm(Some("cluster-b")).log_key(&md);
    assert!(
        key_a.as_ref().starts_with("cluster-a/"),
        "expected prefix to be applied, got {key_a:?}",
    );
    assert!(
        key_b.as_ref().starts_with("cluster-b/"),
        "expected prefix to be applied, got {key_b:?}",
    );
    assert!(key_a != key_b, "different prefixes produced the same key");
}
/// Writes `len` bytes to `dir/00.log`, where byte `i` has the value `i % 251`,
/// and returns the file's path.
fn write_log_segment(dir: &std::path::Path, len: usize) -> PathBuf {
    let target = dir.join("00.log");
    // Cycling through 0..=250 yields exactly `i % 251` at position `i`.
    let pattern: Vec<u8> = (0u8..251).cycle().take(len).collect();
    let mut handle = std::fs::File::create(&target).unwrap();
    handle.write_all(&pattern).unwrap();
    target
}
/// A log file larger than the multipart threshold is uploaded in chunks and
/// reads back byte-for-byte identical.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn put_path_uses_multipart_above_threshold_and_round_trips() {
    const SEG_LEN: usize = 100 * 1024;
    const CHUNK: usize = 4 * 1024;
    let storage = S3RemoteStorage::with_store(Arc::new(InMemory::new()), None)
        .with_multipart_tuning(8 * 1024, CHUNK);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(40);
    let log_segment = write_log_segment(scratch.path(), SEG_LEN);
    let offset_index = write_file(scratch.path(), "00.index", b"OFFSET-IDX");
    let time_index = write_file(scratch.path(), "00.timeindex", b"TIME-IDX");
    let producer_snapshot_index = Some(write_file(scratch.path(), "00.snapshot", b"SNAP"));
    let data = LogSegmentData {
        log_segment,
        offset_index,
        time_index,
        transaction_index: None,
        producer_snapshot_index,
        leader_epoch_index: Bytes::from_static(b"EPOCH-BYTES"),
    };
    let job = tokio::task::spawn_blocking(move || {
        storage.copy_log_segment_data(&segment, &data).unwrap();
        let fetched = storage.fetch_log_segment(&segment, 0, None).unwrap();
        assert!(fetched.len() == SEG_LEN);
        for (i, actual) in fetched.iter().copied().enumerate() {
            let want = u8::try_from(i % 251).unwrap();
            assert!(actual == want, "byte mismatch at {i}");
        }
    });
    job.await.unwrap();
}
/// A file whose length is not a multiple of the chunk size keeps its trailing
/// partial chunk when uploaded through the multipart path.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multipart_flushes_partial_tail_chunk() {
    const CHUNK: usize = 4 * 1024;
    // Three full chunks plus a 137-byte tail.
    const SEG_LEN: usize = 3 * CHUNK + 137;
    let storage = S3RemoteStorage::with_store(Arc::new(InMemory::new()), None)
        .with_multipart_tuning(1024, CHUNK);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(41);
    let log_segment = write_log_segment(scratch.path(), SEG_LEN);
    let offset_index = write_file(scratch.path(), "00.index", b"OFFSET-IDX");
    let time_index = write_file(scratch.path(), "00.timeindex", b"TIME-IDX");
    let data = LogSegmentData {
        log_segment,
        offset_index,
        time_index,
        transaction_index: None,
        producer_snapshot_index: None,
        leader_epoch_index: Bytes::from_static(b"EPOCH-BYTES"),
    };
    let job = tokio::task::spawn_blocking(move || {
        storage.copy_log_segment_data(&segment, &data).unwrap();
        let fetched = storage.fetch_log_segment(&segment, 0, None).unwrap();
        assert!(fetched.len() == SEG_LEN);
        let expected_tail = u8::try_from((SEG_LEN - 1) % 251).unwrap();
        assert!(
            fetched.last().copied() == Some(expected_tail),
            "tail byte was dropped"
        );
    });
    job.await.unwrap();
}
/// A file far smaller than the multipart threshold is uploaded and reads back
/// with its original length.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn put_path_stays_on_single_put_below_threshold() {
    let storage = S3RemoteStorage::with_store(Arc::new(InMemory::new()), None)
        .with_multipart_tuning(1024 * 1024, 4 * 1024);
    let scratch = TempDir::new().unwrap();
    let segment = sample_metadata(42);
    let log_segment = write_log_segment(scratch.path(), 10);
    let offset_index = write_file(scratch.path(), "00.index", b"OFFSET-IDX");
    let time_index = write_file(scratch.path(), "00.timeindex", b"TIME-IDX");
    let data = LogSegmentData {
        log_segment,
        offset_index,
        time_index,
        transaction_index: None,
        producer_snapshot_index: None,
        leader_epoch_index: Bytes::from_static(b"EPOCH-BYTES"),
    };
    let job = tokio::task::spawn_blocking(move || {
        storage.copy_log_segment_data(&segment, &data).unwrap();
        let fetched = storage.fetch_log_segment(&segment, 0, None).unwrap();
        assert!(fetched.len() == 10);
    });
    job.await.unwrap();
}
}