rusty_cdk_core/sqs/
builder.rs

1use crate::shared::Id;
2use crate::sqs::{Queue, QueueProperties, RedrivePolicy};
3use crate::sqs::QueueRef;
4use crate::stack::{Resource, StackBuilder};
5use crate::wrappers::{
6    DelaySeconds, MaximumMessageSize, MessageRetentionPeriod, NonZeroNumber, ReceiveMessageWaitTime,
7    StringWithOnlyAlphaNumericsAndUnderscores, VisibilityTimeout,
8};
9use serde_json::Value;
10use std::marker::PhantomData;
11use crate::type_state;
12
13const FIFO_SUFFIX: &str = ".fifo";
14
/// Level at which a FIFO queue deduplicates messages.
///
/// Converts into the string value used for the queue's `deduplication_scope`
/// property (`"queue"` or `"messageGroup"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeduplicationScope {
    /// Deduplicate across the whole queue (`"queue"`).
    Queue,
    /// Deduplicate within each message group (`"messageGroup"`).
    MessageGroup,
}

impl From<DeduplicationScope> for String {
    /// Returns the exact property value for the given scope.
    fn from(value: DeduplicationScope) -> Self {
        let text = match value {
            DeduplicationScope::Queue => "queue",
            DeduplicationScope::MessageGroup => "messageGroup",
        };
        text.to_owned()
    }
}
28
/// Level at which the throughput quota of a FIFO queue is applied.
///
/// Converts into the string value used for the queue's `fifo_throughput_limit`
/// property (`"perQueue"` or `"perMessageGroupId"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FifoThroughputLimit {
    /// The quota applies to the queue as a whole (`"perQueue"`).
    PerQueue,
    /// The quota applies per message group id (`"perMessageGroupId"`).
    PerMessageGroupId,
}

impl From<FifoThroughputLimit> for String {
    /// Returns the exact property value for the given limit.
    fn from(value: FifoThroughputLimit) -> Self {
        let text = match value {
            FifoThroughputLimit::PerQueue => "perQueue",
            FifoThroughputLimit::PerMessageGroupId => "perMessageGroupId",
        };
        text.to_owned()
    }
}
42
// Type-state names for `QueueBuilder`. In this file `QueueBuilderState` is used as the
// trait bound on the builder's type parameter, while `StartState`, `StandardState` and
// `FifoState` are the concrete states the builder is parameterised with.
// NOTE(review): `type_state!` is defined elsewhere in the crate; presumably it declares
// the trait named first plus one marker type per remaining name — confirm against the macro.
type_state!(
    QueueBuilderState,
    StartState,
    StandardState,
    FifoState,
);
49
50/// Builder for SQS queues.
51///
52/// Supports both standard and FIFO queues. FIFO queues have additional configuration
53/// options for deduplication and throughput.
54///
55/// # Example
56///
57/// ```rust
58/// use rusty_cdk_core::stack::StackBuilder;
59/// use rusty_cdk_core::sqs::QueueBuilder;
60/// use rusty_cdk_core::wrappers::*;
61/// use rusty_cdk_macros::{delay_seconds, message_retention_period, visibility_timeout};
62///
63/// let mut stack_builder = StackBuilder::new();
64///
65/// // Create a standard queue
66/// let standard_queue = QueueBuilder::new("standard-queue")
67///     .standard_queue()
68///     .visibility_timeout(visibility_timeout!(60))
69///     .build(&mut stack_builder);
70/// 
71/// // Create a FIFO queue
72/// let queue = QueueBuilder::new("my-queue")
73///     .fifo_queue()
74///     .content_based_deduplication(true)
75///     .delay_seconds(delay_seconds!(30))
76///     .message_retention_period(message_retention_period!(600))
77///     .build(&mut stack_builder);
78/// ```
79pub struct QueueBuilder<T: QueueBuilderState> {
80    state: PhantomData<T>,
81    id: Id,
82    queue_name: Option<String>,
83    delay_seconds: Option<u32>,
84    maximum_message_size: Option<u32>,
85    message_retention_period: Option<u32>,
86    receive_message_wait_time_seconds: Option<u32>,
87    visibility_timeout: Option<u32>,
88    content_based_deduplication: Option<bool>,
89    deduplication_scope: Option<String>,
90    fifo_throughput_limit: Option<String>,
91    sqs_managed_sse_enabled: Option<bool>,
92    redrive_policy: Option<RedrivePolicy>,
93    redrive_allow_policy: Option<Value>,
94}
95
96impl QueueBuilder<StartState> {
97    /// Creates a new SQS queue builder.
98    ///
99    /// # Arguments
100    /// * `id` - Unique identifier for the queue
101    pub fn new(id: &str) -> Self {
102        Self {
103            state: Default::default(),
104            id: Id(id.to_string()),
105            queue_name: None,
106            delay_seconds: None,
107            maximum_message_size: None,
108            message_retention_period: None,
109            receive_message_wait_time_seconds: None,
110            visibility_timeout: None,
111            content_based_deduplication: None,
112            deduplication_scope: None,
113            fifo_throughput_limit: None,
114            sqs_managed_sse_enabled: None,
115            redrive_policy: None,
116            redrive_allow_policy: None,
117        }
118    }
119
120    pub fn standard_queue(self) -> QueueBuilder<StandardState> {
121        QueueBuilder {
122            state: Default::default(),
123            id: self.id,
124            queue_name: self.queue_name,
125            delay_seconds: self.delay_seconds,
126            maximum_message_size: self.maximum_message_size,
127            message_retention_period: self.message_retention_period,
128            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
129            visibility_timeout: self.visibility_timeout,
130            content_based_deduplication: self.content_based_deduplication,
131            deduplication_scope: self.deduplication_scope,
132            fifo_throughput_limit: self.fifo_throughput_limit,
133            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
134            redrive_policy: self.redrive_policy,
135            redrive_allow_policy: self.redrive_allow_policy,
136        }
137    }
138
139    pub fn fifo_queue(self) -> QueueBuilder<FifoState> {
140        QueueBuilder {
141            state: Default::default(),
142            id: self.id,
143            queue_name: self.queue_name,
144            delay_seconds: self.delay_seconds,
145            maximum_message_size: self.maximum_message_size,
146            message_retention_period: self.message_retention_period,
147            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
148            visibility_timeout: self.visibility_timeout,
149            content_based_deduplication: self.content_based_deduplication,
150            deduplication_scope: self.deduplication_scope,
151            fifo_throughput_limit: self.fifo_throughput_limit,
152            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
153            redrive_policy: self.redrive_policy,
154            redrive_allow_policy: self.redrive_allow_policy,
155        }
156    }
157}
158
159impl<T: QueueBuilderState> QueueBuilder<T> {
160    pub fn delay_seconds(self, delay: DelaySeconds) -> Self {
161        Self {
162            delay_seconds: Some(delay.0 as u32),
163            ..self
164        }
165    }
166
167    pub fn maximum_message_size(self, size: MaximumMessageSize) -> Self {
168        Self {
169            maximum_message_size: Some(size.0),
170            ..self
171        }
172    }
173
174    pub fn message_retention_period(self, period: MessageRetentionPeriod) -> Self {
175        Self {
176            message_retention_period: Some(period.0),
177            ..self
178        }
179    }
180
181    pub fn receive_message_wait_time_seconds(self, wait_time: ReceiveMessageWaitTime) -> Self {
182        Self {
183            receive_message_wait_time_seconds: Some(wait_time.0 as u32),
184            ..self
185        }
186    }
187
188    pub fn visibility_timeout(self, timeout: VisibilityTimeout) -> Self {
189        Self {
190            visibility_timeout: Some(timeout.0),
191            ..self
192        }
193    }
194
195    pub fn sqs_managed_sse_enabled(self, enabled: bool) -> Self {
196        Self {
197            sqs_managed_sse_enabled: Some(enabled),
198            ..self
199        }
200    }
201
202    pub fn dead_letter_queue<D: Into<String>>(self, dead_letter_target_arn: D, max_receive_count: NonZeroNumber) -> Self {
203        Self {
204            redrive_policy: Some(RedrivePolicy {
205                dead_letter_target_arn: dead_letter_target_arn.into(),
206                max_receive_count: max_receive_count.0,
207            }),
208            ..self
209        }
210    }
211
212    pub fn redrive_allow_policy(self, policy: Value) -> Self {
213        Self {
214            redrive_allow_policy: Some(policy),
215            ..self
216        }
217    }
218
219    pub fn queue_name(self, name: StringWithOnlyAlphaNumericsAndUnderscores) -> Self {
220        Self {
221            queue_name: Some(name.0),
222            ..self
223        }
224    }
225
226    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> QueueRef {
227        let properties = QueueProperties {
228            queue_name: self.queue_name,
229            delay_seconds: self.delay_seconds,
230            maximum_message_size: self.maximum_message_size,
231            message_retention_period: self.message_retention_period,
232            receive_message_wait_time_seconds: self.receive_message_wait_time_seconds,
233            visibility_timeout: self.visibility_timeout,
234            fifo_queue: if fifo { Some(true) } else { None },
235            content_based_deduplication: self.content_based_deduplication,
236            deduplication_scope: self.deduplication_scope,
237            fifo_throughput_limit: self.fifo_throughput_limit,
238            sqs_managed_sse_enabled: self.sqs_managed_sse_enabled,
239            redrive_policy: self.redrive_policy,
240            redrive_allow_policy: self.redrive_allow_policy,
241        };
242
243        let resource_id = Resource::generate_id("SqsQueue");
244        stack_builder.add_resource(Queue {
245            id: self.id,
246            resource_id: resource_id.clone(),
247            r#type: "AWS::SQS::Queue".to_string(),
248            properties,
249        });
250        
251        QueueRef::new(resource_id)
252    }
253}
254
255impl QueueBuilder<StandardState> {
256    pub fn build(self, stack_builder: &mut StackBuilder) -> QueueRef {
257        self.build_internal(false, stack_builder)
258    }
259}
260
261impl QueueBuilder<FifoState> {
262    pub fn content_based_deduplication(self, enabled: bool) -> Self {
263        Self {
264            content_based_deduplication: Some(enabled),
265            ..self
266        }
267    }
268
269    /// Enables high throughput mode for FIFO queues.
270    ///
271    /// Sets deduplication scope to MessageGroup and throughput limit to PerMessageGroupId.
272    pub fn high_throughput_fifo(self) -> Self {
273        Self {
274            deduplication_scope: Some(DeduplicationScope::MessageGroup.into()),
275            fifo_throughput_limit: Some(FifoThroughputLimit::PerMessageGroupId.into()),
276            ..self
277        }
278    }
279
280    pub fn deduplication_scope(self, scope: DeduplicationScope) -> Self {
281        Self {
282            deduplication_scope: Some(scope.into()),
283            ..self
284        }
285    }
286
287    pub fn fifo_throughput_limit(self, limit: FifoThroughputLimit) -> Self {
288        Self {
289            fifo_throughput_limit: Some(limit.into()),
290            ..self
291        }
292    }
293
294    /// Builds the FIFO queue and adds it to the stack.
295    ///
296    /// Automatically appends the required ".fifo" suffix to the queue name if not already present.
297    pub fn build(mut self, stack_builder: &mut StackBuilder) -> QueueRef {
298        if let Some(ref name) = self.queue_name
299            && !name.ends_with(FIFO_SUFFIX) {
300                self.queue_name = Some(format!("{}{}", name, FIFO_SUFFIX));
301            }
302        self.build_internal(true, stack_builder)
303    }
304}