#include "storage/wal/wal.h"
#include "common/exception/runtime.h"
#include "common/file_system/file_info.h"
#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_file.h"
#include "common/serializer/in_mem_file_writer.h"
#include "main/client_context.h"
#include "main/database.h"
#include "main/db_config.h"
#include "storage/file_db_id_utils.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/wal/checksum_writer.h"
#include "storage/wal/local_wal.h"
using namespace lbug::common;
namespace lbug {
namespace storage {
WAL::WAL(const std::string& dbPath, bool readOnly, bool enableChecksums, VirtualFileSystem* vfs)
: walPath{StorageUtils::getWALFilePath(dbPath)},
checkpointWalPath{StorageUtils::getCheckpointWALFilePath(dbPath)},
inMemory{main::DBConfig::isDBPathInMemory(dbPath)}, readOnly{readOnly}, vfs{vfs},
enableChecksums(enableChecksums) {}
/** Out-of-line destructor; performs no work beyond destroying the members. */
WAL::~WAL() = default;
/**
 * Appends the records buffered in `localWAL` to the shared WAL and waits until they are
 * durable.
 *
 * @param localWAL       per-transaction buffer whose in-memory writer is flushed into the
 *                       shared writer.
 * @param context        client context forwarded to initWriter() when the log has to be opened.
 * @param commitSequence out-parameter; set to 0 when nothing is logged (in-memory database or
 *                       empty local WAL), otherwise to the sequence number assigned to this
 *                       commit.
 * @throws RuntimeException if the WAL is poisoned or the sync fails.
 */
void WAL::logCommittedWAL(LocalWAL& localWAL, main::ClientContext* context,
    uint64_t& commitSequence) {
    DASSERT(!readOnly);
    commitSequence = 0;
    // Short-circuit keeps getSize() from being queried for in-memory databases.
    const bool nothingToLog = inMemory || localWAL.getSize() == 0;
    if (nothingToLog) {
        return;
    }
    std::unique_lock guard{mtx};
    throwIfPoisonedNoLock();
    initWriter(context);
    auto& sharedWriter = *serializer->getWriter();
    localWAL.inMemWriter->flush(sharedWriter);
    // Sequence numbers are handed out under the lock, in append order.
    commitSequence = ++appendedCommitSequence;
    waitForDurabilityNoLock(commitSequence, guard);
}
/**
 * Appends a checkpoint record to the active WAL and flushes and syncs it, all under `mtx`.
 *
 * @param context client context forwarded to initWriter() when the log has to be opened.
 * @throws RuntimeException if the WAL is poisoned or the sync fails.
 */
void WAL::logAndFlushCheckpoint(main::ClientContext* context) {
    std::lock_guard<std::mutex> guard{mtx};
    throwIfPoisonedNoLock();
    initWriter(context);
    CheckpointRecord checkpointMarker;
    addNewWALRecordNoLock(checkpointMarker);
    flushAndSyncNoLock();
}
/**
 * Renames the active WAL file to the checkpoint ("frozen") WAL path.
 *
 * If a writer is open it is flushed and synced first, everything appended so far is marked
 * durable, waiters are woken, and the file handle and serializer are released before the
 * rename.
 *
 * @return false for in-memory databases and when there is neither an open writer nor a WAL
 *         file on disk; true once the file has been renamed.
 * @throws RuntimeException if the WAL is poisoned or the sync fails.
 */
bool WAL::rotateForCheckpoint(main::ClientContext* /*context*/) {
    std::lock_guard<std::mutex> guard{mtx};
    throwIfPoisonedNoLock();
    if (inMemory) {
        return false;
    }
    if (serializer) {
        flushAndSyncNoLock();
        durableCommitSequence = appendedCommitSequence;
        groupCommitCV.notify_all();
        fileInfo.reset();
        serializer.reset();
    } else if (!vfs->fileOrPathExists(walPath)) {
        // No open writer and nothing on disk: there is no log to rotate.
        return false;
    }
    vfs->renameFile(walPath, checkpointWalPath);
    return true;
}
/**
 * Appends a checkpoint record to the frozen (checkpoint) WAL file and flushes and syncs it.
 *
 * `mtx` is held only for the initial poison check and again when poisoning after a failure;
 * the file is opened and written through a writer/serializer local to this call.
 *
 * @param context client context used to open the file and, when checksums are enabled, to
 *                obtain the memory manager for the checksum writer.
 * @throws RuntimeException if the WAL is already poisoned, or if flush/sync of the frozen
 *         file fails (which poisons the WAL).
 */
void WAL::logAndFlushCheckpointToFrozen(main::ClientContext* context) {
    {
        std::lock_guard<std::mutex> guard{mtx};
        throwIfPoisonedNoLock();
    }
    auto frozenFileInfo = vfs->openFile(checkpointWalPath,
        FileOpenFlags(FileFlags::READ_ONLY | FileFlags::WRITE), context);
    std::shared_ptr<Writer> sink = std::make_shared<BufferedFileWriter>(*frozenFileInfo);
    // Keep a handle on the file writer before it is (optionally) wrapped below.
    auto& fileWriter = sink->cast<BufferedFileWriter>();
    if (enableChecksums) {
        sink = std::make_shared<ChecksumWriter>(std::move(sink), *MemoryManager::Get(*context));
    }
    auto frozenSerializer = std::make_unique<Serializer>(std::move(sink));
    // Position the writer at the current end of the file.
    fileWriter.setFileOffset(frozenFileInfo->getFileSize());

    CheckpointRecord checkpointMarker;
    frozenSerializer->getWriter()->onObjectBegin();
    WALRecord::serializeWithLength(*frozenSerializer, checkpointMarker);
    frozenSerializer->getWriter()->onObjectEnd();

    // Single failure path for both catch clauses: poison under the lock, then raise.
    const auto poisonAndThrow = [this](const std::string& reason) {
        std::unique_lock lck{mtx};
        poisonNoLock(reason);
        throw RuntimeException(
            "WAL sync failed; database is in a panic state and refuses further writes until "
            "restart. Original error: " +
            reason);
    };
    try {
        frozenSerializer->getWriter()->flush();
        frozenSerializer->getWriter()->sync();
    } catch (const std::exception& e) {
        poisonAndThrow(e.what());
    } catch (...) {
        poisonAndThrow("unknown exception");
    }
}
/** Removes the frozen (checkpoint) WAL file if it exists. Does not take `mtx`. */
void WAL::clearFrozenWAL() {
    const auto& frozenPath = checkpointWalPath;
    vfs->removeFileIfExists(frozenPath);
}
void WAL::clear() {
std::unique_lock lck{mtx};
throwIfPoisonedNoLock();
serializer->getWriter()->clear();
durableCommitSequence = appendedCommitSequence;
syncInProgress = false;
groupCommitCV.notify_all();
}
void WAL::reset() {
std::unique_lock lck{mtx};
durableCommitSequence = appendedCommitSequence;
syncInProgress = false;
groupCommitCV.notify_all();
fileInfo.reset();
serializer.reset();
vfs->removeFileIfExists(walPath);
}
void WAL::waitForDurabilityNoLock(uint64_t commitSequence, std::unique_lock<std::mutex>& lck) {
while (durableCommitSequence < commitSequence) {
throwIfPoisonedNoLock();
if (syncInProgress) {
groupCommitCV.wait(lck);
continue;
}
syncInProgress = true;
while (durableCommitSequence < appendedCommitSequence) {
const auto targetSequence = appendedCommitSequence;
serializer->getWriter()->flush();
auto* fileToSync = fileInfo.get();
lck.unlock();
try {
fileToSync->syncFile();
} catch (const std::exception& e) {
lck.lock();
poisonNoLock(e.what());
throw RuntimeException(
"WAL sync failed; database is in a panic state and refuses further writes "
"until restart. Original error: " +
std::string{e.what()});
} catch (...) {
lck.lock();
poisonNoLock("unknown exception");
throw RuntimeException(
"WAL sync failed; database is in a panic state and refuses further writes "
"until restart. Original error: unknown exception");
}
lck.lock();
durableCommitSequence = targetSequence;
groupCommitCV.notify_all();
}
syncInProgress = false;
groupCommitCV.notify_all();
}
}
void WAL::flushAndSyncNoLock() {
serializer->getWriter()->flush();
try {
serializer->getWriter()->sync();
} catch (const std::exception& e) {
poisonNoLock(e.what());
throw RuntimeException(
"WAL sync failed; database is in a panic state and refuses further writes until "
"restart. Original error: " +
std::string{e.what()});
} catch (...) {
poisonNoLock("unknown exception");
throw RuntimeException(
"WAL sync failed; database is in a panic state and refuses further writes until "
"restart. Original error: unknown exception");
}
}
uint64_t WAL::getFileSize() {
std::unique_lock lck{mtx};
if (!serializer) {
if (inMemory || !vfs->fileOrPathExists(walPath)) {
return 0;
}
return vfs->openFile(walPath, FileOpenFlags(FileFlags::READ_ONLY))->getFileSize();
}
return serializer->getWriter()->getSize();
}
void WAL::throwIfPoisoned() {
std::unique_lock lck{mtx};
throwIfPoisonedNoLock();
}
/**
 * Raises if the WAL has been poisoned; does nothing otherwise. Takes no lock itself.
 *
 * @throws RuntimeException carrying the recorded poison reason.
 */
void WAL::throwIfPoisonedNoLock() const {
    if (poisoned) {
        throw RuntimeException("WAL sync failed; database is in a panic state and refuses further "
                               "writes until restart. Original error: " +
                               poisonReason);
    }
}
// Marks the WAL as poisoned and records `reason`, which throwIfPoisonedNoLock() later appends
// to its error message. Takes no lock itself; the callers in this file hold `mtx` when they
// invoke it.
void WAL::poisonNoLock(const std::string& reason) {
poisoned = true;
poisonReason = reason;
// Clear the sync-in-progress flag and wake every thread waiting on the group-commit
// condition variable so it can re-check the state.
syncInProgress = false;
groupCommitCV.notify_all();
}
/**
 * Writes the WAL header through the active serializer: the database ID followed by the
 * `enableChecksums` flag, bracketed by the writer's onObjectBegin()/onObjectEnd() calls.
 *
 * @param context client context used to look up (or initialise) the database ID.
 */
void WAL::writeHeader(main::ClientContext& context) {
    serializer->getWriter()->onObjectBegin();
    const auto& databaseID = StorageManager::Get(context)->getOrInitDatabaseID(context);
    FileDBIDUtils::writeDatabaseID(*serializer, databaseID);
    serializer->write(enableChecksums);
    serializer->getWriter()->onObjectEnd();
}
/**
 * Opens the WAL file and builds the serializer on first use; a no-op when a serializer
 * already exists. Takes no lock itself.
 *
 * The file is created if missing. When checksums are enabled the buffered file writer is
 * wrapped in a ChecksumWriter. A header is written when the file is empty, and the file
 * writer's offset is then set to the file's reported size.
 *
 * @param context client context used to open the file, obtain the memory manager for the
 *                checksum writer, and write the header.
 */
void WAL::initWriter(main::ClientContext* context) {
    if (serializer) {
        return;
    }
    fileInfo = vfs->openFile(walPath,
        FileOpenFlags(FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE),
        context);
    std::shared_ptr<Writer> sink = std::make_shared<BufferedFileWriter>(*fileInfo);
    // Keep a handle on the file writer before it is (optionally) wrapped below.
    auto& fileWriter = sink->cast<BufferedFileWriter>();
    if (enableChecksums) {
        sink = std::make_shared<ChecksumWriter>(std::move(sink), *MemoryManager::Get(*context));
    }
    serializer = std::make_unique<Serializer>(std::move(sink));
    const bool isEmptyFile = fileInfo->getFileSize() == 0;
    if (isEmptyFile) {
        writeHeader(*context);
    }
    fileWriter.setFileOffset(fileInfo->getFileSize());
}
/**
 * Serializes one record (with its length) into the active writer, bracketed by the writer's
 * onObjectBegin()/onObjectEnd() calls. Takes no lock itself.
 *
 * Debug assertions require a valid record type, an on-disk database, and an open serializer.
 *
 * @param walRecord record to append.
 */
void WAL::addNewWALRecordNoLock(const WALRecord& walRecord) {
    DASSERT(walRecord.type != WALRecordType::INVALID_RECORD);
    DASSERT(!inMemory);
    DASSERT(serializer != nullptr);
    auto&& out = serializer->getWriter();
    out->onObjectBegin();
    WALRecord::serializeWithLength(*serializer, walRecord);
    out->onObjectEnd();
}
/**
 * Returns the WAL owned by the storage manager of the database behind `context`.
 *
 * @param context client context; a debug assertion requires its database and that database's
 *                storage manager to be non-null.
 * @return pointer to the storage manager's WAL.
 */
WAL* WAL::Get(const main::ClientContext& context) {
    DASSERT(context.getDatabase() && context.getDatabase()->getStorageManager());
    auto&& storageManager = context.getDatabase()->getStorageManager();
    return &storageManager->getWAL();
}
} }