rusty_cdk_core/sqs/
builder.rs

1use crate::shared::Id;
2use crate::sqs::{Queue, QueuePolicy, QueuePolicyProperties, QueuePolicyRef, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6    DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7    StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::iam::PolicyDocument;
12use crate::type_state;
13
14const FIFO_SUFFIX: &str = ".fifo";
15
/// Deduplication scope that can be selected for a FIFO queue.
///
/// Converts into the string stored in the queue properties through
/// `String::from` (`"queue"` / `"messageGroup"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DeduplicationScope {
    /// Rendered as `"queue"`.
    Queue,
    /// Rendered as `"messageGroup"`.
    MessageGroup,
}
20
21impl From<DeduplicationScope> for String {
22    fn from(value: DeduplicationScope) -> Self {
23        match value {
24            DeduplicationScope::Queue => "queue".to_string(),
25            DeduplicationScope::MessageGroup => "messageGroup".to_string(),
26        }
27    }
28}
29
/// Throughput limit mode that can be selected for a FIFO queue.
///
/// Converts into the string stored in the queue properties through
/// `String::from` (`"perQueue"` / `"perMessageGroupId"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FifoThroughputLimit {
    /// Rendered as `"perQueue"`.
    PerQueue,
    /// Rendered as `"perMessageGroupId"`.
    PerMessageGroupId,
}
34
35impl From<FifoThroughputLimit> for String {
36    fn from(value: FifoThroughputLimit) -> Self {
37        match value {
38            FifoThroughputLimit::PerQueue => "perQueue".to_string(),
39            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId".to_string(),
40        }
41    }
42}
43
44type_state!(
45    QueueBuilderState,
46    StartState,
47    StandardState,
48    FifoState,
49);
50
51/// Builder for SQS queues.
52///
53/// Supports both standard and FIFO queues. FIFO queues have additional configuration
54/// options for deduplication and throughput.
55///
56/// # Example
57///
58/// ```rust
59/// use rusty_cdk_core::stack::StackBuilder;
60/// use rusty_cdk_core::sqs::QueueBuilder;
61/// use rusty_cdk_core::wrappers::*;
62/// use rusty_cdk_macros::{delay_seconds, message_retention_period, visibility_timeout};
63///
64/// let mut stack_builder = StackBuilder::new();
65///
66/// // Create a standard queue
67/// let standard_queue = QueueBuilder::new("standard-queue")
68///     .standard_queue()
69///     .visibility_timeout(visibility_timeout!(60))
70///     .build(&mut stack_builder);
71/// 
72/// // Create a FIFO queue
73/// let queue = QueueBuilder::new("my-queue")
74///     .fifo_queue()
75///     .content_based_deduplication(true)
76///     .delay_seconds(delay_seconds!(30))
77///     .message_retention_period(message_retention_period!(600))
78///     .build(&mut stack_builder);
79/// ```
80pub struct QueueBuilder<T: QueueBuilderState> {
81    state: PhantomData<T>,
82    id: Id,
83    queue_name: Option<String>,
84    delay_seconds: Option<u32>,
85    maximum_message_size: Option<u32>,
86    message_retention_period: Option<u32>,
87    receive_message_wait_time_seconds: Option<u32>,
88    visibility_timeout: Option<u32>,
89    content_based_deduplication: Option<bool>,
90    deduplication_scope: Option<String>,
91    fifo_throughput_limit: Option<String>,
92    sqs_managed_sse_enabled: Option<bool>,
93    redrive_policy: Option<RedrivePolicy>,
94    redrive_allow_policy: Option<Value>,
95}
96
97impl QueueBuilder<StartState> {
98    /// Creates a new SQS queue builder.
99    ///
100    /// # Arguments
101    /// * `id` - Unique identifier for the queue
102    pub fn new(id: &str) -> Self {
103        Self {
104            state: Default::default(),
105            id: Id(id.to_string()),
106            queue_name: None,
107            delay_seconds: None,
108            maximum_message_size: None,
109            message_retention_period: None,
110            receive_message_wait_time_seconds: None,
111            visibility_timeout: None,
112            content_based_deduplication: None,
113            deduplication_scope: None,
114            fifo_throughput_limit: None,
115            sqs_managed_sse_enabled: None,
116            redrive_policy: None,
117            redrive_allow_policy: None,
118        }
119    }
120
121    pub fn standard_queue(self) -> QueueBuilder<StandardState> {
122        QueueBuilder {
123            state: Default::default(),
124            id: self.id,
125            queue_name: self.queue_name,
126            delay_seconds: self.delay_seconds,
127            maximum_message_size: self.maximum_message_size,
128            message_retention_period: self.message_retention_period,
129            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
130            visibility_timeout: self.visibility_timeout,
131            content_based_deduplication: self.content_based_deduplication,
132            deduplication_scope: self.deduplication_scope,
133            fifo_throughput_limit: self.fifo_throughput_limit,
134            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
135            redrive_policy: self.redrive_policy,
136            redrive_allow_policy: self.redrive_allow_policy,
137        }
138    }
139
140    pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
141        QueueBuilder {
142            state: Default::default(),
143            id: self.id,
144            queue_name: self.queue_name,
145            delay_seconds: self.delay_seconds,
146            maximum_message_size: self.maximum_message_size,
147            message_retention_period: self.message_retention_period,
148            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
149            visibility_timeout: self.visibility_timeout,
150            content_based_deduplication: self.content_based_deduplication,
151            deduplication_scope: self.deduplication_scope,
152            fifo_throughput_limit: self.fifo_throughput_limit,
153            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
154            redrive_policy: self.redrive_policy,
155            redrive_allow_policy: self.redrive_allow_policy,
156        }
157    }
158}
159
160impl<T: QueueBuilderState> QueueBuilder<T> {
161    pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
162        Self {
163            delay_seconds: Some(delay.0 as u32),
164            ..self
165        }
166    }
167
168    pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
169        Self {
170            maximum_message_size: Some(size.0),
171            ..self
172        }
173    }
174
175    pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
176        Self {
177            message_retention_period: Some(period.0),
178            ..self
179        }
180    }
181
182    pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
183        Self {
184            receive_message_wait_time_seconds: Some(wait_time.0 as u32),
185            ..self
186        }
187    }
188
189    pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
190        Self {
191            visibility_timeout: Some(timeout.0),
192            ..self
193        }
194    }
195
196    pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
197        Self {
198            sqs_managed_sse_enabled: Some(enabled),
199            ..self
200        }
201    }
202
203    pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
204        Self {
205            redrive_policy: Some(RedrivePolicy {
206                dead_letter_target_arn: dead_letter_target_arn.into(),
207                max_receive_count: max_receive_count.0,
208            }),
209            ..self
210        }
211    }
212
213    pub fn redrive_allow_policy(self, policy: Value) -> Self {
214        Self {
215            redrive_allow_policy: Some(policy),
216            ..self
217        }
218    }
219
220    pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
221        Self {
222            queue_name: Some(name.0),
223            ..self
224        }
225    }
226
227    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
228        let properties = QueueProperties {
229            queue_name: self.queue_name,
230            delay_seconds: self.delay_seconds,
231            maximum_message_size: self.maximum_message_size,
232            message_retention_period: self.message_retention_period,
233            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
234            visibility_timeout: self.visibility_timeout,
235            fifo_queue: if fifo { Some(true) } else { None },
236            content_based_deduplication: self.content_based_deduplication,
237            deduplication_scope: self.deduplication_scope,
238            fifo_throughput_limit: self.fifo_throughput_limit,
239            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
240            redrive_policy: self.redrive_policy,
241            redrive_allow_policy: self.redrive_allow_policy,
242        };
243
244        let resource_id = Resource::generate_id("SqsQueue");
245        stack_builder.add_resource(Queue {
246            id: self.id,
247            resource_id: resource_id.clone(),
248            r#type: "AWS::SQS::Queue".to_string(),
249            properties,
250        });
251        
252        QueueRef::new(resource_id)
253    }
254}
255
256impl QueueBuilder<StandardState> {
257    pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
258        self.build_internal(false, stack_builder)
259    }
260}
261
262impl QueueBuilder<FifoState> {
263    pub fn content_based_deduplication(self, enabled: bool) -> Self {
264        Self {
265            content_based_deduplication: Some(enabled),
266            ..self
267        }
268    }
269
270    /// Enables high throughput mode for FIFO queues.
271    ///
272    /// Sets deduplication scope to MessageGroup and throughput limit to PerMessageGroupId.
273    pub fn high_throughput_fifo(self) -> Self {
274        Self {
275            deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
276            fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
277            ..self
278        }
279    }
280
281    pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
282        Self {
283            deduplication_scope: Some(scope.into()),
284            ..self
285        }
286    }
287
288    pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
289        Self {
290            fifo_throughput_limit: Some(limit.into()),
291            ..self
292        }
293    }
294
295    /// Builds the FIFO queue and adds it to the stack.
296    ///
297    /// Automatically appends the required ".fifo" suffix to the queue name if not already present.
298    pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
299        if let Some(ref name) = self.queue_name
300            && !name.ends_with(FIFO_SUFFIX) {
301                self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
302            }
303        self.build_internal(true, stack_builder)
304    }
305}
306
307// TODO make available to outside
308//  - public new accepts QueueRef
309//  - private new accepts Value
310pub(crate) struct QueuePolicyBuilder {
311    id: Id,
312    doc: PolicyDocument,
313    queues: Vec<Value>
314}
315
316impl QueuePolicyBuilder {
317    pub(crate) fn new(id: &str, doc: PolicyDocument, queues: Vec<Value>) -> Self {
318        Self {
319            id: Id(id.to_string()),
320            doc,
321            queues,
322        }
323    }
324
325    pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> QueuePolicyRef {
326        let resource_id = Resource::generate_id("QueuePolicy");
327        stack_builder.add_resource(QueuePolicy {
328            id: self.id.clone(),
329            resource_id: resource_id.clone(),
330            r#type: "AWS::SQS::QueuePolicy".to_string(),
331            properties: QueuePolicyProperties {
332                doc: self.doc,
333                queues: self.queues,
334            },
335        });
336
337        QueuePolicyRef::new(self.id, resource_id)
338    }
339}