package utils
import (
"fmt"
"os"
"github.com/fsnotify/fsnotify"
"go.uber.org/atomic"
"gopkg.in/yaml.v3"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/events"
)
type ConfigBuilder[T any] interface {
New() (*T, error)
}
type ConfigDefaulter[T any] interface {
InitDefaults(*T) error
}
type ConfigObserver[T any] struct {
builder ConfigBuilder[T]
watcher *fsnotify.Watcher
observers *events.ObserverList[*T]
conf atomic.Pointer[T]
}
func NewConfigObserver[T any](path string, builder ConfigBuilder[T]) (*ConfigObserver[T], *T, error) {
c := &ConfigObserver[T]{
builder: builder,
observers: events.NewObserverList[*T](events.WithBlocking()),
}
conf, err := c.load(path)
if err != nil {
return nil, nil, err
}
if path != "" {
c.watcher, err = fsnotify.NewWatcher()
if err != nil {
return nil, nil, err
}
if err := c.watcher.Add(path); err != nil {
c.watcher.Close()
return nil, nil, err
}
go c.watch()
}
return c, conf, nil
}
func (c *ConfigObserver[T]) Close() {
if c != nil && c.watcher != nil {
c.watcher.Close()
}
}
func (c *ConfigObserver[T]) EmitConfigUpdate(conf *T) {
c.observers.Emit(conf)
}
func (c *ConfigObserver[T]) Observe(cb func(*T)) func() {
if c == nil {
return func() {}
}
return c.observers.On(cb)
}
func (c *ConfigObserver[T]) Load() *T {
return c.conf.Load()
}
func (c *ConfigObserver[T]) watch() {
for {
select {
case event, ok := <-c.watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Remove) {
if err := c.watcher.Add(event.Name); err != nil {
logger.Errorw("unable to rewatch config file", err, "file", event.Name)
}
}
if event.Has(fsnotify.Write | fsnotify.Remove) {
if err := c.reload(event.Name); err != nil {
logger.Errorw("unable to update config file", err, "file", event.Name)
} else {
logger.Infow("config file has been updated", "file", event.Name)
}
}
case err, ok := <-c.watcher.Errors:
if !ok {
return
}
logger.Errorw("config file watcher error", err)
}
}
}
func (c *ConfigObserver[T]) reload(path string) error {
conf, err := c.load(path)
if err != nil {
return err
}
c.EmitConfigUpdate(conf)
return nil
}
func (c *ConfigObserver[T]) load(path string) (*T, error) {
conf, err := c.builder.New()
if err != nil {
return nil, err
}
if path != "" {
b, err := os.ReadFile(path)
if err != nil {
return nil, err
}
if len(b) == 0 {
return nil, fmt.Errorf("cannot parse config: file empty")
}
if err := yaml.Unmarshal(b, conf); err != nil {
return nil, fmt.Errorf("cannot parse config: %v", err)
}
}
if d, ok := c.builder.(ConfigDefaulter[T]); ok {
d.InitDefaults(conf)
}
c.conf.Store(conf)
return conf, err
}