1use std::{collections::VecDeque, marker::PhantomData, time::Duration as StdDuration};
2
3use crate::{
4 amqp::amqp_consumer::{single::receive_event_batch, AmqpConsumer},
5 authorization::event_hub_token_credential::EventHubTokenCredential,
6 consumer::EventPosition,
7 core::BasicRetryPolicy,
8 event_hubs_retry_policy::EventHubsRetryPolicy,
9 EventHubConnection, EventHubsRetryOptions, ReceivedEventData,
10};
11
12use super::partition_receiver_options::PartitionReceiverOptions;
13
14#[derive(Debug)]
18pub struct PartitionReceiver<RP> {
19 connection: EventHubConnection,
20 inner_consumer: AmqpConsumer<RP>,
21 options: PartitionReceiverOptions,
22}
23
24#[derive(Debug)]
26pub struct PartitionReceiverBuilder<RP> {
27 _retry_policy_marker: PhantomData<RP>,
28}
29
30impl PartitionReceiver<BasicRetryPolicy> {
31 pub fn with_policy<RP>() -> PartitionReceiverBuilder<RP>
33 where
34 RP: EventHubsRetryPolicy + From<EventHubsRetryOptions> + Send,
35 {
36 PartitionReceiverBuilder {
37 _retry_policy_marker: PhantomData,
38 }
39 }
40
41 pub async fn new_from_connection_string(
43 consumer_group: &str,
44 partition_id: &str,
45 event_position: EventPosition,
46 connection_string: impl Into<String>,
47 event_hub_name: impl Into<Option<String>>,
48 options: PartitionReceiverOptions,
49 ) -> Result<Self, azure_core::Error> {
50 Self::with_policy()
51 .new_from_connection_string(
52 consumer_group,
53 partition_id,
54 event_position,
55 connection_string,
56 event_hub_name,
57 options,
58 )
59 .await
60 }
61
62 pub async fn new_from_credential(
64 consumer_group: &str,
65 partition_id: &str,
66 event_position: EventPosition,
67 fully_qualified_namespace: impl Into<String>,
68 event_hub_name: impl Into<String>,
69 credential: impl Into<EventHubTokenCredential>,
70 options: PartitionReceiverOptions,
71 ) -> Result<Self, azure_core::Error> {
72 Self::with_policy()
73 .new_from_credential(
74 consumer_group,
75 partition_id,
76 event_position,
77 fully_qualified_namespace,
78 event_hub_name,
79 credential,
80 options,
81 )
82 .await
83 }
84
85 pub async fn with_conneciton(
87 consumer_group: &str,
88 partition_id: &str,
89 event_position: EventPosition,
90 connection: EventHubConnection,
91 options: PartitionReceiverOptions,
92 ) -> Result<Self, azure_core::Error> {
93 Self::with_policy()
94 .with_connection(
95 consumer_group,
96 partition_id,
97 event_position,
98 connection,
99 options,
100 )
101 .await
102 }
103}
104
105impl<RP> PartitionReceiverBuilder<RP>
106where
107 RP: EventHubsRetryPolicy + From<EventHubsRetryOptions> + Send,
108{
109 pub async fn new_from_connection_string(
111 self,
112 consumer_group: &str,
113 partition_id: &str,
114 event_position: EventPosition,
115 connection_string: impl Into<String>,
116 event_hub_name: impl Into<Option<String>>,
117 options: PartitionReceiverOptions,
118 ) -> Result<PartitionReceiver<RP>, azure_core::Error> {
119 let connection = EventHubConnection::new_from_connection_string(
120 connection_string.into(),
121 event_hub_name.into(),
122 options.connection_options.clone(),
123 )
124 .await?;
125
126 self.with_connection(
127 consumer_group,
128 partition_id,
129 event_position,
130 connection,
131 options,
132 )
133 .await
134 }
135
136 #[allow(clippy::too_many_arguments)] pub async fn new_from_credential(
139 self,
140 consumer_group: &str,
141 partition_id: &str,
142 event_position: EventPosition,
143 fully_qualified_namespace: impl Into<String>,
144 event_hub_name: impl Into<String>,
145 credential: impl Into<EventHubTokenCredential>,
146 options: PartitionReceiverOptions,
147 ) -> Result<PartitionReceiver<RP>, azure_core::Error> {
148 let connection = EventHubConnection::new_from_credential(
149 fully_qualified_namespace.into(),
150 event_hub_name.into(),
151 credential.into(),
152 options.connection_options.clone(),
153 )
154 .await?;
155
156 self.with_connection(
157 consumer_group,
158 partition_id,
159 event_position,
160 connection,
161 options,
162 )
163 .await
164 }
165
166 pub async fn with_connection(
168 self,
169 consumer_group: &str,
170 partition_id: &str,
171 event_position: EventPosition,
172 mut connection: EventHubConnection,
173 options: PartitionReceiverOptions,
174 ) -> Result<PartitionReceiver<RP>, azure_core::Error> {
175 let consumer_identifier = options.identifier.clone();
176 let retry_policy = RP::from(options.retry_options.clone());
177 let inner_consumer = connection
178 .create_transport_consumer(
179 consumer_group,
180 partition_id,
181 consumer_identifier,
182 event_position,
183 retry_policy,
184 options.track_last_enqueued_event_properties,
185 options.owner_level,
186 Some(options.prefetch_count),
187 )
188 .await?;
189
190 Ok(PartitionReceiver {
191 connection,
192 inner_consumer,
193 options,
194 })
195 }
196}
197
198impl<RP> PartitionReceiver<RP>
199where
200 RP: EventHubsRetryPolicy + Send,
201{
202 pub async fn recv_batch(
204 &mut self,
205 max_event_count: usize,
206 max_wait_time: impl Into<Option<StdDuration>>,
207 ) -> Result<impl Iterator<Item = ReceivedEventData> + ExactSizeIterator, azure_core::Error>
208 {
209 let mut buffer = VecDeque::with_capacity(max_event_count);
210 let max_wait_time = max_wait_time.into();
211 let max_wait_time = max_wait_time.map(|t| t.max(self.options.maximum_receive_wait_time));
212 receive_event_batch(
213 &mut self.connection.inner,
214 &mut self.inner_consumer,
215 &mut buffer,
216 max_wait_time,
217 )
218 .await?;
219 Ok(buffer.into_iter())
220 }
221}
222
223impl<RP> PartitionReceiver<RP> {
224 pub async fn close(self) -> Result<(), azure_core::Error> {
226 self.inner_consumer.close().await?;
227 self.connection.close_if_owned().await?;
228 Ok(())
229 }
230}