Skip to main content

azure_messaging_eventhubs/consumer/
mod.rs

1// Copyright (c) Microsoft Corporation. All Rights reserved
2// Licensed under the MIT license.
3
4#![doc = include_str!("README.md")]
5/// Receive messages from a partition.
6pub(crate) mod event_receiver;
7
8use crate::{
9    common::{recoverable::RecoverableConnection, ManagementInstance},
10    error::Result,
11    models::{ConsumerClientDetails, EventHubPartitionProperties, EventHubProperties},
12    EventHubsError, RetryOptions,
13};
14use azure_core::{credentials::TokenCredential, http::Url, time::Duration, Uuid};
15#[cfg(test)]
16use azure_core_amqp::AmqpError;
17use azure_core_amqp::{
18    message::AmqpSourceFilter, AmqpDescribed, AmqpOrderedMap, AmqpReceiverOptions, AmqpSource,
19    AmqpSymbol, AmqpValue, ReceiverCreditMode,
20};
21pub use event_receiver::EventReceiver;
22use std::{
23    default::Default,
24    fmt::Debug,
25    sync::Arc,
26    time::{SystemTime, UNIX_EPOCH},
27};
28use tracing::{debug, trace};
29
30/// A client that can be used to receive events from an Event Hub.
31pub struct ConsumerClient {
32    recoverable_connection: Arc<RecoverableConnection>,
33    consumer_group: String,
34    eventhub: String,
35    endpoint: Url,
36    // The instance ID to set.
37    instance_id: Option<String>,
38}
39
40// Clippy complains if a method has too many parameters, so we put some of the
41// parameters into a private client options structure.
42struct ConsumerClientOptions {
43    application_id: Option<String>,
44    instance_id: Option<String>,
45    retry_options: Option<RetryOptions>,
46    custom_endpoint: Option<Url>,
47}
48
49impl ConsumerClient {
50    /// Builds a new [`ConsumerClient`] instance with the specified parameters.
51    ///
52    /// This function returns a builder which enables creation of a new [`ConsumerClient`]
53    /// instance with the specified parameters.
54    ///
55    ///
56    /// # Returns
57    ///
58    /// A new [`builders::ConsumerClientBuilder`] instance which can be used to create and open a consumer client.
59    ///
60    /// # Examples
61    ///
62    /// ```no_run
63    /// # #[tokio::main]
64    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
65    /// use azure_messaging_eventhubs::ConsumerClient;
66    /// use azure_identity::DeveloperToolsCredential;
67    ///
68    ///     let my_credential = DeveloperToolsCredential::new(None)?;
69    /// let consumer = ConsumerClient::builder()
70    ///    .open("my_namespace", "my_eventhub".to_string(), my_credential.clone()).await?;
71    /// # Ok(())}
72    /// ```
73    ///
74    pub fn builder() -> builders::ConsumerClientBuilder {
75        builders::ConsumerClientBuilder::new()
76    }
77
78    fn new(
79        fully_qualified_namespace: &str,
80        eventhub_name: String,
81        consumer_group: Option<String>,
82        credential: Arc<dyn TokenCredential>,
83        options: ConsumerClientOptions,
84    ) -> Result<Self> {
85        let consumer_group = consumer_group.unwrap_or("$Default".into());
86        let url = format!(
87            "amqps://{}/{}/ConsumerGroups/{}",
88            fully_qualified_namespace, eventhub_name, consumer_group
89        );
90        let url = Url::parse(&url).map_err(azure_core::Error::from)?;
91
92        trace!("Creating consumer client for {url}.");
93        let retry_options = options.retry_options.unwrap_or_default();
94        Ok(Self {
95            instance_id: options.instance_id,
96            recoverable_connection: RecoverableConnection::new(
97                url.clone(),
98                options.application_id,
99                options.custom_endpoint,
100                credential,
101                retry_options,
102            ),
103            eventhub: eventhub_name,
104            endpoint: url,
105            consumer_group,
106        })
107    }
108
109    /// Closes the connection to the Event Hub.
110    ///
111    /// This method closes the connection to the Event Hubs instance associated with the [`ConsumerClient`].
112    /// It returns a [`Result`] indicating whether the operation was successful or not.
113    ///
114    /// Note that closing a consumer will cancel all outstanding receive requests.
115    ///
116    /// # Returns
117    ///
118    /// A [`Result`] indicating whether the operation was successful or not.
119    ///
120    /// # Examples
121    ///
122    /// ``` no_run
123    /// use azure_messaging_eventhubs::ConsumerClient;
124    /// use azure_identity::DeveloperToolsCredential;
125    ///
126    /// #[tokio::main]
127    /// async fn main() {
128    ///     let my_credential = DeveloperToolsCredential::new(None).unwrap();
129    ///     let consumer = ConsumerClient::builder()
130    ///         .open("my_namespace", "my_eventhub".to_string(), my_credential).await.unwrap();
131    ///
132    ///     let result = consumer.close().await;
133    ///
134    ///     match result {
135    ///         Ok(_) => {
136    ///             // Connection closed successfully
137    ///             println!("Connection closed successfully");
138    ///         }
139    ///         Err(err) => {
140    ///             // Handle the error
141    ///             eprintln!("Error closing connection: {:?}", err);
142    ///         }
143    ///     }
144    /// }
145    /// ```
146    pub async fn close(self) -> Result<()> {
147        trace!("Closing consumer client for {}.", self.endpoint);
148        let recoverable_connection =
149            Arc::try_unwrap(self.recoverable_connection).map_err(|_| {
150                EventHubsError::with_message(
151                    "Could not close consumer recoverable connection, multiple references exist",
152                )
153            })?;
154        trace!(
155            "No references to connection, closing connection for {}.",
156            self.endpoint
157        );
158        recoverable_connection.close_connection().await?;
159        Ok(())
160    }
161
162    /// Forces an error on the connection.
163    #[cfg(test)]
164    pub fn force_error(&self, error: AmqpError) -> Result<()> {
165        self.recoverable_connection.force_error(error)
166    }
167
168    /// Retrieves the details of the consumer client.
169    ///
170    /// This function retrieves the details of the consumer client associated with the [`ConsumerClient`].
171    pub(crate) fn get_details(&self) -> Result<ConsumerClientDetails> {
172        Ok(ConsumerClientDetails {
173            eventhub_name: self.eventhub.clone(),
174            consumer_group: self.consumer_group.clone(),
175            fully_qualified_namespace: self
176                .endpoint
177                .host()
178                .ok_or_else(|| {
179                    EventHubsError::with_message("Could not find host in consumer client")
180                })?
181                .to_string(),
182            client_id: self.recoverable_connection.get_connection_id().to_string(),
183        })
184    }
185
186    /// Attaches a message receiver to a specific partition of the Event Hub.
187    ///
188    /// This function establishes a connection to the specified partition of the Event Hubs instance and returns a MessageReceiver which can be used to receive messages from it.
189    ///
190    /// # Arguments
191    ///
192    /// * `partition_id` - The ID of the partition to receive events from.
193    /// * `options` - Optional [`OpenReceiverOptions`] to configure the behavior of the receiver.
194    ///
195    /// # Returns
196    ///
197    /// A MessageReceiver which can be used to receive messages from the partition.
198    ///
199    /// Note that by default, a message receiver will receive events starting from the latest event in the partition (in
200    /// other words, it will receive new events only). To receive events from another location within the partition you can
201    /// specify a different starting position using the `options` parameter.
202    ///
203    /// # Examples
204    ///
205    /// ```no_run
206    /// use azure_messaging_eventhubs::ConsumerClient;
207    /// use azure_identity::DeveloperToolsCredential;
208    /// use futures::stream::StreamExt;
209    ///
210    /// #[tokio::main]
211    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
212    ///     let my_credential = DeveloperToolsCredential::new(None)?;
213    ///     let consumer = ConsumerClient::builder()
214    ///        .open("my_namespace", "my_eventhub".to_string(), my_credential).await?;
215    ///     let partition_id = "0".to_string();
216    ///
217    ///     let receiver  = consumer.open_receiver_on_partition(partition_id, None).await?;
218    ///
219    ///     let mut event_stream = receiver.stream_events();
220    ///
221    ///     while let Some(event_result) = event_stream.next().await {
222    ///         match event_result {
223    ///             Ok(event) => {
224    ///                 // Process the received event
225    ///                 println!("Received event: {:?}", event);
226    ///             }
227    ///             Err(err) => {
228    ///                 // Handle the error
229    ///                 eprintln!("Error receiving event: {:?}", err);
230    ///             }
231    ///         }
232    ///     }
233    ///     Ok(())
234    /// }
235    /// ```
236    pub async fn open_receiver_on_partition(
237        &self,
238        partition_id: String,
239        options: Option<OpenReceiverOptions>,
240    ) -> Result<EventReceiver> {
241        let options = options.unwrap_or_default();
242
243        let receiver_name = self
244            .instance_id
245            .clone()
246            .unwrap_or_else(|| Uuid::new_v4().to_string());
247        let start_expression = StartPosition::start_expression(&options.start_position);
248
249        trace!(
250            "Opening receiver on url {} partition {partition_id}.",
251            self.endpoint
252        );
253
254        let source_url = format!("{}/Partitions/{}", self.endpoint, partition_id);
255        let source_url = Url::parse(&source_url).map_err(azure_core::Error::from)?;
256
257        let message_source = AmqpSource::builder()
258            .with_address(source_url.to_string())
259            .add_to_filter(
260                AmqpSourceFilter::selector_filter().description().into(),
261                Box::new(AmqpDescribed::new(
262                    AmqpSourceFilter::selector_filter().code(),
263                    start_expression,
264                )),
265            )
266            .build();
267        let mut receiver_properties: AmqpOrderedMap<AmqpSymbol, AmqpValue> =
268            vec![("com.microsoft.com:receiver-name", receiver_name.clone())]
269                .into_iter()
270                .map(|(k, v)| (AmqpSymbol::from(k), AmqpValue::from(v)))
271                .collect();
272
273        if let Some(owner_level) = options.owner_level {
274            receiver_properties.insert("com.microsoft:epoch".into(), AmqpValue::from(owner_level));
275        }
276
277        let receiver_options = AmqpReceiverOptions {
278            name: Some(receiver_name),
279            properties: Some(receiver_properties),
280            credit_mode: Some(ReceiverCreditMode::Auto(options.prefetch.unwrap_or(300))),
281            auto_accept: true,
282            ..Default::default()
283        };
284
285        debug!("Receiver attached on partition {partition_id}.");
286        Ok(EventReceiver::new(
287            self.recoverable_connection.clone(),
288            receiver_options,
289            message_source,
290            source_url,
291            partition_id,
292            options.receive_timeout,
293        ))
294    }
295
296    /// Retrieves the properties of the Event Hub.
297    ///
298    /// This function retrieves the properties of the Event Hub associated with the [`ConsumerClient`].
299    /// It returns a [`Result`] containing the [`EventHubProperties`] if the operation is successful.
300    ///
301    /// # Returns
302    ///
303    /// A [`Result`] containing the [`EventHubProperties`] if the operation is successful.
304    ///
305    /// # Examples
306    ///
307    /// ``` no_run
308    /// use azure_messaging_eventhubs::ConsumerClient;
309    /// use azure_identity::DeveloperToolsCredential;
310    ///
311    /// #[tokio::main]
312    /// async fn main(){
313    ///     let my_credential = DeveloperToolsCredential::new(None).unwrap();
314    ///     let consumer = ConsumerClient::builder()
315    ///         .open("my_namespace", "my_eventhub".to_string(), my_credential).await.unwrap();
316    ///
317    ///     let eventhub_properties = consumer.get_eventhub_properties().await;
318    ///
319    ///     match eventhub_properties {
320    ///         Ok(properties) => {
321    ///             // Process the Event Hub instance properties
322    ///             println!("Event Hub properties: {:?}", properties);
323    ///         }
324    ///         Err(err) => {
325    ///             // Handle the error
326    ///             eprintln!("Error retrieving Event Hubs properties: {:?}", err);
327    ///         }
328    ///     }
329    /// }
330    /// ```
331    pub async fn get_eventhub_properties(&self) -> Result<EventHubProperties> {
332        self.get_management_instance()
333            .await?
334            .get_eventhub_properties(&self.eventhub)
335            .await
336    }
337
338    /// Retrieves the properties of a specific partition in the Event Hub.
339    ///
340    /// This function retrieves the properties of the specified partition in the Event Hub.
341    /// It returns a [`Result`] containing the [`EventHubPartitionProperties`] if the operation is successful.
342    ///
343    /// # Arguments
344    ///
345    /// * `partition_id` - The ID of the partition to retrieve properties for.
346    ///
347    /// # Returns
348    ///
349    /// A [`Result`] containing the [`EventHubPartitionProperties`] if the operation is successful.
350    ///
351    /// # Examples
352    ///
353    /// ``` no_run
354    /// use azure_messaging_eventhubs::ConsumerClient;
355    /// use azure_identity::DeveloperToolsCredential;
356    ///
357    /// #[tokio::main]
358    /// async fn main() {
359    ///     let my_credential = DeveloperToolsCredential::new(None).unwrap();
360    ///     let consumer = ConsumerClient::builder()
361    ///         .open("my_namespace", "my_eventhub".to_string(), my_credential).await.unwrap();
362    ///     let partition_id = "0";
363    ///
364    ///     let partition_properties = consumer.get_partition_properties(partition_id).await;
365    ///
366    ///     match partition_properties {
367    ///         Ok(properties) => {
368    ///             // Process the partition properties
369    ///             println!("Partition properties: {:?}", properties);
370    ///         }
371    ///         Err(err) => {
372    ///             // Handle the error
373    ///             eprintln!("Error retrieving partition properties: {:?}", err);
374    ///         }
375    ///     }
376    /// }
377    /// ```
378    pub async fn get_partition_properties(
379        &self,
380        partition_id: &str,
381    ) -> Result<EventHubPartitionProperties> {
382        self.get_management_instance()
383            .await?
384            .get_eventhub_partition_properties(&self.eventhub, partition_id)
385            .await
386    }
387
388    async fn get_management_instance(&self) -> Result<Arc<ManagementInstance>> {
389        Ok(ManagementInstance::new(self.recoverable_connection.clone()))
390    }
391
392    async fn ensure_connection(&self) -> azure_core_amqp::Result<()> {
393        self.recoverable_connection.ensure_connection().await?;
394        Ok(())
395    }
396}
397
398/// Represents the options for receiving events from an Event Hub.
399#[derive(Debug, Clone, Default)]
400pub struct OpenReceiverOptions {
401    /// The owner level for messages being retrieved.
402    pub owner_level: Option<i64>,
403    /// The prefetch count for messages being retrieved.
404    pub prefetch: Option<u32>,
405    /// The starting position for messages being retrieved.
406    pub start_position: Option<StartPosition>,
407
408    /// Optional timeout for receiving messages. If not provided, the default timeout is infinite.
409    ///
410    /// Note: This is the timeout for individual messages, not the entire receive operation.
411    /// As long as there are messages available, then they will be included in the stream events regardless of the timeout.
412    pub receive_timeout: Option<Duration>,
413}
414/// Represents the options for receiving events from an Event Hub.
415impl OpenReceiverOptions {}
416
/// Identifies where in a partition a consumer starts receiving events.
#[derive(Debug, Default, PartialEq, Clone)]
pub enum StartLocation {
    /// The starting position is specified by an offset.
    Offset(String),
    /// The starting position is specified by a sequence number.
    SequenceNumber(i64),
    /// The starting position is specified by an enqueued time.
    EnqueuedTime(SystemTime),
    /// The starting position is the earliest event in the partition.
    Earliest,
    /// The starting position is the latest event in the partition.
    #[default]
    Latest,
}

/// Name of the enqueued-time annotation used in start-position filter expressions.
pub(crate) const ENQUEUED_TIME_ANNOTATION: &str = "amqp.annotation.x-opt-enqueued-time";
/// Name of the offset annotation used in start-position filter expressions.
pub(crate) const OFFSET_ANNOTATION: &str = "amqp.annotation.x-opt-offset";
/// Name of the sequence-number annotation used in start-position filter expressions.
pub(crate) const SEQUENCE_NUMBER_ANNOTATION: &str = "amqp.annotation.x-opt-sequence-number";

/// Represents the starting position of a consumer when receiving events from an Event Hub.
///
/// The starting position can be specified using an offset, a sequence number, an enqueued
/// time, or the earliest or latest event in the partition.
///
/// The default starting position is the latest event in the partition (always receive new events).
///
/// # Examples
///
/// Start after a sequence number:
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition {
///     location: StartLocation::SequenceNumber(12345),
///     ..Default::default()
/// };
/// ```
///
/// Start after an enqueued time:
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition {
///     location: StartLocation::EnqueuedTime(std::time::SystemTime::now()),
///     ..Default::default()
/// };
/// ```
///
/// Start after an offset:
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition {
///     location: StartLocation::Offset("12345".to_string()),
///     ..Default::default()
/// };
/// ```
///
/// Start at the earliest event:
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition {
///     location: StartLocation::Earliest,
///     ..Default::default()
/// };
/// ```
///
/// Start at the latest event, explicitly or through the default:
///
/// ```
/// use azure_messaging_eventhubs::{StartPosition, StartLocation};
///
/// let start_position = StartPosition {
///     location: StartLocation::Latest,
///     ..Default::default()
/// };
/// assert_eq!(start_position, StartPosition::default());
/// ```
#[derive(Debug, PartialEq, Clone, Default)]
pub struct StartPosition {
    /// The location of the starting position.
    pub location: StartLocation,

    /// Whether the starting position is inclusive (includes the event at StartLocation).
    ///
    /// Only offsets, sequence numbers and enqueued times honor this flag;
    /// [`StartLocation::Earliest`] and [`StartLocation::Latest`] ignore it.
    pub inclusive: bool,
}

impl StartPosition {
    /// Builds the filter expression that selects the first event to deliver.
    ///
    /// # Arguments
    ///
    /// * `position` - The requested start position; `None` behaves like
    ///   [`StartLocation::Latest`].
    ///
    /// # Returns
    ///
    /// An expression of the form `<annotation> <op>'<value>'`, where `<op>` is `>=` for an
    /// inclusive position and `>` otherwise. Enqueued times are expressed in milliseconds
    /// since the Unix epoch.
    pub(crate) fn start_expression(position: &Option<StartPosition>) -> String {
        match position {
            None => format!("{OFFSET_ANNOTATION} > '@latest'"),
            Some(position) => {
                let comparison = if position.inclusive { ">=" } else { ">" };
                match &position.location {
                    StartLocation::Offset(offset) => {
                        format!("{OFFSET_ANNOTATION} {comparison}'{offset}'")
                    }
                    StartLocation::SequenceNumber(sequence_number) => {
                        format!("{SEQUENCE_NUMBER_ANNOTATION} {comparison}'{sequence_number}'")
                    }
                    StartLocation::EnqueuedTime(enqueued_time) => {
                        // A caller-supplied time before the Unix epoch must not panic.
                        // No event can have been enqueued before the epoch, so clamping
                        // to 0 selects the same events as the earlier timestamp would.
                        let millis = enqueued_time
                            .duration_since(UNIX_EPOCH)
                            .map_or(0, |elapsed| elapsed.as_millis());
                        format!("{ENQUEUED_TIME_ANNOTATION} {comparison}'{millis}'")
                    }
                    StartLocation::Earliest => format!("{OFFSET_ANNOTATION} > '-1'"),
                    StartLocation::Latest => format!("{OFFSET_ANNOTATION} > '@latest'"),
                }
            }
        }
    }
}
542
543pub mod builders {
544    use super::*;
545    use crate::Result;
546    use std::sync::Arc;
547
548    /// A builder for creating a [`ConsumerClient`].
549    ///
550    /// This builder is used to create a new [`ConsumerClient`] with the specified parameters.
551    ///
552    /// # Examples
553    ///
554    /// ```no_run
555    /// use azure_messaging_eventhubs::ConsumerClient;
556    /// use azure_identity::DeveloperToolsCredential;
557    ///
558    /// #[tokio::main]
559    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
560    ///    let my_credential = DeveloperToolsCredential::new(None).unwrap();
561    ///   let consumer = ConsumerClient::builder()
562    ///      .open("my_namespace", "my_eventhub".to_string(), my_credential).await?;
563    ///   Ok(())
564    /// }
565    /// ```
566    #[derive(Default)]
567    pub struct ConsumerClientBuilder {
568        consumer_group: Option<String>,
569        application_id: Option<String>,
570        instance_id: Option<String>,
571        retry_options: Option<RetryOptions>,
572        custom_endpoint: Option<String>,
573    }
574
575    impl ConsumerClientBuilder {
576        pub(super) fn new() -> Self {
577            Self {
578                ..Default::default()
579            }
580        }
581
582        /// Specifies the name of the application creating the [`ConsumerClient`].
583        pub fn with_application_id(mut self, application_id: String) -> Self {
584            self.application_id = Some(application_id);
585            self
586        }
587
588        /// Specifies the consumer group for the [`ConsumerClient`].
589        ///
590        /// If not specified, the default consumer group will be used.
591        ///
592        /// For more information on Event Hubs consumer groups, see
593        /// [Consumer groups](https://learn.microsoft.com/azure/event-hubs/event-hubs-features#consumer-groups).
594        ///
595        /// # Examples
596        ///
597        /// ```no_run
598        /// use azure_messaging_eventhubs::ConsumerClient;
599        /// use azure_identity::DeveloperToolsCredential;
600        ///
601        /// #[tokio::main]
602        /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
603        ///    let my_credential = DeveloperToolsCredential::new(None)?;
604        ///    let consumer = ConsumerClient::builder()
605        ///      .with_consumer_group("my_consumer_group".to_string())
606        ///      .open("my_namespace", "my_eventhub".to_string(), my_credential).await?;
607        ///   Ok(())
608        /// }
609        ///
610        /// ```
611        ///
612        pub fn with_consumer_group(mut self, consumer_group: String) -> Self {
613            self.consumer_group = Some(consumer_group);
614            self
615        }
616
617        /// Specifies an instance ID for this instance of a [`ConsumerClient`].
618        pub fn with_instance_id(mut self, instance_id: String) -> Self {
619            self.instance_id = Some(instance_id);
620            self
621        }
622
623        /// Specifies the retry options for the [`ConsumerClient`].
624        pub fn with_retry_options(mut self, retry_options: RetryOptions) -> Self {
625            self.retry_options = Some(retry_options);
626            self
627        }
628
629        /// Sets a custom endpoint for the Event Hub.
630        ///
631        /// # Arguments
632        /// * `endpoint` - The custom endpoint for the Event Hub.
633        ///
634        /// # Returns
635        /// The updated [`ConsumerClientBuilder`].
636        ///
637        /// Note: The custom endpoint option allows a customer to specify an AMQP proxy
638        /// which will be used to forward requests to the actual Event Hub instance.
639        ///
640        pub fn with_custom_endpoint(mut self, endpoint: String) -> Self {
641            self.custom_endpoint = Some(endpoint);
642            self
643        }
644
645        /// Opens a connection to the Event Hub.
646        ///
647        /// This method establishes a connection to the Event Hubs instance associated
648        /// with the [`ConsumerClientBuilder`]. It returns a `Result` indicating whether the
649        /// operation was successful or not.
650        ///
651        /// # Returns
652        ///
653        /// A `Result` indicating whether the operation was successful or not.
654        ///
655        /// # Examples
656        ///
657        /// ```
658        /// use azure_messaging_eventhubs::ConsumerClient;
659        /// use azure_identity::DeveloperToolsCredential;
660        ///
661        /// #[tokio::main]
662        /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
663        ///     let my_credential = DeveloperToolsCredential::new(None).unwrap();
664        ///     let result = ConsumerClient::builder()
665        ///         .open("my_namespace", "my_eventhub".to_string(), my_credential).await;
666        ///
667        ///     match result {
668        ///         Ok(_connection) => {
669        ///             // Connection opened successfully
670        ///             println!("Connection opened successfully");
671        ///         }
672        ///         Err(err) => {
673        ///             // Handle the error
674        ///             eprintln!("Error opening connection: {:?}", err);
675        ///         }
676        ///     }
677        ///     Ok(())
678        /// }
679        /// ```
680        pub async fn open(
681            self,
682            fully_qualified_namespace: &str,
683            eventhub_name: String,
684            credential: Arc<dyn azure_core::credentials::TokenCredential>,
685        ) -> Result<super::ConsumerClient> {
686            let custom_endpoint = match self.custom_endpoint {
687                Some(endpoint) => Some(Url::parse(&endpoint).map_err(azure_core::Error::from)?),
688                None => None,
689            };
690            trace!("Opening consumer client on {fully_qualified_namespace}.");
691            let consumer = super::ConsumerClient::new(
692                fully_qualified_namespace,
693                eventhub_name,
694                self.consumer_group,
695                credential,
696                ConsumerClientOptions {
697                    application_id: self.application_id,
698                    instance_id: self.instance_id,
699                    retry_options: self.retry_options,
700                    custom_endpoint,
701                },
702            )?;
703            consumer.ensure_connection().await?;
704            Ok(consumer)
705        }
706    }
707}
708
709#[cfg(test)]
710pub(crate) mod tests {
711    use crate::{
712        common::tests::force_errors, ConsumerClient, Result, StartLocation, StartPosition,
713    };
714    use azure_core::time::Duration;
715    use azure_core_amqp::error::AmqpErrorKind;
716    use azure_core_test::{recorded, TestContext};
717    use std::{
718        sync::Arc,
719        time::{SystemTime, UNIX_EPOCH},
720    };
721    use tracing::info;
722
723    // static INIT_LOGGING: std::sync::Once = std::sync::Once::new();
724
725    // #[test]
726    // pub(crate) fn setup() {
727    //     INIT_LOGGING.call_once(|| {
728    //         println!("Setting up test logger...");
729
730    //         use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
731    //         tracing_subscriber::fmt()
732    //             .with_env_filter(EnvFilter::from_default_env())
733    //             .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
734    //             .with_ansi(std::env::var("NO_COLOR").map_or(true, |v| v.is_empty()))
735    //             .with_writer(std::io::stderr)
736    //             .init();
737    //     });
738    // }
739
740    #[recorded::test]
741    async fn test_start_position_builder_with_sequence_number(_ctx: TestContext) -> Result<()> {
742        let sequence_number = 12345i64;
743        let start_position = StartPosition {
744            location: StartLocation::SequenceNumber(sequence_number),
745            ..Default::default()
746        };
747        assert_eq!(
748            start_position.location,
749            StartLocation::SequenceNumber(sequence_number)
750        );
751        assert_eq!(
752            StartPosition::start_expression(&Some(start_position)),
753            "amqp.annotation.x-opt-sequence-number >'12345'"
754        );
755
756        let start_position = StartPosition {
757            location: StartLocation::SequenceNumber(sequence_number),
758            inclusive: true,
759        };
760        assert_eq!(
761            StartPosition::start_expression(&Some(start_position)),
762            "amqp.annotation.x-opt-sequence-number >='12345'"
763        );
764        Ok(())
765    }
766
767    #[recorded::test]
768    async fn test_start_position_builder_with_enqueued_time(_ctx: TestContext) -> Result<()> {
769        let enqueued_time = SystemTime::now();
770        let start_position = StartPosition {
771            location: StartLocation::EnqueuedTime(enqueued_time),
772            ..Default::default()
773        };
774        info!("enqueued_time: {:?}", enqueued_time);
775        info!(
776            "enqueued_time: {:?}",
777            enqueued_time.duration_since(UNIX_EPOCH)
778        );
779        info!(
780            "enqueued_time: {:?}",
781            enqueued_time
782                .duration_since(UNIX_EPOCH)
783                .unwrap()
784                .as_millis()
785        );
786        assert_eq!(
787            start_position.location,
788            StartLocation::EnqueuedTime(enqueued_time)
789        );
790        assert!(!start_position.inclusive);
791        assert_eq!(
792            StartPosition::start_expression(&Some(start_position)),
793            format!(
794                "amqp.annotation.x-opt-enqueued-time >'{}'",
795                enqueued_time
796                    .duration_since(UNIX_EPOCH)
797                    .unwrap()
798                    .as_millis()
799            )
800        );
801
802        let start_position = StartPosition {
803            location: StartLocation::EnqueuedTime(enqueued_time),
804            inclusive: true,
805        };
806        assert_eq!(
807            StartPosition::start_expression(&Some(start_position)),
808            format!(
809                "amqp.annotation.x-opt-enqueued-time >='{}'",
810                enqueued_time
811                    .duration_since(std::time::UNIX_EPOCH)
812                    .unwrap()
813                    .as_millis()
814            )
815        );
816        Ok(())
817    }
818
819    #[recorded::test]
820    async fn test_start_position_builder_with_offset(_ctx: TestContext) -> Result<()> {
821        let offset = "12345".to_string();
822        let start_position = StartPosition {
823            location: StartLocation::Offset(offset.clone()),
824            ..Default::default()
825        };
826        assert_eq!(
827            start_position.location,
828            StartLocation::Offset(offset.clone())
829        );
830        assert_eq!(
831            "amqp.annotation.x-opt-offset >'12345'",
832            StartPosition::start_expression(&Some(start_position)),
833        );
834
835        let start_position = StartPosition {
836            location: StartLocation::Offset(offset.clone()),
837            inclusive: true,
838        };
839        assert_eq!(
840            "amqp.annotation.x-opt-offset >='12345'",
841            StartPosition::start_expression(&Some(start_position)),
842        );
843        Ok(())
844    }
845
846    #[recorded::test]
847    async fn test_start_position_builder_inclusive(_ctx: TestContext) -> Result<()> {
848        let start_position = StartPosition {
849            inclusive: true,
850            ..Default::default()
851        };
852        assert!(start_position.inclusive);
853        let start_position = StartPosition::default();
854        assert!(!start_position.inclusive);
855        Ok(())
856    }
857
858    #[recorded::test(live)]
859    async fn force_errors_consumer_properties_link(ctx: TestContext) -> Result<()> {
860        const TEST_NAME: &str = "force_errors_consumer_properties_link";
861        let recording = ctx.recording();
862        let host = recording.var("EVENTHUBS_HOST", None);
863        let eventhub = recording.var("EVENTHUB_NAME", None);
864        let credential = recording.credential();
865        let consumer = Arc::new(
866            ConsumerClient::builder()
867                .with_application_id(TEST_NAME.to_string())
868                .open(host.as_str(), eventhub, credential.clone())
869                .await?,
870        );
871
872        force_errors(
873            consumer.clone(),
874            |consumer: Arc<ConsumerClient>| {
875                let consumer = consumer.clone();
876                async move {
877                    loop {
878                        consumer.get_eventhub_properties().await.unwrap();
879                    }
880                }
881            },
882            |consumer| {
883                consumer
884                    .force_error(azure_core_amqp::AmqpError::from(
885                        AmqpErrorKind::LinkClosedByRemote(Box::new(azure_core::error::Error::new(
886                            azure_core::error::ErrorKind::Other,
887                            "Forced error",
888                        ))),
889                    ))
890                    .unwrap();
891            },
892            Duration::seconds(10), // Seconds until forcing the error.
893            Duration::seconds(20), // Seconds until test timeout.
894        )
895        .await?;
896
897        if let Ok(consumer) = Arc::try_unwrap(consumer) {
898            consumer.close().await?;
899        } else {
900            panic!("Consumer client has unresolved references.");
901        }
902
903        Ok(())
904    }
905
906    #[recorded::test(live)]
907    async fn force_errors_consumer_properties_session(ctx: TestContext) -> Result<()> {
908        const TEST_NAME: &str = "force_errors_consumer_properties_session";
909        let recording = ctx.recording();
910        let host = recording.var("EVENTHUBS_HOST", None);
911        let eventhub = recording.var("EVENTHUB_NAME", None);
912        let credential = recording.credential();
913        let consumer = Arc::new(
914            ConsumerClient::builder()
915                .with_application_id(TEST_NAME.to_string())
916                .open(host.as_str(), eventhub, credential.clone())
917                .await?,
918        );
919
920        force_errors(
921            consumer.clone(),
922            |consumer: Arc<ConsumerClient>| {
923                let consumer = consumer.clone();
924                async move {
925                    loop {
926                        consumer.get_eventhub_properties().await.unwrap();
927                    }
928                }
929            },
930            |consumer| {
931                consumer
932                    .force_error(azure_core_amqp::AmqpError::from(
933                        AmqpErrorKind::SessionClosedByRemote(Box::new(
934                            azure_core::error::Error::new(
935                                azure_core::error::ErrorKind::Other,
936                                "Forced error",
937                            ),
938                        )),
939                    ))
940                    .unwrap();
941            },
942            Duration::seconds(10), // Seconds until forcing the error.
943            Duration::seconds(20), // Seconds until test timeout.
944        )
945        .await?;
946
947        if let Ok(consumer) = Arc::try_unwrap(consumer) {
948            consumer.close().await?;
949        } else {
950            panic!("Consumer client has unresolved references.");
951        }
952
953        Ok(())
954    }
955    #[recorded::test(live)]
956    async fn force_errors_consumer_properties_connection(ctx: TestContext) -> Result<()> {
957        const TEST_NAME: &str = "force_errors_consumer_properties_connection";
958        let recording = ctx.recording();
959        let host = recording.var("EVENTHUBS_HOST", None);
960        let eventhub = recording.var("EVENTHUB_NAME", None);
961        let credential = recording.credential();
962        let consumer = Arc::new(
963            ConsumerClient::builder()
964                .with_application_id(TEST_NAME.to_string())
965                .open(host.as_str(), eventhub, credential.clone())
966                .await?,
967        );
968
969        force_errors(
970            consumer.clone(),
971            |consumer: Arc<ConsumerClient>| {
972                let consumer = consumer.clone();
973                async move {
974                    loop {
975                        consumer.get_eventhub_properties().await.unwrap();
976                    }
977                }
978            },
979            |consumer| {
980                consumer
981                    .force_error(azure_core_amqp::AmqpError::from(
982                        AmqpErrorKind::ConnectionClosedByRemote(Box::new(
983                            azure_core::error::Error::new(
984                                azure_core::error::ErrorKind::Other,
985                                "Forced error",
986                            ),
987                        )),
988                    ))
989                    .unwrap();
990            },
991            Duration::seconds(10), // Seconds until forcing the error.
992            Duration::seconds(20), // Seconds until test timeout.
993        )
994        .await?;
995
996        Ok(())
997    }
998}