use super::registry::Registry;
use super::subscribe_loop::{
subscribe_loop, ControlCommand, ControlTx, ExitTx, ListenerType, SubscribeLoopParams,
};
use super::subscription::Subscription;
use crate::data::channel;
use crate::runtime::Runtime;
use crate::transport::Transport;
use crate::PubNub;
use futures_channel::{mpsc, oneshot};
use futures_util::sink::SinkExt;
use log::debug;
#[derive(Debug)]
pub(crate) struct SubscribeLoopSupervisor {
params: SubscribeLoopSupervisorParams,
control_tx: Option<ControlTx>,
}
#[derive(Debug)]
pub(crate) struct SubscribeLoopSupervisorParams {
pub exit_tx: Option<ExitTx>,
}
impl SubscribeLoopSupervisor {
pub fn new(params: SubscribeLoopSupervisorParams) -> Self {
Self {
params,
control_tx: None,
}
}
}
impl SubscribeLoopSupervisor {
pub async fn subscribe<'a, TTransport, TRuntime>(
&mut self,
pubnub: &'a mut PubNub<TTransport, TRuntime>,
channel: channel::Name,
) -> Subscription<TRuntime>
where
TTransport: Transport + 'static,
TRuntime: Runtime + 'static,
{
let (id, control_tx, channel_rx) = loop {
let (channel_tx, channel_rx) = mpsc::channel(10);
let id_or_retry = if let Some(ref mut control_tx) = self.control_tx {
debug!("Adding channel {:?} to the running loop", channel);
let (id_tx, id_rx) = oneshot::channel();
let listener = ListenerType::Channel(channel.clone());
let control_comm_result = control_tx
.send(ControlCommand::Add(listener, channel_tx, id_tx))
.await;
if control_comm_result.is_err() {
self.control_tx = None;
debug!("Restarting the subscription loop");
None
} else {
let id = id_rx.await.unwrap();
Some((id, control_tx.clone()))
}
} else {
let mut channels = Registry::new();
let (id, _) = channels.register(channel.clone(), channel_tx);
let (control_tx, control_rx) = mpsc::channel(10);
let (ready_tx, ready_rx) = oneshot::channel();
debug!("Creating the subscribe loop");
let subscribe_loop_params = SubscribeLoopParams {
control_rx,
ready_tx: Some(ready_tx),
exit_tx: self.params.exit_tx.clone(),
transport: pubnub.transport.clone(),
channels,
channel_groups: Registry::new(),
};
pubnub.runtime.spawn(subscribe_loop(subscribe_loop_params));
debug!("Waiting for subscription loop ready...");
ready_rx.await.expect("Unable to receive ready message");
self.control_tx = Some(control_tx.clone());
Some((id, control_tx))
};
match id_or_retry {
Some((id, control_tx)) => break (id, control_tx, channel_rx),
None => continue,
}
};
Subscription {
runtime: pubnub.runtime.clone(),
name: ListenerType::Channel(channel),
id,
control_tx,
channel_rx,
}
}
}