Documentation
use streamhub::stream::StreamIdentifier;

use {
    super::errors::ClientError,
    crate::session::client_session::{ClientSession, ClientSessionType},
    streamhub::define::{BroadcastEvent, BroadcastEventReceiver, StreamHubEventSender},
    tokio::net::TcpStream,
};

pub struct PullClient {
    address: String,
    client_event_consumer: BroadcastEventReceiver,
    channel_event_producer: StreamHubEventSender,
}

impl PullClient {
    pub fn new(
        address: String,
        consumer: BroadcastEventReceiver,
        producer: StreamHubEventSender,
    ) -> Self {
        Self {
            address,

            client_event_consumer: consumer,
            channel_event_producer: producer,
        }
    }

    pub async fn run(&mut self) -> Result<(), ClientError> {
        loop {
            let event = self.client_event_consumer.recv().await?;

            if let BroadcastEvent::Subscribe {
                id: _,
                identifier:
                    StreamIdentifier::Rtmp {
                        app_name,
                        stream_name,
                    },
                server_address: _,
                result_sender: _,
            } = event
            {
                log::info!(
                    "receive pull event, app_name :{}, stream_name: {}",
                    app_name,
                    stream_name
                );
                let stream = TcpStream::connect(self.address.clone()).await?;

                let mut client_session = ClientSession::new(
                    stream,
                    ClientSessionType::Pull,
                    self.address.clone(),
                    app_name.clone(),
                    stream_name.clone(),
                    self.channel_event_producer.clone(),
                    0,
                );

                tokio::spawn(async move {
                    if let Err(err) = client_session.run().await {
                        log::error!("client_session as pull client run error: {}", err);
                    }
                });
            }
        }
    }
}