1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use {
    crate::{channels, decenc, error::Result, message},
    futures::future,
    log::info,
    std::net::IpAddr,
    thfmr_util::DropResult,
    tokio::{io, net, sync, sync::mpsc},
};

/// Producer facing side of the protocol. It is used to setup communication to producers.
///
/// A producer picks songs to play and delivers information about playing songs. It responds to
/// [crate::message::Message::Want] with [crate::message::Message::Next].
pub struct Producer {
    producer_address: (IpAddr, u16),
}

/// An individual connected producer instance
pub struct ProducerClient();

impl Producer {
    /// Construct a new [Producer] listening to the given producer_address.
    pub fn new(producer_address: (IpAddr, u16)) -> Producer {
        Producer { producer_address }
    }

    pub async fn run(
        &self,
        mut channels: channels::ChanPair<message::Message>,
        producer_conn: mpsc::Sender<mpsc::Sender<message::Message>>,
    ) -> Result<()> {
        let client_channels: sync::RwLock<Vec<mpsc::Sender<message::Message>>> =
            sync::RwLock::new(Vec::new());

        let (send_back, recv) = channels.split();

        let fan_out_future = Self::run_fan_out(&client_channels, recv);
        let runner_future = self.run_listener(&client_channels, send_back, producer_conn);

        future::try_join(fan_out_future, runner_future)
            .await
            .drop_result()
    }

    async fn run_fan_out(
        client_channels: &sync::RwLock<Vec<mpsc::Sender<message::Message>>>,
        recv: &mut mpsc::Receiver<message::Message>,
    ) -> Result<()> {
        let mut to_delete = Vec::new();

        loop {
            let received_message = match recv.recv().await {
                Some(s) => s,
                None => return Ok(()),
            };

            let client_channels_guard = client_channels.read().await;

            for (pos, client_channel) in client_channels_guard.iter().enumerate() {
                let res = client_channel.send(received_message.clone()).await;

                if res.is_err() {
                    to_delete.push(pos);
                }
            }
            drop(client_channels_guard);

            let mut client_channels_guard = client_channels.write().await;
            for idx in to_delete.iter().rev() {
                client_channels_guard.remove(*idx);
            }
            to_delete.clear();
            drop(client_channels_guard);
        }
    }

    async fn run_listener(
        &self,
        client_channels: &sync::RwLock<Vec<mpsc::Sender<message::Message>>>,
        send_back: &mpsc::Sender<message::Message>,
        producer_conn: mpsc::Sender<mpsc::Sender<message::Message>>,
    ) -> Result<()> {
        let listener = net::TcpListener::bind(&self.producer_address).await?;

        loop {
            let (socket, address) = listener.accept().await?;

            {
                let (send, recv) = mpsc::channel(1);

                let mut client_channels = client_channels.write().await;
                producer_conn.send(send.clone()).await?;
                client_channels.push(send);

                info!("Connected: {:?}", address);

                let mut channels = channels::ChanPair::from(send_back.clone(), recv);

                tokio::spawn(async move {
                    let mut client = ProducerClient {};

                    if let Err(e) = client.run(socket, &mut channels).await {
                        info!("Terminated: {:?}", e);
                    } else {
                        info!("Connection closed gracefully");
                    };
                });
            }
        }
    }
}

impl ProducerClient {
    async fn run(
        &mut self,
        socket: net::TcpStream,
        channels: &mut channels::ChanPair<message::Message>,
    ) -> Result<()> {
        let (send, mut recv) = channels.split();

        let (r, w) = io::split(socket);

        let read_future = decenc::run_read(r, &send);
        let write_future = decenc::run_write(w, &mut recv);

        future::try_join(read_future, write_future)
            .await
            .drop_result()
    }
}