pubnub-core 0.1.0

PubNub core crate, modular and composable
Documentation
use super::registry::Registry;
use super::subscribe_loop::{
    subscribe_loop, ControlCommand, ControlTx, ExitTx, ListenerType, SubscribeLoopParams,
};
use super::subscription::Subscription;
use crate::data::channel;
use crate::runtime::Runtime;
use crate::transport::Transport;
use crate::PubNub;
use futures_channel::{mpsc, oneshot};
use futures_util::sink::SinkExt;
use log::debug;

/// SubscribeLoopSupervisor is responsible for the lifecycle of the subscribe
/// loop.
/// It owns the subscribe loop control handle and provides a high-level
/// interface to the control operations.
/// It will intelligently spawn and respawn the subscribe loop on a need basis.
///
/// Deliberately doesn't implement `Clone` to avoid issues with improper
/// duplication of control handles.
#[derive(Debug)]
pub(crate) struct SubscribeLoopSupervisor {
    /// Configuration params.
    params: SubscribeLoopSupervisorParams,

    /// Control handle to the subscribe loop.
    control_tx: Option<ControlTx>,
}

/// SubscribeLoopSupervisorParams configuration params.
#[derive(Debug)]
pub(crate) struct SubscribeLoopSupervisorParams {
    /// If set, gets a signal when subscribe loop exits.
    pub exit_tx: Option<ExitTx>,
}

impl SubscribeLoopSupervisor {
    pub fn new(params: SubscribeLoopSupervisorParams) -> Self {
        Self {
            params,
            control_tx: None,
        }
    }
}

impl SubscribeLoopSupervisor {
    pub async fn subscribe<'a, TTransport, TRuntime>(
        &mut self,
        pubnub: &'a mut PubNub<TTransport, TRuntime>,
        channel: channel::Name,
    ) -> Subscription<TRuntime>
    where
        TTransport: Transport + 'static,
        TRuntime: Runtime + 'static,
    {
        // Since recursion is troublesome with async fns, we use the loop trick.
        let (id, control_tx, channel_rx) = loop {
            let (channel_tx, channel_rx) = mpsc::channel(10);

            let id_or_retry = if let Some(ref mut control_tx) = self.control_tx {
                // Send a command to add the channel to the running
                // subscribe loop.

                debug!("Adding channel {:?} to the running loop", channel);

                let (id_tx, id_rx) = oneshot::channel();

                // TODO: unify interfaces to either use `ListenerType` or
                // `&str` when we refer to a channel.
                let listener = ListenerType::Channel(channel.clone());

                let control_comm_result = control_tx
                    .send(ControlCommand::Add(listener, channel_tx, id_tx))
                    .await;

                if control_comm_result.is_err() {
                    // We got send error, this only happens when the receive
                    // half of the channel is closed.
                    // Assuming it was dropped because of being out of
                    // scope, we conclude the subscribe loop has completed.
                    // We simply cleanup the control tx, and retry
                    // subscribing.
                    // The successive subscribtion attempt will result in
                    // starting off of a new subscription loop and properly
                    // registering the channel there.
                    self.control_tx = None;

                    debug!("Restarting the subscription loop");

                    // This is equivalent to calling the `subscribe` fn
                    // recursively, given we're in the loop context.
                    None
                } else {
                    // We succesfully submitted the command, wait for
                    // subscription loop to communicate the subscription ID
                    // back to us.
                    let id = id_rx.await.unwrap();

                    // Return the values from the loop.
                    Some((id, control_tx.clone()))
                }
            } else {
                // Since there's no subscribe loop loop found, spawn a new
                // one.

                let mut channels = Registry::new();
                let (id, _) = channels.register(channel.clone(), channel_tx);

                let (control_tx, control_rx) = mpsc::channel(10);
                let (ready_tx, ready_rx) = oneshot::channel();

                debug!("Creating the subscribe loop");
                let subscribe_loop_params = SubscribeLoopParams {
                    control_rx,
                    ready_tx: Some(ready_tx),
                    exit_tx: self.params.exit_tx.clone(),

                    transport: pubnub.transport.clone(),

                    channels,
                    channel_groups: Registry::new(),
                };

                // Spawn the subscribe loop onto the runtime
                pubnub.runtime.spawn(subscribe_loop(subscribe_loop_params));

                // Waiting for subscription loop to communicate that it's
                // ready.
                // Will deadlock if the signal is never received, which will
                // only happen if the subscription loop is stuck somehow.
                // If subscription loop fails and goes out of scope we'll
                // get an error properly communicating that.
                debug!("Waiting for subscription loop ready...");
                ready_rx.await.expect("Unable to receive ready message");

                // Keep the control tx for later.
                self.control_tx = Some(control_tx.clone());

                // Return the values from the loop.
                Some((id, control_tx))
            };

            match id_or_retry {
                Some((id, control_tx)) => break (id, control_tx, channel_rx),
                None => continue,
            }
        };

        Subscription {
            runtime: pubnub.runtime.clone(),
            name: ListenerType::Channel(channel),
            id,
            control_tx,
            channel_rx,
        }
    }
}