1use crate::shared::Id;
2use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6 DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7 StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::iam::PolicyDocument;
12use crate::type_state;
13
/// Suffix that the FIFO builder's `build` appends to an explicitly set queue
/// name when the name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
15
/// Deduplication scope that can be configured on a FIFO queue builder.
#[derive(Debug, Clone)]
pub enum DeduplicationScope {
    /// Rendered as the string `"queue"`.
    Queue,
    /// Rendered as the string `"messageGroup"`.
    MessageGroup,
}

impl From<DeduplicationScope> for String {
    /// Converts the scope into the string value stored in the queue properties.
    fn from(value: DeduplicationScope) -> Self {
        let rendered: &'static str = match value {
            DeduplicationScope::Queue => "queue",
            DeduplicationScope::MessageGroup => "messageGroup",
        };
        rendered.to_owned()
    }
}
30
/// Throughput-limit setting that can be configured on a FIFO queue builder.
#[derive(Debug, Clone)]
pub enum FifoThroughputLimit {
    /// Rendered as the string `"perQueue"`.
    PerQueue,
    /// Rendered as the string `"perMessageGroupId"`.
    PerMessageGroupId,
}

impl From<FifoThroughputLimit> for String {
    /// Converts the limit into the string value stored in the queue properties.
    fn from(value: FifoThroughputLimit) -> Self {
        let rendered: &'static str = match value {
            FifoThroughputLimit::PerQueue => "perQueue",
            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId",
        };
        rendered.to_owned()
    }
}
45
// Type-state markers for `QueueBuilder`: `QueueBuilderState` is used below as
// the bound on the builder's type parameter, and `StartState`, `StandardState`
// and `FifoState` are the states the builder is instantiated with.
// NOTE(review): the expansion of the crate's `type_state!` macro is not visible
// here — presumably it declares the trait and the marker types that implement
// it; confirm against the macro definition.
type_state!(
    QueueBuilderState,
    StartState,
    StandardState,
    FifoState,
);
52
53pub struct QueueBuilder<T: QueueBuilderState> {
83 state: PhantomData<T>,
84 id: Id,
85 queue_name: Option<String>,
86 delay_seconds: Option<u32>,
87 maximum_message_size: Option<u32>,
88 message_retention_period: Option<u32>,
89 receive_message_wait_time_seconds: Option<u32>,
90 visibility_timeout: Option<u32>,
91 content_based_deduplication: Option<bool>,
92 deduplication_scope: Option<String>,
93 fifo_throughput_limit: Option<String>,
94 sqs_managed_sse_enabled: Option<bool>,
95 redrive_policy: Option<RedrivePolicy>,
96 redrive_allow_policy: Option<Value>,
97}
98
99impl QueueBuilder<StartState> {
100 pub fn new(id: &str) -> Self {
105 Self {
106 state: Default::default(),
107 id: Id(id.to_string()),
108 queue_name: None,
109 delay_seconds: None,
110 maximum_message_size: None,
111 message_retention_period: None,
112 receive_message_wait_time_seconds: None,
113 visibility_timeout: None,
114 content_based_deduplication: None,
115 deduplication_scope: None,
116 fifo_throughput_limit: None,
117 sqs_managed_sse_enabled: None,
118 redrive_policy: None,
119 redrive_allow_policy: None,
120 }
121 }
122
123 pub fn standard_queue(self) -> QueueBuilder<StandardState> {
124 QueueBuilder {
125 state: Default::default(),
126 id: self.id,
127 queue_name: self.queue_name,
128 delay_seconds: self.delay_seconds,
129 maximum_message_size: self.maximum_message_size,
130 message_retention_period: self.message_retention_period,
131 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
132 visibility_timeout: self.visibility_timeout,
133 content_based_deduplication: self.content_based_deduplication,
134 deduplication_scope: self.deduplication_scope,
135 fifo_throughput_limit: self.fifo_throughput_limit,
136 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
137 redrive_policy: self.redrive_policy,
138 redrive_allow_policy: self.redrive_allow_policy,
139 }
140 }
141
142 pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
143 QueueBuilder {
144 state: Default::default(),
145 id: self.id,
146 queue_name: self.queue_name,
147 delay_seconds: self.delay_seconds,
148 maximum_message_size: self.maximum_message_size,
149 message_retention_period: self.message_retention_period,
150 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
151 visibility_timeout: self.visibility_timeout,
152 content_based_deduplication: self.content_based_deduplication,
153 deduplication_scope: self.deduplication_scope,
154 fifo_throughput_limit: self.fifo_throughput_limit,
155 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
156 redrive_policy: self.redrive_policy,
157 redrive_allow_policy: self.redrive_allow_policy,
158 }
159 }
160}
161
162impl<T: QueueBuilderState> QueueBuilder<T> {
163 pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
164 Self {
165 delay_seconds: Some(delay.0 as u32),
166 ..self
167 }
168 }
169
170 pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
171 Self {
172 maximum_message_size: Some(size.0),
173 ..self
174 }
175 }
176
177 pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
178 Self {
179 message_retention_period: Some(period.0),
180 ..self
181 }
182 }
183
184 pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
185 Self {
186 receive_message_wait_time_seconds: Some(wait_time.0 as u32),
187 ..self
188 }
189 }
190
191 pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
192 Self {
193 visibility_timeout: Some(timeout.0),
194 ..self
195 }
196 }
197
198 pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
199 Self {
200 sqs_managed_sse_enabled: Some(enabled),
201 ..self
202 }
203 }
204
205 pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
206 Self {
207 redrive_policy: Some(RedrivePolicy {
208 dead_letter_target_arn: dead_letter_target_arn.into(),
209 max_receive_count: max_receive_count.0,
210 }),
211 ..self
212 }
213 }
214
215 pub fn redrive_allow_policy(self, policy: Value) -> Self {
216 Self {
217 redrive_allow_policy: Some(policy),
218 ..self
219 }
220 }
221
222 pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
223 Self {
224 queue_name: Some(name.0),
225 ..self
226 }
227 }
228
229 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
230 let properties = QueueProperties {
231 queue_name: self.queue_name,
232 delay_seconds: self.delay_seconds,
233 maximum_message_size: self.maximum_message_size,
234 message_retention_period: self.message_retention_period,
235 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
236 visibility_timeout: self.visibility_timeout,
237 fifo_queue: if fifo { Some(true) } else { None },
238 content_based_deduplication: self.content_based_deduplication,
239 deduplication_scope: self.deduplication_scope,
240 fifo_throughput_limit: self.fifo_throughput_limit,
241 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
242 redrive_policy: self.redrive_policy,
243 redrive_allow_policy: self.redrive_allow_policy,
244 };
245
246 let resource_id = Resource::generate_id("SqsQueue");
247 stack_builder.add_resource(Queue {
248 id: self.id,
249 resource_id: resource_id.clone(),
250 r#type: "AWS::SQS::Queue".to_string(),
251 properties,
252 });
253
254 QueueRef::new(resource_id)
255 }
256}
257
258impl QueueBuilder<StandardState> {
259 pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
260 self.build_internal(false, stack_builder)
261 }
262}
263
264impl QueueBuilder<FifoState> {
265 pub fn content_based_deduplication(self, enabled: bool) -> Self {
266 Self {
267 content_based_deduplication: Some(enabled),
268 ..self
269 }
270 }
271
272 pub fn high_throughput_fifo(self) -> Self {
276 Self {
277 deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
278 fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
279 ..self
280 }
281 }
282
283 pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
284 Self {
285 deduplication_scope: Some(scope.into()),
286 ..self
287 }
288 }
289
290 pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
291 Self {
292 fifo_throughput_limit: Some(limit.into()),
293 ..self
294 }
295 }
296
297 pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
301 if let Some(ref name) = self.queue_name
302 && !name.ends_with(FIFO_SUFFIX) {
303 self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
304 }
305 self.build_internal(true, stack_builder)
306 }
307}
308
309pub struct QueuePolicyBuilder {
310 id: Id,
311 doc: PolicyDocument,
312 queues: Vec<Value>
313}
314
315impl QueuePolicyBuilder {
316 pub fn new(id: &str, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
328 Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
329 }
330
331 pub(crate) fn new_with_values(id: &str, doc: PolicyDocument, queues: Vec<Value>) -> Self {
332 Self {
333 id: Id(id.to_string()),
334 doc,
335 queues,
336 }
337 }
338
339 pub fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
340 let resource_id = Resource::generate_id("QueuePolicy");
341 stack_builder.add_resource(QueuePolicy {
342 id: self.id.clone(),
343 resource_id: resource_id.clone(),
344 r#type: "AWS::SQS::QueuePolicy".to_string(),
345 properties: QueuePolicyProperties {
346 doc: self.doc,
347 queues: self.queues,
348 },
349 });
350
351 QueuePolicyRef::new(self.id, resource_id)
352 }
353}