package mem_ring
import (
"sync"
"sync/atomic"
"unsafe"
"github.com/edwingeng/deque/v2"
)
type QueueMeta struct {
BufferPtr uintptr
BufferLen uintptr
HeadPtr uintptr
TailPtr uintptr
WorkingPtr uintptr
StuckPtr uintptr
WorkingFd int32
UnstuckFd int32
}
type Queue[T any] struct {
bufferPtr unsafe.Pointer
bufferLen uintptr
headPtr *uintptr
tailPtr *uintptr
workingPtr *uint32
stuckPtr *uint32
workingFd int32
unstuckFd int32
}
type ReadQueue[T any] struct {
q Queue[T]
unstuckNotifier Notifier
}
type WriteQueue[T any] struct {
q Queue[T]
Lock *sync.Mutex
pendingTasks *deque.Deque[T]
workingNotifier Notifier
}
func NewQueue[T any](meta QueueMeta) Queue[T] {
return Queue[T]{
bufferPtr: unsafe.Pointer(meta.BufferPtr),
bufferLen: meta.BufferLen,
headPtr: (*uintptr)(unsafe.Pointer(meta.HeadPtr)),
tailPtr: (*uintptr)(unsafe.Pointer(meta.TailPtr)),
workingPtr: (*uint32)(unsafe.Pointer(meta.WorkingPtr)),
stuckPtr: (*uint32)(unsafe.Pointer(meta.StuckPtr)),
workingFd: meta.WorkingFd,
unstuckFd: meta.UnstuckFd,
}
}
func (q *Queue[T]) push(item T) bool {
t_size := unsafe.Sizeof(item)
tail := atomic.LoadUintptr(q.tailPtr)
head := atomic.LoadUintptr(q.headPtr)
if tail-head == q.bufferLen {
return false
}
ptr := unsafe.Add(q.bufferPtr, (tail%q.bufferLen)*t_size)
*(*T)(ptr) = item
atomic.AddUintptr(q.tailPtr, 1)
return true
}
func (q *Queue[T]) pop() *T {
var _t T
t_size := unsafe.Sizeof(_t)
tail := atomic.LoadUintptr(q.tailPtr)
head := atomic.LoadUintptr(q.headPtr)
if tail == head {
return nil
}
ptr := unsafe.Add(q.bufferPtr, (head%q.bufferLen)*t_size)
item := *(*T)(ptr)
atomic.AddUintptr(q.headPtr, 1)
return &item
}
func (q *Queue[T]) isEmpty() bool {
return atomic.LoadUintptr(q.tailPtr) == atomic.LoadUintptr(q.headPtr)
}
func (q *Queue[T]) isFull() bool {
return atomic.LoadUintptr(q.tailPtr)-atomic.LoadUintptr(q.headPtr) == q.bufferLen
}
func (q *Queue[T]) markWorking() {
atomic.StoreUint32(q.workingPtr, 1)
}
func (q *Queue[T]) markUnworking() bool {
atomic.StoreUint32(q.workingPtr, 0)
if q.isEmpty() {
return true
}
q.markWorking()
return false
}
func (q *Queue[T]) working() bool {
return atomic.LoadUint32(q.workingPtr) == 1
}
func (q *Queue[T]) markStuck() {
atomic.StoreUint32(q.stuckPtr, 1)
}
func (q *Queue[T]) markUnstuck() {
atomic.StoreUint32(q.stuckPtr, 0)
}
func (q *Queue[T]) stuck() bool {
return atomic.LoadUint32(q.stuckPtr) == 1
}
func (q Queue[T]) Read() ReadQueue[T] {
unstuckNotifier := NewNotifier(q.unstuckFd)
return ReadQueue[T]{
q,
unstuckNotifier,
}
}
func (q Queue[T]) Write() WriteQueue[T] {
awaiter := NewAwaiter(q.unstuckFd)
wq := WriteQueue[T]{
q: q,
Lock: &sync.Mutex{},
pendingTasks: deque.NewDeque[T](),
workingNotifier: NewNotifier(q.workingFd),
}
go func() {
for {
wq.Lock.Lock()
for item, ok := wq.pendingTasks.TryPopFront(); ok; item, ok = wq.pendingTasks.TryPopFront() {
if !wq.q.push(item) {
wq.pendingTasks.PushFront(item)
break
}
}
if !wq.q.working() {
wq.q.markWorking()
wq.workingNotifier.Notify()
}
if !wq.pendingTasks.IsEmpty() {
wq.q.markStuck()
if !wq.q.isFull() {
continue
}
}
wq.Lock.Unlock()
awaiter.Wait()
}
}()
return wq
}
func (rq *ReadQueue[T]) RunHandler(handler func(T), w ...TinyWaiter) {
var waiter TinyWaiter
if len(w) == 0 {
waiter = &GoSchedWaiter{}
} else {
waiter = w[0]
}
go func() {
awaiter := NewAwaiter(rq.q.workingFd)
rq.q.markWorking()
c: for {
for item := rq.q.pop(); item != nil; item = rq.q.pop() {
handler(*item)
}
waiter.Reset()
for {
stop_wait := waiter.Wait()
if !rq.q.isEmpty() || !rq.q.markUnworking() {
continue c
}
if stop_wait {
break
}
}
awaiter.Wait()
rq.q.markWorking()
}
}()
}
func (wq *WriteQueue[T]) Push(item T) {
wq.Lock.Lock()
if wq.q.push(item) {
if !wq.q.working() {
wq.q.markWorking()
wq.Lock.Unlock()
wq.workingNotifier.Notify()
return
}
} else {
wq.q.markStuck()
wq.pendingTasks.PushBack(item)
}
wq.Lock.Unlock()
}