Skip to main content

rusty_cdk_core/sns/
builder.rs

1use crate::iam::{PolicyDocument, RoleRef};
2use crate::intrinsic::{get_arn, get_ref};
3use crate::lambda::{FunctionRef, PermissionBuilder};
4use crate::shared::{Id, TOPIC_POLICY_ID_SUFFIX};
5use crate::sns::{LoggingConfig, SnsSubscriptionProperties, Subscription, SubscriptionDtoType, Topic, TopicPolicy, TopicPolicyProperties, TopicPolicyRef, TopicPolicyType, TopicProperties, TopicRef, TopicType};
6use crate::stack::{Resource, StackBuilder};
7use crate::type_state;
8use crate::wrappers::{ArchivePolicy, LambdaPermissionAction, StringWithOnlyAlphaNumericsUnderscoresAndHyphens, SuccessFeedbackSampleRate, TopicDisplayName};
9use serde_json::{json, Value};
10use std::marker::PhantomData;
11use crate::kms::KeyRef;
12
/// Suffix that the FIFO `build` methods append to an explicit topic name when the name does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
14
/// Throughput scope that can be configured on a FIFO topic.
///
/// Converts into the strings `"Topic"` / `"MessageGroup"` through `From<FifoThroughputScope> for String`.
/// `PartialEq`/`Eq` are derived so values can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FifoThroughputScope {
    /// Converted to the string `"Topic"`.
    Topic,
    /// Converted to the string `"MessageGroup"`.
    MessageGroup,
}
20
/// A subscriber that can be attached to a topic through the `add_subscription` methods of `TopicBuilder`.
pub enum SubscriptionType<'a> {
    /// Subscribes the referenced Lambda function; the builder records its id and resource id.
    Lambda(&'a FunctionRef)
}
24
25impl From<FifoThroughputScope> for String {
26    fn from(value: FifoThroughputScope) -> Self {
27        match value {
28            FifoThroughputScope::Topic => "Topic".to_string(),
29            FifoThroughputScope::MessageGroup => "MessageGroup".to_string(),
30        }
31    }
32}
33
/// Tracing mode that can be configured on a topic.
///
/// Converts into the strings `"PassThrough"` / `"Active"` through `From<TracingConfig> for String`.
/// `PartialEq`/`Eq` are derived so values can be compared directly (e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TracingConfig {
    /// Converted to the string `"PassThrough"`.
    PassThrough,
    /// Converted to the string `"Active"`.
    Active,
}
39
40impl From<TracingConfig> for String {
41    fn from(value: TracingConfig) -> Self {
42        match value {
43            TracingConfig::PassThrough => "PassThrough".to_string(),
44            TracingConfig::Active => "Active".to_string(),
45        }
46    }
47}
48
// Declares the type-state names used by `TopicBuilder` through the crate's `type_state!` macro.
// `TopicBuilderState` is used below as the bound on the builder's type parameter; the remaining
// names are the states a builder can be in (start, standard with subscriptions, FIFO, FIFO with subscriptions).
// NOTE(review): the exact expansion lives in the macro definition, which is not visible in this file.
type_state!(
    TopicBuilderState,
    StartState,
    StandardStateWithSubscriptions,
    FifoState,
    FifoStateWithSubscriptions,
);
56
/// Builder for SNS topics.
///
/// Supports both standard and FIFO topics with Lambda subscriptions.
/// FIFO topics have additional configuration for deduplication and throughput.
///
/// # Example
///
/// ```rust,no_run
/// use rusty_cdk_core::stack::StackBuilder;
/// use rusty_cdk_core::sns::{TopicBuilder, SubscriptionType};
/// # use rusty_cdk_core::lambda::{FunctionBuilder, Architecture, Runtime, Zip};
/// # use rusty_cdk_core::wrappers::*;
/// # use rusty_cdk_macros::{memory, timeout, zip_file};
///
/// let mut stack_builder = StackBuilder::new();
///
/// // Create a simple topic without subscriptions
/// let simple_topic = TopicBuilder::new("simple-topic")
///     .build(&mut stack_builder);
///
/// let function = unimplemented!("create a function");
///
/// // Create a topic with a Lambda subscription
/// let topic = TopicBuilder::new("my-topic")
///     .add_subscription(SubscriptionType::Lambda(&function))
///     .build(&mut stack_builder);
///
/// ```
pub struct TopicBuilder<T: TopicBuilderState> {
    /// Zero-sized marker carrying the current type-state `T`.
    state: PhantomData<T>,
    /// Identifier of the topic, as passed to `new`.
    id: Id,
    /// Explicit topic name, if one was set with `topic_name`.
    topic_name: Option<String>,
    /// Value set through the FIFO-only `content_based_deduplication` setter.
    content_based_deduplication: Option<bool>,
    /// Value set through the FIFO-only `fifo_throughput_scope` setter.
    fifo_throughput_scope: Option<FifoThroughputScope>,
    /// Policy document set with `topic_policy`; a topic policy resource is built from it at build time.
    topic_policy_doc: Option<PolicyDocument>,
    /// `(id, resource id)` of every Lambda function subscribed so far.
    lambda_subscription_ids: Vec<(Id, String)>,
    /// String form of the archive policy value; emitted as `MessageRetentionPeriod` at build time.
    archive_policy: Option<String>,
    /// Display name set with `display_name`.
    display_name: Option<String>,
    /// Reference value of the KMS key set with `kms_master_key`.
    kms_master_key_id: Option<Value>,
    /// String form of the `TracingConfig` set with `tracing_config`.
    tracing_config: Option<String>,
    /// Logging configuration set with `logging_config`.
    logging_config: Option<LoggingConfig>,
}
99
100impl TopicBuilder<StartState> {
101    /// Creates a new SNS topic builder.
102    ///
103    /// # Arguments
104    /// * `id` - Unique identifier for the topic
105    pub fn new(id: &str) -> Self {
106        Self {
107            state: Default::default(),
108            id: Id(id.to_string()),
109            topic_name: None,
110            content_based_deduplication: None,
111            fifo_throughput_scope: None,
112            topic_policy_doc: None,
113            lambda_subscription_ids: vec![],
114            archive_policy: None,
115            display_name: None,
116            kms_master_key_id: None,
117            tracing_config: None,
118            logging_config: None,
119        }
120    }
121
122    /// Adds a subscription to the topic.
123    ///
124    /// For Lambda subscriptions, automatically creates the necessary permission.
125    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<StandardStateWithSubscriptions> {
126        self.add_subscription_internal(subscription);
127
128        TopicBuilder {
129            state: Default::default(),
130            id: self.id,
131            topic_name: self.topic_name,
132            content_based_deduplication: self.content_based_deduplication,
133            fifo_throughput_scope: self.fifo_throughput_scope,
134            topic_policy_doc: self.topic_policy_doc,
135            lambda_subscription_ids: self.lambda_subscription_ids,
136            archive_policy: self.archive_policy,
137            display_name: self.display_name,
138            kms_master_key_id: self.kms_master_key_id,
139            tracing_config: self.tracing_config,
140            logging_config: self.logging_config,
141        }
142    }
143
144    pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
145        self.build_internal(false, stack_builder)
146    }
147}
148
149impl TopicBuilder<StandardStateWithSubscriptions> {
150    pub fn add_subscription(mut self, subscription: SubscriptionType) -> Self {
151        self.add_subscription_internal(subscription);
152        self
153    }
154
155    pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
156        self.build_internal(false, stack_builder)
157    }
158}
159
160impl<T: TopicBuilderState> TopicBuilder<T> {
161    pub fn display_name(self, display_name: TopicDisplayName) -> Self {
162        Self {
163            display_name: Some(display_name.0),
164            ..self
165        }
166    }
167    
168    pub fn logging_config(self, logging_config: LoggingConfig) -> Self {
169        Self {
170            logging_config: Some(logging_config),
171            ..self
172        }
173    }
174
175    pub fn kms_master_key(self, kms_key: &KeyRef) -> Self {
176        Self {
177            kms_master_key_id: Some(kms_key.get_ref()),
178            ..self
179        }
180    }
181
182    pub fn tracing_config(self, tracing_config: TracingConfig) -> Self {
183        Self {
184            tracing_config: Some(tracing_config.into()),
185            ..self
186        }
187    }
188
189    pub fn topic_name(self, topic_name: StringWithOnlyAlphaNumericsUnderscoresAndHyphens) -> Self {
190        Self {
191            topic_name: Some(topic_name.0),
192            ..self
193        }
194    }
195
196    pub fn fifo(self) -> TopicBuilder<FifoState> {
197        TopicBuilder {
198            state: Default::default(),
199            id: self.id,
200            topic_name: self.topic_name,
201            content_based_deduplication: self.content_based_deduplication,
202            fifo_throughput_scope: self.fifo_throughput_scope,
203            topic_policy_doc: self.topic_policy_doc,
204            lambda_subscription_ids: self.lambda_subscription_ids,
205            display_name: self.display_name,
206            kms_master_key_id: self.kms_master_key_id,
207            archive_policy: self.archive_policy,
208            tracing_config: self.tracing_config,
209            logging_config: self.logging_config,
210        }
211    }
212
213    /// Adds an SNS Topic Policy for this topic.
214    /// The code will automatically set the `resources` section of the `PolicyDocument` to the ARN of this queue, so there's no need to pass that in.
215    pub fn topic_policy(self, doc: PolicyDocument) -> Self {
216        Self {
217            topic_policy_doc: Some(doc),
218            ..self
219        }
220    }
221    
222    fn add_subscription_internal(&mut self, subscription: SubscriptionType) {
223        match subscription {
224            SubscriptionType::Lambda(l) => self.lambda_subscription_ids.push((l.get_id().clone(), l.get_resource_id().to_string()))
225        };
226    }
227    
228    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> TopicRef {
229        let topic_resource_id = Resource::generate_id("SnsTopic");
230        
231        self.lambda_subscription_ids.iter().for_each(|(to_subscribe_id, to_subscribe_resource_id)| {
232            let subscription_id = Id::combine_ids(&self.id, to_subscribe_id);
233            let subscription_resource_id = Resource::generate_id("SnsSubscription");
234            
235            PermissionBuilder::new(&Id::generate_id(&subscription_id, "Permission"), LambdaPermissionAction("lambda:InvokeFunction".to_string()), get_arn(to_subscribe_resource_id), "sns.amazonaws.com")
236                .source_arn(get_ref(&topic_resource_id))
237                .build(stack_builder);
238
239            let subscription = Subscription {
240                id: subscription_id,
241                resource_id: subscription_resource_id,
242                r#type: SubscriptionDtoType::SubscriptionType,
243                properties: SnsSubscriptionProperties {
244                    protocol: "lambda".to_string(),
245                    endpoint: get_arn(to_subscribe_resource_id),
246                    topic_arn: get_ref(&topic_resource_id),
247                },
248            };
249
250            stack_builder.add_resource(subscription);
251        });
252
253        let archive_policy = if let Some(policy_retention_time) = self.archive_policy {
254            Some(json!({ "MessageRetentionPeriod": policy_retention_time }))
255        } else {
256            None
257        };
258        
259        let properties = TopicProperties {
260            topic_name: self.topic_name,
261            fifo_topic: Some(fifo),
262            content_based_deduplication: self.content_based_deduplication,
263            fifo_throughput_scope: self.fifo_throughput_scope.map(Into::into),
264            archive_policy,
265            display_name: self.display_name,
266            kms_master_key_id: self.kms_master_key_id,
267            tracing_config: self.tracing_config,
268            delivery_status_logging: self.logging_config,
269        };
270
271        let topic_ref = TopicRef::internal_new(self.id.clone(), topic_resource_id.to_string());
272        
273        if let Some(mut policy) = self.topic_policy_doc {
274            for statement in &mut policy.statements {
275                // point the statements of this policy to the queue
276                statement.resource = Some(vec![topic_ref.get_ref()]);
277            }
278            TopicPolicyBuilder::new(Id::generate_id(&self.id, TOPIC_POLICY_ID_SUFFIX), policy, vec![&topic_ref]).build(stack_builder);
279        }
280
281        stack_builder.add_resource(Topic {
282            id: self.id,
283            resource_id: topic_resource_id,
284            r#type: TopicType::TopicType,
285            properties,
286        });
287        
288        topic_ref
289    }
290}
291
292impl TopicBuilder<FifoState> {
293    pub fn archive_policy(self, archive_policy: ArchivePolicy) -> Self {
294        Self {
295            archive_policy: Some(archive_policy.0.to_string()),
296            ..self
297        }
298    }
299
300    pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> TopicBuilder<FifoState> {
301        Self {
302            fifo_throughput_scope: Some(scope),
303            ..self
304        }
305    }
306
307    pub fn content_based_deduplication(self, content_based_deduplication: bool) -> TopicBuilder<FifoState> {
308        Self {
309            content_based_deduplication: Some(content_based_deduplication),
310            ..self
311        }
312    }
313
314    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
315        self.add_subscription_internal(subscription);
316
317        TopicBuilder {
318            id: self.id,
319            state: Default::default(),
320            topic_name: self.topic_name,
321            content_based_deduplication: self.content_based_deduplication,
322            fifo_throughput_scope: self.fifo_throughput_scope,
323            topic_policy_doc: self.topic_policy_doc,
324            lambda_subscription_ids: self.lambda_subscription_ids,
325            display_name: self.display_name,
326            kms_master_key_id: self.kms_master_key_id,
327            archive_policy: self.archive_policy,
328            tracing_config: self.tracing_config,
329            logging_config: self.logging_config,
330        }
331    }
332
333    /// Builds the FIFO topic and adds it to the stack.
334    ///
335    /// Automatically appends the required ".fifo" suffix to the topic name if not already present.
336    pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
337        if let Some(ref name) = self.topic_name
338            && !name.ends_with(FIFO_SUFFIX) {
339                self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
340            }
341        self.build_internal(true, stack_builder)
342    }
343}
344
345impl TopicBuilder<FifoStateWithSubscriptions> {
346    pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> Self {
347        Self {
348            fifo_throughput_scope: Some(scope),
349            ..self
350        }
351    }
352
353    pub fn content_based_deduplication(self, content_based_deduplication: bool) -> Self {
354        Self {
355            content_based_deduplication: Some(content_based_deduplication),
356            ..self
357        }
358    }
359
360    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
361        self.add_subscription_internal(subscription);
362        self
363    }
364    
365    /// Builds the FIFO topic with subscriptions and adds it to the stack.
366    ///
367    /// Automatically appends the required ".fifo" suffix to the topic name if not already present.
368    /// Creates Lambda permissions for all subscriptions.
369    pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
370        if let Some(ref name) = self.topic_name
371            && !name.ends_with(FIFO_SUFFIX) {
372                self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
373            }
374        self.build_internal(true, stack_builder)
375    }
376}
377
/// Delivery protocol a `LoggingConfig` applies to.
///
/// Converts into a lowercase string (`"http"`, `"sqs"`, `"lambda"`, `"firehose"`, `"application"`)
/// through `From<Protocol> for String`.
///
/// The all-caps variant names are kept as they are because they are part of the public API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Protocol {
    /// Converted to the string `"http"`.
    HTTP,
    /// Converted to the string `"sqs"`.
    SQS,
    /// Converted to the string `"lambda"`.
    Lambda,
    /// Converted to the string `"firehose"`.
    Firehose,
    /// Converted to the string `"application"`.
    Application,
}
385
386impl From<Protocol> for String {
387    fn from(value: Protocol) -> String {
388        match value {
389            Protocol::HTTP => "http".to_string(),
390            Protocol::SQS => "sqs".to_string(),
391            Protocol::Lambda => "lambda".to_string(),
392            Protocol::Firehose => "firehose".to_string(),
393            Protocol::Application => "application".to_string(),
394        }
395    }
396}
397
/// Builder for a `LoggingConfig`, which can be passed to `TopicBuilder::logging_config`.
pub struct LoggingConfigBuilder {
    /// String form of the `Protocol` passed to `new`.
    protocol: String,
    /// Sample rate taken from a `SuccessFeedbackSampleRate`, if set.
    success_feedback_sample_rate: Option<u8>,
    /// ARN value of the role set with `failure_feedback_role`, if any.
    failure_feedback_role: Option<Value>,
    /// ARN value of the role set with `success_feedback_role`, if any.
    success_feedback_role: Option<Value>,
}
404
405impl LoggingConfigBuilder {
406    pub fn new(protocol: Protocol) -> Self {
407        Self {
408            protocol: protocol.into(),
409            success_feedback_sample_rate: None,
410            failure_feedback_role: None,
411            success_feedback_role: None,
412        }
413    }
414    
415    pub fn success_feedback_role(self, role: &RoleRef) -> Self {
416        Self {
417            success_feedback_role: Some(role.get_arn()),
418            ..self
419        }
420    }
421    
422    pub fn failure_feedback_role(self, role: &RoleRef) -> Self {
423        Self {
424            failure_feedback_role: Some(role.get_arn()),
425            ..self
426        }
427    }
428    
429    pub fn success_feedback_sample_rate(self, success_feedback_sample_rate: SuccessFeedbackSampleRate) -> Self {
430        Self {
431            success_feedback_sample_rate: Some(success_feedback_sample_rate.0),
432            ..self
433        }
434    }
435    
436    pub fn build(self) -> LoggingConfig {
437        LoggingConfig {
438            failure_feedback_role_arn: self.failure_feedback_role,
439            protocol: self.protocol,
440            success_feedback_role_arn: self.success_feedback_role,
441            success_feedback_sample_rate: self.success_feedback_sample_rate,
442        }
443    }
444}
445
/// Crate-internal builder for a `TopicPolicy` resource.
pub(crate) struct TopicPolicyBuilder {
    /// Identifier of the policy resource.
    id: Id,
    /// Policy document stored in the policy's properties.
    doc: PolicyDocument,
    /// Reference values of the topics the policy is attached to.
    topics: Vec<Value>
}
451
452impl TopicPolicyBuilder {
453    /// Use the `topic_policy` method of `TopicBuilder` to add a topic policy to a topic
454    pub(crate) fn new(id: Id, doc: PolicyDocument, topics: Vec<&TopicRef>) -> Self {
455        Self::new_with_values(id, doc, topics.into_iter().map(|v| v.get_ref()).collect())
456    }
457    
458    pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, topics: Vec<Value>) -> Self {
459        Self {
460            id,
461            doc,
462            topics,
463        }
464    }
465    
466    pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> TopicPolicyRef {
467        let resource_id = Resource::generate_id("TopicPolicy");
468        stack_builder.add_resource(TopicPolicy {
469            id: self.id.clone(),
470            resource_id: resource_id.clone(),
471            r#type: TopicPolicyType::TopicPolicyType,
472            properties: TopicPolicyProperties {
473                doc: self.doc,
474                topics: self.topics,
475            },
476        });
477        
478        TopicPolicyRef::internal_new(self.id, resource_id)
479    }
480}
481