use crate::config::{AuthConfig, AuthSecretConfig};
use commonlib::auth::AuthType;
use rtmp::remuxer::RtmpRemuxer;
use std::sync::Arc;
use xrtsp::relay::pull_client_manager::RtspPullClientManager;
use {
super::api,
super::config::Config,
anyhow::Result,
commonlib::auth::Auth,
hls::remuxer::HlsRemuxer,
hls::server as hls_server,
httpflv::server as httpflv_server,
rtmp::{
relay::{pull_client::PullClient, push_client::PushClient},
rtmp::RtmpServer,
},
streamhub::{notify::http::HttpNotifier, notify::Notifier, StreamsHub},
tokio,
xrtsp::rtsp::RtspServer,
xwebrtc::webrtc::WebRTCServer,
};
pub struct Service {
cfg: Config,
}
impl Service {
pub fn new(cfg: Config) -> Self {
Service { cfg }
}
fn gen_auth(auth_config: &Option<AuthConfig>, authsecret: &AuthSecretConfig) -> Option<Auth> {
if let Some(cfg) = auth_config {
let auth_type = if let Some(push_enabled) = cfg.push_enabled {
if push_enabled && cfg.pull_enabled {
AuthType::Both
} else if !push_enabled && !cfg.pull_enabled {
AuthType::None
} else if push_enabled && !cfg.pull_enabled {
AuthType::Push
} else {
AuthType::Pull
}
} else {
match cfg.pull_enabled {
true => AuthType::Pull,
false => AuthType::None,
}
};
Some(Auth::new(
authsecret.key.clone(),
authsecret.password.clone(),
authsecret.push_password.clone(),
cfg.algorithm.clone(),
auth_type,
))
} else {
None
}
}
pub async fn run(&mut self) -> Result<()> {
let notifier: Option<Arc<dyn Notifier>> = if let Some(httpnotifier) = &self.cfg.httpnotify {
if !httpnotifier.enabled {
None
} else {
Some(Arc::new(HttpNotifier::new(
httpnotifier.on_publish.clone(),
httpnotifier.on_unpublish.clone(),
httpnotifier.on_play.clone(),
httpnotifier.on_stop.clone(),
)))
}
} else {
None
};
let mut stream_hub = StreamsHub::new(notifier);
self.start_httpflv(&mut stream_hub).await?;
self.start_hls(&mut stream_hub).await?;
self.start_rtmp(&mut stream_hub).await?;
self.start_rtsp(&mut stream_hub).await?;
self.start_webrtc(&mut stream_hub).await?;
self.start_http_api_server(&mut stream_hub).await?;
self.start_rtmp_remuxer(&mut stream_hub).await?;
tokio::spawn(async move {
stream_hub.run().await;
log::info!("stream hub end...");
});
Ok(())
}
async fn start_http_api_server(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let producer = stream_hub.get_hub_event_sender();
let http_api_port = if let Some(httpapi) = &self.cfg.httpapi {
httpapi.port
} else {
8000
};
tokio::spawn(async move {
api::run(producer, http_api_port).await;
});
Ok(())
}
async fn start_rtmp(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let rtmp_cfg = &self.cfg.rtmp;
if let Some(rtmp_cfg_value) = rtmp_cfg {
if !rtmp_cfg_value.enabled {
return Ok(());
}
let gop_num = if let Some(gop_num_val) = rtmp_cfg_value.gop_num {
gop_num_val
} else {
1
};
let producer = stream_hub.get_hub_event_sender();
if let Some(push_cfg_values) = &rtmp_cfg_value.push {
for push_value in push_cfg_values {
if !push_value.enabled {
continue;
}
log::info!("start rtmp push client..");
let address = format!(
"{ip}:{port}",
ip = push_value.address,
port = push_value.port
);
let mut push_client = PushClient::new(
address,
stream_hub.get_client_event_consumer(),
producer.clone(),
);
tokio::spawn(async move {
if let Err(err) = push_client.run().await {
log::error!("push client error {}", err);
}
});
stream_hub.set_rtmp_push_enabled(true);
}
}
if let Some(pull_cfg_value) = &rtmp_cfg_value.pull {
if pull_cfg_value.enabled {
let address = format!(
"{ip}:{port}",
ip = pull_cfg_value.address,
port = pull_cfg_value.port
);
log::info!("start rtmp pull client from address: {}", address);
let mut pull_client = PullClient::new(
address,
stream_hub.get_client_event_consumer(),
producer.clone(),
);
tokio::spawn(async move {
if let Err(err) = pull_client.run().await {
log::error!("pull client error {}", err);
}
});
stream_hub.set_rtmp_pull_enabled(true);
}
}
let listen_port = rtmp_cfg_value.port;
let address = format!("0.0.0.0:{listen_port}");
let auth = Self::gen_auth(&rtmp_cfg_value.auth, &self.cfg.authsecret);
let mut rtmp_server = RtmpServer::new(address, producer, gop_num, auth);
tokio::spawn(async move {
if let Err(err) = rtmp_server.run().await {
log::error!("rtmp server error: {}", err);
}
});
}
Ok(())
}
async fn start_rtmp_remuxer(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let mut rtsp_enabled = false;
if let Some(rtsp_cfg_value) = &self.cfg.rtsp {
if rtsp_cfg_value.enabled {
rtsp_enabled = true;
}
}
let mut whip_enabled = false;
if let Some(whip_cfg_value) = &self.cfg.webrtc {
if whip_cfg_value.enabled {
whip_enabled = true;
}
}
if !rtsp_enabled && !whip_enabled {
return Ok(());
}
let mut rtmp_enabled: bool = false;
if let Some(rtmp_cfg_value) = &self.cfg.rtmp {
if rtmp_cfg_value.enabled {
rtmp_enabled = true;
}
}
if !rtmp_enabled {
return Ok(());
}
let event_producer = stream_hub.get_hub_event_sender();
let broadcast_event_receiver = stream_hub.get_client_event_consumer();
let mut remuxer = RtmpRemuxer::new(broadcast_event_receiver, event_producer);
stream_hub.set_rtmp_remuxer_enabled(true);
tokio::spawn(async move {
if let Err(err) = remuxer.run().await {
log::error!("rtmp remuxer server error: {}", err);
}
});
Ok(())
}
async fn start_rtsp(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let rtsp_cfg = &self.cfg.rtsp;
if let Some(rtsp_cfg_value) = rtsp_cfg {
if !rtsp_cfg_value.enabled {
return Ok(());
}
let producer = stream_hub.get_hub_event_sender();
let listen_port = rtsp_cfg_value.port;
let address = format!("0.0.0.0:{listen_port}");
let auth = Self::gen_auth(&rtsp_cfg_value.auth, &self.cfg.authsecret);
let mut rtsp_server = RtspServer::new(address, producer, auth);
tokio::spawn(async move {
if let Err(err) = rtsp_server.run().await {
log::error!("rtsp server error: {}", err);
}
});
if rtsp_cfg_value.relay_enabled {
let mut rtsp_relay_manager = RtspPullClientManager::new(
stream_hub.get_client_event_consumer(),
stream_hub.get_hub_event_sender(),
);
tokio::spawn(async move {
if let Err(err) = rtsp_relay_manager.run().await {
log::error!("rtsp relay manager error: {}", err);
}
});
}
}
Ok(())
}
async fn start_webrtc(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let webrtc_cfg = &self.cfg.webrtc;
if let Some(webrtc_cfg_value) = webrtc_cfg {
if !webrtc_cfg_value.enabled {
return Ok(());
}
let producer = stream_hub.get_hub_event_sender();
let listen_port = webrtc_cfg_value.port;
let address = format!("0.0.0.0:{listen_port}");
let auth = Self::gen_auth(&webrtc_cfg_value.auth, &self.cfg.authsecret);
let mut webrtc_server = WebRTCServer::new(address, producer, auth);
tokio::spawn(async move {
if let Err(err) = webrtc_server.run().await {
log::error!("webrtc server error: {}", err);
}
});
}
Ok(())
}
async fn start_httpflv(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let httpflv_cfg = &self.cfg.httpflv;
if let Some(httpflv_cfg_value) = httpflv_cfg {
if !httpflv_cfg_value.enabled {
return Ok(());
}
let port = httpflv_cfg_value.port;
let event_producer = stream_hub.get_hub_event_sender();
let auth = Self::gen_auth(&httpflv_cfg_value.auth, &self.cfg.authsecret);
tokio::spawn(async move {
if let Err(err) = httpflv_server::run(event_producer, port, auth).await {
log::error!("httpflv server error: {}", err);
}
});
}
Ok(())
}
async fn start_hls(&mut self, stream_hub: &mut StreamsHub) -> Result<()> {
let hls_cfg = &self.cfg.hls;
if let Some(hls_cfg_value) = hls_cfg {
if !hls_cfg_value.enabled {
return Ok(());
}
let event_producer = stream_hub.get_hub_event_sender();
let cient_event_consumer = stream_hub.get_client_event_consumer();
let mut hls_remuxer = HlsRemuxer::new(
cient_event_consumer,
event_producer,
hls_cfg_value.need_record,
);
tokio::spawn(async move {
if let Err(err) = hls_remuxer.run().await {
log::error!("rtmp event processor error: {}", err);
}
});
let port = hls_cfg_value.port;
let auth = Self::gen_auth(&hls_cfg_value.auth, &self.cfg.authsecret);
tokio::spawn(async move {
if let Err(err) = hls_server::run(port, auth).await {
log::error!("hls server error: {}", err);
}
});
stream_hub.set_hls_enabled(true);
}
Ok(())
}
}