#ifndef UTIL_PCQUEUE_H
#define UTIL_PCQUEUE_H
#include "exception.hh"
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/utility.hpp>
#include <cerrno>
#ifdef __APPLE__
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_traps.h>
#include <mach/mach.h>
#endif
namespace util {
#ifdef __APPLE__
#define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure")
class Semaphore {
public:
explicit Semaphore(int value) : task_(mach_task_self()) {
MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value));
}
~Semaphore() {
MACH_CALL(semaphore_destroy(task_, back_));
}
void wait() {
MACH_CALL(semaphore_wait(back_));
}
void post() {
MACH_CALL(semaphore_signal(back_));
}
private:
semaphore_t back_;
task_t task_;
};
inline void WaitSemaphore(Semaphore &semaphore) {
semaphore.wait();
}
#else
typedef boost::interprocess::interprocess_semaphore Semaphore;
inline void WaitSemaphore (Semaphore &on) {
while (1) {
try {
on.wait();
break;
}
catch (boost::interprocess::interprocess_exception &e) {
if (e.get_native_error() != EINTR) {
throw;
}
}
}
}
#endif
template <class T> class PCQueue : boost::noncopyable {
public:
explicit PCQueue(size_t size)
: empty_(size), used_(0),
storage_(new T[size]),
end_(storage_.get() + size),
produce_at_(storage_.get()),
consume_at_(storage_.get()) {}
void Produce(const T &val) {
WaitSemaphore(empty_);
{
boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_);
try {
*produce_at_ = val;
}
catch (...) {
empty_.post();
throw;
}
if (++produce_at_ == end_) produce_at_ = storage_.get();
}
used_.post();
}
T& Consume(T &out) {
WaitSemaphore(used_);
{
boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_);
try {
out = *consume_at_;
}
catch (...) {
used_.post();
throw;
}
if (++consume_at_ == end_) consume_at_ = storage_.get();
}
empty_.post();
return out;
}
T Consume() {
T ret;
Consume(ret);
return ret;
}
private:
Semaphore empty_;
Semaphore used_;
boost::scoped_array<T> storage_;
T *const end_;
T *produce_at_;
boost::mutex produce_at_mutex_;
T *consume_at_;
boost::mutex consume_at_mutex_;
};
}
#endif