pub struct BroccoliQueue { /* private fields */ }Expand description
Main queue interface for interacting with the message broker.
BroccoliQueue provides methods for publishing and consuming messages,
as well as processing messages with custom handlers.
Implementations§
Source§impl BroccoliQueue
impl BroccoliQueue
Sourcepub fn builder(broker_url: impl Into<String>) -> BroccoliQueueBuilder
pub fn builder(broker_url: impl Into<String>) -> BroccoliQueueBuilder
Sourcepub async fn publish<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
disambiguator: Option<String>,
message: &T,
options: Option<PublishOptions>,
) -> Result<BrokerMessage<T>, BroccoliError>
pub async fn publish<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, disambiguator: Option<String>, message: &T, options: Option<PublishOptions>, ) -> Result<BrokerMessage<T>, BroccoliError>
Sourcepub async fn publish_batch<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
disambiguator: Option<String>,
messages: impl IntoIterator<Item = T>,
options: Option<PublishOptions>,
) -> Result<Vec<BrokerMessage<T>>, BroccoliError>
pub async fn publish_batch<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, disambiguator: Option<String>, messages: impl IntoIterator<Item = T>, options: Option<PublishOptions>, ) -> Result<Vec<BrokerMessage<T>>, BroccoliError>
Sourcepub async fn consume<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
options: Option<ConsumeOptions>,
) -> Result<BrokerMessage<T>, BroccoliError>
pub async fn consume<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, options: Option<ConsumeOptions>, ) -> Result<BrokerMessage<T>, BroccoliError>
Consumes a message from the specified topic. This method will block until a message is available.
This will not acknowledge the message, use acknowledge to remove the message from the processing queue,
or reject to move the message to the failed queue.
§Arguments
topic- The name of the topic.
§Returns
A Result containing the consumed message, or a BroccoliError on failure.
§Errors
If the message fails to consume, a BroccoliError will be returned.
Sourcepub async fn consume_batch<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
batch_size: usize,
timeout: Duration,
options: Option<ConsumeOptions>,
) -> Result<Vec<BrokerMessage<T>>, BroccoliError>
pub async fn consume_batch<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, batch_size: usize, timeout: Duration, options: Option<ConsumeOptions>, ) -> Result<Vec<BrokerMessage<T>>, BroccoliError>
Consumes a batch of messages from the specified topic. This method will block until the specified number of messages are consumed.
This will not acknowledge the message, use acknowledge to remove the message from the processing queue,
or reject to move the message to the failed queue.
§Arguments
topic- The name of the topic.batch_size- The number of messages to consume.timeout- The timeout duration for consuming messages.
§Returns
A Result containing a vector of consumed messages, or a BroccoliError on failure.
§Errors
If the messages fail to consume, a BroccoliError will be returned.
Sourcepub async fn try_consume<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
options: Option<ConsumeOptions>,
) -> Result<Option<BrokerMessage<T>>, BroccoliError>
pub async fn try_consume<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, options: Option<ConsumeOptions>, ) -> Result<Option<BrokerMessage<T>>, BroccoliError>
Attempts to consume a message from the specified topic. This method will not block, returning immediately if no message is available.
This will not acknowledge the message, use acknowledge to remove the message from the processing queue,
or reject to move the message to the failed queue.
§Arguments
topic- The name of the topic.
§Returns
A Result containing an Option with the consumed message if available, or a BroccoliError on failure.
§Errors
If the message fails to consume, a BroccoliError will be returned.
Sourcepub async fn acknowledge<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
message: BrokerMessage<T>,
) -> Result<(), BroccoliError>
pub async fn acknowledge<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, message: BrokerMessage<T>, ) -> Result<(), BroccoliError>
Sourcepub async fn reject<T: Clone + Serialize + DeserializeOwned>(
&self,
topic: &str,
message: BrokerMessage<T>,
) -> Result<(), BroccoliError>
pub async fn reject<T: Clone + Serialize + DeserializeOwned>( &self, topic: &str, message: BrokerMessage<T>, ) -> Result<(), BroccoliError>
Sourcepub async fn process_messages<T, F, Fut>(
&self,
topic: &str,
concurrency: Option<usize>,
consume_options: Option<ConsumeOptions>,
handler: F,
) -> Result<(), BroccoliError>where
T: DeserializeOwned + Send + Clone + Serialize + 'static,
F: Fn(BrokerMessage<T>) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<(), BroccoliError>> + Send + 'static,
pub async fn process_messages<T, F, Fut>(
&self,
topic: &str,
concurrency: Option<usize>,
consume_options: Option<ConsumeOptions>,
handler: F,
) -> Result<(), BroccoliError>where
T: DeserializeOwned + Send + Clone + Serialize + 'static,
F: Fn(BrokerMessage<T>) -> Fut + Send + Sync + Clone + 'static,
Fut: Future<Output = Result<(), BroccoliError>> + Send + 'static,
Processes messages from the specified topic with the provided handler function.
§Example
use broccoli_queue::queue::BroccoliQueue;
use broccoli_queue::brokers::broker::BrokerMessage;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct JobPayload {
id: String,
task_name: String,
created_at: chrono::DateTime<chrono::Utc>,
}
#[tokio::main]
async fn main() {
let queue = BroccoliQueue::builder("redis://localhost:6379")
.failed_message_retry_strategy(Default::default())
.pool_connections(5)
.build()
.await
.unwrap();
queue.process_messages("jobs", None, None, |message: BrokerMessage<JobPayload>| async move {
println!("Received message: {:?}", message);
Ok(())
}).await.unwrap();
}§Arguments
topic- The name of the topic.concurrency- The number of concurrent message handlers.handler- The handler function to process messages. This function should return aBroccoliErroron failure.
§Returns
A Result indicating success or failure.
§Errors
If the message fails to process, a BroccoliError will be returned.
Sourcepub async fn process_messages_with_handlers<T, F, MessageFut, SuccessFut, ErrorFut, S, E, R>(
&self,
topic: &str,
concurrency: Option<usize>,
consume_options: Option<ConsumeOptions>,
message_handler: F,
on_success: S,
on_error: E,
) -> Result<(), BroccoliError>where
T: DeserializeOwned + Send + Clone + Serialize + 'static,
F: Fn(BrokerMessage<T>) -> MessageFut + Send + Sync + Clone + 'static,
MessageFut: Future<Output = Result<R, BroccoliError>> + Send + 'static,
R: Send + Clone + 'static,
S: Fn(BrokerMessage<T>, R) -> SuccessFut + Send + Sync + Clone + 'static,
SuccessFut: Future<Output = Result<(), BroccoliError>> + Send + 'static,
E: Fn(BrokerMessage<T>, BroccoliError) -> ErrorFut + Send + Sync + Clone + 'static,
ErrorFut: Future<Output = Result<(), BroccoliError>> + Send + 'static,
pub async fn process_messages_with_handlers<T, F, MessageFut, SuccessFut, ErrorFut, S, E, R>(
&self,
topic: &str,
concurrency: Option<usize>,
consume_options: Option<ConsumeOptions>,
message_handler: F,
on_success: S,
on_error: E,
) -> Result<(), BroccoliError>where
T: DeserializeOwned + Send + Clone + Serialize + 'static,
F: Fn(BrokerMessage<T>) -> MessageFut + Send + Sync + Clone + 'static,
MessageFut: Future<Output = Result<R, BroccoliError>> + Send + 'static,
R: Send + Clone + 'static,
S: Fn(BrokerMessage<T>, R) -> SuccessFut + Send + Sync + Clone + 'static,
SuccessFut: Future<Output = Result<(), BroccoliError>> + Send + 'static,
E: Fn(BrokerMessage<T>, BroccoliError) -> ErrorFut + Send + Sync + Clone + 'static,
ErrorFut: Future<Output = Result<(), BroccoliError>> + Send + 'static,
Processes messages from the specified topic with the provided handler functions for message processing, success, and error handling.
§Example
use broccoli_queue::queue::BroccoliQueue;
use broccoli_queue::brokers::broker::BrokerMessage;
use broccoli_queue::error::BroccoliError;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct JobPayload {
id: String,
task_name: String,
created_at: chrono::DateTime<chrono::Utc>,
}
#[tokio::main]
async fn main() {
let queue = BroccoliQueue::builder("redis://localhost:6379")
.failed_message_retry_strategy(Default::default())
.pool_connections(5)
.build()
.await
.unwrap();
// Define handlers
async fn process_message(message: BrokerMessage<JobPayload>) -> Result<(), BroccoliError> {
println!("Processing message: {:?}", message);
Ok(())
}
async fn on_success(message: BrokerMessage<JobPayload>, _result: ()) -> Result<(), BroccoliError> {
println!("Successfully processed message: {}", message.task_id);
Ok(())
}
async fn on_error(message: BrokerMessage<JobPayload>, error: BroccoliError) -> Result<(), BroccoliError> {
println!("Failed to process message {}: {:?}", message.task_id, error);
Ok(())
}
// Process messages with 3 concurrent workers
queue.process_messages_with_handlers(
"jobs",
Some(3),
None,
process_message,
on_success,
on_error
).await.unwrap();
}§Arguments
topic- The name of the topic.concurrency- The number of concurrent message handlers.message_handler- The handler function to process messages. This function should return aBroccoliErroron failure.on_success- The handler function to call on successful message processing. This function should return aBroccoliErroron failure.on_error- The handler function to call on message processing failure. This function should return aBroccoliErroron failure.
§Returns
A Result indicating success or failure.
§Errors
If the message fails to process, a BroccoliError will be returned.
Sourcepub async fn queue_status(
&self,
queue_name: String,
disambiguator: Option<String>,
) -> Result<QueueStatus, BroccoliError>
pub async fn queue_status( &self, queue_name: String, disambiguator: Option<String>, ) -> Result<QueueStatus, BroccoliError>
Trait Implementations§
Auto Trait Implementations§
impl Freeze for BroccoliQueue
impl !RefUnwindSafe for BroccoliQueue
impl Send for BroccoliQueue
impl Sync for BroccoliQueue
impl Unpin for BroccoliQueue
impl !UnwindSafe for BroccoliQueue
Blanket Implementations§
Source§impl<'a, T, E> AsTaggedExplicit<'a, E> for Twhere
T: 'a,
impl<'a, T, E> AsTaggedExplicit<'a, E> for Twhere
T: 'a,
Source§impl<'a, T, E> AsTaggedImplicit<'a, E> for Twhere
T: 'a,
impl<'a, T, E> AsTaggedImplicit<'a, E> for Twhere
T: 'a,
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more