httpflv 1.0.0

httpflv library.
Documentation
use {
    super::{
        define::{tag_type, HttpResponseDataProducer},
        errors::{HttpFLvError, HttpFLvErrorValue},
    },
    crate::rtmp::{
        cache::metadata::MetaData,
        channels::define::{ChannelData, ChannelDataConsumer, ChannelEvent, ChannelEventProducer},
        session::{
            common::SessionInfo,
            define::SessionSubType,
            errors::{SessionError, SessionErrorValue},
        },
    },
    bytes::BytesMut,
    std::time::Duration,
    tokio::{
        sync::{mpsc, oneshot},
        time::sleep,
    },
    uuid::Uuid,
    xflv::muxer::{FlvMuxer, HEADER_LENGTH},
};

pub struct HttpFlv {
    app_name: String,
    stream_name: String,

    muxer: FlvMuxer,

    event_producer: ChannelEventProducer,
    data_consumer: ChannelDataConsumer,
    http_response_data_producer: HttpResponseDataProducer,
    subscriber_id: Uuid,
}

impl HttpFlv {
    pub fn new(
        app_name: String,
        stream_name: String,
        event_producer: ChannelEventProducer,
        http_response_data_producer: HttpResponseDataProducer,
    ) -> Self {
        let (_, data_consumer) = mpsc::unbounded_channel();
        let subscriber_id = Uuid::new_v4();

        Self {
            app_name,
            stream_name,
            muxer: FlvMuxer::new(),
            data_consumer,
            event_producer,
            http_response_data_producer,
            subscriber_id,
        }
    }

    pub async fn run(&mut self) -> Result<(), HttpFLvError> {
        self.subscribe_from_rtmp_channels().await?;
        self.send_media_stream().await?;

        Ok(())
    }

    pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> {
        self.muxer.write_flv_header()?;
        self.muxer.write_previous_tag_size(0)?;

        self.flush_response_data()?;
        let mut retry_count = 0;
        //write flv body
        loop {
            if let Some(data) = self.data_consumer.recv().await {
                if let Err(err) = self.write_flv_tag(data) {
                    log::error!("write_flv_tag err: {}", err);
                    retry_count += 1;
                } else {
                    retry_count = 0;
                }
            } else {
                retry_count += 1;
            }
            if retry_count > 10 {
                break;
            }
        }
        self.unsubscribe_from_rtmp_channels().await
    }

    pub fn write_flv_tag(&mut self, channel_data: ChannelData) -> Result<(), HttpFLvError> {
        let common_data: BytesMut;
        let common_timestamp: u32;
        let tag_type: u8;

        match channel_data {
            ChannelData::Audio { timestamp, data } => {
                common_data = data;
                common_timestamp = timestamp;
                tag_type = tag_type::AUDIO;
            }

            ChannelData::Video { timestamp, data } => {
                common_data = data;
                common_timestamp = timestamp;
                tag_type = tag_type::VIDEO;
            }

            ChannelData::MetaData { timestamp, data } => {
                let mut metadata = MetaData::default();
                metadata.save(data);
                let data = metadata.remove_set_data_frame()?;

                common_data = data;
                common_timestamp = timestamp;
                tag_type = tag_type::SCRIPT_DATA_AMF;
            }
        }

        let common_data_len = common_data.len() as u32;

        self.muxer
            .write_flv_tag_header(tag_type, common_data_len, common_timestamp)?;
        self.muxer.write_flv_tag_body(common_data)?;
        self.muxer
            .write_previous_tag_size(common_data_len + HEADER_LENGTH)?;

        self.flush_response_data()?;

        Ok(())
    }

    pub fn flush_response_data(&mut self) -> Result<(), HttpFLvError> {
        let data = self.muxer.writer.extract_current_bytes();
        self.http_response_data_producer.start_send(Ok(data))?;

        Ok(())
    }

    pub async fn unsubscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
        let session_info = SessionInfo {
            subscriber_id: self.subscriber_id,
            session_sub_type: SessionSubType::Player,
        };

        let subscribe_event = ChannelEvent::UnSubscribe {
            app_name: self.app_name.clone(),
            stream_name: self.stream_name.clone(),
            session_info,
        };
        if let Err(err) = self.event_producer.send(subscribe_event) {
            log::error!("unsubscribe_from_channels err {}\n", err);
        }

        Ok(())
    }

    pub async fn subscribe_from_rtmp_channels(&mut self) -> Result<(), HttpFLvError> {
        let mut retry_count: u8 = 0;

        loop {
            let (sender, receiver) = oneshot::channel();

            let session_info = SessionInfo {
                subscriber_id: self.subscriber_id,
                session_sub_type: SessionSubType::Player,
            };

            let subscribe_event = ChannelEvent::Subscribe {
                app_name: self.app_name.clone(),
                stream_name: self.stream_name.clone(),
                session_info: session_info,
                responder: sender,
            };

            let rv = self.event_producer.send(subscribe_event);
            match rv {
                Err(_) => {
                    let session_error = SessionError {
                        value: SessionErrorValue::SendChannelDataErr,
                    };
                    return Err(HttpFLvError {
                        value: HttpFLvErrorValue::SessionError(session_error),
                    });
                }
                _ => {}
            }

            match receiver.await {
                Ok(consumer) => {
                    self.data_consumer = consumer;
                    break;
                }
                Err(_) => {
                    if retry_count > 10 {
                        let session_error = SessionError {
                            value: SessionErrorValue::SubscribeCountLimitReach,
                        };
                        return Err(HttpFLvError {
                            value: HttpFLvErrorValue::SessionError(session_error),
                        });
                    }
                }
            }

            sleep(Duration::from_millis(800)).await;
            retry_count = retry_count + 1;
        }

        Ok(())
    }
}