#pragma once
#include <list>
#include <map>
#include <queue>
#include <set>
#include <utility>
#include <vector>
#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"
#include "rocksdb/trace_record.h"
#include "rocksdb/write_batch.h"
#include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE {
// Constant shared with the analyzer implementation; only declared here, the
// definition lives in another translation unit.
// NOTE(review): the name suggests a stand-in value size -- confirm the exact
// meaning against the definition.
extern const size_t kShadowValueSize;
// Operation categories distinguished by the trace analyzer. Every enumerator
// carries an explicit value so the numbering stays stable if entries are
// rearranged.
enum TraceOperationType : int {
  kGet = 0,
  kPut = 1,
  kDelete = 2,
  kSingleDelete = 3,
  kRangeDelete = 4,
  kMerge = 5,
  kIteratorSeek = 6,
  kIteratorSeekForPrev = 7,
  kMultiGet = 8,
  kPutEntity = 9,
  // Not an operation: equals the number of operation types listed above.
  kTaTypeNum = 10
};
// A single traced access: timestamp, operation type, column family id, value
// size and key.
//
// Scalar members carry default initializers so that a default-initialized
// TraceUnit (`TraceUnit u;`) never exposes indeterminate values.
struct TraceUnit {
  uint64_t ts = 0;
  // NOTE(review): presumably holds a TraceOperationType value -- confirm.
  uint32_t type = 0;
  uint32_t cf_id = 0;
  size_t value_size = 0;
  std::string key;
};
// Accumulator pairing an occurrence count with a summed timestamp value.
//
// Both fields default to zero so a default-initialized instance is a valid
// empty accumulator instead of holding indeterminate values.
struct TypeCorrelation {
  uint64_t count = 0;
  uint64_t total_ts = 0;
};
// Per-key statistics record: identifiers, access counters, the most recent
// timestamp, value size and a list of TypeCorrelation accumulators.
//
// All scalar members default to zero so a default-initialized StatsUnit never
// exposes indeterminate values; v_correlation starts out empty.
struct StatsUnit {
  uint64_t key_id = 0;
  uint64_t access_count = 0;
  uint64_t latest_ts = 0;
  // NOTE(review): presumably the number of successful accesses -- confirm.
  uint64_t succ_count = 0;
  uint32_t cf_id = 0;
  size_t value_size = 0;
  std::vector<TypeCorrelation> v_correlation;
};
// Options handed to TraceAnalyzer. Holds the correlation configuration in two
// forms: a 2-D integer table and a flat list of integer pairs.
class AnalyzerOptions {
 public:
  AnalyzerOptions();
  ~AnalyzerOptions();

  // Takes a textual description of the correlation input.
  // NOTE(review): presumably parses `in_str` into the two containers below;
  // the accepted format is defined by the implementation -- confirm there.
  void SparseCorrelationInput(const std::string& in_str);

  std::vector<std::vector<int>> correlation_map;
  std::vector<std::pair<int, int>> correlation_list;
};
// Statistics record carrying a column family id and name together with
// counters, distribution maps, top-k heaps, a time series and the output file
// handles used to persist them.
struct TraceStats {
  TraceStats();
  ~TraceStats();

  // Move-only: copying is disabled (the struct owns unique_ptr file handles).
  TraceStats(const TraceStats&) = delete;
  TraceStats& operator=(const TraceStats&) = delete;
  TraceStats(TraceStats&&) = default;
  TraceStats& operator=(TraceStats&&) = default;

  uint32_t cf_id;
  std::string cf_name;

  // Scalar aggregates.
  // NOTE(review): the "a_" prefix presumably stands for "access" -- confirm.
  uint64_t a_count;
  uint64_t a_succ_count;
  uint64_t a_key_id;
  uint64_t a_key_size_sqsum;
  uint64_t a_key_size_sum;
  uint64_t a_key_mid;
  uint64_t a_value_size_sqsum;
  uint64_t a_value_size_sum;
  uint64_t a_value_mid;
  uint32_t a_peak_qps;
  double a_ave_qps;

  // Per-key statistics and distribution maps.
  std::map<std::string, StatsUnit> a_key_stats;
  std::map<uint64_t, uint64_t> a_count_stats;
  std::map<uint64_t, uint64_t> a_key_size_stats;
  std::map<uint64_t, uint64_t> a_value_size_stats;
  std::map<uint32_t, uint32_t> a_qps_stats;
  std::map<uint32_t, std::map<std::string, uint32_t>> a_qps_prefix_stats;

  // Heaps ordered with std::greater, i.e. top() is the smallest element.
  std::priority_queue<std::pair<uint64_t, std::string>,
                      std::vector<std::pair<uint64_t, std::string>>,
                      std::greater<std::pair<uint64_t, std::string>>>
      top_k_queue;
  std::priority_queue<std::pair<uint64_t, std::string>,
                      std::vector<std::pair<uint64_t, std::string>>,
                      std::greater<std::pair<uint64_t, std::string>>>
      top_k_prefix_access;
  std::priority_queue<std::pair<double, std::string>,
                      std::vector<std::pair<double, std::string>>,
                      std::greater<std::pair<double, std::string>>>
      top_k_prefix_ave;
  std::priority_queue<std::pair<uint32_t, uint32_t>,
                      std::vector<std::pair<uint32_t, uint32_t>>,
                      std::greater<std::pair<uint32_t, uint32_t>>>
      top_k_qps_sec;

  std::list<TraceUnit> time_series;
  std::vector<std::pair<uint64_t, uint64_t>> correlation_output;
  std::map<uint32_t, uint64_t> uni_key_num;

  // Output file handles, one per kind of result.
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> time_series_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_count_dist_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_prefix_cut_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_value_size_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_size_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_key_num_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_qps_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> a_top_qps_prefix_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_key_f;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> w_prefix_cut_f;
};
// Aggregated data for one operation type: its name, an enabled flag, overall
// counters and a map from a 32-bit id to TraceStats.
// NOTE(review): the map key is presumably a column family id -- confirm.
//
// The default constructor is defaulted, so the flag and counters carry
// default member initializers; without them `TypeUnit u;` would leave those
// members indeterminate.
struct TypeUnit {
  std::string type_name;
  bool enabled = false;
  uint64_t total_keys = 0;
  uint64_t total_access = 0;
  uint64_t total_succ_access = 0;
  uint32_t sample_count = 0;
  std::map<uint32_t, TraceStats> stats;

  TypeUnit() = default;
  ~TypeUnit() = default;

  // Move-only: TraceStats is itself non-copyable.
  TypeUnit(const TypeUnit&) = delete;
  TypeUnit& operator=(const TypeUnit&) = delete;
  TypeUnit(TypeUnit&&) = default;
  TypeUnit& operator=(TypeUnit&&) = default;
};
// Per-column-family record: id, two counters, a key-size distribution map and
// a QPS map.
// NOTE(review): "w_" / "a_" presumably mean write / access -- confirm.
//
// Scalar members default to zero so a default-initialized CfUnit (for example
// one created through std::map::operator[]) never holds indeterminate values.
struct CfUnit {
  uint32_t cf_id = 0;
  uint64_t w_count = 0;
  uint64_t a_count = 0;
  std::map<uint64_t, uint64_t> w_key_size_stats;
  std::map<uint32_t, uint32_t> cf_qps;
};
// Trace analysis driver. Privately implements the TraceRecord::Handler and
// WriteBatch::Handler callback interfaces and keeps the collected statistics
// in its members.
class TraceAnalyzer : private TraceRecord::Handler,
                      private WriteBatch::Handler {
 public:
  TraceAnalyzer(std::string& trace_path, std::string& output_path,
                AnalyzerOptions _analyzer_opts);
  ~TraceAnalyzer();

  // Processing stages.
  // NOTE(review): the names suggest they are meant to be called in the order
  // listed -- confirm against the implementation.
  Status PrepareProcessing();
  Status StartProcessing();
  Status MakeStatistics();
  Status ReProcessing();
  Status EndProcessing();

  Status WriteTraceUnit(TraceUnit& unit);

  // Gives mutable access to the per-operation-type units held in ta_.
  std::vector<TypeUnit>& GetTaVector() { return ta_; }

 private:
  // ---- TraceRecord::Handler ----
  using TraceRecord::Handler::Handle;
  Status Handle(const WriteQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;
  Status Handle(const GetQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;
  Status Handle(const IteratorSeekQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;
  Status Handle(const MultiGetQueryTraceRecord& record,
                std::unique_ptr<TraceRecordResult>* result) override;

  // ---- WriteBatch::Handler: callbacks implemented out of line ----
  using WriteBatch::Handler::PutCF;
  Status PutCF(uint32_t column_family_id, const Slice& key,
               const Slice& value) override;

  using WriteBatch::Handler::PutEntityCF;
  Status PutEntityCF(uint32_t column_family_id, const Slice& key,
                     const Slice& value) override;

  using WriteBatch::Handler::DeleteCF;
  Status DeleteCF(uint32_t column_family_id, const Slice& key) override;

  using WriteBatch::Handler::SingleDeleteCF;
  Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) override;

  using WriteBatch::Handler::DeleteRangeCF;
  Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
                       const Slice& end_key) override;

  using WriteBatch::Handler::MergeCF;
  Status MergeCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override;

  // ---- WriteBatch::Handler: callbacks that do nothing and report success ----
  using WriteBatch::Handler::PutBlobIndexCF;
  Status PutBlobIndexCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
                        const Slice& /*value*/) override {
    return Status::OK();
  }

  using WriteBatch::Handler::LogData;
  void LogData(const Slice& /*blob*/) override {}

  using WriteBatch::Handler::MarkBeginPrepare;
  Status MarkBeginPrepare(bool = false) override { return Status::OK(); }

  using WriteBatch::Handler::MarkEndPrepare;
  Status MarkEndPrepare(const Slice& /*xid*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkNoop;
  Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkRollback;
  Status MarkRollback(const Slice& /*xid*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkCommit;
  Status MarkCommit(const Slice& /*xid*/) override { return Status::OK(); }

  using WriteBatch::Handler::MarkCommitWithTimestamp;
  Status MarkCommitWithTimestamp(const Slice& /*xid*/,
                                 const Slice& /*commit_ts*/) override {
    return Status::OK();
  }

  // ---- Internal helpers ----
  // Two overloads: one takes parallel vectors of column family ids, keys and
  // value sizes; the other takes a single (cf_id, key, value_size) triple.
  Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
                              std::vector<uint32_t> cf_ids,
                              std::vector<Slice> keys,
                              std::vector<size_t> value_sizes);
  Status OutputAnalysisResult(TraceOperationType op_type, uint64_t timestamp,
                              uint32_t cf_id, const Slice& key,
                              size_t value_size);

  Status ReadTraceHeader(Trace* header);
  Status ReadTraceFooter(Trace* footer);
  Status ReadTraceRecord(Trace* trace);

  Status KeyStatsInsertion(const uint32_t& type, const uint32_t& cf_id,
                           const std::string& key, const size_t value_size,
                           const uint64_t ts);
  Status StatsUnitCorrelationUpdate(StatsUnit& unit, const uint32_t& type,
                                    const uint64_t& ts,
                                    const std::string& key);

  Status OpenStatsOutputFiles(const std::string& type, TraceStats& new_stats);
  Status CreateOutputFile(
      const std::string& type, const std::string& cf_name,
      const std::string& ending,
      std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>* f_ptr);
  Status CloseOutputFiles();

  void PrintStatistics();

  Status TraceUnitWriter(
      std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile>& f_ptr,
      TraceUnit& unit);
  Status WriteTraceSequence(const uint32_t& type, const uint32_t& cf_id,
                            const Slice& key, const size_t value_size,
                            const uint64_t ts);

  Status MakeStatisticKeyStatsOrPrefix(TraceStats& stats);
  Status MakeStatisticCorrelation(TraceStats& stats, StatsUnit& unit);
  Status MakeStatisticQPS();

  // ---- Data members ----
  // Declaration order is kept exactly as before: it determines both the
  // object layout and the member initialization order.
  ROCKSDB_NAMESPACE::Env* env_;
  EnvOptions env_options_;
  std::unique_ptr<TraceReader> trace_reader_;
  size_t offset_;
  char buffer_[1024];
  uint64_t write_batch_ts_;
  std::string trace_name_;
  std::string output_path_;
  AnalyzerOptions analyzer_opts_;

  // Overall counters.
  uint64_t total_requests_;
  uint64_t total_access_keys_;
  uint64_t total_gets_;
  uint64_t total_writes_;
  uint64_t total_seeks_;
  uint64_t total_seek_prevs_;
  uint64_t total_multigets_;

  // Time bookkeeping.
  uint64_t trace_create_time_;
  uint64_t begin_time_;
  uint64_t end_time_;
  uint64_t time_series_start_;
  uint32_t sample_max_;
  uint32_t cur_time_sec_;

  // Analyzer-wide output files.
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> trace_sequence_f_;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> qps_f_;
  std::unique_ptr<ROCKSDB_NAMESPACE::WritableFile> cf_qps_f_;

  std::vector<TypeUnit> ta_;
  std::map<uint32_t, CfUnit> cfs_;
  std::vector<uint32_t> qps_peak_;
  std::vector<double> qps_ave_;

  int db_version_;
};
// Entry point of the trace analyzer tool; takes main()-style arguments.
// NOTE(review): the return value is presumably a process exit code --
// confirm against the implementation.
int trace_analyzer_tool(int argc, char* argv[]);
}