1use crate::shared::Id;
2use crate::sqs::{Queue, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6 DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7 StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::type_state;
12
13const FIFO_SUFFIX: &str = ".fifo";
14
15pub enum DeduplicationScope {
16 Queue,
17 MessageGroup,
18}
19
20impl From<DeduplicationScope> for String {
21 fn from(value: DeduplicationScope) -> Self {
22 match value {
23 DeduplicationScope::Queue => "queue".to_string(),
24 DeduplicationScope::MessageGroup => "messageGroup".to_string(),
25 }
26 }
27}
28
29pub enum FifoThroughputLimit {
30 PerQueue,
31 PerMessageGroupId,
32}
33
34impl From<FifoThroughputLimit> for String {
35 fn from(value: FifoThroughputLimit) -> Self {
36 match value {
37 FifoThroughputLimit::PerQueue => "perQueue".to_string(),
38 FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId".to_string(),
39 }
40 }
41}
42
43type_state!(
44 QueueBuilderState,
45 StartState,
46 StandardState,
47 FifoState,
48);
49
50pub struct QueueBuilder<T: QueueBuilderState> {
80 state: PhantomData<T>,
81 id: Id,
82 queue_name: Option<String>,
83 delay_seconds: Option<u32>,
84 maximum_message_size: Option<u32>,
85 message_retention_period: Option<u32>,
86 receive_message_wait_time_seconds: Option<u32>,
87 visibility_timeout: Option<u32>,
88 content_based_deduplication: Option<bool>,
89 deduplication_scope: Option<String>,
90 fifo_throughput_limit: Option<String>,
91 sqs_managed_sse_enabled: Option<bool>,
92 redrive_policy: Option<RedrivePolicy>,
93 redrive_allow_policy: Option<Value>,
94}
95
96impl QueueBuilder<StartState> {
97 pub fn new(id: &str) -> Self {
102 Self {
103 state: Default::default(),
104 id: Id(id.to_string()),
105 queue_name: None,
106 delay_seconds: None,
107 maximum_message_size: None,
108 message_retention_period: None,
109 receive_message_wait_time_seconds: None,
110 visibility_timeout: None,
111 content_based_deduplication: None,
112 deduplication_scope: None,
113 fifo_throughput_limit: None,
114 sqs_managed_sse_enabled: None,
115 redrive_policy: None,
116 redrive_allow_policy: None,
117 }
118 }
119
120 pub fn standard_queue(self) -> QueueBuilder<StandardState> {
121 QueueBuilder {
122 state: Default::default(),
123 id: self.id,
124 queue_name: self.queue_name,
125 delay_seconds: self.delay_seconds,
126 maximum_message_size: self.maximum_message_size,
127 message_retention_period: self.message_retention_period,
128 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
129 visibility_timeout: self.visibility_timeout,
130 content_based_deduplication: self.content_based_deduplication,
131 deduplication_scope: self.deduplication_scope,
132 fifo_throughput_limit: self.fifo_throughput_limit,
133 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
134 redrive_policy: self.redrive_policy,
135 redrive_allow_policy: self.redrive_allow_policy,
136 }
137 }
138
139 pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
140 QueueBuilder {
141 state: Default::default(),
142 id: self.id,
143 queue_name: self.queue_name,
144 delay_seconds: self.delay_seconds,
145 maximum_message_size: self.maximum_message_size,
146 message_retention_period: self.message_retention_period,
147 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
148 visibility_timeout: self.visibility_timeout,
149 content_based_deduplication: self.content_based_deduplication,
150 deduplication_scope: self.deduplication_scope,
151 fifo_throughput_limit: self.fifo_throughput_limit,
152 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
153 redrive_policy: self.redrive_policy,
154 redrive_allow_policy: self.redrive_allow_policy,
155 }
156 }
157}
158
159impl<T: QueueBuilderState> QueueBuilder<T> {
160 pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
161 Self {
162 delay_seconds: Some(delay.0 as u32),
163 ..self
164 }
165 }
166
167 pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
168 Self {
169 maximum_message_size: Some(size.0),
170 ..self
171 }
172 }
173
174 pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
175 Self {
176 message_retention_period: Some(period.0),
177 ..self
178 }
179 }
180
181 pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
182 Self {
183 receive_message_wait_time_seconds: Some(wait_time.0 as u32),
184 ..self
185 }
186 }
187
188 pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
189 Self {
190 visibility_timeout: Some(timeout.0),
191 ..self
192 }
193 }
194
195 pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
196 Self {
197 sqs_managed_sse_enabled: Some(enabled),
198 ..self
199 }
200 }
201
202 pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
203 Self {
204 redrive_policy: Some(RedrivePolicy {
205 dead_letter_target_arn: dead_letter_target_arn.into(),
206 max_receive_count: max_receive_count.0,
207 }),
208 ..self
209 }
210 }
211
212 pub fn redrive_allow_policy(self, policy: Value) -> Self {
213 Self {
214 redrive_allow_policy: Some(policy),
215 ..self
216 }
217 }
218
219 pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
220 Self {
221 queue_name: Some(name.0),
222 ..self
223 }
224 }
225
226 fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
227 let properties = QueueProperties {
228 queue_name: self.queue_name,
229 delay_seconds: self.delay_seconds,
230 maximum_message_size: self.maximum_message_size,
231 message_retention_period: self.message_retention_period,
232 receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
233 visibility_timeout: self.visibility_timeout,
234 fifo_queue: if fifo { Some(true) } else { None },
235 content_based_deduplication: self.content_based_deduplication,
236 deduplication_scope: self.deduplication_scope,
237 fifo_throughput_limit: self.fifo_throughput_limit,
238 sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
239 redrive_policy: self.redrive_policy,
240 redrive_allow_policy: self.redrive_allow_policy,
241 };
242
243 let resource_id = Resource::generate_id("SqsQueue");
244 stack_builder.add_resource(Queue {
245 id: self.id,
246 resource_id: resource_id.clone(),
247 r#type: "AWS::SQS::Queue".to_string(),
248 properties,
249 });
250
251 QueueRef::new(resource_id)
252 }
253}
254
255impl QueueBuilder<StandardState> {
256 pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
257 self.build_internal(false, stack_builder)
258 }
259}
260
261impl QueueBuilder<FifoState> {
262 pub fn content_based_deduplication(self, enabled: bool) -> Self {
263 Self {
264 content_based_deduplication: Some(enabled),
265 ..self
266 }
267 }
268
269 pub fn high_throughput_fifo(self) -> Self {
273 Self {
274 deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
275 fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
276 ..self
277 }
278 }
279
280 pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
281 Self {
282 deduplication_scope: Some(scope.into()),
283 ..self
284 }
285 }
286
287 pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
288 Self {
289 fifo_throughput_limit: Some(limit.into()),
290 ..self
291 }
292 }
293
294 pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
298 if let Some(ref name) = self.queue_name
299 && !name.ends_with(FIFO_SUFFIX) {
300 self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
301 }
302 self.build_internal(true, stack_builder)
303 }
304}