#pragma once
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "direct_io_buffer.h"
#include "error.h"
#include "storage/data_page.h"
#include "storage/index_page_manager.h"
#include "storage/object_store.h"
#include "storage/page_mapper.h"
#include "storage/root_meta.h"
#include "tasks/task.h"
#include "tasks/write_buffer_aggregator.h"
#include "types.h"
namespace eloqstore
{
class WriteTask : public KvTask
{
public:
struct UploadState
{
DirectIoBuffer buffer;
std::string filename;
uint64_t start_offset{0};
uint64_t end_offset{0};
bool initialized{false};
bool invalid{false};
void ResetMetadata()
{
filename.clear();
start_offset = 0;
end_offset = 0;
initialized = false;
invalid = false;
buffer.clear();
}
};
WriteTask() = default;
WriteTask(const WriteTask &) = delete;
void Abort() override;
virtual void Reset(const TableIdent &tbl_id);
const TableIdent &TableId() const;
void WritePageCallback(VarPage page, KvError err);
UploadState &MutableUploadState()
{
return upload_state_;
}
const UploadState &GetUploadState() const
{
return upload_state_;
}
void ResetUploadState();
void EnsureUploadStateBuffer();
void AddInflightUploadTask();
void CompletePendingUploadTask(ObjectStore::UploadTask *task);
void AddPendingUploadTask(std::unique_ptr<ObjectStore::UploadTask> task)
{
pending_upload_tasks_.emplace_back(std::move(task));
}
KvError WaitWrite();
KvError DeleteArchive(uint64_t term, std::string_view tag);
void ReleaseDirBusy();
bool NeedWaitDirEviction() const
{
return need_wait_dir_eviction_;
}
void ClearNeedWaitDirEviction()
{
need_wait_dir_eviction_ = false;
}
KvError write_err_{KvError::NoError};
protected:
KvError FlushManifest();
KvError UpdateMeta();
void CompactIfNeeded(PageMapper *mapper) const;
void TriggerTTL();
void TriggerFileGC() const;
KvError TriggerLocalFileGC() const;
std::pair<DataPage, KvError> LoadDataPage(PageId page_id);
std::pair<OverflowPage, KvError> LoadOverflowPage(PageId page_id);
std::pair<PageId, FilePageId> AllocatePage(PageId page_id);
std::string_view TaskTypeName() const;
void FreePage(PageId page_id);
FilePageId ToFilePage(PageId page_id);
TableIdent tbl_ident_;
CowRootMeta cow_meta_;
ManifestBuilder wal_builder_;
KvError WritePage(DataPage &&page);
KvError WritePage(OverflowPage &&page);
KvError WritePage(MemIndexPage::Handle &page);
KvError WritePage(MemIndexPage::Handle &page, FilePageId file_page_id);
KvError WritePage(VarPage page, FilePageId file_page_id);
KvError AppendWritePage(VarPage page, FilePageId file_page_id);
void FlushAppendWrites();
KvError WaitPendingUploads();
KvError ConsumePendingUploadResults();
std::pair<FileId, uint32_t> ConvFilePageId(FilePageId file_page_id) const;
virtual DirectIoBuffer AcquireUploadStateBuffer();
virtual void ReleaseUploadStateBuffer(DirectIoBuffer buffer);
std::optional<FileId> last_append_file_id_;
WriteBufferAggregator append_aggregator_{0};
UploadState upload_state_;
uint32_t inflight_upload_tasks_{0};
WaitingSeat upload_waiting_;
std::vector<std::unique_ptr<ObjectStore::UploadTask>> pending_upload_tasks_;
bool dir_busy_registered_{false};
bool need_wait_dir_eviction_{false};
};
}