use std::{
pin::Pin,
sync::{Arc, atomic::AtomicBool},
};
use crate::{
define::*,
gop_cache::GopCache,
packet::*,
relay::SourceRelay,
traits::ISource,
utils::{DataInstant, ElapsedState},
};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use th_rs::define::*;
use xux_rs::{define::*, muxer::Muxer};
/// Stream scheme produced by this source; in this file it is only used to tag log messages.
const SCHEME: SSScheme = SSScheme::Ps;
/// Handle to a PS source.
///
/// The handle only stores a command sender; the packet processing itself runs in the task
/// spawned by `SourcePs::new`.
pub struct SourcePs {
/// Command channel used by `subscribe` to request a subscription from the worker task.
cmd_tx: SubCaller,
}
/// Muxer type used by this source (alias of `xux_rs::muxer::Muxer`).
type ThisMuxer = Muxer;
/// Mutable state owned by the worker task spawned in `SourcePs::new`.
struct Ctx {
/// Muxer instance; starts as `None` and is created by `probe` when it first needs one.
muxer: Option<Pin<Box<ThisMuxer>>>,
/// Cache that receives the init data and every muxed packet, and hands out subscriptions.
cache: GopCache,
/// Copy of the source configuration taken at construction time.
config: SSConfig,
/// Timer built from `config.source_wait_audio`; `make_sure_probe` checks it for
/// `ElapsedState::Timeout` while audio is enabled.
got_data_time: DataInstant,
/// `true` once `probe` has succeeded; later packets then bypass the probe buffer.
is_probe: bool,
/// Packets buffered until probing succeeds; they are replayed and the buffer emptied afterwards.
probe_cache: Vec<PacketEs>,
/// Relay that supplies the muxer configuration when the muxer is created.
source_relay: SourceRelay,
}
#[async_trait::async_trait]
impl ISource for SourcePs {
    /// Asks the worker task for a subscription over the command channel.
    ///
    /// # Errors
    /// Propagates whatever error the command call reports.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        let receiver = SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        };
        Ok(receiver)
    }
}
/// Buffers packets until the stream can be probed, then probes and replays the buffer.
///
/// Returns `ProbeState::Already` when probing was completed by an earlier call; the caller
/// is then expected to process `pkt` itself. Returns `ProbeState::Success` when this call
/// completed the probe; in that case `pkt` has already been replayed together with the rest
/// of the buffer.
///
/// # Errors
/// * `"Continue"` while the function is still waiting for the probe trigger.
/// * Any error from `probe`, e.g. when no video key frame has been buffered yet.
///
/// Note that the buffer is only drained after a successful probe, so it keeps growing for as
/// long as probing is postponed or fails.
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
    if ctx.is_probe {
        return Ok(ProbeState::Already);
    }
    ctx.probe_cache.push(pkt.clone());

    // With audio disabled there is nothing to wait for. With audio enabled the timer is
    // consulted on every packet and only a timeout ends the wait.
    let wait_is_over = !ctx.config.source_enable_audio
        || ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout;
    // An audio packet triggers probing regardless of the timer.
    let saw_audio = pkt.is_audio_packet();
    if !(wait_is_over || saw_audio) {
        bail!("Continue");
    }

    probe(ctx)?;
    ctx.is_probe = true;

    let buffered = std::mem::take(&mut ctx.probe_cache);
    let jitter_max = *ctx.config.source_client_jitter_max as usize;
    if buffered.len() > jitter_max {
        tracing::warn!(
            "probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
            buffered.len(),
            ctx.config.source_client_jitter_max
        );
    }

    // Replay everything that was held back; a failure on one packet must not stop the rest.
    buffered.into_iter().for_each(|cached| {
        let _ = update_detail(ctx, cached);
    });
    Ok(ProbeState::Success)
}
fn probe(ctx: &mut Ctx) -> Result<()> {
let (video, payload) = ctx
.probe_cache
.iter()
.find(|e| e.is_video_key_packet())
.ok_or_else(|| anyhow!("Can not wait any video key frame"))?
.clone()
.into_video();
let muxer = match &mut ctx.muxer {
Some(muxer) => muxer,
None => ctx
.muxer
.insert(ThisMuxer::new_with_name("psMuxer", ctx.source_relay.muxer_config())?),
};
let _ = mux_write_video(muxer, true, video, payload);
if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
let (audio, payload) = first_audio.to_owned().into_audio();
let _ = mux_write_audio(
muxer,
true,
audio.codec,
audio.sample_rate,
audio.channels,
audio.bits,
payload,
);
}
match muxer.get_init_data() {
Ok(init_data) => {
let es = PacketEs::new(init_data, EsInfo::MpegPs);
ctx.cache.set_init_data(es);
},
Err(e) => {
tracing::error!("Get init data fail, error: {}, scheme: {}", e, SCHEME);
},
}
Ok(())
}
/// Entry point for one incoming packet.
///
/// Before probing has completed the packet is handled entirely by `make_sure_probe`
/// (buffered, and replayed once the probe succeeds). Afterwards it is muxed directly.
///
/// # Errors
/// Propagates errors from `make_sure_probe` and `update_detail`.
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    let state = make_sure_probe(ctx, &pkt)?;
    match state {
        // Probing finished earlier: this packet has not been handled yet.
        ProbeState::Already => update_detail(ctx, pkt),
        // Probing finished just now: the packet was part of the replayed buffer.
        ProbeState::Success => Ok(()),
    }
}
/// Routes a packet to the video or audio muxing path according to its `EsInfo`.
///
/// Packets of any other kind are accepted and ignored.
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
    match pkt.into_splite() {
        (payload, EsInfo::Video(video)) => updata_video(ctx, video, payload),
        (payload, EsInfo::Audio(audio)) => update_audio(ctx, audio, payload),
        _ => Ok(()),
    }
}
/// Answers one command received from a `SourcePs` handle.
///
/// * `Sub::Sub` is answered with a fresh output subscription taken from the cache.
/// * `Sub::Fet` is answered with a "No support yet" failure.
///
/// The outcome of sending the reply is ignored in both cases.
fn process_sub(ctx: &mut Ctx, sub: Sub) {
    match sub {
        Sub::Fet(request) => {
            let _ = request.return_fail(anyhow!("No support yet"));
        },
        Sub::Sub(request) => {
            let (_, reply) = request.split();
            let _ = reply.return_ok(ctx.cache.subscribe_output());
        },
    }
}
/// Writes one video frame into the muxer and returns the muxed output.
///
/// `is_probe` selects the "extra video" stream type instead of the regular video type.
/// Every chunk returned by the muxer is copied into its own `PacketEs` tagged
/// `EsInfo::MpegPs`. The frame's `time_stamp` is not used.
///
/// # Errors
/// Propagates the error returned by the muxer's `write`.
fn mux_write_video(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    video: Video,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let Video {
        frame_type,
        codec,
        width,
        height,
        fps,
        time_stamp: _,
    } = video;
    let stream_type = if is_probe {
        TH_DATATYPE_TH_DT_EXTRA_VIDEO
    } else {
        TH_DATATYPE_TH_DT_VIDEO
    };
    let stream_info = CESStreamInfo {
        streamType: stream_type as _,
        vCodecType: codec as _,
        frameType: frame_type as _,
        frameRate: fps as _,
        video_height: height,
        video_width: width,
        ..Default::default()
    };
    let packets: Vec<PacketEs> = muxer
        .write(&payload, &stream_info)?
        .into_iter()
        .map(|chunk| PacketEs::new(Bytes::copy_from_slice(chunk), EsInfo::MpegPs))
        .collect();
    Ok(packets)
}
/// Muxes one video frame and pushes the result into the cache.
///
/// Does nothing (and succeeds) while no muxer exists. The cache is told whether the frame
/// is an I frame.
///
/// # Errors
/// Propagates errors from `mux_write_video`.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    match ctx.muxer.as_mut() {
        None => Ok(()),
        Some(muxer) => {
            // Must be read before `video` is moved into the muxing call.
            let key_frame = video.frame_type == FrameType::I;
            let muxed = mux_write_video(muxer, false, video, payload)?;
            ctx.cache.input_packet(muxed, key_frame);
            Ok(())
        },
    }
}
/// Muxes one audio frame and pushes the result into the cache as a non-key packet.
///
/// Does nothing (and succeeds) while no muxer exists.
///
/// # Errors
/// Propagates errors from `mux_write_audio`.
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
    if let Some(muxer) = ctx.muxer.as_mut() {
        let muxed = mux_write_audio(
            muxer,
            false,
            audio.codec,
            audio.sample_rate,
            audio.channels,
            audio.bits,
            payload,
        )?;
        ctx.cache.input_packet(muxed, false);
    }
    Ok(())
}
/// Writes one audio frame into the muxer and returns the muxed output.
///
/// `is_probe` selects the "extra audio" stream type instead of the regular audio type.
/// Every chunk returned by the muxer is copied into its own `PacketEs` tagged
/// `EsInfo::MpegPs`.
///
/// # Errors
/// Propagates the error returned by the muxer's `write`.
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
    muxer: &mut Pin<Box<ThisMuxer>>,
    is_probe: bool,
    codec: AudioCodec,
    sample_rate: i32,
    channels: i32,
    bits: i32,
    payload: Bytes,
) -> Result<Vec<PacketEs>> {
    let stream_type = if is_probe {
        TH_DATATYPE_TH_DT_EXTRA_AUDIO
    } else {
        TH_DATATYPE_TH_DT_AUDIO
    };
    let stream_info = CESStreamInfo {
        streamType: stream_type as _,
        aCodecType: codec as _,
        aSampleRate: sample_rate,
        aChannels: channels,
        aSampleBits: bits,
        ..Default::default()
    };
    let packets: Vec<PacketEs> = muxer
        .write(&payload, &stream_info)?
        .into_iter()
        .map(|chunk| PacketEs::new(Bytes::copy_from_slice(chunk), EsInfo::MpegPs))
        .collect();
    Ok(packets)
}
impl SourcePs {
    /// Creates the source and spawns the task that does the actual work.
    ///
    /// The spawned task owns all mutable state (`Ctx`) and waits on two inputs:
    /// * `data_rx` — incoming packets, fed to `process_data`. The task ends when the
    ///   channel is closed; a lagged receiver only produces a warning.
    /// * the command channel — subscription requests, fed to `process_sub`. The task ends
    ///   when that channel yields `None`.
    ///
    /// # Panics
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn new(
        source_relay: &SourceRelay,
        config: &SSConfig,
        is_idle: Arc<AtomicBool>,
        mut data_rx: SourceEleReceiver,
    ) -> Result<Self> {
        let cache = GopCache::new_from_config(config, is_idle, false);
        let (cmd_tx, mut cmd_rx) = Sub::actor();
        let mut ctx = Ctx {
            muxer: None,
            cache,
            got_data_time: DataInstant::new(*config.source_wait_audio),
            // The configuration is cloned exactly once, straight into the task state.
            config: config.clone(),
            probe_cache: Vec::new(),
            is_probe: false,
            source_relay: source_relay.clone(),
        };
        tokio::spawn(async move {
            loop {
                tokio_select!(match .. {
                    .. if let pkt = data_rx.recv() => {
                        match pkt {
                            Ok(pkt) => {
                                // Errors are expected here: while packets are still being
                                // buffered for probing, `make_sure_probe` reports
                                // "Continue" as an error.
                                let _ = process_data(&mut ctx, pkt);
                            },
                            Err(BroadcastRecvError::Closed) => {
                                return;
                            },
                            Err(BroadcastRecvError::Lagged(skiped)) => {
                                tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
                            },
                        }
                    },
                    .. if let ret = cmd_rx.recv() => {
                        match ret {
                            Some(ret) => process_sub(&mut ctx, ret),
                            None => break,
                        }
                    },
                });
            }
        });
        Ok(Self { cmd_tx })
    }
}