use crate::client::credentials::CredentialsReceiver;
use crate::error::{Error, Result};
use crate::io::{FileIO, Storage};
use crate::metadata::TableBucket;
use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
use futures::TryStreamExt;
use parking_lot::Mutex;
use std::{
cmp::{Ordering, Reverse},
collections::{BinaryHeap, HashMap},
future::Future,
io, mem,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
time::Duration,
};
#[cfg(test)]
use std::{
env,
time::{SystemTime, UNIX_EPOCH},
};
use tempfile::TempDir;
use tokio::io::AsyncWriteExt;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tokio::task::JoinSet;
/// Default value for the scanner's remote-log prefetch count setting.
pub const DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM: usize = 4;
/// Default value for the remote-file download thread count setting.
pub const DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM: usize = 3;
/// Starting point, in milliseconds, of the exponential retry backoff.
const RETRY_BACKOFF_BASE_MS: u64 = 100;
/// Ceiling, in milliseconds, applied to the backoff before jitter is added.
const RETRY_BACKOFF_MAX_MS: u64 = 5_000;
/// Retries allowed after the initial attempt before a download fails permanently.
const MAX_RETRY_COUNT: u32 = 10;

/// Computes how long to wait before retry number `retry_count`.
///
/// The delay is `RETRY_BACKOFF_BASE_MS * 2^min(retry_count, 10)` milliseconds,
/// capped at `RETRY_BACKOFF_MAX_MS`, then scaled by a uniformly random jitter
/// factor in `[0.75, 1.25]` so that simultaneous retries spread out.
fn calculate_backoff_delay(retry_count: u32) -> tokio::time::Duration {
    use rand::Rng;

    let doublings = retry_count.min(10);
    let uncapped_ms = RETRY_BACKOFF_BASE_MS.saturating_mul(1_u64 << doublings);
    let bounded_ms = if uncapped_ms > RETRY_BACKOFF_MAX_MS {
        RETRY_BACKOFF_MAX_MS
    } else {
        uncapped_ms
    };
    let jitter: f64 = rand::rng().random_range(0.75..=1.25);
    let delay_ms = (bounded_ms as f64 * jitter) as u64;
    tokio::time::Duration::from_millis(delay_ms)
}
/// Outcome of a successful [`RemoteLogFetcher::fetch`]: where the segment was
/// written locally and how large the file is.
#[derive(Debug)]
pub struct FetchResult {
    /// Local path of the downloaded segment file.
    pub file_path: PathBuf,
    /// Size of the downloaded file in bytes.
    pub file_size: usize,
}
/// Strategy used by the download coordinator to materialize one remote log
/// segment as a local file.
///
/// It is a trait so that a different fetcher can be plugged in through
/// `RemoteLogDownloader::new_with_fetcher` (the tests use a fake one).
pub trait RemoteLogFetcher: Send + Sync {
    /// Starts fetching the segment described by `request`.
    ///
    /// The returned boxed future is `'static` and `Send`, so implementations
    /// must copy whatever they need out of `request` before returning it.
    fn fetch(
        &self,
        request: &RemoteLogDownloadRequest,
    ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>>;
}
/// Metadata describing one remote log segment of a table bucket.
#[derive(Debug, Clone)]
pub struct RemoteLogSegment {
    /// Segment identifier; used as a path component of the remote file and in
    /// the local file name.
    pub segment_id: String,
    /// First offset contained in the segment; zero-padded into file names and
    /// used to order segments of the same bucket.
    pub start_offset: i64,
    // Populated from the protobuf but currently unread, hence `allow(dead_code)`.
    #[allow(dead_code)]
    pub end_offset: i64,
    // Populated from the protobuf but currently unread, hence `allow(dead_code)`.
    #[allow(dead_code)]
    pub size_in_bytes: i32,
    /// Bucket the segment belongs to; part of the download priority ordering.
    pub table_bucket: TableBucket,
    /// Maximum timestamp of the segment, or `-1` when the protobuf carried
    /// none. Across buckets, lower values are downloaded first.
    pub max_timestamp: i64,
}
impl RemoteLogSegment {
    /// Builds a segment description from its protobuf form and tags it with
    /// the bucket it belongs to. A missing `max_timestamp` becomes `-1`.
    pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self {
        let max_timestamp = segment.max_timestamp.unwrap_or(-1);
        Self {
            segment_id: segment.remote_log_segment_id.clone(),
            start_offset: segment.remote_log_start_offset,
            end_offset: segment.remote_log_end_offset,
            size_in_bytes: segment.segment_size_in_bytes,
            table_bucket,
            max_timestamp,
        }
    }

    /// Name of the local file for this segment:
    /// `<segment_id>_<start_offset zero-padded to 20 digits>.log`.
    pub fn local_file_name(&self) -> String {
        format!("{}_{:020}.log", self.segment_id, self.start_offset)
    }
}
/// Remote log information for a single table bucket, converted from
/// `PbRemoteLogFetchInfo`.
#[derive(Debug, Clone)]
pub struct RemoteLogFetchInfo {
    /// Remote directory that holds the bucket's log segments.
    pub remote_log_tablet_dir: String,
    // Populated from the protobuf but currently unread, hence `allow(dead_code)`.
    #[allow(dead_code)]
    pub partition_name: Option<String>,
    /// Segments to read, each tagged with the bucket.
    pub remote_log_segments: Vec<RemoteLogSegment>,
    /// `first_start_pos` from the protobuf, defaulting to 0 when absent.
    /// NOTE(review): presumably a read position within the first segment — confirm.
    pub first_start_pos: i32,
}
impl RemoteLogFetchInfo {
    /// Converts the protobuf fetch info, attaching `table_bucket` to every
    /// segment. A missing `first_start_pos` defaults to 0.
    pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Self {
        let mut remote_log_segments = Vec::new();
        for pb_segment in info.remote_log_segments.iter() {
            remote_log_segments.push(RemoteLogSegment::from_proto(
                pb_segment,
                table_bucket.clone(),
            ));
        }
        let first_start_pos = info.first_start_pos.unwrap_or(0);
        Self {
            remote_log_tablet_dir: info.remote_log_tablet_dir.clone(),
            partition_name: info.partition_name.clone(),
            remote_log_segments,
            first_start_pos,
        }
    }
}
/// One slot of the prefetch budget, attached to a downloaded file.
///
/// Dropping it returns the slot to the prefetch semaphore and then signals
/// `recycle_notify`, so the download coordinator can schedule the next
/// queued segment.
#[derive(Debug)]
pub struct PrefetchPermit {
    permit: Option<OwnedSemaphorePermit>,
    recycle_notify: Arc<Notify>,
}
impl PrefetchPermit {
    fn new(permit: OwnedSemaphorePermit, recycle_notify: Arc<Notify>) -> Self {
        let permit = Some(permit);
        Self {
            permit,
            recycle_notify,
        }
    }
}
impl Drop for PrefetchPermit {
    fn drop(&mut self) {
        // Give the slot back before waking the coordinator, so the slot is
        // already available when the coordinator tries to acquire it.
        if let Some(slot) = self.permit.take() {
            drop(slot);
        }
        self.recycle_notify.notify_one();
    }
}
/// A downloaded remote log segment that is available on local disk.
///
/// While this value is alive it occupies one prefetch slot. Dropping it (and
/// with it `permit`) frees the slot so another download can start.
#[derive(Debug)]
pub struct RemoteLogFile {
    /// Local path of the downloaded segment.
    pub file_path: PathBuf,
    /// Size of the file in bytes.
    // Currently unread, hence `allow(dead_code)`.
    #[allow(dead_code)]
    pub file_size: usize,
    /// Prefetch slot held for as long as this file handle is alive.
    pub permit: PrefetchPermit,
}
/// A queued request to download one remote log segment.
///
/// Requests are compared with the `Ord` impl in this module and stored in the
/// coordinator's queue as `Reverse<_>`, so the smallest request is served first.
#[derive(Debug)]
pub struct RemoteLogDownloadRequest {
    /// Segment to download.
    segment: RemoteLogSegment,
    /// Remote directory that contains the segment.
    remote_log_tablet_dir: String,
    /// Delivers the final outcome. Once the receiving side is gone, the request
    /// is treated as cancelled.
    result_sender: oneshot::Sender<Result<RemoteLogFile>>,
    /// Number of failed attempts so far (0 for a fresh request).
    retry_count: u32,
    /// Earliest time at which a retry may start; `None` for a fresh request.
    next_retry_at: Option<tokio::time::Instant>,
}
impl RemoteLogDownloadRequest {
    /// Test-only accessor for the requested segment.
    #[cfg(test)]
    pub fn segment(&self) -> &RemoteLogSegment {
        &self.segment
    }
}
/// Download priority of requests. `Less` means "download sooner", because the
/// coordinator queue stores requests as `Reverse<_>` inside a max-heap.
///
/// * Same table bucket: ascending `start_offset`, then `segment_id`, so a
///   bucket's segments are fetched in log order.
/// * Different buckets: ascending `max_timestamp`, then table id, partition
///   id, bucket id, and finally `segment_id` as a deterministic tie-breaker.
impl Ord for RemoteLogDownloadRequest {
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = &self.segment;
        let rhs = &other.segment;
        if lhs.table_bucket == rhs.table_bucket {
            return (lhs.start_offset, &lhs.segment_id)
                .cmp(&(rhs.start_offset, &rhs.segment_id));
        }
        let lhs_bucket = &lhs.table_bucket;
        let rhs_bucket = &rhs.table_bucket;
        let lhs_key = (
            lhs.max_timestamp,
            lhs_bucket.table_id(),
            lhs_bucket.partition_id(),
            lhs_bucket.bucket_id(),
            &lhs.segment_id,
        );
        let rhs_key = (
            rhs.max_timestamp,
            rhs_bucket.table_id(),
            rhs_bucket.partition_id(),
            rhs_bucket.bucket_id(),
            &rhs.segment_id,
        );
        lhs_key.cmp(&rhs_key)
    }
}
impl PartialOrd for RemoteLogDownloadRequest {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Equality follows the priority order. Two requests for the same segment
/// therefore compare equal, even when they come from different callers.
impl PartialEq for RemoteLogDownloadRequest {
    fn eq(&self, other: &Self) -> bool {
        Ord::cmp(self, other).is_eq()
    }
}
impl Eq for RemoteLogDownloadRequest {}
/// Outcome of one download task, consumed by the coordinator loop.
enum DownloadResult {
    /// The file was fetched; `result` is forwarded to `result_sender`.
    Success {
        result: RemoteLogFile,
        result_sender: oneshot::Sender<Result<RemoteLogFile>>,
    },
    /// The attempt failed but may be retried. `request` carries the updated
    /// retry count and backoff deadline and is put back into the queue.
    FailedRetry { request: RemoteLogDownloadRequest },
    /// Retries are exhausted; `error` is forwarded to `result_sender`.
    FailedPermanently {
        error: Error,
        result_sender: oneshot::Sender<Result<RemoteLogFile>>,
    },
    /// The requester dropped its receiver; there is nothing to report.
    Cancelled,
}
/// [`RemoteLogFetcher`] that downloads segments from remote storage into a
/// local temporary directory.
struct ProductionFetcher {
    /// Source of remote filesystem properties (credentials). It may hold no
    /// value until credentials are first obtained.
    credentials_rx: CredentialsReceiver,
    /// Directory the downloaded segment files are written to.
    local_log_dir: Arc<TempDir>,
    /// Read concurrency for the streaming reader (clamped to at least 1 by
    /// `RemoteLogDownloader::new`).
    remote_log_read_concurrency: usize,
}
impl RemoteLogFetcher for ProductionFetcher {
    /// Downloads `request.segment` from `request.remote_log_tablet_dir` into
    /// the local log directory.
    ///
    /// The remote object is `<tablet_dir>/<segment_id>/<start_offset:020>.log`.
    /// The local file is `<local_log_dir>/<segment_id>_<start_offset:020>.log`.
    /// If no remote filesystem properties are available yet, the fetch waits
    /// for one change notification on the credentials receiver.
    ///
    /// # Errors
    /// - `BrokenPipe` I/O error if waiting for the credentials change fails.
    /// - `UnexpectedError` if credentials are still absent after that notification.
    /// - Any error from the download itself or from reading the file metadata.
    ///
    /// Each of these becomes a failed attempt, which the coordinator retries
    /// with backoff.
    fn fetch(
        &self,
        request: &RemoteLogDownloadRequest,
    ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
        // The returned future must not borrow `self` or `request`, so copy
        // everything it needs up front.
        let mut credentials_rx = self.credentials_rx.clone();
        let local_log_dir = self.local_log_dir.clone();
        let remote_log_read_concurrency = self.remote_log_read_concurrency;
        let segment = request.segment.clone();
        let remote_log_tablet_dir = request.remote_log_tablet_dir.to_string();
        Box::pin(async move {
            let local_file_name = segment.local_file_name();
            let local_file_path = local_log_dir.path().join(&local_file_name);
            // Same zero-padding as the local file name. The remote layout,
            // however, nests the file under a per-segment directory.
            let offset_prefix = format!("{:020}", segment.start_offset);
            let remote_path = format!(
                "{}/{}/{}.log",
                remote_log_tablet_dir, segment.segment_id, offset_prefix
            );
            let remote_fs_props = {
                // Clone the value out so the borrow guard is not held across
                // the `.await` below.
                let maybe_props = credentials_rx.borrow().clone();
                match maybe_props {
                    Some(props) => props,
                    None => {
                        log::info!("Waiting for credentials to be available...");
                        if let Err(e) = credentials_rx.changed().await {
                            let io_err = io::Error::new(
                                io::ErrorKind::BrokenPipe,
                                format!(
                                    "credentials manager shut down before credentials were obtained: {e}"
                                ),
                            );
                            return Err(io_err.into());
                        }
                        // Only a single notification is awaited. If it still
                        // carries no credentials, this attempt fails and is
                        // retried by the coordinator.
                        credentials_rx
                            .borrow()
                            .clone()
                            .ok_or_else(|| Error::UnexpectedError {
                                message: "credentials not available after watch notification"
                                    .to_string(),
                                source: None,
                            })?
                    }
                }
            };
            let file_path = RemoteLogDownloader::download_file(
                &remote_log_tablet_dir,
                &remote_path,
                &local_file_path,
                &remote_fs_props,
                remote_log_read_concurrency,
            )
            .await?;
            let metadata = tokio::fs::metadata(&file_path).await?;
            // NOTE(review): `u64 as usize` truncates on 32-bit targets for
            // files larger than 4 GiB.
            let file_size = metadata.len() as usize;
            Ok(FetchResult {
                file_path,
                file_size,
            })
        })
    }
}
/// State owned by the background coordinator task.
struct DownloadCoordinator {
    /// Pending requests. `Reverse` turns the max-heap into a min-heap, so the
    /// highest-priority (smallest) request pops first.
    download_queue: BinaryHeap<Reverse<RemoteLogDownloadRequest>>,
    /// Running download tasks.
    active_downloads: JoinSet<DownloadResult>,
    /// Number of tasks spawned into `active_downloads` and not yet joined.
    in_flight: usize,
    /// Bounds the number of segments that are downloading or downloaded but
    /// not yet released. Each task holds a permit, which moves into the
    /// resulting file's `PrefetchPermit` on success.
    prefetch_semaphore: Arc<Semaphore>,
    /// Upper bound on `in_flight`.
    max_concurrent_downloads: usize,
    /// Signalled whenever a `PrefetchPermit` is dropped.
    recycle_notify: Arc<Notify>,
    /// Performs the actual downloads.
    fetcher: Arc<dyn RemoteLogFetcher>,
}
impl DownloadCoordinator {
    /// Returns `true` when only the prefetch budget keeps `drain` from starting
    /// more work. That is the case when:
    /// - requests are queued,
    /// - a download slot is free, and
    /// - every prefetch permit is held.
    ///
    /// The coordinator loop then waits on `recycle_notify` until a consumer
    /// releases a `PrefetchPermit`.
    fn should_wait_for_recycle(&self) -> bool {
        !self.download_queue.is_empty()
            && self.in_flight < self.max_concurrent_downloads
            && self.prefetch_semaphore.available_permits() == 0
    }

    /// Returns the earliest retry deadline among queued requests that is still
    /// in the future, or `None` if there is none.
    ///
    /// Deadlines that have already passed are ignored on purpose. Such a request
    /// was either started by the preceding `drain`, or it is blocked by a limit:
    /// - the concurrency limit, where a finishing download (`join_next`) wakes
    ///   the loop;
    /// - the prefetch limit, where a released permit (`recycle_notify`) does.
    ///
    /// Reporting a past deadline would make the loop sleep for zero time again
    /// and again while the request stays blocked, spinning a CPU core.
    fn next_retry_deadline(&self) -> Option<tokio::time::Instant> {
        let now = tokio::time::Instant::now();
        self.download_queue
            .iter()
            .filter_map(|Reverse(req)| req.next_retry_at)
            .filter(|&deadline| deadline > now)
            .min()
    }
}
impl DownloadCoordinator {
fn drain(&mut self) {
let mut deferred = Vec::new();
let max_scan = self.download_queue.len().min(100);
let mut scanned = 0;
while !self.download_queue.is_empty()
&& self.in_flight < self.max_concurrent_downloads
&& scanned < max_scan
{
let permit = match self.prefetch_semaphore.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => break, };
let Some(Reverse(request)) = self.download_queue.pop() else {
drop(permit);
break;
};
scanned += 1;
if let Some(next_retry_at) = request.next_retry_at {
let now = tokio::time::Instant::now();
if next_retry_at > now {
drop(permit);
deferred.push(request);
continue; }
}
if request.result_sender.is_closed() {
drop(permit);
continue; }
let fetcher = self.fetcher.clone();
let recycle_notify = self.recycle_notify.clone();
self.active_downloads.spawn(async move {
spawn_download_task(request, permit, fetcher, recycle_notify).await
});
self.in_flight += 1;
}
if !deferred.is_empty() {
for req in deferred {
self.download_queue.push(Reverse(req));
}
}
}
}
/// Runs one download attempt for `request` and classifies the outcome for the
/// coordinator loop.
///
/// * If the requester has gone away (result receiver dropped), before the
///   attempt or after it fails, the result is `Cancelled` and nothing is retried.
/// * On success the prefetch `permit` moves into the returned
///   [`RemoteLogFile`] (wrapped in a [`PrefetchPermit`]). The prefetch slot
///   therefore stays occupied until the consumer drops the file handle.
/// * On failure the permit is released right away. The request comes back as
///   `FailedRetry` with an exponential backoff deadline, until more than
///   `MAX_RETRY_COUNT` retries would be needed. At that point the error is
///   reported as `FailedPermanently`.
async fn spawn_download_task(
    request: RemoteLogDownloadRequest,
    permit: tokio::sync::OwnedSemaphorePermit,
    fetcher: Arc<dyn RemoteLogFetcher>,
    recycle_notify: Arc<Notify>,
) -> DownloadResult {
    if request.result_sender.is_closed() {
        drop(permit);
        return DownloadResult::Cancelled;
    }
    match fetcher.fetch(&request).await {
        Ok(fetch_result) => DownloadResult::Success {
            result: RemoteLogFile {
                file_path: fetch_result.file_path,
                file_size: fetch_result.file_size,
                permit: PrefetchPermit::new(permit, recycle_notify),
            },
            result_sender: request.result_sender,
        },
        Err(_) if request.result_sender.is_closed() => {
            drop(permit);
            DownloadResult::Cancelled
        }
        Err(e) => {
            // A failed attempt does not keep its prefetch slot.
            drop(permit);
            // `attempts` counts every attempt made so far, including the one
            // that just failed. The first attempt is not a retry, so
            // `attempts - 1` retries have been made.
            let attempts = request.retry_count + 1;
            if attempts > MAX_RETRY_COUNT {
                log::error!(
                    "Failed to download remote log segment {} after {} attempts: {}. Giving up.",
                    request.segment.segment_id,
                    attempts,
                    e
                );
                DownloadResult::FailedPermanently {
                    error: Error::UnexpectedError {
                        message: format!(
                            "Failed to download remote log segment after {attempts} attempts: {e}"
                        ),
                        source: Some(Box::new(e)),
                    },
                    result_sender: request.result_sender,
                }
            } else {
                let backoff_delay = calculate_backoff_delay(attempts);
                log::warn!(
                    "Failed to download remote log segment {}: {}. Retry {}/{} after {:?}",
                    request.segment.segment_id,
                    e,
                    attempts,
                    MAX_RETRY_COUNT,
                    backoff_delay
                );
                let mut retry_request = request;
                retry_request.retry_count = attempts;
                retry_request.next_retry_at = Some(tokio::time::Instant::now() + backoff_delay);
                DownloadResult::FailedRetry {
                    request: retry_request,
                }
            }
        }
    }
}
/// Event loop of the background download coordinator.
///
/// Each iteration first starts whatever queued downloads the limits allow
/// (`drain`), then waits for the next event:
/// - a new request arrives on `request_receiver` and is queued;
/// - a download task finishes, and its outcome is delivered to the requester
///   or the request is re-queued for a retry;
/// - a prefetch permit is recycled (awaited only while that is the sole
///   blocker, per `should_wait_for_recycle`);
/// - the next pending retry deadline is reached.
///
/// The loop exits through the `else` branch once every branch is disabled.
/// That happens when:
/// - the request channel is closed,
/// - no downloads are running, and
/// - neither a recycle wait nor a retry timer is pending.
async fn coordinator_loop(
    mut coordinator: DownloadCoordinator,
    mut request_receiver: mpsc::UnboundedReceiver<RemoteLogDownloadRequest>,
) {
    loop {
        coordinator.drain();
        // Time until the next queued retry becomes due. A deadline that has
        // already passed maps to a zero-length sleep.
        let next_retry_sleep = coordinator.next_retry_deadline().map(|deadline| {
            let now = tokio::time::Instant::now();
            if deadline > now {
                deadline - now
            } else {
                tokio::time::Duration::from_millis(0)
            }
        });
        tokio::select! {
            Some(request) = request_receiver.recv() => {
                coordinator.download_queue.push(Reverse(request));
                continue;
            }
            Some(result) = coordinator.active_downloads.join_next() => {
                coordinator.in_flight -= 1;
                match result {
                    Ok(DownloadResult::Success { result, result_sender }) => {
                        // If the requester is gone, `result` is dropped at the
                        // end of this arm, which releases its prefetch permit.
                        if !result_sender.is_closed() {
                            let _ = result_sender.send(Ok(result));
                        }
                    }
                    Ok(DownloadResult::FailedRetry { request }) => {
                        // Re-queued; `drain` holds it back until its
                        // `next_retry_at` has passed.
                        coordinator.download_queue.push(Reverse(request));
                    }
                    Ok(DownloadResult::FailedPermanently { error, result_sender }) => {
                        if !result_sender.is_closed() {
                            let _ = result_sender.send(Err(error));
                        }
                    }
                    Ok(DownloadResult::Cancelled) => {
                    }
                    Err(e) => {
                        // The task's request (and its sender) went down with the
                        // task, so the requester's receiver sees a closed channel.
                        log::error!("Download task panicked: {e:?}");
                    }
                }
                continue;
            }
            // A permit released between `drain` and this point is not lost:
            // `Notify::notify_one` stores a permit when no one is waiting.
            _ = coordinator.recycle_notify.notified(),
                if coordinator.should_wait_for_recycle() => {
                continue;
            }
            // The one-hour fallback is only a placeholder. This branch is
            // disabled when there is no retry deadline, so that sleep is never polled.
            _ = tokio::time::sleep(next_retry_sleep.unwrap_or(tokio::time::Duration::from_secs(3600))),
                if next_retry_sleep.is_some() => {
                continue;
            }
            else => break,
        }
    }
}
type CompletionCallback = Box<dyn Fn() + Send + Sync>;
pub struct RemoteLogDownloadFuture {
result: Arc<Mutex<Option<Result<RemoteLogFile>>>>,
completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>>,
}
impl RemoteLogDownloadFuture {
pub fn new(receiver: oneshot::Receiver<Result<RemoteLogFile>>) -> Self {
let result = Arc::new(Mutex::new(None));
let result_clone = Arc::clone(&result);
let completion_callbacks: Arc<Mutex<Vec<CompletionCallback>>> =
Arc::new(Mutex::new(Vec::new()));
let callbacks_clone = Arc::clone(&completion_callbacks);
tokio::spawn(async move {
let download_result = match receiver.await {
Ok(Ok(path)) => Ok(path),
Ok(Err(e)) => Err(e),
Err(e) => Err(Error::UnexpectedError {
message: format!("Download & Read future cancelled: {e:?}"),
source: None,
}),
};
*result_clone.lock() = Some(download_result);
let callbacks: Vec<CompletionCallback> = {
let mut callbacks_guard = callbacks_clone.lock();
mem::take(&mut *callbacks_guard)
};
for callback in callbacks {
callback();
}
});
Self {
result,
completion_callbacks,
}
}
pub fn on_complete<F>(&self, callback: F)
where
F: Fn() + Send + Sync + 'static,
{
let mut callbacks_guard = self.completion_callbacks.lock();
let is_done = self.result.lock().is_some();
if is_done {
drop(callbacks_guard);
callback();
} else {
callbacks_guard.push(Box::new(callback));
}
}
pub fn is_done(&self) -> bool {
self.result.lock().is_some()
}
pub fn take_remote_log_file(&self) -> Result<RemoteLogFile> {
let mut guard = self.result.lock();
match guard.take() {
Some(Ok(remote_log_file)) => Ok(remote_log_file),
Some(Err(e)) => {
let error_msg = format!("{e}");
Err(Error::IoUnexpectedError {
message: format!("Fail to get remote log file: {error_msg}"),
source: io::Error::other(error_msg),
})
}
None => Err(Error::IoUnexpectedError {
message: "Remote log file already taken or not ready".to_string(),
source: io::Error::other("Remote log file already taken or not ready"),
}),
}
}
}
/// Entry point for downloading remote log segments.
///
/// Requests are forwarded over a channel to a background coordinator task.
/// That task orders them by priority and enforces the concurrency and prefetch
/// limits.
pub struct RemoteLogDownloader {
    /// Sending side of the coordinator's request channel; `None` only after
    /// `drop` has taken it.
    request_sender: Option<mpsc::UnboundedSender<RemoteLogDownloadRequest>>,
}
impl RemoteLogDownloader {
    /// Creates a downloader backed by remote storage.
    ///
    /// Segments are written into `local_log_dir`, which is kept alive for as
    /// long as the fetcher exists. `remote_log_read_concurrency` is clamped to
    /// at least 1. See [`Self::new_with_fetcher`] for the other two limits.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    pub fn new(
        local_log_dir: TempDir,
        max_prefetch_segments: usize,
        max_concurrent_downloads: usize,
        remote_log_read_concurrency: usize,
        credentials_rx: CredentialsReceiver,
    ) -> Result<Self> {
        let fetcher = Arc::new(ProductionFetcher {
            credentials_rx,
            local_log_dir: Arc::new(local_log_dir),
            remote_log_read_concurrency: remote_log_read_concurrency.max(1),
        });
        Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)
    }

    /// Creates a downloader that uses `fetcher` and spawns its coordinator task.
    ///
    /// - `max_prefetch_segments`: how many segments may be downloading, or
    ///   downloaded but not yet released, at the same time.
    /// - `max_concurrent_downloads`: how many fetches may run at the same time.
    ///
    /// Both limits are clamped to at least 1, the same way
    /// `remote_log_read_concurrency` is clamped in [`Self::new`]. With a limit
    /// of 0 no download could ever start, and every request would wait forever.
    ///
    /// # Errors
    /// Currently always returns `Ok`.
    ///
    /// # Panics
    /// Panics if called outside a Tokio runtime (the coordinator is started
    /// with `tokio::spawn`).
    pub fn new_with_fetcher(
        fetcher: Arc<dyn RemoteLogFetcher>,
        max_prefetch_segments: usize,
        max_concurrent_downloads: usize,
    ) -> Result<Self> {
        let (request_sender, request_receiver) = mpsc::unbounded_channel();
        let coordinator = DownloadCoordinator {
            download_queue: BinaryHeap::new(),
            active_downloads: JoinSet::new(),
            in_flight: 0,
            prefetch_semaphore: Arc::new(Semaphore::new(max_prefetch_segments.max(1))),
            max_concurrent_downloads: max_concurrent_downloads.max(1),
            recycle_notify: Arc::new(Notify::new()),
            fetcher,
        };
        tokio::spawn(coordinator_loop(coordinator, request_receiver));
        Ok(Self {
            request_sender: Some(request_sender),
        })
    }

    /// Queues a download of `segment` from `remote_log_tablet_dir` and returns
    /// a handle to its eventual result.
    ///
    /// If the coordinator has already shut down, the returned future resolves
    /// to an `UnexpectedError`.
    pub fn request_remote_log(
        &self,
        remote_log_tablet_dir: &str,
        segment: &RemoteLogSegment,
    ) -> RemoteLogDownloadFuture {
        let (result_sender, result_receiver) = oneshot::channel();
        let request = RemoteLogDownloadRequest {
            segment: segment.clone(),
            remote_log_tablet_dir: remote_log_tablet_dir.to_string(),
            result_sender,
            retry_count: 0,
            next_retry_at: None,
        };
        if let Some(sender) = &self.request_sender {
            if sender.send(request).is_err() {
                // The coordinator is gone. Hand back a future that is already
                // resolved with an error.
                let (error_sender, error_receiver) = oneshot::channel();
                let _ = error_sender.send(Err(Error::UnexpectedError {
                    message: "RemoteLogDownloader coordinator has shut down".to_string(),
                    source: None,
                }));
                return RemoteLogDownloadFuture::new(error_receiver);
            }
        }
        RemoteLogDownloadFuture::new(result_receiver)
    }
}
/// Dropping the downloader closes the coordinator's request channel.
impl Drop for RemoteLogDownloader {
    fn drop(&mut self) {
        // Once this (only) sender is gone, the coordinator's `recv()` returns
        // `None` after any buffered requests are consumed, so it stops
        // accepting new work.
        if let Some(sender) = self.request_sender.take() {
            drop(sender);
        }
    }
}
impl RemoteLogDownloader {
    /// Downloads `remote_path` into `local_path` and returns `local_path`.
    ///
    /// `remote_log_tablet_dir` selects the storage backend:
    /// - a value without a `://` scheme is treated as a local `file://` path;
    /// - `remote_fs_props` are applied only for `s3://`, `s3a://` and `oss://`
    ///   locations.
    ///
    /// Every remote step has a 30 s timeout, and data is requested in 8 MiB
    /// chunks with `remote_log_read_concurrency` concurrent reads.
    async fn download_file(
        remote_log_tablet_dir: &str,
        remote_path: &str,
        local_path: &Path,
        remote_fs_props: &HashMap<String, String>,
        remote_log_read_concurrency: usize,
    ) -> Result<PathBuf> {
        let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") {
            remote_log_tablet_dir.to_string()
        } else {
            format!("file://{remote_log_tablet_dir}")
        };
        let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
        let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
            || remote_log_tablet_dir.starts_with("s3a://")
            || remote_log_tablet_dir.starts_with("oss://")
        {
            file_io_builder.with_props(
                remote_fs_props
                    .iter()
                    .map(|(k, v)| (k.as_str(), v.as_str())),
            )
        } else {
            file_io_builder
        };
        let storage = Storage::build(file_io_builder)?;
        let (op, relative_path) = storage.create(remote_path)?;
        const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
        const CHUNK_SIZE: usize = 8 * 1024 * 1024;
        Self::download_file_streaming(
            &op,
            relative_path,
            remote_path,
            local_path,
            CHUNK_SIZE,
            remote_log_read_concurrency,
            REMOTE_OP_TIMEOUT,
        )
        .await?;
        Ok(local_path.to_path_buf())
    }

    /// Streams `relative_path` from `op` into `local_path`, creating or
    /// truncating the file.
    ///
    /// If any step fails (including a timeout), the partially written local
    /// file is removed on a best-effort basis, so no truncated segment is left
    /// on disk. The original error is returned either way.
    async fn download_file_streaming(
        op: &opendal::Operator,
        relative_path: &str,
        remote_path: &str,
        local_path: &Path,
        chunk_size: usize,
        streaming_read_concurrency: usize,
        remote_op_timeout: Duration,
    ) -> Result<()> {
        let outcome = Self::stream_into_local_file(
            op,
            relative_path,
            remote_path,
            local_path,
            chunk_size,
            streaming_read_concurrency,
            remote_op_timeout,
        )
        .await;
        if outcome.is_err() {
            // A missing file is fine: the failure may have happened before the
            // file was created.
            match tokio::fs::remove_file(local_path).await {
                Err(e) if e.kind() != io::ErrorKind::NotFound => {
                    log::warn!(
                        "Failed to remove partially downloaded file {}: {e}",
                        local_path.display()
                    );
                }
                _ => {}
            }
        }
        outcome
    }

    /// Performs the streamed copy for `download_file_streaming`.
    ///
    /// Each remote step has its own `remote_op_timeout`: opening the reader,
    /// creating the byte stream, and receiving every individual chunk. The file
    /// is synced to disk once all chunks are written.
    async fn stream_into_local_file(
        op: &opendal::Operator,
        relative_path: &str,
        remote_path: &str,
        local_path: &Path,
        chunk_size: usize,
        streaming_read_concurrency: usize,
        remote_op_timeout: Duration,
    ) -> Result<()> {
        let mut local_file = tokio::fs::File::create(local_path).await?;
        let reader_future = op
            .reader_with(relative_path)
            .chunk(chunk_size)
            .concurrent(streaming_read_concurrency);
        let reader = tokio::time::timeout(remote_op_timeout, reader_future)
            .await
            .map_err(|e| Error::IoUnexpectedError {
                message: format!("Timeout creating streaming reader for {remote_path}: {e}."),
                source: io::ErrorKind::TimedOut.into(),
            })??;
        let mut stream = tokio::time::timeout(remote_op_timeout, reader.into_bytes_stream(..))
            .await
            .map_err(|e| Error::IoUnexpectedError {
                message: format!("Timeout creating streaming bytes stream for {remote_path}: {e}."),
                source: io::ErrorKind::TimedOut.into(),
            })??;
        let mut chunk_count = 0u64;
        while let Some(chunk) = tokio::time::timeout(remote_op_timeout, stream.try_next())
            .await
            .map_err(|e| Error::IoUnexpectedError {
                message: format!(
                    "Timeout streaming chunk from remote storage: {remote_path}, exception: {e}."
                ),
                source: io::ErrorKind::TimedOut.into(),
            })??
        {
            chunk_count += 1;
            // Log the first few chunks and then every tenth one, to limit noise.
            if chunk_count <= 3 || chunk_count % 10 == 0 {
                log::debug!("Remote log streaming download: chunk #{chunk_count} ({remote_path})");
            }
            local_file.write_all(&chunk).await?;
        }
        local_file.sync_all().await?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Explicit import: inside this module `Ordering` is the atomic memory
    // ordering. It shadows the `std::cmp::Ordering` brought in by `super::*`.
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Builds a `TableBucket` for the given table and bucket ids.
    fn create_table_bucket(table_id: i64, bucket_id: i32) -> TableBucket {
        TableBucket::new(table_id, bucket_id)
    }

    /// Test double for `RemoteLogFetcher` that tracks concurrency and can be
    /// made to block or fail.
    struct FakeFetcher {
        /// Fetches wait on this before finishing when `auto_complete` is false.
        completion_gate: Arc<Notify>,
        /// Number of fetches currently running.
        in_flight: Arc<AtomicUsize>,
        /// Highest value `in_flight` has reached.
        max_seen_in_flight: Arc<AtomicUsize>,
        /// How many of the upcoming fetches should fail.
        fail_count: Arc<Mutex<usize>>,
        /// When true, fetches finish after a single `yield_now` instead of
        /// waiting on the gate.
        auto_complete: bool,
    }

    impl FakeFetcher {
        fn new(fail_count: usize, auto_complete: bool) -> Self {
            Self {
                completion_gate: Arc::new(Notify::new()),
                in_flight: Arc::new(AtomicUsize::new(0)),
                max_seen_in_flight: Arc::new(AtomicUsize::new(0)),
                fail_count: Arc::new(Mutex::new(fail_count)),
                auto_complete,
            }
        }

        fn max_seen_in_flight(&self) -> usize {
            self.max_seen_in_flight.load(Ordering::SeqCst)
        }

        fn in_flight(&self) -> usize {
            self.in_flight.load(Ordering::SeqCst)
        }

        /// Lets one gated fetch finish. If none is waiting, `notify_one`
        /// stores a permit for the next one.
        fn release_one(&self) {
            self.completion_gate.notify_one();
        }

        /// Wakes every fetch currently waiting on the gate. Fetches that start
        /// waiting later are not affected.
        fn release_all(&self) {
            self.completion_gate.notify_waiters();
        }
    }

    impl RemoteLogFetcher for FakeFetcher {
        fn fetch(
            &self,
            request: &RemoteLogDownloadRequest,
        ) -> Pin<Box<dyn Future<Output = Result<FetchResult>> + Send>> {
            let gate = self.completion_gate.clone();
            let in_flight = self.in_flight.clone();
            let max_seen = self.max_seen_in_flight.clone();
            let fail_count = self.fail_count.clone();
            let segment_id = request.segment().segment_id.clone();
            let auto_complete = self.auto_complete;
            Box::pin(async move {
                let current = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                max_seen.fetch_max(current, Ordering::SeqCst);
                if !auto_complete {
                    gate.notified().await;
                } else {
                    tokio::task::yield_now().await;
                }
                // Consume one pending failure, if any remain.
                let should_fail = {
                    let mut count = fail_count.lock();
                    if *count > 0 {
                        *count -= 1;
                        true
                    } else {
                        false
                    }
                };
                in_flight.fetch_sub(1, Ordering::SeqCst);
                if should_fail {
                    Err(Error::UnexpectedError {
                        message: format!("Fake fetch failed for {segment_id}"),
                        source: None,
                    })
                } else {
                    // Write a tiny file with a unique name.
                    // NOTE(review): these files are never cleaned up.
                    let fake_data = vec![1, 2, 3, 4];
                    let temp_dir = env::temp_dir();
                    let timestamp = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_nanos();
                    let file_path =
                        temp_dir.join(format!("fake_segment_{segment_id}_{timestamp}.log"));
                    tokio::fs::write(&file_path, &fake_data).await?;
                    Ok(FetchResult {
                        file_path,
                        file_size: fake_data.len(),
                    })
                }
            })
        }
    }

    /// Builds a segment whose `end_offset` is `start_offset + 1000` and whose
    /// size is 1024 bytes.
    fn create_segment(
        segment_id: &str,
        start_offset: i64,
        max_timestamp: i64,
        table_bucket: TableBucket,
    ) -> RemoteLogSegment {
        RemoteLogSegment {
            segment_id: segment_id.to_string(),
            start_offset,
            end_offset: start_offset + 1000,
            size_in_bytes: 1024,
            table_bucket,
            max_timestamp,
        }
    }

    /// Builds a fresh request for ordering tests. The receiver is dropped
    /// immediately, so the request's sender is already closed.
    fn create_request(segment: RemoteLogSegment) -> RemoteLogDownloadRequest {
        let (result_sender, _) = oneshot::channel();
        RemoteLogDownloadRequest {
            remote_log_tablet_dir: "test_dir".to_string(),
            segment,
            result_sender,
            retry_count: 0,
            next_retry_at: None,
        }
    }

    /// Checks that, across buckets, requests pop in ascending `max_timestamp`
    /// order, and that within one bucket they pop in ascending `start_offset`.
    #[test]
    fn test_priority_ordering_matching_java_test_case() {
        let bucket1 = create_table_bucket(1, 0);
        let bucket2 = create_table_bucket(1, 1);
        let bucket3 = create_table_bucket(1, 2);
        let bucket4 = create_table_bucket(1, 3);
        let seg_negative = create_segment("seg_neg", 0, -1, bucket1.clone());
        let seg_zero = create_segment("seg_zero", 0, 0, bucket2.clone());
        let seg_1000 = create_segment("seg_1000", 0, 1000, bucket3.clone());
        let seg_2000 = create_segment("seg_2000", 0, 2000, bucket4.clone());
        let seg_same_bucket_100 = create_segment("seg_sb_100", 100, 5000, bucket1.clone());
        let seg_same_bucket_50 = create_segment("seg_sb_50", 50, 5000, bucket1.clone());
        let mut heap = BinaryHeap::new();
        heap.push(Reverse(create_request(seg_2000)));
        heap.push(Reverse(create_request(seg_same_bucket_100)));
        heap.push(Reverse(create_request(seg_1000)));
        heap.push(Reverse(create_request(seg_zero)));
        heap.push(Reverse(create_request(seg_negative)));
        heap.push(Reverse(create_request(seg_same_bucket_50)));
        let first = heap.pop().unwrap().0;
        assert_eq!(first.segment.max_timestamp, -1, "Lowest timestamp first");
        let second = heap.pop().unwrap().0;
        assert_eq!(second.segment.max_timestamp, 0);
        let third = heap.pop().unwrap().0;
        assert_eq!(third.segment.max_timestamp, 1000);
        let fourth = heap.pop().unwrap().0;
        assert_eq!(fourth.segment.max_timestamp, 2000);
        let fifth = heap.pop().unwrap().0;
        assert_eq!(fifth.segment.max_timestamp, 5000);
        assert_eq!(
            fifth.segment.start_offset, 50,
            "Lower offset first within bucket"
        );
        let sixth = heap.pop().unwrap().0;
        assert_eq!(sixth.segment.max_timestamp, 5000);
        assert_eq!(sixth.segment.start_offset, 100);
    }

    /// With gated fetches and a concurrency limit of 2, exactly two fetches
    /// should run at a time.
    /// NOTE(review): despite the name, priority order is not asserted here.
    #[tokio::test]
    async fn test_concurrency_and_priority() {
        let fake_fetcher = Arc::new(FakeFetcher::new(0, false));
        let downloader = RemoteLogDownloader::new_with_fetcher(
            fake_fetcher.clone(),
            10, // max_prefetch_segments
            2,  // max_concurrent_downloads
        )
        .unwrap();
        let bucket = create_table_bucket(1, 0);
        let segs: Vec<_> = (0..4)
            .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone()))
            .collect();
        let _futures: Vec<_> = segs
            .iter()
            .map(|seg| downloader.request_remote_log("dir", seg))
            .collect();
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            fake_fetcher.in_flight(),
            2,
            "Concurrency limit: exactly 2 should be in-flight"
        );
        fake_fetcher.release_one();
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            fake_fetcher.max_seen_in_flight(),
            2,
            "Max concurrent should not exceed 2"
        );
        fake_fetcher.release_all();
    }

    /// With a prefetch limit of 2, only two of four downloads may complete
    /// while their files are held. Dropping those two handles must let the
    /// remaining downloads finish.
    #[tokio::test]
    async fn test_prefetch_limit() {
        let fake_fetcher = Arc::new(FakeFetcher::new(0, true));
        let downloader = RemoteLogDownloader::new_with_fetcher(
            fake_fetcher,
            2,  // max_prefetch_segments
            10, // max_concurrent_downloads
        )
        .unwrap();
        let bucket = create_table_bucket(1, 0);
        let segs: Vec<_> = (0..4)
            .map(|i| create_segment(&format!("seg{i}"), i * 100, 1000, bucket.clone()))
            .collect();
        let mut futures: Vec<_> = segs
            .iter()
            .map(|seg| downloader.request_remote_log("dir", seg))
            .collect();
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            if futures.iter().filter(|f| f.is_done()).count() >= 2 {
                break;
            }
            if tokio::time::Instant::now() > deadline {
                panic!("Timeout waiting for first 2 downloads");
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(
            futures.iter().filter(|f| f.is_done()).count(),
            2,
            "Prefetch limit: only 2 should complete"
        );
        let f4 = futures.pop().unwrap();
        let f3 = futures.pop().unwrap();
        // Dropping the first two handles drops their completed files, which
        // releases the prefetch permits they hold.
        drop(futures);
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            if f3.is_done() && f4.is_done() {
                break;
            }
            if tokio::time::Instant::now() > deadline {
                panic!("Timeout after permit release");
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }

    /// Part 1: two failures are retried with backoff until the download
    /// completes.
    /// Part 2: a request that keeps failing has its future dropped, and no
    /// fetch may be left running afterwards.
    #[tokio::test]
    async fn test_retry_and_cancellation() {
        let fake_fetcher = Arc::new(FakeFetcher::new(2, true));
        let downloader =
            RemoteLogDownloader::new_with_fetcher(fake_fetcher.clone(), 10, 1).unwrap();
        let bucket = create_table_bucket(1, 0);
        let seg = create_segment("seg1", 0, 1000, bucket);
        let future = downloader.request_remote_log("dir", &seg);
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if future.is_done() {
                break;
            }
            if tokio::time::Instant::now() > deadline {
                panic!("Timeout waiting for retry to succeed");
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        // NOTE(review): `is_done()` is also true after a permanent failure;
        // the result itself is not inspected here.
        assert!(future.is_done(), "Should succeed after retries");
        let seg2 = create_segment("seg2", 100, 1000, create_table_bucket(1, 0));
        // Fails on every attempt, so the request never succeeds.
        let fake_fetcher2 = Arc::new(FakeFetcher::new(100, true));
        let downloader2 =
            RemoteLogDownloader::new_with_fetcher(fake_fetcher2.clone(), 10, 1).unwrap();
        let future2 = downloader2.request_remote_log("dir", &seg2);
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(future2);
        tokio::time::sleep(Duration::from_millis(50)).await;
        // NOTE(review): with `auto_complete` each fake fetch finishes almost
        // immediately, so `in_flight` is usually 0 regardless of cancellation.
        // This assertion is therefore weak.
        assert_eq!(
            fake_fetcher2.in_flight(),
            0,
            "Cancellation should release resources"
        );
    }
}