#ifndef HIGHS_SPLIT_DEQUE_H_
#define HIGHS_SPLIT_DEQUE_H_
#include <array>
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include "parallel/HighsBinarySemaphore.h"
#include "parallel/HighsCacheAlign.h"
#include "parallel/HighsSpinMutex.h"
#include "parallel/HighsTask.h"
#include "util/HighsInt.h"
#include "util/HighsRandom.h"
// Detect whether we are compiling under ThreadSanitizer.
// Clang exposes this via __has_feature(thread_sanitizer) ...
#ifdef __has_feature
#if __has_feature(thread_sanitizer)
#define TSAN_ENABLED
#endif
#endif
// ... while gcc defines __SANITIZE_THREAD__ instead.
#ifdef __SANITIZE_THREAD__
#define TSAN_ENABLED
#endif

#ifdef TSAN_ENABLED
// Manual happens-before/after annotations for TSan. They tell the race
// detector about synchronization it cannot see (here: the hand-off of
// stealerData.injectedTask through the binary semaphore), suppressing
// false-positive race reports on the annotated address.
#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr) \
  AnnotateHappensBefore(__FILE__, __LINE__, (void*)(addr))
#define TSAN_ANNOTATE_HAPPENS_AFTER(addr) \
  AnnotateHappensAfter(__FILE__, __LINE__, (void*)(addr))
// Provided by the TSan runtime; declared here so no TSan header is needed.
extern "C" void AnnotateHappensBefore(const char* f, int l, void* addr);
extern "C" void AnnotateHappensAfter(const char* f, int l, void* addr);
#else
// No-ops when not building with ThreadSanitizer.
#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr)
#define TSAN_ANNOTATE_HAPPENS_AFTER(addr)
#endif
/// Work-stealing deque used by the HiGHS task scheduler, one instance per
/// worker thread.
///
/// The task array is divided by a "split" position: tasks at indices below
/// the split are shared and may be taken by stealers; tasks between the
/// split and the owner's private `head` are private to the owner. The
/// stealer-visible state packs the steal position ("tail") and the split
/// into a single 64-bit atomic (`stealerData.ts`, tail in the upper 32 bits,
/// split in the lower 32 bits) so both can be read and CAS'd together.
///
/// Data accessed by different roles (owner / stealers / sleeper bookkeeping)
/// is placed on separate 64-byte-aligned members to avoid false sharing;
/// the constructor static_asserts the intended offsets.
class HighsSplitDeque {
  using cache_aligned = highs::cache_aligned;

 public:
  enum Constants {
    // Capacity of the fixed task array; pushes beyond this are run inline
    // (see push()/Status::kOverflown).
    kTaskArraySize = 8192,
  };

  struct WorkerBunk;

 private:
  // State only ever read/written by the owning worker thread; kept on its
  // own cache line (see alignas below).
  struct OwnerData {
    cache_aligned::shared_ptr<WorkerBunk> workerBunk = nullptr;
    // Array of all worker deques, indexed by owner id; used for random
    // stealing and for resolving sleeper-stack indices to deque pointers.
    cache_aligned::unique_ptr<HighsSplitDeque>* workers = nullptr;
    HighsRandom randgen;
    // Owner's private push/pop position (top of the deque).
    uint32_t head = 0;
    // Owner's cached copy of the split position stored in stealerData.ts.
    uint32_t splitCopy = 0;
    int numWorkers = 0;
    int ownerId = -1;
    // Task currently being run by this worker; used for interrupt checks.
    HighsTask* rootTask = nullptr;
    // Owner's cached copy of stealerData.allStolen.
    bool allStolenCopy = true;
  };

  // State accessed by stealing threads; on its own cache line.
  struct StealerData {
    // Used both to sleep when out of work and to wait for stolen tasks.
    HighsBinarySemaphore semaphore{0};
    // Task handed directly to a sleeping worker (see injectTaskAndNotify
    // and WorkerBunk::waitForNewTask).
    HighsTask* injectedTask{nullptr};
    // Packed (tail, split) pair; see makeTailSplit/tail/split helpers.
    std::atomic<uint64_t> ts{0};
    // True when stealers have taken every shared task of this deque.
    std::atomic<bool> allStolen{true};
  };

  struct TaskMetadata {
    std::atomic<HighsSplitDeque*> stealer;
  };

  // Pack tail (upper 32 bits) and split (lower 32 bits) into one word.
  static constexpr uint64_t makeTailSplit(uint32_t tail, uint32_t split) {
    return (uint64_t(tail) << 32) | split;
  }

  static constexpr uint32_t tail(uint64_t tailSplit) { return tailSplit >> 32; }

  static constexpr uint32_t split(uint64_t tailSplit) {
    return static_cast<uint32_t>(tailSplit);
  }

  // Per-deque link data for the global sleeper stack; on its own cache line.
  struct WorkerBunkData {
    std::atomic<HighsSplitDeque*> nextSleeper{nullptr};
    int ownerId;
  };

 public:
  /// State shared between all workers: a counter of deques that currently
  /// have stealable work, and a lock-free (Treiber-style) stack of sleeping
  /// workers waiting for work to be injected.
  struct WorkerBunk {
    // sleeperStack layout: low kAbaTagShift bits hold the 1-based owner id
    // of the stack's top sleeper (0 = empty); the remaining high bits are a
    // modification counter incremented on every push/pop to avoid ABA
    // problems. NOTE: this caps the worker count at 2^20 - 1.
    static constexpr uint64_t kAbaTagShift = 20;
    static constexpr uint64_t kIndexMask = (uint64_t{1} << kAbaTagShift) - 1;
    alignas(64) std::atomic<int> haveJobs;
    alignas(64) std::atomic<uint64_t> sleeperStack;

    WorkerBunk() : haveJobs{0}, sleeperStack(0) {}

    /// Push a worker's deque onto the sleeper stack (lock-free CAS loop).
    void pushSleeper(HighsSplitDeque* deque) {
      uint64_t stackState = sleeperStack.load(std::memory_order_relaxed);
      uint64_t newStackState;
      HighsSplitDeque* head;
      do {
        // Resolve the current top-of-stack (1-based index, 0 = empty).
        head =
            stackState & kIndexMask
                ? deque->ownerData.workers[(stackState & kIndexMask) - 1].get()
                : nullptr;
        deque->workerBunkData.nextSleeper.store(head,
                                                std::memory_order_relaxed);
        // Bump the ABA tag and install this deque's 1-based id as new top.
        newStackState = (stackState >> kAbaTagShift) + 1;
        newStackState = (newStackState << kAbaTagShift) |
                        uint64_t(deque->workerBunkData.ownerId + 1);
      } while (!sleeperStack.compare_exchange_weak(stackState, newStackState,
                                                   std::memory_order_release,
                                                   std::memory_order_relaxed));
    }

    /// Pop a sleeping worker's deque, or nullptr if none is sleeping.
    /// `localDeque` is only used to access the shared workers array.
    HighsSplitDeque* popSleeper(HighsSplitDeque* localDeque) {
      uint64_t stackState = sleeperStack.load(std::memory_order_relaxed);
      HighsSplitDeque* head;
      HighsSplitDeque* newHead;
      uint64_t newStackState;
      do {
        if ((stackState & kIndexMask) == 0) return nullptr;
        head =
            localDeque->ownerData.workers[(stackState & kIndexMask) - 1].get();
        newHead =
            head->workerBunkData.nextSleeper.load(std::memory_order_relaxed);
        int newHeadId =
            newHead != nullptr ? newHead->workerBunkData.ownerId + 1 : 0;
        // Bump the ABA tag and install the successor as the new top.
        newStackState = (stackState >> kAbaTagShift) + 1;
        newStackState = (newStackState << kAbaTagShift) | uint64_t(newHeadId);
      } while (!sleeperStack.compare_exchange_weak(stackState, newStackState,
                                                   std::memory_order_acquire,
                                                   std::memory_order_relaxed));
      head->workerBunkData.nextSleeper.store(nullptr,
                                             std::memory_order_relaxed);
      return head;
    }

    /// Hand tasks from localDeque's shared region directly to sleeping
    /// workers, waking each one with an injected task. Called by the owner
    /// after it has made work stealable.
    void publishWork(HighsSplitDeque* localDeque) {
      HighsSplitDeque* sleeper = popSleeper(localDeque);
      while (sleeper) {
        // Take the next shared task of the local deque ourselves.
        uint32_t t = localDeque->selfStealAndGetTail();
        if (t == localDeque->ownerData.splitCopy) {
          // Shared region was already empty: nothing to hand out. If the
          // owner also has no private tasks left, mark the deque fully
          // stolen and drop its haveJobs contribution.
          if (localDeque->ownerData.head == localDeque->ownerData.splitCopy) {
            localDeque->ownerData.allStolenCopy = true;
            localDeque->stealerData.allStolen.store(true,
                                                    std::memory_order_relaxed);
            haveJobs.fetch_add(-1, std::memory_order_release);
          }
          // Put the popped sleeper back; we have nothing for it.
          pushSleeper(sleeper);
          return;
        } else {
          sleeper->injectTaskAndNotify(&localDeque->taskArray[t]);
        }
        if (t == localDeque->ownerData.splitCopy - 1) {
          // That was the last shared task; same bookkeeping as above.
          if (localDeque->ownerData.head == localDeque->ownerData.splitCopy) {
            localDeque->ownerData.allStolenCopy = true;
            localDeque->stealerData.allStolen.store(true,
                                                    std::memory_order_relaxed);
            haveJobs.fetch_add(-1, std::memory_order_release);
          }
          return;
        }
        sleeper = popSleeper(localDeque);
      }
    }

    /// Register localDeque's owner as a sleeper and block until another
    /// worker injects a task; returns the injected task.
    HighsTask* waitForNewTask(HighsSplitDeque* localDeque) {
      pushSleeper(localDeque);
      localDeque->stealerData.semaphore.acquire();
      // Pair with the HAPPENS_BEFORE in injectTaskAndNotify; the real
      // synchronization is the semaphore.
      TSAN_ANNOTATE_HAPPENS_AFTER(&localDeque->stealerData.injectedTask);
      return localDeque->stealerData.injectedTask;
    }
  };

 private:
  // Each of these structs must fit in one cache line so the alignas(64)
  // members below do not straddle lines.
  static_assert(sizeof(OwnerData) <= 64,
                "sizeof(OwnerData) exceeds cache line size");
  static_assert(sizeof(StealerData) <= 64,
                "sizeof(StealerData) exceeds cache line size");
  static_assert(sizeof(WorkerBunkData) <= 64,
                "sizeof(WorkerBunkData) exceeds cache line size");

  alignas(64) OwnerData ownerData;
  // Set by stealers that found the shared region exhausted, asking the
  // owner to move the split forward; on its own cache line.
  alignas(64) std::atomic<bool> splitRequest;
  alignas(64) StealerData stealerData;
  alignas(64) WorkerBunkData workerBunkData;
  alignas(64) std::array<HighsTask, kTaskArraySize> taskArray;

  // Owner-side: advance the split up to head (capped at the array size) so
  // more of the owner's private tasks become stealable. Only publishes to
  // sleepers when not every worker already has jobs.
  void growShared() {
    int haveJobs =
        ownerData.workerBunk->haveJobs.load(std::memory_order_relaxed);
    bool splitRq = false;
    uint32_t newSplit;
    if (haveJobs == ownerData.numWorkers) {
      // Everyone is busy: only grow if a stealer explicitly asked for it.
      splitRq = splitRequest.load(std::memory_order_relaxed);
      if (!splitRq) return;
    }
    newSplit = std::min(uint32_t{kTaskArraySize}, ownerData.head);
    assert(newSplit > ownerData.splitCopy);
    // Update only the split half of ts with an XOR: old split bits cancel,
    // new split bits are set, and the tail half is left untouched even if
    // stealers concurrently advance it.
    uint64_t xorMask = ownerData.splitCopy ^ newSplit;
    assert((xorMask >> 32) == 0);
    stealerData.ts.fetch_xor(xorMask, std::memory_order_release);
    ownerData.splitCopy = newSplit;
    if (splitRq)
      splitRequest.store(false, std::memory_order_relaxed);
    else
      ownerData.workerBunk->publishWork(this);
  }

  // Owner-side: when the owner has caught up with the split, try to pull
  // the split back toward the tail to reclaim unstolen shared tasks.
  // Returns true if every shared task turned out to be stolen already (the
  // deque is then marked allStolen), false if some were reclaimed.
  bool shrinkShared() {
    uint32_t t = tail(stealerData.ts.load(std::memory_order_relaxed));
    uint32_t s = ownerData.splitCopy;
    if (t != s) {
      // Move the split to the midpoint between tail and old split by
      // adding the (negative, wrapping) delta to the split half of ts.
      ownerData.splitCopy = (t + s) / 2;
      t = tail(stealerData.ts.fetch_add(uint64_t{ownerData.splitCopy} - s,
                                        std::memory_order_acq_rel));
      if (t != s) {
        if (t > ownerData.splitCopy) {
          // Stealers raced past the new split while we moved it; move the
          // split again and store a consistent (tail, split) pair.
          ownerData.splitCopy = (t + s) / 2;
          stealerData.ts.store(makeTailSplit(t, ownerData.splitCopy),
                               std::memory_order_relaxed);
        }
        return false;
      }
    }
    // Tail reached the split: all shared tasks were stolen.
    stealerData.allStolen.store(true, std::memory_order_relaxed);
    ownerData.allStolenCopy = true;
    ownerData.workerBunk->haveJobs.fetch_add(-1, std::memory_order_relaxed);
    return true;
  }

 public:
  /// \param workerBunk shared bunk of all workers
  /// \param workers    array of all worker deques, indexed by owner id
  /// \param ownerId    id of the worker owning this deque
  /// \param numWorkers total number of workers
  HighsSplitDeque(const cache_aligned::shared_ptr<WorkerBunk>& workerBunk,
                  cache_aligned::unique_ptr<HighsSplitDeque>* workers,
                  int ownerId, int numWorkers) {
    ownerData.ownerId = ownerId;
    ownerData.workers = workers;
    ownerData.numWorkers = numWorkers;
    workerBunkData.ownerId = ownerId;
    ownerData.randgen.initialise(ownerId);
    ownerData.workerBunk = workerBunk;
    splitRequest.store(false, std::memory_order_relaxed);
    // The false-sharing layout only works if the allocation itself is
    // 64-byte aligned and the alignas members land at the expected offsets.
    assert((reinterpret_cast<uintptr_t>(this) & 63u) == 0);
    static_assert(offsetof(HighsSplitDeque, splitRequest) == 64,
                  "alignas failed to guarantee 64 byte alignment");
    static_assert(offsetof(HighsSplitDeque, stealerData) == 128,
                  "alignas failed to guarantee 64 byte alignment");
    static_assert(offsetof(HighsSplitDeque, workerBunkData) == 192,
                  "alignas failed to guarantee 64 byte alignment");
    static_assert(offsetof(HighsSplitDeque, taskArray) == 256,
                  "alignas failed to guarantee 64 byte alignment");
  }

  /// Throw HighsTask::Interrupt if the task this worker is currently
  /// running has been cancelled.
  void checkInterrupt() {
    if (ownerData.rootTask && ownerData.rootTask->isCancelled())
      throw HighsTask::Interrupt();
  }

  /// Mark the task at the given index of this deque as cancelled.
  void cancelTask(HighsInt taskIndex) {
    assert(taskIndex < (HighsInt)ownerData.head);
    assert(taskIndex >= 0);
    taskArray[taskIndex].cancel();
  }

  /// Owner-side: push a new task. If the array is full the callable is run
  /// inline (head still counts up so pop() can report kOverflown).
  template <typename F>
  void push(F&& f) {
    if (ownerData.head >= kTaskArraySize) {
      // Array overflow: share what we can, then execute the task directly.
      if (ownerData.splitCopy < kTaskArraySize && !ownerData.allStolenCopy)
        growShared();
      ownerData.head += 1;
      f();
      return;
    }
    taskArray[ownerData.head++].setTaskData(std::forward<F>(f));
    if (ownerData.allStolenCopy) {
      // Deque was fully stolen: re-open it with exactly this one task
      // shared (tail = head-1, split = head) and re-register in haveJobs.
      assert(ownerData.head > 0);
      stealerData.ts.store(makeTailSplit(ownerData.head - 1, ownerData.head),
                           std::memory_order_release);
      stealerData.allStolen.store(false, std::memory_order_relaxed);
      ownerData.splitCopy = ownerData.head;
      ownerData.allStolenCopy = false;
      if (splitRequest.load(std::memory_order_relaxed))
        splitRequest.store(false, std::memory_order_relaxed);
      int haveJobs = ownerData.workerBunk->haveJobs.fetch_add(
          1, std::memory_order_release);
      // Wake sleepers only if some worker might be out of work.
      if (haveJobs < ownerData.numWorkers - 1)
        ownerData.workerBunk->publishWork(this);
    } else
      growShared();
  }

  enum class Status {
    kEmpty,      ///< deque holds no tasks
    kStolen,     ///< top task was stolen; caller must wait / use popStolen
    kWork,       ///< top task returned for the owner to run
    kOverflown,  ///< top task was run inline at push time; nothing to do
  };

  /// Owner-side pop of the most recently pushed task. Returns the status
  /// and, for kWork/kStolen, a pointer to the corresponding task slot.
  std::pair<Status, HighsTask*> pop() {
    if (ownerData.head == 0) return std::make_pair(Status::kEmpty, nullptr);

    if (ownerData.head > kTaskArraySize) {
      // This position was an overflow push that already ran inline.
      ownerData.head -= 1;
      return std::make_pair(Status::kOverflown, nullptr);
    }

    if (ownerData.allStolenCopy)
      return std::make_pair(Status::kStolen, &taskArray[ownerData.head - 1]);

    if (ownerData.splitCopy == ownerData.head) {
      // Owner caught up with the shared region; try to reclaim tasks.
      if (shrinkShared())
        return std::make_pair(Status::kStolen, &taskArray[ownerData.head - 1]);
    }

    ownerData.head -= 1;
    if (ownerData.head == 0) {
      if (!ownerData.allStolenCopy) {
        ownerData.allStolenCopy = true;
        stealerData.allStolen.store(true, std::memory_order_relaxed);
        ownerData.workerBunk->haveJobs.fetch_add(-1, std::memory_order_release);
      }
    } else if (ownerData.head != ownerData.splitCopy)
      growShared();

    return std::make_pair(Status::kWork, &taskArray[ownerData.head]);
  }

  /// Owner-side: acknowledge that the top task (reported kStolen by pop())
  /// is done being waited on, and pop it.
  void popStolen() {
    ownerData.head -= 1;
    if (!ownerData.allStolenCopy) {
      ownerData.allStolenCopy = true;
      stealerData.allStolen.store(true, std::memory_order_relaxed);
      ownerData.workerBunk->haveJobs.fetch_add(-1, std::memory_order_release);
    }
  }

  /// Stealer-side: try once to steal the task at the tail of the shared
  /// region. Returns nullptr if nothing was stolen; when the shared region
  /// looks exhausted, raises splitRequest to ask the owner for more.
  HighsTask* steal() {
    if (stealerData.allStolen.load(std::memory_order_relaxed)) return nullptr;

    uint64_t ts = stealerData.ts.load(std::memory_order_relaxed);
    uint32_t t = tail(ts);
    uint32_t s = split(ts);
    if (t < s) {
      if (stealerData.ts.compare_exchange_weak(ts, makeTailSplit(t + 1, s),
                                               std::memory_order_acquire,
                                               std::memory_order_relaxed))
        return &taskArray[t];

      // CAS failed; re-read from the updated ts. If tasks remain, give up
      // this single attempt without requesting a split.
      t = tail(ts);
      s = split(ts);
      if (t < s) {
        return nullptr;
      }
    }

    if (t < kTaskArraySize && !splitRequest.load(std::memory_order_relaxed))
      splitRequest.store(true, std::memory_order_relaxed);

    return nullptr;
  }

  /// Stealer-side: like steal(), but retries the CAS until either a task
  /// is obtained or the shared region is empty.
  HighsTask* stealWithRetryLoop() {
    if (stealerData.allStolen.load(std::memory_order_relaxed)) return nullptr;

    uint64_t ts = stealerData.ts.load(std::memory_order_relaxed);
    uint32_t t = tail(ts);
    uint32_t s = split(ts);

    while (t < s) {
      if (stealerData.ts.compare_exchange_weak(ts, makeTailSplit(t + 1, s),
                                               std::memory_order_acquire,
                                               std::memory_order_relaxed))
        return &taskArray[t];

      t = tail(ts);
      s = split(ts);
    }

    if (t < kTaskArraySize && !splitRequest.load(std::memory_order_relaxed))
      splitRequest.store(true, std::memory_order_relaxed);

    return nullptr;
  }

  /// Owner-side: unconditionally advance the tail of this deque's own
  /// shared region by one (used by publishWork to hand tasks to sleepers)
  /// and return the previous tail. If the region was already empty
  /// (returned tail == splitCopy) the overshoot is rolled back.
  uint32_t selfStealAndGetTail() {
    if (ownerData.allStolenCopy) return ownerData.splitCopy;

    uint32_t t = tail(stealerData.ts.fetch_add(makeTailSplit(1, 0),
                                               std::memory_order_relaxed));
    if (t == ownerData.splitCopy)
      stealerData.ts.store(makeTailSplit(t, ownerData.splitCopy),
                           std::memory_order_relaxed);

    return t;
  }

  /// Try to steal from a uniformly random worker other than this one.
  HighsTask* randomSteal() {
    HighsInt next = ownerData.randgen.integer(ownerData.numWorkers - 1);
    // Skip our own id by shifting the drawn index up by one when needed.
    next += next >= ownerData.ownerId;
    assert(next != ownerData.ownerId);
    assert(next >= 0);
    assert(next < ownerData.numWorkers);

    return ownerData.workers[next]->steal();
  }

  /// Hand a task directly to this (sleeping) worker and wake it up.
  void injectTaskAndNotify(HighsTask* t) {
    stealerData.injectedTask = t;
    // Pairs with the HAPPENS_AFTER in waitForNewTask.
    TSAN_ANNOTATE_HAPPENS_BEFORE(&stealerData.injectedTask);
    stealerData.semaphore.release();
  }

  void notify() { stealerData.semaphore.release(); }

  void wait() { stealerData.semaphore.acquire(); }

  /// Run a stolen task on this worker. Propagates cancellation: if the task
  /// is interrupted, every task it spawned is cancelled and drained (waiting
  /// for the ones other workers stole) before the interrupt is re-raised via
  /// checkInterrupt().
  void runStolenTask(HighsTask* task) {
    // The stolen task becomes this worker's root for interrupt checks;
    // restore the previous root afterwards (stolen tasks can nest).
    HighsTask* prevRootTask = ownerData.rootTask;
    ownerData.rootTask = task;
    uint32_t currentHead = ownerData.head;
    try {
      HighsSplitDeque* owner = task->run(this);
      if (owner) owner->notify();
    } catch (const HighsTask::Interrupt&) {
      // Cancel all tasks this task pushed, then pop them all off again.
      for (uint32_t i = currentHead; i < ownerData.head; ++i)
        taskArray[i].cancel();

      while (ownerData.head != currentHead) {
        std::pair<Status, HighsTask*> popResult = pop();
        assert(popResult.first != Status::kEmpty);
        if (popResult.first != Status::kStolen) continue;

        HighsSplitDeque* stealer = popResult.second->getStealerIfUnfinished();
        if (stealer == nullptr) {
          // Stolen task already finished; just account for it.
          popStolen();
          continue;
        }

        // Spin-wait for the stealer with exponentially growing bursts;
        // after kMicroSecsBeforeSleep fall back to sleeping on the
        // semaphore until the stealer notifies us.
        int numTries = HighsSchedulerConstants::kNumTryFac;
        auto tStart = std::chrono::high_resolution_clock::now();
        bool isFinished = popResult.second->isFinished();
        while (!isFinished) {
          for (int i = 0; i < numTries; ++i) {
            HighsSpinMutex::yieldProcessor();
            isFinished = popResult.second->isFinished();
            if (isFinished) break;
          }

          if (!isFinished) {
            auto numMicroSecs =
                std::chrono::duration_cast<std::chrono::microseconds>(
                    std::chrono::high_resolution_clock::now() - tStart)
                    .count();

            if (numMicroSecs < HighsSchedulerConstants::kMicroSecsBeforeSleep)
              numTries *= 2;
            else {
              waitForTaskToFinish(popResult.second, stealer);
              break;
            }
          }
        }

        popStolen();
      }

      // Mark the interrupted task finished and wake its waiting owner.
      HighsSplitDeque* owner = task->markAsFinished(this);
      if (owner) owner->notify();
    }
    ownerData.rootTask = prevRootTask;
    // Re-raise the interrupt on this worker if our own root was cancelled.
    checkInterrupt();
  }

  /// While waiting for a task stolen from us, run tasks stolen back from
  /// the stealer's deque ("leapfrogging") instead of idling. Returns true
  /// iff the task is finished (also when it had no unfinished stealer).
  /// On return with false, `stealer` identifies the stealing worker.
  bool leapfrogStolenTask(HighsTask* task, HighsSplitDeque*& stealer) {
    bool cancelled;
    stealer = task->getStealerIfUnfinished(&cancelled);

    if (stealer == nullptr) return true;

    if (!cancelled) {
      do {
        HighsTask* t = stealer->stealWithRetryLoop();
        if (t == nullptr) break;
        runStolenTask(t);
      } while (!task->isFinished());
    }

    return task->isFinished();
  }

  /// Block on this worker's semaphore until the given stolen task finishes.
  /// The semaphore's mutex is held while registering the notification
  /// request so the stealer cannot signal between the check and the wait.
  void waitForTaskToFinish(HighsTask* t, HighsSplitDeque* stealer) {
    std::unique_lock<std::mutex> lg =
        stealerData.semaphore.lockMutexForAcquire();
    // If the task finished in the meantime no notification will come.
    if (!t->requestNotifyWhenFinished(this, stealer)) return;

    stealerData.semaphore.acquire(std::move(lg));
  }

  /// Cooperatively yield: try one random steal and run the task if any.
  void yield() {
    HighsTask* t = randomSteal();
    if (t) runStolenTask(t);
  }

  int getOwnerId() const { return ownerData.ownerId; }

  int getNumWorkers() const { return ownerData.numWorkers; }

  int getCurrentHead() const { return ownerData.head; }

  HighsSplitDeque* getWorkerById(int id) const {
    return ownerData.workers[id].get();
  }
};
#endif