use super::registry::Registry as GenericRegistry;
use crate::data::message::Message;
use crate::data::timetoken::Timetoken;
use crate::data::{channel, request, response};
use crate::transport::Service;
use futures_channel::{mpsc, oneshot};
use futures_util::future::{select, Either, FutureExt};
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use log::{debug, error};
use std::fmt::Debug;
pub(crate) use super::channel::{Rx as ChannelRx, Tx as ChannelTx};
pub(crate) use super::registry::ID as SubscriptionID;
/// Registry mapping a channel (or channel group) name to the senders of the
/// listeners registered under that name.
pub(crate) type Registry = GenericRegistry<channel::Name, ChannelTx>;
/// One-shot sender the subscribe loop uses to signal that it is ready.
pub(crate) type ReadyTx = oneshot::Sender<()>;
/// Sender the subscribe loop uses to signal that it has stopped.
pub(crate) type ExitTx = mpsc::Sender<()>;
/// Sending half of the channel carrying [`ControlCommand`]s to the loop.
pub(crate) type ControlTx = mpsc::Sender<ControlCommand>;
/// Receiving half of the channel carrying [`ControlCommand`]s to the loop.
pub(crate) type ControlRx = mpsc::Receiver<ControlCommand>;
/// One-shot sender used to hand a freshly assigned [`SubscriptionID`] back to
/// whoever issued a [`ControlCommand::Add`].
pub(crate) type SubscriptionIdTx = oneshot::Sender<SubscriptionID>;
/// Commands that can be sent to a running subscribe loop.
#[derive(Debug)]
pub(crate) enum ControlCommand {
    /// Unregister the listener with the given ID from the named channel or
    /// channel group.
    Drop(SubscriptionID, ListenerType),

    /// Register a new listener for the named channel or channel group.
    ///
    /// Messages are delivered through the supplied `ChannelTx`; the ID
    /// assigned to the listener is reported back through the
    /// `SubscriptionIdTx`.
    Add(ListenerType, ChannelTx, SubscriptionIdTx),
}
/// What a listener is attached to: a single channel or a channel group.
#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ListenerType {
    /// The listener is attached to the channel with this name.
    Channel(channel::Name),
    /// The listener is attached to the channel group with this name.
    ChannelGroup(channel::Name),
}
/// Everything `subscribe_loop` needs in order to run.
#[derive(Debug)]
pub(crate) struct SubscribeLoopParams<TTransport> {
    /// Stream of control commands the loop reacts to.
    pub control_rx: ControlRx,

    /// If set, signalled once after the first successful subscribe response.
    pub ready_tx: Option<ReadyTx>,

    /// If set, signalled after the loop has stopped.
    pub exit_tx: Option<ExitTx>,

    /// Transport used to issue subscribe requests.
    pub transport: TTransport,

    /// Initial listeners, keyed by channel name.
    pub channels: Registry,

    /// Initial listeners, keyed by channel group name.
    pub channel_groups: Registry,
}
/// Mutable state owned by the subscribe loop while it runs.
#[derive(Debug)]
struct StateData {
    /// Listeners keyed by channel name.
    pub channels: Registry,

    /// Listeners keyed by channel group name.
    pub channel_groups: Registry,
}
pub(crate) async fn subscribe_loop<TTransport>(params: SubscribeLoopParams<TTransport>)
where
TTransport: Service<request::Subscribe, Response = response::Subscribe> + Clone,
<TTransport as Service<request::Subscribe>>::Error: Debug + 'static,
{
debug!("Starting subscribe loop");
#[allow(clippy::unneeded_field_pattern)]
let SubscribeLoopParams {
mut control_rx,
mut ready_tx,
mut exit_tx,
transport,
channels,
channel_groups,
} = params;
let mut state_data = StateData {
channels,
channel_groups,
};
let mut timetoken = Timetoken::default();
loop {
let channels: Vec<_> = state_data.channels.keys().cloned().collect();
let channel_groups: Vec<_> = state_data.channel_groups.keys().cloned().collect();
let request = request::Subscribe {
channels,
channel_groups,
timetoken,
};
let response = transport.call(request);
let response = response.fuse();
futures_util::pin_mut!(response);
let control_rx_recv = control_rx.next();
futures_util::pin_mut!(control_rx_recv);
let (messages, next_timetoken) = match select(control_rx_recv, response).await {
Either::Left((msg, _)) => {
let outcome = handle_control_command(&mut state_data, msg).await;
if let ControlOutcome::Terminate = outcome {
break;
}
continue;
}
Either::Right((res, _)) => {
match res {
Ok(v) => v,
Err(err) => {
error!("Transport error while polling: {:?}", err);
continue;
}
}
}
};
if let Some(ready_tx) = ready_tx.take() {
if let Err(err) = ready_tx.send(()) {
error!("Error sending ready message: {:?}", err);
break;
}
}
timetoken = next_timetoken;
debug!("messages: {:?}", messages);
debug!("timetoken: {:?}", timetoken);
dispatch_messages(&mut state_data, messages).await;
}
debug!("Stopping subscribe loop");
if let Some(ref mut exit_tx) = exit_tx {
exit_tx.send(()).await.expect("Unable to send exit message");
}
}
/// Tells the subscribe loop what to do after a control command was handled.
#[derive(Debug)]
enum ControlOutcome {
    /// The loop must stop.
    Terminate,
    /// The loop may carry on polling.
    CanContinue,
}
/// Applies one item received from the control channel to the loop state.
///
/// * `None` means the control channel has ended, so no further command can
///   ever arrive; the loop is told to terminate.
/// * [`ControlCommand::Drop`] unregisters the listener from the matching
///   registry and requests termination once both registries are empty.
/// * [`ControlCommand::Add`] registers the listener in the matching registry
///   and sends the assigned ID back through the provided one-shot sender.
///
/// # Panics
///
/// Panics if `unregister` does not yield a result for the listener named in a
/// `Drop` command, or if the subscription ID of an `Add` command cannot be
/// sent back to the requester.
async fn handle_control_command(
    state_data: &mut StateData,
    msg: Option<ControlCommand>,
) -> ControlOutcome {
    debug!("Got request: {:?}", msg);

    let request = match msg {
        Some(request) => request,
        // An ended control stream keeps yielding `None`. Asking the caller to
        // continue would bring it straight back here on every iteration, so
        // the only sensible outcome is to stop.
        None => return ControlOutcome::Terminate,
    };

    let StateData {
        channels,
        channel_groups,
    } = state_data;

    match request {
        ControlCommand::Drop(id, listener) => {
            // Pick the registry the listener lives in and remember whether
            // the other registry is already empty.
            let (name, kind, registry, other_is_empty) = match listener {
                ListenerType::Channel(name) => {
                    (name, "channel", channels, channel_groups.is_empty())
                }
                ListenerType::ChannelGroup(name) => {
                    (name, "group", channel_groups, channels.is_empty())
                }
            };
            debug!("Removing {} from SubscribeLoop: {}", kind, name);

            let (_, _effect) = registry
                .unregister(&name, id)
                .expect("Unable to get channel listeners");

            // With nothing left to listen for, the loop has no reason to run.
            if other_is_empty && registry.is_empty() {
                ControlOutcome::Terminate
            } else {
                ControlOutcome::CanContinue
            }
        }
        ControlCommand::Add(listener, channel_tx, id_tx) => {
            let (name, kind, registry) = match listener {
                ListenerType::Channel(name) => (name, "channel", channels),
                ListenerType::ChannelGroup(name) => (name, "group", channel_groups),
            };
            debug!("Adding {} to SubscribeLoop: {}", kind, name);

            let (id, _effect) = registry.register(name, channel_tx);
            id_tx.send(id).expect("Unable to send subscription id");

            ControlOutcome::CanContinue
        }
    }
}
async fn dispatch_messages(state_data: &mut StateData, messages: Vec<Message>) {
for message in messages {
let listeners = {
let channel = &message.channel;
let route = match &message.route {
None => None,
Some(route) if route == channel => None,
Some(route) => Some(route),
};
if let Some(ref route) = route {
debug!("route: {} (channel group)", &route);
state_data
.channel_groups
.get_iter_mut(route)
.expect("received a message with a route that has no listeners")
} else {
debug!("route: {} (channel)", &channel);
state_data
.channels
.get_iter_mut(channel)
.expect("received a message with a channel that has no listeners")
}
};
debug!(
"Delivering to {} listeners...",
listeners.size_hint().1.unwrap()
);
for channel_tx in listeners {
if let Err(error) = channel_tx.send(message.clone()).await {
error!("Delivery error: {:?}", error);
}
}
}
}