azure_messaging_eventhubs/consumer/mod.rs
1// Copyright (c) Microsoft Corporation. All Rights reserved
2// Licensed under the MIT license.
3
4#![doc = include_str!("README.md")]
5/// Receive messages from a partition.
6pub(crate) mod event_receiver;
7
8use crate::{
9 common::{recoverable::RecoverableConnection, ManagementInstance},
10 error::Result,
11 models::{ConsumerClientDetails, EventHubPartitionProperties, EventHubProperties},
12 EventHubsError, RetryOptions,
13};
14use azure_core::{credentials::TokenCredential, http::Url, time::Duration, Uuid};
15#[cfg(test)]
16use azure_core_amqp::AmqpError;
17use azure_core_amqp::{
18 message::AmqpSourceFilter, AmqpDescribed, AmqpOrderedMap, AmqpReceiverOptions, AmqpSource,
19 AmqpSymbol, AmqpValue, ReceiverCreditMode,
20};
21pub use event_receiver::EventReceiver;
22use std::{
23 default::Default,
24 fmt::Debug,
25 sync::Arc,
26 time::{SystemTime, UNIX_EPOCH},
27};
28use tracing::{debug, trace};
29
/// A client that can be used to receive events from an Event Hub.
///
/// Instances are obtained through [`ConsumerClient::builder`]; the builder's `open`
/// method constructs the client and ensures the connection is established.
pub struct ConsumerClient {
    /// Connection handle; clones of this `Arc` are handed to the receivers and
    /// management instances created from this client.
    recoverable_connection: Arc<RecoverableConnection>,
    /// Consumer group name; `"$Default"` when the builder was not given one.
    consumer_group: String,
    /// Name of the Event Hub this client reads from.
    eventhub: String,
    /// URL of the form `amqps://{namespace}/{eventhub}/ConsumerGroups/{consumer_group}`.
    endpoint: Url,
    /// Optional caller-supplied instance ID; when present it is used as the receiver
    /// name for receivers opened from this client.
    instance_id: Option<String>,
}
39
// Clippy complains if a method has too many parameters, so the optional
// construction parameters are bundled into this private options structure.
// It is filled in by `builders::ConsumerClientBuilder::open` and consumed by
// `ConsumerClient::new`.
struct ConsumerClientOptions {
    // Application identifier forwarded to the recoverable connection.
    application_id: Option<String>,
    // Instance ID stored on the client and used to name receivers.
    instance_id: Option<String>,
    // Retry configuration; `ConsumerClient::new` falls back to `RetryOptions::default()`.
    retry_options: Option<RetryOptions>,
    // Already-parsed custom endpoint (e.g. an AMQP proxy) forwarded to the connection.
    custom_endpoint: Option<Url>,
}
48
49impl ConsumerClient {
50 /// Builds a new [`ConsumerClient`] instance with the specified parameters.
51 ///
52 /// This function returns a builder which enables creation of a new [`ConsumerClient`]
53 /// instance with the specified parameters.
54 ///
55 ///
56 /// # Returns
57 ///
58 /// A new [`builders::ConsumerClientBuilder`] instance which can be used to create and open a consumer client.
59 ///
60 /// # Examples
61 ///
62 /// ```no_run
63 /// # #[tokio::main]
64 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
65 /// use azure_messaging_eventhubs::ConsumerClient;
66 /// use azure_identity::DeveloperToolsCredential;
67 ///
68 /// let my_credential = DeveloperToolsCredential::new(None)?;
69 /// let consumer = ConsumerClient::builder()
70 /// .open("my_namespace", "my_eventhub".to_string(), my_credential.clone()).await?;
71 /// # Ok(())}
72 /// ```
73 ///
74 pub fn builder() -> builders::ConsumerClientBuilder {
75 builders::ConsumerClientBuilder::new()
76 }
77
78 fn new(
79 fully_qualified_namespace: &str,
80 eventhub_name: String,
81 consumer_group: Option<String>,
82 credential: Arc<dyn TokenCredential>,
83 options: ConsumerClientOptions,
84 ) -> Result<Self> {
85 let consumer_group = consumer_group.unwrap_or("$Default".into());
86 let url = format!(
87 "amqps://{}/{}/ConsumerGroups/{}",
88 fully_qualified_namespace, eventhub_name, consumer_group
89 );
90 let url = Url::parse(&url).map_err(azure_core::Error::from)?;
91
92 trace!("Creating consumer client for {url}.");
93 let retry_options = options.retry_options.unwrap_or_default();
94 Ok(Self {
95 instance_id: options.instance_id,
96 recoverable_connection: RecoverableConnection::new(
97 url.clone(),
98 options.application_id,
99 options.custom_endpoint,
100 credential,
101 retry_options,
102 ),
103 eventhub: eventhub_name,
104 endpoint: url,
105 consumer_group,
106 })
107 }
108
109 /// Closes the connection to the Event Hub.
110 ///
111 /// This method closes the connection to the Event Hubs instance associated with the [`ConsumerClient`].
112 /// It returns a [`Result`] indicating whether the operation was successful or not.
113 ///
114 /// Note that closing a consumer will cancel all outstanding receive requests.
115 ///
116 /// # Returns
117 ///
118 /// A [`Result`] indicating whether the operation was successful or not.
119 ///
120 /// # Examples
121 ///
122 /// ``` no_run
123 /// use azure_messaging_eventhubs::ConsumerClient;
124 /// use azure_identity::DeveloperToolsCredential;
125 ///
126 /// #[tokio::main]
127 /// async fn main() {
128 /// let my_credential = DeveloperToolsCredential::new(None).unwrap();
129 /// let consumer = ConsumerClient::builder()
130 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await.unwrap();
131 ///
132 /// let result = consumer.close().await;
133 ///
134 /// match result {
135 /// Ok(_) => {
136 /// // Connection closed successfully
137 /// println!("Connection closed successfully");
138 /// }
139 /// Err(err) => {
140 /// // Handle the error
141 /// eprintln!("Error closing connection: {:?}", err);
142 /// }
143 /// }
144 /// }
145 /// ```
146 pub async fn close(self) -> Result<()> {
147 trace!("Closing consumer client for {}.", self.endpoint);
148 let recoverable_connection =
149 Arc::try_unwrap(self.recoverable_connection).map_err(|_| {
150 EventHubsError::with_message(
151 "Could not close consumer recoverable connection, multiple references exist",
152 )
153 })?;
154 trace!(
155 "No references to connection, closing connection for {}.",
156 self.endpoint
157 );
158 recoverable_connection.close_connection().await?;
159 Ok(())
160 }
161
162 /// Forces an error on the connection.
163 #[cfg(test)]
164 pub fn force_error(&self, error: AmqpError) -> Result<()> {
165 self.recoverable_connection.force_error(error)
166 }
167
168 /// Retrieves the details of the consumer client.
169 ///
170 /// This function retrieves the details of the consumer client associated with the [`ConsumerClient`].
171 pub(crate) fn get_details(&self) -> Result<ConsumerClientDetails> {
172 Ok(ConsumerClientDetails {
173 eventhub_name: self.eventhub.clone(),
174 consumer_group: self.consumer_group.clone(),
175 fully_qualified_namespace: self
176 .endpoint
177 .host()
178 .ok_or_else(|| {
179 EventHubsError::with_message("Could not find host in consumer client")
180 })?
181 .to_string(),
182 client_id: self.recoverable_connection.get_connection_id().to_string(),
183 })
184 }
185
186 /// Attaches a message receiver to a specific partition of the Event Hub.
187 ///
188 /// This function establishes a connection to the specified partition of the Event Hubs instance and returns a MessageReceiver which can be used to receive messages from it.
189 ///
190 /// # Arguments
191 ///
192 /// * `partition_id` - The ID of the partition to receive events from.
193 /// * `options` - Optional [`OpenReceiverOptions`] to configure the behavior of the receiver.
194 ///
195 /// # Returns
196 ///
197 /// A MessageReceiver which can be used to receive messages from the partition.
198 ///
199 /// Note that by default, a message receiver will receive events starting from the latest event in the partition (in
200 /// other words, it will receive new events only). To receive events from another location within the partition you can
201 /// specify a different starting position using the `options` parameter.
202 ///
203 /// # Examples
204 ///
205 /// ```no_run
206 /// use azure_messaging_eventhubs::ConsumerClient;
207 /// use azure_identity::DeveloperToolsCredential;
208 /// use futures::stream::StreamExt;
209 ///
210 /// #[tokio::main]
211 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
212 /// let my_credential = DeveloperToolsCredential::new(None)?;
213 /// let consumer = ConsumerClient::builder()
214 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await?;
215 /// let partition_id = "0".to_string();
216 ///
217 /// let receiver = consumer.open_receiver_on_partition(partition_id, None).await?;
218 ///
219 /// let mut event_stream = receiver.stream_events();
220 ///
221 /// while let Some(event_result) = event_stream.next().await {
222 /// match event_result {
223 /// Ok(event) => {
224 /// // Process the received event
225 /// println!("Received event: {:?}", event);
226 /// }
227 /// Err(err) => {
228 /// // Handle the error
229 /// eprintln!("Error receiving event: {:?}", err);
230 /// }
231 /// }
232 /// }
233 /// Ok(())
234 /// }
235 /// ```
236 pub async fn open_receiver_on_partition(
237 &self,
238 partition_id: String,
239 options: Option<OpenReceiverOptions>,
240 ) -> Result<EventReceiver> {
241 let options = options.unwrap_or_default();
242
243 let receiver_name = self
244 .instance_id
245 .clone()
246 .unwrap_or_else(|| Uuid::new_v4().to_string());
247 let start_expression = StartPosition::start_expression(&options.start_position);
248
249 trace!(
250 "Opening receiver on url {} partition {partition_id}.",
251 self.endpoint
252 );
253
254 let source_url = format!("{}/Partitions/{}", self.endpoint, partition_id);
255 let source_url = Url::parse(&source_url).map_err(azure_core::Error::from)?;
256
257 let message_source = AmqpSource::builder()
258 .with_address(source_url.to_string())
259 .add_to_filter(
260 AmqpSourceFilter::selector_filter().description().into(),
261 Box::new(AmqpDescribed::new(
262 AmqpSourceFilter::selector_filter().code(),
263 start_expression,
264 )),
265 )
266 .build();
267 let mut receiver_properties: AmqpOrderedMap<AmqpSymbol, AmqpValue> =
268 vec![("com.microsoft.com:receiver-name", receiver_name.clone())]
269 .into_iter()
270 .map(|(k, v)| (AmqpSymbol::from(k), AmqpValue::from(v)))
271 .collect();
272
273 if let Some(owner_level) = options.owner_level {
274 receiver_properties.insert("com.microsoft:epoch".into(), AmqpValue::from(owner_level));
275 }
276
277 let receiver_options = AmqpReceiverOptions {
278 name: Some(receiver_name),
279 properties: Some(receiver_properties),
280 credit_mode: Some(ReceiverCreditMode::Auto(options.prefetch.unwrap_or(300))),
281 auto_accept: true,
282 ..Default::default()
283 };
284
285 debug!("Receiver attached on partition {partition_id}.");
286 Ok(EventReceiver::new(
287 self.recoverable_connection.clone(),
288 receiver_options,
289 message_source,
290 source_url,
291 partition_id,
292 options.receive_timeout,
293 ))
294 }
295
296 /// Retrieves the properties of the Event Hub.
297 ///
298 /// This function retrieves the properties of the Event Hub associated with the [`ConsumerClient`].
299 /// It returns a [`Result`] containing the [`EventHubProperties`] if the operation is successful.
300 ///
301 /// # Returns
302 ///
303 /// A [`Result`] containing the [`EventHubProperties`] if the operation is successful.
304 ///
305 /// # Examples
306 ///
307 /// ``` no_run
308 /// use azure_messaging_eventhubs::ConsumerClient;
309 /// use azure_identity::DeveloperToolsCredential;
310 ///
311 /// #[tokio::main]
312 /// async fn main(){
313 /// let my_credential = DeveloperToolsCredential::new(None).unwrap();
314 /// let consumer = ConsumerClient::builder()
315 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await.unwrap();
316 ///
317 /// let eventhub_properties = consumer.get_eventhub_properties().await;
318 ///
319 /// match eventhub_properties {
320 /// Ok(properties) => {
321 /// // Process the Event Hub instance properties
322 /// println!("Event Hub properties: {:?}", properties);
323 /// }
324 /// Err(err) => {
325 /// // Handle the error
326 /// eprintln!("Error retrieving Event Hubs properties: {:?}", err);
327 /// }
328 /// }
329 /// }
330 /// ```
331 pub async fn get_eventhub_properties(&self) -> Result<EventHubProperties> {
332 self.get_management_instance()
333 .await?
334 .get_eventhub_properties(&self.eventhub)
335 .await
336 }
337
338 /// Retrieves the properties of a specific partition in the Event Hub.
339 ///
340 /// This function retrieves the properties of the specified partition in the Event Hub.
341 /// It returns a [`Result`] containing the [`EventHubPartitionProperties`] if the operation is successful.
342 ///
343 /// # Arguments
344 ///
345 /// * `partition_id` - The ID of the partition to retrieve properties for.
346 ///
347 /// # Returns
348 ///
349 /// A [`Result`] containing the [`EventHubPartitionProperties`] if the operation is successful.
350 ///
351 /// # Examples
352 ///
353 /// ``` no_run
354 /// use azure_messaging_eventhubs::ConsumerClient;
355 /// use azure_identity::DeveloperToolsCredential;
356 ///
357 /// #[tokio::main]
358 /// async fn main() {
359 /// let my_credential = DeveloperToolsCredential::new(None).unwrap();
360 /// let consumer = ConsumerClient::builder()
361 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await.unwrap();
362 /// let partition_id = "0";
363 ///
364 /// let partition_properties = consumer.get_partition_properties(partition_id).await;
365 ///
366 /// match partition_properties {
367 /// Ok(properties) => {
368 /// // Process the partition properties
369 /// println!("Partition properties: {:?}", properties);
370 /// }
371 /// Err(err) => {
372 /// // Handle the error
373 /// eprintln!("Error retrieving partition properties: {:?}", err);
374 /// }
375 /// }
376 /// }
377 /// ```
378 pub async fn get_partition_properties(
379 &self,
380 partition_id: &str,
381 ) -> Result<EventHubPartitionProperties> {
382 self.get_management_instance()
383 .await?
384 .get_eventhub_partition_properties(&self.eventhub, partition_id)
385 .await
386 }
387
388 async fn get_management_instance(&self) -> Result<Arc<ManagementInstance>> {
389 Ok(ManagementInstance::new(self.recoverable_connection.clone()))
390 }
391
392 async fn ensure_connection(&self) -> azure_core_amqp::Result<()> {
393 self.recoverable_connection.ensure_connection().await?;
394 Ok(())
395 }
396}
397
398/// Represents the options for receiving events from an Event Hub.
399#[derive(Debug, Clone, Default)]
400pub struct OpenReceiverOptions {
401 /// The owner level for messages being retrieved.
402 pub owner_level: Option<i64>,
403 /// The prefetch count for messages being retrieved.
404 pub prefetch: Option<u32>,
405 /// The starting position for messages being retrieved.
406 pub start_position: Option<StartPosition>,
407
408 /// Optional timeout for receiving messages. If not provided, the default timeout is infinite.
409 ///
410 /// Note: This is the timeout for individual messages, not the entire receive operation.
411 /// As long as there are messages available, then they will be included in the stream events regardless of the timeout.
412 pub receive_timeout: Option<Duration>,
413}
414/// Represents the options for receiving events from an Event Hub.
415impl OpenReceiverOptions {}
416
/// Identifies the location within a partition at which a consumer starts receiving events.
#[derive(Debug, Default, PartialEq, Clone)]
pub enum StartLocation {
    /// The starting position is specified by an offset.
    Offset(String),
    /// The starting position is specified by a sequence number.
    SequenceNumber(i64),
    /// The starting position is specified by an enqueued time.
    EnqueuedTime(SystemTime),
    /// The starting position is the earliest event in the partition.
    Earliest,
    #[default]
    /// The starting position is the latest event in the partition.
    Latest,
}

pub(crate) const ENQUEUED_TIME_ANNOTATION: &str = "amqp.annotation.x-opt-enqueued-time";
pub(crate) const OFFSET_ANNOTATION: &str = "amqp.annotation.x-opt-offset";
pub(crate) const SEQUENCE_NUMBER_ANNOTATION: &str = "amqp.annotation.x-opt-sequence-number";

/// Represents the starting position of a consumer when receiving events from an Event Hub.
///
/// The starting position combines a [`StartLocation`] (an offset, a sequence number, an enqueued
/// time, or the earliest or latest event in the partition) with an `inclusive` flag.
///
/// The default starting position is the latest event in the partition (always receive new events).
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition{
///   location: StartLocation::SequenceNumber(12345),
///   ..Default::default()
/// };
/// ```
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition{
///   location: StartLocation::EnqueuedTime(std::time::SystemTime::now()),
///   ..Default::default()
/// };
/// ```
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition{
///   location: StartLocation::Offset("12345".to_string()),
///   ..Default::default()
/// };
/// ```
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition{
///   location: StartLocation::Earliest,
///   ..Default::default()
/// };
/// ```
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition{
///   location: StartLocation::Latest,
///   ..Default::default()
/// };
/// ```
///
/// ```
/// use azure_messaging_eventhubs::StartPosition;
///
/// let start_position = StartPosition::default();
/// ```
///
#[derive(Debug, PartialEq, Clone, Default)]
pub struct StartPosition {
    /// The location of the starting position.
    pub location: StartLocation,

    /// Whether the starting position is inclusive (includes the event at StartLocation).
    ///
    /// Only offset, sequence-number and enqueued-time locations honor this flag; the
    /// earliest and latest locations always use a fixed expression.
    pub inclusive: bool,
}

/// Signed number of milliseconds between `time` and the Unix epoch.
///
/// Times before the epoch yield a negative value rather than an error.
fn unix_epoch_millis(time: SystemTime) -> i128 {
    // A `Duration` in milliseconds always fits in an `i128`; the fallback only keeps
    // the conversion total.
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i128::try_from(since_epoch.as_millis()).unwrap_or(i128::MAX),
        Err(before_epoch) => {
            -i128::try_from(before_epoch.duration().as_millis()).unwrap_or(i128::MAX)
        }
    }
}

impl StartPosition {
    /// Builds the selector filter expression for `position`.
    ///
    /// * `None` and [`StartLocation::Latest`] select events after `@latest`.
    /// * [`StartLocation::Earliest`] selects events after offset `-1`.
    /// * Offset, sequence number and enqueued time compare the matching annotation
    ///   with `>` (or `>=` when `inclusive` is set) against the quoted value; the
    ///   enqueued time is expressed in milliseconds relative to the Unix epoch.
    ///
    /// This function never panics: an enqueued time that precedes the Unix epoch is
    /// rendered as a negative millisecond value.
    pub(crate) fn start_expression(position: &Option<StartPosition>) -> String {
        let position = match position {
            Some(position) => position,
            None => return format!("{OFFSET_ANNOTATION} > '@latest'"),
        };
        let operator = if position.inclusive { ">=" } else { ">" };

        match &position.location {
            StartLocation::Offset(offset) => {
                format!("{OFFSET_ANNOTATION} {operator}'{offset}'")
            }
            StartLocation::SequenceNumber(sequence_number) => {
                format!("{SEQUENCE_NUMBER_ANNOTATION} {operator}'{sequence_number}'")
            }
            StartLocation::EnqueuedTime(enqueued_time) => {
                let millis = unix_epoch_millis(*enqueued_time);
                format!("{ENQUEUED_TIME_ANNOTATION} {operator}'{millis}'")
            }
            StartLocation::Earliest => format!("{OFFSET_ANNOTATION} > '-1'"),
            StartLocation::Latest => format!("{OFFSET_ANNOTATION} > '@latest'"),
        }
    }
}
542
543pub mod builders {
544 use super::*;
545 use crate::Result;
546 use std::sync::Arc;
547
548 /// A builder for creating a [`ConsumerClient`].
549 ///
550 /// This builder is used to create a new [`ConsumerClient`] with the specified parameters.
551 ///
552 /// # Examples
553 ///
554 /// ```no_run
555 /// use azure_messaging_eventhubs::ConsumerClient;
556 /// use azure_identity::DeveloperToolsCredential;
557 ///
558 /// #[tokio::main]
559 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
560 /// let my_credential = DeveloperToolsCredential::new(None).unwrap();
561 /// let consumer = ConsumerClient::builder()
562 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await?;
563 /// Ok(())
564 /// }
565 /// ```
566 #[derive(Default)]
567 pub struct ConsumerClientBuilder {
568 consumer_group: Option<String>,
569 application_id: Option<String>,
570 instance_id: Option<String>,
571 retry_options: Option<RetryOptions>,
572 custom_endpoint: Option<String>,
573 }
574
575 impl ConsumerClientBuilder {
576 pub(super) fn new() -> Self {
577 Self {
578 ..Default::default()
579 }
580 }
581
582 /// Specifies the name of the application creating the [`ConsumerClient`].
583 pub fn with_application_id(mut self, application_id: String) -> Self {
584 self.application_id = Some(application_id);
585 self
586 }
587
588 /// Specifies the consumer group for the [`ConsumerClient`].
589 ///
590 /// If not specified, the default consumer group will be used.
591 ///
592 /// For more information on Event Hubs consumer groups, see
593 /// [Consumer groups](https://learn.microsoft.com/azure/event-hubs/event-hubs-features#consumer-groups).
594 ///
595 /// # Examples
596 ///
597 /// ```no_run
598 /// use azure_messaging_eventhubs::ConsumerClient;
599 /// use azure_identity::DeveloperToolsCredential;
600 ///
601 /// #[tokio::main]
602 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
603 /// let my_credential = DeveloperToolsCredential::new(None)?;
604 /// let consumer = ConsumerClient::builder()
605 /// .with_consumer_group("my_consumer_group".to_string())
606 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await?;
607 /// Ok(())
608 /// }
609 ///
610 /// ```
611 ///
612 pub fn with_consumer_group(mut self, consumer_group: String) -> Self {
613 self.consumer_group = Some(consumer_group);
614 self
615 }
616
617 /// Specifies an instance ID for this instance of a [`ConsumerClient`].
618 pub fn with_instance_id(mut self, instance_id: String) -> Self {
619 self.instance_id = Some(instance_id);
620 self
621 }
622
623 /// Specifies the retry options for the [`ConsumerClient`].
624 pub fn with_retry_options(mut self, retry_options: RetryOptions) -> Self {
625 self.retry_options = Some(retry_options);
626 self
627 }
628
629 /// Sets a custom endpoint for the Event Hub.
630 ///
631 /// # Arguments
632 /// * `endpoint` - The custom endpoint for the Event Hub.
633 ///
634 /// # Returns
635 /// The updated [`ConsumerClientBuilder`].
636 ///
637 /// Note: The custom endpoint option allows a customer to specify an AMQP proxy
638 /// which will be used to forward requests to the actual Event Hub instance.
639 ///
640 pub fn with_custom_endpoint(mut self, endpoint: String) -> Self {
641 self.custom_endpoint = Some(endpoint);
642 self
643 }
644
645 /// Opens a connection to the Event Hub.
646 ///
647 /// This method establishes a connection to the Event Hubs instance associated
648 /// with the [`ConsumerClientBuilder`]. It returns a `Result` indicating whether the
649 /// operation was successful or not.
650 ///
651 /// # Returns
652 ///
653 /// A `Result` indicating whether the operation was successful or not.
654 ///
655 /// # Examples
656 ///
657 /// ```
658 /// use azure_messaging_eventhubs::ConsumerClient;
659 /// use azure_identity::DeveloperToolsCredential;
660 ///
661 /// #[tokio::main]
662 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
663 /// let my_credential = DeveloperToolsCredential::new(None).unwrap();
664 /// let result = ConsumerClient::builder()
665 /// .open("my_namespace", "my_eventhub".to_string(), my_credential).await;
666 ///
667 /// match result {
668 /// Ok(_connection) => {
669 /// // Connection opened successfully
670 /// println!("Connection opened successfully");
671 /// }
672 /// Err(err) => {
673 /// // Handle the error
674 /// eprintln!("Error opening connection: {:?}", err);
675 /// }
676 /// }
677 /// Ok(())
678 /// }
679 /// ```
680 pub async fn open(
681 self,
682 fully_qualified_namespace: &str,
683 eventhub_name: String,
684 credential: Arc<dyn azure_core::credentials::TokenCredential>,
685 ) -> Result<super::ConsumerClient> {
686 let custom_endpoint = match self.custom_endpoint {
687 Some(endpoint) => Some(Url::parse(&endpoint).map_err(azure_core::Error::from)?),
688 None => None,
689 };
690 trace!("Opening consumer client on {fully_qualified_namespace}.");
691 let consumer = super::ConsumerClient::new(
692 fully_qualified_namespace,
693 eventhub_name,
694 self.consumer_group,
695 credential,
696 ConsumerClientOptions {
697 application_id: self.application_id,
698 instance_id: self.instance_id,
699 retry_options: self.retry_options,
700 custom_endpoint,
701 },
702 )?;
703 consumer.ensure_connection().await?;
704 Ok(consumer)
705 }
706 }
707}
708
709#[cfg(test)]
710pub(crate) mod tests {
711 use crate::{
712 common::tests::force_errors, ConsumerClient, Result, StartLocation, StartPosition,
713 };
714 use azure_core::time::Duration;
715 use azure_core_amqp::error::AmqpErrorKind;
716 use azure_core_test::{recorded, TestContext};
717 use std::{
718 sync::Arc,
719 time::{SystemTime, UNIX_EPOCH},
720 };
721 use tracing::info;
722
723 // static INIT_LOGGING: std::sync::Once = std::sync::Once::new();
724
725 // #[test]
726 // pub(crate) fn setup() {
727 // INIT_LOGGING.call_once(|| {
728 // println!("Setting up test logger...");
729
730 // use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
731 // tracing_subscriber::fmt()
732 // .with_env_filter(EnvFilter::from_default_env())
733 // .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
734 // .with_ansi(std::env::var("NO_COLOR").map_or(true, |v| v.is_empty()))
735 // .with_writer(std::io::stderr)
736 // .init();
737 // });
738 // }
739
740 #[recorded::test]
741 async fn test_start_position_builder_with_sequence_number(_ctx: TestContext) -> Result<()> {
742 let sequence_number = 12345i64;
743 let start_position = StartPosition {
744 location: StartLocation::SequenceNumber(sequence_number),
745 ..Default::default()
746 };
747 assert_eq!(
748 start_position.location,
749 StartLocation::SequenceNumber(sequence_number)
750 );
751 assert_eq!(
752 StartPosition::start_expression(&Some(start_position)),
753 "amqp.annotation.x-opt-sequence-number >'12345'"
754 );
755
756 let start_position = StartPosition {
757 location: StartLocation::SequenceNumber(sequence_number),
758 inclusive: true,
759 };
760 assert_eq!(
761 StartPosition::start_expression(&Some(start_position)),
762 "amqp.annotation.x-opt-sequence-number >='12345'"
763 );
764 Ok(())
765 }
766
767 #[recorded::test]
768 async fn test_start_position_builder_with_enqueued_time(_ctx: TestContext) -> Result<()> {
769 let enqueued_time = SystemTime::now();
770 let start_position = StartPosition {
771 location: StartLocation::EnqueuedTime(enqueued_time),
772 ..Default::default()
773 };
774 info!("enqueued_time: {:?}", enqueued_time);
775 info!(
776 "enqueued_time: {:?}",
777 enqueued_time.duration_since(UNIX_EPOCH)
778 );
779 info!(
780 "enqueued_time: {:?}",
781 enqueued_time
782 .duration_since(UNIX_EPOCH)
783 .unwrap()
784 .as_millis()
785 );
786 assert_eq!(
787 start_position.location,
788 StartLocation::EnqueuedTime(enqueued_time)
789 );
790 assert!(!start_position.inclusive);
791 assert_eq!(
792 StartPosition::start_expression(&Some(start_position)),
793 format!(
794 "amqp.annotation.x-opt-enqueued-time >'{}'",
795 enqueued_time
796 .duration_since(UNIX_EPOCH)
797 .unwrap()
798 .as_millis()
799 )
800 );
801
802 let start_position = StartPosition {
803 location: StartLocation::EnqueuedTime(enqueued_time),
804 inclusive: true,
805 };
806 assert_eq!(
807 StartPosition::start_expression(&Some(start_position)),
808 format!(
809 "amqp.annotation.x-opt-enqueued-time >='{}'",
810 enqueued_time
811 .duration_since(std::time::UNIX_EPOCH)
812 .unwrap()
813 .as_millis()
814 )
815 );
816 Ok(())
817 }
818
819 #[recorded::test]
820 async fn test_start_position_builder_with_offset(_ctx: TestContext) -> Result<()> {
821 let offset = "12345".to_string();
822 let start_position = StartPosition {
823 location: StartLocation::Offset(offset.clone()),
824 ..Default::default()
825 };
826 assert_eq!(
827 start_position.location,
828 StartLocation::Offset(offset.clone())
829 );
830 assert_eq!(
831 "amqp.annotation.x-opt-offset >'12345'",
832 StartPosition::start_expression(&Some(start_position)),
833 );
834
835 let start_position = StartPosition {
836 location: StartLocation::Offset(offset.clone()),
837 inclusive: true,
838 };
839 assert_eq!(
840 "amqp.annotation.x-opt-offset >='12345'",
841 StartPosition::start_expression(&Some(start_position)),
842 );
843 Ok(())
844 }
845
846 #[recorded::test]
847 async fn test_start_position_builder_inclusive(_ctx: TestContext) -> Result<()> {
848 let start_position = StartPosition {
849 inclusive: true,
850 ..Default::default()
851 };
852 assert!(start_position.inclusive);
853 let start_position = StartPosition::default();
854 assert!(!start_position.inclusive);
855 Ok(())
856 }
857
858 #[recorded::test(live)]
859 async fn force_errors_consumer_properties_link(ctx: TestContext) -> Result<()> {
860 const TEST_NAME: &str = "force_errors_consumer_properties_link";
861 let recording = ctx.recording();
862 let host = recording.var("EVENTHUBS_HOST", None);
863 let eventhub = recording.var("EVENTHUB_NAME", None);
864 let credential = recording.credential();
865 let consumer = Arc::new(
866 ConsumerClient::builder()
867 .with_application_id(TEST_NAME.to_string())
868 .open(host.as_str(), eventhub, credential.clone())
869 .await?,
870 );
871
872 force_errors(
873 consumer.clone(),
874 |consumer: Arc<ConsumerClient>| {
875 let consumer = consumer.clone();
876 async move {
877 loop {
878 consumer.get_eventhub_properties().await.unwrap();
879 }
880 }
881 },
882 |consumer| {
883 consumer
884 .force_error(azure_core_amqp::AmqpError::from(
885 AmqpErrorKind::LinkClosedByRemote(Box::new(azure_core::error::Error::new(
886 azure_core::error::ErrorKind::Other,
887 "Forced error",
888 ))),
889 ))
890 .unwrap();
891 },
892 Duration::seconds(10), // Seconds until forcing the error.
893 Duration::seconds(20), // Seconds until test timeout.
894 )
895 .await?;
896
897 if let Ok(consumer) = Arc::try_unwrap(consumer) {
898 consumer.close().await?;
899 } else {
900 panic!("Consumer client has unresolved references.");
901 }
902
903 Ok(())
904 }
905
906 #[recorded::test(live)]
907 async fn force_errors_consumer_properties_session(ctx: TestContext) -> Result<()> {
908 const TEST_NAME: &str = "force_errors_consumer_properties_session";
909 let recording = ctx.recording();
910 let host = recording.var("EVENTHUBS_HOST", None);
911 let eventhub = recording.var("EVENTHUB_NAME", None);
912 let credential = recording.credential();
913 let consumer = Arc::new(
914 ConsumerClient::builder()
915 .with_application_id(TEST_NAME.to_string())
916 .open(host.as_str(), eventhub, credential.clone())
917 .await?,
918 );
919
920 force_errors(
921 consumer.clone(),
922 |consumer: Arc<ConsumerClient>| {
923 let consumer = consumer.clone();
924 async move {
925 loop {
926 consumer.get_eventhub_properties().await.unwrap();
927 }
928 }
929 },
930 |consumer| {
931 consumer
932 .force_error(azure_core_amqp::AmqpError::from(
933 AmqpErrorKind::SessionClosedByRemote(Box::new(
934 azure_core::error::Error::new(
935 azure_core::error::ErrorKind::Other,
936 "Forced error",
937 ),
938 )),
939 ))
940 .unwrap();
941 },
942 Duration::seconds(10), // Seconds until forcing the error.
943 Duration::seconds(20), // Seconds until test timeout.
944 )
945 .await?;
946
947 if let Ok(consumer) = Arc::try_unwrap(consumer) {
948 consumer.close().await?;
949 } else {
950 panic!("Consumer client has unresolved references.");
951 }
952
953 Ok(())
954 }
955 #[recorded::test(live)]
956 async fn force_errors_consumer_properties_connection(ctx: TestContext) -> Result<()> {
957 const TEST_NAME: &str = "force_errors_consumer_properties_connection";
958 let recording = ctx.recording();
959 let host = recording.var("EVENTHUBS_HOST", None);
960 let eventhub = recording.var("EVENTHUB_NAME", None);
961 let credential = recording.credential();
962 let consumer = Arc::new(
963 ConsumerClient::builder()
964 .with_application_id(TEST_NAME.to_string())
965 .open(host.as_str(), eventhub, credential.clone())
966 .await?,
967 );
968
969 force_errors(
970 consumer.clone(),
971 |consumer: Arc<ConsumerClient>| {
972 let consumer = consumer.clone();
973 async move {
974 loop {
975 consumer.get_eventhub_properties().await.unwrap();
976 }
977 }
978 },
979 |consumer| {
980 consumer
981 .force_error(azure_core_amqp::AmqpError::from(
982 AmqpErrorKind::ConnectionClosedByRemote(Box::new(
983 azure_core::error::Error::new(
984 azure_core::error::ErrorKind::Other,
985 "Forced error",
986 ),
987 )),
988 ))
989 .unwrap();
990 },
991 Duration::seconds(10), // Seconds until forcing the error.
992 Duration::seconds(20), // Seconds until test timeout.
993 )
994 .await?;
995
996 Ok(())
997 }
998}