#ifndef PMEMOBJ_CONDVARIABLE_HPP
#define PMEMOBJ_CONDVARIABLE_HPP
#include <chrono>
#include <condition_variable>
#include "libpmemobj++/detail/conversions.hpp"
#include "libpmemobj++/mutex.hpp"
#include "libpmemobj/thread.h"
namespace pmem
{
namespace obj
{
class condition_variable {
typedef std::chrono::system_clock clock_type;
public:
typedef PMEMcond *native_handle_type;
condition_variable()
{
PMEMobjpool *pop;
if ((pop = pmemobj_pool_by_ptr(&pcond)) == nullptr)
throw lock_error(
1, std::generic_category(),
"Persistent condition variable not from"
" persistent memory.");
pmemobj_cond_zero(pop, &pcond);
}
~condition_variable() = default;
void
notify_one()
{
PMEMobjpool *pop = pmemobj_pool_by_ptr(this);
if (int ret = pmemobj_cond_signal(pop, &this->pcond))
throw lock_error(ret, std::system_category(),
"Error notifying one on "
"a condition variable.");
}
void
notify_all()
{
PMEMobjpool *pop = pmemobj_pool_by_ptr(this);
if (int ret = pmemobj_cond_broadcast(pop, &this->pcond))
throw lock_error(ret, std::system_category(),
"Error notifying all on "
"a condition variable.");
}
void
wait(mutex &lock)
{
this->wait_impl(lock);
}
template <typename Lock>
void
wait(Lock &lock)
{
this->wait_impl(*lock.mutex());
}
template <typename Predicate>
void
wait(mutex &lock, Predicate pred)
{
this->wait_impl(lock, std::move(pred));
}
template <typename Lock, typename Predicate>
void
wait(Lock &lock, Predicate pred)
{
this->wait_impl(*lock.mutex(), std::move(pred));
}
template <typename Clock, typename Duration>
std::cv_status
wait_until(mutex &lock,
const std::chrono::time_point<Clock, Duration> &timeout)
{
return this->wait_until_impl(lock, timeout);
}
template <typename Lock, typename Clock, typename Duration>
std::cv_status
wait_until(Lock &lock,
const std::chrono::time_point<Clock, Duration> &timeout)
{
return this->wait_until_impl(*lock.mutex(), timeout);
}
template <typename Clock, typename Duration, typename Predicate>
bool
wait_until(mutex &lock,
const std::chrono::time_point<Clock, Duration> &timeout,
Predicate pred)
{
return this->wait_until_impl(lock, timeout, std::move(pred));
}
template <typename Lock, typename Clock, typename Duration,
typename Predicate>
bool
wait_until(Lock &lock,
const std::chrono::time_point<Clock, Duration> &timeout,
Predicate pred)
{
return this->wait_until_impl(*lock.mutex(), timeout,
std::move(pred));
}
template <typename Lock, typename Rep, typename Period>
std::cv_status
wait_for(Lock &lock, const std::chrono::duration<Rep, Period> &rel_time)
{
return this->wait_until_impl(*lock.mutex(),
clock_type::now() + rel_time);
}
template <typename Lock, typename Rep, typename Period,
typename Predicate>
bool
wait_for(Lock &lock, const std::chrono::duration<Rep, Period> &rel_time,
Predicate pred)
{
return this->wait_until_impl(*lock.mutex(),
clock_type::now() + rel_time,
std::move(pred));
}
template <typename Rep, typename Period>
std::cv_status
wait_for(mutex &lock,
const std::chrono::duration<Rep, Period> &rel_time)
{
return this->wait_until_impl(lock,
clock_type::now() + rel_time);
}
template <typename Rep, typename Period, typename Predicate>
bool
wait_for(mutex &lock,
const std::chrono::duration<Rep, Period> &rel_time,
Predicate pred)
{
return this->wait_until_impl(lock, clock_type::now() + rel_time,
std::move(pred));
}
native_handle_type
native_handle() noexcept
{
return &this->pcond;
}
condition_variable &operator=(const condition_variable &) = delete;
condition_variable(const condition_variable &) = delete;
private:
void
wait_impl(mutex &lock)
{
PMEMobjpool *pop = pmemobj_pool_by_ptr(this);
if (int ret = pmemobj_cond_wait(pop, &this->pcond,
lock.native_handle()))
throw lock_error(ret, std::system_category(),
"Error waiting on a condition "
"variable.");
}
template <typename Predicate>
void
wait_impl(mutex &lock, Predicate pred)
{
while (!pred())
this->wait(lock);
}
template <typename Clock, typename Duration>
std::cv_status
wait_until_impl(
mutex &lock,
const std::chrono::time_point<Clock, Duration> &abs_timeout)
{
PMEMobjpool *pop = pmemobj_pool_by_ptr(this);
const typename Clock::time_point their_now = Clock::now();
const clock_type::time_point my_now = clock_type::now();
const auto delta = abs_timeout - their_now;
const auto my_rel = my_now + delta;
struct timespec ts = detail::timepoint_to_timespec(my_rel);
auto ret = pmemobj_cond_timedwait(pop, &this->pcond,
lock.native_handle(), &ts);
if (ret == 0)
return std::cv_status::no_timeout;
else if (ret == ETIMEDOUT)
return std::cv_status::timeout;
else
throw lock_error(ret, std::system_category(),
"Error waiting on a condition "
"variable.");
}
template <typename Clock, typename Duration, typename Predicate>
bool
wait_until_impl(
mutex &lock,
const std::chrono::time_point<Clock, Duration> &abs_timeout,
Predicate pred)
{
while (!pred())
if (this->wait_until_impl(lock, abs_timeout) ==
std::cv_status::timeout)
return pred();
return true;
}
PMEMcond pcond;
};
}
}
#endif