package utils
import (
"runtime"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/livekit/protocol/livekit"
)
func TestProtoProxy(t *testing.T) {
t.Run("basics", func(t *testing.T) {
numGoRoutines := runtime.NumGoroutine()
proxy, numParticipants, freeze := createTestProxy()
select {
case <-proxy.Updated():
t.Fatal("should not have received an update")
default:
}
require.EqualValues(t, 0, proxy.Get().NumParticipants)
<-proxy.MarkDirty(true)
awaitProxyUpdate(t, proxy)
require.EqualValues(t, 2, numParticipants.Load())
require.EqualValues(t, 1, proxy.Get().NumParticipants)
proxy.MarkDirty(false)
assertNoProxyUpdate(t, proxy, 5*time.Millisecond)
require.EqualValues(t, 1, proxy.Get().NumParticipants)
freeze.Store(true)
awaitProxyUpdate(t, proxy)
require.EqualValues(t, 2, proxy.Get().NumParticipants)
proxy.MarkDirty(false)
assertNoProxyUpdate(t, proxy, 100*time.Millisecond)
require.EqualValues(t, 2, proxy.Get().NumParticipants)
proxy.Stop()
for range 10 {
if runtime.NumGoroutine() <= numGoRoutines {
break
}
time.Sleep(100 * time.Millisecond)
}
require.LessOrEqual(t, runtime.NumGoroutine(), numGoRoutines)
})
t.Run("await next update after marking dirty", func(t *testing.T) {
proxy, _, _ := createTestProxy()
require.EqualValues(t, 0, proxy.Get().NumParticipants)
<-proxy.MarkDirty(true)
require.EqualValues(t, 1, proxy.Get().NumParticipants)
})
t.Run("await resolves when proxy is stopped", func(t *testing.T) {
proxy, _, _ := createTestProxy()
done := proxy.MarkDirty(true)
proxy.Stop()
<-done
})
t.Run("multiple awaits resolve for one update", func(t *testing.T) {
proxy, _, _ := createTestProxy()
done0 := proxy.MarkDirty(false)
done1 := proxy.MarkDirty(true)
<-done0
<-done1
require.EqualValues(t, 1, proxy.Get().NumParticipants)
})
t.Run("await resolve when there is no change", func(t *testing.T) {
proxy := NewProtoProxy(10*time.Millisecond, func() *livekit.Room { return nil })
done := proxy.MarkDirty(true)
time.Sleep(100 * time.Millisecond)
select {
case <-done:
default:
t.FailNow()
}
})
}
func awaitProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room]) {
t.Helper()
select {
case <-proxy.Updated():
case <-time.After(250 * time.Millisecond):
require.FailNow(t, "timed out waiting for proxy update")
}
}
func assertNoProxyUpdate(t *testing.T, proxy *ProtoProxy[*livekit.Room], d time.Duration) {
t.Helper()
select {
case <-proxy.Updated():
require.FailNow(t, "should not have received an update")
case <-time.After(d):
}
}
func createTestProxy() (*ProtoProxy[*livekit.Room], *atomic.Uint32, *atomic.Bool) {
var numParticipants atomic.Uint32
var freeze atomic.Bool
return NewProtoProxy(
10*time.Millisecond,
func() *livekit.Room {
if !freeze.Load() {
defer numParticipants.Add(1)
}
return &livekit.Room{
NumParticipants: numParticipants.Load(),
}
},
), &numParticipants, &freeze
}