package queue
import (
"container/list"
"sync"
)
type ConcurrentQueue struct {
started sync.Once
stopped sync.Once
chanIn chan interface{}
chanOut chan interface{}
overflow *list.List
wg sync.WaitGroup
quit chan struct{}
}
func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
return &ConcurrentQueue{
chanIn: make(chan interface{}),
chanOut: make(chan interface{}, bufferSize),
overflow: list.New(),
quit: make(chan struct{}),
}
}
func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
return cq.chanIn
}
func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
return cq.chanOut
}
func (cq *ConcurrentQueue) Start() {
cq.started.Do(cq.start)
}
func (cq *ConcurrentQueue) start() {
cq.wg.Add(1)
go func() {
defer cq.wg.Done()
readLoop:
for {
nextElement := cq.overflow.Front()
if nextElement == nil {
select {
case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
select {
case cq.chanOut <- item:
default:
cq.overflow.PushBack(item)
}
case <-cq.quit:
return
}
} else {
select {
case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
cq.overflow.PushBack(item)
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
}
}
nextElement := cq.overflow.Front()
for nextElement != nil {
select {
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
nextElement = cq.overflow.Front()
}
close(cq.chanOut)
}()
}
func (cq *ConcurrentQueue) Stop() {
cq.stopped.Do(func() {
close(cq.quit)
cq.wg.Wait()
})
}