#pragma once
#include "frame/Frame.hpp"
#include <queue>
namespace libobsensor {
template <typename T = Frame> class FrameQueue {
public:
explicit FrameQueue(size_t capacity) : capacity_(capacity), stopped_(true), stopping_(false), callback_(nullptr), flushing_(false) {}
~FrameQueue() noexcept {
reset();
}
size_t capacity() const {
return capacity_;
}
void resize(size_t capacity) {
capacity_ = capacity;
}
size_t size() const {
return queue_.size();
}
bool empty() const {
return queue_.empty();
}
bool fulled() const {
return queue_.size() >= capacity_;
}
bool enqueue(std::shared_ptr<T> frame) { std::unique_lock<std::mutex> lock(mutex_);
if(queue_.size() >= capacity_ || flushing_) {
return false;
}
queue_.push(frame);
condition_.notify_all();
return true;
}
std::shared_ptr<T> dequeue(uint64_t timeoutMsec = 0) { std::unique_lock<std::mutex> lock(mutex_);
if(!queue_.empty()) {
auto result = queue_.front();
queue_.pop();
return result;
}
if(timeoutMsec == 0) {
return nullptr;
}
condition_.wait_for(lock, std::chrono::milliseconds(timeoutMsec), [this] { return !queue_.empty(); });
if(queue_.empty()) {
return nullptr;
}
auto result = queue_.front();
queue_.pop();
return result;
}
void start(std::function<void(std::shared_ptr<T>)> callback) { if(isStarted()) {
throw libobsensor::wrong_api_call_sequence_exception("FrameQueue have already started!");
}
callback_ = callback;
stopped_ = false;
stopping_ = false;
flushing_ = false;
dequeueThread_ = std::thread([&] {
while(true) {
std::shared_ptr<T> frame;
{
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty() || stopping_ || flushing_; });
if(stopping_) {
break;
}
if(flushing_ && queue_.empty()) {
break;
}
if (!queue_.empty()) {
frame = queue_.front();
queue_.pop();
}
}
if(frame) {
callback_(frame);
}
}
stopped_ = true;
});
}
bool isStarted() const { return !stopped_;
}
void flush() { {
std::unique_lock<std::mutex> lock(mutex_);
flushing_ = true;
condition_.notify_all();
}
if(dequeueThread_.joinable()) {
dequeueThread_.join();
}
}
void stop() { {
std::unique_lock<std::mutex> lock(mutex_);
stopping_ = true;
condition_.notify_all();
}
if(dequeueThread_.joinable()) {
dequeueThread_.join();
}
std::unique_lock<std::mutex> lock(mutex_);
while(!queue_.empty()) {
queue_.pop();
}
}
void reset() {
stop(); callback_ = nullptr;
stopping_ = false;
flushing_ = false;
stopped_ = true;
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::queue<std::shared_ptr<T>> queue_;
size_t capacity_;
std::thread dequeueThread_;
std::atomic<bool> stopped_;
std::atomic<bool> stopping_;
std::function<void(std::shared_ptr<T>)> callback_;
std::atomic<bool> flushing_;
};
}