#include <grpc/support/port_platform.h>
#include <functional>
#include <utility>
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include <grpcpp/alarm.h>
#include <grpcpp/completion_queue.h>
#include <grpcpp/impl/completion_queue_tag.h>
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
namespace grpc {
namespace internal {
class AlarmImpl : public grpc::internal::CompletionQueueTag {
public:
AlarmImpl() : cq_(nullptr), tag_(nullptr) {
gpr_ref_init(&refs_, 1);
grpc_timer_init_unset(&timer_);
}
~AlarmImpl() override {}
bool FinalizeResult(void** tag, bool* ) override {
*tag = tag_;
Unref();
return true;
}
void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
cq_ = cq->cq();
tag_ = tag;
GPR_ASSERT(grpc_cq_begin_op(cq_, this));
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error_handle error) {
AlarmImpl* alarm = static_cast<AlarmImpl*>(arg);
alarm->Ref();
grpc_completion_queue* cq = alarm->cq_;
alarm->cq_ = nullptr;
grpc_cq_end_op(
cq, alarm, error,
[](void* , grpc_cq_completion* ) {}, arg,
&alarm->completion_);
GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_,
grpc_core::Timestamp::FromTimespecRoundUp(deadline),
&on_alarm_);
}
void Set(gpr_timespec deadline, std::function<void(bool)> f) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
callback_ = std::move(f);
Ref();
GRPC_CLOSURE_INIT(
&on_alarm_,
[](void* arg, grpc_error_handle error) {
grpc_core::Executor::Run(GRPC_CLOSURE_CREATE(
[](void* arg, grpc_error_handle error) {
AlarmImpl* alarm =
static_cast<AlarmImpl*>(arg);
alarm->callback_(error.ok());
alarm->Unref();
},
arg, nullptr),
error);
},
this, grpc_schedule_on_exec_ctx);
grpc_timer_init(&timer_,
grpc_core::Timestamp::FromTimespecRoundUp(deadline),
&on_alarm_);
}
void Cancel() {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_timer_cancel(&timer_);
}
void Destroy() {
Cancel();
Unref();
}
private:
void Ref() { gpr_ref(&refs_); }
void Unref() {
if (gpr_unref(&refs_)) {
delete this;
}
}
grpc_timer timer_;
gpr_refcount refs_;
grpc_closure on_alarm_;
grpc_cq_completion completion_;
grpc_completion_queue* cq_;
void* tag_;
std::function<void(bool)> callback_;
};
}
Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
void* tag) {
static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
}
void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
}
Alarm::~Alarm() {
if (alarm_ != nullptr) {
static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
}
}
void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
}