eloqstore-sys 1.1.1

Low-level Rust FFI bindings for EloqStore
#include "cloud_storage_service.h"

#include <glog/logging.h>

#include <algorithm>
#include <chrono>

#ifdef ELOQ_MODULE_ENABLED
#include <bthread/eloq_module.h>
#endif

#include "async_io_manager.h"
#include "eloq_store.h"
#include "storage/shard.h"

namespace eloqstore
{
namespace
{
// Upper bound on how long an idle worker blocks on its job queue before it
// loops around to poll HTTP work and the stop flag again.
constexpr auto kIdleWait = std::chrono::milliseconds{10};
}  // namespace

// Sets up the per-shard bookkeeping (one object-store slot and one lock per
// shard) and one job queue per worker. No threads are started here; see
// Start().
//
// @param store  Owning EloqStore; must not be null.
CloudStorageService::CloudStorageService(EloqStore *store) : store_(store)
{
    CHECK(store_ != nullptr);

    // The shard count comes from the store's thread option. Slots start out
    // empty and are filled in by RegisterObjectStore().
    const size_t num_shards = store_->Options().num_threads;
    shard_stores_.assign(num_shards, nullptr);
    shard_locks_.resize(num_shards);

    // Clamp to at least one worker so a configured value of 0 still yields a
    // usable service.
    const size_t requested_workers = store_->Options().cloud_request_threads;
    worker_count_ = std::max<size_t>(requested_workers, 1);
    job_queues_.resize(worker_count_);
}

// Stops the service so worker threads are joined before any member is
// destroyed.
CloudStorageService::~CloudStorageService()
{
    this->Stop();
}

// Starts the worker threads. The call is a no-op unless the service is
// currently in the stopped state (stopping_ == true).
void CloudStorageService::Start()
{
    // Atomically flip stopping_ from true to false. If the exchange fails the
    // flag was already false, so there is nothing to start.
    bool stopped = true;
    const bool transitioned = stopping_.compare_exchange_strong(stopped, false);
    if (!transitioned)
    {
        return;
    }

    // Reset the bootstrap result to "not done / Busy" before any worker runs;
    // worker 0 overwrites it through bootstrap_state_.SetDone().
#ifdef ELOQ_MODULE_ENABLED
    {
        std::lock_guard<bthread::Mutex> guard(bootstrap_state_.mutex_);
        bootstrap_state_.err_ = KvError::Busy;
        bootstrap_state_.done_ = false;
    }
#else
    bootstrap_state_.err_ = KvError::Busy;
    bootstrap_state_.done_.store(false, std::memory_order_relaxed);
#endif

    // Open the gate for Submit() and then spawn one thread per worker index.
    accepting_jobs_.store(true, std::memory_order_release);
    workers_.reserve(worker_count_);
    for (size_t worker_idx = 0; worker_idx != worker_count_; ++worker_idx)
    {
        workers_.emplace_back(
            &CloudStorageService::RunWorker, this, worker_idx);
    }
}

void CloudStorageService::Stop()
{
    accepting_jobs_.store(false, std::memory_order_release);
    bool was_running = !stopping_.exchange(true, std::memory_order_acq_rel);
    for (auto &queue : job_queues_)
    {
        queue.enqueue({nullptr, nullptr});
    }
    if (!was_running && workers_.empty())
    {
        return;
    }
    for (auto &worker : workers_)
    {
        if (worker.joinable())
        {
            worker.join();
        }
    }
    workers_.clear();
}

// Installs `store` as the object store for shard `shard_id`.
//
// An out-of-range shard id is logged as an error and otherwise ignored.
void CloudStorageService::RegisterObjectStore(ObjectStore *store,
                                              size_t shard_id)
{
    if (shard_id < shard_stores_.size())
    {
        // Exclusive lock: ProcessHttpWork() reads the slot under a shared
        // lock on the same mutex.
        std::lock_guard guard(shard_locks_[shard_id].mutex);
        shard_stores_[shard_id] = store;
        return;
    }

    LOG(ERROR) << "Shard id " << shard_id
               << " is out of range when registering object store";
}

// Clears the object-store slot of shard `shard_id`.
//
// An out-of-range shard id is silently ignored.
void CloudStorageService::UnregisterObjectStore(size_t shard_id)
{
    if (shard_id < shard_stores_.size())
    {
        // Taking the exclusive lock waits for any ProcessHttpWork() pass that
        // currently holds the shared lock for this shard.
        std::lock_guard guard(shard_locks_[shard_id].mutex);
        shard_stores_[shard_id] = nullptr;
    }
}

// Hands `task` to the worker responsible for the task's owner shard.
//
// If the service is not accepting jobs the task is completed immediately with
// KvError::NotRunning instead of being queued.
//
// @param store  Object store that will execute the request.
// @param task   Task to run; its owner_shard_ must be set.
void CloudStorageService::Submit(ObjectStore *store, ObjectStore::Task *task)
{
    // Fast-path rejection before touching any counters.
    const bool accepting = accepting_jobs_.load(std::memory_order_acquire);
    if (!accepting)
    {
        task->error_ = KvError::NotRunning;
        NotifyTaskFinished(task);
        return;
    }

    Shard *owner = task->owner_shard_;
    CHECK(owner != nullptr) << "Cloud task missing owner shard";

    // Count the job, then look at the gate a second time: a Stop() that
    // closed it after the first check still causes the task to be rejected,
    // and the count is rolled back.
    pending_jobs_.fetch_add(1, std::memory_order_relaxed);
    const bool still_accepting =
        accepting_jobs_.load(std::memory_order_acquire);
    if (!still_accepting)
    {
        pending_jobs_.fetch_sub(1, std::memory_order_relaxed);
        task->error_ = KvError::NotRunning;
        NotifyTaskFinished(task);
        return;
    }

    // Shards are mapped onto workers by shard id modulo the worker count,
    // the same stride ProcessHttpWork() uses to pick its shards.
    const size_t queue_idx = owner->shard_id_ % worker_count_;
    job_queues_[queue_idx].enqueue({store, task});
}

// Returns a finished (or rejected) task to its owner shard by enqueueing it
// on that shard's cloud manager; with ELOQ_MODULE_ENABLED the shard's worker
// is notified afterwards.
//
// @param task  Must be non-null with owner_shard_ and kv_task_ set.
void CloudStorageService::NotifyTaskFinished(ObjectStore::Task *task)
{
    CHECK(task != nullptr);
    CHECK(task->owner_shard_ != nullptr && task->kv_task_ != nullptr);

    auto *owner = task->owner_shard_;
    auto *ready_sink = reinterpret_cast<CloudStoreMgr *>(owner->IoManager());
#ifdef ELOQ_MODULE_ENABLED
    // Capture the shard id before the task is enqueued.
    // NOTE(review): presumably the task must not be dereferenced once it has
    // been handed back to the shard - confirm against CloudStoreMgr.
    const int notify_shard_id = static_cast<int>(owner->shard_id_);
#endif
    ready_sink->EnqueueCloudReadyTask(task);
#ifdef ELOQ_MODULE_ENABLED
    eloq::EloqModule::NotifyWorker(notify_shard_id);
#endif
}

// Thread body of one cloud worker.
//
// Worker 0 first performs the one-time bootstrap call and publishes its result
// through bootstrap_state_. Every worker then loops:
//   1. drive HTTP work for the shards assigned to it (ProcessHttpWork),
//   2. drain a batch of queued jobs and start them (or fail them with
//      KvError::NotRunning when stopping),
//   3. when there is nothing to do, block on the queue for up to kIdleWait.
//
// On shutdown the loop exits only after HTTP work is idle AND a dequeue pass
// returned nothing, so every job that was queued before the stop is completed
// with KvError::NotRunning rather than being dropped.
//
// @param worker_index  Index of this worker in [0, worker_count_).
void CloudStorageService::RunWorker(size_t worker_index)
{
    const int64_t wait_timeout_us =
        std::chrono::duration_cast<std::chrono::microseconds>(kIdleWait)
            .count();
    if (worker_index == 0)
    {
        KvError err = KvError::Busy;
        if (stopping_.load(std::memory_order_acquire))
        {
            // Stop() raced with Start(); report that instead of bootstrapping.
            err = KvError::NotRunning;
        }
        else
        {
            // Bootstrap through shard 0's object store. Copy the pointer under
            // the shared lock; a missing store is reported as CloudErr.
            ObjectStore *bootstrap_store = nullptr;
            {
                std::shared_lock lk(shard_locks_.front().mutex);
                bootstrap_store = shard_stores_.front();
            }
            err = bootstrap_store == nullptr
                      ? KvError::CloudErr
                      : bootstrap_store->BootstrapUpsertTermFile(
                            CurrentTermFileNameForBranchAndPartitionGroup(
                                store_->Branch(), store_->PartitionGroupId()),
                            store_->Term());
        }
        bootstrap_state_.SetDone(err);
    }

    while (true)
    {
        bool http_active = ProcessHttpWork(worker_index);
        // Sample the stop flag once per pass so the whole batch below is
        // handled consistently.
        const bool stopping = stopping_.load(std::memory_order_acquire);

        bool started_jobs = false;
        PendingJob ready_jobs[128];
        size_t nready = job_queues_[worker_index].try_dequeue_bulk(
            ready_jobs, std::size(ready_jobs));
        for (size_t i = 0; i < nready; ++i)
        {
            PendingJob &ready_job = ready_jobs[i];
            if (ready_job.store == nullptr || ready_job.task == nullptr)
            {
                // Null wake-up job pushed by Stop(); it was never counted in
                // pending_jobs_.
                continue;
            }
            pending_jobs_.fetch_sub(1, std::memory_order_relaxed);
            if (stopping)
            {
                ready_job.task->error_ = KvError::NotRunning;
                NotifyTaskFinished(ready_job.task);
                continue;
            }
            ready_job.store->StartHttpRequest(ready_job.task);
            started_jobs = true;
        }

        if (stopping)
        {
            // A single bulk dequeue returns at most std::size(ready_jobs)
            // entries, so a non-empty batch means more jobs may still be
            // queued. Keep looping until a pass comes back empty; otherwise
            // the remaining tasks would never be completed and their owners
            // would never be notified.
            if (!http_active && nready == 0)
            {
                break;
            }
            continue;
        }

        if (http_active || started_jobs)
        {
            // Still busy: poll again immediately instead of blocking.
            continue;
        }

        // Idle: block on the queue, but only for a bounded time so HTTP work
        // and the stop flag are re-checked regularly.
        PendingJob job;
        bool has_job =
            job_queues_[worker_index].wait_dequeue_timed(job, wait_timeout_us);
        if (has_job && job.store != nullptr && job.task != nullptr)
        {
            pending_jobs_.fetch_sub(1, std::memory_order_relaxed);
            // Re-read the flag: Stop() may have been called while waiting.
            if (stopping_.load(std::memory_order_acquire))
            {
                job.task->error_ = KvError::NotRunning;
                NotifyTaskFinished(job.task);
                continue;
            }
            job.store->StartHttpRequest(job.task);
        }
    }
}

bool CloudStorageService::ProcessHttpWork(size_t worker_index)
{
    bool active = false;
    for (size_t shard_idx = worker_index; shard_idx < shard_stores_.size();
         shard_idx += worker_count_)
    {
        ObjectStore *object_store = nullptr;
        std::shared_lock lk(shard_locks_[shard_idx].mutex);
        object_store = shard_stores_[shard_idx];
        if (object_store == nullptr)
        {
            continue;
        }
        object_store->RunHttpWork();
        active |= !object_store->HttpWorkIdle();
    }
    return active;
}

}  // namespace eloqstore