#include <random.h>
#include <scheduler.h>
#include <util/time.h>
#include <boost/test/unit_test.hpp>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
BOOST_AUTO_TEST_SUITE(scheduler_tests)
/**
 * Task body used by the scheduler stress test.
 *
 * Adds `delta` to `counter` while holding `mutex`, then optionally schedules a
 * single follow-up task on `s`.
 *
 * @param s               Scheduler that receives the follow-up task.
 * @param mutex           Mutex held while `counter` is modified.
 * @param counter         Shared counter updated by this task.
 * @param delta           Amount added to `counter` by this invocation.
 * @param rescheduleTime  Time at which the follow-up should run, or
 *                        steady_clock::time_point::min() for "no follow-up".
 *
 * The follow-up is bound with `-delta + 1`, so an initial task together with
 * its follow-up changes `counter` by exactly +1. The follow-up itself is bound
 * with time_point::min() and therefore never schedules anything further.
 */
static void microTask(CScheduler& s, std::mutex& mutex, int& counter, int delta, std::chrono::steady_clock::time_point rescheduleTime)
{
    {
        // Keep the critical section limited to the counter update.
        std::lock_guard<std::mutex> lock(mutex);
        counter += delta;
    }
    // time_point::min() is the sentinel meaning "do not reschedule".
    auto noTime = std::chrono::steady_clock::time_point::min();
    if (rescheduleTime != noTime) {
        CScheduler::Function f = std::bind(&microTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime);
        s.schedule(f, rescheduleTime);
    }
}
/**
 * Stress test: 200 tasks with microsecond-scale deadlines, serviced by ten
 * threads that all run CScheduler::serviceQueue on the same scheduler.
 *
 * There are ten counters, each guarded by its own mutex. Every initial task
 * adds a random delta to one counter and schedules a follow-up that adds
 * `-delta + 1` to the same counter (see microTask). Each initial task
 * therefore contributes a net +1, and once the queue has drained the counters
 * must sum to the number of initial tasks (200).
 */
BOOST_AUTO_TEST_CASE(manythreads)
{
    CScheduler microTasks;

    std::mutex counterMutex[10];
    int counter[10] = { 0 };
    // Constructed with `true`; the per-counter `!= 0` checks at the end rely on
    // a reproducible random sequence (NOTE(review): presumably the
    // deterministic-seed flag - confirm against FastRandomContext).
    FastRandomContext rng{true};
    // Helper ranges below assume randrange(n) yields a value in [0, n).
    auto zeroToNine = [](FastRandomContext& rc) -> int { return rc.randrange(10); }; // [0, 9]
    auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + static_cast<int>(rc.randrange(1012)); }; // [-11, 1000]
    auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + static_cast<int>(rc.randrange(2001)); }; // [-1000, 1000]

    auto start = std::chrono::steady_clock::now();
    auto now = start;
    std::chrono::steady_clock::time_point first, last;
    size_t nTasks = microTasks.getQueueInfo(first, last);
    BOOST_CHECK(nTasks == 0);

    // First batch: queued before any servicing thread exists, so the queue
    // size can be checked exactly afterwards.
    for (int i = 0; i < 100; ++i) {
        auto t = now + std::chrono::microseconds(randomMsec(rng));
        auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
        int whichCounter = zeroToNine(rng);
        CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
                                           std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
                                           randomDelta(rng), tReschedule);
        microTasks.schedule(f, t);
    }
    nTasks = microTasks.getQueueInfo(first, last);
    BOOST_CHECK(nTasks == 100);
    BOOST_CHECK(first < last);
    BOOST_CHECK(last > now);

    // Start the first five servicing threads.
    std::vector<std::thread> microThreads;
    microThreads.reserve(10);
    for (int i = 0; i < 5; i++)
        microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));

    UninterruptibleSleep(std::chrono::microseconds{600});
    now = std::chrono::steady_clock::now();

    // Five more threads, then a second batch scheduled while the queue is
    // already being serviced.
    for (int i = 0; i < 5; i++)
        microThreads.emplace_back(std::bind(&CScheduler::serviceQueue, &microTasks));
    for (int i = 0; i < 100; i++) {
        auto t = now + std::chrono::microseconds(randomMsec(rng));
        auto tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng));
        int whichCounter = zeroToNine(rng);
        CScheduler::Function f = std::bind(&microTask, std::ref(microTasks),
                                           std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]),
                                           randomDelta(rng), tReschedule);
        microTasks.schedule(f, t);
    }

    // Ask the scheduler to stop once the queue is drained, then wait for all
    // servicing threads to finish.
    microTasks.StopWhenDrained();
    for (auto& thread: microThreads) {
        if (thread.joinable()) thread.join();
    }

    int counterSum = 0;
    for (int i = 0; i < 10; i++) {
        BOOST_CHECK(counter[i] != 0);
        counterSum += counter[i];
    }
    // 200 initial tasks, each netting +1 together with its follow-up.
    BOOST_CHECK_EQUAL(counterSum, 200);
}
/**
 * Checks that std::condition_variable::wait_until reports
 * std::cv_status::timeout when the deadline already lies in the past, for
 * offsets ranging from one second up to 1000 hours before "now".
 */
BOOST_AUTO_TEST_CASE(wait_until_past)
{
    std::condition_variable condvar;
    Mutex mtx;
    WAIT_LOCK(mtx, lock);

    // Waits on `condvar` with a deadline `d` before the current steady-clock
    // time and returns the resulting cv_status.
    const auto no_wait = [&](const std::chrono::seconds& d) {
        const auto past_deadline = std::chrono::steady_clock::now() - d;
        return condvar.wait_until(lock, past_deadline);
    };

    // minutes/hours arguments convert implicitly to std::chrono::seconds.
    BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::seconds{1}));
    BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::minutes{1}));
    BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1}));
    BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{10}));
    BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{100}));
    BOOST_CHECK(std::cv_status::timeout == no_wait(std::chrono::hours{1000}));
}
/**
 * Two SerialTaskRunner instances share one CScheduler that is serviced by
 * five threads. One hundred callbacks are inserted into each runner; callback
 * number `i` asserts that it is the i-th one to run on its runner, i.e. that
 * callbacks of the same runner execute in insertion order. At the end each
 * counter must have reached 100.
 */
BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered)
{
    CScheduler scheduler;

    // Both runners are backed by the same scheduler.
    SerialTaskRunner queue1(scheduler);
    SerialTaskRunner queue2(scheduler);

    // Five threads service the shared scheduler.
    std::vector<std::thread> threads;
    threads.reserve(5);
    for (int worker = 0; worker < 5; ++worker) {
        threads.emplace_back([&scheduler] { scheduler.serviceQueue(); });
    }

    // The counters are updated without any locking of their own; the test
    // expects callbacks of one runner not to run concurrently with each other.
    int counter1 = 0;
    int counter2 = 0;

    for (int i = 0; i < 100; ++i) {
        queue1.insert([i, &counter1]() {
            // Increment outside of assert() so it still happens when
            // assertions are compiled out.
            bool expectation = i == counter1++;
            assert(expectation);
        });

        queue2.insert([i, &counter2]() {
            bool expectation = i == counter2++;
            assert(expectation);
        });
    }

    // Let the queue drain, then wait for every servicing thread to finish.
    scheduler.StopWhenDrained();
    for (auto& worker : threads) {
        if (worker.joinable()) {
            worker.join();
        }
    }

    BOOST_CHECK_EQUAL(counter1, 100);
    BOOST_CHECK_EQUAL(counter2, 100);
}
/**
 * Exercises CScheduler::MockForward.
 *
 * Three jobs are queued 2, 5 and 8 minutes from now. After
 * MockForward(5 minutes) the test expects two of them to have run and the
 * remaining one to be due between two and three minutes from now.
 */
BOOST_AUTO_TEST_CASE(mockforward)
{
    CScheduler scheduler;

    int counter{0};
    CScheduler::Function dummy = [&counter] { ++counter; };

    // Queue the same job at three different offsets.
    scheduler.scheduleFromNow(dummy, std::chrono::minutes{2});
    scheduler.scheduleFromNow(dummy, std::chrono::minutes{5});
    scheduler.scheduleFromNow(dummy, std::chrono::minutes{8});

    // All three jobs are queued before anything services the scheduler.
    std::chrono::steady_clock::time_point first, last;
    size_t num_tasks = scheduler.getQueueInfo(first, last);
    BOOST_CHECK_EQUAL(num_tasks, 3ul);

    std::thread scheduler_thread([&scheduler] { scheduler.serviceQueue(); });

    // Bump the scheduler forward by five minutes.
    scheduler.MockForward(std::chrono::minutes{5});

    // Queue a stop request shortly from now and wait for the servicing thread
    // to return.
    scheduler.scheduleFromNow([&scheduler] { scheduler.stop(); }, std::chrono::milliseconds{1});
    scheduler_thread.join();

    // Exactly one job should remain, and the dummy must have run twice.
    num_tasks = scheduler.getQueueInfo(first, last);
    BOOST_CHECK_EQUAL(num_tasks, 1ul);
    BOOST_CHECK_EQUAL(counter, 2);

    // The remaining job's time, truncated to whole seconds, must lie strictly
    // between two and three minutes from now.
    const auto remaining = first - std::chrono::steady_clock::now();
    int delta = std::chrono::duration_cast<std::chrono::seconds>(remaining).count();
    BOOST_CHECK(delta > 2*60 && delta < 3*60);
}
BOOST_AUTO_TEST_SUITE_END()