#include "catalog/catalog_set.h"
#include <mutex>
#include "binder/ddl/bound_alter_info.h"
#include "catalog/catalog_entry/dummy_catalog_entry.h"
#include "catalog/catalog_entry/table_catalog_entry.h"
#include "common/assert.h"
#include "common/exception/catalog.h"
#include "common/serializer/deserializer.h"
#include "transaction/transaction.h"
#include <format>
using namespace lbug::common;
using namespace lbug::transaction;
namespace lbug {
namespace catalog {
CatalogSet::CatalogSet(bool isInternal) {
if (isInternal) {
nextOID = INTERNAL_CATALOG_SET_START_OID;
}
}
static bool checkWWConflict(const Transaction* transaction, const CatalogEntry* entry) {
return (entry->getTimestamp() >= Transaction::START_TRANSACTION_ID &&
entry->getTimestamp() != transaction->getID()) ||
(entry->getTimestamp() < Transaction::START_TRANSACTION_ID &&
entry->getTimestamp() > transaction->getStartTS());
}
bool CatalogSet::containsEntry(const Transaction* transaction, const std::string& name) {
std::shared_lock lck{mtx};
return containsEntryNoLock(transaction, name);
}
bool CatalogSet::containsEntryNoLock(const Transaction* transaction,
const std::string& name) const {
if (!entries.contains(name)) {
return false;
}
const auto entry =
traverseVersionChainsForTransactionNoLock(transaction, entries.at(name).get());
return !entry->isDeleted();
}
CatalogEntry* CatalogSet::getEntry(const Transaction* transaction, const std::string& name) {
std::shared_lock lck{mtx};
return getEntryNoLock(transaction, name);
}
CatalogEntry* CatalogSet::getEntryNoLock(const Transaction* transaction,
const std::string& name) const {
validateExistNoLock(transaction, name);
const auto entry =
traverseVersionChainsForTransactionNoLock(transaction, entries.at(name).get());
DASSERT(entry != nullptr && !entry->isDeleted());
return entry;
}
oid_t CatalogSet::createEntry(Transaction* transaction, std::unique_ptr<CatalogEntry> entry) {
CatalogEntry* entryPtr = nullptr;
oid_t oid = INVALID_OID;
{
std::unique_lock lck{mtx};
oid = nextOID++;
entry->setOID(oid);
entryPtr = createEntryNoLock(transaction, std::move(entry));
}
DASSERT(entryPtr);
if (transaction->shouldAppendToUndoBuffer()) {
transaction->pushCreateDropCatalogEntry(*this, *entryPtr, isInternal());
}
return oid;
}
CatalogEntry* CatalogSet::createEntryNoLock(const Transaction* transaction,
std::unique_ptr<CatalogEntry> entry) {
validateNotExistNoLock(transaction, entry->getName());
entry->setTimestamp(transaction->getID());
if (entries.contains(entry->getName())) {
const auto existingEntry = entries.at(entry->getName()).get();
if (checkWWConflict(transaction, existingEntry)) {
throw CatalogException(std::format(
"Write-write conflict on creating catalog entry with name {}.", entry->getName()));
}
if (!existingEntry->isDeleted()) {
throw CatalogException(
std::format("Catalog entry with name {} already exists.", entry->getName()));
}
}
auto dummyEntry = createDummyEntryNoLock(entry->getName(), entry->getOID());
entries.emplace(entry->getName(), std::move(dummyEntry));
const auto entryPtr = entry.get();
emplaceNoLock(std::move(entry));
return entryPtr->getPrev();
}
void CatalogSet::emplaceNoLock(std::unique_ptr<CatalogEntry> entry) {
if (entries.contains(entry->getName())) {
entry->setPrev(std::move(entries.at(entry->getName())));
entries.erase(entry->getName());
}
entries.emplace(entry->getName(), std::move(entry));
}
void CatalogSet::eraseNoLock(const std::string& name) {
entries.erase(name);
}
std::unique_ptr<CatalogEntry> CatalogSet::createDummyEntryNoLock(std::string name, oid_t oid) {
return std::make_unique<DummyCatalogEntry>(std::move(name), oid);
}
CatalogEntry* CatalogSet::traverseVersionChainsForTransactionNoLock(const Transaction* transaction,
CatalogEntry* currentEntry) {
while (currentEntry) {
if (currentEntry->getTimestamp() == transaction->getID()) {
break;
}
if (currentEntry->getTimestamp() <= transaction->getStartTS()) {
break;
}
currentEntry = currentEntry->getPrev();
}
return currentEntry;
}
CatalogEntry* CatalogSet::getCommittedEntryNoLock(CatalogEntry* entry) {
while (entry) {
if (entry->getTimestamp() < Transaction::START_TRANSACTION_ID) {
break;
}
entry = entry->getPrev();
}
return entry;
}
void CatalogSet::dropEntry(Transaction* transaction, const std::string& name, oid_t oid) {
CatalogEntry* entryPtr = nullptr;
{
std::unique_lock lck{mtx};
entryPtr = dropEntryNoLock(transaction, name, oid);
}
DASSERT(entryPtr);
if (transaction->shouldAppendToUndoBuffer()) {
transaction->pushCreateDropCatalogEntry(*this, *entryPtr, isInternal());
}
}
CatalogEntry* CatalogSet::dropEntryNoLock(const Transaction* transaction, const std::string& name,
oid_t oid) {
validateExistNoLock(transaction, name);
auto tombstone = createDummyEntryNoLock(name, oid);
tombstone->setTimestamp(transaction->getID());
const auto tombstonePtr = tombstone.get();
emplaceNoLock(std::move(tombstone));
return tombstonePtr->getPrev();
}
void CatalogSet::alterTableEntry(Transaction* transaction,
const binder::BoundAlterInfo& alterInfo) {
std::unique_lock lck{mtx};
validateExistNoLock(transaction, alterInfo.tableName);
auto entry = getEntryNoLock(transaction, alterInfo.tableName);
DASSERT(entry->getType() == CatalogEntryType::NODE_TABLE_ENTRY ||
entry->getType() == CatalogEntryType::REL_GROUP_ENTRY);
const auto tableEntry = entry->ptrCast<TableCatalogEntry>();
auto newEntry = tableEntry->alter(transaction->getID(), alterInfo, this);
switch (alterInfo.alterType) {
case AlterType::RENAME: {
dropEntryNoLock(transaction, alterInfo.tableName, entry->getOID());
auto createdEntry = createEntryNoLock(transaction, std::move(newEntry));
if (transaction->shouldAppendToUndoBuffer()) {
transaction->pushAlterCatalogEntry(*this, *entry, alterInfo);
transaction->pushCreateDropCatalogEntry(*this, *createdEntry, isInternal(),
true );
}
} break;
case AlterType::COMMENT:
case AlterType::ADD_PROPERTY:
case AlterType::DROP_PROPERTY:
case AlterType::RENAME_PROPERTY:
case AlterType::ADD_FROM_TO_CONNECTION:
case AlterType::DROP_FROM_TO_CONNECTION: {
emplaceNoLock(std::move(newEntry));
if (transaction->shouldAppendToUndoBuffer()) {
transaction->pushAlterCatalogEntry(*this, *entry, alterInfo);
}
} break;
default: {
UNREACHABLE_CODE;
}
}
}
CatalogEntrySet CatalogSet::getEntries(const Transaction* transaction) {
CatalogEntrySet result;
std::shared_lock lck{mtx};
for (auto& [name, entry] : entries) {
auto currentEntry = traverseVersionChainsForTransactionNoLock(transaction, entry.get());
if (currentEntry->isDeleted()) {
continue;
}
result.emplace(name, currentEntry);
}
return result;
}
CatalogEntry* CatalogSet::getEntryOfOID(const Transaction* transaction, oid_t oid) {
std::shared_lock lck{mtx};
for (auto& [_, entry] : entries) {
if (entry->getOID() != oid) {
continue;
}
const auto currentEntry =
traverseVersionChainsForTransactionNoLock(transaction, entry.get());
if (currentEntry->isDeleted()) {
continue;
}
return currentEntry;
}
return nullptr;
}
void CatalogSet::serialize(Serializer serializer) const {
std::vector<CatalogEntry*> entriesToSerialize;
for (auto& [_, entry] : entries) {
switch (entry->getType()) {
case CatalogEntryType::SCALAR_FUNCTION_ENTRY:
case CatalogEntryType::REWRITE_FUNCTION_ENTRY:
case CatalogEntryType::AGGREGATE_FUNCTION_ENTRY:
case CatalogEntryType::COPY_FUNCTION_ENTRY:
case CatalogEntryType::TABLE_FUNCTION_ENTRY:
case CatalogEntryType::STANDALONE_TABLE_FUNCTION_ENTRY:
case CatalogEntryType::FOREIGN_TABLE_ENTRY:
continue;
default: {
auto committedEntry = getCommittedEntryNoLock(entry.get());
if (committedEntry && !committedEntry->isDeleted()) {
entriesToSerialize.push_back(committedEntry);
}
}
}
}
serializer.writeDebuggingInfo("nextOID");
serializer.serializeValue<oid_t>(nextOID);
serializer.writeDebuggingInfo("numEntries");
const uint64_t numEntriesToSerialize = entriesToSerialize.size();
serializer.serializeValue<uint64_t>(numEntriesToSerialize);
for (const auto entry : entriesToSerialize) {
entry->serialize(serializer);
}
}
void CatalogSet::serializeSnapshot(Serializer serializer, const Transaction* snapshotTxn) const {
std::shared_lock lck{mtx};
std::vector<CatalogEntry*> entriesToSerialize;
for (auto& [_, entry] : entries) {
switch (entry->getType()) {
case CatalogEntryType::SCALAR_FUNCTION_ENTRY:
case CatalogEntryType::REWRITE_FUNCTION_ENTRY:
case CatalogEntryType::AGGREGATE_FUNCTION_ENTRY:
case CatalogEntryType::COPY_FUNCTION_ENTRY:
case CatalogEntryType::TABLE_FUNCTION_ENTRY:
case CatalogEntryType::STANDALONE_TABLE_FUNCTION_ENTRY:
case CatalogEntryType::FOREIGN_TABLE_ENTRY:
continue;
default: {
auto visibleEntry = traverseVersionChainsForTransactionNoLock(snapshotTxn, entry.get());
if (visibleEntry && !visibleEntry->isDeleted()) {
entriesToSerialize.push_back(visibleEntry);
}
}
}
}
serializer.writeDebuggingInfo("nextOID");
serializer.serializeValue<oid_t>(nextOID);
serializer.writeDebuggingInfo("numEntries");
const uint64_t numEntriesToSerialize = entriesToSerialize.size();
serializer.serializeValue<uint64_t>(numEntriesToSerialize);
for (const auto entry : entriesToSerialize) {
entry->serialize(serializer);
}
}
std::unique_ptr<CatalogSet> CatalogSet::deserialize(Deserializer& deserializer) {
std::string debuggingInfo;
auto catalogSet = std::make_unique<CatalogSet>();
deserializer.validateDebuggingInfo(debuggingInfo, "nextOID");
deserializer.deserializeValue<oid_t>(catalogSet->nextOID);
uint64_t numEntries = 0;
deserializer.validateDebuggingInfo(debuggingInfo, "numEntries");
deserializer.deserializeValue<uint64_t>(numEntries);
for (uint64_t i = 0; i < numEntries; i++) {
auto entry = CatalogEntry::deserialize(deserializer);
if (entry != nullptr) {
catalogSet->emplaceNoLock(std::move(entry));
}
}
return catalogSet;
}
void CatalogSet::validateExistNoLock(const Transaction* transaction,
const std::string& name) const {
if (!containsEntryNoLock(transaction, name)) {
throw CatalogException(std::format("{} does not exist in catalog.", name));
}
}
void CatalogSet::validateNotExistNoLock(const Transaction* transaction,
const std::string& name) const {
if (containsEntryNoLock(transaction, name)) {
throw CatalogException(std::format("{} already exists in catalog.", name));
}
}
} }