use crate::cpu_bound::AACEncoder;
use crate::define::*;
use crate::gop_cache::GopCache;
use crate::packet::{Audio, AudioCodec, EsInfo, FrameType, PacketEs, Video};
use crate::traits::ISource;
use crate::utils::{DataInstant, ElapsedState};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use xux_rs::define::{CESStreamInfo, MpegtsMuxerConfigBuilder};
use xux_rs::muxer::Muxer;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
const SCHEME: SSScheme = SSScheme::MpegTs;
type ThisMuxer = Muxer;
pub struct Ctx {
muxer: Option<Pin<Box<ThisMuxer>>>,
cache: GopCache,
encoder: Option<AACEncoder>,
config: SSConfig,
probe_cache: Vec<PacketEs>,
is_probe: bool,
got_data_time: DataInstant,
}
pub struct SourceTs {
cmd_tx: SubCaller,
}
#[async_trait::async_trait]
impl ISource for SourceTs {
async fn subscribe(&self) -> Result<SourceDataReceiver> {
let full_data = self.cmd_tx.sub(()).await?;
Ok(SourceDataReceiver { full_data })
}
}
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
match make_sure_probe(ctx, &pkt)? {
ProbeState::Success => Ok(()),
ProbeState::Already => update_detail(ctx, pkt),
}
}
fn process_sub(ctx: &mut Ctx, sub: Sub) {
match sub {
Sub::Sub(actor_pkt) => {
let (_, ret) = actor_pkt.split();
let sub = ctx.cache.subscribe_output();
let _ = ret.return_ok(sub);
},
Sub::Fet(actor_pkt) => {
let _ = actor_pkt.return_fail(anyhow::anyhow!("No support yet"));
},
}
}
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
if pkt.is_mpegts() {
return Ok(ProbeState::Already);
}
if ctx.is_probe {
return Ok(ProbeState::Already);
}
ctx.probe_cache.push(pkt.clone());
let mut goto_probe = false;
if ctx.config.source_enable_audio {
if ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout {
tracing::warn!(
"Got to probe due to wait audio timeout, scheme:{}, source_wait_audio: {:?}",
SCHEME,
ctx.got_data_time.timeout()
);
goto_probe = true;
}
} else {
goto_probe = true;
}
if pkt.is_audio_packet() {
goto_probe = true;
}
if !goto_probe {
bail!("Continue");
}
probe(ctx)?;
ctx.is_probe = true;
let probe_cache = std::mem::take(&mut ctx.probe_cache);
if probe_cache.len() > *ctx.config.source_client_jitter_max as _ {
tracing::warn!(
"probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
probe_cache.len(),
ctx.config.source_client_jitter_max
);
}
for pkt in probe_cache {
let _ = update_detail(ctx, pkt);
}
Ok(ProbeState::Success)
}
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
let (payload, info) = pkt.into_splite();
match info {
EsInfo::Video(video) => updata_video(ctx, video, payload),
EsInfo::Audio(audio) => update_audio(ctx, audio, payload),
EsInfo::MpegTs => update_mpegts(ctx, payload),
EsInfo::Com => Ok(()),
_ => Ok(()),
}
}
fn probe(ctx: &mut Ctx) -> Result<()> {
let (video, payload) = ctx
.probe_cache
.iter()
.find(|e| e.is_video_key_packet())
.ok_or_else(|| anyhow!("Can not wait any video key frame"))?
.clone()
.into_video();
let muxer = match &mut ctx.muxer {
Some(muxer) => muxer,
None => {
let mux_config = MpegtsMuxerConfigBuilder::default()
.use_time_stamp(ctx.config.source_ts_enable_timeStamp)
.build()?;
let config_str = serde_json::to_string(&mux_config)?;
let muxer = ThisMuxer::new_with_name("mpegtsMuxer", &config_str)?;
ctx.muxer.insert(muxer)
},
};
let _ = mux_write_video(muxer, true, video, payload)?;
if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
let (audio, payload) = first_audio.to_owned().into_audio();
if ctx.config.source_ts_audioForceAAC && audio.codec != AudioCodec::Aac {
ctx.encoder.get_or_insert_with(AACEncoder::new);
}
let aac = match &mut ctx.encoder {
Some(encoder) => Some(encoder.write(&audio, payload)?),
None => {
if audio.codec != AudioCodec::Aac {
None
} else {
Some(payload)
}
},
};
let Some(aac) = aac else {
return Ok(());
};
let _ = mux_write_audio(muxer, true, audio, aac);
}
Ok(())
}
fn mux_write_video(
muxer: &mut Pin<Box<ThisMuxer>>,
is_probe: bool,
Video {
frame_type,
codec,
width,
height,
fps,
time_stamp,
}: Video,
payload: Bytes,
) -> Result<Vec<PacketEs>> {
let tss = muxer.write(
&payload,
&CESStreamInfo {
streamType: if is_probe { 9 } else { 0 },
vCodecType: codec as _,
frameType: frame_type as _,
timestamp: time_stamp as _,
frameRate: fps as _,
video_height: height,
video_width: width,
..Default::default()
},
)?;
let mut ts_bytes = Vec::new();
for ts in tss {
let info = EsInfo::MpegTs;
let es = PacketEs::new(Bytes::copy_from_slice(ts), info);
ts_bytes.push(es);
}
Ok(ts_bytes)
}
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
let Some(muxer) = &mut ctx.muxer else {
return Ok(());
};
let is_key = video.frame_type == FrameType::I;
let outs = mux_write_video(muxer, false, video, payload)?;
ctx.cache.input_packet(outs, is_key);
Ok(())
}
fn mux_write_audio(
muxer: &mut Pin<Box<ThisMuxer>>,
is_probe: bool,
Audio {
codec: _,
channels,
bits,
sample_rate,
}: Audio,
payload: Bytes,
) -> Result<Vec<PacketEs>> {
let tss = muxer.write(
&payload,
&CESStreamInfo {
streamType: if is_probe { 10 } else { 1 },
aCodecType: 2,
aSampleRate: sample_rate,
aChannels: channels,
aSampleBits: bits,
..Default::default()
},
)?;
let ts_bytes = tss
.into_iter()
.map(|ts| PacketEs::new(Bytes::copy_from_slice(ts), EsInfo::MpegTs))
.collect();
Ok(ts_bytes)
}
fn update_mpegts(ctx: &mut Ctx, payload: Bytes) -> Result<()> {
let ts_pkt = PacketEs::new(payload, EsInfo::MpegTs);
ctx.cache.input_packet(vec![ts_pkt], true);
Ok(())
}
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
let Some(muxer) = &mut ctx.muxer else {
return Ok(());
};
let aac = match &mut ctx.encoder {
Some(encoder) => Some(encoder.write(&audio, payload)?),
None => {
if audio.codec != AudioCodec::Aac {
None
} else {
Some(payload)
}
},
};
let Some(aac) = aac else {
return Ok(());
};
let outs = mux_write_audio(muxer, false, audio, aac)?;
ctx.cache.input_packet(outs, false);
Ok(())
}
impl SourceTs {
pub fn new(config: &SSConfig, is_idle: Arc<AtomicBool>, mut data_rx: SourceEleReceiver) -> Result<Self> {
let cache = GopCache::new_from_config(config, is_idle, false);
let (cmd_tx, mut cmd_rx) = Sub::actor();
let mut ctx = Ctx {
muxer: None,
cache,
config: config.to_owned(),
encoder: None,
probe_cache: Vec::new(),
is_probe: false,
got_data_time: DataInstant::new(*config.source_wait_audio),
};
tokio::spawn(async move {
loop {
tokio_select!(match .. {
.. if let pkt = data_rx.recv() => {
match pkt {
Ok(pkt) => {
let _ = process_data(&mut ctx, pkt);
},
Err(BroadcastRecvError::Closed) => {
return;
},
Err(BroadcastRecvError::Lagged(skiped)) => {
tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
},
}
},
.. if let ret = cmd_rx.recv() => {
match ret {
Some(ret) => process_sub(&mut ctx, ret),
None => break,
}
},
});
}
});
Ok(Self { cmd_tx })
}
}