1use crate::shared::Id;
2use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6 DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7 StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::iam::PolicyDocument;
12use crate::type_state;
13
/// Suffix that `QueueBuilder<FifoState>::build` appends to a configured queue name
/// when the name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
15
/// Deduplication scope that can be configured on a FIFO queue.
///
/// Convert a variant into the string stored in the queue properties with
/// `String::from(scope)` or `scope.into()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeduplicationScope {
    /// Converts to the string `"queue"`.
    Queue,
    /// Converts to the string `"messageGroup"`.
    MessageGroup,
}

impl From<DeduplicationScope> for String {
    /// Returns the string representation of the given scope.
    fn from(value: DeduplicationScope) -> Self {
        let text = match value {
            DeduplicationScope::Queue => "queue",
            DeduplicationScope::MessageGroup => "messageGroup",
        };
        text.to_owned()
    }
}
29
/// Throughput limit mode that can be configured on a FIFO queue.
///
/// Convert a variant into the string stored in the queue properties with
/// `String::from(limit)` or `limit.into()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FifoThroughputLimit {
    /// Converts to the string `"perQueue"`.
    PerQueue,
    /// Converts to the string `"perMessageGroupId"`.
    PerMessageGroupId,
}

impl From<FifoThroughputLimit> for String {
    /// Returns the string representation of the given limit.
    fn from(value: FifoThroughputLimit) -> Self {
        let text = match value {
            FifoThroughputLimit::PerQueue => "perQueue",
            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId",
        };
        text.to_owned()
    }
}
43
// Declares the names used for `QueueBuilder`'s type-state: `QueueBuilderState` is the bound
// on the builder's type parameter, and `StartState`, `StandardState` and `FifoState` are the
// concrete states the builder is instantiated with.
// NOTE(review): `type_state!` is defined elsewhere in the crate; presumably it generates the
// trait and the marker types implementing it — confirm against the macro definition.
type_state!(
    QueueBuilderState,
    StartState,
    StandardState,
    FifoState,
);
50
/// Builder that collects SQS queue properties and adds an `AWS::SQS::Queue` resource
/// to a `StackBuilder`.
///
/// The type parameter `T` tracks the builder's state: a builder starts in `StartState`
/// and is moved to `StandardState` or `FifoState`, each of which exposes its own `build`.
/// Every optional field stays `None` until the matching setter is called.
pub struct QueueBuilder<T: QueueBuilderState> {
    /// Marker carrying the state type `T`; holds no data.
    state: PhantomData<T>,
    /// Identifier given to `new`, stored on the generated `Queue` resource.
    id: Id,
    /// Queue name; the FIFO `build` appends `.fifo` when the name lacks that suffix.
    queue_name: Option<String>,
    /// Value set through `delay_seconds`.
    delay_seconds: Option<u32>,
    /// Value set through `maximum_message_size`.
    maximum_message_size: Option<u32>,
    /// Value set through `message_retention_period`.
    message_retention_period: Option<u32>,
    /// Value set through `receive_message_wait_time_seconds`.
    receive_message_wait_time_seconds: Option<u32>,
    /// Value set through `visibility_timeout`.
    visibility_timeout: Option<u32>,
    /// Only settable in `FifoState` via `content_based_deduplication`.
    content_based_deduplication: Option<bool>,
    /// String form of a `DeduplicationScope`; only settable in `FifoState`.
    deduplication_scope: Option<String>,
    /// String form of a `FifoThroughputLimit`; only settable in `FifoState`.
    fifo_throughput_limit: Option<String>,
    /// Value set through `sqs_managed_sse_enabled`.
    sqs_managed_sse_enabled: Option<bool>,
    /// Redrive policy assembled by `dead_letter_queue`.
    redrive_policy: Option<RedrivePolicy>,
    /// Raw JSON value set through `redrive_allow_policy`.
    redrive_allow_policy: Option<Value>,
}
96
97impl QueueBuilder<StartState> {
98 pub fn new(id: &str) -> Self {
103 Self {
104 state: Default::default(),
105 id: Id(id.to_string()),
106 queue_name: None,
107 delay_seconds: None,
108 maximum_message_size: None,
109 message_retention_period: None,
110 receive_message_wait_time_seconds: None,
111 visibility_timeout: None,
112 content_based_deduplication: None,
113 deduplication_scope: None,
114 fifo_throughput_limit: None,
115 sqs_managed_sse_enabled: None,
116 redrive_policy: None,
117 redrive_allow_policy: None,
118 }
119 }
120
121 pub fn standard_queue(self) -> QueueBuilder<StandardState> {
122 QueueBuilder {
123 state: Default::default(),
124 id: self.id,
125 queue_name: self.queue_name,
126 delay_seconds: self.delay_seconds,
127 maximum_message_size: self.maximum_message_size,
128 message_retention_period: self.message_retention_period,
129 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
130 visibility_timeout: self.visibility_timeout,
131 content_based_deduplication: self.content_based_deduplication,
132 deduplication_scope: self.deduplication_scope,
133 fifo_throughput_limit: self.fifo_throughput_limit,
134 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
135 redrive_policy: self.redrive_policy,
136 redrive_allow_policy: self.redrive_allow_policy,
137 }
138 }
139
140 pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
141 QueueBuilder {
142 state: Default::default(),
143 id: self.id,
144 queue_name: self.queue_name,
145 delay_seconds: self.delay_seconds,
146 maximum_message_size: self.maximum_message_size,
147 message_retention_period: self.message_retention_period,
148 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
149 visibility_timeout: self.visibility_timeout,
150 content_based_deduplication: self.content_based_deduplication,
151 deduplication_scope: self.deduplication_scope,
152 fifo_throughput_limit: self.fifo_throughput_limit,
153 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
154 redrive_policy: self.redrive_policy,
155 redrive_allow_policy: self.redrive_allow_policy,
156 }
157 }
158}
159
160impl<T: QueueBuilderState> QueueBuilder<T> {
161 pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
162 Self {
163 delay_seconds: Some(delay.0 as u32),
164 ..self
165 }
166 }
167
168 pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
169 Self {
170 maximum_message_size: Some(size.0),
171 ..self
172 }
173 }
174
175 pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
176 Self {
177 message_retention_period: Some(period.0),
178 ..self
179 }
180 }
181
182 pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
183 Self {
184 receive_message_wait_time_seconds: Some(wait_time.0 as u32),
185 ..self
186 }
187 }
188
189 pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
190 Self {
191 visibility_timeout: Some(timeout.0),
192 ..self
193 }
194 }
195
196 pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
197 Self {
198 sqs_managed_sse_enabled: Some(enabled),
199 ..self
200 }
201 }
202
203 pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
204 Self {
205 redrive_policy: Some(RedrivePolicy {
206 dead_letter_target_arn: dead_letter_target_arn.into(),
207 max_receive_count: max_receive_count.0,
208 }),
209 ..self
210 }
211 }
212
213 pub fn redrive_allow_policy(self, policy: Value) -> Self {
214 Self {
215 redrive_allow_policy: Some(policy),
216 ..self
217 }
218 }
219
220 pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
221 Self {
222 queue_name: Some(name.0),
223 ..self
224 }
225 }
226
227 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
228 let properties = QueueProperties {
229 queue_name: self.queue_name,
230 delay_seconds: self.delay_seconds,
231 maximum_message_size: self.maximum_message_size,
232 message_retention_period: self.message_retention_period,
233 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
234 visibility_timeout: self.visibility_timeout,
235 fifo_queue: if fifo { Some(true) } else { None },
236 content_based_deduplication: self.content_based_deduplication,
237 deduplication_scope: self.deduplication_scope,
238 fifo_throughput_limit: self.fifo_throughput_limit,
239 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
240 redrive_policy: self.redrive_policy,
241 redrive_allow_policy: self.redrive_allow_policy,
242 };
243
244 let resource_id = Resource::generate_id("SqsQueue");
245 stack_builder.add_resource(Queue {
246 id: self.id,
247 resource_id: resource_id.clone(),
248 r#type: "AWS::SQS::Queue".to_string(),
249 properties,
250 });
251
252 QueueRef::new(resource_id)
253 }
254}
255
256impl QueueBuilder<StandardState> {
257 pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
258 self.build_internal(false, stack_builder)
259 }
260}
261
262impl QueueBuilder<FifoState> {
263 pub fn content_based_deduplication(self, enabled: bool) -> Self {
264 Self {
265 content_based_deduplication: Some(enabled),
266 ..self
267 }
268 }
269
270 pub fn high_throughput_fifo(self) -> Self {
274 Self {
275 deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
276 fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
277 ..self
278 }
279 }
280
281 pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
282 Self {
283 deduplication_scope: Some(scope.into()),
284 ..self
285 }
286 }
287
288 pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
289 Self {
290 fifo_throughput_limit: Some(limit.into()),
291 ..self
292 }
293 }
294
295 pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
299 if let Some(ref name) = self.queue_name
300 && !name.ends_with(FIFO_SUFFIX) {
301 self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
302 }
303 self.build_internal(true, stack_builder)
304 }
305}
306
/// Builder that adds an `AWS::SQS::QueuePolicy` resource to a `StackBuilder`.
pub struct QueuePolicyBuilder {
    /// Identifier given at construction; stored on the resource and on the returned `QueuePolicyRef`.
    id: Id,
    /// Policy document placed in the resource properties.
    doc: PolicyDocument,
    /// JSON values identifying the queues the policy is attached to.
    queues: Vec<Value>
}
312
313impl QueuePolicyBuilder {
314 pub fn new(id: &str, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
326 Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
327 }
328
329 pub(crate) fn new_with_values(id: &str, doc: PolicyDocument, queues: Vec<Value>) -> Self {
330 Self {
331 id: Id(id.to_string()),
332 doc,
333 queues,
334 }
335 }
336
337 pub fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
338 let resource_id = Resource::generate_id("QueuePolicy");
339 stack_builder.add_resource(QueuePolicy {
340 id: self.id.clone(),
341 resource_id: resource_id.clone(),
342 r#type: "AWS::SQS::QueuePolicy".to_string(),
343 properties: QueuePolicyProperties {
344 doc: self.doc,
345 queues: self.queues,
346 },
347 });
348
349 QueuePolicyRef::new(self.id, resource_id)
350 }
351}