lbug 0.18.0

An in-process property graph database management system built for query speed and scalability
#include "storage/wal/wal.h"

#include "common/exception/runtime.h"
#include "common/file_system/file_info.h"
#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_file.h"
#include "common/serializer/in_mem_file_writer.h"
#include "main/client_context.h"
#include "main/database.h"
#include "main/db_config.h"
#include "storage/file_db_id_utils.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/wal/checksum_writer.h"
#include "storage/wal/local_wal.h"

using namespace lbug::common;

namespace lbug {
namespace storage {

WAL::WAL(const std::string& dbPath, bool readOnly, bool enableChecksums, VirtualFileSystem* vfs)
    : walPath{StorageUtils::getWALFilePath(dbPath)},
      checkpointWalPath{StorageUtils::getCheckpointWALFilePath(dbPath)},
      inMemory{main::DBConfig::isDBPathInMemory(dbPath)}, readOnly{readOnly}, vfs{vfs},
      enableChecksums(enableChecksums) {}

WAL::~WAL() {}

void WAL::logCommittedWAL(LocalWAL& localWAL, main::ClientContext* context,
    uint64_t& commitSequence) {
    DASSERT(!readOnly);
    commitSequence = 0;
    if (inMemory || localWAL.getSize() == 0) {
        return; // No need to log empty WAL.
    }
    std::unique_lock lck{mtx};
    throwIfPoisonedNoLock();
    initWriter(context);
    localWAL.inMemWriter->flush(*serializer->getWriter());
    commitSequence = ++appendedCommitSequence;
    waitForDurabilityNoLock(commitSequence, lck);
}

void WAL::logAndFlushCheckpoint(main::ClientContext* context) {
    std::unique_lock lck{mtx};
    throwIfPoisonedNoLock();
    initWriter(context);
    CheckpointRecord walRecord;
    addNewWALRecordNoLock(walRecord);
    flushAndSyncNoLock();
}

bool WAL::rotateForCheckpoint(main::ClientContext* /*context*/) {
    std::unique_lock lck{mtx};
    throwIfPoisonedNoLock();
    if (inMemory) {
        return false;
    }
    if (!serializer && !vfs->fileOrPathExists(walPath)) {
        return false;
    }
    if (serializer) {
        flushAndSyncNoLock();
        durableCommitSequence = appendedCommitSequence;
        groupCommitCV.notify_all();
        fileInfo.reset();
        serializer.reset();
    }
    vfs->renameFile(walPath, checkpointWalPath);
    return true;
}

void WAL::logAndFlushCheckpointToFrozen(main::ClientContext* context) {
    {
        std::unique_lock lck{mtx};
        throwIfPoisonedNoLock();
    }
    auto frozenFileInfo = vfs->openFile(checkpointWalPath,
        FileOpenFlags(FileFlags::READ_ONLY | FileFlags::WRITE), context);

    std::shared_ptr<Writer> writer = std::make_shared<BufferedFileWriter>(*frozenFileInfo);
    auto& bufferedWriter = writer->cast<BufferedFileWriter>();
    if (enableChecksums) {
        writer = std::make_shared<ChecksumWriter>(std::move(writer), *MemoryManager::Get(*context));
    }
    auto frozenSerializer = std::make_unique<Serializer>(std::move(writer));
    bufferedWriter.setFileOffset(frozenFileInfo->getFileSize());

    CheckpointRecord walRecord;
    frozenSerializer->getWriter()->onObjectBegin();
    WALRecord::serializeWithLength(*frozenSerializer, walRecord);
    frozenSerializer->getWriter()->onObjectEnd();
    try {
        frozenSerializer->getWriter()->flush();
        frozenSerializer->getWriter()->sync();
    } catch (const std::exception& e) {
        std::unique_lock lck{mtx};
        poisonNoLock(e.what());
        throw RuntimeException(
            "WAL sync failed; database is in a panic state and refuses further writes until "
            "restart. Original error: " +
            std::string{e.what()});
    } catch (...) {
        std::unique_lock lck{mtx};
        poisonNoLock("unknown exception");
        throw RuntimeException(
            "WAL sync failed; database is in a panic state and refuses further writes until "
            "restart. Original error: unknown exception");
    }
}

void WAL::clearFrozenWAL() {
    vfs->removeFileIfExists(checkpointWalPath);
}

// NOLINTNEXTLINE(readability-make-member-function-const): semantically non-const function.
void WAL::clear() {
    std::unique_lock lck{mtx};
    throwIfPoisonedNoLock();
    serializer->getWriter()->clear();
    durableCommitSequence = appendedCommitSequence;
    syncInProgress = false;
    groupCommitCV.notify_all();
}

void WAL::reset() {
    std::unique_lock lck{mtx};
    durableCommitSequence = appendedCommitSequence;
    syncInProgress = false;
    groupCommitCV.notify_all();
    fileInfo.reset();
    serializer.reset();
    vfs->removeFileIfExists(walPath);
}

void WAL::waitForDurabilityNoLock(uint64_t commitSequence, std::unique_lock<std::mutex>& lck) {
    while (durableCommitSequence < commitSequence) {
        throwIfPoisonedNoLock();
        if (syncInProgress) {
            groupCommitCV.wait(lck);
            continue;
        }
        syncInProgress = true;
        while (durableCommitSequence < appendedCommitSequence) {
            const auto targetSequence = appendedCommitSequence;
            serializer->getWriter()->flush();
            auto* fileToSync = fileInfo.get();
            lck.unlock();
            try {
                fileToSync->syncFile();
            } catch (const std::exception& e) {
                lck.lock();
                poisonNoLock(e.what());
                throw RuntimeException(
                    "WAL sync failed; database is in a panic state and refuses further writes "
                    "until restart. Original error: " +
                    std::string{e.what()});
            } catch (...) {
                lck.lock();
                poisonNoLock("unknown exception");
                throw RuntimeException(
                    "WAL sync failed; database is in a panic state and refuses further writes "
                    "until restart. Original error: unknown exception");
            }
            lck.lock();
            durableCommitSequence = targetSequence;
            groupCommitCV.notify_all();
        }
        syncInProgress = false;
        groupCommitCV.notify_all();
    }
}

// NOLINTNEXTLINE(readability-make-member-function-const): semantically non-const function.
void WAL::flushAndSyncNoLock() {
    serializer->getWriter()->flush();
    try {
        serializer->getWriter()->sync();
    } catch (const std::exception& e) {
        poisonNoLock(e.what());
        throw RuntimeException(
            "WAL sync failed; database is in a panic state and refuses further writes until "
            "restart. Original error: " +
            std::string{e.what()});
    } catch (...) {
        poisonNoLock("unknown exception");
        throw RuntimeException(
            "WAL sync failed; database is in a panic state and refuses further writes until "
            "restart. Original error: unknown exception");
    }
}

uint64_t WAL::getFileSize() {
    std::unique_lock lck{mtx};
    if (!serializer) {
        if (inMemory || !vfs->fileOrPathExists(walPath)) {
            return 0;
        }
        return vfs->openFile(walPath, FileOpenFlags(FileFlags::READ_ONLY))->getFileSize();
    }
    return serializer->getWriter()->getSize();
}

void WAL::throwIfPoisoned() {
    std::unique_lock lck{mtx};
    throwIfPoisonedNoLock();
}

void WAL::throwIfPoisonedNoLock() const {
    if (!poisoned) {
        return;
    }
    throw RuntimeException("WAL sync failed; database is in a panic state and refuses further "
                           "writes until restart. Original error: " +
                           poisonReason);
}

void WAL::poisonNoLock(const std::string& reason) {
    poisoned = true;
    poisonReason = reason;
    syncInProgress = false;
    groupCommitCV.notify_all();
}

void WAL::writeHeader(main::ClientContext& context) {
    serializer->getWriter()->onObjectBegin();
    FileDBIDUtils::writeDatabaseID(*serializer,
        StorageManager::Get(context)->getOrInitDatabaseID(context));
    serializer->write(enableChecksums);
    serializer->getWriter()->onObjectEnd();
}

void WAL::initWriter(main::ClientContext* context) {
    if (serializer) {
        return;
    }
    fileInfo = vfs->openFile(walPath,
        FileOpenFlags(FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE),
        context);

    std::shared_ptr<Writer> writer = std::make_shared<BufferedFileWriter>(*fileInfo);
    auto& bufferedWriter = writer->cast<BufferedFileWriter>();
    if (enableChecksums) {
        writer = std::make_shared<ChecksumWriter>(std::move(writer), *MemoryManager::Get(*context));
    }
    serializer = std::make_unique<Serializer>(std::move(writer));

    // Write the databaseID at the start of the WAL if needed
    // This is used to ensure that when replaying the WAL matches the database
    if (fileInfo->getFileSize() == 0) {
        writeHeader(*context);
    }

    // WAL should always be APPEND only. We don't want to overwrite the file as it may still
    // contain records not replayed. This can happen if checkpoint is not triggered before the
    // Database is closed last time.
    bufferedWriter.setFileOffset(fileInfo->getFileSize());
}

// NOLINTNEXTLINE(readability-make-member-function-const): semantically non-const function.
void WAL::addNewWALRecordNoLock(const WALRecord& walRecord) {
    DASSERT(walRecord.type != WALRecordType::INVALID_RECORD);
    DASSERT(!inMemory);
    DASSERT(serializer != nullptr);
    serializer->getWriter()->onObjectBegin();
    WALRecord::serializeWithLength(*serializer, walRecord);
    serializer->getWriter()->onObjectEnd();
}

WAL* WAL::Get(const main::ClientContext& context) {
    DASSERT(context.getDatabase() && context.getDatabase()->getStorageManager());
    return &context.getDatabase()->getStorageManager()->getWAL();
}

} // namespace storage
} // namespace lbug