use derive_builder::Builder;
#[cfg(feature = "std")]
use futures::{select_biased, FutureExt};
use crate::{
core::{
blocking,
utils::encoding::{
url_encode_extended, url_encoded_channel_groups, url_encoded_channels,
UrlEncodeExtension,
},
Deserializer, PubNubError, Transport, {TransportMethod, TransportRequest},
},
dx::{
pubnub_client::PubNubClientInstance,
subscribe::{builders, result::SubscribeResult, SubscribeResponseBody, SubscriptionCursor},
},
lib::{
alloc::{
format,
string::{String, ToString},
vec::Vec,
},
collections::HashMap,
},
};
#[cfg(feature = "std")]
use crate::core::event_engine::cancel::CancellationTask;
#[cfg(all(feature = "presence", feature = "std"))]
use crate::lib::alloc::vec;
#[derive(Builder)]
#[builder(
pattern = "owned",
build_fn(vis = "pub(in crate::dx::subscribe)", validate = "Self::validate"),
no_std
)]
pub(crate) struct SubscribeRequest<T, D> {
#[builder(field(vis = "pub(in crate::dx::subscribe)"), setter(custom))]
pub(in crate::dx::subscribe) pubnub_client: PubNubClientInstance<T, D>,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(custom, strip_option),
default = "Vec::new()"
)]
pub(in crate::dx::subscribe) channels: Vec<String>,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(custom, strip_option),
default = "Vec::new()"
)]
pub(in crate::dx::subscribe) channel_groups: Vec<String>,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(strip_option),
default = "Default::default()"
)]
pub(in crate::dx::subscribe) cursor: SubscriptionCursor,
#[cfg(feature = "presence")]
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(custom, strip_option),
default = "None"
)]
pub(in crate::dx::subscribe) state: Option<Vec<u8>>,
#[builder(field(vis = "pub(in crate::dx::subscribe)"))]
pub(in crate::dx::subscribe) heartbeat: u64,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(strip_option),
default = "None"
)]
pub(in crate::dx::subscribe) filter_expression: Option<String>,
}
impl<T, D> SubscribeRequestBuilder<T, D> {
pub fn channels<L>(mut self, channels: L) -> Self
where
L: Into<Vec<String>>,
{
let mut unique = channels.into();
unique.sort_unstable();
unique.dedup();
self.channels = Some(unique);
self
}
pub fn channel_groups<L>(mut self, channel_groups: L) -> Self
where
L: Into<Vec<String>>,
{
let mut unique = channel_groups.into();
unique.sort_unstable();
unique.dedup();
self.channel_groups = Some(unique);
self
}
#[cfg(all(feature = "presence", feature = "std"))]
pub(in crate::dx::subscribe) fn state(mut self, state: HashMap<String, Vec<u8>>) -> Self {
let mut serialized_state = vec![b'{'];
for (key, mut value) in state {
serialized_state.append(&mut format!("\"{}\":", key).as_bytes().to_vec());
serialized_state.append(&mut value);
serialized_state.push(b',');
}
if serialized_state.last() == Some(&b',') {
serialized_state.pop();
}
serialized_state.push(b'}');
self.state = Some(Some(serialized_state));
self
}
fn validate(&self) -> Result<(), String> {
let groups_len = self.channel_groups.as_ref().map_or_else(|| 0, |v| v.len());
let channels_len = self.channels.as_ref().map_or_else(|| 0, |v| v.len());
builders::validate_configuration(&self.pubnub_client).and_then(|_| {
if channels_len == groups_len && channels_len == 0 {
Err("Either channels or channel groups should be provided".into())
} else {
Ok(())
}
})
}
fn request(self) -> Result<SubscribeRequest<T, D>, PubNubError> {
self.build()
.map_err(|err| PubNubError::general_api_error(err.to_string(), None, None))
}
}
impl<T, D> SubscribeRequest<T, D> {
pub(in crate::dx::subscribe) fn transport_request(
&self,
) -> Result<TransportRequest, PubNubError> {
let config = &self.pubnub_client.config;
let sub_key = &config.subscribe_key;
let mut query: HashMap<String, String> = HashMap::new();
query.extend::<HashMap<String, String>>(self.cursor.clone().into());
url_encoded_channel_groups(&self.channel_groups)
.and_then(|groups| query.insert("channel-group".into(), groups));
#[cfg(feature = "presence")]
if let Some(state) = self.state.as_ref() {
let state_json =
String::from_utf8(state.clone()).map_err(|err| PubNubError::Serialization {
details: err.to_string(),
})?;
query.insert("state".into(), state_json);
}
self.filter_expression
.as_ref()
.filter(|e| !e.is_empty())
.and_then(|e| {
query.insert(
"filter-expr".into(),
url_encode_extended(e.as_bytes(), UrlEncodeExtension::NonChannelPath),
)
});
query.insert("heartbeat".into(), self.heartbeat.to_string());
Ok(TransportRequest {
path: format!(
"/v2/subscribe/{sub_key}/{}/0",
url_encoded_channels(&self.channels)
),
query_parameters: query,
method: TransportMethod::Get,
#[cfg(feature = "std")]
timeout: config.transport.subscribe_request_timeout,
..Default::default()
})
}
}
impl<T, D> SubscribeRequestBuilder<T, D>
where
T: Transport + 'static,
D: Deserializer + 'static,
{
pub async fn execute(self) -> Result<SubscribeResult, PubNubError> {
let request = self.request()?;
let transport_request = request.transport_request()?;
let client = request.pubnub_client.clone();
let deserializer = client.deserializer.clone();
transport_request
.send::<SubscribeResponseBody, _, _, _>(
&client.transport,
deserializer,
#[cfg(feature = "std")]
&client.config.transport.retry_configuration,
#[cfg(feature = "std")]
&client.runtime,
)
.await
}
#[cfg(feature = "std")]
pub async fn execute_with_cancel(
self,
cancel_task: CancellationTask,
) -> Result<SubscribeResult, PubNubError> {
select_biased! {
_ = cancel_task.wait_for_cancel().fuse() => {
Err(PubNubError::EffectCanceled)
},
response = self.execute().fuse() => {
response
}
}
}
}
impl<T, D> SubscribeRequestBuilder<T, D>
where
T: blocking::Transport,
D: Deserializer + 'static,
{
pub fn execute_blocking(self) -> Result<SubscribeResult, PubNubError> {
let request = self
.build()
.map_err(|err| PubNubError::general_api_error(err.to_string(), None, None))?;
let transport_request = request.transport_request()?;
let client = request.pubnub_client.clone();
let deserializer = client.deserializer.clone();
transport_request
.send_blocking::<SubscribeResponseBody, _, _, _>(&client.transport, deserializer)
}
}
#[cfg(feature = "std")]
#[cfg(test)]
mod should {
use super::*;
use crate::{core::TransportResponse, PubNubClientBuilder};
#[tokio::test]
async fn be_able_to_cancel_subscribe_call() {
struct MockTransport;
#[async_trait::async_trait]
impl Transport for MockTransport {
async fn send(&self, _req: TransportRequest) -> Result<TransportResponse, PubNubError> {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
Ok(TransportResponse::default())
}
}
let (tx, rx) = async_channel::bounded(1);
let cancel_task = CancellationTask::new(rx, "test".into());
tx.send("test".into()).await.unwrap();
let result = PubNubClientBuilder::with_transport(MockTransport)
.with_keyset(crate::Keyset {
subscribe_key: "test",
publish_key: Some("test"),
secret_key: None,
})
.with_user_id("test")
.build()
.unwrap()
.subscribe_request()
.channels(vec!["test".into()])
.execute_with_cancel(cancel_task)
.await;
assert!(matches!(result, Err(PubNubError::EffectCanceled)));
}
}