// eloqstore-sys 1.1.1
//
// Low-level Rust FFI bindings for EloqStore
#include "tasks/reopen_task.h"

#include <string>

#include "eloq_store.h"
#include "standby_service.h"
#include "storage/index_page_manager.h"
#include "storage/shard.h"
#include "tasks/prewarm_task.h"

namespace eloqstore
{

/**
 * @brief Reopen partition @p tbl_id by installing a new snapshot into
 *        cow_meta_.
 *
 * Steps, in order:
 *  1. Reject the request with KvError::InvalidArgs when the store runs in
 *     StoreMode::Local.
 *  2. In StoreMode::StandbyReplica, unless the request is marked Clean(),
 *     enqueue an rsync of the partition for the request tag and block on
 *     WaitIo() until io_res_ carries the outcome. NotFound and
 *     ResourceMissing outcomes are tolerated; any other error aborts.
 *  3. Install an empty snapshot (Clean() request) or the external snapshot
 *     identified by the request tag.
 *  4. In StoreMode::Cloud with prewarm_cloud_cache enabled, ask the prewarm
 *     service to prewarm the partition.
 *  5. In StoreMode::Cloud, if the installed snapshot has neither a root nor a
 *     TTL root page, clean up the partition's local files; in every other
 *     case make sure the partition is registered for pending local GC.
 *
 * @param tbl_id Partition to reopen.
 * @return KvError::NoError on success; KvError::InvalidArgs for Local mode or
 *         a missing standby service; otherwise the first error reported by
 *         the rsync, snapshot installation, or local cleanup step.
 */
KvError ReopenTask::Reopen(const TableIdent &tbl_id)
{
    auto *request = static_cast<ReopenRequest *>(req_);
    CHECK(request != nullptr);
    // Validate the store pointer before its first dereference; every branch
    // below (mode lookup, standby service, prewarm service) goes through it.
    CHECK(shard->store_ != nullptr);

    const StoreMode mode = shard->store_->Mode();
    if (mode == StoreMode::Local)
    {
        return KvError::InvalidArgs;
    }

    if (mode == StoreMode::StandbyReplica)
    {
        StandbyService *standby_service = shard->store_->GetStandbyService();
        if (standby_service == nullptr)
        {
            LOG(ERROR) << "Reopen " << tbl_id
                       << " failed: standby_service is null, tag "
                       << request->Tag();
            return KvError::InvalidArgs;
        }
        std::string tag;
        tag = request->Tag();
        if (!request->Clean())
        {
            KvTask *current_task = ThdTask();
            CHECK(current_task != nullptr);
            KvError enqueue_err = standby_service->RsyncPartition(tbl_id, tag);
            if (enqueue_err != KvError::NoError)
            {
                LOG(ERROR) << "Reopen " << tbl_id
                           << " rsync enqueue failed, tag " << tag << ", error "
                           << static_cast<uint32_t>(enqueue_err);
                return enqueue_err;
            }
            // NOTE(review): presumably the standby service resumes this task
            // and stores the rsync outcome in io_res_ -- confirm against
            // StandbyService.
            current_task->WaitIo();
            KvError sync_err = static_cast<KvError>(current_task->io_res_);
            // NotFound / ResourceMissing are not treated as failures here;
            // the snapshot installation below still runs.
            if (sync_err != KvError::NoError && sync_err != KvError::NotFound &&
                sync_err != KvError::ResourceMissing)
            {
                LOG(ERROR) << "Reopen " << tbl_id << " rsync failed, tag "
                           << tag << ", error "
                           << static_cast<uint32_t>(sync_err);
                return sync_err;
            }
        }
    }

    KvError err = KvError::NoError;
    // Remember which installation call ran so the error log names the
    // operation that actually failed.
    const char *install_op = nullptr;
    if (request->Clean())
    {
        install_op = "InstallEmptySnapshot";
        err = shard->IndexManager()->InstallEmptySnapshot(tbl_id, cow_meta_);
    }
    else
    {
        install_op = "InstallExternalSnapshot";
        err = shard->IndexManager()->InstallExternalSnapshot(
            tbl_id, cow_meta_, request->Tag());
    }
    if (err != KvError::NoError)
    {
        LOG(ERROR) << "Reopen " << tbl_id << ' ' << install_op
                   << " failed, tag " << request->Tag() << ", mode "
                   << static_cast<int>(mode) << ", error "
                   << static_cast<uint32_t>(err);
        return err;
    }
    if (mode == StoreMode::Cloud && Options()->prewarm_cloud_cache)
    {
        PrewarmService *prewarm_service = shard->store_->GetPrewarmService();
        CHECK(prewarm_service != nullptr);
        prewarm_service->Prewarm(tbl_id);
    }

    // A snapshot with neither a data root nor a TTL root is considered empty.
    const bool empty_snapshot =
        cow_meta_.root_id_ == MaxPageId && cow_meta_.ttl_root_id_ == MaxPageId;
    if (mode == StoreMode::Cloud && empty_snapshot)
    {
        auto *cloud_mgr = static_cast<CloudStoreMgr *>(shard->IoManager());
        err = cloud_mgr->CleanupLocalPartitionFiles(tbl_id);
        if (err != KvError::NoError)
        {
            LOG(ERROR) << "Reopen " << tbl_id
                       << " failed to cleanup local partition files, tag "
                       << request->Tag() << ", error "
                       << static_cast<uint32_t>(err);
            return err;
        }
    }
    else if (!shard->HasPendingLocalGc(tbl_id))
    {
        shard->AddPendingLocalGc(tbl_id);
    }
    return err;
}

}  // namespace eloqstore