#include "absl/flags/internal/sequence_lock.h"
#include <algorithm>
#include <atomic>
#include <thread>
#include <tuple>
#include <vector>
#include "gtest/gtest.h"
#include "absl/base/internal/sysinfo.h"
#include "absl/container/fixed_array.h"
#include "absl/time/clock.h"
namespace {
// Shorthand for the internal flags namespace; the tests below use it to reach
// flags::SequenceLock and flags::AlignUp.
namespace flags = absl::flags_internal;
// Fixture for the multi-threaded SequenceLock test.
//
// The test parameter is a (buffer size in bytes, number of reader threads)
// tuple; both values are unpacked once at construction and exposed to the
// test body as read-only members.
class ConcurrentSequenceLockTest
    : public testing::TestWithParam<std::tuple<int, int>> {
 public:
  ConcurrentSequenceLockTest()
      : buf_bytes_{std::get<0>(GetParam())},
        num_threads_{std::get<1>(GetParam())} {}

 protected:
  // Number of bytes read/written through the sequence lock (tuple element 0).
  const int buf_bytes_;
  // Number of concurrent reader threads to spawn (tuple element 1).
  const int num_threads_;
};
// Stress test: `num_threads_` reader threads repeatedly call TryRead() on a
// shared buffer while the main thread calls Write() with buffers in which
// every byte has the same value. A successful read whose bytes are not all
// equal is counted as a "bad" (torn) read; the test requires zero bad reads.
TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) {
// Number of uint64_t words backing the protected buffer (presumably
// buf_bytes_ rounded up to a whole number of words -- AlignUp is defined
// elsewhere; confirm against sequence_lock.h).
const int buf_words =
flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t);
// The shared buffer guarded by `seq_lock`.
absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words);
// Start with every word set to all-ones (-1 converted to uint64_t), so the
// initial contents also consist of identical bytes.
for (auto& v : protected_buf) v = -1;
flags::SequenceLock seq_lock;
// Tells the reader threads to exit their loops.
std::atomic<bool> stop{false};
// Successful reads whose bytes were not all identical.
std::atomic<int64_t> bad_reads{0};
// Successful reads whose bytes were all identical.
std::atomic<int64_t> good_reads{0};
// TryRead() calls that returned false.
std::atomic<int64_t> unsuccessful_reads{0};
std::vector<std::thread> threads;
for (int i = 0; i < num_threads_; i++) {
// The lambda captures the locals above by reference; this is safe because
// every thread is joined before this function returns.
threads.emplace_back([&]() {
// Per-thread destination buffer for TryRead().
absl::FixedArray<char> local_buf(buf_bytes_);
while (!stop.load(std::memory_order_relaxed)) {
if (seq_lock.TryRead(local_buf.data(), protected_buf.data(),
buf_bytes_)) {
// The read succeeded: every byte must match the first one, because the
// writer only ever stores buffers filled with a single repeated value.
bool good = true;
for (const auto& v : local_buf) {
if (v != local_buf[0]) good = false;
}
if (good) {
good_reads.fetch_add(1, std::memory_order_relaxed);
} else {
bad_reads.fetch_add(1, std::memory_order_relaxed);
}
} else {
unsuccessful_reads.fetch_add(1, std::memory_order_relaxed);
}
}
});
}
// Wait until at least `num_threads_` failed reads have been counted in total
// (across all threads, not per thread) before initializing the lock.
// Presumably TryRead() fails while the lock is still uninitialized, so this
// shows the readers are already running -- confirm against SequenceLock.
while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) {
absl::SleepFor(absl::Milliseconds(1));
}
seq_lock.MarkInitialized();
// Perform up to 100 writes, giving up early once 5 seconds have elapsed.
absl::Time deadline = absl::Now() + absl::Seconds(5);
for (int i = 0; i < 100 && absl::Now() < deadline; i++) {
absl::FixedArray<char> writer_buf(buf_bytes_);
// Fill the whole buffer with the iteration index (i < 100, so it fits in a
// char) to produce the "all bytes identical" pattern the readers check.
for (auto& v : writer_buf) v = i;
seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_);
// Short pause between writes.
absl::SleepFor(absl::Microseconds(10));
}
stop.store(true, std::memory_order_relaxed);
for (auto& t : threads) t.join();
// NOTE(review): good_reads starts at 0 and is only ever incremented, so this
// check always passes; it does not prove that any read actually succeeded.
ASSERT_GE(good_reads, 0);
// No successful read may ever observe a mix of different byte values.
ASSERT_EQ(bad_reads, 0);
}
// Returns the geometric progression low, low*scale, low*scale^2, ... limited
// to the terms strictly below `high`, followed by `high` itself. `high` is
// therefore always the last element, and the only element when low >= high.
//
// Example: MultiplicativeRange(1, 6, 2) yields {1, 2, 4, 6}.
//
// The progression is cut short instead of looping forever or overflowing:
//   * if a term cannot grow (term <= 0 or scale <= 1), only that term is
//     emitted before `high`;
//   * the next term is only computed once it is known not to exceed `high`,
//     so the multiplication can never overflow `int`.
std::vector<int> MultiplicativeRange(int low, int high, int scale) {
  std::vector<int> result;
  int current = low;
  while (current < high) {
    result.push_back(current);
    // Multiplying a non-positive term, or multiplying by a factor <= 1, never
    // moves the term towards `high`; stop rather than spin forever.
    if (current <= 0 || scale <= 1) break;
    // Here 0 < current < high and scale >= 2. If current > high / scale then
    // current * scale > high, so the next term would be out of range (and the
    // product might not even fit in an int).
    if (current > high / scale) break;
    current *= scale;
  }
  result.push_back(high);
  return result;
}
// Largest reader-thread count used when instantiating the concurrent test.
// Normally this is the number of CPUs; when built with ThreadSanitizer it is
// capped at 4 (presumably to keep the instrumented test's runtime manageable).
#ifdef ABSL_HAVE_THREAD_SANITIZER
const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4);
#else
const int kMaxThreads = absl::base_internal::NumCPUs();
#endif
// Returns the buffer sizes (in bytes) exercised by the concurrent test: each
// value v produced by MultiplicativeRange(1, 128, 2), together with its
// neighbours v - 1 (omitted when v == 1, to avoid a zero size) and v + 1.
//
// Values are appended in the order v, v - 1, v + 1 and are not de-duplicated,
// so a size that neighbours two different v's appears more than once.
std::vector<int> InterestingBufferSizes() {
  std::vector<int> sizes;
  for (const int base : MultiplicativeRange(1, 128, 2)) {
    if (base > 1) {
      sizes.insert(sizes.end(), {base, base - 1, base + 1});
    } else {
      sizes.insert(sizes.end(), {base, base + 1});
    }
  }
  return sizes;
}
// Instantiates the concurrent test for every combination of a buffer size from
// InterestingBufferSizes() and a reader-thread count from
// MultiplicativeRange(1, kMaxThreads, 2).
INSTANTIATE_TEST_SUITE_P(
TestManyByteSizes, ConcurrentSequenceLockTest,
testing::Combine(
testing::ValuesIn(InterestingBufferSizes()),
testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2))));
// Fixture for the single-threaded SequenceLock test; the int test parameter
// is the buffer size in bytes.
class SequenceLockTest : public testing::TestWithParam<int> {};
// Single-threaded round trip: after MarkInitialized(), a buffer of GetParam()
// bytes written with Write() must be returned unchanged by a successful
// TryRead().
TEST_P(SequenceLockTest, SingleThreaded) {
  const int num_bytes = GetParam();

  // Source data ('x' bytes) and a destination pre-filled with different
  // content ('0' bytes), so a read that copies nothing cannot pass by accident.
  std::vector<char> src_bytes(num_bytes, 'x');
  std::vector<char> read_back(num_bytes, '0');

  // Backing storage for the lock-protected data, sized in uint64_t words.
  const auto num_words =
      flags::AlignUp(num_bytes, sizeof(uint64_t)) / sizeof(uint64_t);
  absl::FixedArray<std::atomic<uint64_t>> guarded_words(num_words);

  flags::SequenceLock lock;
  lock.MarkInitialized();
  lock.Write(guarded_words.data(), src_bytes.data(), num_bytes);

  ASSERT_TRUE(lock.TryRead(read_back.data(), guarded_words.data(), num_bytes));
  ASSERT_EQ(src_bytes, read_back);
}
// Instantiates the single-threaded test for every buffer size produced by
// testing::Range(1, 128), i.e. 1 through 127 bytes (the end value is
// exclusive).
INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest,
testing::Range(1, 128));
}