1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#pragma once
// Based on the following design:
// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
#include <atomic>
#include "common/assert.h"
#include "common/copy_constructors.h"
namespace lbug {
namespace common {
// Producers are completely wait-free.
template<typename T>
class MPSCQueue {
struct Node {
T data;
std::atomic<Node*> next;
explicit Node(T data) : data(std::move(data)), next(nullptr) {}
};
public:
MPSCQueue() : head(nullptr), tail(nullptr), _approxSize(0) {
// Allocate a dummy element.
Node* stub = new Node(T());
head = stub;
// Ordering doesn't matter.
tail.store(stub, std::memory_order_relaxed);
}
DELETE_BOTH_COPY(MPSCQueue);
MPSCQueue(MPSCQueue&& other)
: head(other.head), tail(other.tail.exchange(nullptr, std::memory_order_relaxed)),
_approxSize(other._approxSize.load(std::memory_order_relaxed)) {
other.head = nullptr;
}
// If this method existed, it wouldn't be atomic, and so would be rather error-prone. Maybe
// there's a valid future use case.
DELETE_MOVE_ASSN(MPSCQueue);
// NOTE: It is NOT guaranteed that the result of a push() is accessible to a thread that calls
// pop() after the push(), because of implementation details. See the body of the function for
// details.
void push(T elem) {
Node* node = new Node(std::move(elem));
_approxSize.fetch_add(1, std::memory_order_relaxed);
// ORDERING: must acquire any updates to prev before modifying it, and release our updates
// to node for other producers.
Node* prev = tail.exchange(node, std::memory_order_acq_rel);
// NOTE: If the thread is suspended here, then ALL FUTURE push() calls will be INACCESSIBLE
// by pop() calls until the next line runs. In order to guarantee that a push() is visible
// to a thread that calls pop(), ALL push() calls must have completed.
// ORDERING: must make updates visible to consumers.
prev->next.store(node, std::memory_order_release);
}
// NOTE: It is NOT safe to call pop() from multiple threads without synchronization.
bool pop(T& elem) {
// ORDERING: Acquire any updates made by producers.
// Note that head is accessed only by the single consumer, so accesses to it need not be
// synchronized.
Node* next = head->next.load(std::memory_order_acquire);
if (next == nullptr) {
return false;
}
// Free the old element.
delete head;
head = next;
elem = std::move(head->data);
_approxSize.fetch_sub(1, std::memory_order_relaxed);
// Now the current head has dummy data in it again (i.e., whatever was leftover after the
// move()).
return true;
}
// Return an approximation of the number of elements in the queue.
// Due to implementation details, this number must not be relied on. However, it can be used to
// get a rough estimate for the size of the queue.
size_t approxSize() const { return _approxSize.load(std::memory_order_relaxed); }
// Drain the queue. All operations on the queue MUST have finished. I.e., there must be NO
// push() or pop() operations in progress of any kind.
~MPSCQueue() {
// If we were moved out of, return.
if (!head) {
return;
}
T dummy;
while (pop(dummy)) {}
DASSERT(head == tail.load(std::memory_order_relaxed));
delete head;
}
private:
// Head is always present, but always has dummy data. This ensures that it is always easy to
// append to the list, without branching in the methods.
Node* head;
std::atomic<Node*> tail;
std::atomic<size_t> _approxSize;
};
} // namespace common
} // namespace lbug