use std::{
pin::Pin,
sync::{Arc, atomic::AtomicBool},
};
use crate::{
cpu_bound::AACEncoder,
utils::{DataInstant, ElapsedState},
};
use crate::{define::*, gop_cache::GopCache, packet::*, traits::ISource};
use anyhow::{Result, anyhow, bail};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use xux_rs::{define::*, flvMuxer::FlvMuxer};
const SCHEME: SSScheme = SSScheme::Flv;
pub struct SourceFlv {
cmd_tx: SubCaller,
}
type ThisMuxer = FlvMuxer;
struct Ctx {
muxer: Option<Pin<Box<ThisMuxer>>>,
cache: GopCache,
encoder: Option<AACEncoder>,
config: SSConfig,
got_data_time: DataInstant,
enhance: bool,
is_probe: bool,
probe_cache: Vec<PacketEs>,
}
#[async_trait::async_trait]
impl ISource for SourceFlv {
async fn subscribe(&self) -> Result<SourceDataReceiver> {
let full_data = self.cmd_tx.sub(()).await?;
Ok(SourceDataReceiver { full_data })
}
}
fn make_sure_probe(ctx: &mut Ctx, pkt: &PacketEs) -> Result<ProbeState> {
if ctx.is_probe {
return Ok(ProbeState::Already);
}
ctx.probe_cache.push(pkt.clone());
let mut goto_probe = false;
if ctx.config.source_enable_audio {
if ctx.got_data_time.elapsed(pkt.info()) == ElapsedState::Timeout {
tracing::warn!(
"Got to probe due to wait audio timeout, scheme:{}, source_wait_audio: {:?}",
SCHEME,
ctx.got_data_time.timeout()
);
goto_probe = true;
}
} else {
goto_probe = true;
}
if pkt.is_audio_packet() {
goto_probe = true;
}
if !goto_probe {
bail!("Continue");
}
probe(ctx)?;
ctx.is_probe = true;
let probe_cache = std::mem::take(&mut ctx.probe_cache);
if probe_cache.len() > *ctx.config.source_client_jitter_max as _ {
tracing::warn!(
"probe cache maybe too much and will lead to tuncate, probe_cache_len: {}, source_client_jitter_max: {}, see source.clientJitterMax for more detail",
probe_cache.len(),
ctx.config.source_client_jitter_max
);
}
for pkt in probe_cache {
let _ = update_detail(ctx, pkt);
}
Ok(ProbeState::Success)
}
fn probe(ctx: &mut Ctx) -> Result<()> {
let (video, payload) = ctx
.probe_cache
.iter()
.find(|e| e.is_video_key_packet())
.ok_or_else(|| anyhow!("Can not wait any video key frame"))?
.clone()
.into_video();
let muxer = ctx.muxer.get_or_insert_with(ThisMuxer::new);
let _ = mux_write_video(muxer, true, video, payload);
if ctx.enhance {
muxer.enable_enhance();
}
if let Some(first_audio) = ctx.probe_cache.iter().find(|e| e.is_audio_packet()) {
let (audio, payload) = first_audio.to_owned().into_audio();
let _ = mux_write_audio(
muxer,
true,
audio.codec,
audio.sample_rate,
audio.channels,
audio.bits,
payload,
);
}
match muxer.get_init_data() {
Ok(init_data) => {
let es = PacketEs::new(init_data, EsInfo::Flv);
ctx.cache.set_init_data(es);
},
Err(e) => {
tracing::error!("Get init data fail, error: {}, scheme: {}", e, SCHEME);
},
}
Ok(())
}
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
match make_sure_probe(ctx, &pkt)? {
ProbeState::Success => Ok(()),
ProbeState::Already => update_detail(ctx, pkt),
}
}
fn update_detail(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
let (payload, info) = pkt.into_splite();
match info {
EsInfo::Video(video) => updata_video(ctx, video, payload),
EsInfo::Audio(audio) => update_audio(ctx, audio, payload),
EsInfo::Com => Ok(()),
_ => Ok(()),
}
}
fn process_sub(ctx: &mut Ctx, sub: Sub) {
match sub {
Sub::Sub(actor_pkt) => {
let (_, ret) = actor_pkt.split();
let sub = ctx.cache.subscribe_output();
let _ = ret.return_ok(sub);
},
Sub::Fet(actor_pkt) => {
let _ = actor_pkt.return_fail(anyhow!("No support yet"));
},
}
}
fn mux_write_video(
muxer: &mut Pin<Box<ThisMuxer>>,
is_probe: bool,
Video {
frame_type,
codec,
width,
height,
fps,
time_stamp: _,
}: Video,
payload: Bytes,
) -> Result<Vec<PacketEs>> {
let flvs = muxer.write(
&payload,
&CESStreamInfo {
streamType: if is_probe { 9 } else { 0 },
vCodecType: codec as _,
frameType: frame_type as _,
frameRate: fps as _,
video_height: height,
video_width: width,
..Default::default()
},
)?;
let mut flv_bytes = Vec::new();
for flv in flvs {
let info = EsInfo::Flv;
let es = PacketEs::new(Bytes::copy_from_slice(flv), info);
flv_bytes.push(es);
}
Ok(flv_bytes)
}
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
let Some(muxer) = &mut ctx.muxer else {
return Ok(());
};
let is_key = video.frame_type == FrameType::I;
let out = mux_write_video(muxer, false, video, payload)?;
ctx.cache.input_packet(out, is_key);
Ok(())
}
fn update_audio(ctx: &mut Ctx, audio: Audio, payload: Bytes) -> Result<()> {
let Some(muxer) = &mut ctx.muxer else {
return Ok(());
};
if ctx.config.source_flv_audioForceAAC && audio.codec != AudioCodec::Aac {
ctx.encoder.get_or_insert_with(AACEncoder::new);
}
let aac = match &mut ctx.encoder {
Some(encoder) => Some(encoder.write(&audio, payload)?),
None => {
if audio.codec == AudioCodec::Aac {
Some(payload)
} else {
None
}
},
};
let Some(aac) = aac else {
tracing::warn!("Flv audio only support aac");
return Ok(());
};
let outs = mux_write_audio(
muxer,
false,
audio.codec,
audio.sample_rate,
audio.channels,
audio.bits,
aac,
)?;
ctx.cache.input_packet(outs, false);
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn mux_write_audio(
muxer: &mut Pin<Box<ThisMuxer>>,
is_probe: bool,
_codec: AudioCodec, sample_rate: i32,
channels: i32,
bits: i32,
payload: Bytes,
) -> Result<Vec<PacketEs>> {
let flvs = muxer.write(
&payload,
&CESStreamInfo {
streamType: if is_probe { 10 } else { 1 },
aCodecType: 2,
aSampleRate: sample_rate,
aChannels: channels,
aSampleBits: bits,
..Default::default()
},
)?;
let mut flv_bytes = Vec::new();
for flv in flvs {
let info = EsInfo::Flv;
let es = PacketEs::new(Bytes::copy_from_slice(flv), info);
flv_bytes.push(es);
}
Ok(flv_bytes)
}
impl SourceFlv {
pub fn new(
config: &SSConfig,
enhance: bool,
is_idle: Arc<AtomicBool>,
mut data_rx: SourceEleReceiver,
) -> Result<Self> {
let cache = GopCache::new_from_config(config, is_idle, true);
let config = config.clone();
let (cmd_tx, mut cmd_rx) = Sub::actor();
let mut ctx = Ctx {
muxer: None,
cache,
encoder: None,
config: config.to_owned(),
got_data_time: DataInstant::new(*config.source_wait_audio),
enhance,
probe_cache: Vec::new(),
is_probe: false,
};
tokio::spawn(async move {
loop {
tokio_select!(match .. {
.. if let pkt = data_rx.recv() => {
match pkt {
Ok(pkt) => {
let _ = process_data(&mut ctx, pkt);
},
Err(BroadcastRecvError::Closed) => {
return;
},
Err(BroadcastRecvError::Lagged(skiped)) => {
tracing::warn!("`Slow source` detect, data drop, lagged:{}", skiped)
},
}
},
.. if let ret = cmd_rx.recv() => {
match ret {
Some(ret) => process_sub(&mut ctx, ret),
None => break,
}
},
});
}
});
Ok(Self { cmd_tx })
}
}