#include "tasks/prewarm_task.h"
#include <glog/logging.h>
#include <algorithm>
#include <cassert>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "async_io_manager.h"
#ifdef ELOQ_MODULE_ENABLED
#include <bthread/bthread.h>
#include "bthread/eloq_module.h"
#endif
#include "common.h"
#include "eloq_store.h"
#include "error.h"
#include "storage/shard.h"
#include "tasks/task.h"
#include "utils.h"
namespace eloqstore
{
Prewarmer::Prewarmer(CloudStoreMgr *io_mgr) : io_mgr_(io_mgr)
{
assert(io_mgr_ != nullptr);
}
bool Prewarmer::PopNext(PrewarmFile &file)
{
return io_mgr_->PopPrewarmFile(file);
}
void Prewarmer::Run()
{
bool registered_active = false;
auto register_active = [&]()
{
if (!registered_active)
{
shard->TaskMgr()->AddExternalTask();
io_mgr_->RegisterPrewarmActive();
registered_active = true;
}
};
auto unregister_active = [&]()
{
if (registered_active)
{
io_mgr_->UnregisterPrewarmActive();
shard->TaskMgr()->FinishExternalTask();
registered_active = false;
}
};
while (true)
{
if (shutting_down_)
{
io_mgr_->GetPrewarmStats().completion_reason =
PrewarmStats::CompletionReason::Shutdown;
io_mgr_->ClearPrewarmFiles();
io_mgr_->StopAllPrewarmTasks();
stop_ = true;
unregister_active();
break;
}
if (stop_)
{
if (io_mgr_->IsPrewarmListingComplete() &&
io_mgr_->GetPrewarmPendingCount() == 0)
{
io_mgr_->GetPrewarmStats().end_time =
std::chrono::steady_clock::now();
const auto &stats = io_mgr_->GetPrewarmStats();
size_t files_pulled = io_mgr_->GetPrewarmFilesPulled();
unregister_active();
status_ = TaskStatus::Idle;
if (io_mgr_->ActivePrewarmTasks() == 0)
{
LOG(INFO) << "Shard " << shard->shard_id_ << " prewarm "
<< stats.CompletionReasonString() << ". Pulled "
<< files_pulled << " files in "
<< stats.DurationSeconds() << "s";
}
Yield();
continue;
}
else if (!io_mgr_->IsPrewarmListingComplete())
{
unregister_active();
status_ = TaskStatus::Idle;
Yield();
continue;
}
}
register_active();
PrewarmFile file;
bool popped = PopNext(file);
if (!popped)
{
if (!io_mgr_->IsPrewarmListingComplete())
{
unregister_active();
status_ = TaskStatus::Idle;
Yield();
continue;
}
else
{
io_mgr_->ClearPrewarmFiles();
io_mgr_->StopAllPrewarmTasks();
continue;
}
}
if (io_mgr_->LocalCacheRemained() < file.file_size)
{
io_mgr_->GetPrewarmStats().end_time =
std::chrono::steady_clock::now();
const auto &stats = io_mgr_->GetPrewarmStats();
size_t files_pulled = io_mgr_->GetPrewarmFilesPulled();
LOG(INFO) << "Shard " << shard->shard_id_
<< " out of local disk space during prewarm. "
<< "Pulled " << files_pulled << " files in "
<< stats.DurationSeconds()
<< "s before running out of space.";
io_mgr_->GetPrewarmStats().completion_reason =
PrewarmStats::CompletionReason::DiskFull;
io_mgr_->MarkPrewarmListingComplete();
io_mgr_->ClearPrewarmFiles();
io_mgr_->StopAllPrewarmTasks();
unregister_active();
status_ = TaskStatus::Idle;
Yield();
continue;
}
DLOG(INFO) << "Prewarm downloading: " << file.tbl_id.ToString() << "/"
<< (file.is_manifest
? BranchManifestFileName(file.branch_name, file.term)
: BranchDataFileName(
file.file_id, file.branch_name, file.term))
<< ", size: " << file.file_size << " bytes)";
auto [fd_ref, err] = io_mgr_->OpenFD(
file.tbl_id, file.file_id, true, file.branch_name, file.term);
if (err == KvError::NoError)
{
fd_ref = nullptr;
}
else if (err == KvError::NotFound)
{
io_mgr_->GetPrewarmStats().files_skipped_missing++;
LOG(WARNING) << "Prewarm skip missing "
<< (file.is_manifest ? "manifest" : "data file")
<< " for " << file.tbl_id;
}
else
{
io_mgr_->GetPrewarmStats().files_failed++;
LOG(WARNING) << "Prewarm failed for " << file.tbl_id << " file "
<< file.file_id << ": " << ErrorString(err);
io_mgr_->GetPrewarmStats().end_time =
std::chrono::steady_clock::now();
const auto &stats = io_mgr_->GetPrewarmStats();
size_t files_pulled = io_mgr_->GetPrewarmFilesPulled();
LOG(INFO) << "Shard " << shard->shard_id_
<< " aborting prewarm due to persistent errors. "
<< "Pulled " << files_pulled << " files, "
<< stats.files_failed << " failed in "
<< stats.DurationSeconds() << "s";
io_mgr_->GetPrewarmStats().completion_reason =
PrewarmStats::CompletionReason::DownloadError;
io_mgr_->MarkPrewarmListingComplete();
io_mgr_->ClearPrewarmFiles();
io_mgr_->StopAllPrewarmTasks();
continue;
}
if (shard->HasPendingRequests() ||
shard->TaskMgr()->NumActive() > io_mgr_->ActivePrewarmTasks())
{
unregister_active();
status_ = TaskStatus::BlockedIO;
Yield();
}
}
}
void Prewarmer::Shutdown()
{
if (!Options()->prewarm_cloud_cache || stop_)
return;
shutting_down_ = true;
stop_ = true;
shard->running_ = this;
coro_ = coro_.resume();
}
PrewarmService::PrewarmService(EloqStore *store) : store_(store)
{
assert(store_ != nullptr);
}
PrewarmService::~PrewarmService()
{
Stop();
}
void PrewarmService::Start()
{
stop_requested_.store(false, std::memory_order_relaxed);
stopped_.store(false, std::memory_order_release);
thread_ = std::thread(
[this]
{
std::string prewarm_all("");
PrewarmCloudCache(prewarm_all);
PrewarmLoop();
stopped_.store(true, std::memory_order_release);
});
}
void PrewarmService::PrewarmLoop()
{
while (!stop_requested_.load(std::memory_order_acquire))
{
std::array<TableIdent, 100> prewarm_tables;
size_t sz;
do
{
sz = prewarm_tables_.wait_dequeue_bulk_timed(
prewarm_tables.data(),
prewarm_tables.size(),
std::chrono::seconds(1));
for (size_t i = 0; i < sz; ++i)
{
const TableIdent &table = prewarm_tables[i];
std::string remote_path = table.ToString();
PrewarmCloudCache(remote_path);
}
} while (sz == prewarm_tables.size());
}
}
void PrewarmService::Stop()
{
stop_requested_.store(true, std::memory_order_release);
AbortPrewarmWorkers();
#ifdef ELOQ_MODULE_ENABLED
while (!stopped_.load(std::memory_order_acquire))
{
bthread_usleep(1000);
}
#endif
if (thread_.joinable())
{
thread_.join();
}
}
void PrewarmService::Prewarm(const TableIdent &table)
{
prewarm_tables_.enqueue(table);
}
bool PrewarmService::ListCloudObjects(
const std::string &remote_path,
std::vector<utils::CloudObjectInfo> &details,
const std::string &continuation_token,
std::string *next_continuation_token)
{
if (store_->shards_.empty())
{
return false;
}
details.clear();
ListObjectRequest request(nullptr);
request.SetRemotePath(remote_path);
request.SetDetails(&details);
request.SetRecursive(true);
request.SetContinuationToken(continuation_token);
request.err_ = KvError::NoError;
#ifdef ELOQ_MODULE_ENABLED
{
std::lock_guard<bthread::Mutex> lk(request.mutex_);
request.done_ = false;
}
#else
request.done_.store(false, std::memory_order_relaxed);
#endif
request.callback_ = nullptr;
size_t random_shard = std::rand() % store_->shards_.size();
if (!store_->shards_[random_shard]->AddKvRequest(&request))
{
return false;
}
request.Wait();
if (next_continuation_token)
{
*next_continuation_token = *request.GetNextContinuationToken();
}
return request.Error() == KvError::NoError;
}
void PrewarmService::PrewarmCloudCache(const std::string &remote_path)
{
const uint16_t num_threads =
std::max<uint16_t>(uint16_t{1}, store_->options_.num_threads);
const size_t shard_limit =
store_->options_.local_space_limit / static_cast<size_t>(num_threads);
if (shard_limit == 0)
{
LOG(INFO) << "Skip cloud prewarm: no local cache space per shard";
return;
}
auto prewarm_start = std::chrono::steady_clock::now();
LOG(INFO) << "Starting cloud cache prewarm across "
<< store_->shards_.size() << " shards";
for (size_t i = 0; i < store_->shards_.size(); ++i)
{
auto *cloud_mgr =
static_cast<CloudStoreMgr *>(store_->shards_[i]->IoManager());
if (!cloud_mgr->HasPrewarmWorkers())
{
continue;
}
cloud_mgr->prewarm_files_pulled_.store(0, std::memory_order_relaxed);
cloud_mgr->prewarm_listing_complete_.store(false,
std::memory_order_relaxed);
cloud_mgr->prewarm_queue_size_.store(0, std::memory_order_relaxed);
cloud_mgr->GetPrewarmStats() = PrewarmStats{};
cloud_mgr->GetPrewarmStats().start_time = prewarm_start;
}
std::string continuation_token;
size_t total_files_listed = 0;
size_t total_files_skipped = 0;
size_t api_call_count = 0;
PrewarmStats::CompletionReason completion_reason =
PrewarmStats::CompletionReason::Success;
const TableIdent scoped_tbl_id = TableIdent::FromString(remote_path);
const bool scoped_to_single_partition =
!remote_path.empty() && scoped_tbl_id.IsValid();
auto should_abort = [&]()
{
if (stop_requested_.load(std::memory_order_acquire))
{
completion_reason = PrewarmStats::CompletionReason::Shutdown;
return true;
}
return false;
};
if (should_abort())
{
goto listing_done;
}
do
{
if (should_abort())
{
goto listing_done;
}
if (api_call_count > 0 && api_call_count % 10 == 0)
{
LOG(INFO) << "Prewarm listing progress: " << total_files_listed
<< " files across " << api_call_count << " API calls";
}
std::vector<utils::CloudObjectInfo> batch_infos;
std::string next_token;
if (!ListCloudObjects(
remote_path, batch_infos, continuation_token, &next_token))
{
LOG(WARNING) << "Failed to list cloud objects after "
<< api_call_count << " API calls, prewarm aborted. "
<< "Listed " << total_files_listed << " files so far.";
completion_reason = PrewarmStats::CompletionReason::ListingError;
for (auto &shard : store_->shards_)
{
auto *cloud_mgr =
reinterpret_cast<CloudStoreMgr *>(shard->IoManager());
cloud_mgr->GetPrewarmStats().completion_reason =
completion_reason;
cloud_mgr->MarkPrewarmListingComplete();
}
goto listing_done;
}
api_call_count++;
std::vector<std::vector<PrewarmFile>> shard_files(
store_->shards_.size());
for (const auto &info : batch_infos)
{
if (info.is_dir)
{
total_files_skipped++;
continue;
}
const std::string &path =
!info.path.empty() ? info.path : info.name;
if (path.empty())
{
total_files_skipped++;
continue;
}
TableIdent tbl_id;
std::string filename;
if (scoped_to_single_partition)
{
tbl_id = scoped_tbl_id;
size_t slash = path.find_last_of('/');
filename =
slash == std::string::npos ? path : path.substr(slash + 1);
if (filename.empty())
{
total_files_skipped++;
continue;
}
}
else if (!ExtractPartition(path, tbl_id, filename))
{
total_files_skipped++;
continue;
}
if (store_->options_.partition_filter &&
!store_->options_.partition_filter(tbl_id))
{
total_files_skipped++;
continue;
}
if (filename.ends_with(TmpSuffix))
{
total_files_skipped++;
continue;
}
PrewarmFile file;
file.tbl_id = tbl_id;
file.mod_time = info.mod_time;
auto [file_type, suffix] = ParseFileName(filename);
if (file_type == FileNameManifest)
{
std::string_view branch_name;
uint64_t term = 0;
std::optional<std::string> tag;
if (!ParseManifestFileSuffix(suffix, branch_name, term, tag) ||
tag.has_value())
{
DLOG(INFO)
<< "ParseManifestFileSuffix failed, suffix: " << suffix;
total_files_skipped++;
continue;
}
file.file_id = IouringMgr::LruFD::kManifest;
file.term = term;
file.branch_name = std::string(branch_name);
file.is_manifest = true;
}
else if (file_type == FileNameData)
{
std::string_view branch_name;
if (!ParseDataFileSuffix(
suffix, file.file_id, branch_name, file.term))
{
DLOG(INFO)
<< "ParseDataFileSuffix failed, suffix: " << suffix;
total_files_skipped++;
continue;
}
file.branch_name = std::string(branch_name);
file.is_manifest = false;
}
else
{
total_files_skipped++;
continue;
}
file.file_size = static_cast<size_t>(info.size);
const size_t shard_index =
file.tbl_id.ShardIndex(shard_files.size());
shard_files[shard_index].push_back(std::move(file));
}
for (auto &files : shard_files)
{
std::sort(files.begin(),
files.end(),
[](const PrewarmFile &lhs, const PrewarmFile &rhs)
{
if (lhs.is_manifest != rhs.is_manifest)
{
return lhs.is_manifest && !rhs.is_manifest;
}
if (!lhs.is_manifest && !rhs.is_manifest &&
lhs.mod_time != rhs.mod_time)
{
return lhs.mod_time > rhs.mod_time;
}
return lhs.file_id > rhs.file_id;
});
}
for (size_t i = 0; i < shard_files.size(); ++i)
{
if (shard_files[i].empty())
{
continue;
}
auto *cloud_mgr =
static_cast<CloudStoreMgr *>(store_->shards_[i]->IoManager());
if (cloud_mgr == nullptr || !cloud_mgr->HasPrewarmWorkers())
{
continue;
}
size_t batch_size = shard_files[i].size();
total_files_listed += batch_size;
cloud_mgr->GetPrewarmStats().total_files_listed += batch_size;
if (!cloud_mgr->AppendPrewarmFiles(shard_files[i]))
{
LOG(INFO) << "Prewarm listing aborted by shard " << i
<< " consumer after " << api_call_count
<< " API calls. " << "Listed " << total_files_listed
<< " files total.";
if (stop_requested_.load(std::memory_order_acquire))
{
completion_reason =
PrewarmStats::CompletionReason::Shutdown;
}
else
{
completion_reason =
PrewarmStats::CompletionReason::DiskFull;
}
for (auto &shard : store_->shards_)
{
auto *mgr =
reinterpret_cast<CloudStoreMgr *>(shard->IoManager());
mgr->GetPrewarmStats().completion_reason =
completion_reason;
mgr->MarkPrewarmListingComplete();
}
goto listing_done; }
#ifdef ELOQ_MODULE_ENABLED
eloq::EloqModule::NotifyWorker(i);
#endif
}
continuation_token = std::move(next_token);
} while (!continuation_token.empty());
listing_done:
auto prewarm_end = std::chrono::steady_clock::now();
size_t total_pulled = 0;
for (auto &shard : store_->shards_)
{
auto *cloud_mgr = reinterpret_cast<CloudStoreMgr *>(shard->IoManager());
auto &stats = cloud_mgr->GetPrewarmStats();
stats.end_time = prewarm_end;
stats.total_files_pulled = cloud_mgr->GetPrewarmFilesPulled();
stats.files_skipped_filtered =
total_files_skipped / store_->shards_.size();
if (completion_reason != PrewarmStats::CompletionReason::ListingError &&
completion_reason != PrewarmStats::CompletionReason::DiskFull)
{
stats.completion_reason = completion_reason;
}
total_pulled += stats.total_files_pulled;
cloud_mgr->MarkPrewarmListingComplete();
#ifdef ELOQ_MODULE_ENABLED
eloq::EloqModule::NotifyWorker(
static_cast<Shard *>(shard.get())->shard_id_);
#endif
}
double duration = std::chrono::duration_cast<std::chrono::milliseconds>(
prewarm_end - prewarm_start)
.count() /
1000.0;
PrewarmStats temp_stats;
temp_stats.completion_reason = completion_reason;
LOG(INFO) << "Prewarm listing " << temp_stats.CompletionReasonString()
<< ". Duration: " << duration << "s"
<< ", API calls: " << api_call_count
<< ", Files listed: " << total_files_listed
<< ", Files skipped: " << total_files_skipped
<< ", Files pulled: " << total_pulled
<< ", Shards: " << store_->shards_.size();
}
bool PrewarmService::ExtractPartition(const std::string &path,
TableIdent &tbl_id,
std::string &filename)
{
size_t start = 0;
while (start < path.size())
{
size_t slash = path.find('/', start);
size_t len =
slash == std::string::npos ? path.size() - start : slash - start;
if (len == 0)
{
if (slash == std::string::npos)
{
break;
}
start = slash + 1;
continue;
}
std::string component = path.substr(start, len);
if (component.find('.') != std::string::npos)
{
TableIdent ident = TableIdent::FromString(component);
if (!ident.IsValid())
{
return false;
}
tbl_id = std::move(ident);
if (slash == std::string::npos || slash + 1 >= path.size())
{
return false;
}
filename = path.substr(slash + 1);
return !filename.empty();
}
if (slash == std::string::npos)
{
break;
}
start = slash + 1;
}
return false;
}
void PrewarmService::AbortPrewarmWorkers()
{
if (store_ == nullptr)
{
return;
}
for (auto &shard_ptr : store_->shards_)
{
if (shard_ptr == nullptr)
{
continue;
}
auto *cloud_mgr =
reinterpret_cast<CloudStoreMgr *>(shard_ptr->IoManager());
cloud_mgr->GetPrewarmStats().completion_reason =
PrewarmStats::CompletionReason::Shutdown;
cloud_mgr->MarkPrewarmListingComplete();
cloud_mgr->ClearPrewarmFiles();
cloud_mgr->StopAllPrewarmTasks();
}
}
}