1use crate::shared::Id;
2use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6 DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7 StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::iam::PolicyDocument;
12use crate::shared::QUEUE_POLICY_ID_SUFFIX;
13use crate::type_state;
14
/// Suffix that the FIFO builder's `build` appends to an explicitly set queue
/// name when the name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
16
/// Value for a FIFO queue's `deduplication_scope` property.
///
/// Convert it with `String::from` / `.into()` to obtain the string that the
/// FIFO queue builder stores in the queue properties.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DeduplicationScope {
    /// Converts to `"queue"`.
    Queue,
    /// Converts to `"messageGroup"`.
    MessageGroup,
}

impl From<DeduplicationScope> for String {
    /// Maps each variant to its string form.
    fn from(value: DeduplicationScope) -> Self {
        let literal = match value {
            DeduplicationScope::Queue => "queue",
            DeduplicationScope::MessageGroup => "messageGroup",
        };
        literal.to_owned()
    }
}
31
/// Value for a FIFO queue's `fifo_throughput_limit` property.
///
/// Convert it with `String::from` / `.into()` to obtain the string that the
/// FIFO queue builder stores in the queue properties.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FifoThroughputLimit {
    /// Converts to `"perQueue"`.
    PerQueue,
    /// Converts to `"perMessageGroupId"`.
    PerMessageGroupId,
}

impl From<FifoThroughputLimit> for String {
    /// Maps each variant to its string form.
    fn from(value: FifoThroughputLimit) -> Self {
        let literal = match value {
            FifoThroughputLimit::PerQueue => "perQueue",
            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId",
        };
        literal.to_owned()
    }
}
46
47type_state!(
48 QueueBuilderState,
49 StartState,
50 StandardState,
51 FifoState,
52);
53
54pub struct QueueBuilder<T: QueueBuilderState> {
84 state: PhantomData<T>,
85 id: Id,
86 queue_name: Option<String>,
87 delay_seconds: Option<u32>,
88 maximum_message_size: Option<u32>,
89 message_retention_period: Option<u32>,
90 receive_message_wait_time_seconds: Option<u32>,
91 visibility_timeout: Option<u32>,
92 content_based_deduplication: Option<bool>,
93 deduplication_scope: Option<String>,
94 fifo_throughput_limit: Option<String>,
95 sqs_managed_sse_enabled: Option<bool>,
96 redrive_policy: Option<RedrivePolicy>,
97 redrive_allow_policy: Option<Value>,
98 queue_policy_doc: Option<PolicyDocument>,
99}
100
101impl QueueBuilder<StartState> {
102 pub fn new(id: &str) -> Self {
107 Self {
108 state: Default::default(),
109 id: Id(id.to_string()),
110 queue_name: None,
111 delay_seconds: None,
112 maximum_message_size: None,
113 message_retention_period: None,
114 receive_message_wait_time_seconds: None,
115 visibility_timeout: None,
116 content_based_deduplication: None,
117 deduplication_scope: None,
118 fifo_throughput_limit: None,
119 sqs_managed_sse_enabled: None,
120 redrive_policy: None,
121 redrive_allow_policy: None,
122 queue_policy_doc: None,
123 }
124 }
125
126 pub fn standard_queue(self) -> QueueBuilder<StandardState> {
127 QueueBuilder {
128 state: Default::default(),
129 id: self.id,
130 queue_name: self.queue_name,
131 delay_seconds: self.delay_seconds,
132 maximum_message_size: self.maximum_message_size,
133 message_retention_period: self.message_retention_period,
134 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
135 visibility_timeout: self.visibility_timeout,
136 content_based_deduplication: self.content_based_deduplication,
137 deduplication_scope: self.deduplication_scope,
138 fifo_throughput_limit: self.fifo_throughput_limit,
139 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
140 redrive_policy: self.redrive_policy,
141 redrive_allow_policy: self.redrive_allow_policy,
142 queue_policy_doc: self.queue_policy_doc,
143 }
144 }
145
146 pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
147 QueueBuilder {
148 state: Default::default(),
149 id: self.id,
150 queue_name: self.queue_name,
151 delay_seconds: self.delay_seconds,
152 maximum_message_size: self.maximum_message_size,
153 message_retention_period: self.message_retention_period,
154 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
155 visibility_timeout: self.visibility_timeout,
156 content_based_deduplication: self.content_based_deduplication,
157 deduplication_scope: self.deduplication_scope,
158 fifo_throughput_limit: self.fifo_throughput_limit,
159 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
160 redrive_policy: self.redrive_policy,
161 redrive_allow_policy: self.redrive_allow_policy,
162 queue_policy_doc: self.queue_policy_doc,
163 }
164 }
165}
166
167impl<T: QueueBuilderState> QueueBuilder<T> {
168 pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
169 Self {
170 delay_seconds: Some(delay.0 as u32),
171 ..self
172 }
173 }
174
175 pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
176 Self {
177 maximum_message_size: Some(size.0),
178 ..self
179 }
180 }
181
182 pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
183 Self {
184 message_retention_period: Some(period.0),
185 ..self
186 }
187 }
188
189 pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
190 Self {
191 receive_message_wait_time_seconds: Some(wait_time.0 as u32),
192 ..self
193 }
194 }
195
196 pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
197 Self {
198 visibility_timeout: Some(timeout.0),
199 ..self
200 }
201 }
202
203 pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
204 Self {
205 sqs_managed_sse_enabled: Some(enabled),
206 ..self
207 }
208 }
209
210 pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
211 Self {
212 redrive_policy: Some(RedrivePolicy {
213 dead_letter_target_arn: dead_letter_target_arn.into(),
214 max_receive_count: max_receive_count.0,
215 }),
216 ..self
217 }
218 }
219
220 pub fn redrive_allow_policy(self, policy: Value) -> Self {
221 Self {
222 redrive_allow_policy: Some(policy),
223 ..self
224 }
225 }
226
227 pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
228 Self {
229 queue_name: Some(name.0),
230 ..self
231 }
232 }
233
234 pub fn queue_policy(self, doc: PolicyDocument) -> Self {
237 Self {
238 queue_policy_doc: Some(doc),
239 ..self
240 }
241 }
242
243 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
244 let properties = QueueProperties {
245 queue_name: self.queue_name,
246 delay_seconds: self.delay_seconds,
247 maximum_message_size: self.maximum_message_size,
248 message_retention_period: self.message_retention_period,
249 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
250 visibility_timeout: self.visibility_timeout,
251 fifo_queue: if fifo { Some(true) } else { None },
252 content_based_deduplication: self.content_based_deduplication,
253 deduplication_scope: self.deduplication_scope,
254 fifo_throughput_limit: self.fifo_throughput_limit,
255 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
256 redrive_policy: self.redrive_policy,
257 redrive_allow_policy: self.redrive_allow_policy,
258 };
259
260 let resource_id = Resource::generate_id("SqsQueue");
261 let queue_ref = QueueRef::new(self.id.clone(), resource_id.clone());
262
263 if let Some(mut policy) = self.queue_policy_doc {
264 for statement in &mut policy.statements {
265 statement.resource = Some(vec![queue_ref.get_arn()]);
267 }
268 QueuePolicyBuilder::new(Id::generate_id(&self.id, QUEUE_POLICY_ID_SUFFIX), policy, vec![&queue_ref]).build(stack_builder);
269 }
270
271 stack_builder.add_resource(Queue {
272 id: self.id,
273 resource_id,
274 r#type: "AWS::SQS::Queue".to_string(),
275 properties,
276 });
277
278 queue_ref
279 }
280}
281
282impl QueueBuilder<StandardState> {
283 pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
284 self.build_internal(false, stack_builder)
285 }
286}
287
288impl QueueBuilder<FifoState> {
289 pub fn content_based_deduplication(self, enabled: bool) -> Self {
290 Self {
291 content_based_deduplication: Some(enabled),
292 ..self
293 }
294 }
295
296 pub fn high_throughput_fifo(self) -> Self {
300 Self {
301 deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
302 fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
303 ..self
304 }
305 }
306
307 pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
308 Self {
309 deduplication_scope: Some(scope.into()),
310 ..self
311 }
312 }
313
314 pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
315 Self {
316 fifo_throughput_limit: Some(limit.into()),
317 ..self
318 }
319 }
320
321 pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
325 if let Some(ref name) = self.queue_name
326 && !name.ends_with(FIFO_SUFFIX) {
327 self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
328 }
329 self.build_internal(true, stack_builder)
330 }
331}
332
333pub(crate) struct QueuePolicyBuilder {
334 id: Id,
335 doc: PolicyDocument,
336 queues: Vec<Value>
337}
338
339impl QueuePolicyBuilder {
340 pub(crate) fn new(id: Id, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
342 Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
343 }
344
345 pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, queues: Vec<Value>) -> Self {
346 Self {
347 id,
348 doc,
349 queues,
350 }
351 }
352
353 pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
354 let resource_id = Resource::generate_id("QueuePolicy");
355 stack_builder.add_resource(QueuePolicy {
356 id: self.id.clone(),
357 resource_id: resource_id.clone(),
358 r#type: "AWS::SQS::QueuePolicy".to_string(),
359 properties: QueuePolicyProperties {
360 doc: self.doc,
361 queues: self.queues,
362 },
363 });
364
365 QueuePolicyRef::new(self.id, resource_id)
366 }
367}