#pragma once
#include <limits>
#include <list>
#include <map>
#include <string>
#include <vector>
#include "monitoring/histogram.h"
#include "rocksdb/env.h"
#include "rocksdb/persistent_cache.h"
#include "rocksdb/status.h"
#include "rocksdb/system_clock.h"
namespace ROCKSDB_NAMESPACE {
struct PersistentCacheConfig {
explicit PersistentCacheConfig(
Env* const _env, const std::string& _path, const uint64_t _cache_size,
const std::shared_ptr<Logger>& _log,
const uint32_t _write_buffer_size = 1 * 1024 * 1024 ) {
env = _env;
clock = (env != nullptr) ? env->GetSystemClock().get()
: SystemClock::Default().get();
path = _path;
log = _log;
cache_size = _cache_size;
writer_dispatch_size = write_buffer_size = _write_buffer_size;
}
Status ValidateSettings() const {
if (!env || path.empty()) {
return Status::InvalidArgument("empty or null args");
}
if (cache_size < cache_file_size || write_buffer_size >= cache_file_size ||
write_buffer_size * write_buffer_count() < 2 * cache_file_size) {
return Status::InvalidArgument("invalid cache size");
}
if (!writer_qdepth || writer_dispatch_size > write_buffer_size ||
write_buffer_size % writer_dispatch_size) {
return Status::InvalidArgument("invalid writer settings");
}
return Status::OK();
}
Env* env;
SystemClock* clock;
std::string path;
std::shared_ptr<Logger> log;
bool enable_direct_reads = true;
bool enable_direct_writes = false;
uint64_t cache_size = std::numeric_limits<uint64_t>::max();
uint32_t cache_file_size = 100ULL * 1024 * 1024;
uint32_t writer_qdepth = 1;
bool pipeline_writes = true;
uint64_t max_write_pipeline_backlog_size = 1ULL * 1024 * 1024 * 1024;
uint32_t write_buffer_size = 1ULL * 1024 * 1024;
size_t write_buffer_count() const {
assert(write_buffer_size);
return static_cast<size_t>((writer_qdepth + 1.2) * cache_file_size /
write_buffer_size);
}
uint64_t writer_dispatch_size = 1ULL * 1024 * 1024;
bool is_compressed = true;
PersistentCacheConfig MakePersistentCacheConfig(
const std::string& path, const uint64_t size,
const std::shared_ptr<Logger>& log);
std::string ToString() const;
};
class PersistentCacheTier : public PersistentCache {
public:
using Tier = std::shared_ptr<PersistentCacheTier>;
virtual ~PersistentCacheTier() {}
virtual Status Open();
virtual Status Close();
virtual bool Reserve(const size_t size);
virtual bool Erase(const Slice& key);
virtual std::string PrintStats();
PersistentCache::StatsType Stats() override;
Status Insert(const Slice& page_key, const char* data,
const size_t size) override = 0;
Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
size_t* size) override = 0;
bool IsCompressed() override = 0;
std::string GetPrintableOptions() const override = 0;
uint64_t NewId() override;
virtual Tier& next_tier() { return next_tier_; }
virtual void set_next_tier(const Tier& tier) {
assert(!next_tier_);
next_tier_ = tier;
}
virtual void TEST_Flush() {
if (next_tier_) {
next_tier_->TEST_Flush();
}
}
private:
Tier next_tier_; std::atomic<uint64_t> last_id_{1};
};
class PersistentTieredCache : public PersistentCacheTier {
public:
virtual ~PersistentTieredCache();
Status Open() override;
Status Close() override;
bool Erase(const Slice& key) override;
std::string PrintStats() override;
PersistentCache::StatsType Stats() override;
Status Insert(const Slice& page_key, const char* data,
const size_t size) override;
Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
size_t* size) override;
bool IsCompressed() override;
std::string GetPrintableOptions() const override {
return "PersistentTieredCache";
}
void AddTier(const Tier& tier);
Tier& next_tier() override {
auto it = tiers_.end();
return (*it)->next_tier();
}
void set_next_tier(const Tier& tier) override {
auto it = tiers_.end();
(*it)->set_next_tier(tier);
}
void TEST_Flush() override {
assert(!tiers_.empty());
tiers_.front()->TEST_Flush();
PersistentCacheTier::TEST_Flush();
}
protected:
std::list<Tier> tiers_; };
}