use async_channel::Sender;
use futures::future::BoxFuture;
use spin::RwLock;
use crate::{
core::{event_engine::Effect, PubNubError},
dx::subscribe::{
event_engine::{
types::{SubscriptionInput, SubscriptionParams},
SubscribeEffectInvocation, SubscribeEvent,
},
result::{SubscribeResult, Update},
ConnectionStatus, SubscriptionCursor,
},
lib::{
alloc::{string::String, sync::Arc, vec::Vec},
core::fmt::{Debug, Formatter},
},
};
// One private module per effect kind. `SubscribeEffect::run` forwards the fields of the
// matching variant to that module's `execute` function.

// Backs `SubscribeEffect::EmitMessages` (called with the next cursor, the updates and the executor).
mod emit_messages;
// Backs `SubscribeEffect::EmitStatus` (called with the status and the executor).
mod emit_status;
// Backs `SubscribeEffect::Handshake` (called with the input, optional cursor, id and executor).
mod handshake;
// Backs `SubscribeEffect::Receive` (called with the input, cursor, id and executor).
mod receive;
/// Callback type stored in the `Handshake` and `Receive` effects.
///
/// The callback receives [`SubscriptionParams`] and returns a boxed `'static` future which
/// resolves to either a [`SubscribeResult`] or a [`PubNubError`]. The `Send + Sync` bounds
/// allow the callback to be shared between threads (the effects keep it behind an `Arc`).
///
/// NOTE(review): presumably this performs the actual subscribe request — confirm against the
/// code that constructs the effects.
pub(in crate::dx::subscribe) type SubscribeEffectExecutor = dyn Fn(SubscriptionParams) -> BoxFuture<'static, Result<SubscribeResult, PubNubError>>
+ Send
+ Sync;
/// Callback type stored in the `EmitStatus` effect.
///
/// The callback receives the [`ConnectionStatus`] carried by the effect and returns nothing.
pub(in crate::dx::subscribe) type EmitStatusEffectExecutor = dyn Fn(ConnectionStatus) + Send + Sync;
/// Callback type stored in the `EmitMessages` effect.
///
/// The callback receives the list of [`Update`]s together with a [`SubscriptionCursor`] and
/// returns nothing.
pub(in crate::dx::subscribe) type EmitMessagesEffectExecutor =
dyn Fn(Vec<Update>, SubscriptionCursor) + Send + Sync;
/// Effects which can be run for the subscribe event engine.
///
/// Each variant carries the data and the executor callback needed to run it; see the
/// `Effect` implementation for how the fields are used. Only `Handshake` and `Receive`
/// react to cancellation.
pub(crate) enum SubscribeEffect {
/// Handshake effect, executed through `handshake::execute`.
Handshake {
/// Identifier of the effect; returned by `Effect::id` and sent over
/// `cancellation_channel` when the effect is cancelled.
id: String,
/// Cancellation flag: set to `true` by `cancel` and reported by `is_cancelled`.
cancelled: RwLock<bool>,
/// Subscription input; provides the channels and channel groups for the effect.
input: SubscriptionInput,
/// Optional cursor handed to the handshake implementation.
cursor: Option<SubscriptionCursor>,
/// Callback handed to the handshake implementation.
executor: Arc<SubscribeEffectExecutor>,
/// Channel on which `cancel` publishes this effect's `id`.
cancellation_channel: Sender<String>,
},
/// Receive effect, executed through `receive::execute`.
Receive {
/// Identifier of the effect; returned by `Effect::id` and sent over
/// `cancellation_channel` when the effect is cancelled.
id: String,
/// Cancellation flag: set to `true` by `cancel` and reported by `is_cancelled`.
cancelled: RwLock<bool>,
/// Subscription input; provides the channels and channel groups for the effect.
input: SubscriptionInput,
/// Cursor handed to the receive implementation.
cursor: SubscriptionCursor,
/// Callback handed to the receive implementation.
executor: Arc<SubscribeEffectExecutor>,
/// Channel on which `cancel` publishes this effect's `id`.
cancellation_channel: Sender<String>,
},
/// Status emission effect, executed through `emit_status::execute`.
EmitStatus {
/// Identifier of the effect; returned by `Effect::id`.
id: String,
/// Connection status passed (as a clone) to the executor.
status: ConnectionStatus,
/// Callback which receives the status.
executor: Arc<EmitStatusEffectExecutor>,
},
/// Message emission effect, executed through `emit_messages::execute`.
EmitMessages {
/// Identifier of the effect; returned by `Effect::id`.
id: String,
/// Cursor passed (as a clone) along with the updates.
next_cursor: SubscriptionCursor,
/// Updates passed (as a clone) to the executor.
updates: Vec<Update>,
/// Callback which receives the updates and the cursor.
executor: Arc<EmitMessagesEffectExecutor>,
},
}
/// Diagnostic formatting for subscribe effects.
///
/// Only the data describing what an effect works on is printed (channels, channel groups,
/// cursor, status or messages). Identifiers, executors, cancellation flags and channels are
/// intentionally left out of the output.
impl Debug for SubscribeEffect {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Handshake { input, .. } => {
                let channels = input.channels();
                let groups = input.channel_groups();
                write!(
                    f,
                    "SubscribeEffect::Handshake {{ channels: {:?}, channel groups: {:?} }}",
                    channels, groups
                )
            }
            Self::Receive { input, cursor, .. } => {
                let channels = input.channels();
                let groups = input.channel_groups();
                // The trailing `\` joins both source lines into a single format string.
                write!(
                    f,
                    "SubscribeEffect::Receive {{ channels: {:?}, channel groups: {:?}, cursor: \
                    {cursor:?} }}",
                    channels, groups
                )
            }
            Self::EmitStatus { status, .. } => write!(
                f,
                "SubscribeEffect::EmitStatus {{ status: {status:?} }}"
            ),
            Self::EmitMessages { updates, .. } => write!(
                f,
                "SubscribeEffect::EmitMessages {{ messages: {updates:?} }}"
            ),
        }
    }
}
#[async_trait::async_trait]
impl Effect for SubscribeEffect {
    type Invocation = SubscribeEffectInvocation;

    /// Upper-case label identifying the kind of effect.
    fn name(&self) -> String {
        let label = match self {
            Self::Handshake { .. } => "HANDSHAKE",
            Self::Receive { .. } => "RECEIVE_MESSAGES",
            Self::EmitStatus { .. } => "EMIT_STATUS",
            Self::EmitMessages { .. } => "EMIT_MESSAGES",
        };

        String::from(label)
    }

    /// Owned copy of the identifier stored in the effect.
    fn id(&self) -> String {
        // Every variant carries an `id`, so a single irrefutable or-pattern covers them all.
        let (Self::Handshake { id, .. }
        | Self::Receive { id, .. }
        | Self::EmitStatus { id, .. }
        | Self::EmitMessages { id, .. }) = self;

        id.clone()
    }

    /// Runs the effect by delegating to the `execute` function of the matching module and
    /// returns the events it produced.
    async fn run(&self) -> Vec<SubscribeEvent> {
        match self {
            Self::Handshake {
                executor,
                input,
                cursor,
                id,
                ..
            } => handshake::execute(input, cursor, id, executor).await,
            Self::Receive {
                executor,
                input,
                cursor,
                id,
                ..
            } => receive::execute(input, cursor, id, executor).await,
            // Emitting effects hand owned copies of their payload to the executor.
            Self::EmitStatus {
                executor, status, ..
            } => emit_status::execute(status.clone(), executor).await,
            Self::EmitMessages {
                executor,
                next_cursor,
                updates,
                ..
            } => emit_messages::execute(next_cursor.clone(), updates.clone(), executor).await,
        }
    }

    /// Marks a `Handshake` / `Receive` effect as cancelled and publishes its identifier on
    /// the cancellation channel. Other effects are left untouched.
    ///
    /// # Panics
    ///
    /// Panics with "cancellation pipe is broken!" when the identifier cannot be sent.
    fn cancel(&self) {
        match self {
            Self::Handshake {
                cancellation_channel,
                cancelled,
                id,
                ..
            }
            | Self::Receive {
                cancellation_channel,
                cancelled,
                id,
                ..
            } => {
                // The write guard is a temporary, so the lock is released before the
                // notification is sent.
                *cancelled.write() = true;

                let notification = cancellation_channel.send_blocking(id.clone());
                notification.expect("cancellation pipe is broken!");
            }
            // Emitting effects cannot be cancelled.
            Self::EmitStatus { .. } | Self::EmitMessages { .. } => {}
        }
    }

    /// Whether `cancel` has been called on this effect; always `false` for effects which
    /// cannot be cancelled.
    fn is_cancelled(&self) -> bool {
        match self {
            Self::Handshake { cancelled, .. } | Self::Receive { cancelled, .. } => {
                let flag = cancelled.read();
                *flag
            }
            Self::EmitStatus { .. } | Self::EmitMessages { .. } => false,
        }
    }
}
#[cfg(test)]
mod should {
    use super::*;
    use futures::FutureExt;
    use uuid::Uuid;

    /// Cancelling a handshake effect must publish the effect identifier on the cancellation
    /// channel.
    #[tokio::test]
    async fn send_cancellation_notification() {
        let (cancel_tx, cancel_rx) = async_channel::bounded::<String>(1);

        // Executor which immediately succeeds with an empty result; it is never invoked here.
        let executor: Arc<SubscribeEffectExecutor> = Arc::new(|_| {
            async move {
                Ok(SubscribeResult {
                    cursor: SubscriptionCursor::default(),
                    messages: vec![],
                })
            }
            .boxed()
        });

        let effect_id = Uuid::new_v4().to_string();
        let effect = SubscribeEffect::Handshake {
            id: effect_id,
            cancelled: RwLock::new(false),
            input: SubscriptionInput::new(&None, &None),
            cursor: None,
            executor,
            cancellation_channel: cancel_tx,
        };

        effect.cancel();

        let notified_id = cancel_rx.recv().await.unwrap();
        assert_eq!(notified_id, effect.id());
    }
}