use base64::{engine::general_purpose, Engine};
use crate::{
core::{CryptoProvider, PubNubError, ScalarValue},
dx::subscribe::result::{Envelope, EnvelopePayload, ObjectDataBody, Update},
lib::{
alloc::{
borrow::ToOwned,
boxed::Box,
string::{String, ToString},
sync::Arc,
vec::Vec,
},
collections::HashMap,
core::{
cmp::{Ord, Ordering, PartialOrd},
fmt::{Debug, Formatter},
result::Result,
},
},
};
#[derive(Debug, Clone)]
pub enum SubscribeStreamEvent {
Status(ConnectionStatus),
Update(Update),
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize), serde(untagged))]
pub enum SubscribeMessageType {
Message = 0,
Signal = 1,
Object = 2,
MessageAction = 3,
File = 4,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum SubscriptionOptions {
ReceivePresenceEvents,
}
pub struct SubscriptionParams<'subscription, N: Into<String>> {
pub channels: Option<&'subscription [N]>,
pub channel_groups: Option<&'subscription [N]>,
pub options: Option<Vec<SubscriptionOptions>>,
}
#[derive(Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
pub struct SubscriptionCursor {
#[cfg_attr(feature = "serde", serde(rename = "t"))]
pub timetoken: String,
#[cfg_attr(feature = "serde", serde(rename = "r"))]
pub region: u32,
}
#[derive(Clone, PartialEq)]
pub enum ConnectionStatus {
Connected,
Reconnected,
Disconnected,
ConnectionError(PubNubError),
DisconnectedUnexpectedly(PubNubError),
SubscriptionChanged {
channels: Option<Vec<String>>,
channel_groups: Option<Vec<String>>,
},
}
#[derive(Debug, Clone)]
pub enum Presence {
Join {
timestamp: usize,
uuid: String,
channel: String,
subscription: String,
occupancy: usize,
#[cfg(feature = "serde")]
data: Option<serde_json::Value>,
#[cfg(not(feature = "serde"))]
data: Option<Vec<u8>>,
event_timestamp: usize,
},
Leave {
timestamp: usize,
channel: String,
subscription: String,
occupancy: usize,
uuid: String,
event_timestamp: usize,
},
Timeout {
timestamp: usize,
channel: String,
subscription: String,
occupancy: usize,
uuid: String,
event_timestamp: usize,
},
Interval {
timestamp: usize,
channel: String,
subscription: String,
occupancy: usize,
join: Option<Vec<String>>,
leave: Option<Vec<String>>,
timeout: Option<Vec<String>>,
event_timestamp: usize,
here_now_refresh: bool,
},
StateChange {
timestamp: usize,
channel: String,
subscription: String,
uuid: String,
#[cfg(feature = "serde")]
data: serde_json::Value,
#[cfg(not(feature = "serde"))]
data: Vec<u8>,
event_timestamp: usize,
},
}
#[derive(Debug, Clone)]
pub enum AppContext {
Channel {
event: Option<ObjectEvent>,
timestamp: Option<usize>,
name: Option<String>,
description: Option<String>,
r#type: Option<String>,
status: Option<String>,
id: String,
custom: Option<HashMap<String, ScalarValue>>,
updated: String,
tag: String,
subscription: String,
},
Uuid {
event: Option<ObjectEvent>,
timestamp: Option<usize>,
name: Option<String>,
email: Option<String>,
external_id: Option<String>,
profile_url: Option<String>,
r#type: Option<String>,
status: Option<String>,
id: String,
custom: Option<HashMap<String, ScalarValue>>,
updated: String,
tag: String,
subscription: String,
},
Membership {
event: Option<ObjectEvent>,
timestamp: Option<usize>,
channel: Box<AppContext>,
custom: Option<HashMap<String, ScalarValue>>,
status: Option<String>,
uuid: String,
updated: String,
tag: String,
subscription: String,
},
}
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Message {
pub sender: Option<String>,
pub timestamp: usize,
pub channel: String,
pub subscription: String,
pub data: Vec<u8>,
pub r#type: Option<String>,
pub space_id: Option<String>,
#[cfg(feature = "serde")]
pub user_metadata: Option<serde_json::Value>,
#[cfg(not(feature = "serde"))]
pub user_metadata: Option<Vec<u8>>,
pub decryption_error: Option<PubNubError>,
}
#[derive(Debug, Clone)]
pub struct MessageAction {
pub event: MessageActionEvent,
pub sender: String,
pub timestamp: usize,
pub channel: String,
pub subscription: String,
pub message_timetoken: String,
pub action_timetoken: String,
pub r#type: String,
pub value: String,
}
#[derive(Debug, Clone)]
pub struct File {
pub sender: String,
pub timestamp: usize,
pub channel: String,
pub subscription: String,
pub message: String,
pub id: String,
pub name: String,
}
#[derive(Debug, Copy, Clone)]
pub enum ObjectEvent {
Update,
Delete,
}
#[derive(Debug, Copy, Clone)]
pub enum MessageActionEvent {
Update,
Delete,
}
impl SubscriptionCursor {
#[cfg(feature = "std")]
pub(crate) fn is_valid(&self) -> bool {
self.timetoken.len() == 17 && self.timetoken.chars().all(char::is_numeric)
}
}
impl Default for SubscriptionCursor {
fn default() -> Self {
Self {
timetoken: "0".into(),
region: 0,
}
}
}
impl PartialOrd for SubscriptionCursor {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
fn lt(&self, other: &Self) -> bool {
let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
lhs < rhs
}
fn le(&self, other: &Self) -> bool {
let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
lhs <= rhs
}
fn gt(&self, other: &Self) -> bool {
let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
lhs > rhs
}
fn ge(&self, other: &Self) -> bool {
let lhs = self.timetoken.parse::<u64>().expect("Invalid timetoken");
let rhs = other.timetoken.parse::<u64>().expect("Invalid timetoken");
lhs >= rhs
}
}
impl Ord for SubscriptionCursor {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap_or(Ordering::Equal)
}
}
impl Debug for SubscriptionCursor {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
write!(
f,
"SubscriptionCursor {{ timetoken: {}, region: {} }}",
self.timetoken, self.region
)
}
}
impl From<String> for SubscriptionCursor {
fn from(value: String) -> Self {
let mut timetoken = value;
if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) {
timetoken = "-1".into();
}
SubscriptionCursor {
timetoken,
..Default::default()
}
}
}
impl From<&str> for SubscriptionCursor {
fn from(value: &str) -> Self {
let mut timetoken = value;
if timetoken.len() != 17 || !timetoken.chars().all(char::is_numeric) {
timetoken = "-1";
}
SubscriptionCursor {
timetoken: timetoken.to_string(),
..Default::default()
}
}
}
impl From<usize> for SubscriptionCursor {
fn from(value: usize) -> Self {
let mut timetoken = value.to_string();
if timetoken.len() != 17 {
timetoken = "-1".into();
}
SubscriptionCursor {
timetoken,
..Default::default()
}
}
}
impl From<u64> for SubscriptionCursor {
fn from(value: u64) -> Self {
let mut timetoken = value.to_string();
if timetoken.len() != 17 {
timetoken = "-1".into();
}
SubscriptionCursor {
timetoken,
..Default::default()
}
}
}
impl TryFrom<String> for ObjectEvent {
type Error = PubNubError;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"update" => Ok(Self::Update),
"delete" => Ok(Self::Delete),
_ => Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected object event type".to_string(),
}),
}
}
}
impl TryFrom<String> for MessageActionEvent {
type Error = PubNubError;
fn try_from(value: String) -> Result<Self, Self::Error> {
match value.as_str() {
"update" => Ok(Self::Update),
"delete" => Ok(Self::Delete),
_ => Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected message action event type".to_string(),
}),
}
}
}
impl From<SubscriptionCursor> for HashMap<String, String> {
fn from(value: SubscriptionCursor) -> Self {
if value.timetoken.eq(&"0") {
HashMap::from([(String::from("tt"), value.timetoken)])
} else {
HashMap::from([
(String::from("tt"), value.timetoken.to_string()),
(String::from("tr"), value.region.to_string()),
])
}
}
}
impl Debug for ConnectionStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
Self::Connected => write!(f, "Connected"),
Self::Reconnected => write!(f, "Reconnected"),
Self::Disconnected => write!(f, "Disconnected"),
Self::ConnectionError(err) => write!(f, "ConnectionError({err:?})"),
ConnectionStatus::DisconnectedUnexpectedly(err) => {
write!(f, "DisconnectedUnexpectedly({err:?})")
}
Self::SubscriptionChanged {
channels,
channel_groups,
} => {
write!(
f,
"SubscriptionChanged {{ channels: {channels:?}, \
channel_groups: {channel_groups:?} }}"
)
}
}
}
}
#[cfg(feature = "std")]
impl Presence {
pub(crate) fn subscription(&self) -> String {
match self {
Self::Join { subscription, .. }
| Self::Leave { subscription, .. }
| Self::Timeout { subscription, .. }
| Self::Interval { subscription, .. }
| Self::StateChange { subscription, .. } => subscription.clone(),
}
}
pub(crate) fn event_timestamp(&self) -> usize {
match self {
Self::Join {
event_timestamp, ..
}
| Self::Leave {
event_timestamp, ..
}
| Self::Timeout {
event_timestamp, ..
}
| Self::Interval {
event_timestamp, ..
}
| Self::StateChange {
event_timestamp, ..
} => *event_timestamp,
}
}
}
#[cfg(feature = "std")]
impl AppContext {
pub(crate) fn subscription(&self) -> String {
match self {
Self::Channel { subscription, .. }
| Self::Uuid { subscription, .. }
| Self::Membership { subscription, .. } => subscription.clone(),
}
}
pub(crate) fn event_timestamp(&self) -> usize {
match self {
Self::Channel { timestamp, .. }
| Self::Uuid { timestamp, .. }
| Self::Membership { timestamp, .. } => timestamp.unwrap_or(0),
}
}
}
impl Update {
pub(in crate::dx::subscribe) fn decrypt(
self,
cryptor: &Arc<dyn CryptoProvider + Send + Sync>,
) -> Self {
if !matches!(self, Self::Message(_) | Self::Signal(_)) {
return self;
}
match self {
Self::Message(message) => Self::Message(message.decrypt(cryptor)),
Self::Signal(message) => Self::Signal(message.decrypt(cryptor)),
_ => unreachable!(),
}
}
}
impl Message {
fn decrypt(mut self, cryptor: &Arc<dyn CryptoProvider + Send + Sync>) -> Self {
let lossy_string = String::from_utf8_lossy(self.data.as_slice()).to_string();
let trimmed = lossy_string.trim_matches('"');
let decryption_result = general_purpose::STANDARD
.decode(trimmed)
.map_err(|err| PubNubError::Decryption {
details: err.to_string(),
})
.and_then(|base64_bytes| cryptor.decrypt(base64_bytes));
match decryption_result {
Ok(bytes) => {
self.data = bytes;
}
Err(error) => self.decryption_error = Some(error),
};
self
}
}
impl TryFrom<Envelope> for Presence {
type Error = PubNubError;
fn try_from(value: Envelope) -> Result<Self, Self::Error> {
let event_timestamp = value.published.timetoken.parse::<usize>().ok().unwrap_or(0);
let subscription = resolve_subscription_value(value.subscription, &value.channel);
let channel = value.channel.replace("-pnpres", "");
if let EnvelopePayload::PresenceStateChange {
timestamp,
uuid,
data,
..
} = value.payload
{
Ok(Self::StateChange {
timestamp,
uuid,
channel,
subscription,
data,
event_timestamp,
})
} else if let EnvelopePayload::PresenceAnnounce {
action,
timestamp,
uuid,
occupancy,
data,
} = value.payload
{
match action.as_str() {
"join" => Ok(Self::Join {
timestamp,
uuid,
channel,
subscription,
occupancy,
data,
event_timestamp,
}),
"leave" => Ok(Self::Leave {
timestamp,
uuid,
channel,
subscription,
occupancy,
event_timestamp,
}),
_ => Ok(Self::Timeout {
timestamp,
uuid,
channel,
subscription,
occupancy,
event_timestamp,
}),
}
} else if let EnvelopePayload::PresenceInterval {
timestamp,
occupancy,
join,
leave,
timeout,
here_now_refresh,
} = value.payload
{
Ok(Self::Interval {
timestamp,
channel,
subscription,
occupancy,
join,
leave,
timeout,
here_now_refresh: here_now_refresh.unwrap_or(false),
event_timestamp,
})
} else {
Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected payload for presence.".to_string(),
})
}
}
}
impl TryFrom<Envelope> for AppContext {
type Error = PubNubError;
fn try_from(value: Envelope) -> Result<Self, Self::Error> {
let timestamp = value.published.timetoken.parse::<usize>();
if let EnvelopePayload::Object {
event,
r#type,
data,
..
} = value.payload
{
let update_type = r#type;
let subscription = resolve_subscription_value(value.subscription, &value.channel);
match data {
ObjectDataBody::Channel {
name,
description,
r#type,
status,
id,
custom,
updated,
tag,
} if update_type.as_str().eq("channel") => Ok(Self::Channel {
event: Some(event.try_into()?),
timestamp: timestamp.ok(),
name,
description,
r#type,
status,
id,
custom,
updated,
tag,
subscription,
}),
ObjectDataBody::Uuid {
name,
email,
external_id,
profile_url,
r#type,
status,
id,
custom,
updated,
tag,
} if update_type.as_str().eq("uuid") => Ok(Self::Uuid {
event: Some(event.try_into()?),
timestamp: timestamp.ok(),
name,
email,
external_id,
profile_url,
r#type,
status,
id,
custom,
updated,
tag,
subscription,
}),
ObjectDataBody::Membership {
channel,
custom,
uuid,
status,
updated,
tag,
} if update_type.as_str().eq("membership") => {
if let ObjectDataBody::Channel {
name,
description: channel_description,
r#type: channel_type,
status: channel_status,
id,
custom: channel_custom,
updated: channel_updated,
tag: channel_tag,
} = *channel
{
Ok(Self::Membership {
event: Some(event.try_into()?),
timestamp: timestamp.ok(),
channel: Box::new(AppContext::Channel {
event: None,
timestamp: None,
name,
description: channel_description,
r#type: channel_type,
status: channel_status,
id,
custom: channel_custom,
updated: channel_updated,
tag: channel_tag,
subscription: subscription.clone(),
}),
custom,
status,
uuid,
updated,
tag,
subscription,
})
} else {
Err(PubNubError::Deserialization {
details: "Unable deserialize: unknown object type.".to_string(),
})
}
}
_ => Err(PubNubError::Deserialization {
details: "Unable deserialize: unknown object type.".to_string(),
}),
}
} else {
Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected payload for object.".to_string(),
})
}
}
}
impl TryFrom<Envelope> for Message {
type Error = PubNubError;
fn try_from(value: Envelope) -> Result<Self, Self::Error> {
let timestamp = value.published.timetoken.parse::<usize>().ok().unwrap_or(0);
let subscription = resolve_subscription_value(value.subscription, &value.channel);
if let EnvelopePayload::Message(_) = value.payload {
Ok(Self {
sender: value.sender,
timestamp,
channel: value.channel,
subscription,
data: value.payload.into(),
r#type: value.r#type,
space_id: value.space_id,
user_metadata: value.user_metadata,
decryption_error: None,
})
} else {
Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected payload for message.".to_string(),
})
}
}
}
impl TryFrom<Envelope> for MessageAction {
type Error = PubNubError;
fn try_from(value: Envelope) -> Result<Self, Self::Error> {
let timestamp = value.published.timetoken.parse::<usize>().ok().unwrap_or(0);
let sender = value.sender.unwrap_or("".to_string());
let subscription = resolve_subscription_value(value.subscription, &value.channel);
if let EnvelopePayload::MessageAction { event, data, .. } = value.payload {
Ok(Self {
event: event.try_into()?,
sender,
timestamp,
channel: value.channel,
subscription,
message_timetoken: data.message_timetoken,
action_timetoken: data.action_timetoken,
r#type: data.r#type,
value: data.value,
})
} else {
Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected payload for message action.".to_string(),
})
}
}
}
impl TryFrom<Envelope> for File {
type Error = PubNubError;
fn try_from(value: Envelope) -> Result<Self, Self::Error> {
let timestamp = value.published.timetoken.parse::<usize>().ok().unwrap_or(0);
let sender = value.sender.unwrap_or("".to_string());
let subscription = resolve_subscription_value(value.subscription, &value.channel);
if let EnvelopePayload::File { message, file } = value.payload {
Ok(Self {
sender,
timestamp,
channel: value.channel.clone(),
subscription,
message,
id: file.id,
name: file.name,
})
} else {
Err(PubNubError::Deserialization {
details: "Unable deserialize: unexpected payload for file.".to_string(),
})
}
}
}
fn resolve_subscription_value(subscription: Option<String>, channel: &str) -> String {
subscription.unwrap_or(channel.to_owned())
}
#[cfg(test)]
mod should {
use test_case::test_case;
use super::*;
#[test_case(
None,
"channel" => "channel".to_string();
"no subscription"
)]
#[test_case(
Some("channel".into()),
"channel2" => "channel".to_string();
"different subscription and channel"
)]
fn resolve_subscription_field_value(subscription: Option<String>, channel: &str) -> String {
resolve_subscription_value(subscription, channel)
}
#[test]
#[cfg(feature = "std")]
fn create_valid_subscription_cursor_as_struct() {
let cursor = SubscriptionCursor {
timetoken: "12345678901234567".into(),
region: 0,
};
assert!(cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_valid_subscription_cursor_from_string() {
let cursor: SubscriptionCursor = "12345678901234567".to_string().into();
assert!(cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_valid_subscription_cursor_from_string_slice() {
let cursor: SubscriptionCursor = "12345678901234567".into();
assert!(cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_valid_subscription_cursor_from_usize() {
let timetoken: usize = 12345678901234567;
let cursor: SubscriptionCursor = timetoken.into();
assert!(cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_valid_subscription_cursor_from_u64() {
let timetoken: u64 = 12345678901234567;
let cursor: SubscriptionCursor = timetoken.into();
assert!(cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_invalid_subscription_cursor_from_short_string() {
let cursor: SubscriptionCursor = "1234567890123467".to_string().into();
assert!(!cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_invalid_subscription_cursor_from_non_numeric_string() {
let cursor: SubscriptionCursor = "123456789a123467".to_string().into();
assert!(!cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_invalid_subscription_cursor_from_short_string_slice() {
let cursor: SubscriptionCursor = "1234567890123567".into();
assert!(!cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_invalid_subscription_cursor_from_non_numeric_string_slice() {
let cursor: SubscriptionCursor = "1234567890123a567".into();
assert!(!cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_invalid_subscription_cursor_from_too_small_usize() {
let timetoken: usize = 123456789012567;
let cursor: SubscriptionCursor = timetoken.into();
assert!(!cursor.is_valid())
}
#[test]
#[cfg(feature = "std")]
fn create_invalid_subscription_cursor_from_too_small_u64() {
let timetoken: u64 = 123901234567;
let cursor: SubscriptionCursor = timetoken.into();
assert!(!cursor.is_valid())
}
}