package events
import (
"sync"
"golang.org/x/exp/maps"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/options"
)
const DefaultQueueSize = 16
type Options struct {
QueueSize int
Blocking bool
Logger logger.Logger
}
type Option func(o *Options)
func WithQueueSize(size int) Option {
return func(o *Options) {
o.QueueSize = size
}
}
func WithBlocking() Option {
return func(o *Options) {
o.Blocking = true
}
}
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
func WithOptions(src Options) Option {
return func(o *Options) {
*o = src
}
}
func DefaultOptions() Options {
return Options{
QueueSize: DefaultQueueSize,
Logger: logger.GetLogger(),
}
}
type Emitter[K comparable, V any] struct {
options Options
mu sync.RWMutex
observers map[K]*ObserverList[V]
}
func NewEmitter[K comparable, V any](opts ...Option) *Emitter[K, V] {
o := DefaultOptions()
options.Apply(&o, opts)
return &Emitter[K, V]{
options: o,
observers: map[K]*ObserverList[V]{},
}
}
func (e *Emitter[K, V]) Emit(k K, v V) {
e.mu.RLock()
l, ok := e.observers[k]
e.mu.RUnlock()
if !ok {
return
}
l.Emit(v)
}
type eventEmitterObserver[K comparable, V any] struct {
e *Emitter[K, V]
k K
Observer[V]
}
func (o *eventEmitterObserver[K, V]) Stop() {
o.Observer.Stop()
o.e.cleanUpObserverList(o.k)
}
func (e *Emitter[K, V]) On(k K, f func(V)) func() {
e.mu.Lock()
o := e.getOrCreateObserverList(k).on(f)
e.mu.Unlock()
return (&eventEmitterObserver[K, V]{e, k, o}).Stop
}
func (e *Emitter[K, V]) Notify(k K, ch chan V) func() {
return e.observe(k, ch).Stop
}
func (e *Emitter[K, V]) Observe(k K) Observer[V] {
return e.observe(k, make(chan V, e.options.QueueSize))
}
func (e *Emitter[K, V]) observe(k K, ch chan V) *eventEmitterObserver[K, V] {
e.mu.Lock()
o := e.getOrCreateObserverList(k).observe(ch)
e.mu.Unlock()
return &eventEmitterObserver[K, V]{e, k, o}
}
func (e *Emitter[K, V]) getOrCreateObserverList(k K) *ObserverList[V] {
l, ok := e.observers[k]
if !ok {
l = newObserverList[V](e.options)
e.observers[k] = l
}
return l
}
func (e *Emitter[K, V]) ObservedKeys() []K {
e.mu.Lock()
defer e.mu.Unlock()
return maps.Keys(e.observers)
}
func (e *Emitter[K, V]) cleanUpObserverList(k K) {
e.mu.Lock()
defer e.mu.Unlock()
l, ok := e.observers[k]
if ok && l.Len() == 0 {
delete(e.observers, k)
}
}
type ObserverList[V any] struct {
options Options
mu sync.RWMutex
observers []*eventObserverListObserver[V]
}
func NewObserverList[V any](opts ...Option) *ObserverList[V] {
o := DefaultOptions()
options.Apply(&o, opts)
return newObserverList[V](o)
}
func newObserverList[V any](options Options) *ObserverList[V] {
return &ObserverList[V]{
options: options,
}
}
func (l *ObserverList[V]) Len() int {
l.mu.RLock()
defer l.mu.RUnlock()
return len(l.observers)
}
type eventObserverListObserver[V any] struct {
l *ObserverList[V]
Observer[V]
index int
}
func (o *eventObserverListObserver[V]) Stop() {
o.Observer.Stop()
o.l.stopObserving(o)
}
func (l *ObserverList[V]) On(f func(V)) func() {
return l.on(f).Stop
}
func (l *ObserverList[V]) on(f func(V)) *eventObserverListObserver[V] {
o := &eventObserverListObserver[V]{l: l}
if l.options.Blocking {
o.Observer = blockingCallback[V](f)
} else {
o.Observer = nonblockingCallback[V](f)
}
l.startObserving(o)
return o
}
func (l *ObserverList[V]) Notify(ch chan V) func() {
return l.observe(ch).Stop
}
func (l *ObserverList[V]) Observe() Observer[V] {
return l.observe(make(chan V, l.options.QueueSize))
}
func (l *ObserverList[V]) observe(ch chan V) *eventObserverListObserver[V] {
o := &eventObserverListObserver[V]{l: l}
if l.options.Blocking {
o.Observer = &blockingObserver[V]{
done: make(chan struct{}),
ch: ch,
}
} else {
o.Observer = &nonblockingObserver[V]{
logger: l.options.Logger,
ch: ch,
}
}
l.startObserving(o)
return o
}
func (l *ObserverList[V]) Emit(v V) {
l.mu.RLock()
defer l.mu.RUnlock()
for _, o := range l.observers {
o.emit(v)
}
}
func (l *ObserverList[V]) startObserving(o *eventObserverListObserver[V]) {
l.mu.Lock()
defer l.mu.Unlock()
o.index = len(l.observers)
l.observers = append(l.observers, o)
}
func (l *ObserverList[V]) stopObserving(o *eventObserverListObserver[V]) {
l.mu.Lock()
defer l.mu.Unlock()
l.observers[o.index] = l.observers[len(l.observers)-1]
l.observers[o.index].index = o.index
l.observers[len(l.observers)-1] = nil
l.observers = l.observers[:len(l.observers)-1]
}
type Observer[V any] interface {
emit(v V)
Stop()
Events() <-chan V
}
type eventObserver[V any] struct {
stopFunc func()
Observer[V]
}
func (o *eventObserver[V]) Stop() {
o.stopFunc()
o.Observer.Stop()
}
func NewObserver[V any](stopFunc func()) (Observer[V], func(v V)) {
o := &nonblockingObserver[V]{
logger: logger.GetLogger(),
ch: make(chan V, DefaultQueueSize),
}
return &eventObserver[V]{stopFunc, o}, o.emit
}
type nonblockingObserver[V any] struct {
logger logger.Logger
ch chan V
}
func (o *nonblockingObserver[V]) emit(v V) {
select {
case o.ch <- v:
default:
o.logger.Warnw("could not add event to observer queue", nil)
}
}
func (o *nonblockingObserver[V]) Stop() {}
func (o *nonblockingObserver[V]) Events() <-chan V {
return o.ch
}
type blockingObserver[V any] struct {
done chan struct{}
ch chan V
}
func (o *blockingObserver[V]) emit(v V) {
select {
case o.ch <- v:
case <-o.done:
}
}
func (o *blockingObserver[V]) Stop() {
close(o.done)
}
func (o *blockingObserver[V]) Events() <-chan V {
return o.ch
}
type nonblockingCallback[V any] func(V)
func (o nonblockingCallback[V]) emit(v V) {
go o(v)
}
func (o nonblockingCallback[V]) Stop() {}
func (o nonblockingCallback[V]) Events() <-chan V { return nil }
type blockingCallback[V any] func(V)
func (o blockingCallback[V]) emit(v V) {
o(v)
}
func (o blockingCallback[V]) Stop() {}
func (o blockingCallback[V]) Events() <-chan V { return nil }