package utils
import (
"sync"
"time"
"github.com/frostbyte73/core"
"google.golang.org/protobuf/proto"
)
type ProtoProxy[T proto.Message] struct {
message T
updateFn func() T
fuse core.Fuse
updateChan chan struct{}
awaitChan chan struct{}
done chan struct{}
queueUpdate chan struct{}
dirty bool
refreshedAt time.Time
refreshInterval time.Duration
lock sync.RWMutex
}
func NewProtoProxy[T proto.Message](refreshInterval time.Duration, updateFn func() T) *ProtoProxy[T] {
p := &ProtoProxy[T]{
updateChan: make(chan struct{}, 1),
updateFn: updateFn,
done: make(chan struct{}),
refreshInterval: refreshInterval,
queueUpdate: make(chan struct{}, 1),
}
p.performUpdate(true, time.Now())
if refreshInterval > 0 {
go p.worker()
}
return p
}
func (p *ProtoProxy[T]) MarkDirty(immediate bool) <-chan struct{} {
p.lock.Lock()
p.dirty = true
shouldUpdate := immediate || time.Since(p.refreshedAt) > p.refreshInterval
if p.awaitChan == nil {
p.awaitChan = make(chan struct{})
}
awaitChan := p.awaitChan
p.lock.Unlock()
if shouldUpdate {
select {
case p.queueUpdate <- struct{}{}:
default:
}
}
return awaitChan
}
func (p *ProtoProxy[T]) Updated() <-chan struct{} {
return p.updateChan
}
func (p *ProtoProxy[T]) Get() T {
p.lock.RLock()
defer p.lock.RUnlock()
return CloneProto(p.message)
}
func (p *ProtoProxy[T]) Stop() {
p.fuse.Break()
if p.refreshInterval > 0 {
<-p.done
}
p.lock.Lock()
defer p.lock.Unlock()
if awaitChan := p.awaitChan; awaitChan != nil {
p.awaitChan = nil
close(awaitChan)
}
}
func (p *ProtoProxy[T]) performUpdate(skipNotify bool, refreshTime time.Time) {
p.lock.Lock()
p.dirty = false
if awaitChan := p.awaitChan; awaitChan != nil {
p.awaitChan = nil
defer close(awaitChan)
}
p.lock.Unlock()
msg := p.updateFn()
p.lock.Lock()
if proto.Equal(p.message, msg) {
p.lock.Unlock()
return
}
p.message = msg
p.refreshedAt = refreshTime
p.lock.Unlock()
if !skipNotify {
select {
case p.updateChan <- struct{}{}:
default:
}
}
}
func (p *ProtoProxy[T]) worker() {
ticker := time.NewTicker(p.refreshInterval)
defer ticker.Stop()
defer close(p.done)
for {
select {
case <-p.fuse.Watch():
return
case now := <-ticker.C:
p.lock.RLock()
shouldUpdate := p.dirty && time.Since(p.refreshedAt) >= p.refreshInterval
p.lock.RUnlock()
if shouldUpdate {
p.performUpdate(false, now)
}
case <-p.queueUpdate:
p.performUpdate(false, time.Now())
}
}
}