#ifndef GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
#define GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <atomic>
#include <string>
#include <type_traits>
#include <utility>
#include "absl/strings/str_cat.h"
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/trace.h"
namespace grpc_core {
// Latch<T> is a write-once slot holding a T.
//
// Wait() / WaitAndCopy() return pollable callables that yield
// `waiter_.pending()` until Set() has stored a value, and the value afterwards.
// Pending pollers are tracked through an IntraActivityWaiter.
// NOTE(review): the waiter type suggests setters and waiters are expected to
// run inside the same Activity - confirm against IntraActivityWaiter's contract.
template <typename T>
class Latch {
 public:
  Latch() = default;
  Latch(const Latch&) = delete;
  Latch& operator=(const Latch&) = delete;

  // Move construction transfers the value and the "has value" flag only; the
  // waiter is not transferred. The callables handed out by Wait() /
  // WaitAndCopy() capture `this`, so moving a latch that has already produced
  // one would leave it pointing at the old object - debug builds assert that
  // this has not happened.
  Latch(Latch&& other) noexcept
      : value_(std::move(other.value_)), has_value_(other.has_value_) {
#ifndef NDEBUG
    GPR_DEBUG_ASSERT(!other.has_had_waiters_);
#endif
  }

  // Move assignment; same restriction on `other` as the move constructor.
  Latch& operator=(Latch&& other) noexcept {
#ifndef NDEBUG
    GPR_DEBUG_ASSERT(!other.has_had_waiters_);
#endif
    value_ = std::move(other.value_);
    has_value_ = other.has_value_;
    return *this;
  }

  // Returns a callable producing Poll<T>.
  // While no value is set it returns `waiter_.pending()`; once set it returns
  // the stored value by *moving* it out, so value_ is left moved-from after
  // the first ready poll.
  auto Wait() {
#ifndef NDEBUG
    has_had_waiters_ = true;
#endif
    return [this]() -> Poll<T> {
      if (grpc_trace_promise_primitives.enabled()) {
        gpr_log(GPR_INFO, "%sWait %s", DebugTag().c_str(),
                StateString().c_str());
      }
      if (has_value_) {
        return std::move(value_);
      } else {
        return waiter_.pending();
      }
    };
  }

  // Like Wait(), but the ready result is a *copy* of the stored value, so the
  // latch keeps its value and can be polled to readiness repeatedly.
  auto WaitAndCopy() {
#ifndef NDEBUG
    has_had_waiters_ = true;
#endif
    return [this]() -> Poll<T> {
      if (grpc_trace_promise_primitives.enabled()) {
        gpr_log(GPR_INFO, "%sWaitAndCopy %s", DebugTag().c_str(),
                StateString().c_str());
      }
      if (has_value_) {
        return value_;
      } else {
        return waiter_.pending();
      }
    };
  }

  // Stores `value`, marks the latch as set and wakes the waiter.
  // Must be called at most once (asserted in debug builds).
  void Set(T value) {
    if (grpc_trace_promise_primitives.enabled()) {
      gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
    }
    GPR_DEBUG_ASSERT(!has_value_);
    value_ = std::move(value);
    has_value_ = true;
    waiter_.Wake();
  }

  // True once Set() has been called.
  bool is_set() const { return has_value_; }

 private:
  // Prefix for trace lines: the current activity's tag plus this latch's
  // address. The address is wrapped in absl::Hex so that the digits really are
  // hexadecimal, matching the literal "0x" prefix (StrCat would otherwise
  // print the integer in decimal).
  std::string DebugTag() {
    return absl::StrCat(Activity::current()->DebugTag(), " LATCH[0x",
                        absl::Hex(reinterpret_cast<uintptr_t>(this)), "]: ");
  }

  // Human-readable snapshot of the latch state for trace lines.
  std::string StateString() {
    return absl::StrCat("has_value:", has_value_ ? "true" : "false",
                        " waiter:", waiter_.DebugString());
  }

  // The stored value; only meaningful while has_value_ is true (and, after a
  // ready poll of Wait(), it is moved-from).
  GPR_NO_UNIQUE_ADDRESS T value_;
  // Whether Set() has been called.
  bool has_value_ = false;
#ifndef NDEBUG
  // Debug-only: records that Wait()/WaitAndCopy() was called, after which the
  // latch must not be moved from.
  bool has_had_waiters_ = false;
#endif
  IntraActivityWaiter waiter_;
};
// Specialization of Latch for void: a write-once flag with no payload.
// Wait() returns a pollable callable that yields `waiter_.pending()` until
// Set() has been called and Empty{} afterwards.
template <>
class Latch<void> {
 public:
  Latch() = default;
  Latch(const Latch&) = delete;
  Latch& operator=(const Latch&) = delete;

  // Move construction copies the flag only; the waiter is not transferred.
  // The callable returned by Wait() captures `this`, so debug builds assert
  // that `other` has never had Wait() called on it.
  Latch(Latch&& other) noexcept : is_set_(other.is_set_) {
#ifndef NDEBUG
    GPR_DEBUG_ASSERT(!other.has_had_waiters_);
#endif
  }

  // Move assignment; same restriction on `other` as the move constructor.
  Latch& operator=(Latch&& other) noexcept {
#ifndef NDEBUG
    GPR_DEBUG_ASSERT(!other.has_had_waiters_);
#endif
    is_set_ = other.is_set_;
    return *this;
  }

  // Returns a callable producing Poll<Empty>: pending until the latch is set,
  // Empty{} from then on.
  auto Wait() {
#ifndef NDEBUG
    has_had_waiters_ = true;
#endif
    return [this]() -> Poll<Empty> {
      if (grpc_trace_promise_primitives.enabled()) {
        gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
                StateString().c_str());
      }
      if (is_set_) {
        return Empty{};
      } else {
        return waiter_.pending();
      }
    };
  }

  // Marks the latch as set and wakes the waiter.
  // Must be called at most once (asserted in debug builds).
  void Set() {
    if (grpc_trace_promise_primitives.enabled()) {
      gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
    }
    GPR_DEBUG_ASSERT(!is_set_);
    is_set_ = true;
    waiter_.Wake();
  }

  // True once Set() has been called. Mirrors Latch<T>::is_set() so generic
  // code can query either form of latch the same way.
  bool is_set() const { return is_set_; }

 private:
  // Prefix for trace lines: the current activity's tag plus this latch's
  // address. The address is wrapped in absl::Hex so that the digits really are
  // hexadecimal, matching the literal "0x" prefix (StrCat would otherwise
  // print the integer in decimal).
  std::string DebugTag() {
    return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x",
                        absl::Hex(reinterpret_cast<uintptr_t>(this)), "]: ");
  }

  // Human-readable snapshot of the latch state for trace lines.
  std::string StateString() {
    return absl::StrCat("is_set:", is_set_ ? "true" : "false",
                        " waiter:", waiter_.DebugString());
  }

  // Whether Set() has been called.
  bool is_set_ = false;
#ifndef NDEBUG
  // Debug-only: records that Wait() was called, after which the latch must
  // not be moved from.
  bool has_had_waiters_ = false;
#endif
  IntraActivityWaiter waiter_;
};
// Primary template is declared but not defined here; only the void
// specialization below is provided in this part of the file.
template <typename T>
class ExternallyObservableLatch;

// A resettable flag latch whose state is held in a std::atomic<bool>.
// Unlike Latch<void>, Set() may be called repeatedly, Reset() clears the flag
// again, and the type is neither copyable nor movable.
// NOTE(review): the atomic flag presumably exists so IsSet() can be read from
// outside the owning activity - confirm. All accesses use
// memory_order_relaxed, so observing the flag does not by itself order any
// other memory accesses.
template <>
class ExternallyObservableLatch<void> {
 public:
  ExternallyObservableLatch() = default;
  ExternallyObservableLatch(const ExternallyObservableLatch&) = delete;
  ExternallyObservableLatch& operator=(const ExternallyObservableLatch&) =
      delete;

  // Returns a callable producing Poll<Empty>: `waiter_.pending()` while the
  // flag is clear, Empty{} while it is set. The flag is re-read on every poll.
  auto Wait() {
    return [this]() -> Poll<Empty> {
      if (grpc_trace_promise_primitives.enabled()) {
        gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
                StateString().c_str());
      }
      if (IsSet()) {
        return Empty{};
      } else {
        return waiter_.pending();
      }
    };
  }

  // Sets the flag and wakes the waiter.
  void Set() {
    if (grpc_trace_promise_primitives.enabled()) {
      gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
    }
    is_set_.store(true, std::memory_order_relaxed);
    waiter_.Wake();
  }

  // Current value of the flag (relaxed load).
  bool IsSet() const { return is_set_.load(std::memory_order_relaxed); }

  // Clears the flag. Does not touch the waiter.
  void Reset() {
    if (grpc_trace_promise_primitives.enabled()) {
      gpr_log(GPR_INFO, "%sReset %s", DebugTag().c_str(),
              StateString().c_str());
    }
    is_set_.store(false, std::memory_order_relaxed);
  }

 private:
  // Prefix for trace lines: the current activity's tag plus this latch's
  // address. The address is wrapped in absl::Hex so that the digits really are
  // hexadecimal, matching the literal "0x" prefix (StrCat would otherwise
  // print the integer in decimal).
  std::string DebugTag() {
    return absl::StrCat(Activity::current()->DebugTag(), " LATCH(void)[0x",
                        absl::Hex(reinterpret_cast<uintptr_t>(this)), "]: ");
  }

  // Human-readable snapshot of the latch state for trace lines.
  std::string StateString() {
    return absl::StrCat(
        "is_set:", is_set_.load(std::memory_order_relaxed) ? "true" : "false",
        " waiter:", waiter_.DebugString());
  }

  std::atomic<bool> is_set_{false};
  IntraActivityWaiter waiter_;
};
// The (unnamed closure) type returned by Latch<T>::Wait(), exposed so callers
// can name it, e.g. to store the wait promise in a member or a variant.
template <typename T>
using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait());
}
#endif