use async_channel::Sender;
use futures::future::BoxFuture;
use spin::RwLock;
use crate::{
core::{
event_engine::{Effect, EffectInvocation},
PubNubError,
},
lib::{
alloc::{string::String, sync::Arc, vec::Vec},
core::fmt::{Debug, Formatter},
},
presence::{
event_engine::{PresenceEffectInvocation, PresenceInput, PresenceParameters},
HeartbeatResult, LeaveResult,
},
};
// Per-effect implementations. Each submodule exposes an `execute` function
// that `PresenceEffect::run` dispatches to for the matching variant.
/// Implementation behind the `Heartbeat` effect.
mod heartbeat;
/// Implementation behind the `Leave` effect.
mod leave;
/// Implementation behind the `Wait` effect.
mod wait;
/// Heartbeat effect executor.
///
/// A thread-safe (`Send + Sync`) callable that receives [`PresenceParameters`]
/// and returns a boxed `'static` future resolving to either a
/// [`HeartbeatResult`] or a [`PubNubError`].
pub(in crate::dx::presence) type HeartbeatEffectExecutor = dyn Fn(PresenceParameters) -> BoxFuture<'static, Result<HeartbeatResult, PubNubError>>
+ Send
+ Sync;
/// Wait effect executor.
///
/// A thread-safe (`Send + Sync`) callable that receives a string slice and
/// returns a boxed `'static` future resolving to `Ok(())` or a
/// [`PubNubError`].
///
/// NOTE(review): `PresenceEffect::run` hands the effect `id` to
/// `wait::execute` together with this executor, so the `&str` argument is
/// presumably the effect identifier — confirm in the `wait` module.
pub(in crate::dx::presence) type WaitEffectExecutor =
dyn Fn(&str) -> BoxFuture<'static, Result<(), PubNubError>> + Send + Sync;
/// Leave effect executor.
///
/// A thread-safe (`Send + Sync`) callable that receives [`PresenceParameters`]
/// and returns a boxed `'static` future resolving to either a [`LeaveResult`]
/// or a [`PubNubError`].
pub(in crate::dx::presence) type LeaveEffectExecutor = dyn Fn(PresenceParameters) -> BoxFuture<'static, Result<LeaveResult, PubNubError>>
+ Send
+ Sync;
/// Effects which the presence event engine can run.
///
/// Every variant carries its own identifier, the input it was created for and
/// the executor which performs the actual work when the effect is run.
pub(crate) enum PresenceEffect {
/// Heartbeat effect.
Heartbeat {
/// Identifier of the effect, returned by `Effect::id`.
id: String,
/// Input passed to `heartbeat::execute` when the effect runs.
input: PresenceInput,
/// Executor passed to `heartbeat::execute` when the effect runs.
executor: Arc<HeartbeatEffectExecutor>,
},
/// Leave effect.
Leave {
/// Identifier of the effect, returned by `Effect::id`.
id: String,
/// Input passed to `leave::execute` when the effect runs.
input: PresenceInput,
/// Executor passed to `leave::execute` when the effect runs.
executor: Arc<LeaveEffectExecutor>,
},
/// Wait effect. This is the only variant which reacts to cancellation.
Wait {
/// Identifier of the effect; passed to `wait::execute` and sent over
/// `cancellation_channel` when the effect is cancelled.
id: String,
/// Cancellation flag: set to `true` by `cancel` and reported by
/// `is_cancelled`.
cancelled: RwLock<bool>,
/// Input the effect was created for. Within this file it is only used
/// for the `Debug` representation.
input: PresenceInput,
/// Channel over which the effect `id` is sent when `cancel` is called.
cancellation_channel: Sender<String>,
/// Executor passed to `wait::execute` when the effect runs.
executor: Arc<WaitEffectExecutor>,
},
}
impl Debug for PresenceEffect {
    /// Formats the effect as
    /// `PresenceEffect::<Variant> { channels: .., channel groups: ..}`.
    ///
    /// Only the variant name and the channels / channel groups of its input
    /// are printed; identifiers, executors and cancellation state are omitted.
    //
    // The return type is spelled `core::fmt::Result` rather than
    // `std::fmt::Result`: it is the very same type, but it stays available
    // when the crate is built without `std`, matching the `alloc` / `core`
    // paths used by the rest of this file.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        // All variants share one layout, so pick the label and the input
        // first and format once.
        let (variant, input) = match self {
            Self::Heartbeat { input, .. } => ("Heartbeat", input),
            Self::Leave { input, .. } => ("Leave", input),
            Self::Wait { input, .. } => ("Wait", input),
        };

        write!(
            f,
            "PresenceEffect::{} {{ channels: {:?}, channel groups: {:?}}}",
            variant,
            input.channels(),
            input.channel_groups()
        )
    }
}
#[async_trait::async_trait]
impl Effect for PresenceEffect {
    type Invocation = PresenceEffectInvocation;

    /// Upper-case label of the effect kind: `HEARTBEAT`, `LEAVE` or `WAIT`.
    fn name(&self) -> String {
        let label = match self {
            Self::Wait { .. } => "WAIT",
            Self::Leave { .. } => "LEAVE",
            Self::Heartbeat { .. } => "HEARTBEAT",
        };

        String::from(label)
    }

    /// Owned copy of the identifier stored in the effect.
    fn id(&self) -> String {
        match self {
            Self::Wait { id, .. } | Self::Leave { id, .. } | Self::Heartbeat { id, .. } => {
                id.clone()
            }
        }
    }

    /// Runs the effect by delegating to the `execute` function of the
    /// submodule matching the variant and returns the events it produced.
    async fn run(&self) -> Vec<<Self::Invocation as EffectInvocation>::Event> {
        match self {
            Self::Wait { id, executor, .. } => wait::execute(id, executor).await,
            Self::Leave {
                input, executor, ..
            } => leave::execute(input, executor).await,
            Self::Heartbeat {
                input, executor, ..
            } => heartbeat::execute(input, executor).await,
        }
    }

    /// Cancels the effect.
    ///
    /// Only `Wait` reacts: the cancellation flag is raised and the effect
    /// identifier is sent over the cancellation channel. Other variants are
    /// left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the identifier can't be sent over the cancellation channel.
    fn cancel(&self) {
        if let Self::Wait {
            id,
            cancelled,
            cancellation_channel,
            ..
        } = self
        {
            // The write guard is a temporary, so the lock is released at the
            // end of this statement — before the blocking send below.
            *cancelled.write() = true;

            cancellation_channel
                .send_blocking(id.clone())
                .expect("Cancellation pipe is broken!");
        }
    }

    /// Whether the effect has been cancelled.
    ///
    /// Always `false` for variants other than `Wait`.
    fn is_cancelled(&self) -> bool {
        matches!(self, Self::Wait { cancelled, .. } if *cancelled.read())
    }
}
#[cfg(test)]
mod it_should {
    use super::*;
    use uuid::Uuid;

    /// Cancelling a `Wait` effect must publish that effect's identifier on
    /// its cancellation channel.
    #[tokio::test]
    async fn send_wait_cancellation_wait_notification() {
        let (cancellation_tx, cancellation_rx) = async_channel::bounded(1);
        let wait_effect = PresenceEffect::Wait {
            id: Uuid::new_v4().to_string(),
            cancelled: RwLock::new(false),
            input: PresenceInput::new(&None, &None),
            cancellation_channel: cancellation_tx,
            executor: Arc::new(|_| Box::pin(async move { Ok(()) })),
        };

        wait_effect.cancel();

        let notified_id = cancellation_rx.recv().await.unwrap();
        assert_eq!(notified_id, wait_effect.id());
    }
}