use super::{
FetchResponse, GenericError, Params, RequestPayload, Unsubscribe, ValidationError,
MAX_FETCH_BATCH_SIZE, MAX_RECEIVE_BATCH_SIZE, MAX_SUBSCRIPTION_BATCH_SIZE,
};
use crate::jwt::decode::{MessageId, SubscriptionId, Topic};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[doc(hidden)]
pub struct BatchSubscribe {
pub topics: Vec<Topic>,
}
impl RequestPayload for BatchSubscribe {
type Error = GenericError;
type Response = Vec<SubscriptionId>;
fn validate(&self) -> Result<(), ValidationError> {
let batch_size = self.topics.len();
if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}
if batch_size > MAX_SUBSCRIPTION_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_SUBSCRIPTION_BATCH_SIZE,
actual: batch_size,
});
}
for topic in &self.topics {
topic.decode().map_err(ValidationError::TopicDecoding)?;
}
Ok(())
}
fn into_params(self) -> Params {
Params::BatchSubscribe(self)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[doc(hidden)]
pub struct BatchUnsubscribe {
pub subscriptions: Vec<Unsubscribe>,
}
impl RequestPayload for BatchUnsubscribe {
type Error = GenericError;
type Response = bool;
fn validate(&self) -> Result<(), ValidationError> {
let batch_size = self.subscriptions.len();
if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}
if batch_size > MAX_SUBSCRIPTION_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_SUBSCRIPTION_BATCH_SIZE,
actual: batch_size,
});
}
for sub in &self.subscriptions {
sub.validate()?;
}
Ok(())
}
fn into_params(self) -> Params {
Params::BatchUnsubscribe(self)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[doc(hidden)]
pub struct BatchFetchMessages {
pub topics: Vec<Topic>,
}
impl RequestPayload for BatchFetchMessages {
type Error = GenericError;
type Response = FetchResponse;
fn validate(&self) -> Result<(), ValidationError> {
let batch_size = self.topics.len();
if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}
if batch_size > MAX_FETCH_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_FETCH_BATCH_SIZE,
actual: batch_size,
});
}
for topic in &self.topics {
topic.decode().map_err(ValidationError::TopicDecoding)?;
}
Ok(())
}
fn into_params(self) -> Params {
Params::BatchFetchMessages(self)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[doc(hidden)]
pub struct Receipt {
pub topic: Topic,
pub message_id: MessageId,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[doc(hidden)]
pub struct BatchReceiveMessages {
pub receipts: Vec<Receipt>,
}
impl RequestPayload for BatchReceiveMessages {
type Error = GenericError;
type Response = bool;
fn validate(&self) -> Result<(), ValidationError> {
let batch_size = self.receipts.len();
if batch_size == 0 {
return Err(ValidationError::BatchEmpty);
}
if batch_size > MAX_RECEIVE_BATCH_SIZE {
return Err(ValidationError::BatchLimitExceeded {
limit: MAX_RECEIVE_BATCH_SIZE,
actual: batch_size,
});
}
for receipt in &self.receipts {
receipt.topic.decode().map_err(ValidationError::TopicDecoding)?;
}
Ok(())
}
fn into_params(self) -> Params {
Params::BatchReceiveMessages(self)
}
}