use crate::{hls::define::*, packet::PacketEs, relay::SourceRelay};
use anyhow::Result;
use bytes::Bytes;
use cyberex::xserde::{
duration_serde::{DurationAsMillis, DurationAsSecs},
num_serde::{NumberI64, NumberU32, NumberU64, NumberUsize},
};
use cyberex::{ActorMessage, xasync::call::ActorPacket};
use derive_builder::Builder;
use serde::Deserialize;
use serde_with::{DefaultOnNull, serde_as};
use std::path::PathBuf;
use thiserror::Error;
use tokio::sync::{
broadcast,
mpsc::{Receiver, UnboundedSender},
};
pub type SourceEventSender = UnboundedSender<SourceEvent>;
pub type SourceDataSender = broadcast::Sender<PacketEs>;
pub type EsDataSender = broadcast::Sender<PacketEs>;
pub type IndexDataSender = broadcast::Sender<IndexPacket>;
pub type SourceEleReceiver = broadcast::Receiver<PacketEs>;
pub type EsDataSubscriber = Receiver<PacketEs>;
pub type ResourceSubscriber = Receiver<Result<Bytes>>;
pub type SourceResourceFileSubscriber = tokio::sync::oneshot::Receiver<Result<SourceResourceFile>>;
pub type EsDataSubscriberSender = UnboundedSender<PacketEs>;
pub type OneshotResult<T> = tokio::sync::oneshot::Sender<Result<T>>;
pub type SourceResourceFile = PathBuf;
pub type SourceResourceFileRet = OneshotResult<SourceResourceFile>;
pub type SourceStreamResourceFileRet = OneshotResult<ResourceSubscriber>;
pub type EsDataSubscriberRet = OneshotResult<EsDataSubscriber>;
pub(crate) type BroadcastRecvError = tokio::sync::broadcast::error::RecvError;
pub(crate) const RTP_STREAM_SEPERATE: &[u8; 4] = b"\r\n\r\n";
pub const SEGMENT_ROOT_PATH_PLACEHOLDER: &str = "${SEGMENT_ROOT_PATH}";
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub struct SchemeKey {
scheme: SSScheme,
muxer_config: String,
}
impl SchemeKey {
pub fn new(scheme: SSScheme, muxer_config: impl Into<String>) -> Self {
Self {
scheme,
muxer_config: muxer_config.into(),
}
}
}
#[derive(ActorMessage)]
pub(crate) enum Sub {
Sub(ActorPacket<(), EsDataSubscriber>),
Fet(ActorPacket<String, SourceResourceFile>),
}
pub struct IndexPacketSender {
pub input_data: EsDataSubscriberSender,
}
#[derive(Debug)]
pub struct SourceDataReceiver {
pub full_data: EsDataSubscriber,
}
#[derive(Clone)]
pub struct IndexPacket {
pub index: usize,
pub payload: Vec<PacketEs>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SourceEvent {
Idles(String),
PushIdle { source_id: String, hint: String },
}
#[allow(clippy::enum_variant_names)]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum HlsDlg {
M3uFile,
TsFile,
XFile(String),
}
impl HlsDlg {
pub fn new(uri: &str) -> Result<Self> {
let no_qs_uri = uri.split_once('?').unwrap_or((uri, uri)).0;
let extension = PathBuf::from(no_qs_uri)
.extension()
.ok_or_else(|| anyhow::anyhow!("hls dlg error, uri:{}", no_qs_uri))?
.to_string_lossy()
.to_string();
if extension == "m3u" || extension == "m3u8" {
Ok(HlsDlg::M3uFile)
} else if extension == "ts" {
Ok(HlsDlg::TsFile)
} else {
Ok(HlsDlg::XFile(extension))
}
}
}
#[derive(
Copy, Clone, Debug, PartialEq, Eq, Hash, strum_macros::Display, strum_macros::EnumString, strum_macros::EnumIter,
)]
pub enum SSScheme {
#[strum(serialize = "unkown")]
Unkown,
#[strum(serialize = "live.ts")]
MpegTs,
#[strum(to_string = "m3u8")] #[strum(serialize = "ts", serialize = "hls", serialize = "fmp4")] Hls,
#[strum(serialize = "live.fmp4")]
Fmp4,
#[strum(serialize = "live.flv")]
Flv,
#[strum(serialize = "live.eflv")]
EnhanceFlv,
#[strum(serialize = "es")]
Es,
#[strum(serialize = "live.rtp")]
Rtp,
#[strum(serialize = "mp2p")]
Mp2p,
#[strum(serialize = "live.ps")]
Ps,
#[strum(serialize = "live.data")]
Data,
#[strum(serialize = "live.blob")]
Blob,
#[strum(serialize = "live.g711a")]
G711a,
}
impl SSScheme {
pub fn to_content_type(self) -> &'static str {
match self {
SSScheme::Fmp4 => "video/mp4",
SSScheme::Flv | SSScheme::EnhanceFlv => "video/x-flv",
SSScheme::MpegTs => "video/mp2t",
SSScheme::Blob => "application/octet-stream",
_ => "",
}
}
}
#[serde_as]
#[derive(Deserialize, Builder, Debug, Clone, PartialEq, Eq, Default)]
#[serde(default)]
#[builder(setter(into))]
#[builder(default)]
#[allow(non_snake_case)]
pub struct SSConfig {
#[serde(rename(deserialize = "hls.filePath"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub hls_filePath: HlsFilePath,
#[serde(rename(deserialize = "hls.segDur"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub hls_segDur: DurationAsSecs<2>,
#[serde(rename(deserialize = "source.hls.idleFactor"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_hls_idleFactor: NumberUsize<5>,
#[serde(rename(deserialize = "hls.liveCnt"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub hls_liveCnt: NumberU64<6>,
#[serde(rename(deserialize = "hls.muxerType"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub hls_muxer_type: NumberU32<8>,
#[serde(rename(deserialize = "hls.retryCnt"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub hls_retryCnt: NumberI64<30>,
#[serde(rename(deserialize = "source.idleTime"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_idleTime: DurationAsMillis<3000>,
#[serde(rename(deserialize = "source.protectTime"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_protectTime: DurationAsMillis<20000>,
#[serde(rename(deserialize = "source.data.idleTime"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_data_IdleTime: DurationAsMillis<20000>,
#[serde(rename(deserialize = "source.enable_audio"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_enable_audio: bool,
#[serde(rename(deserialize = "source.enable_push_input"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_enable_push_input: bool,
#[serde(rename(deserialize = "source.fmp4.audioForceAAC"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_fmp4_audioForceAAC: bool,
#[serde(rename(deserialize = "source.ts.audioForceAAC"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_ts_audioForceAAC: bool,
#[serde(rename(deserialize = "source.flv.audioForceAAC"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_flv_audioForceAAC: bool,
#[serde(rename(deserialize = "source.rtp.audioForceAAC"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_rtp_audioForceAAC: bool,
#[serde(rename(deserialize = "source.waitAudioTime"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_wait_audio: DurationAsMillis<1000>,
#[serde(rename(deserialize = "source.data.gopMax"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_data_GopMax: NumberUsize<2>,
#[serde(rename(deserialize = "source.ts.enable.timestamp"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_ts_enable_timeStamp: bool,
#[serde(rename(deserialize = "source.g711a.forceSampleRate8000"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_g711a_forceSampleRate8000: bool,
#[serde(rename(deserialize = "source.jitterBufferMax"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_jitter_max: NumberUsize<128>,
#[serde(rename(deserialize = "source.clientJitterBufferMax"))]
#[serde_as(deserialize_as = "DefaultOnNull")]
pub source_client_jitter_max: NumberUsize<64>,
}
#[serde_as]
#[derive(Deserialize, Builder, Debug, Clone, PartialEq, Eq, Default)]
pub struct BlobMuxConfig {}
pub(crate) enum ProbeState {
Success,
Already,
}
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum SourceSSError {
#[error("Relay no found, relay_id: {0}")]
RelayNofound(String),
#[error("Source no found, source_id: {0}")]
SourceNofound(String),
}
#[derive(Debug)]
pub struct RelaySubject {
pub relay: SourceRelay,
pub relay_rx: Option<SourceDataReceiver>,
pub have_relay: bool, }
pub struct FetchM3u8Args {
relay_id: String,
resource_name: String,
res: SourceResourceFileRet,
}
impl FetchM3u8Args {
pub fn new(resource_name: impl Into<String>, relay_id: impl Into<String>, res: SourceResourceFileRet) -> Self {
Self {
relay_id: relay_id.into(),
resource_name: resource_name.into(),
res,
}
}
pub fn relay_id(&self) -> &str {
&self.relay_id
}
pub fn into_res(self) -> SourceResourceFileRet {
self.res
}
pub fn resource_name(&self) -> &str {
&self.resource_name
}
}
#[derive(Clone)]
pub enum SubId {
SourceId(String), RelayId(String), }
#[derive(ActorMessage)]
pub enum Ctrl {
AddRelaySource(ActorPacket<SourceRelay, ()>),
Subscribe(ActorPacket<String, RelaySubject>),
SubscribeInput(ActorPacket<SubId, IndexPacketSender>),
RemoveRelaySource(ActorPacket<String, ()>),
FetchM3u8(ActorPacket<FetchM3u8Args, RelaySubject>),
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use anyhow::bail;
use super::*;
#[test]
fn test_case_source_ss_error() {
fn error_return(use_my_error: bool) -> Result<()> {
if use_my_error {
Err(SourceSSError::RelayNofound("".to_string()).into())
} else {
bail!("another error")
}
}
assert!(error_return(true).unwrap_err().downcast::<SourceSSError>().is_ok());
assert!(error_return(false).unwrap_err().downcast::<SourceSSError>().is_err());
}
#[test]
fn test_ssscheme_from() {
use strum::IntoEnumIterator;
for scheme in SSScheme::iter() {
assert_eq!(scheme.to_string(), format!("{}", scheme));
let s = scheme.to_string();
assert_eq!(SSScheme::from_str(&s).unwrap(), scheme);
}
assert_eq!(SSScheme::from_str("ts").unwrap(), SSScheme::Hls);
assert_eq!(SSScheme::from_str("m3u8").unwrap(), SSScheme::Hls);
assert_eq!(SSScheme::from_str("fmp4").unwrap(), SSScheme::Hls);
assert_eq!(SSScheme::from_str("hls").unwrap(), SSScheme::Hls);
}
#[test]
fn test_ssscheme_hls_from() {
assert_eq!(SSScheme::Hls.to_string(), "m3u8");
assert_eq!(format!("{}", SSScheme::Hls), "m3u8");
}
#[test]
fn test_full() {
{
let c: SSConfig = serde_yaml::from_str(
r#"
hls.filePath: /root/path
hls.segDur: 3
hls.retryCnt: 10
hls.liveCnt: 5
fuck: you
fmp4.segMode: Gop
source.idleTime: 2000
source.protectTime: 30000
source.data.idleTime: 20000
source.data.gopMax: 3
source.waitAudioTime: 200
source.ts.enable.timestamp: true
source.ts.audioForceAAC: true
source.rtp.audioForceAAC: true
source.flv.audioForceAAC: true
source.g711a.forceSampleRate8000: true
source.hls.idleFactor: 1
source.enable_push_input: true
source.jitterBufferMax: 64
source.clientJitterBufferMax: 124
hls.muxerType: 13
source.hook.enable.lazyCreate: true
source.hook.enable: true
source.hook.timeout: 1000
"#,
)
.unwrap();
assert_eq!(
c,
SSConfig {
hls_filePath: "/root/path".to_string().into(),
hls_segDur: 3.into(),
hls_retryCnt: 10.into(),
hls_liveCnt: 5.into(),
source_idleTime: 2000.into(),
source_protectTime: 30000.into(),
source_data_IdleTime: 20000.into(),
source_enable_audio: false,
source_data_GopMax: 3.into(),
source_fmp4_audioForceAAC: false,
source_wait_audio: 200.into(),
source_ts_enable_timeStamp: true,
source_ts_audioForceAAC: true,
source_rtp_audioForceAAC: true,
source_flv_audioForceAAC: true,
source_g711a_forceSampleRate8000: true,
source_hls_idleFactor: 1.into(),
source_enable_push_input: true,
source_jitter_max: 64.into(),
source_client_jitter_max: 124.into(),
hls_muxer_type: 13.into(),
}
);
}
}
#[test]
fn test_empty() {
let c: SSConfig = serde_yaml::from_str(
r#"
fuck: you
"#,
)
.unwrap();
assert_eq!(c, SSConfig::default());
}
#[test]
fn test_null() {
let c: SSConfig = serde_yaml::from_str(
r#"
hls.filePath: null
hls.segDur: ~
hls.retryCnt:
"#,
)
.unwrap();
assert_eq!(c, SSConfig::default());
}
#[test]
fn test_hls_dlg_new() {
assert_eq!(HlsDlg::new("/test/1.ts").unwrap(), HlsDlg::TsFile);
assert_eq!(HlsDlg::new("/test/1.m3u8").unwrap(), HlsDlg::M3uFile);
assert_eq!(HlsDlg::new("/test/test.m3u8?auth=6d337538").unwrap(), HlsDlg::M3uFile);
assert_eq!(HlsDlg::new("/test/1.fmp4").unwrap(), HlsDlg::XFile("fmp4".to_string()));
assert_eq!(HlsDlg::new("/test/1.flv").unwrap(), HlsDlg::XFile("flv".to_string()));
}
}