rusty_cdk_core/sns/
builder.rs

1use crate::iam::PolicyDocument;
2use crate::intrinsic::{get_arn, get_ref};
3use crate::lambda::{FunctionRef, PermissionBuilder};
4use crate::shared::{Id, TOPIC_POLICY_ID_SUFFIX};
5use crate::sns::{SnsSubscriptionProperties, Subscription, Topic, TopicPolicy, TopicPolicyProperties, TopicPolicyRef, TopicProperties, TopicRef};
6use crate::stack::{Resource, StackBuilder};
7use crate::type_state;
8use crate::wrappers::{LambdaPermissionAction, StringWithOnlyAlphaNumericsUnderscoresAndHyphens};
9use serde_json::Value;
10use std::marker::PhantomData;
11
/// Suffix that the FIFO `build` methods append to an explicit topic name that does not already end with it.
const FIFO_SUFFIX: &str = ".fifo";
13
/// Throughput scope that can be configured on a FIFO topic.
///
/// Converts into the strings `"Topic"` and `"MessageGroup"` through
/// `From<FifoThroughputScope> for String`.
///
/// The enum is a plain fieldless value, so it is `Copy` and comparable in addition to
/// `Debug` and `Clone`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FifoThroughputScope {
    /// Rendered as `"Topic"`.
    Topic,
    /// Rendered as `"MessageGroup"`.
    MessageGroup,
}
19
20pub enum SubscriptionType<'a> {
21    Lambda(&'a FunctionRef)
22}
23
24impl From<FifoThroughputScope> for String {
25    fn from(value: FifoThroughputScope) -> Self {
26        match value {
27            FifoThroughputScope::Topic => "Topic".to_string(),
28            FifoThroughputScope::MessageGroup => "MessageGroup".to_string(),
29        }
30    }
31}
32
// Type-state markers used as the `T` parameter of `TopicBuilder<T>`.
// NOTE(review): `type_state!` is defined elsewhere in the crate; it presumably declares the
// `TopicBuilderState` trait (first name) and one marker type per remaining name — confirm
// against the macro definition.
type_state!(
    TopicBuilderState,
    StartState,
    StandardStateWithSubscriptions,
    FifoState,
    FifoStateWithSubscriptions,
);
40
41/// Builder for SNS topics.
42///
43/// Supports both standard and FIFO topics with Lambda subscriptions.
44/// FIFO topics have additional configuration for deduplication and throughput.
45///
46/// # Example
47///
48/// ```rust,no_run
49/// use rusty_cdk_core::stack::StackBuilder;
50/// use rusty_cdk_core::sns::{TopicBuilder, SubscriptionType};
51/// # use rusty_cdk_core::lambda::{FunctionBuilder, Architecture, Runtime, Zip};
52/// # use rusty_cdk_core::wrappers::*;
53/// # use rusty_cdk_macros::{memory, timeout, zip_file};
54///
55/// let mut stack_builder = StackBuilder::new();
56///
57/// // Create a simple topic without subscriptions
58/// let simple_topic = TopicBuilder::new("simple-topic")
59///     .build(&mut stack_builder);
60/// 
61/// let function = unimplemented!("create a function");
62///
63/// // Create a topic with a Lambda subscription
64/// let topic = TopicBuilder::new("my-topic")
65///     .add_subscription(SubscriptionType::Lambda(&function))
66///     .build(&mut stack_builder);
67///
68/// ```
69pub struct TopicBuilder<T: TopicBuilderState> {
70    state: PhantomData<T>,
71    id: Id,
72    topic_name: Option<String>,
73    content_based_deduplication: Option<bool>,
74    fifo_throughput_scope: Option<FifoThroughputScope>,
75    topic_policy_doc: Option<PolicyDocument>,
76    lambda_subscription_ids: Vec<(Id, String)>,
77}
78
79impl TopicBuilder<StartState> {
80    /// Creates a new SNS topic builder.
81    ///
82    /// # Arguments
83    /// * `id` - Unique identifier for the topic
84    pub fn new(id: &str) -> Self {
85        Self {
86            state: Default::default(),
87            id: Id(id.to_string()),
88            topic_name: None,
89            content_based_deduplication: None,
90            fifo_throughput_scope: None,
91            topic_policy_doc: None,
92            lambda_subscription_ids: vec![],
93        }
94    }
95
96    /// Adds a subscription to the topic.
97    ///
98    /// For Lambda subscriptions, automatically creates the necessary permission.
99    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<StandardStateWithSubscriptions> {
100        self.add_subscription_internal(subscription);
101
102        TopicBuilder {
103            state: Default::default(),
104            id: self.id,
105            topic_name: self.topic_name,
106            content_based_deduplication: self.content_based_deduplication,
107            fifo_throughput_scope: self.fifo_throughput_scope,
108            topic_policy_doc: self.topic_policy_doc,
109            lambda_subscription_ids: self.lambda_subscription_ids,
110        }
111    }
112
113    pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
114        self.build_internal(false, stack_builder)
115    }
116}
117
118impl TopicBuilder<StandardStateWithSubscriptions> {
119    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<StandardStateWithSubscriptions> {
120        self.add_subscription_internal(subscription);
121
122        TopicBuilder {
123            state: Default::default(),
124            id: self.id,
125            topic_name: self.topic_name,
126            content_based_deduplication: self.content_based_deduplication,
127            fifo_throughput_scope: self.fifo_throughput_scope,
128            topic_policy_doc: self.topic_policy_doc,
129            lambda_subscription_ids: self.lambda_subscription_ids,
130        }
131    }
132
133    pub fn build(self, stack_builder: &mut StackBuilder) -> TopicRef {
134        self.build_internal(false, stack_builder)
135    }
136}
137
138impl<T: TopicBuilderState> TopicBuilder<T> {
139    pub fn topic_name(self, topic_name: StringWithOnlyAlphaNumericsUnderscoresAndHyphens) -> TopicBuilder<T> {
140        TopicBuilder {
141            topic_name: Some(topic_name.0),
142            id: self.id,
143            state: Default::default(),
144            content_based_deduplication: self.content_based_deduplication,
145            fifo_throughput_scope: self.fifo_throughput_scope,
146            topic_policy_doc: self.topic_policy_doc,
147            lambda_subscription_ids: self.lambda_subscription_ids,
148        }
149    }
150
151    pub fn fifo(self) -> TopicBuilder<FifoState> {
152        TopicBuilder {
153            state: Default::default(),
154            id: self.id,
155            topic_name: self.topic_name,
156            content_based_deduplication: self.content_based_deduplication,
157            fifo_throughput_scope: self.fifo_throughput_scope,
158            topic_policy_doc: self.topic_policy_doc,
159            lambda_subscription_ids: self.lambda_subscription_ids,
160        }
161    }
162
163    /// Adds an SNS Topic Policy for this topic.
164    /// The code will automatically set the `resources` section of the `PolicyDocument` to the ARN of this queue, so there's no need to pass that in.
165    pub fn topic_policy(self, doc: PolicyDocument) -> TopicBuilder<T> {
166        TopicBuilder {
167            topic_policy_doc: Some(doc),
168            topic_name: self.topic_name,
169            id: self.id,
170            state: Default::default(),
171            content_based_deduplication: self.content_based_deduplication,
172            fifo_throughput_scope: self.fifo_throughput_scope,
173            lambda_subscription_ids: self.lambda_subscription_ids,
174        }
175    }
176    
177    fn add_subscription_internal(&mut self, subscription: SubscriptionType) {
178        match subscription {
179            SubscriptionType::Lambda(l) => self.lambda_subscription_ids.push((l.get_id().clone(), l.get_resource_id().to_string()))
180        };
181    }
182    
183    fn build_internal(self, fifo: bool, stack_builder: &mut StackBuilder) -> TopicRef {
184        let topic_resource_id = Resource::generate_id("SnsTopic");
185        
186        self.lambda_subscription_ids.iter().for_each(|(to_subscribe_id, to_subscribe_resource_id)| {
187            let subscription_id = Id::combine_ids(&self.id, to_subscribe_id);
188            let subscription_resource_id = Resource::generate_id("SnsSubscription");
189            
190            PermissionBuilder::new(&Id::generate_id(&subscription_id, "Permission"), LambdaPermissionAction("lambda:InvokeFunction".to_string()), get_arn(to_subscribe_resource_id), "sns.amazonaws.com")
191                .source_arn(get_ref(&topic_resource_id))
192                .build(stack_builder);
193
194            let subscription = Subscription {
195                id: subscription_id,
196                resource_id: subscription_resource_id,
197                r#type: "AWS::SNS::Subscription".to_string(),
198                properties: SnsSubscriptionProperties {
199                    protocol: "lambda".to_string(),
200                    endpoint: get_arn(to_subscribe_resource_id),
201                    topic_arn: get_ref(&topic_resource_id),
202                },
203            };
204
205            stack_builder.add_resource(subscription);
206        });
207        
208        let properties = TopicProperties {
209            topic_name: self.topic_name,
210            fifo_topic: Some(fifo),
211            content_based_deduplication: self.content_based_deduplication,
212            fifo_throughput_scope: self.fifo_throughput_scope.map(Into::into),
213        };
214
215        let topic_ref = TopicRef::new(self.id.clone(), topic_resource_id.to_string());
216        
217        if let Some(mut policy) = self.topic_policy_doc {
218            for statement in &mut policy.statements {
219                // point the statements of this policy to the queue
220                statement.resource = Some(vec![topic_ref.get_ref()]);
221            }
222            TopicPolicyBuilder::new(Id::generate_id(&self.id, TOPIC_POLICY_ID_SUFFIX), policy, vec![&topic_ref]).build(stack_builder);
223        }
224
225        stack_builder.add_resource(Topic {
226            id: self.id,
227            resource_id: topic_resource_id,
228            r#type: "AWS::SNS::Topic".to_string(),
229            properties,
230        });
231        
232        topic_ref
233    }
234}
235
236impl TopicBuilder<FifoState> {
237    pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> TopicBuilder<FifoState> {
238        Self {
239            fifo_throughput_scope: Some(scope),
240            ..self
241        }
242    }
243
244    pub fn content_based_deduplication(self, content_based_deduplication: bool) -> TopicBuilder<FifoState> {
245        Self {
246            content_based_deduplication: Some(content_based_deduplication),
247            ..self
248        }
249    }
250
251    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
252        self.add_subscription_internal(subscription);
253
254        TopicBuilder {
255            state: Default::default(),
256            id: self.id,
257            topic_name: self.topic_name,
258            content_based_deduplication: self.content_based_deduplication,
259            fifo_throughput_scope: self.fifo_throughput_scope,
260            topic_policy_doc: self.topic_policy_doc,
261            lambda_subscription_ids: self.lambda_subscription_ids,
262        }
263    }
264
265    /// Builds the FIFO topic and adds it to the stack.
266    ///
267    /// Automatically appends the required ".fifo" suffix to the topic name if not already present.
268    pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
269        if let Some(ref name) = self.topic_name
270            && !name.ends_with(FIFO_SUFFIX) {
271                self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
272            }
273        self.build_internal(true, stack_builder)
274    }
275}
276
277impl TopicBuilder<FifoStateWithSubscriptions> {
278    pub fn fifo_throughput_scope(self, scope: FifoThroughputScope) -> TopicBuilder<FifoStateWithSubscriptions> {
279        Self {
280            fifo_throughput_scope: Some(scope),
281            ..self
282        }
283    }
284
285    pub fn content_based_deduplication(self, content_based_deduplication: bool) -> TopicBuilder<FifoStateWithSubscriptions> {
286        Self {
287            content_based_deduplication: Some(content_based_deduplication),
288            ..self
289        }
290    }
291
292    pub fn add_subscription(mut self, subscription: SubscriptionType) -> TopicBuilder<FifoStateWithSubscriptions> {
293        self.add_subscription_internal(subscription);
294
295        TopicBuilder {
296            state: Default::default(),
297            id: self.id,
298            topic_name: self.topic_name,
299            content_based_deduplication: self.content_based_deduplication,
300            fifo_throughput_scope: self.fifo_throughput_scope,
301            topic_policy_doc: self.topic_policy_doc,
302            lambda_subscription_ids: self.lambda_subscription_ids,
303        }
304    }
305    
306    /// Builds the FIFO topic with subscriptions and adds it to the stack.
307    ///
308    /// Automatically appends the required ".fifo" suffix to the topic name if not already present.
309    /// Creates Lambda permissions for all subscriptions.
310    pub fn build(mut self, stack_builder: &mut StackBuilder) -> TopicRef {
311        if let Some(ref name) = self.topic_name
312            && !name.ends_with(FIFO_SUFFIX) {
313                self.topic_name = Some(format!("{}{}", name, FIFO_SUFFIX));
314            }
315        self.build_internal(true, stack_builder)
316    }
317}
318
319pub(crate) struct TopicPolicyBuilder {
320    id: Id,
321    doc: PolicyDocument,
322    topics: Vec<Value>
323}
324
325impl TopicPolicyBuilder {
326    /// Use the `topic_policy` method of `TopicBuilder` to add a topic policy to a topic
327    pub(crate) fn new(id: Id, doc: PolicyDocument, topics: Vec<&TopicRef>) -> Self {
328        Self::new_with_values(id, doc, topics.into_iter().map(|v| v.get_ref()).collect())
329    }
330    
331    pub(crate) fn new_with_values(id: Id, doc: PolicyDocument, topics: Vec<Value>) -> Self {
332        Self {
333            id,
334            doc,
335            topics,
336        }
337    }
338    
339    pub(crate) fn build(self, stack_builder: &mut StackBuilder) -> TopicPolicyRef {
340        let resource_id = Resource::generate_id("TopicPolicy");
341        stack_builder.add_resource(TopicPolicy {
342            id: self.id.clone(),
343            resource_id: resource_id.clone(),
344            r#type: "AWS::SNS::TopicPolicy".to_string(),
345            properties: TopicPolicyProperties {
346                doc: self.doc,
347                topics: self.topics,
348            },
349        });
350        
351        TopicPolicyRef::new(self.id, resource_id)
352    }
353}
354