1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cassert>
#include <condition_variable>
#include <mutex>
#ifdef ROCKSDB_USE_STD_SEMAPHORES
#include <semaphore>
#endif
#include "port/port.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
// Wrapper providing a chosen counting semaphore implementation. The default
// implementation based on a mutex and condvar unfortunately can result in
// Release() temporarily waiting on another thread to make progress (if that
// other thread is preempted while holding the mutex), but that should be rare.
// However, alternative implementations may have correctness issues or even
// worse performance. See std::counting_semaphore for general contract.
//
// NOTE1: std::counting_semaphore is known to be buggy on many std library
// implementations, so be cautious about enabling it. Reportedly, an acquire()
// can falsely block indefinitely. And we can't easily work around that with
// try_acquire_for because another common bug has that function consistently
// sleeping for the entire timeout duration even if a release() happens earlier.
// Therefore, using std::counting_semaphore/binary_semaphore is strictly opt-in
// for now.
//
// NOTE2: Also tried wrapping folly::fibers::Semaphore here but it was not as
// efficient (for parallel compression) as even the mutex+condvar version.
class ALIGN_AS(CACHE_LINE_SIZE) CountingSemaphore {
 public:
  // Creates a semaphore initially holding `starting_count` permits.
  // `starting_count` must be in [0, INT32_MAX]; this is only checked by
  // assert, so an out-of-range value is not diagnosed in NDEBUG builds.
  explicit CountingSemaphore(std::ptrdiff_t starting_count)
#ifdef ROCKSDB_USE_STD_SEMAPHORES
      : sem_(starting_count)
#else
      : count_(static_cast<int32_t>(starting_count))
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  {
    assert(starting_count >= 0);
    assert(starting_count <= INT32_MAX);
  }

  // Takes one permit, blocking the calling thread until one is available.
  void Acquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.acquire();
#else
    std::unique_lock<std::mutex> lock(mutex_);
    assert(count_ >= 0);
    // The predicate form of wait() re-checks the count after every wakeup, so
    // spurious wakeups and wakeups lost to another acquirer are handled.
    cv_.wait(lock, [this] { return count_ > 0; });
    --count_;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  // Takes one permit if one is available, without blocking on the count.
  // Returns true if a permit was taken, false otherwise.
  bool TryAcquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    return sem_.try_acquire();
#else
    // No condition-variable wait here, so a plain lock_guard is sufficient.
    std::lock_guard<std::mutex> lock(mutex_);
    assert(count_ >= 0);
    if (count_ == 0) {
      return false;
    }
    --count_;
    return true;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  // Returns `n` permits (default 1) to the semaphore and wakes waiters.
  // `n` must be in [0, INT32_MAX] and must not push the total count past
  // INT32_MAX; both are only checked by assert. Release(0) wakes nobody.
  void Release(std::ptrdiff_t n = 1) {
    // Same argument range for both backends.
    assert(n >= 0);
    assert(n <= INT32_MAX);
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.release(n);
#else
    if (n > 0) {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(count_ >= 0);
      // Check the headroom BEFORE adding. Signed overflow is undefined
      // behavior, so testing the sign of the sum afterwards is not a reliable
      // way to detect it.
      assert(n <= INT32_MAX - count_);
      count_ += static_cast<int32_t>(n);
      if (n == 1) {
        // One new permit can satisfy at most one waiter.
        cv_.notify_one();
      } else {
        // Several new permits may satisfy several waiters; each one re-checks
        // the count under the mutex after waking.
        cv_.notify_all();
      }
    }
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

 private:
#ifdef ROCKSDB_USE_STD_SEMAPHORES
  std::counting_semaphore<INT32_MAX> sem_;
#else
  // Number of permits currently available; read and written only while
  // holding mutex_.
  int32_t count_;
  std::mutex mutex_;
  // Signalled by Release(); waited on by Acquire() while count_ is zero.
  std::condition_variable cv_;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
};  // class CountingSemaphore
// Wrapper providing a chosen binary semaphore implementation. See notes on
// CountingSemaphore above, and on Release() below.
class BinarySemaphore {
 public:
  // Builds the semaphore in the "available" state when `starting_count` is
  // positive and in the "taken" state when it is zero. A negative value is
  // rejected by assert.
  explicit BinarySemaphore(std::ptrdiff_t starting_count)
#ifdef ROCKSDB_USE_STD_SEMAPHORES
      : sem_(starting_count)
#else
      : state_(starting_count > 0)
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  {
    assert(starting_count >= 0);
  }

  // Blocks until the semaphore is available, then marks it taken.
  void Acquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.acquire();
#else
    std::unique_lock<std::mutex> guard(mutex_);
    // Loop so that a spurious wakeup, or another thread winning the race for
    // the flag, sends this thread back to waiting.
    while (!state_) {
      cv_.wait(guard);
    }
    state_ = false;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  // Non-blocking attempt to take the semaphore. Returns true when it was
  // available (and is now taken), false when it was already taken.
  bool TryAcquire() {
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    return sem_.try_acquire();
#else
    std::lock_guard<std::mutex> guard(mutex_);
    const bool was_available = state_;
    // Clearing an already-clear flag is a no-op, so no branch is needed.
    state_ = false;
    return was_available;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

  // Makes the semaphore available again and wakes one waiter.
  void Release() {
    // NOTE: calling std::binary_semaphore::release() repeatedly with no
    // Acquire() in between is undefined behavior, even though implementations
    // tend to act like a counting semaphore in that case. Capping the count at
    // 1 is also an acceptable outcome.
#ifdef ROCKSDB_USE_STD_SEMAPHORES
    sem_.release();
#else
    std::lock_guard<std::mutex> guard(mutex_);
    // Enforce the same precondition here, so misuse that would be UB with the
    // std implementation is caught in debug builds.
    assert(state_ == false);
    state_ = true;
    cv_.notify_one();
#endif  // ROCKSDB_USE_STD_SEMAPHORES
  }

 private:
#ifdef ROCKSDB_USE_STD_SEMAPHORES
  std::binary_semaphore sem_;
#else
  // true while the semaphore is available; accessed only under mutex_.
  bool state_;
  std::mutex mutex_;
  // Signalled by Release(); waited on by Acquire() while state_ is false.
  std::condition_variable cv_;
#endif  // ROCKSDB_USE_STD_SEMAPHORES
};
} // namespace ROCKSDB_NAMESPACE