#include "storage/wal/wal.h"
#include "common/file_system/file_info.h"
#include "common/file_system/virtual_file_system.h"
#include "common/serializer/buffered_file.h"
#include "common/serializer/in_mem_file_writer.h"
#include "main/client_context.h"
#include "main/database.h"
#include "main/db_config.h"
#include "storage/file_db_id_utils.h"
#include "storage/storage_manager.h"
#include "storage/storage_utils.h"
#include "storage/wal/checksum_writer.h"
#include "storage/wal/local_wal.h"
using namespace lbug::common;
namespace lbug {
namespace storage {
WAL::WAL(const std::string& dbPath, bool readOnly, bool enableChecksums, VirtualFileSystem* vfs)
: walPath{StorageUtils::getWALFilePath(dbPath)},
checkpointWalPath{StorageUtils::getCheckpointWALFilePath(dbPath)},
inMemory{main::DBConfig::isDBPathInMemory(dbPath)}, readOnly{readOnly}, vfs{vfs},
enableChecksums(enableChecksums) {}
WAL::~WAL() {}
void WAL::logCommittedWAL(LocalWAL& localWAL, main::ClientContext* context) {
DASSERT(!readOnly);
if (inMemory || localWAL.getSize() == 0) {
return; }
std::unique_lock lck{mtx};
initWriter(context);
localWAL.inMemWriter->flush(*serializer->getWriter());
flushAndSyncNoLock();
}
void WAL::logAndFlushCheckpoint(main::ClientContext* context) {
std::unique_lock lck{mtx};
initWriter(context);
CheckpointRecord walRecord;
addNewWALRecordNoLock(walRecord);
flushAndSyncNoLock();
}
bool WAL::rotateForCheckpoint(main::ClientContext* ) {
std::unique_lock lck{mtx};
if (inMemory) {
return false;
}
if (!serializer && !vfs->fileOrPathExists(walPath)) {
return false;
}
if (serializer) {
flushAndSyncNoLock();
fileInfo.reset();
serializer.reset();
}
vfs->renameFile(walPath, checkpointWalPath);
return true;
}
void WAL::logAndFlushCheckpointToFrozen(main::ClientContext* context) {
auto frozenFileInfo = vfs->openFile(checkpointWalPath,
FileOpenFlags(FileFlags::READ_ONLY | FileFlags::WRITE), context);
std::shared_ptr<Writer> writer = std::make_shared<BufferedFileWriter>(*frozenFileInfo);
auto& bufferedWriter = writer->cast<BufferedFileWriter>();
if (enableChecksums) {
writer = std::make_shared<ChecksumWriter>(std::move(writer), *MemoryManager::Get(*context));
}
auto frozenSerializer = std::make_unique<Serializer>(std::move(writer));
bufferedWriter.setFileOffset(frozenFileInfo->getFileSize());
CheckpointRecord walRecord;
frozenSerializer->getWriter()->onObjectBegin();
walRecord.serialize(*frozenSerializer);
frozenSerializer->getWriter()->onObjectEnd();
frozenSerializer->getWriter()->flush();
frozenSerializer->getWriter()->sync();
}
void WAL::clearFrozenWAL() {
vfs->removeFileIfExists(checkpointWalPath);
}
void WAL::clear() {
std::unique_lock lck{mtx};
serializer->getWriter()->clear();
}
void WAL::reset() {
std::unique_lock lck{mtx};
fileInfo.reset();
serializer.reset();
vfs->removeFileIfExists(walPath);
}
void WAL::flushAndSyncNoLock() {
serializer->getWriter()->flush();
serializer->getWriter()->sync();
}
uint64_t WAL::getFileSize() {
std::unique_lock lck{mtx};
return serializer->getWriter()->getSize();
}
void WAL::writeHeader(main::ClientContext& context) {
serializer->getWriter()->onObjectBegin();
FileDBIDUtils::writeDatabaseID(*serializer,
StorageManager::Get(context)->getOrInitDatabaseID(context));
serializer->write(enableChecksums);
serializer->getWriter()->onObjectEnd();
}
void WAL::initWriter(main::ClientContext* context) {
if (serializer) {
return;
}
fileInfo = vfs->openFile(walPath,
FileOpenFlags(FileFlags::CREATE_IF_NOT_EXISTS | FileFlags::READ_ONLY | FileFlags::WRITE),
context);
std::shared_ptr<Writer> writer = std::make_shared<BufferedFileWriter>(*fileInfo);
auto& bufferedWriter = writer->cast<BufferedFileWriter>();
if (enableChecksums) {
writer = std::make_shared<ChecksumWriter>(std::move(writer), *MemoryManager::Get(*context));
}
serializer = std::make_unique<Serializer>(std::move(writer));
if (fileInfo->getFileSize() == 0) {
writeHeader(*context);
}
bufferedWriter.setFileOffset(fileInfo->getFileSize());
}
void WAL::addNewWALRecordNoLock(const WALRecord& walRecord) {
DASSERT(walRecord.type != WALRecordType::INVALID_RECORD);
DASSERT(!inMemory);
DASSERT(serializer != nullptr);
serializer->getWriter()->onObjectBegin();
walRecord.serialize(*serializer);
serializer->getWriter()->onObjectEnd();
}
WAL* WAL::Get(const main::ClientContext& context) {
DASSERT(context.getDatabase() && context.getDatabase()->getStorageManager());
return &context.getDatabase()->getStorageManager()->getWAL();
}
} }