#pragma once
#include <glog/logging.h>
#include <algorithm>
#include <charconv>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <unordered_map>
#include <utility>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "coding.h"
#include "manifest_buffer.h"
#include "types.h"
namespace eloqstore
{
// Compile-time constant with value 100; it is not referenced anywhere in this
// section of the file.
// NOTE(review): the name suggests a count of file descriptors held in reserve
// -- confirm against the code that consumes it.
constexpr uint32_t num_reserved_fd = 100;
// Splits `name` at the first FileNameSeparator.
//
// Returns {file_type, suffix}: `file_type` is everything before the first
// separator and `suffix` is everything after it. When `name` contains no
// separator, the whole name is returned as the type and the suffix is empty.
// Both views alias the storage behind `name`.
inline std::pair<std::string_view, std::string_view> ParseFileName(
    std::string_view name)
{
    const size_t sep_pos = name.find(FileNameSeparator);
    if (sep_pos == std::string_view::npos)
    {
        // No separator: the entire name is the type.
        return {name, std::string_view{}};
    }
    return {name.substr(0, sep_pos), name.substr(sep_pos + 1)};
}
// Parses `str` as an unsigned base-10 integer.
//
// The whole view must consist of decimal digits: an empty view, leading
// whitespace, a '+' or '-' sign, trailing characters, or a value that does not
// fit in 64 bits all make the function return false.
//
// std::from_chars is used instead of strtoull because a string_view is not
// guaranteed to be NUL-terminated; from_chars never reads past
// `str.data() + str.size()`, and it does not skip whitespace or accept signs
// for unsigned targets.
//
// @param str  characters to parse (need not be NUL-terminated)
// @param out  receives the parsed value; written only when true is returned
// @return true when the entire view was a valid in-range number
inline bool ParseUint64(std::string_view str, uint64_t &out)
{
    if (str.empty())
    {
        return false;
    }
    const char *const first = str.data();
    const char *const last = first + str.size();
    uint64_t value = 0;
    const std::from_chars_result res = std::from_chars(first, last, value, 10);
    // ec reports invalid input or overflow; ptr != last means trailing junk.
    if (res.ec != std::errc{} || res.ptr != last)
    {
        return false;
    }
    out = value;
    return true;
}
// Produces the canonical form of a branch name.
//
// Lowercase ASCII letters, digits and '-' are kept as-is, uppercase ASCII
// letters are folded to lowercase. An empty input or any other character
// logs a warning and yields an empty string.
inline std::string NormalizeBranchName(std::string_view branch_name)
{
    if (branch_name.empty())
    {
        LOG(WARNING) << "Branch name is empty";
        return std::string{};
    }
    std::string result;
    result.reserve(branch_name.size());
    for (const char ch : branch_name)
    {
        const bool is_upper = ch >= 'A' && ch <= 'Z';
        const bool kept_verbatim = (ch >= 'a' && ch <= 'z') ||
                                   (ch >= '0' && ch <= '9') || ch == '-';
        if (!is_upper && !kept_verbatim)
        {
            LOG(WARNING) << "Invalid character in branch name: '" << branch_name
                         << "' (contains '" << ch << "')";
            return std::string{};
        }
        // Fold uppercase ASCII to lowercase; everything else passes through.
        result.push_back(is_upper ? static_cast<char>(ch + ('a' - 'A')) : ch);
    }
    return result;
}
// Returns true when `branch_name` is non-empty and every character is an
// ASCII letter (either case), an ASCII digit, or '-'.
inline bool IsValidBranchName(std::string_view branch_name)
{
    const auto is_allowed = [](char ch)
    {
        return (ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z') ||
               (ch >= '0' && ch <= '9') || ch == '-';
    };
    return !branch_name.empty() &&
           std::all_of(branch_name.begin(), branch_name.end(), is_allowed);
}
// Parses a data-file suffix of the form
//   <file_id><sep><branch_name><sep><term>
// where <sep> is FileNameSeparator.
//
// `file_id` and `term` are reset to 0 on entry. On success all three outputs
// are filled in and true is returned; `branch_name` then aliases the storage
// behind `suffix`. On failure `branch_name` is left untouched.
inline bool ParseDataFileSuffix(std::string_view suffix,
                                FileId &file_id,
                                std::string_view &branch_name,
                                uint64_t &term)
{
    file_id = 0;
    term = 0;
    if (suffix.empty())
    {
        return false;
    }
    const size_t id_end = suffix.find(FileNameSeparator);
    if (id_end == std::string_view::npos)
    {
        return false;
    }
    const size_t branch_end = suffix.find(FileNameSeparator, id_end + 1);
    if (branch_end == std::string_view::npos)
    {
        return false;
    }
    const std::string_view id_part = suffix.substr(0, id_end);
    const std::string_view branch_part =
        suffix.substr(id_end + 1, branch_end - id_end - 1);
    const std::string_view term_part = suffix.substr(branch_end + 1);

    // Validate the fields left to right; stop at the first bad one.
    uint64_t id_value = 0;
    uint64_t term_value = 0;
    const bool well_formed = ParseUint64(id_part, id_value) &&
                             IsValidBranchName(branch_part) &&
                             ParseUint64(term_part, term_value);
    if (!well_formed)
    {
        return false;
    }
    file_id = static_cast<FileId>(id_value);
    branch_name = branch_part;
    term = term_value;
    return true;
}
// Parses a manifest-file suffix in one of two forms (<sep> is
// FileNameSeparator):
//   <branch_name><sep><term>
//   <branch_name><sep><term><sep><tag>
//
// `term` is reset to 0 and `tag` is cleared on entry. The branch part must
// pass IsValidBranchName and must not itself parse as an unsigned integer.
// When a second separator is present, everything after it is the tag and it
// must be non-empty. On success `branch_name` aliases the storage behind
// `suffix`; on failure `branch_name` is left untouched.
inline bool ParseManifestFileSuffix(std::string_view suffix,
                                    std::string_view &branch_name,
                                    uint64_t &term,
                                    std::optional<std::string> &tag)
{
    term = 0;
    tag.reset();
    if (suffix.empty())
    {
        return false;
    }
    const size_t branch_end = suffix.find(FileNameSeparator);
    if (branch_end == std::string_view::npos)
    {
        return false;
    }
    const std::string_view branch_part = suffix.substr(0, branch_end);
    if (!IsValidBranchName(branch_part))
    {
        return false;
    }
    // A purely numeric first field is not accepted as a branch name.
    uint64_t numeric_probe = 0;
    if (ParseUint64(branch_part, numeric_probe))
    {
        return false;
    }

    const std::string_view rest = suffix.substr(branch_end + 1);
    const size_t term_end = rest.find(FileNameSeparator);
    const bool has_tag = term_end != std::string_view::npos;

    // With no second separator, substr(0, npos) yields the whole remainder.
    uint64_t term_value = 0;
    if (!ParseUint64(rest.substr(0, term_end), term_value))
    {
        return false;
    }
    if (has_tag)
    {
        const std::string_view tag_part = rest.substr(term_end + 1);
        if (tag_part.empty())
        {
            return false;
        }
        tag = std::string(tag_part);
    }
    branch_name = branch_part;
    term = term_value;
    return true;
}
// Extracts the term from a manifest file name.
//
// Returns 0 when the file type is not FileNameManifest or when the suffix
// does not parse as a manifest suffix.
inline uint64_t ManifestTermFromFilename(std::string_view filename)
{
    const auto [kind, rest] = ParseFileName(filename);
    std::string_view ignored_branch;
    std::optional<std::string> ignored_tag;
    uint64_t parsed_term = 0;
    const bool parsed =
        kind == FileNameManifest &&
        ParseManifestFileSuffix(rest, ignored_branch, parsed_term, ignored_tag);
    return parsed ? parsed_term : 0;
}
// Returns true when `filename` is a manifest file whose suffix parses
// successfully and carries a tag component.
inline bool IsArchiveFile(std::string_view filename)
{
    const auto [kind, rest] = ParseFileName(filename);
    std::string_view ignored_branch;
    uint64_t ignored_term = 0;
    std::optional<std::string> parsed_tag;
    return kind == FileNameManifest &&
           ParseManifestFileSuffix(
               rest, ignored_branch, ignored_term, parsed_tag) &&
           parsed_tag.has_value();
}
// Parses a current-term file name of the form
//   <CurrentTermFileName><sep><branch_name><sep><pg_id>
// where <sep> is FileNameSeparator.
//
// The name is split at the LAST separator after the prefix: the left part
// must pass IsValidBranchName and the right part must be a non-empty run of
// decimal digits that fits in 32 bits. Outputs are written only on success;
// `branch_name` then aliases the storage behind `filename`.
inline bool ParseCurrentTermFilename(std::string_view filename,
                                     std::string_view &branch_name,
                                     PartitonGroupId &pg_id)
{
    constexpr std::string_view prefix = CurrentTermFileName;
    // The name must be strictly longer than the prefix, start with it, and
    // have a separator immediately afterwards.
    const bool has_prefix = filename.size() > prefix.size() &&
                            filename.substr(0, prefix.size()) == prefix &&
                            filename[prefix.size()] == FileNameSeparator;
    if (!has_prefix)
    {
        return false;
    }
    const std::string_view rest = filename.substr(prefix.size() + 1);
    const auto split = rest.rfind(FileNameSeparator);
    // Reject a missing separator, an empty branch part, or an empty id part.
    if (split == std::string_view::npos || split == 0 ||
        split + 1 == rest.size())
    {
        return false;
    }
    const std::string_view branch_part = rest.substr(0, split);
    const std::string_view id_part = rest.substr(split + 1);
    if (!IsValidBranchName(branch_part))
    {
        return false;
    }
    uint32_t id_value = 0;
    for (const char ch : id_part)
    {
        if (ch < '0' || ch > '9')
        {
            return false;
        }
        const uint32_t digit = static_cast<uint32_t>(ch - '0');
        // Would id_value * 10 + digit exceed UINT32_MAX?
        if (id_value > (UINT32_MAX - digit) / 10)
        {
            return false;
        }
        id_value = id_value * 10 + digit;
    }
    branch_name = branch_part;
    pg_id = id_value;
    return true;
}
inline bool ParseCurrentTermFilename(std::string_view filename,
std::string_view &branch_name)
{
PartitonGroupId pg_id = 0;
return ParseCurrentTermFilename(filename, branch_name, pg_id);
}
// Builds a data file name:
//   <FileNameData><sep><file_id><sep><normalized branch><sep><term>
//
// The branch name goes through NormalizeBranchName first; when normalization
// yields an empty string, an empty string is returned.
inline std::string BranchDataFileName(FileId file_id,
                                      std::string_view branch_name,
                                      uint64_t term)
{
    const std::string branch = NormalizeBranchName(branch_name);
    if (branch.empty())
    {
        return std::string{};
    }
    std::string result;
    result.reserve(std::size(FileNameData) + branch.size() + 32);
    result += FileNameData;
    result += FileNameSeparator;
    result += std::to_string(file_id);
    result += FileNameSeparator;
    result += branch;
    result += FileNameSeparator;
    result += std::to_string(term);
    return result;
}
inline std::string BranchManifestFileName(std::string_view branch_name,
uint64_t term)
{
std::string normalized_branch = NormalizeBranchName(branch_name);
if (normalized_branch.empty())
{
return ""; }
std::string name;
name.reserve(std::size(FileNameManifest) + normalized_branch.size() + 16);
name.append(FileNameManifest);
name.push_back(FileNameSeparator);
name.append(normalized_branch);
name.push_back(FileNameSeparator);
name.append(std::to_string(term));
return name;
}
inline std::string BranchArchiveName(std::string_view branch_name,
uint64_t term,
std::string_view tag)
{
std::string normalized_branch = NormalizeBranchName(branch_name);
if (normalized_branch.empty())
{
return ""; }
std::string name;
name.reserve(std::size(FileNameManifest) + normalized_branch.size() + 32);
name.append(FileNameManifest);
name.push_back(FileNameSeparator);
name.append(normalized_branch);
name.push_back(FileNameSeparator);
name.append(std::to_string(term));
name.push_back(FileNameSeparator);
name.append(tag);
return name;
}
inline std::string CurrentTermFileNameForBranchAndPartitionGroup(
std::string_view branch_name, PartitonGroupId partition_group_id)
{
std::string normalized_branch = NormalizeBranchName(branch_name);
if (normalized_branch.empty())
{
return ""; }
std::string name;
name.reserve(std::size(CurrentTermFileName) + normalized_branch.size() +
16);
name.append(CurrentTermFileName);
name.push_back(FileNameSeparator);
name.append(normalized_branch);
name.push_back(FileNameSeparator);
name.append(std::to_string(partition_group_id));
return name;
}
// Parses `content` as an unsigned decimal term.
//
// The input must be a non-empty run of ASCII digits whose value fits in
// 64 bits; anything else returns false. `term` is written only on success.
inline bool ParseBranchTerm(std::string_view content, uint64_t &term)
{
    if (content.empty())
    {
        return false;
    }
    uint64_t accumulated = 0;
    for (const char ch : content)
    {
        const bool is_digit = ch >= '0' && ch <= '9';
        if (!is_digit)
        {
            return false;
        }
        const uint64_t digit = static_cast<uint64_t>(ch - '0');
        // Would accumulated * 10 + digit exceed UINT64_MAX?
        if (accumulated > (UINT64_MAX - digit) / 10)
        {
            return false;
        }
        accumulated = accumulated * 10 + digit;
    }
    term = accumulated;
    return true;
}
// Formats `term` as its decimal text representation.
inline std::string TermToString(uint64_t term)
{
    std::string text = std::to_string(term);
    return text;
}
// Returns true when `filename` is a manifest file whose suffix parses
// successfully and has NO tag component.
inline bool IsBranchManifest(std::string_view filename)
{
    const auto [kind, rest] = ParseFileName(filename);
    std::string_view ignored_branch;
    uint64_t ignored_term = 0;
    std::optional<std::string> parsed_tag;
    return kind == FileNameManifest &&
           ParseManifestFileSuffix(
               rest, ignored_branch, ignored_term, parsed_tag) &&
           !parsed_tag.has_value();
}
inline bool IsBranchArchive(std::string_view filename)
{
auto [type, suffix] = ParseFileName(filename);
if (type != FileNameManifest)
{
return false;
}
std::string_view branch_name;
uint64_t term = 0;
std::optional<std::string> ts;
if (!ParseManifestFileSuffix(suffix, branch_name, term, ts))
{
return false;
}
return ts.has_value();
}
inline bool IsBranchDataFile(std::string_view filename)
{
auto [type, suffix] = ParseFileName(filename);
if (type != FileNameData)
{
return false;
}
FileId file_id = 0;
std::string_view branch_name;
uint64_t term = 0;
return ParseDataFileSuffix(suffix, file_id, branch_name, term);
}
// Looks up the range entry for `file_id` via std::lower_bound, using
// BranchFileRange's own ordering and a probe entry whose max_file_id is
// `file_id`. Returns mapping.end() when no entry compares not-less than the
// probe.
// NOTE(review): this presumes `mapping` is sorted consistently with
// BranchFileRange::operator< -- confirm against the type's definition.
inline BranchFileMapping::const_iterator FindBranchRange(
    const BranchFileMapping &mapping, FileId file_id)
{
    BranchFileRange probe;
    probe.max_file_id = file_id;
    const auto first = mapping.begin();
    const auto last = mapping.end();
    return std::lower_bound(first, last, probe);
}
// Returns true when FindBranchRange locates an entry for `file_id` and that
// entry's branch name equals `branch_name`.
inline bool FileIdInBranch(const BranchFileMapping &mapping,
                           FileId file_id,
                           std::string_view branch_name)
{
    const auto range_it = FindBranchRange(mapping, file_id);
    return range_it != mapping.end() && range_it->branch_name == branch_name;
}
// Fetches the branch name and term of the range entry FindBranchRange
// returns for `file_id`.
//
// Returns false (leaving both outputs untouched) when no entry is found.
inline bool GetBranchNameAndTerm(const BranchFileMapping &mapping,
                                 FileId file_id,
                                 std::string &branch_name,
                                 uint64_t &term)
{
    const auto range_it = FindBranchRange(mapping, file_id);
    const bool found = range_it != mapping.end();
    if (found)
    {
        branch_name = range_it->branch_name;
        term = range_it->term;
    }
    return found;
}
// Serializes a branch file mapping into a byte string.
//
// Layout (each integer is appended as its raw in-memory bytes):
//   uint64  number of entries
//   per entry:
//     uint32  branch name length
//     bytes   branch name
//     uint64  term
//     uint64  max_file_id
inline std::string SerializeBranchFileMapping(const BranchFileMapping &mapping)
{
    std::string result;
    // Appends the raw bytes of a fixed-width value.
    const auto put_raw = [&result](const void *src, size_t len)
    { result.append(static_cast<const char *>(src), len); };

    const uint64_t entry_count = static_cast<uint64_t>(mapping.size());
    put_raw(&entry_count, sizeof(uint64_t));
    for (const auto &entry : mapping)
    {
        const uint32_t name_len =
            static_cast<uint32_t>(entry.branch_name.size());
        put_raw(&name_len, sizeof(uint32_t));
        result.append(entry.branch_name);

        // Widen to the on-disk width before copying the bytes out.
        const uint64_t term = entry.term;
        put_raw(&term, sizeof(uint64_t));
        const uint64_t max_file_id = entry.max_file_id;
        put_raw(&max_file_id, sizeof(uint64_t));
    }
    return result;
}
// Decodes a byte string produced by SerializeBranchFileMapping.
//
// Expected layout (integers are raw in-memory bytes):
//   uint64  number of entries
//   per entry: uint32 name length, name bytes, uint64 term, uint64 max_file_id
//
// Returns an empty mapping when `data` is too short to hold the entry count
// or when any entry is truncated; in the truncated case, entries decoded so
// far are discarded.
inline BranchFileMapping DeserializeBranchFileMapping(std::string_view data)
{
    BranchFileMapping mapping;
    uint64_t num_entries = 0;
    if (data.size() < sizeof(num_entries))
    {
        return mapping;
    }
    std::memcpy(&num_entries, data.data(), sizeof(num_entries));
    data.remove_prefix(sizeof(num_entries));

    // Bytes that follow the name in every entry: term + max_file_id.
    constexpr size_t fixed_tail = sizeof(uint64_t) * 2;
    for (uint64_t i = 0; i < num_entries; ++i)
    {
        uint32_t name_len = 0;
        if (data.size() < sizeof(name_len))
        {
            return BranchFileMapping{};
        }
        std::memcpy(&name_len, data.data(), sizeof(name_len));
        data.remove_prefix(sizeof(name_len));

        // Compare by subtraction rather than `name_len + fixed_tail` so the
        // check cannot wrap on platforms where size_t is 32 bits wide.
        if (data.size() < fixed_tail || data.size() - fixed_tail < name_len)
        {
            return BranchFileMapping{};
        }
        BranchFileRange range;
        range.branch_name = std::string(data.substr(0, name_len));
        data.remove_prefix(name_len);

        // Decode into fixed-width locals (mirroring the serializer) so exactly
        // eight bytes are copied no matter how wide the struct fields are.
        uint64_t term_raw = 0;
        std::memcpy(&term_raw, data.data(), sizeof(term_raw));
        data.remove_prefix(sizeof(term_raw));
        range.term = static_cast<decltype(range.term)>(term_raw);

        uint64_t max_file_id_raw = 0;
        std::memcpy(&max_file_id_raw, data.data(), sizeof(max_file_id_raw));
        data.remove_prefix(sizeof(max_file_id_raw));
        range.max_file_id =
            static_cast<decltype(range.max_file_id)>(max_file_id_raw);

        mapping.push_back(std::move(range));
    }
    return mapping;
}
// Serializes branch manifest metadata into a byte string.
//
// Layout (integers are appended as raw in-memory bytes):
//   uint32  branch name length
//   bytes   branch name
//   uint64  term
//   bytes   output of SerializeBranchFileMapping(metadata.file_ranges)
inline std::string SerializeBranchManifestMetadata(
    const BranchManifestMetadata &metadata)
{
    std::string result;
    // Appends the raw bytes of a fixed-width value.
    const auto put_raw = [&result](const void *src, size_t len)
    { result.append(static_cast<const char *>(src), len); };

    const uint32_t name_len =
        static_cast<uint32_t>(metadata.branch_name.size());
    put_raw(&name_len, sizeof(uint32_t));
    result.append(metadata.branch_name);

    const uint64_t term = metadata.term;
    put_raw(&term, sizeof(uint64_t));

    result += SerializeBranchFileMapping(metadata.file_ranges);
    return result;
}
// Decodes a byte string produced by SerializeBranchManifestMetadata.
//
// Expected layout (integers are raw in-memory bytes):
//   uint32 branch name length, name bytes, uint64 term, then the bytes
//   consumed by DeserializeBranchFileMapping.
//
// `metadata` is reset to a value-initialized state on entry. Returns false
// when the header (name length, name, term) is truncated. The result of
// DeserializeBranchFileMapping is stored as-is; an empty mapping from it is
// not treated as an error here.
inline bool DeserializeBranchManifestMetadata(std::string_view data,
                                              BranchManifestMetadata &metadata)
{
    metadata = {};
    uint32_t name_len = 0;
    if (data.size() < sizeof(name_len))
    {
        return false;
    }
    std::memcpy(&name_len, data.data(), sizeof(name_len));
    data.remove_prefix(sizeof(name_len));

    // Compare by subtraction rather than `name_len + sizeof(uint64_t)` so the
    // check cannot wrap on platforms where size_t is 32 bits wide.
    if (data.size() < sizeof(uint64_t) ||
        data.size() - sizeof(uint64_t) < name_len)
    {
        return false;
    }
    metadata.branch_name = std::string(data.substr(0, name_len));
    data.remove_prefix(name_len);

    // Decode into a fixed-width local (mirroring the serializer) so exactly
    // eight bytes are copied no matter how wide the struct field is.
    uint64_t term_raw = 0;
    std::memcpy(&term_raw, data.data(), sizeof(term_raw));
    data.remove_prefix(sizeof(term_raw));
    metadata.term = static_cast<decltype(metadata.term)>(term_raw);

    metadata.file_ranges = DeserializeBranchFileMapping(data);
    return true;
}
}