rusty_cdk_core/sqs/
builder.rs

1use crate::shared::Id;
2use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6    DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7    StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::iam::PolicyDocument;
12use crate::shared::QUEUE_POLICY_ID_SUFFIX;
13use crate::type_state;
14
/// Suffix appended to a FIFO queue's name by `QueueBuilder<FifoState>::build`
/// when the configured name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
16
/// Deduplication scope setting for a FIFO queue.
///
/// Convert it with `String::from` / `.into()` to obtain the string stored in the
/// queue's `deduplication_scope` property.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DeduplicationScope {
    /// Converts to `"queue"`.
    Queue,
    /// Converts to `"messageGroup"`.
    MessageGroup,
}

impl From<DeduplicationScope> for String {
    /// Returns the string form of the scope: `"queue"` or `"messageGroup"`.
    fn from(value: DeduplicationScope) -> Self {
        let text = match value {
            DeduplicationScope::Queue => "queue",
            DeduplicationScope::MessageGroup => "messageGroup",
        };
        text.to_owned()
    }
}
31
/// Throughput limit setting for a FIFO queue.
///
/// Convert it with `String::from` / `.into()` to obtain the string stored in the
/// queue's `fifo_throughput_limit` property.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FifoThroughputLimit {
    /// Converts to `"perQueue"`.
    PerQueue,
    /// Converts to `"perMessageGroupId"`.
    PerMessageGroupId,
}

impl From<FifoThroughputLimit> for String {
    /// Returns the string form of the limit: `"perQueue"` or `"perMessageGroupId"`.
    fn from(value: FifoThroughputLimit) -> Self {
        let text = match value {
            FifoThroughputLimit::PerQueue => "perQueue",
            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId",
        };
        text.to_owned()
    }
}
46
// Type-state markers for `QueueBuilder`: `QueueBuilderState` is the bound on the
// builder's state parameter, `StartState` is the state produced by `QueueBuilder::new`,
// and `StandardState` / `FifoState` are entered through `standard_queue()` / `fifo_queue()`.
// NOTE(review): the exact items generated here are decided by the `type_state!` macro,
// which is defined elsewhere in the crate — confirm against its definition.
type_state!(
    QueueBuilderState,
    StartState,
    StandardState,
    FifoState,
);
53
/// Builder for SQS queues.
///
/// Supports both standard and FIFO queues. FIFO queues have additional configuration
/// options for deduplication and throughput.
///
/// The type parameter `T` tracks the builder's state: a builder starts in `StartState`
/// and must be moved to `StandardState` or `FifoState` before `build` becomes available.
///
/// # Example
///
/// ```rust
/// use rusty_cdk_core::stack::StackBuilder;
/// use rusty_cdk_core::sqs::QueueBuilder;
/// use rusty_cdk_core::wrappers::*;
/// use rusty_cdk_macros::{delay_seconds, message_retention_period, visibility_timeout};
///
/// let mut stack_builder = StackBuilder::new();
///
/// // Create a standard queue
/// let standard_queue = QueueBuilder::new("standard-queue")
///     .standard_queue()
///     .visibility_timeout(visibility_timeout!(60))
///     .build(&mut stack_builder);
///
/// // Create a FIFO queue
/// let queue = QueueBuilder::new("my-queue")
///     .fifo_queue()
///     .content_based_deduplication(true)
///     .delay_seconds(delay_seconds!(30))
///     .message_retention_period(message_retention_period!(600))
///     .build(&mut stack_builder);
/// ```
pub struct QueueBuilder<T: QueueBuilderState> {
    /// Zero-sized marker carrying the current builder state `T`.
    state: PhantomData<T>,
    /// Identifier built from the string passed to `QueueBuilder::new`.
    id: Id,
    /// Optional queue name, set by `queue_name()`; the FIFO `build` appends `.fifo` when missing.
    queue_name: Option<String>,
    /// Set by `delay_seconds()`.
    delay_seconds: Option<u32>,
    /// Set by `maximum_message_size()`.
    maximum_message_size: Option<u32>,
    /// Set by `message_retention_period()`.
    message_retention_period: Option<u32>,
    /// Set by `receive_message_wait_time_seconds()`.
    receive_message_wait_time_seconds: Option<u32>,
    /// Set by `visibility_timeout()`.
    visibility_timeout: Option<u32>,
    /// Set by `content_based_deduplication()`, which is only available on FIFO builders.
    content_based_deduplication: Option<bool>,
    /// String form of a `DeduplicationScope`; only settable on FIFO builders.
    deduplication_scope: Option<String>,
    /// String form of a `FifoThroughputLimit`; only settable on FIFO builders.
    fifo_throughput_limit: Option<String>,
    /// Set by `sqs_managed_sse_enabled()`.
    sqs_managed_sse_enabled: Option<bool>,
    /// Dead-letter queue configuration, set by `dead_letter_queue()`.
    redrive_policy: Option<RedrivePolicy>,
    /// Raw JSON value set by `redrive_allow_policy()`.
    redrive_allow_policy: Option<Value>,
    /// Policy document set by `queue_policy()`; when present, `build` also adds a queue policy resource.
    queue_policy_doc: Option<PolicyDocument>,
}
100
101impl QueueBuilder<StartState> {
102    /// Creates a new SQS queue builder.
103    ///
104    /// # Arguments
105    /// * `id` - Unique identifier for the queue
106    pub fn new(id: &str) -> Self {
107        Self {
108            state: Default::default(),
109            id: Id(id.to_string()),
110            queue_name: None,
111            delay_seconds: None,
112            maximum_message_size: None,
113            message_retention_period: None,
114            receive_message_wait_time_seconds: None,
115            visibility_timeout: None,
116            content_based_deduplication: None,
117            deduplication_scope: None,
118            fifo_throughput_limit: None,
119            sqs_managed_sse_enabled: None,
120            redrive_policy: None,
121            redrive_allow_policy: None,
122            queue_policy_doc: None,
123        }
124    }
125
126    pub fn standard_queue(self) -> QueueBuilder<StandardState> {
127        QueueBuilder {
128            state: Default::default(),
129            id: self.id,
130            queue_name: self.queue_name,
131            delay_seconds: self.delay_seconds,
132            maximum_message_size: self.maximum_message_size,
133            message_retention_period: self.message_retention_period,
134            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
135            visibility_timeout: self.visibility_timeout,
136            content_based_deduplication: self.content_based_deduplication,
137            deduplication_scope: self.deduplication_scope,
138            fifo_throughput_limit: self.fifo_throughput_limit,
139            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
140            redrive_policy: self.redrive_policy,
141            redrive_allow_policy: self.redrive_allow_policy,
142            queue_policy_doc: self.queue_policy_doc,
143        }
144    }
145
146    pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
147        QueueBuilder {
148            state: Default::default(),
149            id: self.id,
150            queue_name: self.queue_name,
151            delay_seconds: self.delay_seconds,
152            maximum_message_size: self.maximum_message_size,
153            message_retention_period: self.message_retention_period,
154            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
155            visibility_timeout: self.visibility_timeout,
156            content_based_deduplication: self.content_based_deduplication,
157            deduplication_scope: self.deduplication_scope,
158            fifo_throughput_limit: self.fifo_throughput_limit,
159            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
160            redrive_policy: self.redrive_policy,
161            redrive_allow_policy: self.redrive_allow_policy,
162            queue_policy_doc: self.queue_policy_doc,
163        }
164    }
165}
166
167impl<T: QueueBuilderState> QueueBuilder<T> {
168    pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
169        Self {
170            delay_seconds: Some(delay.0 as u32),
171            ..self
172        }
173    }
174
175    pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
176        Self {
177            maximum_message_size: Some(size.0),
178            ..self
179        }
180    }
181
182    pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
183        Self {
184            message_retention_period: Some(period.0),
185            ..self
186        }
187    }
188
189    pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
190        Self {
191            receive_message_wait_time_seconds: Some(wait_time.0 as u32),
192            ..self
193        }
194    }
195
196    pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
197        Self {
198            visibility_timeout: Some(timeout.0),
199            ..self
200        }
201    }
202
203    pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
204        Self {
205            sqs_managed_sse_enabled: Some(enabled),
206            ..self
207        }
208    }
209
210    pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
211        Self {
212            redrive_policy: Some(RedrivePolicy {
213                dead_letter_target_arn: dead_letter_target_arn.into(),
214                max_receive_count: max_receive_count.0,
215            }),
216            ..self
217        }
218    }
219
220    pub fn redrive_allow_policy(self, policy: Value) -> Self {
221        Self {
222            redrive_allow_policy: Some(policy),
223            ..self
224        }
225    }
226
227    pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
228        Self {
229            queue_name: Some(name.0),
230            ..self
231        }
232    }
233    
234    /// Adds an SQS Queue Policy for this queue.
235    /// The code will automatically set the `resources` section of the `PolicyDocument` to the ARN of this queue, so there's no need to pass that in.
236    pub fn queue_policy(self, doc: PolicyDocument) -> Self {
237        Self {
238            queue_policy_doc: Some(doc),
239            ..self
240        }
241    }
242
243    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
244        let properties = QueueProperties {
245            queue_name: self.queue_name,
246            delay_seconds: self.delay_seconds,
247            maximum_message_size: self.maximum_message_size,
248            message_retention_period: self.message_retention_period,
249            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
250            visibility_timeout: self.visibility_timeout,
251            fifo_queue: if fifo { Some(true) } else { None },
252            content_based_deduplication: self.content_based_deduplication,
253            deduplication_scope: self.deduplication_scope,
254            fifo_throughput_limit: self.fifo_throughput_limit,
255            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
256            redrive_policy: self.redrive_policy,
257            redrive_allow_policy: self.redrive_allow_policy,
258        };
259
260        let resource_id = Resource::generate_id("SqsQueue");
261        let queue_ref = QueueRef::new(self.id.clone(), resource_id.clone());
262        
263        if let Some(mut policy) = self.queue_policy_doc {
264            for statement in &mut policy.statements {
265                // point the statements of this policy to the queue
266                statement.resource = Some(vec![queue_ref.get_arn()]);
267            }
268            QueuePolicyBuilder::new(Id::generate_id(&self.id, QUEUE_POLICY_ID_SUFFIX), policy, vec![&queue_ref]).build(stack_builder);
269        }
270
271        stack_builder.add_resource(Queue {
272            id: self.id,
273            resource_id,
274            r#type: "AWS::SQS::Queue".to_string(),
275            properties,
276        });
277
278        queue_ref
279    }
280}
281
282impl QueueBuilder<StandardState> {
283    pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
284        self.build_internal(false, stack_builder)
285    }
286}
287
288impl QueueBuilder<FifoState> {
289    pub fn content_based_deduplication(self, enabled: bool) -> Self {
290        Self {
291            content_based_deduplication: Some(enabled),
292            ..self
293        }
294    }
295
296    /// Enables high throughput mode for FIFO queues.
297    ///
298    /// Sets deduplication scope to MessageGroup and throughput limit to PerMessageGroupId.
299    pub fn high_throughput_fifo(self) -> Self {
300        Self {
301            deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
302            fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
303            ..self
304        }
305    }
306
307    pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
308        Self {
309            deduplication_scope: Some(scope.into()),
310            ..self
311        }
312    }
313
314    pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
315        Self {
316            fifo_throughput_limit: Some(limit.into()),
317            ..self
318        }
319    }
320
321    /// Builds the FIFO queue and adds it to the stack.
322    ///
323    /// Automatically appends the required ".fifo" suffix to the queue name if not already present.
324    pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
325        if let Some(ref name) = self.queue_name
326            && !name.ends_with(FIFO_SUFFIX) {
327                self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
328            }
329        self.build_internal(true, stack_builder)
330    }
331}
332
333pub(crate) struct QueuePolicyBuilder {
334    id: Id,
335    doc: PolicyDocument,
336    queues: Vec<Value>
337}
338
339impl QueuePolicyBuilder {
340    /// Use the `queue_policy` method of `QueueBuilder` to add a queue policy to a queue
341    pub(crate) fn new(id: Id, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
342        Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
343    }
344
345    pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, queues: Vec<Value>) -> Self {
346        Self {
347            id,
348            doc,
349            queues,
350        }
351    }
352
353    pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
354        let resource_id = Resource::generate_id("QueuePolicy");
355        stack_builder.add_resource(QueuePolicy {
356            id: self.id.clone(),
357            resource_id: resource_id.clone(),
358            r#type: "AWS::SQS::QueuePolicy".to_string(),
359            properties: QueuePolicyProperties {
360                doc: self.doc,
361                queues: self.queues,
362            },
363        });
364
365        QueuePolicyRef::new(self.id, resource_id)
366    }
367}