use std::{
cmp,
path::PathBuf,
sync::{
Arc,
atomic::{AtomicBool, AtomicI64, Ordering},
},
time::Duration,
};
use better_tokio_select::tokio_select;
use bytes::Bytes;
use chrono::Utc;
use tokio::sync::{mpsc::channel, oneshot};
use tokio_util::sync::CancellationToken;
use xux_rs::{
define::*,
hls::{
hlx::{Hls, HlsConfigBuilder},
io_fs::{HlsIoFs, HlsIoFsConfigBuilder},
},
};
use tracing::Instrument;
use crate::{
define::*,
packet::{EsInfo, PacketEs, Video},
relay::SourceRelay,
traits::ISource,
utils::wait_file_ready,
};
use anyhow::{Result, anyhow, bail};
/// State owned by the HLS worker task that `SourceHls::new` spawns.
struct Ctx {
/// Unix timestamp in seconds, stored whenever a requested HLS file becomes
/// ready; the idle watcher compares it against the current time.
last_client_time: Arc<AtomicI64>,
/// Owned copy of the configuration passed to `SourceHls::new`.
config: SSConfig,
/// Set to `true` once a requested HLS file has become ready; the idle
/// watcher uses it to choose between the two idle timeouts.
have_relay: Arc<AtomicBool>,
/// HLS muxer: receives video frames via `updata` and is queried for the
/// location of HLS files via `query_hls`.
hls: Hls,
/// Source identifier; used to build the `<source_id>.m3u8` playlist name.
source_id: String,
}
/// Handle to a running HLS source.
///
/// Dropping the handle cancels `cancel_token`, which stops the background
/// tasks started by `SourceHls::new`.
pub struct SourceHls {
/// Caller side of the `Sub` actor; used to send `sub` and `fet` requests to
/// the worker task.
cmd_tx: SubCaller,
/// Parent cancellation token; the spawned tasks hold child tokens of it.
cancel_token: CancellationToken,
}
#[async_trait::async_trait]
impl ISource for SourceHls {
    /// Requests the HLS file called `resource_name` from the worker task.
    ///
    /// The request runs on a freshly spawned task, so this method returns
    /// `Ok(())` immediately; the eventual outcome is delivered through `res`.
    /// A failure to deliver (the receiving side is gone) is ignored.
    async fn fetch(&self, res: SourceResourceFileRet, resource_name: &str) -> Result<()> {
        let caller = self.cmd_tx.clone();
        let wanted = resource_name.to_string();
        tokio::spawn(async move {
            let outcome = caller.fet(wanted).await;
            let _ = res.send(outcome);
        });
        Ok(())
    }

    /// Asks the worker task for a subscription and wraps the returned channel
    /// in a [`SourceDataReceiver`].
    ///
    /// # Errors
    /// Propagates any error returned by the `sub` actor call.
    async fn subscribe(&self) -> Result<SourceDataReceiver> {
        Ok(SourceDataReceiver {
            full_data: self.cmd_tx.sub(()).await?,
        })
    }
}
fn process_data(ctx: &mut Ctx, pkt: PacketEs) -> Result<()> {
let (payload, info) = pkt.into_splite();
tokio::task::block_in_place(move || match info {
EsInfo::Com => bail!("Hls no support ps data input "),
EsInfo::Video(video) => updata_video(ctx, video, payload),
_ => Ok(()),
})
}
/// Creates a subscription that delivers the source's `.m3u8` playlist.
///
/// Returns the receiving half of a capacity-1 channel right away. A spawned
/// task waits for `<source_id>.m3u8` to become ready, reads it from disk and
/// sends its bytes as a single `PacketEs`. If waiting or reading fails,
/// nothing is sent and the channel simply closes.
async fn subscribe_output(ctx: &Ctx) -> Result<EsDataSubscriber> {
    let playlist_name = format!("{}.m3u8", ctx.source_id);
    let ready = wait_resource_ready(ctx, playlist_name).await;
    let (sender, receiver) = channel::<PacketEs>(1);
    tokio::spawn(async move {
        let Ok(Ok(playlist_path)) = ready.await else {
            return;
        };
        let Ok(bytes) = tokio::fs::read(playlist_path).await else {
            return;
        };
        let _ = sender.send(PacketEs::new_com(bytes)).await;
    });
    Ok(receiver)
}
/// Handles one request received by the worker task's actor mailbox.
///
/// * `Sub::Fet` — starts waiting for the requested HLS file and answers from a
///   spawned task once the wait resolves, so the worker loop is not blocked.
/// * `Sub::Sub` — creates a playlist subscription and answers immediately.
///
/// Failures to deliver an answer are ignored.
async fn process_sub(ctx: &Ctx, sub: Sub) {
    match sub {
        Sub::Fet(request) => {
            let (file_name, reply) = request.split();
            let pending = wait_resource_ready(ctx, file_name).await;
            tokio::spawn(async move {
                // Collapse "channel closed" and "resource error" into one result.
                let outcome = match pending.await {
                    Ok(inner) => inner,
                    Err(closed) => Err(closed.into()),
                };
                let _ = reply.return_(outcome);
            });
        },
        Sub::Sub(request) => {
            let (_, reply) = request.split();
            let subscription = subscribe_output(ctx).await;
            let _ = reply.return_(subscription);
        },
    }
}
async fn wait_resource_ready(ctx: &Ctx, hls_file: String) -> tokio::sync::oneshot::Receiver<Result<PathBuf>> {
let (tx, rx) = oneshot::channel();
let Ok(file_path) = ctx.hls.query_hls(&hls_file) else {
let _ = tx.send(Err(anyhow::anyhow!("query hls fail")));
return rx;
};
let last_client_time = ctx.last_client_time.clone();
let have_relay = ctx.have_relay.clone();
let timeout = Duration::from_secs(1) * (ctx.config.hls_retryCnt.into_inner() as _);
tokio::spawn(async move {
tokio_select!(match .. {
.. if let Ok(_) = wait_file_ready(&file_path) => {
last_client_time.store(Utc::now().timestamp(), Ordering::SeqCst);
have_relay.store(true, Ordering::SeqCst);
let _ = tx.send(Ok(PathBuf::from(&file_path)));
},
.. if let _ = tokio::time::sleep(timeout) => {
let _ = tx.send(Err(anyhow::anyhow!("timeout wait hls file, timeout:{:?}", timeout)));
},
});
});
rx
}
impl SourceHls {
pub fn new(
source_relay: &SourceRelay,
config: &SSConfig,
is_idle: Arc<AtomicBool>,
mut data_rx: SourceEleReceiver,
) -> Result<Self> {
let span = tracing::error_span!("Hls", source_id = source_relay.source_id());
let _guard = span.enter();
let last_client_time = Arc::new(AtomicI64::new(Utc::now().timestamp()));
let have_relay = Arc::new(AtomicBool::new(false));
let cancel_token = CancellationToken::new();
let source_id = source_relay.source_id().to_string();
tokio::spawn(
{
let cancel_token = cancel_token.child_token();
let last_client_time = last_client_time.clone();
let have_relay = have_relay.clone();
let timeout_idle =
config.hls_segDur.into_inner() * (*config.hls_liveCnt as _) * (*config.source_hls_idleFactor as _);
let protect_time = config.source_protectTime.into_inner();
let scan_time = cmp::min(timeout_idle, protect_time) / 5;
let mut interval = tokio::time::interval(scan_time);
async move {
cancel_token
.run_until_cancelled(async move {
loop {
interval.tick().await;
let have_relay = have_relay.load(Ordering::SeqCst);
let idle_time = if have_relay { timeout_idle } else { protect_time };
let have_idle = {
let dif = Utc::now().timestamp() - last_client_time.load(Ordering::SeqCst);
if dif < 0 {
true
} else {
dif as u64 >= idle_time.as_secs()
}
};
is_idle.store(have_idle, Ordering::SeqCst);
if have_idle {
tracing::info!(
"Hls source idle, idle_time:{:?}, have_relay: {}",
idle_time,
have_relay
);
return;
}
}
})
.await
}
}
.instrument(span.clone()),
);
let all_root = std::env::current_exe()?
.parent()
.ok_or_else(|| anyhow!("get root hls segment path fail"))?
.join(&*config.hls_filePath)
.join(&source_id)
.display()
.to_string();
let hls_muxer_type = *config.hls_muxer_type;
let hls_config = HlsConfigBuilder::default()
.hls_live_cnt((*config.hls_liveCnt).try_into()?)
.hls_segDur(config.hls_segDur.into_inner())
.muxer_type(hls_muxer_type)
.use_time_stamp(config.source_ts_enable_timeStamp)
.segment_root_path(SEGMENT_ROOT_PATH_PLACEHOLDER.to_string())
.source_enable_audio(config.source_enable_audio)
.build()?;
let fs_io_config = HlsIoFsConfigBuilder::default().all_root(&all_root).build()?;
let hls = Hls::new_with_config(&hls_config, HlsIoFs::new_with_config(&fs_io_config)?)?;
let mut ctx = Ctx {
hls,
last_client_time,
config: config.to_owned(),
have_relay,
source_id,
};
let (cmd_tx, mut cmd_rx) = Sub::actor();
tokio::spawn(
{
let cancel_token = cancel_token.child_token();
async move {
loop {
tokio_select!(match .. {
.. if let Ok(pkt) = data_rx.recv() => {
let _ = process_data(&mut ctx, pkt);
},
.. if let Some(ret) = cmd_rx.recv() => {
process_sub(&ctx, ret).await;
},
.. if let _ = cancel_token.cancelled() => {
clean(ctx);
return;
},
_ => {
return;
},
});
}
}
}
.instrument(span.clone()),
);
Ok(Self { cmd_tx, cancel_token })
}
fn clear(&self) {
self.cancel_token.cancel();
}
}
/// Pushes one video frame into the HLS muxer.
///
/// The frame metadata carried by `Video` is copied into a `CESStreamInfo`;
/// all fields not listed here keep their default values. Returns whatever
/// `Hls::updata` returns.
fn updata_video(ctx: &mut Ctx, video: Video, payload: Bytes) -> Result<()> {
    let Video {
        frame_type,
        codec,
        height,
        width,
        fps,
        time_stamp,
    } = video;
    let stream_info = CESStreamInfo {
        vCodecType: codec as _,
        frameType: frame_type as _,
        timestamp: time_stamp as _,
        frameRate: fps as _,
        video_height: height,
        video_width: width,
        ..Default::default()
    };
    ctx.hls.updata(&payload, &stream_info)
}
/// Consumes the worker context and calls `clear` on its HLS muxer.
///
/// Invoked by the worker task when its cancellation token fires.
fn clean(ctx: Ctx) {
ctx.hls.clear();
}
// Dropping the handle stops the source: `clear` cancels `cancel_token`, and
// the background tasks hold child tokens of it.
impl Drop for SourceHls {
fn drop(&mut self) {
self.clear();
}
}