package hwstats
import (
"time"
"github.com/frostbyte73/core"
"github.com/prometheus/procfs"
"go.uber.org/atomic"
"github.com/livekit/protocol/logger"
)
// platformCPUMonitor abstracts the OS-specific mechanism used to sample
// CPU statistics; an implementation is obtained via newPlatformCPUMonitor.
type platformCPUMonitor interface {
	// getCPUIdle returns the current number of idle CPU cores (may be fractional).
	getCPUIdle() (float64, error)
	// numCPU returns the total number of CPU cores available.
	numCPU() float64
}
// CPUStats periodically samples CPU — and, optionally, per-process — usage
// and reports the samples through the configured callbacks. Construct with
// NewCPUStats or NewProcMonitor; call Stop to terminate the sampling goroutine.
type CPUStats struct {
	idleCPUs        atomic.Float64     // most recent idle-core sample, written by the monitor goroutine
	platform        platformCPUMonitor // OS-specific CPU sampler
	idleCallback    func(idle float64) // invoked each tick by monitorCPULoad, when set
	procCallback    func(*ProcStats)   // invoked each tick by monitorProcesses, when set
	warningThrottle core.Throttle      // rate-limits the high-load log line (once per minute)
	closeChan       chan struct{}      // closed by Stop to end the monitor goroutine
}
// ProcMemoryEntry records the memory usage of a single process.
type ProcMemoryEntry struct {
	Name   string // process command name (comm from /proc/<pid>/stat)
	Memory int    // resident memory in bytes (RSS pages * page size)
}
// GroupMemory aggregates the memory usage of a group of related processes
// (a subtree rooted directly below the monitoring process — see
// aggregateMemoryStats for how groups are formed).
type GroupMemory struct {
	Total int                     // sum of the group's memory, in bytes
	Procs map[int]ProcMemoryEntry // per-process entries, keyed by PID
}
// ProcStats is one snapshot of system CPU and memory usage broken down by
// process group, as produced each tick by monitorProcesses.
type ProcStats struct {
	CpuIdle     float64              // idle cores remaining after subtracting per-process usage
	Cpu         map[int]float64      // CPU cores used per group, keyed by group-leader PID
	MemoryTotal int                  // total memory across all processes, in bytes
	Memory      map[int]*GroupMemory // per-group memory usage, keyed by group-leader PID
}
// procEntry is the minimal per-process view extracted from /proc that
// aggregateMemoryStats needs for grouping and memory accounting.
type procEntry struct {
	pid  int    // process ID
	ppid int    // parent process ID
	comm string // command name
	rss  int    // resident set size, in pages
}
// aggregateMemoryStats groups processes by their topmost ancestor beneath
// selfPID and sums memory usage per group.
//
// entries is the flat process list; selfPID is the PID of the monitoring
// process itself; pageSize converts RSS pages to bytes.
//
// It returns the total memory in bytes across all entries, per-group memory
// keyed by the group-leader PID (the ancestor whose parent is selfPID, or the
// topmost known ancestor), and a map from every PID to its group leader.
func aggregateMemoryStats(entries []procEntry, selfPID, pageSize int) (memoryTotal int, memory map[int]*GroupMemory, groups map[int]int) {
	// Parent lookup for every process except ourselves. Leaving selfPID out
	// makes the climb below terminate when it reaches a direct child of self
	// (ppids[selfPID] is the zero value, 0).
	ppids := make(map[int]int, len(entries))
	for _, e := range entries {
		if e.pid != selfPID {
			ppids[e.pid] = e.ppid
		}
	}

	memory = make(map[int]*GroupMemory)
	groups = make(map[int]int, len(entries))
	for _, e := range entries {
		// Climb the parent chain until we hit a direct child of selfPID or
		// run out of known ancestors. The hop bound guards against a cyclic
		// ppid chain — impossible in a real process table, but without it
		// malformed input would loop forever.
		pidForGroup := e.pid
		for hops := 0; hops < len(entries); hops++ {
			parent := ppids[pidForGroup]
			if parent == selfPID || parent == 0 {
				break
			}
			pidForGroup = parent
		}
		groups[e.pid] = pidForGroup

		mem := e.rss * pageSize
		gm := memory[pidForGroup]
		if gm == nil {
			gm = &GroupMemory{Procs: make(map[int]ProcMemoryEntry)}
			memory[pidForGroup] = gm
		}
		gm.Total += mem
		gm.Procs[e.pid] = ProcMemoryEntry{Name: e.comm, Memory: mem}
		memoryTotal += mem
	}
	return
}
// NewCPUStats creates a CPUStats that samples total CPU idle once per second
// and reports each sample through idleUpdateCallback. Call Stop to terminate
// the background sampling goroutine.
func NewCPUStats(idleUpdateCallback func(idle float64)) (*CPUStats, error) {
	monitor, err := newPlatformCPUMonitor()
	if err != nil {
		return nil, err
	}

	stats := &CPUStats{
		platform:        monitor,
		idleCallback:    idleUpdateCallback,
		warningThrottle: core.NewThrottle(time.Minute),
		closeChan:       make(chan struct{}),
	}
	go stats.monitorCPULoad()

	return stats, nil
}
// NewProcMonitor creates a CPUStats that samples per-process CPU and memory
// usage once per second and reports each snapshot through onUpdate. Call Stop
// to terminate the background sampling goroutine.
func NewProcMonitor(onUpdate func(*ProcStats)) (*CPUStats, error) {
	monitor, err := newPlatformCPUMonitor()
	if err != nil {
		return nil, err
	}

	stats := &CPUStats{
		platform:        monitor,
		procCallback:    onUpdate,
		warningThrottle: core.NewThrottle(time.Minute),
		closeChan:       make(chan struct{}),
	}
	go stats.monitorProcesses()

	return stats, nil
}
// GetCPUIdle returns the most recently sampled number of idle CPU cores.
func (c *CPUStats) GetCPUIdle() float64 {
	return c.idleCPUs.Load()
}
// NumCPU returns the total number of CPU cores reported by the platform monitor.
func (c *CPUStats) NumCPU() float64 {
	return c.platform.numCPU()
}
// GetCPULoad returns the current CPU load as a fraction in [0, 1), computed
// from the last idle sample as 1 - idle/total. It returns 0 when no sample
// has been taken yet (idle is 0) or the core count is unknown.
func (c *CPUStats) GetCPULoad() float64 {
	cpuIdle := c.GetCPUIdle()
	nCPU := c.NumCPU()
	if nCPU <= 0 || cpuIdle <= 0 {
		return 0
	}
	// Use the already-fetched core count rather than calling NumCPU again.
	return 1 - cpuIdle/nCPU
}
// Stop terminates the background monitor goroutine. Call it at most once:
// a second call would panic closing an already-closed channel.
func (c *CPUStats) Stop() {
	close(c.closeChan)
}
// monitorCPULoad samples platform CPU idle once per second until Stop is
// called. Each sample is stored for GetCPUIdle, logged (throttled) when load
// exceeds 90%, and forwarded to idleCallback when one is configured.
func (c *CPUStats) monitorCPULoad() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-c.closeChan:
			return
		case <-ticker.C:
			// fall through to take a sample
		}

		idleCores, err := c.platform.getCPUIdle()
		if err != nil {
			logger.Errorw("failed retrieving CPU idle", err)
			continue
		}
		c.idleCPUs.Store(idleCores)

		ratio := idleCores / c.platform.numCPU()
		if ratio < 0.1 {
			c.warningThrottle(func() {
				logger.Infow("high cpu load", "load", 1-ratio)
			})
		}

		if c.idleCallback != nil {
			c.idleCallback(idleCores)
		}
	}
}
// monitorProcesses samples per-process CPU and memory usage once per second
// until Stop is called. Each tick it lists /proc, attributes CPU-time deltas
// and resident memory to process groups (keyed by the ancestor directly below
// this process), stores the resulting idle-core count, and invokes
// procCallback with the snapshot.
func (c *CPUStats) monitorProcesses() {
	numCPU := c.platform.numCPU()
	pageSize := getPageSize()

	fs, err := procfs.NewFS(procfs.DefaultMountPoint)
	if err != nil {
		logger.Errorw("failed to read proc fs", err)
		return
	}
	hostCPU, err := getHostCPUCount(fs)
	if err != nil {
		logger.Errorw("failed to read pod cpu count", err)
		return
	}
	self, err := fs.Self()
	if err != nil {
		logger.Errorw("failed to read self", err)
		return
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Previous tick's totals, used to form per-interval deltas. On the first
	// tick prevTotalTime is 0, so the interval spans the host's whole uptime
	// and per-process usage comes out near zero — a warm-up sample.
	var prevTotalTime float64
	var prevStats map[int]procfs.ProcStat

	for {
		select {
		case <-c.closeChan:
			return
		case <-ticker.C:
			// Read through the FS handle created above so all reads use the
			// same mount point (the package-level AllProcs would re-open the
			// default mount independently).
			procs, err := fs.AllProcs()
			if err != nil {
				logger.Errorw("failed to read processes", err)
				continue
			}
			total, err := fs.Stat()
			if err != nil {
				logger.Errorw("failed to read stats", err)
				continue
			}

			procStats := make(map[int]procfs.ProcStat, len(procs))
			entries := make([]procEntry, 0, len(procs))
			for _, proc := range procs {
				stat, err := proc.Stat()
				if err != nil {
					// process likely exited between listing and reading
					continue
				}
				procStats[proc.PID] = stat
				entries = append(entries, procEntry{
					pid:  proc.PID,
					ppid: stat.PPID,
					comm: stat.Comm,
					rss:  stat.RSS,
				})
			}

			// Total CPU time accumulated across all cores since boot.
			totalHostTime := total.CPUTotal.Idle + total.CPUTotal.Iowait +
				total.CPUTotal.User + total.CPUTotal.Nice + total.CPUTotal.System +
				total.CPUTotal.IRQ + total.CPUTotal.SoftIRQ + total.CPUTotal.Steal

			memTotal, memory, groups := aggregateMemoryStats(entries, self.PID, pageSize)
			stats := &ProcStats{
				CpuIdle:     numCPU,
				Cpu:         make(map[int]float64),
				MemoryTotal: memTotal,
				Memory:      memory,
			}

			// Attribute each process's CPU-time delta to its group. Guard the
			// interval so a zero (or negative) delta cannot divide by zero and
			// poison the stats with Inf/NaN.
			if elapsed := totalHostTime - prevTotalTime; elapsed > 0 {
				for pid, stat := range procStats {
					// delta in clock ticks; /100 converts USER_HZ ticks to seconds
					procTicks := float64(stat.UTime + stat.STime - prevStats[pid].UTime - prevStats[pid].STime)
					if procTicks == 0 {
						continue
					}
					cpu := hostCPU * procTicks / 100 / elapsed
					stats.Cpu[groups[pid]] += cpu
					stats.CpuIdle -= cpu
				}
			}

			c.idleCPUs.Store(stats.CpuIdle)
			if c.procCallback != nil {
				c.procCallback(stats)
			}

			prevTotalTime = totalHostTime
			prevStats = procStats
		}
	}
}