package pool
import (
"errors"
"sync"
"time"
)
var (
	// ErrWorkerPoolExiting is returned by Submit when the pool's quit
	// signal is observed before the submitted task has been accepted or
	// before its result has been read.
	ErrWorkerPoolExiting = errors.New("worker pool exiting")
)
const (
	// DefaultWorkerTimeout is a suggested value for
	// WorkerConfig.WorkerTimeout. Nothing in this file applies it
	// automatically; callers must set it on the config themselves.
	DefaultWorkerTimeout = 90 * time.Second
)
// WorkerState is the scratch state a worker goroutine passes to every task
// it executes.
type WorkerState interface {
	// Reset is called by a worker after a task finishes and before the
	// same state value is handed to another task.
	Reset()

	// Cleanup is called once, when the worker goroutine that owns the
	// state exits.
	Cleanup()
}

// WorkerConfig carries the parameters used to construct a Worker.
type WorkerConfig struct {
	// NewWorkerState is invoked once by each newly spawned worker
	// goroutine to create the state it will reuse across tasks.
	NewWorkerState func() WorkerState

	// NumWorkers is the capacity of the semaphore limiting how many
	// worker goroutines may exist at the same time.
	NumWorkers int

	// WorkerTimeout is how long an idle worker goroutine waits for
	// another task before it exits.
	WorkerTimeout time.Duration
}

// Worker is a pool of goroutines that execute submitted tasks. Worker
// goroutines are spawned on demand, reused while tasks keep arriving, and
// exit once idle for WorkerTimeout or when the pool is stopped.
type Worker struct {
	// started and stopped make Start and Stop take effect only once.
	started sync.Once
	stopped sync.Once

	cfg *WorkerConfig

	// requests carries tasks from Submit to the requestHandler goroutine.
	requests chan *request

	// work carries tasks to worker goroutines that are already running.
	work chan *request

	// workerSem holds one token per live worker goroutine.
	workerSem chan struct{}

	// wg tracks the requestHandler and every worker goroutine.
	wg sync.WaitGroup

	// quit is closed by Stop to signal all goroutines to exit.
	quit chan struct{}
}

// request pairs a task with the channel on which its result is delivered.
type request struct {
	fn      func(WorkerState) error
	errChan chan error
}
// NewWorker returns a Worker configured by cfg. No goroutines run until
// Start is called. cfg must be non-nil; cfg.NumWorkers becomes the capacity
// of the semaphore that limits concurrently running worker goroutines.
func NewWorker(cfg *WorkerConfig) *Worker {
	pool := new(Worker)
	pool.cfg = cfg

	// Both hand-off channels are unbuffered, so a send only succeeds when
	// a receiver is ready to take the request.
	pool.requests = make(chan *request)
	pool.work = make(chan *request)

	pool.workerSem = make(chan struct{}, cfg.NumWorkers)
	pool.quit = make(chan struct{})

	return pool
}
// Start launches the request-dispatching goroutine. Only the first call has
// any effect; later calls are no-ops. It always returns nil.
func (w *Worker) Start() error {
	launch := func() {
		// Register the goroutine before starting it so that Stop's
		// wait always accounts for it.
		w.wg.Add(1)
		go w.requestHandler()
	}
	w.started.Do(launch)

	return nil
}
// Stop signals every pool goroutine to exit by closing the quit channel and
// blocks until all of them have returned. Only the first call performs the
// shutdown. It always returns nil.
func (w *Worker) Stop() error {
	shutdown := func() {
		close(w.quit)
		w.wg.Wait()
	}
	w.stopped.Do(shutdown)

	return nil
}
// Submit hands fn to the pool and blocks until fn's result is available or
// the pool is shutting down.
//
// It returns the error produced by fn, or ErrWorkerPoolExiting if the quit
// signal is observed first. In the latter case fn may never run, or may
// still be running when Submit returns.
func (w *Worker) Submit(fn func(WorkerState) error) error {
	// Capacity one lets the worker deliver the result without blocking,
	// even if this call has already given up because of shutdown.
	result := make(chan error, 1)
	job := &request{fn: fn, errChan: result}

	// Offer the job both to the dispatcher and directly to any worker
	// that is already waiting; whichever receives first takes it.
	select {
	case <-w.quit:
		return ErrWorkerPoolExiting
	case w.work <- job:
	case w.requests <- job:
	}

	// Wait for the outcome, unless the pool shuts down first.
	select {
	case <-w.quit:
		return ErrWorkerPoolExiting
	case err := <-result:
		return err
	}
}
// requestHandler is the dispatcher goroutine started by Start. It takes each
// request from w.requests and either spawns a new worker goroutine for it,
// when a semaphore slot is free, or passes it to an already running worker
// over w.work. It returns once the quit channel is closed.
func (w *Worker) requestHandler() {
	defer w.wg.Done()

	for {
		// Wait for the next submitted request.
		var pending *request
		select {
		case <-w.quit:
			return
		case pending = <-w.requests:
		}

		// Place the request with whichever option becomes available
		// first.
		select {
		case <-w.quit:
			return

		case w.work <- pending:
			// An existing worker took the request.

		case w.workerSem <- struct{}{}:
			// A slot was acquired; the new worker releases it when
			// it exits.
			w.wg.Add(1)
			go w.spawnWorker(pending)
		}
	}
}
// spawnWorker is the body of one worker goroutine. It runs req, then keeps
// serving requests received on w.work, reusing a single WorkerState that is
// Reset between requests. The goroutine exits when it has been idle for
// cfg.WorkerTimeout or when the quit channel is closed. On exit it stops its
// idle timer, calls Cleanup on the state, releases its semaphore slot and
// marks itself done on the wait group.
func (w *Worker) spawnWorker(req *request) {
	defer w.wg.Done()
	defer func() { <-w.workerSem }()

	// Each worker builds its own state and reuses it for every request.
	state := w.cfg.NewWorkerState()
	defer state.Cleanup()

	// The idle timer is created lazily below. Stop it on every exit path
	// so that a worker leaving because of quit does not leave a pending
	// timer behind for the remainder of WorkerTimeout.
	var t *time.Timer
	defer func() {
		if t != nil {
			t.Stop()
		}
	}()

	// errChan is created with capacity one by Submit, so delivering the
	// result never blocks this goroutine.
	req.errChan <- req.fn(state)

	for {
		// Give the next request a freshly reset state.
		state.Reset()

		// Non-blocking fast path: if more work is already waiting, run
		// it without arming the idle timer.
		select {
		case req := <-w.work:
			req.errChan <- req.fn(state)
			continue

		case <-w.quit:
			return

		default:
		}

		// Nothing was immediately available; arm the idle timer,
		// allocating it on first use. At this point the timer is either
		// nil or was stopped and drained by a previous iteration.
		if t == nil {
			t = time.NewTimer(w.cfg.WorkerTimeout)
		} else {
			t.Reset(w.cfg.WorkerTimeout)
		}

		select {
		case req := <-w.work:
			req.errChan <- req.fn(state)

			// If the timer already fired while the request ran,
			// drain its channel so the next Reset starts clean.
			if !t.Stop() {
				<-t.C
			}

		case <-t.C:
			// Idle for the full timeout: let the goroutine exit.
			return

		case <-w.quit:
			return
		}
	}
}