1use crate::iam::PolicyDocument;
2use crate::shared::Id;
3use crate::shared::QUEUE_POLICY_ID_SUFFIX;
4use crate::sqs::{QueuePolicyType, QueueRef, QueueType};
5use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
6use crate::stack::{Resource, StackBuilder};
7use crate::type_state;
8use crate::wrappers::{DelaySeconds, KeyReusePeriod, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime, StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::kms::KeyRef;
12
/// Suffix that `QueueBuilder<FifoState>::build` appends to a configured queue name when the
/// name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
14
/// Deduplication scope that can be set on a FIFO queue via
/// `QueueBuilder::deduplication_scope`.
///
/// Converting a variant into a `String` yields the text stored in the queue properties
/// (`"queue"` or `"messageGroup"`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DeduplicationScope {
    /// Stored as `"queue"`.
    Queue,
    /// Stored as `"messageGroup"`.
    MessageGroup,
}
20
21impl From<DeduplicationScope> for String {
22 fn from(value: DeduplicationScope) -> Self {
23 match value {
24 DeduplicationScope::Queue => "queue".to_string(),
25 DeduplicationScope::MessageGroup => "messageGroup".to_string(),
26 }
27 }
28}
29
/// Throughput-limit setting that can be applied to a FIFO queue via
/// `QueueBuilder::fifo_throughput_limit`.
///
/// Converting a variant into a `String` yields the text stored in the queue properties
/// (`"perQueue"` or `"perMessageGroupId"`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FifoThroughputLimit {
    /// Stored as `"perQueue"`.
    PerQueue,
    /// Stored as `"perMessageGroupId"`.
    PerMessageGroupId,
}
35
36impl From<FifoThroughputLimit> for String {
37 fn from(value: FifoThroughputLimit) -> Self {
38 match value {
39 FifoThroughputLimit::PerQueue => "perQueue".to_string(),
40 FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId".to_string(),
41 }
42 }
43}
44
// Declares the type-state markers used as `QueueBuilder`'s type parameter: presumably the
// `QueueBuilderState` trait plus the `StartState`, `StandardState` and `FifoState` types that
// implement it (the expansion lives in the `type_state!` macro; confirm there).
type_state!(QueueBuilderState, StartState, StandardState, FifoState,);
46
47pub struct QueueBuilder<T: QueueBuilderState> {
77 state: PhantomData<T>,
78 id: Id,
79 queue_name: Option<String>,
80 delay_seconds: Option<u32>,
81 maximum_message_size: Option<u32>,
82 message_retention_period: Option<u32>,
83 receive_message_wait_time_seconds: Option<u32>,
84 visibility_timeout: Option<u32>,
85 content_based_deduplication: Option<bool>,
86 deduplication_scope: Option<String>,
87 fifo_throughput_limit: Option<String>,
88 sqs_managed_sse_enabled: Option<bool>,
89 redrive_policy: Option<RedrivePolicy>,
90 redrive_allow_policy: Option<Value>,
91 queue_policy_doc: Option<PolicyDocument>,
92 kms_data_key_reuse_period_seconds: Option<u32>,
93 kms_master_key_id: Option<Value>,
94}
95
96impl QueueBuilder<StartState> {
97 pub fn new(id: &str) -> Self {
102 Self {
103 state: Default::default(),
104 id: Id(id.to_string()),
105 queue_name: None,
106 delay_seconds: None,
107 maximum_message_size: None,
108 message_retention_period: None,
109 receive_message_wait_time_seconds: None,
110 visibility_timeout: None,
111 content_based_deduplication: None,
112 deduplication_scope: None,
113 fifo_throughput_limit: None,
114 sqs_managed_sse_enabled: None,
115 redrive_policy: None,
116 redrive_allow_policy: None,
117 queue_policy_doc: None,
118 kms_data_key_reuse_period_seconds: None,
119 kms_master_key_id: None,
120 }
121 }
122
123 pub fn standard_queue(self) -> QueueBuilder<StandardState> {
124 QueueBuilder {
125 state: Default::default(),
126 id: self.id,
127 queue_name: self.queue_name,
128 delay_seconds: self.delay_seconds,
129 maximum_message_size: self.maximum_message_size,
130 message_retention_period: self.message_retention_period,
131 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
132 visibility_timeout: self.visibility_timeout,
133 content_based_deduplication: self.content_based_deduplication,
134 deduplication_scope: self.deduplication_scope,
135 fifo_throughput_limit: self.fifo_throughput_limit,
136 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
137 redrive_policy: self.redrive_policy,
138 redrive_allow_policy: self.redrive_allow_policy,
139 queue_policy_doc: self.queue_policy_doc,
140 kms_data_key_reuse_period_seconds: self.kms_data_key_reuse_period_seconds,
141 kms_master_key_id: self.kms_master_key_id,
142 }
143 }
144
145 pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
146 QueueBuilder {
147 state: Default::default(),
148 id: self.id,
149 queue_name: self.queue_name,
150 delay_seconds: self.delay_seconds,
151 maximum_message_size: self.maximum_message_size,
152 message_retention_period: self.message_retention_period,
153 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
154 visibility_timeout: self.visibility_timeout,
155 content_based_deduplication: self.content_based_deduplication,
156 deduplication_scope: self.deduplication_scope,
157 fifo_throughput_limit: self.fifo_throughput_limit,
158 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
159 redrive_policy: self.redrive_policy,
160 redrive_allow_policy: self.redrive_allow_policy,
161 queue_policy_doc: self.queue_policy_doc,
162 kms_data_key_reuse_period_seconds: self.kms_data_key_reuse_period_seconds,
163 kms_master_key_id: self.kms_master_key_id,
164 }
165 }
166}
167
168impl<T: QueueBuilderState> QueueBuilder<T> {
169 pub fn kms_data_key_reuse_period(self, reuse_period_seconds: KeyReusePeriod) -> Self {
170 Self {
171 kms_data_key_reuse_period_seconds: Some(reuse_period_seconds.0),
172 ..self
173 }
174 }
175
176 pub fn kms_master_key(self, kms_master_key: &KeyRef) -> Self {
177 Self {
178 kms_master_key_id: Some(kms_master_key.get_ref()),
179 ..self
180 }
181 }
182
183 pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
184 Self {
185 delay_seconds: Some(delay.0 as u32),
186 ..self
187 }
188 }
189
190 pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
191 Self {
192 maximum_message_size: Some(size.0),
193 ..self
194 }
195 }
196
197 pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
198 Self {
199 message_retention_period: Some(period.0),
200 ..self
201 }
202 }
203
204 pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
205 Self {
206 receive_message_wait_time_seconds: Some(wait_time.0 as u32),
207 ..self
208 }
209 }
210
211 pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
212 Self {
213 visibility_timeout: Some(timeout.0),
214 ..self
215 }
216 }
217
218 pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
219 Self {
220 sqs_managed_sse_enabled: Some(enabled),
221 ..self
222 }
223 }
224
225 pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
226 Self {
227 redrive_policy: Some(RedrivePolicy {
228 dead_letter_target_arn: dead_letter_target_arn.into(),
229 max_receive_count: max_receive_count.0,
230 }),
231 ..self
232 }
233 }
234
235 pub fn redrive_allow_policy(self, policy: Value) -> Self {
236 Self {
237 redrive_allow_policy: Some(policy),
238 ..self
239 }
240 }
241
242 pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
243 Self {
244 queue_name: Some(name.0),
245 ..self
246 }
247 }
248
249 pub fn queue_policy(self, doc: PolicyDocument) -> Self {
252 Self {
253 queue_policy_doc: Some(doc),
254 ..self
255 }
256 }
257
258 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
259 let properties = QueueProperties {
260 queue_name: self.queue_name,
261 delay_seconds: self.delay_seconds,
262 maximum_message_size: self.maximum_message_size,
263 message_retention_period: self.message_retention_period,
264 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
265 visibility_timeout: self.visibility_timeout,
266 fifo_queue: if fifo { Some(true) } else { None },
267 content_based_deduplication: self.content_based_deduplication,
268 deduplication_scope: self.deduplication_scope,
269 fifo_throughput_limit: self.fifo_throughput_limit,
270 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
271 redrive_policy: self.redrive_policy,
272 redrive_allow_policy: self.redrive_allow_policy,
273 kms_data_key_reuse_period_seconds: self.kms_data_key_reuse_period_seconds,
274 kms_master_key_id: self.kms_master_key_id,
275 };
276
277 let resource_id = Resource::generate_id("SqsQueue");
278 let queue_ref = QueueRef::internal_new(self.id.clone(), resource_id.clone());
279
280 if let Some(mut policy) = self.queue_policy_doc {
281 for statement in &mut policy.statements {
282 statement.resource = Some(vec![queue_ref.get_arn()]);
284 }
285 QueuePolicyBuilder::new(Id::generate_id(&self.id, QUEUE_POLICY_ID_SUFFIX), policy, vec![&queue_ref]).build(stack_builder);
286 }
287
288 stack_builder.add_resource(Queue {
289 id: self.id,
290 resource_id,
291 r#type: QueueType::QueueType,
292 properties,
293 });
294
295 queue_ref
296 }
297}
298
299impl QueueBuilder<StandardState> {
300 pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
301 self.build_internal(false, stack_builder)
302 }
303}
304
305impl QueueBuilder<FifoState> {
306 pub fn content_based_deduplication(self, enabled: bool) -> Self {
307 Self {
308 content_based_deduplication: Some(enabled),
309 ..self
310 }
311 }
312
313 pub fn high_throughput_fifo(self) -> Self {
317 Self {
318 deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
319 fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
320 ..self
321 }
322 }
323
324 pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
325 Self {
326 deduplication_scope: Some(scope.into()),
327 ..self
328 }
329 }
330
331 pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
332 Self {
333 fifo_throughput_limit: Some(limit.into()),
334 ..self
335 }
336 }
337
338 pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
342 if let Some(ref name) = self.queue_name
343 && !name.ends_with(FIFO_SUFFIX)
344 {
345 self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
346 }
347 self.build_internal(true, stack_builder)
348 }
349}
350
351pub(crate) struct QueuePolicyBuilder {
352 id: Id,
353 doc: PolicyDocument,
354 queues: Vec<Value>,
355}
356
357impl QueuePolicyBuilder {
358 pub(crate) fn new(id: Id, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
360 Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
361 }
362
363 pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, queues: Vec<Value>) -> Self {
364 Self { id, doc, queues }
365 }
366
367 pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
368 let resource_id = Resource::generate_id("QueuePolicy");
369 stack_builder.add_resource(QueuePolicy {
370 id: self.id.clone(),
371 resource_id: resource_id.clone(),
372 r#type: QueuePolicyType::QueuePolicyType,
373 properties: QueuePolicyProperties {
374 doc: self.doc,
375 queues: self.queues,
376 },
377 });
378
379 QueuePolicyRef::internal_new(self.id, resource_id)
380 }
381}