rusty_cdk_core/sqs/
builder.rs

1use crate::shared::Id;
2use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6    DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7    StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::iam::PolicyDocument;
12use crate::type_state;
13
/// Suffix that the FIFO builder's `build` appends to a configured queue name
/// when the name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
15
/// Deduplication scope that can be configured on a FIFO queue.
///
/// Converts into the strings `"queue"` and `"messageGroup"` through
/// `From<DeduplicationScope> for String`.
///
/// The enum is fieldless, so it is `Copy` and comparable in addition to
/// `Debug` and `Clone`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeduplicationScope {
    /// Converted to the string `"queue"`.
    Queue,
    /// Converted to the string `"messageGroup"`.
    MessageGroup,
}
21
22impl From<DeduplicationScope> for String {
23    fn from(value: DeduplicationScope) -> Self {
24        match value {
25            DeduplicationScope::Queue => "queue".to_string(),
26            DeduplicationScope::MessageGroup => "messageGroup".to_string(),
27        }
28    }
29}
30
/// Throughput limit that can be configured on a FIFO queue.
///
/// Converts into the strings `"perQueue"` and `"perMessageGroupId"` through
/// `From<FifoThroughputLimit> for String`.
///
/// The enum is fieldless, so it is `Copy` and comparable in addition to
/// `Debug` and `Clone`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FifoThroughputLimit {
    /// Converted to the string `"perQueue"`.
    PerQueue,
    /// Converted to the string `"perMessageGroupId"`.
    PerMessageGroupId,
}
36
37impl From<FifoThroughputLimit> for String {
38    fn from(value: FifoThroughputLimit) -> Self {
39        match value {
40            FifoThroughputLimit::PerQueue => "perQueue".to_string(),
41            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId".to_string(),
42        }
43    }
44}
45
// Declares the type-state items used by `QueueBuilder`: `QueueBuilderState` is used below as the
// trait bound on the builder's type parameter, and `StartState`, `StandardState` and `FifoState`
// are the states that decide which builder methods are available.
// NOTE(review): the expansion of `type_state!` lives elsewhere in the crate — confirm the exact
// items it generates there.
type_state!(
    QueueBuilderState,
    StartState,
    StandardState,
    FifoState,
);
52
/// Builder for SQS queues.
///
/// Supports both standard and FIFO queues. FIFO queues have additional configuration
/// options for deduplication and throughput.
///
/// The type parameter `T` tracks the builder state: a builder starts in `StartState` and is
/// turned into a `StandardState` or `FifoState` builder by `standard_queue()` / `fifo_queue()`.
/// Only those two states expose a `build` method.
///
/// # Example
///
/// ```rust
/// use rusty_cdk_core::stack::StackBuilder;
/// use rusty_cdk_core::sqs::QueueBuilder;
/// use rusty_cdk_core::wrappers::*;
/// use rusty_cdk_macros::{delay_seconds, message_retention_period, visibility_timeout};
///
/// let mut stack_builder = StackBuilder::new();
///
/// // Create a standard queue
/// let standard_queue = QueueBuilder::new("standard-queue")
///     .standard_queue()
///     .visibility_timeout(visibility_timeout!(60))
///     .build(&mut stack_builder);
///
/// // Create a FIFO queue
/// let queue = QueueBuilder::new("my-queue")
///     .fifo_queue()
///     .content_based_deduplication(true)
///     .delay_seconds(delay_seconds!(30))
///     .message_retention_period(message_retention_period!(600))
///     .build(&mut stack_builder);
/// ```
pub struct QueueBuilder<T: QueueBuilderState> {
    /// Zero-sized marker carrying the current builder state.
    state: PhantomData<T>,
    /// Identifier passed to `QueueBuilder::new`.
    id: Id,
    /// Optional queue name; the FIFO `build` appends `.fifo` when it is missing.
    queue_name: Option<String>,
    /// Set by `delay_seconds`.
    delay_seconds: Option<u32>,
    /// Set by `maximum_message_size`.
    maximum_message_size: Option<u32>,
    /// Set by `message_retention_period`.
    message_retention_period: Option<u32>,
    /// Set by `receive_message_wait_time_seconds`.
    receive_message_wait_time_seconds: Option<u32>,
    /// Set by `visibility_timeout`.
    visibility_timeout: Option<u32>,
    /// Set by `content_based_deduplication` (FIFO builders only).
    content_based_deduplication: Option<bool>,
    /// String form of a `DeduplicationScope` (FIFO builders only).
    deduplication_scope: Option<String>,
    /// String form of a `FifoThroughputLimit` (FIFO builders only).
    fifo_throughput_limit: Option<String>,
    /// Set by `sqs_managed_sse_enabled`.
    sqs_managed_sse_enabled: Option<bool>,
    /// Set by `dead_letter_queue`.
    redrive_policy: Option<RedrivePolicy>,
    /// Raw JSON value set by `redrive_allow_policy`.
    redrive_allow_policy: Option<Value>,
}
98
99impl QueueBuilder<StartState> {
100    /// Creates a new SQS queue builder.
101    ///
102    /// # Arguments
103    /// * `id` - Unique identifier for the queue
104    pub fn new(id: &str) -> Self {
105        Self {
106            state: Default::default(),
107            id: Id(id.to_string()),
108            queue_name: None,
109            delay_seconds: None,
110            maximum_message_size: None,
111            message_retention_period: None,
112            receive_message_wait_time_seconds: None,
113            visibility_timeout: None,
114            content_based_deduplication: None,
115            deduplication_scope: None,
116            fifo_throughput_limit: None,
117            sqs_managed_sse_enabled: None,
118            redrive_policy: None,
119            redrive_allow_policy: None,
120        }
121    }
122
123    pub fn standard_queue(self) -> QueueBuilder<StandardState> {
124        QueueBuilder {
125            state: Default::default(),
126            id: self.id,
127            queue_name: self.queue_name,
128            delay_seconds: self.delay_seconds,
129            maximum_message_size: self.maximum_message_size,
130            message_retention_period: self.message_retention_period,
131            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
132            visibility_timeout: self.visibility_timeout,
133            content_based_deduplication: self.content_based_deduplication,
134            deduplication_scope: self.deduplication_scope,
135            fifo_throughput_limit: self.fifo_throughput_limit,
136            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
137            redrive_policy: self.redrive_policy,
138            redrive_allow_policy: self.redrive_allow_policy,
139        }
140    }
141
142    pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
143        QueueBuilder {
144            state: Default::default(),
145            id: self.id,
146            queue_name: self.queue_name,
147            delay_seconds: self.delay_seconds,
148            maximum_message_size: self.maximum_message_size,
149            message_retention_period: self.message_retention_period,
150            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
151            visibility_timeout: self.visibility_timeout,
152            content_based_deduplication: self.content_based_deduplication,
153            deduplication_scope: self.deduplication_scope,
154            fifo_throughput_limit: self.fifo_throughput_limit,
155            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
156            redrive_policy: self.redrive_policy,
157            redrive_allow_policy: self.redrive_allow_policy,
158        }
159    }
160}
161
162impl<T: QueueBuilderState> QueueBuilder<T> {
163    pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
164        Self {
165            delay_seconds: Some(delay.0 as u32),
166            ..self
167        }
168    }
169
170    pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
171        Self {
172            maximum_message_size: Some(size.0),
173            ..self
174        }
175    }
176
177    pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
178        Self {
179            message_retention_period: Some(period.0),
180            ..self
181        }
182    }
183
184    pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
185        Self {
186            receive_message_wait_time_seconds: Some(wait_time.0 as u32),
187            ..self
188        }
189    }
190
191    pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
192        Self {
193            visibility_timeout: Some(timeout.0),
194            ..self
195        }
196    }
197
198    pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
199        Self {
200            sqs_managed_sse_enabled: Some(enabled),
201            ..self
202        }
203    }
204
205    pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
206        Self {
207            redrive_policy: Some(RedrivePolicy {
208                dead_letter_target_arn: dead_letter_target_arn.into(),
209                max_receive_count: max_receive_count.0,
210            }),
211            ..self
212        }
213    }
214
215    pub fn redrive_allow_policy(self, policy: Value) -> Self {
216        Self {
217            redrive_allow_policy: Some(policy),
218            ..self
219        }
220    }
221
222    pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
223        Self {
224            queue_name: Some(name.0),
225            ..self
226        }
227    }
228
229    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
230        let properties = QueueProperties {
231            queue_name: self.queue_name,
232            delay_seconds: self.delay_seconds,
233            maximum_message_size: self.maximum_message_size,
234            message_retention_period: self.message_retention_period,
235            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
236            visibility_timeout: self.visibility_timeout,
237            fifo_queue: if fifo { Some(true) } else { None },
238            content_based_deduplication: self.content_based_deduplication,
239            deduplication_scope: self.deduplication_scope,
240            fifo_throughput_limit: self.fifo_throughput_limit,
241            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
242            redrive_policy: self.redrive_policy,
243            redrive_allow_policy: self.redrive_allow_policy,
244        };
245
246        let resource_id = Resource::generate_id("SqsQueue");
247        stack_builder.add_resource(Queue {
248            id: self.id,
249            resource_id: resource_id.clone(),
250            r#type: "AWS::SQS::Queue".to_string(),
251            properties,
252        });
253        
254        QueueRef::new(resource_id)
255    }
256}
257
258impl QueueBuilder<StandardState> {
259    pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
260        self.build_internal(false, stack_builder)
261    }
262}
263
264impl QueueBuilder<FifoState> {
265    pub fn content_based_deduplication(self, enabled: bool) -> Self {
266        Self {
267            content_based_deduplication: Some(enabled),
268            ..self
269        }
270    }
271
272    /// Enables high throughput mode for FIFO queues.
273    ///
274    /// Sets deduplication scope to MessageGroup and throughput limit to PerMessageGroupId.
275    pub fn high_throughput_fifo(self) -> Self {
276        Self {
277            deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
278            fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
279            ..self
280        }
281    }
282
283    pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
284        Self {
285            deduplication_scope: Some(scope.into()),
286            ..self
287        }
288    }
289
290    pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
291        Self {
292            fifo_throughput_limit: Some(limit.into()),
293            ..self
294        }
295    }
296
297    /// Builds the FIFO queue and adds it to the stack.
298    ///
299    /// Automatically appends the required ".fifo" suffix to the queue name if not already present.
300    pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
301        if let Some(ref name) = self.queue_name
302            && !name.ends_with(FIFO_SUFFIX) {
303                self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
304            }
305        self.build_internal(true, stack_builder)
306    }
307}
308
/// Builder for an SQS queue policy resource (`AWS::SQS::QueuePolicy`).
pub struct QueuePolicyBuilder {
    /// Identifier given to the policy when the builder was created.
    id: Id,
    /// Policy document that is applied to the queues.
    doc: PolicyDocument,
    /// References to the queues the policy applies to.
    queues: Vec<Value>
}
314
315impl QueuePolicyBuilder {
316    // see remarks topic policy
317
318    /// Creates a new SQS queue policy builder.
319    /// 
320    /// *Important* Current limitation: CloudFormation only allows one resource policy for a given queue, applying the last one it receives.
321    /// If you've added a bucket notification for this queue, which requires a policy, and you also define one yourself, one of both will get lost.
322    ///
323    /// # Arguments
324    /// * `id` - Unique identifier for the queue
325    /// * `doc` - The resource policy that should be applied to the queues
326    /// * `queues` - Queues for which the policy is valid
327    pub fn new(id: &str, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
328        Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
329    }
330
331    pub(crate) fn new_with_values(id: &str, doc: PolicyDocument, queues: Vec<Value>) -> Self {
332        Self {
333            id: Id(id.to_string()),
334            doc,
335            queues,
336        }
337    }
338
339    pub fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
340        let resource_id = Resource::generate_id("QueuePolicy");
341        stack_builder.add_resource(QueuePolicy {
342            id: self.id.clone(),
343            resource_id: resource_id.clone(),
344            r#type: "AWS::SQS::QueuePolicy".to_string(),
345            properties: QueuePolicyProperties {
346                doc: self.doc,
347                queues: self.queues,
348            },
349        });
350
351        QueuePolicyRef::new(self.id, resource_id)
352    }
353}