use {
super::{
define::{tag_type, HttpResponseDataProducer},
errors::{HttpFLvError, HttpFLvErrorValue},
},
crate::rtmp::{
cache::metadata::MetaData,
channels::define::{ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer},
session::{
common::SessionInfo,
define::SessionSubType,
errors::{SessionError, SessionErrorValue},
},
},
bytes::BytesMut,
std::time::Duration,
tokio::{
sync::{mpsc, oneshot},
time::sleep,
},
uuid::Uuid,
xflv::muxer::{FlvMuxer, HEADER_LENGTH},
};
pub struct HttpFlv {
app_name: String,
stream_name: String,
muxer: FlvMuxer,
event_producer: ChannelEventProducer,
data_consumer: ChannelDataConsumer,
http_response_data_producer: HttpResponseDataProducer,
subscriber_id: Uuid,
}
impl HttpFlv {
pub fn new(
app_name: String,
stream_name: String,
event_producer: ChannelEventProducer,
http_response_data_producer: HttpResponseDataProducer,
) -> Self {
let (_, data_consumer) = mpsc::unbounded_channel();
let subscriber_id = Uuid::new_v4();
Self {
app_name,
stream_name,
muxer: FlvMuxer::new(),
data_consumer,
event_producer,
http_response_data_producer,
subscriber_id,
}
}
pub async fn run(&mut self) -> Result<(), HttpFLvError> {
self.subscribe_from_rtmp_channels().await?;
self.send_media_stream().await?;
Ok(())
}
pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
self.muxer.write_flv_header()?;
self.muxer.write_previous_tag_size(0)?;
self.flush_response_data()?;
let mut retry_count = 0;
loop {
if let Some(data) = self.data_consumer.recv().await {
if let Err(err) = self.write_flv_tag(data) {
log::error!("write_flv_tag err: {}", err);
retry_count += 1;
} else {
retry_count = 0;
}
} else {
retry_count += 1;
}
if retry_count > 10 {
break;
}
}
self.unsubscribe_from_rtmp_channels().await
}
pub fn write_flv_tag(&mut self, channel_data: ChannelData) -> Result<(), HttpFLvError> {
let common_data: BytesMut;
let common_timestamp: u32;
let tag_type: u8;
match channel_data {
ChannelData::Audio { timestamp, data } => {
common_data = data;
common_timestamp = timestamp;
tag_type = tag_type::AUDIO;
}
ChannelData::Video { timestamp, data } => {
common_data = data;
common_timestamp = timestamp;
tag_type = tag_type::VIDEO;
}
ChannelData::MetaData { timestamp, data } => {
let mut metadata = MetaData::default();
metadata.save(data);
let data = metadata.remove_set_data_frame()?;
common_data = data;
common_timestamp = timestamp;
tag_type = tag_type::SCRIPT_DATA_AMF;
}
}
let common_data_len = common_data.len() as u32;
self.muxer
.write_flv_tag_header(tag_type, common_data_len, common_timestamp)?;
self.muxer.write_flv_tag_body(common_data)?;
self.muxer
.write_previous_tag_size(common_data_len + HEADER_LENGTH)?;
self.flush_response_data()?;
Ok(())
}
pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> {
let data = self.muxer.writer.extract_current_bytes();
self.http_response_data_producer.start_send(Ok(data))?;
Ok(())
}
pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
let session_info = SessionInfo {
subscriber_id: self.subscriber_id,
session_sub_type: SessionSubType::Player,
};
let subscribe_event = ChannelEvent::UnSubscribe {
app_name: self.app_name.clone(),
stream_name: self.stream_name.clone(),
session_info,
};
if let Err(err) = self.event_producer.send(subscribe_event) {
log::error!("unsubscribe_from_channels err {}\n", err);
}
Ok(())
}
pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
let mut retry_count: u8 = 0;
loop {
let (sender, receiver) = oneshot::channel();
let session_info = SessionInfo {
subscriber_id: self.subscriber_id,
session_sub_type: SessionSubType::Player,
};
let subscribe_event = ChannelEvent::Subscribe {
app_name: self.app_name.clone(),
stream_name: self.stream_name.clone(),
session_info: session_info,
responder: sender,
};
let rv = self.event_producer.send(subscribe_event);
match rv {
Err(_) => {
let session_error = SessionError {
value: SessionErrorValue::SendChannelDataErr,
};
return Err(HttpFLvError {
value: HttpFLvErrorValue::SessionError(session_error),
});
}
_ => {}
}
match receiver.await {
Ok(consumer) => {
self.data_consumer = consumer;
break;
}
Err(_) => {
if retry_count > 10 {
let session_error = SessionError {
value: SessionErrorValue::SubscribeCountLimitReach,
};
return Err(HttpFLvError {
value: HttpFLvErrorValue::SessionError(session_error),
});
}
}
}
sleep(Duration::from_millis(800)).await;
retry_count = retry_count + 1;
}
Ok(())
}
}