Skip to main content

rusty_cdk_core/sqs/
builder.rs

1use crate::iam::PolicyDocument;
2use crate::shared::Id;
3use crate::shared::QUEUE_POLICY_ID_SUFFIX;
4use crate::sqs::{QueuePolicyType, QueueRef, QueueType};
5use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
6use crate::stack::{Resource, StackBuilder};
7use crate::type_state;
8use crate::wrappers::{DelaySeconds, KeyReusePeriod, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime, StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::kms::KeyRef;
12
13const FIFO_SUFFIX: &str = ".fifo";
14
15#[derive(Debug, Clone)]
16pub enum DeduplicationScope {
17    Queue,
18    MessageGroup,
19}
20
21impl From<DeduplicationScope> for String {
22    fn from(value: DeduplicationScope) -> Self {
23        match value {
24            DeduplicationScope::Queue => "queue".to_string(),
25            DeduplicationScope::MessageGroup => "messageGroup".to_string(),
26        }
27    }
28}
29
30#[derive(Debug, Clone)]
31pub enum FifoThroughputLimit {
32    PerQueue,
33    PerMessageGroupId,
34}
35
36impl From<FifoThroughputLimit> for String {
37    fn from(value: FifoThroughputLimit) -> Self {
38        match value {
39            FifoThroughputLimit::PerQueue => "perQueue".to_string(),
40            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId".to_string(),
41        }
42    }
43}
44
45type_state!(QueueBuilderState, StartState, StandardState, FifoState,);
46
47/// Builder for SQS queues.
48///
49/// Supports both standard and FIFO queues. FIFO queues have additional configuration
50/// options for deduplication and throughput.
51///
52/// # Example
53///
54/// ```rust
55/// use rusty_cdk_core::stack::StackBuilder;
56/// use rusty_cdk_core::sqs::QueueBuilder;
57/// use rusty_cdk_core::wrappers::*;
58/// use rusty_cdk_macros::{delay_seconds, message_retention_period, visibility_timeout};
59///
60/// let mut stack_builder = StackBuilder::new();
61///
62/// // Create a standard queue
63/// let standard_queue = QueueBuilder::new("standard-queue")
64///     .standard_queue()
65///     .visibility_timeout(visibility_timeout!(60))
66///     .build(&mut stack_builder);
67///
68/// // Create a FIFO queue
69/// let queue = QueueBuilder::new("my-queue")
70///     .fifo_queue()
71///     .content_based_deduplication(true)
72///     .delay_seconds(delay_seconds!(30))
73///     .message_retention_period(message_retention_period!(600))
74///     .build(&mut stack_builder);
75/// ```
76pub struct QueueBuilder<T: QueueBuilderState> {
77    state: PhantomData<T>,
78    id: Id,
79    queue_name: Option<String>,
80    delay_seconds: Option<u32>,
81    maximum_message_size: Option<u32>,
82    message_retention_period: Option<u32>,
83    receive_message_wait_time_seconds: Option<u32>,
84    visibility_timeout: Option<u32>,
85    content_based_deduplication: Option<bool>,
86    deduplication_scope: Option<String>,
87    fifo_throughput_limit: Option<String>,
88    sqs_managed_sse_enabled: Option<bool>,
89    redrive_policy: Option<RedrivePolicy>,
90    redrive_allow_policy: Option<Value>,
91    queue_policy_doc: Option<PolicyDocument>,
92    kms_data_key_reuse_period_seconds: Option<u32>,
93    kms_master_key_id: Option<Value>,
94}
95
96impl QueueBuilder<StartState> {
97    /// Creates a new SQS queue builder.
98    ///
99    /// # Arguments
100    /// * `id` - Unique identifier for the queue
101    pub fn new(id: &str) -> Self {
102        Self {
103            state: Default::default(),
104            id: Id(id.to_string()),
105            queue_name: None,
106            delay_seconds: None,
107            maximum_message_size: None,
108            message_retention_period: None,
109            receive_message_wait_time_seconds: None,
110            visibility_timeout: None,
111            content_based_deduplication: None,
112            deduplication_scope: None,
113            fifo_throughput_limit: None,
114            sqs_managed_sse_enabled: None,
115            redrive_policy: None,
116            redrive_allow_policy: None,
117            queue_policy_doc: None,
118            kms_data_key_reuse_period_seconds: None,
119            kms_master_key_id: None,
120        }
121    }
122
123    pub fn standard_queue(self) -> QueueBuilder<StandardState> {
124        QueueBuilder {
125            state: Default::default(),
126            id: self.id,
127            queue_name: self.queue_name,
128            delay_seconds: self.delay_seconds,
129            maximum_message_size: self.maximum_message_size,
130            message_retention_period: self.message_retention_period,
131            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
132            visibility_timeout: self.visibility_timeout,
133            content_based_deduplication: self.content_based_deduplication,
134            deduplication_scope: self.deduplication_scope,
135            fifo_throughput_limit: self.fifo_throughput_limit,
136            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
137            redrive_policy: self.redrive_policy,
138            redrive_allow_policy: self.redrive_allow_policy,
139            queue_policy_doc: self.queue_policy_doc,
140            kms_data_key_reuse_period_seconds: self.kms_data_key_reuse_period_seconds,
141            kms_master_key_id: self.kms_master_key_id,
142        }
143    }
144
145    pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
146        QueueBuilder {
147            state: Default::default(),
148            id: self.id,
149            queue_name: self.queue_name,
150            delay_seconds: self.delay_seconds,
151            maximum_message_size: self.maximum_message_size,
152            message_retention_period: self.message_retention_period,
153            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
154            visibility_timeout: self.visibility_timeout,
155            content_based_deduplication: self.content_based_deduplication,
156            deduplication_scope: self.deduplication_scope,
157            fifo_throughput_limit: self.fifo_throughput_limit,
158            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
159            redrive_policy: self.redrive_policy,
160            redrive_allow_policy: self.redrive_allow_policy,
161            queue_policy_doc: self.queue_policy_doc,
162            kms_data_key_reuse_period_seconds: self.kms_data_key_reuse_period_seconds,
163            kms_master_key_id: self.kms_master_key_id,
164        }
165    }
166}
167
168impl<T: QueueBuilderState> QueueBuilder<T> {
169    pub fn kms_data_key_reuse_period(self, reuse_period_seconds: KeyReusePeriod) -> Self {
170        Self {
171            kms_data_key_reuse_period_seconds: Some(reuse_period_seconds.0),
172            ..self
173        }
174    }
175
176    pub fn kms_master_key(self, kms_master_key: &KeyRef) -> Self {
177        Self {
178            kms_master_key_id: Some(kms_master_key.get_ref()),
179            ..self
180        }
181    }
182
183    pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
184        Self {
185            delay_seconds: Some(delay.0 as u32),
186            ..self
187        }
188    }
189
190    pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
191        Self {
192            maximum_message_size: Some(size.0),
193            ..self
194        }
195    }
196
197    pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
198        Self {
199            message_retention_period: Some(period.0),
200            ..self
201        }
202    }
203
204    pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
205        Self {
206            receive_message_wait_time_seconds: Some(wait_time.0 as u32),
207            ..self
208        }
209    }
210
211    pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
212        Self {
213            visibility_timeout: Some(timeout.0),
214            ..self
215        }
216    }
217
218    pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
219        Self {
220            sqs_managed_sse_enabled: Some(enabled),
221            ..self
222        }
223    }
224
225    pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
226        Self {
227            redrive_policy: Some(RedrivePolicy {
228                dead_letter_target_arn: dead_letter_target_arn.into(),
229                max_receive_count: max_receive_count.0,
230            }),
231            ..self
232        }
233    }
234
235    pub fn redrive_allow_policy(self, policy: Value) -> Self {
236        Self {
237            redrive_allow_policy: Some(policy),
238            ..self
239        }
240    }
241
242    pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
243        Self {
244            queue_name: Some(name.0),
245            ..self
246        }
247    }
248
249    /// Adds an SQS Queue Policy for this queue.
250    /// The code will automatically set the `resources` section of the `PolicyDocument` to the ARN of this queue, so there's no need to pass that in.
251    pub fn queue_policy(self, doc: PolicyDocument) -> Self {
252        Self {
253            queue_policy_doc: Some(doc),
254            ..self
255        }
256    }
257
258    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
259        let properties = QueueProperties {
260            queue_name: self.queue_name,
261            delay_seconds: self.delay_seconds,
262            maximum_message_size: self.maximum_message_size,
263            message_retention_period: self.message_retention_period,
264            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
265            visibility_timeout: self.visibility_timeout,
266            fifo_queue: if fifo { Some(true) } else { None },
267            content_based_deduplication: self.content_based_deduplication,
268            deduplication_scope: self.deduplication_scope,
269            fifo_throughput_limit: self.fifo_throughput_limit,
270            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
271            redrive_policy: self.redrive_policy,
272            redrive_allow_policy: self.redrive_allow_policy,
273            kms_data_key_reuse_period_seconds: self.kms_data_key_reuse_period_seconds,
274            kms_master_key_id: self.kms_master_key_id,
275        };
276
277        let resource_id = Resource::generate_id("SqsQueue");
278        let queue_ref = QueueRef::internal_new(self.id.clone(), resource_id.clone());
279
280        if let Some(mut policy) = self.queue_policy_doc {
281            for statement in &mut policy.statements {
282                // point the statements of this policy to the queue
283                statement.resource = Some(vec![queue_ref.get_arn()]);
284            }
285            QueuePolicyBuilder::new(Id::generate_id(&self.id, QUEUE_POLICY_ID_SUFFIX), policy, vec![&queue_ref]).build(stack_builder);
286        }
287
288        stack_builder.add_resource(Queue {
289            id: self.id,
290            resource_id,
291            r#type: QueueType::QueueType,
292            properties,
293        });
294
295        queue_ref
296    }
297}
298
299impl QueueBuilder<StandardState> {
300    pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
301        self.build_internal(false, stack_builder)
302    }
303}
304
305impl QueueBuilder<FifoState> {
306    pub fn content_based_deduplication(self, enabled: bool) -> Self {
307        Self {
308            content_based_deduplication: Some(enabled),
309            ..self
310        }
311    }
312
313    /// Enables high throughput mode for FIFO queues.
314    ///
315    /// Sets deduplication scope to MessageGroup and throughput limit to PerMessageGroupId.
316    pub fn high_throughput_fifo(self) -> Self {
317        Self {
318            deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
319            fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
320            ..self
321        }
322    }
323
324    pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
325        Self {
326            deduplication_scope: Some(scope.into()),
327            ..self
328        }
329    }
330
331    pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
332        Self {
333            fifo_throughput_limit: Some(limit.into()),
334            ..self
335        }
336    }
337
338    /// Builds the FIFO queue and adds it to the stack.
339    ///
340    /// Automatically appends the required ".fifo" suffix to the queue name if not already present.
341    pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
342        if let Some(ref name) = self.queue_name
343            && !name.ends_with(FIFO_SUFFIX)
344        {
345            self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
346        }
347        self.build_internal(true, stack_builder)
348    }
349}
350
351pub(crate) struct QueuePolicyBuilder {
352    id: Id,
353    doc: PolicyDocument,
354    queues: Vec<Value>,
355}
356
357impl QueuePolicyBuilder {
358    /// Use the `queue_policy` method of `QueueBuilder` to add a queue policy to a queue
359    pub(crate) fn new(id: Id, doc: PolicyDocument, queues: Vec<&QueueRef>) -> Self {
360        Self::new_with_values(id, doc, queues.into_iter().map(|v| v.get_ref()).collect())
361    }
362
363    pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, queues: Vec<Value>) -> Self {
364        Self { id, doc, queues }
365    }
366
367    pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
368        let resource_id = Resource::generate_id("QueuePolicy");
369        stack_builder.add_resource(QueuePolicy {
370            id: self.id.clone(),
371            resource_id: resource_id.clone(),
372            r#type: QueuePolicyType::QueuePolicyType,
373            properties: QueuePolicyProperties {
374                doc: self.doc,
375                queues: self.queues,
376            },
377        });
378
379        QueuePolicyRef::internal_new(self.id, resource_id)
380    }
381}