use std::sync::{Arc, atomic::AtomicBool};
use crate::{define::*, gop_cache::*, packet::*, traits::ISource};
use anyhow::Result;
use better_tokio_select::tokio_select;
use bytes::Bytes;
type ChunkerInner = cyberex::buf_pro::chunk::Chunker<u8>;
type Chunker = Option<ChunkerInner>;
pub struct SourceG711 {
cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceG711 {
async fn subscribe(&self) -> Result<SourceDataReceiver> {
let full_data = self.cmd_tx.sub(()).await?;
Ok(SourceDataReceiver { full_data })
}
}
fn get_chunker_size(sample_rate: i32) -> usize {
(sample_rate as usize / 8000) * 320
}
fn down_sample<'a>(payload: &'a Bytes, audio: &Audio) -> Vec<&'a [u8]> {
let chunk_size = ((audio.bits / 8) * (audio.sample_rate / ES_G711_DF_SAMPLE_RATE)) as usize;
payload.chunks(chunk_size).map(|c| &c[..2]).collect()
}
fn process_data(config: &SSConfig, chunker: &mut Chunker, cache: &mut GopCache, pkt: PacketEs) {
let Some((payload, mut audio)) = pkt.into_g711a() else {
return;
};
if config.source_g711a_forceSampleRate8000 && audio.sample_rate != ES_G711_DF_SAMPLE_RATE {
for payload in down_sample(&payload, &audio) {
audio.sample_rate = ES_G711_DF_SAMPLE_RATE;
chunk(chunker, payload, &audio, cache);
}
return;
}
chunk(chunker, &payload, &audio, cache);
}
fn chunk(chunker: &mut Chunker, payload: &[u8], audio: &Audio, cache: &mut GopCache) {
let chunker = chunker.get_or_insert_with(|| ChunkerInner::new(get_chunker_size(audio.sample_rate)));
for payload in chunker.chunk(payload) {
let payload = Bytes::copy_from_slice(payload);
let info = EsInfo::Audio(audio.to_owned());
cache.input_packet(vec![PacketEs::new(payload, info)], true);
}
}
fn process_sub(cache: &mut GopCache, sub: Sub) {
match sub {
Sub::Sub(actor_pkt) => {
let (_, ret) = actor_pkt.split();
let sub = cache.subscribe_output();
let _ = ret.return_ok(sub);
},
Sub::Fet(actor_pkt) => {
let _ = actor_pkt.return_fail(anyhow::anyhow!("No support yet"));
},
}
}
impl SourceG711 {
pub fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
let config = config.clone();
let mut cache = GopCache::new_from_config(&config, is_idle, false);
let (cmd_tx, mut cmd_rx) = Sub::actor();
let mut chunker = None;
tokio::spawn(async move {
loop {
tokio_select!(match .. {
.. if let pkt = data_rx.recv() => {
match pkt {
Ok(pkt) => process_data(&config, &mut chunker, &mut cache, pkt),
Err(BroadcastRecvError::Closed) => {
return;
},
Err(BroadcastRecvError::Lagged(skiped)) => {
tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
},
}
},
.. if let ret = cmd_rx.recv() => {
match ret {
Some(ret) => process_sub(&mut cache, ret),
None => break,
}
},
});
}
});
Ok(Self { cmd_tx })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_case() {
assert_eq!(get_chunker_size(8000), 320);
assert_eq!(get_chunker_size(16000), 640);
assert_eq!(get_chunker_size(44100), 1600);
}
}