use crate::{
core::PubNubError,
lib::{
alloc::{
boxed::Box,
sync::Arc,
{string::String, vec::Vec},
},
collections::HashMap,
core::fmt::{Display, Formatter},
},
};
#[cfg(feature = "std")]
use crate::core::{runtime::RuntimeSupport, RequestRetryConfiguration, Runtime};
/// Boxed, single-use closure which decodes a raw byte slice into `B`, or
/// reports a [`PubNubError`] when decoding fails.
///
/// `TransportRequest::deserialize` calls it with the bytes of the response
/// body.
type DeserializerClosure<B> = Box<dyn FnOnce(&[u8]) -> Result<B, PubNubError>>;
/// Request method carried by a [`TransportRequest`].
///
/// The `Display` implementation renders the variants as `GET`, `POST` and
/// `DELETE`.
#[derive(Clone, Eq, PartialEq, Debug, Default)]
pub enum TransportMethod {
/// `GET` method; used when no method is chosen explicitly (`Default`).
#[default]
Get,
/// `POST` method.
Post,
/// `DELETE` method.
Delete,
}
impl Display for TransportMethod {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
write!(
f,
"{}",
match self {
TransportMethod::Get => "GET",
TransportMethod::Post => "POST",
TransportMethod::Delete => "DELETE",
}
)
}
}
/// Description of a single request which is handed to a transport
/// implementation by the `send` / `send_blocking` methods.
///
/// `Default` yields an empty path, no query parameters, no headers, no body
/// and the [`TransportMethod::Get`] method.
#[derive(Clone, Eq, PartialEq, Debug, Default)]
pub struct TransportRequest {
/// Request path; also passed to the retry policy when a request fails.
pub path: String,
/// Query parameters as name / value pairs.
pub query_parameters: HashMap<String, String>,
/// Method with which the request should be sent.
pub method: TransportMethod,
/// Request headers as name / value pairs.
pub headers: HashMap<String, String>,
/// Optional request body as raw bytes.
pub body: Option<Vec<u8>>,
/// Request timeout; the field exists only with the `std` feature.
// NOTE(review): the unit is not visible in this file — confirm against the
// transport implementation.
#[cfg(feature = "std")]
pub timeout: u64,
}
impl TransportRequest {
/// Sends this request through `transport` and converts the response into `R`
/// (variant compiled when the `serde` feature is disabled).
///
/// The response body is decoded into `B` by `deserializer`, and `B` is then
/// converted into `R` through `TryFrom`.
///
/// With the `std` feature the request is repeated for as long as an attempt
/// fails and `retry_configuration.retry_delay(..)` yields a delay; `runtime`
/// sleeps for that delay between attempts. Without `std` exactly one attempt
/// is made.
///
/// # Errors
///
/// Returns the [`PubNubError`] of the last attempt: a transport failure, a
/// decoding failure, or a failed `B` -> `R` conversion.
// NOTE(review): unlike the `serde` variant, this one takes
// `retry_configuration` and `runtime` by value and has no `T: 'static` bound —
// confirm that the call sites build with `serde` disabled.
#[cfg(not(feature = "serde"))]
pub(crate) async fn send<B, R, T, D>(
    &self,
    transport: &T,
    deserializer: Arc<D>,
    #[cfg(feature = "std")] retry_configuration: RequestRetryConfiguration,
    #[cfg(feature = "std")] runtime: RuntimeSupport,
) -> Result<R, PubNubError>
where
    B: for<'de> super::Deserialize<'de>,
    R: TryFrom<B, Error = PubNubError>,
    T: super::Transport,
    D: super::Deserializer + 'static,
{
    #[cfg(feature = "std")]
    {
        let mut last_result: Result<R, PubNubError>;
        let mut retry_attempt = 0_u8;
        loop {
            // Every attempt needs its own handle: the decoding closure is
            // consumed by `deserialize`.
            let deserializer_clone = Arc::clone(&deserializer);

            // A transport failure is captured in `last_result` instead of
            // leaving the function through `?`, so the retry policy is
            // consulted for it exactly like for any other failed attempt.
            last_result = async {
                let response = transport.send(self.clone()).await?;
                Self::deserialize(
                    response,
                    Box::new(move |bytes| deserializer_clone.deserialize(bytes)),
                )
            }
            .await;

            let Err(error) = last_result.as_ref() else {
                break;
            };

            // `None` from the policy means "give up and report the error".
            if let Some(delay) = retry_configuration.retry_delay(
                Some(self.path.clone()),
                &retry_attempt,
                Some(error),
            ) {
                retry_attempt += 1;
                runtime.clone().sleep_microseconds(delay).await;
            } else {
                break;
            }
        }
        last_result
    }
    #[cfg(not(feature = "std"))]
    {
        let response = transport.send(self.clone()).await?;
        Self::deserialize(
            response,
            Box::new(move |bytes| deserializer.deserialize(bytes)),
        )
    }
}
/// Sends this request through `transport` and converts the response into `R`
/// (variant compiled when the `serde` feature is enabled).
///
/// The response body is decoded into `B` by `deserializer`, and `B` is then
/// converted into `R` through `TryFrom`.
///
/// With the `std` feature the request is repeated for as long as an attempt
/// fails and `retry_configuration.retry_delay(..)` yields a delay; `runtime`
/// sleeps for that delay between attempts. Without `std` exactly one attempt
/// is made.
///
/// # Errors
///
/// Returns the [`PubNubError`] of the last attempt: a transport failure, a
/// decoding failure, or a failed `B` -> `R` conversion.
#[cfg(feature = "serde")]
pub(crate) async fn send<B, R, T, D>(
    &self,
    transport: &T,
    deserializer: Arc<D>,
    #[cfg(feature = "std")] retry_configuration: &RequestRetryConfiguration,
    #[cfg(feature = "std")] runtime: &RuntimeSupport,
) -> Result<R, PubNubError>
where
    B: for<'de> serde::Deserialize<'de>,
    R: TryFrom<B, Error = PubNubError>,
    T: super::Transport + 'static,
    D: super::Deserializer + 'static,
{
    #[cfg(feature = "std")]
    {
        let mut last_result: Result<R, PubNubError>;
        let mut retry_attempt = 0_u8;
        loop {
            // Every attempt needs its own handle: the decoding closure is
            // consumed by `deserialize`.
            let deserializer_clone = Arc::clone(&deserializer);

            // A transport failure is captured in `last_result` instead of
            // leaving the function through `?`, so the retry policy is
            // consulted for it exactly like for any other failed attempt.
            last_result = async {
                let response = transport.send(self.clone()).await?;
                Self::deserialize(
                    response,
                    Box::new(move |bytes| deserializer_clone.deserialize(bytes)),
                )
            }
            .await;

            let Err(error) = last_result.as_ref() else {
                break;
            };

            // `None` from the policy means "give up and report the error".
            if let Some(delay) = retry_configuration.retry_delay(
                Some(self.path.clone()),
                &retry_attempt,
                Some(error),
            ) {
                retry_attempt += 1;
                runtime.clone().sleep_microseconds(delay).await;
            } else {
                break;
            }
        }
        last_result
    }
    #[cfg(not(feature = "std"))]
    {
        let response = transport.send(self.clone()).await?;
        Self::deserialize(
            response,
            Box::new(move |bytes| deserializer.deserialize(bytes)),
        )
    }
}
/// Sends this request through a blocking `transport` and converts the
/// response into `R` (variant compiled with `blocking` and without `serde`).
///
/// The response body is decoded into `B` by `deserializer`, and `B` is then
/// converted into `R` through `TryFrom`. Exactly one attempt is made.
///
/// # Errors
///
/// Returns a [`PubNubError`] when the transport fails, the body cannot be
/// decoded, or the `B` -> `R` conversion fails.
#[cfg(all(not(feature = "serde"), feature = "blocking"))]
pub(crate) fn send_blocking<B, R, T, D>(
    &self,
    transport: &T,
    deserializer: Arc<D>,
) -> Result<R, PubNubError>
where
    B: for<'de> super::Deserialize<'de>,
    R: TryFrom<B, Error = PubNubError>,
    T: super::blocking::Transport,
    D: super::Deserializer + 'static,
{
    // The response is owned and not used afterwards, so it is moved into
    // `deserialize` rather than cloned.
    let response = transport.send(self.clone())?;
    Self::deserialize(
        response,
        Box::new(move |bytes| deserializer.deserialize(bytes)),
    )
}
/// Sends this request through a blocking `transport` and converts the
/// response into `R` (variant compiled with both `blocking` and `serde`).
///
/// The response body is decoded into `B` by `deserializer`, and `B` is then
/// converted into `R` through `TryFrom`. Exactly one attempt is made.
///
/// # Errors
///
/// Returns a [`PubNubError`] when the transport fails, the body cannot be
/// decoded, or the `B` -> `R` conversion fails.
#[cfg(all(feature = "serde", feature = "blocking"))]
pub(crate) fn send_blocking<B, R, T, D>(
    &self,
    transport: &T,
    deserializer: Arc<D>,
) -> Result<R, PubNubError>
where
    B: for<'de> serde::Deserialize<'de>,
    R: TryFrom<B, Error = PubNubError>,
    T: super::blocking::Transport,
    D: super::Deserializer + 'static,
{
    // The response is owned and not used afterwards, so it is moved into
    // `deserialize` rather than cloned.
    let response = transport.send(self.clone())?;
    Self::deserialize(
        response,
        Box::new(move |bytes| deserializer.deserialize(bytes)),
    )
}
/// Converts a transport `response` into `R`.
///
/// The body bytes are decoded into `B` with `des`, and `B` is then converted
/// into `R` through `TryFrom`.
///
/// # Errors
///
/// * "No body in the response!" API error when the response has no body.
/// * "Unexpected service response" API error when decoding fails and the
///   response status is `500` or above.
/// * The decoder's own error when decoding fails with a lower status.
/// * The `B` -> `R` conversion error, with the response attached to it.
///
/// The response is moved into whichever error is produced, so no copy of it
/// is made on either the success or the failure path.
fn deserialize<B, R>(
    response: super::TransportResponse,
    des: DeserializerClosure<B>,
) -> Result<R, PubNubError>
where
    R: TryFrom<B, Error = PubNubError>,
{
    // Only a borrow of the body is needed for decoding.
    let Some(bytes) = response.body.as_ref() else {
        return Err(PubNubError::general_api_error(
            "No body in the response!",
            None,
            Some(Box::new(response)),
        ));
    };

    let body: B = match des(bytes) {
        Ok(body) => body,
        // For statuses of 500 and above an undecodable body is reported as a
        // generic API error which carries the response, instead of the
        // decoder's own error.
        Err(_) if response.status >= 500 => {
            return Err(PubNubError::general_api_error(
                "Unexpected service response",
                None,
                Some(Box::new(response)),
            ));
        }
        Err(error) => return Err(error),
    };

    body.try_into()
        .map_err(|response_error: PubNubError| response_error.attach_response(response))
}
}