use derive_builder::Builder;
use crate::{
core::{blocking, Deserializer, PubNubError, Transport},
dx::{
pubnub_client::PubNubClientInstance,
subscribe::{SubscriptionCursor, Update},
},
lib::alloc::{collections::VecDeque, string::String, string::ToString, vec::Vec},
};
#[derive(Builder)]
#[builder(
pattern = "owned",
name = "RawSubscriptionBuilder",
build_fn(private, name = "build_internal", validate = "Self::validate"),
no_std
)]
pub struct RawSubscription<T, D> {
#[builder(field(vis = "pub(in crate::dx::subscribe)"), setter(custom))]
pub(in crate::dx::subscribe) pubnub_client: PubNubClientInstance<T, D>,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(custom, strip_option),
default = "Vec::new()"
)]
pub(in crate::dx::subscribe) channels: Vec<String>,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(custom, strip_option),
default = "Vec::new()"
)]
pub(in crate::dx::subscribe) channel_groups: Vec<String>,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(strip_option),
default = "Default::default()"
)]
pub(in crate::dx::subscribe) cursor: Option<u64>,
#[builder(field(vis = "pub(in crate::dx::subscribe)"))]
pub(in crate::dx::subscribe) heartbeat: u64,
#[builder(
field(vis = "pub(in crate::dx::subscribe)"),
setter(strip_option, into),
default = "None"
)]
pub(in crate::dx::subscribe) filter_expression: Option<String>,
}
impl<T, D> RawSubscriptionBuilder<T, D> {
pub fn channels<L>(mut self, channels: L) -> Self
where
L: Into<Vec<String>>,
{
let mut unique = channels.into();
unique.sort_unstable();
unique.dedup();
self.channels = Some(unique);
self
}
pub fn channel_groups<L>(mut self, channel_groups: L) -> Self
where
L: Into<Vec<String>>,
{
let mut unique = channel_groups.into();
unique.sort_unstable();
unique.dedup();
self.channel_groups = Some(unique);
self
}
fn validate(&self) -> Result<(), String> {
let groups_len = self.channel_groups.as_ref().map_or_else(|| 0, |v| v.len());
let channels_len = self.channels.as_ref().map_or_else(|| 0, |v| v.len());
if channels_len == groups_len && channels_len == 0 {
Err("Either channels or channel groups should be provided".into())
} else {
Ok(())
}
}
}
impl<T, D> RawSubscriptionBuilder<T, D>
where
T: Transport,
D: Deserializer,
{
pub fn execute(self) -> Result<RawSubscription<T, D>, PubNubError> {
self.build_internal()
.map_err(|e| PubNubError::SubscribeInitialization {
details: e.to_string(),
})
}
}
#[cfg(feature = "blocking")]
impl<T, D> RawSubscriptionBuilder<T, D>
where
T: blocking::Transport,
{
pub fn execute_blocking(self) -> Result<RawSubscription<T, D>, PubNubError> {
self.build_internal()
.map_err(|e| PubNubError::SubscribeInitialization {
details: e.to_string(),
})
}
}
impl<T, D> RawSubscription<T, D>
where
T: Transport + 'static,
D: Deserializer + 'static,
{
pub fn stream(self) -> impl futures::Stream<Item = Result<Update, PubNubError>> {
let cursor = self
.cursor
.map(|tt| SubscriptionCursor {
timetoken: tt.to_string(),
region: 0,
})
.unwrap_or_default();
let context = SubscriptionContext {
subscription: self,
cursor,
messages: VecDeque::new(),
};
futures::stream::unfold(context, |mut ctx| async {
while ctx.messages.is_empty() {
let mut request = ctx
.subscription
.pubnub_client
.subscribe_request()
.cursor(ctx.cursor.clone())
.channels(ctx.subscription.channels.clone())
.channel_groups(ctx.subscription.channel_groups.clone())
.heartbeat(ctx.subscription.heartbeat);
if let Some(filter_expr) = ctx.subscription.filter_expression.clone() {
request = request.filter_expression(filter_expr);
}
let response = request.execute().await;
if let Err(e) = response {
return Some((
Err(PubNubError::general_api_error(e.to_string(), None, None)),
ctx,
));
}
let response = response.expect("Should be Ok");
ctx.cursor = response.cursor;
ctx.messages.extend(response.messages.into_iter().map(Ok));
}
Some((ctx.messages.pop_front().expect("Shouldn't be empty!"), ctx))
})
}
}
#[cfg(feature = "blocking")]
impl<T, D> RawSubscription<T, D>
where
T: blocking::Transport,
{
pub fn iter(self) -> RawSubscriptionIter<T, D> {
let cursor = self
.cursor
.map(|tt| SubscriptionCursor {
timetoken: tt.to_string(),
region: 0,
})
.unwrap_or_default();
let context = SubscriptionContext {
subscription: self,
cursor,
messages: VecDeque::new(),
};
RawSubscriptionIter(context)
}
}
#[cfg(feature = "blocking")]
impl<T, D> Iterator for RawSubscriptionIter<T, D>
where
T: blocking::Transport,
D: Deserializer + 'static,
{
type Item = Result<Update, PubNubError>;
fn next(&mut self) -> Option<Self::Item> {
let ctx = &mut self.0;
while ctx.messages.is_empty() {
let mut request = ctx
.subscription
.pubnub_client
.subscribe_request()
.cursor(ctx.cursor.clone())
.channels(ctx.subscription.channels.clone())
.channel_groups(ctx.subscription.channel_groups.clone())
.heartbeat(ctx.subscription.heartbeat);
if let Some(filter_expr) = ctx.subscription.filter_expression.clone() {
request = request.filter_expression(filter_expr);
}
let response = request.execute_blocking();
if let Err(e) = response {
return Some(Err(PubNubError::general_api_error(
e.to_string(),
None,
None,
)));
}
let response = response.expect("Should be Ok");
let messages: Vec<_> = if let Some(cryptor) = &ctx.subscription.pubnub_client.cryptor {
response
.messages
.into_iter()
.map(|update| update.decrypt(cryptor))
.map(Ok)
.collect()
} else {
response.messages.into_iter().map(Ok).collect()
};
ctx.cursor = response.cursor;
ctx.messages.extend(messages);
}
Some(ctx.messages.pop_front().expect("Shouldn't be empty!"))
}
}
struct SubscriptionContext<T, D> {
subscription: RawSubscription<T, D>,
cursor: SubscriptionCursor,
messages: VecDeque<Result<Update, PubNubError>>,
}
pub struct RawSubscriptionIter<T, D>(SubscriptionContext<T, D>);
#[cfg(test)]
mod should {
use super::*;
use crate::{
core::{blocking, PubNubError, Transport, TransportRequest, TransportResponse},
providers::deserialization_serde::DeserializerSerde,
transport::middleware::PubNubMiddleware,
Keyset, PubNubClientBuilder,
};
struct MockTransport;
#[async_trait::async_trait]
impl Transport for MockTransport {
async fn send(&self, _req: TransportRequest) -> Result<TransportResponse, PubNubError> {
Ok(TransportResponse::default())
}
}
impl blocking::Transport for MockTransport {
fn send(&self, _req: TransportRequest) -> Result<TransportResponse, PubNubError> {
Ok(TransportResponse::default())
}
}
fn client() -> PubNubClientInstance<PubNubMiddleware<MockTransport>, DeserializerSerde> {
PubNubClientBuilder::with_transport(MockTransport)
.with_keyset(Keyset {
subscribe_key: "demo",
publish_key: None,
secret_key: None,
})
.with_user_id("rust-test-user")
.build()
.unwrap()
}
fn sut() -> RawSubscriptionBuilder<PubNubMiddleware<MockTransport>, DeserializerSerde> {
RawSubscriptionBuilder {
pubnub_client: Some(client()),
heartbeat: Some(300),
..Default::default()
}
}
#[test]
fn validate_channels_and_channel_groups() {
let builder = sut();
assert!(builder.validate().is_err());
let builder = sut().channels(vec!["ch1".into()]);
assert!(builder.validate().is_ok());
let builder = sut().channel_groups(vec!["cg1".into()]);
assert!(builder.validate().is_ok());
}
#[tokio::test]
async fn call_subscribe_endpoint_async() {
use futures::StreamExt;
let message = sut()
.channels(vec!["ch1".into()])
.execute()
.unwrap()
.stream()
.boxed()
.next()
.await;
assert!(message.is_some());
}
#[test]
fn call_subscribe_endpoint_blocking() {
let message = sut()
.channels(vec!["ch1".into()])
.execute_blocking()
.unwrap()
.iter()
.next();
assert!(message.is_some());
}
}