use std::{
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use crate::{
define::*,
gop_cache::GopCache,
packet::*,
relay::SourceRelay,
traits::ISource,
utils::{DataInstant, ElapsedState},
};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use th_rs::define::*;
use crate::cpu_bound::AACEncoder;
use xux_rs::{define::*, muxer::Muxer};
const SCHEME: SSScheme = SSScheme::Fmp4;
pub struct SourceFmp4 {
cmd_tx: SubCaller,
}
type ThisMuxer = Muxer;
struct Ctx {
muxer: Option<Pin<Box<ThisMuxer>>>,
cache: GopCache,
encoder: Option<AACEncoder>,
config: SSConfig,
got_data_time: DataInstant,
is_probe: bool,
probe_cache: Vec<PacketEs>,
once_audio_warn: AtomicBool,
source_relay: SourceRelay,
}
#[async_trait::async_trait]
impl ISource for SourceFmp4 {
async fn subscribe(&self) -> Result<SourceDataReceiver> {
let full_data = self.cmd_tx.sub(()).await?;
Ok(SourceDataReceiver { full_data })
}
}
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
if ctx.is_probe {
return Ok(ProbeState::Already);
}
ctx.probe_cache.push(pkt.clone());
let mut goto_probe = false;
if ctx.config.source_enable_audio {
if ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout {
tracing::warn!(
"Got to probe due to wait audio timeout, scheme:{}, source_wait_audio: {:?}",
SCHEME,
ctx.got_data_time.timeout()
);
goto_probe = true;
}
} else {
goto_probe = true;
}
if pkt.is_audio_packet() {
goto_probe = true;
}
if !goto_probe {
bail!("Continue");
}
probe(ctx)?;
ctx.is_probe = true;
let probe_cache = std::mem::take(&mut ctx.probe_cache);
if probe_cache.len() > *ctx.config.source_client_jitter_max as _ {
tracing::warn!(
"probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
probe_cache.len(),
ctx.config.source_client_jitter_max
);
}
for pkt in probe_cache {
let _ = update_detail(ctx, pkt);
}
Ok(ProbeState::Success)
}
fn probe(ctx: &mut Ctx) -> Result<()> {
let (video, payload) = ctx
.probe_cache
.iter()
.find(|e| e.is_video_key_packet())
.ok_or_else(|| anyhow!("Can not wait any video key frame"))?
.clone()
.into_video();
let muxer = match &mut ctx.muxer {
Some(muxer) => muxer,
None => ctx
.muxer
.insert(ThisMuxer::new_with_name("fmp4Muxer", ctx.source_relay.muxer_config())?),
};
let _ = mux_write_video(muxer, true, video, payload);
if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
let (audio, payload) = first_audio.to_owned().into_audio();
let _ = mux_write_audio(
muxer,
true,
audio.codec,
audio.sample_rate,
audio.channels,
audio.bits,
payload,
);
}
match muxer.get_init_data() {
Ok(init_data) => {
let es = PacketEs::new(init_data, EsInfo::Fmp4);
ctx.cache.set_init_data(es);
},
Err(e) => {
tracing::error!("Get init data fail, error: {}, scheme: {}", e, SCHEME);
},
}
Ok(())
}
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
match make_sure_probe(ctx, &pkt)? {
ProbeState::Success => Ok(()),
ProbeState::Already => update_detail(ctx, pkt),
}
}
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
let (payload, info) = pkt.into_splite();
match info {
EsInfo::Video(video) => updata_video(ctx, video, payload),
EsInfo::Audio(audio) => update_audio(ctx, audio, payload),
EsInfo::Com => Ok(()),
_ => Ok(()),
}
}
fn process_sub(ctx: &mut Ctx, sub: Sub) {
match sub {
Sub::Sub(actor_pkt) => {
let (_, ret) = actor_pkt.split();
let sub = ctx.cache.subscribe_output();
let _ = ret.return_ok(sub);
},
Sub::Fet(actor_pkt) => {
let _ = actor_pkt.return_fail(anyhow!("No support yet"));
},
}
}
fn mux_write_video(
muxer: &mut Pin<Box<ThisMuxer>>,
is_probe: bool,
Video {
frame_type,
codec,
width,
height,
fps,
time_stamp: _,
}: Video,
payload: Bytes,
) -> Result<Vec<PacketEs>> {
let info = CESStreamInfo {
streamType: if is_probe {
TH_DATATYPE_TH_DT_EXTRA_VIDEO
} else {
TH_DATATYPE_TH_DT_VIDEO
} as _,
vCodecType: codec as _,
frameType: frame_type as _,
frameRate: fps as _,
video_height: height,
video_width: width,
..Default::default()
};
let muxeds = muxer.write(&payload, &info)?;
let mut outs = Vec::new();
for muxed in muxeds {
let info = EsInfo::Fmp4;
let es = PacketEs::new(Bytes::copy_from_slice(muxed), info);
outs.push(es);
}
Ok(outs)
}
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
let Some(muxer) = &mut ctx.muxer else {
return Ok(());
};
let is_key = video.frame_type == FrameType::I;
let out = mux_write_video(muxer, false, video, payload)?;
ctx.cache.input_packet(out, is_key);
Ok(())
}
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
let Some(muxer) = &mut ctx.muxer else {
return Ok(());
};
if ctx.config.source_fmp4_audioForceAAC && audio.codec != AudioCodec::Aac {
ctx.encoder.get_or_insert_with(AACEncoder::new);
}
let aac = match &mut ctx.encoder {
Some(encoder) => Some(encoder.write(&audio, payload)?),
None => {
if audio.codec == AudioCodec::Aac {
Some(payload)
} else {
None
}
},
};
let Some(aac) = aac else {
if !ctx.once_audio_warn.swap(true, Ordering::Relaxed) {
tracing::warn!("Fmp4 audio only support aac, but got {:?}", audio);
}
return Ok(());
};
let outs = mux_write_audio(
muxer,
false,
audio.codec,
audio.sample_rate,
audio.channels,
audio.bits,
aac,
)?;
ctx.cache.input_packet(outs, false);
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
muxer: &mut Pin<Box<ThisMuxer>>,
is_probe: bool,
_codec: AudioCodec, sample_rate: i32,
channels: i32,
bits: i32,
payload: Bytes,
) -> Result<Vec<PacketEs>> {
let info = CESStreamInfo {
streamType: if is_probe {
TH_DATATYPE_TH_DT_EXTRA_AUDIO
} else {
TH_DATATYPE_TH_DT_AUDIO
} as _,
aCodecType: TH_AUDIOCODECTYPE_TH_ACT_AAC as _,
aSampleRate: sample_rate,
aChannels: channels,
aSampleBits: bits,
..Default::default()
};
let muxeds = muxer.write(&payload, &info)?;
let mut outs = Vec::new();
for muxed in muxeds {
let info = EsInfo::Fmp4;
let es = PacketEs::new(Bytes::copy_from_slice(muxed), info);
outs.push(es);
}
Ok(outs)
}
impl SourceFmp4 {
pub fn new(
source_relay: &SourceRelay,
config: &SSConfig,
is_idle: Arc<AtomicBool>,
mut data_rx: SourceEleReceiver,
) -> Result<Self> {
let cache = GopCache::new_from_config(config, is_idle, true);
let config = config.clone();
let (cmd_tx, mut cmd_rx) = Sub::actor();
let mut ctx = Ctx {
muxer: None,
cache,
encoder: None,
config: config.to_owned(),
got_data_time: DataInstant::new(*config.source_wait_audio),
probe_cache: Vec::new(),
is_probe: false,
once_audio_warn: AtomicBool::new(false),
source_relay: source_relay.clone(),
};
tokio::spawn(async move {
loop {
tokio_select!(match .. {
.. if let pkt = data_rx.recv() => {
match pkt {
Ok(pkt) => {
let _ = process_data(&mut ctx, pkt);
},
Err(BroadcastRecvError::Closed) => {
return;
},
Err(BroadcastRecvError::Lagged(skiped)) => {
tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
},
}
},
.. if let ret = cmd_rx.recv() => {
match ret {
Some(ret) => process_sub(&mut ctx, ret),
None => break,
}
},
});
}
});
Ok(Self { cmd_tx })
}
}