import numpy as np
from ruopus import Bandwidth, OpusDecoder, OpusEncoder
SR = 48000
FRAME = 960
def make_frame(i: int) -> np.ndarray:
t = (np.arange(FRAME) + i * FRAME) / SR
return (0.3 * np.sin(2 * np.pi * (300 + 40 * i) * t)).astype(np.float32).reshape(FRAME, 1)
def _maxcorr(a: np.ndarray, b: np.ndarray) -> float:
a, b = a.reshape(-1), b.reshape(-1)
n = min(len(a), len(b))
a, b = a[:n], b[:n]
best = 0.0
for lag in range(-200, 201, 4):
x, y = a[max(0, lag) : n + min(0, lag)], b[max(0, -lag) : n + min(0, -lag)]
if len(x) > 100 and x.std() > 1e-6 and y.std() > 1e-6:
best = max(best, abs(float(np.corrcoef(x, y)[0, 1])))
return best
def main() -> None:
enc = OpusEncoder(1, bitrate=24000, inband_fec=True, packet_loss_perc=30)
enc.bandwidth = Bandwidth.WideBand
packets = [enc.encode_silk(make_frame(i)) for i in range(5)]
reference_frame2 = None
d = OpusDecoder(1)
for i, p in enumerate(packets):
out = d.decode_packet(p)
if i == 2:
reference_frame2 = out
fec_dec = OpusDecoder(1)
fec_dec.decode_packet(packets[0])
fec_dec.decode_packet(packets[1])
recovered = fec_dec.decode_fec(packets[3], FRAME)
con_dec = OpusDecoder(1)
con_dec.decode_packet(packets[0])
con_dec.decode_packet(packets[1])
concealed = con_dec.decode_lost(FRAME)
fec_corr = _maxcorr(recovered, reference_frame2)
con_corr = _maxcorr(concealed, reference_frame2)
print(f"FEC recovery -> {recovered.shape}, correlation with the lost frame: {fec_corr:.3f}")
print(f"PLC concealment -> {concealed.shape}, correlation with the lost frame: {con_corr:.3f}")
print(f"FEC reconstructs the lost frame ({fec_corr:.2f}); concealment can only guess ({con_corr:.2f}).")
if __name__ == "__main__":
main()