azeventhubs/primitives/
partition_receiver.rs

1use std::{collections::VecDeque, marker::PhantomData, time::Duration as StdDuration};
2
3use crate::{
4    amqp::amqp_consumer::{single::receive_event_batch, AmqpConsumer},
5    authorization::event_hub_token_credential::EventHubTokenCredential,
6    consumer::EventPosition,
7    core::BasicRetryPolicy,
8    event_hubs_retry_policy::EventHubsRetryPolicy,
9    EventHubConnection, EventHubsRetryOptions, ReceivedEventData,
10};
11
12use super::partition_receiver_options::PartitionReceiverOptions;
13
14/// Allows reading events from a specific partition of an Event Hub, and in the context of a
15/// specific consumer group, to be read with a greater level of control over communication with the
16/// Event Hubs service than is offered by other event consumers.
17#[derive(Debug)]
18pub struct PartitionReceiver<RP> {
19    connection: EventHubConnection,
20    inner_consumer: AmqpConsumer<RP>,
21    options: PartitionReceiverOptions,
22}
23
/// Builder that creates a [`PartitionReceiver`] using the retry policy type `RP`.
///
/// The builder holds no data of its own; it only records which retry policy type the resulting
/// receiver should use. Obtain one through [`PartitionReceiver::with_policy`].
#[derive(Debug)]
pub struct PartitionReceiverBuilder<RP> {
    /// Ties the builder to the retry policy type without storing a value of it.
    _retry_policy_marker: PhantomData<RP>,
}
29
30impl PartitionReceiver<BasicRetryPolicy> {
31    /// Creates a new [`PartitionReceiverBuilder`] with a custom retry policy.
32    pub fn with_policy<RP>() -> PartitionReceiverBuilder<RP>
33    where
34        RP: EventHubsRetryPolicy + From<EventHubsRetryOptions> + Send,
35    {
36        PartitionReceiverBuilder {
37            _retry_policy_marker: PhantomData,
38        }
39    }
40
41    /// Creates a new [`PartitionReceiver`] from a connection string.
42    pub async fn new_from_connection_string(
43        consumer_group: &str,
44        partition_id: &str,
45        event_position: EventPosition,
46        connection_string: impl Into<String>,
47        event_hub_name: impl Into<Option<String>>,
48        options: PartitionReceiverOptions,
49    ) -> Result<Self, azure_core::Error> {
50        Self::with_policy()
51            .new_from_connection_string(
52                consumer_group,
53                partition_id,
54                event_position,
55                connection_string,
56                event_hub_name,
57                options,
58            )
59            .await
60    }
61
62    /// Creates a new [`PartitionReceiver`] from a namespace and a credential.
63    pub async fn new_from_credential(
64        consumer_group: &str,
65        partition_id: &str,
66        event_position: EventPosition,
67        fully_qualified_namespace: impl Into<String>,
68        event_hub_name: impl Into<String>,
69        credential: impl Into<EventHubTokenCredential>,
70        options: PartitionReceiverOptions,
71    ) -> Result<Self, azure_core::Error> {
72        Self::with_policy()
73            .new_from_credential(
74                consumer_group,
75                partition_id,
76                event_position,
77                fully_qualified_namespace,
78                event_hub_name,
79                credential,
80                options,
81            )
82            .await
83    }
84
85    /// Creates a new [`PartitionReceiver`] from an existing [`EventHubConnection`].
86    pub async fn with_conneciton(
87        consumer_group: &str,
88        partition_id: &str,
89        event_position: EventPosition,
90        connection: EventHubConnection,
91        options: PartitionReceiverOptions,
92    ) -> Result<Self, azure_core::Error> {
93        Self::with_policy()
94            .with_connection(
95                consumer_group,
96                partition_id,
97                event_position,
98                connection,
99                options,
100            )
101            .await
102    }
103}
104
105impl<RP> PartitionReceiverBuilder<RP>
106where
107    RP: EventHubsRetryPolicy + From<EventHubsRetryOptions> + Send,
108{
109    /// Creates a new [`PartitionReceiver`] from a connection string.
110    pub async fn new_from_connection_string(
111        self,
112        consumer_group: &str,
113        partition_id: &str,
114        event_position: EventPosition,
115        connection_string: impl Into<String>,
116        event_hub_name: impl Into<Option<String>>,
117        options: PartitionReceiverOptions,
118    ) -> Result<PartitionReceiver<RP>, azure_core::Error> {
119        let connection = EventHubConnection::new_from_connection_string(
120            connection_string.into(),
121            event_hub_name.into(),
122            options.connection_options.clone(),
123        )
124        .await?;
125
126        self.with_connection(
127            consumer_group,
128            partition_id,
129            event_position,
130            connection,
131            options,
132        )
133        .await
134    }
135
136    /// Creates a new [`PartitionReceiver`] from a namespace and a credential.
137    #[allow(clippy::too_many_arguments)] // TODO: how to reduce the number of arguments?
138    pub async fn new_from_credential(
139        self,
140        consumer_group: &str,
141        partition_id: &str,
142        event_position: EventPosition,
143        fully_qualified_namespace: impl Into<String>,
144        event_hub_name: impl Into<String>,
145        credential: impl Into<EventHubTokenCredential>,
146        options: PartitionReceiverOptions,
147    ) -> Result<PartitionReceiver<RP>, azure_core::Error> {
148        let connection = EventHubConnection::new_from_credential(
149            fully_qualified_namespace.into(),
150            event_hub_name.into(),
151            credential.into(),
152            options.connection_options.clone(),
153        )
154        .await?;
155
156        self.with_connection(
157            consumer_group,
158            partition_id,
159            event_position,
160            connection,
161            options,
162        )
163        .await
164    }
165
166    /// Creates a new [`PartitionReceiver`] from an existing [`EventHubConnection`].
167    pub async fn with_connection(
168        self,
169        consumer_group: &str,
170        partition_id: &str,
171        event_position: EventPosition,
172        mut connection: EventHubConnection,
173        options: PartitionReceiverOptions,
174    ) -> Result<PartitionReceiver<RP>, azure_core::Error> {
175        let consumer_identifier = options.identifier.clone();
176        let retry_policy = RP::from(options.retry_options.clone());
177        let inner_consumer = connection
178            .create_transport_consumer(
179                consumer_group,
180                partition_id,
181                consumer_identifier,
182                event_position,
183                retry_policy,
184                options.track_last_enqueued_event_properties,
185                options.owner_level,
186                Some(options.prefetch_count),
187            )
188            .await?;
189
190        Ok(PartitionReceiver {
191            connection,
192            inner_consumer,
193            options,
194        })
195    }
196}
197
198impl<RP> PartitionReceiver<RP>
199where
200    RP: EventHubsRetryPolicy + Send,
201{
202    /// Receives a batch of events from the Event Hub partition.
203    pub async fn recv_batch(
204        &mut self,
205        max_event_count: usize,
206        max_wait_time: impl Into<Option<StdDuration>>,
207    ) -> Result<impl Iterator<Item = ReceivedEventData> + ExactSizeIterator, azure_core::Error>
208    {
209        let mut buffer = VecDeque::with_capacity(max_event_count);
210        let max_wait_time = max_wait_time.into();
211        let max_wait_time = max_wait_time.map(|t| t.max(self.options.maximum_receive_wait_time));
212        receive_event_batch(
213            &mut self.connection.inner,
214            &mut self.inner_consumer,
215            &mut buffer,
216            max_wait_time,
217        )
218        .await?;
219        Ok(buffer.into_iter())
220    }
221}
222
223impl<RP> PartitionReceiver<RP> {
224    /// Closes the [`PartitionReceiver`].
225    pub async fn close(self) -> Result<(), azure_core::Error> {
226        self.inner_consumer.close().await?;
227        self.connection.close_if_owned().await?;
228        Ok(())
229    }
230}